Coverage for lobster/common/items.py: 98%
219 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-04-16 05:31 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-04-16 05:31 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20from enum import Enum, auto
21from abc import ABCMeta
22from hashlib import sha1
23from typing import Optional
25from lobster.common.location import Location
28class Tracing_Tag:
29 def __init__(
30 self,
31 namespace: str,
32 tag: str,
33 version: Optional[str] = None,
34 ):
35 assert isinstance(namespace, str) and " " not in namespace
36 assert isinstance(tag, str)
37 assert version is None or isinstance(version, (str, int))
38 assert not isinstance(version, str) or version != "None"
40 self.namespace = namespace
41 self.tag = tag
42 self.version = version
43 self.hash_val = None
45 def __str__(self):
46 rv = "%s %s" % (self.namespace,
47 self.tag)
48 if self.version:
49 rv += "@%s" % str(self.version)
50 return rv
52 def key(self) -> str:
53 return self.namespace + " " + self.tag
55 def to_json(self) -> str:
56 return str(self)
58 @classmethod
59 def from_json(cls, json):
60 assert isinstance(json, str)
61 namespace, rest = json.split(" ", 1)
62 return Tracing_Tag.from_text(namespace, rest)
64 @classmethod
65 def from_text(cls, namespace, text):
66 assert isinstance(namespace, str)
67 assert isinstance(text, str)
69 if "@" in text:
70 tag, version = text.split("@", 1)
71 else:
72 tag = text
73 version = None
74 return Tracing_Tag(namespace, tag, version)
76 def hash(self):
77 if not self.hash_val:
78 hfunc = sha1()
79 hfunc.update(self.key().encode("UTF-8"))
80 self.hash_val = hfunc.hexdigest()
81 return self.hash_val
84class Tracing_Status(Enum):
85 OK = auto()
86 PARTIAL = auto()
87 MISSING = auto()
88 JUSTIFIED = auto()
89 ERROR = auto()
92class Item(metaclass=ABCMeta):
93 def __init__(self, tag: Tracing_Tag, location: Location):
94 assert isinstance(tag, Tracing_Tag)
95 assert isinstance(location, Location)
97 self.level = None
98 self.tag = tag
99 self.location = location
100 self.name = tag.tag
102 self.ref_up = []
103 self.ref_down = []
105 self.unresolved_references_cache = set()
106 self.unresolved_references = []
108 self.messages = []
109 self.just_up = []
110 self.just_down = []
111 self.just_global = []
113 self.tracing_status = None
114 self.has_error = False
116 def set_level(self, level):
117 assert isinstance(level, str)
118 self.level = level
120 def error(self, message: str):
121 assert isinstance(message, str)
122 self.messages.append(message)
123 self.has_error = True
125 def add_tracing_target(self, target: Tracing_Tag):
126 assert isinstance(target, Tracing_Tag)
127 if target.key() in self.unresolved_references_cache: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true
128 return
130 self.unresolved_references.append(target)
131 self.unresolved_references_cache.add(target.key())
133 def perform_source_checks(self, source_info):
134 assert isinstance(source_info, dict)
136 def determine_status(self, config, stab):
137 assert self.level in config
138 assert self.tag.key() in stab
140 level = config[self.level]
142 has_up_ref = len(self.ref_up) > 0
143 has_just_up = len(self.just_up) > 0 or len(self.just_global) > 0
144 has_just_down = len(self.just_down) > 0 or len(self.just_global) > 0
145 has_init_errors = len(self.messages) > 0
147 # Check up references
148 ok_up = True
149 if level.needs_tracing_up and (not has_up_ref) and (not has_just_up):
150 ok_up = False
151 self.messages.append("missing up reference")
153 # Check set of down references
154 ok_down = True
155 if level.needs_tracing_down:
156 has_trace = {name : False
157 for name in config
158 if self.level in config[name].traces}
159 for ref in self.ref_down:
160 has_trace[stab[ref.key()].level] = True
161 for chain in level.breakdown_requirements:
162 if not any(has_trace[src] for src in chain) and \
163 not has_just_down:
164 ok_down = False
165 self.messages.append("missing reference to %s" %
166 " or ".join(sorted(chain)))
168 # Set status
169 if self.has_error:
170 self.tracing_status = Tracing_Status.MISSING
171 elif ok_up and ok_down:
172 if has_just_up or has_just_down:
173 self.tracing_status = Tracing_Status.JUSTIFIED
174 else:
175 self.tracing_status = Tracing_Status.OK
176 elif (ok_up or ok_down) and \
177 level.needs_tracing_up and \
178 level.needs_tracing_down:
179 self.tracing_status = Tracing_Status.PARTIAL
180 else:
181 self.tracing_status = Tracing_Status.MISSING
183 # Overwrite status if there are initial errors
184 if self.tracing_status == Tracing_Status.OK and has_init_errors:
185 self.tracing_status = Tracing_Status.PARTIAL
187 def additional_data_from_json(self, level, data, schema_version):
188 assert isinstance(level, str)
189 assert isinstance(data, dict)
190 assert schema_version >= 3
192 self.set_level(level)
193 for ref in data.get("refs", []):
194 self.add_tracing_target(Tracing_Tag.from_json(ref))
195 self.ref_up = [Tracing_Tag.from_json(ref)
196 for ref in data.get("ref_up", [])]
197 self.ref_down = [Tracing_Tag.from_json(ref)
198 for ref in data.get("ref_down", [])]
199 self.messages = data.get("messages", [])
200 self.just_up = data.get("just_up", [])
201 self.just_down = data.get("just_down", [])
202 self.just_global = data.get("just_global", [])
203 if "tracing_status" in data:
204 self.tracing_status = Tracing_Status[data["tracing_status"]]
206 def to_json(self):
207 rv = {
208 "tag" : self.tag.to_json(),
209 "location" : self.location.to_json(),
210 "name" : self.name,
211 "messages" : self.messages,
212 "just_up" : self.just_up,
213 "just_down" : self.just_down,
214 "just_global" : self.just_global,
215 }
216 if self.unresolved_references:
217 rv["refs"] = [tag.to_json()
218 for tag in self.unresolved_references]
219 if self.ref_up or self.ref_down:
220 rv["ref_up"] = [tag.to_json() for tag in self.ref_up]
221 rv["ref_down"] = [tag.to_json() for tag in self.ref_down]
222 if self.tracing_status:
223 rv["tracing_status"] = self.tracing_status.name
224 return rv
227class Requirement(Item):
228 def __init__(
229 self,
230 tag: Tracing_Tag,
231 location: Location,
232 framework: str,
233 kind: str,
234 name: str,
235 text: Optional[str] = None,
236 status: Optional[str] = None,
237 ):
238 super().__init__(tag, location)
239 assert isinstance(framework, str)
240 assert isinstance(kind, str)
241 assert isinstance(name, str)
242 assert isinstance(text, str) or text is None
243 assert isinstance(status, str) or status is None
245 self.framework = framework
246 self.kind = kind
247 self.name = name
248 self.text = text
249 self.status = status
251 def to_json(self):
252 rv = super().to_json()
253 rv["framework"] = self.framework
254 rv["kind"] = self.kind
255 rv["text"] = self.text
256 rv["status"] = self.status
257 return rv
259 def perform_source_checks(self, source_info):
260 assert isinstance(source_info, dict)
262 @classmethod
263 def from_json(cls, level, data, schema_version):
264 assert isinstance(level, str)
265 assert isinstance(data, dict)
266 assert schema_version in (3, 4)
268 item = Requirement(tag = Tracing_Tag.from_json(data["tag"]),
269 location = Location.from_json(data["location"]),
270 framework = data["framework"],
271 kind = data["kind"],
272 name = data["name"],
273 text = data.get("text", None),
274 status = data.get("status", None))
275 item.additional_data_from_json(level, data, schema_version)
277 return item
280class Implementation(Item):
281 def __init__(
282 self,
283 tag: Tracing_Tag,
284 location: Location,
285 language: str,
286 kind: str,
287 name: str,
288 ):
289 super().__init__(tag, location)
290 assert isinstance(language, str)
291 assert isinstance(kind, str)
292 assert isinstance(name, str)
294 self.language = language
295 self.kind = kind
296 self.name = name
298 def to_json(self):
299 rv = super().to_json()
300 rv["language"] = self.language
301 rv["kind"] = self.kind
302 return rv
304 @classmethod
305 def from_json(cls, level, data, schema_version):
306 assert isinstance(level, str)
307 assert isinstance(data, dict)
308 assert schema_version == 3
310 item = Implementation(tag = Tracing_Tag.from_json(data["tag"]),
311 location = Location.from_json(data["location"]),
312 language = data["language"],
313 kind = data["kind"],
314 name = data["name"])
315 item.additional_data_from_json(level, data, schema_version)
317 return item
320class Activity(Item):
321 def __init__(
322 self,
323 tag: Tracing_Tag,
324 location: Location,
325 framework: str,
326 kind: str,
327 text: Optional[str] = None,
328 status: Optional[str] = None,
329 ):
330 super().__init__(tag, location)
331 assert isinstance(framework, str)
332 assert isinstance(kind, str)
333 assert isinstance(text, str) or text is None
334 assert isinstance(status, str) or status is None
336 self.framework = framework
337 self.kind = kind
338 self.text = text
339 self.status = status
341 def to_json(self):
342 rv = super().to_json()
343 rv["framework"] = self.framework
344 rv["kind"] = self.kind
345 if self.text is not None: 345 ↛ 346line 345 didn't jump to line 346 because the condition on line 345 was never true
346 rv["text"] = self.text
347 rv["status"] = self.status
348 return rv
350 @classmethod
351 def from_json(cls, level, data, schema_version):
352 assert isinstance(level, str)
353 assert isinstance(data, dict)
354 assert schema_version == 3
356 item = Activity(tag = Tracing_Tag.from_json(data["tag"]),
357 location = Location.from_json(data["location"]),
358 framework = data["framework"],
359 kind = data["kind"],
360 text = data.get("text", None),
361 status = data.get("status", None))
362 item.additional_data_from_json(level, data, schema_version)
364 return item