Coverage for lobster/tools/python/python.py: 0%
311 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-12 15:02 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-12 15:02 +0000
1#!/usr/bin/env python3
2#
3# lobster_python - Extract Python tracing tags for LOBSTER
4# Copyright (C) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20from argparse import Namespace
21import sys
22import os.path
23import multiprocessing
24import functools
25import re
26from typing import Optional, Sequence
28from libcst.metadata import PositionProvider
29import libcst as cst
31from lobster.common.items import Tracing_Tag, Implementation, Activity
32from lobster.common.location import File_Reference
33from lobster.common.io import lobster_write, ensure_output_directory
34from lobster.common.meta_data_tool_base import MetaDataToolBase
# Comment prefix that introduces a tracing tag; the rest of the comment
# is parsed as the tag text (see Lobster_Visitor.visit_Comment).
LOBSTER_TRACE_PREFIX = "# lobster-trace: "
# Comment prefix that introduces a justification; the rest of the
# comment is stored verbatim as the reason.
LOBSTER_JUST_PREFIX = "# lobster-exclude: "
# Module-level list of every function fqn ("module.func:line") that
# Python_Function.to_lobster has seen so far in this process. It is
# only ever appended to, never cleared.
# NOTE(review): state is per process, so with the multiprocessing pool
# each worker presumably keeps its own list -- confirm this is intended.
func_name = []
def count_occurrence_of_last_function_from_function_name_list(function_names):
    """
    Build the tag name for the last entry of `function_names`.

    The last entry is reduced to `module.function` (or
    `module.class.function` when the part before the first ':' has
    exactly three dot-separated components). All *earlier* entries are
    then scanned, and every entry that names the same function
    increments a counter. If the counter is greater than zero it is
    appended as a `-count` suffix, so the first occurrence gets no
    suffix, the second gets `-1`, the third `-2`, and so on.

    Args:
        function_names (list):
            Non-empty list of strings formatted as
            `module.class.function:line_number` or
            `module.function:line_number`. Only the last entry is
            named in the result; the others are only counted.

    Returns:
        str: `module.function` or `module.class.function`, followed by
             `-count` if the same function appeared `count` (> 0)
             times earlier in the list.

    Raises:
        IndexError: if the list is empty or an entry contains neither
                    a '.' nor a ':' separator.

    Examples:
        function_names = ['hello.add:2', 'hello.sub:5', 'hello.add:8']
        returns: 'hello.add-1'
        function_names = ['hello.Example.add:2', 'hello.Example.sub:5',
                          'hello.Example.add:9']
        returns: 'hello.Example.add-1'
    """
    def dotted_parts(entry):
        # Everything before the first ':' (i.e. without the line
        # number), split on the dots.
        return entry.split(":", 1)[0].split(".")

    last_entry = function_names[-1]
    last_tokens = re.split(r"[.:]", last_entry)
    last_parts = dotted_parts(last_entry)

    filename = last_tokens[0]
    if len(last_parts) == 3:
        # module.class.function: keep class and function together
        last_function = last_parts[1] + "." + last_parts[2]
    else:
        last_function = last_tokens[1]

    count = 0
    for entry in function_names[:-1]:
        parts = dotted_parts(entry)
        if len(parts) == 3 and last_function == parts[1] + "." + parts[2]:
            count += 1
        # The second token is the function name for module-level
        # functions (and the class name for methods).
        if re.split(r"[.:]", entry)[1] == last_function:
            count += 1

    suffix = "-" + str(count) if count > 0 else ""
    return filename + "." + last_function + suffix
def parse_value(val):
    """
    Convert a libcst expression node into a plain Python value.

    - A `cst.SimpleString` yields the text between its quotes. The
      libcst `raw_value` property is used so that triple-quoted and
      prefixed literals (e.g. r"...") are handled as well; slicing off
      one character on each side would leave stray quote or prefix
      characters for those.
    - A `cst.List` yields a list with each element converted
      recursively.
    - Any other node is expected to expose a `value` attribute, which
      is converted with str(); the text "None" is mapped to None.
    """
    if isinstance(val, cst.SimpleString):
        return val.raw_value
    if isinstance(val, cst.List):
        return [parse_value(element.value) for element in val.elements]

    text = str(val.value)
    return None if text == "None" else text
