Coverage for lobster/tools/json/json.py: 87%

132 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-04-16 05:31 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_json - Extract JSON tags for LOBSTER 

4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19import argparse 

20import json 

21import sys 

22from pathlib import PurePath 

23from pprint import pprint 

24from typing import Optional, Sequence, Tuple, List, Set 

25 

26from lobster.common.tool import LOBSTER_Per_File_Tool 

27from lobster.common.items import Tracing_Tag, Activity 

28from lobster.common.location import File_Reference 

29 

30 

31class Malformed_Input(Exception): 

32 def __init__(self, msg, data): 

33 super().__init__(msg) 

34 self.msg = msg 

35 self.data = data 

36 

37 

38def get_item(root, path, required): 

39 assert isinstance(path, str) 

40 assert isinstance(required, bool) 

41 

42 if path == "": 

43 return root 

44 

45 if "." in path: 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true

46 field, tail = path.split(".", 1) 

47 else: 

48 field = path 

49 tail = "" 

50 

51 if isinstance(root, dict): 51 ↛ 59line 51 didn't jump to line 59 because the condition on line 51 was always true

52 if field in root: 

53 return get_item(root[field], tail, required) 

54 if required: 

55 raise Malformed_Input("object does not contain %s" % field, 

56 root) 

57 return None 

58 

59 if required: 

60 raise Malformed_Input("not an object", root) 

61 return None 

62 

63 

64def syn_test_name(file_name): 

65 assert isinstance(file_name, PurePath) 

66 if file_name.is_absolute(): 66 ↛ 67line 66 didn't jump to line 67 because the condition on line 66 was never true

67 components = list(file_name.parts)[1:-1] 

68 else: 

69 components = list(file_name.parts)[:-1] 

70 components.append(file_name.name.replace(".json", "")) 

71 components = [item 

72 for item in components 

73 if item and item not in (".", "..")] 

74 return ".".join(components) 

75 

76 

77class LOBSTER_Json(LOBSTER_Per_File_Tool): 

78 def __init__(self): 

79 super().__init__( 

80 name = "json", 

81 description = "Extract tracing data from JSON files.", 

82 extensions = ["json"], 

83 official = True) 

84 

85 # Supported config parameters for lobster-json 

86 TEST_LIST = "test_list" 

87 NAME_ATTRIBUTE = "name_attribute" 

88 TAG_ATTRIBUTE = "tag_attribute" 

89 JUSTIFICATION_ATTRIBUTE = "justification_attribute" 

90 SINGLE = "single" 

91 

92 @classmethod 

93 def get_config_keys_manual(cls): 

94 help_dict = super().get_config_keys_manual() 

95 help_dict.update( 

96 { 

97 cls.TEST_LIST: "Member name indicator resulting in a " 

98 "list containing objects carrying test " 

99 "data.", 

100 cls.NAME_ATTRIBUTE: "Member name indicator for test name.", 

101 cls.TAG_ATTRIBUTE: "Member name indicator for test tracing tags.", 

102 cls.JUSTIFICATION_ATTRIBUTE: "Member name indicator for " 

103 "justifications.", 

104 cls.SINGLE: "Avoid use of multiprocessing." 

105 } 

106 ) 

107 return help_dict 

108 

109 def get_mandatory_parameters(self) -> Set[str]: 

110 return {self.TAG_ATTRIBUTE} 

111 

112 def process_commandline_and_yaml_options( 

113 self, 

114 options: argparse.Namespace, 

115 ) -> List[Tuple[File_Reference, str]]: 

116 """ 

117 Overrides the parent class method and add fetch tool specific options from the 

118 yaml 

119 config 

120 

121 Returns 

122 ------- 

123 options - command-line and yaml options 

124 worklist - list of json files 

125 """ 

126 work_list = super().process_commandline_and_yaml_options(options) 

127 options.test_list = self.config.get(self.TEST_LIST, '') 

128 options.name_attribute = self.config.get(self.NAME_ATTRIBUTE) 

129 options.tag_attribute = self.config.get(self.TAG_ATTRIBUTE) 

130 options.justification_attribute = self.config.get(self.JUSTIFICATION_ATTRIBUTE) 

