Coverage for lobster/tools/pkg/pkg.py: 90%
163 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-09 10:06 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-09 10:06 +0000
1#!/usr/bin/env python3
2#
3# lobster_pkg - Extract tracing values from xml file for LOBSTER
4# Copyright (C) 2024-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
19from pathlib import Path
20import sys
21import xml.etree.ElementTree as ET
22import json
23import re
24from typing import Dict, Optional, Sequence
25from xml.dom import minidom
26from argparse import Namespace
28from lobster.common.multi_file_input_config import Config
29from lobster.common.multi_file_input_tool import create_worklist, MultiFileInputTool
30from lobster.common.exceptions import LOBSTER_Exception
31from lobster.common.items import Activity, Tracing_Tag
32from lobster.common.location import File_Reference
# Prefix -> namespace URI mapping handed to every ElementTree
# find()/findall()/findtext() call in this module.
NS = {
    "ecu": "http://www.tracetronic.de/xml/ecu-test",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance",
}

# Value of the "name" attribute that marks a TESTSTEP element as a TsBlock.
TSBLOCK = "TsBlock"
def create_raw_entry(
    data: Dict[str, Activity], file_name: str, trace_list: list
) -> None:
    """Create Activity entries in *data* from the traces found in one file.

    Parameters
    ----------
    data: mapping from tag key to Activity; it is updated in place.
    file_name: name used for the tracing tag and the file location.
    trace_list: JSON text (it is decoded with ``json.loads``, despite the
        ``list`` annotation) holding a list of dicts. Each dict is read
        through the keys "activity" and "trace", and for non-"first"
        entries also "action" and "line".

    All entries whose "activity" is "first" are merged into one Activity
    tagged with *file_name*; this Activity is created even if no such
    entry exists. Every other entry gets its own Activity tagged
    ``<file_name>::<action>::<line>``.
    """
    entries = json.loads(trace_list)

    def trace_targets(entry):
        # Split on ':' and ','; element 0 is the "lobster-trace" prefix,
        # so only the remaining elements are tracing targets.
        pieces = [p.strip() for p in re.split(r"[:,]", entry.get("trace"))]
        return pieces[1:]

    # Gather the targets of all "first" entries before creating the
    # file-level Activity.
    file_level_targets = []
    for entry in entries:
        if entry.get("activity") == "first":
            file_level_targets.extend(trace_targets(entry))

    file_tag = Tracing_Tag("pkg", f"{file_name}")
    file_location = File_Reference(file_name)
    data[file_tag.key()] = Activity(
        tag=file_tag,
        location=file_location,
        framework="lobster-pkg",
        kind="test",
    )
    for target in file_level_targets:
        data[file_tag.key()].add_tracing_target(Tracing_Tag("req", target))

    # Every remaining entry becomes an Activity of its own.
    for entry in entries:
        if entry.get("activity") == "first":
            continue
        targets = trace_targets(entry)
        action = entry.get("action")
        line = entry.get("line")
        entry_tag = Tracing_Tag("pkg", f"{file_name}::{action}::{line}")
        entry_location = File_Reference(file_name, int(line))
        data[entry_tag.key()] = Activity(
            tag=entry_tag,
            location=entry_location,
            framework="lobster-pkg",
            kind="test",
        )
        for target in targets:
            data[entry_tag.key()].add_tracing_target(
                Tracing_Tag("req", target)
            )
def create_default_activity(file_content, file_name: str,
                            data: Dict[str, Activity]) -> None:
    """Add an Activity without tracing targets for a TESTCASE package.

    *file_content* is parsed as XML. An entry is added to *data* only if
    an ecu:TAG below ecu:INFORMATION / ecu:TAGS has the text "TESTCASE"
    (compared after stripping whitespace and upper-casing). Otherwise
    *data* is left untouched.
    """
    root = ET.fromstring(file_content)

    information = root.find(".//ecu:INFORMATION", NS)
    if information is None:
        return

    tag_container = information.find(".//ecu:TAGS", NS)
    if tag_container is None:
        return

    has_testcase_tag = any(
        (node.text or "").strip().upper() == "TESTCASE"
        for node in tag_container.findall(".//ecu:TAG", NS)
    )
    if not has_testcase_tag:
        return

    pkg_tag = Tracing_Tag("pkg", f"{file_name}")
    pkg_location = File_Reference(file_name)
    data[pkg_tag.key()] = Activity(
        tag=pkg_tag,
        location=pkg_location,
        framework="lobster-pkg",
        kind="test",
    )
