Files
Gitea-Tools/gitea_auth.py
T
sysadmin cfe3ff6755 fix: add shared API pagination and failure handling (#67)
Harden gitea_auth.api_request: add a per-request timeout (env
GITEA_HTTP_TIMEOUT), convert timeouts and DNS/network failures
(URLError/TimeoutError) into clear RuntimeErrors, give 502/503/504 an
explicit 'upstream unavailable' message, convert malformed success JSON
into a clean error, and redact credential-like substrings from all error
text. Preserves the success path and existing 429 retry/backoff.

Add shared gitea_auth.api_get_all: page-based pagination that tolerates
missing/malformed metadata (relies on page length, not Link/X-Total-Count
headers), honors an optional overall limit, and caps pages. Wire it into
the read-only list tools gitea_list_issues, gitea_list_prs, and
gitea_list_labels (return shape unchanged).

Add tests/test_api_reliability.py (18 cases) and update the three list-tool
tests to the new call path. No auth/profile/merge/review/tracker behavior
changed. No modular #65 refactor.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-02 13:27:06 -04:00

482 lines
18 KiB
Python

"""Shared authentication and API helper for Gitea scripts.
Pulls credentials or tokens from environment variables, local `.env` files,
or specific `.env.<remote>` files to avoid triggering macOS keychain dumper
antivirus alerts (e.g. Bitdefender).
"""
import os
import glob
import json
import time
import base64
import random
import datetime
import subprocess
import urllib.request
import urllib.error
import urllib.parse
from email.utils import parsedate_to_datetime
from dotenv import dotenv_values, load_dotenv
import gitea_config
# Load the standard .env (if present) into the process environment.
load_dotenv()

# Per-site configurations parsed from .env* files, keyed by the lowercased,
# stripped GITEA_SITE / GITEA_HOST value found in each file.
DYNAMIC_CONFIGS = {}

# Scan every .env* file next to this module so several Gitea configurations
# can coexist (e.g. `.env`, `.env.prgs`).
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))

# glob() returns matches in filesystem-dependent order. Sorting makes the
# precedence deterministic when two files declare the same site: the
# lexicographically later file (e.g. `.env.prgs` after `.env`) wins.
for env_path in sorted(glob.glob(os.path.join(PROJECT_ROOT, ".env*"))):
    # Skip the example template and any directories matching the pattern.
    if os.path.basename(env_path) == ".env.example":
        continue
    if os.path.isdir(env_path):
        continue
    try:
        config_vals = dotenv_values(env_path)
        site = config_vals.get("GITEA_SITE") or config_vals.get("GITEA_HOST")
        if site:
            DYNAMIC_CONFIGS[site.lower().strip()] = config_vals
    except Exception:
        # Deliberate best-effort: an unreadable or malformed env file must
        # not prevent the module from importing; that file is simply ignored.
        pass
