Coverage for lobster/tools/codebeamer/codebeamer.py: 52%
293 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-12 15:02 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-12 15:02 +0000
1#!/usr/bin/env python3
2#
3# lobster_codebeamer - Extract codebeamer items for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20# This tool is based on the codebeamer Rest API v3, as documented here:
21# https://codebeamer.com/cb/wiki/11631738
22#
23# There are some assumptions encoded here that are not clearly
24# documented, in that items have a type and the type has a name.
25#
26#
27# Main limitations:
28# * Item descriptions are ignored right now
29# * Branches (if they exist) are ignored
30# * We only ever fetch the HEAD item
31#
32# However you _can_ import all the items referenced from another
33# lobster artefact.
35import os
36import sys
37import argparse
38import netrc
39from typing import Dict, Iterable, List, Optional, Sequence, TextIO, Union
40from urllib.parse import quote, urlparse
41from enum import Enum
42from http import HTTPStatus
44import requests
45from requests.adapters import HTTPAdapter
46from requests.exceptions import (
47 Timeout,
48 ConnectionError as RequestsConnectionError,
49 RequestException,
50)
51import yaml
52from urllib3.util.retry import Retry
54from lobster.common.items import Tracing_Tag, Requirement, Implementation, Activity
55from lobster.common.location import Codebeamer_Reference
56from lobster.common.errors import Message_Handler, LOBSTER_Error
57from lobster.common.io import lobster_read, lobster_write, ensure_output_directory
58from lobster.common.meta_data_tool_base import MetaDataToolBase
59from lobster.tools.codebeamer.bearer_auth import BearerAuth
60from lobster.tools.codebeamer.config import AuthenticationConfig, Config
61from lobster.tools.codebeamer.exceptions import (
62 MismatchException, NotFileException, QueryException,
63)
# Name of this tool. It prefixes error messages printed by
# CodebeamerTool._print_error and, with "-" replaced by "_", is passed
# to lobster_write by _cb_items_to_lobster.
TOOL_NAME = "lobster-codebeamer"
class SupportedConfigKeys(Enum):
    """Enumeration of the keys accepted in the YAML configuration.

    The value of each member is the key name exactly as it has to be
    written in the configuration file.
    """
    NUM_REQUEST_RETRY = "num_request_retry"
    RETRY_ERROR_CODES = "retry_error_codes"
    IMPORT_TAGGED = "import_tagged"
    IMPORT_QUERY = "import_query"
    VERIFY_SSL = "verify_ssl"
    PAGE_SIZE = "page_size"
    REFS = "refs"
    SCHEMA = "schema"
    CB_TOKEN = "token"
    CB_ROOT = "root"
    CB_USER = "user"
    CB_PASS = "pass"
    TIMEOUT = "timeout"
    OUT = "out"

    @classmethod
    def as_set(cls) -> set:
        """Return the key names (member values) of all members as a set."""
        return set(member.value for member in cls)
def get_authentication(cb_auth_config: AuthenticationConfig) -> requests.auth.AuthBase:
    """Create the authentication handler for requests to codebeamer.

    A truthy token takes precedence and results in bearer authentication.
    Without a token, HTTP basic authentication is built from the
    configured user and password.
    """
    if not cb_auth_config.token:
        return requests.auth.HTTPBasicAuth(
            cb_auth_config.user,
            cb_auth_config.password,
        )
    return BearerAuth(cb_auth_config.token)
def _get_response_message(response: requests.Response) -> str:
    """Extract a human readable error message from a response.

    The "message" entry of a JSON object body is preferred. If the body
    is not JSON, is not a JSON object, or has no such entry, the stripped
    response text is used; an empty text yields "Unknown error".
    """
    try:
        payload = response.json()
    except ValueError:
        # Body is not valid JSON, fall back to the plain text below
        payload = None

    if isinstance(payload, dict) and "message" in payload:
        return payload["message"]

    plain_text = response.text.strip()
    return plain_text if plain_text else "Unknown error"
