Coverage for lobster/common/tool.py: 56%

150 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-04-20 08:15 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023-2026 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import os 

21import sys 

22import argparse 

23import multiprocessing 

24 

25from abc import ABCMeta, abstractmethod 

26from functools import partial 

27from typing import Iterable, List, Sequence, Union, Tuple, Set, Dict 

28import yaml 

29from lobster.common.errors import Message_Handler 

30from lobster.common.location import File_Reference 

31from lobster.common.items import Requirement, Implementation, Activity 

32from lobster.common.io import lobster_write, ensure_output_directory 

33from lobster.common.meta_data_tool_base import MetaDataToolBase 

34 

35 

36class SupportedCommonConfigKeys: 

37 """Helper class to define supported configuration keys.""" 

38 INPUTS_FROM_FILE = "inputs_from_file" 

39 TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs" 

40 INPUTS = "inputs" 

41 

42 @classmethod 

43 def get_config_keys_manual(cls) -> Dict[str, str]: 

44 help_dict = { 

45 cls.INPUTS_FROM_FILE : "Read input files or directories from this " 

46 "file. Each non-empty line is interpreted as " 

47 "one input. Supports comments starting with #.", 

48 cls.TRAVERSE_BAZEL_DIRS : "Enter bazel-* directories, which are excluded " 

49 "by default.", 

50 cls.INPUTS : "List of files to process or directories to " 

51 "search for relevant input files.", 

52 } 

53 return help_dict 

54 

55 @abstractmethod 

56 def get_mandatory_parameters(self) -> Set[str]: 

57 """Return a set of config file parameters that are mandatory""" 

58 

59 def get_formatted_help_text(self): 

60 help_dict = self.get_config_keys_manual() 

61 help_text = '' 

62 max_length = max(len(config_key) for config_key in help_dict) 

63 for config_key, help_info in help_dict.items(): 

64 spaces = " " * (max_length - len(config_key)) 

65 help_text += f"\n{config_key} {spaces}- {help_info}" 

66 return help_text 

67 

68 @classmethod 

69 def get_config_keys_as_set(cls) -> Set[str]: 

70 return set(cls.get_config_keys_manual().keys()) 

71 

72 

73class LOBSTER_Tool(MetaDataToolBase, SupportedCommonConfigKeys, metaclass=ABCMeta): 

74 def __init__( 

75 self, 

76 name: str, 

77 description: str, 

78 extensions: Iterable[str], 

79 official: bool, 

80 ): 

81 super().__init__( 

82 name=name, 

83 description=description, 

84 official=official, 

85 ) 

86 assert isinstance(extensions, (list, set, frozenset, tuple)) 

87 assert all(isinstance(extension, str) 

88 for extension in extensions) 

89 

90 self.extensions = [f".{extension}" for extension in (sorted(extensions))] 

91 self.exclude_pat = [] 

92 self.schema = None 

93 self.mh = Message_Handler() 

94 self.config = {} 

95 

96 self._argument_parser.add_argument( 

97 "--out", 

98 default = f'{self.name}.lobster', 

99 help = "Write output to the given file instead of stdout." 

100 ) 

101 self._argument_parser.add_argument( 

102 "--config", 

103 default=None, 

104 help=(f"Path to YAML file with arguments as below," 

105 f"{self.get_formatted_help_text()}"), 

106 required=True 

107 ) 

108 

109 def load_yaml_config(self, config_path): 

110 """ 

111 Loads configuration from a YAML file. 

112 Parameters 

113 ---------- 

114 config_path - Yaml config file path. 

115 

116 Returns 

117 ------- 

118 data - Returns the Yaml file contents in dictionary format. 

119 """ 

120 if not config_path: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true

121 return {} 

122 if not os.path.isfile(config_path): 

123 sys.exit(f"Error: Config file '{config_path}' not found.") 

124 with open(config_path, "r", encoding="UTF-8") as f: 

125 data = yaml.safe_load(f) or {} 

