#!/usr/bin/env python3 """Gitea MCP Server — exposes Gitea operations as MCP tools. Runs over stdio. All tools authenticate via macOS keychain (git credential fill). Usage (standalone test): python3 mcp_server.py Configuration (mcp_config.json): "gitea-tools": { "command": "/Users/jasonwalker/Development/Gitea-Tools/venv/bin/python3", "args": ["/Users/jasonwalker/Development/Gitea-Tools/mcp_server.py"], "env": {} } """ import os import re import sys import functools import contextlib import subprocess # Resolve the project root. MCP clients must launch this script directly with # the venv interpreter (venv/bin/python3) — see the config example above. We do # NOT os.execv() to re-point the interpreter: replacing the process after the # client has already wired up the stdio pipes can desync the JSON-RPC transport # (observed with Antigravity/Cascade hosts). PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) # Ensure the project root is on the path so gitea_auth.py can be imported. if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) from mcp.server.fastmcp import FastMCP # noqa: E402 from gitea_auth import ( # noqa: E402 REMOTES, get_credentials, get_auth_header, api_request, api_get_all, repo_api_url, get_profile, ) import gitea_audit # noqa: E402 import gitea_config # noqa: E402 def _reveal_endpoints() -> bool: """Admin/debug opt-in (#120): include endpoint URLs and token source names in tool output. Off by default so normal LLM-facing responses expose only logical names and status. Never affects token values, which are excluded on every path.""" return (os.environ.get("GITEA_MCP_REVEAL_ENDPOINTS") or "").strip().lower() \ in ("1", "true", "yes") mcp = FastMCP("gitea-tools", instructions=( "Gitea issue tracker and PR management for dadeschools and prgs instances. " "Use the gitea_ prefixed tools to create issues, PRs, list issues, etc." )) def extract_linked_issue_numbers(text: str | None, branch_name: str | None = None) -> list[int]: issues = set() if text: pattern = re.compile(r'(?i)(?:close[sd]?|fix(?:e[sd])?|resolve[sd]?|implement[s]?|implemented)\s+#(\d+)') issues.update(int(m) for m in pattern.findall(text)) if branch_name: pattern = re.compile(r'(?i)issue-(\d+)') issues.update(int(m) for m in pattern.findall(branch_name)) return sorted(list(issues)) def release_in_progress_label(issue_numbers: list[int], remote: str, host: str | None, org: str | None, repo: str | None) -> dict: if not issue_numbers: return {} h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) try: labels = api_request("GET", f"{base}/labels?limit=100", auth) label_id = None for lb in labels: if lb["name"] == "status:in-progress": label_id = lb["id"] break except Exception as exc: return {num: f"error fetching repo labels: {_redact(str(exc))}" for num in issue_numbers} results = {} if label_id is None: for num in issue_numbers: results[num] = "not present" return results for num in issue_numbers: try: url = f"{base}/issues/{num}" issue_data = api_request("GET", url, auth) issue_labels = [lb["name"] for lb in issue_data.get("labels", [])] if "status:in-progress" in issue_labels: with _audited("release_in_progress_label", host=h, remote=remote, org=o, repo=r, issue_number=num, request_metadata={"action": "remove status:in-progress"}): api_request("DELETE", f"{url}/labels/{label_id}", auth) results[num] = "released" else: results[num] = "not present" except Exception as exc: results[num] = f"error: {_redact(str(exc))}" return results def cleanup_in_progress_for_pr(pr_payload: dict, remote: str, host: str | None, org: str | None, repo: str | None) -> dict: body = pr_payload.get("body") or "" title = pr_payload.get("title") or "" branch = pr_payload.get("head", {}).get("ref") or "" text = f"{title}\n{body}" issues = extract_linked_issue_numbers(text, branch) if not issues: return {"cleanup_status": "no linked issue found"} results = release_in_progress_label(issues, remote, host, org, repo) return {"cleanup_status": results} # ── Helpers ─────────────────────────────────────────────────────────────────── def _resolve(remote: str, host: str | None, org: str | None, repo: str | None): """Resolve remote + overrides to (host, org, repo).""" if remote not in REMOTES: raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}") profile = REMOTES[remote] return ( host or profile["host"], org or profile["org"], repo or profile["repo"], ) def _auth(host: str) -> str: """Get auth header, raise if unavailable.""" header = get_auth_header(host) if header is None: raise RuntimeError( f"No credentials for {host}. " "Ensure you've logged in via HTTPS at least once." ) return header # ── Audit logging (#18) ───────────────────────────────────────────────────────── # Mutating actions emit a structured audit record (profile + authenticated user # + outcome) when GITEA_AUDIT_LOG is configured. When it is not, every helper # below short-circuits and performs NO work — no extra API calls, no I/O — so # existing tool behaviour and API call sequences are unchanged. _UNSET = object() # Best-effort identity cache keyed by host, so an enabled audit trail resolves # the authenticated username at most once per host per process. _IDENTITY_CACHE: dict = {} def _authenticated_username(host: str): """Resolve the authenticated Gitea username for *host* (cached, fail-soft). Read-only. Returns None if the identity cannot be determined; never raises and never surfaces credential material. """ if host in _IDENTITY_CACHE: return _IDENTITY_CACHE[host] user = None try: header = get_auth_header(host) if header: who = api_request("GET", f"https://{host}/api/v1/user", header) user = (who or {}).get("login") except Exception: user = None _IDENTITY_CACHE[host] = user return user def _audit(action: str, *, host, remote, result, org=None, repo=None, reason=None, request_metadata=None, issue_number=None, pr_number=None, target_branch=None, head_sha=None, username=_UNSET): """Emit one audit record for a mutating action. No-op unless auditing is on. Never raises — auditing must not break the action it records. """ if not gitea_audit.audit_enabled(): return try: profile = get_profile() if username is _UNSET: username = _authenticated_username(host) if host else None event = gitea_audit.build_event( action=action, result=result, remote=remote, server=(f"https://{host}" if host else None), repository=repo, issue_number=issue_number, pr_number=pr_number, profile_name=profile["profile_name"], audit_label=profile["audit_label"], authenticated_username=username, target_branch=target_branch, head_sha=head_sha, reason=reason, request_metadata=request_metadata, ) gitea_audit.write_event(event) except Exception: pass # best-effort; never break the action @contextlib.contextmanager def _audited(action: str, *, host, remote, org=None, repo=None, request_metadata=None, issue_number=None, pr_number=None, target_branch=None): """Wrap a mutating API call: audit SUCCEEDED on return, FAILED on exception. When auditing is off this yields immediately with no bookkeeping. """ if not gitea_audit.audit_enabled(): yield return try: yield except Exception as exc: _audit(action, host=host, remote=remote, org=org, repo=repo, result=gitea_audit.FAILED, reason=_redact(str(exc)), request_metadata=request_metadata, issue_number=issue_number, pr_number=pr_number, target_branch=target_branch) raise _audit(action, host=host, remote=remote, org=org, repo=repo, result=gitea_audit.SUCCEEDED, request_metadata=request_metadata, issue_number=issue_number, pr_number=pr_number, target_branch=target_branch) def _audit_pr_result(action: str): """Decorator for gated PR tools that return a result dict. Reads the tool's own result dict (authenticated_user, profile, reasons, performed) to emit an audit record classifying the outcome as SUCCEEDED, BLOCKED, or FAILED. No extra API calls: identity comes from the result. """ def decorate(fn): @functools.wraps(fn) def wrapper(*args, **kwargs): result = fn(*args, **kwargs) try: if isinstance(result, dict) and gitea_audit.audit_enabled(): reasons = [str(x) for x in (result.get("reasons") or [])] if result.get("performed"): status = gitea_audit.SUCCEEDED elif any("failed:" in x.lower() for x in reasons): # "failed:" marks a surfaced exception (e.g. "merge # failed: "); a bare gate message like "… failed # (fail closed)" is a policy block, not an execution error. status = gitea_audit.FAILED else: status = gitea_audit.BLOCKED remote = result.get("remote") host = REMOTES[remote]["host"] if remote in REMOTES else None _audit( action, host=host, remote=remote, result=status, reason="; ".join(reasons) or None, pr_number=result.get("pr_number"), head_sha=result.get("head_sha"), username=result.get("authenticated_user"), request_metadata={ "requested_action": result.get("requested_action"), "merge_method": result.get("merge_method"), }, ) except Exception: pass # best-effort; never break the tool return result return wrapper return decorate # ── Tools ───────────────────────────────────────────────────────────────────── @mcp.tool() def gitea_create_issue( title: str, body: str = "", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Create a new issue on a Gitea repository. Args: title: Issue title (required). body: Issue body text. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'number' and 'url' of the created issue. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/issues" try: data = api_request("POST", url, auth, {"title": title, "body": body}) except Exception as exc: _audit("create_issue", host=h, remote=remote, org=o, repo=r, result=gitea_audit.FAILED, reason=_redact(str(exc)), request_metadata={"title": title}) raise _audit("create_issue", host=h, remote=remote, org=o, repo=r, result=gitea_audit.SUCCEEDED, issue_number=data["number"], request_metadata={"title": title}) return {"number": data["number"], "url": data["html_url"]} @mcp.tool() def gitea_create_pr( title: str, head: str, base: str = "main", body: str = "", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Create a pull request on a Gitea repository. Args: title: PR title (required). head: Source branch name (required). base: Target branch (default: 'main'). body: PR description. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'number' and 'url' of the created PR. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls" payload = {"title": title, "body": body, "head": head, "base": base} meta = {"title": title, "head": head, "base": base} try: data = api_request("POST", url, auth, payload) except Exception as exc: _audit("create_pr", host=h, remote=remote, org=o, repo=r, result=gitea_audit.FAILED, reason=_redact(str(exc)), target_branch=head, request_metadata=meta) raise _audit("create_pr", host=h, remote=remote, org=o, repo=r, result=gitea_audit.SUCCEEDED, pr_number=data["number"], target_branch=head, request_metadata=meta) return {"number": data["number"], "url": data["html_url"]} @mcp.tool() def gitea_list_prs( state: str = "open", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> list[dict]: """List pull requests on a Gitea repository. Args: state: State filter — 'open', 'closed', or 'all' (default: 'open'). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: List of dicts with 'number', 'title', 'state', 'head', 'base', 'url', 'mergeable'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls?state={state}" prs = api_get_all(url, auth) return [ { "number": pr["number"], "title": pr["title"], "state": pr["state"], "head": pr["head"]["ref"], "base": pr["base"]["ref"], "url": pr["html_url"], "mergeable": pr.get("mergeable"), } for pr in prs ] @mcp.tool() def gitea_view_pr( pr_number: int, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Get details of a single pull request. Args: pr_number: The pull request index/number. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with PR details. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}" pr = api_request("GET", url, auth) return { "number": pr["number"], "title": pr["title"], "body": pr.get("body", ""), "state": pr["state"], "head": pr["head"]["ref"], "base": pr["base"]["ref"], "url": pr["html_url"], "mergeable": pr.get("mergeable"), "user": pr.get("user", {}).get("login", ""), } # Actions whose eligibility this tool can evaluate. _ELIGIBILITY_ACTIONS = ("review", "approve", "request_changes", "merge") @mcp.tool() def gitea_check_pr_eligibility( pr_number: int, action: str, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Read-only: is the current identity/profile eligible to perform *action* on a PR? Evaluates eligibility only — it NEVER reviews, approves, requests changes, merges, or mutates anything. It inspects the authenticated identity (via the /user endpoint), the active runtime profile metadata (``get_profile``), and the target PR (author, state, head SHA, mergeability), then returns a decision with clear reasons. Fail-closed rules: - Unknown action or unknown remote → not eligible. - Profile has no configured allowed operations, or the action is not in the profile's allowed operations (or is forbidden) → not eligible. - Authenticated identity cannot be determined → not eligible. - Authenticated user equals the PR author → not eligible to ``approve`` or ``merge``. - PR is not open → not eligible. - For ``merge``, PR must be reported mergeable. Never returns the token, Authorization header, or any credential material. Args: pr_number: Target PR number. action: One of 'review', 'approve', 'request_changes', 'merge'. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'eligible' (bool), the inputs inspected, and 'reasons'. """ action = (action or "").strip().lower() profile = get_profile() result = { "eligible": False, "requested_action": action, "authenticated_user": None, "profile_name": profile["profile_name"], "allowed_operations": profile["allowed_operations"], "pr_author": None, "pr_number": pr_number, "pr_state": None, "head_sha": None, "mergeable": None, "remote": remote if remote in REMOTES else None, "reasons": [], } reasons = result["reasons"] if action not in _ELIGIBILITY_ACTIONS: reasons.append( f"unknown action '{action}'; expected one of {list(_ELIGIBILITY_ACTIONS)}" ) return result if remote not in REMOTES: reasons.append(f"unknown remote '{remote}'") return result # Profile capability check (metadata only; not enforcement of the action). # Both the action and the profile lists are normalized before comparison # (#106), so legacy spellings ("merge") and canonical namespaced ops # ("gitea.pr.merge") always match each other and never cross services. allowed = profile["allowed_operations"] forbidden = profile["forbidden_operations"] op_ok, op_reason = gitea_config.check_operation(action, allowed, forbidden) if not op_ok: if op_reason == "no-allowed-operations": reasons.append( "profile has no configured allowed operations (fail closed)") elif op_reason == "forbidden": reasons.append(f"profile forbids '{action}'") elif op_reason == "invalid-forbidden-entry": reasons.append( "profile has an unrecognized forbidden operation entry " "(fail closed)") else: reasons.append(f"profile is not allowed to {action}") h, o, r = _resolve(remote, host, org, repo) # Authenticated identity (read-only). Fail soft; never leak error/secret. try: auth = _auth(h) except Exception: auth = None auth_user = None if auth: try: who = api_request("GET", f"https://{h}/api/v1/user", auth) auth_user = (who or {}).get("login") except Exception: auth_user = None result["authenticated_user"] = auth_user if not auth_user: reasons.append("authenticated identity could not be determined") # PR facts (read-only GET; no mutation). pr_author = None pr_state = None if auth: try: pr = api_request( "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth ) pr_author = (pr or {}).get("user", {}).get("login") pr_state = (pr or {}).get("state") result["head_sha"] = ((pr or {}).get("head") or {}).get("sha") result["mergeable"] = (pr or {}).get("mergeable") except Exception: reasons.append("PR details could not be retrieved") else: reasons.append("PR details could not be retrieved (no credentials)") result["pr_author"] = pr_author result["pr_state"] = pr_state # PR must be open to act on. if pr_state is None: reasons.append("PR state unknown") elif pr_state != "open": reasons.append(f"PR is not open (state={pr_state})") # Self-author must not approve or merge their own PR. if auth_user and pr_author and auth_user == pr_author and action in ("approve", "merge"): reasons.append("authenticated user is PR author") # Merge needs a positive mergeability signal. if action == "merge": if result["mergeable"] is False: reasons.append("PR is not mergeable") elif result["mergeable"] is None: reasons.append("PR mergeability unknown") result["eligible"] = len(reasons) == 0 if result["eligible"]: reasons.append("all eligibility checks passed") # Add enhanced error clarity details (#131) auth_user = result["authenticated_user"] pr_author = result["pr_author"] profile_name = result["profile_name"] is_self = bool(auth_user and pr_author and auth_user == pr_author) result["active_identity"] = auth_user result["active_profile"] = profile_name result["self_author"] = is_self # Determine missing permission missing_perm = None if not op_ok: missing_perm = f"gitea.pr.{action}" if action in ("approve", "request_changes", "merge") else f"gitea.{action}" result["missing_permission"] = missing_perm # Determine required profile req_profile = None if action in ("approve", "request_changes", "review"): req_profile = "A profile with reviewer role permissions (allowing approve/merge/review, and forbidding author operations)" elif action == "merge": req_profile = "A profile with reviewer role permissions and explicit merge permission" result["required_profile"] = req_profile # Determine required identity req_identity = None if is_self: req_identity = f"Any Gitea user other than PR author '{pr_author}'" result["required_identity"] = req_identity # Determine if fixable by profile switch vs requires different namespace switching_supported = gitea_config.is_runtime_switching_enabled() fixable_by_switch = False req_different_ns = False if not op_ok: config = gitea_config.load_config() has_capable_profile = False if config: for name, p in (config.get("profiles") or {}).items(): p_allowed = p.get("allowed_operations", []) p_forbidden = p.get("forbidden_operations", []) ok, _ = gitea_config.check_operation(action, p_allowed, p_forbidden) if ok: has_capable_profile = True break if switching_supported and has_capable_profile: fixable_by_switch = True else: req_different_ns = True result["fixable_by_profile_switch"] = fixable_by_switch result["requires_different_namespace"] = req_different_ns # Determine safe next step safe_step = "Ready." if not result["eligible"]: if is_self: safe_step = "Self-review/self-merge is forbidden. Ask a different operator/reviewer to review and merge this PR." elif not auth_user: safe_step = f"Ask the operator to configure valid credentials/token for profile '{profile_name}'." elif not op_ok: if fixable_by_switch: safe_step = f"Switch to a reviewer profile by calling gitea_activate_profile with a profile that allows {action}." else: safe_step = "Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile." elif result["pr_state"] != "open": safe_step = "No action can be taken on a closed/merged PR." elif action == "merge": if result["mergeable"] is False: safe_step = "Address conflicts in the PR branches first, or wait for CI checks to pass before merging." elif result["mergeable"] is None: safe_step = "Wait for Gitea to finish calculating mergeability or trigger a re-check." result["safe_next_step"] = safe_step return result # Review actions this gated tool can perform, mapped to (eligibility action, # Gitea review *event*). The eligibility action is fed to # ``gitea_check_pr_eligibility`` (#14) so every mutation reuses the same # identity/profile/author gates. Note: 'merge' is deliberately absent — merge # belongs to a separate tool/issue and is never performed here. _REVIEW_ACTIONS = { # 'comment' posts review findings without an approval/rejection state. # #14 names this eligibility category 'review'. "comment": ("review", "COMMENT"), "approve": ("approve", "APPROVE"), "request_changes": ("request_changes", "REQUEST_CHANGES"), } # Patterns scrubbed from any surfaced error text so a credential can never leak. _SECRET_PREFIXES = ("token ", "Basic ") def _redact(text: str) -> str: """Strip anything that looks like an Authorization credential from *text*. Errors raised by ``api_request`` echo the server response body, not the request headers, so a token should never appear — this is defence in depth so a future change can't leak ``token …`` / ``Basic …`` material into a tool result or log line. """ if not text: return text out = text for prefix in _SECRET_PREFIXES: idx = 0 while True: i = out.find(prefix, idx) if i == -1: break j = i + len(prefix) while j < len(out) and not out[j].isspace(): j += 1 out = out[:i] + prefix + "[REDACTED]" + out[j:] idx = i + len(prefix) + len("[REDACTED]") return out @mcp.tool() @_audit_pr_result("submit_pr_review") def gitea_submit_pr_review( pr_number: int, action: str, body: str = "", expected_head_sha: str | None = None, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Gated PR review mutation: comment findings, request changes, or approve. This is the only tool that submits a Gitea PR *review*. It performs a mutation **only after every safety gate passes**; if any gate fails it returns ``performed=False`` and never calls the mutating endpoint. Gate order (fail-closed at each step): 1. Validate ``action`` is one of 'comment', 'approve', 'request_changes'. 2. Reuse ``gitea_check_pr_eligibility`` (#14), which runs the authenticated -user lookup, active-profile lookup, PR-author lookup, self-approval block, and profile-allowed-operation check. ``approve`` requires eligibility for 'approve', ``request_changes`` requires 'request_changes', and ``comment`` requires 'review'. 3. Redundantly block self-approval (authenticated user == PR author). 4. If ``expected_head_sha`` is supplied and the PR head has moved, abort. 5. Only then POST the review. Endpoint: ``POST /repos/{owner}/{repo}/pulls/{n}/reviews``. This is the *formal review* API (it records an APPROVE / COMMENT / REQUEST_CHANGES review state tied to the head commit), chosen over the plain issue-comment endpoint (``/issues/{n}/comments``) so that approvals and change requests carry real review state — a plain comment cannot approve or block a PR. Merge is intentionally NOT implemented here. Never returns the token, Authorization header, or any credential material. Args: pr_number: Target PR number. action: 'comment', 'approve', or 'request_changes'. body: Review body / finding text. expected_head_sha: Optional. If given and the PR head SHA differs, the review is refused (guards against reviewing a changed PR). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict describing the attempt: action, whether it was performed, the authenticated user, profile name, PR author, PR number, head SHA checked, and the reasons/gates passed or blocked. Never secrets. """ action = (action or "").strip().lower() result = { "requested_action": action, "performed": False, "authenticated_user": None, "profile_name": get_profile()["profile_name"], "pr_author": None, "pr_number": pr_number, "head_sha": None, "expected_head_sha": expected_head_sha, "remote": remote if remote in REMOTES else None, "reasons": [], } reasons = result["reasons"] # Gate 1 — valid review action (no mutation on unknown action). if action not in _REVIEW_ACTIONS: reasons.append( f"unknown review action '{action}'; expected one of " f"{sorted(_REVIEW_ACTIONS)}" ) return result eligibility_action, event = _REVIEW_ACTIONS[action] # Gate 2 — reuse #14 eligibility (identity + profile + author + self-approve # + profile-allowed). This performs only read-only GETs. elig = gitea_check_pr_eligibility( pr_number=pr_number, action=eligibility_action, remote=remote, host=host, org=org, repo=repo, ) result["authenticated_user"] = elig.get("authenticated_user") result["profile_name"] = elig.get("profile_name", result["profile_name"]) result["pr_author"] = elig.get("pr_author") result["head_sha"] = elig.get("head_sha") if not elig.get("eligible"): reasons.append( f"eligibility check for '{eligibility_action}' failed (fail closed)" ) reasons.extend(elig.get("reasons", [])) return result # Gate 3 — redundant self-approval block (belt-and-suspenders over #14). auth_user = result["authenticated_user"] pr_author = result["pr_author"] if action == "approve" and auth_user and pr_author and auth_user == pr_author: reasons.append("self-approval blocked (authenticated user is PR author)") return result # Gate 4 — head SHA must match if the caller pinned one. actual_sha = result["head_sha"] if expected_head_sha and actual_sha and expected_head_sha != actual_sha: reasons.append( "expected head SHA does not match current PR head (fail closed)" ) return result if not actual_sha: # Should be unreachable — eligibility fails closed without a head SHA — # but never submit a review without a commit to pin it to. reasons.append("PR head SHA unavailable (fail closed)") return result # All gates passed — perform the single mutating call. h, o, r = _resolve(remote, host, org, repo) try: auth = _auth(h) review_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/reviews" payload = {"body": body, "event": event, "commit_id": actual_sha} api_request("POST", review_url, auth, payload) except Exception as exc: # noqa: BLE001 — redact before surfacing reasons.append(f"review submission failed: {_redact(str(exc))}") return result result["performed"] = True reasons.append(f"all gates passed; submitted '{event}' review on PR #{pr_number}") return result @mcp.tool() def gitea_edit_pr( pr_number: int, title: str | None = None, body: str | None = None, state: str | None = None, base: str | None = None, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Edit an existing pull request on a Gitea repository. Args: pr_number: The pull request index/number (required). title: New PR title. body: New PR description. state: New state — 'open' or 'closed'. base: Target branch name. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with success status and details of the edited PR. """ # Validate inputs BEFORE any auth/profile resolution or API setup: a # no-fields call is a pure validation error and must not depend on # credentials, network, or environment configuration. payload = {} if title is not None: payload["title"] = title if body is not None: payload["body"] = body if state is not None: payload["state"] = state if base is not None: payload["base"] = base if not payload: raise ValueError("At least one field to edit (title, body, state, base) must be provided.") h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}" with _audited("edit_pr", host=h, remote=remote, org=o, repo=r, pr_number=pr_number, request_metadata={"fields": sorted(payload)}): data = api_request("PATCH", url, auth, payload) cleanup_status = None if state == "closed": cleanup = cleanup_in_progress_for_pr(data, remote, host, org, repo) cleanup_status = cleanup.get("cleanup_status") if isinstance(cleanup_status, dict): for issue_num, st in cleanup_status.items(): if st == "released": try: comment_url = f"{repo_api_url(h, o, r)}/issues/{issue_num}/comments" api_request("POST", comment_url, auth, {"body": f"Tracker cleanup: removed `status:in-progress` from this issue because linked PR #{pr_number} was closed."}) except Exception: pass return { "success": True, "number": data["number"], "title": data["title"], "body": data.get("body", ""), "state": data["state"], "url": data["html_url"], "cleanup_status": cleanup_status, } @mcp.tool() def gitea_get_file( filepath: str, ref: str = "main", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Retrieve metadata and content of a file from a Gitea repository. Args: filepath: The path to the file in the repository (e.g. 'README.md' or 'src/main.py'). ref: The branch, tag, or commit hash to retrieve the file from (default: 'main'). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict containing 'name', 'path', 'sha', 'size', 'encoding', and 'content' (base64). """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) import urllib.parse encoded_path = urllib.parse.quote(filepath, safe="") url = f"{repo_api_url(h, o, r)}/contents/{encoded_path}?ref={ref}" data = api_request("GET", url, auth) return { "name": data.get("name", ""), "path": data.get("path", ""), "sha": data.get("sha", ""), "size": data.get("size", 0), "encoding": data.get("encoding", ""), "content": data.get("content", ""), } @mcp.tool() def gitea_commit_files( files: list[dict], message: str, branch: str | None = None, new_branch: str | None = None, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Commit changes to multiple files in a Gitea repository in a single atomic commit. Args: files: List of file operations. Each file dict must contain 'operation' ('create', 'update', 'delete', 'rename'), 'path', and 'content' (base64 encoded for create/update), and optionally 'sha' (required for update/delete) or 'from_path' (for rename). message: The commit message. branch: Optional existing branch to start/commit from. new_branch: Optional new branch name to create for this commit. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with success status and commit/branch information. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/contents" payload = { "files": files, "message": message, } if branch is not None: payload["branch"] = branch if new_branch is not None: payload["new_branch"] = new_branch with _audited("commit_files", host=h, remote=remote, org=o, repo=r, target_branch=(new_branch or branch), request_metadata={"message": message, "paths": [f.get("path") for f in files], "operations": [f.get("operation") for f in files]}): data = api_request("POST", url, auth, payload) return { "success": True, "commit": data.get("commit", {}).get("sha", ""), "branch": data.get("branch", {}).get("name", ""), } # Merge methods supported by the Gitea merge API. _MERGE_METHODS = ("merge", "squash", "rebase") @mcp.tool() @_audit_pr_result("merge_pr") def gitea_merge_pr( pr_number: int, confirmation: str = "", expected_head_sha: str | None = None, expected_changed_files: list[str] | None = None, do: str = "merge", title: str | None = None, message: str | None = None, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Gated merge of a Gitea pull request (#16). This is the ONLY merge path this server exposes, and it mutates only after every safety gate passes. No ungated merge tool remains: legacy ``gitea_review_pr`` fails closed on ``merge=True`` and ``gitea_submit_pr_review`` never merges. Gate order (fail-closed at each step; the merge API is called only if all gates pass): 1. Merge method (``do``) is 'merge', 'squash', or 'rebase'. 2. Explicit confirmation: ``confirmation`` must equal ``"MERGE PR "``. Without it, the tool makes no API calls at all. 3. Reuse ``gitea_check_pr_eligibility`` (#14) with action 'merge': this proves the authenticated identity, the active profile (and that it allows merge), the PR author, blocks self-merge, requires the PR to be open, and fails closed when the PR is not mergeable or mergeability is unknown. 4. If ``expected_head_sha`` is given and the PR head moved → refuse. 5. If ``expected_changed_files`` is given and the PR's changed file set differs → refuse. 6. Redundant self-merge block (authenticated user == PR author). No force / ignore-checks option is exposed. Gitea's own ``mergeable`` signal (which reflects branch-protection required reviews and status checks) must be positive, so required approval/check state is honoured, never bypassed. Never returns the token, Authorization header, or any credential material. Args: pr_number: The PR number to merge. confirmation: Must be exactly ``"MERGE PR "`` or merge is refused. expected_head_sha: Strongly recommended. If set and the PR head differs, refuse. expected_changed_files: Optional. If set and the PR's changed file set differs, refuse. do: Merge style — 'merge', 'squash', or 'rebase'. title: Optional merge commit title. message: Optional merge commit message. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict describing the attempt: performed, authenticated user, profile name, PR author, PR number, head SHA checked, merge method, reasons/gates passed or blocked, and merge result / merge commit if available. Never secrets. """ do = (do or "").strip().lower() result = { "performed": False, "authenticated_user": None, "profile_name": get_profile()["profile_name"], "pr_author": None, "pr_number": pr_number, "head_sha": None, "expected_head_sha": expected_head_sha, "merge_method": do, "mergeable": None, "remote": remote if remote in REMOTES else None, "merge_result": None, "merge_commit": None, "reasons": [], } reasons = result["reasons"] # Gate 1 — valid merge method (no API call on a bad method). if do not in _MERGE_METHODS: reasons.append( f"unknown merge method '{do}'; expected one of {list(_MERGE_METHODS)}" ) return result # Gate 2 — explicit confirmation (fail fast; zero API calls without it). expected_confirmation = f"MERGE PR {pr_number}" if (confirmation or "").strip() != expected_confirmation: reasons.append( f"explicit confirmation required: pass confirmation='{expected_confirmation}'" ) return result # Gate 3 — reuse #14 eligibility (identity + profile + merge-allowed + # author + self-merge block + open + mergeable/unknown fail-closed). # Read-only GETs only. elig = gitea_check_pr_eligibility( pr_number=pr_number, action="merge", remote=remote, host=host, org=org, repo=repo, ) result["authenticated_user"] = elig.get("authenticated_user") result["profile_name"] = elig.get("profile_name", result["profile_name"]) result["pr_author"] = elig.get("pr_author") result["head_sha"] = elig.get("head_sha") result["mergeable"] = elig.get("mergeable") if not elig.get("eligible"): reasons.append("eligibility check for 'merge' failed (fail closed)") reasons.extend(elig.get("reasons", [])) return result # Gate 4 — head SHA must match if the caller pinned a reviewed SHA. actual_sha = result["head_sha"] if expected_head_sha and actual_sha and expected_head_sha != actual_sha: reasons.append( "expected head SHA does not match current PR head (fail closed)" ) return result if not actual_sha: # Unreachable — eligibility fails closed without a head SHA — but never # merge a PR whose head commit we could not read. reasons.append("PR head SHA unavailable (fail closed)") return result h, o, r = _resolve(remote, host, org, repo) # Gate 5 — changed files must match the reviewed set, if provided. if expected_changed_files is not None: try: auth = _auth(h) files = api_request( "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}/files", auth ) actual_files = sorted( (f or {}).get("filename", "") for f in (files or []) ) except Exception as exc: # noqa: BLE001 — redact before surfacing reasons.append( f"could not verify changed files (fail closed): {_redact(str(exc))}" ) return result result["changed_files"] = actual_files if actual_files != sorted(expected_changed_files): reasons.append( "PR changed files do not match expected_changed_files (fail closed)" ) return result # Gate 6 — redundant self-merge block (belt-and-suspenders over #14). auth_user = result["authenticated_user"] pr_author = result["pr_author"] if auth_user and pr_author and auth_user == pr_author: reasons.append("self-merge blocked (authenticated user is PR author)") return result # All gates passed — perform the single merge mutation. try: auth = _auth(h) merge_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge" payload = {"Do": do} if title: payload["MergeTitleField"] = title if message: payload["MergeMessageField"] = message api_request("POST", merge_url, auth, payload) # Best-effort read-back of the merge commit SHA (redacted on error). merged = None try: merged = api_request( "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth ) result["merge_commit"] = (merged or {}).get("merged_commit_sha") except Exception: result["merge_commit"] = None # Tracker cleanup (#98): never blocks the merge, and always surfaces an # explicit cleanup_status once the merge mutation has happened. Cleanup # needs the PR title/body/branch, which only the read-back payload # carries here — so a failed read-back is an explicit skip, not a # silent one. if merged is None: result["cleanup_status"] = "skipped (merge read-back failed)" else: try: cleanup = cleanup_in_progress_for_pr(merged, remote, host, org, repo) result["cleanup_status"] = cleanup.get("cleanup_status") except Exception as cleanup_exc: # noqa: BLE001 — redact before surfacing result["cleanup_status"] = ( f"skipped (cleanup error: {_redact(str(cleanup_exc))})" ) except Exception as exc: # noqa: BLE001 — redact before surfacing reasons.append(f"merge failed: {_redact(str(exc))}") return result result["performed"] = True result["merge_result"] = f"PR #{pr_number} merged via '{do}'." reasons.append(f"all gates passed; merged PR #{pr_number} via '{do}'") return result @mcp.tool() def gitea_review_pr( pr_number: int, event: str = "APPROVE", body: str = "", merge: bool = False, merge_method: str = "merge", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Submit a review on a Gitea pull request (Legacy wrapper). This tool is a compatibility wrapper around the safe `gitea_submit_pr_review`. It uses the same #14 eligibility gates. Merging via this tool is no longer supported and will fail closed (see #16). Args: pr_number: The PR number to review. event: Review type — 'APPROVE', 'COMMENT', or 'REQUEST_CHANGES'. body: Review body text / comment. merge: Merging is disabled; if True, the tool fails closed. merge_method: Ignored. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with success status and message. """ if merge: return { "success": False, "message": "merge=True is no longer supported in this tool (belongs to #16)." } if event not in ["APPROVE", "COMMENT", "REQUEST_CHANGES"]: raise ValueError(f"Invalid review event: '{event}'. Choose from 'APPROVE', 'COMMENT', 'REQUEST_CHANGES'.") # Map legacy event string to the action expected by gitea_submit_pr_review event_map = { "APPROVE": "approve", "COMMENT": "comment", "REQUEST_CHANGES": "request_changes" } action = event_map[event] result = gitea_submit_pr_review( pr_number=pr_number, action=action, body=body, expected_head_sha=None, remote=remote, host=host, org=org, repo=repo ) if result.get("performed"): return {"success": True, "message": f"Successfully submitted review for PR #{pr_number} with event '{event}'."} else: reasons = result.get("reasons", []) return {"success": False, "message": f"Review submission failed eligibility gates: {reasons}"} @mcp.tool() def gitea_delete_branch( branch: str, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Delete a remote branch from a Gitea repository. Args: branch: The remote branch name (e.g. 'feat/branch-name'). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success' and 'message'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) import urllib.parse encoded_branch = urllib.parse.quote(branch, safe="") url = f"{repo_api_url(h, o, r)}/branches/{encoded_branch}" with _audited("delete_branch", host=h, remote=remote, org=o, repo=r, target_branch=branch, request_metadata={"branch": branch}): api_request("DELETE", url, auth) return {"success": True, "message": f"Remote branch '{branch}' deleted."} @mcp.tool() def gitea_close_issue( issue_number: int, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Close an issue by setting its state to 'closed'. Args: issue_number: The issue number to close. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success' boolean and 'message'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/issues/{issue_number}" with _audited("close_issue", host=h, remote=remote, org=o, repo=r, issue_number=issue_number, request_metadata={"state": "closed"}): api_request("PATCH", url, auth, {"state": "closed"}) cleanup_result = release_in_progress_label([issue_number], remote, host, org, repo) return { "success": True, "message": f"Issue #{issue_number} closed.", "cleanup_status": cleanup_result } @mcp.tool() def gitea_list_issues( state: str = "open", label: str | None = None, limit: int = 50, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> list[dict]: """List issues on a Gitea repository with optional filters. Args: state: Filter by state — 'open', 'closed', or 'all'. label: Filter by label name (e.g. 'important'). limit: Max number of issues to return across all pages (default: 50). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: List of dicts with 'number', 'title', 'state', 'labels', 'assignee'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) params = f"state={state}&type=issues" if label: params += f"&labels={label}" url = f"{repo_api_url(h, o, r)}/issues?{params}" issues = api_get_all(url, auth, limit=limit) return [ { "number": i["number"], "title": i["title"], "state": i["state"], "labels": [lb["name"] for lb in i.get("labels", [])], "assignee": (i.get("assignee") or {}).get("login", ""), } for i in issues ] @mcp.tool() def gitea_view_issue( issue_number: int, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Get full details of a single issue. Args: issue_number: The issue number to view. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'number', 'title', 'body', 'state', 'labels', 'assignee', 'url'. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/issues/{issue_number}" i = api_request("GET", url, auth) return { "number": i["number"], "title": i["title"], "body": i.get("body", ""), "state": i["state"], "labels": [lb["name"] for lb in i.get("labels", [])], "assignee": (i.get("assignee") or {}).get("login", ""), "url": i["html_url"], } def _issue_comment_gate(op: str) -> list[str]: """Profile permission check for issue-comment tools (#126). Issue discussion comments are gated separately from the gitea.pr.* review/merge family: listing requires ``gitea.read``, creating requires ``gitea.issue.comment``. Returns a list of block reasons (empty = allowed); an unreadable profile fails closed. """ try: profile = get_profile() except Exception as exc: return [f"profile could not be resolved (fail closed): {_redact(str(exc))}"] op_ok, op_reason = gitea_config.check_operation( op, profile["allowed_operations"], profile["forbidden_operations"]) if op_ok: return [] if op_reason == "no-allowed-operations": return ["profile has no configured allowed operations (fail closed)"] if op_reason == "forbidden": return [f"profile forbids '{op}'"] if op_reason == "invalid-forbidden-entry": return ["profile has an unrecognized forbidden operation entry (fail closed)"] return [f"profile is not allowed to {op}"] @mcp.tool() def gitea_list_issue_comments( issue_number: int, limit: int = 50, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """List discussion comments on a Gitea issue. Read-only. Issue discussion comments are distinct from PR reviews: this reads the issue comment thread and never touches review endpoints. The profile must allow ``gitea.read`` (fail closed otherwise). Normal output is LLM-safe: no endpoint URLs. Set GITEA_MCP_REVEAL_ENDPOINTS=1 (admin/debug opt-in) to include each comment's web link. Args: issue_number: The issue number whose comments to list (required). limit: Max number of comments to return (default: 50). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success', 'issue_number', and 'comments' (each with 'id', 'author', 'created_at', 'updated_at', 'body'); on a permission block, 'success' False and 'reasons' with no API call made. """ reasons = _issue_comment_gate("gitea.read") if reasons: return {"success": False, "issue_number": issue_number, "reasons": reasons} h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) api = f"{repo_api_url(h, o, r)}/issues/{issue_number}/comments" comments = api_request("GET", api, auth) or [] reveal = _reveal_endpoints() out = [] for c in comments[:limit]: entry = { "id": c["id"], "author": (c.get("user") or {}).get("login", ""), "created_at": c.get("created_at"), "updated_at": c.get("updated_at"), "body": c.get("body", ""), } if reveal: entry["url"] = c.get("html_url") out.append(entry) return {"success": True, "issue_number": issue_number, "comments": out} @mcp.tool() def gitea_create_issue_comment( issue_number: int, body: str, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Post a markdown comment to a Gitea issue's discussion thread. Issue discussion comments are distinct from PR reviews: this posts to the issue comment thread only and never submits review verdicts. The profile must allow ``gitea.issue.comment`` — gated separately from the gitea.pr.* review/merge operations (fail closed otherwise). The target issue is always explicit; there is no inference beyond the standard remote defaults used by every tool. Normal output is LLM-safe: comment id + issue number, no endpoint URLs. Set GITEA_MCP_REVEAL_ENDPOINTS=1 (admin/debug opt-in) to include the comment's web link. Errors are redacted before being raised. Args: issue_number: The issue number to comment on (required). body: Markdown comment body (required, non-empty). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success', 'comment_id', and 'issue_number' ('url' only with the reveal opt-in); on a permission block or empty body, 'success'/'performed' False and 'reasons' with no API call made. """ reasons = _issue_comment_gate("gitea.issue.comment") if not (body or "").strip(): reasons.append("comment body must be a non-empty string") if reasons: return {"success": False, "performed": False, "issue_number": issue_number, "reasons": reasons} h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) api = f"{repo_api_url(h, o, r)}/issues/{issue_number}/comments" try: with _audited("create_issue_comment", host=h, remote=remote, org=o, repo=r, issue_number=issue_number, request_metadata={"body_chars": len(body)}): data = api_request("POST", api, auth, {"body": body}) except Exception as exc: raise RuntimeError(_redact(str(exc))) from None result = {"success": True, "performed": True, "comment_id": data["id"], "issue_number": issue_number} if _reveal_endpoints(): result["url"] = data.get("html_url") return result # ── Operator guide / project skills (#128) ─────────────────────────────────── # Read-only capability discovery for new LLM sessions: what the active # profile may do, how identity is verified, the non-negotiable safety rules, # and which project workflows ("skills") exist. Nothing here mutates state or # widens any permission — the guide describes the gates, it never bypasses # them. def _role_kind(allowed, forbidden) -> str: """Classify the active profile from its normalized operations. 'author' can create PRs / push branches; 'reviewer' can approve/merge; 'mixed' can do both (a config smell — the reviewer-deadlock invariant forbids it in v2 configs); 'limited' can do neither. """ def can(op): return gitea_config.check_operation(op, allowed, forbidden)[0] review = can("gitea.pr.approve") or can("gitea.pr.merge") author = can("gitea.pr.create") or can("gitea.branch.push") if review and author: return "mixed" if review: return "reviewer" if author: return "author" return "limited" _HARD_STOPS = [ "No self-review and no self-merge: the authenticated user must differ " "from the PR author for review/approve/merge.", "No tags or releases without an explicit operator instruction.", "Never print token values, keychain IDs, passwords, or raw service " "URLs in normal output.", "No production access or production mutations.", "No force-merge and no bypassing of merge gates; Gitea's own " "mergeable signal is honoured, never overridden.", "Do not rewrite the live profiles config; config changes are " "operator-owned.", ] _GUIDE_RULES = { "hard_stops": _HARD_STOPS, "fail_closed": ( "Every gate fails closed: unknown/disabled profiles, unknown " "operations, empty allowed-operation lists, unresolved identity, " "and unparseable config all deny the action instead of guessing. " "Disabled profiles/services are never silently substituted."), "head_sha_pinning": ( "Before reviewing or merging, record the PR head SHA and pass it as " "expected_head_sha; if the head moves between review and merge, the " "tool refuses and the review must be redone."), "merge_confirmation": ( "gitea_merge_pr requires confirmation to equal 'MERGE PR ' " "exactly (e.g. 'MERGE PR 42'); without it no API call is made. " "Merge only when the operator has explicitly authorized it."), "redaction": [ "Normal output contains no endpoint URLs, keychain IDs, or token " "values; auth is reported as type/status only.", "GITEA_MCP_REVEAL_ENDPOINTS=1 is the local admin/debug opt-in for " "web links and endpoint names; it never reveals token values.", "Errors are redacted before being surfaced.", ], "separation": ( "Author, reviewer, and merger are separate identities. An author " "profile creates branches/commits/PRs and must never " "review/approve/merge its own work; a reviewer/merger profile " "reviews and merges and must never create PRs or push branches " "(reviewer-deadlock invariant). Issue discussion comments " "(gitea.issue.comment) are gated separately from PR reviews " "(gitea.pr.*)."), "profile_switching": [ "The active profile is selected via GITEA_MCP_PROFILE (with GITEA_MCP_CONFIG pointing at the config path).", "By default, static-profile mode is active: changing local environment variables does not dynamically update the already-connected MCP server process.", "If runtime profile switching is explicitly enabled in the config (allow_runtime_switching: true), use gitea_activate_profile to switch profiles in-place.", "If switching is disabled, you must change the launcher configuration/environment (dual-namespace) and reconnect/restart the MCP session.", "After any profile switch or startup, you must verify the new runtime identity with a fresh gitea_whoami call before taking action." ], "identity_verification": [ "Call gitea_whoami with an explicit remote first; confirm the " "authenticated username and profile match the role you were asked " "to perform.", "If the username, profile, or allowed operations do not match the " "task, STOP and report instead of proceeding.", "Runtime identity (whoami) is the source of truth; config-declared " "roles are metadata only.", ], } _COMMON_WORKFLOWS = [ "issue authoring: verify identity, create/claim the issue, keep scope " "explicit (remote/org/repo).", "implementation: claim issue, branch from fresh master, implement only " "the issue scope, test, open a PR referencing the issue, stop.", "PR review: verify reviewer identity, pin the head SHA, validate " "independently, post a verdict; never review your own work.", "PR merge: reviewer identity + eligibility check + pinned head + " "explicit 'MERGE PR ' confirmation, only with operator " "authorization.", ] # Skill registry (#128). status: 'available' = backed by tools in this # server; 'designed-not-implemented' = design exists but no MCP tools yet # (listed rather than omitted so sessions know the boundary); 'operator-only' # = never performed by an LLM session. _PROJECT_SKILLS = { "gitea-issue-authoring": { "description": "Create, view, label, claim, and close Gitea issues.", "when_to_use": "Filing new work, updating issue state, claiming an " "issue before implementation (gitea_mark_issue).", "required_operations": ["gitea.read"], "status": "available", "notes": "Always pass remote/org/repo explicitly.", "steps": [ "Verify identity with gitea_whoami (explicit remote).", "Check the issue queue with gitea_list_issues.", "Create with gitea_create_issue or claim with gitea_mark_issue " "action='start'.", "Keep issue bodies free of secrets, tokens, and raw service " "URLs.", "Release the claim (action='done') or close when finished.", ], }, "gitea-pr-creation": { "description": "Author a feature branch and open a pull request.", "when_to_use": "After implementing a claimed issue on a feature " "branch; author profiles only.", "required_operations": ["gitea.pr.create"], "status": "available", "steps": [ "Branch from fresh master (git pull first).", "Implement only the claimed issue's scope; stage files " "explicitly.", "Run targeted tests, the full suite, py_compile, git diff " "--check, and a secret sweep before committing.", "Open the PR with gitea_create_pr referencing the issue " "('Closes #').", "Stop: do not review or merge your own PR.", ], }, "gitea-pr-review": { "description": "Independently review a pull request and post a " "verdict.", "when_to_use": "Reviewer profile asked to evaluate someone else's " "PR.", "required_operations": ["gitea.pr.review"], "status": "available", "steps": [ "Verify reviewer identity with gitea_whoami; the PR author " "must be a different user.", "Pin the PR head SHA (gitea_view_pr) before validating.", "Validate independently: scope vs the linked issue, tests, " "diff check, secret sweep.", "Post the verdict with gitea_review_pr / " "gitea_submit_pr_review.", "Do not merge unless the operator explicitly authorizes it.", ], }, "gitea-pr-merge": { "description": "Gated merge of an approved pull request.", "when_to_use": "Reviewer/merger profile with explicit operator " "authorization to merge.", "required_operations": ["gitea.pr.merge"], "status": "available", "steps": [ "Confirm operator authorization for this specific merge.", "Run gitea_check_pr_eligibility action='merge' and resolve " "every reason it reports.", "Call gitea_merge_pr with expected_head_sha pinned and " "confirmation 'MERGE PR '.", "Confirm linked issues auto-closed and the in-progress label " "was released.", ], }, "gitea-issue-comments": { "description": "Read and post issue discussion comments (distinct " "from PR reviews).", "when_to_use": "Design discussions, review notes on issues, " "decision records.", "required_operations": ["gitea.issue.comment"], "status": "available", "notes": "Listing needs only gitea.read; posting needs " "gitea.issue.comment, gated separately from gitea.pr.*.", "steps": [ "List with gitea_list_issue_comments (explicit issue number).", "Post with gitea_create_issue_comment; markdown body, no " "secrets or raw URLs.", "Never use PR review tools for issue discussion or vice " "versa.", ], }, "profile-switching": { "description": "Change the active MCP identity between author and " "reviewer profiles.", "when_to_use": "A task requires a role the current profile " "forbids (e.g. moving from authoring to review).", "required_operations": [], "status": "available", "steps": [ "Check profile mode using gitea_get_runtime_context.", "If dynamic-profile mode is enabled (allow_runtime_switching: true), " "call gitea_activate_profile to switch in-place.", "If static-profile mode (default), ask the operator to update GITEA_MCP_PROFILE " "in the launcher environment and reconnect/restart the MCP server.", "Immediately verify the new identity with a fresh gitea_whoami call before taking action.", "If identity does not match the expected role, STOP.", ], }, "redaction-security-review": { "description": "Sweep changes and tool output for secrets, " "keychain IDs, token values, and raw service URLs.", "when_to_use": "Before every commit/PR and when reviewing any " "diff.", "required_operations": ["gitea.read"], "status": "available", "steps": [ "Run git diff --check for whitespace damage.", "Grep the diff for token/password/secret/keychain patterns; " "only synthetic fixtures may match.", "Confirm normal tool output contains no endpoint URLs or " "keychain IDs (reveal opt-in excepted).", "Confirm no live config or private machine paths are being " "committed.", ], }, "jenkins-readonly": { "description": "Read-only Jenkins CI inspection (jobs, builds, " "logs).", "when_to_use": "Checking CI state once Jenkins MCP tools exist.", "required_operations": ["jenkins.read"], "status": "designed-not-implemented", "notes": "Design exists (issues #72/#77); no Jenkins MCP server is " "connected yet. Report SKIPPED rather than substituting " "shell or direct API calls.", "steps": [ "Confirm a Jenkins MCP server is connected; if not, report " "SKIPPED.", "Use read-only operations only; never trigger, cancel, or " "configure builds.", ], }, "glitchtip-readonly": { "description": "Read-only GlitchTip error/event inspection.", "when_to_use": "Investigating reported errors once GlitchTip MCP " "tools exist.", "required_operations": ["glitchtip.read"], "status": "designed-not-implemented", "notes": "Design exists (issue #73); no GlitchTip MCP server is " "connected yet. Report SKIPPED rather than substituting " "shell or direct API calls.", "steps": [ "Confirm a GlitchTip MCP server is connected; if not, report " "SKIPPED.", "Use read-only operations only; never mutate issues or " "settings.", ], }, "release-operator": { "description": "Release, tag, and version workflows.", "when_to_use": "Never by an LLM session on its own: releases and " "tags are operator-owned.", "required_operations": [], "status": "operator-only", "notes": "See the release SOP docs. Hard stop: no tags or " "releases without an explicit operator instruction.", "steps": [ "Stop and hand off to the operator; do not create tags or " "releases.", ], }, } @mcp.tool() def mcp_get_control_plane_guide( remote: str = "dadeschools", host: str | None = None, ) -> dict: """Structured operator guide for the current Gitea MCP session (#128). Read-only. Call this first in a new session: it reports the active profile, the authenticated identity (fail-soft), what this profile may and may not do, and the non-negotiable workflow rules (hard stops, fail-closed behavior, head-SHA pinning, merge confirmation, redaction, author/reviewer separation, profile switching). Guidance is profile-aware; if identity cannot be resolved it instructs the session to STOP. Args: remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. Returns: dict with 'read_only', 'remote', 'profile' (name, operations, role_kind), 'identity' (username/status/instruction; 'server' only with the reveal opt-in), 'guidance' (profile-specific), 'rules', 'workflows', and 'skills_tool'. Never secrets. """ h, _, _ = _resolve(remote, host, None, None) profile = get_profile() allowed = profile["allowed_operations"] forbidden = profile["forbidden_operations"] role = _role_kind(allowed, forbidden) username = _authenticated_username(h) identity = { "authenticated_username": username, "remote": remote, "status": "verified" if username else "unresolved", "instruction": ( "Identity verified — confirm it matches the role this task " "requires before acting." if username else "STOP: the authenticated identity could not be resolved. Do " "not perform any mutating operation; report to the operator " "and wait."), } if _reveal_endpoints(): identity["server"] = h guidance = [] if not username: guidance.append( "STOP: identity unresolved — no mutating operations until the " "operator fixes credentials/profile and gitea_whoami verifies.") if role == "author": guidance.append( "Author profile: creating branches, commits, issues, and PRs " "is allowed; review, approve, and merge are forbidden for this " "profile — never attempt them, and never review or merge your " "own PR from any profile.") elif role == "reviewer": guidance.append( "Reviewer profile: review/approve/merge may proceed ONLY after " "gitea_check_pr_eligibility passes and the PR head SHA is " "pinned (expected_head_sha); the PR author must be a different " "user, and merging additionally requires explicit operator " "authorization plus the 'MERGE PR ' confirmation.") elif role == "mixed": guidance.append( "WARNING: this profile allows both authoring and " "review/merge, which defeats two-party review. Treat it as " "misconfigured: STOP and report to the operator before any " "review or merge.") else: guidance.append( "Limited profile: no authoring and no review/merge " "operations are allowed. Read-only work only; anything else " "fails closed.") return { "read_only": True, "remote": remote, "profile": { "name": profile["profile_name"], "allowed_operations": allowed, "forbidden_operations": forbidden, "role_kind": role, }, "identity": identity, "guidance": guidance, "rules": _GUIDE_RULES, "workflows": _COMMON_WORKFLOWS, "skills_tool": "mcp_list_project_skills", } @mcp.tool() def mcp_list_project_skills() -> dict: """List the project's workflow skills and when to use them (#128). Read-only; makes no API calls. Each skill reports its name, description, when-to-use guidance, required operations, status ('available', 'designed-not-implemented', or 'operator-only'), and whether the current profile's operations permit it. Use mcp_get_skill_guide(name) for the step-by-step guide. Returns: dict with 'read_only', 'count', and 'skills' (list). Never secrets. """ profile = get_profile() allowed = profile["allowed_operations"] forbidden = profile["forbidden_operations"] skills = [] for name, meta in _PROJECT_SKILLS.items(): ops = meta["required_operations"] available = all( gitea_config.check_operation(op, allowed, forbidden)[0] for op in ops ) if ops else True entry = { "name": name, "description": meta["description"], "when_to_use": meta["when_to_use"], "required_operations": ops, "status": meta["status"], "available_to_current_profile": ( available and meta["status"] == "available"), } if meta.get("notes"): entry["notes"] = meta["notes"] skills.append(entry) return {"read_only": True, "count": len(skills), "skills": skills} @mcp.tool() def mcp_get_skill_guide(skill_name: str) -> dict: """Step-by-step guide for one named project skill (#128). Read-only; makes no API calls. Unknown skill names fail closed with the list of valid names instead of guessing. Args: skill_name: A name from mcp_list_project_skills (case-insensitive). Returns: dict with 'success', 'skill' metadata, and 'steps'; on an unknown name, 'success' False with 'reasons' and 'valid_skills'. """ key = (skill_name or "").strip().lower() meta = _PROJECT_SKILLS.get(key) if meta is None: return { "success": False, "reasons": [f"unknown skill '{skill_name}' (fail closed)"], "valid_skills": sorted(_PROJECT_SKILLS), } skill = { "name": key, "description": meta["description"], "when_to_use": meta["when_to_use"], "required_operations": meta["required_operations"], "status": meta["status"], } if meta.get("notes"): skill["notes"] = meta["notes"] return {"success": True, "skill": skill, "steps": list(meta["steps"])} @mcp.tool() def gitea_whoami( remote: str = "dadeschools", host: str | None = None, ) -> dict: """Look up the Gitea account the MCP server is authenticated as. Read-only. Calls Gitea's authenticated-user endpoint (GET /api/v1/user) with the configured token and returns safe identity metadata only. Use this to prove which account a mutating workflow (e.g. review/merge) would act as, so self-review/self-merge can be detected before acting. Never returns the token, Authorization header, password, or any other secret material. Fails closed with a clear error if the identity cannot be determined. Args: remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. Returns: dict with 'authenticated', 'username', 'display_name', 'user_id', 'email', 'server', 'remote', and 'profile' (safe runtime profile metadata: profile_name + allowed_operations; never the token). """ if remote not in REMOTES: raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}") h = host or REMOTES[remote]["host"] auth = _auth(h) url = f"https://{h}/api/v1/user" data = api_request("GET", url, auth) if not data or not data.get("login"): # Fail closed: never assume an identity we could not verify. raise RuntimeError( f"Could not determine the authenticated Gitea identity for {h}. " "Verify the configured token is valid for this instance." ) # Runtime profile metadata is non-secret (name + allowed op categories). # The token is resolved separately and is never included here. Endpoint # URLs stay out of normal LLM-facing output (#120): the logical remote # name is the addressing surface; 'server' appears only under the # GITEA_MCP_REVEAL_ENDPOINTS admin opt-in. profile = get_profile() result = { "authenticated": True, "username": data.get("login"), "display_name": data.get("full_name") or None, "user_id": data.get("id"), "email": data.get("email") or None, "remote": remote, "profile": { "profile_name": profile["profile_name"], "allowed_operations": profile["allowed_operations"], "forbidden_operations": profile["forbidden_operations"], "environment": profile.get("environment"), "service": profile.get("service"), "identity": profile.get("identity"), "role": profile.get("role"), "profile_address": profile.get("profile_path"), "execution_profile": profile.get("execution_profile"), "audit_label": profile.get("audit_label"), "auth_source_type": profile.get("auth_source_type"), }, } if _reveal_endpoints(): result["server"] = f"https://{h}" return result @mcp.tool() def gitea_get_authenticated_user( remote: str = "dadeschools", host: str | None = None, ) -> dict: """Alias for gitea_whoami. Look up the authenticated Gitea account.""" return gitea_whoami(remote=remote, host=host) @mcp.tool() def gitea_get_current_user( remote: str = "dadeschools", host: str | None = None, ) -> dict: """Alias for gitea_whoami. Look up the authenticated Gitea account.""" return gitea_whoami(remote=remote, host=host) @mcp.tool() def gitea_get_profile( remote: str = "dadeschools", host: str | None = None, resolve_identity: bool = True, ) -> dict: """Describe the active Gitea MCP execution profile for this runtime. Read-only. Reports the non-secret configuration of the running MCP process (profile name, allowed/forbidden operation categories, audit label, auth *status*). Endpoint URLs and token source names are hidden from normal output (#120) and appear only under the GITEA_MCP_REVEAL_ENDPOINTS admin opt-in. Optionally resolves the authenticated username via ``gitea_whoami``'s endpoint so an LLM can see who this runtime acts as. This tool never mutates Gitea and never approves, merges, comments, or creates anything. It never returns the token value, Authorization header, password, raw environment, or credential file paths. Identity resolution fails soft: if it cannot be determined, ``authenticated_username`` is null and ``identity_status`` marks it, but the profile config is still returned. Args: remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. resolve_identity: If True, attempt a read-only identity lookup. Returns: dict of safe profile metadata. ``identity_status`` is one of 'verified', 'unknown', 'unavailable', or 'not_resolved'. """ profile = get_profile() reveal = _reveal_endpoints() result = { "profile_name": profile["profile_name"], "allowed_operations": profile["allowed_operations"], "forbidden_operations": profile["forbidden_operations"], "audit_label": profile["audit_label"], "environment": profile.get("environment"), "service": profile.get("service"), "identity": profile.get("identity"), "role": profile.get("role"), "profile_address": profile.get("profile_path"), "execution_profile": profile.get("execution_profile"), "auth_source_type": profile.get("auth_source_type"), # Auth is reported as a status only (#120): the token source *name* # (env var name / keychain id) joins endpoint URLs behind the # GITEA_MCP_REVEAL_ENDPOINTS admin opt-in. Token values never appear. "auth_status": ("configured" if profile["token_source_name"] else "unconfigured"), "remote": remote if remote in REMOTES else None, "authenticated_username": None, "identity_status": "not_resolved", } if reveal: result["token_source_name"] = profile["token_source_name"] result["base_url"] = profile["base_url"] result["server"] = None if remote not in REMOTES: # Mark ambiguity rather than raising: the tool stays inspectable. result["identity_status"] = "unknown" result["remote_error"] = f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}" return result h = host or REMOTES[remote]["host"] if reveal: result["server"] = f"https://{h}" if resolve_identity: try: auth = _auth(h) data = api_request("GET", f"https://{h}/api/v1/user", auth) login = (data or {}).get("login") if login: result["authenticated_username"] = login result["identity_status"] = "verified" else: result["identity_status"] = "unknown" except Exception: # Fail soft for the identity field only. Never surface the error # detail or any credential material — just mark it unavailable. result["identity_status"] = "unavailable" return result @mcp.tool() def gitea_get_runtime_context( remote: str = "dadeschools", host: str | None = None, ) -> dict: """Read-only: explicit visibility into active profile, configuration model, and eligibility. Reports config model shape, profile mode (static vs dynamic), and detailed blocks. Args: remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. """ profile = get_profile() config = gitea_config.load_config() reveal = _reveal_endpoints() # Determine config model/version if config is None: config_model = "env-only" elif config.get("version") == 1: config_model = "v1" elif config.get("version") == 2: if "contexts" in config or config.get("shape") == "contexts": config_model = "v2-contexts" else: config_model = "v2-environments" else: config_model = f"unknown-version-{config.get('version')}" # Determine profile source if os.environ.get("GITEA_PROFILE_NAME"): profile_source = "env var" elif gitea_config.selected_profile_name(): profile_source = "config file profile" else: profile_source = "default" h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None) username = _authenticated_username(h) if h else None switching = gitea_config.is_runtime_switching_enabled() profile_mode = "dynamic-profile" if switching else "static-profile" # Evaluate review/merge eligibility allowed = profile["allowed_operations"] or [] forbidden = profile["forbidden_operations"] or [] # Check approve approve_ok, _ = gitea_config.check_operation("approve", allowed, forbidden) # Check merge merge_ok, _ = gitea_config.check_operation("merge", allowed, forbidden) review_merge_allowed = False blocked_reasons = [] suggested_fix = "none" if not username: blocked_reasons.append("Authenticated identity is unresolved. Gitea credentials are missing or invalid.") suggested_fix = "operator action" elif not (approve_ok or merge_ok): blocked_reasons.append( f"Active profile '{profile['profile_name']}' does not permit review or merge operations." ) if switching: suggested_fix = "profile switch" else: suggested_fix = "reviewer namespace" else: review_merge_allowed = True # Note that self-review and self-merge are blocked regardless of profile roles blocked_reasons.append( "Note: self-review and self-merge are always blocked at runtime for any PR authored by the active user." ) safe_next_action = "None; ready for operations." if suggested_fix == "operator action": safe_next_action = f"Ask the operator to configure valid credentials/token for profile '{profile['profile_name']}'." elif suggested_fix == "profile switch": safe_next_action = "Switch to a reviewer profile by calling gitea_activate_profile with a reviewer profile." elif suggested_fix == "reviewer namespace": safe_next_action = ( "Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, " "or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile." ) result = { "active_profile": profile["profile_name"], "authenticated_username": username, "remote": remote if remote in REMOTES else None, "config_model": config_model, "profile_source": profile_source, "allowed_operations": allowed, "forbidden_operations": forbidden, "runtime_switching_supported": switching, "profile_mode": profile_mode, "review_merge_allowed": review_merge_allowed, "review_merge_blocked_reasons": blocked_reasons, "suggested_fix": suggested_fix, "safe_next_action": safe_next_action, } if reveal and h: result["server"] = f"https://{h}" return result @mcp.tool() def gitea_list_profiles() -> dict: """Read-only: list all Gitea MCP profiles with redacted metadata. Exposes names, role kinds, enabled state, allowed/forbidden operations, and active status. Token values, keychain IDs, and raw URLs are hidden unless GITEA_MCP_REVEAL_ENDPOINTS=1 is enabled. """ config = gitea_config.load_config() reveal = _reveal_endpoints() active_name = gitea_config.selected_profile_name() or get_profile()["profile_name"] profiles_out = [] if config is None: # Env-only: return the active profile p = get_profile() role = _role_kind(p["allowed_operations"], p["forbidden_operations"]) h = REMOTES.get("dadeschools", {}).get("host") # default host username = _authenticated_username(h) if h else None prof = { "name": p["profile_name"], "role_kind": role, "allowed_operations": p["allowed_operations"], "forbidden_operations": p["forbidden_operations"], "identity_status": "verified" if username else "unresolved", "is_active": True, "enabled": True, } if reveal: prof["token_source_name"] = p["token_source_name"] prof["base_url"] = p["base_url"] profiles_out.append(prof) else: # Load from config profiles profiles_dict = config.get("profiles") or {} unavailable_dict = config.get("unavailable") or {} audit_only_profiles = config.get("audit_only_profiles") or {} # First, process available profiles for name, p in profiles_dict.items(): role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", [])) is_active = (name == active_name) # Identity status lookup if is_active: h = REMOTES.get("dadeschools", {}).get("host") # default host username = _authenticated_username(h) if h else None identity_status = "verified" if username else "unresolved" else: # Safely resolve if credentials are present without networking try: tok = gitea_config.resolve_token(p) identity_status = "credentials present" if tok else "missing credentials" except Exception: identity_status = "missing credentials" prof = { "name": name, "role_kind": role, "allowed_operations": p.get("allowed_operations", []), "forbidden_operations": p.get("forbidden_operations", []), "identity_status": identity_status, "is_active": is_active, "enabled": p.get("enabled", True), } if reveal: if isinstance(p.get("auth"), dict): prof["auth"] = p["auth"] prof["base_url"] = p.get("base_url") else: if isinstance(p.get("auth"), dict): prof["auth"] = {k: ("" if k != "type" else v) for k, v in p["auth"].items()} prof["base_url"] = "" profiles_out.append(prof) # Process unavailable/disabled/audit-only profiles for name, p in audit_only_profiles.items(): role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", [])) is_active = (name == active_name) prof = { "name": name, "role_kind": role, "allowed_operations": p.get("allowed_operations", []), "forbidden_operations": p.get("forbidden_operations", []), "identity_status": "unavailable (disabled)", "is_active": is_active, "enabled": p.get("enabled", True), "unavailable_reason": p.get("_unavailable_reason"), } if reveal: if isinstance(p.get("auth"), dict): prof["auth"] = p["auth"] prof["base_url"] = p.get("base_url") else: if isinstance(p.get("auth"), dict): prof["auth"] = {k: ("" if k != "type" else v) for k, v in p["auth"].items()} prof["base_url"] = "" profiles_out.append(prof) # Handle unavailable mappings that are not in audit_only for name, reason in unavailable_dict.items(): if not any(x["name"] == name for x in profiles_out): profiles_out.append({ "name": name, "role_kind": "limited", "allowed_operations": [], "forbidden_operations": [], "identity_status": "unavailable", "is_active": (name == active_name), "enabled": False, "unavailable_reason": reason, }) return {"profiles": profiles_out} @mcp.tool() def gitea_activate_profile( profile_name: str, remote: str = "dadeschools", host: str | None = None, ) -> dict: """Gated profile activation. Switch the active profile in-place for this runtime session. Only allowed if "allow_runtime_switching": true is explicitly configured. Args: profile_name: Profile to activate. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. """ if not gitea_config.is_runtime_switching_enabled(): return { "success": False, "message": ( "Runtime profile switching is disabled in this configuration. Static profile mode is active. " "To enable switching, set 'allow_runtime_switching': true in the config rules." ) } config = gitea_config.load_config() if not config: return { "success": False, "message": "No profiles configuration file is loaded. Switching is not supported in env-only mode." } profiles_dict = config.get("profiles") or {} if profile_name not in profiles_dict: return { "success": False, "message": f"Profile '{profile_name}' not found in loaded config." } h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None) # 1. Record before state before_profile = get_profile()["profile_name"] before_identity = _authenticated_username(h) if h else None # 2. Perform switch gitea_config._active_profile_override = profile_name # 3. Clear identity cache to force a fresh verification if h: _IDENTITY_CACHE.pop(h, None) # 4. Resolve fresh identity after_profile = get_profile()["profile_name"] after_identity = _authenticated_username(h) if h else None # 5. Audit the switch if auditing is on _audit( "activate_profile", host=h, remote=remote, result={"success": True, "before": before_profile, "after": after_profile}, username=after_identity, ) return { "success": True, "message": f"Successfully activated profile '{profile_name}' (fresh identity verification complete).", "before_profile": before_profile, "before_identity": before_identity, "after_profile": after_profile, "after_identity": after_identity, } @mcp.tool() def gitea_audit_config() -> dict: """Audit the configured profiles/services: enabled state, no secrets. Read-only and local-only: loads the canonical profiles.json named by GITEA_MCP_CONFIG and reports profile/service names, contexts, enabled state, capabilities, auth *status*, and one-line service summaries (e.g. ``PRGS Jenkins: enabled, read-only, authenticated``). Disabled entries are listed so they can be audited, but the server refuses to act with them and never falls back to another profile or service. Never includes endpoint URLs, keychain ids, token source names, or token values. Endpoint-revealing diagnostics exist only in the local admin CLI (``python3 gitea_config.py audit --reveal-endpoints``), never over MCP. """ config = gitea_config.load_config() if config is None: return { "configured": False, "message": "No GITEA_MCP_CONFIG configured; env-only mode.", } report = gitea_config.audit_config(config) report["configured"] = True report["summaries"] = gitea_config.service_summaries(config) return report @mcp.tool() def gitea_mark_issue( issue_number: int, action: str, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Claim or release an issue via the status:in-progress label. This is the cross-agent lock mechanism. Check before starting work. Args: issue_number: The issue number. action: 'start' to claim (add label) or 'done' to release (remove label). remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict with 'success' boolean and 'message'. """ if action not in ("start", "done"): raise ValueError(f"action must be 'start' or 'done', got '{action}'") h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) # Find the status:in-progress label id labels = api_request("GET", f"{base}/labels?limit=100", auth) label_id = None for lb in labels: if lb["name"] == "status:in-progress": label_id = lb["id"] break if label_id is None: raise RuntimeError( "Label 'status:in-progress' not found. " "Run manage_labels.py to create it first." ) if action == "start": with _audited("label_issue", host=h, remote=remote, org=o, repo=r, issue_number=issue_number, request_metadata={"op": "add", "label": "status:in-progress"}): api_request("POST", f"{base}/issues/{issue_number}/labels", auth, {"labels": [label_id]}) return {"success": True, "message": f"Issue #{issue_number} claimed."} else: with _audited("unlabel_issue", host=h, remote=remote, org=o, repo=r, issue_number=issue_number, request_metadata={"op": "remove", "label": "status:in-progress"}): api_request("DELETE", f"{base}/issues/{issue_number}/labels/{label_id}", auth) return {"success": True, "message": f"Issue #{issue_number} released."} @mcp.tool() def gitea_list_labels( remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> list: """List all available labels in a Gitea repository. Args: remote: Known Gitea instance ('dadeschools' or 'prgs'). host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: list of labels. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) return api_get_all(f"{base}/labels", auth) @mcp.tool() def gitea_create_label( name: str, color: str, description: str = "", remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: """Create a new label on a Gitea repository. Args: name: Name of the label (e.g. 'bug', 'epic'). color: HTML color code (hex, e.g. 'fbca04' or '#fbca04'). description: Description of the label. remote: Known Gitea instance ('dadeschools' or 'prgs'). host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: dict containing the created label details. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) if color.startswith("#"): color = color[1:] payload = { "name": name, "color": color, "description": description, } with _audited("create_label", host=h, remote=remote, org=o, repo=r, request_metadata={"name": name}): return api_request("POST", f"{base}/labels", auth, payload) @mcp.tool() def gitea_set_issue_labels( issue_number: int, labels: list[str], remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> list: """Replace all labels on a Gitea issue with a new list of label names. Args: issue_number: The issue number. labels: List of label names to apply. remote: Known Gitea instance ('dadeschools' or 'prgs'). host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: list of all labels currently applied to the issue. """ h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) base = repo_api_url(h, o, r) # 1. Fetch existing labels on the repo to resolve names -> IDs existing = api_request("GET", f"{base}/labels?limit=100", auth) name_to_id = {lb["name"]: lb["id"] for lb in existing} # 2. Check if any requested labels do not exist, and raise error label_ids = [] missing_labels = [] for name in labels: if name in name_to_id: label_ids.append(name_to_id[name]) else: missing_labels.append(name) if missing_labels: raise RuntimeError( f"The following labels do not exist on the repository: {missing_labels}. " "Please create them first using gitea_create_label." ) # 3. PUT the labels to the issue with _audited("set_issue_labels", host=h, remote=remote, org=o, repo=r, issue_number=issue_number, request_metadata={"labels": labels}): res = api_request("PUT", f"{base}/issues/{issue_number}/labels", auth, {"labels": label_ids}) return res @mcp.tool() def gitea_mirror_refs( apply: bool = False, force: bool = False, ) -> dict: """Mirror branches and tags between dadeschools and prgs Timesheet repos. Additive only — never deletes branches or tags. Diverged branches are skipped unless force is True. Args: apply: If True, actually push. If False (default), dry-run only. force: If True, force-push diverged branches. Returns: dict with 'output' (script stdout) and 'return_code'. """ script = os.path.join(PROJECT_ROOT, "mirror_refs.sh") args = [script] if apply: args.append("--apply") if force: args.append("--force") result = subprocess.run( args, capture_output=True, text=True, timeout=120, ) return { "output": result.stdout + result.stderr, "return_code": result.returncode, } # ── Entry point ─────────────────────────────────────────────────────────────── if __name__ == "__main__": mcp.run(transport="stdio")