Coverage for lobster/tools/python/python.py: 0%
310 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
1#!/usr/bin/env python3
2#
3# lobster_python - Extract Python tracing tags for LOBSTER
4# Copyright (C) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20from argparse import Namespace
21import sys
22import os.path
23import multiprocessing
24import functools
25import re
26from typing import Optional, Sequence
28from libcst.metadata import PositionProvider
29import libcst as cst
31from lobster.common.items import Tracing_Tag, Implementation, Activity
32from lobster.common.location import File_Reference
33from lobster.common.io import lobster_write
34from lobster.common.meta_data_tool_base import MetaDataToolBase
# A comment starting with this prefix carries a tracing tag; the text after
# the prefix is registered as a tag on the enclosing item.
LOBSTER_TRACE_PREFIX = "# lobster-trace: "

# A comment starting with this prefix carries a justification; the text
# after the prefix is registered as a justification on the enclosing item.
LOBSTER_JUST_PREFIX = "# lobster-exclude: "

# Module-level list of the fully qualified names (`module.function:line` or
# `module.class.function:line`) of every function converted to a LOBSTER
# item so far. It is appended to on each conversion and never cleared, and
# is used to number repeated function names.
func_name = []
def count_occurrence_of_last_function_from_function_name_list(function_names):
    """
    Build the tag name for the last entry of `function_names`.

    Each entry is a string shaped like `module.function:line_number` or
    `module.class.function:line_number`. The last entry is reduced to
    `module.function` (or `module.class.function`). If the same function
    (or class.function) name already occurs in earlier entries, the number
    of those earlier occurrences is appended as `-count`.

    Earlier entries are matched on the function (or class.function) part
    only; their module part is not compared.

    Args:
        function_names (list):
            Non-empty list of strings formatted as
            `module.class.function:line_number` or
            `module.function:line_number`.

    Returns:
        str: `module.function` or `module.class.function` of the last
        entry, followed by `-count` when count > 0.

    Raises:
        IndexError: if `function_names` is empty, or an entry contains
            neither "." nor ":".

    Examples:
        ['hello.add:2', 'hello.sub:5', 'hello.add:8']
            returns: 'hello.add-1'
        ['hello.add:2', 'hello.sub:5']
            returns: 'hello.sub'
        ['Example.hello.add:2', 'Example.hello.add:9']
            returns: 'Example.hello.add-1'
    """
    def split_entry(entry):
        # qualified: dotted parts before the first ":";
        # tokens: the entry split on every "." and ":".
        qualified = entry.split(":", 1)[0].split(".")
        tokens = re.split(r"[.:]", entry)
        return qualified, tokens

    qualified, tokens = split_entry(function_names[-1])
    filename = tokens[0]
    if len(qualified) == 3:
        # module.class.function: identify the function as "class.function"
        last_function = qualified[1] + "." + qualified[2]
    else:
        last_function = tokens[1]

    count = 0
    for earlier in function_names[:-1]:
        qualified, tokens = split_entry(earlier)
        if len(qualified) == 3 and \
           last_function == qualified[1] + "." + qualified[2]:
            count += 1
        if tokens[1] == last_function:
            count += 1

    suffix = "-" + str(count) if count > 0 else ""
    return filename + "." + last_function + suffix
def parse_value(val):
    """Convert a libcst expression node into a plain Python value.

    Args:
        val: a libcst node. `cst.SimpleString` and `cst.List` are handled
            specially; any other node must expose a `value` attribute.

    Returns:
        * for a `cst.SimpleString`: the text between the quotes,
        * for a `cst.List`: a list with each element converted recursively,
        * otherwise `str(val.value)`, except that the text "None" is
          turned into the `None` object.
    """
    if isinstance(val, cst.SimpleString):
        # raw_value strips the string prefix (r, b, ...) and the quotes,
        # including triple quotes; slicing one character off each end
        # only works for plain single-quoted literals.
        return val.raw_value
    if isinstance(val, cst.List):
        return [parse_value(item.value) for item in val.elements]
    text = str(val.value)
    return None if text == "None" else text
