Skip to content

Sources API Reference

Auto-generated documentation from source code docstrings. Note that these are low-level implementation details; for user-facing guides to each source, see the User Guide.

The base source class is a foundation for building specific data source integrations. It provides common functionality such as data fetching, updating, and state management.

Base Classes

UpdateState

UpdateState dataclass

Shared state container for the update process.

Source code in macrotrace/sources/base.py
@dataclass
class UpdateState:
    """Shared state container for the update process.

    Every field defaults to ``None``, so an instance can be constructed
    with no arguments.
    """

    # Dataset-level fields.
    # NOTE(review): the valid values of `source` and `dataset_mode` are not
    # visible in this block — confirm against the code that sets them.
    dataset: Dataset | None = None
    dataset_id: str | None = None
    source: str | None = None
    dataset_mode: str | None = None

    # Series-level fields.
    # NOTE(review): `series_key` is an untyped Dict; its key/value schema is
    # not visible here — confirm against callers.
    series: Series | None = None
    series_mode: str | None = None
    series_key: Dict | None = None

    # Start and end of a release date range.
    # NOTE(review): inclusivity and timezone handling are not established
    # here — confirm.
    release_start_date: datetime | None = None
    release_end_date: datetime | None = None

    # Lists of dimension records named "new_*"; presumably collected during
    # the update — TODO confirm where they are consumed.
    new_dataset_dimensions: List[DatasetDimension] | None = None
    new_series_dimension_filters: List[SeriesDimensionFilter] | None = None

    # Lists of release records and their dimensions, same "new_*" convention.
    new_releases: List[Release] | None = None
    new_release_dimensions: List[ReleaseDimension] | None = None

    # List of observation records, same "new_*" convention.
    new_observations: List[Observation] | None = None

APIClient

APIClient

