Coverage for lobster/tools/trlc/trlc.py: 34%
257 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
1#!/usr/bin/env python3
2#
3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import os
21import re
22import sys
23import argparse
25from copy import copy
26from typing import Tuple, List, Set
28from trlc.trlc import Source_Manager
29from trlc.lexer import TRLC_Lexer, Token
30from trlc.parser import Parser_Base
31from trlc.errors import Message_Handler, TRLC_Error
32from trlc import ast
34from lobster.tool import LOBSTER_Tool
35from lobster.items import Tracing_Tag, Requirement
36from lobster.location import File_Reference
37from lobster.io import lobster_write
40class Config_Parser(Parser_Base):
41 def __init__(self, mh, file_name, stab):
42 assert isinstance(mh, Message_Handler)
43 assert os.path.isfile(file_name)
44 assert isinstance(stab, ast.Symbol_Table)
45 super().__init__(mh = mh,
46 lexer = TRLC_Lexer(mh, file_name),
47 eoc_name = "end of file",
48 token_map = Token.KIND,
49 keywords = TRLC_Lexer.KEYWORDS)
50 self.stab = stab
51 self.tree = {}
52 self.entries = {}
53 self.config = {}
54 self.to_string = {}
56 # Construct type hierarchy
57 for n_pkg in self.stab.values(ast.Package): 57 ↛ 58line 57 didn't jump to line 58 because the loop on line 57 never started
58 for n_typ in n_pkg.symbols.values(ast.Record_Type):
59 if n_typ not in self.tree:
60 self.tree[n_typ] = set()
61 if n_typ.parent:
62 if n_typ.parent not in self.tree:
63 self.tree[n_typ.parent] = set([n_typ])
64 else:
65 self.tree[n_typ.parent].add(n_typ)
67 def generate_lobster_object(self, n_obj):
68 assert isinstance(n_obj, ast.Record_Object)
69 assert n_obj.n_typ in self.config
71 config = self.config[n_obj.n_typ]
73 if not config["trace"]:
74 return None
76 item_tag = Tracing_Tag(namespace = "req",
77 tag = n_obj.fully_qualified_name(),
78 version = None)
80 item_loc = File_Reference(filename = n_obj.location.file_name,
81 line = n_obj.location.line_no,
82 column = n_obj.location.col_no)
84 item_data = n_obj.to_python_dict()
85 item_text = None
87 if len(config["description_fields"]) == 1:
88 item_text = item_data[config["description_fields"][0].name]
89 elif len(config["description_fields"]) > 1: 89 ↛ 95line 89 didn't jump to line 95 because the condition on line 89 was always true
90 item_text = "\n\n".join("%s: %s" % (field.name,
91 item_data[field.name])
92 for field in config["description_fields"]
93 if item_data[field.name])
95 rv = Requirement(tag = item_tag,
96 location = item_loc,
97 framework = "TRLC",
98 kind = n_obj.n_typ.name,
99 name = n_obj.fully_qualified_name(),
100 text = item_text if item_text else None)
102 for tag_namespace, tag_field in config["tag_fields"]:
103 if item_data[tag_field.name] is None: 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true
104 continue
105 elif isinstance(tag_field.n_typ, ast.Array_Type): 105 ↛ 112line 105 didn't jump to line 112 because the condition on line 105 was always true
106 for element in item_data[tag_field.name]:
107 text = self.generate_text(tag_field.n_typ.element_type,
108 element)
109 tag = Tracing_Tag.from_text(tag_namespace, text)
110 rv.add_tracing_target(tag)
111 else:
112 text = self.generate_text(tag_field.n_typ,
113 item_data[tag_field.name])
114 tag = Tracing_Tag.from_text(tag_namespace, text)
115 rv.add_tracing_target(tag)
117 for lst, name in ((rv.just_up, "just_up"),
118 (rv.just_down, "just_down"),
119 (rv.just_global, "just_global")):
120 for just_field in config[name + "_fields"]:
121 if item_data[just_field.name] is None: 121 ↛ 122line 121 didn't jump to line 122 because the condition on line 121 was never true
122 continue
123 elif isinstance(just_field.n_typ, ast.Array_Type): 123 ↛ 130line 123 didn't jump to line 130 because the condition on line 123 was always true
124 for element in item_data[just_field.name]:
125 text = self.generate_text(
126 just_field.n_typ.element_type,
127 element)
128 lst.append(text)
129 else:
130 text = self.generate_text(just_field.n_typ,
131 item_data[just_field.name])
132 lst.append(text)
134 return rv
136 def generate_text(self, n_typ, value):
137 assert isinstance(n_typ, ast.Type)
138 assert not isinstance(n_typ, ast.Array_Type)
140 if isinstance(n_typ, ast.Tuple_Type):
141 if n_typ not in self.to_string: 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true
142 self.lexer.mh.error(n_typ.location,
143 "please define a to_string function for"
144 " this type in the lobster-trlc"
145 " configuration file")
147 # We have functions, so we attempt to apply until we get
148 # one that works, in order.
149 for function_seq in self.to_string[n_typ]:
150 rv = ""
151 valid = True
152 for kind, func in function_seq:
153 if kind == "text":
154 assert isinstance(func, str)
155 rv += func
156 elif kind == "field": 156 ↛ 152line 156 didn't jump to line 152 because the condition on line 156 was always true
157 assert isinstance(func, ast.Composite_Component)
158 if value[func.name] is None:
159 valid = False
160 break
161 rv += self.generate_text(func.n_typ,
162 value[func.name])
163 if valid:
164 return rv
166 self.lexer.mh.error(n_typ.location,
167 "please define a to_string function that"
168 " can render %s" % value)
170 else:
171 return str(value)
173 def parse_config_file(self):
174 # First parse config file
175 while self.nt:
176 self.parse_directive()
177 self.match_eof()
179 # Then build the type hierarchy configuration
180 for n_typ in self.tree:
181 if n_typ.parent:
182 continue
183 context = {
184 "trace" : False,
185 "description_fields" : [],
186 "tag_fields" : [],
187 "just_up_fields" : [],
188 "just_down_fields" : [],
189 "just_global_fields" : [],
190 }
192 self.build_config(n_typ, context)
194 def build_config(self, n_typ, config):
195 assert isinstance(n_typ, ast.Record_Type)
196 assert isinstance(config, dict)
198 self.config[n_typ] = config
199 if n_typ in self.entries:
200 self.config[n_typ]["trace"] = True
201 for field in ("description",
202 "just_up",
203 "just_down",
204 "just_global"):
205 ctx_name = "%s_fields" % field
206 for new_field in self.entries[n_typ][ctx_name]:
207 if new_field not in self.config[n_typ][ctx_name]:
208 self.config[n_typ][ctx_name].append(new_field)
209 for tag_namespace, tag_field in self.entries[n_typ]["tag_fields"]:
210 self.config[n_typ]["tag_fields"].append((tag_namespace,
211 tag_field))
213 for n_extension in self.tree[n_typ]:
214 new_context = {
215 "trace" : self.config[n_typ]["trace"],
217 "description_fields" :
218 copy(self.config[n_typ]["description_fields"]),
220 "just_up_fields" :
221 copy(self.config[n_typ]["just_up_fields"]),
223 "just_down_fields" :
224 copy(self.config[n_typ]["just_down_fields"]),
226 "just_global_fields" :
227 copy(self.config[n_typ]["just_global_fields"]),
229 "tag_fields" :
230 copy(self.config[n_typ]["tag_fields"]),
231 }
232 self.build_config(n_extension, new_context)
234 def parse_record_type(self, n_typ):
235 assert isinstance(n_typ, ast.Record_Type)
237 if n_typ in self.entries:
238 self.lexer.mh.error(self.ct.location,
239 "duplicate configuration block")
240 else:
241 self.entries[n_typ] = {
242 "description_fields" : [],
243 "tag_fields" : [],
244 "just_up_fields" : [],
245 "just_down_fields" : [],
246 "just_global_fields" : [],
247 }
249 self.match("C_BRA")
251 while self.peek("IDENTIFIER"):
252 self.match("IDENTIFIER")
253 if self.ct.value in ("description",
254 "just_up",
255 "just_down",
256 "just_global"):
257 field = self.ct.value
258 self.match("ASSIGN")
259 self.match("IDENTIFIER")
260 n_comp = n_typ.components.lookup(
261 mh = self.lexer.mh,
262 referencing_token = self.ct,
263 required_subclass = ast.Composite_Component)
264 self.entries[n_typ]["%s_fields" % field].append(n_comp)
265 elif self.ct.value == "tags":
266 if self.peek("STRING"):
267 self.match("STRING")
268 tag_namespace = self.ct.value
269 else:
270 tag_namespace = "req"
271 self.match("ASSIGN")
272 self.match("IDENTIFIER")
273 n_comp = n_typ.components.lookup(
274 mh = self.lexer.mh,
275 referencing_token = self.ct,
276 required_subclass = ast.Composite_Component)
277 self.entries[n_typ]["tag_fields"].append((tag_namespace,
278 n_comp))
279 else:
280 self.lexer.mh.error(
281 self.ct.location,
282 "expected description|tags|just_up|just_down|just_global")
284 self.match("C_KET")
286 def parse_text_generator(self, n_typ):
287 assert isinstance(n_typ, ast.Composite_Type)
289 function = []
291 if self.peek("STRING"):
292 self.match("STRING")
293 cpos = 0
294 function = []
295 for match in re.finditer(r"\$\([a-z][a-z0-9_]*\)",
296 self.ct.value):
297 if match.span()[0] > cpos:
298 function.append(("text",
299 self.ct.value[cpos:match.span()[0]]))
300 n_comp = n_typ.components.lookup_direct(
301 mh = self.lexer.mh,
302 name = match.group(0)[2:-1],
303 error_location = self.ct.location,
304 required_subclass = ast.Composite_Component)
305 function.append(("field", n_comp))
306 cpos = match.span()[1]
307 if cpos < len(self.ct.value):
308 function.append(("text",
309 self.ct.value[cpos:]))
310 # for kind, value in function:
311 # if kind == "text" and not re.match("^[a-zA-Z_0-9@]+$",
312 # value):
313 # self.lexer.mh.error(
314 # self.ct.location,
315 # "text segment '%s' can only contain letters,"
316 # " numbers, underscores, or @" % value)
318 else:
319 self.match("IDENTIFIER")
320 n_comp = n_typ.components.lookup(
321 mh = self.lexer.mh,
322 referencing_token = self.ct,
323 required_subclass = ast.Composite_Component)
324 function.append(("field", n_comp))
326 return function
328 def parse_tuple_type(self, n_typ):
329 assert isinstance(n_typ, ast.Tuple_Type)
331 if n_typ in self.to_string:
332 self.lexer.mh.error(self.ct.location,
333 "duplicate configuration block")
334 else:
335 self.to_string[n_typ] = []
337 self.match("C_BRA")
339 while self.peek("IDENTIFIER"):
340 self.match("IDENTIFIER")
341 if self.ct.value == "to_string":
342 self.match("ASSIGN")
343 self.to_string[n_typ].append(
344 self.parse_text_generator(n_typ))
345 else:
346 self.lexer.mh.error(self.ct.location,
347 "expected to_string")
349 self.match("C_KET")
351 def parse_directive(self):
352 self.match("IDENTIFIER")
353 n_pkg = self.stab.lookup(mh = self.lexer.mh,
354 referencing_token = self.ct,
355 required_subclass = ast.Package)
356 self.match("DOT")
357 self.match("IDENTIFIER")
358 n_typ = n_pkg.symbols.lookup(mh = self.lexer.mh,
359 referencing_token = self.ct,
360 required_subclass = ast.Composite_Type)
361 if isinstance(n_typ, ast.Record_Type):
362 self.parse_record_type(n_typ)
363 else:
364 self.parse_tuple_type(n_typ)
367class LOBSTER_Trlc(LOBSTER_Tool):
368 def __init__(self):
369 super().__init__(
370 name = "trlc",
371 description = "Extract tracing data from rsl and trlc files.",
372 extensions = ["rsl", "trlc"],
373 official = True)
375 for action in self.ap._actions:
376 if action.dest == 'config':
377 action.required = False
379 # Supported config parameters for lobster-trlc
380 TRLC_CONFIG_FILE = "trlc_config_file"
382 @classmethod
383 def get_config_keys_manual(cls):
384 help_dict = super().get_config_keys_manual()
385 help_dict.update(
386 {
387 cls.TRLC_CONFIG_FILE: "Name of lobster-trlc config file, "
388 "by default lobster-trlc.conf"
389 }
390 )
391 return help_dict
393 def get_mandatory_parameters(self) -> Set[str]:
394 """As of now lobster-trlc don't have any mandatory parameters"""
395 return set()
397 def process_commandline_and_yaml_options(
398 self,
399 ) -> Tuple[argparse.Namespace, List[Tuple[File_Reference, str]]]:
400 """
401 Overrides the parent class method and add fetch tool specific options from the
402 yaml config
404 Returns
405 -------
406 options - command-line and yaml options
407 worklist - list of trlc and rsl files
408 """
410 options, work_list = super().process_commandline_and_yaml_options()
411 options.trlc_config_file = self.config.get(self.TRLC_CONFIG_FILE,
412 "lobster-trlc.conf")
413 return options, work_list
415 def process_tool_options(
416 self,
417 options: argparse.Namespace,
418 work_list: List[Tuple[File_Reference, str]],
419 ):
420 super().process_tool_options(options, work_list)
421 self.schema = Requirement
423 def execute(self):
424 trlc_mh = Message_Handler()
425 sm = Source_Manager(trlc_mh)
426 options, work_list = self.process_commandline_and_yaml_options()
427 if not os.path.isfile(options.trlc_config_file):
428 sys.exit("lobster-trlc: cannot open config file '%s'" %
429 options.trlc_config_file)
431 if os.path.exists(options.out) and not os.path.isfile(options.out):
432 sys.exit("lobster-trlc: output file '%s' exists and is not a file"
433 % options.out)
435 ok = True
436 for item in work_list:
437 if os.path.isfile(item):
438 try:
439 sm.register_file(item)
440 except TRLC_Error:
441 ok = False
442 elif os.path.isdir(item):
443 try:
444 sm.register_directory(item)
445 except TRLC_Error:
446 ok = False
447 else:
448 print("lobster-trlc: neither a file or directory: '%s'" %
449 item)
450 ok = False
452 if ok:
453 stab = sm.process()
454 # pylint: disable=possibly-used-before-assignment
455 if not ok or stab is None:
456 print("lobster-trlc: aborting due to earlier error")
457 return 1
459 config_parser = Config_Parser(trlc_mh, options.trlc_config_file, stab)
460 try:
461 config_parser.parse_config_file()
462 except TRLC_Error:
463 print("lobster-trlc: aborting due to error in"
464 " configuration file '%s'" % options.trlc_config_file)
465 return 1
467 items = []
468 for n_obj in stab.iter_record_objects():
469 try:
470 item = config_parser.generate_lobster_object(n_obj)
471 if item:
472 items.append(item)
473 except TRLC_Error:
474 ok = False
476 if not ok:
477 print("lobster-trlc: aborting due to error during extraction")
478 return 1
480 with open(options.out, "w", encoding="UTF-8") as fd:
481 # lobster-trace: trlc_req.Output_File
482 lobster_write(fd=fd,
483 kind=Requirement,
484 generator="lobster-trlc",
485 items=items)
486 print("lobster-trlc: successfully wrote %u items to %s" %
487 (len(items), options.out))
488 return 0
491def main():
492 # lobster-trace: trlc_req.Dummy_Requirement
493 tool = LOBSTER_Trlc()
494 return tool.execute()
497if __name__ == "__main__":
498 sys.exit(main())