class Python_Traceable_Node:
    """Base class for the nodes of the tree built from a Python file.

    A node stores its source location, name and kind, links to its parent
    and children, and the tracing tags and justifications registered on it.
    """

    def __init__(self, location, name, kind):
        assert isinstance(location, File_Reference)
        assert isinstance(name, str)
        assert isinstance(kind, str)
        self.location = location
        self.name = name
        self.kind = kind
        self.parent = None
        self.children = []
        self.tags = []
        self.just = []

    def register_tag(self, tag):
        """Record a tracing tag on this node."""
        assert isinstance(tag, Tracing_Tag)
        self.tags.append(tag)

    def register_justification(self, justification):
        """Record a justification text on this node."""
        assert isinstance(justification, str)
        self.just.append(justification)

    def set_parent(self, node):
        """Make `node` the parent of this node and add this node to its
        children."""
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node

    def to_json(self):
        """Return this node and, recursively, its children as a dict."""
        return {"kind"     : self.kind,
                "name"     : self.name,
                "tags"     : [x.to_json() for x in self.tags],
                "just"     : self.just,
                "children" : [x.to_json() for x in self.children]}

    def to_lobster(self, schema, items):
        """Append the LOBSTER items for this node to `items`.

        This base implementation is a placeholder that subclasses must
        override.

        Raises:
            NotImplementedError: always. An exception is used instead of
                `assert False` so that the failure is not stripped when
                Python runs with -O.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        raise NotImplementedError(
            "%s does not implement to_lobster" % type(self).__name__)

    def fqn(self):
        """Return the dotted name of this node, prefixed by the names of
        all its ancestors.

        For a Python_Function whose location has a line number, the own
        name is followed by ":<line>".
        """
        if self.parent:
            rv = self.parent.fqn() + "."
        else:
            rv = ""
        if self.location.line is not None and \
           isinstance(self, Python_Function):
            rv += f"{self.name}:{self.location.line}"
        else:
            rv += self.name
        return rv

    def lobster_tag(self):
        """Return the "python" tracing tag built from fqn()."""
        return Tracing_Tag("python", self.fqn())

    def warn_ignored(self, reason):
        """Print one warning for each tag and each justification of this
        node, saying it is ignored.

        Args:
            reason: text placed before "already has annotations" in the
                warning.
        """
        for tag in self.tags:
            print("%s: warning: ignored tag %s because "
                  "%s already has annotations" %
                  (self.location.to_string(),
                   tag,
                   reason))
        for just in self.just:
            print("%s: warning: ignored justification '%s' because "
                  "%s already has annotations" %
                  (self.location.to_string(),
                   just,
                   reason))
class Python_Module(Python_Traceable_Node):
    """Node of kind "Module"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Module")

    def to_lobster(self, schema, items):
        """Let every child append its LOBSTER items to `items`; the module
        itself contributes no item."""
        assert (schema is Implementation) or (schema is Activity)
        assert isinstance(items, list)
        for child in self.children:
            child.to_lobster(schema, items)