def _get_http_reason(response: requests.Response) -> str:
    """Return a textual reason for the HTTP status of a response.

    The reason reported by the response is used when it is non-empty.
    Otherwise the standard phrase for the status code is looked up, and
    "Unknown Status" is returned for codes unknown to HTTPStatus.
    """
    reason = response.reason
    if not reason:
        try:
            reason = HTTPStatus(response.status_code).phrase
        except ValueError:
            reason = "Unknown Status"
    return reason
def query_cb_single(cb_config: Config, url: str):
    """Send one GET request to codebeamer and return the decoded JSON body.

    The request is retried according to the configuration: up to
    'num_request_retry' times, for the HTTP status codes listed in
    'retry_error_codes', with exponential backoff.

    Args:
        cb_config: Configuration providing retry settings, authentication,
            timeout and SSL verification.
        url: Complete URL to request.

    Returns:
        The JSON-decoded body of a response with status code 200.

    Raises:
        ValueError: If 'num_request_retry' is not greater than zero.
        QueryException: On timeout, connection failure, any other request
            error, or a response whose status code is not 200.
    """
    if cb_config.num_request_retry <= 0:
        raise ValueError("Retry is disabled (num_request_retry is set to 0). "
                         "Cannot proceed with retries.")

    # Set up a Retry object with exponential backoff
    retry_strategy = Retry(
        total=cb_config.num_request_retry,
        backoff_factor=1,  # Exponential backoff: 1s, 2s, 4s, etc.
        status_forcelist=cb_config.retry_error_codes,
        allowed_methods=["GET"],
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)

    # Use the session as a context manager so that it is closed again
    # after every call instead of being left open.
    with requests.Session() as session:
        session.mount("https://", adapter)
        session.mount("http://", adapter)

        try:
            response = session.get(
                url,
                auth=get_authentication(cb_config.cb_auth_conf),
                timeout=cb_config.timeout,
                verify=cb_config.verify_ssl,
            )
        except Timeout as ex:
            raise QueryException(
                "Connection timed out while contacting Codebeamer\n"
                f"URL: {url}\n"
                f"Reason: {ex}\n"
                "\nPossible actions:\n"
                "• Increase the timeout using the 'timeout' parameter"
            ) from ex

        except RequestsConnectionError as ex:
            raise QueryException(
                "Unable to connect to Codebeamer\n"
                f"URL: {url}\n"
                f"Reason: {ex}\n"
                "\nPossible actions:\n"
                "• Check internet connection\n"
                "• Increase retries using 'num_request_retry'\n"
                "• Check SSL certificates or disable verification by setting "
                f"'{SupportedConfigKeys.VERIFY_SSL.value}' to false"
            ) from ex

        except RequestException as ex:
            raise QueryException(
                "Unexpected network error while connecting to Codebeamer\n"
                f"URL: {url}\n"
                f"Reason: {ex}"
                "\nPossible actions:\n"
                "• Check network stability\n"
            ) from ex

    if response.status_code == 200:
        return response.json()

    error_message = _get_response_message(response)
    reason = _get_http_reason(response)

    raise QueryException(
        "Codebeamer request failed:\n"
        f" URL: {url}\n"
        f" HTTP Status: {response.status_code} ({reason})\n"
        f"Reason: {error_message}"
    )
def get_single_item(cb_config: Config, item_id: int):
    """Fetch one codebeamer item by its ID.

    Args:
        cb_config: Configuration providing the base URL and request settings.
        item_id: ID of the item, which must be an integer greater than zero.

    Returns:
        The result of query_cb_single for the item URL.

    Raises:
        ValueError: If item_id is not a positive integer.
    """
    is_positive_int = isinstance(item_id, int) and item_id > 0
    if not is_positive_int:
        raise ValueError("item_id must be a positive integer")
    return query_cb_single(cb_config, f"{cb_config.base}/items/{item_id}")
