#18: Audit-log Gitea MCP mutating actions with execution profile metadata #29
@@ -30,6 +30,11 @@ GITEA_FORBIDDEN_OPERATIONS=merge,branch.push
|
||||
# Optional short label attached to this runtime for audit purposes.
|
||||
GITEA_AUDIT_LABEL=reviewer-runtime
|
||||
|
||||
# Optional path to an audit log file (#18). When set, each mutating action
|
||||
# appends one redacted JSON record (profile + authenticated user + outcome).
|
||||
# Leave unset to disable auditing entirely (no records, no extra API calls).
|
||||
GITEA_AUDIT_LOG=/path/to/gitea-mcp-audit.log
|
||||
|
||||
# Optional NAME of the token's source (e.g. an env var name). This is a name
|
||||
# only — never the token value. Surfaced by gitea_get_profile.
|
||||
GITEA_TOKEN_SOURCE=GITEA_TOKEN
|
||||
|
||||
@@ -168,6 +168,7 @@ Recognized environment fields (see [`.env.example`](.env.example) for placeholde
|
||||
| `GITEA_AUDIT_LABEL` | Optional short label for this runtime, for audit purposes. |
|
||||
| `GITEA_TOKEN_SOURCE` | Optional *name* of the token source (e.g. an env var name). A name only — never the token value. |
|
||||
| `GITEA_BASE_URL` | Optional informational base URL. |
|
||||
| `GITEA_AUDIT_LOG` | Optional path to an audit log file. When set, mutating actions append one redacted JSON record each (profile + authenticated user + outcome). Unset ⇒ auditing off (no records, no extra API calls). |
|
||||
|
||||
Notes:
|
||||
|
||||
@@ -180,9 +181,12 @@ Notes:
|
||||
can inspect which runtime it is talking to before deciding to act.
|
||||
- See [`docs/gitea-execution-profiles.md`](docs/gitea-execution-profiles.md) for
|
||||
the full profile model.
|
||||
- **Audit logging:** #16 returns structured gate/merge results but does not add
|
||||
durable audit logging. Durable audit logging for Gitea MCP mutating actions is
|
||||
tracked by #18.
|
||||
- **Audit logging (#18):** mutating actions emit a durable, redacted JSON audit
|
||||
record — timestamp, action, result (`allowed`/`blocked`/`failed`/`succeeded`),
|
||||
profile name + audit label, authenticated username, target repo/issue/PR,
|
||||
branch and head SHA where applicable — when `GITEA_AUDIT_LOG` is set. Auditing
|
||||
is off by default and never adds API calls or breaks the action when off.
|
||||
See [`gitea_audit.py`](gitea_audit.py).
|
||||
</details>
|
||||
|
||||
<details>
|
||||
|
||||
+126
@@ -0,0 +1,126 @@
|
||||
"""Audit logging for Gitea MCP mutating actions (issue #18).
|
||||
|
||||
Emits one structured JSON record per mutating action so an operator can see
|
||||
*which execution profile and authenticated Gitea user* performed (or was
|
||||
blocked from / failed) each mutation.
|
||||
|
||||
Design constraints:
|
||||
|
||||
- **Off by default.** Records are written only when ``GITEA_AUDIT_LOG`` names a
|
||||
file path. With it unset, ``write_event`` is a no-op and callers add zero
|
||||
behaviour — existing tool behaviour and API call patterns are unchanged.
|
||||
- **Never raises.** Auditing must never break the action it records; all sink
|
||||
I/O is best-effort and swallows errors.
|
||||
- **No secrets.** Tokens / Authorization material are redacted from request
|
||||
metadata and reason strings before a record is written.
|
||||
- **No network.** This module performs no HTTP; the caller supplies identity
|
||||
and profile metadata it already resolved.
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
import datetime
|
||||
|
||||
# Result states for an audited action.
|
||||
ALLOWED = "allowed"
|
||||
BLOCKED = "blocked"
|
||||
FAILED = "failed"
|
||||
SUCCEEDED = "succeeded"
|
||||
|
||||
REDACTED = "[REDACTED]"
|
||||
|
||||
# A dict key containing any of these (case-insensitive) has its value redacted.
|
||||
_SECRET_KEY_HINTS = ("token", "password", "secret", "authorization", "auth")
|
||||
# A string value starting with one of these has the following run redacted.
|
||||
_SECRET_VALUE_PREFIXES = ("token ", "Basic ", "Bearer ")
|
||||
|
||||
|
||||
def _redact_str(text):
|
||||
"""Redact anything that looks like an Authorization credential in *text*."""
|
||||
if not isinstance(text, str) or not text:
|
||||
return text
|
||||
out = text
|
||||
for prefix in _SECRET_VALUE_PREFIXES:
|
||||
idx = 0
|
||||
while True:
|
||||
i = out.find(prefix, idx)
|
||||
if i == -1:
|
||||
break
|
||||
j = i + len(prefix)
|
||||
while j < len(out) and not out[j].isspace():
|
||||
j += 1
|
||||
out = out[:i] + prefix + REDACTED + out[j:]
|
||||
idx = i + len(prefix) + len(REDACTED)
|
||||
return out
|
||||
|
||||
|
||||
def redact(value):
|
||||
"""Recursively redact secret-looking keys/values from a JSON-able value."""
|
||||
if isinstance(value, dict):
|
||||
result = {}
|
||||
for k, v in value.items():
|
||||
if isinstance(k, str) and any(h in k.lower() for h in _SECRET_KEY_HINTS):
|
||||
result[k] = REDACTED
|
||||
else:
|
||||
result[k] = redact(v)
|
||||
return result
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [redact(v) for v in value]
|
||||
if isinstance(value, str):
|
||||
return _redact_str(value)
|
||||
return value
|
||||
|
||||
|
||||
def audit_log_path():
|
||||
"""Return the configured audit log file path, or None if auditing is off."""
|
||||
return (os.environ.get("GITEA_AUDIT_LOG") or "").strip() or None
|
||||
|
||||
|
||||
def audit_enabled():
|
||||
"""True when a sink is configured. When False, callers should skip auditing."""
|
||||
return audit_log_path() is not None
|
||||
|
||||
|
||||
def build_event(*, action, result, remote=None, server=None, repository=None,
|
||||
issue_number=None, pr_number=None, profile_name=None,
|
||||
audit_label=None, authenticated_username=None, target_branch=None,
|
||||
head_sha=None, reason=None, request_metadata=None, now=None):
|
||||
"""Build a redacted, JSON-able audit record for a mutating action."""
|
||||
ts = now or datetime.datetime.now(datetime.timezone.utc)
|
||||
if isinstance(ts, datetime.datetime):
|
||||
ts = ts.isoformat()
|
||||
return {
|
||||
"timestamp": ts,
|
||||
"action": action,
|
||||
"action_type": "mutating",
|
||||
"result": result,
|
||||
"remote": remote,
|
||||
"server": server,
|
||||
"repository": repository,
|
||||
"issue_number": issue_number,
|
||||
"pr_number": pr_number,
|
||||
"profile_name": profile_name,
|
||||
"audit_label": audit_label,
|
||||
"authenticated_username": authenticated_username,
|
||||
"target_branch": target_branch,
|
||||
"head_sha": head_sha,
|
||||
"reason": _redact_str(reason) if reason else reason,
|
||||
"request_metadata": redact(request_metadata) if request_metadata is not None else None,
|
||||
}
|
||||
|
||||
|
||||
def write_event(event, path=None):
|
||||
"""Append *event* as one JSON line to the audit sink. Never raises.
|
||||
|
||||
Returns True if a line was written, False if auditing is off or the write
|
||||
failed (auditing is best-effort and must not break the caller).
|
||||
"""
|
||||
path = path or audit_log_path()
|
||||
if not path:
|
||||
return False
|
||||
try:
|
||||
line = json.dumps(event, default=str, sort_keys=True)
|
||||
with open(path, "a", encoding="utf-8") as fh:
|
||||
fh.write(line + "\n")
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
+183
@@ -15,6 +15,8 @@ Configuration (mcp_config.json):
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import functools
|
||||
import contextlib
|
||||
import subprocess
|
||||
|
||||
# Resolve the project root. MCP clients must launch this script directly with
|
||||
@@ -38,6 +40,7 @@ from gitea_auth import ( # noqa: E402
|
||||
repo_api_url,
|
||||
get_profile,
|
||||
)
|
||||
import gitea_audit # noqa: E402
|
||||
|
||||
mcp = FastMCP("gitea-tools", instructions=(
|
||||
"Gitea issue tracker and PR management for dadeschools and prgs instances. "
|
||||
@@ -70,6 +73,144 @@ def _auth(host: str) -> str:
|
||||
return header
|
||||
|
||||
|
||||
# ── Audit logging (#18) ─────────────────────────────────────────────────────────
|
||||
# Mutating actions emit a structured audit record (profile + authenticated user
|
||||
# + outcome) when GITEA_AUDIT_LOG is configured. When it is not, every helper
|
||||
# below short-circuits and performs NO work — no extra API calls, no I/O — so
|
||||
# existing tool behaviour and API call sequences are unchanged.
|
||||
|
||||
_UNSET = object()
|
||||
|
||||
# Best-effort identity cache keyed by host, so an enabled audit trail resolves
|
||||
# the authenticated username at most once per host per process.
|
||||
_IDENTITY_CACHE: dict = {}
|
||||
|
||||
|
||||
def _authenticated_username(host: str):
|
||||
"""Resolve the authenticated Gitea username for *host* (cached, fail-soft).
|
||||
|
||||
Read-only. Returns None if the identity cannot be determined; never raises
|
||||
and never surfaces credential material.
|
||||
"""
|
||||
if host in _IDENTITY_CACHE:
|
||||
return _IDENTITY_CACHE[host]
|
||||
user = None
|
||||
try:
|
||||
header = get_auth_header(host)
|
||||
if header:
|
||||
who = api_request("GET", f"https://{host}/api/v1/user", header)
|
||||
user = (who or {}).get("login")
|
||||
except Exception:
|
||||
user = None
|
||||
_IDENTITY_CACHE[host] = user
|
||||
return user
|
||||
|
||||
|
||||
def _audit(action: str, *, host, remote, result, org=None, repo=None,
|
||||
reason=None, request_metadata=None, issue_number=None,
|
||||
pr_number=None, target_branch=None, head_sha=None, username=_UNSET):
|
||||
"""Emit one audit record for a mutating action. No-op unless auditing is on.
|
||||
|
||||
Never raises — auditing must not break the action it records.
|
||||
"""
|
||||
if not gitea_audit.audit_enabled():
|
||||
return
|
||||
try:
|
||||
profile = get_profile()
|
||||
if username is _UNSET:
|
||||
username = _authenticated_username(host) if host else None
|
||||
event = gitea_audit.build_event(
|
||||
action=action,
|
||||
result=result,
|
||||
remote=remote,
|
||||
server=(f"https://{host}" if host else None),
|
||||
repository=repo,
|
||||
issue_number=issue_number,
|
||||
pr_number=pr_number,
|
||||
profile_name=profile["profile_name"],
|
||||
audit_label=profile["audit_label"],
|
||||
authenticated_username=username,
|
||||
target_branch=target_branch,
|
||||
head_sha=head_sha,
|
||||
reason=reason,
|
||||
request_metadata=request_metadata,
|
||||
)
|
||||
gitea_audit.write_event(event)
|
||||
except Exception:
|
||||
pass # best-effort; never break the action
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _audited(action: str, *, host, remote, org=None, repo=None,
|
||||
request_metadata=None, issue_number=None, pr_number=None,
|
||||
target_branch=None):
|
||||
"""Wrap a mutating API call: audit SUCCEEDED on return, FAILED on exception.
|
||||
|
||||
When auditing is off this yields immediately with no bookkeeping.
|
||||
"""
|
||||
if not gitea_audit.audit_enabled():
|
||||
yield
|
||||
return
|
||||
try:
|
||||
yield
|
||||
except Exception as exc:
|
||||
_audit(action, host=host, remote=remote, org=org, repo=repo,
|
||||
result=gitea_audit.FAILED, reason=_redact(str(exc)),
|
||||
request_metadata=request_metadata, issue_number=issue_number,
|
||||
pr_number=pr_number, target_branch=target_branch)
|
||||
raise
|
||||
_audit(action, host=host, remote=remote, org=org, repo=repo,
|
||||
result=gitea_audit.SUCCEEDED, request_metadata=request_metadata,
|
||||
issue_number=issue_number, pr_number=pr_number,
|
||||
target_branch=target_branch)
|
||||
|
||||
|
||||
def _audit_pr_result(action: str):
|
||||
"""Decorator for gated PR tools that return a result dict.
|
||||
|
||||
Reads the tool's own result dict (authenticated_user, profile, reasons,
|
||||
performed) to emit an audit record classifying the outcome as SUCCEEDED,
|
||||
BLOCKED, or FAILED. No extra API calls: identity comes from the result.
|
||||
"""
|
||||
def decorate(fn):
|
||||
@functools.wraps(fn)
|
||||
def wrapper(*args, **kwargs):
|
||||
result = fn(*args, **kwargs)
|
||||
try:
|
||||
if isinstance(result, dict) and gitea_audit.audit_enabled():
|
||||
reasons = [str(x) for x in (result.get("reasons") or [])]
|
||||
if result.get("performed"):
|
||||
status = gitea_audit.SUCCEEDED
|
||||
elif any("failed:" in x.lower() for x in reasons):
|
||||
# "failed:" marks a surfaced exception (e.g. "merge
|
||||
# failed: <err>"); a bare gate message like "… failed
|
||||
# (fail closed)" is a policy block, not an execution error.
|
||||
status = gitea_audit.FAILED
|
||||
else:
|
||||
status = gitea_audit.BLOCKED
|
||||
remote = result.get("remote")
|
||||
host = REMOTES[remote]["host"] if remote in REMOTES else None
|
||||
_audit(
|
||||
action,
|
||||
host=host,
|
||||
remote=remote,
|
||||
result=status,
|
||||
reason="; ".join(reasons) or None,
|
||||
pr_number=result.get("pr_number"),
|
||||
head_sha=result.get("head_sha"),
|
||||
username=result.get("authenticated_user"),
|
||||
request_metadata={
|
||||
"requested_action": result.get("requested_action"),
|
||||
"merge_method": result.get("merge_method"),
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
pass # best-effort; never break the tool
|
||||
return result
|
||||
return wrapper
|
||||
return decorate
|
||||
|
||||
|
||||
# ── Tools ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
@mcp.tool()
|
||||
@@ -97,7 +238,16 @@ def gitea_create_issue(
|
||||
h, o, r = _resolve(remote, host, org, repo)
|
||||
auth = _auth(h)
|
||||
url = f"{repo_api_url(h, o, r)}/issues"
|
||||
try:
|
||||
data = api_request("POST", url, auth, {"title": title, "body": body})
|
||||
except Exception as exc:
|
||||
_audit("create_issue", host=h, remote=remote, org=o, repo=r,
|
||||
result=gitea_audit.FAILED, reason=_redact(str(exc)),
|
||||
request_metadata={"title": title})
|
||||
raise
|
||||
_audit("create_issue", host=h, remote=remote, org=o, repo=r,
|
||||
result=gitea_audit.SUCCEEDED, issue_number=data["number"],
|
||||
request_metadata={"title": title})
|
||||
return {"number": data["number"], "url": data["html_url"]}
|
||||
|
||||
|
||||
@@ -131,7 +281,17 @@ def gitea_create_pr(
|
||||
auth = _auth(h)
|
||||
url = f"{repo_api_url(h, o, r)}/pulls"
|
||||
payload = {"title": title, "body": body, "head": head, "base": base}
|
||||
meta = {"title": title, "head": head, "base": base}
|
||||
try:
|
||||
data = api_request("POST", url, auth, payload)
|
||||
except Exception as exc:
|
||||
_audit("create_pr", host=h, remote=remote, org=o, repo=r,
|
||||
result=gitea_audit.FAILED, reason=_redact(str(exc)),
|
||||
target_branch=head, request_metadata=meta)
|
||||
raise
|
||||
_audit("create_pr", host=h, remote=remote, org=o, repo=r,
|
||||
result=gitea_audit.SUCCEEDED, pr_number=data["number"],
|
||||
target_branch=head, request_metadata=meta)
|
||||
return {"number": data["number"], "url": data["html_url"]}
|
||||
|
||||
|
||||
@@ -396,6 +556,7 @@ def _redact(text: str) -> str:
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
@_audit_pr_result("submit_pr_review")
|
||||
def gitea_submit_pr_review(
|
||||
pr_number: int,
|
||||
action: str,
|
||||
@@ -575,6 +736,8 @@ def gitea_edit_pr(
|
||||
if not payload:
|
||||
raise ValueError("At least one field to edit (title, body, state, base) must be provided.")
|
||||
|
||||
with _audited("edit_pr", host=h, remote=remote, org=o, repo=r,
|
||||
pr_number=pr_number, request_metadata={"fields": sorted(payload)}):
|
||||
data = api_request("PATCH", url, auth, payload)
|
||||
return {
|
||||
"success": True,
|
||||
@@ -663,6 +826,11 @@ def gitea_commit_files(
|
||||
if new_branch is not None:
|
||||
payload["new_branch"] = new_branch
|
||||
|
||||
with _audited("commit_files", host=h, remote=remote, org=o, repo=r,
|
||||
target_branch=(new_branch or branch),
|
||||
request_metadata={"message": message,
|
||||
"paths": [f.get("path") for f in files],
|
||||
"operations": [f.get("operation") for f in files]}):
|
||||
data = api_request("POST", url, auth, payload)
|
||||
return {
|
||||
"success": True,
|
||||
@@ -676,6 +844,7 @@ _MERGE_METHODS = ("merge", "squash", "rebase")
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
@_audit_pr_result("merge_pr")
|
||||
def gitea_merge_pr(
|
||||
pr_number: int,
|
||||
confirmation: str = "",
|
||||
@@ -952,6 +1121,8 @@ def gitea_delete_branch(
|
||||
import urllib.parse
|
||||
encoded_branch = urllib.parse.quote(branch, safe="")
|
||||
url = f"{repo_api_url(h, o, r)}/branches/{encoded_branch}"
|
||||
with _audited("delete_branch", host=h, remote=remote, org=o, repo=r,
|
||||
target_branch=branch, request_metadata={"branch": branch}):
|
||||
api_request("DELETE", url, auth)
|
||||
return {"success": True, "message": f"Remote branch '{branch}' deleted."}
|
||||
|
||||
@@ -979,6 +1150,8 @@ def gitea_close_issue(
|
||||
h, o, r = _resolve(remote, host, org, repo)
|
||||
auth = _auth(h)
|
||||
url = f"{repo_api_url(h, o, r)}/issues/{issue_number}"
|
||||
with _audited("close_issue", host=h, remote=remote, org=o, repo=r,
|
||||
issue_number=issue_number, request_metadata={"state": "closed"}):
|
||||
api_request("PATCH", url, auth, {"state": "closed"})
|
||||
return {"success": True, "message": f"Issue #{issue_number} closed."}
|
||||
|
||||
@@ -1232,10 +1405,16 @@ def gitea_mark_issue(
|
||||
)
|
||||
|
||||
if action == "start":
|
||||
with _audited("label_issue", host=h, remote=remote, org=o, repo=r,
|
||||
issue_number=issue_number,
|
||||
request_metadata={"op": "add", "label": "status:in-progress"}):
|
||||
api_request("POST", f"{base}/issues/{issue_number}/labels", auth,
|
||||
{"labels": [label_id]})
|
||||
return {"success": True, "message": f"Issue #{issue_number} claimed."}
|
||||
else:
|
||||
with _audited("unlabel_issue", host=h, remote=remote, org=o, repo=r,
|
||||
issue_number=issue_number,
|
||||
request_metadata={"op": "remove", "label": "status:in-progress"}):
|
||||
api_request("DELETE",
|
||||
f"{base}/issues/{issue_number}/labels/{label_id}", auth)
|
||||
return {"success": True, "message": f"Issue #{issue_number} released."}
|
||||
@@ -1301,6 +1480,8 @@ def gitea_create_label(
|
||||
"color": color,
|
||||
"description": description,
|
||||
}
|
||||
with _audited("create_label", host=h, remote=remote, org=o, repo=r,
|
||||
request_metadata={"name": name}):
|
||||
return api_request("POST", f"{base}/labels", auth, payload)
|
||||
|
||||
|
||||
@@ -1350,6 +1531,8 @@ def gitea_set_issue_labels(
|
||||
)
|
||||
|
||||
# 3. PUT the labels to the issue
|
||||
with _audited("set_issue_labels", host=h, remote=remote, org=o, repo=r,
|
||||
issue_number=issue_number, request_metadata={"labels": labels}):
|
||||
res = api_request("PUT", f"{base}/issues/{issue_number}/labels", auth, {"labels": label_ids})
|
||||
return res
|
||||
|
||||
|
||||
@@ -0,0 +1,302 @@
|
||||
"""Tests for Gitea MCP mutating-action audit logging (issue #18).
|
||||
|
||||
Covers the pure audit module (redaction, event building, sink writes) and the
|
||||
wiring in mcp_server: mutating tools emit one record with profile +
|
||||
authenticated username + outcome, secrets are redacted, and — critically —
|
||||
auditing is a no-op (no records, no extra API calls) when GITEA_AUDIT_LOG is
|
||||
unset so existing behaviour is unchanged.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import datetime
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
||||
|
||||
import gitea_audit # noqa: E402
|
||||
import mcp_server # noqa: E402
|
||||
from mcp_server import ( # noqa: E402
|
||||
gitea_create_issue,
|
||||
gitea_close_issue,
|
||||
gitea_merge_pr,
|
||||
gitea_submit_pr_review,
|
||||
)
|
||||
|
||||
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
|
||||
FIXED_TS = datetime.datetime(2026, 7, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pure audit module
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestRedaction(unittest.TestCase):
|
||||
|
||||
def test_redacts_secret_keys(self):
|
||||
red = gitea_audit.redact({"token": "abc", "title": "hi", "Password": "x"})
|
||||
self.assertEqual(red["token"], gitea_audit.REDACTED)
|
||||
self.assertEqual(red["Password"], gitea_audit.REDACTED)
|
||||
self.assertEqual(red["title"], "hi")
|
||||
|
||||
def test_redacts_nested_and_lists(self):
|
||||
red = gitea_audit.redact({"outer": {"authorization": "Basic xyz"},
|
||||
"items": [{"secret": "s"}, "plain"]})
|
||||
self.assertEqual(red["outer"]["authorization"], gitea_audit.REDACTED)
|
||||
self.assertEqual(red["items"][0]["secret"], gitea_audit.REDACTED)
|
||||
self.assertEqual(red["items"][1], "plain")
|
||||
|
||||
def test_redacts_credential_value_prefixes(self):
|
||||
self.assertEqual(
|
||||
gitea_audit._redact_str("failed: token abc-123 rejected"),
|
||||
"failed: token [REDACTED] rejected",
|
||||
)
|
||||
self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Basic Zm9v"))
|
||||
self.assertIn(gitea_audit.REDACTED, gitea_audit._redact_str("Bearer tok99"))
|
||||
|
||||
def test_non_string_untouched(self):
|
||||
self.assertEqual(gitea_audit.redact(42), 42)
|
||||
self.assertIsNone(gitea_audit.redact(None))
|
||||
|
||||
|
||||
class TestBuildEvent(unittest.TestCase):
|
||||
|
||||
def test_core_fields_and_injected_timestamp(self):
|
||||
ev = gitea_audit.build_event(
|
||||
action="create_issue", result=gitea_audit.SUCCEEDED,
|
||||
remote="prgs", server="https://gitea.prgs.cc", repository="Repo",
|
||||
issue_number=5, profile_name="gitea-author", audit_label="author-rt",
|
||||
authenticated_username="bot", now=FIXED_TS,
|
||||
)
|
||||
self.assertEqual(ev["timestamp"], "2026-07-01T12:00:00+00:00")
|
||||
self.assertEqual(ev["action"], "create_issue")
|
||||
self.assertEqual(ev["action_type"], "mutating")
|
||||
self.assertEqual(ev["result"], "succeeded")
|
||||
self.assertEqual(ev["profile_name"], "gitea-author")
|
||||
self.assertEqual(ev["audit_label"], "author-rt")
|
||||
self.assertEqual(ev["authenticated_username"], "bot")
|
||||
self.assertEqual(ev["issue_number"], 5)
|
||||
|
||||
def test_metadata_and_reason_redacted(self):
|
||||
ev = gitea_audit.build_event(
|
||||
action="create_pr", result=gitea_audit.FAILED,
|
||||
reason="HTTP 500: token secret-xyz bad",
|
||||
request_metadata={"title": "t", "token": "leak"}, now=FIXED_TS,
|
||||
)
|
||||
self.assertNotIn("secret-xyz", ev["reason"])
|
||||
self.assertEqual(ev["request_metadata"]["token"], gitea_audit.REDACTED)
|
||||
self.assertEqual(ev["request_metadata"]["title"], "t")
|
||||
|
||||
|
||||
class TestSink(unittest.TestCase):
|
||||
|
||||
def test_enabled_reflects_env(self):
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
self.assertFalse(gitea_audit.audit_enabled())
|
||||
with patch.dict(os.environ, {"GITEA_AUDIT_LOG": "/tmp/x.log"}, clear=True):
|
||||
self.assertTrue(gitea_audit.audit_enabled())
|
||||
|
||||
def test_write_event_appends_json_lines(self):
|
||||
with tempfile.TemporaryDirectory() as d:
|
||||
path = os.path.join(d, "audit.log")
|
||||
self.assertTrue(gitea_audit.write_event({"a": 1}, path=path))
|
||||
self.assertTrue(gitea_audit.write_event({"b": 2}, path=path))
|
||||
with open(path, encoding="utf-8") as fh:
|
||||
lines = fh.read().splitlines()
|
||||
self.assertEqual(len(lines), 2)
|
||||
self.assertEqual(json.loads(lines[0])["a"], 1)
|
||||
self.assertEqual(json.loads(lines[1])["b"], 2)
|
||||
|
||||
def test_write_event_noop_without_path(self):
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
self.assertFalse(gitea_audit.write_event({"a": 1}))
|
||||
|
||||
def test_write_event_never_raises_on_bad_path(self):
|
||||
# A path inside a non-existent directory cannot be opened; must not raise.
|
||||
self.assertFalse(gitea_audit.write_event({"a": 1}, path="/no/such/dir/x.log"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# mcp_server wiring
|
||||
# ---------------------------------------------------------------------------
|
||||
class _AuditWiringBase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self._dir = tempfile.TemporaryDirectory()
|
||||
self.audit_path = os.path.join(self._dir.name, "audit.log")
|
||||
# Identity cache is process-global; clear so lookups are deterministic.
|
||||
mcp_server._IDENTITY_CACHE.clear()
|
||||
|
||||
def tearDown(self):
|
||||
mcp_server._IDENTITY_CACHE.clear()
|
||||
self._dir.cleanup()
|
||||
|
||||
def _env(self, **extra):
|
||||
env = {"GITEA_AUDIT_LOG": self.audit_path,
|
||||
"GITEA_PROFILE_NAME": "gitea-author",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,merge"}
|
||||
env.update(extra)
|
||||
return env
|
||||
|
||||
def _records(self):
|
||||
if not os.path.exists(self.audit_path):
|
||||
return []
|
||||
with open(self.audit_path, encoding="utf-8") as fh:
|
||||
return [json.loads(line) for line in fh if line.strip()]
|
||||
|
||||
|
||||
class TestSimpleToolAudit(_AuditWiringBase):
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_create_issue_success_audited(self, _auth, mock_api):
|
||||
# 1: create POST result, 2: identity /user lookup for the audit record.
|
||||
mock_api.side_effect = [
|
||||
{"number": 11, "html_url": "https://gitea.prgs.cc/issues/11"},
|
||||
{"login": "author-bot"},
|
||||
]
|
||||
with patch.dict(os.environ, self._env(), clear=True):
|
||||
result = gitea_create_issue(title="Add thing", remote="prgs")
|
||||
self.assertEqual(result["number"], 11)
|
||||
recs = self._records()
|
||||
self.assertEqual(len(recs), 1)
|
||||
rec = recs[0]
|
||||
self.assertEqual(rec["action"], "create_issue")
|
||||
self.assertEqual(rec["result"], "succeeded")
|
||||
self.assertEqual(rec["profile_name"], "gitea-author")
|
||||
self.assertEqual(rec["authenticated_username"], "author-bot")
|
||||
self.assertEqual(rec["issue_number"], 11)
|
||||
self.assertEqual(rec["request_metadata"]["title"], "Add thing")
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_create_issue_failure_audited(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
RuntimeError("HTTP 500: boom"),
|
||||
{"login": "author-bot"}, # identity lookup for the audit record
|
||||
]
|
||||
with patch.dict(os.environ, self._env(), clear=True):
|
||||
with self.assertRaises(RuntimeError):
|
||||
gitea_create_issue(title="X", remote="prgs")
|
||||
recs = self._records()
|
||||
self.assertEqual(len(recs), 1)
|
||||
self.assertEqual(recs[0]["result"], "failed")
|
||||
self.assertIn("HTTP 500", recs[0]["reason"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_close_issue_audited(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"state": "closed"}, {"login": "mgr-bot"}]
|
||||
with patch.dict(os.environ, self._env(), clear=True):
|
||||
gitea_close_issue(issue_number=42, remote="prgs")
|
||||
recs = self._records()
|
||||
self.assertEqual(len(recs), 1)
|
||||
self.assertEqual(recs[0]["action"], "close_issue")
|
||||
self.assertEqual(recs[0]["issue_number"], 42)
|
||||
self.assertEqual(recs[0]["authenticated_username"], "mgr-bot")
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_disabled_writes_nothing_and_no_extra_call(self, _auth, mock_api):
|
||||
# No GITEA_AUDIT_LOG -> audit is a no-op: exactly one API call, no file.
|
||||
mock_api.return_value = {"number": 1, "html_url": "http://x/1"}
|
||||
with patch.dict(os.environ, {"GITEA_PROFILE_NAME": "gitea-author"}, clear=True):
|
||||
gitea_create_issue(title="x", remote="prgs")
|
||||
mock_api.assert_called_once()
|
||||
self.assertEqual(self._records(), [])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_secrets_never_written(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
{"number": 3, "html_url": "http://x/3"},
|
||||
{"login": "author-bot"},
|
||||
]
|
||||
env = self._env(GITEA_TOKEN="super-secret-token")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
gitea_create_issue(title="t", remote="prgs")
|
||||
with open(self.audit_path, encoding="utf-8") as fh:
|
||||
blob = fh.read().lower()
|
||||
for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()):
|
||||
self.assertNotIn(secret, blob)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_audit_failure_never_breaks_action(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
{"number": 9, "html_url": "http://x/9"},
|
||||
{"login": "author-bot"},
|
||||
]
|
||||
with patch.dict(os.environ, self._env(), clear=True):
|
||||
with patch("gitea_audit.write_event", side_effect=RuntimeError("disk full")):
|
||||
result = gitea_create_issue(title="t", remote="prgs")
|
||||
# The mutation result is returned even though the sink write blew up.
|
||||
self.assertEqual(result["number"], 9)
|
||||
|
||||
|
||||
class TestGatedToolAudit(_AuditWiringBase):
|
||||
|
||||
def _pr(self, author, state="open", sha="abc123", mergeable=True):
|
||||
return {"user": {"login": author}, "state": state,
|
||||
"head": {"sha": sha}, "mergeable": mergeable}
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_merge_success_audited(self, _auth, mock_api):
|
||||
# user, pr, merge POST, readback — no extra identity call (uses result).
|
||||
mock_api.side_effect = [
|
||||
{"login": "merger-bot"}, self._pr("author-bot"),
|
||||
{}, {"merged_commit_sha": "c1"},
|
||||
]
|
||||
env = self._env(GITEA_PROFILE_NAME="gitea-merger",
|
||||
GITEA_ALLOWED_OPERATIONS="read,merge")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8",
|
||||
expected_head_sha="abc123", remote="prgs")
|
||||
self.assertTrue(r["performed"])
|
||||
recs = self._records()
|
||||
self.assertEqual(len(recs), 1)
|
||||
self.assertEqual(recs[0]["action"], "merge_pr")
|
||||
self.assertEqual(recs[0]["result"], "succeeded")
|
||||
self.assertEqual(recs[0]["authenticated_username"], "merger-bot")
|
||||
self.assertEqual(recs[0]["pr_number"], 8)
|
||||
self.assertEqual(recs[0]["head_sha"], "abc123")
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_merge_blocked_audited(self, _auth, mock_api):
|
||||
# Self-author merge is blocked; must still be recorded as blocked.
|
||||
mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
|
||||
env = self._env(GITEA_PROFILE_NAME="gitea-merger",
|
||||
GITEA_ALLOWED_OPERATIONS="read,merge")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 8", remote="prgs")
|
||||
self.assertFalse(r["performed"])
|
||||
recs = self._records()
|
||||
self.assertEqual(len(recs), 1)
|
||||
self.assertEqual(recs[0]["result"], "blocked")
|
||||
self.assertEqual(recs[0]["authenticated_username"], "jcwalker3")
|
||||
self.assertIn("PR author", recs[0]["reason"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_submit_review_success_audited(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 7},
|
||||
]
|
||||
env = self._env(GITEA_PROFILE_NAME="gitea-reviewer",
|
||||
GITEA_ALLOWED_OPERATIONS="read,review,approve")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(pr_number=8, action="approve",
|
||||
body="LGTM", remote="prgs")
|
||||
self.assertTrue(r["performed"])
|
||||
recs = self._records()
|
||||
self.assertEqual(len(recs), 1)
|
||||
self.assertEqual(recs[0]["action"], "submit_pr_review")
|
||||
self.assertEqual(recs[0]["result"], "succeeded")
|
||||
self.assertEqual(recs[0]["authenticated_username"], "reviewer-bot")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user