Files
Gitea-Tools/mcp_server.py
T
sysadmin f05e58c847 feat: add gated gitea_submit_pr_review review actions (#15)
Add gitea_submit_pr_review, the only tool that submits a Gitea PR review.
It performs a review mutation (comment / approve / request_changes) only
after every safety gate passes, and never merges.

Gates (fail-closed at each step):
  1. Validate action is comment | approve | request_changes.
  2. Reuse gitea_check_pr_eligibility (#14) for authenticated-user lookup,
     active-profile lookup, PR-author lookup, self-approval block, and the
     profile-allowed-operation check. approve requires 'approve' eligibility,
     request_changes requires 'request_changes', comment requires 'review'.
  3. Redundant self-approval block (auth user == PR author).
  4. Optional expected_head_sha: refuse if the PR head has moved.
  5. Only then POST /repos/{owner}/{repo}/pulls/{n}/reviews (formal review
     endpoint, so approvals/change-requests carry real review state).

Output reports action, whether performed, authenticated user, profile name,
PR author, PR number, head SHA checked, and reasons — never a token, auth
header, or credential. Error text is scrubbed via _redact as defence in depth.

Merge is intentionally not implemented (belongs to #16).

Tests cover: self-author approve blocked, approve/request_changes/comment
succeed only when eligible, unknown identity fail-closed, disallowed profile
op blocked, head-SHA mismatch blocked, no mutation when gates fail, invalid
action rejected, and secret redaction in output and error paths.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-01 14:31:34 -04:00

1250 lines
42 KiB
Python

#!/usr/bin/env python3
"""Gitea MCP Server — exposes Gitea operations as MCP tools.
Runs over stdio. All tools authenticate via macOS keychain (git credential fill).
Usage (standalone test):
python3 mcp_server.py
Configuration (mcp_config.json):
"gitea-tools": {
"command": "/Users/jasonwalker/Development/Gitea-Tools/venv/bin/python3",
"args": ["/Users/jasonwalker/Development/Gitea-Tools/mcp_server.py"],
"env": {}
}
"""
import os
import sys
import subprocess
# Directory that contains this script. MCP hosts are expected to start the
# server directly with the project's venv interpreter (venv/bin/python3), as
# shown in the configuration example in the module docstring. Re-pointing the
# interpreter with os.execv() is deliberately avoided: swapping the process
# after the host has already wired up the stdio pipes can desync the JSON-RPC
# transport (observed with Antigravity/Cascade hosts).
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))

# Put the project root at the front of the import path (only if it is not
# already listed) so the sibling module gitea_auth.py can be imported.
if sys.path.count(PROJECT_ROOT) == 0:
    sys.path.insert(0, PROJECT_ROOT)
from mcp.server.fastmcp import FastMCP # noqa: E402
from gitea_auth import ( # noqa: E402
REMOTES,
get_credentials,
get_auth_header,
api_request,
repo_api_url,
get_profile,
)
# The single FastMCP server instance for this module. Every tool defined below
# registers itself on it through the @mcp.tool() decorator.
mcp = FastMCP("gitea-tools", instructions=(
"Gitea issue tracker and PR management for dadeschools and prgs instances. "
"Use the gitea_ prefixed tools to create issues, PRs, list issues, etc."
))
# ── Helpers ───────────────────────────────────────────────────────────────────
def _resolve(remote: str, host: str | None, org: str | None, repo: str | None):
    """Turn a known remote plus optional overrides into ``(host, org, repo)``.

    Each override that is falsy (``None`` or empty) falls back to the value
    stored for *remote* in ``REMOTES``.

    Raises:
        ValueError: If *remote* is not a key of ``REMOTES``.
    """
    if remote in REMOTES:
        defaults = REMOTES[remote]
        resolved_host = host if host else defaults["host"]
        resolved_org = org if org else defaults["org"]
        resolved_repo = repo if repo else defaults["repo"]
        return (resolved_host, resolved_org, resolved_repo)
    raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}")
def _auth(host: str) -> str:
    """Return the auth header for *host* as produced by ``get_auth_header``.

    Raises:
        RuntimeError: If ``get_auth_header`` returns ``None`` for the host.
    """
    credential_header = get_auth_header(host)
    if credential_header is not None:
        return credential_header
    raise RuntimeError(
        f"No credentials for {host}. "
        "Ensure you've logged in via HTTPS at least once."
    )
