Coverage for lobster/tools/codebeamer/codebeamer.py: 57%

255 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-26 14:55 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_codebeamer - Extract codebeamer items for LOBSTER 

4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20# This tool is based on the codebeamer Rest API v3, as documented here: 

21# https://codebeamer.com/cb/wiki/11631738 

22# 

23# There are some assumptions encoded here that are not clearly 

24# documented, in that items have a type and the type has a name. 

25# 

26# 

27# Main limitations: 

28# * Item descriptions are ignored right now 

29# * Branches (if they exist) are ignored 

30# * We only ever fetch the HEAD item 

31# 

32# However you _can_ import all the items referenced from another 

33# lobster artefact. 

34 

35import os 

36import time 

37import sys 

38import argparse 

39import netrc 

40from typing import List, Optional, Set, Union 

41from urllib.parse import quote, urlparse 

42from enum import Enum 

43import requests 

44import yaml 

45 

46from lobster.items import Tracing_Tag, Requirement, Implementation, Activity 

47from lobster.location import Codebeamer_Reference 

48from lobster.errors import Message_Handler, LOBSTER_Error 

49from lobster.io import lobster_read, lobster_write 

50from lobster.tools.codebeamer.bearer_auth import BearerAuth 

51from lobster.tools.codebeamer.config import AuthenticationConfig, Config 

52from lobster.version import get_version 

53 

54 

55class CodebeamerError(Exception): 

56 pass 

57 

58 

59class SupportedConfigKeys(Enum): 

60 """Helper class to define supported configuration keys.""" 

61 NUM_REQUEST_RETRY = "num_request_retry" 

62 RETRY_ERROR_CODES = "retry_error_codes" 

63 IMPORT_TAGGED = "import_tagged" 

64 IMPORT_QUERY = "import_query" 

65 VERIFY_SSL = "verify_ssl" 

66 PAGE_SIZE = "page_size" 

67 REFS = "refs" 

68 SCHEMA = "schema" 

69 CB_TOKEN = "token" 

70 CB_ROOT = "root" 

71 CB_USER = "user" 

72 CB_PASS = "pass" 

73 TIMEOUT = "timeout" 

74 OUT = "out" 

75 

76 @classmethod 

77 def as_set(cls) -> set: 

78 return {parameter.value for parameter in cls} 

79 

80 

81def get_authentication(cb_auth_config: AuthenticationConfig) -> requests.auth.AuthBase: 

82 if cb_auth_config.token: 82 ↛ 84line 82 didn't jump to line 84 because the condition on line 82 was always true

83 return BearerAuth(cb_auth_config.token) 

84 return requests.auth.HTTPBasicAuth(cb_auth_config.user, 

85 cb_auth_config.password) 

86 

87 

88def query_cb_single(cb_config: Config, url: str): 

89 if cb_config.num_request_retry <= 0: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 raise ValueError("Retry is disabled (num_request_retry is set to 0). " 

91 "Cannot proceed with retries.") 

92 

93 for attempt in range(1, cb_config.num_request_retry + 1): 

94 try: 

95 result = requests.get( 

96 url, 

97 auth=get_authentication(cb_config.cb_auth_conf), 

98 timeout=cb_config.timeout, 

99 verify=cb_config.verify_ssl, 

100 ) 

101 if result.status_code == 200: 

102 return result.json() 

103 

104 if result.status_code in cb_config.retry_error_codes: 

105 print(f"[Attempt {attempt}/{cb_config.num_request_retry}] " 

106 f"Retryable error: {result.status_code}") 

107 time.sleep(1) # wait a bit before retrying 

108 continue 

109 

110 print(f"[Attempt {attempt}/{cb_config.num_request_retry}] Failed with " 

111 f"status {result.status_code}") 

112 break 

113 

114 except requests.exceptions.ReadTimeout: 

115 print(f"[Attempt {attempt}/{cb_config.num_request_retry}] Timeout when " 

116 f"fetching {url}") 

117 except requests.exceptions.RequestException as err: 

118 print(f"[Attempt {attempt}/{cb_config.num_request_retry}] Request error: " 

119 f"{err}") 

120 break 

121 

122 # Final error handling after all retries 

123 print(f"Could not fetch {url}.") 

124 print("You can either:") 

125 print("* increase the timeout with the timeout parameter") 

126 print("* decrease the query size with the query_size parameter") 

127 print("* increase the retry count with the parameters (num_request_retry, " 

128 "retry_error_codes)") 

129 sys.exit(1) 

130 

131 

