Coverage for lobster/tools/pkg/pkg.py: 90%

170 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-12 15:02 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_pkg - Extract tracing values from xml file for LOBSTER 

4# Copyright (C) 2024-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19from pathlib import Path 

20import sys 

21import xml.etree.ElementTree as ET 

22import json 

23import re 

24from typing import Dict, Optional, Sequence 

25from xml.dom import minidom 

26from argparse import Namespace 

27from dataclasses import dataclass 

28 

29from lobster.common.multi_file_input_config import Config 

30from lobster.common.multi_file_input_tool import create_worklist, MultiFileInputTool 

31from lobster.common.exceptions import LOBSTER_Exception 

32from lobster.common.items import Activity, Tracing_Tag 

33from lobster.common.location import File_Reference 

34 

# XML namespace prefixes used by the XPath queries in this module.
NS = dict(
    ecu="http://www.tracetronic.de/xml/ecu-test",
    xsi="http://www.w3.org/2001/XMLSchema-instance",
)
# Value of the "name" attribute that marks a TESTSTEP element as a TsBlock.
TSBLOCK = "TsBlock"

40 

41 

@dataclass
class PkgToolConfig:
    """Typed configuration accepted by the lobster-pkg API entry point."""

    # Input files and/or directories to process.
    files: Sequence[Path]
    # Output file path (optional).
    out: Optional[Path] = None

46 

47 

def _split_trace_targets(trace: Optional[str]) -> list:
    """Return the target names listed in a ``lobster-trace`` text.

    The text is split on ':' and ','; every part is stripped and the first
    part (the "lobster-trace" prefix) is dropped. A missing text yields an
    empty list instead of raising.
    """
    parts = [part.strip() for part in re.split(r"[:,]", trace or "")]
    return parts[1:]


def create_raw_entry(
    data: Dict[str, Activity], file_name: str, trace_list: str
) -> None:
    """Add the Activity items described by ``trace_list`` to ``data``.

    Parameters
    ----------
    data: mapping from tag key to Activity; it is updated in place.
    file_name: name used for the tag and the file reference.
    trace_list: JSON-encoded list of dicts. Each dict is read through the
        keys "activity" and "trace"; entries whose "activity" is not
        "first" are additionally read through "action" and "line".

    One Activity tagged with ``file_name`` is always created; it receives
    the targets of every entry whose "activity" is "first". Every other
    entry gets its own Activity tagged ``file_name::action::line``.
    """
    activity_list = json.loads(trace_list)

    # File-level entry: keep a direct reference instead of looking it up
    # in ``data`` again for every tracing target.
    file_tag = Tracing_Tag("pkg", f"{file_name}")
    file_activity = Activity(
        tag=file_tag,
        location=File_Reference(file_name),
        framework="lobster-pkg",
        kind="test",
    )
    data[file_tag.key()] = file_activity

    for item in activity_list:
        targets = _split_trace_targets(item.get("trace"))

        if item.get("activity") == "first":
            for trace_value in targets:
                file_activity.add_tracing_target(
                    Tracing_Tag("req", trace_value)
                )
            continue

        # Any other activity gets a separate entry bound to its line.
        action = item.get("action")
        line = item.get("line")
        tag = Tracing_Tag("pkg", f"{file_name}::{action}::{line}")
        activity = Activity(
            tag=tag,
            location=File_Reference(file_name, int(line)),
            framework="lobster-pkg",
            kind="test",
        )
        data[tag.key()] = activity
        for trace_value in targets:
            activity.add_tracing_target(Tracing_Tag("req", trace_value))

81 

82 

def create_default_activity(file_content, file_name: str,
                            data: Dict[str, Activity]) -> None:
    """Register a target-less Activity for a package tagged TESTCASE.

    The XML in ``file_content`` is parsed and the TAG elements below the
    first TAGS element of the first INFORMATION element are inspected. If
    one of them reads "TESTCASE" (surrounding whitespace and letter case
    are ignored), an Activity tagged with ``file_name`` is stored in
    ``data``; otherwise ``data`` is left untouched.
    """
    root = ET.fromstring(file_content)

    information = root.find(".//ecu:INFORMATION", NS)
    if information is None:
        return
    tag_container = information.find(".//ecu:TAGS", NS)
    if tag_container is None:
        return

    has_testcase_tag = any(
        (entry.text or "").strip().upper() == "TESTCASE"
        for entry in tag_container.findall(".//ecu:TAG", NS)
    )
    if not has_testcase_tag:
        return

    pkg_tag = Tracing_Tag("pkg", f"{file_name}")
    data[pkg_tag.key()] = Activity(
        tag=pkg_tag,
        location=File_Reference(file_name),
        framework="lobster-pkg",
        kind="test",
    )

111 

112 

