Coverage for lobster/common/tool.py: 53%

149 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-01-09 10:06 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import os 

21import sys 

22import argparse 

23import multiprocessing 

24 

25from abc import ABCMeta, abstractmethod 

26from functools import partial 

27from typing import Iterable, List, Sequence, Union, Tuple, Set, Dict 

28import yaml 

29from lobster.common.errors import Message_Handler 

30from lobster.common.location import File_Reference 

31from lobster.common.items import Requirement, Implementation, Activity 

32from lobster.common.io import lobster_write, ensure_output_directory 

33from lobster.common.meta_data_tool_base import MetaDataToolBase 

34 

35 

class SupportedCommonConfigKeys:
    """Helper class to define supported configuration keys.

    The class attributes are the literal key names accepted in the YAML
    configuration file. Subclasses can extend the set of keys by overriding
    get_config_keys_manual.
    """
    INPUTS_FROM_FILE = "inputs_from_file"
    TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs"
    INPUTS = "inputs"

    @classmethod
    def get_config_keys_manual(cls) -> Dict[str, str]:
        """Return a mapping of every supported config key to its help text."""
        return {
            cls.INPUTS_FROM_FILE: "Read input files or directories from this "
                                  "file. Each non-empty line is interpreted as "
                                  "one input. Supports comments starting with #.",
            cls.TRAVERSE_BAZEL_DIRS: "Enter bazel-* directories, which are excluded "
                                     "by default.",
            cls.INPUTS: "List of files to process or directories to "
                        "search for relevant input files.",
        }

    @abstractmethod
    def get_mandatory_parameters(self) -> Set[str]:
        """Return a set of config file parameters that are mandatory"""

    def get_formatted_help_text(self):
        """Render the config key manual as aligned "key - help" lines.

        Every entry starts with a newline character, and the keys are padded
        with spaces so that all dashes line up. An empty manual yields an
        empty string.
        """
        help_dict = self.get_config_keys_manual()
        # default=0 keeps an empty manual from raising ValueError in max().
        max_length = max((len(config_key) for config_key in help_dict),
                         default=0)
        return "".join(
            f"\n{config_key.ljust(max_length)} - {help_info}"
            for config_key, help_info in help_dict.items()
        )

    @classmethod
    def get_config_keys_as_set(cls) -> Set[str]:
        """Return the names of all supported config keys."""
        return set(cls.get_config_keys_manual())

71 

72 

