Coverage for lobster/tools/codebeamer/codebeamer.py: 52%
291 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-09 10:06 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-01-09 10:06 +0000
1#!/usr/bin/env python3
2#
3# lobster_codebeamer - Extract codebeamer items for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
# This tool is based on the codebeamer REST API v3, as documented here:
21# https://codebeamer.com/cb/wiki/11631738
22#
23# There are some assumptions encoded here that are not clearly
24# documented, in that items have a type and the type has a name.
25#
26#
27# Main limitations:
28# * Item descriptions are ignored right now
29# * Branches (if they exist) are ignored
30# * We only ever fetch the HEAD item
31#
32# However you _can_ import all the items referenced from another
33# lobster artefact.
35import os
36import sys
37import argparse
38import netrc
39from typing import Dict, Iterable, List, Optional, Sequence, TextIO, Union
40from urllib.parse import quote, urlparse
41from enum import Enum
42from http import HTTPStatus
44import requests
45from requests.adapters import HTTPAdapter
46from requests.exceptions import (
47 Timeout,
48 ConnectionError as RequestsConnectionError,
49 RequestException,
50)
51import yaml
52from urllib3.util.retry import Retry
54from lobster.common.items import Tracing_Tag, Requirement, Implementation, Activity
55from lobster.common.location import Codebeamer_Reference
56from lobster.common.errors import Message_Handler, LOBSTER_Error
57from lobster.common.io import lobster_read, lobster_write, ensure_output_directory
58from lobster.common.meta_data_tool_base import MetaDataToolBase
59from lobster.tools.codebeamer.bearer_auth import BearerAuth
60from lobster.tools.codebeamer.config import AuthenticationConfig, Config
61from lobster.tools.codebeamer.exceptions import (
62 MismatchException, NotFileException, QueryException,
63)
# Tool name: used as prefix for error messages printed by this module and,
# with dashes replaced by underscores, handed to lobster_write.
TOOL_NAME = "lobster-codebeamer"
class SupportedConfigKeys(Enum):
    """Enumeration of the keys accepted in the YAML configuration file."""
    NUM_REQUEST_RETRY = "num_request_retry"
    RETRY_ERROR_CODES = "retry_error_codes"
    IMPORT_TAGGED = "import_tagged"
    IMPORT_QUERY = "import_query"
    VERIFY_SSL = "verify_ssl"
    PAGE_SIZE = "page_size"
    REFS = "refs"
    SCHEMA = "schema"
    CB_TOKEN = "token"
    CB_ROOT = "root"
    CB_USER = "user"
    CB_PASS = "pass"
    TIMEOUT = "timeout"
    OUT = "out"

    @classmethod
    def as_set(cls) -> set:
        """Return the string values of all members as a set."""
        values = set()
        for member in cls:
            values.add(member.value)
        return values
def get_authentication(cb_auth_config: AuthenticationConfig) -> requests.auth.AuthBase:
    """Select the authentication object for codebeamer requests.

    Args:
        cb_auth_config: Authentication settings; its ``token``, ``user`` and
            ``password`` attributes are read.

    Returns:
        A ``BearerAuth`` built from the token when a token is set,
        otherwise HTTP basic authentication built from user and password.
    """
    if not cb_auth_config.token:
        return requests.auth.HTTPBasicAuth(cb_auth_config.user,
                                           cb_auth_config.password)
    return BearerAuth(cb_auth_config.token)
