Coverage for lobster/io.py: 70%
62 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20from collections.abc import Iterable
21import json
22from typing import Dict, Optional, Sequence, TextIO, Type, Union
24from lobster.errors import Message_Handler
25from lobster.location import File_Reference
26from lobster.items import Requirement, Implementation, Activity
def lobster_write(
    fd: TextIO,
    kind: Union[Type[Requirement], Type[Implementation], Type[Activity]],
    generator: str,
    items: Iterable,
):
    """Serialize *items* as a LOBSTER trace document and write it to *fd*.

    The document is a JSON object with the keys "data", "generator",
    "schema" and "version", followed by a trailing newline. The schema
    name and version are derived from *kind*.

    Args:
        fd: writable text stream that receives the JSON document.
        kind: the item class all elements of *items* must be instances of;
            it selects the schema name and version written to the file.
        generator: stored verbatim under the "generator" key.
        items: the items to serialize; each must provide ``to_json()``.
            Any iterable is accepted, including one-shot iterators.

    Raises:
        ValueError: if any element of *items* is not an instance of *kind*.
    """
    # 'items' is only promised to be an Iterable, and it is traversed twice
    # (type check, then serialization). Materialize it once so that a
    # generator or other one-shot iterator is not exhausted by the type
    # check, which would silently produce an empty "data" array.
    items = list(items)

    if not all(isinstance(item, kind) for item in items):
        raise ValueError(
            f"All elements in 'items' must be of the type {kind.__name__}!",
        )

    if kind is Requirement:
        schema = "lobster-req-trace"
        version = 4
    elif kind is Implementation:
        schema = "lobster-imp-trace"
        version = 3
    else:
        schema = "lobster-act-trace"
        version = 3

    # Key order is kept stable because it determines the order in the
    # emitted file.
    data = {
        "data": [item.to_json() for item in items],
        "generator": generator,
        "schema": schema,
        "version": version,
    }
    json.dump(data, fd, indent=2)
    fd.write("\n")
def lobster_read(
    mh,
    filename: str,
    level: str,
    items: Dict[str, Union[Activity, Implementation, Requirement]],
    source_info: Optional[Dict] = None,
):
    """Read the LOBSTER trace file *filename* and merge its items into *items*.

    The file is parsed as JSON, then its top-level structure and its
    schema/version combination are validated. Every entry of its "data"
    array is converted into a Requirement, Implementation or Activity,
    chosen by the "schema" value. New items are stored in *items* under
    ``item.tag.key()``; items whose key is already present are collected
    and reported as duplicate definitions once the whole file is processed.

    Args:
        mh: message handler whose ``error(location, message)`` method is
            called for every problem found.
            NOTE(review): the validation below relies on ``error`` not
            returning by default (e.g. by raising) - confirm against
            Message_Handler.
        filename: path of the file to read; it is opened as UTF-8 text.
        level: passed through unchanged to ``from_json`` of the item class.
        items: symbol table keyed by ``item.tag.key()``; updated in place.
        source_info: if not None, passed to ``perform_source_checks`` of
            every item that is read.
    """
    loc = File_Reference(filename)

    # Read and validate JSON
    with open(filename, "r", encoding="UTF-8") as fd:
        try:
            data = json.load(fd)
        except json.decoder.JSONDecodeError as err:
            # Point the message at the exact position reported by the parser
            mh.error(File_Reference(filename,
                                    err.lineno,
                                    err.colno),
                     err.msg)

    # Validate basic structure
    if not isinstance(data, dict):
        mh.error(loc, "parsed json is not an object")

    for rkey in ("schema", "version", "generator", "data"):
        if rkey not in data:
            mh.error(loc, f"required top-level key {rkey} not present")
        if rkey == "data":
            if not isinstance(data[rkey], list):
                mh.error(loc, "data is not an array")
        elif rkey == "version":
            if not isinstance(data[rkey], int):
                mh.error(loc, "version is not an integer")
        else:
            if not isinstance(data[rkey], str):
                mh.error(loc, f"{rkey} is not a string")

    # Validate indicated schema
    supported_schema = {
        "lobster-req-trace": {3, 4},
        "lobster-imp-trace": {3},
        "lobster-act-trace": {3},
    }
    schema = data["schema"]
    version = data["version"]
    if schema not in supported_schema:
        mh.error(loc, f"unknown schema kind {schema}")
    if version not in supported_schema[schema]:
        mh.error(loc,
                 f"version {version:d} for schema {schema} is not supported")

    # The schema is the same for every entry, so pick the item class once
    # instead of re-deciding it on every loop iteration.
    if schema == "lobster-req-trace":
        item_class = Requirement
    elif schema == "lobster-imp-trace":
        item_class = Implementation
    else:
        item_class = Activity

    duplicate_items = []
    # Convert to items, and integrate into symbol table
    for raw in data["data"]:
        item = item_class.from_json(level, raw, version)

        if source_info is not None:
            item.perform_source_checks(source_info)

        if item.tag.key() in items:
            # 'duplicate definition' errors are fatal, but the user wants to
            # see all of them. So store the affected items in a list first,
            # and create errors later.
            duplicate_items.append(item)
        else:
            items[item.tag.key()] = item

    signal_duplicate_items(mh, items, duplicate_items)
def signal_duplicate_items(
    mh: Message_Handler,
    items,
    duplicate_items: Sequence[Union[Activity, Implementation, Requirement]],
):
    """
    Emit one 'duplicate definition' error per entry of duplicate_items.

    Each message names the tag key of the duplicate and the location of
    the earlier definition, which is looked up in items by that key.
    Only the error for the final entry is passed fatal=True; all earlier
    ones are passed fatal=False. Nothing is reported when duplicate_items
    is empty.
    """
    if not duplicate_items:
        return

    final_position = len(duplicate_items)
    for position, duplicate in enumerate(duplicate_items, start=1):
        key = duplicate.tag.key()
        earlier_location = items[key].location.to_string()
        mh.error(
            location=duplicate.location,
            message=f"duplicate definition of {key}, "
                    f"previously defined at "
                    f"{earlier_location}",
            fatal=(position == final_position),
        )