def xml_parser(file_content, filename):
    """Collect lobster-trace values from the TsBlocks of a package file.

    Parameters
    ----------
    file_content: XML text of the package.
    filename: name used in the error message for misplaced tags.

    Returns
    -------
    A list of dicts ``{"trace": <VALUE text>, "activity": "first"}``.
    The list is empty if the package has no TESTCASE tag, no TESTSTEPS
    element, or no TsBlock directly below TESTSTEPS.

    Raises
    ------
    LOBSTER_Exception: if a "lobster-trace:" value is found in a TsBlock
        that is neither the first TsBlock below TESTSTEPS nor one of its
        direct children.
    """
    activity_list = []
    misplaced_lobster_lines = []
    tree = ET.fromstring(file_content)

    # Only packages carrying the TESTCASE tag are evaluated.
    info = tree.find(".//ecu:INFORMATION", NS)
    tags = info.find(".//ecu:TAGS", NS) if info is not None else None
    is_testcase = tags is not None and any(
        (tag.text or "").strip().upper() == "TESTCASE"
        for tag in tags.findall(".//ecu:TAG", NS)
    )
    if not is_testcase:
        return activity_list

    tag_teststep = f"{{{NS['ecu']}}}TESTSTEP"
    tag_value = f"{{{NS['ecu']}}}VALUE"

    # Find the parent TESTSTEPS element
    teststeps_parent = tree.find(".//ecu:TESTSTEPS", NS)
    if teststeps_parent is None:
        return activity_list

    # The first TsBlock directly below TESTSTEPS determines the allowed
    # nesting level: traces may live in it or in its direct children.
    allowed_parent = next(
        (
            elem
            for elem in teststeps_parent
            if elem.tag == tag_teststep
            and elem.attrib.get("name") == TSBLOCK
        ),
        None,
    )
    if allowed_parent is None:
        return activity_list

    # Serialised document used to derive line numbers of misplaced
    # traces. It is built lazily and at most once, because it is only
    # needed on the error path and does not depend on the loop state.
    serialized_xml = None

    # Traverse all TsBlocks in the document
    for tsblock in tree.iter(tag_teststep):
        if tsblock.attrib.get("name") != TSBLOCK:
            continue

        # xml.etree.ElementTree has no getparent(), so the parent
        # relation is checked by identity against the children of the
        # allowed parent.
        is_allowed = tsblock is allowed_parent or any(
            child is tsblock for child in allowed_parent
        )

        for trace_search in tsblock.findall(".//" + tag_value):
            if "lobster-trace:" not in (trace_search.text or ""):
                continue

            if is_allowed:
                activity_list.append(
                    {"trace": trace_search.text, "activity": "first"}
                )
                continue

            # Misplaced: not at allowed nesting
            if serialized_xml is None:
                serialized_xml = minidom.parseString(file_content).toxml()
            # NOTE(review): find() locates the first occurrence only and
            # returns -1 when the text is not present verbatim in the
            # serialised document; the reported line is then unreliable.
            start_index = serialized_xml.find(trace_search.text)
            # NOTE(review): the "+ 2" offset presumably accounts for
            # 1-based numbering plus a line lost when toxml() emits the
            # XML declaration - confirm.
            line_number = serialized_xml.count("\n", 0, start_index) + 2
            misplaced_lobster_lines.append(line_number)

    if misplaced_lobster_lines:
        raise LOBSTER_Exception(
            f"Misplaced LOBSTER tag(s) in file {filename}"
            f" at line(s): {misplaced_lobster_lines}"
        )

    return activity_list
def extract_lobster_traces_from_trace_analysis(tree, filename):
    """
    Collect lobster traces from the TRACE-ANALYSIS blocks of a package.

    A lobster-trace is valid only in a DESCRIPTION element that is a
    direct child of an ANALYSISITEM with xsi:type="episode", where that
    ANALYSISITEM is itself a direct child of TRACE-ANALYSIS.

    Any other DESCRIPTION below a TRACE-ANALYSIS element whose text
    contains "lobster-trace:" (for example in a nested ANALYSISITEM) is
    reported as misplaced. Identical warning messages are reported once.

    Returns:
        tuple: (list of valid trace dicts, list of misplaced trace
        warning strings)
    """
    marker = "lobster-trace:"
    type_attribute = f"{{{NS['xsi']}}}type"
    description_tag = f"{{{NS['ecu']}}}DESCRIPTION"

    valid_traces = []
    misplaced_traces = []
    # DESCRIPTION elements already accepted as valid; consulted when
    # looking for misplaced traces.
    accepted_descriptions = set()

    for analysis in tree.findall(".//ecu:TRACE-ANALYSIS", NS):
        # Step 1: direct ANALYSISITEM children of type 'episode'.
        for item in analysis.findall("ecu:ANALYSISITEM", NS):
            if item.attrib.get(type_attribute) != "episode":
                continue
            # Only direct children of the episode are considered.
            for node in item:
                if node.tag != description_tag:
                    continue
                if not node.text or marker not in node.text:
                    continue
                trace_text = node.text.strip()
                valid_traces.append(
                    {
                        "trace": trace_text,
                        "activity": "first",
                        "name": item.findtext(
                            "ecu:NAME", default="", namespaces=NS
                        ),
                        "description": trace_text,
                        "source": filename,
                    }
                )
                accepted_descriptions.add(node)

        # Step 2: every other DESCRIPTION below this TRACE-ANALYSIS that
        # carries the marker is misplaced.
        for desc in analysis.findall(".//ecu:DESCRIPTION", NS):
            if not desc.text or marker not in desc.text:
                continue
            if desc in accepted_descriptions:
                continue
            warning = "WARNING: misplaced lobster-trace in " \
                      f"{filename}: {desc.text.strip()}"
            if warning not in misplaced_traces:
                misplaced_traces.append(warning)

    return valid_traces, misplaced_traces
