Coverage for lobster/items.py: 94%
218 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20from enum import Enum, auto
21from abc import ABCMeta
22from hashlib import sha1
24from lobster.location import Location
class Tracing_Tag:
    """Identifier of a traceable item.

    A tag consists of a namespace (which must not contain spaces), the
    tag text itself and an optional version. Its textual / JSON form
    is ``"<namespace> <tag>"`` with ``"@<version>"`` appended when a
    version is present.
    """

    def __init__(self, namespace, tag, version=None):
        assert isinstance(namespace, str) and " " not in namespace
        assert isinstance(tag, str)
        assert version is None or isinstance(version, (str, int))
        # The literal string "None" is rejected; presumably this
        # guards against an accidental str(None) by a caller.
        assert not isinstance(version, str) or version != "None"

        self.namespace = namespace
        self.tag = tag
        self.version = version
        # Lazily computed by hash()
        self.hash_val = None

    def __str__(self):
        rv = "%s %s" % (self.namespace,
                        self.tag)
        # Compare explicitly instead of relying on truthiness, so
        # that an integer version 0 is not silently dropped. An empty
        # string version is still rendered without a suffix.
        if self.version is not None and self.version != "":
            rv += "@%s" % str(self.version)
        return rv

    def key(self) -> str:
        """Return the version-independent identity "<namespace> <tag>"."""
        return self.namespace + " " + self.tag

    def to_json(self) -> str:
        """Return the serialised form, which is the same as str(self)."""
        return str(self)

    @classmethod
    def from_json(cls, json):
        """Rebuild a tag from the string produced by to_json.

        Everything before the first space is the namespace; the rest
        is handed to from_text. Raises ValueError if the string
        contains no space.
        """
        assert isinstance(json, str)
        namespace, rest = json.split(" ", 1)
        return Tracing_Tag.from_text(namespace, rest)

    @classmethod
    def from_text(cls, namespace, text):
        """Build a tag from a namespace and a "tag" or "tag@version" text.

        The text is split at the first "@"; a version obtained this
        way is always a string. Without "@" the version is None.
        """
        assert isinstance(namespace, str)
        assert isinstance(text, str)

        if "@" in text:
            tag, version = text.split("@", 1)
        else:
            tag = text
            version = None
        return Tracing_Tag(namespace, tag, version)

    def hash(self):
        """Return the SHA-1 hex digest of key(), computed once and cached."""
        if not self.hash_val:
            hfunc = sha1()
            hfunc.update(self.key().encode("UTF-8"))
            self.hash_val = hfunc.hexdigest()
        return self.hash_val
class Tracing_Status(Enum):
    """Tracing verdict stored in an item's ``tracing_status`` attribute.

    Members are serialised by name in the item's JSON form.
    """
    # All required references are present
    OK = auto()
    # Only one of the two required directions is satisfied, or an
    # otherwise OK item already carried messages
    PARTIAL = auto()
    # Required references are absent, or the item has an error
    MISSING = auto()
    # Requirements are met and a justification is present
    JUSTIFIED = auto()
    # NOTE(review): not assigned anywhere in the visible code
    ERROR = auto()
