Coverage for lobster/common/io.py: 86%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-27 13:02 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import json 

21from typing import Dict, Optional, Sequence, TextIO, Type, Union, Iterable 

22 

23from lobster.common.errors import Message_Handler 

24from lobster.common.location import File_Reference 

25from lobster.common.items import Requirement, Implementation, Activity 

26 

27 

28def lobster_write( 

29 fd: TextIO, 

30 kind: Union[Type[Requirement], Type[Implementation], Type[Activity]], 

31 generator: str, 

32 items: Iterable, 

33): 

34 if not all(isinstance(item, kind) for item in items): 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true

35 raise ValueError( 

36 f"All elements in 'items' must be of the type {kind.__name__}!", 

37 ) 

38 

39 if kind is Requirement: 

40 schema = "lobster-req-trace" 

41 version = 4 

42 elif kind is Implementation: 

43 schema = "lobster-imp-trace" 

44 version = 3 

45 else: 

46 schema = "lobster-act-trace" 

47 version = 3 

48 

49 data = {"data" : list(x.to_json() for x in items), 

50 "generator" : generator, 

51 "schema" : schema, 

52 "version" : version} 

53 json.dump(data, fd, indent=2) 

54 fd.write("\n") 

55 

56 

57def lobster_read( 

58 mh, 

59 filename: str, 

60 level: str, 

61 items: Dict[str, Union[Activity, Implementation, Requirement]], 

62 source_info: Optional[Dict] = None, 

63): 

64 loc = File_Reference(filename) 

65 

66 # Read and validate JSON 

67 with open(filename, "r", encoding="UTF-8") as fd: 

68 try: 

69 data = json.load(fd) 

70 except json.decoder.JSONDecodeError as err: 

71 mh.error(File_Reference(filename, 

72 err.lineno, 

73 err.colno), 

74 err.msg) 

75 

76 # Validate basic structure 

77 if not isinstance(data, dict): 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true

78 mh.error(loc, "parsed json is not an object") 

79 

80 for rkey in ("schema", "version", "generator", "data"): 

81 if rkey not in data: 

82 mh.error(loc, "required top-levelkey %s not present" % rkey) 

83 if rkey == "data": 

84 if not isinstance(data[rkey], list): 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true

85 mh.error(loc, "data is not an array") 

86 elif rkey == "version": 

87 if not isinstance(data[rkey], int): 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true

88 mh.error(loc, "version is not an integer") 

89 else: 

90 if not isinstance(data[rkey], str): 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 mh.error(loc, "%s is not a string" % rkey) 

92 

93 # Validate indicated schema 

94 supported_schema = { 

95 "lobster-req-trace" : set([3, 4]), 

96 "lobster-imp-trace" : set([3]), 

97 "lobster-act-trace" : set([3]), 

98 } 

99 if data["schema"] not in supported_schema: 

100 mh.error(loc, "unknown schema kind %s" % data["schema"]) 

101 if data["version"] not in supported_schema[data["schema"]]: 

102 mh.error(loc, 

103 "version %u for schema %s is not supported" % 

104 (data["version"], data["schema"])) 

105 

106 duplicate_items = [] 

107 # Convert to items, and integrate into symbol table 

108 for raw in data["data"]: 

109 if data["schema"] == "lobster-req-trace": 

110 item = Requirement.from_json(level, raw, data["version"]) 

111 elif data["schema"] == "lobster-imp-trace": 

112 item = Implementation.from_json(level, raw, data["version"]) 

113 else: 

114 item = Activity.from_json(level, raw, data["version"]) 

115 

116 if source_info is not None: 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true

117 item.perform_source_checks(source_info) 

118 

119 if item.tag.key() in items: 119 ↛ 123line 119 didn't jump to line 123 because the condition on line 119 was never true

120 # 'duplicate definition' errors are fatal, but the user wants to see all 

121 # of them. So store the affected items in a list first, and create 

122 # errors later. 

123 duplicate_items.append(item) 

124 else: 

125 items[item.tag.key()] = item 

126 

127 signal_duplicate_items(mh, items, duplicate_items) 

128 

129 

130def signal_duplicate_items( 

131 mh: Message_Handler, 

132 items, 

133 duplicate_items: Sequence[Union[Activity, Implementation, Requirement]], 

134): 

135 """ 

136 Report errors for duplicate items to the message handler. 

137 If there are any duplicate items, the last one is considered fatal. 

138 """ 

139 if duplicate_items: 

140 for counter, item in enumerate(duplicate_items, start=1): 

141 mh.error( 

142 location=item.location, 

143 message=f"duplicate definition of {item.tag.key()}, " 

144 f"previously defined at " 

145 f"{items[item.tag.key()].location.to_string()}", 

146 fatal=(counter == len(duplicate_items)), 

147 )