#!/usr/bin/env python3 """Gitea MCP Server — exposes Gitea operations as MCP tools. Runs over stdio. All tools authenticate via macOS keychain (git credential fill). Usage (standalone test): python3 mcp_server.py Configuration (mcp_config.json): "gitea-tools": { "command": "/Users/jasonwalker/Development/Gitea-Tools/venv/bin/python3", "args": ["/Users/jasonwalker/Development/Gitea-Tools/mcp_server.py"], "env": {} } """ import os import sys import subprocess # Resolve the project root. MCP clients must launch this script directly with # the venv interpreter (venv/bin/python3) — see the config example above. We do # NOT os.execv() to re-point the interpreter: replacing the process after the # client has already wired up the stdio pipes can desync the JSON-RPC transport # (observed with Antigravity/Cascade hosts). PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) # Ensure the project root is on the path so gitea_auth.py can be imported. if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) from mcp.server.fastmcp import FastMCP # noqa: E402 from gitea_auth import ( # noqa: E402 REMOTES, get_credentials, get_auth_header, api_request, repo_api_url, get_profile, ) mcp = FastMCP("gitea-tools", instructions=( "Gitea issue tracker and PR management for dadeschools and prgs instances. " "Use the gitea_ prefixed tools to create issues, PRs, list issues, etc." )) # ── Helpers ─────────────────────────────────────────────────────────────────── def _resolve(remote: str, host: str | None, org: str | None, repo: str | None): """Resolve remote + overrides to (host, org, repo).""" if remote not in REMOTES: raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}") profile = REMOTES[remote] return ( host or profile["host"], org or profile["org"], repo or profile["repo"], ) def _auth(host: str) -> str: """Get auth header, raise if unavailable.""" header = get_auth_header(host) if header is None: raise RuntimeError( f"No credentials for {host}. " "Ensure you've logged in via HTTPS at least once." ) return header # ── Tools ───────────────────────────────────────────────────────────────────── @mcp.tool() def gitea_create_issue( title: str, body: str = "", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Create a new issue on a Gitea repository. Args: title: Issue title (required). body: Issue body text. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'number' and 'url' of the created issue. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/issues" data = api_request("POST", url, auth, {"title": title, "body": body}) return {"number": data["number"], "url": data["html_url"]} @mcp.tool() def gitea_create_pr( title: str, head: str, base: str = "main", body: str = "", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Create a pull request on a Gitea repository. Args: title: PR title (required). head: Source branch name (required). base: Target branch (default: 'main'). body: PR description. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'number' and 'url' of the created PR. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls" payload = {"title": title, "body": body, "head": head, "base": base} data = api_request("POST", url, auth, payload) return {"number": data["number"], "url": data["html_url"]} @mcp.tool() def gitea_list_prs( state: str = "open", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> list[dict]: """List pull requests on a Gitea repository. Args: state: State filter — 'open', 'closed', or 'all' (default: 'open'). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: List of dicts with 'number', 'title', 'state', 'head', 'base', 'url', 'mergeable'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls?state={state}" prs = api_request("GET", url, auth) or [] return [ { "number": pr["number"], "title": pr["title"], "state": pr["state"], "head": pr["head"]["ref"], "base": pr["base"]["ref"], "url": pr["html_url"], "mergeable": pr.get("mergeable"), } for pr in prs ] @mcp.tool() def gitea_view_pr( pr_number: int, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Get details of a single pull request. Args: pr_number: The pull request index/number. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with PR details. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}" pr = api_request("GET", url, auth) return { "number": pr["number"], "title": pr["title"], "body": pr.get("body", ""), "state": pr["state"], "head": pr["head"]["ref"], "base": pr["base"]["ref"], "url": pr["html_url"], "mergeable": pr.get("mergeable"), "user": pr.get("user", {}).get("login", ""), } # Actions whose eligibility this tool can evaluate. _ELIGIBILITY_ACTIONS = ("review", "approve", "request_changes", "merge") @mcp.tool() def gitea_check_pr_eligibility( pr_number: int, action: str, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Read-only: is the current identity/profile eligible to perform *action* on a PR? Evaluates eligibility only — it NEVER reviews, approves, requests changes, merges, or mutates anything. It inspects the authenticated identity (via the /user endpoint), the active runtime profile metadata (``get_profile``), and the target PR (author, state, head SHA, mergeability), then returns a decision with clear reasons. Fail-closed rules: - Unknown action or unknown remote → not eligible. - Profile has no configured allowed operations, or the action is not in the profile's allowed operations (or is forbidden) → not eligible. - Authenticated identity cannot be determined → not eligible. - Authenticated user equals the PR author → not eligible to ``approve`` or ``merge``. - PR is not open → not eligible. - For ``merge``, PR must be reported mergeable. Never returns the token, Authorization header, or any credential material. Args: pr_number: Target PR number. action: One of 'review', 'approve', 'request_changes', 'merge'. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'eligible' (bool), the inputs inspected, and 'reasons'. """ action = (action or "").strip().lower() profile = get_profile() result = { "eligible": False, "requested_action": action, "authenticated_user": None, "profile_name": profile["profile_name"], "allowed_operations": profile["allowed_operations"], "pr_author": None, "pr_number": pr_number, "pr_state": None, "head_sha": None, "mergeable": None, "remote": remote if remote in REMOTES else None, "reasons": [], } reasons = result["reasons"] if action not in _ELIGIBILITY_ACTIONS: reasons.append( f"unknown action '{action}'; expected one of {list(_ELIGIBILITY_ACTIONS)}" ) return result if remote not in REMOTES: reasons.append(f"unknown remote '{remote}'") return result # Profile capability check (metadata only; not enforcement of the action). allowed = profile["allowed_operations"] forbidden = profile["forbidden_operations"] if not allowed: reasons.append("profile has no configured allowed operations (fail closed)") if action in forbidden: reasons.append(f"profile forbids '{action}'") elif action not in allowed: reasons.append(f"profile is not allowed to {action}") h, o, r = _resolve(remote, host, org, repo) # Authenticated identity (read-only). Fail soft; never leak error/secret. try: auth = _auth(h) except Exception: auth = None auth_user = None if auth: try: who = api_request("GET", f"https://{h}/api/v1/user", auth) auth_user = (who or {}).get("login") except Exception: auth_user = None result["authenticated_user"] = auth_user if not auth_user: reasons.append("authenticated identity could not be determined") # PR facts (read-only GET; no mutation). pr_author = None pr_state = None if auth: try: pr = api_request( "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth ) pr_author = (pr or {}).get("user", {}).get("login") pr_state = (pr or {}).get("state") result["head_sha"] = ((pr or {}).get("head") or {}).get("sha") result["mergeable"] = (pr or {}).get("mergeable") except Exception: reasons.append("PR details could not be retrieved") else: reasons.append("PR details could not be retrieved (no credentials)") result["pr_author"] = pr_author result["pr_state"] = pr_state # PR must be open to act on. if pr_state is None: reasons.append("PR state unknown") elif pr_state != "open": reasons.append(f"PR is not open (state={pr_state})") # Self-author must not approve or merge their own PR. if auth_user and pr_author and auth_user == pr_author and action in ("approve", "merge"): reasons.append("authenticated user is PR author") # Merge needs a positive mergeability signal. if action == "merge": if result["mergeable"] is False: reasons.append("PR is not mergeable") elif result["mergeable"] is None: reasons.append("PR mergeability unknown") result["eligible"] = len(reasons) == 0 if result["eligible"]: reasons.append("all eligibility checks passed") return result @mcp.tool() def gitea_edit_pr( pr_number: int, title: str | None = None, body: str | None = None, state: str | None = None, base: str | None = None, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Edit an existing pull request on a Gitea repository. Args: pr_number: The pull request index/number (required). title: New PR title. body: New PR description. state: New state — 'open' or 'closed'. base: Target branch name. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with success status and details of the edited PR. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}" payload = {} if title is not None: payload["title"] = title if body is not None: payload["body"] = body if state is not None: payload["state"] = state if base is not None: payload["base"] = base if not payload: raise ValueError("At least one field to edit (title, body, state, base) must be provided.") data = api_request("PATCH", url, auth, payload) return { "success": True, "number": data["number"], "title": data["title"], "body": data.get("body", ""), "state": data["state"], "url": data["html_url"], } @mcp.tool() def gitea_get_file( filepath: str, ref: str = "main", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Retrieve metadata and content of a file from a Gitea repository. Args: filepath: The path to the file in the repository (e.g. 'README.md' or 'src/main.py'). ref: The branch, tag, or commit hash to retrieve the file from (default: 'main'). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict containing 'name', 'path', 'sha', 'size', 'encoding', and 'content' (base64). """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) import urllib.parse encoded_path = urllib.parse.quote(filepath, safe="") url = f"{repo_api_url(h, o, r)}/contents/{encoded_path}?ref={ref}" data = api_request("GET", url, auth) return { "name": data.get("name", ""), "path": data.get("path", ""), "sha": data.get("sha", ""), "size": data.get("size", 0), "encoding": data.get("encoding", ""), "content": data.get("content", ""), } @mcp.tool() def gitea_commit_files( files: list[dict], message: str, branch: str | None = None, new_branch: str | None = None, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Commit changes to multiple files in a Gitea repository in a single atomic commit. Args: files: List of file operations. Each file dict must contain 'operation' ('create', 'update', 'delete', 'rename'), 'path', and 'content' (base64 encoded for create/update), and optionally 'sha' (required for update/delete) or 'from_path' (for rename). message: The commit message. branch: Optional existing branch to start/commit from. new_branch: Optional new branch name to create for this commit. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with success status and commit/branch information. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/contents" payload = { "files": files, "message": message, } if branch is not None: payload["branch"] = branch if new_branch is not None: payload["new_branch"] = new_branch data = api_request("POST", url, auth, payload) return { "success": True, "commit": data.get("commit", {}).get("sha", ""), "branch": data.get("branch", {}).get("name", ""), } @mcp.tool() def gitea_merge_pr( pr_number: int, do: str = "merge", title: str | None = None, message: str | None = None, force: bool = False, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Merge a Gitea pull request. Args: pr_number: The PR number to merge. do: Merge style — 'merge', 'squash', or 'rebase'. title: Optional merge title. message: Optional merge message. force: Force merge, ignoring status checks. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success' and 'message'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge" payload = { "Do": do, "force_merge": force, } if title: payload["MergeTitleField"] = title if message: payload["MergeMessageField"] = message api_request("POST", url, auth, payload) return {"success": True, "message": f"PR #{pr_number} merged via '{do}'."} @mcp.tool() def gitea_review_pr( pr_number: int, event: str = "APPROVE", body: str = "", merge: bool = False, merge_method: str = "merge", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Submit a review on a Gitea pull request and optionally merge it. Args: pr_number: The PR number to review. event: Review type — 'APPROVE', 'COMMENT', or 'REQUEST_CHANGES'. body: Review body text / comment. merge: If True and event is 'APPROVE', automatically merge the PR. merge_method: Merge style to use if merging — 'merge', 'squash', or 'rebase'. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with success status and message. """ if event not in ["APPROVE", "COMMENT", "REQUEST_CHANGES"]: raise ValueError(f"Invalid review event: '{event}'. Choose from 'APPROVE', 'COMMENT', 'REQUEST_CHANGES'.") if merge_method not in ["merge", "squash", "rebase"]: raise ValueError(f"Invalid merge method: '{merge_method}'. Choose from 'merge', 'squash', 'rebase'.") h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) # 1. Fetch PR to get the latest head commit SHA (required for review payload) pr_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}" pr_data = api_request("GET", pr_url, auth) commit_sha = pr_data.get("head", {}).get("sha") if not commit_sha: raise RuntimeError(f"Could not find head commit SHA for PR #{pr_number}.") # 2. Submit the PR review review_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/reviews" payload = { "body": body, "event": event, "commit_id": commit_sha } api_request("POST", review_url, auth, payload) msg = f"Successfully submitted review for PR #{pr_number} with event '{event}'." # 3. Merge PR if merge is True and event is APPROVE if merge: if event != "APPROVE": msg += " Warning: Skipping merge because review event is not 'APPROVE'." else: merge_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge" merge_payload = { "Do": merge_method, "force_merge": False } api_request("POST", merge_url, auth, merge_payload) msg += f" Successfully merged PR #{pr_number} using '{merge_method}' method." return {"success": True, "message": msg} @mcp.tool() def gitea_delete_branch( branch: str, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Delete a remote branch from a Gitea repository. Args: branch: The remote branch name (e.g. 'feat/branch-name'). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success' and 'message'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) import urllib.parse encoded_branch = urllib.parse.quote(branch, safe="") url = f"{repo_api_url(h, o, r)}/branches/{encoded_branch}" api_request("DELETE", url, auth) return {"success": True, "message": f"Remote branch '{branch}' deleted."} @mcp.tool() def gitea_close_issue( issue_number: int, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Close an issue by setting its state to 'closed'. Args: issue_number: The issue number to close. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success' boolean and 'message'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/issues/{issue_number}" api_request("PATCH", url, auth, {"state": "closed"}) return {"success": True, "message": f"Issue #{issue_number} closed."} @mcp.tool() def gitea_list_issues( state: str = "open", label: str | None = None, limit: int = 50, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> list[dict]: """List issues on a Gitea repository with optional filters. Args: state: Filter by state — 'open', 'closed', or 'all'. label: Filter by label name (e.g. 'important'). limit: Max number of issues to return (default: 50). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: List of dicts with 'number', 'title', 'state', 'labels', 'assignee'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) params = f"state={state}&limit={limit}&type=issues" if label: params += f"&labels={label}" url = f"{repo_api_url(h, o, r)}/issues?{params}" issues = api_request("GET", url, auth) return [ { "number": i["number"], "title": i["title"], "state": i["state"], "labels": [lb["name"] for lb in i.get("labels", [])], "assignee": (i.get("assignee") or {}).get("login", ""), } for i in issues ] @mcp.tool() def gitea_view_issue( issue_number: int, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Get full details of a single issue. Args: issue_number: The issue number to view. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'number', 'title', 'body', 'state', 'labels', 'assignee', 'url'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/issues/{issue_number}" i = api_request("GET", url, auth) return { "number": i["number"], "title": i["title"], "body": i.get("body", ""), "state": i["state"], "labels": [lb["name"] for lb in i.get("labels", [])], "assignee": (i.get("assignee") or {}).get("login", ""), "url": i["html_url"], } @mcp.tool() def gitea_whoami( remote: str = "dadeschools", host: str | None = None, ) -> dict: """Look up the Gitea account the MCP server is authenticated as. Read-only. Calls Gitea's authenticated-user endpoint (GET /api/v1/user) with the configured token and returns safe identity metadata only. Use this to prove which account a mutating workflow (e.g. review/merge) would act as, so self-review/self-merge can be detected before acting. Never returns the token, Authorization header, password, or any other secret material. Fails closed with a clear error if the identity cannot be determined. Args: remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. Returns: dict with 'authenticated', 'username', 'display_name', 'user_id', 'email', 'server', 'remote', and 'profile' (safe runtime profile metadata: profile_name + allowed_operations; never the token). """ if remote not in REMOTES: raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}") h = host or REMOTES[remote]["host"] auth = _auth(h) url = f"https://{h}/api/v1/user" data = api_request("GET", url, auth) if not data or not data.get("login"): # Fail closed: never assume an identity we could not verify. raise RuntimeError( f"Could not determine the authenticated Gitea identity for {h}. " "Verify the configured token is valid for this instance." ) # Runtime profile metadata is non-secret (name + allowed op categories). # The token is resolved separately and is never included here. profile = get_profile() return { "authenticated": True, "username": data.get("login"), "display_name": data.get("full_name") or None, "user_id": data.get("id"), "email": data.get("email") or None, "server": f"https://{h}", "remote": remote, "profile": { "profile_name": profile["profile_name"], "allowed_operations": profile["allowed_operations"], }, } @mcp.tool() def gitea_get_profile( remote: str = "dadeschools", host: str | None = None, resolve_identity: bool = True, ) -> dict: """Describe the active Gitea MCP execution profile for this runtime. Read-only. Reports the non-secret configuration of the running MCP process (profile name, allowed/forbidden operation categories, audit label, token *source name*, base URL) plus the resolved server for the given remote. Optionally resolves the authenticated username via ``gitea_whoami``'s endpoint so an LLM can see who this runtime acts as. This tool never mutates Gitea and never approves, merges, comments, or creates anything. It never returns the token value, Authorization header, password, raw environment, or credential file paths. Identity resolution fails soft: if it cannot be determined, ``authenticated_username`` is null and ``identity_status`` marks it, but the profile config is still returned. Args: remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. resolve_identity: If True, attempt a read-only identity lookup. Returns: dict of safe profile metadata. ``identity_status`` is one of 'verified', 'unknown', 'unavailable', or 'not_resolved'. """ profile = get_profile() result = { "profile_name": profile["profile_name"], "allowed_operations": profile["allowed_operations"], "forbidden_operations": profile["forbidden_operations"], "audit_label": profile["audit_label"], "token_source_name": profile["token_source_name"], "base_url": profile["base_url"], "remote": remote if remote in REMOTES else None, "server": None, "authenticated_username": None, "identity_status": "not_resolved", } if remote not in REMOTES: # Mark ambiguity rather than raising: the tool stays inspectable. result["identity_status"] = "unknown" result["remote_error"] = f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}" return result h = host or REMOTES[remote]["host"] result["server"] = f"https://{h}" if resolve_identity: try: auth = _auth(h) data = api_request("GET", f"https://{h}/api/v1/user", auth) login = (data or {}).get("login") if login: result["authenticated_username"] = login result["identity_status"] = "verified" else: result["identity_status"] = "unknown" except Exception: # Fail soft for the identity field only. Never surface the error # detail or any credential material — just mark it unavailable. result["identity_status"] = "unavailable" return result @mcp.tool() def gitea_mark_issue( issue_number: int, action: str, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Claim or release an issue via the status:in-progress label. This is the cross-agent lock mechanism. Check before starting work. Args: issue_number: The issue number. action: 'start' to claim (add label) or 'done' to release (remove label). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success' boolean and 'message'. """ if action not in ("start", "done"): raise ValueError(f"action must be 'start' or 'done', got '{action}'") h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) # Find the status:in-progress label id labels = api_request("GET", f"{base}/labels?limit=100", auth) label_id = None for lb in labels: if lb["name"] == "status:in-progress": label_id = lb["id"] break if label_id is None: raise RuntimeError( "Label 'status:in-progress' not found. " "Run manage_labels.py to create it first." ) if action == "start": api_request("POST", f"{base}/issues/{issue_number}/labels", auth, {"labels": [label_id]}) return {"success": True, "message": f"Issue #{issue_number} claimed."} else: api_request("DELETE", f"{base}/issues/{issue_number}/labels/{label_id}", auth) return {"success": True, "message": f"Issue #{issue_number} released."} @mcp.tool() def gitea_list_labels( remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> list: """List all available labels in a Gitea repository. Args: remote: Known Gitea instance ('dadeschools' or 'prgs'). host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: list of labels. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) return api_request("GET", f"{base}/labels?limit=100", auth) @mcp.tool() def gitea_create_label( name: str, color: str, description: str = "", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Create a new label on a Gitea repository. Args: name: Name of the label (e.g. 'bug', 'epic'). color: HTML color code (hex, e.g. 'fbca04' or '#fbca04'). description: Description of the label. remote: Known Gitea instance ('dadeschools' or 'prgs'). host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict containing the created label details. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) if color.startswith("#"): color = color[1:] payload = { "name": name, "color": color, "description": description, } return api_request("POST", f"{base}/labels", auth, payload) @mcp.tool() def gitea_set_issue_labels( issue_number: int, labels: list[str], remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> list: """Replace all labels on a Gitea issue with a new list of label names. Args: issue_number: The issue number. labels: List of label names to apply. remote: Known Gitea instance ('dadeschools' or 'prgs'). host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: list of all labels currently applied to the issue. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) # 1. Fetch existing labels on the repo to resolve names -> IDs existing = api_request("GET", f"{base}/labels?limit=100", auth) name_to_id = {lb["name"]: lb["id"] for lb in existing} # 2. Check if any requested labels do not exist, and raise error label_ids = [] missing_labels = [] for name in labels: if name in name_to_id: label_ids.append(name_to_id[name]) else: missing_labels.append(name) if missing_labels: raise RuntimeError( f"The following labels do not exist on the repository: {missing_labels}. " "Please create them first using gitea_create_label." ) # 3. PUT the labels to the issue res = api_request("PUT", f"{base}/issues/{issue_number}/labels", auth, {"labels": label_ids}) return res @mcp.tool() def gitea_mirror_refs( apply: bool = False, force: bool = False, ) -> dict: """Mirror branches and tags between dadeschools and prgs Timesheet repos. Additive only — never deletes branches or tags. Diverged branches are skipped unless force is True. Args: apply: If True, actually push. If False (default), dry-run only. force: If True, force-push diverged branches. Returns: dict with 'output' (script stdout) and 'return_code'. """ script = os.path.join(PROJECT_ROOT, "mirror_refs.sh") args = [script] if apply: args.append("--apply") if force: args.append("--force") result = subprocess.run( args, capture_output=True, text=True, timeout=120, ) return { "output": result.stdout + result.stderr, "return_code": result.returncode, } # ── Entry point ─────────────────────────────────────────────────────────────── if __name__ == "__main__": mcp.run(transport="stdio")