class LOBSTER_Tool(MetaDataToolBase, SupportedCommonConfigKeys, metaclass=ABCMeta):
    """Abstract base for LOBSTER tools that are configured by a YAML file.

    The constructor registers the ``--out`` and ``--config`` command line
    arguments. The other methods load and validate the YAML configuration,
    turn the configured inputs into a sorted list of file names, and write
    the resulting items to the output.

    ``self._argument_parser`` and ``self.name`` are used here but not assigned
    in this class; they are expected to be provided by the base class.
    """

    def __init__(
            self,
            name: str,
            description: str,
            extensions: Iterable[str],
            official: bool,
    ):
        """Set up the tool state and register the common arguments.

        Parameters
        ----------
        name - Tool name, also used to build the default output file name.
        description - Tool description forwarded to the base class.
        extensions - File extensions (without the leading dot) to search for.
        official - Forwarded to the base class.
        """
        super().__init__(
            name=name,
            description=description,
            official=official,
        )
        assert isinstance(extensions, (list, set, frozenset, tuple))
        assert all(isinstance(extension, str)
                   for extension in extensions)

        # Stored with a leading dot so they compare against os.path.splitext.
        self.extensions = [f".{extension}" for extension in sorted(extensions)]
        # Patterns whose match() on a directory name excludes that directory.
        self.exclude_pat = []
        self.schema = None
        self.mh = Message_Handler()
        self.config = {}

        self._argument_parser.add_argument(
            "--out",
            default=f'{self.name}.lobster',
            help="Write output to the given file instead of stdout."
        )
        self._argument_parser.add_argument(
            "--config",
            default=None,
            help=(f"Path to YAML file with arguments as below,"
                  f"{self.get_formatted_help_text()}"),
            required=True
        )

    def load_yaml_config(self, config_path):
        """
        Loads configuration from a YAML file.
        Parameters
        ----------
        config_path - Yaml config file path.

        Returns
        -------
        data - Returns the Yaml file contents in dictionary format. An empty
               dictionary is returned if no path is given or the file has no
               content.

        The program exits with an error message if the file does not exist or
        if its top-level content is not a mapping.
        """
        if not config_path:
            return {}
        if not os.path.isfile(config_path):
            sys.exit(f"Error: Config file '{config_path}' not found.")
        with open(config_path, "r", encoding="UTF-8") as f:
            data = yaml.safe_load(f) or {}
        # A top-level list or scalar would otherwise fail later on .keys()
        # with an unhelpful AttributeError.
        if not isinstance(data, dict):
            sys.exit(f"Error: Config file '{config_path}' must contain a "
                     f"mapping of parameters at the top level.")
        return data

    def validate_yaml_supported_config_parameters(self, config):
        """
        Function to check if the parameters mentioned in the Yaml config are
        supported by the tool in execution
        Parameters
        ----------
        config - Yaml config file contents

        Returns
        -------
        Nothing

        Raises
        ------
        KeyError - if the config contains keys that are not supported.
        """
        if not config:
            return
        supported_keys = self.get_config_keys_as_set()
        unsupported_keys = set(config.keys()) - supported_keys
        if unsupported_keys:
            # Sorted for a reproducible message; str() because YAML keys are
            # not necessarily strings.
            unsupported_text = ', '.join(sorted(map(str, unsupported_keys)))
            supported_text = ', '.join(sorted(supported_keys))
            raise KeyError(
                f"Unsupported config keys: {unsupported_text}. "
                f"Supported keys are: {supported_text}."
            )

    def check_mandatory_config_parameters(self, config):
        """
        Function to check if the mandatory parameters are provided in the config file
        Parameters
        ----------
        config - Yaml config file contents

        Returns
        -------
        Nothing

        The program exits with an error message if a mandatory parameter is
        missing.
        """
        mandatory_parameters = self.get_mandatory_parameters()
        if not mandatory_parameters:
            return
        missing_parameters = mandatory_parameters - set(config.keys())
        if missing_parameters:
            sys.exit(f"Required mandatory parameters missing - "
                     f"{','.join(sorted(missing_parameters))}")

    def process_commandline_and_yaml_options(
            self,
            options: argparse.Namespace,
    ) -> List[str]:
        """Processes all command line options

        Loads and validates the YAML file given by options.config, copies the
        common config values onto the options namespace, and then delegates to
        process_common_options and process_tool_options.

        Returns the sorted list of file names to work on.
        """

        self.config = self.load_yaml_config(options.config)
        self.validate_yaml_supported_config_parameters(self.config)
        self.check_mandatory_config_parameters(self.config)
        options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None)
        options.inputs = self.config.get(self.INPUTS, [])
        options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False)
        work_list = self.process_common_options(options)
        self.process_tool_options(options, work_list)
        return work_list

    def _collect_input_requests(
            self,
            options: argparse.Namespace,
    ) -> List[Tuple[File_Reference, str]]:
        """Gather (origin, path) pairs from the inputs and the inputs file."""
        requested = options.inputs
        # A single scalar in the YAML file is one input, not a list of
        # one-character inputs.
        if isinstance(requested, str):
            requested = [requested]

        inputs = []
        if requested:
            inputs += [(File_Reference("<config>"), item)
                       for item in requested]
        if options.inputs_from_file:
            if not os.path.isfile(options.inputs_from_file):
                self._argument_parser.error(f"cannot open {options.inputs_from_file}")
            with open(options.inputs_from_file, "r", encoding="UTF-8") as fd:
                for line_no, raw_line in enumerate(fd, 1):
                    # Everything after the first '#' is a comment.
                    line = raw_line.split("#", 1)[0].strip()
                    if not line:
                        continue
                    inputs.append((File_Reference(options.inputs_from_file,
                                                  line_no),
                                   line))
        if not requested and not options.inputs_from_file:
            # Nothing configured: fall back to the current directory.
            inputs.append((File_Reference("<config>"), "."))
        return inputs

    def _is_excluded_dir(self, dir_name: str, traverse_bazel_dirs: bool) -> bool:
        """Tell whether the directory walk must not descend into dir_name."""
        if not traverse_bazel_dirs and dir_name.startswith("bazel-"):
            return True
        return any(pattern.match(dir_name) for pattern in self.exclude_pat)

    def process_common_options(
            self,
            options: argparse.Namespace,
    ) -> List[str]:
        """Generates the exact list of files to work on later. The list is sorted
        alphabetically.

        Files listed explicitly are always added (with a warning if their
        extension is not one of self.extensions). Directories are searched
        recursively for files with a matching extension; bazel-* directories
        are skipped unless options.traverse_bazel_dirs is set. The program
        exits with status 1 if an input is neither a file nor a directory.
        """
        # Sanity check output
        if options.out and \
           os.path.exists(options.out) and \
           not os.path.isfile(options.out):
            self._argument_parser.error(
                f"output {options.out} already exists and is not a file",
            )

        # Assemble input requests
        inputs = self._collect_input_requests(options)

        # The attribute is only set by process_commandline_and_yaml_options,
        # so tolerate namespaces that do not carry it.
        traverse_bazel_dirs = getattr(options, "traverse_bazel_dirs", False)

        # Sanity check and search directories
        work_list = []
        ok = True
        for location, item in inputs:
            if os.path.isfile(item):
                if os.path.splitext(item)[1] not in self.extensions:
                    self.mh.warning(location,
                                    f"not a {' or '.join(self.extensions)} file")
                work_list.append(item)

            elif os.path.isdir(item):
                for path, dirs, files in os.walk(item):
                    # Prune in place so that os.walk does not descend into
                    # excluded directories.
                    dirs[:] = [
                        dir_name for dir_name in dirs
                        if not self._is_excluded_dir(dir_name,
                                                     traverse_bazel_dirs)
                    ]

                    for file_name in files:
                        if os.path.splitext(file_name)[1] in self.extensions:
                            work_list.append(os.path.join(path, file_name))

            else:
                self.mh.error(location,
                              f"{item} is not a file or directory",
                              fatal=False)
                ok = False

        if not ok:
            sys.exit(1)

        work_list.sort()

        return work_list

    def write_output(
            self,
            options: argparse.Namespace,
            items: List[Union[Activity, Implementation, Requirement]],
    ):
        """Write the items to options.out, or to stdout if options.out is unset.

        Parameters
        ----------
        options - Parsed options; only options.out is read here.
        items - List of Requirement, Implementation or Activity objects.
        """
        assert isinstance(options, argparse.Namespace)
        assert isinstance(items, list)
        assert all(isinstance(item, (Requirement,
                                     Implementation,
                                     Activity))
                   for item in items)

        if options.out:
            ensure_output_directory(options.out)
            with open(options.out, "w", encoding="UTF-8") as fd:
                lobster_write(fd, self.schema, self.name, items)
            print(f"{self.name}: wrote {len(items)} items to {options.out}")
        else:
            lobster_write(sys.stdout, self.schema, self.name, items)

    @abstractmethod
    def process_tool_options(
            self,
            options: argparse.Namespace,
            work_list: List[str],
    ):
        """Hook for tool specific option handling; called after the work list
        has been assembled.

        Parameters
        ----------
        options - Parsed options, including the values copied from the config.
        work_list - Sorted list of file names produced by
                    process_common_options.
        """
        assert isinstance(options, argparse.Namespace)
        assert isinstance(work_list, list)

