Files
sysadmin c3c48fb7c2 feat: audit-log Gitea MCP mutating actions with profile metadata (#18)
Add durable, opt-in audit logging for every mutating Gitea MCP action so an
operator can see which execution profile and authenticated Gitea user
performed (or was blocked from / failed) each mutation.

- New gitea_audit.py: pure, no-network module — recursive secret redaction
  (token/password/authorization keys; token/Basic/Bearer value runs),
  build_event (timestamp, action, result, profile, audit label, authenticated
  username, repo, issue/PR, target branch, head SHA, redacted request
  metadata), and an append-only JSON Lines sink.
- mcp_server.py: _audit helper + _audited context manager (simple mutations)
  and an _audit_pr_result decorator (gated review/merge tools, reading their
  own result dict) wired into create_issue, create_pr, edit_pr, close_issue,
  commit_files, delete_branch, create_label, set_issue_labels, mark_issue
  (label/unlabel), gitea_submit_pr_review, and gitea_merge_pr.
- Outcomes recorded as allowed/blocked/failed/succeeded; blocked and failed
  eligibility checks are logged, not just successes.

Off by default: records are written only when GITEA_AUDIT_LOG is set. When it
is unset every audit path short-circuits — no records, no extra API calls — so
existing tool behaviour and API call sequences are unchanged. Auditing never
raises; sink writes are best-effort. Tokens are never written.

Docs: README env table + audit note, .env.example placeholder.
Tests: tests/test_audit.py (19 cases) — redaction, event build, sink writes,
per-tool success/failure/blocked records, secret-free output, off-by-default
no-op, and audit-failure-never-breaks-action.

Closes #18

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-01 22:20:51 -04:00

303 lines
13 KiB
Python

"""Tests for Gitea MCP mutating-action audit logging (issue #18).
Covers the pure audit module (redaction, event building, sink writes) and the
wiring in mcp_server: mutating tools emit one record with profile +
authenticated username + outcome, secrets are redacted, and — critically —
auditing is a no-op (no records, no extra API calls) when GITEA_AUDIT_LOG is
unset so existing behaviour is unchanged.
"""
import os
import sys
import json
import datetime
import tempfile
import unittest
from unittest.mock import patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import gitea_audit # noqa: E402
import mcp_server # noqa: E402
from mcp_server import ( # noqa: E402
gitea_create_issue,
gitea_close_issue,
gitea_merge_pr,
gitea_submit_pr_review,
)
# Stand-in Authorization header value; the wiring tests patch
# mcp_server.get_auth_header to return it, and the secret-leak test checks
# that it never appears in the audit file.
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
# Fixed, timezone-aware (UTC) instant injected as ``now`` into build_event so
# the serialised timestamp can be asserted exactly.
FIXED_TS = datetime.datetime(2026, 7, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
# ---------------------------------------------------------------------------
# Pure audit module
# ---------------------------------------------------------------------------
class TestRedaction(unittest.TestCase):
    """Checks on ``gitea_audit`` redaction: by key name and by value pattern."""

    def test_redacts_secret_keys(self):
        # Secret-looking keys (including the capitalised "Password") are
        # masked, while an ordinary key keeps its value.
        payload = {"token": "abc", "title": "hi", "Password": "x"}
        cleaned = gitea_audit.redact(payload)
        self.assertEqual(cleaned["token"], gitea_audit.REDACTED)
        self.assertEqual(cleaned["Password"], gitea_audit.REDACTED)
        self.assertEqual(cleaned["title"], "hi")

    def test_redacts_nested_and_lists(self):
        # Redaction is expected to descend into nested dicts and into lists.
        payload = {
            "outer": {"authorization": "Basic xyz"},
            "items": [{"secret": "s"}, "plain"],
        }
        cleaned = gitea_audit.redact(payload)
        nested = cleaned["outer"]
        sequence = cleaned["items"]
        self.assertEqual(nested["authorization"], gitea_audit.REDACTED)
        self.assertEqual(sequence[0]["secret"], gitea_audit.REDACTED)
        self.assertEqual(sequence[1], "plain")

    def test_redacts_credential_value_prefixes(self):
        # Credentials embedded in free text are masked by their prefix word.
        masked_message = gitea_audit._redact_str("failed: token abc-123 rejected")
        self.assertEqual(masked_message, "failed: token [REDACTED] rejected")
        for header_value in ("Basic Zm9v", "Bearer tok99"):
            self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str(header_value))

    def test_non_string_untouched(self):
        # Non-string scalars come back unchanged.
        self.assertEqual(gitea_audit.redact(42), 42)
        self.assertIsNone(gitea_audit.redact(None))
class TestBuildEvent(unittest.TestCase):
    """Checks on the record dict produced by ``gitea_audit.build_event``."""

    def test_core_fields_and_injected_timestamp(self):
        # ``now`` is injected so the timestamp string is fully predictable.
        event = gitea_audit.build_event(
            action="create_issue",
            result=gitea_audit.SUCCEEDED,
            remote="prgs",
            server="https://gitea.prgs.cc",
            repository="Repo",
            issue_number=5,
            profile_name="gitea-author",
            audit_label="author-rt",
            authenticated_username="bot",
            now=FIXED_TS,
        )
        # Ordered (field, expected) pairs; checked one by one in this order.
        expectations = (
            ("timestamp", "2026-07-01T12:00:00+00:00"),
            ("action", "create_issue"),
            ("action_type", "mutating"),
            ("result", "succeeded"),
            ("profile_name", "gitea-author"),
            ("audit_label", "author-rt"),
            ("authenticated_username", "bot"),
            ("issue_number", 5),
        )
        for field, expected in expectations:
            self.assertEqual(event[field], expected)

    def test_metadata_and_reason_redacted(self):
        # Both the free-text reason and the request metadata must be scrubbed.
        event = gitea_audit.build_event(
            action="create_pr",
            result=gitea_audit.FAILED,
            reason="HTTP 500: token secret-xyz bad",
            request_metadata={"title": "t", "token": "leak"},
            now=FIXED_TS,
        )
        metadata = event["request_metadata"]
        self.assertNotIn("secret-xyz", event["reason"])
        self.assertEqual(metadata["token"], gitea_audit.REDACTED)
        self.assertEqual(metadata["title"], "t")
class TestSink(unittest.TestCase):
    """Checks on the audit sink: enablement via env and JSON Lines writes."""

    def test_enabled_reflects_env(self):
        # Auditing is off with an empty environment ...
        with patch.dict(os.environ, {}, clear=True):
            self.assertFalse(gitea_audit.audit_enabled())
        # ... and on as soon as GITEA_AUDIT_LOG is set.
        enabled_env = {"GITEA_AUDIT_LOG": "/tmp/x.log"}
        with patch.dict(os.environ, enabled_env, clear=True):
            self.assertTrue(gitea_audit.audit_enabled())

    def test_write_event_appends_json_lines(self):
        # Two writes to the same path must yield two JSON lines, in order.
        with tempfile.TemporaryDirectory() as workdir:
            log_path = os.path.join(workdir, "audit.log")
            self.assertTrue(gitea_audit.write_event({"a": 1}, path=log_path))
            self.assertTrue(gitea_audit.write_event({"b": 2}, path=log_path))
            with open(log_path, encoding="utf-8") as handle:
                written = handle.read().splitlines()
            self.assertEqual(len(written), 2)
            first, second = json.loads(written[0]), json.loads(written[1])
            self.assertEqual(first["a"], 1)
            self.assertEqual(second["b"], 2)

    def test_write_event_noop_without_path(self):
        # No explicit path and no GITEA_AUDIT_LOG: the write reports False.
        with patch.dict(os.environ, {}, clear=True):
            outcome = gitea_audit.write_event({"a": 1})
        self.assertFalse(outcome)

    def test_write_event_never_raises_on_bad_path(self):
        # A path inside a non-existent directory cannot be opened; the sink
        # must report False instead of raising.
        unwritable = "/no/such/dir/x.log"
        self.assertFalse(gitea_audit.write_event({"a": 1}, path=unwritable))
# ---------------------------------------------------------------------------
# mcp_server wiring
# ---------------------------------------------------------------------------
class _AuditWiringBase(unittest.TestCase):
    """Shared fixture for the mcp_server audit-wiring tests.

    Each test gets a private temporary directory holding the audit log path
    (``self.audit_path``), a cleared ``mcp_server._IDENTITY_CACHE``, and two
    helpers: ``_env`` to build the environment and ``_records`` to read back
    what was written.
    """

    def setUp(self):
        self._dir = tempfile.TemporaryDirectory()
        # Register removal immediately: unittest skips tearDown when setUp
        # raises, but cleanups registered before the failure still run, so
        # the directory cannot be left behind by a later setUp step.
        self.addCleanup(self._dir.cleanup)
        self.audit_path = os.path.join(self._dir.name, "audit.log")
        # Identity cache is process-global; clear so lookups are deterministic.
        mcp_server._IDENTITY_CACHE.clear()

    def tearDown(self):
        # Do not leak cached identities into the next test. The temporary
        # directory itself is removed by the cleanup registered in setUp,
        # which unittest runs after tearDown.
        mcp_server._IDENTITY_CACHE.clear()

    def _env(self, **extra):
        """Return the baseline audit-enabled environment, overridden by *extra*."""
        env = {
            "GITEA_AUDIT_LOG": self.audit_path,
            "GITEA_PROFILE_NAME": "gitea-author",
            "GITEA_ALLOWED_OPERATIONS": "read,merge",
        }
        env.update(extra)
        return env

    def _records(self):
        """Return the parsed audit records, or ``[]`` if no log file exists."""
        try:
            with open(self.audit_path, encoding="utf-8") as fh:
                # Blank lines are skipped; every other line is one JSON record.
                return [json.loads(line) for line in fh if line.strip()]
        except FileNotFoundError:
            return []
class TestSimpleToolAudit(_AuditWiringBase):
    """Audit records emitted by the simple mutating tools (issue create/close).

    ``mcp_server.api_request`` is mocked with an ordered ``side_effect`` list,
    so each test spells out the exact sequence of API calls it expects.
    """

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_create_issue_success_audited(self, _auth_header, api):
        # Call 1 answers the create POST; call 2 answers the /user identity
        # lookup made for the audit record.
        create_response = {"number": 11, "html_url": "https://gitea.prgs.cc/issues/11"}
        identity_response = {"login": "author-bot"}
        api.side_effect = [create_response, identity_response]
        with patch.dict(os.environ, self._env(), clear=True):
            created = gitea_create_issue(title="Add thing", remote="prgs")
        self.assertEqual(created["number"], 11)
        written = self._records()
        self.assertEqual(len(written), 1)
        entry = written[0]
        self.assertEqual(entry["action"], "create_issue")
        self.assertEqual(entry["result"], "succeeded")
        self.assertEqual(entry["profile_name"], "gitea-author")
        self.assertEqual(entry["authenticated_username"], "author-bot")
        self.assertEqual(entry["issue_number"], 11)
        self.assertEqual(entry["request_metadata"]["title"], "Add thing")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_create_issue_failure_audited(self, _auth_header, api):
        # The mutation itself raises; the second entry serves the identity
        # lookup for the audit record.
        api.side_effect = [
            RuntimeError("HTTP 500: boom"),
            {"login": "author-bot"},
        ]
        with patch.dict(os.environ, self._env(), clear=True):
            with self.assertRaises(RuntimeError):
                gitea_create_issue(title="X", remote="prgs")
        written = self._records()
        self.assertEqual(len(written), 1)
        entry = written[0]
        self.assertEqual(entry["result"], "failed")
        self.assertIn("HTTP 500", entry["reason"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_close_issue_audited(self, _auth_header, api):
        # Close response first, then the identity lookup.
        api.side_effect = [{"state": "closed"}, {"login": "mgr-bot"}]
        with patch.dict(os.environ, self._env(), clear=True):
            gitea_close_issue(issue_number=42, remote="prgs")
        written = self._records()
        self.assertEqual(len(written), 1)
        entry = written[0]
        self.assertEqual(entry["action"], "close_issue")
        self.assertEqual(entry["issue_number"], 42)
        self.assertEqual(entry["authenticated_username"], "mgr-bot")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_disabled_writes_nothing_and_no_extra_call(self, _auth_header, api):
        # No GITEA_AUDIT_LOG -> audit is a no-op: exactly one API call, no file.
        api.return_value = {"number": 1, "html_url": "http://x/1"}
        audit_off_env = {"GITEA_PROFILE_NAME": "gitea-author"}
        with patch.dict(os.environ, audit_off_env, clear=True):
            gitea_create_issue(title="x", remote="prgs")
        api.assert_called_once()
        self.assertEqual(self._records(), [])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_secrets_never_written(self, _auth_header, api):
        api.side_effect = [
            {"number": 3, "html_url": "http://x/3"},
            {"login": "author-bot"},
        ]
        environment = self._env(GITEA_TOKEN="super-secret-token")
        with patch.dict(os.environ, environment, clear=True):
            gitea_create_issue(title="t", remote="prgs")
        # Compare case-insensitively: the whole file is lower-cased and so is
        # every needle searched for.
        with open(self.audit_path, encoding="utf-8") as handle:
            written_text = handle.read().lower()
        forbidden = ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower())
        for needle in forbidden:
            self.assertNotIn(needle, written_text)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_audit_failure_never_breaks_action(self, _auth_header, api):
        api.side_effect = [
            {"number": 9, "html_url": "http://x/9"},
            {"login": "author-bot"},
        ]
        broken_sink = patch("gitea_audit.write_event", side_effect=RuntimeError("disk full"))
        with patch.dict(os.environ, self._env(), clear=True):
            with broken_sink:
                created = gitea_create_issue(title="t", remote="prgs")
        # The mutation result is returned even though the sink write blew up.
        self.assertEqual(created["number"], 9)
class TestGatedToolAudit(_AuditWiringBase):
    """Audit records emitted by the gated tools (PR merge and PR review)."""

    def _pr(self, author, state="open", sha="abc123", mergeable=True):
        """Build a minimal pull-request payload as returned by the mocked API."""
        payload = {
            "user": {"login": author},
            "state": state,
            "head": {"sha": sha},
            "mergeable": mergeable,
        }
        return payload

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_merge_success_audited(self, _auth_header, api):
        # user, pr, merge POST, readback — no extra identity call (uses result).
        api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot"),
            {},
            {"merged_commit_sha": "c1"},
        ]
        merger_env = self._env(
            GITEA_PROFILE_NAME="gitea-merger",
            GITEA_ALLOWED_OPERATIONS="read,merge",
        )
        with patch.dict(os.environ, merger_env, clear=True):
            outcome = gitea_merge_pr(
                pr_number=8,
                confirmation="MERGE PR 8",
                expected_head_sha="abc123",
                remote="prgs",
            )
        self.assertTrue(outcome["performed"])
        written = self._records()
        self.assertEqual(len(written), 1)
        entry = written[0]
        self.assertEqual(entry["action"], "merge_pr")
        self.assertEqual(entry["result"], "succeeded")
        self.assertEqual(entry["authenticated_username"], "merger-bot")
        self.assertEqual(entry["pr_number"], 8)
        self.assertEqual(entry["head_sha"], "abc123")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_merge_blocked_audited(self, _auth_header, api):
        # Self-author merge is blocked; must still be recorded as blocked.
        api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
        merger_env = self._env(
            GITEA_PROFILE_NAME="gitea-merger",
            GITEA_ALLOWED_OPERATIONS="read,merge",
        )
        with patch.dict(os.environ, merger_env, clear=True):
            outcome = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8", remote="prgs")
        self.assertFalse(outcome["performed"])
        written = self._records()
        self.assertEqual(len(written), 1)
        entry = written[0]
        self.assertEqual(entry["result"], "blocked")
        self.assertEqual(entry["authenticated_username"], "jcwalker3")
        self.assertIn("PR author", entry["reason"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_submit_review_success_audited(self, _auth_header, api):
        # Expected call order: authenticated user, PR payload, review response.
        api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 7},
        ]
        reviewer_env = self._env(
            GITEA_PROFILE_NAME="gitea-reviewer",
            GITEA_ALLOWED_OPERATIONS="read,review,approve",
        )
        with patch.dict(os.environ, reviewer_env, clear=True):
            outcome = gitea_submit_pr_review(
                pr_number=8,
                action="approve",
                body="LGTM",
                remote="prgs",
            )
        self.assertTrue(outcome["performed"])
        written = self._records()
        self.assertEqual(len(written), 1)
        entry = written[0]
        self.assertEqual(entry["action"], "submit_pr_review")
        self.assertEqual(entry["result"], "succeeded")
        self.assertEqual(entry["authenticated_username"], "reviewer-bot")
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()