class Item(metaclass=ABCMeta):
    """Common state and behaviour shared by all traceable items.

    An item has an identity (tag, location, name), a level, resolved
    up / down references, not yet resolved references, justifications,
    diagnostic messages and a resulting tracing status.
    """

    def __init__(self, tag, location):
        assert isinstance(tag, Tracing_Tag)
        assert isinstance(location, Location)

        self.level = None
        self.tag = tag
        self.location = location
        # The name defaults to the tag text
        self.name = tag.tag

        # Resolved references (lists of Tracing_Tag)
        self.ref_up = []
        self.ref_down = []

        # Unresolved references, plus the set of their keys used to
        # reject duplicates
        self.unresolved_references_cache = set()
        self.unresolved_references = []

        self.messages = []
        self.just_up = []
        self.just_down = []
        self.just_global = []

        self.tracing_status = None
        self.has_error = False

    def set_level(self, level):
        """Record the name of the level this item belongs to."""
        assert isinstance(level, str)
        self.level = level

    def error(self, message):
        """Append a message and flag the item as erroneous."""
        assert isinstance(message, str)
        self.has_error = True
        self.messages.append(message)

    def add_tracing_target(self, target):
        """Register an unresolved reference, ignoring repeated keys."""
        assert isinstance(target, Tracing_Tag)
        target_key = target.key()
        if target_key not in self.unresolved_references_cache:
            self.unresolved_references.append(target)
            self.unresolved_references_cache.add(target_key)

    def perform_source_checks(self, source_info):
        """Hook for source specific checks; the base version checks nothing."""
        assert isinstance(source_info, dict)

    def determine_status(self, config, stab):
        """Compute and store the tracing status of this item.

        config maps level names to level descriptions; the keys read
        here are "needs_tracing_up", "needs_tracing_down", "traces"
        and "breakdown_requirements". stab maps tag keys to items,
        of which only the level attribute is read. Messages are
        appended for every missing reference.
        """
        assert self.level in config
        assert self.tag.key() in stab

        level_cfg = config[self.level]

        up_referenced = len(self.ref_up) > 0
        justified_up = len(self.just_up) > 0 or len(self.just_global) > 0
        justified_down = len(self.just_down) > 0 or len(self.just_global) > 0
        # Must be sampled before this method appends its own messages
        had_messages = len(self.messages) > 0

        # Check up references
        up_ok = True
        if level_cfg["needs_tracing_up"] and \
           not (up_referenced or justified_up):
            up_ok = False
            self.messages.append("missing up reference")

        # Check set of down references
        down_ok = True
        if level_cfg["needs_tracing_down"]:
            # One flag for each level that traces to this item's level
            traced_from = {}
            for level_name in config:
                if self.level in config[level_name]["traces"]:
                    traced_from[level_name] = False
            for ref in self.ref_down:
                traced_from[stab[ref.key()].level] = True
            for chain in level_cfg["breakdown_requirements"]:
                chain_covered = any(traced_from[src] for src in chain)
                if not (chain_covered or justified_down):
                    down_ok = False
                    self.messages.append("missing reference to %s" %
                                         " or ".join(sorted(chain)))

        # Set status
        if self.has_error:
            status = Tracing_Status.MISSING
        elif up_ok and down_ok:
            if justified_up or justified_down:
                status = Tracing_Status.JUSTIFIED
            else:
                status = Tracing_Status.OK
        elif (up_ok or down_ok) and \
             level_cfg["needs_tracing_up"] and \
             level_cfg["needs_tracing_down"]:
            status = Tracing_Status.PARTIAL
        else:
            status = Tracing_Status.MISSING

        # Overwrite status if there are initial errors
        if status == Tracing_Status.OK and had_messages:
            status = Tracing_Status.PARTIAL

        self.tracing_status = status

    def additional_data_from_json(self, level, data, schema_version):
        """Restore the state common to all items from a JSON dictionary.

        Sets the level, the unresolved and resolved references, the
        messages, the justifications and, if present, the tracing
        status.
        """
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version >= 3

        self.set_level(level)
        for raw_ref in data.get("refs", []):
            self.add_tracing_target(Tracing_Tag.from_json(raw_ref))
        self.ref_up = list(map(Tracing_Tag.from_json,
                               data.get("ref_up", [])))
        self.ref_down = list(map(Tracing_Tag.from_json,
                                 data.get("ref_down", [])))
        self.messages = data.get("messages", [])
        self.just_up = data.get("just_up", [])
        self.just_down = data.get("just_down", [])
        self.just_global = data.get("just_global", [])
        if "tracing_status" in data:
            self.tracing_status = Tracing_Status[data["tracing_status"]]

    def to_json(self):
        """Return the state common to all items as a JSON-ready dictionary.

        "refs", "ref_up" / "ref_down" and "tracing_status" are only
        emitted when there is something to report.
        """
        rv = dict(tag         = self.tag.to_json(),
                  location    = self.location.to_json(),
                  name        = self.name,
                  messages    = self.messages,
                  just_up     = self.just_up,
                  just_down   = self.just_down,
                  just_global = self.just_global)
        if self.unresolved_references:
            rv["refs"] = [ref.to_json()
                          for ref in self.unresolved_references]
        if self.ref_up or self.ref_down:
            # Both directions are written as soon as one is non-empty
            rv["ref_up"] = [ref.to_json() for ref in self.ref_up]
            rv["ref_down"] = [ref.to_json() for ref in self.ref_down]
        if self.tracing_status:
            rv["tracing_status"] = self.tracing_status.name
        return rv
