Coverage for lobster/common/report.py: 51%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-27 13:02 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import json 

21from collections import OrderedDict 

22from dataclasses import dataclass 

23 

24from lobster.common.level_definition import LevelDefinition 

25from lobster.common.items import Tracing_Status, Requirement, Implementation, Activity 

26from lobster.common.parser import load as load_config 

27from lobster.common.errors import Message_Handler 

28from lobster.common.io import lobster_read 

29from lobster.common.location import File_Reference 

30 

31 

32@dataclass 

33class Coverage: 

34 level : str 

35 items : int 

36 ok : int 

37 coverage : None 

38 

39 

40class Report: 

41 def __init__(self): 

42 self.mh = Message_Handler() 

43 self.config = OrderedDict() 

44 self.items = {} 

45 self.coverage = {} 

46 self.custom_data = {} 

47 

48 def parse_config(self, filename): 

49 """ 

50 Function parses the lobster config file to generate a .lobster file. 

51 Parameters 

52 ---------- 

53 filename - configuration file 

54 

55 Returns - Nothing 

56 ------- 

57 

58 """ 

59 

60 # Load config 

61 self.config = load_config(self.mh, filename) 

62 

63 # Load requested files 

64 for level in self.config: 

65 for source in self.config[level].source: 

66 lobster_read(self.mh, source["file"], level, self.items, 

67 source) 

68 

69 # Resolve references for items 

70 self.resolve_references_for_items() 

71 

72 # Compute status and items count 

73 self.compute_item_count_and_status() 

74 

75 # Compute coverage for items 

76 self.compute_coverage_for_items() 

77 

78 def resolve_references_for_items(self): 

79 for src_item in self.items.values(): 

80 while src_item.unresolved_references: 

81 dst_tag = src_item.unresolved_references.pop() 

82 if dst_tag.key() not in self.items: 

83 src_item.error("unknown tracing target %s" % dst_tag.key()) 

84 continue 

85 dst_item = self.items[dst_tag.key()] 

86 # TODO: Check if policy allows this link 

87 src_item.ref_up.append(dst_tag) 

88 dst_item.ref_down.append(src_item.tag) 

89 

90 # Check versions match, if specified 

91 if dst_tag.version is not None: 

92 if dst_item.tag.version is None: 

93 src_item.error("tracing destination %s is unversioned" 

94 % dst_tag.key()) 

95 elif dst_tag.version != dst_item.tag.version: 

96 src_item.error("tracing destination %s has version %s" 

97 " (expected %s)" % 

98 (dst_tag.key(), 

99 dst_item.tag.version, 

100 dst_tag.version)) 

101 

102 def compute_coverage_for_items(self): 

103 for level_obj in self.coverage.values(): 

104 if level_obj.items == 0: 

105 level_obj.coverage = 0.0 

106 else: 

107 level_obj.coverage = float(level_obj.ok * 100) / float(level_obj.items) 

108 

109 def compute_item_count_and_status(self): 

110 for level in self.config: 

111 coverage = Coverage(level=level, items=0, ok=0, coverage=None) 

112 self.coverage.update({level: coverage}) 

113 for item in self.items.values(): 

114 item.determine_status(self.config, self.items) 

115 self.coverage[item.level].items += 1 

116 if item.tracing_status in (Tracing_Status.OK, 

117 Tracing_Status.JUSTIFIED): 

118 self.coverage[item.level].ok += 1 

119 

120 def write_report(self, filename): 

121 

122 levels = [] 

123 for level_config in self.config.values(): 

124 level = { 

125 "name" : level_config.name, 

126 "kind" : level_config.kind, 

127 "items" : [item.to_json() 

128 for item in self.items.values() 

129 if item.level == level_config.name], 

130 "coverage" : self.coverage[level_config.name].coverage 

131 } 

132 levels.append(level) 

133 

134 report = { 

135 "schema" : "lobster-report", 

136 "version" : 2, 

137 "generator" : "lobster_report", 

138 "levels" : levels, 

139 "policy" : {key: value.to_json() 

140 for key, value in self.config.items()}, 

141 "matrix" : [], 

142 } 

143 

144 with open(filename, "w", encoding="UTF-8") as fd: 

145 json.dump(report, fd, indent=2) 

146 fd.write("\n") 

147 

148 def load_report(self, filename): 

149 

150 loc = File_Reference(filename) 

151 

152 # Read and validate JSON 

153 with open(filename, "r", encoding="UTF-8") as fd: 

