Coverage for lobster/report.py: 81%

123 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 09:51 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import json 

21from collections import OrderedDict 

22from dataclasses import dataclass 

23 

24from lobster.items import Tracing_Status, Requirement, Implementation, Activity 

25from lobster.config.parser import load as load_config 

26from lobster.errors import Message_Handler 

27from lobster.io import lobster_read 

28from lobster.location import File_Reference 

29 

30 

31@dataclass 

32class Coverage: 

33 level : str 

34 items : int 

35 ok : int 

36 coverage : None 

37 

38 

39class Report: 

40 def __init__(self): 

41 self.mh = Message_Handler() 

42 self.config = OrderedDict() 

43 self.items = {} 

44 self.coverage = {} 

45 self.custom_data = {} 

46 

47 def parse_config(self, filename): 

48 """ 

49 Function parses the lobster config file to generate a .lobster file. 

50 Parameters 

51 ---------- 

52 filename - configuration file 

53 

54 Returns - Nothing 

55 ------- 

56 

57 """ 

58 

59 # Load config 

60 self.config = load_config(self.mh, filename) 

61 

62 # Load requested files 

63 for level in self.config: 

64 for source in self.config[level]["source"]: 

65 lobster_read(self.mh, source["file"], level, self.items, 

66 source) 

67 

68 # Resolve references for items 

69 self.resolve_references_for_items() 

70 

71 # Compute status and items count 

72 self.compute_item_count_and_status() 

73 

74 # Compute coverage for items 

75 self.compute_coverage_for_items() 

76 

77 def resolve_references_for_items(self): 

78 for src_item in self.items.values(): 

79 while src_item.unresolved_references: 

80 dst_tag = src_item.unresolved_references.pop() 

81 if dst_tag.key() not in self.items: 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true

82 src_item.error("unknown tracing target %s" % dst_tag.key()) 

83 continue 

84 dst_item = self.items[dst_tag.key()] 

85 # TODO: Check if policy allows this link 

86 src_item.ref_up.append(dst_tag) 

87 dst_item.ref_down.append(src_item.tag) 

88 

89 # Check versions match, if specified 

90 if dst_tag.version is not None: 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 if dst_item.tag.version is None: 

92 src_item.error("tracing destination %s is unversioned" 

93 % dst_tag.key()) 

94 elif dst_tag.version != dst_item.tag.version: 

95 src_item.error("tracing destination %s has version %s" 

96 " (expected %s)" % 

97 (dst_tag.key(), 

98 dst_item.tag.version, 

99 dst_tag.version)) 

100 

101 def compute_coverage_for_items(self): 

102 for level_obj in self.coverage.values(): 

103 if level_obj.items == 0: 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true

104 level_obj.coverage = 0.0 

105 else: 

106 level_obj.coverage = float(level_obj.ok * 100) / float(level_obj.items) 

107 

108 def compute_item_count_and_status(self): 

109 for level in self.config: 

110 coverage = Coverage(level=level, items=0, ok=0, coverage=None) 

111 self.coverage.update({level: coverage}) 

112 for item in self.items.values(): 

113 item.determine_status(self.config, self.items) 

114 self.coverage[item.level].items += 1 

115 if item.tracing_status in (Tracing_Status.OK, 

116 Tracing_Status.JUSTIFIED): 

117 self.coverage[item.level].ok += 1 

118 

119 def write_report(self, filename): 

120 

121 levels = [] 

122 for level_config in self.config.values(): 

123 level = { 

124 "name" : level_config["name"], 

125 "kind" : level_config["kind"], 

126 "items" : [item.to_json() 

127 for item in self.items.values() 

128 if item.level == level_config["name"]], 

129 "coverage" : self.coverage[level_config["name"]].coverage 

130 } 

131 levels.append(level) 

132 

133 report = { 

134 "schema" : "lobster-report", 

135 "version" : 2, 

136 "generator" : "lobster_report", 

137 "levels" : levels, 

138 "policy" : self.config, 

139 "matrix" : [], 

140 } 

141 

142 with open(filename, "w", encoding="UTF-8") as fd: 

143 json.dump(report, fd, indent=2) 

144 fd.write("\n") 

145 

146 def load_report(self, filename): 

147 

148 loc = File_Reference(filename) 

149 

150 # Read and validate JSON 

151 with open(filename, "r", encoding="UTF-8") as fd: 

152 try: 

153 data = json.load(fd) 

154 except json.decoder.JSONDecodeError as err: 

155 self.mh.error(File_Reference(filename, 

156 err.lineno, 

157 err.colno), 

158 err.msg) 

159 

160 # Validate basic structure 

161 self.validate_basic_structure_of_lobster_file(data, loc) 

162 

163 # Validate indicated schema 

