Coverage for lobster/tools/pkg/pkg.py: 70%
170 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-12 15:02 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-12 15:02 +0000
1#!/usr/bin/env python3
2#
3# lobster_pkg - Extract tracing values from xml file for LOBSTER
4# Copyright (C) 2024-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
19from pathlib import Path
20import sys
21import xml.etree.ElementTree as ET
22import json
23import re
24from typing import Dict, Optional, Sequence
25from xml.dom import minidom
26from argparse import Namespace
27from dataclasses import dataclass
29from lobster.common.multi_file_input_config import Config
30from lobster.common.multi_file_input_tool import create_worklist, MultiFileInputTool
31from lobster.common.exceptions import LOBSTER_Exception
32from lobster.common.items import Activity, Tracing_Tag
33from lobster.common.location import File_Reference
35NS = {
36 "ecu": "http://www.tracetronic.de/xml/ecu-test",
37 "xsi": "http://www.w3.org/2001/XMLSchema-instance",
38}
39TSBLOCK = "TsBlock"
42@dataclass
43class PkgToolConfig:
44 files: Sequence[Path]
45 out: Optional[Path] = None
48def create_raw_entry(
49 data: Dict[str, Activity], file_name: str, trace_list: list
50) -> None:
52 activity_list = json.loads(trace_list)
53 # Collect all traces marked as "first"
54 traces = []
55 for item in activity_list:
56 if item.get("activity") == "first": 56 ↛ 55line 56 didn't jump to line 55 because the condition on line 56 was always true
57 trace_parts = [s.strip() for s in re.split(r"[:,]", item.get("trace"))]
58 traces.extend(trace_parts[1:]) # skip the "lobster-trace" prefix
60 tag = Tracing_Tag("pkg", f"{file_name}")
61 loc = File_Reference(file_name)
62 data[tag.key()] = Activity(
63 tag=tag, location=loc, framework="lobster-pkg", kind="test"
64 )
65 for trace_value in traces:
66 data[tag.key()].add_tracing_target(Tracing_Tag("req", trace_value))
68 # Handle other activities (if any)
69 for item in activity_list:
70 if item.get("activity") != "first": 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true
71 trace2 = [s.strip() for s in re.split(r"[:,]", item.get("trace"))]
72 action = item.get("action")
73 line = item.get("line")
74 tag = Tracing_Tag("pkg", f"{file_name}::{action}::{line}")
75 loc = File_Reference(file_name, int(item.get("line")))
76 data[tag.key()] = Activity(
77 tag=tag, location=loc, framework="lobster-pkg", kind="test"
78 )
79 for trace_value in trace2[1:]:
80 data[tag.key()].add_tracing_target(Tracing_Tag("req", trace_value))
83def create_default_activity(file_content, file_name: str,
84 data: Dict[str, Activity]) -> None:
85 # Only create a default Activity entry for packages with
86 # the TESTCASE tag
87 # Check for TESTCASE tag in INFORMATION/TAGS
88 tree = ET.fromstring(file_content)
89 info = tree.find(".//ecu:INFORMATION", NS)
90 is_testcase = False
91 if info is not None: 91 ↛ 102line 91 didn't jump to line 102 because the condition on line 91 was always true
92 tags = info.find(
93 ".//ecu:TAGS", NS
94 )
95 if tags is not None: 95 ↛ 102line 95 didn't jump to line 102 because the condition on line 95 was always true
96 for tag in tags.findall( 96 ↛ 102line 96 didn't jump to line 102 because the loop on line 96 didn't complete
97 ".//ecu:TAG", NS
98 ):
99 if (tag.text or "").strip().upper() == "TESTCASE": 99 ↛ 96line 99 didn't jump to line 96 because the condition on line 99 was always true
100 is_testcase = True
101 break
102 if is_testcase: 102 ↛ exitline 102 didn't return from function 'create_default_activity' because the condition on line 102 was always true
103 tag = Tracing_Tag("pkg", f"{file_name}")
104 loc = File_Reference(file_name)
105 data[tag.key()] = Activity(
106 tag=tag,
107 location=loc,
108 framework="lobster-pkg",
109 kind="test",
110 )
113def xml_parser(file_content, filename):
114 activity_list = []
115 misplaced_lobster_lines = []
116 tree = ET.fromstring(file_content)
118 info = tree.find(".//ecu:INFORMATION", NS)
119 is_testcase = False
120 if info is not None: 120 ↛ 127line 120 didn't jump to line 127 because the condition on line 120 was always true
121 tags = info.find(".//ecu:TAGS", NS)
122 if tags is not None: 122 ↛ 127line 122 didn't jump to line 127 because the condition on line 122 was always true
123 for tag in tags.findall(".//ecu:TAG", NS): 123 ↛ 127line 123 didn't jump to line 127 because the loop on line 123 didn't complete
124 if (tag.text or "").strip().upper() == "TESTCASE": 124 ↛ 123line 124 didn't jump to line 123 because the condition on line 124 was always true
125 is_testcase = True
126 break
127 if not is_testcase: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true
128 return activity_list
130 tag_teststep = f"{{{NS['ecu']}}}TESTSTEP"
131 tag_value = f"{{{NS['ecu']}}}VALUE"
133 # Find the parent TESTSTEPS element
134 teststeps_parent = tree.find(
135 ".//ecu:TESTSTEPS", NS
136 )
137 if teststeps_parent is None: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true
138 return activity_list
140 # Find the first relevant TsBlock (first level under TESTSTEPS)
141 first_level_tsblocks = [
142 elem
143 for elem in teststeps_parent
144 if elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK
145 ]
146 if not first_level_tsblocks: 146 ↛ 147line 146 didn't jump to line 147 because the condition on line 146 was never true
147 return activity_list
149 # The first TsBlock determines the allowed parent level
150 allowed_parent = first_level_tsblocks[0]
152 # Collect all TsBlocks that are direct children of the allowed parent
153 # (i.e., one level deeper)
154 allowed_tsblocks = [
155 elem
156 for elem in allowed_parent
157 if elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK
158 ]
160 # Also allow the first TsBlock itself
161 allowed_tsblocks_set = set(allowed_tsblocks)
162 allowed_tsblocks_set.add(allowed_parent)
164 # Traverse all TsBlocks in the document
165 for tsblock in tree.iter(tag_teststep):
166 if tsblock.attrib.get("name") != TSBLOCK:
167 continue
169 # Check if this TsBlock is allowed
170 # (first TsBlock or direct child of first TsBlock)
171 is_allowed = False
172 if tsblock is allowed_parent:
173 is_allowed = True
174 else:
175 # xml.etree.ElementTree does not support getparent,
176 # so we check by structure:
177 # Is tsblock a direct child of allowed_parent?
178 for child in allowed_parent:
179 if child is tsblock:
180 is_allowed = True
181 break
183 obj_value = tsblock.findall(".//" + tag_value)
184 for trace_search in obj_value:
185 if "lobster-trace:" in (trace_search.text or ""):
186 if is_allowed:
187 # Allowed: add to activity_list
188 activity_obj = {"trace": trace_search.text, "activity": "first"}
189 activity_list.append(activity_obj)
190 else:
191 # Misplaced: not at allowed nesting
192 search_string = trace_search.text
193 dom_tree = minidom.parseString(file_content)
194 xml_content = dom_tree.toxml()
195 start_index = xml_content.find(search_string)
196 line_number = xml_content.count("\n", 0, start_index) + 2
197 misplaced_lobster_lines.append(line_number)
199 if misplaced_lobster_lines:
200 raise LOBSTER_Exception(
201 f"Misplaced LOBSTER tag(s) in file {filename}"
202 f" at line(s): {misplaced_lobster_lines}"
203 )
205 return activity_list
208def extract_lobster_traces_from_trace_analysis(tree, filename):
209 """
210 Extracts lobster traces from DESCRIPTION fields that are direct
211 children of ANALYSISITEMs of type 'episode' which are themselves
212 direct children of TRACE-ANALYSIS blocks.
214 - A valid lobster-trace is only allowed in a DESCRIPTION element
215 that is a direct child of an ANALYSISITEM with xsi:type="episode",
216 which itself is a direct child of TRACE-ANALYSIS.
217 - Any lobster-trace found in a DESCRIPTION elsewhere under
218 TRACE-ANALYSIS (e.g., in nested ANALYSISITEMs or other locations)
219 is considered misplaced and will generate a warning.
221 Returns:
222 tuple: (list of valid trace dicts, list of misplaced trace warning strings)
223 """
225 valid_traces = []
226 misplaced_traces = []
228 # Collect all valid DESCRIPTIONs (direct child of top-level episode)
229 valid_descs = set()
230 for trace_analysis in tree.findall(".//ecu:TRACE-ANALYSIS", NS):
231 # Only consider ANALYSISITEMs of type 'episode' that are direct
232 # children of TRACE-ANALYSIS
233 for episode in trace_analysis.findall("ecu:ANALYSISITEM", NS):
234 if ( 234 ↛ 233line 234 didn't jump to line 233 because the condition on line 234 was always true
235 episode.attrib.get(f"{{{NS['xsi']}}}type") == "episode"
236 ):
237 # Only DESCRIPTION elements that are direct children of this episode
238 for child in episode:
239 if (
240 child.tag == f"{{{NS['ecu']}}}DESCRIPTION" and
241 child.text and
242 "lobster-trace:" in child.text
243 ):
244 valid_traces.append(
245 {
246 "trace": child.text.strip(),
247 "activity": "first",
248 "name": episode.findtext(
249 "ecu:NAME", default="", namespaces=NS
250 ),
251 "description": child.text.strip(),
252 "source": filename,
253 }
254 )
255 valid_descs.add(child)
257 # Now check for misplaced traces: any DESCRIPTION with lobster-trace
258 # under TRACE-ANALYSIS
259 for desc in trace_analysis.findall(".//ecu:DESCRIPTION", NS):
260 if desc.text and "lobster-trace:" in desc.text and desc not in valid_descs:
261 msg = "WARNING: misplaced lobster-trace in " \
262 f"{filename}: {desc.text.strip()}"
263 if msg not in misplaced_traces: 263 ↛ 259line 263 didn't jump to line 259 because the condition on line 263 was always true
264 misplaced_traces.append(msg)
266 return valid_traces, misplaced_traces
269class PkgTool(MultiFileInputTool):
270 def __init__(self):
271 super().__init__(
272 name="pkg",
273 description="Extract tracing tags from pkg files for LOBSTER",
274 extensions=["pkg", "ta"],
275 official=True,
276 )
278 def _add_config_argument(self):
279 # This tool does not use a config file
280 pass
282 def run_from_config(self, pkg_config: PkgToolConfig) -> None:
283 """
284 The main function to parse tracing information from .pkg files for LOBSTER.
286 This function processes the input files or directories specified in
287 ``pkg_config.files``, extracts tracing tags and activities from XML
288 content (including both standard and TRACE-ANALYSIS blocks), and
289 writes the results to an output file.
291 Parameters
292 ----------
293 pkg_config (PKGToolConfig): Typed configuration with at least:
294 - files: list of file or directory paths to process
295 - out: output file path (optional; if not set,
296 output is lobster-pkg.lobster)
297 """
298 config = Config(
299 inputs=None,
300 inputs_from_file=None,
301 extensions=self._extensions,
302 exclude_patterns=None,
303 schema=Activity,
304 )
305 file_list = create_worklist(
306 config,
307 [str(path) for path in pkg_config.files],
308 )
309 if not file_list:
310 raise ValueError("No input files found to process!")
312 data = {}
314 for file_path in file_list:
315 filename = Path(file_path).name
316 with open(file_path, "r", encoding="UTF-8") as file:
317 try:
318 file_content = file.read()
320 tree = ET.fromstring(file_content)
322 getvalues = xml_parser(file_content, filename)
324 # Also extract from TRACE-ANALYSIS blocks
325 valid_traces, misplaced_traces = (
326 extract_lobster_traces_from_trace_analysis(
327 tree, filename
328 )
329 )
330 getvalues.extend(valid_traces)
331 for msg in misplaced_traces:
332 print(msg)
334 if getvalues:
335 create_raw_entry(data, file.name, json.dumps(getvalues))
336 else:
337 create_default_activity(file_content, filename, data)
339 except ET.ParseError as err:
340 print(f"Error parsing XML file '{filename}' : {err}")
341 raise
342 except LOBSTER_Exception as err:
343 err.dump()
344 raise
346 items = (
347 list(data.values())
348 if not isinstance(data.values(), list)
349 else data.values()
350 )
351 self._write_output(
352 schema=config.schema,
353 out_file=pkg_config.out,
354 items=items,
355 )
357 def _run_impl(self, options: Namespace) -> int:
358 """
359 Parse CLI options and run package trace extraction.
361 This CLI entrypoint converts parsed command-line arguments to
362 ``PKGToolConfig``, then delegates processing to the API function.
364 Parameters
365 ----------
366 options (Namespace): Parsed command-line arguments with at least:
367 - dir_or_files: list of file or directory paths to process
368 - out: output file path (optional)
370 Returns
371 -------
372 int: 0 on success, 1 on handled error.
373 """
374 try:
375 lobster_pkg(
376 PkgToolConfig(
377 files=[Path(path) for path in options.dir_or_files],
378 out=Path(options.out) if options.out else None,
379 )
380 )
381 return 0
382 except (ValueError, FileNotFoundError,
383 LOBSTER_Exception, ET.ParseError) as exception:
384 print(
385 f"{self.name}: {exception}",
386 file=sys.stderr,
387 )
388 return 1
391def lobster_pkg(config: PkgToolConfig) -> None:
392 """
393 This is an API function.
395 Expected config attributes:
396 - files: list of input files/directories
397 - out: output file path (optional)
398 """
399 PkgTool().run_from_config(config)
402def main(args: Optional[Sequence[str]] = None) -> int:
403 return PkgTool().run(args)