Gitea-Tools/gitea_audit.py
sysadmin c3c48fb7c2 feat: audit-log Gitea MCP mutating actions with profile metadata (#18)
Add durable, opt-in audit logging for every mutating Gitea MCP action so an
operator can see which execution profile and authenticated Gitea user
performed (or was blocked from / failed) each mutation.

- New gitea_audit.py: pure, no-network module — recursive secret redaction
  (token/password/authorization keys; token/Basic/Bearer value runs),
  build_event (timestamp, action, result, profile, audit label, authenticated
  username, repo, issue/PR, target branch, head SHA, redacted request
  metadata), and an append-only JSON Lines sink.
- mcp_server.py: _audit helper + _audited context manager (simple mutations)
  and an _audit_pr_result decorator (gated review/merge tools, reading their
  own result dict) wired into create_issue, create_pr, edit_pr, close_issue,
  commit_files, delete_branch, create_label, set_issue_labels, mark_issue
  (label/unlabel), gitea_submit_pr_review, and gitea_merge_pr.
- Outcomes recorded as allowed/blocked/failed/succeeded; blocked and failed
  eligibility checks are logged, not just successes.

Off by default: records are written only when GITEA_AUDIT_LOG is set. When it
is unset every audit path short-circuits — no records, no extra API calls — so
existing tool behaviour and API call sequences are unchanged. Auditing never
raises; sink writes are best-effort. Tokens are never written.

Docs: README env table + audit note, .env.example placeholder.
Tests: tests/test_audit.py (19 cases) — redaction, event build, sink writes,
per-tool success/failure/blocked records, secret-free output, off-by-default
no-op, and audit-failure-never-breaks-action.

Closes #18

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-01 22:20:51 -04:00


"""Audit logging for Gitea MCP mutating actions (issue #18).

Emits one structured JSON record per mutating action so an operator can see
*which execution profile and authenticated Gitea user* performed (or was
blocked from / failed) each mutation.

Design constraints:

- **Off by default.** Records are written only when ``GITEA_AUDIT_LOG`` names a
  file path. With it unset, ``write_event`` is a no-op and callers add no
  behaviour, so existing tool behaviour and API call patterns are unchanged.
- **Never raises.** Auditing must never break the action it records; all sink
  I/O is best-effort and swallows errors.
- **No secrets.** Tokens / Authorization material are redacted from request
  metadata and reason strings before a record is written.
- **No network.** This module performs no HTTP; the caller supplies identity
  and profile metadata it already resolved.
"""

import os
import json
import datetime

# Result states for an audited action.
ALLOWED = "allowed"
BLOCKED = "blocked"
FAILED = "failed"
SUCCEEDED = "succeeded"

REDACTED = "[REDACTED]"

# A dict key containing any of these (case-insensitive) has its value redacted.
_SECRET_KEY_HINTS = ("token", "password", "secret", "authorization", "auth")

# Wherever one of these prefixes occurs in a string (case-sensitive match), the
# run of non-whitespace characters that follows it is redacted.
_SECRET_VALUE_PREFIXES = ("token ", "Basic ", "Bearer ")


def _redact_str(text):
    """Redact anything that looks like an Authorization credential in *text*."""
    if not isinstance(text, str) or not text:
        return text
    out = text
    for prefix in _SECRET_VALUE_PREFIXES:
        idx = 0
        while True:
            i = out.find(prefix, idx)
            if i == -1:
                break
            # Consume the credential: everything up to the next whitespace.
            j = i + len(prefix)
            while j < len(out) and not out[j].isspace():
                j += 1
            out = out[:i] + prefix + REDACTED + out[j:]
            # Resume scanning after the marker that was just inserted.
            idx = i + len(prefix) + len(REDACTED)
    return out


def redact(value):
    """Recursively redact secret-looking keys/values from a JSON-able value."""
    if isinstance(value, dict):
        result = {}
        for k, v in value.items():
            if isinstance(k, str) and any(h in k.lower() for h in _SECRET_KEY_HINTS):
                result[k] = REDACTED
            else:
                result[k] = redact(v)
        return result
    if isinstance(value, (list, tuple)):
        return [redact(v) for v in value]
    if isinstance(value, str):
        return _redact_str(value)
    return value


def audit_log_path():
    """Return the configured audit log file path, or None if auditing is off."""
    return (os.environ.get("GITEA_AUDIT_LOG") or "").strip() or None


def audit_enabled():
    """True when a sink is configured. When False, callers should skip auditing."""
    return audit_log_path() is not None


def build_event(*, action, result, remote=None, server=None, repository=None,
                issue_number=None, pr_number=None, profile_name=None,
                audit_label=None, authenticated_username=None, target_branch=None,
                head_sha=None, reason=None, request_metadata=None, now=None):
    """Build a redacted, JSON-able audit record for a mutating action."""
    ts = now or datetime.datetime.now(datetime.timezone.utc)
    if isinstance(ts, datetime.datetime):
        ts = ts.isoformat()
    return {
        "timestamp": ts,
        "action": action,
        "action_type": "mutating",
        "result": result,
        "remote": remote,
        "server": server,
        "repository": repository,
        "issue_number": issue_number,
        "pr_number": pr_number,
        "profile_name": profile_name,
        "audit_label": audit_label,
        "authenticated_username": authenticated_username,
        "target_branch": target_branch,
        "head_sha": head_sha,
        "reason": _redact_str(reason) if reason else reason,
        "request_metadata": (
            redact(request_metadata) if request_metadata is not None else None
        ),
    }


def write_event(event, path=None):
    """Append *event* as one JSON line to the audit sink. Never raises.

    Returns True if a line was written, False if auditing is off or the write
    failed (auditing is best-effort and must not break the caller).
    """
    path = path or audit_log_path()
    if not path:
        return False
    try:
        line = json.dumps(event, default=str, sort_keys=True)
        with open(path, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")
        return True
    except Exception:
        return False
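
The commit message mentions an `_audited` context manager in mcp_server.py, but that file is not shown here. The block below is only a minimal sketch of how a caller might wrap a mutation around an append-only JSON Lines sink like the one above. The `audited` context manager, its parameters, and the `demo` helper are hypothetical. `write_event` is a reduced stand-in that takes an explicit path, so the block runs on its own without the module or the `GITEA_AUDIT_LOG` variable.

```python
import contextlib
import json
import os
import tempfile


def write_event(event, path):
    """Stand-in for the module's sink: append one JSON line, never raise."""
    if not path:
        return False  # auditing is off: nothing is written
    try:
        with open(path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(event, default=str, sort_keys=True) + "\n")
        return True
    except Exception:
        return False


@contextlib.contextmanager
def audited(action, path, **fields):
    """Hypothetical wrapper: record 'succeeded' or 'failed' around a mutation."""
    try:
        yield
    except Exception as exc:
        write_event(
            {"action": action, "result": "failed", "reason": str(exc), **fields},
            path,
        )
        raise  # the original error still reaches the caller
    else:
        write_event({"action": action, "result": "succeeded", **fields}, path)


def demo():
    """Run one successful and one failing mutation; return the parsed records."""
    fd, path = tempfile.mkstemp(suffix=".jsonl")
    os.close(fd)
    try:
        with audited("create_issue", path, repository="org/repo"):
            pass  # the real API call would go here
        try:
            with audited("delete_branch", path, repository="org/repo"):
                raise RuntimeError("branch is protected")
        except RuntimeError:
            pass  # the caller sees the error; the audit line is already written
        with open(path, encoding="utf-8") as fh:
            return [json.loads(line) for line in fh]
    finally:
        os.remove(path)


records = demo()
```

In this sketch the failure record is written before the exception is re-raised, so a failed mutation still leaves an audit line while the caller's error handling is unchanged.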