class Python_Traceable_Node:
    """
    Base class for the nodes of the tree built while visiting a file.

    A node stores its source location, name and kind, its parent and
    children, and the tracing tags and justifications registered on it.
    """

    def __init__(self, location, name, kind):
        assert isinstance(location, File_Reference)
        assert isinstance(name, str)
        assert isinstance(kind, str)
        self.location = location
        self.name = name
        self.kind = kind
        # Tree links; parent is filled in by set_parent
        self.parent = None
        self.children = []
        # Annotations collected for this node
        self.tags = []
        self.just = []

    def register_tag(self, tag):
        """Record a tracing tag on this node."""
        assert isinstance(tag, Tracing_Tag)
        self.tags.append(tag)

    def register_justification(self, justification):
        """Record a justification text on this node."""
        assert isinstance(justification, str)
        self.just.append(justification)

    def set_parent(self, node):
        """Attach this node below `node`."""
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node

    def to_json(self):
        """Return this node and its subtree as a dictionary."""
        tag_data = [tag.to_json() for tag in self.tags]
        child_data = [child.to_json() for child in self.children]
        return {
            "kind": self.kind,
            "name": self.name,
            "tags": tag_data,
            "just": self.just,
            "children": child_data,
        }

    def to_lobster(self, schema, items):
        """Placeholder; the subclasses in this file provide the real
        implementation."""
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        assert False

    def fqn(self):
        """
        Return the dotted name of this node, starting at the root.

        Function nodes with a known line get ":<line>" appended to
        their own name.
        """
        prefix = self.parent.fqn() + "." if self.parent else ""
        has_line = self.location.line is not None
        if has_line and isinstance(self, Python_Function):
            return prefix + f"{self.name}:{str(self.location.line)}"
        return prefix + self.name

    def lobster_tag(self):
        """Return the "python" tracing tag built from the fqn."""
        return Tracing_Tag("python", self.fqn())

    def warn_ignored(self, reason):
        """Print one warning per tag and per justification on this
        node, stating that it is ignored because `reason` already has
        annotations."""
        for tag in self.tags:
            where = self.location.to_string()
            print(f"{where}: warning: ignored tag {tag}"
                  f" because {reason} already has annotations")
        for just in self.just:
            where = self.location.to_string()
            print(f"{where}: warning: ignored justification '{just}' "
                  f"because {reason} already has annotations")
