"""
Repository provider for GitHub.
Forks repos, clones to a temp directory, applies file changes,
pushes branches, and creates pull requests and issues via the GitHub API.
"""
import asyncio
import logging
import os
import shutil
import subprocess
import tempfile
from contextlib import contextmanager
import httpx
from bridge.config import settings
from bridge.services.protocols import ForkInfo, RepoProvider
from .github_auth import get_github_headers
logger = logging.getLogger(__name__)
[docs]
class GitHubRepoProvider(RepoProvider):
    """
    GitHub-backed implementation of ``RepoProvider``.

    REST calls are made with ``httpx`` against ``settings.github_api_base``;
    cloning, committing and pushing shell out to the ``git`` executable.
    """

    def __init__(self):
        """
        Check the GitHub token configuration and prepare the login cache.
        """
        # Runs first so a token problem surfaces at construction time
        # (presumably this raises when no token is configured -- see settings).
        settings.require_github_token()
        # Authenticated user's login; stays None until
        # _get_authenticated_login() has fetched it once.
        self._login: str | None = None
        logger.debug("GitHubRepoProvider initialized (token verified).")
async def _get_authenticated_login(self) -> str:
    """
    Look up the login name of the token's owner, remembering the answer.

    The first call issues ``GET {github_api_base}/user`` and stores the
    ``login`` field on the instance; later calls return the stored value
    without any network traffic.

    Returns
    -------
    str
        GitHub username of the authenticated user.

    Raises
    ------
    httpx.HTTPStatusError
        If the ``/user`` request is answered with an error status.
    """
    cached_login = self._login
    if cached_login is not None:
        return cached_login

    endpoint = f"{settings.github_api_base}/user"
    auth_headers = get_github_headers()
    async with httpx.AsyncClient(timeout=10) as client:
        reply = await client.get(endpoint, headers=auth_headers)
        reply.raise_for_status()
        payload = reply.json()

    # Cache for the lifetime of this provider instance.
    self._login = payload["login"]
    logger.debug(f"Authenticated as GitHub user '{self._login}'")
    return self._login
async def _get_existing_fork(self, source_owner: str, source_repo: str) -> ForkInfo | None:
    """
    Return ForkInfo for an existing fork of `source_owner/source_repo` owned by
    the authenticated user, or None if it does not exist.

    The lookup fetches ``{login}/{source_repo}``, i.e. it only finds a fork that
    kept the source repository's name.

    Parameters
    ----------
    source_owner : str
        Owner of the source repository.
    source_repo : str
        Name of the source repository.

    Returns
    -------
    ForkInfo | None
        ForkInfo of the existing fork, or None if the repository is missing,
        is not a fork, or is a fork of a different parent.

    Raises
    ------
    httpx.HTTPStatusError
        If GitHub answers with an error status other than 404.
    """
    login = await self._get_authenticated_login()
    base = settings.github_api_base
    url = f"{base}/repos/{login}/{source_repo}"
    headers = get_github_headers()
    logger.debug(f"Checking for existing fork {login}/{source_repo} of {source_owner}/{source_repo}")

    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.get(url, headers=headers)

    if response.status_code == 404:
        logger.debug("No existing fork found.")
        return None
    response.raise_for_status()
    data = response.json()

    if not data.get("fork"):
        logger.debug(f"Repo {login}/{source_repo} exists but is not a fork.")
        return None

    # `parent` may be absent *or* explicitly null; `or {}` covers both so the
    # chained .get() cannot raise AttributeError.
    parent_full_name = (data.get("parent") or {}).get("full_name")
    expected_parent = f"{source_owner}/{source_repo}"
    # GitHub treats owner and repository names case-insensitively, so compare
    # without regard to case to avoid missing a fork over caller-side casing.
    if parent_full_name is None or parent_full_name.lower() != expected_parent.lower():
        logger.debug(
            f"Repo {login}/{source_repo} is a fork, but parent is {parent_full_name}, "
            f"not {source_owner}/{source_repo}."
        )
        return None

    fork_info = ForkInfo(
        full_name=data["full_name"],
        owner=data["owner"]["login"],
        repo=data["name"],
    )
    logger.info(f"Reusing existing fork: {fork_info.full_name}")
    return fork_info