def get_many_items(cb_config: Config, item_ids: Iterable[int]):
    """Fetch the codebeamer items with the given IDs, following pagination.

    The items are requested through the 'items/query' endpoint with an
    'item.id IN (...)' query string, one page of 'page_size' items at a
    time.

    Args:
        cb_config: Configuration providing the base URL, page size and
            request settings.
        item_ids: IDs of the items to fetch.

    Returns:
        A list with the entries of the "items" field of all fetched
        pages. The list is empty if no IDs were given.
    """
    id_list = ",".join(str(item_id) for item_id in item_ids)
    if not id_list:
        # Nothing to fetch, so do not send a query with an empty IN-list.
        return []

    query_string = quote(f"item.id IN ({id_list})")
    rv = []
    page_id = 1

    while True:
        url = (f"{cb_config.base}/items/query?page={page_id}"
               f"&pageSize={cb_config.page_size}"
               f"&queryString={query_string}")
        data = query_cb_single(cb_config, url)
        page_items = data["items"]
        rv += page_items
        # Stop when everything has arrived. An empty page also ends the
        # loop, because no later page could bring the count up to the
        # announced total and the loop would otherwise never terminate.
        if len(rv) >= data["total"] or not page_items:
            break
        page_id += 1

    return rv
def get_query(cb_config: Config, query: Union[int, str]):
    """Run a codebeamer query and convert all resulting items to LOBSTER items.

    An integer query is treated as the ID of a report and fetched from
    'reports/<id>/items'. A string query is passed as 'queryString' to
    'items/query' exactly as given. The result is fetched page by page.

    Args:
        cb_config: Configuration providing the base URL, page size and
            request settings.
        query: Report ID (int) or query string (str).

    Returns:
        A list of the items converted by to_lobster.

    Raises:
        ValueError: If the query is falsy or neither an int nor a str.
        MismatchException: If a response does not have four entries, the
            page number or total count is inconsistent, or the number of
            received items differs from the announced total.
    """
    if (not query) or (not isinstance(query, (int, str))):
        raise ValueError(
            "The query must either be a real positive integer or a non-empty string!",
        )

    rv = []
    url = ""
    page_id = 1
    total_items = None

    while total_items is None or len(rv) < total_items:
        print(f"Fetching page {page_id} of query...")
        if isinstance(query, int):
            url = (f"{cb_config.base}/reports/{query}/items"
                   f"?page={page_id}&pageSize={cb_config.page_size}")
        elif isinstance(query, str):
            url = (f"{cb_config.base}/items/query?page={page_id}"
                   f"&pageSize={cb_config.page_size}&queryString={query}")
        data = query_cb_single(cb_config, url)
        if len(data) != 4:
            raise MismatchException(
                f"Expected codebeamer response with 4 data entries, but instead "
                f"received {len(data)}!",
            )

        page_items = data["items"]

        if page_id == 1 and len(page_items) == 0:
            # lobster-trace: codebeamer_req.Get_Query_Zero_Items_Message
            print("This query doesn't generate items. Please check:")
            print(" * is the number actually correct?")
            print(" * do you have permissions to access it?")
            print(f"You can try to access '{url}' manually to check.")

        if page_id != data["page"]:
            raise MismatchException(
                f"Page mismatch in query result: expected page "
                f"{page_id} from codebeamer, but got {data['page']}"
            )

        if page_id == 1:
            total_items = data["total"]
        elif total_items != data["total"]:
            raise MismatchException(
                f"Item count mismatch in query result: expected "
                f"{total_items} items so far, but page "
                f"{data['page']} claims to have sent {data['total']} "
                f"items in total."
            )

        if isinstance(query, int):
            # Entries of a report wrap the actual item in an "item" field
            rv += [to_lobster(cb_config, cb_item["item"])
                   for cb_item in page_items]
        elif isinstance(query, str):
            rv += [to_lobster(cb_config, cb_item)
                   for cb_item in page_items]

        if not page_items:
            # An empty page cannot bring the count closer to the total.
            # Stop here instead of requesting pages forever; the check
            # below reports the shortfall.
            break

        page_id += 1

    if total_items != len(rv):
        raise MismatchException(
            f"Expected to receive {total_items} items in total from codebeamer, "
            f"but actually received {len(rv)}!",
        )

    return rv
def get_schema_config(cb_config: Config) -> dict:
    """
    Look up the namespace and item class for the configured schema.

    The schema name taken from the configuration is matched
    case-insensitively against 'requirement', 'implementation' and
    'activity'.

    Positional arguments:
    cb_config -- configuration object containing the schema.

    Returns:
    A dictionary with the keys "namespace" and "class".

    Raises:
    KeyError -- if the provided schema is not supported.
    """
    schema = cb_config.schema.lower()
    supported = {
        'requirement': ("req", Requirement),
        'implementation': ("imp", Implementation),
        'activity': ("act", Activity),
    }

    entry = supported.get(schema)
    if entry is None:
        raise KeyError(f"Unsupported SCHEMA '{schema}' provided in configuration.")

    namespace, item_class = entry
    return {"namespace": namespace, "class": item_class}
