Coverage for lobster/common/tool.py: 86%
149 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-09 10:06 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-09 10:06 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os
21import sys
22import argparse
23import multiprocessing
25from abc import ABCMeta, abstractmethod
26from functools import partial
27from typing import Iterable, List, Sequence, Union, Tuple, Set, Dict
28import yaml
29from lobster.common.errors import Message_Handler
30from lobster.common.location import File_Reference
31from lobster.common.items import Requirement, Implementation, Activity
32from lobster.common.io import lobster_write, ensure_output_directory
33from lobster.common.meta_data_tool_base import MetaDataToolBase
class SupportedCommonConfigKeys:
    """Helper class to define supported configuration keys.

    The class attributes name the keys that may appear in the YAML
    configuration file. Subclasses can extend the set of keys by
    overriding get_config_keys_manual.
    """
    INPUTS_FROM_FILE = "inputs_from_file"
    TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs"
    INPUTS = "inputs"

    @classmethod
    def get_config_keys_manual(cls) -> Dict[str, str]:
        """Return a mapping of each supported config key to its help text."""
        help_dict = {
            cls.INPUTS_FROM_FILE: "Read input files or directories from this "
                                  "file. Each non-empty line is interpreted as "
                                  "one input. Supports comments starting with #.",
            cls.TRAVERSE_BAZEL_DIRS: "Enter bazel-* directories, which are excluded "
                                     "by default.",
            cls.INPUTS: "List of files to process or directories to "
                        "search for relevant input files.",
        }
        return help_dict

    @abstractmethod
    def get_mandatory_parameters(self) -> Set[str]:
        """Return a set of config file parameters that are mandatory"""
        # NOTE: @abstractmethod is only enforced for classes whose
        # metaclass is ABCMeta; this helper class itself does not use it.

    def get_formatted_help_text(self):
        """Return the help text for all config keys as one aligned string.

        Every key produces one line of the form "<key> - <help>", each
        preceded by a newline. Keys are padded with spaces so that all
        dashes line up. An empty key mapping yields an empty string.
        """
        help_dict = self.get_config_keys_manual()
        # default=0 keeps an empty mapping from raising ValueError in max().
        widest = max((len(config_key) for config_key in help_dict), default=0)
        return "".join(
            f"\n{config_key.ljust(widest)} - {help_info}"
            for config_key, help_info in help_dict.items()
        )

    @classmethod
    def get_config_keys_as_set(cls) -> Set[str]:
        """Return the names of all supported config keys as a set."""
        return set(cls.get_config_keys_manual().keys())
