Coverage for lobster/tools/codebeamer/codebeamer.py: 58%
267 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 09:51 +0000
1#!/usr/bin/env python3
2#
3# lobster_codebeamer - Extract codebeamer items for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20# This tool is based on the codebeamer Rest API v3, as documented here:
21# https://codebeamer.com/cb/wiki/11631738
22#
23# There are some assumptions encoded here that are not clearly
24# documented, in that items have a type and the type has a name.
25#
26#
27# Main limitations:
28# * Item descriptions are ignored right now
29# * Branches (if they exist) are ignored
30# * We only ever fetch the HEAD item
31#
32# However you _can_ import all the items referenced from another
33# lobster artefact.
35import os
36import sys
37import argparse
38import netrc
39from typing import Dict, Iterable, List, Optional, TextIO, Union
40from urllib.parse import quote, urlparse
41from enum import Enum
42import requests
43from requests.adapters import HTTPAdapter
44import yaml
45from urllib3.util.retry import Retry
47from lobster.items import Tracing_Tag, Requirement, Implementation, Activity
48from lobster.location import Codebeamer_Reference
49from lobster.errors import Message_Handler, LOBSTER_Error
50from lobster.io import lobster_read, lobster_write
51from lobster.meta_data_tool_base import MetaDataToolBase
52from lobster.tools.codebeamer.bearer_auth import BearerAuth
53from lobster.tools.codebeamer.config import AuthenticationConfig, Config
54from lobster.tools.codebeamer.exceptions import (
55 MismatchException, NotFileException, QueryException,
56)
59TOOL_NAME = "lobster-codebeamer"
62class SupportedConfigKeys(Enum):
63 """Helper class to define supported configuration keys."""
64 NUM_REQUEST_RETRY = "num_request_retry"
65 RETRY_ERROR_CODES = "retry_error_codes"
66 IMPORT_TAGGED = "import_tagged"
67 IMPORT_QUERY = "import_query"
68 VERIFY_SSL = "verify_ssl"
69 PAGE_SIZE = "page_size"
70 REFS = "refs"
71 SCHEMA = "schema"
72 CB_TOKEN = "token"
73 CB_ROOT = "root"
74 CB_USER = "user"
75 CB_PASS = "pass"
76 TIMEOUT = "timeout"
77 OUT = "out"
79 @classmethod
80 def as_set(cls) -> set:
81 return {parameter.value for parameter in cls}
84def get_authentication(cb_auth_config: AuthenticationConfig) -> requests.auth.AuthBase:
85 if cb_auth_config.token: 85 ↛ 87line 85 didn't jump to line 87 because the condition on line 85 was always true
86 return BearerAuth(cb_auth_config.token)
87 return requests.auth.HTTPBasicAuth(cb_auth_config.user,
88 cb_auth_config.password)
91def query_cb_single(cb_config: Config, url: str):
92 if cb_config.num_request_retry <= 0: 92 ↛ 93line 92 didn't jump to line 93 because the condition on line 92 was never true
93 raise ValueError("Retry is disabled (num_request_retry is set to 0). "
94 "Cannot proceed with retries.")
96 # Set up a Retry object with exponential backoff
97 retry_strategy = Retry(
98 total=cb_config.num_request_retry,
99 backoff_factor=1, # Exponential backoff: 1s, 2s, 4s, etc.
100 status_forcelist=cb_config.retry_error_codes,
101 allowed_methods=["GET"],
102 raise_on_status=False,
103 )
105 adapter = HTTPAdapter(max_retries=retry_strategy)
106 session = requests.Session()
107 session.mount("https://", adapter)
108 session.mount("http://", adapter)
110 response = session.get(
111 url,
112 auth=get_authentication(cb_config.cb_auth_conf),
113 timeout=cb_config.timeout,
114 verify=cb_config.verify_ssl,
115 )
117 if response.status_code == 200:
118 return response.json()
120 # Final error handling after all retries
121 raise QueryException(f"Could not fetch {url}.")
124def get_single_item(cb_config: Config, item_id: int):
125 if not isinstance(item_id, int) or (item_id <= 0):
126 raise ValueError("item_id must be a positive integer")
127 url = f"{cb_config.base}/items/{item_id}"
128 return query_cb_single(cb_config, url)
131def get_many_items(cb_config: Config, item_ids: Iterable[int]):
132 rv = []
134 page_id = 1
135 query_string = quote(f"item.id IN "
136 f"({','.join(str(item_id) for item_id in item_ids)})")
138 while True:
139 base_url = "%s/items/query?page=%u&pageSize=%u&queryString=%s"\
140 % (cb_config.base, page_id,
141 cb_config.page_size, query_string)
142 data = query_cb_single(cb_config, base_url)
143 rv += data["items"]
144 if len(rv) == data["total"]:
145 break
146 page_id += 1
148 return rv
151def get_query(cb_config: Config, query: Union[int, str]):
152 if (not query) or (not isinstance(query, (int, str))): 152 ↛ 153line 152 didn't jump to line 153 because the condition on line 152 was never true
153 raise ValueError(
154 "The query must either be a real positive integer or a non-empty string!",
155 )
157 rv = []
158 url = ""
159 page_id = 1
160 total_items = None
162 while total_items is None or len(rv) < total_items:
163 print("Fetching page %u of query..." % page_id)
164 if isinstance(query, int): 164 ↛ 170line 164 didn't jump to line 170 because the condition on line 164 was always true
165 url = ("%s/reports/%u/items?page=%u&pageSize=%u" %
166 (cb_config.base,
167 query,
168 page_id,
169 cb_config.page_size))
170 elif isinstance(query, str):
171 url = ("%s/items/query?page=%u&pageSize=%u&queryString=%s" %
172 (cb_config.base,
173 page_id,
174 cb_config.page_size,
175 query))
176 data = query_cb_single(cb_config, url)
177 if len(data) != 4: 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true
178 raise MismatchException(
179 f"Expected codebeamer response with 4 data entries, but instead "
180 f"received {len(data)}!",
181 )
183 if page_id == 1 and len(data["items"]) == 0:
184 # lobster-trace: codebeamer_req.Get_Query_Zero_Items_Message
185 print("This query doesn't generate items. Please check:")
186 print(" * is the number actually correct?")
187 print(" * do you have permissions to access it?")
188 print(f"You can try to access '{url}' manually to check.")
190 if page_id != data["page"]: 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true
191 raise MismatchException(
192 f"Page mismatch in query result: expected page "
193 f"{page_id} from codebeamer, but got {data['page']}"
194 )
196 if page_id == 1:
197 total_items = data["total"]
198 elif total_items != data["total"]: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true
199 raise MismatchException(
200 f"Item count mismatch in query result: expected "
201 f"{total_items} items so far, but page "
202 f"{data['page']} claims to have sent {data['total']} "
203 f"items in total."
204 )
206 if isinstance(query, int): 206 ↛ 209line 206 didn't jump to line 209 because the condition on line 206 was always true
207 rv += [to_lobster(cb_config, cb_item["item"])
208 for cb_item in data["items"]]
209 elif isinstance(query, str):
210 rv += [to_lobster(cb_config, cb_item)
211 for cb_item in data["items"]]
213 page_id += 1
215 if total_items != len(rv): 215 ↛ 216line 215 didn't jump to line 216 because the condition on line 215 was never true
216 raise MismatchException(
217 f"Expected to receive {total_items} items in total from codebeamer, "
218 f"but actually received {len(rv)}!",
219 )
221 return rv
224def get_schema_config(cb_config: Config) -> dict:
225 """
226 The function returns a schema map based on the schema mentioned
227 in the cb_config dictionary.
229 If there is no match, it raises a KeyError.
231 Positional arguments:
232 cb_config -- configuration object containing the schema.
234 Returns:
235 A dictionary containing the namespace and class associated with the schema.
237 Raises:
238 KeyError -- if the provided schema is not supported.
239 """
240 schema_map = {
241 'requirement': {"namespace": "req", "class": Requirement},
242 'implementation': {"namespace": "imp", "class": Implementation},
243 'activity': {"namespace": "act", "class": Activity},
244 }
245 schema = cb_config.schema.lower()
247 if schema not in schema_map: 247 ↛ 248line 247 didn't jump to line 248 because the condition on line 247 was never true
248 raise KeyError(f"Unsupported SCHEMA '{schema}' provided in configuration.")
250 return schema_map[schema]
253def to_lobster(cb_config: Config, cb_item: dict):
254 if not isinstance(cb_item, dict): 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true
255 raise ValueError("'cb_item' must be of type 'dict'!")
256 if "id" not in cb_item: 256 ↛ 257line 256 didn't jump to line 257 because the condition on line 256 was never true
257 raise KeyError("Codebeamer item does not contain ID!")
259 # This looks like it's business logic, maybe we should make this
260 # configurable?
262 categories = cb_item.get("categories")
263 if categories: 263 ↛ 264line 263 didn't jump to line 264 because the condition on line 263 was never true
264 kind = categories[0].get("name", "codebeamer item")
265 else:
266 kind = "codebeamer item"
268 status = cb_item["status"].get("name", None) if "status" in cb_item else None
270 # Get item name. Sometimes items do not have one, in which case we
271 # come up with one.
272 if "name" in cb_item: 272 ↛ 275line 272 didn't jump to line 275 because the condition on line 272 was always true
273 item_name = cb_item["name"]
274 else:
275 item_name = f"Unnamed item {cb_item['id']}"
277 schema_config = get_schema_config(cb_config)
279 # Construct the appropriate object based on 'kind'
280 common_params = _create_common_params(
281 schema_config["namespace"], cb_item,
282 cb_config.cb_auth_conf.root, item_name, kind)
283 item = _create_lobster_item(
284 schema_config["class"],
285 common_params, item_name, status)
287 if cb_config.references:
288 for displayed_name in cb_config.references:
289 if cb_item.get(displayed_name): 289 ↛ 294line 289 didn't jump to line 294 because the condition on line 289 was always true
290 item_references = cb_item.get(displayed_name) if (
291 isinstance(cb_item.get(displayed_name), list)) \
292 else [cb_item.get(displayed_name)]
293 else:
294 item_references = [value for custom_field
295 in cb_item["customFields"]
296 if custom_field["name"] == displayed_name and
297 custom_field.get("values")
298 for value in custom_field["values"]]
300 for value in item_references:
301 item.add_tracing_target(Tracing_Tag("req", str(value["id"])))
303 return item
306def _create_common_params(namespace: str, cb_item: dict, cb_root: str,
307 item_name: str, kind: str):
308 """
309 Creates and returns common parameters for a Codebeamer item.
310 Args:
311 namespace (str): Namespace for the tag.
312 cb_item (dict): Codebeamer item dictionary.
313 cb_root (str): Root URL or path of Codebeamer.
314 item_name (str): Name of the item.
315 kind (str): Type of the item.
316 Returns:
317 dict: Common parameters including tag, location, and kind.
318 """
319 return {
320 'tag': Tracing_Tag(
321 namespace=namespace,
322 tag=str(cb_item["id"]),
323 version=cb_item["version"]
324 ),
325 'location': Codebeamer_Reference(
326 cb_root=cb_root,
327 tracker=cb_item["tracker"]["id"],
328 item=cb_item["id"],
329 version=cb_item["version"],
330 name=item_name
331 ),
332 'kind': kind
333 }
336def _create_lobster_item(schema_class, common_params, item_name, status):
337 """
338 Creates and returns a Lobster item based on the schema class.
339 Args:
340 schema_class: Class of the schema (Requirement, Implementation, Activity).
341 common_params (dict): Common parameters for the item.
342 item_name (str): Name of the item.
343 status (str): Status of the item.
344 Returns:
345 Object: An instance of the schema class with the appropriate parameters.
346 """
347 if schema_class is Requirement: 347 ↛ 356line 347 didn't jump to line 356 because the condition on line 347 was always true
348 return Requirement(
349 **common_params,
350 framework="codebeamer",
351 text=None,
352 status=status,
353 name= item_name
354 )
356 elif schema_class is Implementation:
357 return Implementation(
358 **common_params,
359 language="python",
360 name= item_name,
361 )
363 else:
364 return Activity(
365 **common_params,
366 framework="codebeamer",
367 status=status
368 )
371def import_tagged(cb_config: Config, items_to_import: Iterable[int]):
372 rv = []
374 cb_items = get_many_items(cb_config, items_to_import)
375 for cb_item in cb_items:
376 l_item = to_lobster(cb_config, cb_item)
377 rv.append(l_item)
379 return rv
382def ensure_list(instance) -> List:
383 if isinstance(instance, list): 383 ↛ 385line 383 didn't jump to line 385 because the condition on line 383 was always true
384 return instance
385 return [instance]
388def update_authentication_parameters(
389 auth_conf: AuthenticationConfig,
390 netrc_path: Optional[str] = None):
391 if (auth_conf.token is None and 391 ↛ 393line 391 didn't jump to line 393 because the condition on line 391 was never true
392 (auth_conf.user is None or auth_conf.password is None)):
393 netrc_file = netrc_path or os.path.join(os.path.expanduser("~"),
394 ".netrc")
395 if os.path.isfile(netrc_file):
396 netrc_config = netrc.netrc(netrc_file)
397 machine = urlparse(auth_conf.root).hostname
398 auth = netrc_config.authenticators(machine)
399 if auth is not None:
400 print(f"Using .netrc login for {auth_conf.root}")
401 auth_conf.user, _, auth_conf.password = auth
402 else:
403 provided_machine = ", ".join(netrc_config.hosts.keys()) or "None"
404 raise KeyError(f"Error parsing .netrc file."
405 f"\nExpected '{machine}', but got '{provided_machine}'.")
407 if (auth_conf.token is None and 407 ↛ 409line 407 didn't jump to line 409 because the condition on line 407 was never true
408 (auth_conf.user is None or auth_conf.password is None)):
409 raise KeyError("Please add your token to the config file, "
410 "or use user and pass in the config file, "
411 "or configure credentials in the .netrc file.")
414def load_config(file_name: str) -> Config:
415 """
416 Parses a YAML configuration file and returns a validated configuration object.
418 Args:
419 file_name (str): Path to the YAML configuration file.
421 Returns:
422 Config: validated configuration.
424 Raises:
425 ValueError: If `file_name` is not a string.
426 FileNotFoundError: If the file does not exist.
427 KeyError: If required fields are missing or unsupported keys are present.
428 """
429 with open(file_name, "r", encoding='utf-8') as file:
430 return parse_config_data(yaml.safe_load(file) or {})
433def parse_config_data(data: dict) -> Config:
434 # Validate supported keys
435 provided_config_keys = set(data.keys())
436 unsupported_keys = provided_config_keys - SupportedConfigKeys.as_set()
437 if unsupported_keys: 437 ↛ 438line 437 didn't jump to line 438 because the condition on line 437 was never true
438 raise KeyError(
439 f"Unsupported config keys: {', '.join(unsupported_keys)}. "
440 f"Supported keys are: {', '.join(SupportedConfigKeys.as_set())}."
441 )
443 # create config object
444 config = Config(
445 references=ensure_list(data.get(SupportedConfigKeys.REFS.value, [])),
446 import_tagged=data.get(SupportedConfigKeys.IMPORT_TAGGED.value),
447 import_query=data.get(SupportedConfigKeys.IMPORT_QUERY.value),
448 verify_ssl=data.get(SupportedConfigKeys.VERIFY_SSL.value, False),
449 page_size=data.get(SupportedConfigKeys.PAGE_SIZE.value, 100),
450 schema=data.get(SupportedConfigKeys.SCHEMA.value, "Requirement"),
451 timeout=data.get(SupportedConfigKeys.TIMEOUT.value, 30),
452 out=data.get(SupportedConfigKeys.OUT.value),
453 num_request_retry=data.get(SupportedConfigKeys.NUM_REQUEST_RETRY.value, 5),
454 retry_error_codes=data.get(SupportedConfigKeys.RETRY_ERROR_CODES.value, []),
455 cb_auth_conf=AuthenticationConfig(
456 token=data.get(SupportedConfigKeys.CB_TOKEN.value),
457 user=data.get(SupportedConfigKeys.CB_USER.value),
458 password=data.get(SupportedConfigKeys.CB_PASS.value),
459 root=data.get(SupportedConfigKeys.CB_ROOT.value)
460 ),
461 )
463 # Ensure consistency of the configuration
464 if (not config.import_tagged) and (not config.import_query): 464 ↛ 465line 464 didn't jump to line 465 because the condition on line 464 was never true
465 raise KeyError(f"Either {SupportedConfigKeys.IMPORT_TAGGED.value} or "
466 f"{SupportedConfigKeys.IMPORT_QUERY.value} must be provided!")
468 if config.cb_auth_conf.root is None: 468 ↛ 469line 468 didn't jump to line 469 because the condition on line 468 was never true
469 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must be provided!")
471 if not config.cb_auth_conf.root.startswith("https://"): 471 ↛ 472line 471 didn't jump to line 472 because the condition on line 471 was never true
472 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must start with https://, "
473 f"but value is {config.cb_auth_conf.root}.")
475 return config
478class CodebeamerTool(MetaDataToolBase):
479 def __init__(self):
480 super().__init__(
481 name="codebeamer",
482 description="Extract codebeamer items for LOBSTER",
483 official=True,
484 )
485 self._argument_parser.add_argument(
486 "--config",
487 help=(f"Path to YAML file with arguments, "
488 f"by default (codebeamer-config.yaml) "
489 f"supported references: '{', '.join(SupportedConfigKeys.as_set())}'"),
490 default=os.path.join(os.getcwd(), "codebeamer-config.yaml"))
492 self._argument_parser.add_argument(
493 "--out",
494 help=("Name of output file"),
495 default="codebeamer.lobster",
496 )
498 def _run_impl(self, options: argparse.Namespace) -> int:
499 try:
500 self._execute(options)
501 return 0
502 except NotFileException as ex:
503 print(ex)
504 except QueryException as query_ex:
505 print(query_ex)
506 print("You can either:")
507 print("* increase the timeout with the timeout parameter")
508 print("* decrease the query size with the query_size parameter")
509 print("* increase the retry count with the parameters (num_request_retry, "
510 "retry_error_codes)")
511 except FileNotFoundError as file_ex:
512 self._print_error(f"File '{file_ex.filename}' not found.")
513 except IsADirectoryError as isdir_ex:
514 self._print_error(
515 f"Path '{isdir_ex.filename}' is a directory, but a file was expected.",
516 )
517 except ValueError as value_error:
518 self._print_error(value_error)
519 except LOBSTER_Error as lobster_error:
520 self._print_error(lobster_error)
522 return 1
524 @staticmethod
525 def _print_error(error: Union[Exception, str]):
526 print(f"{TOOL_NAME}: {error}", file=sys.stderr)
528 def _execute(self, options: argparse.Namespace) -> None:
529 mh = Message_Handler()
531 cb_config = load_config(options.config)
533 if cb_config.out is None: 533 ↛ 534line 533 didn't jump to line 534 because the condition on line 533 was never true
534 cb_config.out = options.out
536 update_authentication_parameters(cb_config.cb_auth_conf)
538 items_to_import = set()
540 if cb_config.import_tagged: 540 ↛ 541line 540 didn't jump to line 541 because the condition on line 540 was never true
541 source_items = {}
542 lobster_read(
543 mh = mh,
544 filename = cb_config.import_tagged,
545 level = "N/A",
546 items = source_items,
547 )
549 for item in source_items.values():
550 for tag in item.unresolved_references:
551 if tag.namespace != "req":
552 continue
553 try:
554 item_id = int(tag.tag, 10)
555 if item_id > 0:
556 items_to_import.add(item_id)
557 else:
558 mh.warning(item.location,
559 f"invalid codebeamer reference to {item_id}")
560 except ValueError:
561 mh.warning(
562 item.location,
563 f"cannot convert reference '{tag.tag}' to integer "
564 f"Codebeamer ID",
565 )
567 items = import_tagged(cb_config, items_to_import)
569 elif cb_config.import_query is not None: 569 ↛ 589line 569 didn't jump to line 589 because the condition on line 569 was always true
570 try:
571 if isinstance(cb_config.import_query, str): 571 ↛ 572line 571 didn't jump to line 572 because the condition on line 571 was never true
572 if (cb_config.import_query.startswith("-") and
573 cb_config.import_query[1:].isdigit()):
574 self._argument_parser.error(
575 "import_query must be a positive integer")
576 elif cb_config.import_query.startswith("-"):
577 self._argument_parser.error(
578 "import_query must be a valid cbQL query")
579 elif cb_config.import_query == "":
580 self._argument_parser.error(
581 "import_query must either be a query string or a query ID")
582 elif cb_config.import_query.isdigit():
583 cb_config.import_query = int(cb_config.import_query)
584 except ValueError as e:
585 self._argument_parser.error(str(e))
587 items = get_query(cb_config, cb_config.import_query)
588 else:
589 raise ValueError(
590 f"Unclear what to do, because neither "
591 f"'{SupportedConfigKeys.IMPORT_QUERY.value}' nor "
592 f"'{SupportedConfigKeys.IMPORT_TAGGED.value}' is specified!",
593 )
595 with _get_out_stream(cb_config.out) as out_stream:
596 _cb_items_to_lobster(items, cb_config, out_stream)
597 if cb_config.out: 597 ↛ exitline 597 didn't return from function '_execute' because the condition on line 597 was always true
598 print(f"Written {len(items)} requirements to {cb_config.out}")
601def _get_out_stream(config_out: Optional[str]) -> TextIO:
602 if config_out: 602 ↛ 604line 602 didn't jump to line 604 because the condition on line 602 was always true
603 return open(config_out, "w", encoding="UTF-8")
604 return sys.stdout
607def _cb_items_to_lobster(items: List[Dict], config: Config, out_file: TextIO) -> None:
608 schema_config = get_schema_config(config)
609 lobster_write(out_file, schema_config["class"], TOOL_NAME.replace("-", "_"), items)
612def cb_query_to_lobster_file(config: Config, out_file: str) -> None:
613 """Loads items from codebeamer and serializes them in the LOBSTER interchange
614 format to the given file.
615 """
616 # This is an API function.
617 items = get_query(config, config.import_query)
618 with open(out_file, "w", encoding="UTF-8") as fd:
619 _cb_items_to_lobster(items, config, fd)
622def main() -> int:
623 return CodebeamerTool().run()