131 options.single = self.config.get(self.SINGLE, False) 

132 return work_list 

133 

134 def process_tool_options( 

135 self, 

136 options: argparse.Namespace, 

137 work_list: List[Tuple[File_Reference, str]], 

138 ): 

139 super().process_tool_options(options, work_list) 

140 self.schema = Activity 

141 

142 @classmethod 

143 def process(cls, options, file_name) -> Tuple[bool, List[Activity]]: 

144 ok, data = load_item(file_name, options.test_list) 

145 if not ok: 

146 return False, [] 

147 

148 # Ensure we actually have a list now 

149 if not isinstance(data, list): 

150 data = [data] 

151 

152 # Convert individual items 

153 items = [] 

154 ok = True 

155 for item_id, item in enumerate(data, 1): 

156 try: 

157 item_name = build_name(item, options.name_attribute, file_name, item_id) 

158 if not isinstance(item_name, str): 158 ↛ 159line 158 didn't jump to line 159 because the condition on line 158 was never true

159 raise Malformed_Input("name is not a string", 

160 item_name) 

161 

162 item_tags = get_item(root = item, 

163 path = options.tag_attribute, 

164 required = False) 

165 if isinstance(item_tags, list): 

166 pass 

167 elif isinstance(item_tags, str): 

168 item_tags = [item_tags] 

169 elif item_tags is None: 169 ↛ 172line 169 didn't jump to line 172 because the condition on line 169 was always true

170 item_tags = [] 

171 else: 

172 raise Malformed_Input("tags are not a string or list", 

173 item_name) 

174 

175 if options.justification_attribute: 

176 item_just = get_item( 

177 root = item, 

178 path = options.justification_attribute, 

179 required = False) 

180 else: 

181 item_just = [] 

182 if isinstance(item_just, list): 

183 pass 

184 elif isinstance(item_just, str): 

185 item_just = [item_just] 

186 elif item_just is None: 186 ↛ 189line 186 didn't jump to line 189 because the condition on line 186 was always true

187 item_just = [] 

188 else: 

189 raise Malformed_Input("justification is not a string" 

190 " or list", 

191 item_just) 

192 

193 l_item = Activity( 

194 tag = Tracing_Tag(namespace = "json", 

195 tag = "%s:%s" % 

196 (file_name, item_name)), 

197 location = File_Reference(file_name), 

198 framework = "JSON", 

199 kind = "Test Vector") 

200 for tag in item_tags: 

201 l_item.add_tracing_target( 

202 Tracing_Tag(namespace = "req", 

203 tag = tag)) 

204 for just_up in item_just: 

205 l_item.just_up.append(just_up) 

206 

207 items.append(l_item) 

208 except Malformed_Input as err: 

209 pprint(err.data) 

210 print("%s: malformed input: %s" % (file_name, err.msg)) 

211 ok = False 

212 

213 return ok, items 

214 

215 

216def load_item(file_name, options_test_list): 

217 try: 

218 with open(file_name, "r", encoding="UTF-8") as fd: 

219 data = json.load(fd) 

220 data = get_item(root = data, 

221 path = options_test_list, 

222 required = True) 

223 return True, data 

224 

225 except json.JSONDecodeError: 

226 print(f"{file_name}: Input file contains invalid JSON.", file=sys.stderr) 

227 except UnicodeDecodeError as decode_error: 

228 print(f"{file_name}: File is not encoded in utf-8: {decode_error}", 

229 file=sys.stderr) 

230 except Malformed_Input as err: 

231 pprint(err.data) 

232 print(f"{file_name}: malformed input: {err.msg}", file=sys.stderr) 

233 

234 return False, [] 

235 

236 

237def build_name(item, name_attribute, file_name, item_id): 

238 if name_attribute: 

239 item_name = get_item(root = item, 

240 path = name_attribute, 

241 required = True) 

242 if not isinstance(item_name, str): 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true

243 raise Malformed_Input("name is not a string", 

244 item_name) 

245 else: 

246 item_name = f"{syn_test_name(PurePath(file_name))}.{item_id}" 

247 return item_name 

248 

249 

250def main(args: Optional[Sequence[str]] = None) -> int: 

251 return LOBSTER_Json().run(args)