Coverage for lobster/tools/json/json.py: 86%
124 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-04 12:54 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-04 12:54 +0000
1#!/usr/bin/env python3
2#
3# lobster_json - Extract JSON tags for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
19import argparse
20import json
21import sys
22from pathlib import PurePath
23from pprint import pprint
24from typing import Optional, Sequence, Tuple, List, Set
26from lobster.common.tool import LOBSTER_Per_File_Tool
27from lobster.common.items import Tracing_Tag, Activity
28from lobster.common.location import File_Reference
31class Malformed_Input(Exception):
32 def __init__(self, msg, data):
33 super().__init__(msg)
34 self.msg = msg
35 self.data = data
38def get_item(root, path, required):
39 assert isinstance(path, str)
40 assert isinstance(required, bool)
42 if path == "":
43 return root
45 if "." in path: 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true
46 field, tail = path.split(".", 1)
47 else:
48 field = path
49 tail = ""
51 if isinstance(root, dict): 51 ↛ 59line 51 didn't jump to line 59 because the condition on line 51 was always true
52 if field in root:
53 return get_item(root[field], tail, required)
54 elif required:
55 raise Malformed_Input("object does not contain %s" % field,
56 root)
57 return None
59 elif required:
60 raise Malformed_Input("not an object", root)
61 return None
64def syn_test_name(file_name):
65 assert isinstance(file_name, PurePath)
66 if file_name.is_absolute(): 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true
67 components = list(file_name.parts)[1:-1]
68 else:
69 components = list(file_name.parts)[:-1]
70 components.append(file_name.name.replace(".json", ""))
71 components = [item
72 for item in components
73 if item and item not in (".", "..")]
74 return ".".join(components)
77class LOBSTER_Json(LOBSTER_Per_File_Tool):
78 def __init__(self):
79 super().__init__(
80 name = "json",
81 description = "Extract tracing data from JSON files.",
82 extensions = ["json"],
83 official = True)
85 # Supported config parameters for lobster-json
86 TEST_LIST = "test_list"
87 NAME_ATTRIBUTE = "name_attribute"
88 TAG_ATTRIBUTE = "tag_attribute"
89 JUSTIFICATION_ATTRIBUTE = "justification_attribute"
90 SINGLE = "single"
92 @classmethod
93 def get_config_keys_manual(cls):
94 help_dict = super().get_config_keys_manual()
95 help_dict.update(
96 {
97 cls.TEST_LIST: "Member name indicator resulting in a "
98 "list containing objects carrying test "
99 "data.",
100 cls.NAME_ATTRIBUTE: "Member name indicator for test name.",
101 cls.TAG_ATTRIBUTE: "Member name indicator for test tracing tags.",
102 cls.JUSTIFICATION_ATTRIBUTE: "Member name indicator for "
103 "justifications.",
104 cls.SINGLE: "Avoid use of multiprocessing."
105 }
106 )
107 return help_dict
109 def get_mandatory_parameters(self) -> Set[str]:
110 return {self.TAG_ATTRIBUTE}
112 def process_commandline_and_yaml_options(
113 self,
114 options: argparse.Namespace,
115 ) -> List[Tuple[File_Reference, str]]:
116 """
117 Overrides the parent class method and add fetch tool specific options from the
118 yaml
119 config
121 Returns
122 -------
123 options - command-line and yaml options
124 worklist - list of json files
125 """
126 work_list = super().process_commandline_and_yaml_options(options)
127 options.test_list = self.config.get(self.TEST_LIST, '')
128 options.name_attribute = self.config.get(self.NAME_ATTRIBUTE)
129 options.tag_attribute = self.config.get(self.TAG_ATTRIBUTE)
130 options.justification_attribute = self.config.get(self.JUSTIFICATION_ATTRIBUTE)
131 options.single = self.config.get(self.SINGLE, False)
132 return work_list
134 def process_tool_options(
135 self,
136 options: argparse.Namespace,
137 work_list: List[Tuple[File_Reference, str]],
138 ):
139 super().process_tool_options(options, work_list)
140 self.schema = Activity
142 @classmethod
143 def process(cls, options, file_name) -> Tuple[bool, List[Activity]]:
144 try:
145 with open(file_name, "r", encoding="UTF-8") as fd:
146 data = json.load(fd)
147 data = get_item(root = data,
148 path = options.test_list,
149 required = True)
150 except json.JSONDecodeError:
151 print("%s: Input file contains invalid JSON." % file_name,
152 file=sys.stderr)
153 return False, []
154 except UnicodeDecodeError as decode_error:
155 print("%s: File is not encoded in utf-8: %s" % (file_name, decode_error))
156 return False, []
157 except Malformed_Input as err:
158 pprint(err.data)
159 print("%s: malformed input: %s" % (file_name, err.msg))
160 return False, []
162 # Ensure we actually have a list now
163 if not isinstance(data, list):
164 data = [data]
166 # Convert individual items
167 items = []
168 ok = True
169 for item_id, item in enumerate(data, 1):
170 try:
171 if options.name_attribute:
172 item_name = get_item(root = item,
173 path = options.name_attribute,
174 required = True)
175 else:
176 item_name = "%s.%u" % (syn_test_name(PurePath(file_name)),
177 item_id)
178 if not isinstance(item_name, str): 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true
179 raise Malformed_Input("name is not a string",
180 item_name)
182 item_tags = get_item(root = item,
183 path = options.tag_attribute,
184 required = False)
185 if isinstance(item_tags, list):
186 pass
187 elif isinstance(item_tags, str):
188 item_tags = [item_tags]
189 elif item_tags is None: 189 ↛ 192line 189 didn't jump to line 192 because the condition on line 189 was always true
190 item_tags = []
191 else:
192 raise Malformed_Input("tags are not a string or list",
193 item_name)
195 if options.justification_attribute:
196 item_just = get_item(
197 root = item,
198 path = options.justification_attribute,
199 required = False)
200 else:
201 item_just = []
202 if isinstance(item_just, list):
203 pass
204 elif isinstance(item_just, str):
205 item_just = [item_just]
206 elif item_just is None: 206 ↛ 209line 206 didn't jump to line 209 because the condition on line 206 was always true
207 item_just = []
208 else:
209 raise Malformed_Input("justification is not a string"
210 " or list",
211 item_just)
213 l_item = Activity(
214 tag = Tracing_Tag(namespace = "json",
215 tag = "%s:%s" %
216 (file_name, item_name)),
217 location = File_Reference(file_name),
218 framework = "JSON",
219 kind = "Test Vector")
220 for tag in item_tags:
221 l_item.add_tracing_target(
222 Tracing_Tag(namespace = "req",
223 tag = tag))
224 for just_up in item_just:
225 l_item.just_up.append(just_up)
227 items.append(l_item)
228 except Malformed_Input as err:
229 pprint(err.data)
230 print("%s: malformed input: %s" % (file_name, err.msg))
231 ok = False
233 return ok, items
236def main(args: Optional[Sequence[str]] = None) -> int:
237 return LOBSTER_Json().run(args)