Coverage for lobster/tools/pkg/pkg.py: 0%

180 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-27 13:02 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_pkg - Extract tracing values from xml file for LOBSTER 

4# Copyright (C) 2024-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19import os.path 

20from pathlib import Path 

21import xml.etree.ElementTree as ET 

22import json 

23import re 

24from typing import Dict, List, Optional, Sequence 

25from xml.dom import minidom 

26from argparse import Namespace 

27 

28from lobster.common.exceptions import LOBSTER_Exception 

29from lobster.common.io import lobster_write 

30from lobster.common.items import Activity, Tracing_Tag 

31from lobster.common.location import File_Reference 

32from lobster.common.meta_data_tool_base import MetaDataToolBase 

33 

# Prefix -> namespace URI map handed to the ElementTree find/findall
# calls in this module, and used to build fully qualified tag names.
NS = {
    # namespace of the package elements queried below with the "ecu:" prefix
    "ecu": "http://www.tracetronic.de/xml/ecu-test",
    # XML Schema instance namespace, used for the xsi:type attribute
    "xsi": "http://www.w3.org/2001/XMLSchema-instance",
}

# Value of the "name" attribute that marks a TESTSTEP element as a TsBlock.
TSBLOCK = "TsBlock"

39 

40 

def get_valid_files(
    file_dir: List[str]
) -> List[str]:
    """Expand the given paths into a flat list of files to process.

    A path that is a regular file is taken over unchanged, whatever its
    extension is. A directory is walked recursively and contributes
    every contained file whose extension is exactly ".pkg" or ".ta"
    (the comparison is case-sensitive).

    Raises FileNotFoundError for a path that is neither a file nor a
    directory.
    """
    wanted_extensions = (".pkg", ".ta")
    collected = []
    for entry in file_dir:
        if os.path.isfile(entry):
            collected.append(entry)
            continue
        if not os.path.isdir(entry):
            raise FileNotFoundError("%s is not a file or directory" % entry)
        for root, _, names in os.walk(entry):
            collected.extend(
                os.path.join(root, name)
                for name in names
                if os.path.splitext(name)[1] in wanted_extensions
            )
    return collected

57 

58 

def write_to_file(options: Namespace, data: Dict[str, Activity]) -> None:
    """Write the collected activities to options.out.

    The values of data are handed to lobster_write together with the
    Activity class and the generator name "lobster-pkg". Afterwards a
    one-line summary with the item count is printed to stdout.
    """
    destination = options.out
    with open(destination, "w", encoding="UTF-8") as handle:
        lobster_write(handle, Activity, "lobster-pkg", data.values())
        summary = "Written output for %u items to %s" % (len(data), destination)
        print(summary)

63 

64 

def _split_trace_values(text: Optional[str]) -> List[str]:
    """Return the requirement ids listed in one lobster-trace text.

    The ids are the ':' or ',' separated parts that follow the
    "lobster-trace:" marker. Surrounding whitespace is removed and
    empty parts (for example after a trailing comma) are dropped.
    """
    if not text:
        return []
    _, marker, listed = text.partition("lobster-trace:")
    if marker:
        # Anchor on the marker so that ':' or ',' in free text in front
        # of it cannot be mistaken for requirement ids.
        parts = re.split(r"[:,]", listed)
    else:
        # No marker present: keep the previous behaviour of treating
        # the first segment as the prefix and skipping it.
        parts = re.split(r"[:,]", text)[1:]
    stripped_parts = (part.strip() for part in parts)
    return [part for part in stripped_parts if part]


