Coverage for lobster/tools/codebeamer/codebeamer.py: 55%

267 statements  

« prev     ^ index     » next       coverage.py v7.10.2, created at 2025-08-06 09:51 +0000

1#!/usr/bin/env python3 

2# 

3# lobster_codebeamer - Extract codebeamer items for LOBSTER 

4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) 

5# 

6# This program is free software: you can redistribute it and/or modify 

7# it under the terms of the GNU Affero General Public License as 

8# published by the Free Software Foundation, either version 3 of the 

9# License, or (at your option) any later version. 

10# 

11# This program is distributed in the hope that it will be useful, but 

12# WITHOUT ANY WARRANTY; without even the implied warranty of 

13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

14# Affero General Public License for more details. 

15# 

16# You should have received a copy of the GNU Affero General Public 

17# License along with this program. If not, see 

18# <https://www.gnu.org/licenses/>. 

19 

20# This tool is based on the codebeamer Rest API v3, as documented here: 

21# https://codebeamer.com/cb/wiki/11631738 

22# 

23# There are some assumptions encoded here that are not clearly 

24# documented, in that items have a type and the type has a name. 

25# 

26# 

27# Main limitations: 

28# * Item descriptions are ignored right now 

29# * Branches (if they exist) are ignored 

30# * We only ever fetch the HEAD item 

31# 

32# However you _can_ import all the items referenced from another 

33# lobster artefact. 

34 

35import os 

36import sys 

37import argparse 

38import netrc 

39from typing import Dict, Iterable, List, Optional, TextIO, Union 

40from urllib.parse import quote, urlparse 

41from enum import Enum 

42import requests 

43from requests.adapters import HTTPAdapter 

44import yaml 

45from urllib3.util.retry import Retry 

46 

47from lobster.items import Tracing_Tag, Requirement, Implementation, Activity 

48from lobster.location import Codebeamer_Reference 

49from lobster.errors import Message_Handler, LOBSTER_Error 

50from lobster.io import lobster_read, lobster_write 

51from lobster.meta_data_tool_base import MetaDataToolBase 

52from lobster.tools.codebeamer.bearer_auth import BearerAuth 

53from lobster.tools.codebeamer.config import AuthenticationConfig, Config 

54from lobster.tools.codebeamer.exceptions import ( 

55 MismatchException, NotFileException, QueryException, 

56) 

57 

58 

59TOOL_NAME = "lobster-codebeamer" 

60 

61 

62class SupportedConfigKeys(Enum): 

63 """Helper class to define supported configuration keys.""" 

64 NUM_REQUEST_RETRY = "num_request_retry" 

65 RETRY_ERROR_CODES = "retry_error_codes" 

66 IMPORT_TAGGED = "import_tagged" 

67 IMPORT_QUERY = "import_query" 

68 VERIFY_SSL = "verify_ssl" 

69 PAGE_SIZE = "page_size" 

70 REFS = "refs" 

71 SCHEMA = "schema" 

72 CB_TOKEN = "token" 

73 CB_ROOT = "root" 

74 CB_USER = "user" 

75 CB_PASS = "pass" 

76 TIMEOUT = "timeout" 

77 OUT = "out" 

78 

79 @classmethod 

80 def as_set(cls) -> set: 

81 return {parameter.value for parameter in cls} 

82 

83 

84def get_authentication(cb_auth_config: AuthenticationConfig) -> requests.auth.AuthBase: 

85 if cb_auth_config.token: 

86 return BearerAuth(cb_auth_config.token) 

87 return requests.auth.HTTPBasicAuth(cb_auth_config.user, 

88 cb_auth_config.password) 

89 

90 

91def query_cb_single(cb_config: Config, url: str): 

92 if cb_config.num_request_retry <= 0: 

93 raise ValueError("Retry is disabled (num_request_retry is set to 0). " 

94 "Cannot proceed with retries.") 

95 

96 # Set up a Retry object with exponential backoff 

97 retry_strategy = Retry( 

98 total=cb_config.num_request_retry, 

99 backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, etc. 

100 status_forcelist=cb_config.retry_error_codes, 

101 allowed_methods=["GET"], 

102 raise_on_status=False, 

103 ) 