132def get_single_item(cb_config: Config, item_id: int): 

133 if not isinstance(item_id, int) or (item_id <= 0): 

134 raise ValueError("item_id must be a positive integer") 

135 url = f"{cb_config.base}/items/{item_id}" 

136 return query_cb_single(cb_config, url) 

137 

138 

139def get_many_items(cb_config: Config, item_ids: Set[int]): 

140 assert isinstance(item_ids, set) 

141 

142 rv = [] 

143 

144 page_id = 1 

145 query_string = quote(f"item.id IN " 

146 f"({','.join(str(item_id) for item_id in item_ids)})") 

147 

148 while True: 

149 base_url = "%s/items/query?page=%u&pageSize=%u&queryString=%s"\ 

150 % (cb_config.base, page_id, 

151 cb_config.page_size, query_string) 

152 data = query_cb_single(cb_config, base_url) 

153 rv += data["items"] 

154 if len(rv) == data["total"]: 

155 break 

156 page_id += 1 

157 

158 return rv 

159 

160 

161def get_query(cb_config: Config, query: Union[int, str]): 

162 assert isinstance(query, (int, str)) 

163 rv = [] 

164 url = "" 

165 page_id = 1 

166 total_items = None 

167 

168 while total_items is None or len(rv) < total_items: 

169 print("Fetching page %u of query..." % page_id) 

170 if isinstance(query, int): 170 ↛ 176line 170 didn't jump to line 176 because the condition on line 170 was always true

171 url = ("%s/reports/%u/items?page=%u&pageSize=%u" % 

172 (cb_config.base, 

173 query, 

174 page_id, 

175 cb_config.page_size)) 

176 elif isinstance(query, str): 

177 url = ("%s/items/query?page=%u&pageSize=%u&queryString=%s" % 

178 (cb_config.base, 

179 page_id, 

180 cb_config.page_size, 

181 query)) 

182 data = query_cb_single(cb_config, url) 

183 assert len(data) == 4 

184 

185 if page_id == 1 and len(data["items"]) == 0: 

186 # lobster-trace: codebeamer_req.Get_Query_Zero_Items_Message 

187 print("This query doesn't generate items. Please check:") 

188 print(" * is the number actually correct?") 

189 print(" * do you have permissions to access it?") 

190 print(f"You can try to access '{url}' manually to check.") 

191 

192 if page_id != data["page"]: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true

193 raise CodebeamerError(f"Page mismatch in query result: expected page " 

194 f"{page_id} from codebeamer, but got {data['page']}") 

195 

196 if page_id == 1: 

197 total_items = data["total"] 

198 elif total_items != data["total"]: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true

199 raise CodebeamerError(f"Item count mismatch in query result: expected " 

200 f"{total_items} items so far, but page " 

201 f"{data['page']} claims to have sent {data['total']} " 

202 f"items in total.") 

203 

204 if isinstance(query, int): 204 ↛ 207line 204 didn't jump to line 207 because the condition on line 204 was always true

205 rv += [to_lobster(cb_config, cb_item["item"]) 

206 for cb_item in data["items"]] 

207 elif isinstance(query, str): 

208 rv += [to_lobster(cb_config, cb_item) 

209 for cb_item in data["items"]] 

210 

211 page_id += 1 

212 

213 assert total_items == len(rv) 

214 

215 return rv 

216 

217 

218def get_schema_config(cb_config: Config) -> dict: 

219 """ 

220 The function returns a schema map based on the schema mentioned 

221 in the cb_config dictionary. 

222 

223 If there is no match, it raises a KeyError. 

224 

225 Positional arguments: 

226 cb_config -- configuration dictionary containing the schema. 

227 

228 Returns: 

229 A dictionary containing the namespace and class associated with the schema. 

230 

231 Raises: 

232 KeyError -- if the provided schema is not supported. 

233 """ 

234 schema_map = { 

235 'requirement': {"namespace": "req", "class": Requirement}, 

236 'implementation': {"namespace": "imp", "class": Implementation}, 

237 'activity': {"namespace": "act", "class": Activity}, 

238 } 

239 schema = cb_config.schema.lower() 

240 

241 if schema not in schema_map: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 raise KeyError(f"Unsupported SCHEMA '{schema}' provided in configuration.") 

243 

244 return schema_map[schema] 

245 

246 

247def to_lobster(cb_config: Config, cb_item: dict): 

248 assert isinstance(cb_item, dict) and "id" in cb_item 

249 

250 # This looks like it's business logic, maybe we should make this 

251 # configurable? 