def xml_parser(file_content, filename):
    """Collect the ``lobster-trace:`` values of a TESTCASE package.

    Parameters
    ----------
    file_content: XML text of the package file.
    filename: file name, used only in the error message.

    Returns
    -------
    list: one ``{"trace": <text>, "activity": "first"}`` dict per VALUE
        element containing "lobster-trace:" below an allowed TsBlock. The
        list is empty when the package carries no TESTCASE tag, has no
        TESTSTEPS element, or has no TsBlock directly below TESTSTEPS.

    Raises
    ------
    LOBSTER_Exception: if a trace is found below a TsBlock that is neither
        the first TsBlock under TESTSTEPS nor a direct child of it.

    VALUE elements are searched among all descendants of each TsBlock, so
    a VALUE nested in several TsBlocks is visited once per enclosing
    TsBlock.
    """
    activity_list = []
    tree = ET.fromstring(file_content)

    # Only packages tagged TESTCASE (INFORMATION/TAGS/TAG) are analysed.
    info = tree.find(".//ecu:INFORMATION", NS)
    tags = info.find(".//ecu:TAGS", NS) if info is not None else None
    is_testcase = tags is not None and any(
        (tag.text or "").strip().upper() == "TESTCASE"
        for tag in tags.findall(".//ecu:TAG", NS)
    )
    if not is_testcase:
        return activity_list

    tag_teststep = f"{{{NS['ecu']}}}TESTSTEP"
    tag_value = f"{{{NS['ecu']}}}VALUE"

    def is_tsblock(elem):
        return elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK

    # Find the parent TESTSTEPS element
    teststeps_parent = tree.find(".//ecu:TESTSTEPS", NS)
    if teststeps_parent is None:
        return activity_list

    # The first TsBlock directly under TESTSTEPS determines the allowed
    # nesting level.
    first_level_tsblocks = [
        elem for elem in teststeps_parent if is_tsblock(elem)
    ]
    if not first_level_tsblocks:
        return activity_list
    allowed_parent = first_level_tsblocks[0]

    # Allowed: the first TsBlock itself and the TsBlocks that are its
    # direct children. Elements compare and hash by identity, so set
    # membership matches the former per-child "is" comparison.
    allowed_tsblocks = {allowed_parent}
    allowed_tsblocks.update(
        elem for elem in allowed_parent if is_tsblock(elem)
    )

    misplaced_lobster_lines = []
    # Serialised document used to locate misplaced traces; it does not
    # depend on the loop, so it is built at most once, on first use.
    xml_content = None

    # Traverse all TsBlocks in the document
    for tsblock in tree.iter(tag_teststep):
        if tsblock.attrib.get("name") != TSBLOCK:
            continue
        is_allowed = tsblock in allowed_tsblocks

        for trace_search in tsblock.findall(".//" + tag_value):
            if "lobster-trace:" not in (trace_search.text or ""):
                continue
            if is_allowed:
                activity_list.append(
                    {"trace": trace_search.text, "activity": "first"}
                )
                continue

            # Misplaced: not at the allowed nesting level.
            if xml_content is None:
                xml_content = minidom.parseString(file_content).toxml()
            # NOTE(review): str.find returns the first occurrence, and -1
            # when the text does not appear verbatim in the serialised
            # document (e.g. escaped characters); confirm whether either
            # case can happen for real inputs.
            start_index = xml_content.find(trace_search.text)
            # The "+ 2" offset is kept from the previous implementation;
            # presumably it converts to a 1-based line and compensates for
            # toxml() not breaking the line after the XML declaration -
            # TODO confirm.
            line_number = xml_content.count("\n", 0, start_index) + 2
            misplaced_lobster_lines.append(line_number)

    if misplaced_lobster_lines:
        raise LOBSTER_Exception(
            f"Misplaced LOBSTER tag(s) in file {filename}"
            f" at line(s): {misplaced_lobster_lines}"
        )

    return activity_list

206 

207 

def extract_lobster_traces_from_trace_analysis(tree, filename):
    """
    Collect lobster traces from the TRACE-ANALYSIS blocks below ``tree``.

    A trace is valid only when it appears in a DESCRIPTION element that is
    a direct child of an ANALYSISITEM with xsi:type="episode", which in
    turn is a direct child of a TRACE-ANALYSIS element.

    Every other DESCRIPTION below a TRACE-ANALYSIS element that contains
    "lobster-trace:" is reported as misplaced; identical warning messages
    are reported only once.

    Returns:
        tuple: (list of valid trace dicts, list of warning strings)
    """
    marker = "lobster-trace:"
    type_attribute = f"{{{NS['xsi']}}}type"
    description_tag = f"{{{NS['ecu']}}}DESCRIPTION"

    accepted = []
    warnings = []
    # DESCRIPTION elements already accepted as valid trace carriers.
    accepted_elements = set()

    for block in tree.findall(".//ecu:TRACE-ANALYSIS", NS):
        # Pass 1: episodes that are direct children of this block.
        for item in block.findall("ecu:ANALYSISITEM", NS):
            if item.attrib.get(type_attribute) != "episode":
                continue
            for child in item:
                if child.tag != description_tag:
                    continue
                if not child.text or marker not in child.text:
                    continue
                stripped = child.text.strip()
                accepted.append(
                    {
                        "trace": stripped,
                        "activity": "first",
                        "name": item.findtext(
                            "ecu:NAME", default="", namespaces=NS
                        ),
                        "description": stripped,
                        "source": filename,
                    }
                )
                accepted_elements.add(child)

        # Pass 2: every remaining DESCRIPTION carrying a trace is misplaced.
        for desc in block.findall(".//ecu:DESCRIPTION", NS):
            if not desc.text or marker not in desc.text:
                continue
            if desc in accepted_elements:
                continue
            warning = (
                f"WARNING: misplaced lobster-trace in {filename}: "
                f"{desc.text.strip()}"
            )
            if warning not in warnings:
                warnings.append(warning)

    return accepted, warnings

