Coverage for lobster/tools/json/json.py: 87%

122 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 14:55 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_json - Extract JSON tags for LOBSTER 

4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19import argparse 

20import sys 

21import json 

22from pathlib import PurePath 

23from pprint import pprint 

24from typing import Tuple, List, Set 

25 

26from lobster.tool import LOBSTER_Per_File_Tool 

27from lobster.items import Tracing_Tag, Activity 

28from lobster.location import File_Reference 

29 

30 

class Malformed_Input(Exception):
    """Signals that the JSON input does not have the expected structure.

    The constructor stores two attributes on the exception:
    msg  -- the message text (also handed to Exception)
    data -- the value the raiser considered malformed
    """

    def __init__(self, msg, data):
        self.data = data
        self.msg = msg
        super().__init__(msg)

36 

37 

def get_item(root, path, required):
    """Follow a dot-separated path through nested dictionaries.

    root     -- the value to start from
    path     -- field names joined by "."; the empty string selects
                root itself
    required -- if True, a missing field or a non-dictionary on the way
                raises Malformed_Input; if False, None is returned instead

    Returns the value found at the end of the path, or None when the
    path cannot be followed and required is False.
    """
    assert isinstance(path, str)
    assert isinstance(required, bool)

    current = root
    remaining = path
    while remaining != "":
        # Take the next field name; whatever follows the first "." is
        # handled by the next iteration.
        field, _, remaining = remaining.partition(".")

        if not isinstance(current, dict):
            if required:
                raise Malformed_Input("not an object", current)
            return None

        if field not in current:
            if required:
                raise Malformed_Input("object does not contain %s" % field,
                                      current)
            return None

        current = current[field]

    return current

62 

63 

def syn_test_name(file_name):
    """Build a dotted name from the path of a JSON file.

    file_name -- a PurePath pointing to the file

    The directory components and the file name (without a trailing
    ".json") are joined with ".". The first component of an absolute
    path is dropped, and empty components as well as "." and ".." are
    skipped.

    Returns the resulting string.
    """
    assert isinstance(file_name, PurePath)

    # Only strip the extension at the very end of the name. A plain
    # str.replace would also remove ".json" from the middle of a name
    # such as "my.json_data.json".
    extension = ".json"
    base_name = file_name.name
    if base_name.endswith(extension):
        base_name = base_name[:-len(extension)]

    directories = list(file_name.parts)[:-1]
    if file_name.is_absolute():
        # The first part of an absolute path is its anchor, not a name
        directories = directories[1:]

    components = [item
                  for item in directories + [base_name]
                  if item and item not in (".", "..")]
    return ".".join(components)

75 

76 

