Coverage for lobster/tools/python/python.py: 0%
311 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-04-16 05:31 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-04-16 05:31 +0000
1#!/usr/bin/env python3
2#
3# lobster_python - Extract Python tracing tags for LOBSTER
4# Copyright (C) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20from argparse import Namespace
21import sys
22import os.path
23import multiprocessing
24import functools
25import re
26from typing import Optional, Sequence
28from libcst.metadata import PositionProvider
29import libcst as cst
31from lobster.common.items import Tracing_Tag, Implementation, Activity
32from lobster.common.location import File_Reference
33from lobster.common.io import lobster_write, ensure_output_directory
34from lobster.common.meta_data_tool_base import MetaDataToolBase
# Prefix of a source comment that carries a tracing tag; the text after
# the prefix is handed to Tracing_Tag.from_text by
# Lobster_Visitor.visit_Comment.
LOBSTER_TRACE_PREFIX = "# lobster-trace: "
# Prefix of a source comment that carries a justification; the text after
# the prefix is registered as the reason by Lobster_Visitor.visit_Comment.
LOBSTER_JUST_PREFIX = "# lobster-exclude: "
# Module-level accumulator of fully qualified function names. Every call
# to Python_Function.to_lobster appends one entry; nothing in the visible
# code ever clears it.
func_name = []
def count_occurrence_of_last_function_from_function_name_list(function_names):
    """
    Build the tag name for the most recently recorded function.

    The last entry of `function_names` is reduced to `module.function`
    (or `module.class.function` when the part before the first colon has
    exactly three dot-separated components). All earlier entries are then
    scanned, and the number of earlier entries naming the same function is
    appended as `-count`. The suffix is omitted when the count is zero.

    Args:
        function_names (list):
            Non-empty list of strings formatted as
            `module.function:line_number` or
            `module.class.function:line_number`.

    Returns:
        str: `module.function`, `module.class.function`, or either of
        those followed by `-count` when earlier occurrences exist.

    Examples:
        ['hello.add:2', 'hello.sub:5', 'hello.add:8']
            returns 'hello.add-1'
        ['hello.Example.add:2', 'hello.Example.add:9']
            returns 'hello.Example.add-1'
        ['hello.add:2']
            returns 'hello.add'
    """
    newest = function_names[-1]
    newest_tokens = re.split(r"[.:]", newest)
    newest_path = newest.split(':', 1)[0].split(".")

    module_name = newest_tokens[0]
    if len(newest_path) == 3:
        # module.class.function: the class stays part of the name
        target = newest_path[1] + '.' + newest_path[2]
    else:
        target = newest_tokens[1]

    seen_before = 0
    for earlier in function_names[:-1]:
        earlier_path = earlier.split(':', 1)[0].split(".")
        if len(earlier_path) == 3 and \
           target == earlier_path[1] + '.' + earlier_path[2]:
            seen_before += 1
        # The second token is compared for every entry, including
        # three-part ones (where it is the class component).
        if re.split(r"[.:]", earlier)[1] == target:
            seen_before += 1

    suffix = "-" + str(seen_before) if seen_before > 0 else ''
    return module_name + "." + target + suffix
def parse_value(val):
    """Convert a libcst expression node into a plain Python value.

    - A `cst.SimpleString` yields its text without prefix and quotes.
    - A `cst.List` yields a list with each element converted recursively.
    - Any other node must expose a `value` attribute; its string form is
      returned, except that the text "None" is mapped to None.
    """
    if isinstance(val, cst.SimpleString):
        # raw_value removes the string prefix (r, b, u, ...) and the full
        # quote sequence, so triple-quoted and prefixed literals do not
        # leak quote characters into the result. For plain "..." / '...'
        # literals this equals the former val.value[1:-1].
        return val.raw_value
    if isinstance(val, cst.List):
        return [parse_value(item.value)
                for item in val.elements]

    rv = str(val.value)
    if rv == "None":
        rv = None
    return rv
