Coverage for lobster/common/items.py: 98%
220 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-12 15:02 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-12 15:02 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20from enum import Enum, auto
21from abc import ABCMeta
22from hashlib import sha1
23from typing import Optional
25from lobster.common.location import Location
class Tracing_Tag:
    """Identifier of a traceable item: namespace, tag and optional version.

    The textual form is "<namespace> <tag>", followed by "@<version>"
    when a version is present. The same form is used for JSON.
    """

    def __init__(
        self,
        namespace: str,
        tag: str,
        version: Optional[str] = None,
    ):
        """Create a tag.

        namespace -- a string that must not contain a space
        tag       -- a string
        version   -- None, a string other than the literal "None", or
                     an int (ints are accepted by the check below even
                     though the annotation only mentions str)
        """
        assert isinstance(namespace, str) and " " not in namespace
        assert isinstance(tag, str)
        assert version is None or isinstance(version, (str, int))
        assert not isinstance(version, str) or version != "None"

        self.namespace = namespace
        self.tag = tag
        self.version = version
        # Filled in lazily by hash()
        self.hash_val = None

    def __str__(self):
        rv = f"{self.namespace} {self.tag}"
        # Only None and the empty string mean "no version". A plain
        # truthiness test would also drop the integer version 0, which
        # the constructor accepts.
        if self.version is not None and self.version != "":
            rv += f"@{self.version}"
        return rv

    def key(self) -> str:
        """Return "<namespace> <tag>"; the version is not part of it."""
        return self.namespace + " " + self.tag

    def to_json(self) -> str:
        """Return the textual form of the tag (same as str())."""
        return str(self)

    @classmethod
    def from_json(cls, json):
        """Parse a string of the form "<namespace> <tag>[@<version>]".

        Raises ValueError if the string contains no space.
        """
        assert isinstance(json, str)
        namespace, rest = json.split(" ", 1)
        return Tracing_Tag.from_text(namespace, rest)

    @classmethod
    def from_text(cls, namespace, text):
        """Build a tag from a namespace and a "<tag>[@<version>]" text.

        The text is split at the first "@"; everything after it is the
        version (kept as a string).
        """
        assert isinstance(namespace, str)
        assert isinstance(text, str)

        if "@" in text:
            tag, version = text.split("@", 1)
        else:
            tag = text
            version = None
        return Tracing_Tag(namespace, tag, version)

    def hash(self):
        """Return the SHA-1 hex digest of key(), computed once and cached."""
        if not self.hash_val:
            hfunc = sha1()
            hfunc.update(self.key().encode("UTF-8"))
            self.hash_val = hfunc.hexdigest()
        return self.hash_val
class Tracing_Status(Enum):
    """Tracing verdict stored in Item.tracing_status.

    Values are assigned automatically in declaration order.
    """
    # Up and down checks passed without any justification
    OK = auto()
    # Only one direction passed, or OK was demoted because the item
    # already carried messages
    PARTIAL = auto()
    # Checks failed, or the item has an error
    MISSING = auto()
    # Checks passed and a justification is present
    JUSTIFIED = auto()
    # Not assigned by Item.determine_status; presumably set by other
    # code -- TODO confirm
    ERROR = auto()
