Coverage for lobster/tools/python/python.py: 0%

311 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-04-16 05:31 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_python - Extract Python tracing tags for LOBSTER 

4# Copyright (C) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20from argparse import Namespace 

21import sys 

22import os.path 

23import multiprocessing 

24import functools 

25import re 

26from typing import Optional, Sequence 

27 

28from libcst.metadata import PositionProvider 

29import libcst as cst 

30 

31from lobster.common.items import Tracing_Tag, Implementation, Activity 

32from lobster.common.location import File_Reference 

33from lobster.common.io import lobster_write, ensure_output_directory 

34from lobster.common.meta_data_tool_base import MetaDataToolBase 

35 

# Comment prefix that registers a tracing tag on the enclosing item
# (see Lobster_Visitor.visit_Comment).
LOBSTER_TRACE_PREFIX = "# lobster-trace: "

# Comment prefix that registers a justification on the enclosing item
# (see Lobster_Visitor.visit_Comment).
LOBSTER_JUST_PREFIX = "# lobster-exclude: "

# Module-level list of fully qualified function names. It is appended
# to by Python_Function.to_lobster and never cleared in this file, so
# it accumulates for the lifetime of the process.
func_name = []

39 

40 

def count_occurrence_of_last_function_from_function_name_list(function_names):
    """
    Build the tag name for the last entry of `function_names`.

    The last entry is reduced to `module.function` (or
    `module.class.function` when its dotted path has exactly three
    components). The earlier entries are then scanned for the same
    function (or class.function) name; if there are any, the number of
    earlier occurrences is appended as `-count`.

    Note that only the function (or class.function) part is compared
    with earlier entries; the module part of earlier entries is not
    taken into account.

    Args:
        function_names (list):
            Non-empty list of strings formatted as
            `module.class.function:line_number` or
            `module.function:line_number`.

    Returns:
        str: `module.function` or `module.class.function` for the last
             entry, followed by `-count` if the same name occurred
             `count` (> 0) times earlier in the list.

    Raises:
        IndexError: if `function_names` is empty, or an entry contains
                    neither a "." nor a ":".

    Examples:
        ['hello.add:2', 'hello.sub:5', 'hello.add:8']
            returns: 'hello.add-1'
        ['hello.add:2']
            returns: 'hello.add'
        ['hello.Example.add:2', 'hello.Example.add:9']
            returns: 'hello.Example.add-1'
    """
    def dotted_parts(entry):
        # Drop the ":line_number" suffix, then split the dotted path.
        return entry.split(':', 1)[0].split(".")

    latest = function_names[-1]
    latest_tokens = re.split(r"[.:]", latest)
    latest_parts = dotted_parts(latest)

    filename = latest_tokens[0]
    if len(latest_parts) == 3:
        # module.class.function: keep the class as part of the name
        last_function = latest_parts[1] + '.' + latest_parts[2]
    else:
        last_function = latest_tokens[1]

    count = 0
    for entry in function_names[:-1]:
        parts = dotted_parts(entry)
        if len(parts) == 3 and \
           last_function == parts[1] + '.' + parts[2]:
            count += 1
        # The second token is the function name for module.function
        # entries (and the class name for module.class.function ones).
        if re.split(r"[.:]", entry)[1] == last_function:
            count += 1

    suffix = "-" + str(count) if count > 0 else ''
    return filename + "." + last_function + suffix

88 

89 

def parse_value(val):
    """
    Convert a libcst expression node into a plain Python value.

    - cst.SimpleString: the string contents, without quotes or prefix
    - cst.List: a list with each element converted recursively
    - anything else: str() of the node's `value` attribute, except
      that the text "None" is turned into None
    """
    if isinstance(val, cst.SimpleString):
        # raw_value strips the prefix and the (possibly triple)
        # quotes; slicing one character off each end only works for
        # plain single-quoted literals.
        return val.raw_value
    if isinstance(val, cst.List):
        return [parse_value(element.value)
                for element in val.elements]

    text = str(val.value)
    return None if text == "None" else text

101 

102 