126 return data 

127 

128 def validate_yaml_supported_config_parameters(self, config): 

129 """ 

130 Function to check if the parameters mentioned in the Yaml config are 

131 supported by the tool in execution 

132 Parameters 

133 ---------- 

134 data - Yaml config file contents 

135 

136 Returns 

137 ------- 

138 Nothing 

139 """ 

140 if config: 140 ↛ exitline 140 didn't return from function 'validate_yaml_supported_config_parameters' because the condition on line 140 was always true

141 supported_keys = self.get_config_keys_as_set() 

142 unsupported_keys = set(config.keys()) - supported_keys 

143 if unsupported_keys: 143 ↛ exitline 143 didn't return from function 'validate_yaml_supported_config_parameters' because the condition on line 143 was always true

144 raise KeyError( 

145 f"Unsupported config keys: {', '.join(unsupported_keys)}. " 

146 f"Supported keys are: {', '.join(supported_keys)}." 

147 ) 

148 

149 def check_mandatory_config_parameters(self, config): 

150 """ 

151 Function to check if the mandatory parameters are provided in the config file 

152 Parameters 

153 ---------- 

154 data - Yaml config file contents 

155 

156 Returns 

157 ------- 

158 Nothing 

159 """ 

160 if self.get_mandatory_parameters(): 160 ↛ exitline 160 didn't return from function 'check_mandatory_config_parameters' because the condition on line 160 was always true

161 mandatory_parameters = self.get_mandatory_parameters() - set(config.keys()) 

162 if mandatory_parameters: 162 ↛ exitline 162 didn't return from function 'check_mandatory_config_parameters' because the condition on line 162 was always true

163 sys.exit(f"Required mandatory parameters missing - " 

164 f"{','.join(mandatory_parameters)}") 

165 

166 def process_commandline_and_yaml_options( 

167 self, 

168 options: argparse.Namespace, 

169 ) -> List[Tuple[File_Reference, str]]: 

170 """Processes all command line options""" 

171 

172 self.config = self.load_yaml_config(options.config) 

173 self.validate_yaml_supported_config_parameters(self.config) 

174 self.check_mandatory_config_parameters(self.config) 

175 options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None) 

176 options.inputs = self.config.get(self.INPUTS, []) 

177 options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False) 

178 work_list = self.process_common_options(options) 

179 self.process_tool_options(options, work_list) 

180 return work_list 

181 

182 def _is_excluded_dir(self, dir_name: str) -> bool: 

183 return any(pattern.match(dir_name) for pattern in self.exclude_pat) 

184 

185 def _filter_walk_dirs(self, dirs: List[str]) -> None: 

186 dirs[:] = [dir_name for dir_name in dirs 

187 if not self._is_excluded_dir(dir_name)] 

188 

189 def _collect_files_from_directory(self, directory: str) -> List[str]: 

190 work_items = [] 

191 for path, dirs, files in os.walk(directory): 

192 self._filter_walk_dirs(dirs) 

193 for file_name in files: 

194 if os.path.splitext(file_name)[1] in self.extensions: 

195 work_items.append(os.path.join(path, file_name)) 

196 return work_items 

197 

198 def process_common_options( 

199 self, 

200 options: argparse.Namespace, 

201 ) -> List[Tuple[File_Reference, str]]: 

202 """Generates the exact list of files to work on later. The list is sorted 

203 alphabetically.""" 

204 # Sanity check output 

205 if options.out and \ 

206 os.path.exists(options.out) and \ 

207 not os.path.isfile(options.out): 

208 self._argument_parser.error( 

209 f"output {options.out} already exists and is not a file", 

210 ) 

211 

212 # Assemble input requests 

213 inputs = [] 

214 if options.inputs: 214 ↛ 217line 214 didn't jump to line 217 because the condition on line 214 was always true

215 inputs += [(File_Reference("<config>"), item) 

216 for item in options.inputs] 