async def _delete_repo(self, owner: str, repo: str) -> None:
    """
    Remove a repository from GitHub.

    WARNING: destructive -- the repository is deleted through the REST API.
    A 204 (deleted) and a 404 (nothing to delete) are both treated as success.

    Parameters
    ----------
    owner : str
        Owner of the repository.
    repo : str
        Name of the repository.

    Raises
    ------
    httpx.HTTPStatusError
        If GitHub answers with an error status other than 404.
    """
    endpoint = f"{settings.github_api_base}/repos/{owner}/{repo}"
    auth_headers = get_github_headers()
    logger.warning(f"Deleting repo {owner}/{repo}")

    async with httpx.AsyncClient(timeout=10) as client:
        reply = await client.delete(endpoint, headers=auth_headers)

    already_ok = reply.status_code in (204, 404)
    if not already_ok:
        # Any other status is handed to httpx, which raises on error codes.
        try:
            reply.raise_for_status()
        except httpx.HTTPStatusError as err:
            logger.error(f"Failed to delete repo {owner}/{repo}: {err}")
            raise

    logger.info(f"Repo {owner}/{repo} deleted or did not exist.")
[docs]
async def fork(
    self, owner: str, repo: str, replace_existing: bool = True, wait_ready: bool = True, max_wait: int = 20
) -> ForkInfo:
    """
    Fork a GitHub repository (or return an existing fork).

    Parameters
    ----------
    owner : str
        The owner of the repository to fork.
    repo : str
        The name of the repository to fork.
    replace_existing : bool
        Whether to delete an existing fork (if present) before creating a new one.
        Default is True. NOTE: this deletes the existing fork repository.
    wait_ready : bool
        Whether to poll until the fork is reachable through the API. Default is True.
    max_wait : int
        Maximum number of one-second polling attempts while waiting for the fork
        to become ready. Default is 20.

    Returns
    -------
    ForkInfo
        Full name, owner and repository name of the fork. If the fork is still
        not reachable after `max_wait` attempts, the data from the creation
        response is returned anyway (a warning is logged).

    Raises
    ------
    httpx.HTTPStatusError
        If the fork request is answered with an error status.
    """
    existing_fork = await self._get_existing_fork(owner, repo)
    if existing_fork is not None:
        if not replace_existing:
            return existing_fork
        await self._delete_repo(existing_fork.owner, existing_fork.repo)

    base = settings.github_api_base
    url = f"{base}/repos/{owner}/{repo}/forks"
    headers = get_github_headers()
    logger.info(f"Forking repo {owner}/{repo} (wait_ready={wait_ready}, max_wait={max_wait}s)")
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            response = await client.post(url, headers=headers)
            response.raise_for_status()
            fork_data = response.json()
            fork_full_name = fork_data["full_name"]
            fork_info = ForkInfo(
                full_name=fork_full_name,
                owner=fork_data["owner"]["login"],
                repo=fork_data["name"],
            )
            logger.info(f"Fork created: {fork_full_name}")
            if not wait_ready:
                return fork_info

            # Build the polling URL from the same base as every other request in
            # this provider, so all calls target the same API root.
            status_url = f"{base}/repos/{fork_full_name}"
            # poll until fork is available (but don't fail hard if slow)
            for i in range(max_wait):
                check = await client.get(status_url, headers=headers)
                if check.status_code == 200:
                    logger.debug(f"Fork {fork_full_name} became ready after {i + 1}s.")
                    return fork_info
                await asyncio.sleep(1)

            # if still not ready, just warn and continue with the initial response
            logger.warning(f"Fork {fork_full_name} not fully ready after {max_wait}s, returning initial data.")
            return fork_info
    except httpx.HTTPStatusError as e:
        logger.error(f"GitHub API error while forking {owner}/{repo}: {e}")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error while forking {owner}/{repo}: {e}")
        raise