267 

268 

class PkgTool(MultiFileInputTool):
    """LOBSTER tool that extracts tracing tags from pkg and ta files."""

    def __init__(self):
        super().__init__(
            name="pkg",
            description="Extract tracing tags from pkg files for LOBSTER",
            extensions=["pkg", "ta"],
            official=True,
        )

    def _add_config_argument(self):
        # This tool does not use a config file
        pass

    def run_from_config(self, pkg_config: PkgToolConfig) -> None:
        """
        The main function to parse tracing information from .pkg files for LOBSTER.

        This function processes the input files or directories specified in
        ``pkg_config.files``, extracts tracing tags and activities from XML
        content (including both standard and TRACE-ANALYSIS blocks), and
        writes the results to an output file.

        Parameters
        ----------
        pkg_config (PkgToolConfig): Typed configuration with at least:
            - files: list of file or directory paths to process
            - out: output file path (optional; if not set,
              output is lobster-pkg.lobster)

        Raises
        ------
        ValueError: if no input files are found.
        xml.etree.ElementTree.ParseError: if a file is not well-formed XML
            (re-raised after a message has been printed).
        LOBSTER_Exception: if a misplaced LOBSTER tag is found
            (re-raised after the exception has been dumped).
        """
        config = Config(
            inputs=None,
            inputs_from_file=None,
            extensions=self._extensions,
            exclude_patterns=None,
            schema=Activity,
        )
        file_list = create_worklist(
            config,
            [str(path) for path in pkg_config.files],
        )
        if not file_list:
            raise ValueError("No input files found to process!")

        data: Dict[str, Activity] = {}

        for file_path in file_list:
            filename = Path(file_path).name
            with open(file_path, "r", encoding="UTF-8") as file:
                try:
                    file_content = file.read()

                    tree = ET.fromstring(file_content)

                    getvalues = xml_parser(file_content, filename)

                    # Also extract from TRACE-ANALYSIS blocks
                    valid_traces, misplaced_traces = (
                        extract_lobster_traces_from_trace_analysis(
                            tree, filename
                        )
                    )
                    getvalues.extend(valid_traces)
                    for msg in misplaced_traces:
                        print(msg)

                    if getvalues:
                        # NOTE(review): this entry is keyed by file.name
                        # (the path as opened) while the default activity
                        # below uses the bare file name - confirm that the
                        # difference is intended.
                        create_raw_entry(data, file.name, json.dumps(getvalues))
                    else:
                        create_default_activity(file_content, filename, data)

                except ET.ParseError as err:
                    print(f"Error parsing XML file '{filename}' : {err}")
                    raise
                except LOBSTER_Exception as err:
                    err.dump()
                    raise

        # dict.values() is a view and never a list, so it is always
        # materialised before being handed to the writer.
        self._write_output(
            schema=config.schema,
            out_file=pkg_config.out,
            items=list(data.values()),
        )

    def _run_impl(self, options: Namespace) -> int:
        """
        Parse CLI options and run package trace extraction.

        This CLI entrypoint converts parsed command-line arguments to
        ``PkgToolConfig``, then delegates processing to the API function.

        Parameters
        ----------
        options (Namespace): Parsed command-line arguments with at least:
            - dir_or_files: list of file or directory paths to process
            - out: output file path (optional)

        Returns
        -------
        int: 0 on success, 1 on handled error.
        """
        try:
            lobster_pkg(
                PkgToolConfig(
                    files=[Path(path) for path in options.dir_or_files],
                    out=Path(options.out) if options.out else None,
                )
            )
            return 0
        except (ValueError, FileNotFoundError,
                LOBSTER_Exception, ET.ParseError) as exception:
            print(
                f"{self.name}: {exception}",
                file=sys.stderr,
            )
            return 1

389 

390 

def lobster_pkg(config: PkgToolConfig) -> None:
    """
    API entry point: run the pkg tool with the given configuration.

    Expected config attributes:
        - files: list of input files/directories
        - out: output file path (optional)
    """
    tool = PkgTool()
    tool.run_from_config(config)

400 

401 

def main(args: Optional[Sequence[str]] = None) -> int:
    """Run a fresh PkgTool with ``args`` and return the value of its run()."""
    tool = PkgTool()
    return tool.run(args)