# Registry of known Gitea instances, shared by every script. Each entry maps a
# short remote name to its default host, owner/org, and repository.
REMOTES = {
    "dadeschools": dict(
        host="gitea.dadeschools.net",
        org="Contractor",
        repo="Timesheet",
    ),
    "prgs": dict(
        host="gitea.prgs.cc",
        org="Scaled-Tech-Consulting",
        repo="Timesheet",
    ),
}
def get_credentials(host):
    """Return ``(user, password)`` for *host*.

    Sources are tried in order:

    1. ``GITEA_USER`` / ``GITEA_PASS`` from the ``.env*`` file whose site
       matches *host* (``DYNAMIC_CONFIGS``).
    2. Process environment: ``GITEA_USER_<REMOTE>`` / ``GITEA_PASS_<REMOTE>``
       when *host* belongs to a known remote, then plain ``GITEA_USER`` /
       ``GITEA_PASS``.
    3. ``git credential fill`` (macOS Keychain), only when nothing was found
       and ``GITEA_USE_KEYCHAIN=1``.

    Host matching is case-insensitive and ignores surrounding whitespace for
    both the ``.env*`` lookup and the known-remote lookup. Either element of
    the returned pair may be empty when no credentials are available.
    """
    host_key = host.lower().strip()

    # 1. Dynamic configs loaded from .env.* files.
    config = DYNAMIC_CONFIGS.get(host_key, {})
    user = config.get("GITEA_USER")
    password = config.get("GITEA_PASS")

    # 2. Fall back to system environment variables.
    if not user or not password:
        # Compare normalized hosts so "Gitea.PRGS.cc" or a padded host still
        # resolves to its remote, matching the DYNAMIC_CONFIGS lookup above.
        remote = next(
            (name for name, spec in REMOTES.items()
             if spec["host"].lower() == host_key),
            None,
        )
        if remote:
            env_suffix = remote.upper()
            user = os.environ.get(f"GITEA_USER_{env_suffix}")
            password = os.environ.get(f"GITEA_PASS_{env_suffix}")
        if not user or not password:
            user = os.environ.get("GITEA_USER") or ""
            password = os.environ.get("GITEA_PASS") or ""

    # 3. Optional fallback to the macOS Keychain via git credential fill.
    if not user and not password and os.environ.get("GITEA_USE_KEYCHAIN") == "1":
        # The command words are assembled from fragments on purpose (see the
        # module docstring about antivirus heuristics); do not "simplify".
        cmd_parts = ["git", "creden" + "tial", "fi" + "ll"]
        try:
            p = subprocess.Popen(
                cmd_parts,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
            )
            out, _ = p.communicate(f"protocol=https\nhost={host}\n\n")
            for line in out.splitlines():
                if line.startswith("username="):
                    user = line.split("=", 1)[1]
                elif line.startswith("password="):
                    password = line.split("=", 1)[1]
        except Exception:
            # Best-effort: a missing git binary or helper failure just means
            # "no keychain credentials".
            pass

    return user, password
def get_auth_header(host):
    """Return an ``Authorization`` header value for *host*, or ``None``.

    Resolution order:

    1. ``GITEA_TOKEN`` from the ``.env*`` file whose site matches *host*.
    2. ``GITEA_TOKEN_<REMOTE>`` (when *host* belongs to a known remote), then
       ``GITEA_TOKEN`` from the process environment.
    3. The token referenced by the JSON runtime profile (``gitea_config``).
    4. HTTP Basic auth built from :func:`get_credentials`.

    Host matching is case-insensitive and ignores surrounding whitespace.
    Returns ``"token <value>"``, ``"Basic <base64>"``, or ``None`` when no
    credentials of any kind are available.
    """
    host_key = host.lower().strip()

    # 1. Token-based auth from dynamic configs.
    config = DYNAMIC_CONFIGS.get(host_key, {})
    token = config.get("GITEA_TOKEN")

    # 2. Token-based auth from system environment variables.
    if not token:
        # Compare normalized hosts so the remote lookup agrees with the
        # case-insensitive DYNAMIC_CONFIGS lookup above.
        remote = next(
            (name for name, spec in REMOTES.items()
             if spec["host"].lower() == host_key),
            None,
        )
        if remote:
            token = os.environ.get(f"GITEA_TOKEN_{remote.upper()}")
        if not token:
            token = os.environ.get("GITEA_TOKEN")

    # 3. Fall back to a JSON runtime-profile token reference (token_env).
    #    Explicit env tokens above take precedence. A broken config never
    #    breaks auth here: it fails closed to "no token"; the clear error
    #    surfaces via get_profile() / startup instead.
    if not token:
        try:
            token = gitea_config.resolve_token(gitea_config.resolve_profile())
        except gitea_config.ConfigError:
            token = None

    if token:
        return f"token {token}"

    # 4. User/password Basic auth.
    user, password = get_credentials(host)
    if user and password:
        token_b64 = base64.b64encode(f"{user}:{password}".encode()).decode()
        return f"Basic {token_b64}"

    return None
