Coverage for lobster/tools/python/python.py: 0%
306 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
1#!/usr/bin/env python3
2#
3# lobster_python - Extract Python tracing tags for LOBSTER
4# Copyright (C) 2022-2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20import sys
21import argparse
22import os.path
23import multiprocessing
24import functools
25import re
27from libcst.metadata import PositionProvider
28import libcst as cst
30from lobster.items import Tracing_Tag, Implementation, Activity
31from lobster.location import File_Reference
32from lobster.io import lobster_write
33from lobster.version import get_version
# Comment prefix that introduces a tracing tag; the text after the
# prefix is parsed into a Tracing_Tag by Lobster_Visitor.visit_Comment.
LOBSTER_TRACE_PREFIX = "# lobster-trace: "
# Comment prefix that introduces a justification; the text after the
# prefix is registered verbatim as a justification string.
LOBSTER_JUST_PREFIX = "# lobster-exclude: "
# Module-level accumulator: Python_Function.to_lobster appends the
# fully qualified name of every function it processes, so that repeated
# names can be given a numeric suffix. It is never cleared in this file.
func_name = []
def count_occurrence_of_last_function_from_function_name_list(function_names):
    """
    Build the tag name for the most recently recorded function.

    The last entry of `function_names` is split into its module part and
    its function part. If the text before the first ':' has exactly three
    dot-separated components, the function part is `class.function`;
    otherwise it is the second component of the entry. All earlier entries
    are then scanned and every one that names the same function part
    increases a counter. A counter greater than zero is appended as
    `-<count>`.

    Args:
        function_names (list):
            Non-empty list of strings formatted as
            `module.class.function:line_number` or
            `module.function:line_number`.

    Returns:
        str: `module.function`, `module.class.function`, or either of
        these followed by `-<count>` when the same function part occurred
        `count` times earlier in the list.

    Raises:
        IndexError: if the list is empty or an entry contains neither
            '.' nor ':'.

    Examples:
        ['hello.add:2', 'hello.sub:5', 'hello.add:8']
            returns 'hello.add-1'
        ['hello.Example.add:2', 'hello.Example.add:9']
            returns 'hello.Example.add-1'
        ['hello.add:2']
            returns 'hello.add'
    """
    newest_entry = function_names[-1]
    newest_tokens = re.split(r"[.:]", newest_entry)
    newest_dotted = newest_entry.split(":", 1)[0].split(".")

    module_name = newest_tokens[0]
    if len(newest_dotted) == 3:
        # module.class.function -> keep "class.function" together
        target = newest_dotted[1] + "." + newest_dotted[2]
    else:
        target = newest_tokens[1]

    occurrences = 0
    for earlier_entry in function_names[:-1]:
        dotted = earlier_entry.split(":", 1)[0].split(".")
        if len(dotted) == 3 and target == dotted[1] + "." + dotted[2]:
            occurrences += 1
        # The second token is compared for every entry, independent of
        # the three-component check above.
        if re.split(r"[.:]", earlier_entry)[1] == target:
            occurrences += 1

    suffix = "-" + str(occurrences) if occurrences > 0 else ""
    return module_name + "." + target + suffix
def parse_value(val):
    """Convert a libcst expression node into a plain Python value.

    Args:
        val: A libcst expression node, as found in a decorator argument.

    Returns:
        - for a `cst.SimpleString`: the text between the quotes, without
          any string prefix (`r`, `b`, ...) and without the quote
          characters, whether they are single or triple quotes;
        - for a `cst.List`: a list with each element converted
          recursively;
        - otherwise: `str(val.value)`, except that the text "None" is
          mapped to the `None` object.
    """
    if isinstance(val, cst.SimpleString):
        # raw_value drops the prefix and the full quote sequence, so
        # r"x" and """x""" both yield "x". Slicing [1:-1] only handled
        # un-prefixed, single-quoted literals correctly.
        return val.raw_value
    elif isinstance(val, cst.List):
        return [parse_value(item.value)
                for item in val.elements]
    else:
        rv = str(val.value)
        if rv == "None":
            rv = None
        return rv