def create_raw_entry(
    data: Dict[str, Activity], file_name: str, trace_list: str
) -> None:
    """Turn the extracted trace records of one file into Activity items.

    Parameters
    ----------
    data: dictionary that receives the created Activity objects, keyed
        by the key of their tracing tag.
    file_name: name used for the "pkg" tag and the file location.
    trace_list: JSON encoded list of dictionaries. Each dictionary is
        read with the keys "activity" and "trace", and for records
        whose "activity" is not "first" also "action" and "line".

    All records with activity "first" are merged into a single Activity
    tagged with file_name; this Activity is created even if there is no
    such record. Every other record gets its own Activity tagged
    "<file_name>::<action>::<line>" and located at that line.

    Raises whatever int() raises if the "line" of a non-"first" record
    is missing or not numeric.
    """
    activity_list = json.loads(trace_list)

    # File level entry: carries the targets of all "first" records.
    tag = Tracing_Tag("pkg", f"{file_name}")
    file_activity = Activity(
        tag=tag,
        location=File_Reference(file_name),
        framework="lobster-pkg",
        kind="test",
    )
    data[tag.key()] = file_activity
    for item in activity_list:
        if item.get("activity") != "first":
            continue
        for trace_value in _split_trace_values(item.get("trace")):
            file_activity.add_tracing_target(Tracing_Tag("req", trace_value))

    # Handle other activities (if any): one entry per record.
    for item in activity_list:
        if item.get("activity") == "first":
            continue
        action = item.get("action")
        line = item.get("line")
        tag = Tracing_Tag("pkg", f"{file_name}::{action}::{line}")
        activity = Activity(
            tag=tag,
            location=File_Reference(file_name, int(line)),
            framework="lobster-pkg",
            kind="test",
        )
        data[tag.key()] = activity
        for trace_value in _split_trace_values(item.get("trace")):
            activity.add_tracing_target(Tracing_Tag("req", trace_value))

98 

99 

def create_default_activity(file_content, file_name: str,
                            data: Dict[str, Activity]) -> None:
    """Add a trace-less Activity for a package that is tagged TESTCASE.

    file_content is parsed as XML. The package counts as a test case
    if the first TAGS element below the first INFORMATION element
    contains a TAG whose text equals "TESTCASE" after stripping
    whitespace and upper-casing. Only then an Activity without tracing
    targets is stored in data under the key of the "pkg" tag built from
    file_name; otherwise data is left untouched.
    """
    root = ET.fromstring(file_content)

    information = root.find(".//ecu:INFORMATION", NS)
    if information is None:
        return

    tag_container = information.find(".//ecu:TAGS", NS)
    if tag_container is None:
        return

    has_testcase_tag = any(
        (entry.text or "").strip().upper() == "TESTCASE"
        for entry in tag_container.findall(".//ecu:TAG", NS)
    )
    if not has_testcase_tag:
        return

    package_tag = Tracing_Tag("pkg", f"{file_name}")
    data[package_tag.key()] = Activity(
        tag=package_tag,
        location=File_Reference(file_name),
        framework="lobster-pkg",
        kind="test",
    )

128 

129 

