Coverage for lobster/common/io.py: 79%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-27 13:02 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import json 

21from typing import Dict, Optional, Sequence, TextIO, Type, Union, Iterable 

22 

23from lobster.common.errors import Message_Handler 

24from lobster.common.location import File_Reference 

25from lobster.common.items import Requirement, Implementation, Activity 

26 

27 

28def lobster_write( 

29 fd: TextIO, 

30 kind: Union[Type[Requirement], Type[Implementation], Type[Activity]], 

31 generator: str, 

32 items: Iterable, 

33): 

34 if not all(isinstance(item, kind) for item in items): 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true

35 raise ValueError( 

36 f"All elements in 'items' must be of the type {kind.__name__}!", 

37 ) 

38 

39 if kind is Requirement: 

40 schema = "lobster-req-trace" 

41 version = 4 

42 elif kind is Implementation: 42 ↛ 43line 42 didn't jump to line 43 because the condition on line 42 was never true

43 schema = "lobster-imp-trace" 

44 version = 3 

45 else: 

46 schema = "lobster-act-trace" 

47 version = 3 

48 

49 data = {"data" : list(x.to_json() for x in items), 

50 "generator" : generator, 

51 "schema" : schema, 

52 "version" : version} 

53 json.dump(data, fd, indent=2) 

54 fd.write("\n") 

55 

56 

57def lobster_read( 

58 mh, 

59 filename: str, 

60 level: str, 

61 items: Dict[str, Union[Activity, Implementation, Requirement]], 

62 source_info: Optional[Dict] = None, 

63): 

64 loc = File_Reference(filename) 

65 

66 # Read and validate JSON 

67 with open(filename, "r", encoding="UTF-8") as fd: 

68 try: 

69 data = json.load(fd) 

70 except json.decoder.JSONDecodeError as err: 

71 mh.error(File_Reference(filename, 

72 err.lineno, 

73 err.colno), 

74 err.msg) 

75 

76 # Validate basic structure 

77 if not isinstance(data, dict): 

78 mh.error(loc, "parsed json is not an object") 

79 

80 for rkey in ("schema", "version", "generator", "data"): 

81 if rkey not in data: 

82 mh.error(loc, "required top-levelkey %s not present" % rkey) 

83 if rkey == "data": 

84 if not isinstance(data[rkey], list): 84 ↛ 85line 84 didn't jump to line 85 because the condition on line 84 was never true

85 mh.error(loc, "data is not an array") 

86 elif rkey == "version": 

87 if not isinstance(data[rkey], int): 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true

88 mh.error(loc, "version is not an integer") 

89 else: 

90 if not isinstance(data[rkey], str): 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true

91 mh.error(loc, "%s is not a string" % rkey) 

92 

93 # Validate indicated schema 

94 supported_schema = { 

95 "lobster-req-trace" : set([3, 4]), 

96 "lobster-imp-trace" : set([3]), 

97 "lobster-act-trace" : set([3]), 

98 } 

99 if data["schema"] not in supported_schema: 

100 mh.error(loc, "unknown schema kind %s" % data["schema"]) 

101 if data["version"] not in supported_schema[data["schema"]]: 

102 mh.error(loc, 

103 "version %u for schema %s is not supported" % 

104 (data["version"], data["schema"])) 

105 

106 duplicate_items = [] 

107 # Convert to items, and integrate into symbol table 

108 for raw in data["data"]: 

109 if data["schema"] == "lobster-req-trace": 

110 item = Requirement.from_json(level, raw, data["version"]) 

111 elif data["schema"] == "lobster-imp-trace": 111 ↛ 114line 111 didn't jump to line 114 because the condition on line 111 was always true

112 item = Implementation.from_json(level, raw, data["version"]) 

113 else: 

114 item = Activity.from_json(level, raw, data["version"]) 

115 

116 if source_info is not None: 116 ↛ 119line 116 didn't jump to line 119 because the condition on line 116 was always true

117 item.perform_source_checks(source_info) 

118 

119 if item.tag.key() in items: 119 ↛ 123line 119 didn't jump to line 123 because the condition on line 119 was never true

120 # 'duplicate definition' errors are fatal, but the user wants to see all 

121 # of them. So store the affected items in a list first, and create 

122 # errors later. 

123 duplicate_items.append(item) 

124 else: 

125 items[item.tag.key()] = item 

126 

127 signal_duplicate_items(mh, items, duplicate_items) 

128 

129 

130def signal_duplicate_items( 

131 mh: Message_Handler, 

132 items, 

133 duplicate_items: Sequence[Union[Activity, Implementation, Requirement]], 

134): 

135 """ 

136 Report errors for duplicate items to the message handler. 

137 If there are any duplicate items, the last one is considered fatal. 

138 """ 

139 if duplicate_items: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true

140 for counter, item in enumerate(duplicate_items, start=1): 

141 mh.error( 

142 location=item.location, 

143 message=f"duplicate definition of {item.tag.key()}, " 

144 f"previously defined at " 

145 f"{items[item.tag.key()].location.to_string()}", 

146 fatal=(counter == len(duplicate_items)), 

147 )