Coverage for lobster/report.py: 54%
126 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023-2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os.path
21import json
22from collections import OrderedDict
23from dataclasses import dataclass
25from lobster.items import Tracing_Status, Requirement, Implementation, Activity
26from lobster.config.parser import load as load_config
27from lobster.errors import Message_Handler
28from lobster.io import lobster_read
29from lobster.location import File_Reference
32@dataclass
33class Coverage:
34 level : str
35 items : int
36 ok : int
37 coverage : None
40class Report:
41 def __init__(self):
42 self.mh = Message_Handler()
43 self.config = OrderedDict()
44 self.items = {}
45 self.coverage = {}
46 self.custom_data = {}
48 def parse_config(self, filename):
49 """
50 Function parses the lobster config file to generate a .lobster file.
51 Parameters
52 ----------
53 filename - configuration file
55 Returns - Nothing
56 -------
58 """
59 assert isinstance(filename, str)
60 assert os.path.isfile(filename)
62 # Load config
63 self.config = load_config(self.mh, filename)
65 # Load requested files
66 for level in self.config:
67 for source in self.config[level]["source"]:
68 lobster_read(self.mh, source["file"], level, self.items,
69 source)
71 # Resolve references for items
72 self.resolve_references_for_items()
74 # Compute status and items count
75 self.compute_item_count_and_status()
77 # Compute coverage for items
78 self.compute_coverage_for_items()
80 def resolve_references_for_items(self):
81 for src_item in self.items.values():
82 while src_item.unresolved_references:
83 dst_tag = src_item.unresolved_references.pop()
84 if dst_tag.key() not in self.items:
85 src_item.error("unknown tracing target %s" % dst_tag.key())
86 continue
87 dst_item = self.items[dst_tag.key()]
88 # TODO: Check if policy allows this link
89 src_item.ref_up.append(dst_tag)
90 dst_item.ref_down.append(src_item.tag)
92 # Check versions match, if specified
93 if dst_tag.version is not None:
94 if dst_item.tag.version is None:
95 src_item.error("tracing destination %s is unversioned"
96 % dst_tag.key())
97 elif dst_tag.version != dst_item.tag.version:
98 src_item.error("tracing destination %s has version %s"
99 " (expected %s)" %
100 (dst_tag.key(),
101 dst_item.tag.version,
102 dst_tag.version))
104 def compute_coverage_for_items(self):
105 for level_obj in self.coverage.values():
106 if level_obj.items == 0:
107 level_obj.coverage = 0.0
108 else:
109 level_obj.coverage = float(level_obj.ok * 100) / float(level_obj.items)
111 def compute_item_count_and_status(self):
112 for level in self.config:
113 coverage = Coverage(level=level, items=0, ok=0, coverage=None)
114 self.coverage.update({level: coverage})
115 for item in self.items.values():
116 item.determine_status(self.config, self.items)
117 self.coverage[item.level].items += 1
118 if item.tracing_status in (Tracing_Status.OK,
119 Tracing_Status.JUSTIFIED):
120 self.coverage[item.level].ok += 1
122 def write_report(self, filename):
123 assert isinstance(filename, str)
125 levels = []
126 for level_config in self.config.values():
127 level = {
128 "name" : level_config["name"],
129 "kind" : level_config["kind"],
130 "items" : [item.to_json()
131 for item in self.items.values()
132 if item.level == level_config["name"]],
133 "coverage" : self.coverage[level_config["name"]].coverage
134 }
135 levels.append(level)
137 report = {
138 "schema" : "lobster-report",
139 "version" : 2,
140 "generator" : "lobster_report",
141 "levels" : levels,
142 "policy" : self.config,
143 "matrix" : [],
144 }
146 with open(filename, "w", encoding="UTF-8") as fd:
147 json.dump(report, fd, indent=2)
148 fd.write("\n")
150 def load_report(self, filename):
151 assert isinstance(filename, str)
153 loc = File_Reference(filename)
155 # Read and validate JSON
156 with open(filename, "r", encoding="UTF-8") as fd:
157 try:
158 data = json.load(fd)
159 except json.decoder.JSONDecodeError as err:
160 self.mh.error(File_Reference(filename,
161 err.lineno,
162 err.colno),
163 err.msg)
165 # Validate basic structure
166 self.validate_basic_structure_of_lobster_file(data, loc)
168 # Validate indicated schema
169 self.validate_indicated_schema(data, loc)
171 # Validate and parse custom data
172 self.validate_and_parse_custom_data(data, loc)
174 # Read in data
175 self.compute_items_and_coverage_for_items(data)
177 def compute_items_and_coverage_for_items(self, data):
178 """
179 Function calculates items and coverage for the items
180 Parameters
181 ----------
182 data - contents of lobster json file.
184 Returns - Nothing
185 -------
187 """
188 self.config = data["policy"]
189 for level in data["levels"]:
190 assert level["name"] in self.config, (
191 f"level '{level['name']}' not found in config"
192 )
193 coverage = Coverage(
194 level=level["name"], items=0, ok=0, coverage=level["coverage"]
195 )
196 self.coverage.update({level["name"]: coverage})
198 for item_data in level["items"]:
199 if level["kind"] == "requirements": 199 ↛ 200line 199 didn't jump to line 200 because the condition on line 199 was never true
200 item = Requirement.from_json(level["name"],
201 item_data,
202 3)
203 elif level["kind"] == "implementation": 203 ↛ 208line 203 didn't jump to line 208 because the condition on line 203 was always true
204 item = Implementation.from_json(level["name"],
205 item_data,
206 3)
207 else:
208 assert level["kind"] == "activity", (
209 f"unknown level kind '{level['kind']}'"
210 )
211 item = Activity.from_json(level["name"],
212 item_data,
213 3)
215 self.items[item.tag.key()] = item
216 self.coverage[item.level].items += 1
217 if item.tracing_status in (Tracing_Status.OK,
218 Tracing_Status.JUSTIFIED):
219 self.coverage[item.level].ok += 1
221 def validate_and_parse_custom_data(self, data, loc):
222 """
223 Function validates the optional 'custom_data' field in the lobster report.
224 Ensures that if present, it is a dictionary with string keys and string values.
226 Parameters
227 ----------
228 data - contents of lobster json file.
229 loc - location from where the error was raised.
231 Returns - Nothing
232 -------
233 """
234 self.custom_data = data.get('custom_data', None)
235 if self.custom_data: 235 ↛ 236line 235 didn't jump to line 236 because the condition on line 235 was never true
236 if not isinstance(self.custom_data, dict):
237 self.mh.error(loc, "'custom_data' must be an object (dictionary).")
239 for key, value in self.custom_data.items():
240 if not isinstance(key, str):
241 self.mh.error(loc,
242 f"Key in 'custom_data' must be a "
243 f"string, got {type(key).__name__}.")
244 if not isinstance(value, str):
245 self.mh.error(loc,
246 f"Value for key '{key}' in 'custom_data' "
247 f"must be a string, got {type(value).__name__}.")
249 def validate_indicated_schema(self, data, loc):
250 """
251 Function validates the schema and version.
252 Parameters
253 ----------
254 data - contents of lobster json file.
255 loc - location from where the error was raised.
257 Returns - Nothing
258 -------
260 """
261 supported_schema = {
262 "lobster-report": set([2]),
263 }
264 if data["schema"] not in supported_schema: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true
265 self.mh.error(loc, "unknown schema kind %s" % data["schema"])
266 if data["version"] not in supported_schema[data["schema"]]: 266 ↛ 267line 266 didn't jump to line 267 because the condition on line 266 was never true
267 self.mh.error(loc,
268 "version %u for schema %s is not supported" %
269 (data["version"], data["schema"]))
271 def validate_basic_structure_of_lobster_file(self, data, loc):
272 """
273 Function validates the basic structure of lobster file. All the first level
274 keys of the lobster json file are validated here.
275 Parameters
276 ----------
277 data - contents of lobster json file.
278 loc - location from where the error was raised.
280 Returns - Nothing
281 -------
283 """
284 if not isinstance(data, dict): 284 ↛ 285line 284 didn't jump to line 285 because the condition on line 284 was never true
285 self.mh.error(loc, "parsed json is not an object")
287 rkey_dict = {"schema": str, "version": int, "generator": str, "levels": list,
288 "policy": dict, "matrix": list}
289 type_dict = {int: "an integer", str: "a string", list: "an array",
290 dict: "an object"}
291 for rkey, rvalue in rkey_dict.items():
292 if rkey not in data: 292 ↛ 293line 292 didn't jump to line 293 because the condition on line 292 was never true
293 self.mh.error(loc, "required top-levelkey %s not present" % rkey)
294 if not isinstance(data[rkey], rvalue): 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true
295 self.mh.error(loc, "%s is not %s." % (rkey, type_dict[rvalue]))