104 

105 adapter = HTTPAdapter(max_retries=retry_strategy) 

106 session = requests.Session() 

107 session.mount("https://", adapter) 

108 session.mount("http://", adapter) 

109 

110 response = session.get( 

111 url, 

112 auth=get_authentication(cb_config.cb_auth_conf), 

113 timeout=cb_config.timeout, 

114 verify=cb_config.verify_ssl, 

115 ) 

116 

117 if response.status_code == 200: 

118 return response.json() 

119 

120 # Final error handling after all retries 

121 raise QueryException(f"Could not fetch {url}.") 

122 

123 

124def get_single_item(cb_config: Config, item_id: int): 

125 if not isinstance(item_id, int) or (item_id <= 0): 

126 raise ValueError("item_id must be a positive integer") 

127 url = f"{cb_config.base}/items/{item_id}" 

128 return query_cb_single(cb_config, url) 

129 

130 

131def get_many_items(cb_config: Config, item_ids: Iterable[int]): 

132 rv = [] 

133 

134 page_id = 1 

135 query_string = quote(f"item.id IN " 

136 f"({','.join(str(item_id) for item_id in item_ids)})") 

137 

138 while True: 

139 base_url = "%s/items/query?page=%u&pageSize=%u&queryString=%s"\ 

140 % (cb_config.base, page_id, 

141 cb_config.page_size, query_string) 

142 data = query_cb_single(cb_config, base_url) 

143 rv += data["items"] 

144 if len(rv) == data["total"]: 144 ↛ 146line 144 didn't jump to line 146 because the condition on line 144 was always true

145 break 

146 page_id += 1 

147 

148 return rv 

149 

150 

151def get_query(cb_config: Config, query: Union[int, str]): 

152 if (not query) or (not isinstance(query, (int, str))): 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true

153 raise ValueError( 

154 "The query must either be a real positive integer or a non-empty string!", 

155 ) 

156 

157 rv = [] 

158 url = "" 

159 page_id = 1 

160 total_items = None 

161 

162 while total_items is None or len(rv) < total_items: 

163 print("Fetching page %u of query..." % page_id) 

164 if isinstance(query, int): 

165 url = ("%s/reports/%u/items?page=%u&pageSize=%u" % 

166 (cb_config.base, 

167 query, 

168 page_id, 

169 cb_config.page_size)) 

170 elif isinstance(query, str): 170 ↛ 176line 170 didn't jump to line 176 because the condition on line 170 was always true

171 url = ("%s/items/query?page=%u&pageSize=%u&queryString=%s" % 

172 (cb_config.base, 

173 page_id, 

174 cb_config.page_size, 

175 query)) 

176 data = query_cb_single(cb_config, url) 

177 if len(data) != 4: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true

178 raise MismatchException( 

179 f"Expected codebeamer response with 4 data entries, but instead " 

180 f"received {len(data)}!", 

181 ) 

182 

183 if page_id == 1 and len(data["items"]) == 0: 

184 # lobster-trace: codebeamer_req.Get_Query_Zero_Items_Message 

185 print("This query doesn't generate items. Please check:") 

186 print(" * is the number actually correct?") 

187 print(" * do you have permissions to access it?") 

188 print(f"You can try to access '{url}' manually to check.") 

189 

190 if page_id != data["page"]: 

191 raise MismatchException( 

192 f"Page mismatch in query result: expected page " 

193 f"{page_id} from codebeamer, but got {data['page']}" 

194 ) 

195 

196 if page_id == 1: 196 ↛ 198line 196 didn't jump to line 198 because the condition on line 196 was always true

197 total_items = data["total"] 

198 elif total_items != data["total"]: 

199 raise MismatchException( 

200 f"Item count mismatch in query result: expected " 

201 f"{total_items} items so far, but page " 

202 f"{data['page']} claims to have sent {data['total']} " 

203 f"items in total." 

204 ) 

205 

206 if isinstance(query, int): 

207 rv += [to_lobster(cb_config, cb_item["item"]) 

208 for cb_item in data["items"]] 

