Coverage for lobster/tools/python/python.py: 67%

311 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-12 15:02 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_python - Extract Python tracing tags for LOBSTER 

4# Copyright (C) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20from argparse import Namespace 

21import sys 

22import os.path 

23import multiprocessing 

24import functools 

25import re 

26from typing import Optional, Sequence 

27 

28from libcst.metadata import PositionProvider 

29import libcst as cst 

30 

31from lobster.common.items import Tracing_Tag, Implementation, Activity 

32from lobster.common.location import File_Reference 

33from lobster.common.io import lobster_write, ensure_output_directory 

34from lobster.common.meta_data_tool_base import MetaDataToolBase 

35 

# Comments starting with this prefix carry a tracing tag; the text after
# the prefix is registered as a tag on the enclosing node.
LOBSTER_TRACE_PREFIX = "# lobster-trace: "
# Comments starting with this prefix carry a justification; the text
# after the prefix is registered as a justification on the enclosing node.
LOBSTER_JUST_PREFIX = "# lobster-exclude: "
# Module-global list of the fully qualified names of every function
# converted so far; used to number repeated function names.
func_name = []

39 

40 

def count_occurrence_of_last_function_from_function_name_list(function_names):
    """
    Build the tag name for the most recently recorded function.

    The last entry of `function_names` is split into its module part and
    its function part (`function`, or `class.function` when the entry has
    exactly three dotted components). Every earlier entry naming the same
    function part is counted, and the count is appended as a `-count`
    suffix so that repeated definitions get distinct names. When there is
    no earlier occurrence, no suffix is added.

    Only the function part is compared; the module part of earlier
    entries is not taken into account.

    Args:
        function_names (list):
            Non-empty list of strings formatted as
            `module.class.function:line_number` or
            `module.function:line_number`.

    Returns:
        str: `module.function` or `module.class.function`, followed by
             `-count` if the same function part appeared `count` (> 0)
             times earlier in the list.

    Raises:
        IndexError: if `function_names` is empty.

    Examples:
        ['hello.add:2', 'hello.sub:5', 'hello.add:8']
            returns 'hello.add-1'
        ['hello.Example.add:2', 'hello.Example.sub:5', 'hello.Example.add:9']
            returns 'hello.Example.add-1'
        ['hello.add:2']
            returns 'hello.add'
    """
    latest = function_names[-1]
    pieces = re.split(r"[.:]", latest)
    dotted = latest.split(":", 1)[0].split(".")

    filename = pieces[0]
    if len(dotted) == 3:
        # module.class.function: keep class and function together
        last_function = dotted[1] + "." + dotted[2]
    else:
        last_function = pieces[1]

    count = 0
    for earlier in function_names[:-1]:
        earlier_dotted = earlier.split(":", 1)[0].split(".")
        if (len(earlier_dotted) == 3 and
                last_function == earlier_dotted[1] + "." + earlier_dotted[2]):
            count += 1
        if re.split(r"[.:]", earlier)[1] == last_function:
            count += 1

    suffix = "-" + str(count) if count > 0 else ""
    return filename + "." + last_function + suffix

88 

89 

def parse_value(val):
    """
    Convert a libcst expression node used as a decorator argument into a
    plain Python value.

    Args:
        val: a libcst node. `cst.SimpleString` and `cst.List` are handled
            specially; any other node is expected to expose a `value`
            attribute.

    Returns:
        - for a string literal: its text without prefix and quotes
        - for a list literal: a list with each element converted
          recursively
        - otherwise: `str(val.value)`, with the text "None" mapped to
          the `None` object
    """
    if isinstance(val, cst.SimpleString):
        # raw_value drops the string prefix and the full quote sequence,
        # so triple-quoted and prefixed literals are handled as well;
        # slicing one character off each end only works for plain
        # single-quoted literals.
        return val.raw_value
    if isinstance(val, cst.List):
        return [parse_value(element.value) for element in val.elements]

    text = str(val.value)
    return None if text == "None" else text

101 

102 