class Python_Traceable_Node:
    """
    Base class for the nodes of the tree built while visiting a
    Python file.

    Each node has a location, a name and a kind, links to its parent
    and children, and the tracing tags and justifications that were
    registered on it.
    """

    def __init__(self, location, name, kind):
        assert isinstance(location, File_Reference)
        assert isinstance(name, str)
        assert isinstance(kind, str)
        self.location = location
        self.name = name
        self.kind = kind
        self.parent = None
        self.children = []
        self.tags = []
        self.just = []

    def register_tag(self, tag):
        """Record a tracing tag on this node."""
        assert isinstance(tag, Tracing_Tag)
        self.tags.append(tag)

    def register_justification(self, justification):
        """Record a justification text on this node."""
        assert isinstance(justification, str)
        self.just.append(justification)

    def set_parent(self, node):
        """Attach this node as a child of `node`."""
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node

    def to_json(self):
        """Return this node and its children as nested dicts."""
        return {"kind": self.kind,
                "name": self.name,
                "tags": [tag.to_json() for tag in self.tags],
                "just": self.just,
                "children": [child.to_json() for child in self.children]}

    def to_lobster(self, schema, items):
        """
        Append the LOBSTER items for this node to `items`.

        Must be overridden by subclasses; the base implementation
        always raises.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        # An explicit exception (rather than `assert False`) still
        # fires when Python runs with assertions disabled.
        raise NotImplementedError(
            "%s does not implement to_lobster" % type(self).__name__)

    def fqn(self):
        """
        Return the dotted path from the root node down to this node.

        For Python_Function nodes with a known line, ":<line>" is
        appended to the node's own name.
        """
        prefix = self.parent.fqn() + "." if self.parent else ""
        if self.location.line is not None and \
           isinstance(self, Python_Function):
            return prefix + f"{self.name}:{str(self.location.line)}"
        return prefix + self.name

    def lobster_tag(self):
        """Return a Tracing_Tag in the "python" namespace for fqn()."""
        return Tracing_Tag("python", self.fqn())

    def warn_ignored(self, reason):
        """
        Print one warning per tag and per justification on this node,
        stating that it was ignored because `reason` already has
        annotations.
        """
        for tag in self.tags:
            print("%s: warning: ignored tag %s because "
                  "%s already has annotations" %
                  (self.location.to_string(),
                   tag,
                   reason))
        for just in self.just:
            print("%s: warning: ignored justification '%s' because "
                  "%s already has annotations" %
                  (self.location.to_string(),
                   just,
                   reason))

169 

170 

class Python_Module(Python_Traceable_Node):
    """Node of kind "Module"; emits only the items of its children."""

    def __init__(self, location, name):
        super().__init__(location, name, kind="Module")

    def to_lobster(self, schema, items):
        """Let every child append its LOBSTER items to `items`."""
        assert schema in (Implementation, Activity)
        assert isinstance(items, list)
        for child in self.children:
            child.to_lobster(schema, items)

180 

181 

class Python_Class(Python_Traceable_Node):
    """Node of kind "Class"."""

    def __init__(self, location, name):
        super().__init__(location, name, kind="Class")

    def to_lobster(self, schema, items):
        """
        Append the LOBSTER items for this class to `items`.

        - Activity schema: the class itself never produces an item;
          its tags are added to every item produced by its children.
        - Implementation schema, class has tags or justifications:
          one item for the class; annotations on the direct children
          are reported as ignored.
        - Implementation schema, class has no annotations: the class
          is skipped and the children's items are emitted instead.
        """
        assert schema in (Implementation, Activity)
        assert isinstance(items, list)

        # The children are always converted first, whichever branch
        # is taken afterwards.
        member_items = []
        for child in self.children:
            child.to_lobster(schema, member_items)

        if schema is Activity:
            # Tests inherit the tags of the class they live in.
            for member in member_items:
                for tag in self.tags:
                    member.add_tracing_target(tag)
            items.extend(member_items)
            return

        class_item = Implementation(
            tag      = Tracing_Tag("python", self.fqn()),
            location = self.location,
            language = "Python",
            kind     = self.kind,
            name     = self.fqn())

        if not (self.tags or self.just):
            # Nothing on the class itself: trace to each child.
            items.extend(member_items)
            return

        # Annotations on the class take precedence over the children.
        for tag in self.tags:
            class_item.add_tracing_target(tag)
        class_item.just_up += self.just

        for child in self.children:
            child.warn_ignored(self.name)

        items.append(class_item)

233 

234 

class Python_Function(Python_Traceable_Node):
    """
    Node of kind "Function"; the kind becomes "Constructor" or
    "Method" once the node is attached to a Python_Class.
    """

    def __init__(self, location, name):
        super().__init__(location, name, "Function")

    def set_parent(self, node):
        """Attach to `node` and refine the kind for class members."""
        super().set_parent(node)
        if isinstance(node, Python_Class):
            self.kind = "Constructor" if self.name == "__init__" else "Method"

    def to_lobster(self, schema, items):
        """
        Append the LOBSTER item for this function to `items`.

        For the Implementation schema every function yields an item.
        For the Activity schema only functions whose name starts with
        "test" or "_test", or ends with "test", yield an item; other
        functions (and their children) are skipped.

        Tracing tags found on nested children are merged into this
        function's item; the children do not produce items themselves.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        # The name is recorded for both schemas, before any filtering.
        func_name.append(self.fqn())
        tagname = count_occurrence_of_last_function_from_function_name_list(
            func_name
        )
        # The item name is the tag name without the "-<count>" suffix.
        # Only a trailing numeric suffix is removed, so that a "-" in
        # the module name does not truncate the name.
        name_value = re.sub(r"-\d+$", "", tagname)

        if schema is Implementation:
            l_item = Implementation(tag      = Tracing_Tag("python",
                                                           tagname),
                                    location = self.location,
                                    language = "Python",
                                    kind     = self.kind,
                                    name     = name_value)
        elif self.name.startswith(("test", "_test")) \
                or self.name.endswith("test"):
            l_item = Activity(tag       = Tracing_Tag("pyunit",
                                                      self.fqn()),
                              location  = self.location,
                              framework = "PyUnit",
                              kind      = "Test")
        else:
            return

        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        # Any children of functions are not testable units. Their
        # tracing tags contribute to ours, but otherwise they don't
        # appear.
        nested_items = []
        for node in self.children:
            node.to_lobster(schema, nested_items)
        for item in nested_items:
            # TODO: Warn about useless nested justifications
            # Merge tracing tags
            for tag in item.unresolved_references:
                l_item.add_tracing_target(tag)

        items.append(l_item)