class Python_Class(Python_Traceable_Node):
    """Node of kind "Class"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Class")

    def to_lobster(self, schema, items):
        """Append the LOBSTER items for this class to `items`.

        * Activity schema: the class never becomes an item. The items of
          its children are emitted, each with the tags of the class added.
        * Implementation schema, class has tags or justifications: a
          single item for the class is emitted and the annotations of its
          direct children are reported as ignored.
        * Implementation schema, class has no annotations: the items of
          its children are emitted instead of the class.
        """
        assert (schema is Implementation) or (schema is Activity)
        assert isinstance(items, list)

        # Collect what the children would produce; every variant below
        # either forwards or discards this list.
        member_items = []
        for member in self.children:
            member.to_lobster(schema, member_items)

        if schema is Activity:
            # Tests inherit the tags written on their class
            for member_item in member_items:
                for class_tag in self.tags:
                    member_item.add_tracing_target(class_tag)
            items.extend(member_items)
            return

        qualified_name = self.fqn()
        class_item = Implementation(tag      = Tracing_Tag("python",
                                                           qualified_name),
                                    location = self.location,
                                    language = "Python",
                                    kind     = self.kind,
                                    name     = qualified_name)

        if not (self.tags or self.just):
            # Nothing annotated on the class: trace to each child
            items.extend(member_items)
            return

        # Annotations on the class take precedence over the children
        for class_tag in self.tags:
            class_item.add_tracing_target(class_tag)
        class_item.just_up += self.just

        for member in self.children:
            member.warn_ignored(self.name)

        items.append(class_item)
class Python_Function(Python_Traceable_Node):
    """Node for a function; its kind becomes "Method" or "Constructor"
    when it is placed directly inside a class."""

    def __init__(self, location, name):
        super().__init__(location, name, "Function")

    def set_parent(self, node):
        """Attach this function to `node`.

        If `node` is a Python_Class, the kind is changed to "Constructor"
        for `__init__` and to "Method" for any other name.
        """
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node
        if isinstance(node, Python_Class):
            self.kind = "Constructor" if self.name == "__init__" else "Method"

    def to_lobster(self, schema, items):
        """Append the LOBSTER item for this function to `items`.

        * Implementation schema: always emits an Implementation item.
        * Activity schema: emits an Activity item only when the name
          starts with "test" or "_test" or ends with "test"; otherwise
          nothing is emitted.

        The tags found on nested children are merged into the emitted
        item; the children produce no items of their own.

        Side effect: the fully qualified name is appended to the
        module-level `func_name` list on every call.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        func_name.append(self.fqn())
        tagname = count_occurrence_of_last_function_from_function_name_list(
            func_name
        )
        # The item name is the tag name without the "-<count>" suffix.
        # Only that trailing suffix is removed: splitting on the first
        # "-" would cut the name short when the module name itself
        # contains a hyphen.
        name_value = re.sub(r"-\d+$", "", tagname)

        if schema is Implementation:
            l_item = Implementation(tag      = Tracing_Tag("python",
                                                           tagname),
                                    location = self.location,
                                    language = "Python",
                                    kind     = self.kind,
                                    name     = name_value)
        elif self.name.startswith(("test", "_test")) \
             or self.name.endswith("test"):
            l_item = Activity(tag       = Tracing_Tag("pyunit",
                                                      self.fqn()),
                              location  = self.location,
                              framework = "PyUnit",
                              kind      = "Test")
        else:
            return

        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        # Any children of functions are not testable units. Their
        # tracing tags contribute to ours, but otherwise they don't
        # appear.
        nested_items = []
        for node in self.children:
            node.to_lobster(schema, nested_items)
        for item in nested_items:
            # TODO: Warn about useless nested justifications
            # Merge tracing tags
            for tag in item.unresolved_references:
                l_item.add_tracing_target(tag)

        items.append(l_item)