class Python_Traceable_Node:
    """
    Base class for the Python constructs that tracing tags and
    justifications can be attached to.

    Nodes form a tree through `parent` and `children`. Subclasses must
    override `to_lobster`.
    """
    def __init__(self, location, name, kind):
        assert isinstance(location, File_Reference)
        assert isinstance(name, str)
        assert isinstance(kind, str)
        self.location = location
        self.name = name
        self.kind = kind
        self.parent = None
        self.children = []
        # Tracing tags and justification texts attached to this node
        self.tags = []
        self.just = []

    def register_tag(self, tag):
        """Attach a tracing tag to this node."""
        assert isinstance(tag, Tracing_Tag)
        self.tags.append(tag)

    def register_justification(self, justification):
        """Attach a justification text to this node."""
        assert isinstance(justification, str)
        self.just.append(justification)

    def set_parent(self, node):
        """Make `node` the parent of this node and add this node to its
        children."""
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node

    def to_json(self):
        """Return this node and, recursively, its children as a dict."""
        return {"kind"     : self.kind,
                "name"     : self.name,
                "tags"     : [tag.to_json() for tag in self.tags],
                "just"     : self.just,
                "children" : [child.to_json() for child in self.children]}

    def to_lobster(self, schema, items):
        """
        Convert this node into LOBSTER items appended to `items`.

        Must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        # An explicit raise (rather than `assert False`) still fires when
        # assertions are disabled with -O.
        raise NotImplementedError(
            f"{type(self).__name__} must implement to_lobster")

    def fqn(self):
        """
        Return the dotted name of this node, prefixed by the names of
        all its ancestors. Function nodes with a known line get
        `:line` appended to their own name.
        """
        prefix = self.parent.fqn() + "." if self.parent else ""
        if self.location.line is not None and \
           isinstance(self, Python_Function):
            return f"{prefix}{self.name}:{self.location.line}"
        return prefix + self.name

    def lobster_tag(self):
        """Return a tracing tag in the "python" namespace for this node."""
        return Tracing_Tag("python", self.fqn())

    def warn_ignored(self, reason):
        """
        Print one warning per tag and per justification attached to this
        node, stating that it is ignored because `reason` already has
        annotations.
        """
        for tag in self.tags:
            print(f"{self.location.to_string()}: warning: ignored tag {tag}"
                  f" because {reason} already has annotations")
        for just in self.just:
            print(f"{self.location.to_string()}: warning: "
                  f"ignored justification '{just}' "
                  f"because {reason} already has annotations")

164 

165 

class Python_Module(Python_Traceable_Node):
    """Node of kind "Module"; it only forwards to its children."""
    def __init__(self, location, name):
        super().__init__(location, name, "Module")

    def to_lobster(self, schema, items):
        """
        Let every child append its LOBSTER items to `items`.

        Tags or justifications registered on the module node itself are
        not used here.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        for child in self.children:
            child.to_lobster(schema, items)

175 

176 

