Coverage for lobster/tools/pkg/pkg.py: 90%

181 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-18 11:07 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_pkg - Extract tracing values from xml file for LOBSTER 

4# Copyright (C) 2024-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19import os.path 

20from pathlib import Path 

21import sys 

22import xml.etree.ElementTree as ET 

23import json 

24import re 

25from typing import Dict, List, Optional, Sequence 

26from xml.dom import minidom 

27from argparse import Namespace 

28 

29from lobster.common.exceptions import LOBSTER_Exception 

30from lobster.common.io import lobster_write 

31from lobster.common.items import Activity, Tracing_Tag 

32from lobster.common.location import File_Reference 

33from lobster.common.meta_data_tool_base import MetaDataToolBase 

34 

# Prefix -> namespace URI map handed to the ElementTree find/findall
# calls in this module, and used to build fully qualified tag names.
NS = {
    "ecu": "http://www.tracetronic.de/xml/ecu-test",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance",
}
# Value of the "name" attribute that identifies a TESTSTEP as a TsBlock.
TSBLOCK = "TsBlock"

40 

41 

def get_valid_files(
    file_dir: List[str]
) -> List[str]:
    """Expand the given paths into the list of files to process.

    A path that is a regular file is taken as is, whatever its
    extension. A directory is walked recursively and contributes every
    file whose extension is exactly ``.pkg`` or ``.ta``.

    Parameters
    ----------
    file_dir: file and/or directory paths.

    Returns
    -------
    The collected file paths, in the order they were discovered.

    Raises
    ------
    FileNotFoundError: if a path is neither a file nor a directory.
    """
    accepted_extensions = (".pkg", ".ta")
    collected: List[str] = []
    for entry in file_dir:
        if os.path.isfile(entry):
            collected.append(entry)
            continue
        if not os.path.isdir(entry):
            raise FileNotFoundError("%s is not a file or directory" % entry)
        for root, _, names in os.walk(entry):
            collected.extend(
                os.path.join(root, name)
                for name in names
                if os.path.splitext(name)[1] in accepted_extensions
            )
    return collected

58 

59 

def write_to_file(options: Namespace, data: Dict[str, Activity]) -> None:
    """Write all collected activities to the file named by ``options.out``.

    The values of ``data`` are passed to ``lobster_write`` with the
    generator name "lobster-pkg"; a one-line summary is then printed to
    standard output.
    """
    target = options.out
    with open(target, "w", encoding="UTF-8") as handle:
        lobster_write(handle, Activity, "lobster-pkg", data.values())
        print("Written output for %u items to %s" % (len(data), target))

64 

65 

def _split_trace_targets(trace_text: Optional[str]) -> List[str]:
    """Return the non-empty values that follow the first ':' in trace_text.

    The text is split on ':' and ','; the first part (the
    "lobster-trace" prefix) is dropped, as are empty parts such as the
    one produced by a trailing comma.
    """
    parts = [part.strip() for part in re.split(r"[:,]", trace_text or "")]
    return [part for part in parts[1:] if part]


def create_raw_entry(
    data: Dict[str, Activity], file_name: str, trace_list: str
) -> None:
    """Create Activity items in ``data`` from the traces found in one file.

    Parameters
    ----------
    data: mapping from tag key to Activity; updated in place.
    file_name: name used for the "pkg" tag and the file location.
    trace_list: JSON text encoding a list of dicts. Each dict is read
        through the keys "activity" and "trace", plus "action" and
        "line" for entries whose activity is not "first".

    One file-level Activity is always created. The traces of every
    entry marked "first" become its "req" tracing targets. Every other
    entry gets an Activity of its own, tagged
    ``<file_name>::<action>::<line>``.
    """
    entries = json.loads(trace_list)

    file_tag = Tracing_Tag("pkg", f"{file_name}")
    file_activity = Activity(
        tag=file_tag,
        location=File_Reference(file_name),
        framework="lobster-pkg",
        kind="test",
    )
    data[file_tag.key()] = file_activity

    for entry in entries:
        targets = _split_trace_targets(entry.get("trace"))

        if entry.get("activity") == "first":
            for target in targets:
                file_activity.add_tracing_target(Tracing_Tag("req", target))
            continue

        action = entry.get("action")
        line = entry.get("line")
        tag = Tracing_Tag("pkg", f"{file_name}::{action}::{line}")
        # An entry without a line number still gets a file-level location
        # instead of failing on int(None).
        if line is None:
            loc = File_Reference(file_name)
        else:
            loc = File_Reference(file_name, int(line))
        activity = Activity(
            tag=tag, location=loc, framework="lobster-pkg", kind="test"
        )
        data[tag.key()] = activity
        for target in targets:
            activity.add_tracing_target(Tracing_Tag("req", target))

99 

100 