class Item(metaclass=ABCMeta):
    """Base class of the traceable items (see the subclasses below).

    An item stores its tag and location, its up/down references, the
    references that still have to be resolved, justifications, messages
    and the tracing status computed by determine_status.
    """

    def __init__(self, tag: Tracing_Tag, location: Location):
        assert isinstance(tag, Tracing_Tag)
        assert isinstance(location, Location)

        # The name defaults to the tag text; subclasses may replace it
        self.tag = tag
        self.name = tag.tag
        self.location = location
        self.level = None

        # Up and down references
        self.ref_up = []
        self.ref_down = []

        # Targets added through add_tracing_target, in insertion order;
        # the cache holds their keys so duplicates can be skipped
        self.unresolved_references = []
        self.unresolved_references_cache = set()

        # Messages and justifications
        self.messages = []
        self.just_up = []
        self.just_down = []
        self.just_global = []

        self.has_error = False
        self.tracing_status = None

    def set_level(self, level):
        """Record the name of the level this item belongs to."""
        assert isinstance(level, str)
        self.level = level

    def error(self, message: str):
        """Attach a message to the item and flag it as erroneous."""
        assert isinstance(message, str)
        self.has_error = True
        self.messages.append(message)

    def add_tracing_target(self, target: Tracing_Tag):
        """Remember target as an unresolved reference.

        A target whose key has been added before is ignored.
        """
        assert isinstance(target, Tracing_Tag)
        target_key = target.key()
        if target_key not in self.unresolved_references_cache:
            self.unresolved_references.append(target)
            self.unresolved_references_cache.add(target_key)

    def perform_source_checks(self, source_info):
        """Hook for source specific checks; only validates its argument."""
        assert isinstance(source_info, dict)

    def determine_status(self, config, stab):
        """Compute tracing_status and append messages for missing links.

        config -- maps level names to objects providing
                  needs_tracing_up, needs_tracing_down, traces and
                  breakdown_requirements
        stab   -- maps tag keys to items (their level is read)
        """
        assert self.level in config
        assert self.tag.key() in stab

        level_config = config[self.level]

        up_refs_present = len(self.ref_up) > 0
        justified_up = len(self.just_up) > 0 or len(self.just_global) > 0
        justified_down = len(self.just_down) > 0 or len(self.just_global) > 0
        # Snapshot taken before this method appends its own messages
        had_messages = len(self.messages) > 0

        # Up direction: fine unless tracing up is required and there is
        # neither a reference nor a justification
        up_ok = True
        if level_config.needs_tracing_up:
            if not (up_refs_present or justified_up):
                up_ok = False
                self.messages.append("missing up reference")

        # Down direction: each breakdown chain needs at least one
        # referencing level, unless a justification is present
        down_ok = True
        if level_config.needs_tracing_down:
            seen_levels = {level_name: False
                           for level_name in config
                           if self.level in config[level_name].traces}
            for down_ref in self.ref_down:
                seen_levels[stab[down_ref.key()].level] = True
            for chain in level_config.breakdown_requirements:
                if any(seen_levels[src] for src in chain) or justified_down:
                    continue
                down_ok = False
                alternatives = " or ".join(sorted(chain))
                self.messages.append(f"missing reference to {alternatives}")

        # Derive the status
        if self.has_error:
            status = Tracing_Status.MISSING
        elif up_ok and down_ok:
            if justified_up or justified_down:
                status = Tracing_Status.JUSTIFIED
            else:
                status = Tracing_Status.OK
        elif (up_ok or down_ok) and \
             level_config.needs_tracing_up and \
             level_config.needs_tracing_down:
            status = Tracing_Status.PARTIAL
        else:
            status = Tracing_Status.MISSING

        # Messages that existed beforehand demote OK to PARTIAL
        if status == Tracing_Status.OK and had_messages:
            status = Tracing_Status.PARTIAL

        self.tracing_status = status

    def additional_data_from_json(self, level, data, schema_version):
        """Load the data shared by all item kinds from a JSON object.

        Reads the optional keys "refs", "ref_up", "ref_down",
        "messages", "just_up", "just_down", "just_global" and
        "tracing_status" from data.
        """
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version >= 3

        self.set_level(level)

        for text in data.get("refs", []):
            self.add_tracing_target(Tracing_Tag.from_json(text))

        self.ref_up = list(map(Tracing_Tag.from_json,
                               data.get("ref_up", [])))
        self.ref_down = list(map(Tracing_Tag.from_json,
                                 data.get("ref_down", [])))

        for field in ("messages", "just_up", "just_down", "just_global"):
            setattr(self, field, data.get(field, []))

        if "tracing_status" in data:
            self.tracing_status = Tracing_Status[data["tracing_status"]]

    def to_json(self):
        """Return the JSON object for the data shared by all item kinds.

        "refs", "ref_up"/"ref_down" and "tracing_status" are only
        emitted when there is something to report.
        """
        rv = {
            "tag": self.tag.to_json(),
            "location": self.location.to_json(),
            "name": self.name,
        }
        for field in ("messages", "just_up", "just_down", "just_global"):
            rv[field] = getattr(self, field)

        if self.unresolved_references:
            rv["refs"] = [ref.to_json()
                          for ref in self.unresolved_references]
        if self.ref_up or self.ref_down:
            rv["ref_up"] = [ref.to_json() for ref in self.ref_up]
            rv["ref_down"] = [ref.to_json() for ref in self.ref_down]
        if self.tracing_status:
            rv["tracing_status"] = self.tracing_status.name
        return rv