class Python_Traceable_Node:
    """Base class for an element of a Python file that can carry tracing
    tags and justifications and that can be nested inside other elements.
    """

    def __init__(self, location, name, kind):
        assert isinstance(location, File_Reference)
        assert isinstance(name, str)
        assert isinstance(kind, str)
        self.location = location
        self.name = name
        self.kind = kind
        # Tree links; the parent is filled in by set_parent
        self.parent = None
        self.children = []
        # Annotations collected for this node
        self.tags = []
        self.just = []

    def register_tag(self, tag):
        """Attach a tracing tag to this node."""
        assert isinstance(tag, Tracing_Tag)
        self.tags.append(tag)

    def register_justification(self, justification):
        """Attach a justification text to this node."""
        assert isinstance(justification, str)
        self.just.append(justification)

    def set_parent(self, node):
        """Make `node` the parent of this node and register this node as
        one of its children."""
        assert isinstance(node, Python_Traceable_Node)
        self.parent = node
        node.children.append(self)

    def to_json(self):
        """Return a nested dict describing this node and its subtree."""
        tag_data = [tag.to_json() for tag in self.tags]
        child_data = [child.to_json() for child in self.children]
        return {
            "kind": self.kind,
            "name": self.name,
            "tags": tag_data,
            "just": self.just,
            "children": child_data,
        }

    def to_lobster(self, schema, items):
        """Placeholder; the subclasses in this file provide the real
        implementation. Reaching this version trips an assertion."""
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        assert False

    def fqn(self):
        """Return the dotted name of this node, prefixed by the names of
        all its ancestors. Function nodes with a known line get a
        ":<line>" suffix."""
        prefix = self.parent.fqn() + "." if self.parent else ""
        if self.location.line is not None and \
           isinstance(self, Python_Function):
            return prefix + f"{self.name}:{str(self.location.line)}"
        return prefix + self.name

    def lobster_tag(self):
        """Return a "python" tracing tag built from the qualified name."""
        return Tracing_Tag("python", self.fqn())

    def warn_ignored(self, reason):
        """Print one warning per tag and per justification on this node,
        stating that it is ignored because `reason` is annotated."""
        for ignored_tag in self.tags:
            print("%s: warning: ignored tag %s because "
                  "%s already has annotations" %
                  (self.location.to_string(),
                   ignored_tag,
                   reason))
        for ignored_text in self.just:
            print("%s: warning: ignored justification '%s' because "
                  "%s already has annotations" %
                  (self.location.to_string(),
                   ignored_text,
                   reason))
class Python_Module(Python_Traceable_Node):
    """Root node of the tree built for one Python file."""

    def __init__(self, location, name):
        super().__init__(location, name, "Module")

    def to_lobster(self, schema, items):
        """Let every direct child add its items to `items`; the module
        itself contributes no item."""
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        for child in self.children:
            child.to_lobster(schema, items)