def create_default_activity(file_content, file_name: str,
                            data: Dict[str, Activity]) -> None:
    """Add a trace-less Activity for a package that carries the TESTCASE tag.

    The XML in ``file_content`` is searched for a TAG element under
    INFORMATION/TAGS whose text, stripped and upper-cased, equals
    "TESTCASE". Only when one is found is an Activity tagged with
    ``file_name`` stored in ``data``; otherwise ``data`` is left
    untouched.
    """
    root = ET.fromstring(file_content)
    information = root.find(".//ecu:INFORMATION", NS)
    tag_container = (
        information.find(".//ecu:TAGS", NS)
        if information is not None
        else None
    )
    candidates = (
        tag_container.findall(".//ecu:TAG", NS)
        if tag_container is not None
        else []
    )
    if not any(
        (candidate.text or "").strip().upper() == "TESTCASE"
        for candidate in candidates
    ):
        return

    pkg_tag = Tracing_Tag("pkg", f"{file_name}")
    data[pkg_tag.key()] = Activity(
        tag=pkg_tag,
        location=File_Reference(file_name),
        framework="lobster-pkg",
        kind="test",
    )

129 

130 

def xml_parser(file_content, filename):
    """Collect the lobster traces found in the TsBlocks of one package.

    Parameters
    ----------
    file_content: XML text of the package.
    filename: name used in the error message only.

    Returns
    -------
    A list of ``{"trace": <VALUE text>, "activity": "first"}`` dicts.
    The list is empty when the package has no TESTCASE tag under
    INFORMATION/TAGS, has no TESTSTEPS element, or has no TsBlock
    directly under TESTSTEPS.

    Raises
    ------
    LOBSTER_Exception: if a "lobster-trace:" VALUE is found below a
        TsBlock that is neither the first TsBlock under TESTSTEPS nor
        one of its direct TsBlock children.
    """
    activity_list = []
    misplaced_lobster_lines = []
    tree = ET.fromstring(file_content)

    info = tree.find(".//ecu:INFORMATION", NS)
    tags = info.find(".//ecu:TAGS", NS) if info is not None else None
    is_testcase = tags is not None and any(
        (tag.text or "").strip().upper() == "TESTCASE"
        for tag in tags.findall(".//ecu:TAG", NS)
    )
    if not is_testcase:
        return activity_list

    tag_teststep = f"{{{NS['ecu']}}}TESTSTEP"
    tag_value = f"{{{NS['ecu']}}}VALUE"

    def is_tsblock(elem):
        return elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK

    # Find the parent TESTSTEPS element
    teststeps_parent = tree.find(".//ecu:TESTSTEPS", NS)
    if teststeps_parent is None:
        return activity_list

    # The first TsBlock directly under TESTSTEPS determines the allowed
    # parent level.
    allowed_parent = next(
        (elem for elem in teststeps_parent if is_tsblock(elem)), None
    )
    if allowed_parent is None:
        return activity_list

    # Allowed locations: the first TsBlock itself and the TsBlocks that
    # are its direct children. ElementTree has no getparent(), so the
    # allowed elements are collected up front; Element objects hash by
    # identity, which makes the set lookup an identity check.
    allowed_tsblocks = {allowed_parent}
    allowed_tsblocks.update(
        elem for elem in allowed_parent if is_tsblock(elem)
    )

    # Serialised document used to derive line numbers; built at most
    # once, and only when a misplaced tag is actually found.
    serialized_xml = None

    # Traverse all TsBlocks in the document
    for tsblock in tree.iter(tag_teststep):
        if tsblock.attrib.get("name") != TSBLOCK:
            continue

        is_allowed = tsblock in allowed_tsblocks

        for trace_search in tsblock.findall(".//" + tag_value):
            text = trace_search.text or ""
            if "lobster-trace:" not in text:
                continue

            if is_allowed:
                activity_list.append({"trace": text, "activity": "first"})
                continue

            # Misplaced: not at allowed nesting
            if serialized_xml is None:
                serialized_xml = minidom.parseString(file_content).toxml()
            # NOTE(review): the line is that of the first occurrence of
            # the text in the re-serialised document; the +2 offset
            # presumably compensates for toxml() emitting the XML
            # declaration on the same line as the root element - confirm.
            start_index = serialized_xml.find(text)
            line_number = serialized_xml.count("\n", 0, start_index) + 2
            misplaced_lobster_lines.append(line_number)

    if misplaced_lobster_lines:
        raise LOBSTER_Exception(
            f"Misplaced LOBSTER tag(s) in file {filename}"
            f" at line(s): {misplaced_lobster_lines}"
        )

    return activity_list

224 

225 

