Coverage for lobster/tools/python/python.py: 0%

306 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 14:55 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_python - Extract Python tracing tags for LOBSTER 

4# Copyright (C) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20import sys 

21import argparse 

22import os.path 

23import multiprocessing 

24import functools 

25import re 

26 

27from libcst.metadata import PositionProvider 

28import libcst as cst 

29 

30from lobster.items import Tracing_Tag, Implementation, Activity 

31from lobster.location import File_Reference 

32from lobster.io import lobster_write 

33from lobster.version import get_version 

34 

# Comments starting with this prefix attach a tracing tag to the
# enclosing item (see Lobster_Visitor.visit_Comment).
LOBSTER_TRACE_PREFIX = "# lobster-trace: "

# Comments starting with this prefix attach a justification to the
# enclosing item (see Lobster_Visitor.visit_Comment).
LOBSTER_JUST_PREFIX = "# lobster-exclude: "

# Fully qualified names of every function exported so far. Appended to
# by Python_Function.to_lobster and used to number repeated names.
func_name = []

38 

39 

def count_occurrence_of_last_function_from_function_name_list(function_names):
    """
    Build the tag name for the most recently recorded function.

    The last entry of `function_names` is reduced to its dotted qualified
    name (every `:line_number` marker is dropped) and the number of
    earlier entries with the same qualified name is appended as a
    `-count` suffix. The first occurrence gets no suffix, so repeated
    definitions of the same function end up with distinct names.

    Entries are compared on the complete qualified name. Methods of
    nested classes therefore keep their full path, and identically named
    functions of other classes or modules do not influence the count.

    Args:
        function_names (list):
            Non-empty list of strings formatted as
            `module.function:line_number`,
            `module.class.function:line_number`, or deeper nestings of
            the same scheme.

    Returns:
        str: `module[.class].function` for the first occurrence, or
            `module[.class].function-count` when the same qualified name
            was already recorded `count` times.

    Raises:
        IndexError: If `function_names` is empty.

    Examples:
        function_names = ['hello.add:2', 'hello.sub:5', 'hello.add:8']
        returns: 'hello.add-1'
        class_function_names = ['hello.Example.add:2', 'hello.Example.sub:5']
        returns: 'hello.Example.sub'
    """
    # Line markers follow every function name in the input; drop them so
    # that only the dotted path takes part in the comparison.
    qualified_names = [re.sub(r":\d+", "", name) for name in function_names]

    last_name = qualified_names[-1]
    earlier_occurrences = qualified_names[:-1].count(last_name)

    if earlier_occurrences:
        return f"{last_name}-{earlier_occurrences}"
    return last_name

87 

88 

def parse_value(val):
    """
    Convert a libcst expression node into a plain Python value.

    Args:
        val: The libcst node to convert.

    Returns:
        - for a `cst.SimpleString`: the text between the quotes, without
          any string prefix (`r`, `b`, ...) and independent of the quote
          style used (single, double or triple quotes);
        - for a `cst.List`: a list with each element converted
          recursively;
        - for any other node: `str(val.value)`, except that the text
          "None" is turned into the `None` object.
    """
    if isinstance(val, cst.SimpleString):
        # Slicing off one character per side is only right for plain,
        # unprefixed, single-quoted literals; raw_value handles them all.
        return val.raw_value

    if isinstance(val, cst.List):
        return [parse_value(element.value) for element in val.elements]

    text = str(val.value)
    return None if text == "None" else text

100 

101 