98def _get_response_message(response: requests.Response) -> str:
99 try:
100 data = response.json()
101 if isinstance(data, dict) and "message" in data:
102 return data["message"]
103 except ValueError:
104 pass
106 return response.text.strip() or "Unknown error"
109def _get_http_reason(response: requests.Response) -> str:
110 if response.reason:
111 return response.reason
112 try:
113 return HTTPStatus(response.status_code).phrase
114 except ValueError:
115 return "Unknown Status"
def query_cb_single(cb_config: Config, url: str):
    """Perform a single GET request against codebeamer and return its JSON.

    The request is sent through a session whose adapter retries according to
    the configuration (``num_request_retry``, ``retry_error_codes``). The
    session is closed again before the function returns, so repeated calls
    (e.g. one per result page) do not accumulate open connection pools.

    Args:
        cb_config: Configuration providing retry settings, authentication,
            timeout and SSL verification flag.
        url: Complete URL to fetch.

    Returns:
        The decoded JSON body of a response with HTTP status 200.

    Raises:
        ValueError: If ``num_request_retry`` is not greater than zero.
        QueryException: If the request times out, the connection fails, any
            other request error occurs, or the HTTP status is not 200.
    """
    if cb_config.num_request_retry <= 0:
        raise ValueError("Retry is disabled (num_request_retry is set to 0). "
                         "Cannot proceed with retries.")

    # Retry GET requests on the configured status codes, with a growing
    # delay between attempts. raise_on_status=False makes the last response
    # come back to us instead of raising, so it is reported further below.
    retry_strategy = Retry(
        total=cb_config.num_request_retry,
        backoff_factor=1,
        status_forcelist=cb_config.retry_error_codes,
        allowed_methods=["GET"],
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)

    # Using the session as a context manager guarantees that its adapters
    # (and their connection pools) are closed, also on the error paths.
    with requests.Session() as session:
        session.mount("https://", adapter)
        session.mount("http://", adapter)

        try:
            response = session.get(
                url,
                auth=get_authentication(cb_config.cb_auth_conf),
                timeout=cb_config.timeout,
                verify=cb_config.verify_ssl,
            )
        except Timeout as ex:
            raise QueryException(
                "Connection timed out while contacting Codebeamer\n"
                f"URL: {url}\n"
                f"Reason: {ex}\n"
                "\nPossible actions:\n"
                "• Increase the timeout using the 'timeout' parameter"
            ) from ex

        except RequestsConnectionError as ex:
            raise QueryException(
                "Unable to connect to Codebeamer\n"
                f"URL: {url}\n"
                f"Reason: {ex}\n"
                "\nPossible actions:\n"
                "• Check internet connection\n"
                "• Increase retries using 'num_request_retry'"
            ) from ex

        except RequestException as ex:
            raise QueryException(
                "Unexpected network error while connecting to Codebeamer\n"
                f"URL: {url}\n"
                f"Reason: {ex}"
                "\nPossible actions:\n"
                "• Check network stability\n"
            ) from ex

    # The request was not streamed, so the body is already available on the
    # response object even though the session has been closed.
    if response.status_code == 200:
        return response.json()

    error_message = _get_response_message(response)
    reason = _get_http_reason(response)

    raise QueryException(
        "Codebeamer request failed:\n"
        f" URL: {url}\n"
        f" HTTP Status: {response.status_code} ({reason})\n"
        f"Reason: {error_message}"
    )
def get_single_item(cb_config: Config, item_id: int):
    """Fetch one codebeamer item by its ID.

    Args:
        cb_config: Configuration; its ``base`` URL is used to build the
            request URL.
        item_id: ID of the item, must be an integer greater than zero.

    Returns:
        Whatever ``query_cb_single`` returns for ``<base>/items/<item_id>``.

    Raises:
        ValueError: If ``item_id`` is not a positive integer.
    """
    is_valid_id = isinstance(item_id, int) and item_id > 0
    if not is_valid_id:
        raise ValueError("item_id must be a positive integer")
    return query_cb_single(cb_config, f"{cb_config.base}/items/{item_id}")
def get_many_items(cb_config: Config, item_ids: Iterable[int]):
    """Fetch several codebeamer items by ID using a paged cbQL query.

    Args:
        cb_config: Configuration providing ``base`` URL and ``page_size``.
        item_ids: IDs of the items to fetch. The iterable is consumed once.

    Returns:
        List with the entries of the "items" field of all fetched pages.
        An empty list is returned without contacting the server when no IDs
        are given.
    """
    id_strings = [str(item_id) for item_id in item_ids]
    if not id_strings:
        # "item.id IN ()" is not a meaningful query, and there is nothing to
        # fetch anyway.
        return []

    query_string = quote(f"item.id IN ({','.join(id_strings)})")

    rv = []
    page_id = 1
    while True:
        page_url = "%s/items/query?page=%u&pageSize=%u&queryString=%s"\
            % (cb_config.base, page_id,
               cb_config.page_size, query_string)
        data = query_cb_single(cb_config, page_url)
        page_items = data["items"]
        rv += page_items
        # Stop when everything announced has arrived. Also stop on an empty
        # page: if the server delivers fewer items than "total" claims,
        # asking for further pages would never terminate.
        if not page_items or len(rv) >= data["total"]:
            break
        page_id += 1

    return rv
