Coverage for lobster/io.py: 76%

76 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 14:55 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import os.path 

21import io 

22from collections.abc import Iterable 

23import json 

24 

25from lobster.errors import Message_Handler 

26from lobster.location import File_Reference 

27from lobster.items import Requirement, Implementation, Activity 

28 

29 

30def lobster_write(fd, kind, generator, items): 

31 assert isinstance(fd, io.TextIOBase) 

32 assert kind in (Requirement, Implementation, Activity) 

33 assert isinstance(generator, str) 

34 assert isinstance(items, Iterable) 

35 assert all(isinstance(item, kind) for item in items) 

36 

37 if kind is Requirement: 

38 schema = "lobster-req-trace" 

39 version = 4 

40 elif kind is Implementation: 

41 schema = "lobster-imp-trace" 

42 version = 3 

43 else: 

44 schema = "lobster-act-trace" 

45 version = 3 

46 

47 data = {"data" : list(x.to_json() for x in items), 

48 "generator" : generator, 

49 "schema" : schema, 

50 "version" : version} 

51 json.dump(data, fd, indent=2) 

52 fd.write("\n") 

53 

54 

55def lobster_read(mh, filename, level, items, source_info=None): 

56 assert isinstance(mh, Message_Handler) 

57 assert isinstance(filename, str) 

58 assert isinstance(level, str) 

59 assert os.path.isfile(filename) 

60 assert isinstance(source_info, dict) or source_info is None 

61 

62 loc = File_Reference(filename) 

63 

64 # Read and validate JSON 

65 with open(filename, "r", encoding="UTF-8") as fd: 

66 try: 

67 data = json.load(fd) 

68 except json.decoder.JSONDecodeError as err: 

69 mh.error(File_Reference(filename, 

70 err.lineno, 

71 err.colno), 

72 err.msg) 

73 

74 # Validate basic structure 

75 if not isinstance(data, dict): 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true

76 mh.error(loc, "parsed json is not an object") 

77 

78 for rkey in ("schema", "version", "generator", "data"): 

79 if rkey not in data: 

80 mh.error(loc, "required top-levelkey %s not present" % rkey) 

81 if rkey == "data": 

82 if not isinstance(data[rkey], list): 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true

83 mh.error(loc, "data is not an array") 

84 elif rkey == "version": 

85 if not isinstance(data[rkey], int): 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 mh.error(loc, "version is not an integer") 

87 else: 

88 if not isinstance(data[rkey], str): 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 mh.error(loc, "%s is not a string" % rkey) 

90 

91 # Validate indicated schema 

92 supported_schema = { 

93 "lobster-req-trace" : set([3, 4]), 

94 "lobster-imp-trace" : set([3]), 

95 "lobster-act-trace" : set([3]), 

96 } 

97 if data["schema"] not in supported_schema: 

98 mh.error(loc, "unknown schema kind %s" % data["schema"]) 

99 if data["version"] not in supported_schema[data["schema"]]: 

100 mh.error(loc, 

101 "version %u for schema %s is not supported" % 

102 (data["version"], data["schema"])) 

103 

104 duplicate_items = [] 

105 # Convert to items, and integrate into symbol table 

106 for raw in data["data"]: 

107 if data["schema"] == "lobster-req-trace": 

108 item = Requirement.from_json(level, raw, data["version"]) 

109 elif data["schema"] == "lobster-imp-trace": 

110 item = Implementation.from_json(level, raw, data["version"]) 

111 else: 

112 item = Activity.from_json(level, raw, data["version"]) 

113 

114 filter_conditions = [] 

115 

116 if source_info is not None: 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true

117 item.perform_source_checks(source_info) 

118 

119 # evaluate source_info filters 

120 for f, v in source_info['filters']: 

121 if f == 'prefix': 

122 filter_conditions.append(item.tag.tag.startswith(v)) 

123 if f == 'kind': 

124 filter_conditions.append(item.kind == v) 

125 

126 if all(filter_conditions): 126 ↛ 106line 126 didn't jump to line 106 because the condition on line 126 was always true

127 if item.tag.key() in items: 127 ↛ 131line 127 didn't jump to line 131 because the condition on line 127 was never true

128 # 'duplicate definition' errors are fatal, but the user wants to see all 

129 # of them. So store the affected items in a list first, and create 

130 # errors later. 

131 duplicate_items.append(item) 

132 else: 

133 items[item.tag.key()] = item 

134 

135 if duplicate_items: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 for counter, item in enumerate(duplicate_items, start=1): 

137 mh.error( 

138 item.location, 

139 f"duplicate definition of {item.tag.key()}, " 

140 f"previously defined at {items[item.tag.key()].location.to_string()}", 

141 fatal=(counter == len(duplicate_items)), 

142 )