209 elif isinstance(query, str): 209 ↛ 213line 209 didn't jump to line 213 because the condition on line 209 was always true

210 rv += [to_lobster(cb_config, cb_item) 

211 for cb_item in data["items"]] 

212 

213 page_id += 1 

214 

215 if total_items != len(rv): 215 ↛ 216line 215 didn't jump to line 216 because the condition on line 215 was never true

216 raise MismatchException( 

217 f"Expected to receive {total_items} items in total from codebeamer, " 

218 f"but actually received {len(rv)}!", 

219 ) 

220 

221 return rv 

222 

223 

224def get_schema_config(cb_config: Config) -> dict: 

225 """ 

226 The function returns a schema map based on the schema mentioned 

227 in the cb_config dictionary. 

228 

229 If there is no match, it raises a KeyError. 

230 

231 Positional arguments: 

232 cb_config -- configuration object containing the schema. 

233 

234 Returns: 

235 A dictionary containing the namespace and class associated with the schema. 

236 

237 Raises: 

238 KeyError -- if the provided schema is not supported. 

239 """ 

240 schema_map = { 

241 'requirement': {"namespace": "req", "class": Requirement}, 

242 'implementation': {"namespace": "imp", "class": Implementation}, 

243 'activity': {"namespace": "act", "class": Activity}, 

244 } 

245 schema = cb_config.schema.lower() 

246 

247 if schema not in schema_map: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true

248 raise KeyError(f"Unsupported SCHEMA '{schema}' provided in configuration.") 

249 

250 return schema_map[schema] 

251 

252 

253def to_lobster(cb_config: Config, cb_item: dict): 

254 if not isinstance(cb_item, dict): 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true

255 raise ValueError("'cb_item' must be of type 'dict'!") 

256 if "id" not in cb_item: 256 ↛ 257line 256 didn't jump to line 257 because the condition on line 256 was never true

257 raise KeyError("Codebeamer item does not contain ID!") 

258 

259 # This looks like it's business logic, maybe we should make this 

260 # configurable? 

261 

262 categories = cb_item.get("categories") 

263 if categories: 263 ↛ 266line 263 didn't jump to line 266 because the condition on line 263 was always true

264 kind = categories[0].get("name", "codebeamer item") 

265 else: 

266 kind = "codebeamer item" 

267 

268 status = cb_item["status"].get("name", None) if "status" in cb_item else None 

269 

270 # Get item name. Sometimes items do not have one, in which case we 

271 # come up with one. 

272 if "name" in cb_item: 272 ↛ 275line 272 didn't jump to line 275 because the condition on line 272 was always true

273 item_name = cb_item["name"] 

274 else: 

275 item_name = f"Unnamed item {cb_item['id']}" 

276 

277 schema_config = get_schema_config(cb_config) 

278 

279 # Construct the appropriate object based on 'kind' 

280 common_params = _create_common_params( 

281 schema_config["namespace"], cb_item, 

282 cb_config.cb_auth_conf.root, item_name, kind) 

283 item = _create_lobster_item( 

284 schema_config["class"], 

285 common_params, item_name, status) 

286 

287 if cb_config.references: 287 ↛ 288line 287 didn't jump to line 288 because the condition on line 287 was never true

288 for displayed_name in cb_config.references: 

289 if cb_item.get(displayed_name): 

290 item_references = cb_item.get(displayed_name) if ( 

291 isinstance(cb_item.get(displayed_name), list)) \ 

292 else [cb_item.get(displayed_name)] 

293 else: 

294 item_references = [value for custom_field 

295 in cb_item["customFields"] 

296 if custom_field["name"] == displayed_name and 

297 custom_field.get("values") 

298 for value in custom_field["values"]] 

299 

300 for value in item_references: 

301 item.add_tracing_target(Tracing_Tag("req", str(value["id"]))) 

302 

303 return item 

304 

305 

306def _create_common_params(namespace: str, cb_item: dict, cb_root: str, 

307 item_name: str, kind: str): 