def extract_lobster_traces_from_trace_analysis(tree, filename):
    """
    Collect lobster traces from the TRACE-ANALYSIS blocks of a parsed file.

    A trace is valid only when it sits in a DESCRIPTION element that is
    a direct child of an ANALYSISITEM with xsi:type="episode", and that
    ANALYSISITEM is a direct child of TRACE-ANALYSIS.

    Any other DESCRIPTION below TRACE-ANALYSIS that contains
    "lobster-trace:" (for example in a nested ANALYSISITEM) is reported
    as misplaced. Identical warning texts are reported once.

    Returns:
        tuple: (list of valid trace dicts, list of misplaced trace warning strings)
    """
    marker = "lobster-trace:"
    type_attribute = f"{{{NS['xsi']}}}type"
    description_tag = f"{{{NS['ecu']}}}DESCRIPTION"

    valid_traces = []
    misplaced_traces = []
    # DESCRIPTION elements already accepted as valid trace locations
    valid_descs = set()

    for trace_analysis in tree.findall(".//ecu:TRACE-ANALYSIS", NS):
        # Direct ANALYSISITEM children only; nested ones are not valid
        for episode in trace_analysis.findall("ecu:ANALYSISITEM", NS):
            if episode.attrib.get(type_attribute) != "episode":
                continue
            for child in episode:
                if child.tag != description_tag:
                    continue
                if not child.text or marker not in child.text:
                    continue
                stripped = child.text.strip()
                valid_traces.append(
                    {
                        "trace": stripped,
                        "activity": "first",
                        "name": episode.findtext(
                            "ecu:NAME", default="", namespaces=NS
                        ),
                        "description": stripped,
                        "source": filename,
                    }
                )
                valid_descs.add(child)

        # Every remaining DESCRIPTION with a trace is misplaced
        for desc in trace_analysis.findall(".//ecu:DESCRIPTION", NS):
            if not desc.text or marker not in desc.text:
                continue
            if desc in valid_descs:
                continue
            msg = (
                f"WARNING: misplaced lobster-trace in {filename}: "
                f"{desc.text.strip()}"
            )
            if msg not in misplaced_traces:
                misplaced_traces.append(msg)

    return valid_traces, misplaced_traces

285 

286 

def lobster_pkg(options):
    """
    Parse tracing information from .pkg files and write a LOBSTER report.

    Every file selected through 'options.files' is read as XML. Traces
    are gathered from the TsBlocks and from the TRACE-ANALYSIS blocks;
    warnings about misplaced TRACE-ANALYSIS traces are printed. A file
    without any trace gets a default entry when it carries the TESTCASE
    tag. All entries are then written to the output file.

    Parameters
    ----------
    options (Namespace): Parsed command-line arguments with at least:
        - files: list of file or directory paths to process
        - out: output file path (optional; if not set, output is report.lobster)

    Returns
    -------
    0 on success. XML parse errors and LOBSTER_Exception are reported
    and re-raised.
    """
    data = {}

    for file_path in get_valid_files(options.files):
        filename = Path(file_path).name
        with open(file_path, "r", encoding="UTF-8") as file:
            try:
                file_content = file.read()
                tree = ET.fromstring(file_content)

                traces = xml_parser(file_content, filename)

                # Also extract from TRACE-ANALYSIS blocks
                analysis_traces, warnings = (
                    extract_lobster_traces_from_trace_analysis(tree, filename)
                )
                traces.extend(analysis_traces)
                for warning in warnings:
                    print(warning)

                if not traces:
                    create_default_activity(file_content, filename, data)
                else:
                    create_raw_entry(data, file.name, json.dumps(traces))

            except ET.ParseError as err:
                print(f"Error parsing XML file '{filename}' : {err}")
                raise
            except LOBSTER_Exception as err:
                err.dump()
                raise

    # Set default output file if not specified
    if not getattr(options, "out", None):
        options.out = "report.lobster"

    write_to_file(options, data)

    return 0

344 

345 

class PkgTool(MetaDataToolBase):
    """Command-line front end of lobster-pkg."""

    def __init__(self):
        super().__init__(
            name="pkg",
            description="Extract tracing tags from pkg files for LOBSTER",
            official=True,
        )
        self._argument_parser.add_argument(
            "files",
            nargs="+",
            metavar="FILE|DIR",
            help="Path to pkg file or directory.",
        )
        # The option is optional with the default its help text
        # promises; previously it was required, which contradicted the
        # help text and made the fallback in lobster_pkg unreachable
        # from the command line.
        self._argument_parser.add_argument(
            "--out",
            default="report.lobster",
            help="write output to this file; otherwise output is report.lobster",
        )

    def _run_impl(self, options: Namespace) -> int:
        """Run lobster_pkg; return 0 on success and 1 on a reported error.

        ValueError, FileNotFoundError, LOBSTER_Exception and XML parse
        errors are printed to standard error, prefixed with the tool
        name. Any other exception propagates.
        """
        try:
            lobster_pkg(options)
            return 0
        except (ValueError, FileNotFoundError,
                LOBSTER_Exception, ET.ParseError) as exception:
            print(
                f"{self.name}: {exception}",
                file=sys.stderr,
            )
            return 1

376 

377 

def main(args: Optional[Sequence[str]] = None) -> int:
    """Entry point: run the pkg tool on ``args`` and return its result."""
    tool = PkgTool()
    return tool.run(args)