Coverage for lobster/common/tool.py: 85%

148 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-27 13:02 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import os 

21import sys 

22import argparse 

23import multiprocessing 

24 

25from abc import ABCMeta, abstractmethod 

26from functools import partial 

27from typing import Iterable, List, Sequence, Union, Tuple, Set, Dict 

28import yaml 

29from lobster.common.errors import Message_Handler 

30from lobster.common.location import File_Reference 

31from lobster.common.items import Requirement, Implementation, Activity 

32from lobster.common.io import lobster_write 

33from lobster.common.meta_data_tool_base import MetaDataToolBase 

34 

35 

36class SupportedCommonConfigKeys: 

37 """Helper class to define supported configuration keys.""" 

38 INPUTS_FROM_FILE = "inputs_from_file" 

39 TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs" 

40 INPUTS = "inputs" 

41 

42 @classmethod 

43 def get_config_keys_manual(cls) -> Dict[str, str]: 

44 help_dict = { 

45 cls.INPUTS_FROM_FILE : "Read input files or directories from this " 

46 "file. Each non-empty line is interpreted as " 

47 "one input. Supports comments starting with #.", 

48 cls.TRAVERSE_BAZEL_DIRS : "Enter bazel-* directories, which are excluded " 

49 "by default.", 

50 cls.INPUTS : "List of files to process or directories to " 

51 "search for relevant input files.", 

52 } 

53 return help_dict 

54 

55 @abstractmethod 

56 def get_mandatory_parameters(self) -> Set[str]: 

57 """Return a set of config file parameters that are mandatory""" 

58 

59 def get_formatted_help_text(self): 

60 help_dict = self.get_config_keys_manual() 

61 help_text = '' 

62 max_length = max(len(config_key) for config_key in help_dict) 

63 for config_key, help_info in help_dict.items(): 

64 spaces = " " * (max_length - len(config_key)) 

65 help_text += f"\n{config_key} {spaces}- {help_info}" 

66 return help_text 

67 

68 @classmethod 

69 def get_config_keys_as_set(cls) -> Set[str]: 

70 return set(cls.get_config_keys_manual().keys()) 

71 

72 

73class LOBSTER_Tool(MetaDataToolBase, SupportedCommonConfigKeys, metaclass=ABCMeta): 

74 def __init__( 

75 self, 

76 name: str, 

77 description: str, 

78 extensions: Iterable[str], 

79 official: bool, 

80 ): 

81 super().__init__( 

82 name=name, 

83 description=description, 

84 official=official, 

85 ) 

86 assert isinstance(extensions, (list, set, frozenset, tuple)) 

87 assert all(isinstance(extension, str) 

88 for extension in extensions) 

89 

90 self.extensions = [f".{extension}" for extension in (sorted(extensions))] 

91 self.exclude_pat = [] 

92 self.schema = None 

93 self.mh = Message_Handler() 

94 self.config = {} 

95 

96 self._argument_parser.add_argument( 

97 "--out", 

98 default = f'{self.name}.lobster', 

99 help = "Write output to the given file instead of stdout." 

100 ) 

101 self._argument_parser.add_argument( 

102 "--config", 

103 default=None, 

104 help=(f"Path to YAML file with arguments as below," 

105 f"{self.get_formatted_help_text()}"), 

106 required=True 

107 ) 

108 

109 def load_yaml_config(self, config_path): 

110 """ 

111 Loads configuration from a YAML file. 

112 Parameters 

113 ---------- 

114 config_path - Yaml config file path. 

115 

116 Returns 

117 ------- 

118 data - Returns the Yaml file contents in dictionary format. 

119 """ 

120 if not config_path: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true

121 return {} 

122 if not os.path.isfile(config_path): 122 ↛ 123line 122 didn't jump to line 123 because the condition on line 122 was never true

123 sys.exit(f"Error: Config file '{config_path}' not found.") 

124 with open(config_path, "r", encoding="UTF-8") as f: 

125 data = yaml.safe_load(f) or {} 

126 return data 

127 

128 def validate_yaml_supported_config_parameters(self, config): 

129 """ 

130 Function to check if the parameters mentioned in the Yaml config are 

131 supported by the tool in execution 

132 Parameters 

133 ---------- 

134 data - Yaml config file contents 

135 

136 Returns 

137 ------- 

138 Nothing 

139 """ 

140 if config: 

141 supported_keys = self.get_config_keys_as_set() 

142 unsupported_keys = set(config.keys()) - supported_keys 

143 if unsupported_keys: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 raise KeyError( 

145 f"Unsupported config keys: {', '.join(unsupported_keys)}. " 

146 f"Supported keys are: {', '.join(supported_keys)}." 

147 ) 

148 

149 def check_mandatory_config_parameters(self, config): 

150 """ 

151 Function to check if the mandatory parameters are provided in the config file 

152 Parameters 

153 ---------- 

154 data - Yaml config file contents 

155 

156 Returns 

157 ------- 

158 Nothing 

159 """ 

160 if self.get_mandatory_parameters(): 160 ↛ exitline 160 didn't return from function 'check_mandatory_config_parameters' because the condition on line 160 was always true

161 mandatory_parameters = self.get_mandatory_parameters() - set(config.keys()) 

162 if mandatory_parameters: 

163 sys.exit(f"Required mandatory parameters missing - " 

164 f"{','.join(mandatory_parameters)}") 

165 

166 def process_commandline_and_yaml_options( 

167 self, 

168 options: argparse.Namespace, 

169 ) -> List[Tuple[File_Reference, str]]: 