def to_lobster(cb_config: Config, cb_item: dict):
    """Convert one codebeamer item dictionary into a LOBSTER item.

    The item class and tag namespace follow the configured schema. For
    every name in the configured references, the IDs found under that
    name are added as tracing targets in the "req" namespace. A reference
    is looked up as a direct field of the item first, and in the values
    of the equally named entry of "customFields" otherwise.

    Args:
        cb_config: Configuration providing schema, references and the
            codebeamer root.
        cb_item: The item as returned by codebeamer.

    Returns:
        The created LOBSTER item.

    Raises:
        ValueError: If cb_item is not a dict.
        KeyError: If cb_item has no "id" entry.
    """
    if not isinstance(cb_item, dict):
        raise ValueError("'cb_item' must be of type 'dict'!")
    if "id" not in cb_item:
        raise KeyError("Codebeamer item does not contain ID!")

    # This looks like it's business logic, maybe we should make this
    # configurable?

    categories = cb_item.get("categories")
    if categories:
        kind = categories[0].get("name", "codebeamer item")
    else:
        kind = "codebeamer item"

    status = cb_item["status"].get("name", None) if "status" in cb_item else None

    # Get item name. Sometimes items do not have one, in which case we
    # come up with one.
    if "name" in cb_item:
        item_name = cb_item["name"]
    else:
        item_name = f"Unnamed item {cb_item['id']}"

    schema_config = get_schema_config(cb_config)

    # Construct the appropriate object based on 'kind'
    common_params = _create_common_params(
        schema_config["namespace"], cb_item,
        cb_config.cb_auth_conf.root, item_name, kind)
    item = _create_lobster_item(
        schema_config["class"],
        common_params, item_name, status)

    if cb_config.references:
        for displayed_name in cb_config.references:
            direct_value = cb_item.get(displayed_name)
            if direct_value:
                # A single reference is treated like a list of one
                item_references = (direct_value
                                   if isinstance(direct_value, list)
                                   else [direct_value])
            else:
                # Items without custom fields simply have no references
                # of this name, so a missing "customFields" entry must
                # not raise a KeyError.
                item_references = [value for custom_field
                                   in cb_item.get("customFields") or []
                                   if custom_field["name"] == displayed_name and
                                   custom_field.get("values")
                                   for value in custom_field["values"]]

            for value in item_references:
                item.add_tracing_target(Tracing_Tag("req", str(value["id"])))

    return item
def _create_common_params(namespace: str, cb_item: dict, cb_root: str,
                          item_name: str, kind: str):
    """
    Builds the parameters shared by all LOBSTER item classes.
    Args:
        namespace (str): Namespace for the tag.
        cb_item (dict): Codebeamer item dictionary; its "id", "version"
            and the "id" of its "tracker" are read.
        cb_root (str): Root URL or path of Codebeamer.
        item_name (str): Name of the item.
        kind (str): Type of the item.
    Returns:
        dict: The keys 'tag', 'location' and 'kind'.
    """
    item_id = cb_item["id"]
    version = cb_item["version"]

    tag = Tracing_Tag(
        namespace=namespace,
        tag=str(item_id),
        version=version,
    )
    location = Codebeamer_Reference(
        cb_root=cb_root,
        tracker=cb_item["tracker"]["id"],
        item=item_id,
        version=version,
        name=item_name,
    )
    return {'tag': tag, 'location': location, 'kind': kind}
