Coverage for lobster/common/report.py: 51%
118 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-04 12:54 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-04 12:54 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
import json
from collections import OrderedDict
from dataclasses import dataclass
from typing import Optional

from lobster.common.level_definition import LevelDefinition
from lobster.common.items import Tracing_Status, Requirement, Implementation, Activity
from lobster.common.parser import load as load_config
from lobster.common.errors import Message_Handler
from lobster.common.io import lobster_read, ensure_output_directory
from lobster.common.location import File_Reference


@dataclass
class Coverage:
    """Per-level tracing statistics kept by ``Report``.

    ``Report`` creates one record per level with both counters at zero,
    increments them while walking the items, and finally stores the
    resulting percentage in ``coverage``.
    """

    # Name of the level this record belongs to.
    level: str
    # Number of items counted for the level.
    items: int
    # Number of those items whose tracing status is OK or JUSTIFIED.
    ok: int
    # Percentage ``ok * 100 / items`` (0.0 for a level without items);
    # None until it has been computed or read from a report file.
    coverage: Optional[float]
class Report:
    """In-memory LOBSTER tracing report.

    A report is either built from a lobster configuration file
    (``parse_config``) or read back from a previously written report
    file (``load_report``); ``write_report`` serialises it to JSON.
    """

    def __init__(self):
        # Receives every diagnostic raised while loading or validating.
        self.mh = Message_Handler()
        # Level name -> level definition (the tracing policy).
        self.config = OrderedDict()
        # Item tag key -> item, across all levels.
        self.items = {}
        # Level name -> Coverage record.
        self.coverage = {}
        # Value of the optional "custom_data" entry of a loaded report.
        self.custom_data = {}
        # Not touched by any of the methods below.
        self.source_root = ""

    def parse_config(self, filename):
        """Build the report from a lobster configuration file.

        Loads the configuration, reads every source file listed for
        each level into ``self.items``, then resolves the references
        between items, determines the tracing status of each item and
        computes the coverage per level.

        Parameters
        ----------
        filename
            Path of the lobster configuration file.
        """
        self.config = load_config(self.mh, filename)

        # Load requested files
        for level in self.config:
            for source in self.config[level].source:
                lobster_read(self.mh, source["file"], level, self.items,
                             source)

        self.resolve_references_for_items()
        self.compute_item_count_and_status()
        self.compute_coverage_for_items()

    def resolve_references_for_items(self):
        """Turn unresolved references into up/down links between items.

        Every tag in an item's ``unresolved_references`` is consumed
        (last one first). A tag whose key is not a known item is
        reported on the source item; otherwise the tag is appended to
        the source item's ``ref_up`` and the source item's own tag to
        the destination's ``ref_down``. When the reference names a
        version, it must equal the version of the destination item;
        a mismatch is reported but the link is kept.
        """
        for src_item in self.items.values():
            while src_item.unresolved_references:
                dst_tag = src_item.unresolved_references.pop()
                dst_key = dst_tag.key()
                if dst_key not in self.items:
                    src_item.error("unknown tracing target %s" % dst_key)
                    continue
                dst_item = self.items[dst_key]
                # TODO: Check if policy allows this link
                src_item.ref_up.append(dst_tag)
                dst_item.ref_down.append(src_item.tag)

                # Check versions match, if specified
                if dst_tag.version is None:
                    continue
                if dst_item.tag.version is None:
                    src_item.error("tracing destination %s is unversioned"
                                   % dst_key)
                elif dst_tag.version != dst_item.tag.version:
                    src_item.error("tracing destination %s has version %s"
                                   " (expected %s)" %
                                   (dst_key,
                                    dst_item.tag.version,
                                    dst_tag.version))

    def compute_coverage_for_items(self):
        """Store the percentage of OK items in every coverage record.

        A level without items gets 0.0 rather than dividing by zero.
        """
        for level_obj in self.coverage.values():
            if level_obj.items == 0:
                level_obj.coverage = 0.0
            else:
                level_obj.coverage = (float(level_obj.ok * 100)
                                      / float(level_obj.items))

    def compute_item_count_and_status(self):
        """Determine each item's status and count items per level.

        A fresh ``Coverage`` record is created for every configured
        level, so levels without items still appear with zero counts.
        Items whose tracing status is OK or JUSTIFIED count as ok.
        """
        for level in self.config:
            self.coverage[level] = Coverage(level=level, items=0, ok=0,
                                            coverage=None)

        for item in self.items.values():
            item.determine_status(self.config, self.items)
            level_coverage = self.coverage[item.level]
            level_coverage.items += 1
            if item.tracing_status in (Tracing_Status.OK,
                                       Tracing_Status.JUSTIFIED):
                level_coverage.ok += 1

    def write_report(self, filename):
        """Serialise the report as JSON to ``filename``.

        The output directory is created first if necessary. The file
        carries schema "lobster-report" version 2, one entry per
        configured level (name, kind, items, coverage), the policy and
        an empty matrix, and ends with a newline.

        Parameters
        ----------
        filename
            Path of the report file to write.
        """
        # Group the serialised items by level in a single pass instead
        # of rescanning all items once per level. Items whose level is
        # not a configured level name are left out, as before.
        items_by_level = {level_config.name: []
                          for level_config in self.config.values()}
        for item in self.items.values():
            bucket = items_by_level.get(item.level)
            if bucket is not None:
                bucket.append(item.to_json())

        levels = [
            {
                "name"     : level_config.name,
                "kind"     : level_config.kind,
                "items"    : items_by_level[level_config.name],
                "coverage" : self.coverage[level_config.name].coverage,
            }
            for level_config in self.config.values()
        ]

        report = {
            "schema"    : "lobster-report",
            "version"   : 2,
            "generator" : "lobster_report",
            "levels"    : levels,
            "policy"    : {key: value.to_json()
                           for key, value in self.config.items()},
            "matrix"    : [],
        }

        ensure_output_directory(filename)
        with open(filename, "w", encoding="UTF-8") as fd:
            json.dump(report, fd, indent=2)
            fd.write("\n")

    def load_report(self, filename):
        """Read a report file previously produced by ``write_report``.

        The JSON is parsed, its top-level structure and schema/version
        are validated, and then custom data, policy, items and coverage
        are loaded into this object. Problems are reported through
        ``self.mh``.

        Parameters
        ----------
        filename
            Path of the report file to read.
        """
        loc = File_Reference(filename)

        # Read and validate JSON
        with open(filename, "r", encoding="UTF-8") as fd:
            try:
                data = json.load(fd)
            except json.decoder.JSONDecodeError as err:
                self.mh.error(File_Reference(filename,
                                             err.lineno,
                                             err.colno),
                              err.msg)
                # Nothing was parsed; never fall through to use an
                # unbound ``data`` should the handler return.
                return

        self.validate_basic_structure_of_lobster_file(data, loc)
        self.validate_indicated_schema(data, loc)
        self.parse_custom_data(data)
        self.compute_items_and_coverage_for_items(data)

    def compute_items_and_coverage_for_items(self, data):
        """Load policy, items and coverage from parsed report data.

        The coverage percentage of each level is taken from the file as
        is; only the item and ok counters are recomputed from the items.

        Parameters
        ----------
        data
            Parsed contents of a lobster report (a dict with at least
            the "policy" and "levels" entries).

        Raises
        ------
        KeyError
            If a level is not part of the policy.
        ValueError
            If an item belongs to a level whose kind is not one of
            "requirements", "implementation" or "activity".
        """
        self.config = {key: LevelDefinition.from_json(value)
                       for key, value in data["policy"].items()}

        for level in data["levels"]:
            level_name = level["name"]
            if level_name not in self.config:
                raise KeyError(f"level '{level['name']}' not found in config")
            level_coverage = Coverage(level=level_name, items=0, ok=0,
                                      coverage=level["coverage"])
            self.coverage[level_name] = level_coverage

            for item_data in level["items"]:
                # The kind is only checked when there is an item to
                # build, so an empty level of unknown kind is accepted.
                kind = level["kind"]
                if kind == "requirements":
                    item_class = Requirement
                elif kind == "implementation":
                    item_class = Implementation
                elif kind == "activity":
                    item_class = Activity
                else:
                    raise ValueError(f"unknown level kind '{level['kind']}'")
                # NOTE(review): 3 is presumably the schema version of
                # the serialised items - confirm against from_json.
                item = item_class.from_json(level_name, item_data, 3)

                self.items[item.tag.key()] = item
                self.coverage[item.level].items += 1
                if item.tracing_status in (Tracing_Status.OK,
                                           Tracing_Status.JUSTIFIED):
                    self.coverage[item.level].ok += 1

    def parse_custom_data(self, data):
        """Store the optional "custom_data" entry of a parsed report.

        ``self.custom_data`` becomes None when the entry is absent.
        """
        self.custom_data = data.get("custom_data")

    def validate_indicated_schema(self, data, loc):
        """Check that schema kind and version of a report are supported.

        Parameters
        ----------
        data
            Parsed contents of a lobster report; must contain "schema",
            and "version" when the schema is known.
        loc
            Location passed to the message handler with any error.
        """
        supported_schema = {
            "lobster-report": {2},
        }
        schema = data["schema"]
        if schema not in supported_schema:
            self.mh.error(loc, "unknown schema kind %s" % schema)
        # elif: an unknown schema has no version set to look up.
        elif data["version"] not in supported_schema[schema]:
            self.mh.error(loc,
                          "version %u for schema %s is not supported" %
                          (data["version"], schema))

    def validate_basic_structure_of_lobster_file(self, data, loc):
        """Check the top-level keys of a parsed report and their types.

        The report must be a JSON object holding "schema" (string),
        "version" (integer), "generator" (string), "levels" (array),
        "policy" (object) and "matrix" (array). Each missing or
        wrongly typed key is reported through ``self.mh``.

        Parameters
        ----------
        data
            Parsed contents of a lobster report.
        loc
            Location passed to the message handler with any error.
        """
        if not isinstance(data, dict):
            self.mh.error(loc, "parsed json is not an object")
            # Key checks are meaningless on anything but an object.
            return

        rkey_dict = {"schema": str, "version": int, "generator": str,
                     "levels": list, "policy": dict, "matrix": list}
        type_dict = {int: "an integer", str: "a string", list: "an array",
                     dict: "an object"}
        for rkey, rvalue in rkey_dict.items():
            if rkey not in data:
                self.mh.error(loc,
                              "required top-level key %s not present" % rkey)
            # elif: the type of a missing key cannot be checked.
            elif not isinstance(data[rkey], rvalue):
                self.mh.error(loc, "%s is not %s." % (rkey, type_dict[rvalue]))