Files
Gitea-Tools/mcp_server.py
T

1672 lines
60 KiB
Python

#!/usr/bin/env python3
"""Gitea MCP Server — exposes Gitea operations as MCP tools.
Runs over stdio. All tools authenticate via macOS keychain (git credential fill).
Usage (standalone test):
python3 mcp_server.py
Configuration (mcp_config.json):
"gitea-tools": {
"command": "/Users/jasonwalker/Development/Gitea-Tools/venv/bin/python3",
"args": ["/Users/jasonwalker/Development/Gitea-Tools/mcp_server.py"],
"env": {}
}
"""
import os
import re
import sys
import functools
import contextlib
import subprocess
# Resolve the project root. MCP clients must launch this script directly with
# the venv interpreter (venv/bin/python3) — see the config example above. We do
# NOT os.execv() to re-point the interpreter: replacing the process after the
# client has already wired up the stdio pipes can desync the JSON-RPC transport
# (observed with Antigravity/Cascade hosts).
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
# Ensure the project root is on the path so gitea_auth.py can be imported.
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from mcp.server.fastmcp import FastMCP # noqa: E402
from gitea_auth import ( # noqa: E402
REMOTES,
get_credentials,
get_auth_header,
api_request,
repo_api_url,
get_profile,
)
import gitea_audit # noqa: E402
mcp = FastMCP("gitea-tools", instructions=(
"Gitea issue tracker and PR management for dadeschools and prgs instances. "
"Use the gitea_ prefixed tools to create issues, PRs, list issues, etc."
))
def extract_linked_issue_numbers(text: str | None, branch_name: str | None = None) -> list[int]:
issues = set()
if text:
pattern = re.compile(r'(?i)(?:close[sd]?|fix(?:e[sd])?|resolve[sd]?|ref[s]?)\s+#(\d+)')
issues.update(int(m) for m in pattern.findall(text))
if branch_name:
pattern = re.compile(r'(?i)issue-(\d+)')
issues.update(int(m) for m in pattern.findall(branch_name))
return sorted(list(issues))
def release_in_progress_label(issue_numbers: list[int], remote: str, host: str | None, org: str | None, repo: str | None) -> dict:
if not issue_numbers:
return {}
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
base = repo_api_url(h, o, r)
try:
labels = api_request("GET", f"{base}/labels?limit=100", auth)
label_id = None
for lb in labels:
if lb["name"] == "status:in-progress":
label_id = lb["id"]
break
except Exception as exc:
return {num: f"error fetching repo labels: {_redact(str(exc))}" for num in issue_numbers}
results = {}
if label_id is None:
for num in issue_numbers:
results[num] = "not present"
return results
for num in issue_numbers:
try:
url = f"{base}/issues/{num}"
issue_data = api_request("GET", url, auth)
issue_labels = [lb["name"] for lb in issue_data.get("labels", [])]
if "status:in-progress" in issue_labels:
with _audited("release_in_progress_label", host=h, remote=remote, org=o, repo=r, issue_number=num, request_metadata={"action": "remove status:in-progress"}):
api_request("DELETE", f"{url}/labels/{label_id}", auth)
results[num] = "released"
else:
results[num] = "not present"
except Exception as exc:
results[num] = f"error: {_redact(str(exc))}"
return results
def cleanup_in_progress_for_pr(pr_payload: dict, remote: str, host: str | None, org: str | None, repo: str | None) -> dict:
body = pr_payload.get("body") or ""
title = pr_payload.get("title") or ""
branch = pr_payload.get("head", {}).get("ref") or ""
text = f"{title}\n{body}"
issues = extract_linked_issue_numbers(text, branch)
if not issues:
return {"cleanup_status": "no linked issue found"}
results = release_in_progress_label(issues, remote, host, org, repo)
return {"cleanup_status": results}
# ── Helpers ───────────────────────────────────────────────────────────────────
def _resolve(remote: str, host: str | None, org: str | None, repo: str | None):
"""Resolve remote + overrides to (host, org, repo)."""
if remote not in REMOTES:
raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}")
profile = REMOTES[remote]
return (
host or profile["host"],
org or profile["org"],
repo or profile["repo"],
)
def _auth(host: str) -> str:
"""Get auth header, raise if unavailable."""
header = get_auth_header(host)
if header is None:
raise RuntimeError(
f"No credentials for {host}. "
"Ensure you've logged in via HTTPS at least once."
)
return header
# ── Audit logging (#18) ─────────────────────────────────────────────────────────
# Mutating actions emit a structured audit record (profile + authenticated user
# + outcome) when GITEA_AUDIT_LOG is configured. When it is not, every helper
# below short-circuits and performs NO work — no extra API calls, no I/O — so
# existing tool behaviour and API call sequences are unchanged.
_UNSET = object()
# Best-effort identity cache keyed by host, so an enabled audit trail resolves
# the authenticated username at most once per host per process.
_IDENTITY_CACHE: dict = {}
def _authenticated_username(host: str):
"""Resolve the authenticated Gitea username for *host* (cached, fail-soft).
Read-only. Returns None if the identity cannot be determined; never raises
and never surfaces credential material.
"""
if host in _IDENTITY_CACHE:
return _IDENTITY_CACHE[host]
user = None
try:
header = get_auth_header(host)
if header:
who = api_request("GET", f"https://{host}/api/v1/user", header)
user = (who or {}).get("login")
except Exception:
user = None
_IDENTITY_CACHE[host] = user
return user
def _audit(action: str, *, host, remote, result, org=None, repo=None,
reason=None, request_metadata=None, issue_number=None,
pr_number=None, target_branch=None, head_sha=None, username=_UNSET):
"""Emit one audit record for a mutating action. No-op unless auditing is on.
Never raises — auditing must not break the action it records.
"""
if not gitea_audit.audit_enabled():
return
try:
profile = get_profile()
if username is _UNSET:
username = _authenticated_username(host) if host else None
event = gitea_audit.build_event(
action=action,
result=result,
remote=remote,
server=(f"https://{host}" if host else None),
repository=repo,
issue_number=issue_number,
pr_number=pr_number,
profile_name=profile["profile_name"],
audit_label=profile["audit_label"],
authenticated_username=username,
target_branch=target_branch,
head_sha=head_sha,
reason=reason,
request_metadata=request_metadata,
)
gitea_audit.write_event(event)
except Exception:
pass # best-effort; never break the action
@contextlib.contextmanager
def _audited(action: str, *, host, remote, org=None, repo=None,
request_metadata=None, issue_number=None, pr_number=None,
target_branch=None):
"""Wrap a mutating API call: audit SUCCEEDED on return, FAILED on exception.
When auditing is off this yields immediately with no bookkeeping.
"""
if not gitea_audit.audit_enabled():
yield
return
try:
yield
except Exception as exc:
_audit(action, host=host, remote=remote, org=org, repo=repo,
result=gitea_audit.FAILED, reason=_redact(str(exc)),
request_metadata=request_metadata, issue_number=issue_number,
pr_number=pr_number, target_branch=target_branch)
raise
_audit(action, host=host, remote=remote, org=org, repo=repo,
result=gitea_audit.SUCCEEDED, request_metadata=request_metadata,
issue_number=issue_number, pr_number=pr_number,
target_branch=target_branch)
def _audit_pr_result(action: str):
"""Decorator for gated PR tools that return a result dict.
Reads the tool's own result dict (authenticated_user, profile, reasons,
performed) to emit an audit record classifying the outcome as SUCCEEDED,
BLOCKED, or FAILED. No extra API calls: identity comes from the result.
"""
def decorate(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
result = fn(*args, **kwargs)
try:
if isinstance(result, dict) and gitea_audit.audit_enabled():
reasons = [str(x) for x in (result.get("reasons") or [])]
if result.get("performed"):
status = gitea_audit.SUCCEEDED
elif any("failed:" in x.lower() for x in reasons):
# "failed:" marks a surfaced exception (e.g. "merge
# failed: <err>"); a bare gate message like "… failed
# (fail closed)" is a policy block, not an execution error.
status = gitea_audit.FAILED
else:
status = gitea_audit.BLOCKED
remote = result.get("remote")
host = REMOTES[remote]["host"] if remote in REMOTES else None
_audit(
action,
host=host,
remote=remote,
result=status,
reason="; ".join(reasons) or None,
pr_number=result.get("pr_number"),
head_sha=result.get("head_sha"),
username=result.get("authenticated_user"),
request_metadata={
"requested_action": result.get("requested_action"),
"merge_method": result.get("merge_method"),
},
)
except Exception:
pass # best-effort; never break the tool
return result
return wrapper
return decorate
# ── Tools ─────────────────────────────────────────────────────────────────────
@mcp.tool()
def gitea_create_issue(
title: str,
body: str = "",
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Create a new issue on a Gitea repository.
Args:
title: Issue title (required).
body: Issue body text.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'number' and 'url' of the created issue.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues"
try:
data = api_request("POST", url, auth, {"title": title, "body": body})
except Exception as exc:
_audit("create_issue", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.FAILED, reason=_redact(str(exc)),
request_metadata={"title": title})
raise
_audit("create_issue", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.SUCCEEDED, issue_number=data["number"],
request_metadata={"title": title})
return {"number": data["number"], "url": data["html_url"]}
@mcp.tool()
def gitea_create_pr(
title: str,
head: str,
base: str = "main",
body: str = "",
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Create a pull request on a Gitea repository.
Args:
title: PR title (required).
head: Source branch name (required).
base: Target branch (default: 'main').
body: PR description.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'number' and 'url' of the created PR.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/pulls"
payload = {"title": title, "body": body, "head": head, "base": base}
meta = {"title": title, "head": head, "base": base}
try:
data = api_request("POST", url, auth, payload)
except Exception as exc:
_audit("create_pr", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.FAILED, reason=_redact(str(exc)),
target_branch=head, request_metadata=meta)
raise
_audit("create_pr", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.SUCCEEDED, pr_number=data["number"],
target_branch=head, request_metadata=meta)
return {"number": data["number"], "url": data["html_url"]}
@mcp.tool()
def gitea_list_prs(
state: str = "open",
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> list[dict]:
"""List pull requests on a Gitea repository.
Args:
state: State filter — 'open', 'closed', or 'all' (default: 'open').
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
List of dicts with 'number', 'title', 'state', 'head', 'base', 'url', 'mergeable'.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/pulls?state={state}"
prs = api_request("GET", url, auth) or []
return [
{
"number": pr["number"],
"title": pr["title"],
"state": pr["state"],
"head": pr["head"]["ref"],
"base": pr["base"]["ref"],
"url": pr["html_url"],
"mergeable": pr.get("mergeable"),
}
for pr in prs
]
@mcp.tool()
def gitea_view_pr(
pr_number: int,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Get details of a single pull request.
Args:
pr_number: The pull request index/number.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with PR details.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}"
pr = api_request("GET", url, auth)
return {
"number": pr["number"],
"title": pr["title"],
"body": pr.get("body", ""),
"state": pr["state"],
"head": pr["head"]["ref"],
"base": pr["base"]["ref"],
"url": pr["html_url"],
"mergeable": pr.get("mergeable"),
"user": pr.get("user", {}).get("login", ""),
}
# Actions whose eligibility this tool can evaluate.
_ELIGIBILITY_ACTIONS = ("review", "approve", "request_changes", "merge")
@mcp.tool()
def gitea_check_pr_eligibility(
pr_number: int,
action: str,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Read-only: is the current identity/profile eligible to perform *action*
on a PR?
Evaluates eligibility only — it NEVER reviews, approves, requests changes,
merges, or mutates anything. It inspects the authenticated identity
(via the /user endpoint), the active runtime profile metadata
(``get_profile``), and the target PR (author, state, head SHA,
mergeability), then returns a decision with clear reasons.
Fail-closed rules:
- Unknown action or unknown remote → not eligible.
- Profile has no configured allowed operations, or the action is not in
the profile's allowed operations (or is forbidden) → not eligible.
- Authenticated identity cannot be determined → not eligible.
- Authenticated user equals the PR author → not eligible to ``approve`` or
``merge``.
- PR is not open → not eligible.
- For ``merge``, PR must be reported mergeable.
Never returns the token, Authorization header, or any credential material.
Args:
pr_number: Target PR number.
action: One of 'review', 'approve', 'request_changes', 'merge'.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'eligible' (bool), the inputs inspected, and 'reasons'.
"""
action = (action or "").strip().lower()
profile = get_profile()
result = {
"eligible": False,
"requested_action": action,
"authenticated_user": None,
"profile_name": profile["profile_name"],
"allowed_operations": profile["allowed_operations"],
"pr_author": None,
"pr_number": pr_number,
"pr_state": None,
"head_sha": None,
"mergeable": None,
"remote": remote if remote in REMOTES else None,
"reasons": [],
}
reasons = result["reasons"]
if action not in _ELIGIBILITY_ACTIONS:
reasons.append(
f"unknown action '{action}'; expected one of {list(_ELIGIBILITY_ACTIONS)}"
)
return result
if remote not in REMOTES:
reasons.append(f"unknown remote '{remote}'")
return result
# Profile capability check (metadata only; not enforcement of the action).
allowed = profile["allowed_operations"]
forbidden = profile["forbidden_operations"]
if not allowed:
reasons.append("profile has no configured allowed operations (fail closed)")
if action in forbidden:
reasons.append(f"profile forbids '{action}'")
elif action not in allowed:
reasons.append(f"profile is not allowed to {action}")
h, o, r = _resolve(remote, host, org, repo)
# Authenticated identity (read-only). Fail soft; never leak error/secret.
try:
auth = _auth(h)
except Exception:
auth = None
auth_user = None
if auth:
try:
who = api_request("GET", f"https://{h}/api/v1/user", auth)
auth_user = (who or {}).get("login")
except Exception:
auth_user = None
result["authenticated_user"] = auth_user
if not auth_user:
reasons.append("authenticated identity could not be determined")
# PR facts (read-only GET; no mutation).
pr_author = None
pr_state = None
if auth:
try:
pr = api_request(
"GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth
)
pr_author = (pr or {}).get("user", {}).get("login")
pr_state = (pr or {}).get("state")
result["head_sha"] = ((pr or {}).get("head") or {}).get("sha")
result["mergeable"] = (pr or {}).get("mergeable")
except Exception:
reasons.append("PR details could not be retrieved")
else:
reasons.append("PR details could not be retrieved (no credentials)")
result["pr_author"] = pr_author
result["pr_state"] = pr_state
# PR must be open to act on.
if pr_state is None:
reasons.append("PR state unknown")
elif pr_state != "open":
reasons.append(f"PR is not open (state={pr_state})")
# Self-author must not approve or merge their own PR.
if auth_user and pr_author and auth_user == pr_author and action in ("approve", "merge"):
reasons.append("authenticated user is PR author")
# Merge needs a positive mergeability signal.
if action == "merge":
if result["mergeable"] is False:
reasons.append("PR is not mergeable")
elif result["mergeable"] is None:
reasons.append("PR mergeability unknown")
result["eligible"] = len(reasons) == 0
if result["eligible"]:
reasons.append("all eligibility checks passed")
return result
# Review actions this gated tool can perform, mapped to (eligibility action,
# Gitea review *event*). The eligibility action is fed to
# ``gitea_check_pr_eligibility`` (#14) so every mutation reuses the same
# identity/profile/author gates. Note: 'merge' is deliberately absent — merge
# belongs to a separate tool/issue and is never performed here.
_REVIEW_ACTIONS = {
# 'comment' posts review findings without an approval/rejection state.
# #14 names this eligibility category 'review'.
"comment": ("review", "COMMENT"),
"approve": ("approve", "APPROVE"),
"request_changes": ("request_changes", "REQUEST_CHANGES"),
}
# Patterns scrubbed from any surfaced error text so a credential can never leak.
_SECRET_PREFIXES = ("token ", "Basic ")
def _redact(text: str) -> str:
"""Strip anything that looks like an Authorization credential from *text*.
Errors raised by ``api_request`` echo the server response body, not the
request headers, so a token should never appear — this is defence in depth
so a future change can't leak ``token …`` / ``Basic …`` material into a
tool result or log line.
"""
if not text:
return text
out = text
for prefix in _SECRET_PREFIXES:
idx = 0
while True:
i = out.find(prefix, idx)
if i == -1:
break
j = i + len(prefix)
while j < len(out) and not out[j].isspace():
j += 1
out = out[:i] + prefix + "[REDACTED]" + out[j:]
idx = i + len(prefix) + len("[REDACTED]")
return out
@mcp.tool()
@_audit_pr_result("submit_pr_review")
def gitea_submit_pr_review(
pr_number: int,
action: str,
body: str = "",
expected_head_sha: str | None = None,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Gated PR review mutation: comment findings, request changes, or approve.
This is the only tool that submits a Gitea PR *review*. It performs a
mutation **only after every safety gate passes**; if any gate fails it
returns ``performed=False`` and never calls the mutating endpoint.
Gate order (fail-closed at each step):
1. Validate ``action`` is one of 'comment', 'approve', 'request_changes'.
2. Reuse ``gitea_check_pr_eligibility`` (#14), which runs the authenticated
-user lookup, active-profile lookup, PR-author lookup, self-approval
block, and profile-allowed-operation check. ``approve`` requires
eligibility for 'approve', ``request_changes`` requires
'request_changes', and ``comment`` requires 'review'.
3. Redundantly block self-approval (authenticated user == PR author).
4. If ``expected_head_sha`` is supplied and the PR head has moved, abort.
5. Only then POST the review.
Endpoint: ``POST /repos/{owner}/{repo}/pulls/{n}/reviews``. This is the
*formal review* API (it records an APPROVE / COMMENT / REQUEST_CHANGES
review state tied to the head commit), chosen over the plain issue-comment
endpoint (``/issues/{n}/comments``) so that approvals and change requests
carry real review state — a plain comment cannot approve or block a PR.
Merge is intentionally NOT implemented here.
Never returns the token, Authorization header, or any credential material.
Args:
pr_number: Target PR number.
action: 'comment', 'approve', or 'request_changes'.
body: Review body / finding text.
expected_head_sha: Optional. If given and the PR head SHA differs, the
review is refused (guards against reviewing a changed PR).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict describing the attempt: action, whether it was performed, the
authenticated user, profile name, PR author, PR number, head SHA
checked, and the reasons/gates passed or blocked. Never secrets.
"""
action = (action or "").strip().lower()
result = {
"requested_action": action,
"performed": False,
"authenticated_user": None,
"profile_name": get_profile()["profile_name"],
"pr_author": None,
"pr_number": pr_number,
"head_sha": None,
"expected_head_sha": expected_head_sha,
"remote": remote if remote in REMOTES else None,
"reasons": [],
}
reasons = result["reasons"]
# Gate 1 — valid review action (no mutation on unknown action).
if action not in _REVIEW_ACTIONS:
reasons.append(
f"unknown review action '{action}'; expected one of "
f"{sorted(_REVIEW_ACTIONS)}"
)
return result
eligibility_action, event = _REVIEW_ACTIONS[action]
# Gate 2 — reuse #14 eligibility (identity + profile + author + self-approve
# + profile-allowed). This performs only read-only GETs.
elig = gitea_check_pr_eligibility(
pr_number=pr_number,
action=eligibility_action,
remote=remote,
host=host,
org=org,
repo=repo,
)
result["authenticated_user"] = elig.get("authenticated_user")
result["profile_name"] = elig.get("profile_name", result["profile_name"])
result["pr_author"] = elig.get("pr_author")
result["head_sha"] = elig.get("head_sha")
if not elig.get("eligible"):
reasons.append(
f"eligibility check for '{eligibility_action}' failed (fail closed)"
)
reasons.extend(elig.get("reasons", []))
return result
# Gate 3 — redundant self-approval block (belt-and-suspenders over #14).
auth_user = result["authenticated_user"]
pr_author = result["pr_author"]
if action == "approve" and auth_user and pr_author and auth_user == pr_author:
reasons.append("self-approval blocked (authenticated user is PR author)")
return result
# Gate 4 — head SHA must match if the caller pinned one.
actual_sha = result["head_sha"]
if expected_head_sha and actual_sha and expected_head_sha != actual_sha:
reasons.append(
"expected head SHA does not match current PR head (fail closed)"
)
return result
if not actual_sha:
# Should be unreachable — eligibility fails closed without a head SHA —
# but never submit a review without a commit to pin it to.
reasons.append("PR head SHA unavailable (fail closed)")
return result
# All gates passed — perform the single mutating call.
h, o, r = _resolve(remote, host, org, repo)
try:
auth = _auth(h)
review_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/reviews"
payload = {"body": body, "event": event, "commit_id": actual_sha}
api_request("POST", review_url, auth, payload)
except Exception as exc: # noqa: BLE001 — redact before surfacing
reasons.append(f"review submission failed: {_redact(str(exc))}")
return result
result["performed"] = True
reasons.append(f"all gates passed; submitted '{event}' review on PR #{pr_number}")
return result
@mcp.tool()
def gitea_edit_pr(
pr_number: int,
title: str | None = None,
body: str | None = None,
state: str | None = None,
base: str | None = None,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Edit an existing pull request on a Gitea repository.
Args:
pr_number: The pull request index/number (required).
title: New PR title.
body: New PR description.
state: New state — 'open' or 'closed'.
base: Target branch name.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with success status and details of the edited PR.
"""
# Validate inputs BEFORE any auth/profile resolution or API setup: a
# no-fields call is a pure validation error and must not depend on
# credentials, network, or environment configuration.
payload = {}
if title is not None:
payload["title"] = title
if body is not None:
payload["body"] = body
if state is not None:
payload["state"] = state
if base is not None:
payload["base"] = base
if not payload:
raise ValueError("At least one field to edit (title, body, state, base) must be provided.")
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}"
with _audited("edit_pr", host=h, remote=remote, org=o, repo=r,
pr_number=pr_number, request_metadata={"fields": sorted(payload)}):
data = api_request("PATCH", url, auth, payload)
cleanup_status = None
if state == "closed":
cleanup = cleanup_in_progress_for_pr(data, remote, host, org, repo)
cleanup_status = cleanup.get("cleanup_status")
if isinstance(cleanup_status, dict):
for issue_num, st in cleanup_status.items():
if st == "released":
try:
comment_url = f"{repo_api_url(h, o, r)}/issues/{issue_num}/comments"
api_request("POST", comment_url, auth, {"body": f"Tracker cleanup: removed `status:in-progress` from this issue because linked PR #{pr_number} was closed."})
except Exception:
pass
return {
"success": True,
"number": data["number"],
"title": data["title"],
"body": data.get("body", ""),
"state": data["state"],
"url": data["html_url"],
"cleanup_status": cleanup_status,
}
@mcp.tool()
def gitea_get_file(
filepath: str,
ref: str = "main",
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Retrieve metadata and content of a file from a Gitea repository.
Args:
filepath: The path to the file in the repository (e.g. 'README.md' or 'src/main.py').
ref: The branch, tag, or commit hash to retrieve the file from (default: 'main').
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict containing 'name', 'path', 'sha', 'size', 'encoding', and 'content' (base64).
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
import urllib.parse
encoded_path = urllib.parse.quote(filepath, safe="")
url = f"{repo_api_url(h, o, r)}/contents/{encoded_path}?ref={ref}"
data = api_request("GET", url, auth)
return {
"name": data.get("name", ""),
"path": data.get("path", ""),
"sha": data.get("sha", ""),
"size": data.get("size", 0),
"encoding": data.get("encoding", ""),
"content": data.get("content", ""),
}
@mcp.tool()
def gitea_commit_files(
files: list[dict],
message: str,
branch: str | None = None,
new_branch: str | None = None,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Commit changes to multiple files in a Gitea repository in a single atomic commit.
Args:
files: List of file operations. Each file dict must contain 'operation' ('create', 'update', 'delete', 'rename'), 'path', and 'content' (base64 encoded for create/update), and optionally 'sha' (required for update/delete) or 'from_path' (for rename).
message: The commit message.
branch: Optional existing branch to start/commit from.
new_branch: Optional new branch name to create for this commit.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with success status and commit/branch information.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/contents"
payload = {
"files": files,
"message": message,
}
if branch is not None:
payload["branch"] = branch
if new_branch is not None:
payload["new_branch"] = new_branch
with _audited("commit_files", host=h, remote=remote, org=o, repo=r,
target_branch=(new_branch or branch),
request_metadata={"message": message,
"paths": [f.get("path") for f in files],
"operations": [f.get("operation") for f in files]}):
data = api_request("POST", url, auth, payload)
return {
"success": True,
"commit": data.get("commit", {}).get("sha", ""),
"branch": data.get("branch", {}).get("name", ""),
}
# Merge methods supported by the Gitea merge API.
_MERGE_METHODS = ("merge", "squash", "rebase")
@mcp.tool()
@_audit_pr_result("merge_pr")
def gitea_merge_pr(
pr_number: int,
confirmation: str = "",
expected_head_sha: str | None = None,
expected_changed_files: list[str] | None = None,
do: str = "merge",
title: str | None = None,
message: str | None = None,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Gated merge of a Gitea pull request (#16).
This is the ONLY merge path this server exposes, and it mutates only after
every safety gate passes. No ungated merge tool remains: legacy
``gitea_review_pr`` fails closed on ``merge=True`` and
``gitea_submit_pr_review`` never merges.
Gate order (fail-closed at each step; the merge API is called only if all
gates pass):
1. Merge method (``do``) is 'merge', 'squash', or 'rebase'.
2. Explicit confirmation: ``confirmation`` must equal ``"MERGE PR <n>"``.
Without it, the tool makes no API calls at all.
3. Reuse ``gitea_check_pr_eligibility`` (#14) with action 'merge': this
proves the authenticated identity, the active profile (and that it
allows merge), the PR author, blocks self-merge, requires the PR to be
open, and fails closed when the PR is not mergeable or mergeability is
unknown.
4. If ``expected_head_sha`` is given and the PR head moved → refuse.
5. If ``expected_changed_files`` is given and the PR's changed file set
differs → refuse.
6. Redundant self-merge block (authenticated user == PR author).
No force / ignore-checks option is exposed. Gitea's own ``mergeable`` signal
(which reflects branch-protection required reviews and status checks) must
be positive, so required approval/check state is honoured, never bypassed.
Never returns the token, Authorization header, or any credential material.
Args:
pr_number: The PR number to merge.
confirmation: Must be exactly ``"MERGE PR <pr_number>"`` or merge is refused.
expected_head_sha: Strongly recommended. If set and the PR head differs, refuse.
expected_changed_files: Optional. If set and the PR's changed file set
differs, refuse.
do: Merge style — 'merge', 'squash', or 'rebase'.
title: Optional merge commit title.
message: Optional merge commit message.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict describing the attempt: performed, authenticated user, profile
name, PR author, PR number, head SHA checked, merge method,
reasons/gates passed or blocked, and merge result / merge commit if
available. Never secrets.
"""
do = (do or "").strip().lower()
result = {
"performed": False,
"authenticated_user": None,
"profile_name": get_profile()["profile_name"],
"pr_author": None,
"pr_number": pr_number,
"head_sha": None,
"expected_head_sha": expected_head_sha,
"merge_method": do,
"mergeable": None,
"remote": remote if remote in REMOTES else None,
"merge_result": None,
"merge_commit": None,
"reasons": [],
}
reasons = result["reasons"]
# Gate 1 — valid merge method (no API call on a bad method).
if do not in _MERGE_METHODS:
reasons.append(
f"unknown merge method '{do}'; expected one of {list(_MERGE_METHODS)}"
)
return result
# Gate 2 — explicit confirmation (fail fast; zero API calls without it).
expected_confirmation = f"MERGE PR {pr_number}"
if (confirmation or "").strip() != expected_confirmation:
reasons.append(
f"explicit confirmation required: pass confirmation='{expected_confirmation}'"
)
return result
# Gate 3 — reuse #14 eligibility (identity + profile + merge-allowed +
# author + self-merge block + open + mergeable/unknown fail-closed).
# Read-only GETs only.
elig = gitea_check_pr_eligibility(
pr_number=pr_number, action="merge", remote=remote,
host=host, org=org, repo=repo,
)
result["authenticated_user"] = elig.get("authenticated_user")
result["profile_name"] = elig.get("profile_name", result["profile_name"])
result["pr_author"] = elig.get("pr_author")
result["head_sha"] = elig.get("head_sha")
result["mergeable"] = elig.get("mergeable")
if not elig.get("eligible"):
reasons.append("eligibility check for 'merge' failed (fail closed)")
reasons.extend(elig.get("reasons", []))
return result
# Gate 4 — head SHA must match if the caller pinned a reviewed SHA.
actual_sha = result["head_sha"]
if expected_head_sha and actual_sha and expected_head_sha != actual_sha:
reasons.append(
"expected head SHA does not match current PR head (fail closed)"
)
return result
if not actual_sha:
# Unreachable — eligibility fails closed without a head SHA — but never
# merge a PR whose head commit we could not read.
reasons.append("PR head SHA unavailable (fail closed)")
return result
h, o, r = _resolve(remote, host, org, repo)
# Gate 5 — changed files must match the reviewed set, if provided.
if expected_changed_files is not None:
try:
auth = _auth(h)
files = api_request(
"GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}/files", auth
)
actual_files = sorted(
(f or {}).get("filename", "") for f in (files or [])
)
except Exception as exc: # noqa: BLE001 — redact before surfacing
reasons.append(
f"could not verify changed files (fail closed): {_redact(str(exc))}"
)
return result
result["changed_files"] = actual_files
if actual_files != sorted(expected_changed_files):
reasons.append(
"PR changed files do not match expected_changed_files (fail closed)"
)
return result
# Gate 6 — redundant self-merge block (belt-and-suspenders over #14).
auth_user = result["authenticated_user"]
pr_author = result["pr_author"]
if auth_user and pr_author and auth_user == pr_author:
reasons.append("self-merge blocked (authenticated user is PR author)")
return result
# All gates passed — perform the single merge mutation.
try:
auth = _auth(h)
merge_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge"
payload = {"Do": do}
if title:
payload["MergeTitleField"] = title
if message:
payload["MergeMessageField"] = message
api_request("POST", merge_url, auth, payload)
# Best-effort read-back of the merge commit SHA (redacted on error).
try:
merged = api_request(
"GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth
)
result["merge_commit"] = (merged or {}).get("merged_commit_sha")
cleanup = cleanup_in_progress_for_pr(merged or {}, remote, host, org, repo)
result["cleanup_status"] = cleanup.get("cleanup_status")
except Exception:
result["merge_commit"] = None
except Exception as exc: # noqa: BLE001 — redact before surfacing
reasons.append(f"merge failed: {_redact(str(exc))}")
return result
result["performed"] = True
result["merge_result"] = f"PR #{pr_number} merged via '{do}'."
reasons.append(f"all gates passed; merged PR #{pr_number} via '{do}'")
return result
@mcp.tool()
def gitea_review_pr(
pr_number: int,
event: str = "APPROVE",
body: str = "",
merge: bool = False,
merge_method: str = "merge",
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Submit a review on a Gitea pull request (Legacy wrapper).
This tool is a compatibility wrapper around the safe `gitea_submit_pr_review`.
It uses the same #14 eligibility gates.
Merging via this tool is no longer supported and will fail closed (see #16).
Args:
pr_number: The PR number to review.
event: Review type — 'APPROVE', 'COMMENT', or 'REQUEST_CHANGES'.
body: Review body text / comment.
merge: Merging is disabled; if True, the tool fails closed.
merge_method: Ignored.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with success status and message.
"""
if merge:
return {
"success": False,
"message": "merge=True is no longer supported in this tool (belongs to #16)."
}
if event not in ["APPROVE", "COMMENT", "REQUEST_CHANGES"]:
raise ValueError(f"Invalid review event: '{event}'. Choose from 'APPROVE', 'COMMENT', 'REQUEST_CHANGES'.")
# Map legacy event string to the action expected by gitea_submit_pr_review
event_map = {
"APPROVE": "approve",
"COMMENT": "comment",
"REQUEST_CHANGES": "request_changes"
}
action = event_map[event]
result = gitea_submit_pr_review(
pr_number=pr_number,
action=action,
body=body,
expected_head_sha=None,
remote=remote,
host=host,
org=org,
repo=repo
)
if result.get("performed"):
return {"success": True, "message": f"Successfully submitted review for PR #{pr_number} with event '{event}'."}
else:
reasons = result.get("reasons", [])
return {"success": False, "message": f"Review submission failed eligibility gates: {reasons}"}
@mcp.tool()
def gitea_delete_branch(
branch: str,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Delete a remote branch from a Gitea repository.
Args:
branch: The remote branch name (e.g. 'feat/branch-name').
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success' and 'message'.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
import urllib.parse
encoded_branch = urllib.parse.quote(branch, safe="")
url = f"{repo_api_url(h, o, r)}/branches/{encoded_branch}"
with _audited("delete_branch", host=h, remote=remote, org=o, repo=r,
target_branch=branch, request_metadata={"branch": branch}):
api_request("DELETE", url, auth)
return {"success": True, "message": f"Remote branch '{branch}' deleted."}
@mcp.tool()
def gitea_close_issue(
issue_number: int,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Close an issue by setting its state to 'closed'.
Args:
issue_number: The issue number to close.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success' boolean and 'message'.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues/{issue_number}"
with _audited("close_issue", host=h, remote=remote, org=o, repo=r,
issue_number=issue_number, request_metadata={"state": "closed"}):
api_request("PATCH", url, auth, {"state": "closed"})
cleanup_result = release_in_progress_label([issue_number], remote, host, org, repo)
return {
"success": True,
"message": f"Issue #{issue_number} closed.",
"cleanup_status": cleanup_result
}
@mcp.tool()
def gitea_list_issues(
state: str = "open",
label: str | None = None,
limit: int = 50,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> list[dict]:
"""List issues on a Gitea repository with optional filters.
Args:
state: Filter by state — 'open', 'closed', or 'all'.
label: Filter by label name (e.g. 'important').
limit: Max number of issues to return (default: 50).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
List of dicts with 'number', 'title', 'state', 'labels', 'assignee'.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
params = f"state={state}&limit={limit}&type=issues"
if label:
params += f"&labels={label}"
url = f"{repo_api_url(h, o, r)}/issues?{params}"
issues = api_request("GET", url, auth)
return [
{
"number": i["number"],
"title": i["title"],
"state": i["state"],
"labels": [lb["name"] for lb in i.get("labels", [])],
"assignee": (i.get("assignee") or {}).get("login", ""),
}
for i in issues
]
@mcp.tool()
def gitea_view_issue(
issue_number: int,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Get full details of a single issue.
Args:
issue_number: The issue number to view.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'number', 'title', 'body', 'state', 'labels', 'assignee', 'url'.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues/{issue_number}"
i = api_request("GET", url, auth)
return {
"number": i["number"],
"title": i["title"],
"body": i.get("body", ""),
"state": i["state"],
"labels": [lb["name"] for lb in i.get("labels", [])],
"assignee": (i.get("assignee") or {}).get("login", ""),
"url": i["html_url"],
}
@mcp.tool()
def gitea_whoami(
remote: str = "dadeschools",
host: str | None = None,
) -> dict:
"""Look up the Gitea account the MCP server is authenticated as.
Read-only. Calls Gitea's authenticated-user endpoint (GET /api/v1/user)
with the configured token and returns safe identity metadata only. Use
this to prove which account a mutating workflow (e.g. review/merge) would
act as, so self-review/self-merge can be detected before acting.
Never returns the token, Authorization header, password, or any other
secret material. Fails closed with a clear error if the identity cannot
be determined.
Args:
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
Returns:
dict with 'authenticated', 'username', 'display_name', 'user_id',
'email', 'server', 'remote', and 'profile' (safe runtime profile
metadata: profile_name + allowed_operations; never the token).
"""
if remote not in REMOTES:
raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}")
h = host or REMOTES[remote]["host"]
auth = _auth(h)
url = f"https://{h}/api/v1/user"
data = api_request("GET", url, auth)
if not data or not data.get("login"):
# Fail closed: never assume an identity we could not verify.
raise RuntimeError(
f"Could not determine the authenticated Gitea identity for {h}. "
"Verify the configured token is valid for this instance."
)
# Runtime profile metadata is non-secret (name + allowed op categories).
# The token is resolved separately and is never included here.
profile = get_profile()
return {
"authenticated": True,
"username": data.get("login"),
"display_name": data.get("full_name") or None,
"user_id": data.get("id"),
"email": data.get("email") or None,
"server": f"https://{h}",
"remote": remote,
"profile": {
"profile_name": profile["profile_name"],
"allowed_operations": profile["allowed_operations"],
},
}
@mcp.tool()
def gitea_get_profile(
remote: str = "dadeschools",
host: str | None = None,
resolve_identity: bool = True,
) -> dict:
"""Describe the active Gitea MCP execution profile for this runtime.
Read-only. Reports the non-secret configuration of the running MCP
process (profile name, allowed/forbidden operation categories, audit
label, token *source name*, base URL) plus the resolved server for the
given remote. Optionally resolves the authenticated username via
``gitea_whoami``'s endpoint so an LLM can see who this runtime acts as.
This tool never mutates Gitea and never approves, merges, comments, or
creates anything. It never returns the token value, Authorization header,
password, raw environment, or credential file paths. Identity resolution
fails soft: if it cannot be determined, ``authenticated_username`` is null
and ``identity_status`` marks it, but the profile config is still returned.
Args:
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
resolve_identity: If True, attempt a read-only identity lookup.
Returns:
dict of safe profile metadata. ``identity_status`` is one of
'verified', 'unknown', 'unavailable', or 'not_resolved'.
"""
profile = get_profile()
result = {
"profile_name": profile["profile_name"],
"allowed_operations": profile["allowed_operations"],
"forbidden_operations": profile["forbidden_operations"],
"audit_label": profile["audit_label"],
"token_source_name": profile["token_source_name"],
"base_url": profile["base_url"],
"remote": remote if remote in REMOTES else None,
"server": None,
"authenticated_username": None,
"identity_status": "not_resolved",
}
if remote not in REMOTES:
# Mark ambiguity rather than raising: the tool stays inspectable.
result["identity_status"] = "unknown"
result["remote_error"] = f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}"
return result
h = host or REMOTES[remote]["host"]
result["server"] = f"https://{h}"
if resolve_identity:
try:
auth = _auth(h)
data = api_request("GET", f"https://{h}/api/v1/user", auth)
login = (data or {}).get("login")
if login:
result["authenticated_username"] = login
result["identity_status"] = "verified"
else:
result["identity_status"] = "unknown"
except Exception:
# Fail soft for the identity field only. Never surface the error
# detail or any credential material — just mark it unavailable.
result["identity_status"] = "unavailable"
return result
@mcp.tool()
def gitea_mark_issue(
issue_number: int,
action: str,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Claim or release an issue via the status:in-progress label.
This is the cross-agent lock mechanism. Check before starting work.
Args:
issue_number: The issue number.
action: 'start' to claim (add label) or 'done' to release (remove label).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success' boolean and 'message'.
"""
if action not in ("start", "done"):
raise ValueError(f"action must be 'start' or 'done', got '{action}'")
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
base = repo_api_url(h, o, r)
# Find the status:in-progress label id
labels = api_request("GET", f"{base}/labels?limit=100", auth)
label_id = None
for lb in labels:
if lb["name"] == "status:in-progress":
label_id = lb["id"]
break
if label_id is None:
raise RuntimeError(
"Label 'status:in-progress' not found. "
"Run manage_labels.py to create it first."
)
if action == "start":
with _audited("label_issue", host=h, remote=remote, org=o, repo=r,
issue_number=issue_number,
request_metadata={"op": "add", "label": "status:in-progress"}):
api_request("POST", f"{base}/issues/{issue_number}/labels", auth,
{"labels": [label_id]})
return {"success": True, "message": f"Issue #{issue_number} claimed."}
else:
with _audited("unlabel_issue", host=h, remote=remote, org=o, repo=r,
issue_number=issue_number,
request_metadata={"op": "remove", "label": "status:in-progress"}):
api_request("DELETE",
f"{base}/issues/{issue_number}/labels/{label_id}", auth)
return {"success": True, "message": f"Issue #{issue_number} released."}
@mcp.tool()
def gitea_list_labels(
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> list:
"""List all available labels in a Gitea repository.
Args:
remote: Known Gitea instance ('dadeschools' or 'prgs').
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
list of labels.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
base = repo_api_url(h, o, r)
return api_request("GET", f"{base}/labels?limit=100", auth)
@mcp.tool()
def gitea_create_label(
name: str,
color: str,
description: str = "",
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Create a new label on a Gitea repository.
Args:
name: Name of the label (e.g. 'bug', 'epic').
color: HTML color code (hex, e.g. 'fbca04' or '#fbca04').
description: Description of the label.
remote: Known Gitea instance ('dadeschools' or 'prgs').
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict containing the created label details.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
base = repo_api_url(h, o, r)
if color.startswith("#"):
color = color[1:]
payload = {
"name": name,
"color": color,
"description": description,
}
with _audited("create_label", host=h, remote=remote, org=o, repo=r,
request_metadata={"name": name}):
return api_request("POST", f"{base}/labels", auth, payload)
@mcp.tool()
def gitea_set_issue_labels(
issue_number: int,
labels: list[str],
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> list:
"""Replace all labels on a Gitea issue with a new list of label names.
Args:
issue_number: The issue number.
labels: List of label names to apply.
remote: Known Gitea instance ('dadeschools' or 'prgs').
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
list of all labels currently applied to the issue.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
base = repo_api_url(h, o, r)
# 1. Fetch existing labels on the repo to resolve names -> IDs
existing = api_request("GET", f"{base}/labels?limit=100", auth)
name_to_id = {lb["name"]: lb["id"] for lb in existing}
# 2. Check if any requested labels do not exist, and raise error
label_ids = []
missing_labels = []
for name in labels:
if name in name_to_id:
label_ids.append(name_to_id[name])
else:
missing_labels.append(name)
if missing_labels:
raise RuntimeError(
f"The following labels do not exist on the repository: {missing_labels}. "
"Please create them first using gitea_create_label."
)
# 3. PUT the labels to the issue
with _audited("set_issue_labels", host=h, remote=remote, org=o, repo=r,
issue_number=issue_number, request_metadata={"labels": labels}):
res = api_request("PUT", f"{base}/issues/{issue_number}/labels", auth, {"labels": label_ids})
return res
@mcp.tool()
def gitea_mirror_refs(
apply: bool = False,
force: bool = False,
) -> dict:
"""Mirror branches and tags between dadeschools and prgs Timesheet repos.
Additive only — never deletes branches or tags. Diverged branches are
skipped unless force is True.
Args:
apply: If True, actually push. If False (default), dry-run only.
force: If True, force-push diverged branches.
Returns:
dict with 'output' (script stdout) and 'return_code'.
"""
script = os.path.join(PROJECT_ROOT, "mirror_refs.sh")
args = [script]
if apply:
args.append("--apply")
if force:
args.append("--force")
result = subprocess.run(
args, capture_output=True, text=True, timeout=120,
)
return {
"output": result.stdout + result.stderr,
"return_code": result.returncode,
}
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
mcp.run(transport="stdio")