class Requirement(Item):
    """Item describing a requirement.

    In addition to the common item state it carries a framework, a
    kind, an explicit name and optionally a text and a status.
    """

    def __init__(self, tag, location, framework, kind, name,
                 text=None, status=None):
        super().__init__(tag, location)
        assert isinstance(framework, str)
        assert isinstance(kind, str)
        assert isinstance(name, str)
        assert isinstance(text, str) or text is None
        assert isinstance(status, str) or status is None

        self.framework = framework
        self.kind = kind
        # Replaces the default name derived from the tag
        self.name = name
        self.text = text
        self.status = status

    def to_json(self):
        """Return the common item dictionary extended by requirement fields."""
        rv = super().to_json()
        rv["framework"] = self.framework
        rv["kind"] = self.kind
        rv["text"] = self.text
        rv["status"] = self.status
        return rv

    def perform_source_checks(self, source_info):
        """Report an error if the status is not among the valid ones.

        Nothing is checked when source_info has no (or an empty)
        "valid_status" entry.
        """
        assert isinstance(source_info, dict)
        valid_status = source_info.get("valid_status")
        if valid_status and self.status not in valid_status:
            self.error("status is %s, expected %s" %
                       (self.status,
                        " or ".join(sorted(valid_status))))

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Create a requirement from its JSON dictionary (schema 3 or 4)."""
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version in (3, 4)

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Requirement(tag       = tag,
                           location  = location,
                           framework = data["framework"],
                           kind      = data["kind"],
                           name      = data["name"],
                           text      = data.get("text", None),
                           status    = data.get("status", None))
        item.additional_data_from_json(level, data, schema_version)

        return item
class Implementation(Item):
    """Item describing a piece of implementation.

    In addition to the common item state it carries a language, a
    kind and an explicit name.
    """

    def __init__(self, tag, location, language, kind, name):
        super().__init__(tag, location)
        assert isinstance(language, str)
        assert isinstance(kind, str)
        assert isinstance(name, str)

        self.language = language
        self.kind = kind
        # Replaces the default name derived from the tag
        self.name = name

    def to_json(self):
        """Return the common item dictionary extended by language and kind."""
        rv = super().to_json()
        rv["language"] = self.language
        rv["kind"] = self.kind
        return rv

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Create an implementation from its JSON dictionary (schema 3)."""
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version == 3

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Implementation(tag      = tag,
                              location = location,
                              language = data["language"],
                              kind     = data["kind"],
                              name     = data["name"])
        item.additional_data_from_json(level, data, schema_version)

        return item
class Activity(Item):
    """Item describing an activity.

    In addition to the common item state it carries a framework, a
    kind and optionally a status. The name stays the one derived from
    the tag.
    """

    def __init__(self, tag, location, framework, kind, status=None):
        super().__init__(tag, location)
        assert isinstance(framework, str)
        assert isinstance(kind, str)
        assert isinstance(status, str) or status is None

        self.framework = framework
        self.kind = kind
        self.status = status

    def to_json(self):
        """Return the common item dictionary extended by activity fields."""
        rv = super().to_json()
        rv["framework"] = self.framework
        rv["kind"] = self.kind
        rv["status"] = self.status
        return rv

    @classmethod
    def from_json(cls, level, data, schema_version):
        """Create an activity from its JSON dictionary (schema 3)."""
        assert isinstance(level, str)
        assert isinstance(data, dict)
        assert schema_version == 3

        tag = Tracing_Tag.from_json(data["tag"])
        location = Location.from_json(data["location"])
        item = Activity(tag       = tag,
                        location  = location,
                        framework = data["framework"],
                        kind      = data["kind"],
                        status    = data.get("status", None))
        item.additional_data_from_json(level, data, schema_version)

        return item