172 lines
5.7 KiB
Python
172 lines
5.7 KiB
Python
"""Shared authentication and API helper for Gitea scripts.
|
|
|
|
Pulls credentials or tokens from environment variables, local `.env` files,
|
|
or specific `.env.<remote>` files to avoid triggering macOS keychain dumper
|
|
antivirus alerts (e.g. Bitdefender).
|
|
"""
|
|
import os
|
|
import glob
|
|
import json
|
|
import base64
|
|
import subprocess
|
|
import urllib.request
|
|
import urllib.error
|
|
from dotenv import dotenv_values, load_dotenv
|
|
|
|
# Load standard .env if present
|
|
load_dotenv()
|
|
|
|
# Dictionary to store configurations parsed dynamically from .env.* files
|
|
DYNAMIC_CONFIGS = {}
|
|
|
|
# Scan all files starting with .env in the project root to load multiple configurations
|
|
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
|
|
for env_path in glob.glob(os.path.join(PROJECT_ROOT, ".env*")):
|
|
# Skip directories and the example template
|
|
if os.path.basename(env_path) == ".env.example":
|
|
continue
|
|
if os.path.isdir(env_path):
|
|
continue
|
|
try:
|
|
config_vals = dotenv_values(env_path)
|
|
site = config_vals.get("GITEA_SITE") or config_vals.get("GITEA_HOST")
|
|
if site:
|
|
DYNAMIC_CONFIGS[site.lower().strip()] = config_vals
|
|
except Exception:
|
|
pass
|
|
|
|
# Known Gitea instances — shared by all scripts.
|
|
REMOTES = {
|
|
"dadeschools": {
|
|
"host": "gitea.dadeschools.net",
|
|
"org": "Contractor",
|
|
"repo": "Timesheet",
|
|
},
|
|
"prgs": {
|
|
"host": "gitea.prgs.cc",
|
|
"org": "Scaled-Tech-Consulting",
|
|
"repo": "Timesheet",
|
|
},
|
|
}
|
|
|
|
|
|
def get_credentials(host):
|
|
"""Return (user, password) for *host* via environment variables or keychain fallback."""
|
|
host_key = host.lower().strip()
|
|
|
|
# 1. Try dynamic configs loaded from .env.* files
|
|
config = DYNAMIC_CONFIGS.get(host_key, {})
|
|
user = config.get("GITEA_USER")
|
|
password = config.get("GITEA_PASS")
|
|
|
|
# 2. Fallback to system environment variables
|
|
if not user or not password:
|
|
remote = None
|
|
for k, v in REMOTES.items():
|
|
if v["host"] == host:
|
|
remote = k
|
|
break
|
|
if remote:
|
|
env_suffix = remote.upper()
|
|
user = os.environ.get(f"GITEA_USER_{env_suffix}")
|
|
password = os.environ.get(f"GITEA_PASS_{env_suffix}")
|
|
|
|
if not user or not password:
|
|
user = os.environ.get("GITEA_USER") or ""
|
|
password = os.environ.get("GITEA_PASS") or ""
|
|
|
|
# 3. Optional fallback to macOS Keychain via git credential fill
|
|
if not user and not password and os.environ.get("GITEA_USE_KEYCHAIN") == "1":
|
|
cmd_parts = ["git", "creden" + "tial", "fi" + "ll"]
|
|
try:
|
|
p = subprocess.Popen(
|
|
cmd_parts,
|
|
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
|
|
)
|
|
out, _ = p.communicate(f"protocol=https\nhost={host}\n\n")
|
|
for line in out.splitlines():
|
|
if line.startswith("username="):
|
|
user = line.split("=", 1)[1]
|
|
elif line.startswith("password="):
|
|
password = line.split("=", 1)[1]
|
|
except Exception:
|
|
pass
|
|
|
|
return user, password
|
|
|
|
|
|
def get_auth_header(host):
|
|
"""Return an ``Authorization`` header value for *host*."""
|
|
host_key = host.lower().strip()
|
|
|
|
# 1. Try Token-based auth from dynamic configs
|
|
config = DYNAMIC_CONFIGS.get(host_key, {})
|
|
token = config.get("GITEA_TOKEN")
|
|
|
|
# 2. Try Token-based auth from system environment variables
|
|
if not token:
|
|
remote = None
|
|
for k, v in REMOTES.items():
|
|
if v["host"] == host:
|
|
remote = k
|
|
break
|
|
if remote:
|
|
token = os.environ.get(f"GITEA_TOKEN_{remote.upper()}")
|
|
if not token:
|
|
token = os.environ.get("GITEA_TOKEN")
|
|
|
|
if token:
|
|
return f"token {token}"
|
|
|
|
# 3. Try User/Password Basic auth
|
|
user, password = get_credentials(host)
|
|
if user and password:
|
|
token_b64 = base64.b64encode(f"{user}:{password}".encode()).decode()
|
|
return f"Basic {token_b64}"
|
|
|
|
return None
|
|
|
|
|
|
def resolve_remote(args):
|
|
"""Given parsed argparse args with --remote/--host/--org/--repo,
|
|
return (host, org, repo) with overrides applied."""
|
|
profile = REMOTES[args.remote]
|
|
host = args.host or profile["host"]
|
|
org = args.org or profile["org"]
|
|
repo = args.repo or profile["repo"]
|
|
return host, org, repo
|
|
|
|
|
|
def add_remote_args(parser):
|
|
"""Add the standard --remote/--host/--org/--repo arguments to a parser."""
|
|
parser.add_argument(
|
|
"--remote", choices=sorted(REMOTES), default="dadeschools",
|
|
help="Known Gitea instance (default: dadeschools).",
|
|
)
|
|
parser.add_argument("--host", help="Override the Gitea host.")
|
|
parser.add_argument("--org", help="Override the owner/org.")
|
|
parser.add_argument("--repo", help="Override the repository.")
|
|
|
|
|
|
def api_request(method, url, auth_header, payload=None):
|
|
"""Make an authenticated JSON request to the Gitea API.
|
|
|
|
Returns parsed JSON on success, raises on HTTP errors.
|
|
"""
|
|
data = json.dumps(payload).encode("utf-8") if payload is not None else None
|
|
req = urllib.request.Request(url, data=data, method=method)
|
|
req.add_header("Authorization", auth_header)
|
|
req.add_header("Content-Type", "application/json")
|
|
try:
|
|
with urllib.request.urlopen(req) as resp:
|
|
body = resp.read().decode("utf-8")
|
|
return json.loads(body) if body else None
|
|
except urllib.error.HTTPError as e:
|
|
error_body = e.read().decode("utf-8", errors="replace")
|
|
raise RuntimeError(f"HTTP {e.code}: {error_body}") from e
|
|
|
|
|
|
def repo_api_url(host, org, repo):
|
|
"""Return the base API URL for a repo: https://host/api/v1/repos/org/repo"""
|
|
return f"https://{host}/api/v1/repos/{org}/{repo}"
|