Files
Gitea-Tools/gitea_auth.py
T
sysadmin 1b3c961ff2 feat: honor Retry-After on HTTP 429 with jittered exponential backoff (#27)
api_request now retries HTTP 429 responses instead of failing immediately:

- Parse and honor a valid Retry-After header (seconds or HTTP-date).
- Fall back to full-jitter capped exponential backoff when the header is
  missing or invalid.
- Bound retries by max_retries and delay by max_delay (env-overridable via
  GITEA_MAX_RETRIES / GITEA_RETRY_BASE_DELAY / GITEA_RETRY_MAX_DELAY) — no
  infinite loops.
- Non-429 errors and successful responses are unchanged.

Sleep, randomness, and clock are injectable so retry timing is tested
deterministically. Adds tests/test_retry_backoff.py (23 cases).

Closes #27

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-01 21:28:51 -04:00

277 lines
9.5 KiB
Python

"""Shared authentication and API helper for Gitea scripts.
Pulls credentials or tokens from environment variables, local `.env` files,
or specific `.env.<remote>` files to avoid triggering macOS keychain dumper
antivirus alerts (e.g. Bitdefender).
"""
import os
import glob
import json
import time
import base64
import random
import datetime
import subprocess
import urllib.request
import urllib.error
from email.utils import parsedate_to_datetime
from dotenv import dotenv_values, load_dotenv
# Load the standard .env file if one is present.
load_dotenv()

# Per-instance settings parsed from .env.* files, keyed by normalized host.
DYNAMIC_CONFIGS = {}

# Each .env* file next to this module may describe one Gitea instance.
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
for env_path in glob.glob(os.path.join(PROJECT_ROOT, ".env*")):
    # The example template and directories are not real configurations.
    if os.path.basename(env_path) == ".env.example" or os.path.isdir(env_path):
        continue
    try:
        config_vals = dotenv_values(env_path)
        # A file is registered only when it names the instance it belongs to.
        site = config_vals.get("GITEA_SITE") or config_vals.get("GITEA_HOST")
        if site:
            DYNAMIC_CONFIGS[site.lower().strip()] = config_vals
    except Exception:
        # Best effort: a file that cannot be read or parsed is skipped.
        pass
# Registry of known Gitea instances, shared by all scripts.
# Each profile provides the default host, owner/org and repository.
REMOTES = {
    "dadeschools": dict(
        host="gitea.dadeschools.net",
        org="Contractor",
        repo="Timesheet",
    ),
    "prgs": dict(
        host="gitea.prgs.cc",
        org="Scaled-Tech-Consulting",
        repo="Timesheet",
    ),
}
def get_credentials(host):
    """Return ``(user, password)`` for *host*.

    Sources are consulted in order; a later source is used only while the
    pair found so far is incomplete:

    1. ``GITEA_USER`` / ``GITEA_PASS`` from the ``DYNAMIC_CONFIGS`` entry
       for the host.
    2. The process environment: ``GITEA_USER_<REMOTE>`` /
       ``GITEA_PASS_<REMOTE>`` when *host* belongs to a profile in
       ``REMOTES``, then the generic ``GITEA_USER`` / ``GITEA_PASS``.
    3. ``git credential fill``, only when ``GITEA_USE_KEYCHAIN`` is ``"1"``
       and neither value has been found.

    The configuration and remote lookups compare hosts case-insensitively
    and ignore surrounding whitespace. Values that cannot be resolved are
    returned as empty strings.
    """
    host_key = host.lower().strip()

    # 1. Try dynamic configs loaded from .env.* files.
    config = DYNAMIC_CONFIGS.get(host_key, {})
    user = config.get("GITEA_USER")
    password = config.get("GITEA_PASS")

    # 2. Fall back to system environment variables.
    if not user or not password:
        # Normalize both sides so the remote is found for the same spellings
        # of the host that the DYNAMIC_CONFIGS lookup above accepts.
        remote = next(
            (
                name
                for name, profile in REMOTES.items()
                if profile["host"].lower().strip() == host_key
            ),
            None,
        )
        if remote:
            env_suffix = remote.upper()
            user = os.environ.get(f"GITEA_USER_{env_suffix}")
            password = os.environ.get(f"GITEA_PASS_{env_suffix}")
        if not user or not password:
            # The generic pair replaces both values, so a user name and a
            # password are never mixed from different sources.
            user = os.environ.get("GITEA_USER") or ""
            password = os.environ.get("GITEA_PASS") or ""

    # 3. Optional fallback to the macOS Keychain via git credential fill.
    if not user and not password and os.environ.get("GITEA_USE_KEYCHAIN") == "1":
        # NOTE(review): the command is assembled from fragments, presumably
        # to keep the literal out of the source (see the module docstring
        # about antivirus alerts) -- confirm before simplifying.
        cmd_parts = ["git", "creden" + "tial", "fi" + "ll"]
        try:
            p = subprocess.Popen(
                cmd_parts,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
            )
            out, _ = p.communicate(f"protocol=https\nhost={host}\n\n")
            for line in out.splitlines():
                if line.startswith("username="):
                    user = line.split("=", 1)[1]
                elif line.startswith("password="):
                    password = line.split("=", 1)[1]
        except Exception:
            # Deliberately best effort: if the helper is missing or fails,
            # the caller simply gets the empty credentials.
            pass
    return user, password
def get_auth_header(host):
    """Return an ``Authorization`` header value for *host*, or ``None``.

    Resolution order:

    1. ``GITEA_TOKEN`` from the ``DYNAMIC_CONFIGS`` entry for the host.
    2. ``GITEA_TOKEN_<REMOTE>`` from the environment when *host* belongs to
       a profile in ``REMOTES``, then the generic ``GITEA_TOKEN``.
    3. Basic auth built from ``get_credentials(host)``.

    A token yields ``"token <value>"``; a user/password pair yields
    ``"Basic <base64>"``. ``None`` is returned when nothing is configured.
    Hosts are compared case-insensitively, ignoring surrounding whitespace.
    """
    host_key = host.lower().strip()

    # 1. Token from the dynamic configs.
    token = DYNAMIC_CONFIGS.get(host_key, {}).get("GITEA_TOKEN")

    # 2. Token from system environment variables.
    if not token:
        # Normalize both sides so the remote is found for the same spellings
        # of the host that the DYNAMIC_CONFIGS lookup above accepts.
        remote = next(
            (
                name
                for name, profile in REMOTES.items()
                if profile["host"].lower().strip() == host_key
            ),
            None,
        )
        if remote:
            token = os.environ.get(f"GITEA_TOKEN_{remote.upper()}")
        if not token:
            token = os.environ.get("GITEA_TOKEN")
    if token:
        return f"token {token}"

    # 3. User/password Basic auth.
    user, password = get_credentials(host)
    if user and password:
        token_b64 = base64.b64encode(f"{user}:{password}".encode()).decode()
        return f"Basic {token_b64}"
    return None
def resolve_remote(args):
    """Resolve the target repository from parsed command-line arguments.

    *args* carries ``remote``, ``host``, ``org`` and ``repo`` attributes.
    The profile selected by ``args.remote`` supplies the defaults; each
    truthy override on *args* replaces the matching default.

    Returns the tuple ``(host, org, repo)``.
    """
    defaults = REMOTES[args.remote]
    return (
        args.host or defaults["host"],
        args.org or defaults["org"],
        args.repo or defaults["repo"],
    )
def add_remote_args(parser):
    """Register the shared --remote/--host/--org/--repo options on *parser*."""
    parser.add_argument(
        "--remote",
        default="dadeschools",
        choices=sorted(REMOTES),
        help="Known Gitea instance (default: dadeschools).",
    )
    # The three overrides are plain optional string options.
    overrides = (
        ("--host", "Override the Gitea host."),
        ("--org", "Override the owner/org."),
        ("--repo", "Override the repository."),
    )
    for flag, description in overrides:
        parser.add_argument(flag, help=description)
def _env_int(name, default):
    """Return environment variable *name* as a non-negative int.

    *default* is returned when the variable is unset, is not a valid
    integer, or is negative.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        parsed = int(raw)
    except (ValueError, TypeError):
        return default
    if parsed >= 0:
        return parsed
    return default
def _env_float(name, default):
    """Return environment variable *name* as a non-negative float.

    *default* is returned when the variable is unset, cannot be parsed as
    a float, or does not compare ``>= 0`` (negative numbers and NaN).
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        parsed = float(raw)
    except (ValueError, TypeError):
        return default
    # Written as ">= 0" on purpose: NaN fails the comparison and falls back.
    if parsed >= 0:
        return parsed
    return default
# Defaults for retrying HTTP 429 (rate-limit) responses. Each value is read
# from the environment once, at import time, and falls back to the literal
# given here when the variable is unset or invalid.
DEFAULT_MAX_RETRIES = _env_int("GITEA_MAX_RETRIES", default=3)
# Both delays are expressed in seconds.
DEFAULT_BASE_DELAY = _env_float("GITEA_RETRY_BASE_DELAY", default=1.0)
DEFAULT_MAX_DELAY = _env_float("GITEA_RETRY_MAX_DELAY", default=60.0)
def parse_retry_after(value, now=None):
    """Parse a ``Retry-After`` header into a non-negative delay in seconds.

    Supports both forms defined by RFC 7231:

    - an integer number of seconds (e.g. ``"120"``), returned as an ``int``;
      negative values clamp to ``0``
    - an HTTP-date (e.g. ``"Wed, 21 Oct 2015 07:28:00 GMT"``), returned as a
      ``float`` number of seconds from *now*; past dates clamp to ``0.0``

    *now* is the current time as a Unix timestamp and defaults to
    ``time.time()``.

    Returns ``None`` when *value* is missing, blank, or unparseable, so the
    caller can fall back to computed backoff.
    """
    if value is None:
        return None
    value = value.strip()
    if not value:
        return None

    # Seconds form (integer). Non-integer numerics like "1.5" fail here and,
    # not being dates either, end up as None.
    try:
        return max(0, int(value))
    except ValueError:
        pass

    # HTTP-date form.
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError, OverflowError):
        # TypeError/ValueError: not a date. OverflowError: a date field (for
        # example an absurdly large year) does not fit a datetime. Either
        # way the header is unusable rather than fatal.
        return None
    if when is None:
        return None
    if when.tzinfo is None:
        # RFC dates without a zone are UTC.
        when = when.replace(tzinfo=datetime.timezone.utc)
    now_ts = now if now is not None else time.time()
    return max(0.0, when.timestamp() - now_ts)
