Coverage for lobster/tools/json/json.py: 86%

124 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-04 12:54 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_json - Extract JSON tags for LOBSTER 

4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19import argparse 

20import json 

21import sys 

22from pathlib import PurePath 

23from pprint import pprint 

24from typing import Optional, Sequence, Tuple, List, Set 

25 

26from lobster.common.tool import LOBSTER_Per_File_Tool 

27from lobster.common.items import Tracing_Tag, Activity 

28from lobster.common.location import File_Reference 

29 

30 

class Malformed_Input(Exception):
    """Signals that a JSON input does not have the expected structure.

    Attributes:
        msg: Human readable description of the problem.
        data: The offending piece of input, kept so that the handler
            can display it next to the message.
    """

    def __init__(self, msg, data):
        # Keep both pieces on the instance; the message additionally
        # becomes the regular exception argument.
        self.data = data
        self.msg = msg
        super().__init__(msg)

36 

37 

def get_item(root, path, required):
    """Look up a dot-separated member path inside nested JSON objects.

    Args:
        root: The decoded JSON value to start from.
        path: Member names joined by "."; the empty string selects
            root itself.
        required: If True a failed lookup raises, otherwise it yields
            None.

    Returns:
        The value found at path, or None if the lookup failed and
        required is False.

    Raises:
        Malformed_Input: If required is True and either a value on the
            way is not an object (dict) or a member is missing.
    """
    assert isinstance(path, str)
    assert isinstance(required, bool)

    node = root
    remaining = path
    while remaining != "":
        # Peel off the first member name; whatever follows the first
        # dot is resolved in the next round.
        field, _, remaining = remaining.partition(".")

        if not isinstance(node, dict):
            if required:
                raise Malformed_Input("not an object", node)
            return None

        if field not in node:
            if required:
                raise Malformed_Input("object does not contain %s" % field,
                                      node)
            return None

        node = node[field]

    return node

62 

63 

def syn_test_name(file_name):
    """Build a synthetic, dot-separated test name from a file path.

    The directory components of the path (without the leading anchor
    of an absolute path) are joined with the file name, from which a
    trailing ".json" extension is removed. Empty components as well as
    "." and ".." are dropped.

    Args:
        file_name: Path of the JSON file, as a PurePath.

    Returns:
        The components joined by ".", e.g. "a.b.foo" for "a/b/foo.json".
    """
    assert isinstance(file_name, PurePath)

    # For absolute paths the first part is the anchor (e.g. "/"),
    # which must not show up in the name.
    if file_name.is_absolute():
        components = list(file_name.parts)[1:-1]
    else:
        components = list(file_name.parts)[:-1]

    # Only strip the extension at the end of the name. A plain
    # str.replace would also remove ".json" from the middle of a name
    # such as "x.jsonfoo.json".
    base_name = file_name.name
    if base_name.endswith(".json"):
        base_name = base_name[:-len(".json")]
    components.append(base_name)

    return ".".join(item
                    for item in components
                    if item and item not in (".", ".."))

75 

76 

