Coverage for lobster/tool.py: 54%
158 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os
21import sys
22import argparse
23import multiprocessing
25from abc import ABCMeta, abstractmethod
26from functools import partial
27from typing import List, Union, Tuple, Set, Dict
28import yaml
29from lobster.version import FULL_NAME, get_version
30from lobster.errors import Message_Handler
31from lobster.location import File_Reference
32from lobster.items import Requirement, Implementation, Activity
33from lobster.io import lobster_write
35BUG_URL = "https://github.com/bmw-software-engineering/lobster/issues"
38class SupportedCommonConfigKeys:
39 """Helper class to define supported configuration keys."""
40 INPUTS_FROM_FILE = "inputs_from_file"
41 TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs"
42 INPUTS = "inputs"
44 @classmethod
45 def get_config_keys_manual(cls) -> Dict[str, str]:
46 help_dict = {
47 cls.INPUTS_FROM_FILE : "Read input files or directories from this "
48 "file. Each non-empty line is interpreted as "
49 "one input. Supports comments starting with #.",
50 cls.TRAVERSE_BAZEL_DIRS : "Enter bazel-* directories, which are excluded "
51 "by default.",
52 cls.INPUTS : "List of files to process or directories to "
53 "search for relevant input files.",
54 }
55 return help_dict
57 @abstractmethod
58 def get_mandatory_parameters(self) -> Set[str]:
59 """Return a set of config file parameters that are mandatory"""
61 def get_formatted_help_text(self):
62 help_dict = self.get_config_keys_manual()
63 help_text = ''
64 max_length = max(len(config_key) for config_key in help_dict)
65 for config_key, help_info in help_dict.items():
66 spaces = " " * (max_length - len(config_key))
67 help_text += f"\n{config_key} {spaces}- {help_info}"
68 return help_text
70 @classmethod
71 def get_config_keys_as_set(cls) -> Set[str]:
72 return set(cls.get_config_keys_manual().keys())
75class LOBSTER_Tool(SupportedCommonConfigKeys, metaclass=ABCMeta):
76 def __init__(self, name, description, extensions, official):
77 assert isinstance(name, str)
78 assert isinstance(description, str)
79 assert isinstance(extensions, (list, set, frozenset, tuple))
80 assert all(isinstance(extension, str)
81 for extension in extensions)
82 assert isinstance(official, bool)
84 self.name = "lobster-%s" % name
85 self.description = description
86 self.extensions = [".%s" % extension
87 for extension in sorted(extensions)]
88 self.exclude_pat = []
89 self.schema = None
90 self.mh = Message_Handler()
91 self.config = {}
93 self.ap = argparse.ArgumentParser(
94 prog = self.name,
95 description = description,
96 epilog = ("Part of %s, licensed under the AGPLv3."
97 " Please report bugs to %s." %
98 (FULL_NAME, BUG_URL)
99 if official else None),
100 allow_abbrev = False,
101 formatter_class=argparse.RawTextHelpFormatter
102 )
104 self.ap.add_argument(
105 "--out",
106 default = f'{self.name}.lobster',
107 help = "Write output to the given file instead of stdout."
108 )
109 self.ap.add_argument(
110 "--config",
111 default=None,
112 help=(f"Path to YAML file with arguments as below,"
113 f"{self.get_formatted_help_text()}"),
114 required=True
115 )
117 def load_yaml_config(self, config_path):
118 """
119 Loads configuration from a YAML file.
120 Parameters
121 ----------
122 config_path - Yaml config file path.
124 Returns
125 -------
126 data - Returns the Yaml file contents in dictionary format.
127 """
128 if not config_path: 128 ↛ 129line 128 didn't jump to line 129 because the condition on line 128 was never true
129 return {}
130 if not os.path.isfile(config_path):
131 sys.exit(f"Error: Config file '{config_path}' not found.")
132 with open(config_path, "r", encoding="UTF-8") as f:
133 data = yaml.safe_load(f) or {}
134 return data
136 def validate_yaml_supported_config_parameters(self, config):
137 """
138 Function to check if the parameters mentioned in the Yaml config are
139 supported by the tool in execution
140 Parameters
141 ----------
142 data - Yaml config file contents
144 Returns
145 -------
146 Nothing
147 """
148 if config: 148 ↛ exitline 148 didn't return from function 'validate_yaml_supported_config_parameters' because the condition on line 148 was always true
149 supported_keys = self.get_config_keys_as_set()
150 unsupported_keys = set(config.keys()) - supported_keys
151 if unsupported_keys: 151 ↛ exitline 151 didn't return from function 'validate_yaml_supported_config_parameters' because the condition on line 151 was always true
152 raise KeyError(
153 f"Unsupported config keys: {', '.join(unsupported_keys)}. "
154 f"Supported keys are: {', '.join(supported_keys)}."
155 )
157 def check_mandatory_config_parameters(self, config):
158 """
159 Function to check if the mandatory parameters are provided in the config file
160 Parameters
161 ----------
162 data - Yaml config file contents
164 Returns
165 -------
166 Nothing
167 """
168 if self.get_mandatory_parameters(): 168 ↛ exitline 168 didn't return from function 'check_mandatory_config_parameters' because the condition on line 168 was always true
169 mandatory_parameters = self.get_mandatory_parameters() - set(config.keys())
170 if mandatory_parameters: 170 ↛ exitline 170 didn't return from function 'check_mandatory_config_parameters' because the condition on line 170 was always true
171 sys.exit(f"Required mandatory parameters missing - "
172 f"{','.join(mandatory_parameters)}")
174 @get_version
175 def process_commandline_and_yaml_options(
176 self,
177 ) -> Tuple[argparse.Namespace, List[Tuple[File_Reference, str]]]:
178 """Processes all command line options"""
180 options = self.ap.parse_args()
181 self.config = self.load_yaml_config(options.config)
182 self.validate_yaml_supported_config_parameters(self.config)
183 self.check_mandatory_config_parameters(self.config)
184 options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None)
185 options.inputs = self.config.get(self.INPUTS, [])
186 options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False)
187 work_list = self.process_common_options(options)
188 self.process_tool_options(options, work_list)
189 return options, work_list
191 def process_common_options(
192 self,
193 options: argparse.Namespace,
194 ) -> List[Tuple[File_Reference, str]]:
195 """Generates the exact list of files to work on later. The list is sorted
196 alphabetically."""
197 # Sanity check output
198 if options.out and \
199 os.path.exists(options.out) and \
200 not os.path.isfile(options.out):
201 self.ap.error("output %s already exists and is not a file" %
202 options.out)
204 # Assemble input requests
205 inputs = []
206 if options.inputs: 206 ↛ 209line 206 didn't jump to line 209 because the condition on line 206 was always true
207 inputs += [(File_Reference("<config>"), item)
208 for item in options.inputs]
209 if options.inputs_from_file: 209 ↛ 210line 209 didn't jump to line 210 because the condition on line 209 was never true
210 if not os.path.isfile(options.inputs_from_file):
211 self.ap.error("cannot open %s" % options.inputs_from_file)
212 with open(options.inputs_from_file, "r", encoding="UTF-8") as fd:
213 for line_no, raw_line in enumerate(fd, 1):
214 line = raw_line.split("#", 1)[0].strip()
215 if not line:
216 continue
217 inputs.append((File_Reference(options.inputs_from_file,
218 line_no),
219 line))
220 if not options.inputs and not options.inputs_from_file: 220 ↛ 221line 220 didn't jump to line 221 because the condition on line 220 was never true
221 inputs.append((File_Reference("<config>"), "."))
222 # Sanity check and search directories
223 work_list = []
224 ok = True
225 for location, item in inputs:
226 if os.path.isfile(item):
227 if os.path.splitext(item)[1] not in self.extensions: 227 ↛ 231line 227 didn't jump to line 231 because the condition on line 227 was always true
228 self.mh.warning(location,
229 "not a %s file" %
230 " or ".join(self.extensions))
231 work_list.append(item)
233 elif os.path.isdir(item): 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true
234 for path, dirs, files in os.walk(item):
235 for n, dir_name in reversed(list(enumerate(dirs))):
236 keep = True
237 for pattern in self.exclude_pat:
238 if pattern.match(dir_name):
239 keep = False
240 break
241 if not keep:
242 del dirs[n]
244 for file_name in files:
245 if os.path.splitext(file_name)[1] in self.extensions:
246 work_list.append(os.path.join(path, file_name))
248 else:
249 self.mh.error(location,
250 "%s is not a file or directory" % item,
251 fatal = False)
252 ok = False
254 if not ok:
255 sys.exit(1)
257 work_list.sort()
259 return work_list
261 def write_output(
262 self,
263 ok: bool,
264 options: argparse.Namespace,
265 items: List[Union[Activity, Implementation, Requirement]],
266 ):
267 assert isinstance(ok, bool)
268 assert isinstance(options, argparse.Namespace)
269 assert isinstance(items, list)
270 assert all(isinstance(item, (Requirement,
271 Implementation,
272 Activity))
273 for item in items)
275 if ok:
276 if options.out:
277 with open(options.out, "w", encoding="UTF-8") as fd:
278 lobster_write(fd, self.schema, self.name, items)
279 print("%s: wrote %u items to %s" % (self.name,
280 len(items),
281 options.out))
282 else:
283 lobster_write(sys.stdout, self.schema, self.name, items)
284 return 0
286 else:
287 print("%s: aborting due to earlier errors" % self.name)
288 return 1
290 @abstractmethod
291 def process_tool_options(
292 self,
293 options: argparse.Namespace,
294 work_list: List[Tuple[File_Reference, str]],
295 ):
296 assert isinstance(options, argparse.Namespace)
297 assert isinstance(work_list, list)
299 @abstractmethod
300 def execute(self):
301 pass
304class LOBSTER_Per_File_Tool(LOBSTER_Tool):
305 def __init__(self, name, description, extensions, official=False):
306 super().__init__(name, description, extensions, official)
308 @classmethod
309 @abstractmethod
310 def process(
311 cls,
312 options,
313 file_name,
314 ) -> Tuple[bool, List[Union[Activity, Implementation, Requirement]]]:
315 pass
317 def execute(self):
318 options, work_list = self.process_commandline_and_yaml_options()
320 ok = True
321 items = []
322 pfun = partial(self.process, options)
324 if options.single:
325 for file_name in work_list:
326 new_ok, new_items = pfun(file_name)
327 ok &= new_ok
328 items += new_items
329 else:
330 with multiprocessing.Pool() as pool:
331 for new_ok, new_items in pool.imap(pfun, work_list, 4):
332 ok &= new_ok
333 items += new_items
334 pool.close()
335 pool.join()
337 return self.write_output(ok, options, items)