Coverage for lobster/tool.py: 85%
148 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023, 2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os
21import sys
22import argparse
23import multiprocessing
25from abc import ABCMeta, abstractmethod
26from functools import partial
27from typing import List, Sequence, Union, Tuple, Set, Dict
28import yaml
29from lobster.errors import Message_Handler
30from lobster.location import File_Reference
31from lobster.items import Requirement, Implementation, Activity
32from lobster.io import lobster_write
33from lobster.meta_data_tool_base import MetaDataToolBase
36class SupportedCommonConfigKeys:
37 """Helper class to define supported configuration keys."""
38 INPUTS_FROM_FILE = "inputs_from_file"
39 TRAVERSE_BAZEL_DIRS = "traverse_bazel_dirs"
40 INPUTS = "inputs"
42 @classmethod
43 def get_config_keys_manual(cls) -> Dict[str, str]:
44 help_dict = {
45 cls.INPUTS_FROM_FILE : "Read input files or directories from this "
46 "file. Each non-empty line is interpreted as "
47 "one input. Supports comments starting with #.",
48 cls.TRAVERSE_BAZEL_DIRS : "Enter bazel-* directories, which are excluded "
49 "by default.",
50 cls.INPUTS : "List of files to process or directories to "
51 "search for relevant input files.",
52 }
53 return help_dict
55 @abstractmethod
56 def get_mandatory_parameters(self) -> Set[str]:
57 """Return a set of config file parameters that are mandatory"""
59 def get_formatted_help_text(self):
60 help_dict = self.get_config_keys_manual()
61 help_text = ''
62 max_length = max(len(config_key) for config_key in help_dict)
63 for config_key, help_info in help_dict.items():
64 spaces = " " * (max_length - len(config_key))
65 help_text += f"\n{config_key} {spaces}- {help_info}"
66 return help_text
68 @classmethod
69 def get_config_keys_as_set(cls) -> Set[str]:
70 return set(cls.get_config_keys_manual().keys())
73class LOBSTER_Tool(MetaDataToolBase, SupportedCommonConfigKeys, metaclass=ABCMeta):
74 def __init__(self, name, description, extensions, official):
75 super().__init__(
76 name=name,
77 description=description,
78 official=official,
79 )
80 assert isinstance(extensions, (list, set, frozenset, tuple))
81 assert all(isinstance(extension, str)
82 for extension in extensions)
84 self.extensions = [f".{extension}" for extension in (sorted(extensions))]
85 self.exclude_pat = []
86 self.schema = None
87 self.mh = Message_Handler()
88 self.config = {}
90 self._argument_parser.add_argument(
91 "--out",
92 default = f'{self.name}.lobster',
93 help = "Write output to the given file instead of stdout."
94 )
95 self._argument_parser.add_argument(
96 "--config",
97 default=None,
98 help=(f"Path to YAML file with arguments as below,"
99 f"{self.get_formatted_help_text()}"),
100 required=True
101 )
103 def load_yaml_config(self, config_path):
104 """
105 Loads configuration from a YAML file.
106 Parameters
107 ----------
108 config_path - Yaml config file path.
110 Returns
111 -------
112 data - Returns the Yaml file contents in dictionary format.
113 """
114 if not config_path: 114 ↛ 115line 114 didn't jump to line 115 because the condition on line 114 was never true
115 return {}
116 if not os.path.isfile(config_path): 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true
117 sys.exit(f"Error: Config file '{config_path}' not found.")
118 with open(config_path, "r", encoding="UTF-8") as f:
119 data = yaml.safe_load(f) or {}
120 return data
122 def validate_yaml_supported_config_parameters(self, config):
123 """
124 Function to check if the parameters mentioned in the Yaml config are
125 supported by the tool in execution
126 Parameters
127 ----------
128 data - Yaml config file contents
130 Returns
131 -------
132 Nothing
133 """
134 if config: 134 ↛ exitline 134 didn't return from function 'validate_yaml_supported_config_parameters' because the condition on line 134 was always true
135 supported_keys = self.get_config_keys_as_set()
136 unsupported_keys = set(config.keys()) - supported_keys
137 if unsupported_keys: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true
138 raise KeyError(
139 f"Unsupported config keys: {', '.join(unsupported_keys)}. "
140 f"Supported keys are: {', '.join(supported_keys)}."
141 )
143 def check_mandatory_config_parameters(self, config):
144 """
145 Function to check if the mandatory parameters are provided in the config file
146 Parameters
147 ----------
148 data - Yaml config file contents
150 Returns
151 -------
152 Nothing
153 """
154 if self.get_mandatory_parameters():
155 mandatory_parameters = self.get_mandatory_parameters() - set(config.keys())
156 if mandatory_parameters:
157 sys.exit(f"Required mandatory parameters missing - "
158 f"{','.join(mandatory_parameters)}")
160 def process_commandline_and_yaml_options(
161 self,
162 options: argparse.Namespace,
163 ) -> List[Tuple[File_Reference, str]]:
164 """Processes all command line options"""
166 self.config = self.load_yaml_config(options.config)
167 self.validate_yaml_supported_config_parameters(self.config)
168 self.check_mandatory_config_parameters(self.config)
169 options.inputs_from_file = self.config.get(self.INPUTS_FROM_FILE, None)
170 options.inputs = self.config.get(self.INPUTS, [])
171 options.traverse_bazel_dirs = self.config.get(self.TRAVERSE_BAZEL_DIRS, False)
172 work_list = self.process_common_options(options)
173 self.process_tool_options(options, work_list)
174 return work_list
176 def process_common_options(
177 self,
178 options: argparse.Namespace,
179 ) -> List[Tuple[File_Reference, str]]:
180 """Generates the exact list of files to work on later. The list is sorted
181 alphabetically."""
182 # Sanity check output
183 if options.out and \ 183 ↛ 186line 183 didn't jump to line 186 because the condition on line 183 was never true
184 os.path.exists(options.out) and \
185 not os.path.isfile(options.out):
186 self._argument_parser.error(
187 f"output {options.out} already exists and is not a file",
188 )
190 # Assemble input requests
191 inputs = []
192 if options.inputs:
193 inputs += [(File_Reference("<config>"), item)
194 for item in options.inputs]
195 if options.inputs_from_file:
196 if not os.path.isfile(options.inputs_from_file): 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true
197 self._argument_parser.error(f"cannot open {options.inputs_from_file}")
198 with open(options.inputs_from_file, "r", encoding="UTF-8") as fd:
199 for line_no, raw_line in enumerate(fd, 1):
200 line = raw_line.split("#", 1)[0].strip()
201 if not line: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 continue
203 inputs.append((File_Reference(options.inputs_from_file,
204 line_no),
205 line))
206 if not options.inputs and not options.inputs_from_file:
207 inputs.append((File_Reference("<config>"), "."))
208 # Sanity check and search directories
209 work_list = []
210 ok = True
211 for location, item in inputs:
212 if os.path.isfile(item):
213 if os.path.splitext(item)[1] not in self.extensions:
214 self.mh.warning(location,
215 f"not a {' or '.join(self.extensions)} file")
216 work_list.append(item)
218 elif os.path.isdir(item):
219 for path, dirs, files in os.walk(item):
220 for n, dir_name in reversed(list(enumerate(dirs))):
221 keep = True
222 for pattern in self.exclude_pat: 222 ↛ 223line 222 didn't jump to line 223 because the loop on line 222 never started
223 if pattern.match(dir_name):
224 keep = False
225 break
226 if not keep: 226 ↛ 227line 226 didn't jump to line 227 because the condition on line 226 was never true
227 del dirs[n]
229 for file_name in files:
230 if os.path.splitext(file_name)[1] in self.extensions:
231 work_list.append(os.path.join(path, file_name))
233 else:
234 self.mh.error(location,
235 f"{item} is not a file or directory",
236 fatal = False)
237 ok = False
239 if not ok:
240 sys.exit(1)
242 work_list.sort()
244 return work_list
246 def write_output(
247 self,
248 options: argparse.Namespace,
249 items: List[Union[Activity, Implementation, Requirement]],
250 ):
251 assert isinstance(options, argparse.Namespace)
252 assert isinstance(items, list)
253 assert all(isinstance(item, (Requirement,
254 Implementation,
255 Activity))
256 for item in items)
258 if options.out: 258 ↛ 263line 258 didn't jump to line 263 because the condition on line 258 was always true
259 with open(options.out, "w", encoding="UTF-8") as fd:
260 lobster_write(fd, self.schema, self.name, items)
261 print(f"{self.name}: wrote {len(items)} items to {options.out}")
262 else:
263 lobster_write(sys.stdout, self.schema, self.name, items)
265 @abstractmethod
266 def process_tool_options(
267 self,
268 options: argparse.Namespace,
269 work_list: List[Tuple[File_Reference, str]],
270 ):
271 assert isinstance(options, argparse.Namespace)
272 assert isinstance(work_list, list)
275class LOBSTER_Per_File_Tool(LOBSTER_Tool):
276 def __init__(self, name, description, extensions, official=False):
277 super().__init__(name, description, extensions, official)
279 @classmethod
280 @abstractmethod
281 def process(
282 cls,
283 options,
284 file_name,
285 ) -> Tuple[bool, Sequence[Union[Activity, Implementation, Requirement]]]:
286 pass
288 def _run_impl(self, options: argparse.Namespace) -> int:
289 work_list = self.process_commandline_and_yaml_options(options)
291 ok = True
292 items = []
293 pfun = partial(self.process, options)
295 if options.single: 295 ↛ 301line 295 didn't jump to line 301 because the condition on line 295 was always true
296 for file_name in work_list:
297 new_ok, new_items = pfun(file_name)
298 ok &= new_ok
299 items += new_items
300 else:
301 with multiprocessing.Pool() as pool:
302 for new_ok, new_items in pool.imap(pfun, work_list, 4):
303 ok &= new_ok
304 items += new_items
305 pool.close()
306 pool.join()
308 if ok:
309 self.write_output(options, items)
310 else:
311 print(f"{self.name}: aborting due to earlier errors")
313 return int(not ok)