class LOBSTER_Tool(MetaDataToolBase, SupportedCommonConfigKeys, metaclass=ABCMeta):
    """Abstract base class for tools that are configured via a YAML file.

    The class registers the --out and --config command line arguments,
    loads and validates the YAML configuration, turns the configured
    inputs into a sorted list of files to process, and writes the
    resulting items to the output file.

    Subclasses must implement process_tool_options and
    get_mandatory_parameters.
    """

    def __init__(
        self,
        name: str,
        description: str,
        extensions: Iterable[str],
        official: bool,
    ):
        """Set up tool state and register the common arguments.

        Parameters
        ----------
        name - Tool name, forwarded to the base class.
        description - Tool description, forwarded to the base class.
        extensions - File extensions (without leading dot) the tool handles.
                     Must be a list, set, frozenset or tuple of strings.
        official - Forwarded to the base class.
        """
        super().__init__(
            name=name,
            description=description,
            official=official,
        )
        assert isinstance(extensions, (list, set, frozenset, tuple))
        assert all(isinstance(extension, str)
                   for extension in extensions)

        # Stored with a leading dot so they compare directly against the
        # result of os.path.splitext.
        self.extensions = [f".{extension}" for extension in sorted(extensions)]
        self.exclude_pat = []
        self.schema = None
        self.mh = Message_Handler()
        self.config = {}

        # NOTE(review): self._argument_parser and self.name are not set in
        # this class; presumably MetaDataToolBase.__init__ provides them.
        self._argument_parser.add_argument(
            "--out",
            default=f'{self.name}.lobster',
            help="Write output to the given file instead of stdout."
        )
        self._argument_parser.add_argument(
            "--config",
            default=None,
            help=(f"Path to YAML file with arguments as below,"
                  f"{self.get_formatted_help_text()}"),
            required=True
        )

    def load_yaml_config(self, config_path):
        """
        Loads configuration from a YAML file.
        Parameters
        ----------
        config_path - Yaml config file path.

        Returns
        -------
        data - Returns the Yaml file contents in dictionary format. An
               empty dictionary is returned if no path is given or the
               file is empty.

        The program exits with an error message if the file does not
        exist or if its top-level element is not a mapping.
        """
        if not config_path:
            return {}
        if not os.path.isfile(config_path):
            sys.exit(f"Error: Config file '{config_path}' not found.")
        with open(config_path, "r", encoding="UTF-8") as f:
            data = yaml.safe_load(f) or {}
        # safe_load returns whatever the document contains (list, string,
        # number, ...). The callers use dictionary methods on the result,
        # so anything else is rejected here with a readable message.
        if not isinstance(data, dict):
            sys.exit(f"Error: Config file '{config_path}' must contain a "
                     f"mapping of parameters at the top level.")
        return data

    def validate_yaml_supported_config_parameters(self, config):
        """
        Function to check if the parameters mentioned in the Yaml config are
        supported by the tool in execution
        Parameters
        ----------
        config - Yaml config file contents

        Returns
        -------
        Nothing

        Raises
        ------
        KeyError - if the config contains keys that are not supported.
        """
        if not config:
            return
        supported_keys = self.get_config_keys_as_set()
        unsupported_keys = set(config.keys()) - supported_keys
        if unsupported_keys:
            # Keys are stringified and sorted so that the message is the
            # same on every run and non-string YAML keys do not break it.
            unsupported_text = ', '.join(sorted(str(key)
                                                for key in unsupported_keys))
            supported_text = ', '.join(sorted(str(key)
                                              for key in supported_keys))
            raise KeyError(
                f"Unsupported config keys: {unsupported_text}. "
                f"Supported keys are: {supported_text}."
            )

    def check_mandatory_config_parameters(self, config):
        """
        Function to check if the mandatory parameters are provided in the config file
        Parameters
        ----------
        config - Yaml config file contents

        Returns
        -------
        Nothing

        The program exits with an error message listing the missing
        parameters if at least one mandatory parameter is absent.
        """
        mandatory_parameters = self.get_mandatory_parameters()
        if not mandatory_parameters:
            return
        missing_parameters = mandatory_parameters - set(config.keys())
        if missing_parameters:
            # Sorted so that the message is the same on every run.
            missing_text = ','.join(sorted(str(key)
                                           for key in missing_parameters))
            sys.exit(f"Required mandatory parameters missing - "
                     f"{missing_text}")

    def process_commandline_and_yaml_options(
            self,
            options: argparse.Namespace,
    ) -> List[str]:
        """Processes all command line options

        Loads and validates the YAML config named by options.config,
        copies the common config values onto options, and returns the
        sorted list of file paths to work on.
        """

        self.config = self.load_yaml_config(options.config)
        self.validate_yaml_supported_config_parameters(self.config)
        self.check_mandatory_config_parameters(self.config)
        options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None)
        options.inputs = self.config.get(self.INPUTS, [])
        options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False)
        work_list = self.process_common_options(options)
        self.process_tool_options(options, work_list)
        return work_list

    def process_common_options(
            self,
            options: argparse.Namespace,
    ) -> List[str]:
        """Generates the exact list of files to work on later. The list is sorted
        alphabetically.

        Inputs are taken from options.inputs and from the lines of the
        file named by options.inputs_from_file. If neither is given, the
        current directory "." is used. Files are added as they are (with
        a warning if the extension is unexpected), directories are
        searched recursively for files with a matching extension.

        The program exits with status 1 if an input is neither a file
        nor a directory.
        """
        # Sanity check output
        if options.out and \
           os.path.exists(options.out) and \
           not os.path.isfile(options.out):
            self._argument_parser.error(
                f"output {options.out} already exists and is not a file",
            )

        # Assemble input requests
        inputs = []
        if options.inputs:
            inputs += [(File_Reference("<config>"), item)
                       for item in options.inputs]
        if options.inputs_from_file:
            if not os.path.isfile(options.inputs_from_file):
                self._argument_parser.error(f"cannot open {options.inputs_from_file}")
            with open(options.inputs_from_file, "r", encoding="UTF-8") as fd:
                for line_no, raw_line in enumerate(fd, 1):
                    # Everything after the first "#" is a comment.
                    line = raw_line.split("#", 1)[0].strip()
                    if not line:
                        continue
                    inputs.append((File_Reference(options.inputs_from_file,
                                                  line_no),
                                   line))
        if not options.inputs and not options.inputs_from_file:
            inputs.append((File_Reference("<config>"), "."))

        # Sanity check and search directories
        work_list = []
        ok = True
        for location, item in inputs:
            if os.path.isfile(item):
                # Explicitly named files are processed even if the
                # extension does not match; the user only gets a warning.
                if os.path.splitext(item)[1] not in self.extensions:
                    self.mh.warning(location,
                                    f"not a {' or '.join(self.extensions)} file")
                work_list.append(item)

            elif os.path.isdir(item):
                for path, dirs, files in os.walk(item):
                    # Prune excluded directories in place, so that
                    # os.walk does not descend into them.
                    dirs[:] = [
                        dir_name
                        for dir_name in dirs
                        if not any(pattern.match(dir_name)
                                   for pattern in self.exclude_pat)
                    ]

                    for file_name in files:
                        if os.path.splitext(file_name)[1] in self.extensions:
                            work_list.append(os.path.join(path, file_name))

            else:
                # Keep going so that all invalid inputs are reported
                # before the program exits.
                self.mh.error(location,
                              f"{item} is not a file or directory",
                              fatal=False)
                ok = False

        if not ok:
            sys.exit(1)

        work_list.sort()

        return work_list

    def write_output(
            self,
            options: argparse.Namespace,
            items: List[Union[Activity, Implementation, Requirement]],
    ):
        """Write the given items using lobster_write.

        The items are written to the file named by options.out, whose
        directory is created first via ensure_output_directory. If
        options.out is empty, the items are written to stdout instead.
        """
        assert isinstance(options, argparse.Namespace)
        assert isinstance(items, list)
        assert all(isinstance(item, (Requirement,
                                     Implementation,
                                     Activity))
                   for item in items)

        if options.out:
            ensure_output_directory(options.out)
            with open(options.out, "w", encoding="UTF-8") as fd:
                lobster_write(fd, self.schema, self.name, items)
            print(f"{self.name}: wrote {len(items)} items to {options.out}")
        else:
            lobster_write(sys.stdout, self.schema, self.name, items)

    @abstractmethod
    def process_tool_options(
            self,
            options: argparse.Namespace,
            work_list: List[str],
    ):
        """Process the tool specific options.

        Called by process_commandline_and_yaml_options after the work
        list has been assembled. Must be implemented by subclasses.
        """
        assert isinstance(options, argparse.Namespace)
        assert isinstance(work_list, list)
