Coverage for lobster/tools/json/json.py: 26%
120 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
1#!/usr/bin/env python3
2#
3# lobster_json - Extract JSON tags for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
19import argparse
20import json
21from pathlib import PurePath
22from pprint import pprint
23from typing import Tuple, List, Set
25from lobster.tool import LOBSTER_Per_File_Tool
26from lobster.items import Tracing_Tag, Activity
27from lobster.location import File_Reference
30class Malformed_Input(Exception):
31 def __init__(self, msg, data):
32 super().__init__(msg)
33 self.msg = msg
34 self.data = data
37def get_item(root, path, required):
38 assert isinstance(path, str)
39 assert isinstance(required, bool)
41 if path == "":
42 return root
44 if "." in path:
45 field, tail = path.split(".", 1)
46 else:
47 field = path
48 tail = ""
50 if isinstance(root, dict):
51 if field in root:
52 return get_item(root[field], tail, required)
53 elif required:
54 raise Malformed_Input("object does not contain %s" % field,
55 root)
56 return None
58 elif required:
59 raise Malformed_Input("not an object", root)
60 return None
63def syn_test_name(file_name):
64 assert isinstance(file_name, PurePath)
65 if file_name.is_absolute():
66 components = list(file_name.parts)[1:-1]
67 else:
68 components = list(file_name.parts)[:-1]
69 components.append(file_name.name.replace(".json", ""))
70 components = [item
71 for item in components
72 if item and item not in (".", "..")]
73 return ".".join(components)
76class LOBSTER_Json(LOBSTER_Per_File_Tool):
77 def __init__(self):
78 super().__init__(
79 name = "json",
80 description = "Extract tracing data from JSON files.",
81 extensions = ["json"],
82 official = True)
84 # Supported config parameters for lobster-json
85 TEST_LIST = "test_list"
86 NAME_ATTRIBUTE = "name_attribute"
87 TAG_ATTRIBUTE = "tag_attribute"
88 JUSTIFICATION_ATTRIBUTE = "justification_attribute"
89 SINGLE = "single"
91 @classmethod
92 def get_config_keys_manual(cls):
93 help_dict = super().get_config_keys_manual()
94 help_dict.update(
95 {
96 cls.TEST_LIST: "Member name indicator resulting in a "
97 "list containing objects carrying test "
98 "data.",
99 cls.NAME_ATTRIBUTE: "Member name indicator for test name.",
100 cls.TAG_ATTRIBUTE: "Member name indicator for test tracing tags.",
101 cls.JUSTIFICATION_ATTRIBUTE: "Member name indicator for "
102 "justifications.",
103 cls.SINGLE: "Avoid use of multiprocessing."
104 }
105 )
106 return help_dict
108 def get_mandatory_parameters(self) -> Set[str]:
109 return {self.TAG_ATTRIBUTE}
111 def process_commandline_and_yaml_options(
112 self,
113 options: argparse.Namespace,
114 ) -> List[Tuple[File_Reference, str]]:
115 """
116 Overrides the parent class method and add fetch tool specific options from the
117 yaml
118 config
120 Returns
121 -------
122 options - command-line and yaml options
123 worklist - list of json files
124 """
125 work_list = super().process_commandline_and_yaml_options(options)
126 options.test_list = self.config.get(self.TEST_LIST, '')
127 options.name_attribute = self.config.get(self.NAME_ATTRIBUTE)
128 options.tag_attribute = self.config.get(self.TAG_ATTRIBUTE)
129 options.justification_attribute = self.config.get(self.JUSTIFICATION_ATTRIBUTE)
130 options.single = self.config.get(self.SINGLE, False)
131 return work_list
133 def process_tool_options(
134 self,
135 options: argparse.Namespace,
136 work_list: List[Tuple[File_Reference, str]],
137 ):
138 super().process_tool_options(options, work_list)
139 self.schema = Activity
141 @classmethod
142 def process(cls, options, file_name) -> Tuple[bool, List[Activity]]:
143 try:
144 with open(file_name, "r", encoding="UTF-8") as fd:
145 data = json.load(fd)
146 data = get_item(root = data,
147 path = options.test_list,
148 required = True)
149 except UnicodeDecodeError as decode_error:
150 print("%s: File is not encoded in utf-8: %s" % (file_name, decode_error))
151 return False, []
152 except Malformed_Input as err:
153 pprint(err.data)
154 print("%s: malformed input: %s" % (file_name, err.msg))
155 return False, []
157 # Ensure we actually have a list now
158 if not isinstance(data, list):
159 data = [data]
161 # Convert individual items
162 items = []
163 ok = True
164 for item_id, item in enumerate(data, 1):
165 try:
166 if options.name_attribute:
167 item_name = get_item(root = item,
168 path = options.name_attribute,
169 required = True)
170 else:
171 item_name = "%s.%u" % (syn_test_name(PurePath(file_name)),
172 item_id)
173 if not isinstance(item_name, str):
174 raise Malformed_Input("name is not a string",
175 item_name)
177 item_tags = get_item(root = item,
178 path = options.tag_attribute,
179 required = False)
180 if isinstance(item_tags, list):
181 pass
182 elif isinstance(item_tags, str):
183 item_tags = [item_tags]
184 elif item_tags is None:
185 item_tags = []
186 else:
187 raise Malformed_Input("tags are not a string or list",
188 item_name)
190 if options.justification_attribute:
191 item_just = get_item(
192 root = item,
193 path = options.justification_attribute,
194 required = False)
195 else:
196 item_just = []
197 if isinstance(item_just, list):
198 pass
199 elif isinstance(item_just, str):
200 item_just = [item_just]
201 elif item_just is None:
202 item_just = []
203 else:
204 raise Malformed_Input("justification is not a string"
205 " or list",
206 item_just)
208 l_item = Activity(
209 tag = Tracing_Tag(namespace = "json",
210 tag = "%s:%s" %
211 (file_name, item_name)),
212 location = File_Reference(file_name),
213 framework = "JSON",
214 kind = "Test Vector")
215 for tag in item_tags:
216 l_item.add_tracing_target(
217 Tracing_Tag(namespace = "req",
218 tag = tag))
219 for just_up in item_just:
220 l_item.just_up.append(just_up)
222 items.append(l_item)
223 except Malformed_Input as err:
224 pprint(err.data)
225 print("%s: malformed input: %s" % (file_name, err.msg))
226 ok = False
228 return ok, items
231def main():
232 return LOBSTER_Json().run()