Coverage for lobster/items.py: 95%

216 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 09:51 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20from enum import Enum, auto 

21from abc import ABCMeta 

22from hashlib import sha1 

23from typing import Optional 

24 

25from lobster.location import Location 

26 

27 

class Tracing_Tag:
    """Identifier of a traceable item: namespace, tag and optional version.

    The textual (and JSON) form is ``"<namespace> <tag>"``, followed by
    ``"@<version>"`` when a version is present. The namespace must not
    contain a space, because the first space separates it from the tag.
    """

    def __init__(
            self,
            namespace: str,
            tag: str,
            version: Optional[str] = None,
    ):
        """Create a tag.

        namespace -- string without spaces
        tag       -- string naming the item inside the namespace
        version   -- None, a string (but not the literal "None"), or
                     an int
        """
        assert isinstance(namespace, str) and " " not in namespace
        assert isinstance(tag, str)
        assert version is None or isinstance(version, (str, int))
        # Guards against a stringified None sneaking in as a version
        assert not isinstance(version, str) or version != "None"

        self.namespace = namespace
        self.tag = tag
        self.version = version
        # Lazily computed by hash()
        self.hash_val = None

    def __str__(self):
        rv = "%s %s" % (self.namespace, self.tag)
        # Only None and the empty string mean "no version". A plain
        # truthiness test would also drop the valid integer version 0.
        if self.version is not None and self.version != "":
            rv += "@%s" % str(self.version)
        return rv

    def key(self) -> str:
        """Return the version-independent identity "<namespace> <tag>"."""
        return self.namespace + " " + self.tag

    def to_json(self) -> str:
        """Return the textual form of the tag (same as str())."""
        return str(self)

    @classmethod
    def from_json(cls, json):
        """Parse the string produced by to_json().

        Raises ValueError if the string contains no space, i.e. no
        namespace/tag separator.
        """
        assert isinstance(json, str)
        namespace, separator, rest = json.partition(" ")
        if not separator:
            raise ValueError(
                "malformed tracing tag %r: expected '<namespace> <tag>'"
                % json)
        return Tracing_Tag.from_text(namespace, rest)

    @classmethod
    def from_text(cls, namespace, text):
        """Build a tag from a namespace and a "tag" or "tag@version" text.

        The text is split at the first "@"; the part after it becomes
        the version, kept as a string.
        """
        assert isinstance(namespace, str)
        assert isinstance(text, str)

        if "@" in text:
            tag, version = text.split("@", 1)
        else:
            tag = text
            version = None
        return Tracing_Tag(namespace, tag, version)

    def hash(self):
        """Return the SHA-1 hex digest of key(), encoded as UTF-8.

        The digest is computed once and cached in hash_val. The version
        is not part of the hash.
        """
        if self.hash_val is None:
            hfunc = sha1()
            hfunc.update(self.key().encode("UTF-8"))
            self.hash_val = hfunc.hexdigest()
        return self.hash_val

82 

83 

class Tracing_Status(Enum):
    """Outcome of the tracing analysis for a single item.

    Item.determine_status() assigns OK, PARTIAL, MISSING or JUSTIFIED;
    the member name is what gets written to JSON under the
    "tracing_status" key.
    """

    # All required references are present, no justification involved
    OK = auto()
    # Only one of the two required directions is satisfied, or the
    # item would be OK but already carried messages
    PARTIAL = auto()
    # Required references are absent, or the item has an error
    MISSING = auto()
    # Requirements are met and at least one justification applies
    JUSTIFIED = auto()
    # Not assigned anywhere in this file
    ERROR = auto()

90 

91 

