Coverage for lobster/tools/python/python.py: 0%

310 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-27 13:02 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_python - Extract Python tracing tags for LOBSTER 

4# Copyright (C) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20from argparse import Namespace 

21import sys 

22import os.path 

23import multiprocessing 

24import functools 

25import re 

26from typing import Optional, Sequence 

27 

28from libcst.metadata import PositionProvider 

29import libcst as cst 

30 

31from lobster.common.items import Tracing_Tag, Implementation, Activity 

32from lobster.common.location import File_Reference 

33from lobster.common.io import lobster_write 

34from lobster.common.meta_data_tool_base import MetaDataToolBase 

35 

# Comment prefix that registers a tracing tag on the enclosing item
# (matched in Lobster_Visitor.visit_Comment).
LOBSTER_TRACE_PREFIX = "# lobster-trace: "

# Comment prefix that registers a justification on the enclosing item
# (matched in Lobster_Visitor.visit_Comment).
LOBSTER_JUST_PREFIX = "# lobster-exclude: "

# Fully qualified names appended by Python_Function.to_lobster; used to
# number repeated function names. Nothing in this file ever clears it.
func_name = []

39 

40 

def count_occurrence_of_last_function_from_function_name_list(function_names):
    """
    Build the tag name for the last entry of `function_names`.

    Entries have the form `module.function:line` or
    `module.class.function:line`. The result is `module.function` or
    `module.class.function`; if the same function (or class.function)
    name occurs in earlier entries, `-<count>` is appended, where count
    is the number of earlier occurrences.

    Only the function part is compared against earlier entries; the
    module part of earlier entries is not taken into account.

    Args:
        function_names (list): Entries as described above; must not be
            empty.

    Returns:
        str: The name of the last entry, with `-<count>` appended when
        count is greater than zero.

    Examples:
        ['hello.add:2', 'hello.sub:5', 'hello.add:8'] -> 'hello.add-1'
        ['hello.add:2']                               -> 'hello.add'
        ['m.C.f:1', 'm.C.g:2', 'm.C.f:9']             -> 'm.C.f-1'
    """
    latest = function_names[-1]
    tokens = re.split(r"[.:]", latest)
    dotted_path = latest.split(':', 1)[0].split(".")

    module_name = tokens[0]
    if len(dotted_path) == 3:
        # module.class.function: keep class and function together
        target = dotted_path[1] + '.' + dotted_path[2]
    else:
        target = tokens[1]

    hits = 0
    for earlier in function_names[:-1]:
        path = earlier.split(':', 1)[0].split(".")
        if len(path) == 3 and target == path[1] + '.' + path[2]:
            hits += 1
        if re.split(r"[.:]", earlier)[1] == target:
            hits += 1

    suffix = "-" + str(hits) if hits > 0 else ''
    return module_name + "." + target + suffix

88 

89 

def parse_value(val):
    """
    Convert a libcst expression node into plain Python data.

    Args:
        val: A libcst node, as found in the `value` of a decorator
            argument.

    Returns:
        - for `cst.SimpleString`: the text between the quotes,
        - for `cst.List`: a list with each element converted recursively,
        - otherwise: `str(val.value)`, except that the text "None" is
          returned as `None`.
    """
    if isinstance(val, cst.SimpleString):
        # raw_value removes the string prefix and the complete quote
        # token, so r"..." and triple-quoted literals are handled too
        # (slicing one character off each side only works for plain
        # single-character quotes).
        return val.raw_value

    if isinstance(val, cst.List):
        return [parse_value(element.value) for element in val.elements]

    text = str(val.value)
    return None if text == "None" else text

101 

102 

