Coverage for lobster/common/tool.py: 89%
150 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-04-20 08:15 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-04-20 08:15 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023-2026 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os
21import sys
22import argparse
23import multiprocessing
25from abc import ABCMeta, abstractmethod
26from functools import partial
27from typing import Iterable, List, Sequence, Union, Tuple, Set, Dict
28import yaml
29from lobster.common.errors import Message_Handler
30from lobster.common.location import File_Reference
31from lobster.common.items import Requirement, Implementation, Activity
32from lobster.common.io import lobster_write, ensure_output_directory
33from lobster.common.meta_data_tool_base import MetaDataToolBase
36class SupportedCommonConfigKeys:
37 """Helper class to define supported configuration keys."""
38 INPUTS_FROM_FILE = "inputs_from_file"
39 TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs"
40 INPUTS = "inputs"
42 @classmethod
43 def get_config_keys_manual(cls) -> Dict[str, str]:
44 help_dict = {
45 cls.INPUTS_FROM_FILE : "Read input files or directories from this "
46 "file. Each non-empty line is interpreted as "
47 "one input. Supports comments starting with #.",
48 cls.TRAVERSE_BAZEL_DIRS : "Enter bazel-* directories, which are excluded "
49 "by default.",
50 cls.INPUTS : "List of files to process or directories to "
51 "search for relevant input files.",
52 }
53 return help_dict
55 @abstractmethod
56 def get_mandatory_parameters(self) -> Set[str]:
57 """Return a set of config file parameters that are mandatory"""
59 def get_formatted_help_text(self):
60 help_dict = self.get_config_keys_manual()
61 help_text = ''
62 max_length = max(len(config_key) for config_key in help_dict)
63 for config_key, help_info in help_dict.items():
64 spaces = " " * (max_length - len(config_key))
65 help_text += f"\n{config_key} {spaces}- {help_info}"
66 return help_text
68 @classmethod
69 def get_config_keys_as_set(cls) -> Set[str]:
70 return set(cls.get_config_keys_manual().keys())
73class LOBSTER_Tool(MetaDataToolBase, SupportedCommonConfigKeys, metaclass=ABCMeta):
74 def __init__(
75 self,
76 name: str,
77 description: str,
78 extensions: Iterable[str],
79 official: bool,
80 ):
81 super().__init__(
82 name=name,
83 description=description,
84 official=official,
85 )
86 assert isinstance(extensions, (list, set, frozenset, tuple))
87 assert all(isinstance(extension, str)
88 for extension in extensions)
90 self.extensions = [f".{extension}" for extension in (sorted(extensions))]
91 self.exclude_pat = []
92 self.schema = None
93 self.mh = Message_Handler()
94 self.config = {}
96 self._argument_parser.add_argument(
97 "--out",
98 default = f'{self.name}.lobster',
99 help = "Write output to the given file instead of stdout."
100 )
101 self._argument_parser.add_argument(
102 "--config",
103 default=None,
104 help=(f"Path to YAML file with arguments as below,"
105 f"{self.get_formatted_help_text()}"),
106 required=True
107 )
109 def load_yaml_config(self, config_path):
110 """
111 Loads configuration from a YAML file.
112 Parameters
113 ----------
114 config_path - Yaml config file path.
116 Returns
117 -------
118 data - Returns the Yaml file contents in dictionary format.
119 """
120 if not config_path: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true
121 return {}
122 if not os.path.isfile(config_path):
123 sys.exit(f"Error: Config file '{config_path}' not found.")
124 with open(config_path, "r", encoding="UTF-8") as f:
125 data = yaml.safe_load(f) or {}
126 return data
128 def validate_yaml_supported_config_parameters(self, config):
129 """
130 Function to check if the parameters mentioned in the Yaml config are
131 supported by the tool in execution
132 Parameters
133 ----------
134 data - Yaml config file contents
136 Returns
137 -------
138 Nothing
139 """
140 if config:
141 supported_keys = self.get_config_keys_as_set()
142 unsupported_keys = set(config.keys()) - supported_keys
143 if unsupported_keys: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 raise KeyError(
145 f"Unsupported config keys: {', '.join(unsupported_keys)}. "
146 f"Supported keys are: {', '.join(supported_keys)}."
147 )
149 def check_mandatory_config_parameters(self, config):
150 """
151 Function to check if the mandatory parameters are provided in the config file
152 Parameters
153 ----------
154 data - Yaml config file contents
156 Returns
157 -------
158 Nothing
159 """
160 if self.get_mandatory_parameters(): 160 ↛ exitline 160 didn't return from function 'check_mandatory_config_parameters' because the condition on line 160 was always true
161 mandatory_parameters = self.get_mandatory_parameters() - set(config.keys())
162 if mandatory_parameters:
163 sys.exit(f"Required mandatory parameters missing - "
164 f"{','.join(mandatory_parameters)}")
166 def process_commandline_and_yaml_options(
167 self,
168 options: argparse.Namespace,
169 ) -> List[Tuple[File_Reference, str]]:
170 """Processes all command line options"""
172 self.config = self.load_yaml_config(options.config)
173 self.validate_yaml_supported_config_parameters(self.config)
174 self.check_mandatory_config_parameters(self.config)
175 options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None)
176 options.inputs = self.config.get(self.INPUTS, [])
177 options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False)
178 work_list = self.process_common_options(options)
179 self.process_tool_options(options, work_list)
180 return work_list
182 def _is_excluded_dir(self, dir_name: str) -> bool:
183 return any(pattern.match(dir_name) for pattern in self.exclude_pat)
185 def _filter_walk_dirs(self, dirs: List[str]) -> None:
186 dirs[:] = [dir_name for dir_name in dirs
187 if not self._is_excluded_dir(dir_name)]
189 def _collect_files_from_directory(self, directory: str) -> List[str]:
190 work_items = []
191 for path, dirs, files in os.walk(directory):
192 self._filter_walk_dirs(dirs)
193 for file_name in files:
194 if os.path.splitext(file_name)[1] in self.extensions:
195 work_items.append(os.path.join(path, file_name))
196 return work_items
198 def process_common_options(
199 self,
200 options: argparse.Namespace,
201 ) -> List[Tuple[File_Reference, str]]:
202 """Generates the exact list of files to work on later. The list is sorted
203 alphabetically."""
204 # Sanity check output
205 if options.out and \ 205 ↛ 208line 205 didn't jump to line 208 because the condition on line 205 was never true
206 os.path.exists(options.out) and \
207 not os.path.isfile(options.out):
208 self._argument_parser.error(
209 f"output {options.out} already exists and is not a file",
210 )
212 # Assemble input requests
213 inputs = []
214 if options.inputs:
215 inputs += [(File_Reference("<config>"), item)
216 for item in options.inputs]
217 if options.inputs_from_file:
218 if not os.path.isfile(options.inputs_from_file): 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true
219 self._argument_parser.error(f"cannot open {options.inputs_from_file}")
220 with open(options.inputs_from_file, "r", encoding="UTF-8") as fd:
221 for line_no, raw_line in enumerate(fd, 1):
222 line = raw_line.split("#", 1)[0].strip()
223 if not line: 223 ↛ 224line 223 didn't jump to line 224 because the condition on line 223 was never true
224 continue
225 inputs.append((File_Reference(options.inputs_from_file,
226 line_no),
227 line))
228 if not options.inputs and not options.inputs_from_file:
229 inputs.append((File_Reference("<config>"), "."))
230 # Sanity check and search directories
231 work_list = []
232 ok = True
233 for location, item in inputs:
234 if os.path.isfile(item):
235 if os.path.splitext(item)[1] not in self.extensions:
236 self.mh.warning(location,
237 f"not a {' or '.join(self.extensions)} file")
238 work_list.append(item)
240 elif os.path.isdir(item):
241 work_list.extend(self._collect_files_from_directory(item))
243 else:
244 self.mh.error(location,
245 f"{item} is not a file or directory",
246 fatal = False)
247 ok = False
249 if not ok:
250 sys.exit(1)
252 work_list.sort()
254 return work_list
256 def write_output(
257 self,
258 options: argparse.Namespace,
259 items: List[Union[Activity, Implementation, Requirement]],
260 ):
261 assert isinstance(options, argparse.Namespace)
262 assert isinstance(items, list)
263 assert all(isinstance(item, (Requirement,
264 Implementation,
265 Activity))
266 for item in items)
268 if options.out: 268 ↛ 274line 268 didn't jump to line 274 because the condition on line 268 was always true
269 ensure_output_directory(options.out)
270 with open(options.out, "w", encoding="UTF-8") as fd:
271 lobster_write(fd, self.schema, self.name, items)
272 print(f"{self.name}: wrote {len(items)} items to {options.out}")
273 else:
274 lobster_write(sys.stdout, self.schema, self.name, items)
276 @abstractmethod
277 def process_tool_options(
278 self,
279 options: argparse.Namespace,
280 work_list: List[Tuple[File_Reference, str]],
281 ):
282 assert isinstance(options, argparse.Namespace)
283 assert isinstance(work_list, list)
286class LOBSTER_Per_File_Tool(LOBSTER_Tool):
287 def __init__(self, name, description, extensions, official=False):
288 super().__init__(name, description, extensions, official)
290 @classmethod
291 @abstractmethod
292 def process(
293 cls,
294 options,
295 file_name,
296 ) -> Tuple[bool, Sequence[Union[Activity, Implementation, Requirement]]]:
297 pass
299 def _run_impl(self, options: argparse.Namespace) -> int:
300 work_list = self.process_commandline_and_yaml_options(options)
302 ok = True
303 items = []
304 pfun = partial(self.process, options)
306 if options.single: 306 ↛ 312line 306 didn't jump to line 312 because the condition on line 306 was always true
307 for file_name in work_list:
308 new_ok, new_items = pfun(file_name)
309 ok &= new_ok
310 items += new_items
311 else:
312 with multiprocessing.Pool() as pool:
313 for new_ok, new_items in pool.imap(pfun, work_list, 4):
314 ok &= new_ok
315 items += new_items
316 pool.close()
317 pool.join()
319 if ok:
320 self.write_output(options, items)
321 else:
322 print(f"{self.name}: aborting due to earlier errors")
324 return int(not ok)