def get_query(cb_config: Config, query: Union[int, str]):
    """Run a codebeamer report or cbQL query and convert the result items.

    An integer ``query`` is treated as a report ID
    (``<base>/reports/<id>/items``), a string as a cbQL query string
    (``<base>/items/query``). Pages are fetched until the number of items
    announced by the first page has been collected.

    NOTE(review): the query string is inserted into the URL as given (it is
    not passed through ``quote``) - presumably it is expected to be URL-safe
    already; confirm against the configuration documentation.

    Args:
        cb_config: Configuration providing ``base`` URL and ``page_size``.
        query: Report ID or cbQL query string.

    Returns:
        List of the objects produced by ``to_lobster`` for each item.

    Raises:
        ValueError: If ``query`` is falsy or neither ``int`` nor ``str``.
        MismatchException: If a response does not have exactly 4 entries,
            the page number or total differs from what is expected, or the
            number of received items differs from the announced total.
    """
    if (not query) or (not isinstance(query, (int, str))):
        raise ValueError(
            "The query must either be a real positive integer or a non-empty string!",
        )

    # The validation above leaves only int (report ID) or str (cbQL).
    is_report = isinstance(query, int)

    rv = []
    page_id = 1
    total_items = None

    while total_items is None or len(rv) < total_items:
        print("Fetching page %u of query..." % page_id)
        if is_report:
            url = ("%s/reports/%u/items?page=%u&pageSize=%u" %
                   (cb_config.base,
                    query,
                    page_id,
                    cb_config.page_size))
        else:
            url = ("%s/items/query?page=%u&pageSize=%u&queryString=%s" %
                   (cb_config.base,
                    page_id,
                    cb_config.page_size,
                    query))
        data = query_cb_single(cb_config, url)
        if len(data) != 4:
            raise MismatchException(
                f"Expected codebeamer response with 4 data entries, but instead "
                f"received {len(data)}!",
            )

        page_items = data["items"]

        if page_id == 1 and len(page_items) == 0:
            # lobster-trace: codebeamer_req.Get_Query_Zero_Items_Message
            print("This query doesn't generate items. Please check:")
            print(" * is the number actually correct?")
            print(" * do you have permissions to access it?")
            print(f"You can try to access '{url}' manually to check.")

        if page_id != data["page"]:
            raise MismatchException(
                f"Page mismatch in query result: expected page "
                f"{page_id} from codebeamer, but got {data['page']}"
            )

        if page_id == 1:
            total_items = data["total"]
        elif total_items != data["total"]:
            raise MismatchException(
                f"Item count mismatch in query result: expected "
                f"{total_items} items so far, but page "
                f"{data['page']} claims to have sent {data['total']} "
                f"items in total."
            )

        if not page_items:
            # An empty page cannot bring us closer to the announced total.
            # Stop paging here; the count check below reports the mismatch
            # instead of requesting further pages forever.
            break

        if is_report:
            # Report results wrap each item in an "item" entry.
            rv += [to_lobster(cb_config, cb_item["item"])
                   for cb_item in page_items]
        else:
            rv += [to_lobster(cb_config, cb_item)
                   for cb_item in page_items]

        page_id += 1

    if total_items != len(rv):
        raise MismatchException(
            f"Expected to receive {total_items} items in total from codebeamer, "
            f"but actually received {len(rv)}!",
        )

    return rv
def get_schema_config(cb_config: Config) -> dict:
    """
    Look up namespace and item class for the schema named in the
    configuration. The schema name is compared case-insensitively.

    Positional arguments:
    cb_config -- configuration object containing the schema.

    Returns:
    A dictionary with the entries "namespace" and "class" that belong to
    the schema.

    Raises:
    KeyError -- if the provided schema is not supported.
    """
    schema = cb_config.schema.lower()

    supported = {
        'requirement': {"namespace": "req", "class": Requirement},
        'implementation': {"namespace": "imp", "class": Implementation},
        'activity': {"namespace": "act", "class": Activity},
    }
    entry = supported.get(schema)
    if entry is None:
        raise KeyError(f"Unsupported SCHEMA '{schema}' provided in configuration.")

    return entry