217 if options.inputs_from_file: 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true

218 if not os.path.isfile(options.inputs_from_file): 

219 self._argument_parser.error(f"cannot open {options.inputs_from_file}") 

220 with open(options.inputs_from_file, "r", encoding="UTF-8") as fd: 

221 for line_no, raw_line in enumerate(fd, 1): 

222 line = raw_line.split("#", 1)[0].strip() 

223 if not line: 

224 continue 

225 inputs.append((File_Reference(options.inputs_from_file, 

226 line_no), 

227 line)) 

228 if not options.inputs and not options.inputs_from_file: 228 ↛ 229line 228 didn't jump to line 229 because the condition on line 228 was never true

229 inputs.append((File_Reference("<config>"), ".")) 

230 # Sanity check and search directories 

231 work_list = [] 

232 ok = True 

233 for location, item in inputs: 

234 if os.path.isfile(item): 

235 if os.path.splitext(item)[1] not in self.extensions: 235 ↛ 238line 235 didn't jump to line 238 because the condition on line 235 was always true

236 self.mh.warning(location, 

237 f"not a {' or '.join(self.extensions)} file") 

238 work_list.append(item) 

239 

240 elif os.path.isdir(item): 240 ↛ 241line 240 didn't jump to line 241 because the condition on line 240 was never true

241 work_list.extend(self._collect_files_from_directory(item)) 

242 

243 else: 

244 self.mh.error(location, 

245 f"{item} is not a file or directory", 

246 fatal = False) 

247 ok = False 

248 

249 if not ok: 

250 sys.exit(1) 

251 

252 work_list.sort() 

253 

254 return work_list 

255 

256 def write_output( 

257 self, 

258 options: argparse.Namespace, 

259 items: List[Union[Activity, Implementation, Requirement]], 

260 ): 

261 assert isinstance(options, argparse.Namespace) 

262 assert isinstance(items, list) 

263 assert all(isinstance(item, (Requirement, 

264 Implementation, 

265 Activity)) 

266 for item in items) 

267 

268 if options.out: 

269 ensure_output_directory(options.out) 

270 with open(options.out, "w", encoding="UTF-8") as fd: 

271 lobster_write(fd, self.schema, self.name, items) 

272 print(f"{self.name}: wrote {len(items)} items to {options.out}") 

273 else: 

274 lobster_write(sys.stdout, self.schema, self.name, items) 

275 

276 @abstractmethod 

277 def process_tool_options( 

278 self, 

279 options: argparse.Namespace, 

280 work_list: List[Tuple[File_Reference, str]], 

281 ): 

282 assert isinstance(options, argparse.Namespace) 

283 assert isinstance(work_list, list) 

284 

285 

286class LOBSTER_Per_File_Tool(LOBSTER_Tool): 

287 def __init__(self, name, description, extensions, official=False): 

288 super().__init__(name, description, extensions, official) 

289 

290 @classmethod 

291 @abstractmethod 

292 def process( 

293 cls, 

294 options, 

295 file_name, 

296 ) -> Tuple[bool, Sequence[Union[Activity, Implementation, Requirement]]]: 

297 pass 

298 

299 def _run_impl(self, options: argparse.Namespace) -> int: 

300 work_list = self.process_commandline_and_yaml_options(options) 

301 

302 ok = True 

303 items = [] 

304 pfun = partial(self.process, options) 

305 

306 if options.single: 

307 for file_name in work_list: 

308 new_ok, new_items = pfun(file_name) 

309 ok &= new_ok 

310 items += new_items 

311 else: 

312 with multiprocessing.Pool() as pool: 

313 for new_ok, new_items in pool.imap(pfun, work_list, 4): 

314 ok &= new_ok 

315 items += new_items 

316 pool.close() 

317 pool.join() 

318 

319 if ok: 

320 self.write_output(options, items) 

321 else: 

322 print(f"{self.name}: aborting due to earlier errors") 

323 

324 return int(not ok)