c3c48fb7c2
Add durable, opt-in audit logging for every mutating Gitea MCP action so an operator can see which execution profile and authenticated Gitea user performed (or was blocked from / failed) each mutation. - New gitea_audit.py: pure, no-network module — recursive secret redaction (token/password/authorization keys; token/Basic/Bearer value runs), build_event (timestamp, action, result, profile, audit label, authenticated username, repo, issue/PR, target branch, head SHA, redacted request metadata), and an append-only JSON Lines sink. - mcp_server.py: _audit helper + _audited context manager (simple mutations) and an _audit_pr_result decorator (gated review/merge tools, reading their own result dict) wired into create_issue, create_pr, edit_pr, close_issue, commit_files, delete_branch, create_label, set_issue_labels, mark_issue (label/unlabel), gitea_submit_pr_review, and gitea_merge_pr. - Outcomes recorded as allowed/blocked/failed/succeeded; blocked and failed eligibility checks are logged, not just successes. Off by default: records are written only when GITEA_AUDIT_LOG is set. When it is unset every audit path short-circuits — no records, no extra API calls — so existing tool behaviour and API call sequences are unchanged. Auditing never raises; sink writes are best-effort. Tokens are never written. Docs: README env table + audit note, .env.example placeholder. Tests: tests/test_audit.py (19 cases) — redaction, event build, sink writes, per-tool success/failure/blocked records, secret-free output, off-by-default no-op, and audit-failure-never-breaks-action. Closes #18 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
303 lines
13 KiB
Python
303 lines
13 KiB
Python
"""Tests for Gitea MCP mutating-action audit logging (issue #18).
|
|
|
|
Covers the pure audit module (redaction, event building, sink writes) and the
|
|
wiring in mcp_server: mutating tools emit one record with profile +
|
|
authenticated username + outcome, secrets are redacted, and — critically —
|
|
auditing is a no-op (no records, no extra API calls) when GITEA_AUDIT_LOG is
|
|
unset so existing behaviour is unchanged.
|
|
"""
|
|
import os
|
|
import sys
|
|
import json
|
|
import datetime
|
|
import tempfile
|
|
import unittest
|
|
from unittest.mock import patch
|
|
|
|
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
|
|
|
import gitea_audit # noqa: E402
|
|
import mcp_server # noqa: E402
|
|
from mcp_server import ( # noqa: E402
|
|
gitea_create_issue,
|
|
gitea_close_issue,
|
|
gitea_merge_pr,
|
|
gitea_submit_pr_review,
|
|
)
|
|
|
|
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
|
|
FIXED_TS = datetime.datetime(2026, 7, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pure audit module
|
|
# ---------------------------------------------------------------------------
|
|
class TestRedaction(unittest.TestCase):
|
|
|
|
def test_redacts_secret_keys(self):
|
|
red = gitea_audit.redact({"token": "abc", "title": "hi", "Password": "x"})
|
|
self.assertEqual(red["token"], gitea_audit.REDACTED)
|
|
self.assertEqual(red["Password"], gitea_audit.REDACTED)
|
|
self.assertEqual(red["title"], "hi")
|
|
|
|
def test_redacts_nested_and_lists(self):
|
|
red = gitea_audit.redact({"outer": {"authorization": "Basic xyz"},
|
|
"items": [{"secret": "s"}, "plain"]})
|
|
self.assertEqual(red["outer"]["authorization"], gitea_audit.REDACTED)
|
|
self.assertEqual(red["items"][0]["secret"], gitea_audit.REDACTED)
|
|
self.assertEqual(red["items"][1], "plain")
|
|
|
|
def test_redacts_credential_value_prefixes(self):
|
|
self.assertEqual(
|
|
gitea_audit._redact_str("failed: token abc-123 rejected"),
|
|
"failed: token [REDACTED] rejected",
|
|
)
|
|
self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Basic Zm9v"))
|
|
self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Bearer tok99"))
|
|
|
|
def test_non_string_untouched(self):
|
|
self.assertEqual(gitea_audit.redact(42), 42)
|
|
self.assertIsNone(gitea_audit.redact(None))
|
|
|
|
|
|
class TestBuildEvent(unittest.TestCase):
|
|
|
|
def test_core_fields_and_injected_timestamp(self):
|
|
ev = gitea_audit.build_event(
|
|
action="create_issue", result=gitea_audit.SUCCEEDED,
|
|
remote="prgs", server="https://gitea.prgs.cc", repository="Repo",
|
|
issue_number=5, profile_name="gitea-author", audit_label="author-rt",
|
|
authenticated_username="bot", now=FIXED_TS,
|
|
)
|
|
self.assertEqual(ev["timestamp"], "2026-07-01T12:00:00+00:00")
|
|
self.assertEqual(ev["action"], "create_issue")
|
|
self.assertEqual(ev["action_type"], "mutating")
|
|
self.assertEqual(ev["result"], "succeeded")
|
|
self.assertEqual(ev["profile_name"], "gitea-author")
|
|
self.assertEqual(ev["audit_label"], "author-rt")
|
|
self.assertEqual(ev["authenticated_username"], "bot")
|
|
self.assertEqual(ev["issue_number"], 5)
|
|
|
|
def test_metadata_and_reason_redacted(self):
|
|
ev = gitea_audit.build_event(
|
|
action="create_pr", result=gitea_audit.FAILED,
|
|
reason="HTTP 500: token secret-xyz bad",
|
|
request_metadata={"title": "t", "token": "leak"}, now=FIXED_TS,
|
|
)
|
|
self.assertNotIn("secret-xyz", ev["reason"])
|
|
self.assertEqual(ev["request_metadata"]["token"], gitea_audit.REDACTED)
|
|
self.assertEqual(ev["request_metadata"]["title"], "t")
|
|
|
|
|
|
class TestSink(unittest.TestCase):
|
|
|
|
def test_enabled_reflects_env(self):
|
|
with patch.dict(os.environ, {}, clear=True):
|
|
self.assertFalse(gitea_audit.audit_enabled())
|
|
with patch.dict(os.environ, {"GITEA_AUDIT_LOG": "/tmp/x.log"}, clear=True):
|
|
self.assertTrue(gitea_audit.audit_enabled())
|
|
|
|
def test_write_event_appends_json_lines(self):
|
|
with tempfile.TemporaryDirectory() as d:
|
|
path = os.path.join(d, "audit.log")
|
|
self.assertTrue(gitea_audit.write_event({"a": 1}, path=path))
|
|
self.assertTrue(gitea_audit.write_event({"b": 2}, path=path))
|
|
with open(path, encoding="utf-8") as fh:
|
|
lines = fh.read().splitlines()
|
|
self.assertEqual(len(lines), 2)
|
|
self.assertEqual(json.loads(lines[0])["a"], 1)
|
|
self.assertEqual(json.loads(lines[1])["b"], 2)
|
|
|
|
def test_write_event_noop_without_path(self):
|
|
with patch.dict(os.environ, {}, clear=True):
|
|
self.assertFalse(gitea_audit.write_event({"a": 1}))
|
|
|
|
def test_write_event_never_raises_on_bad_path(self):
|
|
# A path inside a non-existent directory cannot be opened; must not raise.
|
|
self.assertFalse(gitea_audit.write_event({"a": 1}, path="/no/such/dir/x.log"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# mcp_server wiring
|
|
# ---------------------------------------------------------------------------
|
|
class _AuditWiringBase(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self._dir = tempfile.TemporaryDirectory()
|
|
self.audit_path = os.path.join(self._dir.name, "audit.log")
|
|
# Identity cache is process-global; clear so lookups are deterministic.
|
|
mcp_server._IDENTITY_CACHE.clear()
|
|
|
|
def tearDown(self):
|
|
mcp_server._IDENTITY_CACHE.clear()
|
|
self._dir.cleanup()
|
|
|
|
def _env(self, **extra):
|
|
env = {"GITEA_AUDIT_LOG": self.audit_path,
|
|
"GITEA_PROFILE_NAME": "gitea-author",
|
|
"GITEA_ALLOWED_OPERATIONS": "read,merge"}
|
|
env.update(extra)
|
|
return env
|
|
|
|
def _records(self):
|
|
if not os.path.exists(self.audit_path):
|
|
return []
|
|
with open(self.audit_path, encoding="utf-8") as fh:
|
|
return [json.loads(line) for line in fh if line.strip()]
|
|
|
|
|
|
class TestSimpleToolAudit(_AuditWiringBase):
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_create_issue_success_audited(self, _auth, mock_api):
|
|
# 1: create POST result, 2: identity /user lookup for the audit record.
|
|
mock_api.side_effect = [
|
|
{"number": 11, "html_url": "https://gitea.prgs.cc/issues/11"},
|
|
{"login": "author-bot"},
|
|
]
|
|
with patch.dict(os.environ, self._env(), clear=True):
|
|
result = gitea_create_issue(title="Add thing", remote="prgs")
|
|
self.assertEqual(result["number"], 11)
|
|
recs = self._records()
|
|
self.assertEqual(len(recs), 1)
|
|
rec = recs[0]
|
|
self.assertEqual(rec["action"], "create_issue")
|
|
self.assertEqual(rec["result"], "succeeded")
|
|
self.assertEqual(rec["profile_name"], "gitea-author")
|
|
self.assertEqual(rec["authenticated_username"], "author-bot")
|
|
self.assertEqual(rec["issue_number"], 11)
|
|
self.assertEqual(rec["request_metadata"]["title"], "Add thing")
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_create_issue_failure_audited(self, _auth, mock_api):
|
|
mock_api.side_effect = [
|
|
RuntimeError("HTTP 500: boom"),
|
|
{"login": "author-bot"}, # identity lookup for the audit record
|
|
]
|
|
with patch.dict(os.environ, self._env(), clear=True):
|
|
with self.assertRaises(RuntimeError):
|
|
gitea_create_issue(title="X", remote="prgs")
|
|
recs = self._records()
|
|
self.assertEqual(len(recs), 1)
|
|
self.assertEqual(recs[0]["result"], "failed")
|
|
self.assertIn("HTTP 500", recs[0]["reason"])
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_close_issue_audited(self, _auth, mock_api):
|
|
mock_api.side_effect = [{"state": "closed"}, {"login": "mgr-bot"}]
|
|
with patch.dict(os.environ, self._env(), clear=True):
|
|
gitea_close_issue(issue_number=42, remote="prgs")
|
|
recs = self._records()
|
|
self.assertEqual(len(recs), 1)
|
|
self.assertEqual(recs[0]["action"], "close_issue")
|
|
self.assertEqual(recs[0]["issue_number"], 42)
|
|
self.assertEqual(recs[0]["authenticated_username"], "mgr-bot")
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_disabled_writes_nothing_and_no_extra_call(self, _auth, mock_api):
|
|
# No GITEA_AUDIT_LOG -> audit is a no-op: exactly one API call, no file.
|
|
mock_api.return_value = {"number": 1, "html_url": "http://x/1"}
|
|
with patch.dict(os.environ, {"GITEA_PROFILE_NAME": "gitea-author"}, clear=True):
|
|
gitea_create_issue(title="x", remote="prgs")
|
|
mock_api.assert_called_once()
|
|
self.assertEqual(self._records(), [])
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_secrets_never_written(self, _auth, mock_api):
|
|
mock_api.side_effect = [
|
|
{"number": 3, "html_url": "http://x/3"},
|
|
{"login": "author-bot"},
|
|
]
|
|
env = self._env(GITEA_TOKEN="super-secret-token")
|
|
with patch.dict(os.environ, env, clear=True):
|
|
gitea_create_issue(title="t", remote="prgs")
|
|
with open(self.audit_path, encoding="utf-8") as fh:
|
|
blob = fh.read().lower()
|
|
for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()):
|
|
self.assertNotIn(secret, blob)
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_audit_failure_never_breaks_action(self, _auth, mock_api):
|
|
mock_api.side_effect = [
|
|
{"number": 9, "html_url": "http://x/9"},
|
|
{"login": "author-bot"},
|
|
]
|
|
with patch.dict(os.environ, self._env(), clear=True):
|
|
with patch("gitea_audit.write_event", side_effect=RuntimeError("disk full")):
|
|
result = gitea_create_issue(title="t", remote="prgs")
|
|
# The mutation result is returned even though the sink write blew up.
|
|
self.assertEqual(result["number"], 9)
|
|
|
|
|
|
class TestGatedToolAudit(_AuditWiringBase):
|
|
|
|
def _pr(self, author, state="open", sha="abc123", mergeable=True):
|
|
return {"user": {"login": author}, "state": state,
|
|
"head": {"sha": sha}, "mergeable": mergeable}
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_merge_success_audited(self, _auth, mock_api):
|
|
# user, pr, merge POST, readback — no extra identity call (uses result).
|
|
mock_api.side_effect = [
|
|
{"login": "merger-bot"}, self._pr("author-bot"),
|
|
{}, {"merged_commit_sha": "c1"},
|
|
]
|
|
env = self._env(GITEA_PROFILE_NAME="gitea-merger",
|
|
GITEA_ALLOWED_OPERATIONS="read,merge")
|
|
with patch.dict(os.environ, env, clear=True):
|
|
r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8",
|
|
expected_head_sha="abc123", remote="prgs")
|
|
self.assertTrue(r["performed"])
|
|
recs = self._records()
|
|
self.assertEqual(len(recs), 1)
|
|
self.assertEqual(recs[0]["action"], "merge_pr")
|
|
self.assertEqual(recs[0]["result"], "succeeded")
|
|
self.assertEqual(recs[0]["authenticated_username"], "merger-bot")
|
|
self.assertEqual(recs[0]["pr_number"], 8)
|
|
self.assertEqual(recs[0]["head_sha"], "abc123")
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_merge_blocked_audited(self, _auth, mock_api):
|
|
# Self-author merge is blocked; must still be recorded as blocked.
|
|
mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
|
|
env = self._env(GITEA_PROFILE_NAME="gitea-merger",
|
|
GITEA_ALLOWED_OPERATIONS="read,merge")
|
|
with patch.dict(os.environ, env, clear=True):
|
|
r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8", remote="prgs")
|
|
self.assertFalse(r["performed"])
|
|
recs = self._records()
|
|
self.assertEqual(len(recs), 1)
|
|
self.assertEqual(recs[0]["result"], "blocked")
|
|
self.assertEqual(recs[0]["authenticated_username"], "jcwalker3")
|
|
self.assertIn("PR author", recs[0]["reason"])
|
|
|
|
@patch("mcp_server.api_request")
|
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
def test_submit_review_success_audited(self, _auth, mock_api):
|
|
mock_api.side_effect = [
|
|
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 7},
|
|
]
|
|
env = self._env(GITEA_PROFILE_NAME="gitea-reviewer",
|
|
GITEA_ALLOWED_OPERATIONS="read,review,approve")
|
|
with patch.dict(os.environ, env, clear=True):
|
|
r = gitea_submit_pr_review(pr_number=8, action="approve",
|
|
body="LGTM", remote="prgs")
|
|
self.assertTrue(r["performed"])
|
|
recs = self._records()
|
|
self.assertEqual(len(recs), 1)
|
|
self.assertEqual(recs[0]["action"], "submit_pr_review")
|
|
self.assertEqual(recs[0]["result"], "succeeded")
|
|
self.assertEqual(recs[0]["authenticated_username"], "reviewer-bot")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|