Coverage for lobster/tools/codebeamer/codebeamer.py: 52%
291 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-02-10 16:49 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-02-10 16:49 +0000
1#!/usr/bin/env python3
2#
3# lobster_codebeamer - Extract codebeamer items for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20# This tool is based on the codebeamer Rest API v3, as documented here:
21# https://codebeamer.com/cb/wiki/11631738
22#
23# There are some assumptions encoded here that are not clearly
24# documented, in that items have a type and the type has a name.
25#
26#
27# Main limitations:
28# * Item descriptions are ignored right now
29# * Branches (if they exist) are ignored
30# * We only ever fetch the HEAD item
31#
32# However you _can_ import all the items referenced from another
33# lobster artefact.
35import os
36import sys
37import argparse
38import netrc
39from typing import Dict, Iterable, List, Optional, Sequence, TextIO, Union
40from urllib.parse import quote, urlparse
41from enum import Enum
42from http import HTTPStatus
44import requests
45from requests.adapters import HTTPAdapter
46from requests.exceptions import (
47 Timeout,
48 ConnectionError as RequestsConnectionError,
49 RequestException,
50)
51import yaml
52from urllib3.util.retry import Retry
54from lobster.common.items import Tracing_Tag, Requirement, Implementation, Activity
55from lobster.common.location import Codebeamer_Reference
56from lobster.common.errors import Message_Handler, LOBSTER_Error
57from lobster.common.io import lobster_read, lobster_write, ensure_output_directory
58from lobster.common.meta_data_tool_base import MetaDataToolBase
59from lobster.tools.codebeamer.bearer_auth import BearerAuth
60from lobster.tools.codebeamer.config import AuthenticationConfig, Config
61from lobster.tools.codebeamer.exceptions import (
62 MismatchException, NotFileException, QueryException,
63)
66TOOL_NAME = "lobster-codebeamer"
69class SupportedConfigKeys(Enum):
70 """Helper class to define supported configuration keys."""
71 NUM_REQUEST_RETRY = "num_request_retry"
72 RETRY_ERROR_CODES = "retry_error_codes"
73 IMPORT_TAGGED = "import_tagged"
74 IMPORT_QUERY = "import_query"
75 VERIFY_SSL = "verify_ssl"
76 PAGE_SIZE = "page_size"
77 REFS = "refs"
78 SCHEMA = "schema"
79 CB_TOKEN = "token"
80 CB_ROOT = "root"
81 CB_USER = "user"
82 CB_PASS = "pass"
83 TIMEOUT = "timeout"
84 OUT = "out"
86 @classmethod
87 def as_set(cls) -> set:
88 return {parameter.value for parameter in cls}
91def get_authentication(cb_auth_config: AuthenticationConfig) -> requests.auth.AuthBase:
92 if cb_auth_config.token:
93 return BearerAuth(cb_auth_config.token)
94 return requests.auth.HTTPBasicAuth(cb_auth_config.user,
95 cb_auth_config.password)
98def _get_response_message(response: requests.Response) -> str:
99 try:
100 data = response.json()
101 if isinstance(data, dict) and "message" in data:
102 return data["message"]
103 except ValueError:
104 pass
106 return response.text.strip() or "Unknown error"
109def _get_http_reason(response: requests.Response) -> str:
110 if response.reason:
111 return response.reason
112 try:
113 return HTTPStatus(response.status_code).phrase
114 except ValueError:
115 return "Unknown Status"
118def query_cb_single(cb_config: Config, url: str):
119 if cb_config.num_request_retry <= 0:
120 raise ValueError("Retry is disabled (num_request_retry is set to 0). "
121 "Cannot proceed with retries.")
123 # Set up a Retry object with exponential backoff
124 retry_strategy = Retry(
125 total=cb_config.num_request_retry,
126 backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, etc.
127 status_forcelist=cb_config.retry_error_codes,
128 allowed_methods=["GET"],
129 raise_on_status=False,
130 )
132 adapter = HTTPAdapter(max_retries=retry_strategy)
133 session = requests.Session()
134 session.mount("https://", adapter)
135 session.mount("http://", adapter)
137 try:
138 response = session.get(
139 url,
140 auth=get_authentication(cb_config.cb_auth_conf),
141 timeout=cb_config.timeout,
142 verify=cb_config.verify_ssl,
143 )
144 except Timeout as ex:
145 raise QueryException(
146 "Connection timed out while contacting Codebeamer\n"
147 f"URL: {url}\n"
148 f"Reason: {ex}\n"
149 "\nPossible actions:\n"
150 "• Increase the timeout using the 'timeout' parameter"
151 ) from ex
153 except RequestsConnectionError as ex:
154 raise QueryException(
155 "Unable to connect to Codebeamer\n"
156 f"URL: {url}\n"
157 f"Reason: {ex}\n"
158 "\nPossible actions:\n"
159 "• Check internet connection\n"
160 "• Increase retries using 'num_request_retry'\n"
161 "• Check SSL certificates or disable verification by setting "
162 f"'{SupportedConfigKeys.VERIFY_SSL.value}' to false"
163 ) from ex
165 except RequestException as ex:
166 raise QueryException(
167 "Unexpected network error while connecting to Codebeamer\n"
168 f"URL: {url}\n"
169 f"Reason: {ex}"
170 "\nPossible actions:\n"
171 "• Check network stability\n"
172 ) from ex
174 if response.status_code == 200:
175 return response.json()
177 error_message = _get_response_message(response)
178 reason = _get_http_reason(response)
180 raise QueryException(
181 "Codebeamer request failed:\n"
182 f" URL: {url}\n"
183 f" HTTP Status: {response.status_code} ({reason})\n"
184 f"Reason: {error_message}"
185 )
188def get_single_item(cb_config: Config, item_id: int):
189 if not isinstance(item_id, int) or (item_id <= 0):
190 raise ValueError("item_id must be a positive integer")
191 url = f"{cb_config.base}/items/{item_id}"
192 return query_cb_single(cb_config, url)
195def get_many_items(cb_config: Config, item_ids: Iterable[int]):
196 rv = []
198 page_id = 1
199 query_string = quote(f"item.id IN "
200 f"({','.join(str(item_id) for item_id in item_ids)})")
202 while True:
203 base_url = "%s/items/query?page=%u&pageSize=%u&queryString=%s"\
204 % (cb_config.base, page_id,
205 cb_config.page_size, query_string)
206 data = query_cb_single(cb_config, base_url)
207 rv += data["items"]
208 if len(rv) == data["total"]: 208 ↛ 210line 208 didn't jump to line 210 because the condition on line 208 was always true
209 break
210 page_id += 1
212 return rv
215def get_query(cb_config: Config, query: Union[int, str]):
216 if (not query) or (not isinstance(query, (int, str))): 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true
217 raise ValueError(
218 "The query must either be a real positive integer or a non-empty string!",
219 )
221 rv = []
222 url = ""
223 page_id = 1
224 total_items = None
226 while total_items is None or len(rv) < total_items:
227 print("Fetching page %u of query..." % page_id)
228 if isinstance(query, int):
229 url = ("%s/reports/%u/items?page=%u&pageSize=%u" %
230 (cb_config.base,
231 query,
232 page_id,
233 cb_config.page_size))
234 elif isinstance(query, str): 234 ↛ 240line 234 didn't jump to line 240 because the condition on line 234 was always true
235 url = ("%s/items/query?page=%u&pageSize=%u&queryString=%s" %
236 (cb_config.base,
237 page_id,
238 cb_config.page_size,
239 query))
240 data = query_cb_single(cb_config, url)
241 if len(data) != 4: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 raise MismatchException(
243 f"Expected codebeamer response with 4 data entries, but instead "
244 f"received {len(data)}!",
245 )
247 if page_id == 1 and len(data["items"]) == 0:
248 # lobster-trace: codebeamer_req.Get_Query_Zero_Items_Message
249 print("This query doesn't generate items. Please check:")
250 print(" * is the number actually correct?")
251 print(" * do you have permissions to access it?")
252 print(f"You can try to access '{url}' manually to check.")
254 if page_id != data["page"]:
255 raise MismatchException(
256 f"Page mismatch in query result: expected page "
257 f"{page_id} from codebeamer, but got {data['page']}"
258 )
260 if page_id == 1: 260 ↛ 262line 260 didn't jump to line 262 because the condition on line 260 was always true
261 total_items = data["total"]
262 elif total_items != data["total"]:
263 raise MismatchException(
264 f"Item count mismatch in query result: expected "
265 f"{total_items} items so far, but page "
266 f"{data['page']} claims to have sent {data['total']} "
267 f"items in total."
268 )
270 if isinstance(query, int):
271 rv += [to_lobster(cb_config, cb_item["item"])
272 for cb_item in data["items"]]
273 elif isinstance(query, str): 273 ↛ 277line 273 didn't jump to line 277 because the condition on line 273 was always true
274 rv += [to_lobster(cb_config, cb_item)
275 for cb_item in data["items"]]
277 page_id += 1
279 if total_items != len(rv): 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true
280 raise MismatchException(
281 f"Expected to receive {total_items} items in total from codebeamer, "
282 f"but actually received {len(rv)}!",
283 )
285 return rv
288def get_schema_config(cb_config: Config) -> dict:
289 """
290 The function returns a schema map based on the schema mentioned
291 in the cb_config dictionary.
293 If there is no match, it raises a KeyError.
295 Positional arguments:
296 cb_config -- configuration object containing the schema.
298 Returns:
299 A dictionary containing the namespace and class associated with the schema.
301 Raises:
302 KeyError -- if the provided schema is not supported.
303 """
304 schema_map = {
305 'requirement': {"namespace": "req", "class": Requirement},
306 'implementation': {"namespace": "imp", "class": Implementation},
307 'activity': {"namespace": "act", "class": Activity},
308 }
309 schema = cb_config.schema.lower()
311 if schema not in schema_map: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true
312 raise KeyError(f"Unsupported SCHEMA '{schema}' provided in configuration.")
314 return schema_map[schema]
317def to_lobster(cb_config: Config, cb_item: dict):
318 if not isinstance(cb_item, dict): 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true
319 raise ValueError("'cb_item' must be of type 'dict'!")
320 if "id" not in cb_item: 320 ↛ 321line 320 didn't jump to line 321 because the condition on line 320 was never true
321 raise KeyError("Codebeamer item does not contain ID!")
323 # This looks like it's business logic, maybe we should make this
324 # configurable?
326 categories = cb_item.get("categories")
327 if categories: 327 ↛ 330line 327 didn't jump to line 330 because the condition on line 327 was always true
328 kind = categories[0].get("name", "codebeamer item")
329 else:
330 kind = "codebeamer item"
332 status = cb_item["status"].get("name", None) if "status" in cb_item else None
334 # Get item name. Sometimes items do not have one, in which case we
335 # come up with one.
336 if "name" in cb_item: 336 ↛ 339line 336 didn't jump to line 339 because the condition on line 336 was always true
337 item_name = cb_item["name"]
338 else:
339 item_name = f"Unnamed item {cb_item['id']}"
341 schema_config = get_schema_config(cb_config)
343 # Construct the appropriate object based on 'kind'
344 common_params = _create_common_params(
345 schema_config["namespace"], cb_item,
346 cb_config.cb_auth_conf.root, item_name, kind)
347 item = _create_lobster_item(
348 schema_config["class"],
349 common_params, item_name, status)
351 if cb_config.references: 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true
352 for displayed_name in cb_config.references:
353 if cb_item.get(displayed_name):
354 item_references = cb_item.get(displayed_name) if (
355 isinstance(cb_item.get(displayed_name), list)) \
356 else [cb_item.get(displayed_name)]
357 else:
358 item_references = [value for custom_field
359 in cb_item["customFields"]
360 if custom_field["name"] == displayed_name and
361 custom_field.get("values")
362 for value in custom_field["values"]]
364 for value in item_references:
365 item.add_tracing_target(Tracing_Tag("req", str(value["id"])))
367 return item
370def _create_common_params(namespace: str, cb_item: dict, cb_root: str,
371 item_name: str, kind: str):
372 """
373 Creates and returns common parameters for a Codebeamer item.
374 Args:
375 namespace (str): Namespace for the tag.
376 cb_item (dict): Codebeamer item dictionary.
377 cb_root (str): Root URL or path of Codebeamer.
378 item_name (str): Name of the item.
379 kind (str): Type of the item.
380 Returns:
381 dict: Common parameters including tag, location, and kind.
382 """
383 return {
384 'tag': Tracing_Tag(
385 namespace=namespace,
386 tag=str(cb_item["id"]),
387 version=cb_item["version"]
388 ),
389 'location': Codebeamer_Reference(
390 cb_root=cb_root,
391 tracker=cb_item["tracker"]["id"],
392 item=cb_item["id"],
393 version=cb_item["version"],
394 name=item_name
395 ),
396 'kind': kind
397 }
400def _create_lobster_item(schema_class, common_params, item_name, status):
401 """
402 Creates and returns a Lobster item based on the schema class.
403 Args:
404 schema_class: Class of the schema (Requirement, Implementation, Activity).
405 common_params (dict): Common parameters for the item.
406 item_name (str): Name of the item.
407 status (str): Status of the item.
408 Returns:
409 Object: An instance of the schema class with the appropriate parameters.
410 """
411 if schema_class is Requirement: 411 ↛ 420line 411 didn't jump to line 420 because the condition on line 411 was always true
412 return Requirement(
413 **common_params,
414 framework="codebeamer",
415 text=None,
416 status=status,
417 name= item_name
418 )
420 elif schema_class is Implementation:
421 return Implementation(
422 **common_params,
423 language="python",
424 name= item_name,
425 )
427 else:
428 return Activity(
429 **common_params,
430 framework="codebeamer",
431 status=status
432 )
435def import_tagged(cb_config: Config, items_to_import: Iterable[int]):
436 rv = []
438 cb_items = get_many_items(cb_config, items_to_import)
439 for cb_item in cb_items:
440 l_item = to_lobster(cb_config, cb_item)
441 rv.append(l_item)
443 return rv
446def ensure_list(instance) -> List:
447 if isinstance(instance, list): 447 ↛ 449line 447 didn't jump to line 449 because the condition on line 447 was always true
448 return instance
449 return [instance]
452def update_authentication_parameters(
453 auth_conf: AuthenticationConfig,
454 netrc_path: Optional[str] = None):
455 if (auth_conf.token is None and
456 (auth_conf.user is None or auth_conf.password is None)):
457 netrc_file = netrc_path or os.path.join(os.path.expanduser("~"),
458 ".netrc")
459 if os.path.isfile(netrc_file):
460 netrc_config = netrc.netrc(netrc_file)
461 machine = urlparse(auth_conf.root).hostname
462 auth = netrc_config.authenticators(machine)
463 if auth is not None:
464 print(f"Using .netrc login for {auth_conf.root}")
465 auth_conf.user, _, auth_conf.password = auth
466 else:
467 provided_machine = ", ".join(netrc_config.hosts.keys()) or "None"
468 raise KeyError(f"Error parsing .netrc file."
469 f"\nExpected '{machine}', but got '{provided_machine}'.")
471 if (auth_conf.token is None and
472 (auth_conf.user is None or auth_conf.password is None)):
473 raise KeyError("Please add your token to the config file, "
474 "or use user and pass in the config file, "
475 "or configure credentials in the .netrc file.")
478def load_config(file_name: str) -> Config:
479 """
480 Parses a YAML configuration file and returns a validated configuration object.
482 Args:
483 file_name (str): Path to the YAML configuration file.
485 Returns:
486 Config: validated configuration.
488 Raises:
489 ValueError: If `file_name` is not a string.
490 FileNotFoundError: If the file does not exist.
491 KeyError: If required fields are missing or unsupported keys are present.
492 """
493 with open(file_name, "r", encoding='utf-8') as file:
494 return parse_config_data(yaml.safe_load(file) or {})
497def parse_config_data(data: dict) -> Config:
498 # Validate supported keys
499 provided_config_keys = set(data.keys())
500 unsupported_keys = provided_config_keys - SupportedConfigKeys.as_set()
501 if unsupported_keys:
502 raise KeyError(
503 f"Unsupported config keys: {', '.join(unsupported_keys)}. "
504 f"Supported keys are: {', '.join(SupportedConfigKeys.as_set())}."
505 )
507 # create config object
508 config = Config(
509 references=ensure_list(data.get(SupportedConfigKeys.REFS.value, [])),
510 import_tagged=data.get(SupportedConfigKeys.IMPORT_TAGGED.value),
511 import_query=data.get(SupportedConfigKeys.IMPORT_QUERY.value),
512 verify_ssl=data.get(SupportedConfigKeys.VERIFY_SSL.value, True),
513 page_size=data.get(SupportedConfigKeys.PAGE_SIZE.value, 100),
514 schema=data.get(SupportedConfigKeys.SCHEMA.value, "Requirement"),
515 timeout=data.get(SupportedConfigKeys.TIMEOUT.value, 30),
516 out=data.get(SupportedConfigKeys.OUT.value),
517 num_request_retry=data.get(SupportedConfigKeys.NUM_REQUEST_RETRY.value, 5),
518 retry_error_codes=data.get(SupportedConfigKeys.RETRY_ERROR_CODES.value, []),
519 cb_auth_conf=AuthenticationConfig(
520 token=data.get(SupportedConfigKeys.CB_TOKEN.value),
521 user=data.get(SupportedConfigKeys.CB_USER.value),
522 password=data.get(SupportedConfigKeys.CB_PASS.value),
523 root=data.get(SupportedConfigKeys.CB_ROOT.value)
524 ),
525 )
527 # Ensure consistency of the configuration
528 if (not config.import_tagged) and (not config.import_query):
529 raise KeyError(f"Either {SupportedConfigKeys.IMPORT_TAGGED.value} or "
530 f"{SupportedConfigKeys.IMPORT_QUERY.value} must be provided!")
532 if config.cb_auth_conf.root is None: 532 ↛ 533line 532 didn't jump to line 533 because the condition on line 532 was never true
533 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must be provided!")
535 if not config.cb_auth_conf.root.startswith("https://"): 535 ↛ 536line 535 didn't jump to line 536 because the condition on line 535 was never true
536 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must start with https://, "
537 f"but value is {config.cb_auth_conf.root}.")
539 return config
542class CodebeamerTool(MetaDataToolBase):
543 def __init__(self):
544 super().__init__(
545 name="codebeamer",
546 description="Extract codebeamer items for LOBSTER",
547 official=True,
548 )
549 self._argument_parser.add_argument(
550 "--config",
551 help=(f"Path to YAML file with arguments, "
552 f"by default (codebeamer-config.yaml) "
553 f"supported references: '{', '.join(SupportedConfigKeys.as_set())}'"),
554 default=os.path.join(os.getcwd(), "codebeamer-config.yaml"))
556 self._argument_parser.add_argument(
557 "--out",
558 help=("Name of output file"),
559 default="codebeamer.lobster",
560 )
562 def _run_impl(self, options: argparse.Namespace) -> int:
563 try:
564 self._execute(options)
565 return 0
566 except NotFileException as ex:
567 print(ex)
568 except QueryException as query_ex:
569 print(query_ex)
570 except FileNotFoundError as file_ex:
571 self._print_error(f"File '{file_ex.filename}' not found.")
572 except IsADirectoryError as isdir_ex:
573 self._print_error(
574 f"Path '{isdir_ex.filename}' is a directory, but a file was expected.",
575 )
576 except ValueError as value_error:
577 self._print_error(value_error)
578 except LOBSTER_Error as lobster_error:
579 self._print_error(lobster_error)
581 return 1
583 @staticmethod
584 def _print_error(error: Union[Exception, str]):
585 print(f"{TOOL_NAME}: {error}", file=sys.stderr)
587 def _execute(self, options: argparse.Namespace) -> None:
588 mh = Message_Handler()
590 cb_config = load_config(options.config)
592 if cb_config.out is None:
593 cb_config.out = options.out
595 update_authentication_parameters(cb_config.cb_auth_conf)
597 items_to_import = set()
599 if cb_config.import_tagged:
600 source_items = {}
601 lobster_read(
602 mh = mh,
603 filename = cb_config.import_tagged,
604 level = "N/A",
605 items = source_items,
606 )
608 for item in source_items.values():
609 for tag in item.unresolved_references:
610 if tag.namespace != "req":
611 continue
612 try:
613 item_id = int(tag.tag, 10)
614 if item_id > 0:
615 items_to_import.add(item_id)
616 else:
617 mh.warning(item.location,
618 f"invalid codebeamer reference to {item_id}")
619 except ValueError:
620 mh.warning(
621 item.location,
622 f"cannot convert reference '{tag.tag}' to integer "
623 f"Codebeamer ID",
624 )
626 items = import_tagged(cb_config, items_to_import)
628 elif cb_config.import_query is not None:
629 try:
630 if isinstance(cb_config.import_query, str):
631 if (cb_config.import_query.startswith("-") and
632 cb_config.import_query[1:].isdigit()):
633 self._argument_parser.error(
634 "import_query must be a positive integer")
635 elif cb_config.import_query.startswith("-"):
636 self._argument_parser.error(
637 "import_query must be a valid cbQL query")
638 elif cb_config.import_query == "":
639 self._argument_parser.error(
640 "import_query must either be a query string or a query ID")
641 elif cb_config.import_query.isdigit():
642 cb_config.import_query = int(cb_config.import_query)
643 except ValueError as e:
644 self._argument_parser.error(str(e))
646 items = get_query(cb_config, cb_config.import_query)
647 else:
648 raise ValueError(
649 f"Unclear what to do, because neither "
650 f"'{SupportedConfigKeys.IMPORT_QUERY.value}' nor "
651 f"'{SupportedConfigKeys.IMPORT_TAGGED.value}' is specified!",
652 )
654 with _get_out_stream(cb_config.out) as out_stream:
655 _cb_items_to_lobster(items, cb_config, out_stream)
656 if cb_config.out:
657 print(f"Written {len(items)} requirements to {cb_config.out}")
660def _get_out_stream(config_out: Optional[str]) -> TextIO:
661 if config_out:
662 ensure_output_directory(config_out)
663 return open(config_out, "w", encoding="UTF-8")
664 return sys.stdout
667def _cb_items_to_lobster(items: List[Dict], config: Config, out_file: TextIO) -> None:
668 schema_config = get_schema_config(config)
669 lobster_write(out_file, schema_config["class"], TOOL_NAME.replace("-", "_"), items)
672def lobster_codebeamer(config: Config, out_file: str) -> None:
673 """Loads items from codebeamer and serializes them in the LOBSTER interchange
674 format to the given file.
675 """
676 # This is an API function.
677 items = get_query(config, config.import_query)
678 ensure_output_directory(out_file)
679 with open(out_file, "w", encoding="UTF-8") as fd:
680 _cb_items_to_lobster(items, config, fd)
683def main(args: Optional[Sequence[str]] = None) -> int:
684 return CodebeamerTool().run(args)