# Source code for bridge.services.europe_pmc.europe_pmc_ingestor

"""
Async client for the Europe PMC REST search API.

Looks up a publication by PMID, PMCID and/or DOI and returns the first
matching record. Raises EuropePMCNotFoundError if no record is found.
"""

import logging
from typing import Any

import httpx

from bridge.config import settings
from bridge.services.protocols import Ingestor

from .europe_pmc_auth import get_europe_pmc_headers

logger = logging.getLogger(__name__)


class EuropePMCNotFoundError(Exception):
    """Signal that none of the supplied identifiers matched a Europe PMC record."""
class EuropePMCIngestor(Ingestor):
    """
    Ingest a publication record from Europe PMC using one or more identifiers.

    Queries are tried in the order PMID, PMCID, DOI; the first query that
    returns at least one hit wins.

    Parameters
    ----------
    pmid : str, optional
        PubMed identifier (e.g., "36173614").
    pmcid : str, optional
        PubMed Central identifier (e.g., "PMC9903320"). A bare numeric value
        (e.g., "9903320") is accepted and prefixed with "PMC".
    doi : str, optional
        Digital Object Identifier (e.g., "10.1021/acs.jproteome.2c00457").

    Raises
    ------
    ValueError
        If no identifier is provided (None, empty or whitespace-only).
    """

    def __init__(self, pmid: str | None = None, pmcid: str | None = None, doi: str | None = None):
        self.pmid = self._clean(pmid)
        self.pmcid = self._normalize_pmcid(pmcid)
        self.doi = self._clean(doi)
        if not any([self.pmid, self.pmcid, self.doi]):
            raise ValueError("Provide at least one of: pmid, pmcid, doi.")
        self._timeout = getattr(settings, "http_timeout_seconds", 10)

    @staticmethod
    def _clean(value: Any) -> str | None:
        """Return *value* as a stripped string, or None when it is missing/blank."""
        if value is None:
            return None
        text = str(value).strip()
        return text or None

    @classmethod
    def _normalize_pmcid(cls, value: Any) -> str | None:
        """Return a PMCID in canonical "PMC<digits>" form (bare digits get the prefix)."""
        text = cls._clean(value)
        if text is None:
            return None
        if text.isdigit():
            return f"PMC{text}"
        if text[:3].upper() == "PMC":
            return "PMC" + text[3:]
        return text

    @staticmethod
    def _quote(value: str) -> str:
        """Wrap *value* as a quoted search phrase, escaping backslashes and quotes.

        Identifiers such as DOIs can contain query-syntax characters
        ('(', ')', ':', '/'), so they are always searched as phrases.
        """
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    def _build_queries(self) -> list[str]:
        """Build the ordered list of Europe PMC search queries to try."""
        queries: list[str] = []
        if self.pmid:
            queries.append(f"EXT_ID:{self._quote(self.pmid)} AND SRC:MED")
        if self.pmcid:
            pmcid = self._quote(self.pmcid)
            # Articles that also have a PMID are indexed under SRC:MED with the
            # PMCID in the PMCID field; EXT_ID + SRC:PMC only matches PMC-only
            # records, so it is kept as a fallback.
            queries.append(f"PMCID:{pmcid}")
            queries.append(f"EXT_ID:{pmcid} AND SRC:PMC")
        if self.doi:
            doi = self._quote(self.doi)
            # Try both DOI: and EXT_ID:"<doi>" variants
            queries.append(f"DOI:{doi}")
            queries.append(f"EXT_ID:{doi}")
        return queries

    async def _get(self, params: dict[str, Any]) -> dict[str, Any]:
        """Run one search request and return the decoded JSON payload.

        *params* override the defaults (JSON, core result type, one hit per page).

        Raises
        ------
        httpx.RequestError
            On network failures (logged at error level).
        httpx.HTTPStatusError
            On non-2xx responses (logged at warning level).
        """
        default_params = {"format": "json", "resultType": "core", "pageSize": 1}
        all_params = {**default_params, **params}
        base = f"{settings.europepmc_api_base}/search"
        try:
            async with httpx.AsyncClient(timeout=self._timeout) as client:
                resp = await client.get(base, params=all_params, headers=get_europe_pmc_headers())
                resp.raise_for_status()
                return resp.json()
        except httpx.RequestError as e:
            logger.error("Network error while querying Europe PMC: %s", e)
            raise
        except httpx.HTTPStatusError as e:
            logger.warning("HTTP %s from Europe PMC for params=%s", e.response.status_code, all_params)
            raise

    @staticmethod
    def _first_result(payload: dict[str, Any]) -> dict[str, Any] | None:
        """Return the first record in a search payload, or None if there are no hits."""
        # `or {}` / `or []` also guard against explicit nulls in the payload.
        result_list = payload.get("resultList") or {}
        results = result_list.get("result") or []
        return results[0] if results else None

    async def fetch(self) -> dict[str, Any]:
        """
        Fetch the Europe PMC record matching the provided identifiers.

        Returns
        -------
        dict
            The first matching Europe PMC record.

        Raises
        ------
        EuropePMCNotFoundError
            If no record matches the provided identifiers.
        httpx.RequestError, httpx.HTTPStatusError
            For network/HTTP issues.
        """
        queries = self._build_queries()
        logger.debug("EuropePMCIngestor queries: %s", queries)
        for q in queries:
            logger.debug("Querying Europe PMC with: %s", q)
            data = await self._get({"query": q})
            rec = self._first_result(data)
            if rec:
                logger.info("Found Europe PMC record")
                return rec
        msg = f"No Europe PMC record found for identifiers pmid={self.pmid!r}, pmcid={self.pmcid!r}, doi={self.doi!r}"
        logger.info(msg)
        raise EuropePMCNotFoundError(msg)