def _create_lobster_item(schema_class, common_params, item_name, status):
    """
    Instantiates the LOBSTER item for the given schema class.
    Args:
        schema_class: Class of the schema (Requirement, Implementation, Activity).
        common_params (dict): Common parameters for the item.
        item_name (str): Name of the item.
        status (str): Status of the item.
    Returns:
        Object: An instance of the schema class with the appropriate parameters.
    Raises:
        KeyError: If the schema class is none of the three supported classes.
    """
    # Collect the keyword arguments that are specific to each class
    if schema_class is Requirement:
        specific_params = {
            "framework": "codebeamer",
            "text": None,
            "status": status,
            "name": item_name,
        }
    elif schema_class is Implementation:
        specific_params = {
            "language": "python",
            "name": item_name,
        }
    elif schema_class is Activity:
        specific_params = {
            "framework": "codebeamer",
            "status": status,
        }
    else:
        raise KeyError(f"Unsupported schema class '{schema_class}'!")

    return schema_class(**common_params, **specific_params)
def import_tagged(cb_config: Config, items_to_import: Iterable[int]):
    """Fetch the codebeamer items with the given IDs as LOBSTER items.

    The items are retrieved with get_many_items and each of them is
    converted with to_lobster.

    Returns:
        A list of the converted items.
    """
    return [
        to_lobster(cb_config, cb_item)
        for cb_item in get_many_items(cb_config, items_to_import)
    ]
def ensure_list(instance) -> List:
    """Return a list unchanged and wrap any other value in a new list."""
    return instance if isinstance(instance, list) else [instance]
def update_authentication_parameters(
        auth_conf: AuthenticationConfig,
        netrc_path: Optional[str] = None):
    """Complete missing credentials from a .netrc file.

    Nothing is done if a token, or both user and password, are already
    set. Otherwise the given netrc file (by default '.netrc' in the home
    directory) is read, if it exists, and the login and password stored
    for the host name of the codebeamer root are written to auth_conf.

    Raises:
        KeyError: If the netrc file has no entry for the host, or if the
            credentials are still incomplete afterwards.
    """
    def credentials_missing() -> bool:
        if auth_conf.token is not None:
            return False
        return auth_conf.user is None or auth_conf.password is None

    if not credentials_missing():
        return

    if netrc_path:
        netrc_file = netrc_path
    else:
        netrc_file = os.path.join(os.path.expanduser("~"), ".netrc")

    if os.path.isfile(netrc_file):
        netrc_config = netrc.netrc(netrc_file)
        machine = urlparse(auth_conf.root).hostname
        auth = netrc_config.authenticators(machine)
        if auth is None:
            provided_machine = ", ".join(netrc_config.hosts.keys()) or "None"
            raise KeyError(f"Error parsing .netrc file."
                           f"\nExpected '{machine}', but got '{provided_machine}'.")
        print(f"Using .netrc login for {auth_conf.root}")
        auth_conf.user, _, auth_conf.password = auth

    if credentials_missing():
        raise KeyError("Please add your token to the config file, "
                       "or use user and pass in the config file, "
                       "or configure credentials in the .netrc file.")
def load_config(file_name: str) -> Config:
    """
    Reads a YAML configuration file and converts it to a configuration object.

    The file is opened as UTF-8 and parsed with the safe YAML loader. A
    file without content is treated like an empty mapping. The result is
    handed to parse_config_data for validation.

    Args:
        file_name (str): Path to the YAML configuration file.

    Returns:
        Config: validated configuration.

    Raises:
        FileNotFoundError: If the file does not exist.
        KeyError: If parse_config_data rejects the content.
    """
    with open(file_name, encoding='utf-8') as config_file:
        raw_data = yaml.safe_load(config_file)
    if not raw_data:
        raw_data = {}
    return parse_config_data(raw_data)
