Coverage for lobster/common/items.py: 94%

219 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-04-16 05:31 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20from enum import Enum, auto 

21from abc import ABCMeta 

22from hashlib import sha1 

23from typing import Optional 

24 

25from lobster.common.location import Location 

26 

27 

28class Tracing_Tag: 

29 def __init__( 

30 self, 

31 namespace: str, 

32 tag: str, 

33 version: Optional[str] = None, 

34 ): 

35 assert isinstance(namespace, str) and " " not in namespace 

36 assert isinstance(tag, str) 

37 assert version is None or isinstance(version, (str, int)) 

38 assert not isinstance(version, str) or version != "None" 

39 

40 self.namespace = namespace 

41 self.tag = tag 

42 self.version = version 

43 self.hash_val = None 

44 

45 def __str__(self): 

46 rv = "%s %s" % (self.namespace, 

47 self.tag) 

48 if self.version: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 rv += "@%s" % str(self.version) 

50 return rv 

51 

52 def key(self) -> str: 

53 return self.namespace + " " + self.tag 

54 

55 def to_json(self) -> str: 

56 return str(self) 

57 

58 @classmethod 

59 def from_json(cls, json): 

60 assert isinstance(json, str) 

61 namespace, rest = json.split(" ", 1) 

62 return Tracing_Tag.from_text(namespace, rest) 

63 

64 @classmethod 

65 def from_text(cls, namespace, text): 

66 assert isinstance(namespace, str) 

67 assert isinstance(text, str) 

68 

69 if "@" in text: 

70 tag, version = text.split("@", 1) 

71 else: 

72 tag = text 

73 version = None 

74 return Tracing_Tag(namespace, tag, version) 

75 

76 def hash(self): 

77 if not self.hash_val: 77 ↛ 81line 77 didn't jump to line 81 because the condition on line 77 was always true

78 hfunc = sha1() 

79 hfunc.update(self.key().encode("UTF-8")) 

80 self.hash_val = hfunc.hexdigest() 

81 return self.hash_val 

82 

83 

84class Tracing_Status(Enum): 

85 OK = auto() 

86 PARTIAL = auto() 

87 MISSING = auto() 

88 JUSTIFIED = auto() 

89 ERROR = auto() 

90 

91 

92class Item(metaclass=ABCMeta): 

93 def __init__(self, tag: Tracing_Tag, location: Location): 

94 assert isinstance(tag, Tracing_Tag) 

95 assert isinstance(location, Location) 

96 

97 self.level = None 

98 self.tag = tag 

99 self.location = location 

100 self.name = tag.tag 

101 

102 self.ref_up = [] 

103 self.ref_down = [] 

104 

105 self.unresolved_references_cache = set() 

106 self.unresolved_references = [] 

107 

108 self.messages = [] 

109 self.just_up = [] 

110 self.just_down = [] 

111 self.just_global = [] 

112 

113 self.tracing_status = None 

114 self.has_error = False 

115 

116 def set_level(self, level): 

117 assert isinstance(level, str) 

118 self.level = level 

119 

120 def error(self, message: str): 

121 assert isinstance(message, str) 

122 self.messages.append(message) 

123 self.has_error = True 

124 

125 def add_tracing_target(self, target: Tracing_Tag): 

126 assert isinstance(target, Tracing_Tag) 

127 if target.key() in self.unresolved_references_cache: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true

128 return 

129 

130 self.unresolved_references.append(target) 

131 self.unresolved_references_cache.add(target.key()) 

132 

133 def perform_source_checks(self, source_info): 

134 assert isinstance(source_info, dict) 

135 

136 def determine_status(self, config, stab): 

137 assert self.level in config 

138 assert self.tag.key() in stab 

139 

140 level = config[self.level] 

141 

142 has_up_ref = len(self.ref_up) > 0 

143 has_just_up = len(self.just_up) > 0 or len(self.just_global) > 0 

