Coverage for lobster/common/tool.py: 85%
148 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os
21import sys
22import argparse
23import multiprocessing
25from abc import ABCMeta, abstractmethod
26from functools import partial
27from typing import Iterable, List, Sequence, Union, Tuple, Set, Dict
28import yaml
29from lobster.common.errors import Message_Handler
30from lobster.common.location import File_Reference
31from lobster.common.items import Requirement, Implementation, Activity
32from lobster.common.io import lobster_write
33from lobster.common.meta_data_tool_base import MetaDataToolBase
36class SupportedCommonConfigKeys:
37 """Helper class to define supported configuration keys."""
38 INPUTS_FROM_FILE = "inputs_from_file"
39 TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs"
40 INPUTS = "inputs"
42 @classmethod
43 def get_config_keys_manual(cls) -> Dict[str, str]:
44 help_dict = {
45 cls.INPUTS_FROM_FILE : "Read input files or directories from this "
46 "file. Each non-empty line is interpreted as "
47 "one input. Supports comments starting with #.",
48 cls.TRAVERSE_BAZEL_DIRS : "Enter bazel-* directories, which are excluded "
49 "by default.",
50 cls.INPUTS : "List of files to process or directories to "
51 "search for relevant input files.",
52 }
53 return help_dict
55 @abstractmethod
56 def get_mandatory_parameters(self) -> Set[str]:
57 """Return a set of config file parameters that are mandatory"""
59 def get_formatted_help_text(self):
60 help_dict = self.get_config_keys_manual()
61 help_text = ''
62 max_length = max(len(config_key) for config_key in help_dict)
63 for config_key, help_info in help_dict.items():
64 spaces = " " * (max_length - len(config_key))
65 help_text += f"\n{config_key} {spaces}- {help_info}"
66 return help_text
68 @classmethod
69 def get_config_keys_as_set(cls) -> Set[str]:
70 return set(cls.get_config_keys_manual().keys())
73class LOBSTER_Tool(MetaDataToolBase, SupportedCommonConfigKeys, metaclass=ABCMeta):
74 def __init__(
75 self,
76 name: str,
77 description: str,
78 extensions: Iterable[str],
79 official: bool,
80 ):
81 super().__init__(
82 name=name,
83 description=description,
84 official=official,
85 )
86 assert isinstance(extensions, (list, set, frozenset, tuple))
87 assert all(isinstance(extension, str)
88 for extension in extensions)
90 self.extensions = [f".{extension}" for extension in (sorted(extensions))]
91 self.exclude_pat = []
92 self.schema = None
93 self.mh = Message_Handler()
94 self.config = {}
96 self._argument_parser.add_argument(
97 "--out",
98 default = f'{self.name}.lobster',
99 help = "Write output to the given file instead of stdout."
100 )
101 self._argument_parser.add_argument(
102 "--config",
103 default=None,
104 help=(f"Path to YAML file with arguments as below,"
105 f"{self.get_formatted_help_text()}"),
106 required=True
107 )
109 def load_yaml_config(self, config_path):
110 """
111 Loads configuration from a YAML file.
112 Parameters
113 ----------
114 config_path - Yaml config file path.
116 Returns
117 -------
118 data - Returns the Yaml file contents in dictionary format.
119 """
120 if not config_path: 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true
121 return {}
122 if not os.path.isfile(config_path): 122 ↛ 123line 122 didn't jump to line 123 because the condition on line 122 was never true
123 sys.exit(f"Error: Config file '{config_path}' not found.")
124 with open(config_path, "r", encoding="UTF-8") as f:
125 data = yaml.safe_load(f) or {}
126 return data
128 def validate_yaml_supported_config_parameters(self, config):
129 """
130 Function to check if the parameters mentioned in the Yaml config are
131 supported by the tool in execution
132 Parameters
133 ----------
134 data - Yaml config file contents
136 Returns
137 -------
138 Nothing
139 """
140 if config:
141 supported_keys = self.get_config_keys_as_set()
142 unsupported_keys = set(config.keys()) - supported_keys
143 if unsupported_keys: 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 raise KeyError(
145 f"Unsupported config keys: {', '.join(unsupported_keys)}. "
146 f"Supported keys are: {', '.join(supported_keys)}."
147 )
149 def check_mandatory_config_parameters(self, config):
150 """
151 Function to check if the mandatory parameters are provided in the config file
152 Parameters
153 ----------
154 data - Yaml config file contents
156 Returns
157 -------
158 Nothing
159 """
160 if self.get_mandatory_parameters(): 160 ↛ exitline 160 didn't return from function 'check_mandatory_config_parameters' because the condition on line 160 was always true
161 mandatory_parameters = self.get_mandatory_parameters() - set(config.keys())
162 if mandatory_parameters:
163 sys.exit(f"Required mandatory parameters missing - "
164 f"{','.join(mandatory_parameters)}")
166 def process_commandline_and_yaml_options(
167 self,
168 options: argparse.Namespace,
169 ) -> List[Tuple[File_Reference, str]]:
170 """Processes all command line options"""
172 self.config = self.load_yaml_config(options.config)
173 self.validate_yaml_supported_config_parameters(self.config)
174 self.check_mandatory_config_parameters(self.config)
175 options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None)
176 options.inputs = self.config.get(self.INPUTS, [])
177 options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False)
178 work_list = self.process_common_options(options)
179 self.process_tool_options(options, work_list)
180 return work_list
182 def process_common_options(
183 self,
184 options: argparse.Namespace,
185 ) -> List[Tuple[File_Reference, str]]:
186 """Generates the exact list of files to work on later. The list is sorted
187 alphabetically."""
188 # Sanity check output
189 if options.out and \ 189 ↛ 192line 189 didn't jump to line 192 because the condition on line 189 was never true
190 os.path.exists(options.out) and \
191 not os.path.isfile(options.out):
192 self._argument_parser.error(
193 f"output {options.out} already exists and is not a file",
194 )
196 # Assemble input requests
197 inputs = []
198 if options.inputs:
199 inputs += [(File_Reference("<config>"), item)
200 for item in options.inputs]
201 if options.inputs_from_file:
202 if not os.path.isfile(options.inputs_from_file): 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true
203 self._argument_parser.error(f"cannot open {options.inputs_from_file}")
204 with open(options.inputs_from_file, "r", encoding="UTF-8") as fd:
205 for line_no, raw_line in enumerate(fd, 1):
206 line = raw_line.split("#", 1)[0].strip()
207 if not line: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true
208 continue
209 inputs.append((File_Reference(options.inputs_from_file,
210 line_no),
211 line))
212 if not options.inputs and not options.inputs_from_file:
213 inputs.append((File_Reference("<config>"), "."))
214 # Sanity check and search directories
215 work_list = []
216 ok = True
217 for location, item in inputs:
218 if os.path.isfile(item):
219 if os.path.splitext(item)[1] not in self.extensions:
220 self.mh.warning(location,
221 f"not a {' or '.join(self.extensions)} file")
222 work_list.append(item)
224 elif os.path.isdir(item):
225 for path, dirs, files in os.walk(item):
226 for n, dir_name in reversed(list(enumerate(dirs))):
227 keep = True
228 for pattern in self.exclude_pat: 228 ↛ 229line 228 didn't jump to line 229 because the loop on line 228 never started
229 if pattern.match(dir_name):
230 keep = False
231 break
232 if not keep: 232 ↛ 233line 232 didn't jump to line 233 because the condition on line 232 was never true
233 del dirs[n]
235 for file_name in files:
236 if os.path.splitext(file_name)[1] in self.extensions:
237 work_list.append(os.path.join(path, file_name))
239 else:
240 self.mh.error(location,
241 f"{item} is not a file or directory",
242 fatal = False)
243 ok = False
245 if not ok:
246 sys.exit(1)
248 work_list.sort()
250 return work_list
252 def write_output(
253 self,
254 options: argparse.Namespace,
255 items: List[Union[Activity, Implementation, Requirement]],
256 ):
257 assert isinstance(options, argparse.Namespace)
258 assert isinstance(items, list)
259 assert all(isinstance(item, (Requirement,
260 Implementation,
261 Activity))
262 for item in items)
264 if options.out: 264 ↛ 269line 264 didn't jump to line 269 because the condition on line 264 was always true
265 with open(options.out, "w", encoding="UTF-8") as fd:
266 lobster_write(fd, self.schema, self.name, items)
267 print(f"{self.name}: wrote {len(items)} items to {options.out}")
268 else:
269 lobster_write(sys.stdout, self.schema, self.name, items)
271 @abstractmethod
272 def process_tool_options(
273 self,
274 options: argparse.Namespace,
275 work_list: List[Tuple[File_Reference, str]],
276 ):
277 assert isinstance(options, argparse.Namespace)
278 assert isinstance(work_list, list)
281class LOBSTER_Per_File_Tool(LOBSTER_Tool):
282 def __init__(self, name, description, extensions, official=False):
283 super().__init__(name, description, extensions, official)
285 @classmethod
286 @abstractmethod
287 def process(
288 cls,
289 options,
290 file_name,
291 ) -> Tuple[bool, Sequence[Union[Activity, Implementation, Requirement]]]:
292 pass
294 def _run_impl(self, options: argparse.Namespace) -> int:
295 work_list = self.process_commandline_and_yaml_options(options)
297 ok = True
298 items = []
299 pfun = partial(self.process, options)
301 if options.single: 301 ↛ 307line 301 didn't jump to line 307 because the condition on line 301 was always true
302 for file_name in work_list:
303 new_ok, new_items = pfun(file_name)
304 ok &= new_ok
305 items += new_items
306 else:
307 with multiprocessing.Pool() as pool:
308 for new_ok, new_items in pool.imap(pfun, work_list, 4):
309 ok &= new_ok
310 items += new_items
311 pool.close()
312 pool.join()
314 if ok:
315 self.write_output(options, items)
316 else:
317 print(f"{self.name}: aborting due to earlier errors")
319 return int(not ok)