class Python_Module(Python_Traceable_Node):
    """Root node of a file; its kind is "Module"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Module")

    def to_lobster(self, schema, items):
        """Let every child add its items to `items`; the module itself
        contributes no item."""
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        for child in self.children:
            child.to_lobster(schema, items)
class Python_Class(Python_Traceable_Node):
    """Node for a class definition; its kind is "Class"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Class")

    def to_lobster(self, schema, items):
        """
        Add the items for this class to `items`.

        - Activity schema: the class never becomes an item; its tags
          are added to every item produced by its children.
        - Implementation schema, class has tags or justifications: a
          single item for the class is emitted and the children's own
          annotations are reported as ignored.
        - Implementation schema, class has no annotations: the
          children's items are emitted instead of the class.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        # The children are always converted first, whichever of the
        # cases above applies.
        member_items = []
        for member in self.children:
            member.to_lobster(schema, member_items)

        if schema is Activity:
            # Tests inherit the tags of their class
            for member_item in member_items:
                for tag in self.tags:
                    member_item.add_tracing_target(tag)
            items.extend(member_items)
            return

        class_item = Implementation(
            tag=Tracing_Tag("python", self.fqn()),
            location=self.location,
            language="Python",
            kind=self.kind,
            name=self.fqn())

        if not (self.tags or self.just):
            # Nothing on the class itself: trace to each child
            items.extend(member_items)
            return

        # Annotations on the class take precedence over its children
        for tag in self.tags:
            class_item.add_tracing_target(tag)
        class_item.just_up += self.just

        for member in self.children:
            member.warn_ignored(self.name)

        items.append(class_item)
class Python_Function(Python_Traceable_Node):
    """Node for a function definition.

    The kind starts as "Function" and becomes "Constructor" or
    "Method" when the node is attached to a class.
    """

    def __init__(self, location, name):
        super().__init__(location, name, "Function")

    def set_parent(self, node):
        """Attach this function below `node` and refine its kind if
        `node` is a class."""
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node
        if isinstance(node, Python_Class):
            self.kind = "Constructor" if self.name == "__init__" else "Method"

    def to_lobster(self, schema, items):
        """
        Add the item for this function to `items`.

        For the Implementation schema every function yields an item.
        For the Activity schema only functions whose name starts with
        "test" or "_test", or ends with "test", yield an item; all
        others are skipped. Tags found on nested children are merged
        into this function's item; the children never produce items of
        their own.

        Side effect: the function's fqn is appended to the module-level
        `func_name` list, which is used to number repeated names.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        func_name.append(self.fqn())
        tagname = count_occurrence_of_last_function_from_function_name_list(
            func_name
        )
        # The item name is the tag name without the trailing "-<count>"
        # suffix. Only that suffix is removed: splitting on the first
        # '-' would truncate names of modules whose file name contains
        # a hyphen (e.g. "my-tool.run" would become "my").
        name_value = re.sub(r"-\d+$", "", tagname)

        if schema is Implementation:
            l_item = Implementation(tag=Tracing_Tag("python", tagname),
                                    location=self.location,
                                    language="Python",
                                    kind=self.kind,
                                    name=name_value)
        elif self.name.startswith(("test", "_test")) \
                or self.name.endswith("test"):
            l_item = Activity(tag=Tracing_Tag("pyunit", self.fqn()),
                              location=self.location,
                              framework="PyUnit",
                              kind="Test")
        else:
            return

        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        # Any children of functions are not testable units. Their
        # tracing tags contribute to ours, but otherwise they don't
        # appear.
        nested_items = []
        for node in self.children:
            node.to_lobster(schema, nested_items)
        for item in nested_items:
            # TODO: Warn about useless nested justifications
            # Merge tracing tags
            for tag in item.unresolved_references:
                l_item.add_tracing_target(tag)

        items.append(l_item)