class Python_Traceable_Node:
    """
    Base class for the nodes of the tree built while visiting a file.

    A node stores its location, name and kind, a reference to its parent,
    the list of its children, and the tracing tags and justifications
    registered on it.
    """

    def __init__(self, location, name, kind):
        assert isinstance(location, File_Reference)
        assert isinstance(name, str)
        assert isinstance(kind, str)
        self.parent = None
        self.children = []
        self.tags = []
        self.just = []
        self.location = location
        self.name = name
        self.kind = kind

    def register_tag(self, tag):
        """Attach a tracing tag to this node."""
        assert isinstance(tag, Tracing_Tag)
        self.tags.append(tag)

    def register_justification(self, justification):
        """Attach a justification text to this node."""
        assert isinstance(justification, str)
        self.just.append(justification)

    def set_parent(self, node):
        """Link this node below `node` (both directions)."""
        assert isinstance(node, Python_Traceable_Node)
        self.parent = node
        node.children.append(self)

    def to_json(self):
        """Return this node and, recursively, its children as a dict."""
        serialised_tags = [tag.to_json() for tag in self.tags]
        serialised_children = [child.to_json() for child in self.children]
        return {
            "kind": self.kind,
            "name": self.name,
            "tags": serialised_tags,
            "just": self.just,
            "children": serialised_children,
        }

    def to_lobster(self, schema, items):
        """Placeholder; subclasses in this file override it."""
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        assert False

    def fqn(self):
        """
        Return the dotted name of this node, prefixed by its ancestors.

        Function nodes that have a line number are rendered as
        `name:line`; every other node is rendered as its bare name.
        """
        prefix = (self.parent.fqn() + ".") if self.parent else ""
        has_line = self.location.line is not None
        if has_line and isinstance(self, Python_Function):
            return prefix + f"{self.name}:{str(self.location.line)}"
        return prefix + self.name

    def lobster_tag(self):
        """Return a "python" tracing tag built from the qualified name."""
        return Tracing_Tag("python", self.fqn())

    def warn_ignored(self, reason):
        """Print one warning per tag and per justification on this node."""
        for ignored_tag in self.tags:
            details = (self.location.to_string(), ignored_tag, reason)
            print("%s: warning: ignored tag %s because "
                  "%s already has annotations" % details)
        for ignored_just in self.just:
            details = (self.location.to_string(), ignored_just, reason)
            print("%s: warning: ignored justification '%s' because "
                  "%s already has annotations" % details)

169 

170 

class Python_Module(Python_Traceable_Node):
    """Tree node of kind "Module"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Module")

    def to_lobster(self, schema, items):
        """Let every direct child add its items to `items`."""
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        for child in self.children:
            child.to_lobster(schema, items)

180 

181 

class Python_Class(Python_Traceable_Node):
    """Tree node of kind "Class"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Class")

    def to_lobster(self, schema, items):
        """
        Add the items for this class to `items`.

        - Activity schema: the class itself never becomes an item; its
          tags are added to every item produced by its children.
        - Implementation schema, class carries tags or justifications:
          a single item for the class is emitted and a warning is
          printed for each annotation found on a direct child.
        - Implementation schema, class is not annotated: the items of
          the children are emitted instead of the class.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        # The children are always converted first
        class_contents = []
        for child in self.children:
            child.to_lobster(schema, class_contents)

        if schema is Activity:
            for test_item in class_contents:
                for tag in self.tags:
                    test_item.add_tracing_target(tag)
            items.extend(class_contents)
            return

        qualified_name = self.fqn()
        class_item = Implementation(
            tag      = Tracing_Tag("python", qualified_name),
            location = self.location,
            language = "Python",
            kind     = self.kind,
            name     = qualified_name)

        if not (self.tags or self.just):
            # Not annotated: trace to each child instead
            items.extend(class_contents)
            return

        # Annotations on the class take precedence over the children
        for tag in self.tags:
            class_item.add_tracing_target(tag)
        class_item.just_up += self.just

        for child in self.children:
            child.warn_ignored(self.name)

        items.append(class_item)

233 

234 

class Python_Function(Python_Traceable_Node):
    """
    Tree node for a function definition.

    The kind is "Function" initially and becomes "Constructor" or
    "Method" once the node is attached to a Python_Class.
    """

    def __init__(self, location, name):
        super().__init__(location, name, "Function")

    def set_parent(self, node):
        """Link below `node`; refine the kind if `node` is a class."""
        super().set_parent(node)
        if isinstance(node, Python_Class):
            self.kind = "Constructor" if self.name == "__init__" else "Method"

    def to_lobster(self, schema, items):
        """
        Add the item for this function to `items`.

        With the Implementation schema every function yields an item.
        With the Activity schema only functions whose name starts with
        "test" or "_test", or ends with "test", yield an item; others
        are skipped. Tags found on nested nodes are merged into this
        function's item; nested nodes never produce items of their own.

        NOTE(review): `func_name` is a module-level list that is not
        cleared anywhere in this file, so the "-<count>" suffix depends
        on every function converted earlier in the same process.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        func_name.append(self.fqn())
        tagname = count_occurrence_of_last_function_from_function_name_list(
            func_name
        )
        # The item name is the tag name without the trailing "-<count>"
        # suffix. Only that suffix is removed: splitting at the first
        # hyphen would truncate names of modules such as "my-mod".
        name_value = re.sub(r"-\d+$", "", tagname)

        if schema is Implementation:
            l_item = Implementation(tag      = Tracing_Tag("python",
                                                           tagname),
                                    location = self.location,
                                    language = "Python",
                                    kind     = self.kind,
                                    name     = name_value)
        elif self.name.startswith(("test", "_test")) \
                or self.name.endswith("test"):
            l_item = Activity(tag       = Tracing_Tag("pyunit",
                                                      self.fqn()),
                              location  = self.location,
                              framework = "PyUnit",
                              kind      = "Test")
        else:
            return

        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        # Any children of functions are not testable units. Their
        # tracing tags contribute to ours, but otherwise they don't
        # appear.
        nested_items = []
        for node in self.children:
            node.to_lobster(schema, nested_items)
        for item in nested_items:
            # TODO: Warn about useless nested justifications
            # Merge tracing tags
            for tag in item.unresolved_references:
                l_item.add_tracing_target(tag)

        items.append(l_item)

