Coverage for lobster/tools/codebeamer/codebeamer.py: 57%
255 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-26 14:55 +0000
1#!/usr/bin/env python3
2#
3# lobster_codebeamer - Extract codebeamer items for LOBSTER
4# Copyright (C) 2023-2025 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU Affero General Public License as
8# published by the Free Software Foundation, either version 3 of the
9# License, or (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Affero General Public License for more details.
15#
16# You should have received a copy of the GNU Affero General Public
17# License along with this program. If not, see
18# <https://www.gnu.org/licenses/>.
20# This tool is based on the codebeamer Rest API v3, as documented here:
21# https://codebeamer.com/cb/wiki/11631738
22#
23# There are some assumptions encoded here that are not clearly
24# documented, in that items have a type and the type has a name.
25#
26#
27# Main limitations:
28# * Item descriptions are ignored right now
29# * Branches (if they exist) are ignored
30# * We only ever fetch the HEAD item
31#
32# However you _can_ import all the items referenced from another
33# lobster artefact.
35import os
36import time
37import sys
38import argparse
39import netrc
40from typing import List, Optional, Set, Union
41from urllib.parse import quote, urlparse
42from enum import Enum
43import requests
44import yaml
46from lobster.items import Tracing_Tag, Requirement, Implementation, Activity
47from lobster.location import Codebeamer_Reference
48from lobster.errors import Message_Handler, LOBSTER_Error
49from lobster.io import lobster_read, lobster_write
50from lobster.tools.codebeamer.bearer_auth import BearerAuth
51from lobster.tools.codebeamer.config import AuthenticationConfig, Config
52from lobster.version import get_version
55class CodebeamerError(Exception):
56 pass
59class SupportedConfigKeys(Enum):
60 """Helper class to define supported configuration keys."""
61 NUM_REQUEST_RETRY = "num_request_retry"
62 RETRY_ERROR_CODES = "retry_error_codes"
63 IMPORT_TAGGED = "import_tagged"
64 IMPORT_QUERY = "import_query"
65 VERIFY_SSL = "verify_ssl"
66 PAGE_SIZE = "page_size"
67 REFS = "refs"
68 SCHEMA = "schema"
69 CB_TOKEN = "token"
70 CB_ROOT = "root"
71 CB_USER = "user"
72 CB_PASS = "pass"
73 TIMEOUT = "timeout"
74 OUT = "out"
76 @classmethod
77 def as_set(cls) -> set:
78 return {parameter.value for parameter in cls}
81def get_authentication(cb_auth_config: AuthenticationConfig) -> requests.auth.AuthBase:
82 if cb_auth_config.token: 82 ↛ 84line 82 didn't jump to line 84 because the condition on line 82 was always true
83 return BearerAuth(cb_auth_config.token)
84 return requests.auth.HTTPBasicAuth(cb_auth_config.user,
85 cb_auth_config.password)
88def query_cb_single(cb_config: Config, url: str):
89 if cb_config.num_request_retry <= 0: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true
90 raise ValueError("Retry is disabled (num_request_retry is set to 0). "
91 "Cannot proceed with retries.")
93 for attempt in range(1, cb_config.num_request_retry + 1):
94 try:
95 result = requests.get(
96 url,
97 auth=get_authentication(cb_config.cb_auth_conf),
98 timeout=cb_config.timeout,
99 verify=cb_config.verify_ssl,
100 )
101 if result.status_code == 200:
102 return result.json()
104 if result.status_code in cb_config.retry_error_codes:
105 print(f"[Attempt {attempt}/{cb_config.num_request_retry}] "
106 f"Retryable error: {result.status_code}")
107 time.sleep(1) # wait a bit before retrying
108 continue
110 print(f"[Attempt {attempt}/{cb_config.num_request_retry}] Failed with "
111 f"status {result.status_code}")
112 break
114 except requests.exceptions.ReadTimeout:
115 print(f"[Attempt {attempt}/{cb_config.num_request_retry}] Timeout when "
116 f"fetching {url}")
117 except requests.exceptions.RequestException as err:
118 print(f"[Attempt {attempt}/{cb_config.num_request_retry}] Request error: "
119 f"{err}")
120 break
122 # Final error handling after all retries
123 print(f"Could not fetch {url}.")
124 print("You can either:")
125 print("* increase the timeout with the timeout parameter")
126 print("* decrease the query size with the query_size parameter")
127 print("* increase the retry count with the parameters (num_request_retry, "
128 "retry_error_codes)")
129 sys.exit(1)
132def get_single_item(cb_config: Config, item_id: int):
133 if not isinstance(item_id, int) or (item_id <= 0):
134 raise ValueError("item_id must be a positive integer")
135 url = f"{cb_config.base}/items/{item_id}"
136 return query_cb_single(cb_config, url)
139def get_many_items(cb_config: Config, item_ids: Set[int]):
140 assert isinstance(item_ids, set)
142 rv = []
144 page_id = 1
145 query_string = quote(f"item.id IN "
146 f"({','.join(str(item_id) for item_id in item_ids)})")
148 while True:
149 base_url = "%s/items/query?page=%u&pageSize=%u&queryString=%s"\
150 % (cb_config.base, page_id,
151 cb_config.page_size, query_string)
152 data = query_cb_single(cb_config, base_url)
153 rv += data["items"]
154 if len(rv) == data["total"]:
155 break
156 page_id += 1
158 return rv
161def get_query(cb_config: Config, query: Union[int, str]):
162 assert isinstance(query, (int, str))
163 rv = []
164 url = ""
165 page_id = 1
166 total_items = None
168 while total_items is None or len(rv) < total_items:
169 print("Fetching page %u of query..." % page_id)
170 if isinstance(query, int): 170 ↛ 176line 170 didn't jump to line 176 because the condition on line 170 was always true
171 url = ("%s/reports/%u/items?page=%u&pageSize=%u" %
172 (cb_config.base,
173 query,
174 page_id,
175 cb_config.page_size))
176 elif isinstance(query, str):
177 url = ("%s/items/query?page=%u&pageSize=%u&queryString=%s" %
178 (cb_config.base,
179 page_id,
180 cb_config.page_size,
181 query))
182 data = query_cb_single(cb_config, url)
183 assert len(data) == 4
185 if page_id == 1 and len(data["items"]) == 0:
186 # lobster-trace: codebeamer_req.Get_Query_Zero_Items_Message
187 print("This query doesn't generate items. Please check:")
188 print(" * is the number actually correct?")
189 print(" * do you have permissions to access it?")
190 print(f"You can try to access '{url}' manually to check.")
192 if page_id != data["page"]: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true
193 raise CodebeamerError(f"Page mismatch in query result: expected page "
194 f"{page_id} from codebeamer, but got {data['page']}")
196 if page_id == 1:
197 total_items = data["total"]
198 elif total_items != data["total"]: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true
199 raise CodebeamerError(f"Item count mismatch in query result: expected "
200 f"{total_items} items so far, but page "
201 f"{data['page']} claims to have sent {data['total']} "
202 f"items in total.")
204 if isinstance(query, int): 204 ↛ 207line 204 didn't jump to line 207 because the condition on line 204 was always true
205 rv += [to_lobster(cb_config, cb_item["item"])
206 for cb_item in data["items"]]
207 elif isinstance(query, str):
208 rv += [to_lobster(cb_config, cb_item)
209 for cb_item in data["items"]]
211 page_id += 1
213 assert total_items == len(rv)
215 return rv
218def get_schema_config(cb_config: Config) -> dict:
219 """
220 The function returns a schema map based on the schema mentioned
221 in the cb_config dictionary.
223 If there is no match, it raises a KeyError.
225 Positional arguments:
226 cb_config -- configuration dictionary containing the schema.
228 Returns:
229 A dictionary containing the namespace and class associated with the schema.
231 Raises:
232 KeyError -- if the provided schema is not supported.
233 """
234 schema_map = {
235 'requirement': {"namespace": "req", "class": Requirement},
236 'implementation': {"namespace": "imp", "class": Implementation},
237 'activity': {"namespace": "act", "class": Activity},
238 }
239 schema = cb_config.schema.lower()
241 if schema not in schema_map: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true
242 raise KeyError(f"Unsupported SCHEMA '{schema}' provided in configuration.")
244 return schema_map[schema]
247def to_lobster(cb_config: Config, cb_item: dict):
248 assert isinstance(cb_item, dict) and "id" in cb_item
250 # This looks like it's business logic, maybe we should make this
251 # configurable?
253 categories = cb_item.get("categories")
254 if categories: 254 ↛ 255line 254 didn't jump to line 255 because the condition on line 254 was never true
255 kind = categories[0].get("name", "codebeamer item")
256 else:
257 kind = "codebeamer item"
259 status = cb_item["status"].get("name", None) if "status" in cb_item else None
261 # Get item name. Sometimes items do not have one, in which case we
262 # come up with one.
263 if "name" in cb_item: 263 ↛ 266line 263 didn't jump to line 266 because the condition on line 263 was always true
264 item_name = cb_item["name"]
265 else:
266 item_name = "Unnamed item %u" % cb_item["id"]
268 schema_config = get_schema_config(cb_config)
270 # Construct the appropriate object based on 'kind'
271 common_params = _create_common_params(
272 schema_config["namespace"], cb_item,
273 cb_config.cb_auth_conf.root, item_name, kind)
274 item = _create_lobster_item(
275 schema_config["class"],
276 common_params, item_name, status)
278 if cb_config.references:
279 for displayed_name in cb_config.references:
280 if cb_item.get(displayed_name): 280 ↛ 285line 280 didn't jump to line 285 because the condition on line 280 was always true
281 item_references = cb_item.get(displayed_name) if (
282 isinstance(cb_item.get(displayed_name), list)) \
283 else [cb_item.get(displayed_name)]
284 else:
285 item_references = [value for custom_field
286 in cb_item["customFields"]
287 if custom_field["name"] == displayed_name and
288 custom_field.get("values")
289 for value in custom_field["values"]]
291 for value in item_references:
292 item.add_tracing_target(Tracing_Tag("req", str(value["id"])))
294 return item
297def _create_common_params(namespace: str, cb_item: dict, cb_root: str,
298 item_name: str, kind: str):
299 """
300 Creates and returns common parameters for a Codebeamer item.
301 Args:
302 namespace (str): Namespace for the tag.
303 cb_item (dict): Codebeamer item dictionary.
304 cb_root (str): Root URL or path of Codebeamer.
305 item_name (str): Name of the item.
306 kind (str): Type of the item.
307 Returns:
308 dict: Common parameters including tag, location, and kind.
309 """
310 return {
311 'tag': Tracing_Tag(
312 namespace=namespace,
313 tag=str(cb_item["id"]),
314 version=cb_item["version"]
315 ),
316 'location': Codebeamer_Reference(
317 cb_root=cb_root,
318 tracker=cb_item["tracker"]["id"],
319 item=cb_item["id"],
320 version=cb_item["version"],
321 name=item_name
322 ),
323 'kind': kind
324 }
327def _create_lobster_item(schema_class, common_params, item_name, status):
328 """
329 Creates and returns a Lobster item based on the schema class.
330 Args:
331 schema_class: Class of the schema (Requirement, Implementation, Activity).
332 common_params (dict): Common parameters for the item.
333 item_name (str): Name of the item.
334 status (str): Status of the item.
335 Returns:
336 Object: An instance of the schema class with the appropriate parameters.
337 """
338 if schema_class is Requirement: 338 ↛ 347line 338 didn't jump to line 347 because the condition on line 338 was always true
339 return Requirement(
340 **common_params,
341 framework="codebeamer",
342 text=None,
343 status=status,
344 name= item_name
345 )
347 elif schema_class is Implementation:
348 return Implementation(
349 **common_params,
350 language="python",
351 name= item_name,
352 )
354 else:
355 return Activity(
356 **common_params,
357 framework="codebeamer",
358 status=status
359 )
362def import_tagged(cb_config: Config, items_to_import: Set[int]):
363 assert isinstance(items_to_import, set)
364 rv = []
366 cb_items = get_many_items(cb_config, items_to_import)
367 for cb_item in cb_items:
368 l_item = to_lobster(cb_config, cb_item)
369 rv.append(l_item)
371 return rv
374def ensure_list(instance) -> List:
375 if isinstance(instance, list): 375 ↛ 377line 375 didn't jump to line 377 because the condition on line 375 was always true
376 return instance
377 return [instance]
380def update_authentication_parameters(
381 auth_conf: AuthenticationConfig,
382 netrc_path: Optional[str] = None):
383 if (auth_conf.token is None and 383 ↛ 385line 383 didn't jump to line 385 because the condition on line 383 was never true
384 (auth_conf.user is None or auth_conf.password is None)):
385 netrc_file = netrc_path or os.path.join(os.path.expanduser("~"),
386 ".netrc")
387 if os.path.isfile(netrc_file):
388 netrc_config = netrc.netrc(netrc_file)
389 machine = urlparse(auth_conf.root).hostname
390 auth = netrc_config.authenticators(machine)
391 if auth is not None:
392 print(f"Using .netrc login for {auth_conf.root}")
393 auth_conf.user, _, auth_conf.password = auth
394 else:
395 provided_machine = ", ".join(netrc_config.hosts.keys()) or "None"
396 raise KeyError(f"Error parsing .netrc file."
397 f"\nExpected '{machine}', but got '{provided_machine}'.")
399 if (auth_conf.token is None and 399 ↛ 401line 399 didn't jump to line 401 because the condition on line 399 was never true
400 (auth_conf.user is None or auth_conf.password is None)):
401 raise KeyError("Please add your token to the config file, "
402 "or use user and pass in the config file, "
403 "or configure credentials in the .netrc file.")
406def parse_yaml_config(file_name: str) -> Config:
407 """
408 Parses a YAML configuration file and returns a validated configuration dictionary.
410 Args:
411 file_name (str): Path to the YAML configuration file.
413 Returns:
414 Dict[str, Any]: Parsed and validated configuration.
416 Raises:
417 ValueError: If `file_name` is not a string.
418 FileNotFoundError: If the file does not exist.
419 KeyError: If required fields are missing or unsupported keys are present.
420 """
421 assert os.path.isfile(file_name)
423 with open(file_name, "r", encoding='utf-8') as file:
424 return parse_config_data(yaml.safe_load(file) or {})
427def parse_config_data(data: dict) -> Config:
428 # Validate supported keys
429 provided_config_keys = set(data.keys())
430 unsupported_keys = provided_config_keys - SupportedConfigKeys.as_set()
431 if unsupported_keys: 431 ↛ 432line 431 didn't jump to line 432 because the condition on line 431 was never true
432 raise KeyError(
433 f"Unsupported config keys: {', '.join(unsupported_keys)}. "
434 f"Supported keys are: {', '.join(SupportedConfigKeys.as_set())}."
435 )
437 # create config object
438 config = Config(
439 references=ensure_list(data.get(SupportedConfigKeys.REFS.value, [])),
440 import_tagged=data.get(SupportedConfigKeys.IMPORT_TAGGED.value),
441 import_query=data.get(SupportedConfigKeys.IMPORT_QUERY.value),
442 verify_ssl=data.get(SupportedConfigKeys.VERIFY_SSL.value, False),
443 page_size=data.get(SupportedConfigKeys.PAGE_SIZE.value, 100),
444 schema=data.get(SupportedConfigKeys.SCHEMA.value, "Requirement"),
445 timeout=data.get(SupportedConfigKeys.TIMEOUT.value, 30),
446 out=data.get(SupportedConfigKeys.OUT.value),
447 num_request_retry=data.get(SupportedConfigKeys.NUM_REQUEST_RETRY.value, 5),
448 retry_error_codes=data.get(SupportedConfigKeys.RETRY_ERROR_CODES.value, []),
449 cb_auth_conf=AuthenticationConfig(
450 token=data.get(SupportedConfigKeys.CB_TOKEN.value),
451 user=data.get(SupportedConfigKeys.CB_USER.value),
452 password=data.get(SupportedConfigKeys.CB_PASS.value),
453 root=data.get(SupportedConfigKeys.CB_ROOT.value)
454 ),
455 )
457 # Ensure consistency of the configuration
458 if (not config.import_tagged) and (not config.import_query): 458 ↛ 459line 458 didn't jump to line 459 because the condition on line 458 was never true
459 raise KeyError(f"Either {SupportedConfigKeys.IMPORT_TAGGED.value} or "
460 f"{SupportedConfigKeys.IMPORT_QUERY.value} must be provided!")
462 if config.cb_auth_conf.root is None: 462 ↛ 463line 462 didn't jump to line 463 because the condition on line 462 was never true
463 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must be provided!")
465 if not config.cb_auth_conf.root.startswith("https://"): 465 ↛ 466line 465 didn't jump to line 466 because the condition on line 465 was never true
466 raise KeyError(f"{SupportedConfigKeys.CB_ROOT.value} must start with https://, "
467 f"but value is {config.cb_auth_conf.root}.")
469 return config
472ap = argparse.ArgumentParser(conflict_handler='resolve')
475@get_version(ap)
476def main():
477 # lobster-trace: codebeamer_req.Dummy_Requirement
478 ap.add_argument("--config",
479 help=("Path to YAML file with arguments, "
480 "by default (codebeamer-config.yaml) "
481 "supported references: '%s'" %
482 ', '.join(SupportedConfigKeys.as_set())),
483 default=os.path.join(os.getcwd(), "codebeamer-config.yaml"))
485 ap.add_argument("--out",
486 help=("Name of output file"),
487 default="codebeamer.lobster")
489 options = ap.parse_args()
491 mh = Message_Handler()
493 if not os.path.isfile(options.config): 493 ↛ 494line 493 didn't jump to line 494 because the condition on line 493 was never true
494 print((f"lobster-codebeamer: Config file '{options.config}' not found."))
495 return 1
497 cb_config = parse_yaml_config(options.config)
499 if cb_config.out is None: 499 ↛ 500line 499 didn't jump to line 500 because the condition on line 499 was never true
500 cb_config.out = options.out
502 update_authentication_parameters(cb_config.cb_auth_conf)
504 items_to_import = set()
506 if cb_config.import_tagged: 506 ↛ 507line 506 didn't jump to line 507 because the condition on line 506 was never true
507 if not os.path.isfile(cb_config.import_tagged):
508 sys.exit(f"lobster-codebeamer: {cb_config.import_tagged} is not a file.")
509 items = {}
510 try:
511 lobster_read(mh = mh,
512 filename = cb_config.import_tagged,
513 level = "N/A",
514 items = items)
515 except LOBSTER_Error:
516 return 1
517 for item in items.values():
518 for tag in item.unresolved_references:
519 if tag.namespace != "req":
520 continue
521 try:
522 item_id = int(tag.tag, 10)
523 if item_id > 0:
524 items_to_import.add(item_id)
525 else:
526 mh.warning(item.location,
527 "invalid codebeamer reference to %i" %
528 item_id)
529 except ValueError:
530 pass
532 elif cb_config.import_query is not None: 532 ↛ 547line 532 didn't jump to line 547 because the condition on line 532 was always true
533 try:
534 if isinstance(cb_config.import_query, str): 534 ↛ 535line 534 didn't jump to line 535 because the condition on line 534 was never true
535 if (cb_config.import_query.startswith("-") and
536 cb_config.import_query[1:].isdigit()):
537 ap.error("import_query must be a positive integer")
538 elif cb_config.import_query.startswith("-"):
539 ap.error("import_query must be a valid cbQL query")
540 elif cb_config.import_query == "":
541 ap.error("import_query must either be a query string or a query ID")
542 elif cb_config.import_query.isdigit():
543 cb_config.import_query = int(cb_config.import_query)
544 except ValueError as e:
545 ap.error(str(e))
547 try:
548 if cb_config.import_tagged: 548 ↛ 549line 548 didn't jump to line 549 because the condition on line 548 was never true
549 items = import_tagged(cb_config, items_to_import)
550 elif cb_config.import_query: 550 ↛ 555line 550 didn't jump to line 555 because the condition on line 550 was always true
551 items = get_query(cb_config, cb_config.import_query)
552 except LOBSTER_Error:
553 return 1
555 schema_config = get_schema_config(cb_config)
557 if cb_config.out is None: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true
558 with sys.stdout as fd:
559 lobster_write(fd, schema_config["class"], "lobster_codebeamer", items)
560 else:
561 with open(cb_config.out, "w", encoding="UTF-8") as fd:
562 lobster_write(fd, schema_config["class"], "lobster_codebeamer", items)
563 print(f"Written {len(items)} requirements to {cb_config.out}")
565 return 0
568if __name__ == "__main__":
569 sys.exit(main())