Coverage for lobster/tool.py: 53%

148 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 09:51 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import os 

21import sys 

22import argparse 

23import multiprocessing 

24 

25from abc import ABCMeta, abstractmethod 

26from functools import partial 

27from typing import List, Sequence, Union, Tuple, Set, Dict 

28import yaml 

29from lobster.errors import Message_Handler 

30from lobster.location import File_Reference 

31from lobster.items import Requirement, Implementation, Activity 

32from lobster.io import lobster_write 

33from lobster.meta_data_tool_base import MetaDataToolBase 

34 

35 

36class SupportedCommonConfigKeys: 

37 """Helper class to define supported configuration keys.""" 

38 INPUTS_FROM_FILE = "inputs_from_file" 

39 TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs" 

40 INPUTS = "inputs" 

41 

42 @classmethod 

43 def get_config_keys_manual(cls) -> Dict[str, str]: 

44 help_dict = { 

45 cls.INPUTS_FROM_FILE : "Read input files or directories from this " 

46 "file. Each non-empty line is interpreted as " 

47 "one input. Supports comments starting with #.", 

48 cls.TRAVERSE_BAZEL_DIRS : "Enter bazel-* directories, which are excluded " 

49 "by default.", 

50 cls.INPUTS : "List of files to process or directories to " 

51 "search for relevant input files.", 

52 } 

53 return help_dict 

54 

55 @abstractmethod 

56 def get_mandatory_parameters(self) -> Set[str]: 

57 """Return a set of config file parameters that are mandatory""" 

58 

59 def get_formatted_help_text(self): 

60 help_dict = self.get_config_keys_manual() 

61 help_text = '' 

62 max_length = max(len(config_key) for config_key in help_dict) 

63 for config_key, help_info in help_dict.items(): 

64 spaces = " " * (max_length - len(config_key)) 

65 help_text += f"\n{config_key} {spaces}- {help_info}" 

66 return help_text 

67 

68 @classmethod 

69 def get_config_keys_as_set(cls) -> Set[str]: 

70 return set(cls.get_config_keys_manual().keys()) 

71 

72 

73class LOBSTER_Tool(MetaDataToolBase, SupportedCommonConfigKeys, metaclass=ABCMeta): 

74 def __init__(self, name, description, extensions, official): 

75 super().__init__( 

76 name=name, 

77 description=description, 

78 official=official, 

79 ) 

80 assert isinstance(extensions, (list, set, frozenset, tuple)) 

81 assert all(isinstance(extension, str) 

82 for extension in extensions) 

83 

84 self.extensions = [f".{extension}" for extension in (sorted(extensions))] 

85 self.exclude_pat = [] 

86 self.schema = None 

87 self.mh = Message_Handler() 

88 self.config = {} 

89 

90 self._argument_parser.add_argument( 

91 "--out", 

92 default = f'{self.name}.lobster', 

93 help = "Write output to the given file instead of stdout." 

94 ) 

95 self._argument_parser.add_argument( 

96 "--config", 

97 default=None, 

98 help=(f"Path to YAML file with arguments as below," 

99 f"{self.get_formatted_help_text()}"), 

100 required=True 

101 ) 

102 

103 def load_yaml_config(self, config_path): 

104 """ 

105 Loads configuration from a YAML file. 

106 Parameters 

107 ---------- 

108 config_path - Yaml config file path. 

109 

110 Returns 

111 ------- 

112 data - Returns the Yaml file contents in dictionary format. 

113 """ 

114 if not config_path: 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true

115 return {} 

116 if not os.path.isfile(config_path): 

117 sys.exit(f"Error: Config file '{config_path}' not found.") 

118 with open(config_path, "r", encoding="UTF-8") as f: 

119 data = yaml.safe_load(f) or {} 

120 return data 

121 

122 def validate_yaml_supported_config_parameters(self, config): 

123 """ 

124 Function to check if the parameters mentioned in the Yaml config are 

125 supported by the tool in execution 

126 Parameters 

127 ---------- 

128 data - Yaml config file contents 

129 

130 Returns 

131 ------- 

132 Nothing 

133 """ 

134 if config: 134 ↛ exitline 134 didn't return from function 'validate_yaml_supported_config_parameters' because the condition on line 134 was always true

135 supported_keys = self.get_config_keys_as_set() 

136 unsupported_keys = set(config.keys()) - supported_keys 

137 if unsupported_keys: 137 ↛ exitline 137 didn't return from function 'validate_yaml_supported_config_parameters' because the condition on line 137 was always true

138 raise KeyError( 

139 f"Unsupported config keys: {', '.join(unsupported_keys)}. " 

140 f"Supported keys are: {', '.join(supported_keys)}." 

141 ) 

142 

143 def check_mandatory_config_parameters(self, config): 

144 """ 

145 Function to check if the mandatory parameters are provided in the config file 

146 Parameters 

147 ---------- 

148 data - Yaml config file contents 

149 

150 Returns 

151 ------- 

152 Nothing 

153 """ 

154 if self.get_mandatory_parameters(): 154 ↛ exitline 154 didn't return from function 'check_mandatory_config_parameters' because the condition on line 154 was always true

155 mandatory_parameters = self.get_mandatory_parameters() - set(config.keys()) 

156 if mandatory_parameters: 156 ↛ exitline 156 didn't return from function 'check_mandatory_config_parameters' because the condition on line 156 was always true

157 sys.exit(f"Required mandatory parameters missing - " 

158 f"{','.join(mandatory_parameters)}") 

159 