class Item(metaclass=ABCMeta):
    """Base class of all traceable items.

    An item has a tag, a source location, references to other items,
    justifications, messages, and the tracing status computed by
    determine_status(). Subclasses add their own fields and extend
    to_json().
    """

    def __init__(self, tag: Tracing_Tag, location: Location):
        assert isinstance(tag, Tracing_Tag)
        assert isinstance(location, Location)

        # Name of the level this item belongs to; see set_level()
        self.level = None
        self.tag = tag
        self.location = location
        # Defaults to the bare tag text
        self.name = tag.tag

        # References to other items, as Tracing_Tag objects
        self.ref_up = []
        self.ref_down = []

        # Targets added through add_tracing_target(); the cache holds
        # their keys so that each target is recorded only once
        self.unresolved_references_cache = set()
        self.unresolved_references = []

        # Messages and justifications attached to this item
        self.messages = []
        self.just_up = []
        self.just_down = []
        self.just_global = []

        self.tracing_status = None
        self.has_error = False

    def set_level(self, level):
        """Record the name of the level this item belongs to."""
        assert isinstance(level, str)
        self.level = level

    def error(self, message):
        """Attach a message and flag the item as erroneous."""
        assert isinstance(message, str)
        self.messages.append(message)
        self.has_error = True

    def add_tracing_target(self, target):
        """Remember target as an unresolved reference.

        A target whose key was already added is ignored.
        """
        assert isinstance(target, Tracing_Tag)
        target_key = target.key()
        if target_key not in self.unresolved_references_cache:
            self.unresolved_references.append(target)
            self.unresolved_references_cache.add(target_key)

    def perform_source_checks(self, source_info):
        """Hook for source-specific checks; the base only type-checks."""
        assert isinstance(source_info, dict)

    def determine_status(self, config, stab):
        """Compute tracing_status from references and justifications.

        config -- mapping from level name to a level description that
                  provides "needs_tracing_up", "needs_tracing_down",
                  "traces" and "breakdown_requirements"
        stab   -- mapping from tag key to an object with a level
                  attribute

        Appends a message for each missing reference that is found.
        """
        assert self.level in config
        assert self.tag.key() in stab

        level_cfg = config[self.level]
        needs_up = level_cfg["needs_tracing_up"]
        needs_down = level_cfg["needs_tracing_down"]

        has_up_ref = bool(self.ref_up)
        has_just_up = bool(self.just_up) or bool(self.just_global)
        has_just_down = bool(self.just_down) or bool(self.just_global)
        # Captured before this method appends messages of its own
        has_init_errors = bool(self.messages)

        # Check up references
        ok_up = not needs_up or has_up_ref or has_just_up
        if not ok_up:
            self.messages.append("missing up reference")

        # Check set of down references
        ok_down = True
        if needs_down:
            # One entry per level that traces to this item's level,
            # set to True once a down reference from it is seen
            traced_from = {}
            for name in config:
                if self.level in config[name]["traces"]:
                    traced_from[name] = False
            for ref in self.ref_down:
                traced_from[stab[ref.key()].level] = True

            for chain in level_cfg["breakdown_requirements"]:
                if any(traced_from[src] for src in chain) or has_just_down:
                    continue
                ok_down = False
                self.messages.append("missing reference to %s" %
                                     " or ".join(sorted(chain)))

        # Set status
        if self.has_error:
            status = Tracing_Status.MISSING
        elif ok_up and ok_down:
            if has_just_up or has_just_down:
                status = Tracing_Status.JUSTIFIED
            else:
                status = Tracing_Status.OK
        elif (ok_up or ok_down) and needs_up and needs_down:
            status = Tracing_Status.PARTIAL
        else:
            status = Tracing_Status.MISSING

        # Overwrite status if there are initial errors
        if status == Tracing_Status.OK and has_init_errors:
            status = Tracing_Status.PARTIAL

        self.tracing_status = status

    def additional_data_from_json(self, level, data, schema_version):
        """Load the fields shared by all item kinds from a JSON dict.

        Sets the level, registers the entries of "refs" as tracing
        targets, and replaces ref_up, ref_down, messages and the
        justification lists with the values from data, or with empty
        lists where a key is missing. tracing_status is only changed
        when data has a "tracing_status" key.
        """
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version >= 3

        self.set_level(level)
        for ref in data.get("refs", []):
            self.add_tracing_target(Tracing_Tag.from_json(ref))

        self.ref_up = list(map(Tracing_Tag.from_json,
                               data.get("ref_up", [])))
        self.ref_down = list(map(Tracing_Tag.from_json,
                                 data.get("ref_down", [])))

        self.messages = data.get("messages", [])
        self.just_up = data.get("just_up", [])
        self.just_down = data.get("just_down", [])
        self.just_global = data.get("just_global", [])

        if "tracing_status" in data:
            self.tracing_status = Tracing_Status[data["tracing_status"]]

    def to_json(self):
        """Return the fields shared by all item kinds as a dict.

        "refs" is only present when there are unresolved references.
        "ref_up" and "ref_down" are both present when either list is
        non-empty. "tracing_status" is only present once a status has
        been set.
        """
        rv = {
            "tag": self.tag.to_json(),
            "location": self.location.to_json(),
            "name": self.name,
            "messages": self.messages,
            "just_up": self.just_up,
            "just_down": self.just_down,
            "just_global": self.just_global,
        }
        if self.unresolved_references:
            rv["refs"] = [ref.to_json()
                          for ref in self.unresolved_references]
        if self.ref_up or self.ref_down:
            rv["ref_up"] = [ref.to_json() for ref in self.ref_up]
            rv["ref_down"] = [ref.to_json() for ref in self.ref_down]
        if self.tracing_status:
            rv["tracing_status"] = self.tracing_status.name
        return rv