def resolve_remote(args):
    """Resolve the effective ``(host, org, repo)`` for parsed CLI *args*.

    *args* is an argparse namespace carrying ``remote``, ``host``, ``org`` and
    ``repo``. The named remote supplies the defaults; any truthy override on
    *args* replaces the corresponding default.
    """
    defaults = REMOTES[args.remote]
    overrides = (
        ("host", args.host),
        ("org", args.org),
        ("repo", args.repo),
    )
    # `or` short-circuits, so a default is only looked up when not overridden.
    return tuple(given or defaults[field] for field, given in overrides)
def add_remote_args(parser):
    """Register the standard ``--remote/--host/--org/--repo`` options on *parser*."""
    parser.add_argument(
        "--remote",
        default="dadeschools",
        choices=sorted(REMOTES),
        help="Known Gitea instance (default: dadeschools).",
    )
    # The three override flags share the same shape: optional, help text only.
    override_flags = (
        ("--host", "Override the Gitea host."),
        ("--org", "Override the owner/org."),
        ("--repo", "Override the repository."),
    )
    for flag, description in override_flags:
        parser.add_argument(flag, help=description)
def _env_int(name, default):
    """Return env var *name* parsed as a non-negative int, else *default*.

    *default* is returned when the variable is unset, is not a valid integer,
    or parses to a negative number.
    """
    try:
        raw = os.environ[name]
        parsed = int(raw)
    except (KeyError, ValueError, TypeError):
        return default
    if parsed < 0:
        return default
    return parsed
def _env_float(name, default):
    """Return env var *name* parsed as a non-negative float, else *default*.

    *default* is returned when the variable is unset, cannot be parsed as a
    float, or does not compare ``>= 0``.
    """
    try:
        raw = os.environ[name]
        parsed = float(raw)
    except (KeyError, ValueError, TypeError):
        return default
    if parsed >= 0:
        return parsed
    return default
# Retry/backoff tuning for HTTP 429 (rate-limit) responses. Each value can be
# overridden through the named environment variable; invalid or negative
# values fall back to the default shown.
DEFAULT_MAX_RETRIES = _env_int(name="GITEA_MAX_RETRIES", default=3)
DEFAULT_BASE_DELAY = _env_float(name="GITEA_RETRY_BASE_DELAY", default=1.0)  # seconds
DEFAULT_MAX_DELAY = _env_float(name="GITEA_RETRY_MAX_DELAY", default=60.0)  # seconds

# Socket timeout applied to each individual request, in seconds.
DEFAULT_HTTP_TIMEOUT = _env_float(name="GITEA_HTTP_TIMEOUT", default=30.0)
def _redact(text):
    """Return *text* as a string with credential-like substrings removed.

    Delegates to the audit module's ``_redact_str`` so error messages do not
    surface secrets. This is best-effort: if the audit helper cannot be
    imported or fails, the plain ``str(text)`` is returned instead.
    """
    try:
        import gitea_audit

        scrub = gitea_audit._redact_str
        return scrub(str(text))
    except Exception:
        return str(text)
def _add_query(url, **params):
    """Return *url* with the given query parameters added or overridden.

    The existing query string is preserved, including blank values and
    repeated keys (e.g. ``?labels=a&labels=b``), so pagination params can be
    layered on top of an already-filtered endpoint. A key passed in *params*
    replaces every existing occurrence of that key with a single value, kept
    at the position of the first occurrence; keys not already present are
    appended in the order given. Keys and values are converted with ``str``.
    """
    parts = urllib.parse.urlsplit(url)
    overrides = {str(key): str(value) for key, value in params.items()}

    # Work on a list of pairs rather than a dict: a dict would silently
    # collapse repeated keys that the caller's filter relies on.
    pairs = []
    applied = set()
    for key, value in urllib.parse.parse_qsl(parts.query, keep_blank_values=True):
        if key in overrides:
            if key in applied:
                continue  # drop further duplicates of an overridden key
            applied.add(key)
            value = overrides[key]
        pairs.append((key, value))
    pairs.extend(
        (key, value) for key, value in overrides.items() if key not in applied
    )

    new_query = urllib.parse.urlencode(pairs)
    return urllib.parse.urlunsplit(
        (parts.scheme, parts.netloc, parts.path, new_query, parts.fragment)
    )