def backoff_delay(attempt, base=DEFAULT_BASE_DELAY, cap=DEFAULT_MAX_DELAY, rand=random.random):
    """Full-jitter exponential backoff delay in seconds for a 0-indexed *attempt*.

    Returns ``rand() * min(cap, base * 2**attempt)``. With the default
    ``random.random`` that is a value in ``[0, min(cap, base * 2**attempt))``.
    Full jitter spreads retries across the whole window to avoid a
    thundering herd.

    *base* is the window for attempt 0, *cap* the largest window allowed, and
    *rand* a zero-argument callable returning the jitter factor (injectable
    for deterministic tests).
    """
    try:
        window = base * (2 ** attempt)
    except OverflowError:
        # 2**attempt no longer fits in a float (attempt >= 1024 with a float
        # base). Any positive base is far beyond the cap by then, so
        # saturate instead of raising.
        window = cap if base > 0 else 0.0
    ceiling = min(cap, window)
    return rand() * ceiling
def api_request(method, url, auth_header, payload=None, *,
                max_retries=None, base_delay=None, max_delay=None,
                sleep_func=time.sleep, rand_func=random.random,
                now_func=time.time):
    """Make an authenticated JSON request to the Gitea API.

    *method* and *url* describe the request. *auth_header* is the value of
    the ``Authorization`` header; when it is ``None`` or empty the header is
    omitted and the request is sent unauthenticated. *payload*, when not
    ``None``, is sent as the JSON-encoded request body.

    Returns the parsed JSON response, or ``None`` for an empty body. Raises
    ``RuntimeError`` (chained to the ``HTTPError``) on HTTP errors.

    On HTTP 429 the request is retried up to *max_retries* times: honoring a
    valid ``Retry-After`` header (seconds or HTTP-date) when present,
    otherwise using jittered exponential backoff starting from *base_delay*
    and capped at *max_delay*. Retry settings left as ``None`` use the
    module-level ``DEFAULT_*`` values. Non-429 errors are raised without
    retrying. The ``*_func`` parameters are injection points for
    deterministic testing.
    """
    if max_retries is None:
        max_retries = DEFAULT_MAX_RETRIES
    if base_delay is None:
        base_delay = DEFAULT_BASE_DELAY
    if max_delay is None:
        max_delay = DEFAULT_MAX_DELAY

    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    req = urllib.request.Request(url, data=data, method=method)
    # A None header value makes http.client fail with an opaque TypeError
    # while sending; without credentials, send the request anonymously so
    # the server's own response (or 401/403) is what the caller sees.
    if auth_header:
        req.add_header("Authorization", auth_header)
    req.add_header("Content-Type", "application/json")
    req.add_header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")

    attempt = 0  # number of retries already performed
    while True:
        try:
            with urllib.request.urlopen(req) as resp:
                body = resp.read().decode("utf-8")
                return json.loads(body) if body else None
        except urllib.error.HTTPError as e:
            if e.code != 429 or attempt >= max_retries:
                # Not a rate limit, or the retry budget is spent.
                error_body = e.read().decode("utf-8", errors="replace")
                raise RuntimeError(f"HTTP {e.code}: {error_body}") from e
            header = e.headers.get("Retry-After") if e.headers else None
            # NOTE(review): a server-supplied Retry-After is used as-is;
            # max_delay only caps the computed backoff below.
            delay = parse_retry_after(header, now=now_func())
            if delay is None:
                delay = backoff_delay(attempt, base_delay, max_delay, rand_func)
            attempt += 1
            sleep_func(delay)
def repo_api_url(host, org, repo):
    """Build the base API URL of a repository.

    The result has the form ``https://<host>/api/v1/repos/<org>/<repo>``.
    """
    api_root = f"https://{host}/api/v1"
    return f"{api_root}/repos/{org}/{repo}"