160 def process_commandline_and_yaml_options( 

161 self, 

162 options: argparse.Namespace, 

163 ) -> List[Tuple[File_Reference, str]]: 

164 """Processes all command line options""" 

165 

166 self.config = self.load_yaml_config(options.config) 

167 self.validate_yaml_supported_config_parameters(self.config) 

168 self.check_mandatory_config_parameters(self.config) 

169 options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None) 

170 options.inputs = self.config.get(self.INPUTS, []) 

171 options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False) 

172 work_list = self.process_common_options(options) 

173 self.process_tool_options(options, work_list) 

174 return work_list 

175 

176 def process_common_options( 

177 self, 

178 options: argparse.Namespace, 

179 ) -> List[Tuple[File_Reference, str]]: 

180 """Generates the exact list of files to work on later. The list is sorted 

181 alphabetically.""" 

182 # Sanity check output 

183 if options.out and \ 

184 os.path.exists(options.out) and \ 

185 not os.path.isfile(options.out): 

186 self._argument_parser.error( 

187 f"output {options.out} already exists and is not a file", 

188 ) 

189 

190 # Assemble input requests 

191 inputs = [] 

192 if options.inputs: 192 ↛ 195line 192 didn't jump to line 195 because the condition on line 192 was always true

193 inputs += [(File_Reference("<config>"), item) 

194 for item in options.inputs] 

195 if options.inputs_from_file: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 if not os.path.isfile(options.inputs_from_file): 

197 self._argument_parser.error(f"cannot open {options.inputs_from_file}") 

198 with open(options.inputs_from_file, "r", encoding="UTF-8") as fd: 

199 for line_no, raw_line in enumerate(fd, 1): 

200 line = raw_line.split("#", 1)[0].strip() 

201 if not line: 

202 continue 

203 inputs.append((File_Reference(options.inputs_from_file, 

204 line_no), 

205 line)) 

206 if not options.inputs and not options.inputs_from_file: 206 ↛ 207line 206 didn't jump to line 207 because the condition on line 206 was never true

207 inputs.append((File_Reference("<config>"), ".")) 

208 # Sanity check and search directories 

209 work_list = [] 

210 ok = True 

211 for location, item in inputs: 

212 if os.path.isfile(item): 

213 if os.path.splitext(item)[1] not in self.extensions: 213 ↛ 216line 213 didn't jump to line 216 because the condition on line 213 was always true

214 self.mh.warning(location, 

215 f"not a {' or '.join(self.extensions)} file") 

216 work_list.append(item) 

217 

218 elif os.path.isdir(item): 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true

219 for path, dirs, files in os.walk(item): 

220 for n, dir_name in reversed(list(enumerate(dirs))): 

221 keep = True 

222 for pattern in self.exclude_pat: 

223 if pattern.match(dir_name): 

224 keep = False 

225 break 

226 if not keep: 

227 del dirs[n] 

228 

229 for file_name in files: 

230 if os.path.splitext(file_name)[1] in self.extensions: 

231 work_list.append(os.path.join(path, file_name)) 

232 

233 else: 

234 self.mh.error(location, 

235 f"{item} is not a file or directory", 

236 fatal = False) 

237 ok = False 

238 

239 if not ok: 

240 sys.exit(1) 

241 

242 work_list.sort() 

243 

244 return work_list 

245 

246 def write_output( 

247 self, 

248 options: argparse.Namespace, 

249 items: List[Union[Activity, Implementation, Requirement]], 

250 ): 

251 assert isinstance(options, argparse.Namespace) 

252 assert isinstance(items, list) 

253 assert all(isinstance(item, (Requirement, 

254 Implementation, 

255 Activity)) 

256 for item in items) 

257 

258 if options.out: 

259 with open(options.out, "w", encoding="UTF-8") as fd: 

260 lobster_write(fd, self.schema, self.name, items) 

261 print(f"{self.name}: wrote {len(items)} items to {options.out}") 

262 else: 

263 lobster_write(sys.stdout, self.schema, self.name, items) 

264 

265 @abstractmethod 

266 def process_tool_options( 

267 self, 

268 options: argparse.Namespace, 

269 work_list: List[Tuple[File_Reference, str]], 

270 ): 

271 assert isinstance(options, argparse.Namespace) 

272 assert isinstance(work_list, list) 

273 

274 

275class LOBSTER_Per_File_Tool(LOBSTER_Tool): 

276 def __init__(self, name, description, extensions, official=False): 

277 super().__init__(name, description, extensions, official) 

278 

279 @classmethod 

280 @abstractmethod 

281 def process( 

282 cls, 

283 options, 

284 file_name, 

285 ) -> Tuple[bool, Sequence[Union[Activity, Implementation, Requirement]]]: 

286 pass 

287 

288 def _run_impl(self, options: argparse.Namespace) -> int: 

289 work_list = self.process_commandline_and_yaml_options(options) 

290 

291 ok = True 

292 items = [] 

293 pfun = partial(self.process, options) 

294 

295 if options.single: 

296 for file_name in work_list: 

297 new_ok, new_items = pfun(file_name) 

298 ok &= new_ok 

299 items += new_items 

300 else: 

301 with multiprocessing.Pool() as pool: 

302 for new_ok, new_items in pool.imap(pfun, work_list, 4): 

303 ok &= new_ok 

304 items += new_items 

305 pool.close() 

306 pool.join() 

307 

308 if ok: 

309 self.write_output(options, items) 

310 else: 

311 print(f"{self.name}: aborting due to earlier errors") 

312 

313 return int(not ok)