class Python_Class(Python_Traceable_Node):
    """Node for a class definition."""

    def __init__(self, location, name):
        super().__init__(location, name, "Class")

    def to_lobster(self, schema, items):
        """Append the items for this class to `items`.

        For the Activity schema the class never becomes an item: its
        children are emitted and each receives the tags of the class.

        For the Implementation schema a class that carries tags or
        justifications is emitted as a single item and its children are
        dropped (with a warning for each annotation they carry). A class
        without annotations is skipped and its children are emitted
        instead.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        # The children are always converted first, whatever happens to
        # the result afterwards.
        class_contents = []
        for child in self.children:
            child.to_lobster(schema, class_contents)

        if schema is Activity:
            # Tests inherit the tags placed on their class
            for test_item in class_contents:
                for tag in self.tags:
                    test_item.add_tracing_target(tag)
            items += class_contents
            return

        qualified_name = self.fqn()
        l_item = Implementation(tag=Tracing_Tag("python", qualified_name),
                                location=self.location,
                                language="Python",
                                kind=self.kind,
                                name=qualified_name)

        if not (self.tags or self.just):
            # Nothing on the class itself: trace to each child instead
            items += class_contents
            return

        # Annotations on the class take precedence over its children
        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        for child in self.children:
            child.warn_ignored(self.name)

        items.append(l_item)
class Python_Function(Python_Traceable_Node):
    """Node for a function definition; becomes a method or constructor
    when attached to a class."""

    def __init__(self, location, name):
        super().__init__(location, name, "Function")

    def set_parent(self, node):
        """Attach this function to `node`. When `node` is a class, the
        kind changes to "Constructor" for `__init__` and to "Method" for
        everything else."""
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node
        if isinstance(node, Python_Class):
            if self.name == "__init__":
                self.kind = "Constructor"
            else:
                self.kind = "Method"

    def to_lobster(self, schema, items):
        """Append the item for this function to `items`.

        For the Implementation schema every function yields an item. For
        the Activity schema only functions whose name starts with "test"
        or "_test", or ends with "test", yield an item; all others are
        skipped. Nested definitions never become items of their own;
        the unresolved references they produce are merged into this
        function's item.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        # Record this function in the module-level list; the tag name gets
        # a "-<count>" suffix when the same name was recorded before.
        func_name.append(self.fqn())
        tagname = count_occurrence_of_last_function_from_function_name_list(
            func_name
        )
        # The displayed name is the tag name without the counter. Only a
        # trailing "-<digits>" is removed, so hyphens that are part of the
        # file name (e.g. "my-script.py") are preserved rather than
        # truncating the name at the first hyphen.
        name_value = re.sub(r"-\d+\Z", "", tagname)

        if schema is Implementation:
            l_item = Implementation(tag=Tracing_Tag("python", tagname),
                                    location=self.location,
                                    language="Python",
                                    kind=self.kind,
                                    name=name_value)
        elif self.name.startswith(("test", "_test")) \
                or self.name.endswith("test"):
            l_item = Activity(tag=Tracing_Tag("pyunit", self.fqn()),
                              location=self.location,
                              framework="PyUnit",
                              kind="Test")
        else:
            return

        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        # Any children of functions are not testable units. Their
        # tracing tags contribute to ours, but otherwise they don't
        # appear.
        nested_items = []
        for node in self.children:
            node.to_lobster(schema, nested_items)
        for item in nested_items:
            # TODO: Warn about useless nested justifications
            # Merge tracing tags
            for tag in item.unresolved_references:
                l_item.add_tracing_target(tag)

        items.append(l_item)