308 """ 

309 Creates and returns common parameters for a Codebeamer item. 

310 Args: 

311 namespace (str): Namespace for the tag. 

312 cb_item (dict): Codebeamer item dictionary. 

313 cb_root (str): Root URL or path of Codebeamer. 

314 item_name (str): Name of the item. 

315 kind (str): Type of the item. 

316 Returns: 

317 dict: Common parameters including tag, location, and kind. 

318 """ 

319 return { 

320 'tag': Tracing_Tag( 

321 namespace=namespace, 

322 tag=str(cb_item["id"]), 

323 version=cb_item["version"] 

324 ), 

325 'location': Codebeamer_Reference( 

326 cb_root=cb_root, 

327 tracker=cb_item["tracker"]["id"], 

328 item=cb_item["id"], 

329 version=cb_item["version"], 

330 name=item_name 

331 ), 

332 'kind': kind 

333 } 

334 

335 

336def _create_lobster_item(schema_class, common_params, item_name, status): 

337 """ 

338 Creates and returns a Lobster item based on the schema class. 

339 Args: 

340 schema_class: Class of the schema (Requirement, Implementation, Activity). 

341 common_params (dict): Common parameters for the item. 

342 item_name (str): Name of the item. 

343 status (str): Status of the item. 

344 Returns: 

345 Object: An instance of the schema class with the appropriate parameters. 

346 """ 

347 if schema_class is Requirement: 347 ↛ 356line 347 didn't jump to line 356 because the condition on line 347 was always true

348 return Requirement( 

349 **common_params, 

350 framework="codebeamer", 

351 text=None, 

352 status=status, 

353 name= item_name 

354 ) 

355 

356 elif schema_class is Implementation: 

357 return Implementation( 

358 **common_params, 

359 language="python", 

360 name= item_name, 

361 ) 

362 

363 else: 

364 return Activity( 

365 **common_params, 

366 framework="codebeamer", 

367 status=status 

368 ) 

369 

370 

371def import_tagged(cb_config: Config, items_to_import: Iterable[int]): 

372 rv = [] 

373 

374 cb_items = get_many_items(cb_config, items_to_import) 

375 for cb_item in cb_items: 

376 l_item = to_lobster(cb_config, cb_item) 

377 rv.append(l_item) 

378 

379 return rv 

380 

381 

382def ensure_list(instance) -> List: 

383 if isinstance(instance, list): 383 ↛ 385line 383 didn't jump to line 385 because the condition on line 383 was always true

384 return instance 

385 return [instance] 

386 

387 

388def update_authentication_parameters( 

389 auth_conf: AuthenticationConfig, 

390 netrc_path: Optional[str] = None): 

391 if (auth_conf.token is None and 

392 (auth_conf.user is None or auth_conf.password is None)): 

393 netrc_file = netrc_path or os.path.join(os.path.expanduser("~"), 

394 ".netrc") 

395 if os.path.isfile(netrc_file): 

396 netrc_config = netrc.netrc(netrc_file) 

397 machine = urlparse(auth_conf.root).hostname 

398 auth = netrc_config.authenticators(machine) 

399 if auth is not None: 

400 print(f"Using .netrc login for {auth_conf.root}") 

401 auth_conf.user, _, auth_conf.password = auth 

402 else: 

403 provided_machine = ", ".join(netrc_config.hosts.keys()) or "None" 

404 raise KeyError(f"Error parsing .netrc file." 

405 f"\nExpected '{machine}', but got '{provided_machine}'.") 

406 

407 if (auth_conf.token is None and 

408 (auth_conf.user is None or auth_conf.password is None)): 

409 raise KeyError("Please add your token to the config file, " 

410 "or use user and pass in the config file, " 

411 "or configure credentials in the .netrc file.") 

412 

413 

414def load_config(file_name: str) -> Config: 

415 """ 

416 Parses a YAML configuration file and returns a validated configuration object. 

417 

418 Args: 

419 file_name (str): Path to the YAML configuration file. 

420 

421 Returns: 

422 Config: validated configuration. 

423 

424 Raises: 

425 ValueError: If `file_name` is not a string. 

426 FileNotFoundError: If the file does not exist. 

427 KeyError: If required fields are missing or unsupported keys are present. 

428 """ 