226 

227 

class Requirement(Item):
    """Item with a framework, kind, name and optional text and status."""

    def __init__(
            self,
            tag: Tracing_Tag,
            location: Location,
            framework: str,
            kind: str,
            name: str,
            text: Optional[str] = None,
            status: Optional[str] = None,
    ):
        super().__init__(tag, location)
        assert isinstance(framework, str)
        assert isinstance(kind, str)
        assert isinstance(name, str)
        assert text is None or isinstance(text, str)
        assert status is None or isinstance(status, str)

        self.framework = framework
        self.kind = kind
        # Replaces the default name taken from the tag
        self.name = name
        self.text = text
        self.status = status

    def to_json(self):
        """Return the shared item fields plus the requirement fields."""
        rv = super().to_json()
        rv.update({
            "framework": self.framework,
            "kind": self.kind,
            "text": self.text,
            "status": self.status,
        })
        return rv

    def perform_source_checks(self, source_info):
        """Type-check source_info; no further checks are done here."""
        assert isinstance(source_info, dict)

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Build a Requirement from its JSON dict.

        Accepts schema versions 3 and 4. "text" and "status" are
        optional keys of data.
        """
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version in (3, 4)

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Requirement(tag=tag,
                           location=location,
                           framework=data["framework"],
                           kind=data["kind"],
                           name=data["name"],
                           text=data.get("text"),
                           status=data.get("status"))
        item.additional_data_from_json(level, data, schema_version)
        return item

279 

280 

class Implementation(Item):
    """Item with a language, kind and name."""

    def __init__(
            self,
            tag: Tracing_Tag,
            location: Location,
            language: str,
            kind: str,
            name: str,
    ):
        super().__init__(tag, location)
        assert isinstance(language, str)
        assert isinstance(kind, str)
        assert isinstance(name, str)

        self.language = language
        self.kind = kind
        # Replaces the default name taken from the tag
        self.name = name

    def to_json(self):
        """Return the shared item fields plus "language" and "kind"."""
        rv = super().to_json()
        rv.update({
            "language": self.language,
            "kind": self.kind,
        })
        return rv

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Build an Implementation from its JSON dict (schema version 3)."""
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version == 3

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Implementation(tag=tag,
                              location=location,
                              language=data["language"],
                              kind=data["kind"],
                              name=data["name"])
        item.additional_data_from_json(level, data, schema_version)
        return item

319 

320 

class Activity(Item):
    """Item with a framework, kind and optional status.

    The name is left at the default set by Item, i.e. the tag text.
    """

    def __init__(
            self,
            tag: Tracing_Tag,
            location: Location,
            framework: str,
            kind: str,
            status: Optional[str] = None,
    ):
        super().__init__(tag, location)
        assert isinstance(framework, str)
        assert isinstance(kind, str)
        assert status is None or isinstance(status, str)

        self.framework = framework
        self.kind = kind
        self.status = status

    def to_json(self):
        """Return the shared item fields plus the activity fields."""
        rv = super().to_json()
        rv.update({
            "framework": self.framework,
            "kind": self.kind,
            "status": self.status,
        })
        return rv

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Build an Activity from its JSON dict (schema version 3).

        "status" is an optional key of data.
        """
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version == 3

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Activity(tag=tag,
                        location=location,
                        framework=data["framework"],
                        kind=data["kind"],
                        status=data.get("status"))
        item.additional_data_from_json(level, data, schema_version)
        return item