Files
Gitea-Tools/mcp_server.py
T
sysadmin f04cf44975 feat: gate gitea_merge_pr behind identity/profile/eligibility + confirmation (#16)
Replace the ungated gitea_merge_pr with a gated merge workflow. This is now
the only merge path the MCP server exposes; the merge API is called only
after every safety gate passes.

Gates (fail-closed at each step):
  1. Merge method is merge | squash | rebase.
  2. Explicit confirmation: confirmation must equal "MERGE PR <n>" (without it,
     zero API calls are made).
  3. Reuse gitea_check_pr_eligibility (#14) with action 'merge': proves the
     authenticated identity, the active profile (and that it allows merge), the
     PR author, blocks self-merge, requires the PR open, and fails closed when
     the PR is not mergeable or mergeability is unknown.
  4. Optional expected_head_sha: refuse if the PR head moved.
  5. Optional expected_changed_files: refuse if the PR's changed file set differs.
  6. Redundant self-merge block (auth user == PR author).

The force/ignore-checks option was removed — Gitea's own mergeable signal
(which reflects branch-protection required reviews/checks) must be positive,
so required approval/check state is honoured, never bypassed.

Output reports performed, authenticated user, profile name, PR author, PR
number, head SHA checked, merge method, gates passed/blocked, and merge
result / merge commit — never a token, auth header, or credential. Error text
is scrubbed via _redact.

Surface audit: no ungated merge path remains. The /merge endpoint appears only
inside gitea_merge_pr; gitea_review_pr fails closed on merge=True before any
API call; gitea_submit_pr_review has no merge parameter and 'merge' is not a
reviewable action. Tests assert all three.

Tests cover: merge succeeds only when all gates pass; self-author blocked;
unknown identity/profile blocked; profile without merge permission blocked;
missing/wrong confirmation blocked (no API call); head-SHA mismatch blocked;
changed-files mismatch blocked; closed PR blocked; non-mergeable blocked;
unknown mergeability fail-closed; no merge call when gates fail; invalid merge
method rejected; output and error redaction; and the no-ungated-merge-path audit.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-01 15:03:49 -04:00

1394 lines
48 KiB
Python

#!/usr/bin/env python3
"""Gitea MCP Server — exposes Gitea operations as MCP tools.
Runs over stdio. All tools authenticate via macOS keychain (git credential fill).
Usage (standalone test):
python3 mcp_server.py
Configuration (mcp_config.json):
"gitea-tools": {
"command": "/Users/jasonwalker/Development/Gitea-Tools/venv/bin/python3",
"args": ["/Users/jasonwalker/Development/Gitea-Tools/mcp_server.py"],
"env": {}
}
"""
import os
import sys
import subprocess
# Resolve the project root. MCP clients must launch this script directly with
# the venv interpreter (venv/bin/python3) — see the config example above. We do
# NOT os.execv() to re-point the interpreter: replacing the process after the
# client has already wired up the stdio pipes can desync the JSON-RPC transport
# (observed with Antigravity/Cascade hosts).
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
# Ensure the project root is on the path so gitea_auth.py can be imported.
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from mcp.server.fastmcp import FastMCP # noqa: E402
from gitea_auth import ( # noqa: E402
REMOTES,
get_credentials,
get_auth_header,
api_request,
repo_api_url,
get_profile,
)
# The single FastMCP server object for this module. The ``@mcp.tool()``
# decorators further down attach each gitea_* function to it. The
# ``instructions`` string is handed to FastMCP unchanged.
mcp = FastMCP("gitea-tools", instructions=(
    "Gitea issue tracker and PR management for dadeschools and prgs instances. "
    "Use the gitea_ prefixed tools to create issues, PRs, list issues, etc."
))
# ── Helpers ───────────────────────────────────────────────────────────────────
def _resolve(remote: str, host: str | None, org: str | None, repo: str | None):
    """Turn a remote name plus optional overrides into ``(host, org, repo)``.

    Each override wins when it is truthy; otherwise the value configured for
    *remote* in ``REMOTES`` is used.

    Raises:
        ValueError: if *remote* is not a key of ``REMOTES``.
    """
    if remote not in REMOTES:
        raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}")
    defaults = REMOTES[remote]
    overrides = {"host": host, "org": org, "repo": repo}
    # ``or`` short-circuits, so a default is only looked up when the override
    # is missing/empty.
    return tuple(value or defaults[key] for key, value in overrides.items())
def _auth(host: str) -> str:
    """Return the auth header for *host*.

    Raises:
        RuntimeError: when ``get_auth_header`` yields ``None`` for the host.
    """
    header = get_auth_header(host)
    if header is not None:
        return header
    message = (
        f"No credentials for {host}. "
        "Ensure you've logged in via HTTPS at least once."
    )
    raise RuntimeError(message)
# ── Tools ─────────────────────────────────────────────────────────────────────
@mcp.tool()
def gitea_create_issue(
    title: str,
    body: str = "",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Open a new issue on a Gitea repository.

    Args:
        title: Issue title (required).
        body: Issue body text.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with the 'number' and 'url' of the issue that was created.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    issues_endpoint = f"{repo_api_url(gitea_host, owner, repo_name)}/issues"
    issue_fields = {"title": title, "body": body}
    created = api_request("POST", issues_endpoint, auth_header, issue_fields)
    return {"number": created["number"], "url": created["html_url"]}
@mcp.tool()
def gitea_create_pr(
    title: str,
    head: str,
    base: str = "main",
    body: str = "",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Open a pull request on a Gitea repository.

    Args:
        title: PR title (required).
        head: Source branch name (required).
        base: Target branch (default: 'main').
        body: PR description.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with the 'number' and 'url' of the PR that was created.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    pulls_endpoint = f"{repo_api_url(gitea_host, owner, repo_name)}/pulls"
    created = api_request(
        "POST",
        pulls_endpoint,
        auth_header,
        {"title": title, "body": body, "head": head, "base": base},
    )
    return {"number": created["number"], "url": created["html_url"]}
@mcp.tool()
def gitea_list_prs(
    state: str = "open",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> list[dict]:
    """Enumerate pull requests on a Gitea repository.

    Args:
        state: State filter — 'open', 'closed', or 'all' (default: 'open').
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        List of dicts with 'number', 'title', 'state', 'head', 'base', 'url',
        'mergeable'. A falsy API response yields an empty list.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    listing_url = f"{repo_api_url(gitea_host, owner, repo_name)}/pulls?state={state}"
    summaries = []
    for entry in api_request("GET", listing_url, auth_header) or []:
        summary = {
            "number": entry["number"],
            "title": entry["title"],
            "state": entry["state"],
            "head": entry["head"]["ref"],
            "base": entry["base"]["ref"],
            "url": entry["html_url"],
            # 'mergeable' may be absent from the payload; surface None then.
            "mergeable": entry.get("mergeable"),
        }
        summaries.append(summary)
    return summaries
@mcp.tool()
def gitea_view_pr(
    pr_number: int,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Fetch one pull request and return a trimmed summary of it.

    Args:
        pr_number: The pull request index/number.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'number', 'title', 'body', 'state', 'head', 'base', 'url',
        'mergeable' and 'user' (the author's login, '' when absent).
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    pr_url = f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}"
    details = api_request("GET", pr_url, auth_header)

    author = details.get("user", {})
    summary = {
        "number": details["number"],
        "title": details["title"],
        "body": details.get("body", ""),
        "state": details["state"],
        "head": details["head"]["ref"],
        "base": details["base"]["ref"],
        "url": details["html_url"],
        "mergeable": details.get("mergeable"),
        "user": author.get("login", ""),
    }
    return summary
# Actions whose eligibility this tool can evaluate.
_ELIGIBILITY_ACTIONS = ("review", "approve", "request_changes", "merge")


@mcp.tool()
def gitea_check_pr_eligibility(
    pr_number: int,
    action: str,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Read-only: is the current identity/profile eligible to perform *action*
    on a PR?

    Evaluates eligibility only — it NEVER reviews, approves, requests changes,
    merges, or mutates anything. It inspects the authenticated identity
    (via the /user endpoint), the active runtime profile metadata
    (``get_profile``), and the target PR (author, state, head SHA,
    mergeability), then returns a decision with clear reasons.

    Fail-closed rules:
      - Unknown action or unknown remote → not eligible.
      - Profile has no configured allowed operations, or the action is not in
        the profile's allowed operations (or is forbidden) → not eligible.
      - Authenticated identity cannot be determined → not eligible.
      - Authenticated user equals the PR author → not eligible to ``approve`` or
        ``merge``.
      - PR was retrieved but its author cannot be determined → not eligible to
        ``approve`` or ``merge`` (the self-author rule cannot be proven).
      - PR is not open → not eligible.
      - For ``merge``, PR must be reported mergeable.

    Never returns the token, Authorization header, or any credential material.

    Args:
        pr_number: Target PR number.
        action: One of 'review', 'approve', 'request_changes', 'merge'.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'eligible' (bool), the inputs inspected, and 'reasons'.
    """
    action = (action or "").strip().lower()
    profile = get_profile()
    result = {
        "eligible": False,
        "requested_action": action,
        "authenticated_user": None,
        "profile_name": profile["profile_name"],
        "allowed_operations": profile["allowed_operations"],
        "pr_author": None,
        "pr_number": pr_number,
        "pr_state": None,
        "head_sha": None,
        "mergeable": None,
        "remote": remote if remote in REMOTES else None,
        "reasons": [],
    }
    # Alias: every appended reason lands in the returned dict.
    reasons = result["reasons"]

    if action not in _ELIGIBILITY_ACTIONS:
        reasons.append(
            f"unknown action '{action}'; expected one of {list(_ELIGIBILITY_ACTIONS)}"
        )
        return result
    if remote not in REMOTES:
        reasons.append(f"unknown remote '{remote}'")
        return result

    # Profile capability check (metadata only; not enforcement of the action).
    allowed = profile["allowed_operations"]
    forbidden = profile["forbidden_operations"]
    if not allowed:
        reasons.append("profile has no configured allowed operations (fail closed)")
    if action in forbidden:
        reasons.append(f"profile forbids '{action}'")
    elif action not in allowed:
        reasons.append(f"profile is not allowed to {action}")

    h, o, r = _resolve(remote, host, org, repo)

    # Authenticated identity (read-only). Fail soft; never leak error/secret.
    try:
        auth = _auth(h)
    except Exception:
        auth = None
    auth_user = None
    if auth:
        try:
            who = api_request("GET", f"https://{h}/api/v1/user", auth)
            auth_user = (who or {}).get("login")
        except Exception:
            auth_user = None
    result["authenticated_user"] = auth_user
    if not auth_user:
        reasons.append("authenticated identity could not be determined")

    # PR facts (read-only GET; no mutation).
    pr_author = None
    pr_state = None
    if auth:
        try:
            pr = api_request(
                "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth
            )
            pr_author = (pr or {}).get("user", {}).get("login")
            pr_state = (pr or {}).get("state")
            result["head_sha"] = ((pr or {}).get("head") or {}).get("sha")
            result["mergeable"] = (pr or {}).get("mergeable")
        except Exception:
            reasons.append("PR details could not be retrieved")
    else:
        reasons.append("PR details could not be retrieved (no credentials)")
    result["pr_author"] = pr_author
    result["pr_state"] = pr_state

    # PR must be open to act on.
    if pr_state is None:
        reasons.append("PR state unknown")
    elif pr_state != "open":
        reasons.append(f"PR is not open (state={pr_state})")

    # Self-author must not approve or merge their own PR.
    if action in ("approve", "merge"):
        if auth_user and pr_author and auth_user == pr_author:
            reasons.append("authenticated user is PR author")
        elif pr_state is not None and not pr_author:
            # The PR was fetched but carries no author login, so the
            # self-author rule cannot be proven. Previously this case skipped
            # the check entirely (fail open); refuse instead.
            reasons.append("PR author could not be determined (fail closed)")

    # Merge needs a positive mergeability signal.
    if action == "merge":
        if result["mergeable"] is False:
            reasons.append("PR is not mergeable")
        elif result["mergeable"] is None:
            reasons.append("PR mergeability unknown")

    result["eligible"] = len(reasons) == 0
    if result["eligible"]:
        reasons.append("all eligibility checks passed")
    return result
# Review actions the gated review tool can perform. Each key is the caller-
# facing action name; each value is a pair:
#   (eligibility action, Gitea review *event*)
# The eligibility action is what gets passed to ``gitea_check_pr_eligibility``
# (#14) so every mutation reuses the same identity/profile/author gates; the
# event is the value sent in the review payload.
# 'merge' is deliberately absent — merging belongs to a separate tool/issue
# and is never performed through this table.
_REVIEW_ACTIONS = {
    # 'comment' posts review findings without an approval/rejection state.
    # #14 names this eligibility category 'review'.
    "comment": ("review", "COMMENT"),
    "approve": ("approve", "APPROVE"),
    "request_changes": ("request_changes", "REQUEST_CHANGES"),
}
# Patterns scrubbed from any surfaced error text so a credential can never leak.
_SECRET_PREFIXES = ("token ", "Basic ")
def _redact(text: str) -> str:
"""Strip anything that looks like an Authorization credential from *text*.
Errors raised by ``api_request`` echo the server response body, not the
request headers, so a token should never appear — this is defence in depth
so a future change can't leak ``token …`` / ``Basic …`` material into a
tool result or log line.
"""
if not text:
return text
out = text
for prefix in _SECRET_PREFIXES:
idx = 0
while True:
i = out.find(prefix, idx)
if i == -1:
break
j = i + len(prefix)
while j < len(out) and not out[j].isspace():
j += 1
out = out[:i] + prefix + "[REDACTED]" + out[j:]
idx = i + len(prefix) + len("[REDACTED]")
return out
@mcp.tool()
def gitea_submit_pr_review(
    pr_number: int,
    action: str,
    body: str = "",
    expected_head_sha: str | None = None,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Gated PR review mutation: comment findings, request changes, or approve.

    The review is posted **only after every safety gate passes**; when a gate
    fails the tool returns ``performed=False`` and the mutating endpoint is
    never called.

    Gates, in order (each one fails closed):
      1. ``action`` must be 'comment', 'approve', or 'request_changes'.
      2. ``gitea_check_pr_eligibility`` (#14) must report eligible for the
         mapped eligibility action ('comment' → 'review', 'approve' →
         'approve', 'request_changes' → 'request_changes').
      3. Redundant self-approval block (authenticated user == PR author).
      4. A PR head SHA must be known, and must equal ``expected_head_sha``
         when the caller supplied one.
      5. Only then ``POST /repos/{owner}/{repo}/pulls/{n}/reviews`` with the
         review event pinned to the head commit.

    The formal review endpoint is used (rather than a plain issue comment) so
    approvals and change requests carry real review state.
    Merge is intentionally NOT implemented here.
    Never returns the token, Authorization header, or any credential material.

    Args:
        pr_number: Target PR number.
        action: 'comment', 'approve', or 'request_changes'.
        body: Review body / finding text.
        expected_head_sha: Optional. If given and the PR head SHA differs, the
            review is refused (guards against reviewing a changed PR).
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict describing the attempt: action, whether it was performed, the
        authenticated user, profile name, PR author, PR number, head SHA
        checked, and the reasons/gates passed or blocked. Never secrets.
    """
    action = (action or "").strip().lower()
    outcome = {
        "requested_action": action,
        "performed": False,
        "authenticated_user": None,
        "profile_name": get_profile()["profile_name"],
        "pr_author": None,
        "pr_number": pr_number,
        "head_sha": None,
        "expected_head_sha": expected_head_sha,
        "remote": remote if remote in REMOTES else None,
        "reasons": [],
    }
    notes = outcome["reasons"]

    # Gate 1 — only known review actions may proceed.
    mapping = _REVIEW_ACTIONS.get(action)
    if mapping is None:
        notes.append(
            f"unknown review action '{action}'; expected one of "
            f"{sorted(_REVIEW_ACTIONS)}"
        )
        return outcome
    eligibility_action, event = mapping

    # Gate 2 — delegate identity/profile/author checks to #14 (read-only).
    verdict = gitea_check_pr_eligibility(
        pr_number=pr_number,
        action=eligibility_action,
        remote=remote,
        host=host,
        org=org,
        repo=repo,
    )
    for field in ("authenticated_user", "pr_author", "head_sha"):
        outcome[field] = verdict.get(field)
    outcome["profile_name"] = verdict.get("profile_name", outcome["profile_name"])
    if not verdict.get("eligible"):
        notes.append(
            f"eligibility check for '{eligibility_action}' failed (fail closed)"
        )
        notes.extend(verdict.get("reasons", []))
        return outcome

    # Gate 3 — second, independent self-approval check.
    reviewer = outcome["authenticated_user"]
    author = outcome["pr_author"]
    is_self = bool(reviewer and author and reviewer == author)
    if action == "approve" and is_self:
        notes.append("self-approval blocked (authenticated user is PR author)")
        return outcome

    # Gate 4 — a review is always pinned to a commit, so refuse when the head
    # SHA is unknown, and refuse when it differs from the SHA the caller pinned.
    current_sha = outcome["head_sha"]
    if not current_sha:
        notes.append("PR head SHA unavailable (fail closed)")
        return outcome
    if expected_head_sha and expected_head_sha != current_sha:
        notes.append(
            "expected head SHA does not match current PR head (fail closed)"
        )
        return outcome

    # Every gate passed — this is the only mutating request in the tool.
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    try:
        auth_header = _auth(gitea_host)
        api_request(
            "POST",
            f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}/reviews",
            auth_header,
            {"body": body, "event": event, "commit_id": current_sha},
        )
    except Exception as exc:  # noqa: BLE001 — redact before surfacing
        notes.append(f"review submission failed: {_redact(str(exc))}")
        return outcome

    outcome["performed"] = True
    notes.append(f"all gates passed; submitted '{event}' review on PR #{pr_number}")
    return outcome
@mcp.tool()
def gitea_edit_pr(
    pr_number: int,
    title: str | None = None,
    body: str | None = None,
    state: str | None = None,
    base: str | None = None,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Modify fields of an existing pull request.

    Only the fields that are not ``None`` are sent to the server.

    Args:
        pr_number: The pull request index/number (required).
        title: New PR title.
        body: New PR description.
        state: New state — 'open' or 'closed'.
        base: Target branch name.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with success status and details of the edited PR.

    Raises:
        ValueError: when none of title/body/state/base was supplied.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    pr_url = f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}"

    requested = {"title": title, "body": body, "state": state, "base": base}
    changes = {
        field: value for field, value in requested.items() if value is not None
    }
    if not changes:
        raise ValueError("At least one field to edit (title, body, state, base) must be provided.")

    updated = api_request("PATCH", pr_url, auth_header, changes)
    return {
        "success": True,
        "number": updated["number"],
        "title": updated["title"],
        "body": updated.get("body", ""),
        "state": updated["state"],
        "url": updated["html_url"],
    }
@mcp.tool()
def gitea_get_file(
    filepath: str,
    ref: str = "main",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Retrieve metadata and content of a file from a Gitea repository.

    Args:
        filepath: The path to the file in the repository (e.g. 'README.md' or 'src/main.py').
        ref: The branch, tag, or commit hash to retrieve the file from (default: 'main').
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict containing 'name', 'path', 'sha', 'size', 'encoding', and 'content' (base64).
        Fields missing from the API response default to '' (or 0 for 'size').
    """
    import urllib.parse

    h, o, r = _resolve(remote, host, org, repo)
    auth = _auth(h)
    encoded_path = urllib.parse.quote(filepath, safe="")
    # The ref goes into the query string, so it must be percent-encoded too:
    # a raw '&', '#', '+' or space in a branch/tag name would otherwise
    # truncate or corrupt the query.
    encoded_ref = urllib.parse.quote(ref, safe="")
    url = f"{repo_api_url(h, o, r)}/contents/{encoded_path}?ref={encoded_ref}"
    data = api_request("GET", url, auth)
    return {
        "name": data.get("name", ""),
        "path": data.get("path", ""),
        "sha": data.get("sha", ""),
        "size": data.get("size", 0),
        "encoding": data.get("encoding", ""),
        "content": data.get("content", ""),
    }
@mcp.tool()
def gitea_commit_files(
    files: list[dict],
    message: str,
    branch: str | None = None,
    new_branch: str | None = None,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Commit changes to several files in one request to the contents endpoint.

    Args:
        files: List of file operations. Each file dict must contain 'operation' ('create', 'update', 'delete', 'rename'), 'path', and 'content' (base64 encoded for create/update), and optionally 'sha' (required for update/delete) or 'from_path' (for rename).
        message: The commit message.
        branch: Optional existing branch to start/commit from.
        new_branch: Optional new branch name to create for this commit.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'success', the commit SHA under 'commit' and the branch name
        under 'branch' (each '' when the response does not carry it).
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    contents_url = f"{repo_api_url(gitea_host, owner, repo_name)}/contents"

    commit_request = {"files": files, "message": message}
    # Branch hints are only sent when the caller provided them.
    for key, value in (("branch", branch), ("new_branch", new_branch)):
        if value is not None:
            commit_request[key] = value

    response = api_request("POST", contents_url, auth_header, commit_request)
    commit_info = response.get("commit", {})
    branch_info = response.get("branch", {})
    return {
        "success": True,
        "commit": commit_info.get("sha", ""),
        "branch": branch_info.get("name", ""),
    }
# Merge methods supported by the Gitea merge API.
_MERGE_METHODS = ("merge", "squash", "rebase")


@mcp.tool()
def gitea_merge_pr(
    pr_number: int,
    confirmation: str = "",
    expected_head_sha: str | None = None,
    expected_changed_files: list[str] | None = None,
    do: str = "merge",
    title: str | None = None,
    message: str | None = None,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Gated merge of a Gitea pull request (#16).

    This is the ONLY merge path this server exposes, and it mutates only after
    every safety gate passes. No ungated merge tool remains: legacy
    ``gitea_review_pr`` fails closed on ``merge=True`` and
    ``gitea_submit_pr_review`` never merges.

    Gate order (fail-closed at each step; the merge API is called only if all
    gates pass):
      1. Merge method (``do``) is 'merge', 'squash', or 'rebase'.
      2. Explicit confirmation: ``confirmation`` must equal ``"MERGE PR <n>"``.
         Without it, the tool makes no API calls at all.
      3. Reuse ``gitea_check_pr_eligibility`` (#14) with action 'merge': this
         proves the authenticated identity, the active profile (and that it
         allows merge), the PR author, blocks self-merge, requires the PR to be
         open, and fails closed when the PR is not mergeable or mergeability is
         unknown.
      4. The PR head SHA must be known; if ``expected_head_sha`` is given and
         the PR head moved → refuse.
      5. If ``expected_changed_files`` is given and the PR's changed file set
         differs → refuse.
      6. Redundant self-merge block (authenticated user == PR author).

    The head SHA that passed gate 4 is also sent to the server as
    ``head_commit_id`` so a push that lands between the checks and the merge
    request is rejected by Gitea instead of being merged unreviewed.

    No force / ignore-checks option is exposed. Gitea's own ``mergeable`` signal
    (which reflects branch-protection required reviews and status checks) must
    be positive, so required approval/check state is honoured, never bypassed.
    Never returns the token, Authorization header, or any credential material.

    Args:
        pr_number: The PR number to merge.
        confirmation: Must be exactly ``"MERGE PR <pr_number>"`` or merge is refused.
        expected_head_sha: Strongly recommended. If set and the PR head differs, refuse.
        expected_changed_files: Optional. If set and the PR's changed file set
            differs, refuse.
        do: Merge style — 'merge', 'squash', or 'rebase'.
        title: Optional merge commit title.
        message: Optional merge commit message.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict describing the attempt: performed, authenticated user, profile
        name, PR author, PR number, head SHA checked, merge method,
        reasons/gates passed or blocked, and merge result / merge commit if
        available ('changed_files' is added when gate 5 ran). Never secrets.
    """
    do = (do or "").strip().lower()
    result = {
        "performed": False,
        "authenticated_user": None,
        "profile_name": get_profile()["profile_name"],
        "pr_author": None,
        "pr_number": pr_number,
        "head_sha": None,
        "expected_head_sha": expected_head_sha,
        "merge_method": do,
        "mergeable": None,
        "remote": remote if remote in REMOTES else None,
        "merge_result": None,
        "merge_commit": None,
        "reasons": [],
    }
    reasons = result["reasons"]

    # Gate 1 — valid merge method (no API call on a bad method).
    if do not in _MERGE_METHODS:
        reasons.append(
            f"unknown merge method '{do}'; expected one of {list(_MERGE_METHODS)}"
        )
        return result

    # Gate 2 — explicit confirmation (fail fast; zero API calls without it).
    expected_confirmation = f"MERGE PR {pr_number}"
    if (confirmation or "").strip() != expected_confirmation:
        reasons.append(
            f"explicit confirmation required: pass confirmation='{expected_confirmation}'"
        )
        return result

    # Gate 3 — reuse #14 eligibility (identity + profile + merge-allowed +
    # author + self-merge block + open + mergeable/unknown fail-closed).
    # Read-only GETs only.
    elig = gitea_check_pr_eligibility(
        pr_number=pr_number, action="merge", remote=remote,
        host=host, org=org, repo=repo,
    )
    result["authenticated_user"] = elig.get("authenticated_user")
    result["profile_name"] = elig.get("profile_name", result["profile_name"])
    result["pr_author"] = elig.get("pr_author")
    result["head_sha"] = elig.get("head_sha")
    result["mergeable"] = elig.get("mergeable")
    if not elig.get("eligible"):
        reasons.append("eligibility check for 'merge' failed (fail closed)")
        reasons.extend(elig.get("reasons", []))
        return result

    # Gate 4 — head SHA must match if the caller pinned a reviewed SHA.
    actual_sha = result["head_sha"]
    if expected_head_sha and actual_sha and expected_head_sha != actual_sha:
        reasons.append(
            "expected head SHA does not match current PR head (fail closed)"
        )
        return result
    if not actual_sha:
        # The eligibility check reports the head SHA but does not itself
        # require one, so this guard is what prevents merging a PR whose head
        # commit could not be read.
        reasons.append("PR head SHA unavailable (fail closed)")
        return result

    h, o, r = _resolve(remote, host, org, repo)

    # Gate 5 — changed files must match the reviewed set, if provided.
    if expected_changed_files is not None:
        try:
            auth = _auth(h)
            # NOTE(review): a single response from /files is compared; if the
            # server paginates this listing, a large PR would be compared
            # against a truncated list and refused — confirm.
            files = api_request(
                "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}/files", auth
            )
            actual_files = sorted(
                (f or {}).get("filename", "") for f in (files or [])
            )
        except Exception as exc:  # noqa: BLE001 — redact before surfacing
            reasons.append(
                f"could not verify changed files (fail closed): {_redact(str(exc))}"
            )
            return result
        result["changed_files"] = actual_files
        if actual_files != sorted(expected_changed_files):
            reasons.append(
                "PR changed files do not match expected_changed_files (fail closed)"
            )
            return result

    # Gate 6 — redundant self-merge block (belt-and-suspenders over #14).
    auth_user = result["authenticated_user"]
    pr_author = result["pr_author"]
    if auth_user and pr_author and auth_user == pr_author:
        reasons.append("self-merge blocked (authenticated user is PR author)")
        return result

    # All gates passed — perform the single merge mutation.
    try:
        auth = _auth(h)
        merge_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge"
        # Pin the merge to the head commit that passed the gates: the checks
        # above ran against an earlier GET, so without this a push arriving in
        # between would be merged without ever having been inspected.
        payload = {"Do": do, "head_commit_id": actual_sha}
        if title:
            payload["MergeTitleField"] = title
        if message:
            payload["MergeMessageField"] = message
        api_request("POST", merge_url, auth, payload)
        # Best-effort read-back of the merge commit SHA; a failure here must
        # not turn a successful merge into a reported failure.
        try:
            merged = api_request(
                "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth
            )
            result["merge_commit"] = (merged or {}).get("merged_commit_sha")
        except Exception:
            result["merge_commit"] = None
    except Exception as exc:  # noqa: BLE001 — redact before surfacing
        reasons.append(f"merge failed: {_redact(str(exc))}")
        return result

    result["performed"] = True
    result["merge_result"] = f"PR #{pr_number} merged via '{do}'."
    reasons.append(f"all gates passed; merged PR #{pr_number} via '{do}'")
    return result
@mcp.tool()
def gitea_review_pr(
    pr_number: int,
    event: str = "APPROVE",
    body: str = "",
    merge: bool = False,
    merge_method: str = "merge",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Submit a review on a Gitea pull request (legacy wrapper).

    Kept for compatibility: it translates the legacy *event* into an action
    and forwards to ``gitea_submit_pr_review``, so the same #14 eligibility
    gates apply. Merging through this tool is no longer supported and fails
    closed before anything else happens (see #16).

    Args:
        pr_number: The PR number to review.
        event: Review type — 'APPROVE', 'COMMENT', or 'REQUEST_CHANGES'.
        body: Review body text / comment.
        merge: Merging is disabled; if True, the tool fails closed.
        merge_method: Ignored.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with success status and message.

    Raises:
        ValueError: if *event* is not one of the three supported values.
    """
    # Refuse merge requests up front — no lookup, no API call.
    if merge:
        return {
            "success": False,
            "message": "merge=True is no longer supported in this tool (belongs to #16).",
        }

    # Legacy event string → action name understood by gitea_submit_pr_review.
    legacy_to_action = {
        "APPROVE": "approve",
        "COMMENT": "comment",
        "REQUEST_CHANGES": "request_changes",
    }
    if event not in tuple(legacy_to_action):
        raise ValueError(f"Invalid review event: '{event}'. Choose from 'APPROVE', 'COMMENT', 'REQUEST_CHANGES'.")

    outcome = gitea_submit_pr_review(
        pr_number=pr_number,
        action=legacy_to_action[event],
        body=body,
        expected_head_sha=None,
        remote=remote,
        host=host,
        org=org,
        repo=repo,
    )
    if not outcome.get("performed"):
        reasons = outcome.get("reasons", [])
        return {"success": False, "message": f"Review submission failed eligibility gates: {reasons}"}
    return {"success": True, "message": f"Successfully submitted review for PR #{pr_number} with event '{event}'."}
@mcp.tool()
def gitea_delete_branch(
    branch: str,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Remove a branch from the remote Gitea repository.

    Args:
        branch: The remote branch name (e.g. 'feat/branch-name').
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'success' and 'message'.
    """
    from urllib.parse import quote

    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    # The branch name is a single path segment, so '/' is escaped as well.
    branch_segment = quote(branch, safe="")
    branch_url = f"{repo_api_url(gitea_host, owner, repo_name)}/branches/{branch_segment}"
    api_request("DELETE", branch_url, auth_header)
    return {"success": True, "message": f"Remote branch '{branch}' deleted."}
@mcp.tool()
def gitea_close_issue(
    issue_number: int,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Close an issue by PATCHing its state to 'closed'.

    Args:
        issue_number: The issue number to close.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'success' boolean and 'message'.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    issue_url = f"{repo_api_url(gitea_host, owner, repo_name)}/issues/{issue_number}"
    state_change = {"state": "closed"}
    api_request("PATCH", issue_url, auth_header, state_change)
    return {"success": True, "message": f"Issue #{issue_number} closed."}
@mcp.tool()
def gitea_list_issues(
    state: str = "open",
    label: str | None = None,
    limit: int = 50,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> list[dict]:
    """List issues on a Gitea repository with optional filters.

    Args:
        state: Filter by state — 'open', 'closed', or 'all'.
        label: Filter by label name (e.g. 'important').
        limit: Max number of issues to return (default: 50).
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        List of dicts with 'number', 'title', 'state', 'labels', 'assignee'.
        'assignee' is the assignee's login, or '' when the issue has none.
    """
    import urllib.parse

    h, o, r = _resolve(remote, host, org, repo)
    auth = _auth(h)
    # Build the query with urlencode so that filter values containing spaces,
    # '&', '#', '+', etc. are percent-encoded instead of corrupting the URL.
    # type=issues is sent so the listing is restricted to issues.
    query = {"state": state, "limit": limit, "type": "issues"}
    if label:
        query["labels"] = label
    url = f"{repo_api_url(h, o, r)}/issues?{urllib.parse.urlencode(query)}"
    issues = api_request("GET", url, auth)
    return [
        {
            "number": issue["number"],
            "title": issue["title"],
            "state": issue["state"],
            "labels": [lb["name"] for lb in issue.get("labels", [])],
            # 'assignee' may be absent or null; fall back to an empty login.
            "assignee": (issue.get("assignee") or {}).get("login", ""),
        }
        for issue in issues
    ]
@mcp.tool()
def gitea_view_issue(
    issue_number: int,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Fetch one issue and return its main fields.

    Args:
        issue_number: Number of the issue to view.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'number', 'title', 'body', 'state', 'labels', 'assignee',
        'url'.
    """
    server, owner, repo_name = _resolve(remote, host, org, repo)
    credentials = _auth(server)
    issue = api_request(
        "GET",
        f"{repo_api_url(server, owner, repo_name)}/issues/{issue_number}",
        credentials,
    )
    # Fields are filled in the same order in which they appear in the result.
    details = {
        "number": issue["number"],
        "title": issue["title"],
        "body": issue.get("body", ""),
        "state": issue["state"],
    }
    details["labels"] = [entry["name"] for entry in issue.get("labels", [])]
    # A missing or null assignee is reported as an empty login.
    assignee = issue.get("assignee") or {}
    details["assignee"] = assignee.get("login", "")
    details["url"] = issue["html_url"]
    return details
@mcp.tool()
def gitea_whoami(
    remote: str = "dadeschools",
    host: str | None = None,
) -> dict:
    """Report which Gitea account the MCP server is authenticated as.

    Read-only. Queries Gitea's authenticated-user endpoint
    (GET /api/v1/user) with the configured token and returns safe identity
    metadata only. Use it to prove which account a mutating workflow
    (e.g. review/merge) would act as, so self-review/self-merge can be
    detected before acting.

    The token, Authorization header, password, and any other secret material
    are never part of the result. Fails closed with a clear error when the
    identity cannot be determined.

    Args:
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.

    Returns:
        dict with 'authenticated', 'username', 'display_name', 'user_id',
        'email', 'server', 'remote', and 'profile' (safe runtime profile
        metadata: profile_name + allowed_operations; never the token).

    Raises:
        ValueError: If ``remote`` is not a key of ``REMOTES``.
        RuntimeError: If the endpoint returns no usable login.
    """
    if remote not in REMOTES:
        raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}")

    server_host = host if host else REMOTES[remote]["host"]
    account = api_request(
        "GET", f"https://{server_host}/api/v1/user", _auth(server_host)
    )
    login = account.get("login") if account else None
    if not login:
        # Fail closed: an identity that could not be verified is never assumed.
        raise RuntimeError(
            f"Could not determine the authenticated Gitea identity for {server_host}. "
            "Verify the configured token is valid for this instance."
        )

    # Only the non-secret parts of the runtime profile are exposed (its name
    # and allowed operation categories); the token is resolved elsewhere and
    # never copied into this result.
    active_profile = get_profile()
    identity = {
        "authenticated": True,
        "username": login,
        "display_name": account.get("full_name") or None,
        "user_id": account.get("id"),
        "email": account.get("email") or None,
        "server": f"https://{server_host}",
        "remote": remote,
    }
    identity["profile"] = {
        "profile_name": active_profile["profile_name"],
        "allowed_operations": active_profile["allowed_operations"],
    }
    return identity
@mcp.tool()
def gitea_get_profile(
    remote: str = "dadeschools",
    host: str | None = None,
    resolve_identity: bool = True,
) -> dict:
    """Summarize the active Gitea MCP execution profile of this runtime.

    Read-only. Returns the non-secret configuration of the running MCP
    process (profile name, allowed/forbidden operation categories, audit
    label, token *source name*, base URL) together with the resolved server
    for the given remote. Optionally looks up the authenticated username
    through the same endpoint ``gitea_whoami`` uses, so an LLM can see who
    this runtime acts as.

    The tool never mutates Gitea and never approves, merges, comments, or
    creates anything. The token value, Authorization header, password, raw
    environment, and credential file paths are never returned. Identity
    resolution fails soft: when it cannot be determined,
    ``authenticated_username`` stays null and ``identity_status`` records
    why, while the profile configuration is still returned.

    Args:
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        resolve_identity: If True, attempt a read-only identity lookup.

    Returns:
        dict of safe profile metadata. ``identity_status`` is one of
        'verified', 'unknown', 'unavailable', or 'not_resolved'.
    """
    profile = get_profile()
    copied_fields = (
        "profile_name",
        "allowed_operations",
        "forbidden_operations",
        "audit_label",
        "token_source_name",
        "base_url",
    )
    summary = {field: profile[field] for field in copied_fields}
    summary.update(
        remote=remote if remote in REMOTES else None,
        server=None,
        authenticated_username=None,
        identity_status="not_resolved",
    )

    if remote not in REMOTES:
        # Report the ambiguity instead of raising so the tool stays
        # inspectable even with a bad remote name.
        summary["identity_status"] = "unknown"
        summary["remote_error"] = f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}"
        return summary

    server_host = host or REMOTES[remote]["host"]
    summary["server"] = f"https://{server_host}"
    if not resolve_identity:
        return summary

    try:
        user = api_request(
            "GET", f"https://{server_host}/api/v1/user", _auth(server_host)
        )
        username = (user or {}).get("login")
    except Exception:
        # Soft failure for the identity field only: no error detail or
        # credential material is surfaced, the lookup is just marked
        # unavailable.
        summary["identity_status"] = "unavailable"
        return summary

    if username:
        summary["authenticated_username"] = username
        summary["identity_status"] = "verified"
    else:
        summary["identity_status"] = "unknown"
    return summary
@mcp.tool()
def gitea_mark_issue(
    issue_number: int,
    action: str,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Claim or release an issue through the status:in-progress label.

    The label acts as the cross-agent lock; check it before starting work.

    Args:
        issue_number: The issue number.
        action: 'start' to claim (add label) or 'done' to release (remove
            label).
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with a 'success' boolean and a 'message'.

    Raises:
        ValueError: If ``action`` is neither 'start' nor 'done'.
        RuntimeError: If the repository has no 'status:in-progress' label
            among the labels returned by the lookup.
    """
    if action not in ("start", "done"):
        raise ValueError(f"action must be 'start' or 'done', got '{action}'")

    server, owner, repo_name = _resolve(remote, host, org, repo)
    credentials = _auth(server)
    api_root = repo_api_url(server, owner, repo_name)

    # Resolve the lock label's numeric id from the repository's label list.
    repo_labels = api_request("GET", f"{api_root}/labels?limit=100", credentials)
    lock_label_id = next(
        (
            entry["id"]
            for entry in repo_labels
            if entry["name"] == "status:in-progress"
        ),
        None,
    )
    if lock_label_id is None:
        raise RuntimeError(
            "Label 'status:in-progress' not found. "
            "Run manage_labels.py to create it first."
        )

    issue_labels_url = f"{api_root}/issues/{issue_number}/labels"
    if action == "start":
        api_request("POST", issue_labels_url, credentials, {"labels": [lock_label_id]})
        return {"success": True, "message": f"Issue #{issue_number} claimed."}

    # Only 'done' can reach this point: remove the lock label again.
    api_request("DELETE", f"{issue_labels_url}/{lock_label_id}", credentials)
    return {"success": True, "message": f"Issue #{issue_number} released."}
@mcp.tool()
def gitea_list_labels(
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> list:
    """Return the labels available in a Gitea repository.

    Args:
        remote: Known Gitea instance ('dadeschools' or 'prgs').
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        list of labels, exactly as returned by the labels endpoint.
    """
    server, owner, repo_name = _resolve(remote, host, org, repo)
    credentials = _auth(server)
    labels_endpoint = f"{repo_api_url(server, owner, repo_name)}/labels?limit=100"
    return api_request("GET", labels_endpoint, credentials)
@mcp.tool()
def gitea_create_label(
    name: str,
    color: str,
    description: str = "",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Add a new label to a Gitea repository.

    Args:
        name: Name of the label (e.g. 'bug', 'epic').
        color: HTML color code (hex, e.g. 'fbca04' or '#fbca04').
        description: Description of the label.
        remote: Known Gitea instance ('dadeschools' or 'prgs').
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict containing the created label details.
    """
    server, owner, repo_name = _resolve(remote, host, org, repo)
    credentials = _auth(server)
    labels_endpoint = f"{repo_api_url(server, owner, repo_name)}/labels"
    # The color is sent without a leading '#'; at most one is stripped.
    new_label = {
        "name": name,
        "color": color.removeprefix("#"),
        "description": description,
    }
    return api_request("POST", labels_endpoint, credentials, new_label)
@mcp.tool()
def gitea_set_issue_labels(
    issue_number: int,
    labels: list[str],
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> list:
    """Replace every label on a Gitea issue with the given label names.

    Args:
        issue_number: The issue number.
        labels: List of label names to apply.
        remote: Known Gitea instance ('dadeschools' or 'prgs').
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        list of all labels currently applied to the issue.

    Raises:
        RuntimeError: If any requested name is not among the repository
            labels returned by the lookup; nothing is changed in that case.
    """
    server, owner, repo_name = _resolve(remote, host, org, repo)
    credentials = _auth(server)
    api_root = repo_api_url(server, owner, repo_name)

    # Step 1: map the repository's label names to their ids.
    repo_labels = api_request("GET", f"{api_root}/labels?limit=100", credentials)
    ids_by_name = {entry["name"]: entry["id"] for entry in repo_labels}

    # Step 2: refuse the whole request if any requested name is unknown.
    missing_labels = [wanted for wanted in labels if wanted not in ids_by_name]
    if missing_labels:
        raise RuntimeError(
            f"The following labels do not exist on the repository: {missing_labels}. "
            "Please create them first using gitea_create_label."
        )

    # Step 3: PUT the resolved ids onto the issue, in the requested order.
    payload = {"labels": [ids_by_name[wanted] for wanted in labels]}
    return api_request(
        "PUT", f"{api_root}/issues/{issue_number}/labels", credentials, payload
    )
@mcp.tool()
def gitea_mirror_refs(
    apply: bool = False,
    force: bool = False,
) -> dict:
    """Mirror branches and tags between dadeschools and prgs Timesheet repos.

    Runs the project's ``mirror_refs.sh`` script. Additive only — never
    deletes branches or tags. Diverged branches are skipped unless force is
    True.

    Args:
        apply: If True, actually push. If False (default), dry-run only.
        force: If True, force-push diverged branches.

    Returns:
        dict with 'output' (script stdout followed by stderr) and
        'return_code'.
    """
    command = [os.path.join(PROJECT_ROOT, "mirror_refs.sh")]
    # Append each optional flag only when its switch is enabled.
    command += [
        flag
        for flag, enabled in (("--apply", apply), ("--force", force))
        if enabled
    ]
    completed = subprocess.run(
        command,
        capture_output=True,
        text=True,
        timeout=120,
    )
    combined_output = completed.stdout + completed.stderr
    return {"output": combined_output, "return_code": completed.returncode}
# ── Entry point ───────────────────────────────────────────────────────────────
# Only when this file is executed as a script (not imported): run the `mcp`
# server object using the "stdio" transport.
if __name__ == "__main__":
    mcp.run(transport="stdio")