252 

253 categories = cb_item.get("categories") 

254 if categories: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true

255 kind = categories[0].get("name", "codebeamer item") 

256 else: 

257 kind = "codebeamer item" 

258 

259 status = cb_item["status"].get("name", None) if "status" in cb_item else None 

260 

261 # Get item name. Sometimes items do not have one, in which case we 

262 # come up with one. 

263 if "name" in cb_item: 263 ↛ 266line 263 didn't jump to line 266 because the condition on line 263 was always true

264 item_name = cb_item["name"] 

265 else: 

266 item_name = "Unnamed item %u" % cb_item["id"] 

267 

268 schema_config = get_schema_config(cb_config) 

269 

270 # Construct the appropriate object based on 'kind' 

271 common_params = _create_common_params( 

272 schema_config["namespace"], cb_item, 

273 cb_config.cb_auth_conf.root, item_name, kind) 

274 item = _create_lobster_item( 

275 schema_config["class"], 

276 common_params, item_name, status) 

277 

278 if cb_config.references: 

279 for displayed_name in cb_config.references: 

280 if cb_item.get(displayed_name): 280 ↛ 285line 280 didn't jump to line 285 because the condition on line 280 was always true

281 item_references = cb_item.get(displayed_name) if ( 

282 isinstance(cb_item.get(displayed_name), list)) \ 

283 else [cb_item.get(displayed_name)] 

284 else: 

285 item_references = [value for custom_field 

286 in cb_item["customFields"] 

287 if custom_field["name"] == displayed_name and 

288 custom_field.get("values") 

289 for value in custom_field["values"]] 

290 

291 for value in item_references: 

292 item.add_tracing_target(Tracing_Tag("req", str(value["id"]))) 

293 

294 return item 

295 

296 

297def _create_common_params(namespace: str, cb_item: dict, cb_root: str, 

298 item_name: str, kind: str): 

299 """ 

300 Creates and returns common parameters for a Codebeamer item. 

301 Args: 

302 namespace (str): Namespace for the tag. 

303 cb_item (dict): Codebeamer item dictionary. 

304 cb_root (str): Root URL or path of Codebeamer. 

305 item_name (str): Name of the item. 

306 kind (str): Type of the item. 

307 Returns: 

308 dict: Common parameters including tag, location, and kind. 

309 """ 

310 return { 

311 'tag': Tracing_Tag( 

312 namespace=namespace, 

313 tag=str(cb_item["id"]), 

314 version=cb_item["version"] 

315 ), 

316 'location': Codebeamer_Reference( 

317 cb_root=cb_root, 

318 tracker=cb_item["tracker"]["id"], 

319 item=cb_item["id"], 

320 version=cb_item["version"], 

321 name=item_name 

322 ), 

323 'kind': kind 

324 } 

325 

326 

327def _create_lobster_item(schema_class, common_params, item_name, status): 

328 """ 

329 Creates and returns a Lobster item based on the schema class. 

330 Args: 

331 schema_class: Class of the schema (Requirement, Implementation, Activity). 

332 common_params (dict): Common parameters for the item. 

333 item_name (str): Name of the item. 

334 status (str): Status of the item. 

335 Returns: 

336 Object: An instance of the schema class with the appropriate parameters. 

337 """ 

338 if schema_class is Requirement: 338 ↛ 347line 338 didn't jump to line 347 because the condition on line 338 was always true

339 return Requirement( 

340 **common_params, 

341 framework="codebeamer", 

342 text=None, 

343 status=status, 

344 name= item_name 

345 ) 

346 

347 elif schema_class is Implementation: 

348 return Implementation( 

349 **common_params, 

350 language="python", 

351 name= item_name, 

352 ) 

353 

354 else: 

355 return Activity( 

356 **common_params, 

357 framework="codebeamer", 

358 status=status 

359 ) 

360 

361 

362def import_tagged(cb_config: Config, items_to_import: Set[int]): 

363 assert isinstance(items_to_import, set) 

364 rv = [] 

365 

366 cb_items = get_many_items(cb_config, items_to_import) 

367 for cb_item in cb_items: 

368 l_item = to_lobster(cb_config, cb_item) 

369 rv.append(l_item) 

370 

371 return rv 

372 

373 

374def ensure_list(instance) -> List: 

375 if isinstance(instance, list): 375 ↛ 377line 375 didn't jump to line 377 because the condition on line 375 was always true

376 return instance 

377 return [instance] 

378 

379 

380def update_authentication_parameters( 

381 auth_conf: AuthenticationConfig, 

382 netrc_path: Optional[str] = None): 