295 

296 

class Lobster_Visitor(cst.CSTVisitor):
    """
    libcst visitor that builds a tree of Python_Traceable_Node objects
    (rooted at `self.module`) for one file, and registers the tracing
    tags and justifications found in decorators and comments.

    `options` is a dict from which the keys "activity", "namespace",
    "exclude_untagged", "decorator", "dec_arg_name" and
    "dec_arg_version" are read.
    """

    METADATA_DEPENDENCIES = (PositionProvider,)

    def __init__(self, file_name, options):
        super().__init__()
        assert os.path.isfile(file_name)
        self.file_name = file_name

        # Strip only a trailing ".py"; a blanket replace would also
        # remove ".py" occurring in the middle of the file name.
        module_name = os.path.basename(file_name)
        if module_name.endswith(".py"):
            module_name = module_name[:-len(".py")]

        self.module = Python_Module(File_Reference(file_name),
                                    module_name)

        self.activity     = options["activity"]
        self.current_node = None
        self.stack        = [self.module]

        self.namespace        = options["namespace"]
        self.exclude_untagged = options["exclude_untagged"]

        self.decorator_name  = options["decorator"]
        self.dec_arg_name    = options["dec_arg_name"]
        self.dec_arg_version = options["dec_arg_version"]

    def parse_dotted_name(self, name):
        """
        Return the dotted name of a Name, Attribute or Call node (for
        a Call, the name of the called function), or None for any
        other node type.
        """
        if isinstance(name, cst.Call):
            return self.parse_dotted_name(name.func)
        if isinstance(name, cst.Name):
            return name.value
        if isinstance(name, cst.Attribute):
            # value -- prefix
            # attr  -- postfix
            return "%s.%s" % (self.parse_dotted_name(name.value),
                              self.parse_dotted_name(name.attr))
        return None

    def parse_decorators(self, decorators):
        """
        Register a tag on the current node for every decorator whose
        dotted name equals the configured decorator name.

        Only keyword arguments of the decorator call are considered.
        """
        for dec in decorators:
            dec_name = self.parse_dotted_name(dec.decorator)
            if dec_name is None or dec_name != self.decorator_name:
                continue

            # A decorator used without a call has no arguments, and
            # positional arguments have no keyword to look them up by.
            if isinstance(dec.decorator, cst.Call):
                call_args = dec.decorator.args
            else:
                call_args = ()
            dec_args = {arg.keyword.value: parse_value(arg.value)
                        for arg in call_args
                        if arg.keyword is not None}

            # TODO: Better error messages if these assumptions are
            # violated
            where = self.current_node.location.to_string()
            assert self.dec_arg_name in dec_args, \
                "%s: decorator %s has no keyword argument %s" % \
                (where, dec_name, self.dec_arg_name)
            if self.dec_arg_version:
                assert self.dec_arg_version in dec_args, \
                    "%s: decorator %s has no keyword argument %s" % \
                    (where, dec_name, self.dec_arg_version)
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name],
                                  dec_args.get(self.dec_arg_version, None))
                self.current_node.register_tag(tag)

            elif isinstance(dec_args[self.dec_arg_name], list):
                for item in dec_args[self.dec_arg_name]:
                    tag = Tracing_Tag(self.namespace, item)
                    self.current_node.register_tag(tag)

            else:
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name])
                self.current_node.register_tag(tag)

    def visit_ClassDef(self, node):
        """Open a Python_Class node and parse its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Class(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def visit_FunctionDef(self, node):
        """Open a Python_Function node and parse its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Function(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def leave_FunctionDef(self, original_node):
        """Close the current function; its parent becomes current."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def leave_ClassDef(self, original_node):
        """Close the current class; its parent becomes current."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def visit_Comment(self, node):
        """
        Register a tag or justification if the comment starts with
        LOBSTER_TRACE_PREFIX or LOBSTER_JUST_PREFIX; other comments
        are ignored.
        """
        line = self.get_metadata(PositionProvider, node).start.line
        # For some reason the comment in a class is associated with
        # its constructor. We can check if it precedes it (by line),
        # and so associate it with the enclosing item.
        if self.current_node and \
           self.current_node.location.line and \
           self.current_node.location.line > line:
            actual = self.current_node.parent
        else:
            actual = self.current_node

        # No class or function has been entered yet (current_node is
        # still None): attach to the module node instead of failing.
        if actual is None:
            actual = self.module

        if node.value.startswith(LOBSTER_TRACE_PREFIX):
            tag = node.value[len(LOBSTER_TRACE_PREFIX):].strip()
            actual.register_tag(
                Tracing_Tag.from_text(self.namespace,
                                      tag))

        elif node.value.startswith(LOBSTER_JUST_PREFIX):
            reason = node.value[len(LOBSTER_JUST_PREFIX):].strip()
            actual.register_justification(reason)

409 

410 

def process_file(file_name, options):
    """
    Parse one Python file and extract its LOBSTER items.

    Args:
        file_name (str): path of the file to parse
        options (dict): settings passed on to Lobster_Visitor; the
            keys "activity" and "exclude_untagged" are also read here

    Returns:
        tuple: (True, items) on success, or (False, []) if the file
        could not be parsed or decoded (a message is printed in that
        case). With "exclude_untagged" set, only items that have
        unresolved references are returned.

    Raises:
        Exception: any other error is re-raised after printing the
        name of the offending file.
    """
    assert isinstance(file_name, str)
    assert isinstance(options, dict)

    try:
        with open(file_name, "r", encoding="UTF-8") as fd:
            ast = cst.parse_module(fd.read())

        wrapper = cst.MetadataWrapper(ast)
        visitor = Lobster_Visitor(file_name, options)
        wrapper.visit(visitor)

        schema = Activity if options["activity"] else Implementation
        items = []
        visitor.module.to_lobster(schema, items)

        if options["exclude_untagged"]:
            items = [item for item in items if item.unresolved_references]

        return True, items

    # ParserSyntaxError is part of libcst's public API, so there is no
    # need to reach into the private _exceptions module.
    except cst.ParserSyntaxError as exc:
        print(file_name, exc.message)
        return False, []

    except UnicodeDecodeError as exc:
        print(file_name, str(exc))
        return False, []

    except Exception:
        # Name the file for context, then let the error propagate.
        print("Unspecified issue in file: %s" % file_name)
        raise

446 

447 

class PythonTool(MetaDataToolBase):
    """Command-line tool "python": extracts tracing tags from Python
    code or tests and writes them as a LOBSTER file."""

    def __init__(self):
        super().__init__(
            name="python",
            description="Extract tracing tags from Python code or tests",
            official=True,
        )
        parser = self._argument_parser
        parser.add_argument("files", nargs="+", metavar="FILE|DIR")
        parser.add_argument(
            "--activity",
            action="store_true",
            default=False,
            help=("generate activity traces (tests) instead of"
                  " an implementation trace"),
        )
        parser.add_argument("--out", default=None)
        parser.add_argument(
            "--single",
            action="store_true",
            default=False,
            help="don't multi-thread",
        )
        parser.add_argument(
            "--only-tagged-functions",
            default=False,
            action="store_true",
            help="only trace functions with tags",
        )
        # The two decorator options cannot be combined.
        decorator_group = parser.add_mutually_exclusive_group()
        decorator_group.add_argument(
            "--parse-decorator",
            nargs=2,
            metavar=("DECORATOR", "NAME_ARG"),
            default=(None, None),
        )
        decorator_group.add_argument(
            "--parse-versioned-decorator",
            nargs=3,
            metavar=("DECORATOR", "NAME_ARG", "VERSION_ARG"),
            default=(None, None, None),
        )

    def _run_impl(self, options: Namespace) -> int:
        """
        Collect the input files, process each one, and write the
        resulting items to `options.out` (or to stdout if unset).

        Returns 0 if every file was processed successfully, else 1.
        """
        # Plain files are taken as given; directories are walked and
        # contribute their *.py files.
        file_list = []
        for entry in options.files:
            if os.path.isfile(entry):
                file_list.append(entry)
            elif os.path.isdir(entry):
                for path, _, files in os.walk(entry):
                    file_list.extend(
                        os.path.join(path, filename)
                        for filename in files
                        if os.path.splitext(filename)[1] == ".py"
                    )
            else:
                self._argument_parser.error(f"{entry} is not a file or directory")

        plain = options.parse_decorator
        versioned = options.parse_versioned_decorator
        decorator = dec_arg_name = dec_arg_version = None
        if plain[0] is not None:
            decorator = plain[0]
            dec_arg_name = plain[1]
        elif versioned[0] is not None:
            decorator = versioned[0]
            dec_arg_name = versioned[1]
            dec_arg_version = versioned[2]

        context = {
            "activity"         : options.activity,
            "decorator"        : decorator,
            "dec_arg_name"     : dec_arg_name,
            "dec_arg_version"  : dec_arg_version,
            "exclude_untagged" : options.only_tagged_functions,
            "namespace"        : "req",
        }

        worker = functools.partial(process_file, options=context)
        items = []
        ok = True

        if options.single:
            for file_name in file_list:
                file_ok, file_items = worker(file_name)
                ok &= file_ok
                items.extend(file_items)
        else:
            with multiprocessing.Pool() as pool:
                for file_ok, file_items in pool.imap_unordered(worker,
                                                                file_list):
                    ok &= file_ok
                    items.extend(file_items)

        schema = Activity if options.activity else Implementation

        if options.out:
            ensure_output_directory(options.out)
            with open(options.out, "w", encoding="UTF-8") as fd:
                lobster_write(fd, schema, "lobster_python", items)
            print(f"Written output for {len(items)} items to {options.out}")
        else:
            lobster_write(sys.stdout, schema, "lobster_python", items)
            print()

        if not ok:
            print("Note: Earlier parse errors make actual output unreliable")
            return 1
        return 0

549 

550 

def main(args: Optional[Sequence[str]] = None) -> int:
    """Create a PythonTool, run it with `args`, and return its result."""
    tool = PythonTool()
    return tool.run(args)
552 return PythonTool().run(args)