class Python_Traceable_Node:
    """
    Base class for the nodes of the trace tree.

    A node stores its source location, name and kind, its parent and
    children in the tree, and the tracing tags and justifications that
    were attached to it.
    """

    def __init__(self, location, name, kind):
        assert isinstance(location, File_Reference)
        assert isinstance(name, str)
        assert isinstance(kind, str)
        self.name = name
        self.kind = kind
        self.location = location
        self.parent = None
        self.children = []
        self.tags = []
        self.just = []

    def register_tag(self, tag):
        """Attach the tracing tag `tag` to this node."""
        assert isinstance(tag, Tracing_Tag)
        self.tags.append(tag)

    def register_justification(self, justification):
        """Attach the justification text `justification` to this node."""
        assert isinstance(justification, str)
        self.just.append(justification)

    def set_parent(self, node):
        """Make `node` the parent of this node and add self to its children."""
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node

    def to_json(self):
        """Return this node and, recursively, its children as a dict."""
        data = {}
        data["kind"] = self.kind
        data["name"] = self.name
        data["tags"] = [tag.to_json() for tag in self.tags]
        data["just"] = self.just
        data["children"] = [child.to_json() for child in self.children]
        return data

    def to_lobster(self, schema, items):
        """Placeholder that always fails; subclasses provide the export."""
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        assert False

    def fqn(self):
        """
        Return the dotted name of this node, prefixed by its ancestors.

        Function nodes with a known line get `:<line>` appended to
        their own name.
        """
        prefix = self.parent.fqn() + "." if self.parent else ""

        has_line = self.location.line is not None
        if has_line and isinstance(self, Python_Function):
            return prefix + self.name + ":" + str(self.location.line)
        return prefix + self.name

    def lobster_tag(self):
        """Return the "python" tracing tag built from the qualified name."""
        return Tracing_Tag("python", self.fqn())

    def warn_ignored(self, reason):
        """
        Print one warning per tag and per justification of this node,
        stating that it is ignored because `reason` has annotations.
        """
        tag_message = ("%s: warning: ignored tag %s because "
                       "%s already has annotations")
        just_message = ("%s: warning: ignored justification '%s' because "
                        "%s already has annotations")

        for ignored_tag in self.tags:
            print(tag_message % (self.location.to_string(),
                                 ignored_tag,
                                 reason))
        for ignored_just in self.just:
            print(just_message % (self.location.to_string(),
                                  ignored_just,
                                  reason))

168 

169 

class Python_Module(Python_Traceable_Node):
    """Trace tree node of kind "Module"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Module")

    def to_lobster(self, schema, items):
        """
        Export every child of the module into `items`.

        The module itself produces no item; it only forwards the
        request to its children.
        """
        assert schema in (Implementation, Activity)
        assert isinstance(items, list)
        for child in self.children:
            child.to_lobster(schema, items)

179 

180 

class Python_Class(Python_Traceable_Node):
    """Trace tree node of kind "Class"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Class")

    def to_lobster(self, schema, items):
        """
        Export this class, or its members, into `items`.

        - Activity schema: the class never becomes an item. Its members
          are exported and each of them additionally receives the tags
          of the class.
        - Implementation schema, class carries tags or justifications:
          only the class is exported and a warning is printed for every
          annotation found on its direct children.
        - Implementation schema, class not annotated: the class is
          skipped and its members are exported instead.
        """
        assert schema in (Implementation, Activity)
        assert isinstance(items, list)

        # The member items are needed in two of the three cases, so
        # collect them up front.
        member_items = []
        for child in self.children:
            child.to_lobster(schema, member_items)

        if schema is Activity:
            # Test classes are never items themselves, but their tags
            # apply to every contained test.
            for member in member_items:
                for class_tag in self.tags:
                    member.add_tracing_target(class_tag)
            items.extend(member_items)
            return

        qualified_name = self.fqn()
        class_item = Implementation(tag      = Tracing_Tag("python",
                                                           qualified_name),
                                    location = self.location,
                                    language = "Python",
                                    kind     = self.kind,
                                    name     = qualified_name)

        if not (self.tags or self.just):
            # Nothing is annotated on the class itself: trace to each
            # member instead.
            items.extend(member_items)
            return

        # Annotations on the class take precedence over the members.
        for class_tag in self.tags:
            class_item.add_tracing_target(class_tag)
        class_item.just_up += self.just

        for child in self.children:
            child.warn_ignored(self.name)

        items.append(class_item)

232 

233 