class Python_Class(Python_Traceable_Node):
    """Node of kind "Class"."""
    def __init__(self, location, name):
        super().__init__(location, name, "Class")

    def to_lobster(self, schema, items):
        """
        Append the LOBSTER items for this class to `items`.

        - Activity schema: the class itself is never emitted; the items
          of its children are emitted and each receives the class tags.
        - Implementation schema, class has tags or justifications: one
          item for the class is emitted and the annotations of the
          direct children are reported as ignored.
        - Implementation schema, class has no annotations: the items of
          its children are emitted instead of the class.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        # Children are always converted first, whichever branch is taken
        # afterwards.
        member_items = []
        for child in self.children:
            child.to_lobster(schema, member_items)

        if schema is Activity:
            # Tests inherit the tags written on their class
            for member in member_items:
                for tag in self.tags:
                    member.add_tracing_target(tag)
            items.extend(member_items)
            return

        qualified_name = self.fqn()
        class_item = Implementation(
            tag      = Tracing_Tag("python", qualified_name),
            location = self.location,
            language = "Python",
            kind     = self.kind,
            name     = qualified_name)

        if not (self.tags or self.just):
            # Nothing annotated on the class: trace to each child
            items.extend(member_items)
            return

        # Annotations on the class take precedence over the children
        for tag in self.tags:
            class_item.add_tracing_target(tag)
        class_item.just_up += self.just

        for child in self.children:
            child.warn_ignored(self.name)

        items.append(class_item)

228 

229 

class Python_Function(Python_Traceable_Node):
    """
    Node for a function definition. Its kind is "Function", or
    "Method" / "Constructor" once it is parented to a class.
    """
    def __init__(self, location, name):
        super().__init__(location, name, "Function")

    def set_parent(self, node):
        """Link to `node` as parent and refine the kind when the parent
        is a class."""
        super().set_parent(node)
        if isinstance(node, Python_Class):
            self.kind = "Constructor" if self.name == "__init__" else "Method"

    def to_lobster(self, schema, items):
        """
        Append the LOBSTER item for this function to `items`.

        For the Implementation schema every function yields an item. For
        the Activity schema only functions whose name starts with "test"
        or "_test", or ends with "test", yield an item; others yield
        nothing. Tags of nested children are merged into this item; the
        children themselves are not emitted.

        The function's fully qualified name is always recorded in the
        module-global `func_name` list.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        func_name.append(self.fqn())
        tagname = count_occurrence_of_last_function_from_function_name_list(
            func_name
        )
        # The item name is the tag name without the trailing "-<count>"
        # suffix. Only that suffix is removed, so a hyphen elsewhere
        # (e.g. in the file name) does not truncate the name.
        name_value = re.sub(r"-\d+\Z", "", tagname)

        if schema is Implementation:
            l_item = Implementation(tag      = Tracing_Tag("python",
                                                           tagname),
                                    location = self.location,
                                    language = "Python",
                                    kind     = self.kind,
                                    name     = name_value)
        elif self.name.startswith(("test", "_test")) \
                or self.name.endswith("test"):
            l_item = Activity(tag       = Tracing_Tag("pyunit",
                                                      self.fqn()),
                              location  = self.location,
                              framework = "PyUnit",
                              kind      = "Test")
        else:
            return

        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        # Any children of functions are not testable units. Their
        # tracing tags contribute to ours, but otherwise they don't
        # appear.
        nested_items = []
        for node in self.children:
            node.to_lobster(schema, nested_items)
        for item in nested_items:
            # TODO: Warn about useless nested justifications
            # Merge tracing tags
            for tag in item.unresolved_references:
                l_item.add_tracing_target(tag)

        items.append(l_item)

290 

291 

