#18: Audit-log Gitea MCP mutating actions with execution profile metadata #29

Merged
sysadmin merged 1 commits from feat/issue-18-audit-log-mutating-actions into master 2026-07-01 21:33:22 -05:00
5 changed files with 635 additions and 15 deletions
+5
View File
@@ -30,6 +30,11 @@ GITEA_FORBIDDEN_OPERATIONS=merge,branch.push
# Optional short label attached to this runtime for audit purposes. # Optional short label attached to this runtime for audit purposes.
GITEA_AUDIT_LABEL=reviewer-runtime GITEA_AUDIT_LABEL=reviewer-runtime
# Optional path to an audit log file (#18). When set, each mutating action
# appends one redacted JSON record (profile + authenticated user + outcome).
# Leave unset to disable auditing entirely (no records, no extra API calls).
GITEA_AUDIT_LOG=/path/to/gitea-mcp-audit.log
# Optional NAME of the token's source (e.g. an env var name). This is a name # Optional NAME of the token's source (e.g. an env var name). This is a name
# only — never the token value. Surfaced by gitea_get_profile. # only — never the token value. Surfaced by gitea_get_profile.
GITEA_TOKEN_SOURCE=GITEA_TOKEN GITEA_TOKEN_SOURCE=GITEA_TOKEN
+7 -3
View File
@@ -168,6 +168,7 @@ Recognized environment fields (see [`.env.example`](.env.example) for placeholde
| `GITEA_AUDIT_LABEL` | Optional short label for this runtime, for audit purposes. | | `GITEA_AUDIT_LABEL` | Optional short label for this runtime, for audit purposes. |
| `GITEA_TOKEN_SOURCE` | Optional *name* of the token source (e.g. an env var name). A name only — never the token value. | | `GITEA_TOKEN_SOURCE` | Optional *name* of the token source (e.g. an env var name). A name only — never the token value. |
| `GITEA_BASE_URL` | Optional informational base URL. | | `GITEA_BASE_URL` | Optional informational base URL. |
| `GITEA_AUDIT_LOG` | Optional path to an audit log file. When set, mutating actions append one redacted JSON record each (profile + authenticated user + outcome). Unset ⇒ auditing off (no records, no extra API calls). |
Notes: Notes:
@@ -180,9 +181,12 @@ Notes:
can inspect which runtime it is talking to before deciding to act. can inspect which runtime it is talking to before deciding to act.
- See [`docs/gitea-execution-profiles.md`](docs/gitea-execution-profiles.md) for - See [`docs/gitea-execution-profiles.md`](docs/gitea-execution-profiles.md) for
the full profile model. the full profile model.
- **Audit logging:** #16 returns structured gate/merge results but does not add - **Audit logging (#18):** mutating actions emit a durable, redacted JSON audit
durable audit logging. Durable audit logging for Gitea MCP mutating actions is record — timestamp, action, result (`allowed`/`blocked`/`failed`/`succeeded`),
tracked by #18. profile name + audit label, authenticated username, target repo/issue/PR,
branch and head SHA where applicable — when `GITEA_AUDIT_LOG` is set. Auditing
is off by default and never adds API calls or breaks the action when off.
See [`gitea_audit.py`](gitea_audit.py).
</details> </details>
<details> <details>
+126
View File
@@ -0,0 +1,126 @@
"""Audit logging for Gitea MCP mutating actions (issue #18).
Emits one structured JSON record per mutating action so an operator can see
*which execution profile and authenticated Gitea user* performed (or was
blocked from / failed) each mutation.
Design constraints:
- **Off by default.** Records are written only when ``GITEA_AUDIT_LOG`` names a
file path. With it unset, ``write_event`` is a no-op and callers add zero
behaviour — existing tool behaviour and API call patterns are unchanged.
- **Never raises.** Auditing must never break the action it records; all sink
I/O is best-effort and swallows errors.
- **No secrets.** Tokens / Authorization material are redacted from request
metadata and reason strings before a record is written.
- **No network.** This module performs no HTTP; the caller supplies identity
and profile metadata it already resolved.
"""
import os
import json
import datetime
# Result states for an audited action.
ALLOWED = "allowed"
BLOCKED = "blocked"
FAILED = "failed"
SUCCEEDED = "succeeded"
REDACTED = "[REDACTED]"
# A dict key containing any of these (case-insensitive) has its value redacted.
_SECRET_KEY_HINTS = ("token", "password", "secret", "authorization", "auth")
# A string value starting with one of these has the following run redacted.
_SECRET_VALUE_PREFIXES = ("token ", "Basic ", "Bearer ")
def _redact_str(text):
"""Redact anything that looks like an Authorization credential in *text*."""
if not isinstance(text, str) or not text:
return text
out = text
for prefix in _SECRET_VALUE_PREFIXES:
idx = 0
while True:
i = out.find(prefix, idx)
if i == -1:
break
j = i + len(prefix)
while j < len(out) and not out[j].isspace():
j += 1
out = out[:i] + prefix + REDACTED + out[j:]
idx = i + len(prefix) + len(REDACTED)
return out
def redact(value):
"""Recursively redact secret-looking keys/values from a JSON-able value."""
if isinstance(value, dict):
result = {}
for k, v in value.items():
if isinstance(k, str) and any(h in k.lower() for h in _SECRET_KEY_HINTS):
result[k] = REDACTED
else:
result[k] = redact(v)
return result
if isinstance(value, (list, tuple)):
return [redact(v) for v in value]
if isinstance(value, str):
return _redact_str(value)
return value
def audit_log_path():
"""Return the configured audit log file path, or None if auditing is off."""
return (os.environ.get("GITEA_AUDIT_LOG") or "").strip() or None
def audit_enabled():
"""True when a sink is configured. When False, callers should skip auditing."""
return audit_log_path() is not None
def build_event(*, action, result, remote=None, server=None, repository=None,
issue_number=None, pr_number=None, profile_name=None,
audit_label=None, authenticated_username=None, target_branch=None,
head_sha=None, reason=None, request_metadata=None, now=None):
"""Build a redacted, JSON-able audit record for a mutating action."""
ts = now or datetime.datetime.now(datetime.timezone.utc)
if isinstance(ts, datetime.datetime):
ts = ts.isoformat()
return {
"timestamp": ts,
"action": action,
"action_type": "mutating",
"result": result,
"remote": remote,
"server": server,
"repository": repository,
"issue_number": issue_number,
"pr_number": pr_number,
"profile_name": profile_name,
"audit_label": audit_label,
"authenticated_username": authenticated_username,
"target_branch": target_branch,
"head_sha": head_sha,
"reason": _redact_str(reason) if reason else reason,
"request_metadata": redact(request_metadata) if request_metadata is not None else None,
}
def write_event(event, path=None):
"""Append *event* as one JSON line to the audit sink. Never raises.
Returns True if a line was written, False if auditing is off or the write
failed (auditing is best-effort and must not break the caller).
"""
path = path or audit_log_path()
if not path:
return False
try:
line = json.dumps(event, default=str, sort_keys=True)
with open(path, "a", encoding="utf-8") as fh:
fh.write(line + "\n")
return True
except Exception:
return False
+183
View File
@@ -15,6 +15,8 @@ Configuration (mcp_config.json):
""" """
import os import os
import sys import sys
import functools
import contextlib
import subprocess import subprocess
# Resolve the project root. MCP clients must launch this script directly with # Resolve the project root. MCP clients must launch this script directly with
@@ -38,6 +40,7 @@ from gitea_auth import ( # noqa: E402
repo_api_url, repo_api_url,
get_profile, get_profile,
) )
import gitea_audit # noqa: E402
mcp = FastMCP("gitea-tools", instructions=( mcp = FastMCP("gitea-tools", instructions=(
"Gitea issue tracker and PR management for dadeschools and prgs instances. " "Gitea issue tracker and PR management for dadeschools and prgs instances. "
@@ -70,6 +73,144 @@ def _auth(host: str) -> str:
return header return header
# ── Audit logging (#18) ─────────────────────────────────────────────────────────
# Mutating actions emit a structured audit record (profile + authenticated user
# + outcome) when GITEA_AUDIT_LOG is configured. When it is not, every helper
# below short-circuits and performs NO work — no extra API calls, no I/O — so
# existing tool behaviour and API call sequences are unchanged.
_UNSET = object()
# Best-effort identity cache keyed by host, so an enabled audit trail resolves
# the authenticated username at most once per host per process.
_IDENTITY_CACHE: dict = {}
def _authenticated_username(host: str):
"""Resolve the authenticated Gitea username for *host* (cached, fail-soft).
Read-only. Returns None if the identity cannot be determined; never raises
and never surfaces credential material.
"""
if host in _IDENTITY_CACHE:
return _IDENTITY_CACHE[host]
user = None
try:
header = get_auth_header(host)
if header:
who = api_request("GET", f"https://{host}/api/v1/user", header)
user = (who or {}).get("login")
except Exception:
user = None
_IDENTITY_CACHE[host] = user
return user
def _audit(action: str, *, host, remote, result, org=None, repo=None,
reason=None, request_metadata=None, issue_number=None,
pr_number=None, target_branch=None, head_sha=None, username=_UNSET):
"""Emit one audit record for a mutating action. No-op unless auditing is on.
Never raises — auditing must not break the action it records.
"""
if not gitea_audit.audit_enabled():
return
try:
profile = get_profile()
if username is _UNSET:
username = _authenticated_username(host) if host else None
event = gitea_audit.build_event(
action=action,
result=result,
remote=remote,
server=(f"https://{host}" if host else None),
repository=repo,
issue_number=issue_number,
pr_number=pr_number,
profile_name=profile["profile_name"],
audit_label=profile["audit_label"],
authenticated_username=username,
target_branch=target_branch,
head_sha=head_sha,
reason=reason,
request_metadata=request_metadata,
)
gitea_audit.write_event(event)
except Exception:
pass # best-effort; never break the action
@contextlib.contextmanager
def _audited(action: str, *, host, remote, org=None, repo=None,
request_metadata=None, issue_number=None, pr_number=None,
target_branch=None):
"""Wrap a mutating API call: audit SUCCEEDED on return, FAILED on exception.
When auditing is off this yields immediately with no bookkeeping.
"""
if not gitea_audit.audit_enabled():
yield
return
try:
yield
except Exception as exc:
_audit(action, host=host, remote=remote, org=org, repo=repo,
result=gitea_audit.FAILED, reason=_redact(str(exc)),
request_metadata=request_metadata, issue_number=issue_number,
pr_number=pr_number, target_branch=target_branch)
raise
_audit(action, host=host, remote=remote, org=org, repo=repo,
result=gitea_audit.SUCCEEDED, request_metadata=request_metadata,
issue_number=issue_number, pr_number=pr_number,
target_branch=target_branch)
def _audit_pr_result(action: str):
"""Decorator for gated PR tools that return a result dict.
Reads the tool's own result dict (authenticated_user, profile, reasons,
performed) to emit an audit record classifying the outcome as SUCCEEDED,
BLOCKED, or FAILED. No extra API calls: identity comes from the result.
"""
def decorate(fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
result = fn(*args, **kwargs)
try:
if isinstance(result, dict) and gitea_audit.audit_enabled():
reasons = [str(x) for x in (result.get("reasons") or [])]
if result.get("performed"):
status = gitea_audit.SUCCEEDED
elif any("failed:" in x.lower() for x in reasons):
# "failed:" marks a surfaced exception (e.g. "merge
# failed: <err>"); a bare gate message like "… failed
# (fail closed)" is a policy block, not an execution error.
status = gitea_audit.FAILED
else:
status = gitea_audit.BLOCKED
remote = result.get("remote")
host = REMOTES[remote]["host"] if remote in REMOTES else None
_audit(
action,
host=host,
remote=remote,
result=status,
reason="; ".join(reasons) or None,
pr_number=result.get("pr_number"),
head_sha=result.get("head_sha"),
username=result.get("authenticated_user"),
request_metadata={
"requested_action": result.get("requested_action"),
"merge_method": result.get("merge_method"),
},
)
except Exception:
pass # best-effort; never break the tool
return result
return wrapper
return decorate
# ── Tools ───────────────────────────────────────────────────────────────────── # ── Tools ─────────────────────────────────────────────────────────────────────
@mcp.tool() @mcp.tool()
@@ -97,7 +238,16 @@ def gitea_create_issue(
h, o, r = _resolve(remote, host, org, repo) h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h) auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues" url = f"{repo_api_url(h, o, r)}/issues"
try:
data = api_request("POST", url, auth, {"title": title, "body": body}) data = api_request("POST", url, auth, {"title": title, "body": body})
except Exception as exc:
_audit("create_issue", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.FAILED, reason=_redact(str(exc)),
request_metadata={"title": title})
raise
_audit("create_issue", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.SUCCEEDED, issue_number=data["number"],
request_metadata={"title": title})
return {"number": data["number"], "url": data["html_url"]} return {"number": data["number"], "url": data["html_url"]}
@@ -131,7 +281,17 @@ def gitea_create_pr(
auth = _auth(h) auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/pulls" url = f"{repo_api_url(h, o, r)}/pulls"
payload = {"title": title, "body": body, "head": head, "base": base} payload = {"title": title, "body": body, "head": head, "base": base}
meta = {"title": title, "head": head, "base": base}
try:
data = api_request("POST", url, auth, payload) data = api_request("POST", url, auth, payload)
except Exception as exc:
_audit("create_pr", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.FAILED, reason=_redact(str(exc)),
target_branch=head, request_metadata=meta)
raise
_audit("create_pr", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.SUCCEEDED, pr_number=data["number"],
target_branch=head, request_metadata=meta)
return {"number": data["number"], "url": data["html_url"]} return {"number": data["number"], "url": data["html_url"]}
@@ -396,6 +556,7 @@ def _redact(text: str) -> str:
@mcp.tool() @mcp.tool()
@_audit_pr_result("submit_pr_review")
def gitea_submit_pr_review( def gitea_submit_pr_review(
pr_number: int, pr_number: int,
action: str, action: str,
@@ -575,6 +736,8 @@ def gitea_edit_pr(
if not payload: if not payload:
raise ValueError("At least one field to edit (title, body, state, base) must be provided.") raise ValueError("At least one field to edit (title, body, state, base) must be provided.")
with _audited("edit_pr", host=h, remote=remote, org=o, repo=r,
pr_number=pr_number, request_metadata={"fields": sorted(payload)}):
data = api_request("PATCH", url, auth, payload) data = api_request("PATCH", url, auth, payload)
return { return {
"success": True, "success": True,
@@ -663,6 +826,11 @@ def gitea_commit_files(
if new_branch is not None: if new_branch is not None:
payload["new_branch"] = new_branch payload["new_branch"] = new_branch
with _audited("commit_files", host=h, remote=remote, org=o, repo=r,
target_branch=(new_branch or branch),
request_metadata={"message": message,
"paths": [f.get("path") for f in files],
"operations": [f.get("operation") for f in files]}):
data = api_request("POST", url, auth, payload) data = api_request("POST", url, auth, payload)
return { return {
"success": True, "success": True,
@@ -676,6 +844,7 @@ _MERGE_METHODS = ("merge", "squash", "rebase")
@mcp.tool() @mcp.tool()
@_audit_pr_result("merge_pr")
def gitea_merge_pr( def gitea_merge_pr(
pr_number: int, pr_number: int,
confirmation: str = "", confirmation: str = "",
@@ -952,6 +1121,8 @@ def gitea_delete_branch(
import urllib.parse import urllib.parse
encoded_branch = urllib.parse.quote(branch, safe="") encoded_branch = urllib.parse.quote(branch, safe="")
url = f"{repo_api_url(h, o, r)}/branches/{encoded_branch}" url = f"{repo_api_url(h, o, r)}/branches/{encoded_branch}"
with _audited("delete_branch", host=h, remote=remote, org=o, repo=r,
target_branch=branch, request_metadata={"branch": branch}):
api_request("DELETE", url, auth) api_request("DELETE", url, auth)
return {"success": True, "message": f"Remote branch '{branch}' deleted."} return {"success": True, "message": f"Remote branch '{branch}' deleted."}
@@ -979,6 +1150,8 @@ def gitea_close_issue(
h, o, r = _resolve(remote, host, org, repo) h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h) auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues/{issue_number}" url = f"{repo_api_url(h, o, r)}/issues/{issue_number}"
with _audited("close_issue", host=h, remote=remote, org=o, repo=r,
issue_number=issue_number, request_metadata={"state": "closed"}):
api_request("PATCH", url, auth, {"state": "closed"}) api_request("PATCH", url, auth, {"state": "closed"})
return {"success": True, "message": f"Issue #{issue_number} closed."} return {"success": True, "message": f"Issue #{issue_number} closed."}
@@ -1232,10 +1405,16 @@ def gitea_mark_issue(
) )
if action == "start": if action == "start":
with _audited("label_issue", host=h, remote=remote, org=o, repo=r,
issue_number=issue_number,
request_metadata={"op": "add", "label": "status:in-progress"}):
api_request("POST", f"{base}/issues/{issue_number}/labels", auth, api_request("POST", f"{base}/issues/{issue_number}/labels", auth,
{"labels": [label_id]}) {"labels": [label_id]})
return {"success": True, "message": f"Issue #{issue_number} claimed."} return {"success": True, "message": f"Issue #{issue_number} claimed."}
else: else:
with _audited("unlabel_issue", host=h, remote=remote, org=o, repo=r,
issue_number=issue_number,
request_metadata={"op": "remove", "label": "status:in-progress"}):
api_request("DELETE", api_request("DELETE",
f"{base}/issues/{issue_number}/labels/{label_id}", auth) f"{base}/issues/{issue_number}/labels/{label_id}", auth)
return {"success": True, "message": f"Issue #{issue_number} released."} return {"success": True, "message": f"Issue #{issue_number} released."}
@@ -1301,6 +1480,8 @@ def gitea_create_label(
"color": color, "color": color,
"description": description, "description": description,
} }
with _audited("create_label", host=h, remote=remote, org=o, repo=r,
request_metadata={"name": name}):
return api_request("POST", f"{base}/labels", auth, payload) return api_request("POST", f"{base}/labels", auth, payload)
@@ -1350,6 +1531,8 @@ def gitea_set_issue_labels(
) )
# 3. PUT the labels to the issue # 3. PUT the labels to the issue
with _audited("set_issue_labels", host=h, remote=remote, org=o, repo=r,
issue_number=issue_number, request_metadata={"labels": labels}):
res = api_request("PUT", f"{base}/issues/{issue_number}/labels", auth, {"labels": label_ids}) res = api_request("PUT", f"{base}/issues/{issue_number}/labels", auth, {"labels": label_ids})
return res return res
+302
View File
@@ -0,0 +1,302 @@
"""Tests for Gitea MCP mutating-action audit logging (issue #18).
Covers the pure audit module (redaction, event building, sink writes) and the
wiring in mcp_server: mutating tools emit one record with profile +
authenticated username + outcome, secrets are redacted, and — critically —
auditing is a no-op (no records, no extra API calls) when GITEA_AUDIT_LOG is
unset so existing behaviour is unchanged.
"""
import os
import sys
import json
import datetime
import tempfile
import unittest
from unittest.mock import patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import gitea_audit # noqa: E402
import mcp_server # noqa: E402
from mcp_server import ( # noqa: E402
gitea_create_issue,
gitea_close_issue,
gitea_merge_pr,
gitea_submit_pr_review,
)
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
FIXED_TS = datetime.datetime(2026, 7, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
# ---------------------------------------------------------------------------
# Pure audit module
# ---------------------------------------------------------------------------
class TestRedaction(unittest.TestCase):
def test_redacts_secret_keys(self):
red = gitea_audit.redact({"token": "abc", "title": "hi", "Password": "x"})
self.assertEqual(red["token"], gitea_audit.REDACTED)
self.assertEqual(red["Password"], gitea_audit.REDACTED)
self.assertEqual(red["title"], "hi")
def test_redacts_nested_and_lists(self):
red = gitea_audit.redact({"outer": {"authorization": "Basic xyz"},
"items": [{"secret": "s"}, "plain"]})
self.assertEqual(red["outer"]["authorization"], gitea_audit.REDACTED)
self.assertEqual(red["items"][0]["secret"], gitea_audit.REDACTED)
self.assertEqual(red["items"][1], "plain")
def test_redacts_credential_value_prefixes(self):
self.assertEqual(
gitea_audit._redact_str("failed: token abc-123 rejected"),
"failed: token [REDACTED] rejected",
)
self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Basic Zm9v"))
self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Bearer tok99"))
def test_non_string_untouched(self):
self.assertEqual(gitea_audit.redact(42), 42)
self.assertIsNone(gitea_audit.redact(None))
class TestBuildEvent(unittest.TestCase):
def test_core_fields_and_injected_timestamp(self):
ev = gitea_audit.build_event(
action="create_issue", result=gitea_audit.SUCCEEDED,
remote="prgs", server="https://gitea.prgs.cc", repository="Repo",
issue_number=5, profile_name="gitea-author", audit_label="author-rt",
authenticated_username="bot", now=FIXED_TS,
)
self.assertEqual(ev["timestamp"], "2026-07-01T12:00:00+00:00")
self.assertEqual(ev["action"], "create_issue")
self.assertEqual(ev["action_type"], "mutating")
self.assertEqual(ev["result"], "succeeded")
self.assertEqual(ev["profile_name"], "gitea-author")
self.assertEqual(ev["audit_label"], "author-rt")
self.assertEqual(ev["authenticated_username"], "bot")
self.assertEqual(ev["issue_number"], 5)
def test_metadata_and_reason_redacted(self):
ev = gitea_audit.build_event(
action="create_pr", result=gitea_audit.FAILED,
reason="HTTP 500: token secret-xyz bad",
request_metadata={"title": "t", "token": "leak"}, now=FIXED_TS,
)
self.assertNotIn("secret-xyz", ev["reason"])
self.assertEqual(ev["request_metadata"]["token"], gitea_audit.REDACTED)
self.assertEqual(ev["request_metadata"]["title"], "t")
class TestSink(unittest.TestCase):
def test_enabled_reflects_env(self):
with patch.dict(os.environ, {}, clear=True):
self.assertFalse(gitea_audit.audit_enabled())
with patch.dict(os.environ, {"GITEA_AUDIT_LOG": "/tmp/x.log"}, clear=True):
self.assertTrue(gitea_audit.audit_enabled())
def test_write_event_appends_json_lines(self):
with tempfile.TemporaryDirectory() as d:
path = os.path.join(d, "audit.log")
self.assertTrue(gitea_audit.write_event({"a": 1}, path=path))
self.assertTrue(gitea_audit.write_event({"b": 2}, path=path))
with open(path, encoding="utf-8") as fh:
lines = fh.read().splitlines()
self.assertEqual(len(lines), 2)
self.assertEqual(json.loads(lines[0])["a"], 1)
self.assertEqual(json.loads(lines[1])["b"], 2)
def test_write_event_noop_without_path(self):
with patch.dict(os.environ, {}, clear=True):
self.assertFalse(gitea_audit.write_event({"a": 1}))
def test_write_event_never_raises_on_bad_path(self):
# A path inside a non-existent directory cannot be opened; must not raise.
self.assertFalse(gitea_audit.write_event({"a": 1}, path="/no/such/dir/x.log"))
# ---------------------------------------------------------------------------
# mcp_server wiring
# ---------------------------------------------------------------------------
class _AuditWiringBase(unittest.TestCase):
def setUp(self):
self._dir = tempfile.TemporaryDirectory()
self.audit_path = os.path.join(self._dir.name, "audit.log")
# Identity cache is process-global; clear so lookups are deterministic.
mcp_server._IDENTITY_CACHE.clear()
def tearDown(self):
mcp_server._IDENTITY_CACHE.clear()
self._dir.cleanup()
def _env(self, **extra):
env = {"GITEA_AUDIT_LOG": self.audit_path,
"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_ALLOWED_OPERATIONS": "read,merge"}
env.update(extra)
return env
def _records(self):
if not os.path.exists(self.audit_path):
return []
with open(self.audit_path, encoding="utf-8") as fh:
return [json.loads(line) for line in fh if line.strip()]
class TestSimpleToolAudit(_AuditWiringBase):
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_issue_success_audited(self, _auth, mock_api):
# 1: create POST result, 2: identity /user lookup for the audit record.
mock_api.side_effect = [
{"number": 11, "html_url": "https://gitea.prgs.cc/issues/11"},
{"login": "author-bot"},
]
with patch.dict(os.environ, self._env(), clear=True):
result = gitea_create_issue(title="Add thing", remote="prgs")
self.assertEqual(result["number"], 11)
recs = self._records()
self.assertEqual(len(recs), 1)
rec = recs[0]
self.assertEqual(rec["action"], "create_issue")
self.assertEqual(rec["result"], "succeeded")
self.assertEqual(rec["profile_name"], "gitea-author")
self.assertEqual(rec["authenticated_username"], "author-bot")
self.assertEqual(rec["issue_number"], 11)
self.assertEqual(rec["request_metadata"]["title"], "Add thing")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_issue_failure_audited(self, _auth, mock_api):
mock_api.side_effect = [
RuntimeError("HTTP 500: boom"),
{"login": "author-bot"}, # identity lookup for the audit record
]
with patch.dict(os.environ, self._env(), clear=True):
with self.assertRaises(RuntimeError):
gitea_create_issue(title="X", remote="prgs")
recs = self._records()
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0]["result"], "failed")
self.assertIn("HTTP 500", recs[0]["reason"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_close_issue_audited(self, _auth, mock_api):
mock_api.side_effect = [{"state": "closed"}, {"login": "mgr-bot"}]
with patch.dict(os.environ, self._env(), clear=True):
gitea_close_issue(issue_number=42, remote="prgs")
recs = self._records()
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0]["action"], "close_issue")
self.assertEqual(recs[0]["issue_number"], 42)
self.assertEqual(recs[0]["authenticated_username"], "mgr-bot")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_disabled_writes_nothing_and_no_extra_call(self, _auth, mock_api):
# No GITEA_AUDIT_LOG -> audit is a no-op: exactly one API call, no file.
mock_api.return_value = {"number": 1, "html_url": "http://x/1"}
with patch.dict(os.environ, {"GITEA_PROFILE_NAME": "gitea-author"}, clear=True):
gitea_create_issue(title="x", remote="prgs")
mock_api.assert_called_once()
self.assertEqual(self._records(), [])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_secrets_never_written(self, _auth, mock_api):
mock_api.side_effect = [
{"number": 3, "html_url": "http://x/3"},
{"login": "author-bot"},
]
env = self._env(GITEA_TOKEN="super-secret-token")
with patch.dict(os.environ, env, clear=True):
gitea_create_issue(title="t", remote="prgs")
with open(self.audit_path, encoding="utf-8") as fh:
blob = fh.read().lower()
for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()):
self.assertNotIn(secret, blob)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_audit_failure_never_breaks_action(self, _auth, mock_api):
mock_api.side_effect = [
{"number": 9, "html_url": "http://x/9"},
{"login": "author-bot"},
]
with patch.dict(os.environ, self._env(), clear=True):
with patch("gitea_audit.write_event", side_effect=RuntimeError("disk full")):
result = gitea_create_issue(title="t", remote="prgs")
# The mutation result is returned even though the sink write blew up.
self.assertEqual(result["number"], 9)
class TestGatedToolAudit(_AuditWiringBase):
def _pr(self, author, state="open", sha="abc123", mergeable=True):
return {"user": {"login": author}, "state": state,
"head": {"sha": sha}, "mergeable": mergeable}
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_merge_success_audited(self, _auth, mock_api):
# user, pr, merge POST, readback — no extra identity call (uses result).
mock_api.side_effect = [
{"login": "merger-bot"}, self._pr("author-bot"),
{}, {"merged_commit_sha": "c1"},
]
env = self._env(GITEA_PROFILE_NAME="gitea-merger",
GITEA_ALLOWED_OPERATIONS="read,merge")
with patch.dict(os.environ, env, clear=True):
r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8",
expected_head_sha="abc123", remote="prgs")
self.assertTrue(r["performed"])
recs = self._records()
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0]["action"], "merge_pr")
self.assertEqual(recs[0]["result"], "succeeded")
self.assertEqual(recs[0]["authenticated_username"], "merger-bot")
self.assertEqual(recs[0]["pr_number"], 8)
self.assertEqual(recs[0]["head_sha"], "abc123")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_merge_blocked_audited(self, _auth, mock_api):
# Self-author merge is blocked; must still be recorded as blocked.
mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
env = self._env(GITEA_PROFILE_NAME="gitea-merger",
GITEA_ALLOWED_OPERATIONS="read,merge")
with patch.dict(os.environ, env, clear=True):
r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8", remote="prgs")
self.assertFalse(r["performed"])
recs = self._records()
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0]["result"], "blocked")
self.assertEqual(recs[0]["authenticated_username"], "jcwalker3")
self.assertIn("PR author", recs[0]["reason"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_submit_review_success_audited(self, _auth, mock_api):
mock_api.side_effect = [
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 7},
]
env = self._env(GITEA_PROFILE_NAME="gitea-reviewer",
GITEA_ALLOWED_OPERATIONS="read,review,approve")
with patch.dict(os.environ, env, clear=True):
r = gitea_submit_pr_review(pr_number=8, action="approve",
body="LGTM", remote="prgs")
self.assertTrue(r["performed"])
recs = self._records()
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0]["action"], "submit_pr_review")
self.assertEqual(recs[0]["result"], "succeeded")
self.assertEqual(recs[0]["authenticated_username"], "reviewer-bot")
if __name__ == "__main__":
unittest.main()