Coverage for lobster/common/io.py: 85%

66 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-01-09 10:06 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import os 

21import json 

22from typing import Dict, Optional, Sequence, TextIO, Type, Union, Iterable 

23 

24from lobster.common.errors import Message_Handler 

25from lobster.common.location import File_Reference 

26from lobster.common.items import Requirement, Implementation, Activity 

27 

28 

29def lobster_write( 

30 fd: TextIO, 

31 kind: Union[Type[Requirement], Type[Implementation], Type[Activity]], 

32 generator: str, 

33 items: Iterable, 

34): 

35 if not all(isinstance(item, kind) for item in items): 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true

36 raise ValueError( 

37 f"All elements in 'items' must be of the type {kind.__name__}!", 

38 ) 

39 

40 if kind is Requirement: 

41 schema = "lobster-req-trace" 

42 version = 4 

43 elif kind is Implementation: 

44 schema = "lobster-imp-trace" 

45 version = 3 

46 else: 

47 schema = "lobster-act-trace" 

48 version = 3 

49 

50 data = {"data" : list(x.to_json() for x in items), 

51 "generator" : generator, 

52 "schema" : schema, 

53 "version" : version} 

54 json.dump(data, fd, indent=2) 

55 fd.write("\n") 

56 

57 

58def lobster_read( 

59 mh, 

60 filename: str, 

61 level: str, 

62 items: Dict[str, Union[Activity, Implementation, Requirement]], 

63 source_info: Optional[Dict] = None, 

64): 

65 loc = File_Reference(filename) 

66 

67 # Read and validate JSON 

68 with open(filename, "r", encoding="UTF-8") as fd: 

69 try: 

70 data = json.load(fd) 

71 except json.decoder.JSONDecodeError as err: 

72 mh.error(File_Reference(filename, 

73 err.lineno, 

74 err.colno), 

75 err.msg) 

76 

77 # Validate basic structure 

78 if not isinstance(data, dict): 

79 mh.error(loc, "parsed json is not an object") 

80 

81 for rkey in ("schema", "version", "generator", "data"): 

82 if rkey not in data: 

83 mh.error(loc, "required top-levelkey %s not present" % rkey) 

84 if rkey == "data": 

85 if not isinstance(data[rkey], list): 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 mh.error(loc, "data is not an array") 

87 elif rkey == "version": 

88 if not isinstance(data[rkey], int): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 mh.error(loc, "version is not an integer") 

90 else: 

91 if not isinstance(data[rkey], str): 91 ↛ 92line 91 didn't jump to line 92 because the condition on line 91 was never true

92 mh.error(loc, "%s is not a string" % rkey) 

93 

94 # Validate indicated schema 

95 supported_schema = { 

96 "lobster-req-trace" : set([3, 4]), 

97 "lobster-imp-trace" : set([3]), 

98 "lobster-act-trace" : set([3]), 

99 } 

100 if data["schema"] not in supported_schema: 

101 mh.error(loc, "unknown schema kind %s" % data["schema"]) 

102 if data["version"] not in supported_schema[data["schema"]]: 

103 mh.error(loc, 

104 "version %u for schema %s is not supported" % 

105 (data["version"], data["schema"])) 

106 

107 duplicate_items = [] 

108 # Convert to items, and integrate into symbol table 

109 for raw in data["data"]: 

110 if data["schema"] == "lobster-req-trace": 

111 item = Requirement.from_json(level, raw, data["version"]) 

112 elif data["schema"] == "lobster-imp-trace": 

113 item = Implementation.from_json(level, raw, data["version"]) 

114 else: 

115 item = Activity.from_json(level, raw, data["version"]) 

116 

117 if source_info is not None: 117 ↛ 120line 117 didn't jump to line 120 because the condition on line 117 was always true

118 item.perform_source_checks(source_info) 

119 

120 if item.tag.key() in items: 120 ↛ 124line 120 didn't jump to line 124 because the condition on line 120 was never true

121 # 'duplicate definition' errors are fatal, but the user wants to see all 

122 # of them. So store the affected items in a list first, and create 

123 # errors later. 

124 duplicate_items.append(item) 

125 else: 

126 items[item.tag.key()] = item 

127 

128 signal_duplicate_items(mh, items, duplicate_items) 

129 

130 

131def signal_duplicate_items( 

132 mh: Message_Handler, 

133 items, 

134 duplicate_items: Sequence[Union[Activity, Implementation, Requirement]], 

135): 

136 """ 

137 Report errors for duplicate items to the message handler. 

138 If there are any duplicate items, the last one is considered fatal. 

139 """ 

140 if duplicate_items: 140 ↛ 141line 140 didn't jump to line 141 because the condition on line 140 was never true

141 for counter, item in enumerate(duplicate_items, start=1): 

142 mh.error( 

143 location=item.location, 

144 message=f"duplicate definition of {item.tag.key()}, " 

145 f"previously defined at " 

146 f"{items[item.tag.key()].location.to_string()}", 

147 fatal=(counter == len(duplicate_items)), 

148 ) 

149 

150 

151def ensure_output_directory(file_path: str) -> None: 

152 """Create parent directories for the output file if they don't exist.""" 

153 output_dir = os.path.dirname(file_path) 

154 if output_dir: 

155 os.makedirs(output_dir, exist_ok=True)