Coverage for lobster/tools/trlc/trlc.py: 52%

257 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 14:55 +0000

1#!/usr/bin/env python3 

2# 

3# LOBSTER - Lightweight Open BMW Software Traceability Evidence Report 

4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import os 

21import re 

22import sys 

23import argparse 

24 

25from copy import copy 

26from typing import Tuple, List, Set 

27 

28from trlc.trlc import Source_Manager 

29from trlc.lexer import TRLC_Lexer, Token 

30from trlc.parser import Parser_Base 

31from trlc.errors import Message_Handler, TRLC_Error 

32from trlc import ast 

33 

34from lobster.tool import LOBSTER_Tool 

35from lobster.items import Tracing_Tag, Requirement 

36from lobster.location import File_Reference 

37from lobster.io import lobster_write 

38 

39 

class Config_Parser(Parser_Base):
    """Parser for the lobster-trlc configuration file.

    The configuration consists of directives of the form
    ``Package.Type { ... }``. A directive for a record type names the
    components used as description, tracing tags and justifications;
    a directive for a tuple type lists ``to_string`` recipes used to
    render values of that type as text.

    Attributes
    ----------
    stab      - the TRLC symbol table the configuration refers to
    tree      - maps each record type to the set of its direct extensions
    entries   - per record type, the settings written in the config file
    config    - per record type, the effective settings (own entries
                merged with those inherited from parent types)
    to_string - per tuple type, the list of rendering recipes
    """

    # Keywords whose value is a single record component; the matching
    # settings key is the keyword followed by "_fields".
    _COMPONENT_KEYWORDS = ("description",
                           "just_up",
                           "just_down",
                           "just_global")

    def __init__(self, mh, file_name, stab):
        assert isinstance(mh, Message_Handler)
        assert os.path.isfile(file_name)
        assert isinstance(stab, ast.Symbol_Table)
        super().__init__(mh=mh,
                         lexer=TRLC_Lexer(mh, file_name),
                         eoc_name="end of file",
                         token_map=Token.KIND,
                         keywords=TRLC_Lexer.KEYWORDS)
        self.stab = stab
        self.tree = {}
        self.entries = {}
        self.config = {}
        self.to_string = {}

        # Record the type hierarchy: every record type gets a node, and
        # is additionally registered as a child of its parent (if any).
        for n_pkg in self.stab.values(ast.Package):
            for n_typ in n_pkg.symbols.values(ast.Record_Type):
                self.tree.setdefault(n_typ, set())
                if n_typ.parent:
                    self.tree.setdefault(n_typ.parent, set()).add(n_typ)

    def _render_component(self, n_comp, raw_value):
        """Yield the text of every value stored in one record component.

        Yields nothing when the value is None, one text per element for
        array-typed components, and exactly one text otherwise.
        """
        if raw_value is None:
            return
        if isinstance(n_comp.n_typ, ast.Array_Type):
            for element in raw_value:
                yield self.generate_text(n_comp.n_typ.element_type,
                                         element)
        else:
            yield self.generate_text(n_comp.n_typ, raw_value)

    def generate_lobster_object(self, n_obj):
        """Convert a TRLC record object into a LOBSTER Requirement.

        Returns None when the effective configuration of the object's
        type does not enable tracing; otherwise a Requirement with its
        description text, tracing targets and justifications filled in
        as configured.
        """
        assert isinstance(n_obj, ast.Record_Object)
        assert n_obj.n_typ in self.config

        settings = self.config[n_obj.n_typ]
        if not settings["trace"]:
            return None

        qualified_name = n_obj.fully_qualified_name()
        item_tag = Tracing_Tag(namespace="req",
                               tag=qualified_name,
                               version=None)
        item_loc = File_Reference(filename=n_obj.location.file_name,
                                  line=n_obj.location.line_no,
                                  column=n_obj.location.col_no)
        item_data = n_obj.to_python_dict()

        # One description field is used verbatim; several are joined as
        # "name: value" paragraphs, leaving out the empty ones.
        description_fields = settings["description_fields"]
        if not description_fields:
            item_text = None
        elif len(description_fields) == 1:
            item_text = item_data[description_fields[0].name]
        else:
            item_text = "\n\n".join(
                "%s: %s" % (n_comp.name, item_data[n_comp.name])
                for n_comp in description_fields
                if item_data[n_comp.name])

        rv = Requirement(tag=item_tag,
                         location=item_loc,
                         framework="TRLC",
                         kind=n_obj.n_typ.name,
                         name=qualified_name,
                         text=item_text or None)

        for tag_namespace, n_comp in settings["tag_fields"]:
            for text in self._render_component(n_comp,
                                               item_data[n_comp.name]):
                rv.add_tracing_target(
                    Tracing_Tag.from_text(tag_namespace, text))

        justification_sinks = (("just_up_fields", rv.just_up),
                               ("just_down_fields", rv.just_down),
                               ("just_global_fields", rv.just_global))
        for settings_key, sink in justification_sinks:
            for n_comp in settings[settings_key]:
                for text in self._render_component(
                        n_comp, item_data[n_comp.name]):
                    sink.append(text)

        return rv

    def generate_text(self, n_typ, value):
        """Render a single (non-array) value of the given type as text.

        Values of tuple types are rendered with the first configured
        to_string recipe whose referenced fields are all non-null; an
        error is reported through the message handler if the type has
        no recipes or none of them applies. Any other value is rendered
        with str().
        """
        assert isinstance(n_typ, ast.Type)
        assert not isinstance(n_typ, ast.Array_Type)

        if not isinstance(n_typ, ast.Tuple_Type):
            return str(value)

        if n_typ not in self.to_string:
            # NOTE(review): presumably this raises TRLC_Error (callers
            # in this file catch it) - confirm against the TRLC API.
            self.lexer.mh.error(n_typ.location,
                                "please define a to_string function for"
                                " this type in the lobster-trlc"
                                " configuration file")

        # Try the recipes in the order they were configured and use the
        # first one that does not need a null field.
        for recipe in self.to_string[n_typ]:
            pieces = []
            for kind, payload in recipe:
                if kind == "text":
                    assert isinstance(payload, str)
                    pieces.append(payload)
                elif kind == "field":
                    assert isinstance(payload, ast.Composite_Component)
                    component_value = value[payload.name]
                    if component_value is None:
                        # This recipe cannot be used; try the next one
                        break
                    pieces.append(self.generate_text(payload.n_typ,
                                                     component_value))
            else:
                return "".join(pieces)

        self.lexer.mh.error(n_typ.location,
                            "please define a to_string function that"
                            " can render %s" % value)

    def parse_config_file(self):
        """Parse the whole configuration file and derive self.config.

        All directives are parsed first; afterwards the effective
        settings are computed for every record type, starting at the
        types without a parent and descending into their extensions.
        """
        while self.nt:
            self.parse_directive()
        self.match_eof()

        for n_typ in self.tree:
            if not n_typ.parent:
                self.build_config(n_typ, {
                    "trace"              : False,
                    "description_fields" : [],
                    "tag_fields"         : [],
                    "just_up_fields"     : [],
                    "just_down_fields"   : [],
                    "just_global_fields" : [],
                })

    def build_config(self, n_typ, config):
        """Store the effective settings of n_typ and of its extensions.

        config holds the settings inherited from the parent type; it is
        stored as the settings of n_typ and updated in place with the
        type's own entries. Each extension then receives its own copy.
        """
        assert isinstance(n_typ, ast.Record_Type)
        assert isinstance(config, dict)

        self.config[n_typ] = config

        own_entry = self.entries.get(n_typ)
        if n_typ in self.entries:
            # A configuration block for a type switches tracing on for
            # it (and, through the copies below, for its extensions).
            config["trace"] = True
            for keyword in self._COMPONENT_KEYWORDS:
                ctx_name = "%s_fields" % keyword
                for n_comp in own_entry[ctx_name]:
                    if n_comp not in config[ctx_name]:
                        config[ctx_name].append(n_comp)
            for tag_namespace, n_comp in own_entry["tag_fields"]:
                config["tag_fields"].append((tag_namespace, n_comp))

        inherited_lists = ("description_fields",
                           "just_up_fields",
                           "just_down_fields",
                           "just_global_fields",
                           "tag_fields")
        for n_extension in self.tree[n_typ]:
            # Shallow-copy the lists so that siblings do not see each
            # other's additions.
            child_config = {"trace": config["trace"]}
            for list_name in inherited_lists:
                child_config[list_name] = copy(config[list_name])
            self.build_config(n_extension, child_config)

    def _match_component(self, n_typ):
        """Consume an identifier and resolve it as a component of n_typ."""
        self.match("IDENTIFIER")
        return n_typ.components.lookup(
            mh=self.lexer.mh,
            referencing_token=self.ct,
            required_subclass=ast.Composite_Component)

    def parse_record_type(self, n_typ):
        """Parse the ``{ ... }`` block configuring a record type.

        Accepted statements are ``description = field``,
        ``just_up = field``, ``just_down = field``,
        ``just_global = field`` and ``tags ["namespace"] = field``
        (the namespace is "req" when not given). A second block for the
        same type is reported as an error.
        """
        assert isinstance(n_typ, ast.Record_Type)

        if n_typ in self.entries:
            self.lexer.mh.error(self.ct.location,
                                "duplicate configuration block")
        else:
            self.entries[n_typ] = {
                "description_fields" : [],
                "tag_fields"         : [],
                "just_up_fields"     : [],
                "just_down_fields"   : [],
                "just_global_fields" : [],
            }

        self.match("C_BRA")

        while self.peek("IDENTIFIER"):
            self.match("IDENTIFIER")
            keyword = self.ct.value

            if keyword == "tags":
                tag_namespace = "req"
                if self.peek("STRING"):
                    self.match("STRING")
                    tag_namespace = self.ct.value
                self.match("ASSIGN")
                n_comp = self._match_component(n_typ)
                self.entries[n_typ]["tag_fields"].append((tag_namespace,
                                                          n_comp))
            elif keyword in self._COMPONENT_KEYWORDS:
                self.match("ASSIGN")
                n_comp = self._match_component(n_typ)
                self.entries[n_typ]["%s_fields" % keyword].append(n_comp)
            else:
                self.lexer.mh.error(
                    self.ct.location,
                    "expected description|tags|just_up|just_down|just_global")

        self.match("C_KET")

    def parse_text_generator(self, n_typ):
        """Parse the right-hand side of a to_string statement.

        The value is either a plain identifier naming one component, or
        a string template in which ``$(name)`` stands for a component.
        Returns the recipe as a list of ("text", str) and
        ("field", component) pairs in template order.
        """
        assert isinstance(n_typ, ast.Composite_Type)

        if not self.peek("STRING"):
            return [("field", self._match_component(n_typ))]

        self.match("STRING")
        template = self.ct.value
        recipe = []
        consumed = 0
        # NOTE(review): the pattern only recognises lower-case names;
        # a placeholder such as $(Foo) is kept as literal text. Confirm
        # that this is intended.
        for placeholder in re.finditer(r"\$\([a-z][a-z0-9_]*\)",
                                       template):
            begin, end = placeholder.span()
            if begin > consumed:
                recipe.append(("text", template[consumed:begin]))
            # Strip the leading "$(" and the trailing ")"
            n_comp = n_typ.components.lookup_direct(
                mh=self.lexer.mh,
                name=placeholder.group(0)[2:-1],
                error_location=self.ct.location,
                required_subclass=ast.Composite_Component)
            recipe.append(("field", n_comp))
            consumed = end
        if consumed < len(template):
            recipe.append(("text", template[consumed:]))

        return recipe

    def parse_tuple_type(self, n_typ):
        """Parse the ``{ ... }`` block configuring a tuple type.

        Only ``to_string = ...`` statements are accepted; each one adds
        a recipe for the type. A second block for the same type is
        reported as an error.
        """
        assert isinstance(n_typ, ast.Tuple_Type)

        if n_typ in self.to_string:
            self.lexer.mh.error(self.ct.location,
                                "duplicate configuration block")
        else:
            self.to_string[n_typ] = []

        self.match("C_BRA")

        while self.peek("IDENTIFIER"):
            self.match("IDENTIFIER")
            if self.ct.value != "to_string":
                self.lexer.mh.error(self.ct.location,
                                    "expected to_string")
            else:
                self.match("ASSIGN")
                recipe = self.parse_text_generator(n_typ)
                self.to_string[n_typ].append(recipe)

        self.match("C_KET")

    def parse_directive(self):
        """Parse one ``Package.Type { ... }`` directive.

        The package and type names are resolved through the symbol
        table; record types and all other composite types are handled
        by parse_record_type and parse_tuple_type respectively.
        """
        self.match("IDENTIFIER")
        n_pkg = self.stab.lookup(mh=self.lexer.mh,
                                 referencing_token=self.ct,
                                 required_subclass=ast.Package)
        self.match("DOT")
        self.match("IDENTIFIER")
        n_typ = n_pkg.symbols.lookup(mh=self.lexer.mh,
                                     referencing_token=self.ct,
                                     required_subclass=ast.Composite_Type)

        if isinstance(n_typ, ast.Record_Type):
            block_parser = self.parse_record_type
        else:
            block_parser = self.parse_tuple_type
        block_parser(n_typ)