class Lobster_Visitor(cst.CSTVisitor):
    """
    libcst visitor that builds the tree of traceable nodes for one file.

    Classes and functions become nodes below `self.module`. Tags and
    justifications are collected from the configured decorator and from
    comments starting with LOBSTER_TRACE_PREFIX / LOBSTER_JUST_PREFIX.
    """
    METADATA_DEPENDENCIES = (PositionProvider,)

    def __init__(self, file_name, options):
        """
        Args:
            file_name: path of an existing file; also used to derive the
                module name.
            options: dict providing the keys "activity", "namespace",
                "exclude_untagged", "decorator", "dec_arg_name" and
                "dec_arg_version".
        """
        super().__init__()
        assert os.path.isfile(file_name)
        self.file_name = file_name

        self.module = Python_Module(
            File_Reference(file_name),
            os.path.basename(file_name).replace(".py", ""))

        self.activity     = options["activity"]
        # Start at the module node: a lobster comment that appears before
        # the first class or function is then registered on the module
        # (exactly as it is after leaving a top-level definition)
        # instead of failing on a missing node.
        self.current_node = self.module
        self.stack        = [self.module]

        self.namespace        = options["namespace"]
        self.exclude_untagged = options["exclude_untagged"]

        self.decorator_name  = options["decorator"]
        self.dec_arg_name    = options["dec_arg_name"]
        self.dec_arg_version = options["dec_arg_version"]

    def parse_dotted_name(self, name):
        """
        Return the dotted name of a decorator expression, or None if the
        expression is not a name, an attribute access, or a call of one.
        """
        if isinstance(name, cst.Call):
            return self.parse_dotted_name(name.func)
        if isinstance(name, cst.Name):
            return name.value
        if isinstance(name, cst.Attribute):
            # value -- prefix
            # attr  -- postfix
            return f"{self.parse_dotted_name(name.value)}." \
                f"{self.parse_dotted_name(name.attr)}"
        return None

    def parse_decorators(self, decorators):
        """
        Register tags on the current node for every decorator whose
        dotted name equals the configured decorator name.

        The tag name is read from the keyword argument `dec_arg_name`.
        If `dec_arg_version` is configured, the version is read from
        that keyword argument; otherwise a list value yields one tag per
        element.
        """
        for dec in decorators:
            dec_name = self.parse_dotted_name(dec.decorator)
            if dec_name is None or dec_name != self.decorator_name:
                continue

            dec_args = {arg.keyword.value: parse_value(arg.value)
                        for arg in dec.decorator.args}

            # TODO: Better error messages if these assumptions are
            # violated
            assert self.dec_arg_name in dec_args
            if self.dec_arg_version:
                assert self.dec_arg_version in dec_args
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name],
                                  dec_args.get(self.dec_arg_version, None))
                self.current_node.register_tag(tag)

            elif isinstance(dec_args[self.dec_arg_name], list):
                for item in dec_args[self.dec_arg_name]:
                    tag = Tracing_Tag(self.namespace, item)
                    self.current_node.register_tag(tag)

            else:
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name])
                self.current_node.register_tag(tag)

    def _enter_node(self, node, node_type):
        """Create a `node_type` node for `node` below the top of the
        stack, make it current and process its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = node_type(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def _leave_node(self):
        """Drop the innermost node and make its parent current again."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def visit_ClassDef(self, node):
        self._enter_node(node, Python_Class)

    def visit_FunctionDef(self, node):
        self._enter_node(node, Python_Function)

    def leave_FunctionDef(self, original_node):
        self._leave_node()

    def leave_ClassDef(self, original_node):
        self._leave_node()

    def visit_Comment(self, node):
        """Register the tag or justification carried by a lobster
        comment on the node it belongs to."""
        line = self.get_metadata(PositionProvider, node).start.line
        # For some reason the comment in a class is associated with
        # its constructor. We can check if it precedes it (by line),
        # and so associate it with the enclosing item.
        if self.current_node and \
           self.current_node.location.line and \
           self.current_node.location.line > line:
            actual = self.current_node.parent
        else:
            actual = self.current_node

        if node.value.startswith(LOBSTER_TRACE_PREFIX):
            tag = node.value[len(LOBSTER_TRACE_PREFIX):].strip()
            actual.register_tag(
                Tracing_Tag.from_text(self.namespace,
                                      tag))

        elif node.value.startswith(LOBSTER_JUST_PREFIX):
            reason = node.value[len(LOBSTER_JUST_PREFIX):].strip()
            actual.register_justification(reason)

404 

405 

def process_file(file_name, options):
    """
    Parse one Python file and extract its LOBSTER items.

    Args:
        file_name (str): path of the file to read (as UTF-8).
        options (dict): settings handed to Lobster_Visitor; the keys
            "activity" and "exclude_untagged" are also read here.

    Returns:
        tuple: `(True, items)` on success, or `(False, [])` if the file
        could not be parsed or decoded (a message is printed). With
        "exclude_untagged" set, items without unresolved references are
        dropped.

    Raises:
        Exception: any other error is re-raised after printing the name
            of the offending file.
    """
    assert isinstance(file_name, str)
    assert isinstance(options, dict)

    try:
        with open(file_name, encoding="UTF-8") as fd:
            tree = cst.parse_module(fd.read())

        wrapper = cst.MetadataWrapper(tree)
        visitor = Lobster_Visitor(file_name, options)
        wrapper.visit(visitor)

        items = []
        schema = Activity if options["activity"] else Implementation
        visitor.module.to_lobster(schema, items)

        if options["exclude_untagged"]:
            items = [item for item in items if item.unresolved_references]

        return True, items

    except cst.ParserSyntaxError as exc:
        print(file_name, exc.message)
        return False, []

    except UnicodeDecodeError as exc:
        print(file_name, str(exc))
        return False, []

    except Exception:
        # Name the file before propagating, so the failure can be located
        print(f"Unspecified issue in file: {file_name}")
        raise