383 if (auth_conf.token is None and 383 ↛ 385line 383 didn't jump to line 385 because the condition on line 383 was never true

384 (auth_conf.user is None or auth_conf.password is None)): 

385 netrc_file = netrc_path or os.path.join(os.path.expanduser("~"), 

386 ".netrc") 

387 if os.path.isfile(netrc_file): 

388 netrc_config = netrc.netrc(netrc_file) 

389 machine = urlparse(auth_conf.root).hostname 

390 auth = netrc_config.authenticators(machine) 

391 if auth is not None: 

392 print(f"Using .netrc login for {auth_conf.root}") 

393 auth_conf.user, _, auth_conf.password = auth 

394 else: 

395 provided_machine = ", ".join(netrc_config.hosts.keys()) or "None" 

396 raise KeyError(f"Error parsing .netrc file." 

397 f"\nExpected '{machine}', but got '{provided_machine}'.") 

398 

399 if (auth_conf.token is None and 399 ↛ 401line 399 didn't jump to line 401 because the condition on line 399 was never true

400 (auth_conf.user is None or auth_conf.password is None)): 

401 raise KeyError("Please add your token to the config file, " 

402 "or use user and pass in the config file, " 

403 "or configure credentials in the .netrc file.") 

404 

405 

406def parse_yaml_config(file_name: str) -> Config: 

407 """ 

408 Parses a YAML configuration file and returns a validated configuration dictionary. 

409 

410 Args: 

411 file_name (str): Path to the YAML configuration file. 

412 

413 Returns: 

414 Dict[str, Any]: Parsed and validated configuration. 

415 

416 Raises: 

417 ValueError: If `file_name` is not a string. 

418 FileNotFoundError: If the file does not exist. 

419 KeyError: If required fields are missing or unsupported keys are present. 

420 """ 

421 assert os.path.isfile(file_name) 

422 

423 with open(file_name, "r", encoding='utf-8') as file: 

424 return parse_config_data(yaml.safe_load(file) or {}) 

425 

426 

427def parse_config_data(data: dict) -> Config: 

428 # Validate supported keys 

429 provided_config_keys = set(data.keys()) 

430 unsupported_keys = provided_config_keys - SupportedConfigKeys.as_set() 

431 if unsupported_keys: 431 ↛ 432line 431 didn't jump to line 432 because the condition on line 431 was never true

432 raise KeyError( 

433 f"Unsupported config keys: {', '.join(unsupported_keys)}. " 

434 f"Supported keys are: {', '.join(SupportedConfigKeys.as_set())}." 

435 ) 

436 

437 # create config object 

438 config = Config( 

439 references=ensure_list(data.get(SupportedConfigKeys.REFS.value, [])), 

440 import_tagged=data.get(SupportedConfigKeys.IMPORT_TAGGED.value), 

441 import_query=data.get(SupportedConfigKeys.IMPORT_QUERY.value), 

442 verify_ssl=data.get(SupportedConfigKeys.VERIFY_SSL.value, False), 

443 page_size=data.get(SupportedConfigKeys.PAGE_SIZE.value, 100), 

444 schema=data.get(SupportedConfigKeys.SCHEMA.value, "Requirement"), 

445 timeout=data.get(SupportedConfigKeys.TIMEOUT.value, 30), 

446 out=data.get(SupportedConfigKeys.OUT.value), 

447 num_request_retry=data.get(SupportedConfigKeys.NUM_REQUEST_RETRY.value, 5), 

448 retry_error_codes=data.get(SupportedConfigKeys.RETRY_ERROR_CODES.value, []), 

449 cb_auth_conf=AuthenticationConfig( 

450 token=data.get(SupportedConfigKeys.CB_TOKEN.value), 

451 user=data.get(SupportedConfigKeys.CB_USER.value), 

452 password=data.get(SupportedConfigKeys.CB_PASS.value), 

453 root=data.get(SupportedConfigKeys.CB_ROOT.value) 

454 ), 

455 ) 

456 

457 # Ensure consistency of the configuration 

458 if (not config.import_tagged) and (not config.import_query): 458 ↛ 459line 458 didn't jump to line 459 because the condition on line 458 was never true

459 raise KeyError(f"Either {SupportedConfigKeys.IMPORT_TAGGED.value} or " 

460 f"{SupportedConfigKeys.IMPORT_QUERY.value} must be provided!") 

461 

462 if config.cb_auth_conf.root is None: 462 ↛ 463line 462 didn't jump to line 463 because the condition on line 462 was never true