class PkgTool(MultiFileInputTool):
    """Tool that extracts tracing tags from pkg files for LOBSTER."""

    def __init__(self):
        super().__init__(
            name="pkg",
            description="Extract tracing tags from pkg files for LOBSTER",
            extensions=["pkg", "ta"],
            official=True,
        )

    def _add_config_argument(self):
        # This tool does not use a config file
        pass

    def lobster_pkg(self, options):
        """
        The main function to parse tracing information from .pkg files
        for LOBSTER.

        This function processes the input files or directories specified
        in 'options.dir_or_files', extracts tracing tags and activities
        from XML content (including both standard and TRACE-ANALYSIS
        blocks), and passes the results to the output writer.

        Parameters
        ----------
        options (Namespace): Parsed command-line arguments with at least:
            - dir_or_files: list of file or directory paths to process
            - out: output file path, handed on to the output writer

        Raises
        ------
        ValueError: if no input files are found.
        xml.etree.ElementTree.ParseError: if a file is not well-formed
            XML (a message is printed before re-raising).
        LOBSTER_Exception: if a file contains misplaced LOBSTER tags
            (the exception is dumped before re-raising).
        """
        config = Config(
            inputs=None,
            inputs_from_file=None,
            extensions=self._extensions,
            exclude_patterns=None,
            schema=Activity,
        )
        file_list = create_worklist(config, options.dir_or_files)
        if not file_list:
            raise ValueError("No input files found to process!")

        data = {}

        for file_path in file_list:
            filename = Path(file_path).name
            with open(file_path, "r", encoding="UTF-8") as file:
                try:
                    file_content = file.read()

                    tree = ET.fromstring(file_content)

                    getvalues = xml_parser(file_content, filename)

                    # Also extract from TRACE-ANALYSIS blocks
                    valid_traces, misplaced_traces = (
                        extract_lobster_traces_from_trace_analysis(
                            tree, filename
                        )
                    )
                    getvalues.extend(valid_traces)
                    for msg in misplaced_traces:
                        print(msg)

                    if getvalues:
                        # NOTE(review): this branch tags entries with
                        # file.name (the path given to open()), while
                        # the default branch uses the bare file name -
                        # confirm that the difference is intended.
                        create_raw_entry(
                            data, file.name, json.dumps(getvalues)
                        )
                    else:
                        create_default_activity(
                            file_content, filename, data
                        )

                except ET.ParseError as err:
                    print(f"Error parsing XML file '{filename}' : {err}")
                    raise
                except LOBSTER_Exception as err:
                    err.dump()
                    raise

        # dict.values() is a view and never a list, so it is always
        # materialised into a list for the writer.
        items = list(data.values())
        self._write_output(
            schema=config.schema,
            out_file=options.out,
            items=items,
        )

    def _run_impl(self, options: Namespace) -> int:
        """Run lobster_pkg and map its outcome to an exit code.

        Returns 0 on success. If one of the handled exceptions is
        raised, it is reported on stderr prefixed with the tool name and
        1 is returned.
        """
        try:
            self.lobster_pkg(options)
            return 0
        except (ValueError, FileNotFoundError,
                LOBSTER_Exception, ET.ParseError) as exception:
            print(
                f"{self.name}: {exception}",
                file=sys.stderr,
            )
            return 1
def main(args: Optional[Sequence[str]] = None) -> int:
    """Create a PkgTool, run it with *args* and return its result."""
    tool = PkgTool()
    return tool.run(args)