def parse_config_data(data: dict) -> Config:
    """Validate raw configuration data and build the configuration object.

    Defaults applied for absent keys: an empty list of references, SSL
    verification enabled, a page size of 100, the schema "Requirement",
    a timeout of 30, 5 request retries and no retry error codes.

    Args:
        data: Mapping of configuration keys to values.

    Returns:
        The configuration object.

    Raises:
        KeyError: If unsupported keys are present, if neither
            'import_tagged' nor 'import_query' is given, if 'root' is
            missing, or if 'root' does not start with "https://".
    """
    # Validate supported keys
    provided_config_keys = set(data.keys())
    unsupported_keys = provided_config_keys - SupportedConfigKeys.as_set()
    if unsupported_keys:
        # Sort, so that the message does not depend on the iteration
        # order of the sets; convert to str, because keys of a YAML
        # mapping are not necessarily strings.
        unsupported = ', '.join(sorted(str(key) for key in unsupported_keys))
        supported = ', '.join(sorted(SupportedConfigKeys.as_set()))
        raise KeyError(
            f"Unsupported config keys: {unsupported}. "
            f"Supported keys are: {supported}."
        )

    # create config object
    config = Config(
        references=ensure_list(data.get(SupportedConfigKeys.REFS.value, [])),
        import_tagged=data.get(SupportedConfigKeys.IMPORT_TAGGED.value),
        import_query=data.get(SupportedConfigKeys.IMPORT_QUERY.value),
        verify_ssl=data.get(SupportedConfigKeys.VERIFY_SSL.value, True),
        page_size=data.get(SupportedConfigKeys.PAGE_SIZE.value, 100),
        schema=data.get(SupportedConfigKeys.SCHEMA.value, "Requirement"),
        timeout=data.get(SupportedConfigKeys.TIMEOUT.value, 30),
        out=data.get(SupportedConfigKeys.OUT.value),
        num_request_retry=data.get(SupportedConfigKeys.NUM_REQUEST_RETRY.value, 5),
        retry_error_codes=data.get(SupportedConfigKeys.RETRY_ERROR_CODES.value, []),
        cb_auth_conf=AuthenticationConfig(
            token=data.get(SupportedConfigKeys.CB_TOKEN.value),
            user=data.get(SupportedConfigKeys.CB_USER.value),
            password=data.get(SupportedConfigKeys.CB_PASS.value),
            root=data.get(SupportedConfigKeys.CB_ROOT.value)
        ),
    )

    # Ensure consistency of the configuration
    if (not config.import_tagged) and (not config.import_query):
        raise KeyError(f"Either {SupportedConfigKeys.IMPORT_TAGGED.value} or "
                       f"{SupportedConfigKeys.IMPORT_QUERY.value} must be provided!")

    if config.cb_auth_conf.root is None:
        raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must be provided!")

    if not config.cb_auth_conf.root.startswith("https://"):
        raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must start with https://, "
                       f"but value is {config.cb_auth_conf.root}.")

    return config
