Coverage for lobster/common/io.py: 85%
66 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-09 10:06 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-09 10:06 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os
21import json
22from typing import Dict, Optional, Sequence, TextIO, Type, Union, Iterable
24from lobster.common.errors import Message_Handler
25from lobster.common.location import File_Reference
26from lobster.common.items import Requirement, Implementation, Activity
def lobster_write(
    fd: TextIO,
    kind: Union[Type[Requirement], Type[Implementation], Type[Activity]],
    generator: str,
    items: Iterable,
):
    """Serialize ``items`` as a LOBSTER trace document and write it to ``fd``.

    The document is a JSON object with the keys ``data`` (the result of
    ``to_json()`` for every item), ``generator``, ``schema`` and
    ``version``, followed by a single newline.

    Args:
        fd: Writable text stream that receives the JSON document.
        kind: Item class being written. It selects the schema name and
            version; anything other than ``Requirement`` or
            ``Implementation`` is written with the activity schema.
        generator: Stored verbatim under the ``generator`` key.
        items: The items to write. Any iterable is accepted, including
            one-shot iterators such as generators.

    Raises:
        ValueError: If any element of ``items`` is not an instance of
            ``kind``. Nothing is written in that case.
    """
    # ``items`` is walked twice below (type check, then serialization).
    # A one-shot iterator would be exhausted by the type check and an
    # empty "data" array would be written, so take a snapshot first.
    items = list(items)

    if not all(isinstance(item, kind) for item in items):
        raise ValueError(
            f"All elements in 'items' must be of the type {kind.__name__}!",
        )

    if kind is Requirement:
        schema = "lobster-req-trace"
        version = 4
    elif kind is Implementation:
        schema = "lobster-imp-trace"
        version = 3
    else:
        schema = "lobster-act-trace"
        version = 3

    data = {
        "data": [item.to_json() for item in items],
        "generator": generator,
        "schema": schema,
        "version": version,
    }
    json.dump(data, fd, indent=2)
    fd.write("\n")
def lobster_read(
    mh,
    filename: str,
    level: str,
    items: Dict[str, Union[Activity, Implementation, Requirement]],
    source_info: Optional[Dict] = None,
):
    """Load a LOBSTER trace file and merge its items into ``items``.

    The file is parsed as JSON, its top-level structure and its
    schema/version pair are validated, and every entry of its ``data``
    array is converted with the ``from_json`` constructor of the class
    that matches the schema. Items whose ``tag.key()`` is already present
    in ``items`` are collected and handed to ``signal_duplicate_items``
    once the whole file has been processed.

    Args:
        mh: Message handler; every problem is reported via ``mh.error``.
        filename: Path of the file to read (opened as UTF-8 text).
        level: Passed through unchanged to ``from_json``.
        items: Symbol table keyed by ``item.tag.key()``. New items are
            inserted in place.
        source_info: When not ``None``, passed to
            ``item.perform_source_checks`` for every item read.

    NOTE(review): each validation step relies on ``mh.error`` not
    returning when called without ``fatal`` -- confirm against
    Message_Handler.
    """
    loc = File_Reference(filename)

    # Read and validate JSON
    with open(filename, "r", encoding="UTF-8") as fd:
        try:
            data = json.load(fd)
        except json.decoder.JSONDecodeError as err:
            mh.error(File_Reference(filename, err.lineno, err.colno),
                     err.msg)

    # Validate basic structure
    if not isinstance(data, dict):
        mh.error(loc, "parsed json is not an object")

    for rkey in ("schema", "version", "generator", "data"):
        if rkey not in data:
            mh.error(loc, f"required top-level key {rkey} not present")
        value = data[rkey]
        if rkey == "data":
            if not isinstance(value, list):
                mh.error(loc, "data is not an array")
        elif rkey == "version":
            # bool is a subclass of int, so JSON true/false would pass a
            # plain isinstance(..., int) check; reject it explicitly.
            if isinstance(value, bool) or not isinstance(value, int):
                mh.error(loc, "version is not an integer")
        elif not isinstance(value, str):
            mh.error(loc, f"{rkey} is not a string")

    # Validate indicated schema. Each schema maps to the item class used
    # to decode its entries and to the set of accepted versions.
    supported_schema = {
        "lobster-req-trace": (Requirement, {3, 4}),
        "lobster-imp-trace": (Implementation, {3}),
        "lobster-act-trace": (Activity, {3}),
    }
    schema = data["schema"]
    version = data["version"]
    if schema not in supported_schema:
        mh.error(loc, f"unknown schema kind {schema}")
    item_class, supported_versions = supported_schema[schema]
    if version not in supported_versions:
        mh.error(
            loc,
            f"version {version:d} for schema {schema} is not supported",
        )

    duplicate_items = []
    # Convert to items, and integrate into symbol table
    for raw in data["data"]:
        item = item_class.from_json(level, raw, version)

        if source_info is not None:
            item.perform_source_checks(source_info)

        key = item.tag.key()
        if key in items:
            # 'duplicate definition' errors are fatal, but the user wants
            # to see all of them. So store the affected items in a list
            # first, and create errors later.
            duplicate_items.append(item)
        else:
            items[key] = item

    signal_duplicate_items(mh, items, duplicate_items)
def signal_duplicate_items(
    mh: Message_Handler,
    items,
    duplicate_items: Sequence[Union[Activity, Implementation, Requirement]],
):
    """
    Emit one 'duplicate definition' error per entry of duplicate_items.

    Each message names the duplicate's key and the location of the item
    already stored under that key in items. Every error is reported
    with fatal=False except the final one, which is reported with
    fatal=True. Nothing is reported when duplicate_items is empty.
    """
    if not duplicate_items:
        return

    final_position = len(duplicate_items)
    for position, duplicate in enumerate(duplicate_items, start=1):
        key = duplicate.tag.key()
        first_definition = items[key].location.to_string()
        mh.error(
            location=duplicate.location,
            message=f"duplicate definition of {key}, "
                    f"previously defined at {first_definition}",
            fatal=(position == final_position),
        )
def ensure_output_directory(file_path: str) -> None:
    """Make sure the directory that will contain file_path exists.

    Missing directories are created, including intermediate ones; an
    already existing directory is left alone. When file_path has no
    directory component, nothing is done.
    """
    parent = os.path.dirname(file_path)
    if not parent:
        return
    os.makedirs(parent, exist_ok=True)