def xml_parser(file_content, filename):
    """Collect the lobster traces stored in the TsBlocks of a package.

    Parameters
    ----------
    file_content: XML text of the package.
    filename: name used in the error message only.

    Returns
    -------
    A list of dictionaries {"trace": <VALUE text>, "activity": "first"},
    one per VALUE element containing "lobster-trace:" that was found
    below an allowed TsBlock. The list is empty if the package is not
    tagged TESTCASE, has no TESTSTEPS element, or has no TsBlock
    directly below TESTSTEPS.

    A TsBlock is allowed if it is the first TsBlock directly below
    TESTSTEPS or a TsBlock that is a direct child of that one. Because
    VALUE elements are searched in all descendants of each TsBlock, a
    trace below a nested allowed TsBlock is reported once per allowed
    TsBlock that encloses it.

    Raises
    ------
    LOBSTER_Exception if a trace is found below a TsBlock that is not
    allowed; the message lists the computed line numbers.
    """
    activity_list = []
    misplaced_lobster_lines = []
    tree = ET.fromstring(file_content)

    # Only packages tagged TESTCASE in INFORMATION/TAGS are evaluated.
    info = tree.find(".//ecu:INFORMATION", NS)
    tags = info.find(".//ecu:TAGS", NS) if info is not None else None
    is_testcase = tags is not None and any(
        (tag.text or "").strip().upper() == "TESTCASE"
        for tag in tags.findall(".//ecu:TAG", NS)
    )
    if not is_testcase:
        return activity_list

    tag_teststep = f"{{{NS['ecu']}}}TESTSTEP"
    tag_value = f"{{{NS['ecu']}}}VALUE"

    def is_tsblock(elem):
        return elem.tag == tag_teststep and elem.attrib.get("name") == TSBLOCK

    # Find the parent TESTSTEPS element
    teststeps_parent = tree.find(".//ecu:TESTSTEPS", NS)
    if teststeps_parent is None:
        return activity_list

    # The first TsBlock directly under TESTSTEPS determines the allowed
    # parent level
    allowed_parent = next(
        (elem for elem in teststeps_parent if is_tsblock(elem)), None
    )
    if allowed_parent is None:
        return activity_list

    # Allowed are the first TsBlock itself and the TsBlocks that are its
    # direct children. Elements compare and hash by identity, so the
    # set lookup below is an identity test.
    allowed_tsblocks = {allowed_parent}
    allowed_tsblocks.update(
        elem for elem in allowed_parent if is_tsblock(elem)
    )

    # Serialised document used to look up line numbers; it is only
    # created when the first misplaced trace is found.
    serialized_xml = None

    # Traverse all TsBlocks in the document
    for tsblock in tree.iter(tag_teststep):
        if tsblock.attrib.get("name") != TSBLOCK:
            continue
        is_allowed = tsblock in allowed_tsblocks

        for trace_search in tsblock.findall(".//" + tag_value):
            text = trace_search.text or ""
            if "lobster-trace:" not in text:
                continue
            if is_allowed:
                activity_list.append({"trace": text, "activity": "first"})
                continue

            # Misplaced: not at allowed nesting
            if serialized_xml is None:
                serialized_xml = minidom.parseString(file_content).toxml()
            # NOTE(review): the first occurrence of the text is used, so
            # repeated identical traces report the same line, and a text
            # that is escaped differently in the serialised form is not
            # found at all (find() returns -1).
            start_index = serialized_xml.find(text)
            # Presumably "+ 2" converts to a 1-based line and accounts
            # for toxml() emitting the XML declaration without a line
            # break of its own - TODO confirm.
            line_number = serialized_xml.count("\n", 0, start_index) + 2
            misplaced_lobster_lines.append(line_number)

    if misplaced_lobster_lines:
        raise LOBSTER_Exception(
            f"Misplaced LOBSTER tag(s) in file {filename}"
            f" at line(s): {misplaced_lobster_lines}"
        )

    return activity_list

223 

224 

def extract_lobster_traces_from_trace_analysis(tree, filename):
    """
    Collect the lobster traces stored in TRACE-ANALYSIS blocks.

    A trace is accepted only when it is the text of a DESCRIPTION
    element that is a direct child of an ANALYSISITEM with
    xsi:type="episode", and that ANALYSISITEM is a direct child of a
    TRACE-ANALYSIS element.

    Every other DESCRIPTION below a TRACE-ANALYSIS element whose text
    contains "lobster-trace:" is reported as misplaced. Identical
    warning texts are reported only once.

    Returns:
        tuple: (list of valid trace dicts, list of misplaced trace warning strings)
    """
    marker = "lobster-trace:"
    type_attribute = f"{{{NS['xsi']}}}type"
    description_tag = f"{{{NS['ecu']}}}DESCRIPTION"

    valid_traces = []
    misplaced_traces = []
    # DESCRIPTION elements already accepted as valid trace carriers
    accepted_descriptions = set()

    for analysis in tree.findall(".//ecu:TRACE-ANALYSIS", NS):
        # Direct ANALYSISITEM children only; nested items do not count.
        for item in analysis.findall("ecu:ANALYSISITEM", NS):
            if item.attrib.get(type_attribute) != "episode":
                continue
            for node in item:
                if node.tag != description_tag:
                    continue
                if not node.text or marker not in node.text:
                    continue
                stripped = node.text.strip()
                valid_traces.append(
                    {
                        "trace": stripped,
                        "activity": "first",
                        "name": item.findtext(
                            "ecu:NAME", default="", namespaces=NS
                        ),
                        "description": stripped,
                        "source": filename,
                    }
                )
                accepted_descriptions.add(node)

        # Everything else below this TRACE-ANALYSIS that carries a trace
        # is misplaced.
        for node in analysis.findall(".//ecu:DESCRIPTION", NS):
            if not node.text or marker not in node.text:
                continue
            if node in accepted_descriptions:
                continue
            warning = "WARNING: misplaced lobster-trace in " \
                      f"{filename}: {node.text.strip()}"
            if warning not in misplaced_traces:
                misplaced_traces.append(warning)

    return valid_traces, misplaced_traces

