From 835fbbf3243cded5100b33066289fe2bbfc1adb0 Mon Sep 17 00:00:00 2001 From: Jason Walker <913443@dadeschools.net> Date: Wed, 1 Jul 2026 23:32:24 -0400 Subject: [PATCH] feat: interactive setup menu for canonical Gitea MCP profiles (#31) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an interactive utility so users create/edit/validate canonical runtime profiles and generate safe LLM launcher snippets without hand-editing JSON or pasting tokens into Claude/Gemini/Codex configs. Run: `python gitea_config.py menu` (or `python gitea_config_menu.py`). gitea_config.py — pure, testable authoring helpers: - is_valid_profile_name, build_profile, keychain_auth/env_auth, empty_config - validate_config (reports missing base_url/auth, inline token/password — never echoing the secret value) - add_profile (preserves existing, rejects dup/invalid name/missing base_url), upsert_profile, remove_profile - save_config: mkdir parents + atomic temp-then-os.replace, pretty JSON - launcher_entry: thin MCP entry (command/args + GITEA_MCP_CONFIG/PROFILE only) - keychain_set: store a token via `security add-generic-password` (token passed as an arg, never returned/printed/logged; injectable runner) - `menu` __main__ dispatch gitea_config_menu.py — interactive loop with fully injectable IO/secret/HTTP/ keychain so it is testable without a real terminal, keychain, or network: - list / add / edit / remove / validate profiles - test authentication + show authenticated user (calls /user only on request) - reviewer-eligibility helper (authenticated user vs PR author, open state) — read-only, never approves/merges - launcher snippets for Claude / Gemini / Codex (no secrets) Security: tokens are never written to profiles.json, launcher snippets, logs, or errors — only keychain ids / env var names are stored. Backwards compatible: menu is optional; env-only mode and MCP server startup are unchanged. Tests: tests/test_config_menu.py (21 cases) — name validation, preserve-on-add, dup/invalid/missing-field rejection, atomic write (+ replace-failure leaves the original intact, no temp debris), keychain_set stores-without-printing, launcher snippets secret-free, eligibility eligible/self-author/closed, and a full menu add→list→quit flow proving the token value never reaches disk or stdout. Stacked on #30 (canonical profiles); base branch feat/json-runtime-profiles. Refs #10, #19. Closes #31. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 13 ++ gitea_config.py | 213 +++++++++++++++++++++++++++ gitea_config_menu.py | 235 ++++++++++++++++++++++++++++++ tests/test_config_menu.py | 295 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 756 insertions(+) create mode 100644 gitea_config_menu.py create mode 100644 tests/test_config_menu.py diff --git a/README.md b/README.md index 093fbb5..cea59eb 100644 --- a/README.md +++ b/README.md @@ -252,6 +252,19 @@ var for the secret), then delete the `GITEA_USER_*` / `GITEA_PASS_*` / `GITEA_SITE_*` blocks from every LLM `mcp_config.json`, leaving only `GITEA_MCP_CONFIG` + `GITEA_MCP_PROFILE`. Existing env-only setups keep working unchanged until migrated. + +**Interactive setup — no hand-editing JSON.** Run the menu to create/edit/ +validate profiles, store a token in the macOS keychain (never echoed or written +to any config), test a profile's authentication, print the authenticated user, +check reviewer eligibility for a PR, and generate ready-to-paste launcher +snippets for Claude / Gemini / Codex: + +```bash +python gitea_config.py menu +``` + +The generated launcher snippets contain only `command`, `args`, +`GITEA_MCP_CONFIG`, and `GITEA_MCP_PROFILE` — never a token or password.
diff --git a/gitea_config.py b/gitea_config.py index 0478cca..69ac7d1 100644 --- a/gitea_config.py +++ b/gitea_config.py @@ -44,7 +44,10 @@ Design constraints: tokens, or passwords. """ import os +import re +import sys import json +import tempfile import subprocess ENV_CONFIG_PATH = "GITEA_MCP_CONFIG" @@ -53,6 +56,14 @@ ENV_PROFILE = "GITEA_MCP_PROFILE" SUPPORTED_VERSION = 1 _AUTH_TYPES = ("keychain", "env") +# Profile names go into env vars, keychain ids, and JSON keys — keep them tame. +_PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$") + +# Default canonical config location (one file shared by all LLM launchers). +DEFAULT_CONFIG_PATH = os.path.join( + os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json" +) + class ConfigError(Exception): """Raised for a bad config file, profile selection, or secret reference. @@ -234,3 +245,205 @@ def resolve_token(profile, keychain_lookup=_keychain_token): ) return value raise ConfigError(f"unsupported auth type {atype!r} in the selected profile") + + +# ── Config authoring helpers (used by the interactive menu; pure + testable) ──── + +def is_valid_profile_name(name): + """True if *name* is a safe profile key (alnum, dot, dash, underscore).""" + return bool(name) and bool(_PROFILE_NAME_RE.match(name)) + + +def keychain_auth(item_id): + """Build a keychain auth reference.""" + return {"type": "keychain", "id": item_id} + + +def env_auth(var_name): + """Build an env auth reference.""" + return {"type": "env", "name": var_name} + + +def build_profile(*, base_url, auth, username=None, default_owner=None, + default_repo=None, execution_profile=None): + """Assemble a profile dict, omitting empty optional fields. No secrets.""" + profile = {"base_url": base_url, "auth": auth} + if username: + profile["username"] = username + if default_owner: + profile["default_owner"] = default_owner + if default_repo: + profile["default_repo"] = default_repo + if execution_profile: + profile["execution_profile"] = execution_profile + return profile + + +def empty_config(): + """Return a fresh, valid canonical config with no profiles.""" + return {"version": SUPPORTED_VERSION, "profiles": {}} + + +def validate_config(config): + """Return a list of human-readable problems with *config* (empty = valid). + + Never includes secret material — profiles carry only auth *references*. + """ + problems = [] + if not isinstance(config, dict): + return ["config is not a JSON object"] + if config.get("version", SUPPORTED_VERSION) != SUPPORTED_VERSION: + problems.append( + f"unsupported version {config.get('version')!r} (expected {SUPPORTED_VERSION})" + ) + profiles = config.get("profiles") + if not isinstance(profiles, dict): + problems.append("missing 'profiles' object") + return problems + for name, profile in profiles.items(): + if not is_valid_profile_name(name): + problems.append(f"invalid profile name {name!r}") + if not isinstance(profile, dict): + problems.append(f"profile '{name}' is not an object") + continue + if not profile.get("base_url"): + problems.append(f"profile '{name}' is missing 'base_url'") + for secret_key in ("token", "password"): + if secret_key in profile: + problems.append( + f"profile '{name}' has an inline '{secret_key}' (use an auth reference)" + ) + try: + _validate_auth(name, profile.get("auth")) + except ConfigError as exc: + problems.append(str(exc)) + else: + if profile.get("auth") is None: + problems.append(f"profile '{name}' is missing an 'auth' reference") + return problems + + +def add_profile(config, name, profile): + """Return a copy of *config* with *profile* added under *name*. + + Preserves existing profiles. Raises :class:`ConfigError` on an invalid name, + a duplicate name, or an invalid profile. + """ + if not is_valid_profile_name(name): + raise ConfigError( + f"invalid profile name {name!r}; use letters, digits, '.', '-', '_'" + ) + profiles = dict(config.get("profiles") or {}) + if name in profiles: + raise ConfigError(f"profile '{name}' already exists; edit or remove it first") + candidate = {"version": config.get("version", SUPPORTED_VERSION), + "profiles": {**profiles, name: profile}} + # Validate just this profile (reuse select_profile's checks). + select_profile(candidate, name) + if not profile.get("base_url"): + raise ConfigError(f"profile '{name}' is missing 'base_url'") + return candidate + + +def upsert_profile(config, name, profile): + """Like :func:`add_profile` but replaces an existing profile (for edits).""" + if not is_valid_profile_name(name): + raise ConfigError(f"invalid profile name {name!r}") + profiles = dict(config.get("profiles") or {}) + profiles[name] = profile + candidate = {"version": config.get("version", SUPPORTED_VERSION), + "profiles": profiles} + select_profile(candidate, name) + return candidate + + +def remove_profile(config, name): + """Return a copy of *config* without profile *name*.""" + profiles = dict(config.get("profiles") or {}) + if name not in profiles: + raise ConfigError(f"profile '{name}' not found") + del profiles[name] + return {"version": config.get("version", SUPPORTED_VERSION), "profiles": profiles} + + +def save_config(config, path=None): + """Atomically write *config* to *path* as pretty JSON, creating parent dirs. + + Writes a temp file in the same directory then ``os.replace``s it into place, + so a crash mid-write never truncates an existing config. + """ + path = path or config_path() or DEFAULT_CONFIG_PATH + directory = os.path.dirname(os.path.abspath(path)) + os.makedirs(directory, exist_ok=True) + fd, tmp = tempfile.mkstemp(dir=directory, prefix=".profiles-", suffix=".json") + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + json.dump(config, fh, indent=2, sort_keys=True) + fh.write("\n") + os.replace(tmp, path) + except BaseException: + try: + os.unlink(tmp) + except OSError: + pass + raise + return path + + +def server_command(): + """Return (command, args) that launch this repo's MCP server.""" + root = os.path.dirname(os.path.abspath(__file__)) + python = os.path.join(root, "venv", "bin", "python3") + if not os.path.exists(python): + python = sys.executable + return python, [os.path.join(root, "mcp_server.py")] + + +def launcher_entry(profile_name, config_path=None): + """Return a thin MCP launcher entry for *profile_name*. + + Contains only command/args and the two GITEA_MCP_* env vars — never a token + or password. Suitable for Claude / Gemini / Codex ``mcpServers`` blocks. + """ + command, args = server_command() + return { + "gitea-tools": { + "command": command, + "args": args, + "env": { + "GITEA_MCP_CONFIG": config_path or DEFAULT_CONFIG_PATH, + "GITEA_MCP_PROFILE": profile_name, + }, + } + } + + +def keychain_set(item_id, token, account=None, runner=subprocess.run): + """Store *token* in the macOS keychain under service *item_id*. + + The token is passed to ``security`` as an argument only; it is never + returned, printed, or logged here. *runner* is injectable for testing. + Raises :class:`ConfigError` on failure (without echoing the token). + """ + if not item_id: + raise ConfigError("keychain item id is required") + if not token: + raise ConfigError("refusing to store an empty token") + account = account or os.environ.get("USER") or "gitea-tools" + cmd = ["security", "add-generic-password", "-U", + "-s", item_id, "-a", account, "-w", token] + try: + proc = runner(cmd, capture_output=True, text=True) + except (OSError, subprocess.SubprocessError) as exc: + raise ConfigError(f"could not run keychain store for '{item_id}'") from exc + if getattr(proc, "returncode", 1) != 0: + raise ConfigError(f"keychain store failed for item '{item_id}'") + return item_id + + +if __name__ == "__main__": # pragma: no cover - thin CLI dispatch + if len(sys.argv) > 1 and sys.argv[1] == "menu": + import gitea_config_menu + raise SystemExit(gitea_config_menu.main(sys.argv[2:])) + print("usage: python gitea_config.py menu", file=sys.stderr) + raise SystemExit(2) diff --git a/gitea_config_menu.py b/gitea_config_menu.py new file mode 100644 index 0000000..69c02f1 --- /dev/null +++ b/gitea_config_menu.py @@ -0,0 +1,235 @@ +"""Interactive setup menu for the canonical Gitea MCP profile config (#31). + +Create / edit / remove / validate profiles, test a profile's authentication, +show the authenticated user, check reviewer eligibility for a PR, and print +thin launcher snippets for Claude / Gemini / Codex — without hand-editing JSON +and without ever putting a raw token in a launcher config. + +Run: ``python gitea_config.py menu`` (or ``python gitea_config_menu.py``) + +Every side effect (stdin, secret prompt, stdout, keychain store, token +resolution, HTTP) is injectable so the menu logic is testable without a real +keychain, terminal, or network. +""" +import sys +import json +import getpass +import urllib.parse + +import gitea_config +import gitea_auth + + +def _host(base_url): + """Extract the host from a profile base_url (e.g. https://gitea.x → gitea.x).""" + netloc = urllib.parse.urlparse(base_url or "").netloc + return netloc or (base_url or "").strip() + + +def resolve_identity(profile, *, resolve_token=gitea_config.resolve_token, + api_request=gitea_auth.api_request): + """Return the authenticated username for *profile*, or raise ConfigError. + + Resolves the profile's auth reference to a token (keychain/env), calls the + read-only current-user endpoint, and returns ``login``. Never returns or + logs the token. + """ + token = resolve_token(profile) + if not token: + raise gitea_config.ConfigError("no token resolved for this profile") + host = _host(profile.get("base_url")) + data = api_request("GET", f"https://{host}/api/v1/user", f"token {token}") + login = (data or {}).get("login") + if not login: + raise gitea_config.ConfigError("could not determine authenticated user") + return login + + +def check_eligibility(profile, pr_number, owner=None, repo=None, *, + resolve_token=gitea_config.resolve_token, + api_request=gitea_auth.api_request): + """Report whether *profile* is reviewer-eligible for a PR. Read-only. + + Eligible ≙ the PR is open AND the authenticated user is NOT the PR author. + Never approves or merges. Returns a dict with the facts and a verdict. + """ + owner = owner or profile.get("default_owner") + repo = repo or profile.get("default_repo") + if not owner or not repo: + raise gitea_config.ConfigError( + "owner/repo required (set default_owner/default_repo or pass them)" + ) + token = resolve_token(profile) + if not token: + raise gitea_config.ConfigError("no token resolved for this profile") + auth = f"token {token}" + host = _host(profile.get("base_url")) + me = (api_request("GET", f"https://{host}/api/v1/user", auth) or {}).get("login") + pr = api_request( + "GET", f"https://{host}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}", auth + ) or {} + author = (pr.get("user") or {}).get("login") + state = pr.get("state") + reasons = [] + if not me: + reasons.append("authenticated user unknown") + if state != "open": + reasons.append(f"PR is not open (state={state})") + if me and author and me == author: + reasons.append("authenticated user is the PR author") + eligible = not reasons + return { + "authenticated_user": me, + "pr_author": author, + "pr_state": state, + "eligible": eligible, + "reasons": reasons or ["open PR and reviewer is not the author"], + } + + +def launcher_snippets(profile_name, config_path): + """Return {client: pretty-JSON snippet} for Claude / Gemini / Codex. + + All three MCP hosts consume the same server entry; the snippet carries only + command/args + GITEA_MCP_CONFIG/GITEA_MCP_PROFILE — never a secret. + """ + entry = gitea_config.launcher_entry(profile_name, config_path) + claude = json.dumps({"mcpServers": entry}, indent=2) + gemini = json.dumps({"mcpServers": entry}, indent=2) + codex = json.dumps(entry, indent=2) + return {"claude": claude, "gemini": gemini, "codex": codex} + + +# ── Interactive prompts (thin wrappers over injected IO) ──────────────────────── + +def _prompt_profile(input_fn, secret_fn, out, keychain_set): + """Collect a new/edited profile from the user. Returns (name, profile). + + Tokens are read with the secret prompt (no echo) and stored in the keychain; + they are never placed in the returned profile. + """ + name = input_fn("Profile name (e.g. prgs-reviewer): ").strip() + if not gitea_config.is_valid_profile_name(name): + raise gitea_config.ConfigError(f"invalid profile name {name!r}") + base_url = input_fn("Base URL (e.g. https://gitea.prgs.cc): ").strip() + username = input_fn("Gitea username: ").strip() + default_owner = input_fn("Default owner (optional): ").strip() + default_repo = input_fn("Default repo (optional): ").strip() + execution_profile = input_fn("Execution profile (optional): ").strip() + auth_type = input_fn("Auth type [keychain/env]: ").strip().lower() + + if auth_type == "keychain": + default_id = f"{name}-gitea-token" + item_id = input_fn(f"Keychain item id [{default_id}]: ").strip() or default_id + store = input_fn("Store a token in the keychain now? [y/N]: ").strip().lower() + if store == "y": + token = secret_fn("Gitea token (hidden): ") + keychain_set(item_id, token) # token never echoed + out(f"Stored token in keychain item '{item_id}'.") + auth = gitea_config.keychain_auth(item_id) + elif auth_type == "env": + default_var = f"GITEA_TOKEN_{name.upper().replace('-', '_').replace('.', '_')}" + var = input_fn(f"Env var name [{default_var}]: ").strip() or default_var + out(f"Set {var} in the environment; its value is never stored here.") + auth = gitea_config.env_auth(var) + else: + raise gitea_config.ConfigError("auth type must be 'keychain' or 'env'") + + profile = gitea_config.build_profile( + base_url=base_url, auth=auth, username=username, + default_owner=default_owner, default_repo=default_repo, + execution_profile=execution_profile, + ) + return name, profile + + +_MENU = """ +Gitea MCP profile setup + 1) list profiles + 2) add profile + 3) edit profile + 4) remove profile + 5) validate config + 6) test profile authentication + 7) show authenticated user + 8) generate launcher snippets (Claude/Gemini/Codex) + 9) check reviewer eligibility for a PR + 0) quit +""" + + +def main(argv=None, *, input_fn=input, secret_fn=None, out=None, config_path=None, + resolve_token=gitea_config.resolve_token, + api_request=gitea_auth.api_request, + keychain_set=gitea_config.keychain_set): + """Run the interactive menu loop. Returns a process exit code.""" + secret_fn = secret_fn or getpass.getpass + out = out or (lambda *a: print(*a)) + path = config_path or gitea_config.config_path() or gitea_config.DEFAULT_CONFIG_PATH + + try: + config = gitea_config.load_config(path) or gitea_config.empty_config() + except gitea_config.ConfigError as exc: + out(f"config error: {exc}") + config = gitea_config.empty_config() + + while True: + out(_MENU) + choice = input_fn("choice: ").strip() + try: + if choice == "0": + out("bye") + return 0 + elif choice == "1": + names = sorted(config.get("profiles") or {}) + out("profiles: " + (", ".join(names) if names else "(none)")) + elif choice == "2": + name, profile = _prompt_profile(input_fn, secret_fn, out, keychain_set) + config = gitea_config.add_profile(config, name, profile) + saved = gitea_config.save_config(config, path) + out(f"added '{name}'; wrote {saved}") + elif choice == "3": + name, profile = _prompt_profile(input_fn, secret_fn, out, keychain_set) + config = gitea_config.upsert_profile(config, name, profile) + saved = gitea_config.save_config(config, path) + out(f"saved '{name}'; wrote {saved}") + elif choice == "4": + name = input_fn("profile to remove: ").strip() + config = gitea_config.remove_profile(config, name) + saved = gitea_config.save_config(config, path) + out(f"removed '{name}'; wrote {saved}") + elif choice == "5": + problems = gitea_config.validate_config(config) + out("valid" if not problems else "problems:\n " + "\n ".join(problems)) + elif choice in ("6", "7"): + name = input_fn("profile: ").strip() + profile = gitea_config.select_profile(config, name) + who = resolve_identity(profile, resolve_token=resolve_token, + api_request=api_request) + out(f"authenticated as: {who}") + elif choice == "8": + name = input_fn("profile: ").strip() + for client, snippet in launcher_snippets(name, path).items(): + out(f"--- {client} ---\n{snippet}") + elif choice == "9": + name = input_fn("profile: ").strip() + pr_number = input_fn("PR number: ").strip() + profile = gitea_config.select_profile(config, name) + result = check_eligibility( + profile, pr_number, resolve_token=resolve_token, + api_request=api_request) + out(f"authenticated user: {result['authenticated_user']}") + out(f"PR author: {result['pr_author']}") + out("ELIGIBLE" if result["eligible"] else "INELIGIBLE") + for r in result["reasons"]: + out(f" - {r}") + else: + out("unknown choice") + except gitea_config.ConfigError as exc: + out(f"error: {exc}") + except Exception as exc: # noqa: BLE001 - keep the menu alive, no secrets + out(f"error: {type(exc).__name__}") + + +if __name__ == "__main__": # pragma: no cover + raise SystemExit(main(sys.argv[1:])) diff --git a/tests/test_config_menu.py b/tests/test_config_menu.py new file mode 100644 index 0000000..d1596f5 --- /dev/null +++ b/tests/test_config_menu.py @@ -0,0 +1,295 @@ +"""Tests for the interactive profile-setup menu (#31): gitea_config authoring +helpers + gitea_config_menu, all exercised without a real keychain, terminal, +or network via injected IO/auth/HTTP. +""" +import os +import sys +import json +import tempfile +import unittest +from unittest.mock import patch + +sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) + +import gitea_config # noqa: E402 +import gitea_config_menu as menu # noqa: E402 + +SECRET = "super-secret-token-value" + + +class _InputQueue: + """Callable that returns queued answers for successive input() prompts.""" + + def __init__(self, answers): + self._answers = list(answers) + + def __call__(self, _prompt=""): + return self._answers.pop(0) + + +class _Out: + """Capture out() lines.""" + + def __init__(self): + self.lines = [] + + def __call__(self, *args): + self.lines.append(" ".join(str(a) for a in args)) + + @property + def text(self): + return "\n".join(self.lines) + + +# --------------------------------------------------------------------------- +# Config authoring helpers +# --------------------------------------------------------------------------- +class TestAuthoringHelpers(unittest.TestCase): + + def test_profile_name_validation(self): + self.assertTrue(gitea_config.is_valid_profile_name("prgs-reviewer")) + self.assertTrue(gitea_config.is_valid_profile_name("prgs.author_1")) + for bad in ("", "-leading", "has space", "a/b", "b@d"): + self.assertFalse(gitea_config.is_valid_profile_name(bad), bad) + + def test_add_profile_preserves_existing(self): + cfg = gitea_config.empty_config() + cfg = gitea_config.add_profile(cfg, "prgs-author", gitea_config.build_profile( + base_url="https://gitea.prgs.cc", auth=gitea_config.env_auth("GITEA_TOKEN_A"))) + cfg = gitea_config.add_profile(cfg, "prgs-reviewer", gitea_config.build_profile( + base_url="https://gitea.prgs.cc", + auth=gitea_config.keychain_auth("prgs-reviewer-token"))) + self.assertEqual(sorted(cfg["profiles"]), ["prgs-author", "prgs-reviewer"]) + + def test_add_duplicate_rejected(self): + cfg = gitea_config.add_profile(gitea_config.empty_config(), "p", + gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V"))) + with self.assertRaises(gitea_config.ConfigError): + gitea_config.add_profile(cfg, "p", gitea_config.build_profile( + base_url="https://y", auth=gitea_config.env_auth("W"))) + + def test_add_invalid_name_rejected(self): + with self.assertRaises(gitea_config.ConfigError): + gitea_config.add_profile(gitea_config.empty_config(), "bad name", + gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V"))) + + def test_add_missing_base_url_rejected(self): + with self.assertRaises(gitea_config.ConfigError): + gitea_config.add_profile(gitea_config.empty_config(), "p", + {"auth": gitea_config.env_auth("V")}) + + def test_validate_reports_missing_fields(self): + cfg = {"version": 1, "profiles": { + "noauth": {"base_url": "https://x"}, + "nobase": {"auth": gitea_config.env_auth("V")}, + "inline": {"base_url": "https://x", "auth": gitea_config.env_auth("V"), + "token": "leak"}, + }} + problems = gitea_config.validate_config(cfg) + joined = "\n".join(problems) + self.assertIn("noauth", joined) + self.assertIn("nobase", joined) + self.assertIn("inline", joined) + self.assertNotIn("leak", joined) # inline secret value never echoed + + def test_build_profile_omits_empty(self): + p = gitea_config.build_profile(base_url="https://x", + auth=gitea_config.env_auth("V"), username="") + self.assertNotIn("username", p) + self.assertNotIn("default_owner", p) + + +class TestAtomicSave(unittest.TestCase): + + def test_save_creates_parents_and_pretty_json(self): + with tempfile.TemporaryDirectory() as d: + path = os.path.join(d, "nested", "dir", "profiles.json") + cfg = gitea_config.add_profile(gitea_config.empty_config(), "p", + gitea_config.build_profile(base_url="https://x", + auth=gitea_config.env_auth("V"))) + gitea_config.save_config(cfg, path) + self.assertTrue(os.path.isfile(path)) + with open(path, encoding="utf-8") as fh: + body = fh.read() + self.assertEqual(json.loads(body), cfg) + self.assertIn("\n ", body) # pretty-printed + # No leftover temp files in the directory. + leftovers = [n for n in os.listdir(os.path.dirname(path)) + if n.startswith(".profiles-")] + self.assertEqual(leftovers, []) + + def test_save_is_atomic_via_replace(self): + with tempfile.TemporaryDirectory() as d: + path = os.path.join(d, "profiles.json") + gitea_config.save_config(gitea_config.empty_config(), path) + with patch("gitea_config.os.replace", side_effect=OSError("boom")): + with self.assertRaises(OSError): + gitea_config.save_config( + gitea_config.add_profile(gitea_config.empty_config(), "p", + gitea_config.build_profile(base_url="https://x", + auth=gitea_config.env_auth("V"))), path) + # Original file intact; no temp debris. + with open(path, encoding="utf-8") as fh: + self.assertEqual(json.load(fh), gitea_config.empty_config()) + leftovers = [n for n in os.listdir(d) if n.startswith(".profiles-")] + self.assertEqual(leftovers, []) + + +class TestKeychainSet(unittest.TestCase): + + def test_stores_token_via_security_without_printing(self): + calls = {} + + class _Proc: + returncode = 0 + + def runner(cmd, **kw): + calls["cmd"] = cmd + return _Proc() + + out = _Out() + rv = gitea_config.keychain_set("prgs-reviewer-token", SECRET, runner=runner) + self.assertEqual(rv, "prgs-reviewer-token") + self.assertEqual(calls["cmd"][0], "security") + self.assertIn(SECRET, calls["cmd"]) # token handed to security... + self.assertNotIn(SECRET, out.text) # ...never printed by us + + def test_failure_raises_without_token(self): + class _Proc: + returncode = 1 + + with self.assertRaises(gitea_config.ConfigError) as ctx: + gitea_config.keychain_set("id", SECRET, runner=lambda *a, **k: _Proc()) + self.assertNotIn(SECRET, str(ctx.exception)) + + def test_empty_token_rejected(self): + with self.assertRaises(gitea_config.ConfigError): + gitea_config.keychain_set("id", "", runner=lambda *a, **k: None) + + +# --------------------------------------------------------------------------- +# Launcher snippets +# --------------------------------------------------------------------------- +class TestLauncherSnippets(unittest.TestCase): + + def test_only_safe_keys_no_secrets(self): + entry = gitea_config.launcher_entry("prgs", "/cfg/profiles.json")["gitea-tools"] + self.assertEqual(set(entry), {"command", "args", "env"}) + self.assertEqual(set(entry["env"]), {"GITEA_MCP_CONFIG", "GITEA_MCP_PROFILE"}) + self.assertEqual(entry["env"]["GITEA_MCP_PROFILE"], "prgs") + blob = json.dumps(entry).lower() + for word in ("token", "password", "secret"): + self.assertNotIn(word, blob) + + def test_snippets_for_all_clients(self): + snips = menu.launcher_snippets("prgs", "/cfg/profiles.json") + self.assertEqual(set(snips), {"claude", "gemini", "codex"}) + for text in snips.values(): + self.assertIn("GITEA_MCP_PROFILE", text) + self.assertNotIn("token", text.lower()) + + +# --------------------------------------------------------------------------- +# Auth test + reviewer eligibility (mocked API + token resolution) +# --------------------------------------------------------------------------- +class TestIdentityAndEligibility(unittest.TestCase): + + def _profile(self): + return {"base_url": "https://gitea.prgs.cc", + "default_owner": "Org", "default_repo": "Repo", + "auth": gitea_config.env_auth("GITEA_TOKEN_X")} + + def test_resolve_identity(self): + def api(m, url, auth): + return {"login": "reviewer-bot"} if url.endswith("/user") else {} + who = menu.resolve_identity(self._profile(), + resolve_token=lambda p: "tok", api_request=api) + self.assertEqual(who, "reviewer-bot") + + def test_eligible_when_open_and_not_author(self): + def api(m, url, auth): + if url.endswith("/user"): + return {"login": "reviewer-bot"} + return {"user": {"login": "author-bot"}, "state": "open"} + r = menu.check_eligibility(self._profile(), 8, + resolve_token=lambda p: "tok", api_request=api) + self.assertTrue(r["eligible"]) + self.assertEqual(r["authenticated_user"], "reviewer-bot") + self.assertEqual(r["pr_author"], "author-bot") + + def test_ineligible_when_self_author(self): + def api(m, url, auth): + if url.endswith("/user"): + return {"login": "same-bot"} + return {"user": {"login": "same-bot"}, "state": "open"} + r = menu.check_eligibility(self._profile(), 8, + resolve_token=lambda p: "tok", api_request=api) + self.assertFalse(r["eligible"]) + self.assertTrue(any("author" in x for x in r["reasons"])) + + def test_ineligible_when_closed(self): + def api(m, url, auth): + if url.endswith("/user"): + return {"login": "reviewer-bot"} + return {"user": {"login": "author-bot"}, "state": "closed"} + r = menu.check_eligibility(self._profile(), 8, + resolve_token=lambda p: "tok", api_request=api) + self.assertFalse(r["eligible"]) + + +# --------------------------------------------------------------------------- +# Prompt flow + full menu loop (no real IO/keychain/network) +# --------------------------------------------------------------------------- +class TestMenuFlow(unittest.TestCase): + + def test_prompt_keychain_stores_but_profile_has_no_token(self): + stored = {} + answers = ["prgs-reviewer", "https://gitea.prgs.cc", "jcwalker3", + "Org", "Repo", "personal-prgs", "keychain", + "prgs-reviewer-token", "y"] + out = _Out() + name, profile = menu._prompt_profile( + _InputQueue(answers), secret_fn=lambda _p: SECRET, out=out, + keychain_set=lambda i, t: stored.update({i: t})) + self.assertEqual(name, "prgs-reviewer") + self.assertEqual(profile["auth"], {"type": "keychain", "id": "prgs-reviewer-token"}) + self.assertNotIn(SECRET, json.dumps(profile)) # token not in profile + self.assertNotIn(SECRET, out.text) # token not printed + self.assertEqual(stored, {"prgs-reviewer-token": SECRET}) # stored via keychain + + def test_prompt_env_reference(self): + answers = ["prgs-author", "https://gitea.prgs.cc", "jcwalker3", + "", "", "personal-prgs", "env", ""] # blank -> default var name + name, profile = menu._prompt_profile( + _InputQueue(answers), secret_fn=lambda _p: SECRET, out=_Out(), + keychain_set=lambda *a: None) + self.assertEqual(profile["auth"], {"type": "env", "name": "GITEA_TOKEN_PRGS_AUTHOR"}) + + def test_menu_add_then_quit_writes_config_without_secrets(self): + with tempfile.TemporaryDirectory() as d: + path = os.path.join(d, "profiles.json") + answers = [ + "2", # add profile + "prgs-reviewer", "https://gitea.prgs.cc", "jcwalker3", + "Org", "Repo", "personal-prgs", "keychain", + "prgs-reviewer-token", "y", + "1", # list + "0", # quit + ] + out = _Out() + rc = menu.main( + input_fn=_InputQueue(answers), secret_fn=lambda _p: SECRET, + out=out, config_path=path, keychain_set=lambda *a: None) + self.assertEqual(rc, 0) + self.assertTrue(os.path.isfile(path)) + with open(path, encoding="utf-8") as fh: + body = fh.read() + self.assertIn("prgs-reviewer", body) + self.assertIn("prgs-reviewer-token", body) # keychain id (non-secret) + self.assertNotIn(SECRET, body) # never the token value + self.assertNotIn(SECRET, out.text) + self.assertIn("prgs-reviewer", out.text) # listed + + +if __name__ == "__main__": + unittest.main()