Source code in macrotrace/sources/base.py
class APIClient:
    """Base HTTP client for JSON APIs with optional response caching.

    Subclasses must override ``_get_request_headers`` and
    ``_get_default_params``; the base implementations raise
    ``NotImplementedError``.
    """

    def __init__(
        self,
        base_url: str,
        cache_settings: Optional[Dict[str, Any]] = None,
        cache_path: Optional[str] = None,
    ):
        """
        Create the client and its HTTP session.

        Args:
            base_url (str): Base URL for the API
            cache_settings (Dict[str, Any], optional): Cache settings. Defaults to {"caching": True, "cache_expiry": 86400}.
                ``"caching"`` selects between a plain ``requests.Session``
                and a ``requests_cache.CachedSession``; ``"cache_expiry"``
                is passed to the cached session as ``expire_after``. Keys
                that are omitted fall back to the defaults above.
            cache_path (str, optional): Path to the request-cache SQLite
                file. Resolution: this argument, then ``MACROTRACE_CACHE``,
                then beside ``MACROTRACE_DB`` if set, else
                ``MacroTraceRequestCache.sqlite`` in the current
                working directory.
        """
        self.user_agent = USER_AGENT
        self.base_url = base_url
        # Layer the caller's settings over the defaults so a partial dict
        # (e.g. only {"caching": False}) does not raise KeyError below.
        cache_settings = {
            "caching": True,
            "cache_expiry": 86400,
            **(cache_settings or {}),
        }

        self.session = (
            requests.Session()
            if not cache_settings["caching"]
            else requests_cache.CachedSession(
                resolve_cache_path(cache_path),
                expire_after=cache_settings["cache_expiry"],
            )
        )
        logger.debug(
            f"Initialized APIClient with base_url={base_url}, caching={cache_settings['caching']}"
        )

    def _get_request_headers(self) -> Dict[str, Any]:
        """
        Get the request headers for the API client.
        This method must be overridden by subclasses to provide specific details.

        Returns:
            Dict[str, Any]: A dictionary containing request headers.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Subclasses must implement this method.")

    def _get_default_params(self) -> Dict[str, str]:
        """
        Get the default parameters for the API client. For instance, API keys or file types.
        This method must be overridden by subclasses to provide specific parameters.

        Returns:
            Dict[str, str]: A dictionary containing default parameters.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Subclasses must implement this method.")

    # NOTE(review): no `retry=` predicate is given, so presumably every
    # exception raised inside make_request (HTTP 4xx from raise_for_status,
    # NotImplementedError from the hooks) is retried — confirm this is intended.
    @retry(
        stop=stop_after_attempt(4),
        wait=wait_exponential(max=30),
        before_sleep=before_sleep_log(logger, logging.WARNING),
        reraise=True,
    )
    def make_request(
        self, endpoint: str, params: Dict[str, Any] = {}
    ) -> Dict[str, Any]:
        """
        Make a request to the API endpoint. Note that this method does not handle pagination.

        The call is retried for up to 4 attempts with exponential backoff
        capped at 30 seconds; the last exception is re-raised.

        Args:
            endpoint (str): API endpoint path
            params (Dict[str, Any], optional): Additional parameters for the request. To be merged with default parameters. Defaults to {}.
                Caller-supplied keys override default parameters of the same name.

        Returns:
            Dict[str, Any]: JSON response from the API

        Raises:
            requests.HTTPError: If the response status is an error
                (raised by ``raise_for_status`` once retries are exhausted).
        """
        # Copy before adding the User-Agent so a dict that a subclass reuses
        # between calls (e.g. a class-level constant) is never modified.
        headers = {
            **self._get_request_headers(),
            "User-Agent": self.user_agent,
        }

        # `|` builds a new dict, so the shared `{}` default is never mutated.
        default_params = self._get_default_params()
        params = default_params | params

        # NOTE(review): the merged params may include credentials returned by
        # _get_default_params (its docstring mentions API keys) — confirm that
        # debug logs are not shipped anywhere sensitive.
        logger.debug(
            f"Making API request to endpoint: {endpoint} with params: {params}"
        )

        resp = self.session.get(
            self.base_url + endpoint, headers=headers, params=params
        )

        # Plain requests.Session responses have no `from_cache` attribute.
        is_cached = getattr(resp, "from_cache", False)
        logger.debug(
            f"API response received: status={resp.status_code}, "
            f"cached={is_cached}, size={len(resp.content)} bytes"
        )

        resp.raise_for_status()
        return resp.json()

    def make_paginated_request(
        self,
        endpoint: str,
        params: Dict[str, Any] = {},
        limit_param: str = "limit",
        offset_param: str = "offset",
        limit: int = 1000,
        items_key: Optional[str] = None,
        max_pages: int = 50,
    ) -> List[Dict[str, Any]]:
        """
        Make paginated requests to the API endpoint using limit/offset pagination.
        Keeps requesting the next page for as long as a page comes back full
        (item count equal to the limit) and stops at the first shorter page.

        Args:
            endpoint (str): API endpoint path
            params (Dict[str, Any], optional): Additional parameters for the request. Defaults to {}.
                Not modified; the limit/offset keys are set on a per-page copy.
            limit_param (str, optional): The parameter name for the page size limit. Defaults to "limit".
            offset_param (str, optional): The parameter name for the offset. Defaults to "offset".
            limit (int, optional): The page size for each request. Must be at least 1. Defaults to 1000.
            items_key (Optional[str], optional): The key in the response containing the items list.
                If None, assumes the entire response is the items list. Defaults to None.
            max_pages (int, optional): Maximum number of pages to fetch as a safety limit. Defaults to 50.

        Returns:
            List[Dict[str, Any]]: A combined list of all items from all pages

        Raises:
            ValueError: If limit is less than 1
            RuntimeError: If max_pages is reached before pagination completes naturally
        """
        # A non-positive limit can never produce a "short" page, so the loop
        # would issue max_pages requests and then fail; reject it up front.
        if limit < 1:
            raise ValueError(f"limit must be at least 1, got {limit}")

        all_items: List[Dict[str, Any]] = []

        logger.debug(
            f"Starting paginated request to endpoint: {endpoint} "
            f"(limit={limit}, limit_param={limit_param}, offset_param={offset_param}, max_pages={max_pages})"
        )

        for page_num in range(max_pages):
            offset = page_num * limit
            # Fresh dict per page; the caller's params are left untouched.
            paginated_params = {
                **params,
                limit_param: limit,
                offset_param: offset,
            }

            response = self.make_request(endpoint, paginated_params)

            if items_key:
                items = response.get(items_key)
            elif isinstance(response, list):
                items = response
            else:
                logger.warning(
                    f"Expected a list response from {endpoint} but got "
                    f"{type(response).__name__}; treating page as empty"
                )
                items = None

            # A missing key or a JSON null both count as an empty page.
            if items is None:
                items = []

            items_count = len(items)
            all_items.extend(items)

            logger.debug(
                f"Fetched {items_count} items at offset {offset} (page {page_num + 1}/{max_pages}) "
                f"(total accumulated: {len(all_items)})"
            )

            # A short page means there is nothing further to fetch.
            if items_count < limit:
                logger.debug(
                    f"Pagination complete: received {items_count} < {limit}, "
                    f"total items: {len(all_items)} across {page_num + 1} page(s)"
                )
                break
        else:
            # Only reached when the loop ran out of pages without a break.
            logger.error(
                f"Reached maximum page limit ({max_pages}) for endpoint {endpoint}. "
                f"Fetched {len(all_items)} items. This may indicate an API issue."
            )
            raise RuntimeError(
                f"Pagination limit reached: max_pages={max_pages}. "
                f"Fetched {len(all_items)} items. This may indicate an API issue."
            )

        logger.info(
            f"Completed paginated request to {endpoint}: {len(all_items)} total items across {page_num + 1} page(s)"
        )
        return all_items

    def make_request_dry_run(
        self, endpoint: str, params: Dict[str, Any] = {}
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Generate the request URL and parameters without making the actual request.
        Useful for debugging or logging purposes.

        Args:
            endpoint (str): API endpoint path
            params (Dict[str, Any], optional): Additional parameters for the request. To be merged with default parameters. Defaults to {}.

        Returns:
            Tuple[str, Dict[str, Any]]: The full request URL and the parameters.
        """
        # `|` builds a new dict, so the shared `{}` default is never mutated.
        default_params = self._get_default_params()
        merged_params = default_params | params
        # prepare_request builds the final URL without sending anything.
        prepared = self.session.prepare_request(
            requests.Request(
                method="GET",
                url=self.base_url + endpoint,
                params=merged_params,
            )
        )
        return prepared.url, merged_params

__init__(base_url, cache_settings=None, cache_path=None)

Parameters:

Name Type Description Default
base_url str

Base URL for the API

required
cache_settings Dict[str, Any]

Cache settings. Defaults to {"caching": True, "cache_expiry": 86400}.

None
cache_path str

Path to the request-cache SQLite file. Resolution: this argument, then MACROTRACE_CACHE, then beside MACROTRACE_DB if set, else MacroTraceRequestCache.sqlite in the current working directory.

None
Source code in macrotrace/sources/base.py
def __init__(
    self,
    base_url: str,
    cache_settings: Optional[Dict[str, Any]] = None,
    cache_path: Optional[str] = None,
):
    """
    Create the client and its HTTP session.

    Args:
        base_url (str): Base URL for the API
        cache_settings (Dict[str, Any], optional): Cache settings. Defaults to {"caching": True, "cache_expiry": 86400}.
            ``"caching"`` selects between a plain ``requests.Session``
            and a ``requests_cache.CachedSession``; ``"cache_expiry"``
            is passed to the cached session as ``expire_after``. Keys
            that are omitted fall back to the defaults above.
        cache_path (str, optional): Path to the request-cache SQLite
            file. Resolution: this argument, then ``MACROTRACE_CACHE``,
            then beside ``MACROTRACE_DB`` if set, else
            ``MacroTraceRequestCache.sqlite`` in the current
            working directory.
    """
    self.user_agent = USER_AGENT
    self.base_url = base_url
    # Layer the caller's settings over the defaults so a partial dict
    # (e.g. only {"caching": False}) does not raise KeyError below.
    cache_settings = {
        "caching": True,
        "cache_expiry": 86400,
        **(cache_settings or {}),
    }

    self.session = (
        requests.Session()
        if not cache_settings["caching"]
        else requests_cache.CachedSession(
            resolve_cache_path(cache_path),
            expire_after=cache_settings["cache_expiry"],
        )
    )
    logger.debug(
        f"Initialized APIClient with base_url={base_url}, caching={cache_settings['caching']}"
    )

make_request(endpoint, params={})

Make a request to the API endpoint. Note that this method does not handle pagination.

Parameters:

Name Type Description Default
endpoint str

API endpoint path

required
params Dict[str, Any]

Additional parameters for the request. To be merged with default parameters. Defaults to {}.

{}

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: JSON response from the API

Source code in macrotrace/sources/base.py
# NOTE(review): no `retry=` predicate is given, so presumably every exception
# raised inside make_request (HTTP 4xx from raise_for_status,
# NotImplementedError from the hooks) is retried — confirm this is intended.
@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(max=30),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def make_request(
    self, endpoint: str, params: Dict[str, Any] = {}
) -> Dict[str, Any]:
    """
    Make a request to the API endpoint. Note that this method does not handle pagination.

    The call is retried for up to 4 attempts with exponential backoff
    capped at 30 seconds; the last exception is re-raised.

    Args:
        endpoint (str): API endpoint path
        params (Dict[str, Any], optional): Additional parameters for the request. To be merged with default parameters. Defaults to {}.
            Caller-supplied keys override default parameters of the same name.

    Returns:
        Dict[str, Any]: JSON response from the API

    Raises:
        requests.HTTPError: If the response status is an error
            (raised by ``raise_for_status`` once retries are exhausted).
    """
    # Copy before adding the User-Agent so a dict that a subclass reuses
    # between calls (e.g. a class-level constant) is never modified.
    headers = {
        **self._get_request_headers(),
        "User-Agent": self.user_agent,
    }

    # `|` builds a new dict, so the shared `{}` default is never mutated.
    default_params = self._get_default_params()
    params = default_params | params

    # NOTE(review): the merged params may include credentials returned by
    # _get_default_params (its docstring mentions API keys) — confirm that
    # debug logs are not shipped anywhere sensitive.
    logger.debug(
        f"Making API request to endpoint: {endpoint} with params: {params}"
    )

    resp = self.session.get(
        self.base_url + endpoint, headers=headers, params=params
    )

    # Plain requests.Session responses have no `from_cache` attribute.
    is_cached = getattr(resp, "from_cache", False)
    logger.debug(
        f"API response received: status={resp.status_code}, "
        f"cached={is_cached}, size={len(resp.content)} bytes"
    )

    resp.raise_for_status()
    return resp.json()

make_paginated_request(endpoint, params={}, limit_param='limit', offset_param='offset', limit=1000, items_key=None, max_pages=50)

Make paginated requests to the API endpoint using limit/offset pagination. Keeps requesting the next page for as long as a page comes back full (item count equal to the limit) and stops at the first page that returns fewer items.

Parameters:

Name Type Description Default
endpoint str

API endpoint path

required
params Dict[str, Any]

Additional parameters for the request. Defaults to {}.

{}
limit_param str

The parameter name for the page size limit. Defaults to "limit".

'limit'
offset_param str

The parameter name for the offset. Defaults to "offset".

'offset'
limit int

The page size for each request. Defaults to 1000.

1000
items_key Optional[str]

The key in the response containing the items list. If None, assumes the entire response is the items list. Defaults to None.

None
max_pages int

Maximum number of pages to fetch as a safety limit. Defaults to 50.

50

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: A combined list of all items from all pages

Raises:

Type Description
RuntimeError

If max_pages is reached before pagination completes naturally

Source code in macrotrace/sources/base.py
def make_paginated_request(
    self,
    endpoint: str,
    params: Dict[str, Any] = {},
    limit_param: str = "limit",
    offset_param: str = "offset",
    limit: int = 1000,
    items_key: Optional[str] = None,
    max_pages: int = 50,
) -> List[Dict[str, Any]]:
    """
    Make paginated requests to the API endpoint using limit/offset pagination.
    Keeps requesting the next page for as long as a page comes back full
    (item count equal to the limit) and stops at the first shorter page.

    Args:
        endpoint (str): API endpoint path
        params (Dict[str, Any], optional): Additional parameters for the request. Defaults to {}.
            Not modified; the limit/offset keys are set on a per-page copy.
        limit_param (str, optional): The parameter name for the page size limit. Defaults to "limit".
        offset_param (str, optional): The parameter name for the offset. Defaults to "offset".
        limit (int, optional): The page size for each request. Must be at least 1. Defaults to 1000.
        items_key (Optional[str], optional): The key in the response containing the items list.
            If None, assumes the entire response is the items list. Defaults to None.
        max_pages (int, optional): Maximum number of pages to fetch as a safety limit. Defaults to 50.

    Returns:
        List[Dict[str, Any]]: A combined list of all items from all pages

    Raises:
        ValueError: If limit is less than 1
        RuntimeError: If max_pages is reached before pagination completes naturally
    """
    # A non-positive limit can never produce a "short" page, so the loop
    # would issue max_pages requests and then fail; reject it up front.
    if limit < 1:
        raise ValueError(f"limit must be at least 1, got {limit}")

    all_items: List[Dict[str, Any]] = []

    logger.debug(
        f"Starting paginated request to endpoint: {endpoint} "
        f"(limit={limit}, limit_param={limit_param}, offset_param={offset_param}, max_pages={max_pages})"
    )

    for page_num in range(max_pages):
        offset = page_num * limit
        # Fresh dict per page; the caller's params are left untouched.
        paginated_params = {
            **params,
            limit_param: limit,
            offset_param: offset,
        }

        response = self.make_request(endpoint, paginated_params)

        if items_key:
            items = response.get(items_key)
        elif isinstance(response, list):
            items = response
        else:
            logger.warning(
                f"Expected a list response from {endpoint} but got "
                f"{type(response).__name__}; treating page as empty"
            )
            items = None

        # A missing key or a JSON null both count as an empty page.
        if items is None:
            items = []

        items_count = len(items)
        all_items.extend(items)

        logger.debug(
            f"Fetched {items_count} items at offset {offset} (page {page_num + 1}/{max_pages}) "
            f"(total accumulated: {len(all_items)})"
        )

        # A short page means there is nothing further to fetch.
        if items_count < limit:
            logger.debug(
                f"Pagination complete: received {items_count} < {limit}, "
                f"total items: {len(all_items)} across {page_num + 1} page(s)"
            )
            break
    else:
        # Only reached when the loop ran out of pages without a break.
        logger.error(
            f"Reached maximum page limit ({max_pages}) for endpoint {endpoint}. "
            f"Fetched {len(all_items)} items. This may indicate an API issue."
        )
        raise RuntimeError(
            f"Pagination limit reached: max_pages={max_pages}. "
            f"Fetched {len(all_items)} items. This may indicate an API issue."
        )

    logger.info(
        f"Completed paginated request to {endpoint}: {len(all_items)} total items across {page_num + 1} page(s)"
    )
    return all_items

make_request_dry_run(endpoint, params={})

Generate the request URL and parameters without making the actual request. Useful for debugging or logging purposes.

Parameters:

Name Type Description Default
endpoint str

API endpoint path

required
params Dict[str, Any]

Additional parameters for the request. To be merged with default parameters. Defaults to {}.

{}

Returns a `Tuple[str, Dict[str, Any]]`: the full request URL and the merged (default plus caller-supplied) parameters.

Source code in macrotrace/sources/base.py
def make_request_dry_run(
    self, endpoint: str, params: Dict[str, Any] = {}
) -> Tuple[str, Dict[str, Any]]:
    """
    Build the URL and query parameters a GET request would use, without sending it.
    Handy when debugging or logging what a call would do.

    Args:
        endpoint (str): API endpoint path, appended to the client's base URL
        params (Dict[str, Any], optional): Extra parameters, merged over the default parameters. Defaults to {}.

    Returns:
        Tuple[str, Dict[str, Any]]: The fully prepared request URL and the merged parameters.
    """
    # Caller-supplied keys win over the defaults; `|` yields a new dict.
    query = self._get_default_params() | params
    request = requests.Request(
        method="GET",
        url=self.base_url + endpoint,
        params=query,
    )
    # Preparing through the session builds the final URL without any network I/O.
    full_url = self.session.prepare_request(request).url
    return full_url, query