Coverage for lobster/tools/pkg/pkg.py: 68%
180 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-27 13:02 +0000
1#!/usr/bin/env python3
2#
3# lobster_pkg - Extract tracing values from xml file for LOBSTER
4# Copyright (C) 2024-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
19import os.path
20from pathlib import Path
21import xml.etree.ElementTree as ET
22import json
23import re
24from typing import Dict, List, Optional, Sequence
25from xml.dom import minidom
26from argparse import Namespace
28from lobster.common.exceptions import LOBSTER_Exception
29from lobster.common.io import lobster_write
30from lobster.common.items import Activity, Tracing_Tag
31from lobster.common.location import File_Reference
32from lobster.common.meta_data_tool_base import MetaDataToolBase
34NS = {
35 "ecu": "http://www.tracetronic.de/xml/ecu-test",
36 "xsi": "http://www.w3.org/2001/XMLSchema-instance",
37}
38TSBLOCK = "TsBlock"
41def get_valid_files(
42 file_dir: List[str]
43) -> List[str]:
44 file_list = []
45 for item in file_dir:
46 if os.path.isfile(item):
47 file_list.append(item)
48 elif os.path.isdir(item):
49 for path, _, files in os.walk(item):
50 for filename in files:
51 _, ext = os.path.splitext(filename)
52 if ext in (".pkg", ".ta"):
53 file_list.append(os.path.join(path, filename))
54 else:
55 raise FileNotFoundError("%s is not a file or directory" % item)
56 return file_list
59def write_to_file(options: Namespace, data: Dict[str, Activity]) -> None:
60 with open(options.out, "w", encoding="UTF-8") as file:
61 lobster_write(file, Activity, "lobster-pkg", data.values())
62 print("Written output for %u items to %s" % (len(data), options.out))
65def create_raw_entry(
66 data: Dict[str, Activity], file_name: str, trace_list: list
67) -> None:
69 activity_list = json.loads(trace_list)
70 # Collect all traces marked as "first"
71 traces = []
72 for item in activity_list:
73 if item.get("activity") == "first": 73 ↛ 72line 73 didn't jump to line 72 because the condition on line 73 was always true
74 trace_parts = [s.strip() for s in re.split(r"[:,]", item.get("trace"))]
75 traces.extend(trace_parts[1:]) # skip the "lobster-trace" prefix
77 tag = Tracing_Tag("pkg", f"{file_name}")
78 loc = File_Reference(file_name)
79 data[tag.key()] = Activity(
80 tag=tag, location=loc, framework="lobster-pkg", kind="test"
81 )
82 for trace_value in traces:
83 data[tag.key()].add_tracing_target(Tracing_Tag("req", trace_value))
85 # Handle other activities (if any)
86 for item in activity_list:
87 if item.get("activity") != "first": 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true
88 trace2 = [s.strip() for s in re.split(r"[:,]", item.get("trace"))]
89 action = item.get("action")
90 line = item.get("line")
91 tag = Tracing_Tag("pkg", f"{file_name}::{action}::{line}")
92 loc = File_Reference(file_name, int(item.get("line")))
93 data[tag.key()] = Activity(
94 tag=tag, location=loc, framework="lobster-pkg", kind="test"
95 )
96 for trace_value in trace2[1:]:
97 data[tag.key()].add_tracing_target(Tracing_Tag("req", trace_value))
100def create_default_activity(file_content, file_name: str,
101 data: Dict[str, Activity]) -> None:
102 # Only create a default Activity entry for packages with
103 # the TESTCASE tag
104 # Check for TESTCASE tag in INFORMATION/TAGS
105 tree = ET.fromstring(file_content)
106 info = tree.find(".//ecu:INFORMATION", NS)
107 is_testcase = False
108 if info is not None: 108 ↛ 119line 108 didn't jump to line 119 because the condition on line 108 was always true
109 tags = info.find(
110 ".//ecu:TAGS", NS
111 )
112 if tags is not None: 112 ↛ 119line 112 didn't jump to line 119 because the condition on line 112 was always true
113 for tag in tags.findall( 113 ↛ 119line 113 didn't jump to line 119 because the loop on line 113 didn't complete
114 ".//ecu:TAG", NS
115 ):
116 if (tag.text or "").strip().upper() == "TESTCASE": 116 ↛ 113line 116 didn't jump to line 113 because the condition on line 116 was always true
117 is_testcase = True
118 break
119 if is_testcase: 119 ↛ exitline 119 didn't return from function 'create_default_activity' because the condition on line 119 was always true
120 tag = Tracing_Tag("pkg", f"{file_name}")
121 loc = File_Reference(file_name)
122 data[tag.key()] = Activity(
123 tag=tag,
124 location=loc,
125 framework="lobster-pkg",
126 kind="test",
127 )
130def xml_parser(file_content, filename):
131 activity_list = []
132 misplaced_lobster_lines = []
133 tree = ET.fromstring(file_content)
135 info = tree.find(".//ecu:INFORMATION", NS)
136 is_testcase = False
137 if info is not None: 137 ↛ 144line 137 didn't jump to line 144 because the condition on line 137 was always true
138 tags = info.find(".//ecu:TAGS", NS)
139 if tags is not None: 139 ↛ 144line 139 didn't jump to line 144 because the condition on line 139 was always true
140 for tag in tags.findall(".//ecu:TAG", NS): 140 ↛ 144line 140 didn't jump to line 144 because the loop on line 140 didn't complete
141 if (tag.text or "").strip().upper() == "TESTCASE": 141 ↛ 140line 141 didn't jump to line 140 because the condition on line 141 was always true
142 is_testcase = True
143 break
144 if not is_testcase: 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true
145 return activity_list
147 tag_teststep = f"{{{NS['ecu']}}}TESTSTEP"
148 tag_value = f"{{{NS['ecu']}}}VALUE"
150 # Find the parent TESTSTEPS element
151 teststeps_parent = tree.find(
152 ".//ecu:TESTSTEPS", NS
153 )
154 if teststeps_parent is None: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true
155 return activity_list
157 # Find the first relevant TsBlock (first level under TESTSTEPS)
158 first_level_tsblocks = [
159 elem
160 for elem in teststeps_parent
161 if elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK
162 ]
163 if not first_level_tsblocks: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true
164 return activity_list
166 # The first TsBlock determines the allowed parent level
167 allowed_parent = first_level_tsblocks[0]
169 # Collect all TsBlocks that are direct children of the allowed parent
170 # (i.e., one level deeper)
171 allowed_tsblocks = [
172 elem
173 for elem in allowed_parent
174 if elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK
175 ]
177 # Also allow the first TsBlock itself
178 allowed_tsblocks_set = set(allowed_tsblocks)
179 allowed_tsblocks_set.add(allowed_parent)
181 # Traverse all TsBlocks in the document
182 for tsblock in tree.iter(tag_teststep):
183 if tsblock.attrib.get("name") != TSBLOCK:
184 continue
186 # Check if this TsBlock is allowed
187 # (first TsBlock or direct child of first TsBlock)
188 is_allowed = False
189 if tsblock is allowed_parent:
190 is_allowed = True
191 else:
192 # xml.etree.ElementTree does not support getparent,
193 # so we check by structure:
194 # Is tsblock a direct child of allowed_parent?
195 for child in allowed_parent:
196 if child is tsblock: 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true
197 is_allowed = True
198 break
200 obj_value = tsblock.findall(".//" + tag_value)
201 for trace_search in obj_value:
202 if "lobster-trace:" in (trace_search.text or ""):
203 if is_allowed: 203 ↛ 209line 203 didn't jump to line 209 because the condition on line 203 was always true
204 # Allowed: add to activity_list
205 activity_obj = {"trace": trace_search.text, "activity": "first"}
206 activity_list.append(activity_obj)
207 else:
208 # Misplaced: not at allowed nesting
209 search_string = trace_search.text
210 dom_tree = minidom.parseString(file_content)
211 xml_content = dom_tree.toxml()
212 start_index = xml_content.find(search_string)
213 line_number = xml_content.count("\n", 0, start_index) + 2
214 misplaced_lobster_lines.append(line_number)
216 if misplaced_lobster_lines: 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true
217 raise LOBSTER_Exception(
218 f"Misplaced LOBSTER tag(s) in file {filename}"
219 f" at line(s): {misplaced_lobster_lines}"
220 )
222 return activity_list
225def extract_lobster_traces_from_trace_analysis(tree, filename):
226 """
227 Extracts lobster traces from DESCRIPTION fields that are direct
228 children of ANALYSISITEMs of type 'episode' which are themselves
229 direct children of TRACE-ANALYSIS blocks.
231 - A valid lobster-trace is only allowed in a DESCRIPTION element
232 that is a direct child of an ANALYSISITEM with xsi:type="episode",
233 which itself is a direct child of TRACE-ANALYSIS.
234 - Any lobster-trace found in a DESCRIPTION elsewhere under
235 TRACE-ANALYSIS (e.g., in nested ANALYSISITEMs or other locations)
236 is considered misplaced and will generate a warning.
238 Returns:
239 tuple: (list of valid trace dicts, list of misplaced trace warning strings)
240 """
242 valid_traces = []
243 misplaced_traces = []
245 # Collect all valid DESCRIPTIONs (direct child of top-level episode)
246 valid_descs = set()
247 for trace_analysis in tree.findall(".//ecu:TRACE-ANALYSIS", NS):
248 # Only consider ANALYSISITEMs of type 'episode' that are direct
249 # children of TRACE-ANALYSIS
250 for episode in trace_analysis.findall("ecu:ANALYSISITEM", NS):
251 if ( 251 ↛ 250line 251 didn't jump to line 250 because the condition on line 251 was always true
252 episode.attrib.get(f"{{{NS['xsi']}}}type") == "episode"
253 ):
254 # Only DESCRIPTION elements that are direct children of this episode
255 for child in episode:
256 if (
257 child.tag == f"{{{NS['ecu']}}}DESCRIPTION" and
258 child.text and
259 "lobster-trace:" in child.text
260 ):
261 valid_traces.append(
262 {
263 "trace": child.text.strip(),
264 "activity": "first",
265 "name": episode.findtext(
266 "ecu:NAME", default="", namespaces=NS
267 ),
268 "description": child.text.strip(),
269 "source": filename,
270 }
271 )
272 valid_descs.add(child)
274 # Now check for misplaced traces: any DESCRIPTION with lobster-trace
275 # under TRACE-ANALYSIS
276 for desc in trace_analysis.findall(".//ecu:DESCRIPTION", NS):
277 if desc.text and "lobster-trace:" in desc.text and desc not in valid_descs:
278 msg = "WARNING: misplaced lobster-trace in " \
279 f"{filename}: {desc.text.strip()}"
280 if msg not in misplaced_traces: 280 ↛ 276line 280 didn't jump to line 276 because the condition on line 280 was always true
281 misplaced_traces.append(msg)
283 return valid_traces, misplaced_traces
286def lobster_pkg(options):
287 """
288 The main function to parse tracing information from .pkg files for LOBSTER.
290 This function processes the input files or directories specified in 'options.files',
291 extracts tracing tags and activities from XML content (including both standard and
292 TRACE-ANALYSIS blocks), and writes the results to an output file
294 Parameters
295 ----------
296 options (Namespace): Parsed command-line arguments with at least:
297 - files: list of file or directory paths to process
298 - out: output file path (optional; if not set, output is report.lobster)
299 """
300 file_list = get_valid_files(options.files)
301 data = {}
303 for file_path in file_list:
304 filename = Path(file_path).name
305 with open(file_path, "r", encoding="UTF-8") as file:
306 try:
307 file_content = file.read()
309 tree = ET.fromstring(file_content)
311 getvalues = xml_parser(file_content, filename)
313 # Also extract from TRACE-ANALYSIS blocks
314 valid_traces, misplaced_traces = (
315 extract_lobster_traces_from_trace_analysis(
316 tree, filename
317 )
318 )
319 getvalues.extend(valid_traces)
320 for msg in misplaced_traces:
321 print(msg)
323 if getvalues:
324 create_raw_entry(data, file.name, json.dumps(getvalues))
325 else:
326 create_default_activity(file_content, filename, data)
328 except ET.ParseError as err:
329 print(f"Error parsing XML file '{filename}' : {err}")
330 raise
331 except LOBSTER_Exception as err:
332 err.dump()
333 raise
335 # Set default output file if not specified
336 output_file = getattr(options, "out", None)
337 if not output_file:
338 options.out = "report.lobster"
340 write_to_file(options, data)
342 return 0
345class PkgTool(MetaDataToolBase):
346 def __init__(self):
347 super().__init__(
348 name="pkg",
349 description="Extract tracing tags from pkg files for LOBSTER",
350 official=True,
351 )
352 self._argument_parser.add_argument(
353 "files",
354 nargs="+",
355 metavar="FILE|DIR",
356 help="Path to pkg file or directory.",
357 )
358 self._argument_parser.add_argument(
359 "--out",
360 default="report.lobster",
361 help="write output to this file; otherwise output is report.lobster",
362 )
364 def _run_impl(self, options: Namespace) -> int:
365 options = self._argument_parser.parse_args()
367 try:
368 lobster_pkg(options)
370 except ValueError as exception:
371 self._argument_parser.error(str(exception))
373 return 0
376def main(args: Optional[Sequence[str]] = None) -> int:
377 return PkgTool().run(args)