class LOBSTER_Json(LOBSTER_Per_File_Tool):
    """Per-file LOBSTER tool that extracts tracing data from JSON files.

    Every entry found in a JSON file is converted into an Activity item
    that carries the tracing tags and justifications read from the
    configured attributes.
    """

    def __init__(self):
        super().__init__(
            name        = "json",
            description = "Extract tracing data from JSON files.",
            extensions  = ["json"],
            official    = True)

    # Supported config parameters for lobster-json
    TEST_LIST = "test_list"
    NAME_ATTRIBUTE = "name_attribute"
    TAG_ATTRIBUTE = "tag_attribute"
    JUSTIFICATION_ATTRIBUTE = "justification_attribute"
    SINGLE = "single"

    @classmethod
    def get_config_keys_manual(cls):
        """Return the parent's config key help, extended by the keys of
        this tool.
        """
        help_dict = super().get_config_keys_manual()
        help_dict.update(
            {
                cls.TEST_LIST: "Member name indicator resulting in a "
                               "list containing objects carrying test "
                               "data.",
                cls.NAME_ATTRIBUTE: "Member name indicator for test name.",
                cls.TAG_ATTRIBUTE: "Member name indicator for test tracing tags.",
                cls.JUSTIFICATION_ATTRIBUTE: "Member name indicator for "
                                             "justifications.",
                cls.SINGLE: "Avoid use of multiprocessing."
            }
        )
        return help_dict

    def get_mandatory_parameters(self) -> Set[str]:
        """Return the config keys that must be given: the tag attribute."""
        return {self.TAG_ATTRIBUTE}

    def process_commandline_and_yaml_options(
            self,
    ) -> Tuple[argparse.Namespace, List[Tuple[File_Reference, str]]]:
        """
        Overrides the parent class method and adds the tool specific
        options read from the yaml config.

        Returns
        -------
        options - command-line and yaml options
        worklist - list of json files
        """
        options, work_list = super().process_commandline_and_yaml_options()
        options.test_list = self.config.get(self.TEST_LIST, '')
        options.name_attribute = self.config.get(self.NAME_ATTRIBUTE)
        options.tag_attribute = self.config.get(self.TAG_ATTRIBUTE)
        options.justification_attribute = self.config.get(self.JUSTIFICATION_ATTRIBUTE)
        options.single = self.config.get(self.SINGLE, False)
        return options, work_list

    def process_tool_options(
            self,
            options: argparse.Namespace,
            work_list: List[Tuple[File_Reference, str]],
    ):
        """Run the parent's option processing and select Activity as the
        schema of this tool.
        """
        super().process_tool_options(options, work_list)
        self.schema = Activity

    @staticmethod
    def _as_list(value, error_msg):
        """Normalise an optional string-or-list attribute to a list.

        value     -- None, a string or a list
        error_msg -- message of the Malformed_Input raised for any other
                     kind of value

        Returns [] for None, a one-element list for a string, and a list
        unchanged. The offending value is attached to the exception as
        its data.
        """
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        if isinstance(value, list):
            return value
        raise Malformed_Input(error_msg, value)

    @classmethod
    def process(cls, options, file_name) -> Tuple[bool, List[Activity]]:
        """Convert the entries of one JSON file into Activity items.

        options   -- namespace providing test_list, name_attribute,
                     tag_attribute and justification_attribute
        file_name -- the JSON file to read

        Returns a tuple (ok, items). ok is False if the file is not
        UTF-8 encoded, is not valid JSON, does not contain the configured
        test list, or if at least one entry is malformed. items holds the
        Activity objects of all entries that could be converted. Problems
        are reported by printing a message.
        """
        try:
            with open(file_name, "r", encoding="UTF-8") as fd:
                data = json.load(fd)
            data = get_item(root     = data,
                            path     = options.test_list,
                            required = True)
        except UnicodeDecodeError as decode_error:
            print("%s: File is not encoded in utf-8: %s" % (file_name, decode_error))
            return False, []
        except json.JSONDecodeError as json_error:
            # Report files that are not valid JSON like any other
            # malformed input instead of letting the error escape.
            print("%s: malformed input: invalid JSON: %s" % (file_name, json_error))
            return False, []
        except Malformed_Input as err:
            pprint(err.data)
            print("%s: malformed input: %s" % (file_name, err.msg))
            return False, []

        # Ensure we actually have a list now
        if not isinstance(data, list):
            data = [data]

        # Convert individual items
        items = []
        ok = True
        for item_id, item in enumerate(data, 1):
            try:
                if options.name_attribute:
                    item_name = get_item(root     = item,
                                         path     = options.name_attribute,
                                         required = True)
                else:
                    # No name attribute configured: synthesise a name
                    # from the file path and the position of the entry
                    item_name = "%s.%u" % (syn_test_name(PurePath(file_name)),
                                           item_id)
                if not isinstance(item_name, str):
                    raise Malformed_Input("name is not a string",
                                          item_name)

                item_tags = cls._as_list(
                    get_item(root     = item,
                             path     = options.tag_attribute,
                             required = False),
                    "tags are not a string or list")

                if options.justification_attribute:
                    item_just = cls._as_list(
                        get_item(root     = item,
                                 path     = options.justification_attribute,
                                 required = False),
                        "justification is not a string or list")
                else:
                    item_just = []

                l_item = Activity(
                    tag       = Tracing_Tag(namespace = "json",
                                            tag       = "%s:%s" %
                                            (file_name, item_name)),
                    location  = File_Reference(file_name),
                    framework = "JSON",
                    kind      = "Test Vector")
                for tag in item_tags:
                    l_item.add_tracing_target(
                        Tracing_Tag(namespace = "req",
                                    tag       = tag))
                for just_up in item_just:
                    l_item.just_up.append(just_up)

                items.append(l_item)
            except Malformed_Input as err:
                # Keep going so that all malformed entries of the file
                # get reported, but remember the failure
                pprint(err.data)
                print("%s: malformed input: %s" % (file_name, err.msg))
                ok = False

        return ok, items

229 

230 

def main():
    # lobster-trace: json_req.Dummy_Requirement
    """Create the lobster-json tool, run it and return its result."""
    return LOBSTER_Json().execute()

235 

236 

# Script entry point: hand the result of main() to the interpreter as
# the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)