Coverage for lobster/report.py: 54%

126 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 14:55 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import os.path 

21import json 

22from collections import OrderedDict 

23from dataclasses import dataclass 

24 

25from lobster.items import Tracing_Status, Requirement, Implementation, Activity 

26from lobster.config.parser import load as load_config 

27from lobster.errors import Message_Handler 

28from lobster.io import lobster_read 

29from lobster.location import File_Reference 

30 

31 

32@dataclass 

33class Coverage: 

34 level : str 

35 items : int 

36 ok : int 

37 coverage : None 

38 

39 

40class Report: 

41 def __init__(self): 

42 self.mh = Message_Handler() 

43 self.config = OrderedDict() 

44 self.items = {} 

45 self.coverage = {} 

46 self.custom_data = {} 

47 

48 def parse_config(self, filename): 

49 """ 

50 Function parses the lobster config file to generate a .lobster file. 

51 Parameters 

52 ---------- 

53 filename - configuration file 

54 

55 Returns - Nothing 

56 ------- 

57 

58 """ 

59 assert isinstance(filename, str) 

60 assert os.path.isfile(filename) 

61 

62 # Load config 

63 self.config = load_config(self.mh, filename) 

64 

65 # Load requested files 

66 for level in self.config: 

67 for source in self.config[level]["source"]: 

68 lobster_read(self.mh, source["file"], level, self.items, 

69 source) 

70 

71 # Resolve references for items 

72 self.resolve_references_for_items() 

73 

74 # Compute status and items count 

75 self.compute_item_count_and_status() 

76 

77 # Compute coverage for items 

78 self.compute_coverage_for_items() 

79 

80 def resolve_references_for_items(self): 

81 for src_item in self.items.values(): 

82 while src_item.unresolved_references: 

83 dst_tag = src_item.unresolved_references.pop() 

84 if dst_tag.key() not in self.items: 

85 src_item.error("unknown tracing target %s" % dst_tag.key()) 

86 continue 

87 dst_item = self.items[dst_tag.key()] 

88 # TODO: Check if policy allows this link 

89 src_item.ref_up.append(dst_tag) 

90 dst_item.ref_down.append(src_item.tag) 

91 

92 # Check versions match, if specified 

93 if dst_tag.version is not None: 

94 if dst_item.tag.version is None: 

95 src_item.error("tracing destination %s is unversioned" 

96 % dst_tag.key()) 

97 elif dst_tag.version != dst_item.tag.version: 

98 src_item.error("tracing destination %s has version %s" 

99 " (expected %s)" % 

100 (dst_tag.key(), 

101 dst_item.tag.version, 

102 dst_tag.version)) 

103 

104 def compute_coverage_for_items(self): 

105 for level_obj in self.coverage.values(): 

106 if level_obj.items == 0: 

107 level_obj.coverage = 0.0 

108 else: 

109 level_obj.coverage = float(level_obj.ok * 100) / float(level_obj.items) 

110 

111 def compute_item_count_and_status(self): 

112 for level in self.config: 

113 coverage = Coverage(level=level, items=0, ok=0, coverage=None) 

114 self.coverage.update({level: coverage}) 

115 for item in self.items.values(): 

116 item.determine_status(self.config, self.items) 

117 self.coverage[item.level].items += 1 

118 if item.tracing_status in (Tracing_Status.OK, 

119 Tracing_Status.JUSTIFIED): 

120 self.coverage[item.level].ok += 1 

121 

122 def write_report(self, filename): 

123 assert isinstance(filename, str) 

124 

125 levels = [] 

126 for level_config in self.config.values(): 

127 level = { 

128 "name" : level_config["name"], 

129 "kind" : level_config["kind"], 

130 "items" : [item.to_json() 

131 for item in self.items.values() 

132 if item.level == level_config["name"]], 

133 "coverage" : self.coverage[level_config["name"]].coverage 

134 } 

135 levels.append(level) 

136 

137 report = { 

138 "schema" : "lobster-report", 

139 "version" : 2, 

140 "generator" : "lobster_report", 

141 "levels" : levels, 

142 "policy" : self.config, 

143 "matrix" : [], 

144 } 

145 

146 with open(filename, "w", encoding="UTF-8") as fd: 

147 json.dump(report, fd, indent=2) 

148 fd.write("\n") 

149 

150 def load_report(self, filename): 

151 assert isinstance(filename, str) 

152 

153 loc = File_Reference(filename) 

154 

155 # Read and validate JSON 

156 with open(filename, "r", encoding="UTF-8") as fd: 

157 try: 

158 data = json.load(fd) 

159 except json.decoder.JSONDecodeError as err: 

160 self.mh.error(File_Reference(filename, 

161 err.lineno, 

162 err.colno), 

163 err.msg) 

164 

165 # Validate basic structure 

166 self.validate_basic_structure_of_lobster_file(data, loc) 