class Lobster_Visitor(cst.CSTVisitor):
    """
    libcst visitor that builds the tree of traceable nodes for one file.

    Classes and functions become Python_Class / Python_Function nodes
    below a Python_Module root. Tags come from the configured decorator
    (if any) and from "# lobster-trace: " comments; justifications come
    from "# lobster-exclude: " comments.
    """
    METADATA_DEPENDENCIES = (PositionProvider,)

    def __init__(self, file_name, options):
        """
        Args:
            file_name: path of an existing file.
            options: dictionary providing the keys "activity",
                "namespace", "exclude_untagged", "decorator",
                "dec_arg_name" and "dec_arg_version".
        """
        super().__init__()
        assert os.path.isfile(file_name)
        self.file_name = file_name

        # Strip only the real extension. str.replace(".py", "") would
        # also remove ".py" from the middle of a file name.
        module_name, _ = os.path.splitext(os.path.basename(file_name))
        self.module = Python_Module(File_Reference(file_name),
                                    module_name)

        self.activity = options["activity"]
        self.current_node = None
        self.stack = [self.module]

        self.namespace = options["namespace"]
        self.exclude_untagged = options["exclude_untagged"]

        self.decorator_name = options["decorator"]
        self.dec_arg_name = options["dec_arg_name"]
        self.dec_arg_version = options["dec_arg_version"]

    def parse_dotted_name(self, name):
        """Return the dotted name of a Name, Attribute or Call node
        (for a Call, the name of the called function), or None for any
        other node."""
        if isinstance(name, cst.Call):
            return self.parse_dotted_name(name.func)
        if isinstance(name, cst.Name):
            return name.value
        if isinstance(name, cst.Attribute):
            # value -- prefix
            # attr -- postfix
            return f"{self.parse_dotted_name(name.value)}." \
                   f"{self.parse_dotted_name(name.attr)}"
        return None

    def parse_decorators(self, decorators):
        """Register a tag on the current node for every decorator whose
        dotted name equals the configured decorator name."""
        for dec in decorators:
            dec_name = self.parse_dotted_name(dec.decorator)
            if dec_name is None:
                continue
            if dec_name != self.decorator_name:
                continue
            dec_args = {arg.keyword.value: parse_value(arg.value)
                        for arg in dec.decorator.args}

            # TODO: Better error messages if these assumptions are
            # violated
            assert self.dec_arg_name in dec_args
            if self.dec_arg_version:
                assert self.dec_arg_version in dec_args
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name],
                                  dec_args.get(self.dec_arg_version, None))
                self.current_node.register_tag(tag)

            elif isinstance(dec_args[self.dec_arg_name], list):
                # One tag per listed name
                for item in dec_args[self.dec_arg_name]:
                    tag = Tracing_Tag(self.namespace, item)
                    self.current_node.register_tag(tag)

            else:
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name])
                self.current_node.register_tag(tag)

    def visit_ClassDef(self, node):
        """Create a class node, make it current and read its
        decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Class(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def visit_FunctionDef(self, node):
        """Create a function node, make it current and read its
        decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Function(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def leave_FunctionDef(self, original_node):
        """Return to the enclosing node."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def leave_ClassDef(self, original_node):
        """Return to the enclosing node."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def visit_Comment(self, node):
        """Register the tag or justification carried by a
        "# lobster-trace: " / "# lobster-exclude: " comment."""
        line = self.get_metadata(PositionProvider, node).start.line
        # For some reason the comment in a class is associated with
        # its constructor. We can check if it precedes it (by line),
        # and so associate it with the enclosing item.
        if self.current_node and \
           self.current_node.location.line and \
           self.current_node.location.line > line:
            actual = self.current_node.parent
        else:
            actual = self.current_node

        # Comments that appear before the first class or function are
        # visited while there is no current node yet. Attach them to
        # the module (as happens for comments after a top-level
        # definition) instead of failing on None.
        if actual is None:
            actual = self.module

        if node.value.startswith(LOBSTER_TRACE_PREFIX):
            tag = node.value[len(LOBSTER_TRACE_PREFIX):].strip()
            actual.register_tag(
                Tracing_Tag.from_text(self.namespace,
                                      tag))

        elif node.value.startswith(LOBSTER_JUST_PREFIX):
            reason = node.value[len(LOBSTER_JUST_PREFIX):].strip()
            actual.register_justification(reason)
def process_file(file_name, options):
    """
    Parse one Python file and extract its LOBSTER items.

    Args:
        file_name (str): path of the file to parse.
        options (dict): settings; "activity" and "exclude_untagged" are
            read here, and the dictionary is passed on to
            Lobster_Visitor.

    Returns:
        tuple: (True, items) on success, or (False, []) if the file has
        a syntax error or cannot be decoded as UTF-8; in the failure
        case the problem is printed. Any other exception is announced
        with a message naming the file and then re-raised.
    """
    assert isinstance(file_name, str)
    assert isinstance(options, dict)

    try:
        with open(file_name, encoding="UTF-8") as source:
            tree = cst.parse_module(source.read())

        wrapper = cst.MetadataWrapper(tree)
        visitor = Lobster_Visitor(file_name, options)
        wrapper.visit(visitor)

        schema = Activity if options["activity"] else Implementation
        extracted = []
        visitor.module.to_lobster(schema, extracted)

        if options["exclude_untagged"]:
            # Keep only the items that carry at least one reference
            extracted = [entry
                         for entry in extracted
                         if entry.unresolved_references]

        return True, extracted

    except cst.ParserSyntaxError as exc:
        print(file_name, exc.message)
        return False, []

    except UnicodeDecodeError as exc:
        print(file_name, str(exc))
        return False, []

    except Exception:
        print(f"Unspecified issue in file: {file_name}")
        raise
class PythonTool(MetaDataToolBase):
    """Command-line tool that extracts tracing tags from Python code or
    tests and writes them in LOBSTER format."""

    def __init__(self):
        super().__init__(
            name="python",
            description="Extract tracing tags from Python code or tests",
            official=True,
        )
        parser = self._argument_parser
        parser.add_argument("files", nargs="+", metavar="FILE|DIR")
        parser.add_argument(
            "--activity",
            action="store_true",
            default=False,
            help=("generate activity traces (tests) instead of"
                  " an implementation trace"),
        )
        parser.add_argument("--out", default=None)
        parser.add_argument(
            "--single",
            action="store_true",
            default=False,
            help="don't multi-thread",
        )
        parser.add_argument(
            "--only-tagged-functions",
            default=False,
            action="store_true",
            help="only trace functions with tags",
        )
        # The two decorator options cannot be combined
        decorator_group = parser.add_mutually_exclusive_group()
        decorator_group.add_argument(
            "--parse-decorator",
            nargs=2,
            metavar=("DECORATOR", "NAME_ARG"),
            default=(None, None),
        )
        decorator_group.add_argument(
            "--parse-versioned-decorator",
            nargs=3,
            metavar=("DECORATOR", "NAME_ARG", "VERSION_ARG"),
            default=(None, None, None),
        )

    def _run_impl(self, options: Namespace) -> int:
        """
        Process all requested files and write the result.

        Directories are searched recursively for files with the ".py"
        extension; anything that is neither a file nor a directory is
        reported through the argument parser's error().

        Returns:
            0 if every file was processed successfully, otherwise 1.
        """
        sources = []
        for entry in options.files:
            if os.path.isfile(entry):
                sources.append(entry)
            elif os.path.isdir(entry):
                for root, _, names in os.walk(entry):
                    sources.extend(
                        os.path.join(root, name)
                        for name in names
                        if os.path.splitext(name)[1] == ".py"
                    )
            else:
                self._argument_parser.error(
                    f"{entry} is not a file or directory")

        context = {
            "activity": options.activity,
            "decorator": None,
            "dec_arg_name": None,
            "dec_arg_version": None,
            "exclude_untagged": options.only_tagged_functions,
            "namespace": "req",
        }

        if options.parse_decorator[0] is not None:
            decorator, name_arg = options.parse_decorator
            context["decorator"] = decorator
            context["dec_arg_name"] = name_arg
        elif options.parse_versioned_decorator[0] is not None:
            decorator, name_arg, version_arg = \
                options.parse_versioned_decorator
            context["decorator"] = decorator
            context["dec_arg_name"] = name_arg
            context["dec_arg_version"] = version_arg

        worker = functools.partial(process_file, options=context)
        items = []
        ok = True

        if options.single:
            # Process the files one after the other in this process
            for file_ok, file_items in map(worker, sources):
                ok &= file_ok
                items += file_items
        else:
            with multiprocessing.Pool() as pool:
                for file_ok, file_items in pool.imap_unordered(worker,
                                                               sources):
                    ok &= file_ok
                    items += file_items

        schema = Activity if options.activity else Implementation

        if options.out:
            ensure_output_directory(options.out)
            with open(options.out, "w", encoding="UTF-8") as fd:
                lobster_write(fd, schema, "lobster_python", items)
            print(f"Written output for {len(items)} items to {options.out}")
        else:
            lobster_write(sys.stdout, schema, "lobster_python", items)
            print()

        if not ok:
            print("Note: Earlier parse errors make actual output unreliable")
            return 1
        return 0
def main(args: Optional[Sequence[str]] = None) -> int:
    """Create a PythonTool and return the result of running it with
    `args`."""
    tool = PythonTool()
    return tool.run(args)