class Lobster_Visitor(cst.CSTVisitor):
    """libcst visitor that builds the Python_Traceable_Node tree of a file.

    Classes and functions become nodes below `self.module`. Tags are taken
    from the configured decorator and from comments that start with
    LOBSTER_TRACE_PREFIX; justifications from comments that start with
    LOBSTER_JUST_PREFIX.
    """

    METADATA_DEPENDENCIES = (PositionProvider,)

    def __init__(self, file_name, options):
        """
        Args:
            file_name: path of an existing file.
            options: dict with the keys "activity", "namespace",
                "exclude_untagged", "decorator", "dec_arg_name" and
                "dec_arg_version".
        """
        super().__init__()
        assert os.path.isfile(file_name)
        self.file_name = file_name

        # Module name = file name without a trailing ".py". Only the
        # suffix is removed; str.replace would also delete ".py" from the
        # middle of a name such as "a.pyx.py".
        module_name = os.path.basename(file_name)
        if module_name.endswith(".py"):
            module_name = module_name[:-len(".py")]
        self.module = Python_Module(File_Reference(file_name), module_name)

        self.activity     = options["activity"]
        self.current_node = None
        # Enclosing nodes, innermost last; the module is always at the bottom
        self.stack        = [self.module]

        self.namespace        = options["namespace"]
        self.exclude_untagged = options["exclude_untagged"]

        self.decorator_name  = options["decorator"]
        self.dec_arg_name    = options["dec_arg_name"]
        self.dec_arg_version = options["dec_arg_version"]

    def parse_dotted_name(self, name):
        """Return the dotted name of a Name, Attribute or Call node (for a
        Call, the name of the called function), or None for any other
        node type."""
        if isinstance(name, cst.Call):
            return self.parse_dotted_name(name.func)
        elif isinstance(name, cst.Name):
            return name.value
        elif isinstance(name, cst.Attribute):
            # value -- prefix
            # attr  -- postfix
            return "%s.%s" % (self.parse_dotted_name(name.value),
                              self.parse_dotted_name(name.attr))
        else:
            return None

    def parse_decorators(self, decorators):
        """Register tags on the current node for every decorator whose
        dotted name equals the configured decorator name.

        The tag name is read from the keyword argument `dec_arg_name`.
        If `dec_arg_version` is configured, the version is read from that
        keyword argument; otherwise a list value yields one tag per
        element.
        """
        for dec in decorators:
            dec_name = self.parse_dotted_name(dec.decorator)
            if dec_name is None:
                continue
            if dec_name != self.decorator_name:
                continue
            # NOTE(review): this expects the decorator to be a call with
            # keyword arguments only; other forms fail here - confirm.
            dec_args = {arg.keyword.value: parse_value(arg.value)
                        for arg in dec.decorator.args}

            # TODO: Better error messages if these assumptions are
            # violated
            assert self.dec_arg_name in dec_args
            if self.dec_arg_version:
                assert self.dec_arg_version in dec_args
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name],
                                  dec_args.get(self.dec_arg_version, None))
                self.current_node.register_tag(tag)

            elif isinstance(dec_args[self.dec_arg_name], list):
                for item in dec_args[self.dec_arg_name]:
                    tag = Tracing_Tag(self.namespace, item)
                    self.current_node.register_tag(tag)

            else:
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name])
                self.current_node.register_tag(tag)

    def visit_ClassDef(self, node):
        """Create a Python_Class below the innermost open node, make it
        the current node and read its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Class(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def visit_FunctionDef(self, node):
        """Create a Python_Function below the innermost open node, make it
        the current node and read its decorators."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Function(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def leave_FunctionDef(self, original_node):
        """Close the function: the enclosing node becomes current again."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def leave_ClassDef(self, original_node):
        """Close the class: the enclosing node becomes current again."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def visit_Comment(self, node):
        """Register the tag or justification carried by a comment that
        starts with LOBSTER_TRACE_PREFIX or LOBSTER_JUST_PREFIX."""
        line = self.get_metadata(PositionProvider, node).start.line
        # For some reason the comment in a class is associated with
        # its constructor. We can check if it precedes it (by line),
        # and so associate it with the enclosing item.
        if self.current_node and \
           self.current_node.location.line and \
           self.current_node.location.line > line:
            actual = self.current_node.parent
        else:
            actual = self.current_node

        # No node is current until the first class or function has been
        # visited; attach such comments to the module instead of
        # dereferencing None.
        if actual is None:
            actual = self.module

        if node.value.startswith(LOBSTER_TRACE_PREFIX):
            tag = node.value[len(LOBSTER_TRACE_PREFIX):].strip()
            actual.register_tag(
                Tracing_Tag.from_text(self.namespace,
                                      tag))

        elif node.value.startswith(LOBSTER_JUST_PREFIX):
            reason = node.value[len(LOBSTER_JUST_PREFIX):].strip()
            actual.register_justification(reason)
def process_file(file_name, options):
    """Parse one Python file and extract its LOBSTER items.

    Args:
        file_name (str): path of the file to read (decoded as UTF-8).
        options (dict): settings passed on to Lobster_Visitor; the keys
            "activity" and "exclude_untagged" are also read here.

    Returns:
        tuple: `(True, items)` on success, where `items` holds Activity
        items if `options["activity"]` is set and Implementation items
        otherwise. With `options["exclude_untagged"]` set, only items
        that have unresolved references are kept. `(False, [])` if the
        file has a syntax error or cannot be decoded; the problem is
        printed.

    Raises:
        Exception: any other error is re-raised after the file name has
            been printed.
    """
    assert isinstance(file_name, str)
    assert isinstance(options, dict)

    items = []
    try:
        with open(file_name, "r", encoding="UTF-8") as fd:
            module = cst.parse_module(fd.read())

        # The wrapper provides the position metadata the visitor asks for
        wrapper = cst.MetadataWrapper(module)
        visitor = Lobster_Visitor(file_name, options)
        wrapper.visit(visitor)

        if options["activity"]:
            visitor.module.to_lobster(Activity, items)
        else:
            visitor.module.to_lobster(Implementation, items)

        if options["exclude_untagged"]:
            items = [item for item in items if item.unresolved_references]

        return True, items

    except cst.ParserSyntaxError as exc:
        print(file_name, exc.message)
        return False, []

    except UnicodeDecodeError as exc:
        print(file_name, str(exc))
        return False, []

    except Exception:
        print("Unspecified issue in file: %s" % file_name)
        raise