[docs]
@contextmanager
def clone_context(self, repo_full_name: str):
    """
    Use context manager to clone a GitHub repo into a temp dir and delete it afterward.

    The clone URL carries the GitHub token so that later pushes from the clone
    are authenticated. The token is never written to the log, and the
    exception raised on clone failure carries a redacted command line.

    Parameters
    ----------
    repo_full_name : str
        Full name of the repository (e.g., "owner/repo").

    Yields
    ------
    str
        Path of the temporary directory containing the clone. The directory is
        removed when the context exits, whether or not an error occurred.

    Raises
    ------
    subprocess.CalledProcessError
        If ``git clone`` exits with a non-zero status.
    """
    tmp_dir = tempfile.mkdtemp()
    repo_url = f"https://{settings.github_token}:x-oauth-basic@github.com/{repo_full_name}.git"
    logger.info(f"Cloning repo {repo_full_name} into temp dir {tmp_dir}")
    try:
        # check=False on purpose: with check=True the resulting exception (and
        # its string form) would contain repo_url, i.e. the token.
        completed = subprocess.run(["git", "clone", repo_url, tmp_dir], check=False)
        if completed.returncode != 0:
            redacted_cmd = ["git", "clone", f"https://***@github.com/{repo_full_name}.git", tmp_dir]
            clone_error = subprocess.CalledProcessError(completed.returncode, redacted_cmd)
            logger.error(f"Git clone failed for {repo_full_name}: {clone_error}")
            raise clone_error
        logger.debug(f"Repo {repo_full_name} cloned successfully.")
        # Errors raised inside the caller's `with` body propagate untouched;
        # they are not clone failures and are not logged as such here.
        yield tmp_dir
    finally:
        logger.debug(f"Cleaning up temp dir {tmp_dir}")
        shutil.rmtree(tmp_dir, ignore_errors=True)
[docs]
def apply_changes_and_push(self, repo_path: str, branch_name: str, file_changes: dict):
    """
    Create a new branch from a local cloned repo, apply changes, and push.

    All git commands run with ``cwd=repo_path``; the working directory of the
    calling process is left unchanged.

    Parameters
    ----------
    repo_path : str
        Path to the local cloned GitHub repository.
    branch_name : str
        Name of the new branch to create.
    file_changes : dict
        Dictionary mapping file paths (relative to the repository root) to
        their new text content. Files are written as UTF-8.

    Raises
    ------
    subprocess.CalledProcessError
        If any git command exits with a non-zero status (for example when there
        is nothing to commit).
    """

    def run_git(*args: str) -> None:
        # Run one git command inside the clone without touching the
        # process-wide working directory.
        subprocess.run(["git", *args], check=True, cwd=repo_path)

    logger.info(f"Applying changes to {repo_path} on new branch {branch_name}")
    try:
        run_git("checkout", "-b", branch_name)
        for path, content in file_changes.items():
            full_path = os.path.join(repo_path, path)
            # Create intermediate directories for files in new sub-folders.
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            # Explicit encoding so the written bytes do not depend on the locale.
            with open(full_path, "w", encoding="utf-8") as f:
                f.write(content)
            run_git("add", path)
            logger.debug(f"Added file: {path}")
        run_git("commit", "-m", "Automated update via script")
        run_git("push", "--set-upstream", "origin", branch_name)
        logger.info(f"Branch {branch_name} pushed successfully for repo at {repo_path}")
    except subprocess.CalledProcessError as e:
        logger.error(f"Git command failed: {e.cmd}: {e}")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error while applying changes to {repo_path}: {e}")
        raise