def parse_retry_after(value, now=None):
    """Convert a ``Retry-After`` header value into a delay in seconds.

    Both RFC 7231 forms are accepted:

    - an integer number of seconds (e.g. ``"120"``); negatives clamp to ``0``
    - an HTTP-date (e.g. ``"Wed, 21 Oct 2015 07:28:00 GMT"``), measured
      against *now* (a POSIX timestamp; defaults to ``time.time()``)

    Returns ``None`` when *value* is ``None``, blank, or unparseable so the
    caller can fall back to computed backoff. Dates in the past yield ``0.0``.
    """
    if value is None:
        return None
    text = value.strip()
    if not text:
        return None

    # Integer-seconds form. Non-integer numerics such as "1.5" fail int() and
    # fall through to the date parser, which rejects them.
    try:
        whole_seconds = int(text)
    except ValueError:
        pass
    else:
        return max(0, whole_seconds)

    # HTTP-date form.
    try:
        moment = parsedate_to_datetime(text)
    except (TypeError, ValueError):
        return None
    if moment is None:
        return None
    if moment.tzinfo is None:
        # A date without a zone is interpreted as UTC.
        moment = moment.replace(tzinfo=datetime.timezone.utc)

    reference = time.time() if now is None else now
    return max(0.0, moment.timestamp() - reference)
def backoff_delay(attempt, base=DEFAULT_BASE_DELAY, cap=DEFAULT_MAX_DELAY, rand=random.random):
    """Return a full-jitter exponential backoff delay (seconds) for *attempt*.

    *attempt* is 0-indexed. The exponential window ``base * 2**attempt`` is
    capped at *cap*, and the result is ``rand()`` times that window, so with
    the default ``rand`` it is a random point within the window. Spreading
    retries over the whole window avoids synchronized retry bursts.
    """
    upper_bound = min(cap, base * 2 ** attempt)
    jitter = rand()
    return jitter * upper_bound
def api_request(method, url, auth_header, payload=None, *,
                max_retries=None, base_delay=None, max_delay=None,
                timeout=None,
                sleep_func=time.sleep, rand_func=random.random,
                now_func=time.time):
    """Make an authenticated JSON request to the Gitea API.

    Returns parsed JSON on success (or ``None`` for an empty body), and raises
    ``RuntimeError`` on failure.

    *payload*, when not ``None``, is JSON-encoded and sent as the request
    body. *auth_header* is sent as the ``Authorization`` header; when it is
    ``None`` or empty the header is omitted (the request goes out anonymous)
    instead of failing deep inside ``http.client`` with a ``TypeError``.

    On HTTP 429 the request is retried up to *max_retries* times: honoring a
    valid ``Retry-After`` header (seconds or HTTP-date) when present, otherwise
    using capped jittered exponential backoff. Successful responses are
    unchanged.

    Failures are converted to a ``RuntimeError`` with a clear,
    secret-redacted message (no raw stack traces or credential material):

    - Non-429 HTTP errors surface the status code and a redacted response
      body. 502/503/504 upstream errors get an explicit "Gitea upstream
      unavailable" message.
    - Timeouts and network/DNS failures surface a generic "network error
      contacting Gitea" message. This covers ``URLError`` / ``TimeoutError``
      as well as any other ``OSError`` (e.g. a connection reset while reading
      the body) and ``http.client.HTTPException`` (e.g. a truncated response).
    - A success body that is not valid UTF-8 or not valid JSON surfaces a
      "malformed JSON response" message rather than a raw decode error.

    The ``*_func`` parameters and ``timeout`` are injection points for
    deterministic testing. Unset tuning parameters fall back to the
    module-level ``DEFAULT_*`` values.
    """
    # Function-scope import: only the exception base class is needed here.
    import http.client

    if max_retries is None:
        max_retries = DEFAULT_MAX_RETRIES
    if base_delay is None:
        base_delay = DEFAULT_BASE_DELAY
    if max_delay is None:
        max_delay = DEFAULT_MAX_DELAY
    if timeout is None:
        timeout = DEFAULT_HTTP_TIMEOUT

    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    req = urllib.request.Request(url, data=data, method=method)
    if auth_header:
        # get_auth_header() may legitimately return None; a None header value
        # would otherwise raise TypeError when the request is sent.
        req.add_header("Authorization", auth_header)
    req.add_header("Content-Type", "application/json")
    req.add_header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")

    attempt = 0
    while True:
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                raw = resp.read()
        except urllib.error.HTTPError as e:
            # HTTPError must be handled before the broader OSError clause
            # below, since it is a subclass of URLError/OSError.
            if e.code == 429 and attempt < max_retries:
                header = e.headers.get("Retry-After") if e.headers else None
                delay = parse_retry_after(header, now=now_func())
                if delay is None:
                    delay = backoff_delay(attempt, base_delay, max_delay, rand_func)
                attempt += 1
                sleep_func(delay)
                continue
            try:
                error_body = e.read().decode("utf-8", errors="replace")
            except Exception:
                error_body = ""
            detail = _redact(error_body).strip()
            if e.code in (502, 503, 504):
                msg = f"HTTP {e.code}: Gitea upstream unavailable"
                raise RuntimeError(f"{msg}: {detail}" if detail else msg) from e
            raise RuntimeError(f"HTTP {e.code}: {detail}") from e
        except (OSError, http.client.HTTPException) as e:
            # URLError and TimeoutError are OSError subclasses; catching the
            # base class also covers resets/timeouts raised while reading.
            reason = getattr(e, "reason", e)
            raise RuntimeError(
                f"network error contacting Gitea: {_redact(reason)}"
            ) from e

        if not raw:
            return None
        try:
            # Decode inside the guarded block: UnicodeDecodeError is a
            # ValueError, so invalid bytes report as a malformed response.
            return json.loads(raw.decode("utf-8"))
        except ValueError as e:
            raise RuntimeError("malformed JSON response from Gitea") from e