class LOBSTER_Per_File_Tool(LOBSTER_Tool):
    """Tool base class that handles every file of the work list on its own.

    Subclasses implement the class method process, which is called once
    per file. The results of all files are combined and written out.
    """

    def __init__(self, name, description, extensions, official=False):
        """Forward all arguments to LOBSTER_Tool; official defaults to False."""
        super().__init__(name, description, extensions, official)

    @classmethod
    @abstractmethod
    def process(
            cls,
            options,
            file_name,
    ) -> Tuple[bool, Sequence[Union[Activity, Implementation, Requirement]]]:
        """Process a single file.

        Returns a tuple of a success flag and the items found in the
        file. Must be implemented by subclasses.
        """

    def _run_impl(self, options: argparse.Namespace) -> int:
        """Process all files and write the output.

        Files are processed one after the other if options.single is
        set, otherwise in a multiprocessing pool. The output is only
        written if every file was processed successfully.

        Returns 0 on success and 1 if processing of any file failed.
        """
        files = self.process_commandline_and_yaml_options(options)

        all_good = True
        collected = []
        worker = partial(self.process, options)

        def absorb(results):
            # Fold the per-file results into the overall status and list.
            nonlocal all_good
            for file_ok, file_items in results:
                all_good &= file_ok
                collected.extend(file_items)

        if not options.single:
            with multiprocessing.Pool() as pool:
                # imap yields the results in the order of the work list;
                # the last argument is the chunk size.
                absorb(pool.imap(worker, files, 4))
                pool.close()
                pool.join()
        else:
            absorb(map(worker, files))

        if not all_good:
            print(f"{self.name}: aborting due to earlier errors")
        else:
            self.write_output(options, collected)

        return int(not all_good)