class Python_Function(Python_Traceable_Node):
    """
    Trace tree node for a function definition.

    The kind starts out as "Function" and becomes "Constructor" or
    "Method" once the node is attached to a class.
    """

    def __init__(self, location, name):
        super().__init__(location, name, "Function")

    def set_parent(self, node):
        """
        Attach this function to `node` and refine its kind.

        Below a Python_Class the kind becomes "Constructor" for
        `__init__` and "Method" for everything else.
        """
        super().set_parent(node)
        if isinstance(node, Python_Class):
            self.kind = "Constructor" if self.name == "__init__" else "Method"

    def to_lobster(self, schema, items):
        """
        Export this function into `items`.

        With the Implementation schema every function yields one item.
        With the Activity schema only functions whose name starts with
        "test" or "_test", or ends with "test", yield an item; all
        others are skipped.

        Nested definitions never become items of their own; the
        tracing targets of their items are merged into this one.

        Args:
            schema: Either Implementation or Activity.
            items (list): Receives the created item, if any.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        # Every exported function is recorded so that repeated names
        # can be told apart by a "-<count>" suffix in the tag.
        func_name.append(self.fqn())
        tagname = count_occurrence_of_last_function_from_function_name_list(
            func_name
        )
        # The item name is the tag name without that counter. Only a
        # trailing "-<digits>" is removed: cutting at the first hyphen
        # would truncate the name for files such as "my-tool.py".
        name_value = re.sub(r"-\d+$", "", tagname)

        if schema is Implementation:
            l_item = Implementation(tag      = Tracing_Tag("python",
                                                           tagname),
                                    location = self.location,
                                    language = "Python",
                                    kind     = self.kind,
                                    name     = name_value)
        elif self.name.startswith(("test", "_test")) \
             or self.name.endswith("test"):
            l_item = Activity(tag       = Tracing_Tag("pyunit",
                                                      self.fqn()),
                              location  = self.location,
                              framework = "PyUnit",
                              kind      = "Test")
        else:
            return

        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        # Any children of functions are not testable units. Their
        # tracing tags contribute to ours, but otherwise they don't
        # appear.
        nested_items = []
        for node in self.children:
            node.to_lobster(schema, nested_items)
        for item in nested_items:
            # TODO: Warn about useless nested justifications
            # Merge tracing tags
            for tag in item.unresolved_references:
                l_item.add_tracing_target(tag)

        items.append(l_item)

294 

295 

class Lobster_Visitor(cst.CSTVisitor):
    """
    libcst visitor that builds the tree of traceable nodes for one file.

    Class and function definitions become Python_Class and
    Python_Function nodes below a Python_Module root. Tracing tags are
    taken from the configured decorator and from "# lobster-trace: "
    comments; justifications come from "# lobster-exclude: " comments.
    """
    METADATA_DEPENDENCIES = (PositionProvider,)

    def __init__(self, file_name, options):
        """
        Args:
            file_name (str): Path of an existing file.
            options (dict): Must provide the keys "activity",
                "namespace", "exclude_untagged", "decorator",
                "dec_arg_name" and "dec_arg_version".
        """
        super().__init__()
        assert os.path.isfile(file_name)
        self.file_name = file_name

        # Only a trailing ".py" is removed; replacing every ".py"
        # occurrence would also mangle names like "my.pytest.py".
        module_name = os.path.basename(file_name)
        if module_name.endswith(".py"):
            module_name = module_name[:-len(".py")]

        self.module = Python_Module(File_Reference(file_name), module_name)

        self.activity     = options["activity"]
        self.current_node = None
        self.stack        = [self.module]

        self.namespace        = options["namespace"]
        self.exclude_untagged = options["exclude_untagged"]

        self.decorator_name  = options["decorator"]
        self.dec_arg_name    = options["dec_arg_name"]
        self.dec_arg_version = options["dec_arg_version"]

    def parse_dotted_name(self, name):
        """
        Return the dotted name denoted by `name`.

        A call yields the name of the called object. Anything that is
        not a call, a name or an attribute access yields None.
        """
        if isinstance(name, cst.Call):
            return self.parse_dotted_name(name.func)
        if isinstance(name, cst.Name):
            return name.value
        if isinstance(name, cst.Attribute):
            # value -- prefix
            # attr  -- postfix
            return "%s.%s" % (self.parse_dotted_name(name.value),
                              self.parse_dotted_name(name.attr))
        return None

    def parse_decorators(self, decorators):
        """
        Register the tags given by the configured tracing decorator.

        Decorators with any other name are skipped. A matching
        decorator is expected to be a call that uses keyword arguments
        only. The argument named by `dec_arg_name` provides the tag
        name; without a version argument it may also be a list of names.
        """
        for dec in decorators:
            dec_name = self.parse_dotted_name(dec.decorator)
            if dec_name is None or dec_name != self.decorator_name:
                continue

            dec_args = {arg.keyword.value: parse_value(arg.value)
                        for arg in dec.decorator.args}

            # TODO: Better error messages if these assumptions are
            # violated
            assert self.dec_arg_name in dec_args
            if self.dec_arg_version:
                assert self.dec_arg_version in dec_args
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name],
                                  dec_args.get(self.dec_arg_version, None))
                self.current_node.register_tag(tag)

            elif isinstance(dec_args[self.dec_arg_name], list):
                for item in dec_args[self.dec_arg_name]:
                    tag = Tracing_Tag(self.namespace, item)
                    self.current_node.register_tag(tag)

            else:
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name])
                self.current_node.register_tag(tag)

    def visit_ClassDef(self, node):
        """Open a Python_Class node for `node` and read its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Class(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def visit_FunctionDef(self, node):
        """Open a Python_Function node for `node` and read its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Function(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def leave_FunctionDef(self, original_node):
        """Close the current function node and return to its parent."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def leave_ClassDef(self, original_node):
        """Close the current class node and return to its parent."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def visit_Comment(self, node):
        """
        Register a "# lobster-trace: " tag or a "# lobster-exclude: "
        justification on the item that the comment belongs to. Other
        comments are ignored.
        """
        line = self.get_metadata(PositionProvider, node).start.line

        # Before the first class or function is entered there is no
        # current node yet; such comments belong to the module itself.
        current = self.current_node
        if current is None:
            current = self.module

        # For some reason the comment in a class is associated with
        # its constructor. We can check if it precedes it (by line),
        # and so associate it with the enclosing item.
        if current.location.line and current.location.line > line:
            actual = current.parent
        else:
            actual = current

        if node.value.startswith(LOBSTER_TRACE_PREFIX):
            tag = node.value[len(LOBSTER_TRACE_PREFIX):].strip()
            actual.register_tag(
                Tracing_Tag.from_text(self.namespace,
                                      tag))

        elif node.value.startswith(LOBSTER_JUST_PREFIX):
            reason = node.value[len(LOBSTER_JUST_PREFIX):].strip()
            actual.register_justification(reason)

409 

410 

def process_file(file_name, options):
    """
    Extract the LOBSTER items of one Python source file.

    Args:
        file_name (str): Path of the file to parse.
        options (dict): Settings handed to Lobster_Visitor; "activity"
            selects the Activity schema instead of Implementation, and
            "exclude_untagged" drops items without tracing references.

    Returns:
        tuple: `(True, items)` on success. `(False, [])` if the file
            cannot be parsed or decoded; the problem is printed.

    Raises:
        Exception: Any other error is re-raised after the name of the
            affected file has been printed.
    """
    assert isinstance(file_name, str)
    assert isinstance(options, dict)

    try:
        with open(file_name, "r", encoding="UTF-8") as fd:
            ast = cst.parse_module(fd.read())

        ast = cst.MetadataWrapper(ast)
        visitor = Lobster_Visitor(file_name, options)
        ast.visit(visitor)

        items = []
        schema = Activity if options["activity"] else Implementation
        visitor.module.to_lobster(schema, items)

        if options["exclude_untagged"]:
            items = [item for item in items if item.unresolved_references]

        return True, items

    # ParserSyntaxError is part of libcst's public namespace, so there
    # is no need to reach into the private _exceptions module.
    except cst.ParserSyntaxError as exc:
        print(file_name, exc.message)
        return False, []

    except UnicodeDecodeError as exc:
        print(file_name, str(exc))
        return False, []

    except Exception:
        # Name the offending file, then let the error propagate.
        print("Unspecified issue in file: %s" % file_name)
        raise

446 

447 

# Created at module level because the get_version decorator below
# needs the parser before main() runs.
ap = argparse.ArgumentParser()


@get_version(ap)
def main():
    # lobster-trace: python_req.Dummy_Requirement
    """
    Command line entry point of lobster_python.

    Collects the given files (directories are searched recursively for
    ".py" files), extracts the items of each file, and writes the result
    to the file given by --out or to standard output.

    Returns:
        int: 0 if every file was processed, 1 if at least one file
            could not be parsed.
    """
    ap.add_argument("files",
                    nargs="+",
                    metavar="FILE|DIR")
    ap.add_argument("--activity",
                    action="store_true",
                    default=False,
                    help=("generate activity traces (tests) instead of"
                          " an implementation trace"))
    ap.add_argument("--out",
                    default=None)
    ap.add_argument("--single",
                    action="store_true",
                    default=False,
                    help="don't multi-thread")
    ap.add_argument("--only-tagged-functions",
                    default=False,
                    action="store_true",
                    help="only trace functions with tags")
    grp = ap.add_mutually_exclusive_group()
    grp.add_argument("--parse-decorator",
                     nargs=2,
                     metavar=("DECORATOR", "NAME_ARG"),
                     default=(None, None))
    grp.add_argument("--parse-versioned-decorator",
                     nargs=3,
                     metavar=("DECORATOR", "NAME_ARG", "VERSION_ARG"),
                     default=(None, None, None))

    options = ap.parse_args()

    # Expand the positional arguments into the list of files to parse.
    file_list = []
    for entry in options.files:
        if os.path.isfile(entry):
            file_list.append(entry)
            continue
        if not os.path.isdir(entry):
            ap.error("%s is not a file or directory" % entry)
        for path, _, files in os.walk(entry):
            file_list.extend(os.path.join(path, filename)
                             for filename in files
                             if os.path.splitext(filename)[1] == ".py")

    context = {
        "activity"         : options.activity,
        "decorator"        : None,
        "dec_arg_name"     : None,
        "dec_arg_version"  : None,
        "exclude_untagged" : options.only_tagged_functions,
        "namespace"        : "req",
    }

    plain = options.parse_decorator
    versioned = options.parse_versioned_decorator
    if plain[0] is not None:
        context.update(decorator    = plain[0],
                       dec_arg_name = plain[1])
    elif versioned[0] is not None:
        context.update(decorator       = versioned[0],
                       dec_arg_name    = versioned[1],
                       dec_arg_version = versioned[2])

    pfun = functools.partial(process_file, options=context)
    items = []
    ok = True

    if not options.single:
        with multiprocessing.Pool() as pool:
            for new_ok, new_items in pool.imap_unordered(pfun, file_list):
                ok &= new_ok
                items += new_items
    else:
        for file_name in file_list:
            new_ok, new_items = pfun(file_name)
            ok &= new_ok
            items += new_items

    schema = Activity if options.activity else Implementation

    if not options.out:
        lobster_write(sys.stdout, schema, "lobster_python", items)
        print()
    else:
        with open(options.out, "w", encoding="UTF-8") as fd:
            lobster_write(fd, schema, "lobster_python", items)
        print("Written output for %u items to %s" % (len(items),
                                                     options.out))

    if not ok:
        print("Note: Earlier parse errors make actual output unreliable")
        return 1
    return 0

548 

549 

# When run as a script, the value returned by main() becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())