365 

366 

class LOBSTER_Trlc(LOBSTER_Tool):
    """The lobster-trlc tool: extracts tracing data from rsl/trlc files."""

    # Supported config parameters for lobster-trlc
    TRLC_CONFIG_FILE = "trlc_config_file"

    def __init__(self):
        super().__init__(
            name="trlc",
            description="Extract tracing data from rsl and trlc files.",
            extensions=["rsl", "trlc"],
            official=True)

        # Make the argument stored under 'config' optional.
        # NOTE(review): presumably this is an option registered by the
        # base class on self.ap - confirm in LOBSTER_Tool.
        config_actions = [action
                          for action in self.ap._actions
                          if action.dest == 'config']
        for action in config_actions:
            action.required = False

    @classmethod
    def get_config_keys_manual(cls):
        """Return the base class help for config keys plus the entry
        describing the trlc_config_file key."""
        help_dict = super().get_config_keys_manual()
        help_dict.update(
            {
                cls.TRLC_CONFIG_FILE: "Name of lobster-trlc config file, "
                                      "by default lobster-trlc.conf"
            }
        )
        return help_dict

    def get_mandatory_parameters(self) -> Set[str]:
        """Return an empty set: lobster-trlc currently has no mandatory
        parameters."""
        return set()

    def process_commandline_and_yaml_options(
            self,
    ) -> Tuple[argparse.Namespace, List[Tuple[File_Reference, str]]]:
        """
        Extends the parent class method by adding the tool specific
        option taken from the yaml config.

        Returns
        -------
        options - command-line and yaml options; trlc_config_file is set
                  from the config, defaulting to "lobster-trlc.conf"
        worklist - list of trlc and rsl files
        """
        options, work_list = super().process_commandline_and_yaml_options()
        config_file = self.config.get(self.TRLC_CONFIG_FILE,
                                      "lobster-trlc.conf")
        options.trlc_config_file = config_file
        return options, work_list

    def process_tool_options(
            self,
            options: argparse.Namespace,
            work_list: List[Tuple[File_Reference, str]],
    ):
        """Run the parent class processing, then select Requirement as
        the schema of this tool."""
        super().process_tool_options(options, work_list)
        self.schema = Requirement

    def execute(self):
        """Run the tool.

        Registers and processes all inputs with TRLC, parses the
        lobster-trlc configuration file, converts the record objects
        and writes the resulting items to options.out.

        Returns 0 on success and 1 if an error was reported. Exits via
        sys.exit with a message if the configuration file is missing or
        the output path exists but is not a file.
        """
        trlc_mh = Message_Handler()
        sm = Source_Manager(trlc_mh)
        options, work_list = self.process_commandline_and_yaml_options()

        if not os.path.isfile(options.trlc_config_file):
            sys.exit("lobster-trlc: cannot open config file '%s'" %
                     options.trlc_config_file)

        if os.path.exists(options.out) and not os.path.isfile(options.out):
            sys.exit("lobster-trlc: output file '%s' exists and is not a file"
                     % options.out)

        # Register every input; keep going after a failure so that all
        # problems are reported, but remember that one occurred.
        ok = True
        for item in work_list:
            if os.path.isfile(item):
                register = sm.register_file
            elif os.path.isdir(item):
                register = sm.register_directory
            else:
                print("lobster-trlc: neither a file or directory: '%s'" %
                      item)
                ok = False
                continue
            try:
                register(item)
            except TRLC_Error:
                ok = False

        # The sources are only processed if registration succeeded
        stab = sm.process() if ok else None
        if stab is None:
            print("lobster-trlc: aborting due to earlier error")
            return 1

        config_parser = Config_Parser(trlc_mh, options.trlc_config_file, stab)
        try:
            config_parser.parse_config_file()
        except TRLC_Error:
            print("lobster-trlc: aborting due to error in"
                  " configuration file '%s'" % options.trlc_config_file)
            return 1

        items = []
        for n_obj in stab.iter_record_objects():
            try:
                item = config_parser.generate_lobster_object(n_obj)
            except TRLC_Error:
                ok = False
            else:
                # Objects of types that are not traced yield None
                if item:
                    items.append(item)

        if not ok:
            print("lobster-trlc: aborting due to error during extraction")
            return 1

        with open(options.out, "w", encoding="UTF-8") as fd:
            # lobster-trace: trlc_req.Output_File
            lobster_write(fd=fd,
                          kind=Requirement,
                          generator="lobster-trlc",
                          items=items)
        print("lobster-trlc: successfully wrote %u items to %s" %
              (len(items), options.out))
        return 0

489 

490 

def main():
    """Create the lobster-trlc tool, run it and return its exit status."""
    # lobster-trace: trlc_req.Dummy_Requirement
    return LOBSTER_Trlc().execute()

495 

496 

# Script entry point: use the value returned by main() as the exit status
if __name__ == "__main__":
    raise SystemExit(main())