Source code for bridge.bootstrap.wiring

"""
Default wiring for the bridge: registers the bio.tools schema, the GitHub repo provider, and both directional pipelines.
Idempotent and guarded by bootstrap.state.
"""

import logging

from bridge.builders import compose_biotools_metadata, compose_github_repo
from bridge.pipelines import (
    BiotoolsToGitHubForPRPipelineArgs,
    GitHubToBiotoolsForMetaPipelineArgs,
    PipelineGoal,
    run_bt2gh_pipeline_for_pr_issues,
    run_gh2bt_pipeline_for_meta,
)
from bridge.services import GitHubRepoProvider

from .base import register_bridge, register_repo, register_schema
from .state import is_initialized, mark_initialized

# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)


def wiring(force: bool = False):
    """
    Populate the registry with the default schema, repository, and bridge.

    Registers the ``"biotools"`` schema, the ``"github"`` repository (with
    ``GitHubRepoProvider`` as its provider class), and the
    ``"biotools"``/``"github"`` bridge carrying the ``CREATE_PR_ISSUES`` and
    ``EXTRACT_METADATA`` pipelines. Once everything is registered, the
    initialized flag is set through ``mark_initialized()``.

    Parameters
    ----------
    force : bool, optional
        When True, perform the registrations again even if
        ``is_initialized()`` already reports True. Defaults to False.
    """
    already_initialized = is_initialized()
    if already_initialized and not force:
        logger.info("Registry already initialized; skipping.")
        return

    register_schema("biotools", composer=compose_biotools_metadata)
    register_repo(
        "github",
        composer=compose_github_repo,
        provider_cls=GitHubRepoProvider,
    )

    # Each goal maps to a (pipeline runner, pipeline args class) pair.
    bridge_pipelines = {
        PipelineGoal.CREATE_PR_ISSUES: (
            run_bt2gh_pipeline_for_pr_issues,
            BiotoolsToGitHubForPRPipelineArgs,
        ),
        PipelineGoal.EXTRACT_METADATA: (
            run_gh2bt_pipeline_for_meta,
            GitHubToBiotoolsForMetaPipelineArgs,
        ),
    }
    register_bridge("biotools", "github", pipelines=bridge_pipelines)

    logger.info("Registry initialization completed.")
    # Set the flag last so it is only recorded after every registration ran.
    mark_initialized()