Coverage for lobster/tools/json/json.py: 87%
132 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-04-16 05:31 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-04-16 05:31 +0000
1#!/usr/bin/env python3
2#
3# lobster_json - Extract JSON tags for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
19import argparse
20import json
21import sys
22from pathlib import PurePath
23from pprint import pprint
24from typing import Optional, Sequence, Tuple, List, Set
26from lobster.common.tool import LOBSTER_Per_File_Tool
27from lobster.common.items import Tracing_Tag, Activity
28from lobster.common.location import File_Reference
31class Malformed_Input(Exception):
32 def __init__(self, msg, data):
33 super().__init__(msg)
34 self.msg = msg
35 self.data = data
38def get_item(root, path, required):
39 assert isinstance(path, str)
40 assert isinstance(required, bool)
42 if path == "":
43 return root
45 if "." in path: 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true
46 field, tail = path.split(".", 1)
47 else:
48 field = path
49 tail = ""
51 if isinstance(root, dict): 51 ↛ 59line 51 didn't jump to line 59 because the condition on line 51 was always true
52 if field in root:
53 return get_item(root[field], tail, required)
54 if required:
55 raise Malformed_Input("object does not contain %s" % field,
56 root)
57 return None
59 if required:
60 raise Malformed_Input("not an object", root)
61 return None
64def syn_test_name(file_name):
65 assert isinstance(file_name, PurePath)
66 if file_name.is_absolute(): 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true
67 components = list(file_name.parts)[1:-1]
68 else:
69 components = list(file_name.parts)[:-1]
70 components.append(file_name.name.replace(".json", ""))
71 components = [item
72 for item in components
73 if item and item not in (".", "..")]
74 return ".".join(components)
77class LOBSTER_Json(LOBSTER_Per_File_Tool):
78 def __init__(self):
79 super().__init__(
80 name = "json",
81 description = "Extract tracing data from JSON files.",
82 extensions = ["json"],
83 official = True)
85 # Supported config parameters for lobster-json
86 TEST_LIST = "test_list"
87 NAME_ATTRIBUTE = "name_attribute"
88 TAG_ATTRIBUTE = "tag_attribute"
89 JUSTIFICATION_ATTRIBUTE = "justification_attribute"
90 SINGLE = "single"
92 @classmethod
93 def get_config_keys_manual(cls):
94 help_dict = super().get_config_keys_manual()
95 help_dict.update(
96 {
97 cls.TEST_LIST: "Member name indicator resulting in a "
98 "list containing objects carrying test "
99 "data.",
100 cls.NAME_ATTRIBUTE: "Member name indicator for test name.",
101 cls.TAG_ATTRIBUTE: "Member name indicator for test tracing tags.",
102 cls.JUSTIFICATION_ATTRIBUTE: "Member name indicator for "
103 "justifications.",
104 cls.SINGLE: "Avoid use of multiprocessing."
105 }
106 )
107 return help_dict
109 def get_mandatory_parameters(self) -> Set[str]:
110 return {self.TAG_ATTRIBUTE}
112 def process_commandline_and_yaml_options(
113 self,
114 options: argparse.Namespace,
115 ) -> List[Tuple[File_Reference, str]]:
116 """
117 Overrides the parent class method and add fetch tool specific options from the
118 yaml
119 config
121 Returns
122 -------
123 options - command-line and yaml options
124 worklist - list of json files
125 """
126 work_list = super().process_commandline_and_yaml_options(options)
127 options.test_list = self.config.get(self.TEST_LIST, '')
128 options.name_attribute = self.config.get(self.NAME_ATTRIBUTE)
129 options.tag_attribute = self.config.get(self.TAG_ATTRIBUTE)
130 options.justification_attribute = self.config.get(self.JUSTIFICATION_ATTRIBUTE)
131 options.single = self.config.get(self.SINGLE, False)
132 return work_list
134 def process_tool_options(
135 self,
136 options: argparse.Namespace,
137 work_list: List[Tuple[File_Reference, str]],
138 ):
139 super().process_tool_options(options, work_list)
140 self.schema = Activity
142 @classmethod
143 def process(cls, options, file_name) -> Tuple[bool, List[Activity]]:
144 ok, data = load_item(file_name, options.test_list)
145 if not ok:
146 return False, []
148 # Ensure we actually have a list now
149 if not isinstance(data, list):
150 data = [data]
152 # Convert individual items
153 items = []
154 ok = True
155 for item_id, item in enumerate(data, 1):
156 try:
157 item_name = build_name(item, options.name_attribute, file_name, item_id)
158 if not isinstance(item_name, str): 158 ↛ 159line 158 didn't jump to line 159 because the condition on line 158 was never true
159 raise Malformed_Input("name is not a string",
160 item_name)
162 item_tags = get_item(root = item,
163 path = options.tag_attribute,
164 required = False)
165 if isinstance(item_tags, list):
166 pass
167 elif isinstance(item_tags, str):
168 item_tags = [item_tags]
169 elif item_tags is None: 169 ↛ 172line 169 didn't jump to line 172 because the condition on line 169 was always true
170 item_tags = []
171 else:
172 raise Malformed_Input("tags are not a string or list",
173 item_name)
175 if options.justification_attribute:
176 item_just = get_item(
177 root = item,
178 path = options.justification_attribute,
179 required = False)
180 else:
181 item_just = []
182 if isinstance(item_just, list):
183 pass
184 elif isinstance(item_just, str):
185 item_just = [item_just]
186 elif item_just is None: 186 ↛ 189line 186 didn't jump to line 189 because the condition on line 186 was always true
187 item_just = []
188 else:
189 raise Malformed_Input("justification is not a string"
190 " or list",
191 item_just)
193 l_item = Activity(
194 tag = Tracing_Tag(namespace = "json",
195 tag = "%s:%s" %
196 (file_name, item_name)),
197 location = File_Reference(file_name),
198 framework = "JSON",
199 kind = "Test Vector")
200 for tag in item_tags:
201 l_item.add_tracing_target(
202 Tracing_Tag(namespace = "req",
203 tag = tag))
204 for just_up in item_just:
205 l_item.just_up.append(just_up)
207 items.append(l_item)
208 except Malformed_Input as err:
209 pprint(err.data)
210 print("%s: malformed input: %s" % (file_name, err.msg))
211 ok = False
213 return ok, items
216def load_item(file_name, options_test_list):
217 try:
218 with open(file_name, "r", encoding="UTF-8") as fd:
219 data = json.load(fd)
220 data = get_item(root = data,
221 path = options_test_list,
222 required = True)
223 return True, data
225 except json.JSONDecodeError:
226 print(f"{file_name}: Input file contains invalid JSON.", file=sys.stderr)
227 except UnicodeDecodeError as decode_error:
228 print(f"{file_name}: File is not encoded in utf-8: {decode_error}",
229 file=sys.stderr)
230 except Malformed_Input as err:
231 pprint(err.data)
232 print(f"{file_name}: malformed input: {err.msg}", file=sys.stderr)
234 return False, []
237def build_name(item, name_attribute, file_name, item_id):
238 if name_attribute:
239 item_name = get_item(root = item,
240 path = name_attribute,
241 required = True)
242 if not isinstance(item_name, str): 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true
243 raise Malformed_Input("name is not a string",
244 item_name)
245 else:
246 item_name = f"{syn_test_name(PurePath(file_name))}.{item_id}"
247 return item_name
250def main(args: Optional[Sequence[str]] = None) -> int:
251 return LOBSTER_Json().run(args)