Coverage for lobster/common/io.py: 79%
61 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import json
21from typing import Dict, Optional, Sequence, TextIO, Type, Union, Iterable
23from lobster.common.errors import Message_Handler
24from lobster.common.location import File_Reference
25from lobster.common.items import Requirement, Implementation, Activity
def lobster_write(
    fd: TextIO,
    kind: Union[Type[Requirement], Type[Implementation], Type[Activity]],
    generator: str,
    items: Iterable,
):
    """Serialize *items* as a LOBSTER trace document and write it to *fd*.

    The document is a JSON object with the keys "data", "generator",
    "schema" and "version", pretty-printed with an indent of 2 and
    followed by a single newline.

    Args:
        fd: Text stream the JSON document is written to.
        kind: Item class that every element of *items* must be an
            instance of. It also selects the schema name and version
            written to the file.
        generator: Value stored under the "generator" key.
        items: The items to serialize; each one must provide to_json().
            Any iterable is accepted, including one-shot iterators.

    Raises:
        ValueError: If at least one element of *items* is not an
            instance of *kind*.
    """
    # 'items' is only promised to be an Iterable, yet it is traversed
    # twice (type check, then serialization). Materialize it once so that
    # a generator or other one-shot iterator is not exhausted by the type
    # check and then silently written out as an empty "data" array.
    items = list(items)

    if not all(isinstance(item, kind) for item in items):
        raise ValueError(
            f"All elements in 'items' must be of the type {kind.__name__}!",
        )

    if kind is Requirement:
        schema = "lobster-req-trace"
        version = 4
    elif kind is Implementation:
        schema = "lobster-imp-trace"
        version = 3
    else:
        # Every other kind is written with the activity schema.
        schema = "lobster-act-trace"
        version = 3

    data = {
        "data": [item.to_json() for item in items],
        "generator": generator,
        "schema": schema,
        "version": version,
    }
    json.dump(data, fd, indent=2)
    fd.write("\n")
def lobster_read(
    mh: Message_Handler,
    filename: str,
    level: str,
    items: Dict[str, Union[Activity, Implementation, Requirement]],
    source_info: Optional[Dict] = None,
):
    """Load a LOBSTER trace file and merge its items into *items*.

    The file is parsed as JSON and its top-level structure is validated:
    it must be an object with the keys "schema", "version", "generator"
    and "data", of the expected types, and with a supported combination
    of schema and version. Every entry of "data" is then converted into
    a Requirement, Implementation or Activity (depending on the schema)
    and stored in *items* under its tag key. Entries whose tag key is
    already present in *items* are collected and handed to
    signal_duplicate_items() once the whole file has been processed.

    Args:
        mh: Message handler that all problems are reported to.
        filename: Path of the file to read; opened as UTF-8 text.
        level: Passed through to the from_json() constructor of each
            item.
        items: Symbol table, keyed by tag key, that is updated in place.
        source_info: If not None, it is passed to
            perform_source_checks() of every item that was read.
    """
    loc = File_Reference(filename)

    # Read and validate JSON
    # NOTE(review): the code after each mh.error() call in this function
    # relies on that call not returning (e.g. 'data' would be unbound
    # after a decode error) -- confirm against Message_Handler.error.
    with open(filename, "r", encoding="UTF-8") as fd:
        try:
            data = json.load(fd)
        except json.decoder.JSONDecodeError as err:
            mh.error(File_Reference(filename,
                                    err.lineno,
                                    err.colno),
                     err.msg)

    # Validate basic structure
    if not isinstance(data, dict):
        mh.error(loc, "parsed json is not an object")

    for rkey in ("schema", "version", "generator", "data"):
        if rkey not in data:
            mh.error(loc, "required top-level key %s not present" % rkey)
        if rkey == "data":
            if not isinstance(data[rkey], list):
                mh.error(loc, "data is not an array")
        elif rkey == "version":
            if not isinstance(data[rkey], int):
                mh.error(loc, "version is not an integer")
        else:
            if not isinstance(data[rkey], str):
                mh.error(loc, "%s is not a string" % rkey)

    # Validate indicated schema
    supported_schema = {
        "lobster-req-trace": {3, 4},
        "lobster-imp-trace": {3},
        "lobster-act-trace": {3},
    }
    schema = data["schema"]
    version = data["version"]
    if schema not in supported_schema:
        mh.error(loc, "unknown schema kind %s" % schema)
    if version not in supported_schema[schema]:
        mh.error(loc,
                 "version %u for schema %s is not supported" %
                 (version, schema))

    # The schema is a property of the whole file, so choose the item
    # class once instead of re-deciding it for every entry.
    if schema == "lobster-req-trace":
        item_kind = Requirement
    elif schema == "lobster-imp-trace":
        item_kind = Implementation
    else:
        item_kind = Activity

    duplicate_items = []
    # Convert to items, and integrate into symbol table
    for raw in data["data"]:
        item = item_kind.from_json(level, raw, version)

        if source_info is not None:
            item.perform_source_checks(source_info)

        key = item.tag.key()
        if key in items:
            # 'duplicate definition' errors are fatal, but the user wants
            # to see all of them. So store the affected items in a list
            # first, and create errors later.
            duplicate_items.append(item)
        else:
            items[key] = item

    signal_duplicate_items(mh, items, duplicate_items)
def signal_duplicate_items(
    mh: Message_Handler,
    items,
    duplicate_items: Sequence[Union[Activity, Implementation, Requirement]],
):
    """
    Emit one "duplicate definition" error per entry of duplicate_items.

    Each error is reported at the location of the duplicate and names the
    location of the entry already stored in items under the same tag key.
    Only the error for the final duplicate is passed fatal=True; all
    earlier ones are passed fatal=False.
    Nothing is reported when duplicate_items is empty.
    """
    if not duplicate_items:
        return

    total = len(duplicate_items)
    for position, duplicate in enumerate(duplicate_items, start=1):
        key = duplicate.tag.key()
        first_definition = items[key].location.to_string()
        # Only the last report is flagged as fatal.
        is_last = position == total
        mh.error(
            location=duplicate.location,
            message=f"duplicate definition of {key}, "
                    f"previously defined at "
                    f"{first_definition}",
            fatal=is_last,
        )