"""Audit logging for Gitea MCP mutating actions (issue #18). Emits one structured JSON record per mutating action so an operator can see *which execution profile and authenticated Gitea user* performed (or was blocked from / failed) each mutation. Design constraints: - **Off by default.** Records are written only when ``GITEA_AUDIT_LOG`` names a file path. With it unset, ``write_event`` is a no-op and callers add zero behaviour — existing tool behaviour and API call patterns are unchanged. - **Never raises.** Auditing must never break the action it records; all sink I/O is best-effort and swallows errors. - **No secrets.** Tokens / Authorization material are redacted from request metadata and reason strings before a record is written. - **No network.** This module performs no HTTP; the caller supplies identity and profile metadata it already resolved. """ import os import json import datetime # Result states for an audited action. ALLOWED = "allowed" BLOCKED = "blocked" FAILED = "failed" SUCCEEDED = "succeeded" REDACTED = "[REDACTED]" # A dict key containing any of these (case-insensitive) has its value redacted. _SECRET_KEY_HINTS = ("token", "password", "secret", "authorization", "auth") # A string value starting with one of these has the following run redacted. _SECRET_VALUE_PREFIXES = ("token ", "Basic ", "Bearer ") def _redact_str(text): """Redact anything that looks like an Authorization credential in *text*.""" if not isinstance(text, str) or not text: return text out = text for prefix in _SECRET_VALUE_PREFIXES: idx = 0 while True: i = out.find(prefix, idx) if i == -1: break j = i + len(prefix) while j < len(out) and not out[j].isspace(): j += 1 out = out[:i] + prefix + REDACTED + out[j:] idx = i + len(prefix) + len(REDACTED) return out def redact(value): """Recursively redact secret-looking keys/values from a JSON-able value.""" if isinstance(value, dict): result = {} for k, v in value.items(): if isinstance(k, str) and any(h in k.lower() for h in _SECRET_KEY_HINTS): result[k] = REDACTED else: result[k] = redact(v) return result if isinstance(value, (list, tuple)): return [redact(v) for v in value] if isinstance(value, str): return _redact_str(value) return value def audit_log_path(): """Return the configured audit log file path, or None if auditing is off.""" return (os.environ.get("GITEA_AUDIT_LOG") or "").strip() or None def audit_enabled(): """True when a sink is configured. When False, callers should skip auditing.""" return audit_log_path() is not None def build_event(*, action, result, remote=None, server=None, repository=None, issue_number=None, pr_number=None, profile_name=None, audit_label=None, authenticated_username=None, target_branch=None, head_sha=None, reason=None, request_metadata=None, now=None): """Build a redacted, JSON-able audit record for a mutating action.""" ts = now or datetime.datetime.now(datetime.timezone.utc) if isinstance(ts, datetime.datetime): ts = ts.isoformat() return { "timestamp": ts, "action": action, "action_type": "mutating", "result": result, "remote": remote, "server": server, "repository": repository, "issue_number": issue_number, "pr_number": pr_number, "profile_name": profile_name, "audit_label": audit_label, "authenticated_username": authenticated_username, "target_branch": target_branch, "head_sha": head_sha, "reason": _redact_str(reason) if reason else reason, "request_metadata": redact(request_metadata) if request_metadata is not None else None, } def write_event(event, path=None): """Append *event* as one JSON line to the audit sink. Never raises. Returns True if a line was written, False if auditing is off or the write failed (auditing is best-effort and must not break the caller). """ path = path or audit_log_path() if not path: return False try: line = json.dumps(event, default=str, sort_keys=True) with open(path, "a", encoding="utf-8") as fh: fh.write(line + "\n") return True except Exception: return False