280 

281 

class LOBSTER_Per_File_Tool(LOBSTER_Tool):
    """LOBSTER tool that handles every file of the work list on its own.

    Subclasses implement the class method process; _run_impl calls it once
    per file, either sequentially or through a multiprocessing pool.
    """

    def __init__(self, name, description, extensions, official=False):
        """Forward all arguments to LOBSTER_Tool; official defaults to False."""
        super().__init__(
            name=name,
            description=description,
            extensions=extensions,
            official=official,
        )

    @classmethod
    @abstractmethod
    def process(
            cls,
            options,
            file_name,
    ) -> Tuple[bool, Sequence[Union[Activity, Implementation, Requirement]]]:
        """Process one file.

        Returns a pair of a success flag and the items found in the file.
        """

    def _run_impl(self, options: argparse.Namespace) -> int:
        """Process every file of the work list and write the combined output.

        Returns 0 if every file was processed successfully, otherwise 1. The
        output is only written in the successful case.
        """
        file_names = self.process_commandline_and_yaml_options(options)
        worker = partial(self.process, options)

        success = True
        collected = []

        def absorb(results):
            # Fold the per-file results into the overall status and item list.
            nonlocal success
            for file_ok, file_items in results:
                success &= file_ok
                collected.extend(file_items)

        if options.single:
            # Sequential mode: everything runs in this process.
            absorb(map(worker, file_names))
        else:
            with multiprocessing.Pool() as pool:
                # imap yields the results in the order of file_names.
                absorb(pool.imap(worker, file_names, 4))
                pool.close()
                pool.join()

        if not success:
            print(f"{self.name}: aborting due to earlier errors")
        else:
            self.write_output(options, collected)

        return int(not success)