164 self.validate_indicated_schema(data, loc) 

165 

166 # Validate and parse custom data 

167 self.validate_and_parse_custom_data(data, loc) 

168 

169 # Read in data 

170 self.compute_items_and_coverage_for_items(data) 

171 

172 def compute_items_and_coverage_for_items(self, data): 

173 """ 

174 Function calculates items and coverage for the items 

175 Parameters 

176 ---------- 

177 data - contents of lobster json file. 

178 

179 Returns - Nothing 

180 ------- 

181 

182 """ 

183 self.config = data["policy"] 

184 for level in data["levels"]: 

185 if level["name"] not in self.config: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true

186 raise KeyError(f"level '{level['name']}' not found in config") 

187 coverage = Coverage( 

188 level=level["name"], items=0, ok=0, coverage=level["coverage"] 

189 ) 

190 self.coverage.update({level["name"]: coverage}) 

191 

192 for item_data in level["items"]: 

193 if level["kind"] == "requirements": 

194 item = Requirement.from_json(level["name"], 

195 item_data, 

196 3) 

197 elif level["kind"] == "implementation": 

198 item = Implementation.from_json(level["name"], 

199 item_data, 

200 3) 

201 else: 

202 if level["kind"] != "activity": 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true

203 raise ValueError(f"unknown level kind '{level['kind']}'") 

204 item = Activity.from_json(level["name"], 

205 item_data, 

206 3) 

207 

208 self.items[item.tag.key()] = item 

209 self.coverage[item.level].items += 1 

210 if item.tracing_status in (Tracing_Status.OK, 

211 Tracing_Status.JUSTIFIED): 

212 self.coverage[item.level].ok += 1 

213 

214 def validate_and_parse_custom_data(self, data, loc): 

215 """ 

216 Function validates the optional 'custom_data' field in the lobster report. 

217 Ensures that if present, it is a dictionary with string keys and string values. 

218 

219 Parameters 

220 ---------- 

221 data - contents of lobster json file. 

222 loc - location from where the error was raised. 

223 

224 Returns - Nothing 

225 ------- 

226 """ 

227 self.custom_data = data.get('custom_data', None) 

228 if self.custom_data: 

229 if not isinstance(self.custom_data, dict): 229 ↛ 230line 229 didn't jump to line 230 because the condition on line 229 was never true

230 self.mh.error(loc, "'custom_data' must be an object (dictionary).") 

231 

232 for key, value in self.custom_data.items(): 

233 if not isinstance(key, str): 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true

234 self.mh.error(loc, 

235 f"Key in 'custom_data' must be a " 

236 f"string, got {type(key).__name__}.") 

237 if not isinstance(value, str): 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true

238 self.mh.error(loc, 

239 f"Value for key '{key}' in 'custom_data' " 

240 f"must be a string, got {type(value).__name__}.") 

241 

242 def validate_indicated_schema(self, data, loc): 

243 """ 

244 Function validates the schema and version. 

245 Parameters 

246 ---------- 

247 data - contents of lobster json file. 

248 loc - location from where the error was raised. 

249 

250 Returns - Nothing 

251 ------- 

252 

253 """ 

254 supported_schema = { 

255 "lobster-report": set([2]), 

256 } 

257 if data["schema"] not in supported_schema: 257 ↛ 258line 257 didn't jump to line 258 because the condition on line 257 was never true

258 self.mh.error(loc, "unknown schema kind %s" % data["schema"]) 

259 if data["version"] not in supported_schema[data["schema"]]: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true

260 self.mh.error(loc, 

261 "version %u for schema %s is not supported" % 

262 (data["version"], data["schema"])) 

263 

264 def validate_basic_structure_of_lobster_file(self, data, loc): 

265 """ 

266 Function validates the basic structure of lobster file. All the first level 

267 keys of the lobster json file are validated here. 

268 Parameters 

269 ---------- 

270 data - contents of lobster json file. 

271 loc - location from where the error was raised. 

272 

273 Returns - Nothing 

274 ------- 

275 

276 """ 

277 if not isinstance(data, dict): 277 ↛ 278line 277 didn't jump to line 278 because the condition on line 277 was never true

278 self.mh.error(loc, "parsed json is not an object") 

279 

280 rkey_dict = {"schema": str, "version": int, "generator": str, "levels": list, 

281 "policy": dict, "matrix": list} 

282 type_dict = {int: "an integer", str: "a string", list: "an array", 

283 dict: "an object"} 

284 for rkey, rvalue in rkey_dict.items(): 

285 if rkey not in data: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true

286 self.mh.error(loc, "required top-levelkey %s not present" % rkey) 

287 if not isinstance(data[rkey], rvalue): 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true

288 self.mh.error(loc, "%s is not %s." % (rkey, type_dict[rvalue]))