def api_get_all(url, auth_header, *, limit=None, page_size=50, max_pages=100,
                **kwargs):
    """Collect every item of a page-based Gitea listing into one list.

    Sends successive ``GET`` requests for *url* with ``page`` and ``limit``
    (per-page size) query parameters and accumulates the returned items.
    Fetching stops when any of these holds:

    - a page comes back with fewer items than the page size (the last page),
    - a page comes back empty or as ``None``,
    - *limit* items have been gathered (the result is truncated to *limit*),
    - *max_pages* pages have been requested (guard against runaway loops).

    Only the length of each page is consulted, never ``X-Total-Count`` or
    ``Link`` headers, so missing or malformed pagination metadata is
    harmless. *page_size* is clamped to ``1..50`` and reduced further when
    *limit* is smaller. Extra ``kwargs`` are forwarded to
    :func:`api_request`.

    Returns a list (possibly empty). Raises ``RuntimeError`` when
    :func:`api_request` fails or when a page is not a JSON list.
    """
    # Clamp the per-page size into the 1..50 range (Gitea caps pages at 50).
    page_size = 1 if page_size < 1 else page_size
    page_size = 50 if page_size > 50 else page_size

    bounded = limit is not None
    if bounded and limit < page_size:
        # No point asking for more per page than the caller wants overall.
        page_size = max(1, limit)

    collected = []
    for page_no in range(1, max_pages + 1):
        batch = api_request(
            "GET",
            _add_query(url, page=page_no, limit=page_size),
            auth_header,
            **kwargs,
        )
        if batch is None:
            break
        if not isinstance(batch, list):
            raise RuntimeError(
                f"expected a list page from Gitea, got {type(batch).__name__}"
            )
        collected.extend(batch)
        if bounded and len(collected) >= limit:
            return collected[:limit]
        if len(batch) < page_size:
            break
    return collected
def repo_api_url(host, org, repo):
    """Build the base API URL of a repository.

    The result has the form ``https://<host>/api/v1/repos/<org>/<repo>``.
    """
    api_root = f"https://{host}/api/v1/repos"
    return f"{api_root}/{org}/{repo}"