154 try: 

155 data = json.load(fd) 

156 except json.decoder.JSONDecodeError as err: 

157 self.mh.error(File_Reference(filename, 

158 err.lineno, 

159 err.colno), 

160 err.msg) 

161 

162 # Validate basic structure 

163 self.validate_basic_structure_of_lobster_file(data, loc) 

164 

165 # Validate indicated schema 

166 self.validate_indicated_schema(data, loc) 

167 

168 # Validate and parse custom data 

169 self.parse_custom_data(data) 

170 

171 # Read in data 

172 self.compute_items_and_coverage_for_items(data) 

173 

174 def compute_items_and_coverage_for_items(self, data): 

175 """ 

176 Function calculates items and coverage for the items 

177 Parameters 

178 ---------- 

179 data - contents of lobster json file. 

180 

181 Returns - Nothing 

182 ------- 

183 

184 """ 

185 self.config = {key: LevelDefinition.from_json(value) 

186 for key, value in data["policy"].items()} 

187 for level in data["levels"]: 

188 if level["name"] not in self.config: 188 ↛ 189line 188 didn't jump to line 189 because the condition on line 188 was never true

189 raise KeyError(f"level '{level['name']}' not found in config") 

190 coverage = Coverage( 

191 level=level["name"], items=0, ok=0, coverage=level["coverage"] 

192 ) 

193 self.coverage.update({level["name"]: coverage}) 

194 

195 for item_data in level["items"]: 

196 if level["kind"] == "requirements": 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true

197 item = Requirement.from_json(level["name"], 

198 item_data, 

199 3) 

200 elif level["kind"] == "implementation": 200 ↛ 205line 200 didn't jump to line 205 because the condition on line 200 was always true

201 item = Implementation.from_json(level["name"], 

202 item_data, 

203 3) 

204 else: 

205 if level["kind"] != "activity": 

206 raise ValueError(f"unknown level kind '{level['kind']}'") 

207 item = Activity.from_json(level["name"], 

208 item_data, 

209 3) 

210 

211 self.items[item.tag.key()] = item 

212 self.coverage[item.level].items += 1 

213 if item.tracing_status in (Tracing_Status.OK, 

214 Tracing_Status.JUSTIFIED): 

215 self.coverage[item.level].ok += 1 

216 

217 def parse_custom_data(self, data): 

218 self.custom_data = data.get('custom_data', None) 

219 

220 def validate_indicated_schema(self, data, loc): 

221 """ 

222 Function validates the schema and version. 

223 Parameters 

224 ---------- 

225 data - contents of lobster json file. 

226 loc - location from where the error was raised. 

227 

228 Returns - Nothing 

229 ------- 

230 

231 """ 

232 supported_schema = { 

233 "lobster-report": set([2]), 

234 } 

235 if data["schema"] not in supported_schema: 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true

236 self.mh.error(loc, "unknown schema kind %s" % data["schema"]) 

237 if data["version"] not in supported_schema[data["schema"]]: 237 ↛ 238line 237 didn't jump to line 238 because the condition on line 237 was never true

238 self.mh.error(loc, 

239 "version %u for schema %s is not supported" % 

240 (data["version"], data["schema"])) 

241 

242 def validate_basic_structure_of_lobster_file(self, data, loc): 

243 """ 

244 Function validates the basic structure of lobster file. All the first level 

245 keys of the lobster json file are validated here. 

246 Parameters 

247 ---------- 

248 data - contents of lobster json file. 

249 loc - location from where the error was raised. 

250 

251 Returns - Nothing 

252 ------- 

253 

254 """ 

255 if not isinstance(data, dict): 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true

256 self.mh.error(loc, "parsed json is not an object") 

257 

258 rkey_dict = {"schema": str, "version": int, "generator": str, "levels": list, 

259 "policy": dict, "matrix": list} 

260 type_dict = {int: "an integer", str: "a string", list: "an array", 

261 dict: "an object"} 

262 for rkey, rvalue in rkey_dict.items(): 

263 if rkey not in data: 263 ↛ 264line 263 didn't jump to line 264 because the condition on line 263 was never true

264 self.mh.error(loc, "required top-levelkey %s not present" % rkey) 

265 if not isinstance(data[rkey], rvalue): 265 ↛ 266line 265 didn't jump to line 266 because the condition on line 265 was never true

266 self.mh.error(loc, "%s is not %s." % (rkey, type_dict[rvalue]))