def to_lobster(cb_config: Config, cb_item: dict):
    """Convert one codebeamer item dictionary into a LOBSTER item.

    Args:
        cb_config: Configuration providing the schema, the codebeamer root
            and the names of reference fields (``references``).
        cb_item: Item as returned by codebeamer. Must contain "id"; the
            entries "categories", "status", "name" and "customFields" are
            optional.

    Returns:
        The item created by ``_create_lobster_item`` for the configured
        schema, with one tracing target per configured reference found.

    Raises:
        ValueError: If ``cb_item`` is not a dictionary.
        KeyError: If ``cb_item`` has no "id" entry.
    """
    if not isinstance(cb_item, dict):
        raise ValueError("'cb_item' must be of type 'dict'!")
    if "id" not in cb_item:
        raise KeyError("Codebeamer item does not contain ID!")

    # This looks like it's business logic, maybe we should make this
    # configurable?
    categories = cb_item.get("categories")
    if categories:
        kind = categories[0].get("name", "codebeamer item")
    else:
        kind = "codebeamer item"

    status = cb_item["status"].get("name", None) if "status" in cb_item else None

    # Get item name. Sometimes items do not have one, in which case we
    # come up with one.
    if "name" in cb_item:
        item_name = cb_item["name"]
    else:
        item_name = f"Unnamed item {cb_item['id']}"

    schema_config = get_schema_config(cb_config)

    common_params = _create_common_params(
        schema_config["namespace"], cb_item,
        cb_config.cb_auth_conf.root, item_name, kind)
    item = _create_lobster_item(
        schema_config["class"],
        common_params, item_name, status)

    if cb_config.references:
        for displayed_name in cb_config.references:
            direct_value = cb_item.get(displayed_name)
            if direct_value:
                # The reference is a regular field of the item; it may hold
                # a single value or a list of values.
                item_references = direct_value if (
                    isinstance(direct_value, list)) \
                    else [direct_value]
            else:
                # Otherwise search the custom fields. Items without any
                # custom fields simply contribute no references instead of
                # failing with a KeyError.
                item_references = [value for custom_field
                                   in cb_item.get("customFields") or []
                                   if custom_field["name"] == displayed_name and
                                   custom_field.get("values")
                                   for value in custom_field["values"]]

            for value in item_references:
                item.add_tracing_target(Tracing_Tag("req", str(value["id"])))

    return item
def _create_common_params(namespace: str, cb_item: dict, cb_root: str,
                          item_name: str, kind: str):
    """
    Builds the keyword arguments shared by all item classes.
    Args:
        namespace (str): Namespace for the tag.
        cb_item (dict): Codebeamer item dictionary; "id", "version" and
            the "id" of "tracker" are read.
        cb_root (str): Codebeamer root passed on to the location.
        item_name (str): Name of the item.
        kind (str): Type of the item.
    Returns:
        dict: Dictionary with the entries 'tag', 'location' and 'kind'.
    """
    item_id = cb_item["id"]
    item_version = cb_item["version"]

    tracing_tag = Tracing_Tag(
        namespace=namespace,
        tag=str(item_id),
        version=item_version,
    )
    reference = Codebeamer_Reference(
        cb_root=cb_root,
        tracker=cb_item["tracker"]["id"],
        item=item_id,
        version=item_version,
        name=item_name,
    )
    return {'tag': tracing_tag, 'location': reference, 'kind': kind}
def _create_lobster_item(schema_class, common_params, item_name, status):
    """
    Instantiates the item class that belongs to the given schema class.
    Args:
        schema_class: Class of the schema (Requirement, Implementation, Activity).
        common_params (dict): Keyword arguments passed to every item class.
        item_name (str): Name of the item (not used for Activity).
        status (str): Status of the item (not used for Implementation).
    Returns:
        Object: A Requirement or Implementation when that class is given,
        an Activity for anything else.
    """
    if schema_class is Requirement:
        return Requirement(
            name=item_name,
            status=status,
            text=None,
            framework="codebeamer",
            **common_params,
        )

    if schema_class is Implementation:
        return Implementation(
            name=item_name,
            language="python",
            **common_params,
        )

    # Every other schema class is mapped to an Activity.
    return Activity(
        status=status,
        framework="codebeamer",
        **common_params,
    )