429 with open(file_name, "r", encoding='utf-8') as file: 

430 return parse_config_data(yaml.safe_load(file) or {}) 

431 

432 

433def parse_config_data(data: dict) -> Config: 

434 # Validate supported keys 

435 provided_config_keys = set(data.keys()) 

436 unsupported_keys = provided_config_keys - SupportedConfigKeys.as_set() 

437 if unsupported_keys: 

438 raise KeyError( 

439 f"Unsupported config keys: {', '.join(unsupported_keys)}. " 

440 f"Supported keys are: {', '.join(SupportedConfigKeys.as_set())}." 

441 ) 

442 

443 # create config object 

444 config = Config( 

445 references=ensure_list(data.get(SupportedConfigKeys.REFS.value, [])), 

446 import_tagged=data.get(SupportedConfigKeys.IMPORT_TAGGED.value), 

447 import_query=data.get(SupportedConfigKeys.IMPORT_QUERY.value), 

448 verify_ssl=data.get(SupportedConfigKeys.VERIFY_SSL.value, False), 

449 page_size=data.get(SupportedConfigKeys.PAGE_SIZE.value, 100), 

450 schema=data.get(SupportedConfigKeys.SCHEMA.value, "Requirement"), 

451 timeout=data.get(SupportedConfigKeys.TIMEOUT.value, 30), 

452 out=data.get(SupportedConfigKeys.OUT.value), 

453 num_request_retry=data.get(SupportedConfigKeys.NUM_REQUEST_RETRY.value, 5), 

454 retry_error_codes=data.get(SupportedConfigKeys.RETRY_ERROR_CODES.value, []), 

455 cb_auth_conf=AuthenticationConfig( 

456 token=data.get(SupportedConfigKeys.CB_TOKEN.value), 

457 user=data.get(SupportedConfigKeys.CB_USER.value), 

458 password=data.get(SupportedConfigKeys.CB_PASS.value), 

459 root=data.get(SupportedConfigKeys.CB_ROOT.value) 

460 ), 

461 ) 

462 

463 # Ensure consistency of the configuration 

464 if (not config.import_tagged) and (not config.import_query): 

465 raise KeyError(f"Either {SupportedConfigKeys.IMPORT_TAGGED.value} or " 

466 f"{SupportedConfigKeys.IMPORT_QUERY.value} must be provided!") 

467 

468 if config.cb_auth_conf.root is None: 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true

469 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must be provided!") 

470 

471 if not config.cb_auth_conf.root.startswith("https://"): 471 ↛ 472line 471 didn't jump to line 472 because the condition on line 471 was never true

472 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must start with https://, " 

473 f"but value is {config.cb_auth_conf.root}.") 

474 

475 return config 

476 

477 

478class CodebeamerTool(MetaDataToolBase): 

479 def __init__(self): 

480 super().__init__( 

481 name="codebeamer", 

482 description="Extract codebeamer items for LOBSTER", 

483 official=True, 

484 ) 

485 self._argument_parser.add_argument( 

486 "--config", 

487 help=(f"Path to YAML file with arguments, " 

488 f"by default (codebeamer-config.yaml) " 

489 f"supported references: '{', '.join(SupportedConfigKeys.as_set())}'"), 

490 default=os.path.join(os.getcwd(), "codebeamer-config.yaml")) 

491 

492 self._argument_parser.add_argument( 

493 "--out", 

494 help=("Name of output file"), 

495 default="codebeamer.lobster", 

496 ) 

497 

498 def _run_impl(self, options: argparse.Namespace) -> int: 

499 try: 

500 self._execute(options) 

501 return 0 

502 except NotFileException as ex: 

503 print(ex) 

504 except QueryException as query_ex: 

505 print(query_ex) 

506 print("You can either:") 

507 print("* increase the timeout with the timeout parameter") 

508 print("* decrease the query size with the query_size parameter") 

509 print("* increase the retry count with the parameters (num_request_retry, " 

510 "retry_error_codes)") 

511 except FileNotFoundError as file_ex: 

512 self._print_error(f"File '{file_ex.filename}' not found.") 

