"""Tests for Gitea MCP mutating-action audit logging (issue #18). Covers the pure audit module (redaction, event building, sink writes) and the wiring in mcp_server: mutating tools emit one record with profile + authenticated username + outcome, secrets are redacted, and — critically — auditing is a no-op (no records, no extra API calls) when GITEA_AUDIT_LOG is unset so existing behaviour is unchanged. """ import os import sys import json import datetime import tempfile import unittest from unittest.mock import patch sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) import gitea_audit # noqa: E402 import mcp_server # noqa: E402 from mcp_server import ( # noqa: E402 gitea_create_issue, gitea_close_issue, gitea_merge_pr, gitea_submit_pr_review, ) FAKE_AUTH = "Basic dGVzdDp0ZXN0" FIXED_TS = datetime.datetime(2026, 7, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Pure audit module # --------------------------------------------------------------------------- class TestRedaction(unittest.TestCase): def test_redacts_secret_keys(self): red = gitea_audit.redact({"token": "abc", "title": "hi", "Password": "x"}) self.assertEqual(red["token"], gitea_audit.REDACTED) self.assertEqual(red["Password"], gitea_audit.REDACTED) self.assertEqual(red["title"], "hi") def test_redacts_nested_and_lists(self): red = gitea_audit.redact({"outer": {"authorization": "Basic xyz"}, "items": [{"secret": "s"}, "plain"]}) self.assertEqual(red["outer"]["authorization"], gitea_audit.REDACTED) self.assertEqual(red["items"][0]["secret"], gitea_audit.REDACTED) self.assertEqual(red["items"][1], "plain") def test_redacts_credential_value_prefixes(self): self.assertEqual( gitea_audit._redact_str("failed: token abc-123 rejected"), "failed: token [REDACTED] rejected", ) self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Basic Zm9v")) self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Bearer tok99")) def test_non_string_untouched(self): self.assertEqual(gitea_audit.redact(42), 42) self.assertIsNone(gitea_audit.redact(None)) class TestBuildEvent(unittest.TestCase): def test_core_fields_and_injected_timestamp(self): ev = gitea_audit.build_event( action="create_issue", result=gitea_audit.SUCCEEDED, remote="prgs", server="https://gitea.prgs.cc", repository="Repo", issue_number=5, profile_name="gitea-author", audit_label="author-rt", authenticated_username="bot", now=FIXED_TS, ) self.assertEqual(ev["timestamp"], "2026-07-01T12:00:00+00:00") self.assertEqual(ev["action"], "create_issue") self.assertEqual(ev["action_type"], "mutating") self.assertEqual(ev["result"], "succeeded") self.assertEqual(ev["profile_name"], "gitea-author") self.assertEqual(ev["audit_label"], "author-rt") self.assertEqual(ev["authenticated_username"], "bot") self.assertEqual(ev["issue_number"], 5) def test_metadata_and_reason_redacted(self): ev = gitea_audit.build_event( action="create_pr", result=gitea_audit.FAILED, reason="HTTP 500: token secret-xyz bad", request_metadata={"title": "t", "token": "leak"}, now=FIXED_TS, ) self.assertNotIn("secret-xyz", ev["reason"]) self.assertEqual(ev["request_metadata"]["token"], gitea_audit.REDACTED) self.assertEqual(ev["request_metadata"]["title"], "t") class TestSink(unittest.TestCase): def test_enabled_reflects_env(self): with patch.dict(os.environ, {}, clear=True): self.assertFalse(gitea_audit.audit_enabled()) with patch.dict(os.environ, {"GITEA_AUDIT_LOG": "/tmp/x.log"}, clear=True): self.assertTrue(gitea_audit.audit_enabled()) def test_write_event_appends_json_lines(self): with tempfile.TemporaryDirectory() as d: path = os.path.join(d, "audit.log") self.assertTrue(gitea_audit.write_event({"a": 1}, path=path)) self.assertTrue(gitea_audit.write_event({"b": 2}, path=path)) with open(path, encoding="utf-8") as fh: lines = fh.read().splitlines() self.assertEqual(len(lines), 2) self.assertEqual(json.loads(lines[0])["a"], 1) self.assertEqual(json.loads(lines[1])["b"], 2) def test_write_event_noop_without_path(self): with patch.dict(os.environ, {}, clear=True): self.assertFalse(gitea_audit.write_event({"a": 1})) def test_write_event_never_raises_on_bad_path(self): # A path inside a non-existent directory cannot be opened; must not raise. self.assertFalse(gitea_audit.write_event({"a": 1}, path="/no/such/dir/x.log")) # --------------------------------------------------------------------------- # mcp_server wiring # --------------------------------------------------------------------------- class _AuditWiringBase(unittest.TestCase): def setUp(self): self._dir = tempfile.TemporaryDirectory() self.audit_path = os.path.join(self._dir.name, "audit.log") # Identity cache is process-global; clear so lookups are deterministic. mcp_server._IDENTITY_CACHE.clear() def tearDown(self): mcp_server._IDENTITY_CACHE.clear() self._dir.cleanup() def _env(self, **extra): env = {"GITEA_AUDIT_LOG": self.audit_path, "GITEA_PROFILE_NAME": "gitea-author", "GITEA_ALLOWED_OPERATIONS": "read,merge"} env.update(extra) return env def _records(self): if not os.path.exists(self.audit_path): return [] with open(self.audit_path, encoding="utf-8") as fh: return [json.loads(line) for line in fh if line.strip()] class TestSimpleToolAudit(_AuditWiringBase): @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_create_issue_success_audited(self, _auth, mock_api): # 1: create POST result, 2: identity /user lookup for the audit record. mock_api.side_effect = [ {"number": 11, "html_url": "https://gitea.prgs.cc/issues/11"}, {"login": "author-bot"}, ] with patch.dict(os.environ, self._env(), clear=True): result = gitea_create_issue(title="Add thing", remote="prgs") self.assertEqual(result["number"], 11) recs = self._records() self.assertEqual(len(recs), 1) rec = recs[0] self.assertEqual(rec["action"], "create_issue") self.assertEqual(rec["result"], "succeeded") self.assertEqual(rec["profile_name"], "gitea-author") self.assertEqual(rec["authenticated_username"], "author-bot") self.assertEqual(rec["issue_number"], 11) self.assertEqual(rec["request_metadata"]["title"], "Add thing") @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_create_issue_failure_audited(self, _auth, mock_api): mock_api.side_effect = [ RuntimeError("HTTP 500: boom"), {"login": "author-bot"}, # identity lookup for the audit record ] with patch.dict(os.environ, self._env(), clear=True): with self.assertRaises(RuntimeError): gitea_create_issue(title="X", remote="prgs") recs = self._records() self.assertEqual(len(recs), 1) self.assertEqual(recs[0]["result"], "failed") self.assertIn("HTTP 500", recs[0]["reason"]) @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_close_issue_audited(self, _auth, mock_api): mock_api.side_effect = [{"state": "closed"}, {"login": "mgr-bot"}] with patch.dict(os.environ, self._env(), clear=True): gitea_close_issue(issue_number=42, remote="prgs") recs = self._records() self.assertEqual(len(recs), 1) self.assertEqual(recs[0]["action"], "close_issue") self.assertEqual(recs[0]["issue_number"], 42) self.assertEqual(recs[0]["authenticated_username"], "mgr-bot") @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_disabled_writes_nothing_and_no_extra_call(self, _auth, mock_api): # No GITEA_AUDIT_LOG -> audit is a no-op: exactly one API call, no file. mock_api.return_value = {"number": 1, "html_url": "http://x/1"} with patch.dict(os.environ, {"GITEA_PROFILE_NAME": "gitea-author"}, clear=True): gitea_create_issue(title="x", remote="prgs") mock_api.assert_called_once() self.assertEqual(self._records(), []) @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_secrets_never_written(self, _auth, mock_api): mock_api.side_effect = [ {"number": 3, "html_url": "http://x/3"}, {"login": "author-bot"}, ] env = self._env(GITEA_TOKEN="super-secret-token") with patch.dict(os.environ, env, clear=True): gitea_create_issue(title="t", remote="prgs") with open(self.audit_path, encoding="utf-8") as fh: blob = fh.read().lower() for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()): self.assertNotIn(secret, blob) @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_audit_failure_never_breaks_action(self, _auth, mock_api): mock_api.side_effect = [ {"number": 9, "html_url": "http://x/9"}, {"login": "author-bot"}, ] with patch.dict(os.environ, self._env(), clear=True): with patch("gitea_audit.write_event", side_effect=RuntimeError("disk full")): result = gitea_create_issue(title="t", remote="prgs") # The mutation result is returned even though the sink write blew up. self.assertEqual(result["number"], 9) class TestGatedToolAudit(_AuditWiringBase): def _pr(self, author, state="open", sha="abc123", mergeable=True): return {"user": {"login": author}, "state": state, "head": {"sha": sha}, "mergeable": mergeable} @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_merge_success_audited(self, _auth, mock_api): # user, pr, merge POST, readback — no extra identity call (uses result). mock_api.side_effect = [ {"login": "merger-bot"}, self._pr("author-bot"), {}, {"merged_commit_sha": "c1"}, ] env = self._env(GITEA_PROFILE_NAME="gitea-merger", GITEA_ALLOWED_OPERATIONS="read,merge") with patch.dict(os.environ, env, clear=True): r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8", expected_head_sha="abc123", remote="prgs") self.assertTrue(r["performed"]) recs = self._records() self.assertEqual(len(recs), 1) self.assertEqual(recs[0]["action"], "merge_pr") self.assertEqual(recs[0]["result"], "succeeded") self.assertEqual(recs[0]["authenticated_username"], "merger-bot") self.assertEqual(recs[0]["pr_number"], 8) self.assertEqual(recs[0]["head_sha"], "abc123") @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_merge_blocked_audited(self, _auth, mock_api): # Self-author merge is blocked; must still be recorded as blocked. mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")] env = self._env(GITEA_PROFILE_NAME="gitea-merger", GITEA_ALLOWED_OPERATIONS="read,merge") with patch.dict(os.environ, env, clear=True): r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8", remote="prgs") self.assertFalse(r["performed"]) recs = self._records() self.assertEqual(len(recs), 1) self.assertEqual(recs[0]["result"], "blocked") self.assertEqual(recs[0]["authenticated_username"], "jcwalker3") self.assertIn("PR author", recs[0]["reason"]) @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_submit_review_success_audited(self, _auth, mock_api): mock_api.side_effect = [ {"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 7}, ] env = self._env(GITEA_PROFILE_NAME="gitea-reviewer", GITEA_ALLOWED_OPERATIONS="read,review,approve") with patch.dict(os.environ, env, clear=True): r = gitea_submit_pr_review(pr_number=8, action="approve", body="LGTM", remote="prgs") self.assertTrue(r["performed"]) recs = self._records() self.assertEqual(len(recs), 1) self.assertEqual(recs[0]["action"], "submit_pr_review") self.assertEqual(recs[0]["result"], "succeeded") self.assertEqual(recs[0]["authenticated_username"], "reviewer-bot") if __name__ == "__main__": unittest.main()