144 has_just_down = len(self.just_down) > 0 or len(self.just_global) > 0 

145 has_init_errors = len(self.messages) > 0 

146 

147 # Check up references 

148 ok_up = True 

149 if level.needs_tracing_up and (not has_up_ref) and (not has_just_up): 

150 ok_up = False 

151 self.messages.append("missing up reference") 

152 

153 # Check set of down references 

154 ok_down = True 

155 if level.needs_tracing_down: 

156 has_trace = {name : False 

157 for name in config 

158 if self.level in config[name].traces} 

159 for ref in self.ref_down: 

160 has_trace[stab[ref.key()].level] = True 

161 for chain in level.breakdown_requirements: 

162 if not any(has_trace[src] for src in chain) and \ 

163 not has_just_down: 

164 ok_down = False 

165 self.messages.append("missing reference to %s" % 

166 " or ".join(sorted(chain))) 

167 

168 # Set status 

169 if self.has_error: 169 ↛ 170line 169 didn't jump to line 170 because the condition on line 169 was never true

170 self.tracing_status = Tracing_Status.MISSING 

171 elif ok_up and ok_down: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true

172 if has_just_up or has_just_down: 

173 self.tracing_status = Tracing_Status.JUSTIFIED 

174 else: 

175 self.tracing_status = Tracing_Status.OK 

176 elif (ok_up or ok_down) and \ 

177 level.needs_tracing_up and \ 

178 level.needs_tracing_down: 

179 self.tracing_status = Tracing_Status.PARTIAL 

180 else: 

181 self.tracing_status = Tracing_Status.MISSING 

182 

183 # Overwrite status if there are initial errors 

184 if self.tracing_status == Tracing_Status.OK and has_init_errors: 184 ↛ 185line 184 didn't jump to line 185 because the condition on line 184 was never true

185 self.tracing_status = Tracing_Status.PARTIAL 

186 

187 def additional_data_from_json(self, level, data, schema_version): 

188 assert isinstance(level, str) 

189 assert isinstance(data, dict) 

190 assert schema_version >= 3 

191 

192 self.set_level(level) 

193 for ref in data.get("refs", []): 

194 self.add_tracing_target(Tracing_Tag.from_json(ref)) 

195 self.ref_up = [Tracing_Tag.from_json(ref) 

196 for ref in data.get("ref_up", [])] 

197 self.ref_down = [Tracing_Tag.from_json(ref) 

198 for ref in data.get("ref_down", [])] 

199 self.messages = data.get("messages", []) 

200 self.just_up = data.get("just_up", []) 

201 self.just_down = data.get("just_down", []) 

202 self.just_global = data.get("just_global", []) 

203 if "tracing_status" in data: 

204 self.tracing_status = Tracing_Status[data["tracing_status"]] 

205 

206 def to_json(self): 

207 rv = { 

208 "tag" : self.tag.to_json(), 

209 "location" : self.location.to_json(), 

210 "name" : self.name, 

211 "messages" : self.messages, 

212 "just_up" : self.just_up, 

213 "just_down" : self.just_down, 

214 "just_global" : self.just_global, 

215 } 

216 if self.unresolved_references: 

217 rv["refs"] = [tag.to_json() 

218 for tag in self.unresolved_references] 

219 if self.ref_up or self.ref_down: 

220 rv["ref_up"] = [tag.to_json() for tag in self.ref_up] 

221 rv["ref_down"] = [tag.to_json() for tag in self.ref_down] 

222 if self.tracing_status: 

223 rv["tracing_status"] = self.tracing_status.name 

224 return rv 

225 

226 

227class Requirement(Item): 

228 def __init__( 

229 self, 

230 tag: Tracing_Tag, 

231 location: Location, 

232 framework: str, 

233 kind: str, 

234 name: str, 

235 text: Optional[str] = None, 

236 status: Optional[str] = None, 

237 ): 

