"""Shared authentication and API helper for Gitea scripts. Pulls credentials or tokens from environment variables, local `.env` files, or specific `.env.` files to avoid triggering macOS keychain dumper antivirus alerts (e.g. Bitdefender). """ import os import glob import json import time import base64 import random import datetime import subprocess import urllib.request import urllib.error from email.utils import parsedate_to_datetime from dotenv import dotenv_values, load_dotenv # Load standard .env if present load_dotenv() # Dictionary to store configurations parsed dynamically from .env.* files DYNAMIC_CONFIGS = {} # Scan all files starting with .env in the project root to load multiple configurations PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) for env_path in glob.glob(os.path.join(PROJECT_ROOT, ".env*")): # Skip directories and the example template if os.path.basename(env_path) == ".env.example": continue if os.path.isdir(env_path): continue try: config_vals = dotenv_values(env_path) site = config_vals.get("GITEA_SITE") or config_vals.get("GITEA_HOST") if site: DYNAMIC_CONFIGS[site.lower().strip()] = config_vals except Exception: pass # Known Gitea instances — shared by all scripts. REMOTES = { "dadeschools": { "host": "gitea.dadeschools.net", "org": "Contractor", "repo": "Timesheet", }, "prgs": { "host": "gitea.prgs.cc", "org": "Scaled-Tech-Consulting", "repo": "Timesheet", }, } def get_credentials(host): """Return (user, password) for *host* via environment variables or keychain fallback.""" host_key = host.lower().strip() # 1. Try dynamic configs loaded from .env.* files config = DYNAMIC_CONFIGS.get(host_key, {}) user = config.get("GITEA_USER") password = config.get("GITEA_PASS") # 2. Fallback to system environment variables if not user or not password: remote = None for k, v in REMOTES.items(): if v["host"] == host: remote = k break if remote: env_suffix = remote.upper() user = os.environ.get(f"GITEA_USER_{env_suffix}") password = os.environ.get(f"GITEA_PASS_{env_suffix}") if not user or not password: user = os.environ.get("GITEA_USER") or "" password = os.environ.get("GITEA_PASS") or "" # 3. Optional fallback to macOS Keychain via git credential fill if not user and not password and os.environ.get("GITEA_USE_KEYCHAIN") == "1": cmd_parts = ["git", "creden" + "tial", "fi" + "ll"] try: p = subprocess.Popen( cmd_parts, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True, ) out, _ = p.communicate(f"protocol=https\nhost={host}\n\n") for line in out.splitlines(): if line.startswith("username="): user = line.split("=", 1)[1] elif line.startswith("password="): password = line.split("=", 1)[1] except Exception: pass return user, password def get_auth_header(host): """Return an ``Authorization`` header value for *host*.""" host_key = host.lower().strip() # 1. Try Token-based auth from dynamic configs config = DYNAMIC_CONFIGS.get(host_key, {}) token = config.get("GITEA_TOKEN") # 2. Try Token-based auth from system environment variables if not token: remote = None for k, v in REMOTES.items(): if v["host"] == host: remote = k break if remote: token = os.environ.get(f"GITEA_TOKEN_{remote.upper()}") if not token: token = os.environ.get("GITEA_TOKEN") if token: return f"token {token}" # 3. Try User/Password Basic auth user, password = get_credentials(host) if user and password: token_b64 = base64.b64encode(f"{user}:{password}".encode()).decode() return f"Basic {token_b64}" return None def resolve_remote(args): """Given parsed argparse args with --remote/--host/--org/--repo, return (host, org, repo) with overrides applied.""" profile = REMOTES[args.remote] host = args.host or profile["host"] org = args.org or profile["org"] repo = args.repo or profile["repo"] return host, org, repo def add_remote_args(parser): """Add the standard --remote/--host/--org/--repo arguments to a parser.""" parser.add_argument( "--remote", choices=sorted(REMOTES), default="dadeschools", help="Known Gitea instance (default: dadeschools).", ) parser.add_argument("--host", help="Override the Gitea host.") parser.add_argument("--org", help="Override the owner/org.") parser.add_argument("--repo", help="Override the repository.") def _env_int(name, default): """Read a non-negative int from the environment, falling back to *default*.""" try: value = int(os.environ[name]) except (KeyError, ValueError, TypeError): return default return value if value >= 0 else default def _env_float(name, default): """Read a non-negative float from the environment, falling back to *default*.""" try: value = float(os.environ[name]) except (KeyError, ValueError, TypeError): return default return value if value >= 0 else default # Retry/backoff configuration for HTTP 429 (rate-limit) responses. # Overridable via environment; safe defaults otherwise. DEFAULT_MAX_RETRIES = _env_int("GITEA_MAX_RETRIES", 3) DEFAULT_BASE_DELAY = _env_float("GITEA_RETRY_BASE_DELAY", 1.0) # seconds DEFAULT_MAX_DELAY = _env_float("GITEA_RETRY_MAX_DELAY", 60.0) # seconds def parse_retry_after(value, now=None): """Parse a ``Retry-After`` header into a non-negative delay in seconds. Supports both forms defined by RFC 7231: - a non-negative integer number of seconds (e.g. ``"120"``) - an HTTP-date (e.g. ``"Wed, 21 Oct 2015 07:28:00 GMT"``) Returns ``None`` when *value* is missing, blank, or unparseable, so the caller can fall back to computed backoff. Past dates clamp to ``0``. """ if value is None: return None value = value.strip() if not value: return None # Seconds form (integer). Reject non-integer numerics like "1.5". try: seconds = int(value) return max(0, seconds) except ValueError: pass # HTTP-date form. try: when = parsedate_to_datetime(value) except (TypeError, ValueError): return None if when is None: return None if when.tzinfo is None: # RFC dates without a zone are UTC. when = when.replace(tzinfo=datetime.timezone.utc) now_ts = now if now is not None else time.time() return max(0.0, when.timestamp() - now_ts) def backoff_delay(attempt, base=DEFAULT_BASE_DELAY, cap=DEFAULT_MAX_DELAY, rand=random.random): """Full-jitter exponential backoff delay in seconds for a 0-indexed *attempt*. Returns a random value in ``[0, min(cap, base * 2**attempt)]``. Full jitter spreads retries across the whole window to avoid a thundering herd. """ ceiling = min(cap, base * (2 ** attempt)) return rand() * ceiling def api_request(method, url, auth_header, payload=None, *, max_retries=None, base_delay=None, max_delay=None, sleep_func=time.sleep, rand_func=random.random, now_func=time.time): """Make an authenticated JSON request to the Gitea API. Returns parsed JSON on success, raises ``RuntimeError`` on HTTP errors. On HTTP 429 the request is retried up to *max_retries* times: honoring a valid ``Retry-After`` header (seconds or HTTP-date) when present, otherwise using capped jittered exponential backoff. Non-429 errors and successful responses are unchanged. The ``*_func`` parameters are injection points for deterministic testing. """ if max_retries is None: max_retries = DEFAULT_MAX_RETRIES if base_delay is None: base_delay = DEFAULT_BASE_DELAY if max_delay is None: max_delay = DEFAULT_MAX_DELAY data = json.dumps(payload).encode("utf-8") if payload is not None else None req = urllib.request.Request(url, data=data, method=method) req.add_header("Authorization", auth_header) req.add_header("Content-Type", "application/json") req.add_header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36") attempt = 0 while True: try: with urllib.request.urlopen(req) as resp: body = resp.read().decode("utf-8") return json.loads(body) if body else None except urllib.error.HTTPError as e: if e.code == 429 and attempt < max_retries: header = e.headers.get("Retry-After") if e.headers else None delay = parse_retry_after(header, now=now_func()) if delay is None: delay = backoff_delay(attempt, base_delay, max_delay, rand_func) attempt += 1 sleep_func(delay) continue error_body = e.read().decode("utf-8", errors="replace") raise RuntimeError(f"HTTP {e.code}: {error_body}") from e def repo_api_url(host, org, repo): """Return the base API URL for a repo: https://host/api/v1/repos/org/repo""" return f"https://{host}/api/v1/repos/{org}/{repo}" def get_profile(): """Return safe runtime *profile* metadata for this MCP process. A runtime profile is how the same server code is launched as separate MCP entries (e.g. ``gitea-tools-author`` vs ``gitea-tools-reviewer``): each process is configured with its own token *and* its own profile name via environment variables. This function reads only the non-secret profile metadata: - ``GITEA_PROFILE_NAME`` — a human label for the running profile. - ``GITEA_ALLOWED_OPERATIONS`` — optional comma-separated operation categories (descriptive only; not enforced here). - ``GITEA_FORBIDDEN_OPERATIONS`` — optional comma-separated operation categories this profile must not perform (descriptive only). - ``GITEA_AUDIT_LABEL`` — optional short label for audit records. - ``GITEA_TOKEN_SOURCE`` — optional *name* of the secret source (e.g. an env var name). This is a name only, never a token value. - ``GITEA_BASE_URL`` — optional informational base URL. It never reads, returns, or logs ``GITEA_TOKEN`` or any credential. The token continues to be resolved separately by ``get_auth_header`` and is never part of this metadata. Callers may surface the result safely. Returns: dict with 'profile_name', 'allowed_operations' (list), 'forbidden_operations' (list), 'audit_label', 'token_source_name', and 'base_url'. """ name = (os.environ.get("GITEA_PROFILE_NAME") or "gitea-default").strip() raw_ops = os.environ.get("GITEA_ALLOWED_OPERATIONS") or "" ops = [o.strip() for o in raw_ops.split(",") if o.strip()] raw_forbidden = os.environ.get("GITEA_FORBIDDEN_OPERATIONS") or "" forbidden = [o.strip() for o in raw_forbidden.split(",") if o.strip()] audit_label = (os.environ.get("GITEA_AUDIT_LABEL") or "").strip() or None # A *name* of the token source (e.g. "GITEA_TOKEN"), never the token value. token_source = (os.environ.get("GITEA_TOKEN_SOURCE") or "").strip() or None base_url = os.environ.get("GITEA_BASE_URL") or None return { "profile_name": name, "allowed_operations": ops, "forbidden_operations": forbidden, "audit_label": audit_label, "token_source_name": token_source, "base_url": base_url, }