class Python_Traceable_Node:
    """Base class for the tree of traceable Python entities.

    A node stores where it was found, its name, a `kind` string, the
    tracing tags and justifications registered on it, and its links to
    parent and child nodes.
    """

    def __init__(self, location, name, kind):
        assert isinstance(location, File_Reference)
        assert isinstance(name, str)
        assert isinstance(kind, str)
        # Identity of the node
        self.name = name
        self.kind = kind
        self.location = location
        # Tree links; the parent is filled in by set_parent
        self.parent = None
        self.children = []
        # Annotations collected while visiting the source
        self.tags = []
        self.just = []

    def register_tag(self, tag):
        """Attach a tracing tag to this node."""
        assert isinstance(tag, Tracing_Tag)
        self.tags.append(tag)

    def register_justification(self, justification):
        """Attach a justification text to this node."""
        assert isinstance(justification, str)
        self.just.append(justification)

    def set_parent(self, node):
        """Make `node` the parent of this node and register as its child."""
        assert isinstance(node, Python_Traceable_Node)
        self.parent = node
        node.children.append(self)

    def to_json(self):
        """Return this node and, recursively, its children as a dict."""
        tag_data = [tag.to_json() for tag in self.tags]
        child_data = [child.to_json() for child in self.children]
        return {
            "kind": self.kind,
            "name": self.name,
            "tags": tag_data,
            "just": self.just,
            "children": child_data,
        }

    def to_lobster(self, schema, items):
        """Placeholder that always fails; subclasses provide the behavior."""
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        assert False

    def fqn(self):
        """Return the dotted name of this node, prefixed by its ancestors.

        Function nodes with a known line get `:<line>` appended to their
        own name component.
        """
        prefix = self.parent.fqn() + "." if self.parent else ""
        line_known = self.location.line is not None
        if line_known and isinstance(self, Python_Function):
            return prefix + self.name + ":" + str(self.location.line)
        return prefix + self.name

    def lobster_tag(self):
        """Return a "python" namespace tracing tag built from fqn()."""
        return Tracing_Tag("python", self.fqn())

    def warn_ignored(self, reason):
        """Print one warning per tag and per justification on this node.

        `reason` names the entity whose own annotations take precedence.
        """
        tag_template = ("%s: warning: ignored tag %s because "
                        "%s already has annotations")
        just_template = ("%s: warning: ignored justification '%s' because "
                         "%s already has annotations")
        for tag in self.tags:
            print(tag_template % (self.location.to_string(), tag, reason))
        for just in self.just:
            print(just_template % (self.location.to_string(), just, reason))
class Python_Module(Python_Traceable_Node):
    """Root node of the tree; its kind is "Module"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Module")

    def to_lobster(self, schema, items):
        """Let every direct child add its items to `items`.

        The module does not produce an item of its own.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)
        for child in self.children:
            child.to_lobster(schema, items)