def import_tagged(cb_config: Config, items_to_import: Iterable[int]):
    """Fetch the given codebeamer items and convert each one with to_lobster.

    Args:
        cb_config: Configuration handed to the fetch and conversion helpers.
        items_to_import: IDs of the codebeamer items to fetch.

    Returns:
        List of converted items, in the order the items were fetched.
    """
    fetched = get_many_items(cb_config, items_to_import)
    return [to_lobster(cb_config, cb_item) for cb_item in fetched]
def ensure_list(instance) -> List:
    """Return ``instance`` itself if it is a list, else wrap it in a new list."""
    return instance if isinstance(instance, list) else [instance]
def update_authentication_parameters(
        auth_conf: AuthenticationConfig,
        netrc_path: Optional[str] = None):
    """Complete missing credentials from a .netrc file.

    Nothing is changed if a token, or both user and password, are already
    set. Otherwise the .netrc file is consulted, provided it exists, for the
    host name of ``auth_conf.root``; on a match user and password are
    stored in ``auth_conf``.

    Args:
        auth_conf: Authentication settings, updated in place.
        netrc_path: Path of the .netrc file; defaults to ``.netrc`` in the
            home directory of the user.

    Raises:
        KeyError: If the .netrc file has no entry for the host, or if the
            credentials are still incomplete afterwards.
    """
    def credentials_missing() -> bool:
        # Neither a token nor a complete user/password pair is available.
        if auth_conf.token is not None:
            return False
        return auth_conf.user is None or auth_conf.password is None

    if credentials_missing():
        if netrc_path:
            netrc_file = netrc_path
        else:
            netrc_file = os.path.join(os.path.expanduser("~"), ".netrc")

        if os.path.isfile(netrc_file):
            netrc_config = netrc.netrc(netrc_file)
            machine = urlparse(auth_conf.root).hostname
            auth = netrc_config.authenticators(machine)
            if auth is None:
                provided_machine = ", ".join(netrc_config.hosts.keys()) or "None"
                raise KeyError(f"Error parsing .netrc file."
                               f"\nExpected '{machine}', but got '{provided_machine}'.")
            print(f"Using .netrc login for {auth_conf.root}")
            auth_conf.user, _, auth_conf.password = auth

    if credentials_missing():
        raise KeyError("Please add your token to the config file, "
                       "or use user and pass in the config file, "
                       "or configure credentials in the .netrc file.")
def load_config(file_name: str) -> Config:
    """
    Reads a YAML configuration file and turns it into a configuration object.

    An empty file is treated like an empty mapping.

    Args:
        file_name (str): Path to the YAML configuration file.

    Returns:
        Config: The result of parse_config_data for the file content.

    Raises:
        FileNotFoundError: If the file does not exist.
        KeyError: If parse_config_data rejects the content.
    """
    with open(file_name, "r", encoding='utf-8') as file:
        raw_data = yaml.safe_load(file)
    return parse_config_data(raw_data if raw_data else {})
