Coverage for lobster/items.py: 95%
216 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20from enum import Enum, auto
21from abc import ABCMeta
22from hashlib import sha1
23from typing import Optional
25from lobster.location import Location
28class Tracing_Tag:
29 def __init__(
30 self,
31 namespace: str,
32 tag: str,
33 version: Optional[str] = None,
34 ):
35 assert isinstance(namespace, str) and " " not in namespace
36 assert isinstance(tag, str)
37 assert version is None or isinstance(version, (str, int))
38 assert not isinstance(version, str) or version != "None"
40 self.namespace = namespace
41 self.tag = tag
42 self.version = version
43 self.hash_val = None
45 def __str__(self):
46 rv = "%s %s" % (self.namespace,
47 self.tag)
48 if self.version:
49 rv += "@%s" % str(self.version)
50 return rv
52 def key(self) -> str:
53 return self.namespace + " " + self.tag
55 def to_json(self) -> str:
56 return str(self)
58 @classmethod
59 def from_json(cls, json):
60 assert isinstance(json, str)
61 namespace, rest = json.split(" ", 1)
62 return Tracing_Tag.from_text(namespace, rest)
64 @classmethod
65 def from_text(cls, namespace, text):
66 assert isinstance(namespace, str)
67 assert isinstance(text, str)
69 if "@" in text: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true
70 tag, version = text.split("@", 1)
71 else:
72 tag = text
73 version = None
74 return Tracing_Tag(namespace, tag, version)
76 def hash(self):
77 if not self.hash_val:
78 hfunc = sha1()
79 hfunc.update(self.key().encode("UTF-8"))
80 self.hash_val = hfunc.hexdigest()
81 return self.hash_val
84class Tracing_Status(Enum):
85 OK = auto()
86 PARTIAL = auto()
87 MISSING = auto()
88 JUSTIFIED = auto()
89 ERROR = auto()
92class Item(metaclass=ABCMeta):
93 def __init__(self, tag: Tracing_Tag, location: Location):
94 assert isinstance(tag, Tracing_Tag)
95 assert isinstance(location, Location)
97 self.level = None
98 self.tag = tag
99 self.location = location
100 self.name = tag.tag
102 self.ref_up = []
103 self.ref_down = []
105 self.unresolved_references_cache = set()
106 self.unresolved_references = []
108 self.messages = []
109 self.just_up = []
110 self.just_down = []
111 self.just_global = []
113 self.tracing_status = None
114 self.has_error = False
116 def set_level(self, level):
117 assert isinstance(level, str)
118 self.level = level
120 def error(self, message):
121 assert isinstance(message, str)
122 self.messages.append(message)
123 self.has_error = True
125 def add_tracing_target(self, target):
126 assert isinstance(target, Tracing_Tag)
127 if target.key() in self.unresolved_references_cache: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true
128 return
130 self.unresolved_references.append(target)
131 self.unresolved_references_cache.add(target.key())
133 def perform_source_checks(self, source_info):
134 assert isinstance(source_info, dict)
136 def determine_status(self, config, stab):
137 assert self.level in config
138 assert self.tag.key() in stab
140 level = config[self.level]
142 has_up_ref = len(self.ref_up) > 0
143 has_just_up = len(self.just_up) > 0 or len(self.just_global) > 0
144 has_just_down = len(self.just_down) > 0 or len(self.just_global) > 0
145 has_init_errors = len(self.messages) > 0
147 # Check up references
148 ok_up = True
149 if level["needs_tracing_up"]:
150 if not has_up_ref and not has_just_up:
151 ok_up = False
152 self.messages.append("missing up reference")
154 # Check set of down references
155 ok_down = True
156 if level["needs_tracing_down"]:
157 has_trace = {name : False
158 for name in config
159 if self.level in config[name]["traces"]}
160 for ref in self.ref_down:
161 has_trace[stab[ref.key()].level] = True
162 for chain in level["breakdown_requirements"]:
163 if not any(has_trace[src] for src in chain) and \
164 not has_just_down:
165 ok_down = False
166 self.messages.append("missing reference to %s" %
167 " or ".join(sorted(chain)))
169 # Set status
170 if self.has_error: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true
171 self.tracing_status = Tracing_Status.MISSING
172 elif ok_up and ok_down:
173 if has_just_up or has_just_down:
174 self.tracing_status = Tracing_Status.JUSTIFIED
175 else:
176 self.tracing_status = Tracing_Status.OK
177 elif (ok_up or ok_down) and \ 177 ↛ 180line 177 didn't jump to line 180 because the condition on line 177 was never true
178 level["needs_tracing_up"] and \
179 level["needs_tracing_down"]:
180 self.tracing_status = Tracing_Status.PARTIAL
181 else:
182 self.tracing_status = Tracing_Status.MISSING
184 # Overwrite status if there are initial errors
185 if self.tracing_status == Tracing_Status.OK and has_init_errors: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true
186 self.tracing_status = Tracing_Status.PARTIAL
188 def additional_data_from_json(self, level, data, schema_version):
189 assert isinstance(level, str)
190 assert isinstance(data, dict)
191 assert schema_version >= 3
193 self.set_level(level)
194 for ref in data.get("refs", []):
195 self.add_tracing_target(Tracing_Tag.from_json(ref))
196 self.ref_up = [Tracing_Tag.from_json(ref)
197 for ref in data.get("ref_up", [])]
198 self.ref_down = [Tracing_Tag.from_json(ref)
199 for ref in data.get("ref_down", [])]
200 self.messages = data.get("messages", [])
201 self.just_up = data.get("just_up", [])
202 self.just_down = data.get("just_down", [])
203 self.just_global = data.get("just_global", [])
204 if "tracing_status" in data:
205 self.tracing_status = Tracing_Status[data["tracing_status"]]
207 def to_json(self):
208 rv = {
209 "tag" : self.tag.to_json(),
210 "location" : self.location.to_json(),
211 "name" : self.name,
212 "messages" : self.messages,
213 "just_up" : self.just_up,
214 "just_down" : self.just_down,
215 "just_global" : self.just_global,
216 }
217 if self.unresolved_references:
218 rv["refs"] = [tag.to_json()
219 for tag in self.unresolved_references]
220 if self.ref_up or self.ref_down:
221 rv["ref_up"] = [tag.to_json() for tag in self.ref_up]
222 rv["ref_down"] = [tag.to_json() for tag in self.ref_down]
223 if self.tracing_status:
224 rv["tracing_status"] = self.tracing_status.name
225 return rv
228class Requirement(Item):
229 def __init__(
230 self,
231 tag: Tracing_Tag,
232 location: Location,
233 framework: str,
234 kind: str,
235 name: str,
236 text: Optional[str] = None,
237 status: Optional[str] = None,
238 ):
239 super().__init__(tag, location)
240 assert isinstance(framework, str)
241 assert isinstance(kind, str)
242 assert isinstance(name, str)
243 assert isinstance(text, str) or text is None
244 assert isinstance(status, str) or status is None
246 self.framework = framework
247 self.kind = kind
248 self.name = name
249 self.text = text
250 self.status = status
252 def to_json(self):
253 rv = super().to_json()
254 rv["framework"] = self.framework
255 rv["kind"] = self.kind
256 rv["text"] = self.text
257 rv["status"] = self.status
258 return rv
260 def perform_source_checks(self, source_info):
261 assert isinstance(source_info, dict)
263 @classmethod
264 def from_json(cls, level, data, schema_version):
265 assert isinstance(level, str)
266 assert isinstance(data, dict)
267 assert schema_version in (3, 4)
269 item = Requirement(tag = Tracing_Tag.from_json(data["tag"]),
270 location = Location.from_json(data["location"]),
271 framework = data["framework"],
272 kind = data["kind"],
273 name = data["name"],
274 text = data.get("text", None),
275 status = data.get("status", None))
276 item.additional_data_from_json(level, data, schema_version)
278 return item
281class Implementation(Item):
282 def __init__(
283 self,
284 tag: Tracing_Tag,
285 location: Location,
286 language: str,
287 kind: str,
288 name: str,
289 ):
290 super().__init__(tag, location)
291 assert isinstance(language, str)
292 assert isinstance(kind, str)
293 assert isinstance(name, str)
295 self.language = language
296 self.kind = kind
297 self.name = name
299 def to_json(self):
300 rv = super().to_json()
301 rv["language"] = self.language
302 rv["kind"] = self.kind
303 return rv
305 @classmethod
306 def from_json(cls, level, data, schema_version):
307 assert isinstance(level, str)
308 assert isinstance(data, dict)
309 assert schema_version == 3
311 item = Implementation(tag = Tracing_Tag.from_json(data["tag"]),
312 location = Location.from_json(data["location"]),
313 language = data["language"],
314 kind = data["kind"],
315 name = data["name"])
316 item.additional_data_from_json(level, data, schema_version)
318 return item
321class Activity(Item):
322 def __init__(
323 self,
324 tag: Tracing_Tag,
325 location: Location,
326 framework: str,
327 kind: str,
328 status: Optional[str] = None,
329 ):
330 super().__init__(tag, location)
331 assert isinstance(framework, str)
332 assert isinstance(kind, str)
333 assert isinstance(status, str) or status is None
335 self.framework = framework
336 self.kind = kind
337 self.status = status
339 def to_json(self):
340 rv = super().to_json()
341 rv["framework"] = self.framework
342 rv["kind"] = self.kind
343 rv["status"] = self.status
344 return rv
346 @classmethod
347 def from_json(cls, level, data, schema_version):
348 assert isinstance(level, str)
349 assert isinstance(data, dict)
350 assert schema_version == 3
352 item = Activity(tag = Tracing_Tag.from_json(data["tag"]),
353 location = Location.from_json(data["location"]),
354 framework = data["framework"],
355 kind = data["kind"],
356 status = data.get("status", None))
357 item.additional_data_from_json(level, data, schema_version)
359 return item