class Requirement(Item):
    """Item with a framework, a kind, a name and optional text/status."""

    def __init__(
        self,
        tag: Tracing_Tag,
        location: Location,
        framework: str,
        kind: str,
        name: str,
        text: Optional[str] = None,
        status: Optional[str] = None,
    ):
        super().__init__(tag, location)
        assert isinstance(framework, str)
        assert isinstance(kind, str)
        assert isinstance(name, str)
        assert text is None or isinstance(text, str)
        assert status is None or isinstance(status, str)

        self.framework = framework
        self.kind = kind
        # Replaces the default name taken from the tag
        self.name = name
        self.text = text
        self.status = status

    def to_json(self):
        """Return the common item data plus the requirement fields."""
        return {
            **super().to_json(),
            "framework": self.framework,
            "kind": self.kind,
            "text": self.text,
            "status": self.status,
        }

    def perform_source_checks(self, source_info):
        """Only validates that source_info is a dict."""
        assert isinstance(source_info, dict)

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Create a Requirement from a JSON object (schema 3 or 4).

        "tag", "location", "framework", "kind" and "name" are
        required; "text" and "status" are optional.
        """
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version in (3, 4)

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Requirement(tag,
                           location,
                           data["framework"],
                           data["kind"],
                           data["name"],
                           text=data.get("text"),
                           status=data.get("status"))
        item.additional_data_from_json(level, data, schema_version)

        return item
class Implementation(Item):
    """Item with a language, a kind and a name."""

    def __init__(
        self,
        tag: Tracing_Tag,
        location: Location,
        language: str,
        kind: str,
        name: str,
    ):
        super().__init__(tag, location)
        assert isinstance(language, str)
        assert isinstance(kind, str)
        assert isinstance(name, str)

        self.language = language
        self.kind = kind
        # Replaces the default name taken from the tag
        self.name = name

    def to_json(self):
        """Return the common item data plus "language" and "kind"."""
        return {
            **super().to_json(),
            "language": self.language,
            "kind": self.kind,
        }

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Create an Implementation from a JSON object (schema 3).

        "tag", "location", "language", "kind" and "name" are required.
        """
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version == 3

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Implementation(tag,
                              location,
                              data["language"],
                              data["kind"],
                              data["name"])
        item.additional_data_from_json(level, data, schema_version)

        return item
class Activity(Item):
    """Item with a framework, a kind and optional text/status."""

    def __init__(
        self,
        tag: Tracing_Tag,
        location: Location,
        framework: str,
        kind: str,
        text: Optional[str] = None,
        status: Optional[str] = None,
    ):
        super().__init__(tag, location)
        assert isinstance(framework, str)
        assert isinstance(kind, str)
        assert text is None or isinstance(text, str)
        assert status is None or isinstance(status, str)

        self.framework = framework
        self.kind = kind
        self.text = text
        self.status = status

    def to_json(self):
        """Return the common item data plus the activity fields.

        "text" is only emitted when it is set; "status" always is.
        """
        rv = super().to_json()
        rv.update(framework=self.framework, kind=self.kind)
        if self.text is not None:
            rv["text"] = self.text
        rv["status"] = self.status
        return rv

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Create an Activity from a JSON object (schema 3).

        "tag", "location", "framework" and "kind" are required;
        "text" and "status" are optional.
        """
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version == 3

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Activity(tag,
                        location,
                        data["framework"],
                        data["kind"],
                        text=data.get("text"),
                        status=data.get("status"))
        item.additional_data_from_json(level, data, schema_version)

        return item