170 """Processes all command line options""" 

171 

172 self.config = self.load_yaml_config(options.config) 

173 self.validate_yaml_supported_config_parameters(self.config) 

174 self.check_mandatory_config_parameters(self.config) 

175 options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None) 

176 options.inputs = self.config.get(self.INPUTS, []) 

177 options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False) 

178 work_list = self.process_common_options(options) 

179 self.process_tool_options(options, work_list) 

180 return work_list 

181 

182 def process_common_options( 

183 self, 

184 options: argparse.Namespace, 

185 ) -> List[Tuple[File_Reference, str]]: 

186 """Generates the exact list of files to work on later. The list is sorted 

187 alphabetically.""" 

188 # Sanity check output 

189 if options.out and \ 189 ↛ 192line 189 didn't jump to line 192 because the condition on line 189 was never true

190 os.path.exists(options.out) and \ 

191 not os.path.isfile(options.out): 

192 self._argument_parser.error( 

193 f"output {options.out} already exists and is not a file", 

194 ) 

195 

196 # Assemble input requests 

197 inputs = [] 

198 if options.inputs: 

199 inputs += [(File_Reference("<config>"), item) 

200 for item in options.inputs] 

201 if options.inputs_from_file: 

202 if not os.path.isfile(options.inputs_from_file): 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true

203 self._argument_parser.error(f"cannot open {options.inputs_from_file}") 

204 with open(options.inputs_from_file, "r", encoding="UTF-8") as fd: 

205 for line_no, raw_line in enumerate(fd, 1): 

206 line = raw_line.split("#", 1)[0].strip() 

207 if not line: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true

208 continue 

209 inputs.append((File_Reference(options.inputs_from_file, 

210 line_no), 

211 line)) 

212 if not options.inputs and not options.inputs_from_file: 

213 inputs.append((File_Reference("<config>"), ".")) 

214 # Sanity check and search directories 

215 work_list = [] 

216 ok = True 

217 for location, item in inputs: 

218 if os.path.isfile(item): 

219 if os.path.splitext(item)[1] not in self.extensions: 

220 self.mh.warning(location, 

221 f"not a {' or '.join(self.extensions)} file") 

222 work_list.append(item) 

223 

224 elif os.path.isdir(item): 

225 for path, dirs, files in os.walk(item): 

226 for n, dir_name in reversed(list(enumerate(dirs))): 

227 keep = True 

228 for pattern in self.exclude_pat: 228 ↛ 229line 228 didn't jump to line 229 because the loop on line 228 never started

229 if pattern.match(dir_name): 

230 keep = False 

231 break 

232 if not keep: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true

233 del dirs[n] 

234 

235 for file_name in files: 

236 if os.path.splitext(file_name)[1] in self.extensions: 

237 work_list.append(os.path.join(path, file_name)) 

238 

239 else: 

240 self.mh.error(location, 

241 f"{item} is not a file or directory", 

242 fatal = False) 

243 ok = False 

244 

245 if not ok: 

246 sys.exit(1) 

247 

248 work_list.sort() 

249 

250 return work_list 

251 

252 def write_output( 

253 self, 

254 options: argparse.Namespace, 

255 items: List[Union[Activity, Implementation, Requirement]], 

256 ): 

257 assert isinstance(options, argparse.Namespace) 

258 assert isinstance(items, list) 

259 assert all(isinstance(item, (Requirement, 

260 Implementation, 

261 Activity)) 

262 for item in items) 

263 

264 if options.out: 264 ↛ 269line 264 didn't jump to line 269 because the condition on line 264 was always true

265 with open(options.out, "w", encoding="UTF-8") as fd: 

266 lobster_write(fd, self.schema, self.name, items) 

267 print(f"{self.name}: wrote {len(items)} items to {options.out}") 

268 else: 

269 lobster_write(sys.stdout, self.schema, self.name, items) 

270 

271 @abstractmethod 

272 def process_tool_options( 

273 self, 

274 options: argparse.Namespace, 

275 work_list: List[Tuple[File_Reference, str]], 

276 ): 

277 assert isinstance(options, argparse.Namespace) 

278 assert isinstance(work_list, list) 

279 

280 

281class LOBSTER_Per_File_Tool(LOBSTER_Tool): 

282 def __init__(self, name, description, extensions, official=False): 

283 super().__init__(name, description, extensions, official) 

284 

285 @classmethod 

286 @abstractmethod 

287 def process( 

288 cls, 

289 options, 

290 file_name, 

291 ) -> Tuple[bool, Sequence[Union[Activity, Implementation, Requirement]]]: 

292 pass 

293 

294 def _run_impl(self, options: argparse.Namespace) -> int: 

295 work_list = self.process_commandline_and_yaml_options(options) 

296 

297 ok = True 

298 items = [] 

299 pfun = partial(self.process, options) 

300 

301 if options.single: 301 ↛ 307line 301 didn't jump to line 307 because the condition on line 301 was always true

302 for file_name in work_list: 

303 new_ok, new_items = pfun(file_name) 

304 ok &= new_ok 

305 items += new_items 

306 else: 

307 with multiprocessing.Pool() as pool: 

308 for new_ok, new_items in pool.imap(pfun, work_list, 4): 

309 ok &= new_ok 

310 items += new_items 

311 pool.close() 

312 pool.join() 

313 

314 if ok: 

315 self.write_output(options, items) 

316 else: 

317 print(f"{self.name}: aborting due to earlier errors") 

318 

319 return int(not ok)