# ── Tools ─────────────────────────────────────────────────────────────────────
@mcp.tool()
def gitea_create_issue(
    title: str,
    body: str = "",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Open a new issue in a Gitea repository.

    Args:
        title: Issue title (required).
        body: Issue body text.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with the 'number' and 'url' of the issue that was created.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    issues_endpoint = f"{repo_api_url(gitea_host, owner, repo_name)}/issues"
    issue_fields = {"title": title, "body": body}
    created = api_request("POST", issues_endpoint, auth_header, issue_fields)
    # Only the identifying fields are surfaced to the caller.
    return {"number": created["number"], "url": created["html_url"]}
@mcp.tool()
def gitea_create_pr(
    title: str,
    head: str,
    base: str = "main",
    body: str = "",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Open a pull request in a Gitea repository.

    Args:
        title: PR title (required).
        head: Source branch name (required).
        base: Target branch (default: 'main').
        body: PR description.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with the 'number' and 'url' of the PR that was created.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    pulls_endpoint = f"{repo_api_url(gitea_host, owner, repo_name)}/pulls"
    created = api_request(
        "POST",
        pulls_endpoint,
        auth_header,
        {"title": title, "body": body, "head": head, "base": base},
    )
    return {"number": created["number"], "url": created["html_url"]}
@mcp.tool()
def gitea_list_prs(
    state: str = "open",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> list[dict]:
    """Enumerate pull requests of a Gitea repository.

    Args:
        state: State filter — 'open', 'closed', or 'all' (default: 'open').
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        List of dicts with 'number', 'title', 'state', 'head', 'base', 'url',
        'mergeable'.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    listing_url = f"{repo_api_url(gitea_host, owner, repo_name)}/pulls?state={state}"
    pulls = api_request("GET", listing_url, auth_header)
    if not pulls:
        # A falsy response (None / empty body) is treated as "no PRs".
        pulls = []

    summaries = []
    for pull in pulls:
        summaries.append(
            {
                "number": pull["number"],
                "title": pull["title"],
                "state": pull["state"],
                "head": pull["head"]["ref"],
                "base": pull["base"]["ref"],
                "url": pull["html_url"],
                "mergeable": pull.get("mergeable"),
            }
        )
    return summaries
@mcp.tool()
def gitea_view_pr(
    pr_number: int,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Fetch the details of one pull request.

    Args:
        pr_number: The pull request index/number.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'number', 'title', 'body', 'state', 'head', 'base', 'url',
        'mergeable' and 'user' (the author's login).
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    pull = api_request(
        "GET",
        f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}",
        auth_header,
    )
    author_login = pull.get("user", {}).get("login", "")
    details = {
        "number": pull["number"],
        "title": pull["title"],
        "body": pull.get("body", ""),
        "state": pull["state"],
        "head": pull["head"]["ref"],
        "base": pull["base"]["ref"],
        "url": pull["html_url"],
        "mergeable": pull.get("mergeable"),
        "user": author_login,
    }
    return details
# The closed set of actions the eligibility tool knows how to evaluate.
_ELIGIBILITY_ACTIONS = ("review", "approve", "request_changes", "merge")


@mcp.tool()
def gitea_check_pr_eligibility(
    pr_number: int,
    action: str,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Read-only: may the current identity/profile perform *action* on a PR?

    This only evaluates eligibility. It issues GET requests exclusively and
    NEVER reviews, approves, requests changes, merges, or mutates anything.
    It looks at the authenticated identity (the /user endpoint), the active
    runtime profile metadata (``get_profile``), and the target PR (author,
    state, head SHA, mergeability), and returns a decision with reasons.

    Fail-closed rules:
      - Unknown action or unknown remote → not eligible.
      - Profile has no configured allowed operations, or the action is not
        in the profile's allowed operations (or is forbidden) → not eligible.
      - Authenticated identity cannot be determined → not eligible.
      - Authenticated user equals the PR author → not eligible to
        ``approve`` or ``merge``.
      - PR is not open → not eligible.
      - For ``merge``, PR must be reported mergeable.

    Never returns the token, Authorization header, or any credential material.

    Args:
        pr_number: Target PR number.
        action: One of 'review', 'approve', 'request_changes', 'merge'.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'eligible' (bool), the inputs inspected, and 'reasons'.
    """
    action = (action or "").strip().lower()
    profile = get_profile()

    # ``reasons`` is shared with the result dict: every blocking finding is
    # appended here, and the request is eligible only if none were recorded.
    reasons: list = []
    result = {
        "eligible": False,
        "requested_action": action,
        "authenticated_user": None,
        "profile_name": profile["profile_name"],
        "allowed_operations": profile["allowed_operations"],
        "pr_author": None,
        "pr_number": pr_number,
        "pr_state": None,
        "head_sha": None,
        "mergeable": None,
        "remote": remote if remote in REMOTES else None,
        "reasons": reasons,
    }

    # Input validation: bail out before any network access.
    if action not in _ELIGIBILITY_ACTIONS:
        reasons.append(
            f"unknown action '{action}'; expected one of {list(_ELIGIBILITY_ACTIONS)}"
        )
        return result
    if remote not in REMOTES:
        reasons.append(f"unknown remote '{remote}'")
        return result

    # Profile capability check (metadata only; not enforcement of the action).
    permitted_ops = profile["allowed_operations"]
    prohibited_ops = profile["forbidden_operations"]
    if not permitted_ops:
        reasons.append("profile has no configured allowed operations (fail closed)")
    if action in prohibited_ops:
        reasons.append(f"profile forbids '{action}'")
    elif action not in permitted_ops:
        reasons.append(f"profile is not allowed to {action}")

    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)

    # Credentials: fail soft, and never surface the error (it might carry
    # secret material).
    try:
        auth_header = _auth(gitea_host)
    except Exception:
        auth_header = None

    # Who are we authenticated as? (read-only)
    login = None
    if auth_header:
        try:
            identity = api_request(
                "GET", f"https://{gitea_host}/api/v1/user", auth_header
            )
            login = (identity or {}).get("login")
        except Exception:
            login = None
    result["authenticated_user"] = login
    if not login:
        reasons.append("authenticated identity could not be determined")

    # Facts about the PR itself (read-only GET; no mutation).
    author = None
    state = None
    if not auth_header:
        reasons.append("PR details could not be retrieved (no credentials)")
    else:
        try:
            pull = api_request(
                "GET",
                f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}",
                auth_header,
            ) or {}
            author = pull.get("user", {}).get("login")
            state = pull.get("state")
            result["head_sha"] = (pull.get("head") or {}).get("sha")
            result["mergeable"] = pull.get("mergeable")
        except Exception:
            reasons.append("PR details could not be retrieved")
    result["pr_author"] = author
    result["pr_state"] = state

    # Only an open PR can be acted on.
    if state != "open":
        if state is None:
            reasons.append("PR state unknown")
        else:
            reasons.append(f"PR is not open (state={state})")

    # Authors may not approve or merge their own PR.
    is_self_authored = bool(login and author and login == author)
    if is_self_authored and action in ("approve", "merge"):
        reasons.append("authenticated user is PR author")

    # Merging requires an explicit positive mergeability signal.
    if action == "merge":
        mergeable = result["mergeable"]
        if mergeable is False:
            reasons.append("PR is not mergeable")
        elif mergeable is None:
            reasons.append("PR mergeability unknown")

    result["eligible"] = not reasons
    if result["eligible"]:
        reasons.append("all eligibility checks passed")
    return result
# Review actions the gated review tool can perform. Each maps to a pair:
# (eligibility action, Gitea review *event*). The eligibility action is what
# gets passed to ``gitea_check_pr_eligibility`` (#14), so every mutation goes
# through the same identity/profile/author gates. 'merge' is deliberately
# absent — merging belongs to a separate tool/issue and is never done via
# this table.
#
# 'comment' posts review findings without an approval/rejection state; #14
# names that eligibility category 'review'.
_REVIEW_ACTIONS = dict(
    comment=("review", "COMMENT"),
    approve=("approve", "APPROVE"),
    request_changes=("request_changes", "REQUEST_CHANGES"),
)
# Authorization scheme prefixes scrubbed from any surfaced error text so a
# credential can never leak. Matching is case-insensitive (see ``_redact``).
_SECRET_PREFIXES = ("token ", "Basic ")


def _redact(text: str) -> str:
    """Strip anything that looks like an Authorization credential from *text*.

    Errors raised by ``api_request`` are expected to echo the server response
    body rather than request headers, so a token should never appear — this is
    defence in depth so a future change can't leak ``token …`` / ``Basic …``
    material into a tool result or log line.

    For every scheme in ``_SECRET_PREFIXES`` the run of non-whitespace
    characters that follows the scheme is replaced with ``[REDACTED]``. The
    scheme is matched case-insensitively (HTTP auth schemes are not case
    sensitive) and may be separated from the credential by one or more spaces
    or tabs, so ``Token x``, ``basic x`` and ``token  x`` are all scrubbed.
    The scheme text and the separator are kept as written.

    Args:
        text: Text to scrub. Falsy values (``""``/``None``) are returned as-is.

    Returns:
        *text* with credential-looking material replaced by ``[REDACTED]``.
    """
    if not text:
        return text

    import re  # function-scope import, like the other stdlib imports in this file

    scrubbed = text
    # One pass per scheme, in declaration order, so that a credential following
    # a second scheme word is still caught by that scheme's own pass.
    for prefix in _SECRET_PREFIXES:
        scheme = prefix.strip()
        if not scheme:
            continue
        pattern = re.compile(
            "(" + re.escape(scheme) + r")([ \t]+)\S*", re.IGNORECASE
        )
        scrubbed = pattern.sub(
            lambda m: m.group(1) + m.group(2) + "[REDACTED]", scrubbed
        )
    return scrubbed
@mcp.tool()
def gitea_submit_pr_review(
    pr_number: int,
    action: str,
    body: str = "",
    expected_head_sha: str | None = None,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Gated PR review mutation: comment findings, request changes, or approve.

    The review is submitted **only after every safety gate passes**; if any
    gate fails the function returns ``performed=False`` without calling the
    mutating endpoint.

    Gate order (fail-closed at each step):
      1. ``action`` must be one of 'comment', 'approve', 'request_changes'.
      2. ``gitea_check_pr_eligibility`` (#14) must report eligibility. That
         check covers the authenticated-user lookup, active-profile lookup,
         PR-author lookup, self-approval block, and profile-allowed-operation
         check. ``approve`` requires eligibility for 'approve',
         ``request_changes`` requires 'request_changes', and ``comment``
         requires 'review'.
      3. Self-approval is blocked again here (authenticated user == PR author).
      4. A head SHA must be known, and if ``expected_head_sha`` is supplied it
         must equal the current PR head.
      5. Only then is the review POSTed.

    Endpoint: ``POST /repos/{owner}/{repo}/pulls/{n}/reviews``. This is the
    *formal review* API (the payload carries an APPROVE / COMMENT /
    REQUEST_CHANGES event and the head commit id), used instead of the plain
    issue-comment endpoint (``/issues/{n}/comments``) so that approvals and
    change requests carry real review state.

    Merge is intentionally NOT implemented here.

    Never returns the token, Authorization header, or any credential material.

    Args:
        pr_number: Target PR number.
        action: 'comment', 'approve', or 'request_changes'.
        body: Review body / finding text.
        expected_head_sha: Optional. If given and the PR head SHA differs, the
            review is refused (guards against reviewing a changed PR).
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict describing the attempt: action, whether it was performed, the
        authenticated user, profile name, PR author, PR number, head SHA
        checked, and the reasons/gates passed or blocked. Never secrets.
    """
    action = (action or "").strip().lower()

    # ``reasons`` is the same list object that is stored in the result.
    reasons: list = []
    result = {
        "requested_action": action,
        "performed": False,
        "authenticated_user": None,
        "profile_name": get_profile()["profile_name"],
        "pr_author": None,
        "pr_number": pr_number,
        "head_sha": None,
        "expected_head_sha": expected_head_sha,
        "remote": remote if remote in REMOTES else None,
        "reasons": reasons,
    }

    # Gate 1 — the action must be a known review action.
    if action not in _REVIEW_ACTIONS:
        reasons.append(
            f"unknown review action '{action}'; expected one of {sorted(_REVIEW_ACTIONS)}"
        )
        return result
    eligibility_action, event = _REVIEW_ACTIONS[action]

    # Gate 2 — delegate identity/profile/author checks to the #14 eligibility
    # tool, which only performs GET requests.
    verdict = gitea_check_pr_eligibility(
        pr_number=pr_number,
        action=eligibility_action,
        remote=remote,
        host=host,
        org=org,
        repo=repo,
    )
    reviewer = verdict.get("authenticated_user")
    author = verdict.get("pr_author")
    current_sha = verdict.get("head_sha")
    result["authenticated_user"] = reviewer
    result["profile_name"] = verdict.get("profile_name", result["profile_name"])
    result["pr_author"] = author
    result["head_sha"] = current_sha
    if not verdict.get("eligible"):
        reasons.append(
            f"eligibility check for '{eligibility_action}' failed (fail closed)"
        )
        reasons.extend(verdict.get("reasons", []))
        return result

    # Gate 3 — belt-and-suspenders repeat of the self-approval block.
    if action == "approve" and reviewer and author and reviewer == author:
        reasons.append("self-approval blocked (authenticated user is PR author)")
        return result

    # Gate 4 — a review is always pinned to a commit. The eligibility check
    # reports the head SHA but does not itself fail when it is missing, so
    # the absence is handled here.
    if not current_sha:
        reasons.append("PR head SHA unavailable (fail closed)")
        return result
    if expected_head_sha and expected_head_sha != current_sha:
        reasons.append(
            "expected head SHA does not match current PR head (fail closed)"
        )
        return result

    # Every gate passed — this is the single mutating call.
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    try:
        auth_header = _auth(gitea_host)
        api_request(
            "POST",
            f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}/reviews",
            auth_header,
            {"body": body, "event": event, "commit_id": current_sha},
        )
    except Exception as exc:  # noqa: BLE001 — redact before surfacing
        reasons.append(f"review submission failed: {_redact(str(exc))}")
        return result

    result["performed"] = True
    reasons.append(f"all gates passed; submitted '{event}' review on PR #{pr_number}")
    return result
@mcp.tool()
def gitea_edit_pr(
    pr_number: int,
    title: str | None = None,
    body: str | None = None,
    state: str | None = None,
    base: str | None = None,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Modify an existing pull request in a Gitea repository.

    Only the fields that are passed (i.e. are not ``None``) are sent.

    Args:
        pr_number: The pull request index/number (required).
        title: New PR title.
        body: New PR description.
        state: New state — 'open' or 'closed'.
        base: Target branch name.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with success status and details of the edited PR.

    Raises:
        ValueError: If none of title, body, state, base is provided.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    pr_endpoint = f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}"

    # Collect only the fields the caller actually supplied, in a fixed order.
    candidate_fields = (
        ("title", title),
        ("body", body),
        ("state", state),
        ("base", base),
    )
    changes = {
        field: value for field, value in candidate_fields if value is not None
    }
    if not changes:
        raise ValueError("At least one field to edit (title, body, state, base) must be provided.")

    updated = api_request("PATCH", pr_endpoint, auth_header, changes)
    return {
        "success": True,
        "number": updated["number"],
        "title": updated["title"],
        "body": updated.get("body", ""),
        "state": updated["state"],
        "url": updated["html_url"],
    }
@mcp.tool()
def gitea_get_file(
    filepath: str,
    ref: str = "main",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Fetch metadata and content of one file from a Gitea repository.

    Args:
        filepath: The path to the file in the repository (e.g. 'README.md' or
            'src/main.py').
        ref: The branch, tag, or commit hash to retrieve the file from
            (default: 'main').
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict containing 'name', 'path', 'sha', 'size', 'encoding', and
        'content' (base64).
    """
    import urllib.parse

    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)

    # Percent-encode the whole path, including '/' separators (safe="").
    quoted_path = urllib.parse.quote(filepath, safe="")
    contents_url = (
        f"{repo_api_url(gitea_host, owner, repo_name)}/contents/{quoted_path}?ref={ref}"
    )
    entry = api_request("GET", contents_url, auth_header)

    # Missing string fields default to "", a missing size to 0.
    file_info = {
        key: entry.get(key, "")
        for key in ("name", "path", "sha")
    }
    file_info["size"] = entry.get("size", 0)
    file_info["encoding"] = entry.get("encoding", "")
    file_info["content"] = entry.get("content", "")
    return file_info
@mcp.tool()
def gitea_commit_files(
    files: list[dict],
    message: str,
    branch: str | None = None,
    new_branch: str | None = None,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Commit changes to multiple files in a Gitea repository in one commit.

    Args:
        files: List of file operations. Each file dict must contain
            'operation' ('create', 'update', 'delete', 'rename'), 'path', and
            'content' (base64 encoded for create/update), and optionally 'sha'
            (required for update/delete) or 'from_path' (for rename).
        message: The commit message.
        branch: Optional existing branch to start/commit from.
        new_branch: Optional new branch name to create for this commit.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with success status and commit/branch information.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    contents_endpoint = f"{repo_api_url(gitea_host, owner, repo_name)}/contents"

    commit_request = {"files": files, "message": message}
    # The branch selectors are sent only when explicitly provided.
    for field, value in (("branch", branch), ("new_branch", new_branch)):
        if value is not None:
            commit_request[field] = value

    outcome = api_request("POST", contents_endpoint, auth_header, commit_request)
    commit_sha = outcome.get("commit", {}).get("sha", "")
    branch_name = outcome.get("branch", {}).get("name", "")
    return {"success": True, "commit": commit_sha, "branch": branch_name}
@mcp.tool()
def gitea_merge_pr(
    pr_number: int,
    do: str = "merge",
    title: str | None = None,
    message: str | None = None,
    force: bool = False,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Merge a Gitea pull request.

    Args:
        pr_number: The PR number to merge.
        do: Merge style — 'merge', 'squash', or 'rebase'.
        title: Optional merge title.
        message: Optional merge message.
        force: Force merge, ignoring status checks.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'success' and 'message'.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    merge_endpoint = (
        f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}/merge"
    )

    merge_request = {"Do": do, "force_merge": force}
    # Title/message are optional and omitted when empty or None.
    optional_fields = (("MergeTitleField", title), ("MergeMessageField", message))
    for field, value in optional_fields:
        if value:
            merge_request[field] = value

    api_request("POST", merge_endpoint, auth_header, merge_request)
    return {"success": True, "message": f"PR #{pr_number} merged via '{do}'."}
@mcp.tool()
def gitea_review_pr(
    pr_number: int,
    event: str = "APPROVE",
    body: str = "",
    merge: bool = False,
    merge_method: str = "merge",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Submit a review on a Gitea pull request and optionally merge it.

    NOTE: unlike ``gitea_submit_pr_review``, this tool does not run the
    eligibility checks before submitting.

    Args:
        pr_number: The PR number to review.
        event: Review type — 'APPROVE', 'COMMENT', or 'REQUEST_CHANGES'.
        body: Review body text / comment.
        merge: If True and event is 'APPROVE', automatically merge the PR.
        merge_method: Merge style to use if merging — 'merge', 'squash', or
            'rebase'.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with success status and message.

    Raises:
        ValueError: If *event* or *merge_method* is not one of the listed values.
        RuntimeError: If the PR's head commit SHA cannot be found.
    """
    # Validate both enumerated arguments before touching the network.
    if event not in ("APPROVE", "COMMENT", "REQUEST_CHANGES"):
        raise ValueError(f"Invalid review event: '{event}'. Choose from 'APPROVE', 'COMMENT', 'REQUEST_CHANGES'.")
    if merge_method not in ("merge", "squash", "rebase"):
        raise ValueError(f"Invalid merge method: '{merge_method}'. Choose from 'merge', 'squash', 'rebase'.")

    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    pr_endpoint = f"{repo_api_url(gitea_host, owner, repo_name)}/pulls/{pr_number}"

    # Step 1: look up the PR's current head commit; the review payload pins it.
    pull = api_request("GET", pr_endpoint, auth_header)
    head_sha = pull.get("head", {}).get("sha")
    if not head_sha:
        raise RuntimeError(f"Could not find head commit SHA for PR #{pr_number}.")

    # Step 2: submit the review itself.
    api_request(
        "POST",
        f"{pr_endpoint}/reviews",
        auth_header,
        {"body": body, "event": event, "commit_id": head_sha},
    )
    summary = f"Successfully submitted review for PR #{pr_number} with event '{event}'."

    # Step 3: merge only when requested AND the review was an approval.
    if merge and event == "APPROVE":
        api_request(
            "POST",
            f"{pr_endpoint}/merge",
            auth_header,
            {"Do": merge_method, "force_merge": False},
        )
        summary += f" Successfully merged PR #{pr_number} using '{merge_method}' method."
    elif merge:
        summary += " Warning: Skipping merge because review event is not 'APPROVE'."

    return {"success": True, "message": summary}
@mcp.tool()
def gitea_delete_branch(
    branch: str,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Remove a remote branch from a Gitea repository.

    Args:
        branch: The remote branch name (e.g. 'feat/branch-name').
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'success' and 'message'.
    """
    import urllib.parse

    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)

    # Branch names may contain '/', so the whole name is percent-encoded.
    quoted_branch = urllib.parse.quote(branch, safe="")
    api_request(
        "DELETE",
        f"{repo_api_url(gitea_host, owner, repo_name)}/branches/{quoted_branch}",
        auth_header,
    )
    return {"success": True, "message": f"Remote branch '{branch}' deleted."}
@mcp.tool()
def gitea_close_issue(
    issue_number: int,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Close an issue by PATCHing its state to 'closed'.

    Args:
        issue_number: The issue number to close.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'success' boolean and 'message'.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    issue_endpoint = (
        f"{repo_api_url(gitea_host, owner, repo_name)}/issues/{issue_number}"
    )
    state_change = {"state": "closed"}
    api_request("PATCH", issue_endpoint, auth_header, state_change)
    return {"success": True, "message": f"Issue #{issue_number} closed."}
@mcp.tool()
def gitea_list_issues(
    state: str = "open",
    label: str | None = None,
    limit: int = 50,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> list[dict]:
    """List issues on a Gitea repository with optional filters.

    Args:
        state: Filter by state — 'open', 'closed', or 'all'.
        label: Filter by label name (e.g. 'important').
        limit: Max number of issues to return (default: 50).
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        List of dicts with 'number', 'title', 'state', 'labels', 'assignee'.
        An empty list is returned when the API yields no data.
    """
    import urllib.parse

    h, o, r = _resolve(remote, host, org, repo)
    auth = _auth(h)

    # Build the query string with proper percent-encoding so that label names
    # containing spaces, '&', '#', etc. cannot corrupt the request URL.
    query = {"state": state, "limit": limit, "type": "issues"}
    if label:
        query["labels"] = label
    url = f"{repo_api_url(h, o, r)}/issues?{urllib.parse.urlencode(query)}"

    # Tolerate a falsy response (None / empty body), as gitea_list_prs does.
    issues = api_request("GET", url, auth) or []
    return [
        {
            "number": issue["number"],
            "title": issue["title"],
            "state": issue["state"],
            "labels": [lb["name"] for lb in issue.get("labels", [])],
            # 'assignee' may be null in the payload; normalise to "".
            "assignee": (issue.get("assignee") or {}).get("login", ""),
        }
        for issue in issues
    ]
@mcp.tool()
def gitea_view_issue(
    issue_number: int,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Fetch the full details of one issue.

    Args:
        issue_number: The issue number to view.
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'number', 'title', 'body', 'state', 'labels', 'assignee',
        'url'.
    """
    gitea_host, owner, repo_name = _resolve(remote, host, org, repo)
    auth_header = _auth(gitea_host)
    issue = api_request(
        "GET",
        f"{repo_api_url(gitea_host, owner, repo_name)}/issues/{issue_number}",
        auth_header,
    )

    label_names = []
    for label in issue.get("labels", []):
        label_names.append(label["name"])
    # 'assignee' may be null in the payload; fall back to an empty login.
    assignee = issue.get("assignee") or {}

    return {
        "number": issue["number"],
        "title": issue["title"],
        "body": issue.get("body", ""),
        "state": issue["state"],
        "labels": label_names,
        "assignee": assignee.get("login", ""),
        "url": issue["html_url"],
    }
@mcp.tool()
def gitea_whoami(
    remote: str = "dadeschools",
    host: str | None = None,
) -> dict:
    """Report which Gitea account the MCP server is authenticated as.

    Read-only. Calls Gitea's authenticated-user endpoint (GET /api/v1/user)
    with the configured credentials and returns safe identity metadata only.
    Use it to prove which account a mutating workflow (e.g. review/merge)
    would act as, so self-review/self-merge can be detected before acting.

    Never returns the token, Authorization header, password, or any other
    secret material. Fails closed with a clear error if the identity cannot
    be determined.

    Args:
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.

    Returns:
        dict with 'authenticated', 'username', 'display_name', 'user_id',
        'email', 'server', 'remote', and 'profile' (safe runtime profile
        metadata: profile_name + allowed_operations; never the token).

    Raises:
        ValueError: If *remote* is unknown.
        RuntimeError: If the response carries no login.
    """
    if remote not in REMOTES:
        raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}")

    server_host = host if host else REMOTES[remote]["host"]
    auth_header = _auth(server_host)
    account = api_request("GET", f"https://{server_host}/api/v1/user", auth_header)

    # Fail closed: never assume an identity that could not be verified.
    verified = bool(account) and bool(account.get("login"))
    if not verified:
        raise RuntimeError(
            f"Could not determine the authenticated Gitea identity for {server_host}. "
            "Verify the configured token is valid for this instance."
        )

    # Runtime profile metadata is non-secret (name + allowed op categories);
    # no credential material is placed in the result.
    profile = get_profile()
    safe_profile = {
        "profile_name": profile["profile_name"],
        "allowed_operations": profile["allowed_operations"],
    }
    return {
        "authenticated": True,
        "username": account.get("login"),
        "display_name": account.get("full_name") or None,
        "user_id": account.get("id"),
        "email": account.get("email") or None,
        "server": f"https://{server_host}",
        "remote": remote,
        "profile": safe_profile,
    }
@mcp.tool()
def gitea_get_profile(
    remote: str = "dadeschools",
    host: str | None = None,
    resolve_identity: bool = True,
) -> dict:
    """Describe the active Gitea MCP execution profile for this runtime.

    Read-only. Reports the non-secret configuration of the running MCP
    process (profile name, allowed/forbidden operation categories, audit
    label, token *source name*, base URL) plus the resolved server for the
    given remote. Optionally resolves the authenticated username through the
    same endpoint ``gitea_whoami`` uses, so an LLM can see who this runtime
    acts as.

    This tool only issues a GET request; it never approves, merges, comments,
    or creates anything. It never returns the token value, Authorization
    header, password, raw environment, or credential file paths. Identity
    resolution fails soft: if it cannot be determined,
    ``authenticated_username`` is null and ``identity_status`` marks it, but
    the profile config is still returned.

    Args:
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        resolve_identity: If True, attempt a read-only identity lookup.

    Returns:
        dict of safe profile metadata. ``identity_status`` is one of
        'verified', 'unknown', 'unavailable', or 'not_resolved'.
    """
    profile = get_profile()
    remote_is_known = remote in REMOTES
    result = {
        "profile_name": profile["profile_name"],
        "allowed_operations": profile["allowed_operations"],
        "forbidden_operations": profile["forbidden_operations"],
        "audit_label": profile["audit_label"],
        "token_source_name": profile["token_source_name"],
        "base_url": profile["base_url"],
        "remote": remote if remote_is_known else None,
        "server": None,
        "authenticated_username": None,
        "identity_status": "not_resolved",
    }

    if not remote_is_known:
        # Flag the ambiguity instead of raising, so the tool stays inspectable.
        result["identity_status"] = "unknown"
        result["remote_error"] = f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}"
        return result

    server_host = host if host else REMOTES[remote]["host"]
    result["server"] = f"https://{server_host}"
    if not resolve_identity:
        return result

    try:
        auth_header = _auth(server_host)
        account = api_request(
            "GET", f"https://{server_host}/api/v1/user", auth_header
        )
        login = (account or {}).get("login")
    except Exception:
        # Fail soft for the identity field only. The error detail is dropped
        # on purpose so no credential material can surface.
        result["identity_status"] = "unavailable"
    else:
        if login:
            result["authenticated_username"] = login
            result["identity_status"] = "verified"
        else:
            result["identity_status"] = "unknown"
    return result
@mcp.tool()
def gitea_mark_issue(
    issue_number: int,
    action: str,
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Add or remove the ``status:in-progress`` label on an issue.

    The label is the cross-agent lock: 'start' claims the issue by adding
    the label, 'done' releases it by removing the label. Check the label
    before starting work on an issue.

    Args:
        issue_number: The issue number.
        action: 'start' to claim (add label) or 'done' to release (remove label).
        remote: Known instance — 'dadeschools' or 'prgs'.
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict with 'success' boolean and 'message'.

    Raises:
        ValueError: If ``action`` is neither 'start' nor 'done'.
        RuntimeError: If no 'status:in-progress' label is found on the repo.
    """
    # Reject a bad action before any credential lookup or network call.
    if action != "start" and action != "done":
        raise ValueError(f"action must be 'start' or 'done', got '{action}'")

    h, o, r = _resolve(remote, host, org, repo)
    auth = _auth(h)
    base = repo_api_url(h, o, r)

    # The calls below address the label by id, so resolve the id from its
    # name first.
    # NOTE(review): only a single page (limit=100) of labels is requested —
    # confirm the server does not cap the page size lower than that.
    repo_labels = api_request("GET", f"{base}/labels?limit=100", auth)
    lock_label_id = next(
        (lb["id"] for lb in repo_labels if lb["name"] == "status:in-progress"),
        None,
    )
    if lock_label_id is None:
        raise RuntimeError(
            "Label 'status:in-progress' not found. "
            "Run manage_labels.py to create it first."
        )

    issue_labels_url = f"{base}/issues/{issue_number}/labels"
    if action == "start":
        api_request("POST", issue_labels_url, auth, {"labels": [lock_label_id]})
        return {"success": True, "message": f"Issue #{issue_number} claimed."}

    # Only 'done' can reach this point: remove the lock label from the issue.
    api_request("DELETE", f"{issue_labels_url}/{lock_label_id}", auth)
    return {"success": True, "message": f"Issue #{issue_number} released."}
@mcp.tool()
def gitea_list_labels(
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> list:
    """Fetch the labels defined on a Gitea repository.

    Args:
        remote: Known Gitea instance ('dadeschools' or 'prgs').
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        list of labels, as returned by a single request to the repository
        labels endpoint with ``limit=100``.
    """
    target_host, owner, repo_name = _resolve(remote, host, org, repo)
    credentials = _auth(target_host)
    labels_url = f"{repo_api_url(target_host, owner, repo_name)}/labels?limit=100"
    return api_request("GET", labels_url, credentials)
@mcp.tool()
def gitea_create_label(
    name: str,
    color: str,
    description: str = "",
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> dict:
    """Create a label on a Gitea repository.

    Args:
        name: Name of the label (e.g. 'bug', 'epic').
        color: HTML color code (hex, e.g. 'fbca04' or '#fbca04'). A single
            leading '#' is dropped before the value is sent.
        description: Description of the label.
        remote: Known Gitea instance ('dadeschools' or 'prgs').
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        dict containing the created label details.
    """
    target_host, owner, repo_name = _resolve(remote, host, org, repo)
    credentials = _auth(target_host)
    labels_url = f"{repo_api_url(target_host, owner, repo_name)}/labels"

    # Accept both 'fbca04' and '#fbca04'; the request carries the bare hex form.
    hex_color = color[1:] if color.startswith("#") else color

    return api_request(
        "POST",
        labels_url,
        credentials,
        {"name": name, "color": hex_color, "description": description},
    )
@mcp.tool()
def gitea_set_issue_labels(
    issue_number: int,
    labels: list[str],
    remote: str = "dadeschools",
    host: str | None = None,
    org: str | None = None,
    repo: str | None = None,
) -> list:
    """Replace every label on a Gitea issue with the given label names.

    Names are translated to label ids using the repository's label list.
    Nothing is written to the issue unless every requested name resolves.

    Args:
        issue_number: The issue number.
        labels: List of label names to apply.
        remote: Known Gitea instance ('dadeschools' or 'prgs').
        host: Override the Gitea host.
        org: Override the owner/organization.
        repo: Override the repository name.

    Returns:
        list of all labels currently applied to the issue.

    Raises:
        RuntimeError: If any requested label name is not found on the repo.
    """
    h, o, r = _resolve(remote, host, org, repo)
    auth = _auth(h)
    base = repo_api_url(h, o, r)

    # Build the name -> id lookup from the repository's labels.
    # NOTE(review): only a single page (limit=100) of labels is requested —
    # confirm the server does not cap the page size lower than that.
    repo_labels = api_request("GET", f"{base}/labels?limit=100", auth)
    id_by_name = {lb["name"]: lb["id"] for lb in repo_labels}

    # Refuse the whole request if any name is unknown, before touching the issue.
    unknown = [wanted for wanted in labels if wanted not in id_by_name]
    if unknown:
        raise RuntimeError(
            f"The following labels do not exist on the repository: {unknown}. "
            "Please create them first using gitea_create_label."
        )

    # PUT replaces the issue's label set with exactly these ids.
    wanted_ids = [id_by_name[wanted] for wanted in labels]
    return api_request(
        "PUT",
        f"{base}/issues/{issue_number}/labels",
        auth,
        {"labels": wanted_ids},
    )
@mcp.tool()
def gitea_mirror_refs(
    apply: bool = False,
    force: bool = False,
) -> dict:
    """Run ``mirror_refs.sh`` to mirror refs between the two Timesheet repos.

    The script mirrors branches and tags between the dadeschools and prgs
    Timesheet repos. It is additive only — it never deletes branches or
    tags — and diverged branches are skipped unless ``force`` is True.

    Args:
        apply: If True, actually push. If False (default), dry-run only.
        force: If True, force-push diverged branches.

    Returns:
        dict with 'output' (script stdout followed by stderr) and
        'return_code'.
    """
    # Command line: the script path, then one flag per enabled option.
    command = [os.path.join(PROJECT_ROOT, "mirror_refs.sh")]
    command += [
        flag
        for flag, enabled in (("--apply", apply), ("--force", force))
        if enabled
    ]

    # Capture both streams as text; the run is limited to 120 seconds.
    completed = subprocess.run(
        command,
        capture_output=True,
        text=True,
        timeout=120,
    )
    combined_output = completed.stdout + completed.stderr
    return {"output": combined_output, "return_code": completed.returncode}
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Start the MCP server on the stdio transport when executed as a script.
    mcp.run(transport="stdio")