[docs]
async def create_pull_request(
    self, owner: str, repo: str, title: str, body: str, head_branch: str, base_branch: str
) -> dict:
    """
    Open a pull request on ``owner/repo`` through the GitHub REST API.

    Parameters
    ----------
    owner : str
        GitHub user or organization that owns the repository.
    repo : str
        Repository name.
    title : str
        Title of the pull request.
    body : str
        Description of the pull request.
    head_branch : str
        Value sent as the PR ``head`` (the branch holding the proposed changes).
    base_branch : str
        Value sent as the PR ``base`` (the branch to merge into). Required;
        there is no default.

    Returns
    -------
    dict
        JSON response from the GitHub API representing the created pull request.

    Raises
    ------
    httpx.HTTPStatusError
        If the API answers with an error status. A 422 response body is
        additionally logged as a warning before the error is raised.
    """
    endpoint = f"{settings.github_api_base}/repos/{owner}/{repo}/pulls"
    payload = dict(title=title, body=body, head=head_branch, base=base_branch)
    auth_headers = get_github_headers()
    logger.info(f"Creating PR on {owner}/{repo}: '{title}' (head={head_branch}, base={base_branch})")
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            reply = await client.post(endpoint, json=payload, headers=auth_headers)
            if reply.status_code == 422:
                # Log the validation details GitHub returned before raising.
                logger.warning(f"PR creation returned 422 for {owner}/{repo}: {reply.text}")
            reply.raise_for_status()
            created = reply.json()
            logger.info(f"Pull request created successfully: {created.get('html_url', 'unknown URL')}")
            return created
    except httpx.HTTPStatusError as err:
        logger.error(f"GitHub API PR creation failed: {err}")
        raise
    except Exception as err:
        logger.exception(f"Unexpected error creating PR for {owner}/{repo}: {err}")
        raise
[docs]
async def create_issue(
    self,
    owner: str,
    repo: str,
    title: str,
    body: str = "",
    labels: list[str] | None = None,
    assignees: list[str] | None = None,
) -> dict:
    """
    Open an issue on ``owner/repo`` through the GitHub REST API.

    Parameters
    ----------
    owner : str
        GitHub user or organization that owns the repository.
    repo : str
        Repository name.
    title : str
        Title of the issue.
    body : str, optional
        Description of the issue. Omitted from the request when empty.
    labels : list of str, optional
        Label names to attach. Omitted from the request when empty or None.
    assignees : list of str, optional
        GitHub usernames to assign. Omitted from the request when empty or None.

    Returns
    -------
    dict
        JSON response from the GitHub API representing the created issue.

    Raises
    ------
    httpx.HTTPStatusError
        If the API answers with an error status. A 422 response body is
        additionally logged as a warning before the error is raised.
    """
    endpoint = f"{settings.github_api_base}/repos/{owner}/{repo}/issues"
    auth_headers = get_github_headers()

    # Title is always sent; the remaining fields only when they carry a value.
    payload: dict = {"title": title}
    optional_fields = (("body", body), ("labels", labels), ("assignees", assignees))
    payload.update({key: value for key, value in optional_fields if value})

    logger.info(f"Creating issue on {owner}/{repo}: '{title}'")
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            reply = await client.post(endpoint, json=payload, headers=auth_headers)
            if reply.status_code == 422:
                # Log the validation details GitHub returned before raising.
                logger.warning(f"Issue creation returned 422 for {owner}/{repo}: {reply.text}")
            reply.raise_for_status()
            created = reply.json()
            logger.info(f"Issue created successfully: {created.get('html_url', 'unknown URL')}")
            return created
    except httpx.HTTPStatusError as err:
        logger.error(f"GitHub API issue creation failed for {owner}/{repo}: {err}")
        raise
    except Exception as err:
        logger.exception(f"Unexpected error while creating issue on {owner}/{repo}: {err}")
        raise