def parse_config_data(data: dict) -> Config:
    """Validate raw configuration data and build the configuration object.

    Args:
        data: Mapping of configuration keys to values, e.g. as loaded from
            the YAML configuration file.

    Returns:
        Config: Configuration with defaults applied for keys not provided.

    Raises:
        KeyError: If unsupported keys are present, if neither
            ``import_tagged`` nor ``import_query`` is given, or if ``root``
            is missing or does not start with "https://".
    """
    # Validate supported keys. The key lists are sorted (and converted to
    # str, as YAML keys need not be strings) so that the message is
    # reproducible and building it cannot fail.
    supported_keys = SupportedConfigKeys.as_set()
    unsupported_keys = set(data.keys()) - supported_keys
    if unsupported_keys:
        unsupported_text = ', '.join(sorted(str(key) for key in unsupported_keys))
        supported_text = ', '.join(sorted(supported_keys))
        raise KeyError(
            f"Unsupported config keys: {unsupported_text}. "
            f"Supported keys are: {supported_text}."
        )

    # A key given without value ("refs:") yields None; treat it like an
    # absent key rather than as a reference named None.
    refs = data.get(SupportedConfigKeys.REFS.value)

    # create config object
    config = Config(
        references=[] if refs is None else ensure_list(refs),
        import_tagged=data.get(SupportedConfigKeys.IMPORT_TAGGED.value),
        import_query=data.get(SupportedConfigKeys.IMPORT_QUERY.value),
        # NOTE(review): certificate verification is off unless configured;
        # consider whether this default is intended.
        verify_ssl=data.get(SupportedConfigKeys.VERIFY_SSL.value, False),
        page_size=data.get(SupportedConfigKeys.PAGE_SIZE.value, 100),
        schema=data.get(SupportedConfigKeys.SCHEMA.value, "Requirement"),
        timeout=data.get(SupportedConfigKeys.TIMEOUT.value, 30),
        out=data.get(SupportedConfigKeys.OUT.value),
        num_request_retry=data.get(SupportedConfigKeys.NUM_REQUEST_RETRY.value, 5),
        retry_error_codes=data.get(SupportedConfigKeys.RETRY_ERROR_CODES.value, []),
        cb_auth_conf=AuthenticationConfig(
            token=data.get(SupportedConfigKeys.CB_TOKEN.value),
            user=data.get(SupportedConfigKeys.CB_USER.value),
            password=data.get(SupportedConfigKeys.CB_PASS.value),
            root=data.get(SupportedConfigKeys.CB_ROOT.value)
        ),
    )

    # Ensure consistency of the configuration
    if (not config.import_tagged) and (not config.import_query):
        raise KeyError(f"Either {SupportedConfigKeys.IMPORT_TAGGED.value} or "
                       f"{SupportedConfigKeys.IMPORT_QUERY.value} must be provided!")

    if config.cb_auth_conf.root is None:
        raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must be provided!")

    if not config.cb_auth_conf.root.startswith("https://"):
        raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must start with https://, "
                       f"but value is {config.cb_auth_conf.root}.")

    return config
