Coverage for lobster/io.py: 86%

62 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 09:51 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20from collections.abc import Iterable 

21import json 

22from typing import Dict, Optional, Sequence, TextIO, Type, Union 

23 

24from lobster.errors import Message_Handler 

25from lobster.location import File_Reference 

26from lobster.items import Requirement, Implementation, Activity 

27 

28 

def lobster_write(
    fd: TextIO,
    kind: Union[Type[Requirement], Type[Implementation], Type[Activity]],
    generator: str,
    items: Iterable,
):
    """Serialize *items* as a LOBSTER trace document and write it to *fd*.

    The document is a JSON object with the keys "data", "generator",
    "schema" and "version", written with an indent of 2 and followed
    by a newline.

    Args:
        fd: Text stream the JSON document is written to.
        kind: Class all items must be instances of. ``Requirement``
            selects schema "lobster-req-trace" (version 4) and
            ``Implementation`` selects "lobster-imp-trace" (version 3);
            any other class is written as "lobster-act-trace"
            (version 3).
        generator: Stored verbatim under the "generator" key.
        items: Objects to serialize; ``to_json()`` is called on each.
            Any iterable is accepted, including one-shot iterators.

    Raises:
        ValueError: If at least one element of *items* is not an
            instance of *kind*.
    """
    # Materialize once: both the type check and the serialization below
    # iterate over the items. Without this, a one-shot iterator (e.g. a
    # generator) would be exhausted by the check and an empty "data"
    # array would be written silently.
    item_list = list(items)

    if not all(isinstance(item, kind) for item in item_list):
        raise ValueError(
            f"All elements in 'items' must be of the type {kind.__name__}!",
        )

    if kind is Requirement:
        schema = "lobster-req-trace"
        version = 4
    elif kind is Implementation:
        schema = "lobster-imp-trace"
        version = 3
    else:
        schema = "lobster-act-trace"
        version = 3

    data = {"data"      : [item.to_json() for item in item_list],
            "generator" : generator,
            "schema"    : schema,
            "version"   : version}
    json.dump(data, fd, indent=2)
    fd.write("\n")

56 

57 

def lobster_read(
    mh,
    filename: str,
    level: str,
    items: Dict[str, Union[Activity, Implementation, Requirement]],
    source_info: Optional[Dict] = None,
):
    """Read a LOBSTER trace file and merge its items into *items*.

    The file is parsed as JSON, its top-level structure and its
    schema/version combination are validated, and every entry of the
    "data" array is converted with ``from_json`` of the class selected
    by the schema (``Requirement``, ``Implementation`` or ``Activity``).
    Items whose tag key is not yet present in *items* are added; items
    whose tag key already exists are collected and handed to
    ``signal_duplicate_items`` once the whole file has been processed.

    NOTE(review): the validation steps rely on ``mh.error`` not
    returning when called without ``fatal=False`` (otherwise later
    steps would operate on missing or malformed data) - confirm
    against the message handler implementation.

    Args:
        mh: Message handler; problems are reported via ``mh.error``.
        filename: Path of the trace file, opened as UTF-8 text.
        level: Passed through to ``from_json`` for every item.
        items: Symbol table mapping tag keys to items; updated in place.
        source_info: If not ``None``, ``perform_source_checks`` is
            called with it on every item read.
    """
    loc = File_Reference(filename)

    # Read and validate JSON
    with open(filename, "r", encoding="UTF-8") as fd:
        try:
            data = json.load(fd)
        except json.decoder.JSONDecodeError as err:
            mh.error(File_Reference(filename,
                                    err.lineno,
                                    err.colno),
                     err.msg)

    # Validate basic structure
    if not isinstance(data, dict):
        mh.error(loc, "parsed json is not an object")

    for rkey in ("schema", "version", "generator", "data"):
        if rkey not in data:
            mh.error(loc, "required top-level key %s not present" % rkey)
        if rkey == "data":
            if not isinstance(data[rkey], list):
                mh.error(loc, "data is not an array")
        elif rkey == "version":
            # bool is a subclass of int, so JSON true/false has to be
            # rejected explicitly; it is not a valid version number.
            if isinstance(data[rkey], bool) or \
               not isinstance(data[rkey], int):
                mh.error(loc, "version is not an integer")
        else:
            if not isinstance(data[rkey], str):
                mh.error(loc, "%s is not a string" % rkey)

    schema = data["schema"]
    version = data["version"]

    # Validate indicated schema
    supported_schema = {
        "lobster-req-trace" : {3, 4},
        "lobster-imp-trace" : {3},
        "lobster-act-trace" : {3},
    }
    if schema not in supported_schema:
        mh.error(loc, "unknown schema kind %s" % schema)
    if version not in supported_schema[schema]:
        mh.error(loc,
                 "version %u for schema %s is not supported" %
                 (version, schema))

    # The schema is the same for every entry of the file, so the item
    # class is selected once instead of once per entry.
    if schema == "lobster-req-trace":
        item_class = Requirement
    elif schema == "lobster-imp-trace":
        item_class = Implementation
    else:
        item_class = Activity

    duplicate_items = []
    # Convert to items, and integrate into symbol table
    for raw in data["data"]:
        item = item_class.from_json(level, raw, version)

        if source_info is not None:
            item.perform_source_checks(source_info)

        key = item.tag.key()
        if key in items:
            # 'duplicate definition' errors are fatal, but the user wants
            # to see all of them. So store the affected items in a list
            # first, and create errors later.
            duplicate_items.append(item)
        else:
            items[key] = item

    signal_duplicate_items(mh, items, duplicate_items)

129 

130 

def signal_duplicate_items(
    mh: Message_Handler,
    items,
    duplicate_items: Sequence[Union[Activity, Implementation, Requirement]],
):
    """
    Emit one 'duplicate definition' error per entry of duplicate_items.

    Each error is located at the duplicate item and names the location
    of the entry stored under the same tag key in items. Only the
    error for the final entry is passed fatal=True; all earlier ones
    are passed fatal=False. Nothing is reported for an empty sequence.
    """
    if not duplicate_items:
        return

    total = len(duplicate_items)
    for position, duplicate in enumerate(duplicate_items, start=1):
        where = duplicate.location
        text = (
            f"duplicate definition of {duplicate.tag.key()}, "
            f"previously defined at "
            f"{items[duplicate.tag.key()].location.to_string()}"
        )
        is_last = position == total
        mh.error(
            location=where,
            message=text,
            fatal=is_last,
        )

148 )