284 

285 

def lobster_pkg(options):
    """
    Extract tracing information from package files and write it out.

    Every file obtained from 'options.files' is read as UTF-8 and parsed
    as XML. Traces are collected from the TsBlocks (xml_parser) and from
    the TRACE-ANALYSIS blocks; warnings about misplaced TRACE-ANALYSIS
    traces are printed. If a file yields traces, its entries are created
    under the path the file was opened with; otherwise a default entry
    under the bare file name is created if the package is a test case.

    XML parse errors and LOBSTER exceptions are reported and re-raised.

    Parameters
    ----------
    options (Namespace): Parsed command-line arguments with at least:
        - files: list of file or directory paths to process
        - out: output file path (optional; if not set, output is report.lobster)

    Returns
    -------
    0 after the output file has been written.
    """
    data = {}

    for file_path in get_valid_files(options.files):
        filename = Path(file_path).name
        with open(file_path, "r", encoding="UTF-8") as file:
            try:
                file_content = file.read()
                tree = ET.fromstring(file_content)

                collected = xml_parser(file_content, filename)

                # Also extract from TRACE-ANALYSIS blocks
                valid_traces, warnings = (
                    extract_lobster_traces_from_trace_analysis(tree, filename)
                )
                collected.extend(valid_traces)
                for warning in warnings:
                    print(warning)

                if not collected:
                    create_default_activity(file_content, filename, data)
                else:
                    create_raw_entry(data, file.name, json.dumps(collected))

            except ET.ParseError as err:
                print(f"Error parsing XML file '{filename}' : {err}")
                raise
            except LOBSTER_Exception as err:
                err.dump()
                raise

    # Fall back to the default output file if none is set
    if not getattr(options, "out", None):
        options.out = "report.lobster"

    write_to_file(options, data)

    return 0

343 

344 

class PkgTool(MetaDataToolBase):
    """Command-line tool that extracts tracing tags from pkg files."""

    def __init__(self):
        super().__init__(
            name="pkg",
            description="Extract tracing tags from pkg files for LOBSTER",
            official=True,
        )
        self._argument_parser.add_argument(
            "files",
            nargs="+",
            metavar="FILE|DIR",
            help="Path to pkg file or directory.",
        )
        self._argument_parser.add_argument(
            "--out",
            default="report.lobster",
            help="write output to this file; otherwise output is report.lobster",
        )

    def _run_impl(self, options: Namespace) -> int:
        """Run the extraction with the given options.

        options is used as passed in. It was previously replaced by a
        fresh parse_args() call, which reads sys.argv and therefore
        ignored any argument list given to main() / run().
        NOTE(review): assumes the base class passes the namespace it
        parsed with self._argument_parser - confirm against
        MetaDataToolBase.

        A ValueError raised during extraction is reported through the
        argument parser's error() method. Returns 0 otherwise.
        """
        try:
            lobster_pkg(options)

        except ValueError as exception:
            self._argument_parser.error(str(exception))

        return 0

374 

375 

def main(args: Optional[Sequence[str]] = None) -> int:
    """Create a PkgTool, run it with args and return the value of run()."""
    tool = PkgTool()
    return tool.run(args)