class Lobster_Visitor(cst.CSTVisitor):
    """libcst visitor that turns the classes, functions, decorators and
    lobster comments of one file into a tree of Python_Traceable_Node
    objects rooted at `self.module`.

    `options` is a dict providing the keys "activity", "namespace",
    "exclude_untagged", "decorator", "dec_arg_name" and
    "dec_arg_version".
    """

    # Line numbers are read through get_metadata(PositionProvider, ...)
    METADATA_DEPENDENCIES = (PositionProvider,)

    def __init__(self, file_name, options):
        super().__init__()
        assert os.path.isfile(file_name)
        self.file_name = file_name

        self.module = Python_Module(
            File_Reference(file_name),
            os.path.basename(file_name).replace(".py", ""))

        self.activity = options["activity"]
        self.stack = [self.module]
        # Start at the module, exactly as leave_ClassDef/leave_FunctionDef
        # restore it. Starting at None made visit_Comment fail with an
        # AttributeError for a lobster comment that appears before the
        # first class or function of the file.
        self.current_node = self.module

        self.namespace = options["namespace"]
        self.exclude_untagged = options["exclude_untagged"]

        self.decorator_name = options["decorator"]
        self.dec_arg_name = options["dec_arg_name"]
        self.dec_arg_version = options["dec_arg_version"]

    def parse_dotted_name(self, name):
        """Return the dotted name of a Name, Attribute or Call node (for
        a Call, the name of the called expression); None for any other
        node type."""
        if isinstance(name, cst.Call):
            return self.parse_dotted_name(name.func)
        if isinstance(name, cst.Name):
            return name.value
        if isinstance(name, cst.Attribute):
            # value -- prefix
            # attr  -- postfix
            return "%s.%s" % (self.parse_dotted_name(name.value),
                              self.parse_dotted_name(name.attr))
        return None

    def parse_decorators(self, decorators):
        """Register tags on the current node for every decorator whose
        dotted name equals the configured decorator name.

        With a version argument configured, one tag is built from the
        name and version arguments. Otherwise the name argument yields
        one tag per list element, or a single tag when it is not a list.
        """
        for dec in decorators:
            dec_name = self.parse_dotted_name(dec.decorator)
            if dec_name is None:
                continue
            if dec_name != self.decorator_name:
                continue
            # NOTE(review): this relies on the decorator being a call
            # whose arguments are all keyword arguments -- confirm
            dec_args = {arg.keyword.value: parse_value(arg.value)
                        for arg in dec.decorator.args}

            # TODO: Better error messages if these assumptions are
            # violated
            assert self.dec_arg_name in dec_args
            if self.dec_arg_version:
                assert self.dec_arg_version in dec_args
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name],
                                  dec_args.get(self.dec_arg_version, None))
                self.current_node.register_tag(tag)

            elif isinstance(dec_args[self.dec_arg_name], list):
                for item in dec_args[self.dec_arg_name]:
                    tag = Tracing_Tag(self.namespace, item)
                    self.current_node.register_tag(tag)

            else:
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name])
                self.current_node.register_tag(tag)

    def visit_ClassDef(self, node):
        """Create a class node below the innermost open node, make it
        current and process its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Class(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def visit_FunctionDef(self, node):
        """Create a function node below the innermost open node, make it
        current and process its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Function(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def leave_FunctionDef(self, original_node):
        """Close the function; the enclosing node becomes current."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def leave_ClassDef(self, original_node):
        """Close the class; the enclosing node becomes current."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def visit_Comment(self, node):
        """Register a lobster-trace tag or lobster-exclude justification
        found in a comment; other comments are ignored."""
        line = self.get_metadata(PositionProvider, node).start.line
        # A comment may be visited while the definition that follows it
        # is already the current node. If the comment precedes the
        # current node (by line), it is associated with the enclosing
        # item instead.
        if self.current_node and \
           self.current_node.location.line and \
           self.current_node.location.line > line:
            actual = self.current_node.parent
        else:
            actual = self.current_node

        if node.value.startswith(LOBSTER_TRACE_PREFIX):
            tag = node.value[len(LOBSTER_TRACE_PREFIX):].strip()
            actual.register_tag(
                Tracing_Tag.from_text(self.namespace,
                                      tag))

        elif node.value.startswith(LOBSTER_JUST_PREFIX):
            reason = node.value[len(LOBSTER_JUST_PREFIX):].strip()
            actual.register_justification(reason)
def process_file(file_name, options):
    """Extract the LOBSTER items of a single Python file.

    Args:
        file_name (str): path of the file to parse.
        options (dict): visitor configuration; the keys "activity" and
            "exclude_untagged" are also read here.

    Returns:
        tuple: `(True, items)` on success. `(False, [])` when the file
        cannot be parsed or decoded; the problem is printed in that case.

    Any other exception is re-raised after the file name is printed.
    """
    # pylint: disable=protected-access
    assert isinstance(file_name, str)
    assert isinstance(options, dict)

    items = []
    try:
        with open(file_name, "r", encoding="UTF-8") as fd:
            source_text = fd.read()

        wrapper = cst.MetadataWrapper(cst.parse_module(source_text))
        visitor = Lobster_Visitor(file_name, options)
        wrapper.visit(visitor)

        schema = Activity if options["activity"] else Implementation
        visitor.module.to_lobster(schema, items)

        if options["exclude_untagged"]:
            # Keep only the items that reference at least one tag
            items = [item for item in items if item.unresolved_references]

        return True, items

    except cst._exceptions.ParserSyntaxError as exc:
        print(file_name, exc.message)
        return False, []

    except UnicodeDecodeError as exc:
        print(file_name, str(exc))
        return False, []

    except Exception:
        print("Unspecified issue in file: %s" % file_name)
        raise
class PythonTool(MetaDataToolBase):
    """Command-line tool that extracts tracing tags from Python code or
    tests and writes them in LOBSTER format."""

    def __init__(self):
        super().__init__(
            name="python",
            description="Extract tracing tags from Python code or tests",
            official=True,
        )
        parser = self._argument_parser
        parser.add_argument("files", nargs="+", metavar="FILE|DIR")
        parser.add_argument(
            "--activity",
            action="store_true",
            default=False,
            help=("generate activity traces (tests) instead of"
                  " an implementation trace"),
        )
        parser.add_argument("--out", default=None)
        parser.add_argument(
            "--single",
            action="store_true",
            default=False,
            help="don't multi-thread",
        )
        parser.add_argument(
            "--only-tagged-functions",
            default=False,
            action="store_true",
            help="only trace functions with tags",
        )
        # The two decorator options cannot be combined
        decorator_group = parser.add_mutually_exclusive_group()
        decorator_group.add_argument(
            "--parse-decorator",
            nargs=2,
            metavar=("DECORATOR", "NAME_ARG"),
            default=(None, None),
        )
        decorator_group.add_argument(
            "--parse-versioned-decorator",
            nargs=3,
            metavar=("DECORATOR", "NAME_ARG", "VERSION_ARG"),
            default=(None, None, None),
        )

    def _run_impl(self, options: Namespace) -> int:
        """Process all requested files and write the collected items.

        Returns 0 when every file was processed successfully and 1 when
        at least one file reported a problem.
        """
        # Files are taken as given; directories are searched recursively
        # for files with the extension ".py".
        sources = []
        for entry in options.files:
            if os.path.isfile(entry):
                sources.append(entry)
            elif os.path.isdir(entry):
                for root, _, names in os.walk(entry):
                    sources.extend(
                        os.path.join(root, name)
                        for name in names
                        if os.path.splitext(name)[1] == ".py"
                    )
            else:
                self._argument_parser.error(
                    f"{entry} is not a file or directory")

        decorator = arg_name = arg_version = None
        if options.parse_decorator[0] is not None:
            decorator = options.parse_decorator[0]
            arg_name = options.parse_decorator[1]
        elif options.parse_versioned_decorator[0] is not None:
            decorator = options.parse_versioned_decorator[0]
            arg_name = options.parse_versioned_decorator[1]
            arg_version = options.parse_versioned_decorator[2]

        context = {
            "activity": options.activity,
            "decorator": decorator,
            "dec_arg_name": arg_name,
            "dec_arg_version": arg_version,
            "exclude_untagged": options.only_tagged_functions,
            "namespace": "req",
        }

        worker = functools.partial(process_file, options=context)
        items = []
        ok = True

        if options.single:
            for file_ok, file_items in map(worker, sources):
                ok &= file_ok
                items += file_items
        else:
            with multiprocessing.Pool() as pool:
                results = pool.imap_unordered(worker, sources)
                for file_ok, file_items in results:
                    ok &= file_ok
                    items += file_items

        schema = Activity if options.activity else Implementation

        if options.out:
            ensure_output_directory(options.out)
            with open(options.out, "w", encoding="UTF-8") as fd:
                lobster_write(fd, schema, "lobster_python", items)
            print(f"Written output for {len(items)} items to {options.out}")
        else:
            lobster_write(sys.stdout, schema, "lobster_python", items)
            print()

        if not ok:
            print("Note: Earlier parse errors make actual output unreliable")
            return 1
        return 0
def main(args: Optional[Sequence[str]] = None) -> int:
    """Create the tool, run it with `args` and return its result."""
    tool = PythonTool()
    return tool.run(args)