463 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must be provided!") 

464 

465 if not config.cb_auth_conf.root.startswith("https://"): 465 ↛ 466line 465 didn't jump to line 466 because the condition on line 465 was never true

466 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must start with https://, " 

467 f"but value is {config.cb_auth_conf.root}.") 

468 

469 return config 

470 

471 

472ap = argparse.ArgumentParser(conflict_handler='resolve') 

473 

474 

475@get_version(ap) 

476def main(): 

477 # lobster-trace: codebeamer_req.Dummy_Requirement 

478 ap.add_argument("--config", 

479 help=("Path to YAML file with arguments, " 

480 "by default (codebeamer-config.yaml) " 

481 "supported references: '%s'" % 

482 ', '.join(SupportedConfigKeys.as_set())), 

483 default=os.path.join(os.getcwd(), "codebeamer-config.yaml")) 

484 

485 ap.add_argument("--out", 

486 help=("Name of output file"), 

487 default="codebeamer.lobster") 

488 

489 options = ap.parse_args() 

490 

491 mh = Message_Handler() 

492 

493 if not os.path.isfile(options.config): 493 ↛ 494line 493 didn't jump to line 494 because the condition on line 493 was never true

494 print((f"lobster-codebeamer: Config file '{options.config}' not found.")) 

495 return 1 

496 

497 cb_config = parse_yaml_config(options.config) 

498 

499 if cb_config.out is None: 499 ↛ 500line 499 didn't jump to line 500 because the condition on line 499 was never true

500 cb_config.out = options.out 

501 

502 update_authentication_parameters(cb_config.cb_auth_conf) 

503 

504 items_to_import = set() 

505 

506 if cb_config.import_tagged: 506 ↛ 507line 506 didn't jump to line 507 because the condition on line 506 was never true

507 if not os.path.isfile(cb_config.import_tagged): 

508 sys.exit(f"lobster-codebeamer: {cb_config.import_tagged} is not a file.") 

509 items = {} 

510 try: 

511 lobster_read(mh = mh, 

512 filename = cb_config.import_tagged, 

513 level = "N/A", 

514 items = items) 

515 except LOBSTER_Error: 

516 return 1 

517 for item in items.values(): 

518 for tag in item.unresolved_references: 

519 if tag.namespace != "req": 

520 continue 

521 try: 

522 item_id = int(tag.tag, 10) 

523 if item_id > 0: 

524 items_to_import.add(item_id) 

525 else: 

526 mh.warning(item.location, 

527 "invalid codebeamer reference to %i" % 

528 item_id) 

529 except ValueError: 

530 pass 

531 

532 elif cb_config.import_query is not None: 532 ↛ 547line 532 didn't jump to line 547 because the condition on line 532 was always true

533 try: 

534 if isinstance(cb_config.import_query, str): 534 ↛ 535line 534 didn't jump to line 535 because the condition on line 534 was never true

535 if (cb_config.import_query.startswith("-") and 

536 cb_config.import_query[1:].isdigit()): 

537 ap.error("import_query must be a positive integer") 

538 elif cb_config.import_query.startswith("-"): 

539 ap.error("import_query must be a valid cbQL query") 

540 elif cb_config.import_query == "": 

541 ap.error("import_query must either be a query string or a query ID") 

542 elif cb_config.import_query.isdigit(): 

543 cb_config.import_query = int(cb_config.import_query) 

544 except ValueError as e: 

545 ap.error(str(e)) 

546 

547 try: 

548 if cb_config.import_tagged: 548 ↛ 549line 548 didn't jump to line 549 because the condition on line 548 was never true

549 items = import_tagged(cb_config, items_to_import) 

550 elif cb_config.import_query: 550 ↛ 555line 550 didn't jump to line 555 because the condition on line 550 was always true

551 items = get_query(cb_config, cb_config.import_query) 

552 except LOBSTER_Error: 

553 return 1 

554 

555 schema_config = get_schema_config(cb_config) 

556 

557 if cb_config.out is None: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true

558 with sys.stdout as fd: 

559 lobster_write(fd, schema_config["class"], "lobster_codebeamer", items) 

560 else: 

561 with open(cb_config.out, "w", encoding="UTF-8") as fd: 

562 lobster_write(fd, schema_config["class"], "lobster_codebeamer", items) 

563 print(f"Written {len(items)} requirements to {cb_config.out}") 

564 

565 return 0 

566 

567 

568if __name__ == "__main__": 

569 sys.exit(main())