Coverage for lobster/io.py: 68%
76 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os.path
21import io
22from collections.abc import Iterable
23import json
25from lobster.errors import Message_Handler
26from lobster.location import File_Reference
27from lobster.items import Requirement, Implementation, Activity
def lobster_write(fd, kind, generator, items):
    """Write items to fd as a LOBSTER JSON trace document.

    The document is a JSON object with the keys "data" (the result of
    calling to_json() on every item), "generator", "schema" and
    "version". Schema name and version are derived from kind:

    * Requirement: "lobster-req-trace", version 4
    * Implementation: "lobster-imp-trace", version 3
    * Activity: "lobster-act-trace", version 3

    :param fd: text stream (io.TextIOBase) the document is written to
    :param kind: one of the classes Requirement, Implementation, Activity
    :param generator: string stored under the "generator" key
    :param items: iterable of instances of kind; one-shot iterables
        such as generators are supported
    """
    assert isinstance(fd, io.TextIOBase)
    assert kind in (Requirement, Implementation, Activity)
    assert isinstance(generator, str)
    assert isinstance(items, Iterable)

    # Materialise the iterable exactly once. The type check below walks
    # over all items; without this copy a generator would already be
    # exhausted when the data is serialised, and the file would be
    # written with an empty "data" array.
    items = list(items)
    assert all(isinstance(item, kind) for item in items)

    if kind is Requirement:
        schema = "lobster-req-trace"
        version = 4
    elif kind is Implementation:
        schema = "lobster-imp-trace"
        version = 3
    else:
        schema = "lobster-act-trace"
        version = 3

    data = {"data"      : [item.to_json() for item in items],
            "generator" : generator,
            "schema"    : schema,
            "version"   : version}
    json.dump(data, fd, indent=2)
    # json.dump does not emit a trailing newline
    fd.write("\n")
def lobster_read(mh, filename, level, items, source_info=None):
    """Load a LOBSTER trace file and merge its items into items.

    The file is parsed as JSON and its top-level structure, schema
    name and schema version are checked before any item is created.
    Each entry of the "data" array is then turned into a Requirement,
    Implementation or Activity (chosen by the "schema" value) and
    stored in items under item.tag.key().

    Items whose key is already present in items are not stored.
    They are collected, and after the whole file has been processed
    one error is raised per collected item; only the last of these is
    flagged as fatal, so that all duplicates get reported.

    NOTE(review): the validation steps assume that mh.error does not
    return when called without fatal=False - confirm against
    Message_Handler.

    :param mh: Message_Handler used to report all problems
    :param filename: path of an existing file to read
    :param level: string handed on unchanged to from_json
    :param items: mapping from item.tag.key() to item, updated in place
    :param source_info: optional dict; if given,
        perform_source_checks(source_info) is called on every item and
        the (name, value) pairs in source_info['filters'] decide
        whether the item is kept
    """
    assert isinstance(mh, Message_Handler)
    assert isinstance(filename, str)
    assert isinstance(level, str)
    assert os.path.isfile(filename)
    assert source_info is None or isinstance(source_info, dict)

    loc = File_Reference(filename)

    # Parse the file. Syntax problems are reported at the line and
    # column indicated by the JSON decoder.
    with open(filename, "r", encoding="UTF-8") as fd:
        try:
            data = json.load(fd)
        except json.decoder.JSONDecodeError as err:
            syntax_loc = File_Reference(filename, err.lineno, err.colno)
            mh.error(syntax_loc, err.msg)

    # Check the top-level structure: the required keys, in the order
    # they are checked, and the type each value must have.
    if not isinstance(data, dict):
        mh.error(loc, "parsed json is not an object")

    expected_types = {
        "schema"    : str,
        "version"   : int,
        "generator" : str,
        "data"      : list,
    }
    # Fixed complaints for the non-string keys; string keys share a
    # message built from the key name.
    type_complaints = {
        list : "data is not an array",
        int  : "version is not an integer",
    }
    for rkey, expected in expected_types.items():
        if rkey not in data:
            mh.error(loc, "required top-levelkey %s not present" % rkey)
        if not isinstance(data[rkey], expected):
            mh.error(loc,
                     type_complaints.get(expected,
                                         "%s is not a string" % rkey))

    # Check the indicated schema and pick the matching item class
    schema_table = {
        "lobster-req-trace" : (Requirement, {3, 4}),
        "lobster-imp-trace" : (Implementation, {3}),
        "lobster-act-trace" : (Activity, {3}),
    }
    schema = data["schema"]
    version = data["version"]
    if schema not in schema_table:
        mh.error(loc, "unknown schema kind %s" % schema)
    item_class, supported_versions = schema_table[schema]
    if version not in supported_versions:
        mh.error(loc,
                 "version %u for schema %s is not supported" %
                 (version, schema))

    # Convert to items, and integrate into symbol table
    duplicates = []
    for raw in data["data"]:
        item = item_class.from_json(level, raw, version)

        verdicts = []
        if source_info is not None:
            item.perform_source_checks(source_info)

            # Evaluate the source_info filters; unknown filter names
            # contribute nothing.
            for name, value in source_info['filters']:
                if name == 'prefix':
                    verdicts.append(item.tag.tag.startswith(value))
                if name == 'kind':
                    verdicts.append(item.kind == value)

        # An item is dropped as soon as one filter rejects it. Without
        # any filter the list is empty and the item is kept.
        if not all(verdicts):
            continue

        key = item.tag.key()
        if key in items:
            # 'duplicate definition' errors are fatal, but the user
            # wants to see all of them. So remember the affected items
            # and create the errors once the file is fully processed.
            duplicates.append(item)
        else:
            items[key] = item

    # Report every duplicate; only the final report is fatal.
    total = len(duplicates)
    for position, item in enumerate(duplicates, start=1):
        key = item.tag.key()
        previous = items[key].location.to_string()
        mh.error(
            item.location,
            f"duplicate definition of {key}, "
            f"previously defined at {previous}",
            fatal=(position == total),
        )