Coverage for lobster/tools/json/json.py: 26%

120 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 09:51 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_json - Extract JSON tags for LOBSTER 

4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19import argparse 

20import json 

21from pathlib import PurePath 

22from pprint import pprint 

23from typing import Tuple, List, Set 

24 

25from lobster.tool import LOBSTER_Per_File_Tool 

26from lobster.items import Tracing_Tag, Activity 

27from lobster.location import File_Reference 

28 

29 

30class Malformed_Input(Exception): 

31 def __init__(self, msg, data): 

32 super().__init__(msg) 

33 self.msg = msg 

34 self.data = data 

35 

36 

37def get_item(root, path, required): 

38 assert isinstance(path, str) 

39 assert isinstance(required, bool) 

40 

41 if path == "": 

42 return root 

43 

44 if "." in path: 

45 field, tail = path.split(".", 1) 

46 else: 

47 field = path 

48 tail = "" 

49 

50 if isinstance(root, dict): 

51 if field in root: 

52 return get_item(root[field], tail, required) 

53 elif required: 

54 raise Malformed_Input("object does not contain %s" % field, 

55 root) 

56 return None 

57 

58 elif required: 

59 raise Malformed_Input("not an object", root) 

60 return None 

61 

62 

63def syn_test_name(file_name): 

64 assert isinstance(file_name, PurePath) 

65 if file_name.is_absolute(): 

66 components = list(file_name.parts)[1:-1] 

67 else: 

68 components = list(file_name.parts)[:-1] 

69 components.append(file_name.name.replace(".json", "")) 

70 components = [item 

71 for item in components 

72 if item and item not in (".", "..")] 

73 return ".".join(components) 

74 

75 

76class LOBSTER_Json(LOBSTER_Per_File_Tool): 

77 def __init__(self): 

78 super().__init__( 

79 name = "json", 

80 description = "Extract tracing data from JSON files.", 

81 extensions = ["json"], 

82 official = True) 

83 

84 # Supported config parameters for lobster-json 

85 TEST_LIST = "test_list" 

86 NAME_ATTRIBUTE = "name_attribute" 

87 TAG_ATTRIBUTE = "tag_attribute" 

88 JUSTIFICATION_ATTRIBUTE = "justification_attribute" 

89 SINGLE = "single" 

90 

91 @classmethod 

92 def get_config_keys_manual(cls): 

93 help_dict = super().get_config_keys_manual() 

94 help_dict.update( 

95 { 

96 cls.TEST_LIST: "Member name indicator resulting in a " 

97 "list containing objects carrying test " 

98 "data.", 

99 cls.NAME_ATTRIBUTE: "Member name indicator for test name.", 

100 cls.TAG_ATTRIBUTE: "Member name indicator for test tracing tags.", 

101 cls.JUSTIFICATION_ATTRIBUTE: "Member name indicator for " 

102 "justifications.", 

103 cls.SINGLE: "Avoid use of multiprocessing." 

104 } 

105 ) 

106 return help_dict 

107 

108 def get_mandatory_parameters(self) -> Set[str]: 

109 return {self.TAG_ATTRIBUTE} 

110 

111 def process_commandline_and_yaml_options( 

112 self, 

113 options: argparse.Namespace, 

114 ) -> List[Tuple[File_Reference, str]]: 

115 """ 

116 Overrides the parent class method and add fetch tool specific options from the 

117 yaml 

118 config 

119 

120 Returns 

121 ------- 

122 options - command-line and yaml options 

123 worklist - list of json files 

124 """ 

125 work_list = super().process_commandline_and_yaml_options(options) 

126 options.test_list = self.config.get(self.TEST_LIST, '') 

127 options.name_attribute = self.config.get(self.NAME_ATTRIBUTE) 

128 options.tag_attribute = self.config.get(self.TAG_ATTRIBUTE) 

129 options.justification_attribute = self.config.get(self.JUSTIFICATION_ATTRIBUTE) 

130 options.single = self.config.get(self.SINGLE, False) 

131 return work_list 

132 

133 def process_tool_options( 

134 self, 

135 options: argparse.Namespace, 

136 work_list: List[Tuple[File_Reference, str]], 

137 ): 

138 super().process_tool_options(options, work_list) 

139 self.schema = Activity 

140 

141 @classmethod 

142 def process(cls, options, file_name) -> Tuple[bool, List[Activity]]: 

143 try: 

144 with open(file_name, "r", encoding="UTF-8") as fd: 

145 data = json.load(fd) 

146 data = get_item(root = data, 

147 path = options.test_list, 

148 required = True) 

149 except UnicodeDecodeError as decode_error: 

150 print("%s: File is not encoded in utf-8: %s" % (file_name, decode_error)) 

151 return False, [] 

152 except Malformed_Input as err: 

153 pprint(err.data) 

154 print("%s: malformed input: %s" % (file_name, err.msg)) 

155 return False, [] 

156 

157 # Ensure we actually have a list now 

158 if not isinstance(data, list): 

159 data = [data] 

160 

161 # Convert individual items 

162 items = [] 

163 ok = True 

164 for item_id, item in enumerate(data, 1): 

165 try: 

166 if options.name_attribute: 

167 item_name = get_item(root = item, 

168 path = options.name_attribute, 

169 required = True) 

170 else: 

171 item_name = "%s.%u" % (syn_test_name(PurePath(file_name)), 

172 item_id) 

173 if not isinstance(item_name, str): 

174 raise Malformed_Input("name is not a string", 

175 item_name) 

176 

177 item_tags = get_item(root = item, 

178 path = options.tag_attribute, 

179 required = False) 

180 if isinstance(item_tags, list): 

181 pass 

182 elif isinstance(item_tags, str): 

183 item_tags = [item_tags] 

184 elif item_tags is None: 

185 item_tags = [] 

186 else: 

187 raise Malformed_Input("tags are not a string or list", 

188 item_name) 

189 

190 if options.justification_attribute: 

191 item_just = get_item( 

192 root = item, 

193 path = options.justification_attribute, 

194 required = False) 

195 else: 

196 item_just = [] 

197 if isinstance(item_just, list): 

198 pass 

199 elif isinstance(item_just, str): 

200 item_just = [item_just] 

201 elif item_just is None: 

202 item_just = [] 

203 else: 

204 raise Malformed_Input("justification is not a string" 

205 " or list", 

206 item_just) 

207 

208 l_item = Activity( 

209 tag = Tracing_Tag(namespace = "json", 

210 tag = "%s:%s" % 

211 (file_name, item_name)), 

212 location = File_Reference(file_name), 

213 framework = "JSON", 

214 kind = "Test Vector") 

215 for tag in item_tags: 

216 l_item.add_tracing_target( 

217 Tracing_Tag(namespace = "req", 

218 tag = tag)) 

219 for just_up in item_just: 

220 l_item.just_up.append(just_up) 

221 

222 items.append(l_item) 

223 except Malformed_Input as err: 

224 pprint(err.data) 

225 print("%s: malformed input: %s" % (file_name, err.msg)) 

226 ok = False 

227 

228 return ok, items 

229 

230 

231def main(): 

232 return LOBSTER_Json().run()