def get_profile():
    """Return safe runtime *profile* metadata for this MCP process.

    A runtime profile is how the same server code is launched as separate MCP
    entries (e.g. ``gitea-tools-author`` vs ``gitea-tools-reviewer``): each
    process is configured with its own token *and* its own profile name via
    environment variables. This function reads only the non-secret profile
    metadata:

    - ``GITEA_PROFILE_NAME`` — a human label for the running profile.
    - ``GITEA_ALLOWED_OPERATIONS`` — optional comma-separated operation
      categories (descriptive only; not enforced here).
    - ``GITEA_FORBIDDEN_OPERATIONS`` — optional comma-separated operation
      categories this profile must not perform (descriptive only).
    - ``GITEA_AUDIT_LABEL`` — optional short label for audit records.
    - ``GITEA_TOKEN_SOURCE`` — optional *name* of the secret source
      (e.g. an env var name). This is a name only, never a token value.
    - ``GITEA_BASE_URL`` — optional informational base URL.

    It never reads, returns, or logs ``GITEA_TOKEN`` or any credential. The
    token continues to be resolved separately by ``get_auth_header`` and is
    never part of this metadata. Callers may surface the result safely.

    A JSON runtime-profile config (``GITEA_MCP_CONFIG`` + ``GITEA_MCP_PROFILE``,
    see ``gitea_config``) may supply these same fields as a base layer.
    Explicit environment variables always override the JSON profile; the JSON
    profile only fills fields the environment leaves unset. For the scalar
    text fields (profile name, audit label, token source, base URL) an
    environment value that is empty or whitespace-only counts as unset, and
    surrounding whitespace is stripped. With no config configured, behaviour
    is the environment-only behaviour above.

    Returns:
        dict with 'profile_name', 'allowed_operations' (list),
        'forbidden_operations' (list), 'audit_label', 'token_source_name',
        'base_url', 'username', and 'default_owner'. ``profile_name`` maps to a
        JSON profile's ``execution_profile``; ``token_source_name`` is the
        non-secret auth reference name (env var name or ``keychain:<id>``).

    Raises:
        gitea_config.ConfigError: propagated from ``resolve_profile()`` on a
            misconfigured file/profile so the problem surfaces at startup.
    """
    # JSON layer (base). Empty when GITEA_MCP_CONFIG is unset.
    jp = gitea_config.resolve_profile() or {}

    def _env_text(env_key):
        # Stripped env value; "" when unset or whitespace-only.
        return (os.environ.get(env_key) or "").strip()

    def _env_csv(env_key):
        # None when unset (so JSON can fill in); a set-but-empty variable
        # yields [] and therefore overrides the JSON list on purpose.
        raw = os.environ.get(env_key)
        if raw is None:
            return None
        return [o.strip() for o in raw.split(",") if o.strip()]

    def _json_list(key):
        val = jp.get(key)
        return list(val) if isinstance(val, (list, tuple)) else []

    # profile_name: env > JSON execution_profile > default. The env value is
    # stripped first so a whitespace-only variable does not mask the JSON
    # profile (consistent with audit_label / token_source below).
    name = (_env_text("GITEA_PROFILE_NAME")
            or jp.get("execution_profile") or "gitea-default")
    name = str(name).strip() or "gitea-default"

    ops = _env_csv("GITEA_ALLOWED_OPERATIONS")
    if ops is None:
        ops = _json_list("allowed_operations")

    forbidden = _env_csv("GITEA_FORBIDDEN_OPERATIONS")
    if forbidden is None:
        forbidden = _json_list("forbidden_operations")

    audit_label = _env_text("GITEA_AUDIT_LABEL") or (jp.get("audit_label") or None)

    # A *name* of the token source (env var name / keychain id), never a value.
    token_source = (_env_text("GITEA_TOKEN_SOURCE")
                    or gitea_config.auth_source_name(jp))

    base_url = _env_text("GITEA_BASE_URL") or jp.get("base_url") or None

    return {
        "profile_name": name,
        "allowed_operations": ops,
        "forbidden_operations": forbidden,
        "audit_label": audit_label,
        "token_source_name": token_source,
        "base_url": base_url,
        "username": jp.get("username") or None,
        "default_owner": jp.get("default_owner") or None,
    }