Coverage for lobster/tools/json/json.py: 26%
122 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
1#!/usr/bin/env python3
2#
3# lobster_json - Extract JSON tags for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
19import argparse
20import sys
21import json
22from pathlib import PurePath
23from pprint import pprint
24from typing import Tuple, List, Set
26from lobster.tool import LOBSTER_Per_File_Tool
27from lobster.items import Tracing_Tag, Activity
28from lobster.location import File_Reference
31class Malformed_Input(Exception):
32 def __init__(self, msg, data):
33 super().__init__(msg)
34 self.msg = msg
35 self.data = data
38def get_item(root, path, required):
39 assert isinstance(path, str)
40 assert isinstance(required, bool)
42 if path == "":
43 return root
45 if "." in path:
46 field, tail = path.split(".", 1)
47 else:
48 field = path
49 tail = ""
51 if isinstance(root, dict):
52 if field in root:
53 return get_item(root[field], tail, required)
54 elif required:
55 raise Malformed_Input("object does not contain %s" % field,
56 root)
57 return None
59 elif required:
60 raise Malformed_Input("not an object", root)
61 return None
64def syn_test_name(file_name):
65 assert isinstance(file_name, PurePath)
66 if file_name.is_absolute():
67 components = list(file_name.parts)[1:-1]
68 else:
69 components = list(file_name.parts)[:-1]
70 components.append(file_name.name.replace(".json", ""))
71 components = [item
72 for item in components
73 if item and item not in (".", "..")]
74 return ".".join(components)
77class LOBSTER_Json(LOBSTER_Per_File_Tool):
78 def __init__(self):
79 super().__init__(
80 name = "json",
81 description = "Extract tracing data from JSON files.",
82 extensions = ["json"],
83 official = True)
85 # Supported config parameters for lobster-json
86 TEST_LIST = "test_list"
87 NAME_ATTRIBUTE = "name_attribute"
88 TAG_ATTRIBUTE = "tag_attribute"
89 JUSTIFICATION_ATTRIBUTE = "justification_attribute"
90 SINGLE = "single"
92 @classmethod
93 def get_config_keys_manual(cls):
94 help_dict = super().get_config_keys_manual()
95 help_dict.update(
96 {
97 cls.TEST_LIST: "Member name indicator resulting in a "
98 "list containing objects carrying test "
99 "data.",
100 cls.NAME_ATTRIBUTE: "Member name indicator for test name.",
101 cls.TAG_ATTRIBUTE: "Member name indicator for test tracing tags.",
102 cls.JUSTIFICATION_ATTRIBUTE: "Member name indicator for "
103 "justifications.",
104 cls.SINGLE: "Avoid use of multiprocessing."
105 }
106 )
107 return help_dict
109 def get_mandatory_parameters(self) -> Set[str]:
110 return {self.TAG_ATTRIBUTE}
112 def process_commandline_and_yaml_options(
113 self,
114 ) -> Tuple[argparse.Namespace, List[Tuple[File_Reference, str]]]:
115 """
116 Overrides the parent class method and add fetch tool specific options from the
117 yaml
118 config
120 Returns
121 -------
122 options - command-line and yaml options
123 worklist - list of json files
124 """
125 options, work_list = super().process_commandline_and_yaml_options()
126 options.test_list = self.config.get(self.TEST_LIST, '')
127 options.name_attribute = self.config.get(self.NAME_ATTRIBUTE)
128 options.tag_attribute = self.config.get(self.TAG_ATTRIBUTE)
129 options.justification_attribute = self.config.get(self.JUSTIFICATION_ATTRIBUTE)
130 options.single = self.config.get(self.SINGLE, False)
131 return options, work_list
133 def process_tool_options(
134 self,
135 options: argparse.Namespace,
136 work_list: List[Tuple[File_Reference, str]],
137 ):
138 super().process_tool_options(options, work_list)
139 self.schema = Activity
141 @classmethod
142 def process(cls, options, file_name) -> Tuple[bool, List[Activity]]:
143 try:
144 with open(file_name, "r", encoding="UTF-8") as fd:
145 data = json.load(fd)
146 data = get_item(root = data,
147 path = options.test_list,
148 required = True)
149 except UnicodeDecodeError as decode_error:
150 print("%s: File is not encoded in utf-8: %s" % (file_name, decode_error))
151 return False, []
152 except Malformed_Input as err:
153 pprint(err.data)
154 print("%s: malformed input: %s" % (file_name, err.msg))
155 return False, []
157 # Ensure we actually have a list now
158 if not isinstance(data, list):
159 data = [data]
161 # Convert individual items
162 items = []
163 ok = True
164 for item_id, item in enumerate(data, 1):
165 try:
166 if options.name_attribute:
167 item_name = get_item(root = item,
168 path = options.name_attribute,
169 required = True)
170 else:
171 item_name = "%s.%u" % (syn_test_name(PurePath(file_name)),
172 item_id)
173 if not isinstance(item_name, str):
174 raise Malformed_Input("name is not a string",
175 item_name)
177 item_tags = get_item(root = item,
178 path = options.tag_attribute,
179 required = False)
180 if isinstance(item_tags, list):
181 pass
182 elif isinstance(item_tags, str):
183 item_tags = [item_tags]
184 elif item_tags is None:
185 item_tags = []
186 else:
187 raise Malformed_Input("tags are not a string or list",
188 item_name)
190 if options.justification_attribute:
191 item_just = get_item(
192 root = item,
193 path = options.justification_attribute,
194 required = False)
195 else:
196 item_just = []
197 if isinstance(item_just, list):
198 pass
199 elif isinstance(item_just, str):
200 item_just = [item_just]
201 elif item_just is None:
202 item_just = []
203 else:
204 raise Malformed_Input("justification is not a string"
205 " or list",
206 item_just)
208 l_item = Activity(
209 tag = Tracing_Tag(namespace = "json",
210 tag = "%s:%s" %
211 (file_name, item_name)),
212 location = File_Reference(file_name),
213 framework = "JSON",
214 kind = "Test Vector")
215 for tag in item_tags:
216 l_item.add_tracing_target(
217 Tracing_Tag(namespace = "req",
218 tag = tag))
219 for just_up in item_just:
220 l_item.just_up.append(just_up)
222 items.append(l_item)
223 except Malformed_Input as err:
224 pprint(err.data)
225 print("%s: malformed input: %s" % (file_name, err.msg))
226 ok = False
228 return ok, items
231def main():
232 # lobster-trace: json_req.Dummy_Requirement
233 tool = LOBSTER_Json()
234 return tool.execute()
237if __name__ == "__main__":
238 sys.exit(main())