441 

442 

class PythonTool(MetaDataToolBase):
    """Command line tool that extracts tracing tags from Python files."""
    def __init__(self):
        super().__init__(
            name="python",
            description="Extract tracing tags from Python code or tests",
            official=True,
        )
        parser = self._argument_parser
        parser.add_argument("files",
                            nargs="+",
                            metavar="FILE|DIR")
        parser.add_argument("--activity",
                            action="store_true",
                            default=False,
                            help=("generate activity traces (tests) instead of"
                                  " an implementation trace"))
        parser.add_argument("--out",
                            default=None)
        parser.add_argument("--single",
                            action="store_true",
                            default=False,
                            help="don't multi-thread")
        parser.add_argument("--only-tagged-functions",
                            default=False,
                            action="store_true",
                            help="only trace functions with tags")
        # The two decorator options cannot be combined
        decorator_group = parser.add_mutually_exclusive_group()
        decorator_group.add_argument(
            "--parse-decorator",
            nargs=2,
            metavar=("DECORATOR", "NAME_ARG"),
            default=(None, None))
        decorator_group.add_argument(
            "--parse-versioned-decorator",
            nargs=3,
            metavar=("DECORATOR", "NAME_ARG", "VERSION_ARG"),
            default=(None, None, None))

    def _run_impl(self, options: Namespace) -> int:
        """
        Collect the input files, process them and write the result.

        Directories are searched recursively for files with the
        extension ".py". The result goes to `options.out` if given,
        otherwise to standard output.

        Returns:
            int: 0 if every file was processed successfully, else 1.
        """
        # Resolve FILE|DIR arguments into a flat list of files
        file_list = []
        for item in options.files:
            if os.path.isfile(item):
                file_list.append(item)
            elif os.path.isdir(item):
                for path, _, files in os.walk(item):
                    file_list.extend(
                        os.path.join(path, filename)
                        for filename in files
                        if os.path.splitext(filename)[1] == ".py")
            else:
                self._argument_parser.error(f"{item} is not a file or directory")

        # Pick the decorator settings from whichever option was given
        decorator = None
        dec_arg_name = None
        dec_arg_version = None
        if options.parse_decorator[0] is not None:
            decorator = options.parse_decorator[0]
            dec_arg_name = options.parse_decorator[1]
        elif options.parse_versioned_decorator[0] is not None:
            decorator = options.parse_versioned_decorator[0]
            dec_arg_name = options.parse_versioned_decorator[1]
            dec_arg_version = options.parse_versioned_decorator[2]

        context = {
            "activity"         : options.activity,
            "decorator"        : decorator,
            "dec_arg_name"     : dec_arg_name,
            "dec_arg_version"  : dec_arg_version,
            "exclude_untagged" : options.only_tagged_functions,
            "namespace"        : "req",
        }

        pfun = functools.partial(process_file, options=context)
        items = []
        ok = True

        if options.single:
            for file_name in file_list:
                file_ok, file_items = pfun(file_name)
                if not file_ok:
                    ok = False
                items.extend(file_items)
        else:
            with multiprocessing.Pool() as pool:
                for file_ok, file_items in pool.imap_unordered(pfun,
                                                               file_list):
                    if not file_ok:
                        ok = False
                    items.extend(file_items)

        schema = Activity if options.activity else Implementation

        if options.out:
            ensure_output_directory(options.out)
            with open(options.out, "w", encoding="UTF-8") as fd:
                lobster_write(fd, schema, "lobster_python", items)
            print(f"Written output for {len(items)} items to {options.out}")
        else:
            lobster_write(sys.stdout, schema, "lobster_python", items)
            print()

        if not ok:
            print("Note: Earlier parse errors make actual output unreliable")
            return 1

        return 0

544 

545 

def main(args: Optional[Sequence[str]] = None) -> int:
    """Run the Python tool with `args` and return its result."""
    tool = PythonTool()
    return tool.run(args)