class CodebeamerTool(MetaDataToolBase):
    """Command line tool that extracts codebeamer items for LOBSTER.

    Adds the options ``--config`` (YAML configuration file) and ``--out``
    (output file name) to the argument parser of the base class.
    """

    def __init__(self):
        super().__init__(
            name="codebeamer",
            description="Extract codebeamer items for LOBSTER",
            official=True,
        )
        # Sorted, so that the help text does not depend on set ordering.
        supported_keys = ', '.join(sorted(SupportedConfigKeys.as_set()))
        self._argument_parser.add_argument(
            "--config",
            help=(f"Path to YAML file with arguments, "
                  f"by default (codebeamer-config.yaml) "
                  f"supported references: '{supported_keys}'"),
            default=os.path.join(os.getcwd(), "codebeamer-config.yaml"))

        self._argument_parser.add_argument(
            "--out",
            help=("Name of output file"),
            default="codebeamer.lobster",
        )

    def _run_impl(self, options: argparse.Namespace) -> int:
        """Run the tool and translate known exceptions into an exit code.

        Returns:
            0 if the execution succeeded, 1 if one of the handled
            exceptions occurred (after printing a message).
        """
        try:
            self._execute(options)
            return 0
        except NotFileException as ex:
            print(ex)
        except QueryException as query_ex:
            print(query_ex)
        except FileNotFoundError as file_ex:
            self._print_error(f"File '{file_ex.filename}' not found.")
        except IsADirectoryError as isdir_ex:
            self._print_error(
                f"Path '{isdir_ex.filename}' is a directory, but a file was expected.",
            )
        except ValueError as value_error:
            self._print_error(value_error)
        except LOBSTER_Error as lobster_error:
            self._print_error(lobster_error)

        return 1

    @staticmethod
    def _print_error(error: Union[Exception, str]):
        """Print the error, prefixed with the tool name, to stderr."""
        print(f"{TOOL_NAME}: {error}", file=sys.stderr)

    def _execute(self, options: argparse.Namespace) -> None:
        """Load the configuration, import the items and write the output.

        Items are imported either for the references found in an existing
        LOBSTER file (``import_tagged``) or from a codebeamer report or
        cbQL query (``import_query``).

        Raises:
            ValueError: If neither import source is configured.
        """
        mh = Message_Handler()

        cb_config = load_config(options.config)

        # The output file of the configuration wins over the command line.
        if cb_config.out is None:
            cb_config.out = options.out

        update_authentication_parameters(cb_config.cb_auth_conf)

        items_to_import = set()

        if cb_config.import_tagged:
            source_items = {}
            lobster_read(
                mh       = mh,
                filename = cb_config.import_tagged,
                level    = "N/A",
                items    = source_items,
            )

            # Collect the IDs of all unresolved references in the "req"
            # namespace; anything that is not a positive integer is
            # reported as a warning and skipped.
            for item in source_items.values():
                for tag in item.unresolved_references:
                    if tag.namespace != "req":
                        continue
                    try:
                        item_id = int(tag.tag, 10)
                        if item_id > 0:
                            items_to_import.add(item_id)
                        else:
                            mh.warning(item.location,
                                       f"invalid codebeamer reference to {item_id}")
                    except ValueError:
                        mh.warning(
                            item.location,
                            f"cannot convert reference '{tag.tag}' to integer "
                            f"Codebeamer ID",
                        )

            items = import_tagged(cb_config, items_to_import)

        elif cb_config.import_query is not None:
            try:
                if isinstance(cb_config.import_query, str):
                    if (cb_config.import_query.startswith("-") and
                            cb_config.import_query[1:].isdigit()):
                        self._argument_parser.error(
                            "import_query must be a positive integer")
                    elif cb_config.import_query.startswith("-"):
                        self._argument_parser.error(
                            "import_query must be a valid cbQL query")
                    elif cb_config.import_query == "":
                        self._argument_parser.error(
                            "import_query must either be a query string or a query ID")
                    elif cb_config.import_query.isdigit():
                        # A purely numeric string is a report ID.
                        cb_config.import_query = int(cb_config.import_query)
            except ValueError as e:
                self._argument_parser.error(str(e))

            items = get_query(cb_config, cb_config.import_query)
        else:
            raise ValueError(
                f"Unclear what to do, because neither "
                f"'{SupportedConfigKeys.IMPORT_QUERY.value}' nor "
                f"'{SupportedConfigKeys.IMPORT_TAGGED.value}' is specified!",
            )

        if cb_config.out:
            with _get_out_stream(cb_config.out) as out_stream:
                _cb_items_to_lobster(items, cb_config, out_stream)
            print(f"Written {len(items)} requirements to {cb_config.out}")
        else:
            # Without an output file the items go to stdout. sys.stdout
            # must not be used in a 'with' statement, because leaving the
            # block would close it for the rest of the process.
            _cb_items_to_lobster(items, cb_config, sys.stdout)
655 print(f"Written {len(items)} requirements to {cb_config.out}")
658def _get_out_stream(config_out: Optional[str]) -> TextIO:
659 if config_out:
660 ensure_output_directory(config_out)
661 return open(config_out, "w", encoding="UTF-8")
662 return sys.stdout
def _cb_items_to_lobster(items: List[Dict], config: Config, out_file: TextIO) -> None:
    """Write the items to ``out_file`` using lobster_write.

    The item class is taken from the schema of the configuration, and the
    tool name (dashes replaced by underscores) is passed along.
    """
    item_class = get_schema_config(config)["class"]
    generator_name = TOOL_NAME.replace("-", "_")
    lobster_write(out_file, item_class, generator_name, items)
def lobster_codebeamer(config: Config, out_file: str) -> None:
    """Runs the query given by ``config.import_query`` against codebeamer and
    writes the resulting items in the LOBSTER interchange format to
    ``out_file``.
    """
    # This is an API function.
    lobster_items = get_query(config, config.import_query)
    ensure_output_directory(out_file)
    with open(out_file, "w", encoding="UTF-8") as output:
        _cb_items_to_lobster(lobster_items, config, output)
def main(args: Optional[Sequence[str]] = None) -> int:
    """Entry point: run the codebeamer tool with the given arguments and
    return its result.
    """
    tool = CodebeamerTool()
    return tool.run(args)