feat: interactive setup menu for canonical Gitea MCP profiles (#31)
Add an interactive utility so users create/edit/validate canonical runtime profiles and generate safe LLM launcher snippets without hand-editing JSON or pasting tokens into Claude/Gemini/Codex configs. Run: `python gitea_config.py menu` (or `python gitea_config_menu.py`). gitea_config.py — pure, testable authoring helpers: - is_valid_profile_name, build_profile, keychain_auth/env_auth, empty_config - validate_config (reports missing base_url/auth, inline token/password — never echoing the secret value) - add_profile (preserves existing, rejects dup/invalid name/missing base_url), upsert_profile, remove_profile - save_config: mkdir parents + atomic temp-then-os.replace, pretty JSON - launcher_entry: thin MCP entry (command/args + GITEA_MCP_CONFIG/PROFILE only) - keychain_set: store a token via `security add-generic-password` (token passed as an arg, never returned/printed/logged; injectable runner) - `menu` __main__ dispatch gitea_config_menu.py — interactive loop with fully injectable IO/secret/HTTP/ keychain so it is testable without a real terminal, keychain, or network: - list / add / edit / remove / validate profiles - test authentication + show authenticated user (calls /user only on request) - reviewer-eligibility helper (authenticated user vs PR author, open state) — read-only, never approves/merges - launcher snippets for Claude / Gemini / Codex (no secrets) Security: tokens are never written to profiles.json, launcher snippets, logs, or errors — only keychain ids / env var names are stored. Backwards compatible: menu is optional; env-only mode and MCP server startup are unchanged. Tests: tests/test_config_menu.py (21 cases) — name validation, preserve-on-add, dup/invalid/missing-field rejection, atomic write (+ replace-failure leaves the original intact, no temp debris), keychain_set stores-without-printing, launcher snippets secret-free, eligibility eligible/self-author/closed, and a full menu add→list→quit flow proving the token value never reaches disk or stdout. Stacked on #30 (canonical profiles); base branch feat/json-runtime-profiles. Refs #10, #19. Closes #31. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -252,6 +252,19 @@ var for the secret), then delete the `GITEA_USER_*` / `GITEA_PASS_*` /
|
|||||||
`GITEA_SITE_*` blocks from every LLM `mcp_config.json`, leaving only
|
`GITEA_SITE_*` blocks from every LLM `mcp_config.json`, leaving only
|
||||||
`GITEA_MCP_CONFIG` + `GITEA_MCP_PROFILE`. Existing env-only setups keep working
|
`GITEA_MCP_CONFIG` + `GITEA_MCP_PROFILE`. Existing env-only setups keep working
|
||||||
unchanged until migrated.
|
unchanged until migrated.
|
||||||
|
|
||||||
|
**Interactive setup — no hand-editing JSON.** Run the menu to create/edit/
|
||||||
|
validate profiles, store a token in the macOS keychain (never echoed or written
|
||||||
|
to any config), test a profile's authentication, print the authenticated user,
|
||||||
|
check reviewer eligibility for a PR, and generate ready-to-paste launcher
|
||||||
|
snippets for Claude / Gemini / Codex:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python gitea_config.py menu
|
||||||
|
```
|
||||||
|
|
||||||
|
The generated launcher snippets contain only `command`, `args`,
|
||||||
|
`GITEA_MCP_CONFIG`, and `GITEA_MCP_PROFILE` — never a token or password.
|
||||||
</details>
|
</details>
|
||||||
|
|
||||||
<details>
|
<details>
|
||||||
|
|||||||
+213
@@ -44,7 +44,10 @@ Design constraints:
|
|||||||
tokens, or passwords.
|
tokens, or passwords.
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
import json
|
import json
|
||||||
|
import tempfile
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
ENV_CONFIG_PATH = "GITEA_MCP_CONFIG"
|
ENV_CONFIG_PATH = "GITEA_MCP_CONFIG"
|
||||||
@@ -53,6 +56,14 @@ ENV_PROFILE = "GITEA_MCP_PROFILE"
|
|||||||
SUPPORTED_VERSION = 1
|
SUPPORTED_VERSION = 1
|
||||||
_AUTH_TYPES = ("keychain", "env")
|
_AUTH_TYPES = ("keychain", "env")
|
||||||
|
|
||||||
|
# Profile names go into env vars, keychain ids, and JSON keys — keep them tame.
|
||||||
|
_PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")
|
||||||
|
|
||||||
|
# Default canonical config location (one file shared by all LLM launchers).
|
||||||
|
DEFAULT_CONFIG_PATH = os.path.join(
|
||||||
|
os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ConfigError(Exception):
|
class ConfigError(Exception):
|
||||||
"""Raised for a bad config file, profile selection, or secret reference.
|
"""Raised for a bad config file, profile selection, or secret reference.
|
||||||
@@ -234,3 +245,205 @@ def resolve_token(profile, keychain_lookup=_keychain_token):
|
|||||||
)
|
)
|
||||||
return value
|
return value
|
||||||
raise ConfigError(f"unsupported auth type {atype!r} in the selected profile")
|
raise ConfigError(f"unsupported auth type {atype!r} in the selected profile")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Config authoring helpers (used by the interactive menu; pure + testable) ────
|
||||||
|
|
||||||
|
def is_valid_profile_name(name):
|
||||||
|
"""True if *name* is a safe profile key (alnum, dot, dash, underscore)."""
|
||||||
|
return bool(name) and bool(_PROFILE_NAME_RE.match(name))
|
||||||
|
|
||||||
|
|
||||||
|
def keychain_auth(item_id):
|
||||||
|
"""Build a keychain auth reference."""
|
||||||
|
return {"type": "keychain", "id": item_id}
|
||||||
|
|
||||||
|
|
||||||
|
def env_auth(var_name):
|
||||||
|
"""Build an env auth reference."""
|
||||||
|
return {"type": "env", "name": var_name}
|
||||||
|
|
||||||
|
|
||||||
|
def build_profile(*, base_url, auth, username=None, default_owner=None,
|
||||||
|
default_repo=None, execution_profile=None):
|
||||||
|
"""Assemble a profile dict, omitting empty optional fields. No secrets."""
|
||||||
|
profile = {"base_url": base_url, "auth": auth}
|
||||||
|
if username:
|
||||||
|
profile["username"] = username
|
||||||
|
if default_owner:
|
||||||
|
profile["default_owner"] = default_owner
|
||||||
|
if default_repo:
|
||||||
|
profile["default_repo"] = default_repo
|
||||||
|
if execution_profile:
|
||||||
|
profile["execution_profile"] = execution_profile
|
||||||
|
return profile
|
||||||
|
|
||||||
|
|
||||||
|
def empty_config():
|
||||||
|
"""Return a fresh, valid canonical config with no profiles."""
|
||||||
|
return {"version": SUPPORTED_VERSION, "profiles": {}}
|
||||||
|
|
||||||
|
|
||||||
|
def validate_config(config):
|
||||||
|
"""Return a list of human-readable problems with *config* (empty = valid).
|
||||||
|
|
||||||
|
Never includes secret material — profiles carry only auth *references*.
|
||||||
|
"""
|
||||||
|
problems = []
|
||||||
|
if not isinstance(config, dict):
|
||||||
|
return ["config is not a JSON object"]
|
||||||
|
if config.get("version", SUPPORTED_VERSION) != SUPPORTED_VERSION:
|
||||||
|
problems.append(
|
||||||
|
f"unsupported version {config.get('version')!r} (expected {SUPPORTED_VERSION})"
|
||||||
|
)
|
||||||
|
profiles = config.get("profiles")
|
||||||
|
if not isinstance(profiles, dict):
|
||||||
|
problems.append("missing 'profiles' object")
|
||||||
|
return problems
|
||||||
|
for name, profile in profiles.items():
|
||||||
|
if not is_valid_profile_name(name):
|
||||||
|
problems.append(f"invalid profile name {name!r}")
|
||||||
|
if not isinstance(profile, dict):
|
||||||
|
problems.append(f"profile '{name}' is not an object")
|
||||||
|
continue
|
||||||
|
if not profile.get("base_url"):
|
||||||
|
problems.append(f"profile '{name}' is missing 'base_url'")
|
||||||
|
for secret_key in ("token", "password"):
|
||||||
|
if secret_key in profile:
|
||||||
|
problems.append(
|
||||||
|
f"profile '{name}' has an inline '{secret_key}' (use an auth reference)"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
_validate_auth(name, profile.get("auth"))
|
||||||
|
except ConfigError as exc:
|
||||||
|
problems.append(str(exc))
|
||||||
|
else:
|
||||||
|
if profile.get("auth") is None:
|
||||||
|
problems.append(f"profile '{name}' is missing an 'auth' reference")
|
||||||
|
return problems
|
||||||
|
|
||||||
|
|
||||||
|
def add_profile(config, name, profile):
|
||||||
|
"""Return a copy of *config* with *profile* added under *name*.
|
||||||
|
|
||||||
|
Preserves existing profiles. Raises :class:`ConfigError` on an invalid name,
|
||||||
|
a duplicate name, or an invalid profile.
|
||||||
|
"""
|
||||||
|
if not is_valid_profile_name(name):
|
||||||
|
raise ConfigError(
|
||||||
|
f"invalid profile name {name!r}; use letters, digits, '.', '-', '_'"
|
||||||
|
)
|
||||||
|
profiles = dict(config.get("profiles") or {})
|
||||||
|
if name in profiles:
|
||||||
|
raise ConfigError(f"profile '{name}' already exists; edit or remove it first")
|
||||||
|
candidate = {"version": config.get("version", SUPPORTED_VERSION),
|
||||||
|
"profiles": {**profiles, name: profile}}
|
||||||
|
# Validate just this profile (reuse select_profile's checks).
|
||||||
|
select_profile(candidate, name)
|
||||||
|
if not profile.get("base_url"):
|
||||||
|
raise ConfigError(f"profile '{name}' is missing 'base_url'")
|
||||||
|
return candidate
|
||||||
|
|
||||||
|
|
||||||
|
def upsert_profile(config, name, profile):
|
||||||
|
"""Like :func:`add_profile` but replaces an existing profile (for edits)."""
|
||||||
|
if not is_valid_profile_name(name):
|
||||||
|
raise ConfigError(f"invalid profile name {name!r}")
|
||||||
|
profiles = dict(config.get("profiles") or {})
|
||||||
|
profiles[name] = profile
|
||||||
|
candidate = {"version": config.get("version", SUPPORTED_VERSION),
|
||||||
|
"profiles": profiles}
|
||||||
|
select_profile(candidate, name)
|
||||||
|
return candidate
|
||||||
|
|
||||||
|
|
||||||
|
def remove_profile(config, name):
|
||||||
|
"""Return a copy of *config* without profile *name*."""
|
||||||
|
profiles = dict(config.get("profiles") or {})
|
||||||
|
if name not in profiles:
|
||||||
|
raise ConfigError(f"profile '{name}' not found")
|
||||||
|
del profiles[name]
|
||||||
|
return {"version": config.get("version", SUPPORTED_VERSION), "profiles": profiles}
|
||||||
|
|
||||||
|
|
||||||
|
def save_config(config, path=None):
|
||||||
|
"""Atomically write *config* to *path* as pretty JSON, creating parent dirs.
|
||||||
|
|
||||||
|
Writes a temp file in the same directory then ``os.replace``s it into place,
|
||||||
|
so a crash mid-write never truncates an existing config.
|
||||||
|
"""
|
||||||
|
path = path or config_path() or DEFAULT_CONFIG_PATH
|
||||||
|
directory = os.path.dirname(os.path.abspath(path))
|
||||||
|
os.makedirs(directory, exist_ok=True)
|
||||||
|
fd, tmp = tempfile.mkstemp(dir=directory, prefix=".profiles-", suffix=".json")
|
||||||
|
try:
|
||||||
|
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
||||||
|
json.dump(config, fh, indent=2, sort_keys=True)
|
||||||
|
fh.write("\n")
|
||||||
|
os.replace(tmp, path)
|
||||||
|
except BaseException:
|
||||||
|
try:
|
||||||
|
os.unlink(tmp)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def server_command():
|
||||||
|
"""Return (command, args) that launch this repo's MCP server."""
|
||||||
|
root = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
python = os.path.join(root, "venv", "bin", "python3")
|
||||||
|
if not os.path.exists(python):
|
||||||
|
python = sys.executable
|
||||||
|
return python, [os.path.join(root, "mcp_server.py")]
|
||||||
|
|
||||||
|
|
||||||
|
def launcher_entry(profile_name, config_path=None):
|
||||||
|
"""Return a thin MCP launcher entry for *profile_name*.
|
||||||
|
|
||||||
|
Contains only command/args and the two GITEA_MCP_* env vars — never a token
|
||||||
|
or password. Suitable for Claude / Gemini / Codex ``mcpServers`` blocks.
|
||||||
|
"""
|
||||||
|
command, args = server_command()
|
||||||
|
return {
|
||||||
|
"gitea-tools": {
|
||||||
|
"command": command,
|
||||||
|
"args": args,
|
||||||
|
"env": {
|
||||||
|
"GITEA_MCP_CONFIG": config_path or DEFAULT_CONFIG_PATH,
|
||||||
|
"GITEA_MCP_PROFILE": profile_name,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def keychain_set(item_id, token, account=None, runner=subprocess.run):
|
||||||
|
"""Store *token* in the macOS keychain under service *item_id*.
|
||||||
|
|
||||||
|
The token is passed to ``security`` as an argument only; it is never
|
||||||
|
returned, printed, or logged here. *runner* is injectable for testing.
|
||||||
|
Raises :class:`ConfigError` on failure (without echoing the token).
|
||||||
|
"""
|
||||||
|
if not item_id:
|
||||||
|
raise ConfigError("keychain item id is required")
|
||||||
|
if not token:
|
||||||
|
raise ConfigError("refusing to store an empty token")
|
||||||
|
account = account or os.environ.get("USER") or "gitea-tools"
|
||||||
|
cmd = ["security", "add-generic-password", "-U",
|
||||||
|
"-s", item_id, "-a", account, "-w", token]
|
||||||
|
try:
|
||||||
|
proc = runner(cmd, capture_output=True, text=True)
|
||||||
|
except (OSError, subprocess.SubprocessError) as exc:
|
||||||
|
raise ConfigError(f"could not run keychain store for '{item_id}'") from exc
|
||||||
|
if getattr(proc, "returncode", 1) != 0:
|
||||||
|
raise ConfigError(f"keychain store failed for item '{item_id}'")
|
||||||
|
return item_id
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": # pragma: no cover - thin CLI dispatch
|
||||||
|
if len(sys.argv) > 1 and sys.argv[1] == "menu":
|
||||||
|
import gitea_config_menu
|
||||||
|
raise SystemExit(gitea_config_menu.main(sys.argv[2:]))
|
||||||
|
print("usage: python gitea_config.py menu", file=sys.stderr)
|
||||||
|
raise SystemExit(2)
|
||||||
|
|||||||
@@ -0,0 +1,235 @@
|
|||||||
|
"""Interactive setup menu for the canonical Gitea MCP profile config (#31).
|
||||||
|
|
||||||
|
Create / edit / remove / validate profiles, test a profile's authentication,
|
||||||
|
show the authenticated user, check reviewer eligibility for a PR, and print
|
||||||
|
thin launcher snippets for Claude / Gemini / Codex — without hand-editing JSON
|
||||||
|
and without ever putting a raw token in a launcher config.
|
||||||
|
|
||||||
|
Run: ``python gitea_config.py menu`` (or ``python gitea_config_menu.py``)
|
||||||
|
|
||||||
|
Every side effect (stdin, secret prompt, stdout, keychain store, token
|
||||||
|
resolution, HTTP) is injectable so the menu logic is testable without a real
|
||||||
|
keychain, terminal, or network.
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import getpass
|
||||||
|
import urllib.parse
|
||||||
|
|
||||||
|
import gitea_config
|
||||||
|
import gitea_auth
|
||||||
|
|
||||||
|
|
||||||
|
def _host(base_url):
|
||||||
|
"""Extract the host from a profile base_url (e.g. https://gitea.x → gitea.x)."""
|
||||||
|
netloc = urllib.parse.urlparse(base_url or "").netloc
|
||||||
|
return netloc or (base_url or "").strip()
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_identity(profile, *, resolve_token=gitea_config.resolve_token,
|
||||||
|
api_request=gitea_auth.api_request):
|
||||||
|
"""Return the authenticated username for *profile*, or raise ConfigError.
|
||||||
|
|
||||||
|
Resolves the profile's auth reference to a token (keychain/env), calls the
|
||||||
|
read-only current-user endpoint, and returns ``login``. Never returns or
|
||||||
|
logs the token.
|
||||||
|
"""
|
||||||
|
token = resolve_token(profile)
|
||||||
|
if not token:
|
||||||
|
raise gitea_config.ConfigError("no token resolved for this profile")
|
||||||
|
host = _host(profile.get("base_url"))
|
||||||
|
data = api_request("GET", f"https://{host}/api/v1/user", f"token {token}")
|
||||||
|
login = (data or {}).get("login")
|
||||||
|
if not login:
|
||||||
|
raise gitea_config.ConfigError("could not determine authenticated user")
|
||||||
|
return login
|
||||||
|
|
||||||
|
|
||||||
|
def check_eligibility(profile, pr_number, owner=None, repo=None, *,
|
||||||
|
resolve_token=gitea_config.resolve_token,
|
||||||
|
api_request=gitea_auth.api_request):
|
||||||
|
"""Report whether *profile* is reviewer-eligible for a PR. Read-only.
|
||||||
|
|
||||||
|
Eligible ≙ the PR is open AND the authenticated user is NOT the PR author.
|
||||||
|
Never approves or merges. Returns a dict with the facts and a verdict.
|
||||||
|
"""
|
||||||
|
owner = owner or profile.get("default_owner")
|
||||||
|
repo = repo or profile.get("default_repo")
|
||||||
|
if not owner or not repo:
|
||||||
|
raise gitea_config.ConfigError(
|
||||||
|
"owner/repo required (set default_owner/default_repo or pass them)"
|
||||||
|
)
|
||||||
|
token = resolve_token(profile)
|
||||||
|
if not token:
|
||||||
|
raise gitea_config.ConfigError("no token resolved for this profile")
|
||||||
|
auth = f"token {token}"
|
||||||
|
host = _host(profile.get("base_url"))
|
||||||
|
me = (api_request("GET", f"https://{host}/api/v1/user", auth) or {}).get("login")
|
||||||
|
pr = api_request(
|
||||||
|
"GET", f"https://{host}/api/v1/repos/{owner}/{repo}/pulls/{pr_number}", auth
|
||||||
|
) or {}
|
||||||
|
author = (pr.get("user") or {}).get("login")
|
||||||
|
state = pr.get("state")
|
||||||
|
reasons = []
|
||||||
|
if not me:
|
||||||
|
reasons.append("authenticated user unknown")
|
||||||
|
if state != "open":
|
||||||
|
reasons.append(f"PR is not open (state={state})")
|
||||||
|
if me and author and me == author:
|
||||||
|
reasons.append("authenticated user is the PR author")
|
||||||
|
eligible = not reasons
|
||||||
|
return {
|
||||||
|
"authenticated_user": me,
|
||||||
|
"pr_author": author,
|
||||||
|
"pr_state": state,
|
||||||
|
"eligible": eligible,
|
||||||
|
"reasons": reasons or ["open PR and reviewer is not the author"],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def launcher_snippets(profile_name, config_path):
|
||||||
|
"""Return {client: pretty-JSON snippet} for Claude / Gemini / Codex.
|
||||||
|
|
||||||
|
All three MCP hosts consume the same server entry; the snippet carries only
|
||||||
|
command/args + GITEA_MCP_CONFIG/GITEA_MCP_PROFILE — never a secret.
|
||||||
|
"""
|
||||||
|
entry = gitea_config.launcher_entry(profile_name, config_path)
|
||||||
|
claude = json.dumps({"mcpServers": entry}, indent=2)
|
||||||
|
gemini = json.dumps({"mcpServers": entry}, indent=2)
|
||||||
|
codex = json.dumps(entry, indent=2)
|
||||||
|
return {"claude": claude, "gemini": gemini, "codex": codex}
|
||||||
|
|
||||||
|
|
||||||
|
# ── Interactive prompts (thin wrappers over injected IO) ────────────────────────
|
||||||
|
|
||||||
|
def _prompt_profile(input_fn, secret_fn, out, keychain_set):
|
||||||
|
"""Collect a new/edited profile from the user. Returns (name, profile).
|
||||||
|
|
||||||
|
Tokens are read with the secret prompt (no echo) and stored in the keychain;
|
||||||
|
they are never placed in the returned profile.
|
||||||
|
"""
|
||||||
|
name = input_fn("Profile name (e.g. prgs-reviewer): ").strip()
|
||||||
|
if not gitea_config.is_valid_profile_name(name):
|
||||||
|
raise gitea_config.ConfigError(f"invalid profile name {name!r}")
|
||||||
|
base_url = input_fn("Base URL (e.g. https://gitea.prgs.cc): ").strip()
|
||||||
|
username = input_fn("Gitea username: ").strip()
|
||||||
|
default_owner = input_fn("Default owner (optional): ").strip()
|
||||||
|
default_repo = input_fn("Default repo (optional): ").strip()
|
||||||
|
execution_profile = input_fn("Execution profile (optional): ").strip()
|
||||||
|
auth_type = input_fn("Auth type [keychain/env]: ").strip().lower()
|
||||||
|
|
||||||
|
if auth_type == "keychain":
|
||||||
|
default_id = f"{name}-gitea-token"
|
||||||
|
item_id = input_fn(f"Keychain item id [{default_id}]: ").strip() or default_id
|
||||||
|
store = input_fn("Store a token in the keychain now? [y/N]: ").strip().lower()
|
||||||
|
if store == "y":
|
||||||
|
token = secret_fn("Gitea token (hidden): ")
|
||||||
|
keychain_set(item_id, token) # token never echoed
|
||||||
|
out(f"Stored token in keychain item '{item_id}'.")
|
||||||
|
auth = gitea_config.keychain_auth(item_id)
|
||||||
|
elif auth_type == "env":
|
||||||
|
default_var = f"GITEA_TOKEN_{name.upper().replace('-', '_').replace('.', '_')}"
|
||||||
|
var = input_fn(f"Env var name [{default_var}]: ").strip() or default_var
|
||||||
|
out(f"Set {var} in the environment; its value is never stored here.")
|
||||||
|
auth = gitea_config.env_auth(var)
|
||||||
|
else:
|
||||||
|
raise gitea_config.ConfigError("auth type must be 'keychain' or 'env'")
|
||||||
|
|
||||||
|
profile = gitea_config.build_profile(
|
||||||
|
base_url=base_url, auth=auth, username=username,
|
||||||
|
default_owner=default_owner, default_repo=default_repo,
|
||||||
|
execution_profile=execution_profile,
|
||||||
|
)
|
||||||
|
return name, profile
|
||||||
|
|
||||||
|
|
||||||
|
_MENU = """
|
||||||
|
Gitea MCP profile setup
|
||||||
|
1) list profiles
|
||||||
|
2) add profile
|
||||||
|
3) edit profile
|
||||||
|
4) remove profile
|
||||||
|
5) validate config
|
||||||
|
6) test profile authentication
|
||||||
|
7) show authenticated user
|
||||||
|
8) generate launcher snippets (Claude/Gemini/Codex)
|
||||||
|
9) check reviewer eligibility for a PR
|
||||||
|
0) quit
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv=None, *, input_fn=input, secret_fn=None, out=None, config_path=None,
|
||||||
|
resolve_token=gitea_config.resolve_token,
|
||||||
|
api_request=gitea_auth.api_request,
|
||||||
|
keychain_set=gitea_config.keychain_set):
|
||||||
|
"""Run the interactive menu loop. Returns a process exit code."""
|
||||||
|
secret_fn = secret_fn or getpass.getpass
|
||||||
|
out = out or (lambda *a: print(*a))
|
||||||
|
path = config_path or gitea_config.config_path() or gitea_config.DEFAULT_CONFIG_PATH
|
||||||
|
|
||||||
|
try:
|
||||||
|
config = gitea_config.load_config(path) or gitea_config.empty_config()
|
||||||
|
except gitea_config.ConfigError as exc:
|
||||||
|
out(f"config error: {exc}")
|
||||||
|
config = gitea_config.empty_config()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
out(_MENU)
|
||||||
|
choice = input_fn("choice: ").strip()
|
||||||
|
try:
|
||||||
|
if choice == "0":
|
||||||
|
out("bye")
|
||||||
|
return 0
|
||||||
|
elif choice == "1":
|
||||||
|
names = sorted(config.get("profiles") or {})
|
||||||
|
out("profiles: " + (", ".join(names) if names else "(none)"))
|
||||||
|
elif choice == "2":
|
||||||
|
name, profile = _prompt_profile(input_fn, secret_fn, out, keychain_set)
|
||||||
|
config = gitea_config.add_profile(config, name, profile)
|
||||||
|
saved = gitea_config.save_config(config, path)
|
||||||
|
out(f"added '{name}'; wrote {saved}")
|
||||||
|
elif choice == "3":
|
||||||
|
name, profile = _prompt_profile(input_fn, secret_fn, out, keychain_set)
|
||||||
|
config = gitea_config.upsert_profile(config, name, profile)
|
||||||
|
saved = gitea_config.save_config(config, path)
|
||||||
|
out(f"saved '{name}'; wrote {saved}")
|
||||||
|
elif choice == "4":
|
||||||
|
name = input_fn("profile to remove: ").strip()
|
||||||
|
config = gitea_config.remove_profile(config, name)
|
||||||
|
saved = gitea_config.save_config(config, path)
|
||||||
|
out(f"removed '{name}'; wrote {saved}")
|
||||||
|
elif choice == "5":
|
||||||
|
problems = gitea_config.validate_config(config)
|
||||||
|
out("valid" if not problems else "problems:\n " + "\n ".join(problems))
|
||||||
|
elif choice in ("6", "7"):
|
||||||
|
name = input_fn("profile: ").strip()
|
||||||
|
profile = gitea_config.select_profile(config, name)
|
||||||
|
who = resolve_identity(profile, resolve_token=resolve_token,
|
||||||
|
api_request=api_request)
|
||||||
|
out(f"authenticated as: {who}")
|
||||||
|
elif choice == "8":
|
||||||
|
name = input_fn("profile: ").strip()
|
||||||
|
for client, snippet in launcher_snippets(name, path).items():
|
||||||
|
out(f"--- {client} ---\n{snippet}")
|
||||||
|
elif choice == "9":
|
||||||
|
name = input_fn("profile: ").strip()
|
||||||
|
pr_number = input_fn("PR number: ").strip()
|
||||||
|
profile = gitea_config.select_profile(config, name)
|
||||||
|
result = check_eligibility(
|
||||||
|
profile, pr_number, resolve_token=resolve_token,
|
||||||
|
api_request=api_request)
|
||||||
|
out(f"authenticated user: {result['authenticated_user']}")
|
||||||
|
out(f"PR author: {result['pr_author']}")
|
||||||
|
out("ELIGIBLE" if result["eligible"] else "INELIGIBLE")
|
||||||
|
for r in result["reasons"]:
|
||||||
|
out(f" - {r}")
|
||||||
|
else:
|
||||||
|
out("unknown choice")
|
||||||
|
except gitea_config.ConfigError as exc:
|
||||||
|
out(f"error: {exc}")
|
||||||
|
except Exception as exc: # noqa: BLE001 - keep the menu alive, no secrets
|
||||||
|
out(f"error: {type(exc).__name__}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": # pragma: no cover
|
||||||
|
raise SystemExit(main(sys.argv[1:]))
|
||||||
@@ -0,0 +1,295 @@
|
|||||||
|
"""Tests for the interactive profile-setup menu (#31): gitea_config authoring
|
||||||
|
helpers + gitea_config_menu, all exercised without a real keychain, terminal,
|
||||||
|
or network via injected IO/auth/HTTP.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
||||||
|
|
||||||
|
import gitea_config # noqa: E402
|
||||||
|
import gitea_config_menu as menu # noqa: E402
|
||||||
|
|
||||||
|
SECRET = "super-secret-token-value"
|
||||||
|
|
||||||
|
|
||||||
|
class _InputQueue:
|
||||||
|
"""Callable that returns queued answers for successive input() prompts."""
|
||||||
|
|
||||||
|
def __init__(self, answers):
|
||||||
|
self._answers = list(answers)
|
||||||
|
|
||||||
|
def __call__(self, _prompt=""):
|
||||||
|
return self._answers.pop(0)
|
||||||
|
|
||||||
|
|
||||||
|
class _Out:
|
||||||
|
"""Capture out() lines."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.lines = []
|
||||||
|
|
||||||
|
def __call__(self, *args):
|
||||||
|
self.lines.append(" ".join(str(a) for a in args))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def text(self):
|
||||||
|
return "\n".join(self.lines)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Config authoring helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestAuthoringHelpers(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_profile_name_validation(self):
|
||||||
|
self.assertTrue(gitea_config.is_valid_profile_name("prgs-reviewer"))
|
||||||
|
self.assertTrue(gitea_config.is_valid_profile_name("prgs.author_1"))
|
||||||
|
for bad in ("", "-leading", "has space", "a/b", "b@d"):
|
||||||
|
self.assertFalse(gitea_config.is_valid_profile_name(bad), bad)
|
||||||
|
|
||||||
|
def test_add_profile_preserves_existing(self):
|
||||||
|
cfg = gitea_config.empty_config()
|
||||||
|
cfg = gitea_config.add_profile(cfg, "prgs-author", gitea_config.build_profile(
|
||||||
|
base_url="https://gitea.prgs.cc", auth=gitea_config.env_auth("GITEA_TOKEN_A")))
|
||||||
|
cfg = gitea_config.add_profile(cfg, "prgs-reviewer", gitea_config.build_profile(
|
||||||
|
base_url="https://gitea.prgs.cc",
|
||||||
|
auth=gitea_config.keychain_auth("prgs-reviewer-token")))
|
||||||
|
self.assertEqual(sorted(cfg["profiles"]), ["prgs-author", "prgs-reviewer"])
|
||||||
|
|
||||||
|
def test_add_duplicate_rejected(self):
|
||||||
|
cfg = gitea_config.add_profile(gitea_config.empty_config(), "p",
|
||||||
|
gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V")))
|
||||||
|
with self.assertRaises(gitea_config.ConfigError):
|
||||||
|
gitea_config.add_profile(cfg, "p", gitea_config.build_profile(
|
||||||
|
base_url="https://y", auth=gitea_config.env_auth("W")))
|
||||||
|
|
||||||
|
def test_add_invalid_name_rejected(self):
|
||||||
|
with self.assertRaises(gitea_config.ConfigError):
|
||||||
|
gitea_config.add_profile(gitea_config.empty_config(), "bad name",
|
||||||
|
gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V")))
|
||||||
|
|
||||||
|
def test_add_missing_base_url_rejected(self):
|
||||||
|
with self.assertRaises(gitea_config.ConfigError):
|
||||||
|
gitea_config.add_profile(gitea_config.empty_config(), "p",
|
||||||
|
{"auth": gitea_config.env_auth("V")})
|
||||||
|
|
||||||
|
def test_validate_reports_missing_fields(self):
|
||||||
|
cfg = {"version": 1, "profiles": {
|
||||||
|
"noauth": {"base_url": "https://x"},
|
||||||
|
"nobase": {"auth": gitea_config.env_auth("V")},
|
||||||
|
"inline": {"base_url": "https://x", "auth": gitea_config.env_auth("V"),
|
||||||
|
"token": "leak"},
|
||||||
|
}}
|
||||||
|
problems = gitea_config.validate_config(cfg)
|
||||||
|
joined = "\n".join(problems)
|
||||||
|
self.assertIn("noauth", joined)
|
||||||
|
self.assertIn("nobase", joined)
|
||||||
|
self.assertIn("inline", joined)
|
||||||
|
self.assertNotIn("leak", joined) # inline secret value never echoed
|
||||||
|
|
||||||
|
def test_build_profile_omits_empty(self):
|
||||||
|
p = gitea_config.build_profile(base_url="https://x",
|
||||||
|
auth=gitea_config.env_auth("V"), username="")
|
||||||
|
self.assertNotIn("username", p)
|
||||||
|
self.assertNotIn("default_owner", p)
|
||||||
|
|
||||||
|
|
||||||
|
class TestAtomicSave(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_save_creates_parents_and_pretty_json(self):
|
||||||
|
with tempfile.TemporaryDirectory() as d:
|
||||||
|
path = os.path.join(d, "nested", "dir", "profiles.json")
|
||||||
|
cfg = gitea_config.add_profile(gitea_config.empty_config(), "p",
|
||||||
|
gitea_config.build_profile(base_url="https://x",
|
||||||
|
auth=gitea_config.env_auth("V")))
|
||||||
|
gitea_config.save_config(cfg, path)
|
||||||
|
self.assertTrue(os.path.isfile(path))
|
||||||
|
with open(path, encoding="utf-8") as fh:
|
||||||
|
body = fh.read()
|
||||||
|
self.assertEqual(json.loads(body), cfg)
|
||||||
|
self.assertIn("\n ", body) # pretty-printed
|
||||||
|
# No leftover temp files in the directory.
|
||||||
|
leftovers = [n for n in os.listdir(os.path.dirname(path))
|
||||||
|
if n.startswith(".profiles-")]
|
||||||
|
self.assertEqual(leftovers, [])
|
||||||
|
|
||||||
|
def test_save_is_atomic_via_replace(self):
|
||||||
|
with tempfile.TemporaryDirectory() as d:
|
||||||
|
path = os.path.join(d, "profiles.json")
|
||||||
|
gitea_config.save_config(gitea_config.empty_config(), path)
|
||||||
|
with patch("gitea_config.os.replace", side_effect=OSError("boom")):
|
||||||
|
with self.assertRaises(OSError):
|
||||||
|
gitea_config.save_config(
|
||||||
|
gitea_config.add_profile(gitea_config.empty_config(), "p",
|
||||||
|
gitea_config.build_profile(base_url="https://x",
|
||||||
|
auth=gitea_config.env_auth("V"))), path)
|
||||||
|
# Original file intact; no temp debris.
|
||||||
|
with open(path, encoding="utf-8") as fh:
|
||||||
|
self.assertEqual(json.load(fh), gitea_config.empty_config())
|
||||||
|
leftovers = [n for n in os.listdir(d) if n.startswith(".profiles-")]
|
||||||
|
self.assertEqual(leftovers, [])
|
||||||
|
|
||||||
|
|
||||||
|
class TestKeychainSet(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_stores_token_via_security_without_printing(self):
|
||||||
|
calls = {}
|
||||||
|
|
||||||
|
class _Proc:
|
||||||
|
returncode = 0
|
||||||
|
|
||||||
|
def runner(cmd, **kw):
|
||||||
|
calls["cmd"] = cmd
|
||||||
|
return _Proc()
|
||||||
|
|
||||||
|
out = _Out()
|
||||||
|
rv = gitea_config.keychain_set("prgs-reviewer-token", SECRET, runner=runner)
|
||||||
|
self.assertEqual(rv, "prgs-reviewer-token")
|
||||||
|
self.assertEqual(calls["cmd"][0], "security")
|
||||||
|
self.assertIn(SECRET, calls["cmd"]) # token handed to security...
|
||||||
|
self.assertNotIn(SECRET, out.text) # ...never printed by us
|
||||||
|
|
||||||
|
def test_failure_raises_without_token(self):
|
||||||
|
class _Proc:
|
||||||
|
returncode = 1
|
||||||
|
|
||||||
|
with self.assertRaises(gitea_config.ConfigError) as ctx:
|
||||||
|
gitea_config.keychain_set("id", SECRET, runner=lambda *a, **k: _Proc())
|
||||||
|
self.assertNotIn(SECRET, str(ctx.exception))
|
||||||
|
|
||||||
|
def test_empty_token_rejected(self):
|
||||||
|
with self.assertRaises(gitea_config.ConfigError):
|
||||||
|
gitea_config.keychain_set("id", "", runner=lambda *a, **k: None)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Launcher snippets
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestLauncherSnippets(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_only_safe_keys_no_secrets(self):
|
||||||
|
entry = gitea_config.launcher_entry("prgs", "/cfg/profiles.json")["gitea-tools"]
|
||||||
|
self.assertEqual(set(entry), {"command", "args", "env"})
|
||||||
|
self.assertEqual(set(entry["env"]), {"GITEA_MCP_CONFIG", "GITEA_MCP_PROFILE"})
|
||||||
|
self.assertEqual(entry["env"]["GITEA_MCP_PROFILE"], "prgs")
|
||||||
|
blob = json.dumps(entry).lower()
|
||||||
|
for word in ("token", "password", "secret"):
|
||||||
|
self.assertNotIn(word, blob)
|
||||||
|
|
||||||
|
def test_snippets_for_all_clients(self):
|
||||||
|
snips = menu.launcher_snippets("prgs", "/cfg/profiles.json")
|
||||||
|
self.assertEqual(set(snips), {"claude", "gemini", "codex"})
|
||||||
|
for text in snips.values():
|
||||||
|
self.assertIn("GITEA_MCP_PROFILE", text)
|
||||||
|
self.assertNotIn("token", text.lower())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Auth test + reviewer eligibility (mocked API + token resolution)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestIdentityAndEligibility(unittest.TestCase):
|
||||||
|
|
||||||
|
def _profile(self):
|
||||||
|
return {"base_url": "https://gitea.prgs.cc",
|
||||||
|
"default_owner": "Org", "default_repo": "Repo",
|
||||||
|
"auth": gitea_config.env_auth("GITEA_TOKEN_X")}
|
||||||
|
|
||||||
|
def test_resolve_identity(self):
|
||||||
|
def api(m, url, auth):
|
||||||
|
return {"login": "reviewer-bot"} if url.endswith("/user") else {}
|
||||||
|
who = menu.resolve_identity(self._profile(),
|
||||||
|
resolve_token=lambda p: "tok", api_request=api)
|
||||||
|
self.assertEqual(who, "reviewer-bot")
|
||||||
|
|
||||||
|
def test_eligible_when_open_and_not_author(self):
|
||||||
|
def api(m, url, auth):
|
||||||
|
if url.endswith("/user"):
|
||||||
|
return {"login": "reviewer-bot"}
|
||||||
|
return {"user": {"login": "author-bot"}, "state": "open"}
|
||||||
|
r = menu.check_eligibility(self._profile(), 8,
|
||||||
|
resolve_token=lambda p: "tok", api_request=api)
|
||||||
|
self.assertTrue(r["eligible"])
|
||||||
|
self.assertEqual(r["authenticated_user"], "reviewer-bot")
|
||||||
|
self.assertEqual(r["pr_author"], "author-bot")
|
||||||
|
|
||||||
|
def test_ineligible_when_self_author(self):
|
||||||
|
def api(m, url, auth):
|
||||||
|
if url.endswith("/user"):
|
||||||
|
return {"login": "same-bot"}
|
||||||
|
return {"user": {"login": "same-bot"}, "state": "open"}
|
||||||
|
r = menu.check_eligibility(self._profile(), 8,
|
||||||
|
resolve_token=lambda p: "tok", api_request=api)
|
||||||
|
self.assertFalse(r["eligible"])
|
||||||
|
self.assertTrue(any("author" in x for x in r["reasons"]))
|
||||||
|
|
||||||
|
def test_ineligible_when_closed(self):
|
||||||
|
def api(m, url, auth):
|
||||||
|
if url.endswith("/user"):
|
||||||
|
return {"login": "reviewer-bot"}
|
||||||
|
return {"user": {"login": "author-bot"}, "state": "closed"}
|
||||||
|
r = menu.check_eligibility(self._profile(), 8,
|
||||||
|
resolve_token=lambda p: "tok", api_request=api)
|
||||||
|
self.assertFalse(r["eligible"])
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Prompt flow + full menu loop (no real IO/keychain/network)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestMenuFlow(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_prompt_keychain_stores_but_profile_has_no_token(self):
|
||||||
|
stored = {}
|
||||||
|
answers = ["prgs-reviewer", "https://gitea.prgs.cc", "jcwalker3",
|
||||||
|
"Org", "Repo", "personal-prgs", "keychain",
|
||||||
|
"prgs-reviewer-token", "y"]
|
||||||
|
out = _Out()
|
||||||
|
name, profile = menu._prompt_profile(
|
||||||
|
_InputQueue(answers), secret_fn=lambda _p: SECRET, out=out,
|
||||||
|
keychain_set=lambda i, t: stored.update({i: t}))
|
||||||
|
self.assertEqual(name, "prgs-reviewer")
|
||||||
|
self.assertEqual(profile["auth"], {"type": "keychain", "id": "prgs-reviewer-token"})
|
||||||
|
self.assertNotIn(SECRET, json.dumps(profile)) # token not in profile
|
||||||
|
self.assertNotIn(SECRET, out.text) # token not printed
|
||||||
|
self.assertEqual(stored, {"prgs-reviewer-token": SECRET}) # stored via keychain
|
||||||
|
|
||||||
|
def test_prompt_env_reference(self):
|
||||||
|
answers = ["prgs-author", "https://gitea.prgs.cc", "jcwalker3",
|
||||||
|
"", "", "personal-prgs", "env", ""] # blank -> default var name
|
||||||
|
name, profile = menu._prompt_profile(
|
||||||
|
_InputQueue(answers), secret_fn=lambda _p: SECRET, out=_Out(),
|
||||||
|
keychain_set=lambda *a: None)
|
||||||
|
self.assertEqual(profile["auth"], {"type": "env", "name": "GITEA_TOKEN_PRGS_AUTHOR"})
|
||||||
|
|
||||||
|
def test_menu_add_then_quit_writes_config_without_secrets(self):
|
||||||
|
with tempfile.TemporaryDirectory() as d:
|
||||||
|
path = os.path.join(d, "profiles.json")
|
||||||
|
answers = [
|
||||||
|
"2", # add profile
|
||||||
|
"prgs-reviewer", "https://gitea.prgs.cc", "jcwalker3",
|
||||||
|
"Org", "Repo", "personal-prgs", "keychain",
|
||||||
|
"prgs-reviewer-token", "y",
|
||||||
|
"1", # list
|
||||||
|
"0", # quit
|
||||||
|
]
|
||||||
|
out = _Out()
|
||||||
|
rc = menu.main(
|
||||||
|
input_fn=_InputQueue(answers), secret_fn=lambda _p: SECRET,
|
||||||
|
out=out, config_path=path, keychain_set=lambda *a: None)
|
||||||
|
self.assertEqual(rc, 0)
|
||||||
|
self.assertTrue(os.path.isfile(path))
|
||||||
|
with open(path, encoding="utf-8") as fh:
|
||||||
|
body = fh.read()
|
||||||
|
self.assertIn("prgs-reviewer", body)
|
||||||
|
self.assertIn("prgs-reviewer-token", body) # keychain id (non-secret)
|
||||||
|
self.assertNotIn(SECRET, body) # never the token value
|
||||||
|
self.assertNotIn(SECRET, out.text)
|
||||||
|
self.assertIn("prgs-reviewer", out.text) # listed
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user