Coverage for lobster/common/io.py: 86%
61 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import json
21from typing import Dict, Optional, Sequence, TextIO, Type, Union, Iterable
23from lobster.common.errors import Message_Handler
24from lobster.common.location import File_Reference
25from lobster.common.items import Requirement, Implementation, Activity
def lobster_write(
    fd: TextIO,
    kind: Union[Type[Requirement], Type[Implementation], Type[Activity]],
    generator: str,
    items: Iterable,
):
    """
    Serialize items as a LOBSTER trace document and write it to fd.

    The output is a JSON object (indented by 2) with the keys "data",
    "generator", "schema" and "version", followed by a newline.

    fd: writable text stream that receives the document.
    kind: class all items must be instances of. Requirement selects
        schema "lobster-req-trace" version 4, Implementation selects
        "lobster-imp-trace" version 3, and anything else falls through
        to "lobster-act-trace" version 3.
    generator: stored unchanged under the "generator" key.
    items: the objects to write; each must provide to_json(). Any
        iterable is accepted, including one-shot iterators.

    Raises ValueError if an element of items is not an instance of kind.
    """
    # 'items' is traversed twice below (type check, then serialization).
    # A generator would be exhausted by the first pass and an empty
    # "data" array would be written silently, so materialize it once.
    item_list = list(items)

    if not all(isinstance(item, kind) for item in item_list):
        raise ValueError(
            f"All elements in 'items' must be of the type {kind.__name__}!",
        )

    if kind is Requirement:
        schema = "lobster-req-trace"
        version = 4
    elif kind is Implementation:
        schema = "lobster-imp-trace"
        version = 3
    else:
        schema = "lobster-act-trace"
        version = 3

    data = {"data"      : [item.to_json() for item in item_list],
            "generator" : generator,
            "schema"    : schema,
            "version"   : version}
    json.dump(data, fd, indent=2)
    fd.write("\n")
def lobster_read(
    mh,
    filename: str,
    level: str,
    items: Dict[str, Union[Activity, Implementation, Requirement]],
    source_info: Optional[Dict] = None,
):
    """
    Load a LOBSTER trace file and merge its entries into items.

    The file is parsed as JSON, its top-level structure and its
    schema/version pair are checked, and every entry of its "data"
    array is converted with the from_json() of the class matching the
    schema. New entries are stored in items under item.tag.key();
    entries whose key already exists are collected and handed to
    signal_duplicate_items() once the whole file has been processed.

    mh: message handler; every problem is reported through mh.error().
        NOTE(review): the checks below keep using 'data' after
        reporting, so mh.error() is presumably fatal by default -
        confirm against Message_Handler.
    filename: path of the file to read (opened as UTF-8).
    level: passed through to from_json().
    items: mapping that is updated in place with the items read.
    source_info: if not None, passed to perform_source_checks() of
        every item read.
    """
    file_loc = File_Reference(filename)

    # Read and validate JSON
    with open(filename, "r", encoding="UTF-8") as handle:
        try:
            data = json.load(handle)
        except json.decoder.JSONDecodeError as decode_error:
            # Point at the exact position reported by the JSON parser
            mh.error(
                File_Reference(filename,
                               decode_error.lineno,
                               decode_error.colno),
                decode_error.msg,
            )

    # Validate basic structure
    if not isinstance(data, dict):
        mh.error(file_loc, "parsed json is not an object")

    for rkey in ("schema", "version", "generator", "data"):
        if rkey not in data:
            # NOTE(review): "top-levelkey" lacks a space; the text is
            # kept unchanged because it is user-visible output.
            mh.error(file_loc, "required top-levelkey %s not present" % rkey)

        # Pick the expected JSON type and the matching complaint
        if rkey == "data":
            expected_type, complaint = list, "data is not an array"
        elif rkey == "version":
            expected_type, complaint = int, "version is not an integer"
        else:
            expected_type, complaint = str, "%s is not a string" % rkey

        if not isinstance(data[rkey], expected_type):
            mh.error(file_loc, complaint)

    # Validate indicated schema
    supported_schema = {
        "lobster-req-trace" : {3, 4},
        "lobster-imp-trace" : {3},
        "lobster-act-trace" : {3},
    }
    schema = data["schema"]
    version = data["version"]
    if schema not in supported_schema:
        mh.error(file_loc, "unknown schema kind %s" % schema)
    if version not in supported_schema[schema]:
        mh.error(file_loc,
                 "version %u for schema %s is not supported" %
                 (version, schema))

    # The schema is the same for every entry, so select the class once
    if schema == "lobster-req-trace":
        item_class = Requirement
    elif schema == "lobster-imp-trace":
        item_class = Implementation
    else:
        item_class = Activity

    # Convert to items, and integrate into symbol table
    duplicate_items = []
    for raw in data["data"]:
        item = item_class.from_json(level, raw, version)

        if source_info is not None:
            item.perform_source_checks(source_info)

        key = item.tag.key()
        if key not in items:
            items[key] = item
        else:
            # 'duplicate definition' errors are fatal, but the user
            # wants to see all of them. So only remember the affected
            # items here; the errors are created after the loop.
            duplicate_items.append(item)

    signal_duplicate_items(mh, items, duplicate_items)
def signal_duplicate_items(
    mh: Message_Handler,
    items,
    duplicate_items: Sequence[Union[Activity, Implementation, Requirement]],
):
    """
    Emit one 'duplicate definition' error per entry of duplicate_items.

    Each error is located at the duplicate and names the location of
    the item already stored in items under the same tag key. Only the
    error for the final entry is reported with fatal=True, so that all
    duplicates get reported. Nothing happens if duplicate_items is
    empty.
    """
    if not duplicate_items:
        return

    for position, duplicate in enumerate(duplicate_items, start=1):
        key = duplicate.tag.key()
        first_definition = items[key].location.to_string()
        # Only the last duplicate is reported as fatal
        is_last = position == len(duplicate_items)
        mh.error(
            location=duplicate.location,
            message=f"duplicate definition of {key}, "
                    f"previously defined at "
                    f"{first_definition}",
            fatal=is_last,
        )