513 except IsADirectoryError as isdir_ex: 

514 self._print_error( 

515 f"Path '{isdir_ex.filename}' is a directory, but a file was expected.", 

516 ) 

517 except ValueError as value_error: 

518 self._print_error(value_error) 

519 except LOBSTER_Error as lobster_error: 

520 self._print_error(lobster_error) 

521 

522 return 1 

523 

524 @staticmethod 

525 def _print_error(error: Union[Exception, str]): 

526 print(f"{TOOL_NAME}: {error}", file=sys.stderr) 

527 

528 def _execute(self, options: argparse.Namespace) -> None: 

529 mh = Message_Handler() 

530 

531 cb_config = load_config(options.config) 

532 

533 if cb_config.out is None: 

534 cb_config.out = options.out 

535 

536 update_authentication_parameters(cb_config.cb_auth_conf) 

537 

538 items_to_import = set() 

539 

540 if cb_config.import_tagged: 

541 source_items = {} 

542 lobster_read( 

543 mh = mh, 

544 filename = cb_config.import_tagged, 

545 level = "N/A", 

546 items = source_items, 

547 ) 

548 

549 for item in source_items.values(): 

550 for tag in item.unresolved_references: 

551 if tag.namespace != "req": 

552 continue 

553 try: 

554 item_id = int(tag.tag, 10) 

555 if item_id > 0: 

556 items_to_import.add(item_id) 

557 else: 

558 mh.warning(item.location, 

559 f"invalid codebeamer reference to {item_id}") 

560 except ValueError: 

561 mh.warning( 

562 item.location, 

563 f"cannot convert reference '{tag.tag}' to integer " 

564 f"Codebeamer ID", 

565 ) 

566 

567 items = import_tagged(cb_config, items_to_import) 

568 

569 elif cb_config.import_query is not None: 

570 try: 

571 if isinstance(cb_config.import_query, str): 

572 if (cb_config.import_query.startswith("-") and 

573 cb_config.import_query[1:].isdigit()): 

574 self._argument_parser.error( 

575 "import_query must be a positive integer") 

576 elif cb_config.import_query.startswith("-"): 

577 self._argument_parser.error( 

578 "import_query must be a valid cbQL query") 

579 elif cb_config.import_query == "": 

580 self._argument_parser.error( 

581 "import_query must either be a query string or a query ID") 

582 elif cb_config.import_query.isdigit(): 

583 cb_config.import_query = int(cb_config.import_query) 

584 except ValueError as e: 

585 self._argument_parser.error(str(e)) 

586 

587 items = get_query(cb_config, cb_config.import_query) 

588 else: 

589 raise ValueError( 

590 f"Unclear what to do, because neither " 

591 f"'{SupportedConfigKeys.IMPORT_QUERY.value}' nor " 

592 f"'{SupportedConfigKeys.IMPORT_TAGGED.value}' is specified!", 

593 ) 

594 

595 with _get_out_stream(cb_config.out) as out_stream: 

596 _cb_items_to_lobster(items, cb_config, out_stream) 

597 if cb_config.out: 

598 print(f"Written {len(items)} requirements to {cb_config.out}") 

599 

600 

601def _get_out_stream(config_out: Optional[str]) -> TextIO: 

602 if config_out: 

603 return open(config_out, "w", encoding="UTF-8") 

604 return sys.stdout 

605 

606 

607def _cb_items_to_lobster(items: List[Dict], config: Config, out_file: TextIO) -> None: 

608 schema_config = get_schema_config(config) 

609 lobster_write(out_file, schema_config["class"], TOOL_NAME.replace("-", "_"), items) 

610 

611 

612def cb_query_to_lobster_file(config: Config, out_file: str) -> None: 

613 """Loads items from codebeamer and serializes them in the LOBSTER interchange 

614 format to the given file. 

615 """ 

616 # This is an API function. 

617 items = get_query(config, config.import_query) 

618 with open(out_file, "w", encoding="UTF-8") as fd: 

619 _cb_items_to_lobster(items, config, fd) 

620 

621 

622def main() -> int: 

623 return CodebeamerTool().run()