class LOBSTER_Json(LOBSTER_Per_File_Tool):
    """LOBSTER tool that extracts tracing data from JSON files.

    Each input file is decoded as JSON; the test objects found in it
    are converted into Activity items carrying their tracing tags and
    justifications.
    """

    def __init__(self):
        super().__init__(
            name = "json",
            description = "Extract tracing data from JSON files.",
            extensions = ["json"],
            official = True)

    # Supported config parameters for lobster-json
    TEST_LIST = "test_list"
    NAME_ATTRIBUTE = "name_attribute"
    TAG_ATTRIBUTE = "tag_attribute"
    JUSTIFICATION_ATTRIBUTE = "justification_attribute"
    SINGLE = "single"

    @classmethod
    def get_config_keys_manual(cls):
        """Return the config key help of the parent class, extended by
        the keys specific to this tool."""
        help_dict = super().get_config_keys_manual()
        help_dict.update(
            {
                cls.TEST_LIST: "Member name indicator resulting in a "
                               "list containing objects carrying test "
                               "data.",
                cls.NAME_ATTRIBUTE: "Member name indicator for test name.",
                cls.TAG_ATTRIBUTE: "Member name indicator for test tracing tags.",
                cls.JUSTIFICATION_ATTRIBUTE: "Member name indicator for "
                                             "justifications.",
                cls.SINGLE: "Avoid use of multiprocessing."
            }
        )
        return help_dict

    def get_mandatory_parameters(self) -> Set[str]:
        """Return the config keys that must be present: only the tag
        attribute."""
        return {self.TAG_ATTRIBUTE}

    def process_commandline_and_yaml_options(
            self,
            options: argparse.Namespace,
    ) -> List[Tuple[File_Reference, str]]:
        """Extend the parent class processing by the tool specific
        settings from the yaml config.

        The values of the tool specific config keys are copied from
        self.config onto options (test_list defaults to '', single to
        False, the others to None when absent).

        Returns
        -------
        The work list produced by the parent class method.
        """
        work_list = super().process_commandline_and_yaml_options(options)
        options.test_list = self.config.get(self.TEST_LIST, '')
        options.name_attribute = self.config.get(self.NAME_ATTRIBUTE)
        options.tag_attribute = self.config.get(self.TAG_ATTRIBUTE)
        options.justification_attribute = self.config.get(self.JUSTIFICATION_ATTRIBUTE)
        options.single = self.config.get(self.SINGLE, False)
        return work_list

    def process_tool_options(
            self,
            options: argparse.Namespace,
            work_list: List[Tuple[File_Reference, str]],
    ):
        """Run the parent class processing and select Activity as the
        schema of this tool."""
        super().process_tool_options(options, work_list)
        self.schema = Activity

    @classmethod
    def process(cls, options, file_name) -> Tuple[bool, List[Activity]]:
        """Extract the activities described by a single JSON file.

        Parameters
        ----------
        options - namespace providing test_list, name_attribute,
                  tag_attribute and justification_attribute
        file_name - name of the JSON file to read (UTF-8 encoded)

        Returns
        -------
        A pair (ok, items). If the file cannot be decoded or the test
        list cannot be located, the result is (False, []). Otherwise
        items holds one Activity per well-formed entry, and ok is False
        if at least one entry was malformed (such entries are reported
        and left out).
        """
        try:
            with open(file_name, "r", encoding="UTF-8") as fd:
                data = json.load(fd)
            data = get_item(root = data,
                            path = options.test_list,
                            required = True)
        except json.JSONDecodeError:
            print("%s: Input file contains invalid JSON." % file_name,
                  file=sys.stderr)
            return False, []
        except UnicodeDecodeError as decode_error:
            print("%s: File is not encoded in utf-8: %s" % (file_name, decode_error))
            return False, []
        except Malformed_Input as err:
            pprint(err.data)
            print("%s: malformed input: %s" % (file_name, err.msg))
            return False, []

        # Ensure we actually have a list now
        if not isinstance(data, list):
            data = [data]

        # Convert individual items
        items = []
        ok = True
        for item_id, item in enumerate(data, 1):
            try:
                # Without a configured name attribute the name is
                # synthesised from the file path and the 1-based
                # position of the item.
                if options.name_attribute:
                    item_name = get_item(root = item,
                                         path = options.name_attribute,
                                         required = True)
                else:
                    item_name = "%s.%u" % (syn_test_name(PurePath(file_name)),
                                           item_id)
                if not isinstance(item_name, str):
                    raise Malformed_Input("name is not a string",
                                          item_name)

                # Tags are optional; a single string counts as a list
                # with one element.
                item_tags = get_item(root = item,
                                     path = options.tag_attribute,
                                     required = False)
                if isinstance(item_tags, list):
                    pass
                elif isinstance(item_tags, str):
                    item_tags = [item_tags]
                elif item_tags is None:
                    item_tags = []
                else:
                    # Attach the offending value (not the item name),
                    # so that the dump below shows what was wrong, as
                    # for names and justifications.
                    raise Malformed_Input("tags are not a string or list",
                                          item_tags)

                # Justifications are handled like tags
                if options.justification_attribute:
                    item_just = get_item(
                        root = item,
                        path = options.justification_attribute,
                        required = False)
                else:
                    item_just = []
                if isinstance(item_just, list):
                    pass
                elif isinstance(item_just, str):
                    item_just = [item_just]
                elif item_just is None:
                    item_just = []
                else:
                    raise Malformed_Input("justification is not a string"
                                          " or list",
                                          item_just)

                l_item = Activity(
                    tag = Tracing_Tag(namespace = "json",
                                      tag = "%s:%s" %
                                      (file_name, item_name)),
                    location = File_Reference(file_name),
                    framework = "JSON",
                    kind = "Test Vector")
                for tag in item_tags:
                    l_item.add_tracing_target(
                        Tracing_Tag(namespace = "req",
                                    tag = tag))
                for just_up in item_just:
                    l_item.just_up.append(just_up)

                items.append(l_item)
            except Malformed_Input as err:
                # Report the problem, but keep going with the
                # remaining items of this file.
                pprint(err.data)
                print("%s: malformed input: %s" % (file_name, err.msg))
                ok = False

        return ok, items

234 

235 

def main(args: Optional[Sequence[str]] = None) -> int:
    """Entry point of lobster-json.

    Creates the tool and hands args over to its run method unchanged;
    the result of run is returned.
    """
    tool = LOBSTER_Json()
    return tool.run(args)