167 

168 # Validate indicated schema 

169 self.validate_indicated_schema(data, loc) 

170 

171 # Validate and parse custom data 

172 self.validate_and_parse_custom_data(data, loc) 

173 

174 # Read in data 

175 self.compute_items_and_coverage_for_items(data) 

176 

177 def compute_items_and_coverage_for_items(self, data): 

178 """ 

179 Function calculates items and coverage for the items 

180 Parameters 

181 ---------- 

182 data - contents of lobster json file. 

183 

184 Returns - Nothing 

185 ------- 

186 

187 """ 

188 self.config = data["policy"] 

189 for level in data["levels"]: 

190 assert level["name"] in self.config, ( 

191 f"level '{level['name']}' not found in config" 

192 ) 

193 coverage = Coverage( 

194 level=level["name"], items=0, ok=0, coverage=level["coverage"] 

195 ) 

196 self.coverage.update({level["name"]: coverage}) 

197 

198 for item_data in level["items"]: 

199 if level["kind"] == "requirements": 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true

200 item = Requirement.from_json(level["name"], 

201 item_data, 

202 3) 

203 elif level["kind"] == "implementation": 203 ↛ 208line 203 didn't jump to line 208 because the condition on line 203 was always true

204 item = Implementation.from_json(level["name"], 

205 item_data, 

206 3) 

207 else: 

208 assert level["kind"] == "activity", ( 

209 f"unknown level kind '{level['kind']}'" 

210 ) 

211 item = Activity.from_json(level["name"], 

212 item_data, 

213 3) 

214 

215 self.items[item.tag.key()] = item 

216 self.coverage[item.level].items += 1 

217 if item.tracing_status in (Tracing_Status.OK, 

218 Tracing_Status.JUSTIFIED): 

219 self.coverage[item.level].ok += 1 

220 

221 def validate_and_parse_custom_data(self, data, loc): 

222 """ 

223 Function validates the optional 'custom_data' field in the lobster report. 

224 Ensures that if present, it is a dictionary with string keys and string values. 

225 

226 Parameters 

227 ---------- 

228 data - contents of lobster json file. 

229 loc - location from where the error was raised. 

230 

231 Returns - Nothing 

232 ------- 

233 """ 

234 self.custom_data = data.get('custom_data', None) 

235 if self.custom_data: 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true

236 if not isinstance(self.custom_data, dict): 

237 self.mh.error(loc, "'custom_data' must be an object (dictionary).") 

238 

239 for key, value in self.custom_data.items(): 

240 if not isinstance(key, str): 

241 self.mh.error(loc, 

242 f"Key in 'custom_data' must be a " 

243 f"string, got {type(key).__name__}.") 

244 if not isinstance(value, str): 

245 self.mh.error(loc, 

246 f"Value for key '{key}' in 'custom_data' " 

247 f"must be a string, got {type(value).__name__}.") 

248 

249 def validate_indicated_schema(self, data, loc): 

250 """ 

251 Function validates the schema and version. 

252 Parameters 

253 ---------- 

254 data - contents of lobster json file. 

255 loc - location from where the error was raised. 

256 

257 Returns - Nothing 

258 ------- 

259 

260 """ 

261 supported_schema = { 

262 "lobster-report": set([2]), 

263 } 

264 if data["schema"] not in supported_schema: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true

265 self.mh.error(loc, "unknown schema kind %s" % data["schema"]) 

266 if data["version"] not in supported_schema[data["schema"]]: 266 ↛ 267line 266 didn't jump to line 267 because the condition on line 266 was never true

267 self.mh.error(loc, 

268 "version %u for schema %s is not supported" % 

269 (data["version"], data["schema"])) 

270 

271 def validate_basic_structure_of_lobster_file(self, data, loc): 

272 """ 

273 Function validates the basic structure of lobster file. All the first level 

274 keys of the lobster json file are validated here. 

275 Parameters 

276 ---------- 

277 data - contents of lobster json file. 

278 loc - location from where the error was raised. 

279 

280 Returns - Nothing 

281 ------- 

282 

283 """ 

284 if not isinstance(data, dict): 284 ↛ 285line 284 didn't jump to line 285 because the condition on line 284 was never true

285 self.mh.error(loc, "parsed json is not an object") 

286 

287 rkey_dict = {"schema": str, "version": int, "generator": str, "levels": list, 

288 "policy": dict, "matrix": list} 

289 type_dict = {int: "an integer", str: "a string", list: "an array", 

290 dict: "an object"} 

291 for rkey, rvalue in rkey_dict.items(): 

292 if rkey not in data: 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true

293 self.mh.error(loc, "required top-levelkey %s not present" % rkey) 

294 if not isinstance(data[rkey], rvalue): 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true

295 self.mh.error(loc, "%s is not %s." % (rkey, type_dict[rvalue]))