class CodebeamerTool(MetaDataToolBase):
    """Command line tool that extracts codebeamer items for LOBSTER.

    Besides the arguments of the base class, the tool accepts '--config'
    (path of the YAML configuration) and '--out' (name of the output
    file, used when the configuration does not define 'out').
    """

    def __init__(self):
        super().__init__(
            name="codebeamer",
            description="Extract codebeamer items for LOBSTER",
            official=True,
        )
        self._argument_parser.add_argument(
            "--config",
            help=(f"Path to YAML file with arguments, "
                  f"by default (codebeamer-config.yaml) "
                  f"supported references: '{', '.join(SupportedConfigKeys.as_set())}'"),
            default=os.path.join(os.getcwd(), "codebeamer-config.yaml"))

        self._argument_parser.add_argument(
            "--out",
            help=("Name of output file"),
            default="codebeamer.lobster",
        )

    def _run_impl(self, options: argparse.Namespace) -> int:
        """Run the tool and map the handled exceptions to an exit code.

        Returns:
            0 on success, 1 if one of the handled exceptions occurred.
        """
        try:
            self._execute(options)
            return 0
        except NotFileException as ex:
            print(ex)
        except QueryException as query_ex:
            print(query_ex)
        except FileNotFoundError as file_ex:
            self._print_error(f"File '{file_ex.filename}' not found.")
        except IsADirectoryError as isdir_ex:
            self._print_error(
                f"Path '{isdir_ex.filename}' is a directory, but a file was expected.",
            )
        except ValueError as value_error:
            self._print_error(value_error)
        except LOBSTER_Error as lobster_error:
            self._print_error(lobster_error)

        return 1

    @staticmethod
    def _print_error(error: Union[Exception, str]):
        """Print the error to stderr, prefixed with the tool name."""
        print(f"{TOOL_NAME}: {error}", file=sys.stderr)

    def _execute(self, options: argparse.Namespace) -> None:
        """Load the configuration, import the items and write the output.

        If 'import_tagged' is configured, the codebeamer items referenced
        by the unresolved "req" references of that LOBSTER file are
        imported. Otherwise the items of 'import_query' are imported.
        """
        mh = Message_Handler()

        cb_config = load_config(options.config)

        # The output of the configuration file takes precedence over the
        # command line option
        if cb_config.out is None:
            cb_config.out = options.out

        update_authentication_parameters(cb_config.cb_auth_conf)

        items_to_import = set()

        if cb_config.import_tagged:
            source_items = {}
            lobster_read(
                mh = mh,
                filename = cb_config.import_tagged,
                level = "N/A",
                items = source_items,
            )

            for item in source_items.values():
                for tag in item.unresolved_references:
                    if tag.namespace != "req":
                        continue
                    try:
                        item_id = int(tag.tag, 10)
                        if item_id > 0:
                            items_to_import.add(item_id)
                        else:
                            mh.warning(item.location,
                                       f"invalid codebeamer reference to {item_id}")
                    except ValueError:
                        mh.warning(
                            item.location,
                            f"cannot convert reference '{tag.tag}' to integer "
                            f"Codebeamer ID",
                        )

            items = import_tagged(cb_config, items_to_import)

        elif cb_config.import_query is not None:
            try:
                if isinstance(cb_config.import_query, str):
                    if (cb_config.import_query.startswith("-") and
                            cb_config.import_query[1:].isdigit()):
                        self._argument_parser.error(
                            "import_query must be a positive integer")
                    elif cb_config.import_query.startswith("-"):
                        self._argument_parser.error(
                            "import_query must be a valid cbQL query")
                    elif cb_config.import_query == "":
                        self._argument_parser.error(
                            "import_query must either be a query string or a query ID")
                    elif cb_config.import_query.isdigit():
                        # A string of digits is the ID of a report
                        cb_config.import_query = int(cb_config.import_query)
            except ValueError as e:
                self._argument_parser.error(str(e))

            items = get_query(cb_config, cb_config.import_query)
        else:
            raise ValueError(
                f"Unclear what to do, because neither "
                f"'{SupportedConfigKeys.IMPORT_QUERY.value}' nor "
                f"'{SupportedConfigKeys.IMPORT_TAGGED.value}' is specified!",
            )

        if cb_config.out:
            with _get_out_stream(cb_config.out) as out_stream:
                _cb_items_to_lobster(items, cb_config, out_stream)
            print(f"Written {len(items)} requirements to {cb_config.out}")
        else:
            # Without an output file the items go to standard output.
            # sys.stdout must not be used in a 'with' statement, because
            # it would be closed when the block is left.
            _cb_items_to_lobster(items, cb_config, sys.stdout)
def _get_out_stream(config_out: Optional[str]) -> TextIO:
    """Return the stream that the output is written to.

    Without an output path this is standard output. Otherwise the
    directory of the path is ensured to exist and the file is opened for
    writing as UTF-8; closing it is left to the caller.
    """
    if not config_out:
        return sys.stdout
    ensure_output_directory(config_out)
    return open(config_out, "w", encoding="UTF-8")
def _cb_items_to_lobster(items: List[Dict], config: Config, out_file: TextIO) -> None:
    """Write the items to the stream in the LOBSTER interchange format.

    The item class is the one of the configured schema. The generator
    name is the tool name with dashes replaced by underscores.
    """
    item_class = get_schema_config(config)["class"]
    generator_name = TOOL_NAME.replace("-", "_")
    lobster_write(out_file, item_class, generator_name, items)
def lobster_codebeamer(config: Config, out_file: str) -> None:
    """Queries the items of 'config.import_query' from codebeamer and writes
    them in the LOBSTER interchange format to the given file. The directory
    of the file is created if necessary.
    """
    # This is an API function.
    queried_items = get_query(config, config.import_query)
    ensure_output_directory(out_file)
    with open(out_file, "w", encoding="UTF-8") as stream:
        _cb_items_to_lobster(queried_items, config, stream)
def main(args: Optional[Sequence[str]] = None) -> int:
    """Entry point: run a new CodebeamerTool with the given arguments and
    return its result."""
    tool = CodebeamerTool()
    return tool.run(args)