c3c48fb7c2
Add durable, opt-in audit logging for every mutating Gitea MCP action so an operator can see which execution profile and authenticated Gitea user performed (or was blocked from / failed) each mutation. - New gitea_audit.py: pure, no-network module — recursive secret redaction (token/password/authorization keys; token/Basic/Bearer value runs), build_event (timestamp, action, result, profile, audit label, authenticated username, repo, issue/PR, target branch, head SHA, redacted request metadata), and an append-only JSON Lines sink. - mcp_server.py: _audit helper + _audited context manager (simple mutations) and an _audit_pr_result decorator (gated review/merge tools, reading their own result dict) wired into create_issue, create_pr, edit_pr, close_issue, commit_files, delete_branch, create_label, set_issue_labels, mark_issue (label/unlabel), gitea_submit_pr_review, and gitea_merge_pr. - Outcomes recorded as allowed/blocked/failed/succeeded; blocked and failed eligibility checks are logged, not just successes. Off by default: records are written only when GITEA_AUDIT_LOG is set. When it is unset every audit path short-circuits — no records, no extra API calls — so existing tool behaviour and API call sequences are unchanged. Auditing never raises; sink writes are best-effort. Tokens are never written. Docs: README env table + audit note, .env.example placeholder. Tests: tests/test_audit.py (19 cases) — redaction, event build, sink writes, per-tool success/failure/blocked records, secret-free output, off-by-default no-op, and audit-failure-never-breaks-action. Closes #18 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
127 lines
4.4 KiB
Python
127 lines
4.4 KiB
Python
"""Audit logging for Gitea MCP mutating actions (issue #18).
|
|
|
|
Emits one structured JSON record per mutating action so an operator can see
|
|
*which execution profile and authenticated Gitea user* performed (or was
|
|
blocked from / failed) each mutation.
|
|
|
|
Design constraints:
|
|
|
|
- **Off by default.** Records are written only when ``GITEA_AUDIT_LOG`` names a
|
|
file path. With it unset, ``write_event`` is a no-op and callers add zero
|
|
behaviour — existing tool behaviour and API call patterns are unchanged.
|
|
- **Never raises.** Auditing must never break the action it records; all sink
|
|
I/O is best-effort and swallows errors.
|
|
- **No secrets.** Tokens / Authorization material are redacted from request
|
|
metadata and reason strings before a record is written.
|
|
- **No network.** This module performs no HTTP; the caller supplies identity
|
|
and profile metadata it already resolved.
|
|
"""
|
|
import os
|
|
import json
|
|
import datetime
|
|
|
|
# Result states for an audited action.
|
|
ALLOWED = "allowed"
|
|
BLOCKED = "blocked"
|
|
FAILED = "failed"
|
|
SUCCEEDED = "succeeded"
|
|
|
|
REDACTED = "[REDACTED]"
|
|
|
|
# A dict key containing any of these (case-insensitive) has its value redacted.
|
|
_SECRET_KEY_HINTS = ("token", "password", "secret", "authorization", "auth")
|
|
# A string value starting with one of these has the following run redacted.
|
|
_SECRET_VALUE_PREFIXES = ("token ", "Basic ", "Bearer ")
|
|
|
|
|
|
def _redact_str(text):
|
|
"""Redact anything that looks like an Authorization credential in *text*."""
|
|
if not isinstance(text, str) or not text:
|
|
return text
|
|
out = text
|
|
for prefix in _SECRET_VALUE_PREFIXES:
|
|
idx = 0
|
|
while True:
|
|
i = out.find(prefix, idx)
|
|
if i == -1:
|
|
break
|
|
j = i + len(prefix)
|
|
while j < len(out) and not out[j].isspace():
|
|
j += 1
|
|
out = out[:i] + prefix + REDACTED + out[j:]
|
|
idx = i + len(prefix) + len(REDACTED)
|
|
return out
|
|
|
|
|
|
def redact(value):
|
|
"""Recursively redact secret-looking keys/values from a JSON-able value."""
|
|
if isinstance(value, dict):
|
|
result = {}
|
|
for k, v in value.items():
|
|
if isinstance(k, str) and any(h in k.lower() for h in _SECRET_KEY_HINTS):
|
|
result[k] = REDACTED
|
|
else:
|
|
result[k] = redact(v)
|
|
return result
|
|
if isinstance(value, (list, tuple)):
|
|
return [redact(v) for v in value]
|
|
if isinstance(value, str):
|
|
return _redact_str(value)
|
|
return value
|
|
|
|
|
|
def audit_log_path():
|
|
"""Return the configured audit log file path, or None if auditing is off."""
|
|
return (os.environ.get("GITEA_AUDIT_LOG") or "").strip() or None
|
|
|
|
|
|
def audit_enabled():
|
|
"""True when a sink is configured. When False, callers should skip auditing."""
|
|
return audit_log_path() is not None
|
|
|
|
|
|
def build_event(*, action, result, remote=None, server=None, repository=None,
|
|
issue_number=None, pr_number=None, profile_name=None,
|
|
audit_label=None, authenticated_username=None, target_branch=None,
|
|
head_sha=None, reason=None, request_metadata=None, now=None):
|
|
"""Build a redacted, JSON-able audit record for a mutating action."""
|
|
ts = now or datetime.datetime.now(datetime.timezone.utc)
|
|
if isinstance(ts, datetime.datetime):
|
|
ts = ts.isoformat()
|
|
return {
|
|
"timestamp": ts,
|
|
"action": action,
|
|
"action_type": "mutating",
|
|
"result": result,
|
|
"remote": remote,
|
|
"server": server,
|
|
"repository": repository,
|
|
"issue_number": issue_number,
|
|
"pr_number": pr_number,
|
|
"profile_name": profile_name,
|
|
"audit_label": audit_label,
|
|
"authenticated_username": authenticated_username,
|
|
"target_branch": target_branch,
|
|
"head_sha": head_sha,
|
|
"reason": _redact_str(reason) if reason else reason,
|
|
"request_metadata": redact(request_metadata) if request_metadata is not None else None,
|
|
}
|
|
|
|
|
|
def write_event(event, path=None):
|
|
"""Append *event* as one JSON line to the audit sink. Never raises.
|
|
|
|
Returns True if a line was written, False if auditing is off or the write
|
|
failed (auditing is best-effort and must not break the caller).
|
|
"""
|
|
path = path or audit_log_path()
|
|
if not path:
|
|
return False
|
|
try:
|
|
line = json.dumps(event, default=str, sort_keys=True)
|
|
with open(path, "a", encoding="utf-8") as fh:
|
|
fh.write(line + "\n")
|
|
return True
|
|
except Exception:
|
|
return False
|