Coverage for lobster/tools/pkg/pkg.py: 90%
181 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-18 11:07 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-18 11:07 +0000
1#!/usr/bin/env python3
2#
3# lobster_pkg - Extract tracing values from xml file for LOBSTER
4# Copyright (C) 2024-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
19import os.path
20from pathlib import Path
21import sys
22import xml.etree.ElementTree as ET
23import json
24import re
25from typing import Dict, List, Optional, Sequence
26from xml.dom import minidom
27from argparse import Namespace
29from lobster.common.exceptions import LOBSTER_Exception
30from lobster.common.io import lobster_write
31from lobster.common.items import Activity, Tracing_Tag
32from lobster.common.location import File_Reference
33from lobster.common.meta_data_tool_base import MetaDataToolBase
35NS = {
36 "ecu": "http://www.tracetronic.de/xml/ecu-test",
37 "xsi": "http://www.w3.org/2001/XMLSchema-instance",
38}
39TSBLOCK = "TsBlock"
42def get_valid_files(
43 file_dir: List[str]
44) -> List[str]:
45 file_list = []
46 for item in file_dir:
47 if os.path.isfile(item):
48 file_list.append(item)
49 elif os.path.isdir(item):
50 for path, _, files in os.walk(item):
51 for filename in files:
52 _, ext = os.path.splitext(filename)
53 if ext in (".pkg", ".ta"): 53 ↛ 51line 53 didn't jump to line 51 because the condition on line 53 was always true
54 file_list.append(os.path.join(path, filename))
55 else:
56 raise FileNotFoundError("%s is not a file or directory" % item)
57 return file_list
60def write_to_file(options: Namespace, data: Dict[str, Activity]) -> None:
61 with open(options.out, "w", encoding="UTF-8") as file:
62 lobster_write(file, Activity, "lobster-pkg", data.values())
63 print("Written output for %u items to %s" % (len(data), options.out))
66def create_raw_entry(
67 data: Dict[str, Activity], file_name: str, trace_list: list
68) -> None:
70 activity_list = json.loads(trace_list)
71 # Collect all traces marked as "first"
72 traces = []
73 for item in activity_list:
74 if item.get("activity") == "first": 74 ↛ 73line 74 didn't jump to line 73 because the condition on line 74 was always true
75 trace_parts = [s.strip() for s in re.split(r"[:,]", item.get("trace"))]
76 traces.extend(trace_parts[1:]) # skip the "lobster-trace" prefix
78 tag = Tracing_Tag("pkg", f"{file_name}")
79 loc = File_Reference(file_name)
80 data[tag.key()] = Activity(
81 tag=tag, location=loc, framework="lobster-pkg", kind="test"
82 )
83 for trace_value in traces:
84 data[tag.key()].add_tracing_target(Tracing_Tag("req", trace_value))
86 # Handle other activities (if any)
87 for item in activity_list:
88 if item.get("activity") != "first": 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 trace2 = [s.strip() for s in re.split(r"[:,]", item.get("trace"))]
90 action = item.get("action")
91 line = item.get("line")
92 tag = Tracing_Tag("pkg", f"{file_name}::{action}::{line}")
93 loc = File_Reference(file_name, int(item.get("line")))
94 data[tag.key()] = Activity(
95 tag=tag, location=loc, framework="lobster-pkg", kind="test"
96 )
97 for trace_value in trace2[1:]:
98 data[tag.key()].add_tracing_target(Tracing_Tag("req", trace_value))
101def create_default_activity(file_content, file_name: str,
102 data: Dict[str, Activity]) -> None:
103 # Only create a default Activity entry for packages with
104 # the TESTCASE tag
105 # Check for TESTCASE tag in INFORMATION/TAGS
106 tree = ET.fromstring(file_content)
107 info = tree.find(".//ecu:INFORMATION", NS)
108 is_testcase = False
109 if info is not None: 109 ↛ 120line 109 didn't jump to line 120 because the condition on line 109 was always true
110 tags = info.find(
111 ".//ecu:TAGS", NS
112 )
113 if tags is not None: 113 ↛ 120line 113 didn't jump to line 120 because the condition on line 113 was always true
114 for tag in tags.findall( 114 ↛ 120line 114 didn't jump to line 120 because the loop on line 114 didn't complete
115 ".//ecu:TAG", NS
116 ):
117 if (tag.text or "").strip().upper() == "TESTCASE": 117 ↛ 114line 117 didn't jump to line 114 because the condition on line 117 was always true
118 is_testcase = True
119 break
120 if is_testcase: 120 ↛ exitline 120 didn't return from function 'create_default_activity' because the condition on line 120 was always true
121 tag = Tracing_Tag("pkg", f"{file_name}")
122 loc = File_Reference(file_name)
123 data[tag.key()] = Activity(
124 tag=tag,
125 location=loc,
126 framework="lobster-pkg",
127 kind="test",
128 )
131def xml_parser(file_content, filename):
132 activity_list = []
133 misplaced_lobster_lines = []
134 tree = ET.fromstring(file_content)
136 info = tree.find(".//ecu:INFORMATION", NS)
137 is_testcase = False
138 if info is not None: 138 ↛ 145line 138 didn't jump to line 145 because the condition on line 138 was always true
139 tags = info.find(".//ecu:TAGS", NS)
140 if tags is not None: 140 ↛ 145line 140 didn't jump to line 145 because the condition on line 140 was always true
141 for tag in tags.findall(".//ecu:TAG", NS):
142 if (tag.text or "").strip().upper() == "TESTCASE":
143 is_testcase = True
144 break
145 if not is_testcase:
146 return activity_list
148 tag_teststep = f"{{{NS['ecu']}}}TESTSTEP"
149 tag_value = f"{{{NS['ecu']}}}VALUE"
151 # Find the parent TESTSTEPS element
152 teststeps_parent = tree.find(
153 ".//ecu:TESTSTEPS", NS
154 )
155 if teststeps_parent is None:
156 return activity_list
158 # Find the first relevant TsBlock (first level under TESTSTEPS)
159 first_level_tsblocks = [
160 elem
161 for elem in teststeps_parent
162 if elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK
163 ]
164 if not first_level_tsblocks: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true
165 return activity_list
167 # The first TsBlock determines the allowed parent level
168 allowed_parent = first_level_tsblocks[0]
170 # Collect all TsBlocks that are direct children of the allowed parent
171 # (i.e., one level deeper)
172 allowed_tsblocks = [
173 elem
174 for elem in allowed_parent
175 if elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK
176 ]
178 # Also allow the first TsBlock itself
179 allowed_tsblocks_set = set(allowed_tsblocks)
180 allowed_tsblocks_set.add(allowed_parent)
182 # Traverse all TsBlocks in the document
183 for tsblock in tree.iter(tag_teststep):
184 if tsblock.attrib.get("name") != TSBLOCK:
185 continue
187 # Check if this TsBlock is allowed
188 # (first TsBlock or direct child of first TsBlock)
189 is_allowed = False
190 if tsblock is allowed_parent:
191 is_allowed = True
192 else:
193 # xml.etree.ElementTree does not support getparent,
194 # so we check by structure:
195 # Is tsblock a direct child of allowed_parent?
196 for child in allowed_parent:
197 if child is tsblock: 197 ↛ 198line 197 didn't jump to line 198 because the condition on line 197 was never true
198 is_allowed = True
199 break
201 obj_value = tsblock.findall(".//" + tag_value)
202 for trace_search in obj_value:
203 if "lobster-trace:" in (trace_search.text or ""):
204 if is_allowed:
205 # Allowed: add to activity_list
206 activity_obj = {"trace": trace_search.text, "activity": "first"}
207 activity_list.append(activity_obj)
208 else:
209 # Misplaced: not at allowed nesting
210 search_string = trace_search.text
211 dom_tree = minidom.parseString(file_content)
212 xml_content = dom_tree.toxml()
213 start_index = xml_content.find(search_string)
214 line_number = xml_content.count("\n", 0, start_index) + 2
215 misplaced_lobster_lines.append(line_number)
217 if misplaced_lobster_lines:
218 raise LOBSTER_Exception(
219 f"Misplaced LOBSTER tag(s) in file {filename}"
220 f" at line(s): {misplaced_lobster_lines}"
221 )
223 return activity_list
226def extract_lobster_traces_from_trace_analysis(tree, filename):
227 """
228 Extracts lobster traces from DESCRIPTION fields that are direct
229 children of ANALYSISITEMs of type 'episode' which are themselves
230 direct children of TRACE-ANALYSIS blocks.
232 - A valid lobster-trace is only allowed in a DESCRIPTION element
233 that is a direct child of an ANALYSISITEM with xsi:type="episode",
234 which itself is a direct child of TRACE-ANALYSIS.
235 - Any lobster-trace found in a DESCRIPTION elsewhere under
236 TRACE-ANALYSIS (e.g., in nested ANALYSISITEMs or other locations)
237 is considered misplaced and will generate a warning.
239 Returns:
240 tuple: (list of valid trace dicts, list of misplaced trace warning strings)
241 """
243 valid_traces = []
244 misplaced_traces = []
246 # Collect all valid DESCRIPTIONs (direct child of top-level episode)
247 valid_descs = set()
248 for trace_analysis in tree.findall(".//ecu:TRACE-ANALYSIS", NS):
249 # Only consider ANALYSISITEMs of type 'episode' that are direct
250 # children of TRACE-ANALYSIS
251 for episode in trace_analysis.findall("ecu:ANALYSISITEM", NS):
252 if (
253 episode.attrib.get(f"{{{NS['xsi']}}}type") == "episode"
254 ):
255 # Only DESCRIPTION elements that are direct children of this episode
256 for child in episode:
257 if (
258 child.tag == f"{{{NS['ecu']}}}DESCRIPTION" and
259 child.text and
260 "lobster-trace:" in child.text
261 ):
262 valid_traces.append(
263 {
264 "trace": child.text.strip(),
265 "activity": "first",
266 "name": episode.findtext(
267 "ecu:NAME", default="", namespaces=NS
268 ),
269 "description": child.text.strip(),
270 "source": filename,
271 }
272 )
273 valid_descs.add(child)
275 # Now check for misplaced traces: any DESCRIPTION with lobster-trace
276 # under TRACE-ANALYSIS
277 for desc in trace_analysis.findall(".//ecu:DESCRIPTION", NS):
278 if desc.text and "lobster-trace:" in desc.text and desc not in valid_descs:
279 msg = "WARNING: misplaced lobster-trace in " \
280 f"{filename}: {desc.text.strip()}"
281 if msg not in misplaced_traces: 281 ↛ 277line 281 didn't jump to line 277 because the condition on line 281 was always true
282 misplaced_traces.append(msg)
284 return valid_traces, misplaced_traces
287def lobster_pkg(options):
288 """
289 The main function to parse tracing information from .pkg files for LOBSTER.
291 This function processes the input files or directories specified in 'options.files',
292 extracts tracing tags and activities from XML content (including both standard and
293 TRACE-ANALYSIS blocks), and writes the results to an output file
295 Parameters
296 ----------
297 options (Namespace): Parsed command-line arguments with at least:
298 - files: list of file or directory paths to process
299 - out: output file path (optional; if not set, output is report.lobster)
300 """
301 file_list = get_valid_files(options.files)
302 data = {}
304 for file_path in file_list:
305 filename = Path(file_path).name
306 with open(file_path, "r", encoding="UTF-8") as file:
307 try:
308 file_content = file.read()
310 tree = ET.fromstring(file_content)
312 getvalues = xml_parser(file_content, filename)
314 # Also extract from TRACE-ANALYSIS blocks
315 valid_traces, misplaced_traces = (
316 extract_lobster_traces_from_trace_analysis(
317 tree, filename
318 )
319 )
320 getvalues.extend(valid_traces)
321 for msg in misplaced_traces:
322 print(msg)
324 if getvalues:
325 create_raw_entry(data, file.name, json.dumps(getvalues))
326 else:
327 create_default_activity(file_content, filename, data)
329 except ET.ParseError as err:
330 print(f"Error parsing XML file '{filename}' : {err}")
331 raise
332 except LOBSTER_Exception as err:
333 err.dump()
334 raise
336 # Set default output file if not specified
337 output_file = getattr(options, "out", None)
338 if not output_file: 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true
339 options.out = "report.lobster"
341 write_to_file(options, data)
343 return 0
346class PkgTool(MetaDataToolBase):
347 def __init__(self):
348 super().__init__(
349 name="pkg",
350 description="Extract tracing tags from pkg files for LOBSTER",
351 official=True,
352 )
353 self._argument_parser.add_argument(
354 "files",
355 nargs="+",
356 metavar="FILE|DIR",
357 help="Path to pkg file or directory.",
358 )
359 self._argument_parser.add_argument(
360 "--out",
361 required=True,
362 help="write output to this file; otherwise output is report.lobster",
363 )
365 def _run_impl(self, options: Namespace) -> int:
366 try:
367 lobster_pkg(options)
368 return 0
369 except (ValueError, FileNotFoundError,
370 LOBSTER_Exception, ET.ParseError) as exception:
371 print(
372 f"{self.name}: {exception}",
373 file=sys.stderr,
374 )
375 return 1
378def main(args: Optional[Sequence[str]] = None) -> int:
379 return PkgTool().run(args)