class Python_Class(Python_Traceable_Node):
    """Tree node for a class definition; its kind is "Class"."""

    def __init__(self, location, name):
        super().__init__(location, name, "Class")

    def to_lobster(self, schema, items):
        """Add the items for this class to `items`.

        - Activity schema: the class never becomes an item; its tags are
          added to every item produced by its children.
        - Implementation schema, class has tags or justifications: a
          single item for the class is emitted and the children's own
          annotations are reported as ignored.
        - Implementation schema, class has no annotations: the items of
          the children are emitted instead of the class.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        # The children are always converted first, whichever branch
        # is taken afterwards.
        member_items = []
        for child in self.children:
            child.to_lobster(schema, member_items)

        if schema is Activity:
            # Tests inherit the tags of the enclosing class.
            for member in member_items:
                for tag in self.tags:
                    member.add_tracing_target(tag)
            items.extend(member_items)
            return

        qualified_name = self.fqn()
        class_item = Implementation(
            tag=Tracing_Tag("python", qualified_name),
            location=self.location,
            language="Python",
            kind=self.kind,
            name=qualified_name)

        if not (self.tags or self.just):
            # Nothing is annotated on the class itself: trace to each
            # child instead.
            items.extend(member_items)
            return

        # Annotations on the class take precedence over the children.
        for tag in self.tags:
            class_item.add_tracing_target(tag)
        class_item.just_up += self.just

        for child in self.children:
            child.warn_ignored(self.name)

        items.append(class_item)
class Python_Function(Python_Traceable_Node):
    """Tree node for a function definition.

    The kind starts as "Function" and becomes "Constructor" or "Method"
    once the node is attached to a class.
    """

    def __init__(self, location, name):
        super().__init__(location, name, "Function")

    def set_parent(self, node):
        """Attach to `node` and refine the kind if `node` is a class."""
        assert isinstance(node, Python_Traceable_Node)
        node.children.append(self)
        self.parent = node
        if isinstance(node, Python_Class):
            if self.name == "__init__":
                self.kind = "Constructor"
            else:
                self.kind = "Method"

    def to_lobster(self, schema, items):
        """Append the item for this function to `items`.

        For the Implementation schema every function yields an item.
        For the Activity schema only functions whose name starts with
        "test" or "_test", or ends with "test", yield an item; all
        others are skipped. Tracing tags found on nested definitions
        are merged into the item of this function.

        Side effect: the fully qualified name is appended to the
        module-level `func_name` list, which is used to number repeated
        names.
        """
        assert schema is Implementation or schema is Activity
        assert isinstance(items, list)

        func_name.append(self.fqn())
        tagname = count_occurrence_of_last_function_from_function_name_list(
            func_name
        )
        # The display name is the tag name without the "-<count>"
        # suffix. Only a trailing numeric suffix is removed, so that a
        # hyphen inside the module name (e.g. my-script.py) does not
        # truncate the name.
        name_value = re.sub(r"-\d+\Z", "", tagname)

        if schema is Implementation:
            l_item = Implementation(tag=Tracing_Tag("python", tagname),
                                    location=self.location,
                                    language="Python",
                                    kind=self.kind,
                                    name=name_value)
        elif self.name.startswith(("test", "_test")) \
                or self.name.endswith("test"):
            l_item = Activity(tag=Tracing_Tag("pyunit", self.fqn()),
                              location=self.location,
                              framework="PyUnit",
                              kind="Test")
        else:
            return

        for tag in self.tags:
            l_item.add_tracing_target(tag)
        l_item.just_up += self.just

        # Any children of functions are not testable units. Their
        # tracing tags contribute to ours, but otherwise they don't
        # appear.
        nested_items = []
        for node in self.children:
            node.to_lobster(schema, nested_items)
        for item in nested_items:
            # TODO: Warn about useless nested justifications
            # Merge tracing tags
            for tag in item.unresolved_references:
                l_item.add_tracing_target(tag)

        items.append(l_item)
class Lobster_Visitor(cst.CSTVisitor):
    """libcst visitor that builds the Python_Traceable_Node tree of a file.

    Classes and functions become nodes; matching decorators and
    `# lobster-trace:` / `# lobster-exclude:` comments are registered as
    tags and justifications on those nodes.
    """

    METADATA_DEPENDENCIES = (PositionProvider,)

    def __init__(self, file_name, options):
        """
        Args:
            file_name: Path of an existing file; it is used for locations
                and, without a trailing ".py", as the module name.
            options: dict providing the keys "activity", "namespace",
                "exclude_untagged", "decorator", "dec_arg_name" and
                "dec_arg_version".
        """
        super().__init__()
        assert os.path.isfile(file_name)
        self.file_name = file_name

        # Strip only a trailing ".py"; a ".py" elsewhere in the file
        # name is part of the module name and must be kept.
        module_name = os.path.basename(file_name)
        if module_name.endswith(".py"):
            module_name = module_name[:-len(".py")]
        self.module = Python_Module(File_Reference(file_name), module_name)

        self.activity = options["activity"]
        self.current_node = None
        # Enclosing nodes, innermost last; the module is always at the
        # bottom.
        self.stack = [self.module]

        self.namespace = options["namespace"]
        self.exclude_untagged = options["exclude_untagged"]

        self.decorator_name = options["decorator"]
        self.dec_arg_name = options["dec_arg_name"]
        self.dec_arg_version = options["dec_arg_version"]

    def parse_dotted_name(self, name):
        """Return the dotted name of a Name/Attribute/Call node.

        For a call the name of the called expression is returned. Any
        other node type yields None.
        """
        if isinstance(name, cst.Call):
            return self.parse_dotted_name(name.func)
        elif isinstance(name, cst.Name):
            return name.value
        elif isinstance(name, cst.Attribute):
            # value -- prefix
            # attr  -- postfix
            return "%s.%s" % (self.parse_dotted_name(name.value),
                              self.parse_dotted_name(name.attr))
        else:
            return None

    def parse_decorators(self, decorators):
        """Register tags from decorators named like the configured one.

        Decorators with a different (or unparsable) name are skipped.
        The keyword arguments of a matching decorator are read; the
        configured name argument must be present, and so must the
        version argument when one is configured. Without a version
        argument, a list value produces one tag per element.
        """
        for dec in decorators:
            dec_name = self.parse_dotted_name(dec.decorator)
            if dec_name is None or dec_name != self.decorator_name:
                continue

            dec_args = {arg.keyword.value: parse_value(arg.value)
                        for arg in dec.decorator.args}

            # TODO: Better error messages if these assumptions are
            # violated
            assert self.dec_arg_name in dec_args
            if self.dec_arg_version:
                assert self.dec_arg_version in dec_args
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name],
                                  dec_args.get(self.dec_arg_version, None))
                self.current_node.register_tag(tag)

            elif isinstance(dec_args[self.dec_arg_name], list):
                for item in dec_args[self.dec_arg_name]:
                    tag = Tracing_Tag(self.namespace, item)
                    self.current_node.register_tag(tag)

            else:
                tag = Tracing_Tag(self.namespace,
                                  dec_args[self.dec_arg_name])
                self.current_node.register_tag(tag)

    def visit_ClassDef(self, node):
        """Create a class node below the current innermost node."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Class(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def visit_FunctionDef(self, node):
        """Create a function node below the current innermost node."""
        line = self.get_metadata(PositionProvider, node).start.line
        loc = File_Reference(self.file_name, line)
        t_item = Python_Function(loc, node.name.value)
        t_item.set_parent(self.stack[-1])
        self.stack.append(t_item)
        self.current_node = t_item
        self.parse_decorators(node.decorators)

    def leave_FunctionDef(self, original_node):
        """Return to the node that encloses the finished function."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def leave_ClassDef(self, original_node):
        """Return to the node that encloses the finished class."""
        self.stack.pop()
        self.current_node = self.stack[-1]

    def visit_Comment(self, node):
        """Register a trace or justification comment on the right node."""
        line = self.get_metadata(PositionProvider, node).start.line
        # For some reason the comment in a class is associated with
        # its constructor. We can check if it precedes it (by line),
        # and so associate it with the enclosing item.
        if self.current_node and \
           self.current_node.location.line and \
           self.current_node.location.line > line:
            actual = self.current_node.parent
        else:
            actual = self.current_node

        if actual is None:
            # No class or function has been entered yet (e.g. a comment
            # in the file header). Attach to the module node instead of
            # failing on None.
            actual = self.module

        if node.value.startswith(LOBSTER_TRACE_PREFIX):
            tag = node.value[len(LOBSTER_TRACE_PREFIX):].strip()
            actual.register_tag(
                Tracing_Tag.from_text(self.namespace,
                                      tag))

        elif node.value.startswith(LOBSTER_JUST_PREFIX):
            reason = node.value[len(LOBSTER_JUST_PREFIX):].strip()
            actual.register_justification(reason)
def process_file(file_name, options):
    """Extract the LOBSTER items of one Python file.

    Args:
        file_name (str): Path of the file to read as UTF-8.
        options (dict): Settings handed to Lobster_Visitor; the keys
            "activity" and "exclude_untagged" are also read here.

    Returns:
        tuple: `(True, items)` on success. `(False, [])` if the file
        cannot be parsed or decoded; a message is printed in that case.

    Raises:
        Any other exception is re-raised after printing the file name.
    """
    # pylint: disable=protected-access
    assert isinstance(file_name, str)
    assert isinstance(options, dict)

    try:
        with open(file_name, "r", encoding="UTF-8") as fd:
            source_text = fd.read()

        wrapped_tree = cst.MetadataWrapper(cst.parse_module(source_text))
        visitor = Lobster_Visitor(file_name, options)
        wrapped_tree.visit(visitor)

        schema = Activity if options["activity"] else Implementation
        collected = []
        visitor.module.to_lobster(schema, collected)

        if options["exclude_untagged"]:
            # Keep only items that reference at least one tag
            collected = [entry
                         for entry in collected
                         if entry.unresolved_references]

        return True, collected

    except cst._exceptions.ParserSyntaxError as exc:
        print(file_name, exc.message)
        return False, []

    except UnicodeDecodeError as exc:
        print(file_name, str(exc))
        return False, []

    except Exception:
        print("Unspecified issue in file: %s" % file_name)
        raise
ap = argparse.ArgumentParser()


@get_version(ap)
def main():
    """Command-line entry point of the tool.

    Parses the arguments, collects the given files plus all `*.py`
    files below the given directories, processes them (in a process
    pool unless --single is given) and writes the result to --out or to
    standard output.

    Returns:
        int: 0 if every file was processed successfully, otherwise 1.
    """
    # lobster-trace: python_req.Dummy_Requirement
    ap.add_argument("files", nargs="+", metavar="FILE|DIR")
    ap.add_argument("--activity",
                    action="store_true",
                    default=False,
                    help=("generate activity traces (tests) instead of"
                          " an implementation trace"))
    ap.add_argument("--out", default=None)
    ap.add_argument("--single",
                    action="store_true",
                    default=False,
                    help="don't multi-thread")
    ap.add_argument("--only-tagged-functions",
                    default=False,
                    action="store_true",
                    help="only trace functions with tags")
    decorator_group = ap.add_mutually_exclusive_group()
    decorator_group.add_argument("--parse-decorator",
                                 nargs=2,
                                 metavar=("DECORATOR", "NAME_ARG"),
                                 default=(None, None))
    decorator_group.add_argument("--parse-versioned-decorator",
                                 nargs=3,
                                 metavar=("DECORATOR",
                                          "NAME_ARG",
                                          "VERSION_ARG"),
                                 default=(None, None, None))

    options = ap.parse_args()

    # Expand the positional arguments into a flat list of files
    sources = []
    for entry in options.files:
        if os.path.isfile(entry):
            sources.append(entry)
        elif os.path.isdir(entry):
            sources.extend(
                os.path.join(path, filename)
                for path, _, files in os.walk(entry)
                for filename in files
                if os.path.splitext(filename)[1] == ".py")
        else:
            ap.error("%s is not a file or directory" % entry)

    # Decorator settings; the plain form is checked before the
    # versioned one
    decorator, name_arg, version_arg = None, None, None
    if options.parse_decorator[0] is not None:
        decorator = options.parse_decorator[0]
        name_arg = options.parse_decorator[1]
    elif options.parse_versioned_decorator[0] is not None:
        decorator = options.parse_versioned_decorator[0]
        name_arg = options.parse_versioned_decorator[1]
        version_arg = options.parse_versioned_decorator[2]

    context = {
        "activity": options.activity,
        "decorator": decorator,
        "dec_arg_name": name_arg,
        "dec_arg_version": version_arg,
        "exclude_untagged": options.only_tagged_functions,
        "namespace": "req",
    }

    worker = functools.partial(process_file, options=context)
    items = []
    ok = True

    if options.single:
        for source in sources:
            file_ok, file_items = worker(source)
            ok &= file_ok
            items += file_items
    else:
        with multiprocessing.Pool() as pool:
            for file_ok, file_items in pool.imap_unordered(worker, sources):
                ok &= file_ok
                items += file_items

    schema = Activity if options.activity else Implementation

    if options.out:
        with open(options.out, "w", encoding="UTF-8") as fd:
            lobster_write(fd, schema, "lobster_python", items)
        print("Written output for %u items to %s" % (len(items),
                                                     options.out))
    else:
        lobster_write(sys.stdout, schema, "lobster_python", items)
        print()

    if ok:
        return 0

    print("Note: Earlier parse errors make actual output unreliable")
    return 1
# When executed as a script, run main() and use its return value
# (0 or 1) as the process exit status.
if __name__ == "__main__":
    sys.exit(main())