238 super().__init__(tag, location) 

239 assert isinstance(framework, str) 

240 assert isinstance(kind, str) 

241 assert isinstance(name, str) 

242 assert isinstance(text, str) or text is None 

243 assert isinstance(status, str) or status is None 

244 

245 self.framework = framework 

246 self.kind = kind 

247 self.name = name 

248 self.text = text 

249 self.status = status 

250 

251 def to_json(self): 

252 rv = super().to_json() 

253 rv["framework"] = self.framework 

254 rv["kind"] = self.kind 

255 rv["text"] = self.text 

256 rv["status"] = self.status 

257 return rv 

258 

259 def perform_source_checks(self, source_info): 

260 assert isinstance(source_info, dict) 

261 

262 @classmethod 

263 def from_json(cls, level, data, schema_version): 

264 assert isinstance(level, str) 

265 assert isinstance(data, dict) 

266 assert schema_version in (3, 4) 

267 

268 item = Requirement(tag = Tracing_Tag.from_json(data["tag"]), 

269 location = Location.from_json(data["location"]), 

270 framework = data["framework"], 

271 kind = data["kind"], 

272 name = data["name"], 

273 text = data.get("text", None), 

274 status = data.get("status", None)) 

275 item.additional_data_from_json(level, data, schema_version) 

276 

277 return item 

278 

279 

280class Implementation(Item): 

281 def __init__( 

282 self, 

283 tag: Tracing_Tag, 

284 location: Location, 

285 language: str, 

286 kind: str, 

287 name: str, 

288 ): 

289 super().__init__(tag, location) 

290 assert isinstance(language, str) 

291 assert isinstance(kind, str) 

292 assert isinstance(name, str) 

293 

294 self.language = language 

295 self.kind = kind 

296 self.name = name 

297 

298 def to_json(self): 

299 rv = super().to_json() 

300 rv["language"] = self.language 

301 rv["kind"] = self.kind 

302 return rv 

303 

304 @classmethod 

305 def from_json(cls, level, data, schema_version): 

306 assert isinstance(level, str) 

307 assert isinstance(data, dict) 

308 assert schema_version == 3 

309 

310 item = Implementation(tag = Tracing_Tag.from_json(data["tag"]), 

311 location = Location.from_json(data["location"]), 

312 language = data["language"], 

313 kind = data["kind"], 

314 name = data["name"]) 

315 item.additional_data_from_json(level, data, schema_version) 

316 

317 return item 

318 

319 

320class Activity(Item): 

321 def __init__( 

322 self, 

323 tag: Tracing_Tag, 

324 location: Location, 

325 framework: str, 

326 kind: str, 

327 text: Optional[str] = None, 

328 status: Optional[str] = None, 

329 ): 

330 super().__init__(tag, location) 

331 assert isinstance(framework, str) 

332 assert isinstance(kind, str) 

333 assert isinstance(text, str) or text is None 

334 assert isinstance(status, str) or status is None 

335 

336 self.framework = framework 

337 self.kind = kind 

338 self.text = text 

339 self.status = status 

340 

341 def to_json(self): 

342 rv = super().to_json() 

343 rv["framework"] = self.framework 

344 rv["kind"] = self.kind 

345 if self.text is not None: 

346 rv["text"] = self.text 

347 rv["status"] = self.status 

348 return rv 

349 

350 @classmethod 

351 def from_json(cls, level, data, schema_version): 

352 assert isinstance(level, str) 

353 assert isinstance(data, dict) 

354 assert schema_version == 3 

355 

356 item = Activity(tag = Tracing_Tag.from_json(data["tag"]), 

357 location = Location.from_json(data["location"]), 

358 framework = data["framework"], 

359 kind = data["kind"], 

360 text = data.get("text", None), 

361 status = data.get("status", None)) 

362 item.additional_data_from_json(level, data, schema_version) 

363 

364 return item