295 

296 

class Lobster_Visitor(cst.CSTVisitor):
    """
    libcst visitor that builds the tree of traceable nodes for one file.

    Classes and functions become Python_Class / Python_Function nodes
    below `self.module`. Tags are taken from the configured decorator
    and from "# lobster-trace: " comments; justifications are taken
    from "# lobster-exclude: " comments.

    `options` is a dict with the keys "activity", "namespace",
    "exclude_untagged", "decorator", "dec_arg_name" and
    "dec_arg_version".
    """

    METADATA_DEPENDENCIES = (PositionProvider,)

    def __init__(self, file_name, options):
        super().__init__()
        assert os.path.isfile(file_name)
        self.file_name = file_name

        # Remove only a trailing ".py"; str.replace would also delete
        # ".py" from the middle of a name ("a.pytools.py" -> "atools").
        module_name = os.path.basename(file_name)
        if module_name.endswith(".py"):
            module_name = module_name[:-len(".py")]
        self.module = Python_Module(File_Reference(file_name), module_name)

        self.activity     = options["activity"]
        self.current_node = None
        self.stack        = [self.module]

        self.namespace        = options["namespace"]
        self.exclude_untagged = options["exclude_untagged"]

        self.decorator_name  = options["decorator"]
        self.dec_arg_name    = options["dec_arg_name"]
        self.dec_arg_version = options["dec_arg_version"]

    def parse_dotted_name(self, name):
        """
        Return the dotted name of a decorator expression.

        Calls are reduced to the called expression, attribute chains are
        joined with ".". Returns None for any other kind of node.
        """
        if isinstance(name, cst.Call):
            return self.parse_dotted_name(name.func)
        if isinstance(name, cst.Name):
            return name.value
        if isinstance(name, cst.Attribute):
            # value -- prefix
            # attr  -- postfix
            return "%s.%s" % (self.parse_dotted_name(name.value),
                              self.parse_dotted_name(name.attr))
        return None

    def parse_decorators(self, decorators):
        """
        Register tags from decorators matching the configured name.

        Only keyword arguments of the decorator call are considered.
        The configured name argument (and, if configured, the version
        argument) must be present, otherwise an AssertionError is
        raised.
        """
        for dec in decorators:
            dec_name = self.parse_dotted_name(dec.decorator)
            if dec_name is None or dec_name != self.decorator_name:
                continue

            # A decorator used without a call has no arguments, and
            # positional arguments have no keyword; neither can supply
            # the named arguments we are looking for.
            if isinstance(dec.decorator, cst.Call):
                call_args = dec.decorator.args
            else:
                call_args = ()
            dec_args = {arg.keyword.value: parse_value(arg.value)
                        for arg in call_args
                        if arg.keyword is not None}

            assert self.dec_arg_name in dec_args, \
                "%s: decorator %s is missing keyword argument %s" % \
                (self.file_name, dec_name, self.dec_arg_name)

            if self.dec_arg_version:
                assert self.dec_arg_version in dec_args, \
                    "%s: decorator %s is missing keyword argument %s" % \
                    (self.file_name, dec_name, self.dec_arg_version)
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name],
                                  dec_args.get(self.dec_arg_version, None))
                self.current_node.register_tag(tag)

            elif isinstance(dec_args[self.dec_arg_name], list):
                for item in dec_args[self.dec_arg_name]:
                    tag = Tracing_Tag(self.namespace, item)
                    self.current_node.register_tag(tag)

            else:
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name])
                self.current_node.register_tag(tag)

    def _enter_definition(self, node, node_class):
        # Create the tree node for a class/function definition, make it
        # the current node and process its decorators.
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = node_class(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def _leave_definition(self):
        # Return to the enclosing node.
        self.stack.pop()
        self.current_node = self.stack[-1]

    def visit_ClassDef(self, node):
        self._enter_definition(node, Python_Class)

    def visit_FunctionDef(self, node):
        self._enter_definition(node, Python_Function)

    def leave_FunctionDef(self, original_node):
        self._leave_definition()

    def leave_ClassDef(self, original_node):
        self._leave_definition()

    def visit_Comment(self, node):
        """Register lobster-trace / lobster-exclude comments."""
        line = self.get_metadata(PositionProvider, node).start.line

        # Before the first definition is visited there is no current
        # node yet; such comments belong to the module.
        actual = self.current_node
        if actual is None:
            actual = self.module

        # For some reason the comment in a class is associated with
        # its constructor. We can check if it precedes it (by line),
        # and so associate it with the enclosing item.
        node_line = actual.location.line
        if node_line and node_line > line and actual.parent is not None:
            actual = actual.parent

        if node.value.startswith(LOBSTER_TRACE_PREFIX):
            tag = node.value[len(LOBSTER_TRACE_PREFIX):].strip()
            actual.register_tag(
                Tracing_Tag.from_text(self.namespace,
                                      tag))

        elif node.value.startswith(LOBSTER_JUST_PREFIX):
            reason = node.value[len(LOBSTER_JUST_PREFIX):].strip()
            actual.register_justification(reason)

410 

411 

def process_file(file_name, options):
    """
    Parse one Python file and extract its LOBSTER items.

    Args:
        file_name (str): Path of the file to read (decoded as UTF-8).
        options (dict): Settings passed on to Lobster_Visitor; the keys
            "activity" and "exclude_untagged" are also read here.

    Returns:
        tuple: `(True, items)` on success. `(False, [])` if the file
        has a syntax error or cannot be decoded; the problem is printed.

    Raises:
        Exception: Any other error is re-raised after printing the name
            of the offending file.
    """
    assert isinstance(file_name, str)
    assert isinstance(options, dict)

    try:
        with open(file_name, "r", encoding="UTF-8") as fd:
            ast = cst.parse_module(fd.read())

        wrapper = cst.MetadataWrapper(ast)
        visitor = Lobster_Visitor(file_name, options)
        wrapper.visit(visitor)

        schema = Activity if options["activity"] else Implementation
        items = []
        visitor.module.to_lobster(schema, items)

        if options["exclude_untagged"]:
            items = [item for item in items if item.unresolved_references]

        return True, items

    # ParserSyntaxError is exported by the libcst package itself, so
    # there is no need to reach into the private _exceptions module.
    except cst.ParserSyntaxError as exc:
        print(file_name, exc.message)
        return False, []

    except UnicodeDecodeError as exc:
        print(file_name, str(exc))
        return False, []

    except Exception:
        # Name the file for context, then let the error propagate
        print("Unspecified issue in file: %s" % file_name)
        raise

447 

448 

class PythonTool(MetaDataToolBase):
    """Command line tool "python": extracts tracing tags from Python files."""

    def __init__(self):
        super().__init__(
            name="python",
            description="Extract tracing tags from Python code or tests",
            official=True,
        )
        parser = self._argument_parser
        parser.add_argument("files", nargs="+", metavar="FILE|DIR")
        parser.add_argument(
            "--activity",
            action="store_true",
            default=False,
            help=("generate activity traces (tests) instead of"
                  " an implementation trace"))
        parser.add_argument("--out", default=None)
        parser.add_argument(
            "--single",
            action="store_true",
            default=False,
            help="don't multi-thread")
        parser.add_argument(
            "--only-tagged-functions",
            default=False,
            action="store_true",
            help="only trace functions with tags")

        # The two decorator options cannot be combined
        decorator_group = parser.add_mutually_exclusive_group()
        decorator_group.add_argument(
            "--parse-decorator",
            nargs=2,
            metavar=("DECORATOR", "NAME_ARG"),
            default=(None, None))
        decorator_group.add_argument(
            "--parse-versioned-decorator",
            nargs=3,
            metavar=("DECORATOR", "NAME_ARG", "VERSION_ARG"),
            default=(None, None, None))

    def _run_impl(self, options: Namespace) -> int:
        """
        Process all requested files and write the collected items.

        Directories are searched recursively for files with the
        extension ".py". Output goes to the file given by --out, or to
        standard output. Returns 0 if every file was processed
        successfully and 1 otherwise.
        """
        file_list = []
        for item in options.files:
            if os.path.isfile(item):
                file_list.append(item)
            elif os.path.isdir(item):
                file_list.extend(
                    os.path.join(path, filename)
                    for path, _, files in os.walk(item)
                    for filename in files
                    if os.path.splitext(filename)[1] == ".py")
            else:
                self._argument_parser.error(f"{item} is not a file or directory")

        decorator = None
        dec_arg_name = None
        dec_arg_version = None
        if options.parse_decorator[0] is not None:
            decorator = options.parse_decorator[0]
            dec_arg_name = options.parse_decorator[1]
        elif options.parse_versioned_decorator[0] is not None:
            decorator = options.parse_versioned_decorator[0]
            dec_arg_name = options.parse_versioned_decorator[1]
            dec_arg_version = options.parse_versioned_decorator[2]

        context = {
            "activity": options.activity,
            "decorator": decorator,
            "dec_arg_name": dec_arg_name,
            "dec_arg_version": dec_arg_version,
            "exclude_untagged": options.only_tagged_functions,
            "namespace": "req",
        }

        pfun = functools.partial(process_file, options=context)
        items = []
        ok = True

        if options.single:
            for file_name in file_list:
                file_ok, file_items = pfun(file_name)
                if not file_ok:
                    ok = False
                items.extend(file_items)
        else:
            with multiprocessing.Pool() as pool:
                for file_ok, file_items in pool.imap_unordered(pfun,
                                                               file_list):
                    if not file_ok:
                        ok = False
                    items.extend(file_items)

        schema = Activity if options.activity else Implementation

        if options.out:
            with open(options.out, "w", encoding="UTF-8") as fd:
                lobster_write(fd, schema, "lobster_python", items)
            print(f"Written output for {len(items)} items to {options.out}")
        else:
            lobster_write(sys.stdout, schema, "lobster_python", items)
            print()

        if not ok:
            print("Note: Earlier parse errors make actual output unreliable")
            return 1
        return 0

549 

550 

def main(args: Optional[Sequence[str]] = None) -> int:
    """Create a PythonTool, run it with `args` and return its result."""
    tool = PythonTool()
    return tool.run(args)