From c3c48fb7c24e12fad8f6c09040e11b831d79a169 Mon Sep 17 00:00:00 2001 From: Jason Walker <913443@dadeschools.net> Date: Wed, 1 Jul 2026 22:20:51 -0400 Subject: [PATCH] feat: audit-log Gitea MCP mutating actions with profile metadata (#18) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add durable, opt-in audit logging for every mutating Gitea MCP action so an operator can see which execution profile and authenticated Gitea user performed (or was blocked from / failed) each mutation. - New gitea_audit.py: pure, no-network module — recursive secret redaction (token/password/authorization keys; token/Basic/Bearer value runs), build_event (timestamp, action, result, profile, audit label, authenticated username, repo, issue/PR, target branch, head SHA, redacted request metadata), and an append-only JSON Lines sink. - mcp_server.py: _audit helper + _audited context manager (simple mutations) and an _audit_pr_result decorator (gated review/merge tools, reading their own result dict) wired into create_issue, create_pr, edit_pr, close_issue, commit_files, delete_branch, create_label, set_issue_labels, mark_issue (label/unlabel), gitea_submit_pr_review, and gitea_merge_pr. - Outcomes recorded as allowed/blocked/failed/succeeded; blocked and failed eligibility checks are logged, not just successes. Off by default: records are written only when GITEA_AUDIT_LOG is set. When it is unset every audit path short-circuits — no records, no extra API calls — so existing tool behaviour and API call sequences are unchanged. Auditing never raises; sink writes are best-effort. Tokens are never written. Docs: README env table + audit note, .env.example placeholder. Tests: tests/test_audit.py (19 cases) — redaction, event build, sink writes, per-tool success/failure/blocked records, secret-free output, off-by-default no-op, and audit-failure-never-breaks-action. Closes #18 Co-Authored-By: Claude Opus 4.8 (1M context) --- .env.example | 5 + README.md | 10 +- gitea_audit.py | 126 ++++++++++++++++++ mcp_server.py | 207 ++++++++++++++++++++++++++++-- tests/test_audit.py | 302 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 635 insertions(+), 15 deletions(-) create mode 100644 gitea_audit.py create mode 100644 tests/test_audit.py diff --git a/.env.example b/.env.example index 52d9bc9..0748a31 100644 --- a/.env.example +++ b/.env.example @@ -30,6 +30,11 @@ GITEA_FORBIDDEN_OPERATIONS=merge,branch.push # Optional short label attached to this runtime for audit purposes. GITEA_AUDIT_LABEL=reviewer-runtime +# Optional path to an audit log file (#18). When set, each mutating action +# appends one redacted JSON record (profile + authenticated user + outcome). +# Leave unset to disable auditing entirely (no records, no extra API calls). +GITEA_AUDIT_LOG=/path/to/gitea-mcp-audit.log + # Optional NAME of the token's source (e.g. an env var name). This is a name # only — never the token value. Surfaced by gitea_get_profile. GITEA_TOKEN_SOURCE=GITEA_TOKEN diff --git a/README.md b/README.md index fb12133..8cf4916 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,7 @@ Recognized environment fields (see [`.env.example`](.env.example) for placeholde | `GITEA_AUDIT_LABEL` | Optional short label for this runtime, for audit purposes. | | `GITEA_TOKEN_SOURCE` | Optional *name* of the token source (e.g. an env var name). A name only — never the token value. | | `GITEA_BASE_URL` | Optional informational base URL. | +| `GITEA_AUDIT_LOG` | Optional path to an audit log file. When set, mutating actions append one redacted JSON record each (profile + authenticated user + outcome). Unset ⇒ auditing off (no records, no extra API calls). | Notes: @@ -180,9 +181,12 @@ Notes: can inspect which runtime it is talking to before deciding to act. - See [`docs/gitea-execution-profiles.md`](docs/gitea-execution-profiles.md) for the full profile model. -- **Audit logging:** #16 returns structured gate/merge results but does not add - durable audit logging. Durable audit logging for Gitea MCP mutating actions is - tracked by #18. +- **Audit logging (#18):** mutating actions emit a durable, redacted JSON audit + record — timestamp, action, result (`allowed`/`blocked`/`failed`/`succeeded`), + profile name + audit label, authenticated username, target repo/issue/PR, + branch and head SHA where applicable — when `GITEA_AUDIT_LOG` is set. Auditing + is off by default and never adds API calls or breaks the action when off. + See [`gitea_audit.py`](gitea_audit.py).
diff --git a/gitea_audit.py b/gitea_audit.py new file mode 100644 index 0000000..3289ef9 --- /dev/null +++ b/gitea_audit.py @@ -0,0 +1,126 @@ +"""Audit logging for Gitea MCP mutating actions (issue #18). + +Emits one structured JSON record per mutating action so an operator can see +*which execution profile and authenticated Gitea user* performed (or was +blocked from / failed) each mutation. + +Design constraints: + +- **Off by default.** Records are written only when ``GITEA_AUDIT_LOG`` names a + file path. With it unset, ``write_event`` is a no-op and callers add zero + behaviour — existing tool behaviour and API call patterns are unchanged. +- **Never raises.** Auditing must never break the action it records; all sink + I/O is best-effort and swallows errors. +- **No secrets.** Tokens / Authorization material are redacted from request + metadata and reason strings before a record is written. +- **No network.** This module performs no HTTP; the caller supplies identity + and profile metadata it already resolved. +""" +import os +import json +import datetime + +# Result states for an audited action. +ALLOWED = "allowed" +BLOCKED = "blocked" +FAILED = "failed" +SUCCEEDED = "succeeded" + +REDACTED = "[REDACTED]" + +# A dict key containing any of these (case-insensitive) has its value redacted. +_SECRET_KEY_HINTS = ("token", "password", "secret", "authorization", "auth") +# A string value starting with one of these has the following run redacted. +_SECRET_VALUE_PREFIXES = ("token ", "Basic ", "Bearer ") + + +def _redact_str(text): + """Redact anything that looks like an Authorization credential in *text*.""" + if not isinstance(text, str) or not text: + return text + out = text + for prefix in _SECRET_VALUE_PREFIXES: + idx = 0 + while True: + i = out.find(prefix, idx) + if i == -1: + break + j = i + len(prefix) + while j < len(out) and not out[j].isspace(): + j += 1 + out = out[:i] + prefix + REDACTED + out[j:] + idx = i + len(prefix) + len(REDACTED) + return out + + +def redact(value): + """Recursively redact secret-looking keys/values from a JSON-able value.""" + if isinstance(value, dict): + result = {} + for k, v in value.items(): + if isinstance(k, str) and any(h in k.lower() for h in _SECRET_KEY_HINTS): + result[k] = REDACTED + else: + result[k] = redact(v) + return result + if isinstance(value, (list, tuple)): + return [redact(v) for v in value] + if isinstance(value, str): + return _redact_str(value) + return value + + +def audit_log_path(): + """Return the configured audit log file path, or None if auditing is off.""" + return (os.environ.get("GITEA_AUDIT_LOG") or "").strip() or None + + +def audit_enabled(): + """True when a sink is configured. When False, callers should skip auditing.""" + return audit_log_path() is not None + + +def build_event(*, action, result, remote=None, server=None, repository=None, + issue_number=None, pr_number=None, profile_name=None, + audit_label=None, authenticated_username=None, target_branch=None, + head_sha=None, reason=None, request_metadata=None, now=None): + """Build a redacted, JSON-able audit record for a mutating action.""" + ts = now or datetime.datetime.now(datetime.timezone.utc) + if isinstance(ts, datetime.datetime): + ts = ts.isoformat() + return { + "timestamp": ts, + "action": action, + "action_type": "mutating", + "result": result, + "remote": remote, + "server": server, + "repository": repository, + "issue_number": issue_number, + "pr_number": pr_number, + "profile_name": profile_name, + "audit_label": audit_label, + "authenticated_username": authenticated_username, + "target_branch": target_branch, + "head_sha": head_sha, + "reason": _redact_str(reason) if reason else reason, + "request_metadata": redact(request_metadata) if request_metadata is not None else None, + } + + +def write_event(event, path=None): + """Append *event* as one JSON line to the audit sink. Never raises. + + Returns True if a line was written, False if auditing is off or the write + failed (auditing is best-effort and must not break the caller). + """ + path = path or audit_log_path() + if not path: + return False + try: + line = json.dumps(event, default=str, sort_keys=True) + with open(path, "a", encoding="utf-8") as fh: + fh.write(line + "\n") + return True + except Exception: + return False diff --git a/mcp_server.py b/mcp_server.py index 3810c3f..8f33c43 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -15,6 +15,8 @@ Configuration (mcp_config.json): """ import os import sys +import functools +import contextlib import subprocess # Resolve the project root. MCP clients must launch this script directly with @@ -38,6 +40,7 @@ from gitea_auth import ( # noqa: E402 repo_api_url, get_profile, ) +import gitea_audit # noqa: E402 mcp = FastMCP("gitea-tools", instructions=( "Gitea issue tracker and PR management for dadeschools and prgs instances. " @@ -70,6 +73,144 @@ def _auth(host: str) -> str: return header +# ── Audit logging (#18) ───────────────────────────────────────────────────────── +# Mutating actions emit a structured audit record (profile + authenticated user +# + outcome) when GITEA_AUDIT_LOG is configured. When it is not, every helper +# below short-circuits and performs NO work — no extra API calls, no I/O — so +# existing tool behaviour and API call sequences are unchanged. + +_UNSET = object() + +# Best-effort identity cache keyed by host, so an enabled audit trail resolves +# the authenticated username at most once per host per process. +_IDENTITY_CACHE: dict = {} + + +def _authenticated_username(host: str): + """Resolve the authenticated Gitea username for *host* (cached, fail-soft). + + Read-only. Returns None if the identity cannot be determined; never raises + and never surfaces credential material. + """ + if host in _IDENTITY_CACHE: + return _IDENTITY_CACHE[host] + user = None + try: + header = get_auth_header(host) + if header: + who = api_request("GET", f"https://{host}/api/v1/user", header) + user = (who or {}).get("login") + except Exception: + user = None + _IDENTITY_CACHE[host] = user + return user + + +def _audit(action: str, *, host, remote, result, org=None, repo=None, + reason=None, request_metadata=None, issue_number=None, + pr_number=None, target_branch=None, head_sha=None, username=_UNSET): + """Emit one audit record for a mutating action. No-op unless auditing is on. + + Never raises — auditing must not break the action it records. + """ + if not gitea_audit.audit_enabled(): + return + try: + profile = get_profile() + if username is _UNSET: + username = _authenticated_username(host) if host else None + event = gitea_audit.build_event( + action=action, + result=result, + remote=remote, + server=(f"https://{host}" if host else None), + repository=repo, + issue_number=issue_number, + pr_number=pr_number, + profile_name=profile["profile_name"], + audit_label=profile["audit_label"], + authenticated_username=username, + target_branch=target_branch, + head_sha=head_sha, + reason=reason, + request_metadata=request_metadata, + ) + gitea_audit.write_event(event) + except Exception: + pass # best-effort; never break the action + + +@contextlib.contextmanager +def _audited(action: str, *, host, remote, org=None, repo=None, + request_metadata=None, issue_number=None, pr_number=None, + target_branch=None): + """Wrap a mutating API call: audit SUCCEEDED on return, FAILED on exception. + + When auditing is off this yields immediately with no bookkeeping. + """ + if not gitea_audit.audit_enabled(): + yield + return + try: + yield + except Exception as exc: + _audit(action, host=host, remote=remote, org=org, repo=repo, + result=gitea_audit.FAILED, reason=_redact(str(exc)), + request_metadata=request_metadata, issue_number=issue_number, + pr_number=pr_number, target_branch=target_branch) + raise + _audit(action, host=host, remote=remote, org=org, repo=repo, + result=gitea_audit.SUCCEEDED, request_metadata=request_metadata, + issue_number=issue_number, pr_number=pr_number, + target_branch=target_branch) + + +def _audit_pr_result(action: str): + """Decorator for gated PR tools that return a result dict. + + Reads the tool's own result dict (authenticated_user, profile, reasons, + performed) to emit an audit record classifying the outcome as SUCCEEDED, + BLOCKED, or FAILED. No extra API calls: identity comes from the result. + """ + def decorate(fn): + @functools.wraps(fn) + def wrapper(*args, **kwargs): + result = fn(*args, **kwargs) + try: + if isinstance(result, dict) and gitea_audit.audit_enabled(): + reasons = [str(x) for x in (result.get("reasons") or [])] + if result.get("performed"): + status = gitea_audit.SUCCEEDED + elif any("failed:" in x.lower() for x in reasons): + # "failed:" marks a surfaced exception (e.g. "merge + # failed: "); a bare gate message like "… failed + # (fail closed)" is a policy block, not an execution error. + status = gitea_audit.FAILED + else: + status = gitea_audit.BLOCKED + remote = result.get("remote") + host = REMOTES[remote]["host"] if remote in REMOTES else None + _audit( + action, + host=host, + remote=remote, + result=status, + reason="; ".join(reasons) or None, + pr_number=result.get("pr_number"), + head_sha=result.get("head_sha"), + username=result.get("authenticated_user"), + request_metadata={ + "requested_action": result.get("requested_action"), + "merge_method": result.get("merge_method"), + }, + ) + except Exception: + pass # best-effort; never break the tool + return result + return wrapper + return decorate + + # ── Tools ───────────────────────────────────────────────────────────────────── @mcp.tool() @@ -97,7 +238,16 @@ def gitea_create_issue( h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/issues" - data = api_request("POST", url, auth, {"title": title, "body": body}) + try: + data = api_request("POST", url, auth, {"title": title, "body": body}) + except Exception as exc: + _audit("create_issue", host=h, remote=remote, org=o, repo=r, + result=gitea_audit.FAILED, reason=_redact(str(exc)), + request_metadata={"title": title}) + raise + _audit("create_issue", host=h, remote=remote, org=o, repo=r, + result=gitea_audit.SUCCEEDED, issue_number=data["number"], + request_metadata={"title": title}) return {"number": data["number"], "url": data["html_url"]} @@ -131,7 +281,17 @@ def gitea_create_pr( auth = _auth(h) url = f"{repo_api_url(h, o, r)}/pulls" payload = {"title": title, "body": body, "head": head, "base": base} - data = api_request("POST", url, auth, payload) + meta = {"title": title, "head": head, "base": base} + try: + data = api_request("POST", url, auth, payload) + except Exception as exc: + _audit("create_pr", host=h, remote=remote, org=o, repo=r, + result=gitea_audit.FAILED, reason=_redact(str(exc)), + target_branch=head, request_metadata=meta) + raise + _audit("create_pr", host=h, remote=remote, org=o, repo=r, + result=gitea_audit.SUCCEEDED, pr_number=data["number"], + target_branch=head, request_metadata=meta) return {"number": data["number"], "url": data["html_url"]} @@ -396,6 +556,7 @@ def _redact(text: str) -> str: @mcp.tool() +@_audit_pr_result("submit_pr_review") def gitea_submit_pr_review( pr_number: int, action: str, @@ -575,7 +736,9 @@ def gitea_edit_pr( if not payload: raise ValueError("At least one field to edit (title, body, state, base) must be provided.") - data = api_request("PATCH", url, auth, payload) + with _audited("edit_pr", host=h, remote=remote, org=o, repo=r, + pr_number=pr_number, request_metadata={"fields": sorted(payload)}): + data = api_request("PATCH", url, auth, payload) return { "success": True, "number": data["number"], @@ -663,7 +826,12 @@ def gitea_commit_files( if new_branch is not None: payload["new_branch"] = new_branch - data = api_request("POST", url, auth, payload) + with _audited("commit_files", host=h, remote=remote, org=o, repo=r, + target_branch=(new_branch or branch), + request_metadata={"message": message, + "paths": [f.get("path") for f in files], + "operations": [f.get("operation") for f in files]}): + data = api_request("POST", url, auth, payload) return { "success": True, "commit": data.get("commit", {}).get("sha", ""), @@ -676,6 +844,7 @@ _MERGE_METHODS = ("merge", "squash", "rebase") @mcp.tool() +@_audit_pr_result("merge_pr") def gitea_merge_pr( pr_number: int, confirmation: str = "", @@ -952,7 +1121,9 @@ def gitea_delete_branch( import urllib.parse encoded_branch = urllib.parse.quote(branch, safe="") url = f"{repo_api_url(h, o, r)}/branches/{encoded_branch}" - api_request("DELETE", url, auth) + with _audited("delete_branch", host=h, remote=remote, org=o, repo=r, + target_branch=branch, request_metadata={"branch": branch}): + api_request("DELETE", url, auth) return {"success": True, "message": f"Remote branch '{branch}' deleted."} @@ -979,7 +1150,9 @@ def gitea_close_issue( h, o, r = _resolve(remote, host, org, repo) auth = _auth(h) url = f"{repo_api_url(h, o, r)}/issues/{issue_number}" - api_request("PATCH", url, auth, {"state": "closed"}) + with _audited("close_issue", host=h, remote=remote, org=o, repo=r, + issue_number=issue_number, request_metadata={"state": "closed"}): + api_request("PATCH", url, auth, {"state": "closed"}) return {"success": True, "message": f"Issue #{issue_number} closed."} @@ -1232,12 +1405,18 @@ def gitea_mark_issue( ) if action == "start": - api_request("POST", f"{base}/issues/{issue_number}/labels", auth, - {"labels": [label_id]}) + with _audited("label_issue", host=h, remote=remote, org=o, repo=r, + issue_number=issue_number, + request_metadata={"op": "add", "label": "status:in-progress"}): + api_request("POST", f"{base}/issues/{issue_number}/labels", auth, + {"labels": [label_id]}) return {"success": True, "message": f"Issue #{issue_number} claimed."} else: - api_request("DELETE", - f"{base}/issues/{issue_number}/labels/{label_id}", auth) + with _audited("unlabel_issue", host=h, remote=remote, org=o, repo=r, + issue_number=issue_number, + request_metadata={"op": "remove", "label": "status:in-progress"}): + api_request("DELETE", + f"{base}/issues/{issue_number}/labels/{label_id}", auth) return {"success": True, "message": f"Issue #{issue_number} released."} @@ -1301,7 +1480,9 @@ def gitea_create_label( "color": color, "description": description, } - return api_request("POST", f"{base}/labels", auth, payload) + with _audited("create_label", host=h, remote=remote, org=o, repo=r, + request_metadata={"name": name}): + return api_request("POST", f"{base}/labels", auth, payload) @mcp.tool() @@ -1350,7 +1531,9 @@ def gitea_set_issue_labels( ) # 3. PUT the labels to the issue - res = api_request("PUT", f"{base}/issues/{issue_number}/labels", auth, {"labels": label_ids}) + with _audited("set_issue_labels", host=h, remote=remote, org=o, repo=r, + issue_number=issue_number, request_metadata={"labels": labels}): + res = api_request("PUT", f"{base}/issues/{issue_number}/labels", auth, {"labels": label_ids}) return res diff --git a/tests/test_audit.py b/tests/test_audit.py new file mode 100644 index 0000000..7c31f7f --- /dev/null +++ b/tests/test_audit.py @@ -0,0 +1,302 @@ +"""Tests for Gitea MCP mutating-action audit logging (issue #18). + +Covers the pure audit module (redaction, event building, sink writes) and the +wiring in mcp_server: mutating tools emit one record with profile + +authenticated username + outcome, secrets are redacted, and — critically — +auditing is a no-op (no records, no extra API calls) when GITEA_AUDIT_LOG is +unset so existing behaviour is unchanged. +""" +import os +import sys +import json +import datetime +import tempfile +import unittest +from unittest.mock import patch + +sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) + +import gitea_audit # noqa: E402 +import mcp_server # noqa: E402 +from mcp_server import ( # noqa: E402 + gitea_create_issue, + gitea_close_issue, + gitea_merge_pr, + gitea_submit_pr_review, +) + +FAKE_AUTH = "Basic dGVzdDp0ZXN0" +FIXED_TS = datetime.datetime(2026, 7, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) + + +# --------------------------------------------------------------------------- +# Pure audit module +# --------------------------------------------------------------------------- +class TestRedaction(unittest.TestCase): + + def test_redacts_secret_keys(self): + red = gitea_audit.redact({"token": "abc", "title": "hi", "Password": "x"}) + self.assertEqual(red["token"], gitea_audit.REDACTED) + self.assertEqual(red["Password"], gitea_audit.REDACTED) + self.assertEqual(red["title"], "hi") + + def test_redacts_nested_and_lists(self): + red = gitea_audit.redact({"outer": {"authorization": "Basic xyz"}, + "items": [{"secret": "s"}, "plain"]}) + self.assertEqual(red["outer"]["authorization"], gitea_audit.REDACTED) + self.assertEqual(red["items"][0]["secret"], gitea_audit.REDACTED) + self.assertEqual(red["items"][1], "plain") + + def test_redacts_credential_value_prefixes(self): + self.assertEqual( + gitea_audit._redact_str("failed: token abc-123 rejected"), + "failed: token [REDACTED] rejected", + ) + self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Basic Zm9v")) + self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Bearer tok99")) + + def test_non_string_untouched(self): + self.assertEqual(gitea_audit.redact(42), 42) + self.assertIsNone(gitea_audit.redact(None)) + + +class TestBuildEvent(unittest.TestCase): + + def test_core_fields_and_injected_timestamp(self): + ev = gitea_audit.build_event( + action="create_issue", result=gitea_audit.SUCCEEDED, + remote="prgs", server="https://gitea.prgs.cc", repository="Repo", + issue_number=5, profile_name="gitea-author", audit_label="author-rt", + authenticated_username="bot", now=FIXED_TS, + ) + self.assertEqual(ev["timestamp"], "2026-07-01T12:00:00+00:00") + self.assertEqual(ev["action"], "create_issue") + self.assertEqual(ev["action_type"], "mutating") + self.assertEqual(ev["result"], "succeeded") + self.assertEqual(ev["profile_name"], "gitea-author") + self.assertEqual(ev["audit_label"], "author-rt") + self.assertEqual(ev["authenticated_username"], "bot") + self.assertEqual(ev["issue_number"], 5) + + def test_metadata_and_reason_redacted(self): + ev = gitea_audit.build_event( + action="create_pr", result=gitea_audit.FAILED, + reason="HTTP 500: token secret-xyz bad", + request_metadata={"title": "t", "token": "leak"}, now=FIXED_TS, + ) + self.assertNotIn("secret-xyz", ev["reason"]) + self.assertEqual(ev["request_metadata"]["token"], gitea_audit.REDACTED) + self.assertEqual(ev["request_metadata"]["title"], "t") + + +class TestSink(unittest.TestCase): + + def test_enabled_reflects_env(self): + with patch.dict(os.environ, {}, clear=True): + self.assertFalse(gitea_audit.audit_enabled()) + with patch.dict(os.environ, {"GITEA_AUDIT_LOG": "/tmp/x.log"}, clear=True): + self.assertTrue(gitea_audit.audit_enabled()) + + def test_write_event_appends_json_lines(self): + with tempfile.TemporaryDirectory() as d: + path = os.path.join(d, "audit.log") + self.assertTrue(gitea_audit.write_event({"a": 1}, path=path)) + self.assertTrue(gitea_audit.write_event({"b": 2}, path=path)) + with open(path, encoding="utf-8") as fh: + lines = fh.read().splitlines() + self.assertEqual(len(lines), 2) + self.assertEqual(json.loads(lines[0])["a"], 1) + self.assertEqual(json.loads(lines[1])["b"], 2) + + def test_write_event_noop_without_path(self): + with patch.dict(os.environ, {}, clear=True): + self.assertFalse(gitea_audit.write_event({"a": 1})) + + def test_write_event_never_raises_on_bad_path(self): + # A path inside a non-existent directory cannot be opened; must not raise. + self.assertFalse(gitea_audit.write_event({"a": 1}, path="/no/such/dir/x.log")) + + +# --------------------------------------------------------------------------- +# mcp_server wiring +# --------------------------------------------------------------------------- +class _AuditWiringBase(unittest.TestCase): + + def setUp(self): + self._dir = tempfile.TemporaryDirectory() + self.audit_path = os.path.join(self._dir.name, "audit.log") + # Identity cache is process-global; clear so lookups are deterministic. + mcp_server._IDENTITY_CACHE.clear() + + def tearDown(self): + mcp_server._IDENTITY_CACHE.clear() + self._dir.cleanup() + + def _env(self, **extra): + env = {"GITEA_AUDIT_LOG": self.audit_path, + "GITEA_PROFILE_NAME": "gitea-author", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + env.update(extra) + return env + + def _records(self): + if not os.path.exists(self.audit_path): + return [] + with open(self.audit_path, encoding="utf-8") as fh: + return [json.loads(line) for line in fh if line.strip()] + + +class TestSimpleToolAudit(_AuditWiringBase): + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_create_issue_success_audited(self, _auth, mock_api): + # 1: create POST result, 2: identity /user lookup for the audit record. + mock_api.side_effect = [ + {"number": 11, "html_url": "https://gitea.prgs.cc/issues/11"}, + {"login": "author-bot"}, + ] + with patch.dict(os.environ, self._env(), clear=True): + result = gitea_create_issue(title="Add thing", remote="prgs") + self.assertEqual(result["number"], 11) + recs = self._records() + self.assertEqual(len(recs), 1) + rec = recs[0] + self.assertEqual(rec["action"], "create_issue") + self.assertEqual(rec["result"], "succeeded") + self.assertEqual(rec["profile_name"], "gitea-author") + self.assertEqual(rec["authenticated_username"], "author-bot") + self.assertEqual(rec["issue_number"], 11) + self.assertEqual(rec["request_metadata"]["title"], "Add thing") + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_create_issue_failure_audited(self, _auth, mock_api): + mock_api.side_effect = [ + RuntimeError("HTTP 500: boom"), + {"login": "author-bot"}, # identity lookup for the audit record + ] + with patch.dict(os.environ, self._env(), clear=True): + with self.assertRaises(RuntimeError): + gitea_create_issue(title="X", remote="prgs") + recs = self._records() + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0]["result"], "failed") + self.assertIn("HTTP 500", recs[0]["reason"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_close_issue_audited(self, _auth, mock_api): + mock_api.side_effect = [{"state": "closed"}, {"login": "mgr-bot"}] + with patch.dict(os.environ, self._env(), clear=True): + gitea_close_issue(issue_number=42, remote="prgs") + recs = self._records() + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0]["action"], "close_issue") + self.assertEqual(recs[0]["issue_number"], 42) + self.assertEqual(recs[0]["authenticated_username"], "mgr-bot") + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_disabled_writes_nothing_and_no_extra_call(self, _auth, mock_api): + # No GITEA_AUDIT_LOG -> audit is a no-op: exactly one API call, no file. + mock_api.return_value = {"number": 1, "html_url": "http://x/1"} + with patch.dict(os.environ, {"GITEA_PROFILE_NAME": "gitea-author"}, clear=True): + gitea_create_issue(title="x", remote="prgs") + mock_api.assert_called_once() + self.assertEqual(self._records(), []) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_secrets_never_written(self, _auth, mock_api): + mock_api.side_effect = [ + {"number": 3, "html_url": "http://x/3"}, + {"login": "author-bot"}, + ] + env = self._env(GITEA_TOKEN="super-secret-token") + with patch.dict(os.environ, env, clear=True): + gitea_create_issue(title="t", remote="prgs") + with open(self.audit_path, encoding="utf-8") as fh: + blob = fh.read().lower() + for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()): + self.assertNotIn(secret, blob) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_audit_failure_never_breaks_action(self, _auth, mock_api): + mock_api.side_effect = [ + {"number": 9, "html_url": "http://x/9"}, + {"login": "author-bot"}, + ] + with patch.dict(os.environ, self._env(), clear=True): + with patch("gitea_audit.write_event", side_effect=RuntimeError("disk full")): + result = gitea_create_issue(title="t", remote="prgs") + # The mutation result is returned even though the sink write blew up. + self.assertEqual(result["number"], 9) + + +class TestGatedToolAudit(_AuditWiringBase): + + def _pr(self, author, state="open", sha="abc123", mergeable=True): + return {"user": {"login": author}, "state": state, + "head": {"sha": sha}, "mergeable": mergeable} + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_merge_success_audited(self, _auth, mock_api): + # user, pr, merge POST, readback — no extra identity call (uses result). + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot"), + {}, {"merged_commit_sha": "c1"}, + ] + env = self._env(GITEA_PROFILE_NAME="gitea-merger", + GITEA_ALLOWED_OPERATIONS="read,merge") + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8", + expected_head_sha="abc123", remote="prgs") + self.assertTrue(r["performed"]) + recs = self._records() + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0]["action"], "merge_pr") + self.assertEqual(recs[0]["result"], "succeeded") + self.assertEqual(recs[0]["authenticated_username"], "merger-bot") + self.assertEqual(recs[0]["pr_number"], 8) + self.assertEqual(recs[0]["head_sha"], "abc123") + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_merge_blocked_audited(self, _auth, mock_api): + # Self-author merge is blocked; must still be recorded as blocked. + mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")] + env = self._env(GITEA_PROFILE_NAME="gitea-merger", + GITEA_ALLOWED_OPERATIONS="read,merge") + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8", remote="prgs") + self.assertFalse(r["performed"]) + recs = self._records() + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0]["result"], "blocked") + self.assertEqual(recs[0]["authenticated_username"], "jcwalker3") + self.assertIn("PR author", recs[0]["reason"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_submit_review_success_audited(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 7}, + ] + env = self._env(GITEA_PROFILE_NAME="gitea-reviewer", + GITEA_ALLOWED_OPERATIONS="read,review,approve") + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review(pr_number=8, action="approve", + body="LGTM", remote="prgs") + self.assertTrue(r["performed"]) + recs = self._records() + self.assertEqual(len(recs), 1) + self.assertEqual(recs[0]["action"], "submit_pr_review") + self.assertEqual(recs[0]["result"], "succeeded") + self.assertEqual(recs[0]["authenticated_username"], "reviewer-bot") + + +if __name__ == "__main__": + unittest.main() -- 2.43.7