class PythonTool(MetaDataToolBase):
    """Command line tool "python": extracts tracing tags from Python code
    or tests and writes them out in LOBSTER format."""

    def __init__(self):
        super().__init__(
            name="python",
            description="Extract tracing tags from Python code or tests",
            official=True,
        )
        parser = self._argument_parser
        parser.add_argument("files", metavar="FILE|DIR", nargs="+")
        parser.add_argument(
            "--activity",
            default=False,
            action="store_true",
            help=("generate activity traces (tests) instead of"
                  " an implementation trace"),
        )
        parser.add_argument("--out", default=None)
        parser.add_argument(
            "--single",
            default=False,
            action="store_true",
            help="don't multi-thread",
        )
        parser.add_argument(
            "--only-tagged-functions",
            action="store_true",
            default=False,
            help="only trace functions with tags",
        )
        # The two decorator options cannot be combined
        decorator_group = parser.add_mutually_exclusive_group()
        decorator_group.add_argument(
            "--parse-decorator",
            metavar=("DECORATOR", "NAME_ARG"),
            nargs=2,
            default=(None, None),
        )
        decorator_group.add_argument(
            "--parse-versioned-decorator",
            metavar=("DECORATOR", "NAME_ARG", "VERSION_ARG"),
            nargs=3,
            default=(None, None, None),
        )

    def _run_impl(self, options: Namespace) -> int:
        """Process all requested files and write the collected items.

        Directories are searched recursively for files with the extension
        ".py". The files are processed one after the other with --single,
        otherwise through a multiprocessing pool. The result goes to the
        file named by --out, or to standard output.

        Returns:
            0 if every file was processed successfully, otherwise 1.
        """
        sources = []
        for entry in options.files:
            if os.path.isfile(entry):
                sources.append(entry)
            elif os.path.isdir(entry):
                sources.extend(
                    os.path.join(folder, candidate)
                    for folder, _, candidates in os.walk(entry)
                    for candidate in candidates
                    if os.path.splitext(candidate)[1] == ".py"
                )
            else:
                self._argument_parser.error(f"{entry} is not a file or directory")

        context = {
            "activity"         : options.activity,
            "decorator"        : None,
            "dec_arg_name"     : None,
            "dec_arg_version"  : None,
            "exclude_untagged" : options.only_tagged_functions,
            "namespace"        : "req",
        }

        plain = options.parse_decorator
        versioned = options.parse_versioned_decorator
        if plain[0] is not None:
            context.update(decorator=plain[0],
                           dec_arg_name=plain[1])
        elif versioned[0] is not None:
            context.update(decorator=versioned[0],
                           dec_arg_name=versioned[1],
                           dec_arg_version=versioned[2])

        worker = functools.partial(process_file, options=context)
        ok = True
        items = []

        if options.single:
            for source in sources:
                file_ok, file_items = worker(source)
                ok &= file_ok
                items += file_items
        else:
            with multiprocessing.Pool() as pool:
                for file_ok, file_items in pool.imap_unordered(worker, sources):
                    ok &= file_ok
                    items += file_items

        schema = Activity if options.activity else Implementation

        if options.out:
            with open(options.out, "w", encoding="UTF-8") as fd:
                lobster_write(fd, schema, "lobster_python", items)
            print(f"Written output for {len(items)} items to {options.out}")
        else:
            lobster_write(sys.stdout, schema, "lobster_python", items)
            print()

        if not ok:
            print("Note: Earlier parse errors make actual output unreliable")
            return 1
        return 0
def main(args: Optional[Sequence[str]] = None) -> int:
    """Create a PythonTool, run it with `args` and return its result."""
    tool = PythonTool()
    return tool.run(args)