69d4edf37d
Make the interactive profile menu feel like a real terminal menu, via a new injectable MenuIO abstraction (no menu logic change, no auth/secret-storage change). - Single-key top-level actions in a TTY (termios/tty raw read); no Enter needed. Non-TTY / test runs fall back to line input. - Enter backs out: Enter (or 0) on the main menu quits; Enter cancels any submenu/profile prompt and returns. - Profile chooser: everywhere a profile is needed, show a numbered list and pick by key (1-9), with an explicit 'm) type a name manually' path and Enter to cancel. Empty config handled gracefully. - Clear screen before redrawing the main menu and chooser — TTY only; never emits clear codes in non-TTY/test runs. - Result actions (validate/test-auth/whoami/eligibility) print a concise result then pause for a keypress in a TTY; non-TTY never blocks. Helpers: read_key (via default_io) / choose_menu_option / choose_profile / clear_screen / pause_for_key, plus MenuIO(is_tty, clear_enabled). TTY detected with sys.stdin.isatty() and sys.stdout.isatty(); stdlib only. Safety unchanged: no tokens/passwords printed, no raw config dumps, no .env.personal, no change to auth behavior or secret storage. Tests: rewrote menu tests around a scripted _FakeIO (no real terminal): single- key select + clear, main-menu Enter/0 quit, submenu Enter cancel (no change), chooser lists/selects/no-profiles/manual/out-of-range, non-TTY line fallback, clear-only-when-enabled, pause never hangs non-TTY, and add-flow proving the token value never reaches disk or stdout. Docs: runbook note on single-key nav / Enter back-out / numbered chooser. scripts/gitea-config-menu unchanged. Closes #36. Refs #31, #34. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
424 lines
18 KiB
Python
424 lines
18 KiB
Python
"""Tests for the interactive profile-setup menu (#31): gitea_config authoring
|
|
helpers + gitea_config_menu, all exercised without a real keychain, terminal,
|
|
or network via injected IO/auth/HTTP.
|
|
"""
|
|
import os
|
|
import sys
|
|
import json
|
|
import tempfile
|
|
import unittest
|
|
from unittest.mock import patch
|
|
|
|
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
|
|
|
import gitea_config # noqa: E402
|
|
import gitea_config_menu as menu # noqa: E402
|
|
|
|
# Fake token value shared by the tests below; they assert that this exact
# string never shows up in captured output, error messages, or saved config.
SECRET = "super-secret-token-value"
|
|
|
|
|
|
class _InputQueue:
|
|
"""Callable that returns queued answers for successive input() prompts."""
|
|
|
|
def __init__(self, answers):
|
|
self._answers = list(answers)
|
|
|
|
def __call__(self, _prompt=""):
|
|
return self._answers.pop(0)
|
|
|
|
|
|
class _Out:
|
|
"""Capture out() lines."""
|
|
|
|
def __init__(self):
|
|
self.lines = []
|
|
|
|
def __call__(self, *args):
|
|
self.lines.append(" ".join(str(a) for a in args))
|
|
|
|
@property
|
|
def text(self):
|
|
return "\n".join(self.lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config authoring helpers
|
|
# ---------------------------------------------------------------------------
|
|
class TestAuthoringHelpers(unittest.TestCase):
    """Config-authoring helpers in gitea_config: naming, adding, validating."""

    def test_profile_name_validation(self):
        accepted = ("prgs-reviewer", "prgs.author_1")
        rejected = ("", "-leading", "has space", "a/b", "b@d")
        for name in accepted:
            self.assertTrue(gitea_config.is_valid_profile_name(name))
        for bad in rejected:
            # The offending name doubles as the failure message.
            self.assertFalse(gitea_config.is_valid_profile_name(bad), bad)

    def test_add_profile_preserves_existing(self):
        cfg = gitea_config.empty_config()
        author = gitea_config.build_profile(
            base_url="https://gitea.prgs.cc",
            auth=gitea_config.env_auth("GITEA_TOKEN_A"),
        )
        cfg = gitea_config.add_profile(cfg, "prgs-author", author)
        reviewer = gitea_config.build_profile(
            base_url="https://gitea.prgs.cc",
            auth=gitea_config.keychain_auth("prgs-reviewer-token"),
        )
        cfg = gitea_config.add_profile(cfg, "prgs-reviewer", reviewer)
        # Adding the second profile must not drop the first.
        names = sorted(cfg["profiles"])
        self.assertEqual(names, ["prgs-author", "prgs-reviewer"])

    def test_add_duplicate_rejected(self):
        original = gitea_config.build_profile(
            base_url="https://x", auth=gitea_config.env_auth("V"))
        cfg = gitea_config.add_profile(gitea_config.empty_config(), "p", original)
        with self.assertRaises(gitea_config.ConfigError):
            gitea_config.add_profile(
                cfg,
                "p",
                gitea_config.build_profile(
                    base_url="https://y", auth=gitea_config.env_auth("W")),
            )

    def test_add_invalid_name_rejected(self):
        with self.assertRaises(gitea_config.ConfigError):
            gitea_config.add_profile(
                gitea_config.empty_config(),
                "bad name",
                gitea_config.build_profile(
                    base_url="https://x", auth=gitea_config.env_auth("V")),
            )

    def test_add_missing_base_url_rejected(self):
        with self.assertRaises(gitea_config.ConfigError):
            gitea_config.add_profile(
                gitea_config.empty_config(),
                "p",
                {"auth": gitea_config.env_auth("V")},
            )

    def test_validate_reports_missing_fields(self):
        profiles = {
            "noauth": {"base_url": "https://x"},
            "nobase": {"auth": gitea_config.env_auth("V")},
            "inline": {"base_url": "https://x", "auth": gitea_config.env_auth("V"),
                       "token": "leak"},
        }
        cfg = {"version": 1, "profiles": profiles}
        report = "\n".join(gitea_config.validate_config(cfg))
        # Every broken profile is mentioned by name...
        for profile_name in ("noauth", "nobase", "inline"):
            self.assertIn(profile_name, report)
        # ...but the inline secret value is never echoed back.
        self.assertNotIn("leak", report)

    def test_build_profile_omits_empty(self):
        built = gitea_config.build_profile(
            base_url="https://x", auth=gitea_config.env_auth("V"), username="")
        for absent_key in ("username", "default_owner"):
            self.assertNotIn(absent_key, built)
|
|
|
|
|
|
class TestAtomicSave(unittest.TestCase):
    """save_config: creates parent dirs, pretty-prints, and replaces atomically."""

    @staticmethod
    def _single_profile_config():
        # Smallest config that still contains one profile ("p").
        return gitea_config.add_profile(
            gitea_config.empty_config(),
            "p",
            gitea_config.build_profile(
                base_url="https://x", auth=gitea_config.env_auth("V")),
        )

    @staticmethod
    def _temp_debris(directory):
        # Entries in the directory whose names start with ".profiles-".
        return [entry for entry in os.listdir(directory)
                if entry.startswith(".profiles-")]

    def test_save_creates_parents_and_pretty_json(self):
        with tempfile.TemporaryDirectory() as tmp:
            target = os.path.join(tmp, "nested", "dir", "profiles.json")
            cfg = self._single_profile_config()
            gitea_config.save_config(cfg, target)
            self.assertTrue(os.path.isfile(target))
            with open(target, encoding="utf-8") as handle:
                written = handle.read()
            self.assertEqual(json.loads(written), cfg)
            self.assertIn("\n ", written)  # pretty-printed
            # No leftover temp files in the directory.
            self.assertEqual(self._temp_debris(os.path.dirname(target)), [])

    def test_save_is_atomic_via_replace(self):
        with tempfile.TemporaryDirectory() as tmp:
            target = os.path.join(tmp, "profiles.json")
            gitea_config.save_config(gitea_config.empty_config(), target)
            # Make the final rename fail and check the save surfaces the error.
            with patch("gitea_config.os.replace", side_effect=OSError("boom")):
                with self.assertRaises(OSError):
                    gitea_config.save_config(self._single_profile_config(), target)
            # Original file intact; no temp debris.
            with open(target, encoding="utf-8") as handle:
                on_disk = json.load(handle)
            self.assertEqual(on_disk, gitea_config.empty_config())
            self.assertEqual(self._temp_debris(tmp), [])
|
|
|
|
|
|
class TestKeychainSet(unittest.TestCase):
    """keychain_set: passes the token to the injected runner and never leaks it."""

    def test_stores_token_via_security_without_printing(self):
        # Local imports keep this test self-contained.
        import contextlib
        import io as _io

        calls = {}

        class _Proc:
            returncode = 0

        def runner(cmd, **kw):
            # Record the command instead of spawning a real process.
            calls["cmd"] = cmd
            return _Proc()

        # Capture the real stdout/stderr around the call. The previous version
        # asserted against an _Out() recorder that was never handed to
        # keychain_set, so the "never printed" check could not fail.
        captured_out, captured_err = _io.StringIO(), _io.StringIO()
        with contextlib.redirect_stdout(captured_out), \
                contextlib.redirect_stderr(captured_err):
            rv = gitea_config.keychain_set(
                "prgs-reviewer-token", SECRET, runner=runner)

        self.assertEqual(rv, "prgs-reviewer-token")
        self.assertEqual(calls["cmd"][0], "security")
        self.assertIn(SECRET, calls["cmd"])  # token handed to security...
        self.assertNotIn(SECRET, captured_out.getvalue())  # ...never printed by us
        self.assertNotIn(SECRET, captured_err.getvalue())

    def test_failure_raises_without_token(self):
        class _Proc:
            returncode = 1

        with self.assertRaises(gitea_config.ConfigError) as ctx:
            gitea_config.keychain_set("id", SECRET, runner=lambda *a, **k: _Proc())
        # Checked after the block exits: the error text must not carry the token.
        self.assertNotIn(SECRET, str(ctx.exception))

    def test_empty_token_rejected(self):
        with self.assertRaises(gitea_config.ConfigError):
            gitea_config.keychain_set("id", "", runner=lambda *a, **k: None)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Launcher snippets
|
|
# ---------------------------------------------------------------------------
|
|
class TestLauncherSnippets(unittest.TestCase):
    """Launcher entries/snippets expose only safe keys and no secret-like words."""

    def test_only_safe_keys_no_secrets(self):
        launcher = gitea_config.launcher_entry("prgs", "/cfg/profiles.json")
        entry = launcher["gitea-tools"]
        self.assertEqual(set(entry), {"command", "args", "env"})
        env = entry["env"]
        self.assertEqual(set(env), {"GITEA_MCP_CONFIG", "GITEA_MCP_PROFILE"})
        self.assertEqual(env["GITEA_MCP_PROFILE"], "prgs")
        # Case-insensitive scan of the serialized entry for secret-ish words.
        serialized = json.dumps(entry).lower()
        for forbidden in ("token", "password", "secret"):
            self.assertNotIn(forbidden, serialized)

    def test_snippets_for_all_clients(self):
        snippets = menu.launcher_snippets("prgs", "/cfg/profiles.json")
        self.assertEqual(set(snippets), {"claude", "gemini", "codex"})
        for snippet in snippets.values():
            self.assertIn("GITEA_MCP_PROFILE", snippet)
            self.assertNotIn("token", snippet.lower())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth test + reviewer eligibility (mocked API + token resolution)
|
|
# ---------------------------------------------------------------------------
|
|
class TestIdentityAndEligibility(unittest.TestCase):
    """resolve_identity / check_eligibility against a stubbed API and token."""

    def _profile(self):
        profile = {
            "base_url": "https://gitea.prgs.cc",
            "default_owner": "Org",
            "default_repo": "Repo",
            "auth": gitea_config.env_auth("GITEA_TOKEN_X"),
        }
        return profile

    @staticmethod
    def _stub_api(login, author=None, state=None):
        # Build a fake api_request: ".../user" URLs answer with the
        # authenticated login; any other URL answers with a PR payload
        # (or an empty dict when no author is given).
        def api(m, url, auth):
            if url.endswith("/user"):
                return {"login": login}
            if author is None:
                return {}
            return {"user": {"login": author}, "state": state}
        return api

    def _eligibility(self, api):
        # Every eligibility test uses PR number 8 and a fixed fake token.
        return menu.check_eligibility(
            self._profile(), 8,
            resolve_token=lambda p: "tok", api_request=api)

    def test_resolve_identity(self):
        who = menu.resolve_identity(
            self._profile(),
            resolve_token=lambda p: "tok",
            api_request=self._stub_api("reviewer-bot"))
        self.assertEqual(who, "reviewer-bot")

    def test_eligible_when_open_and_not_author(self):
        result = self._eligibility(
            self._stub_api("reviewer-bot", author="author-bot", state="open"))
        self.assertTrue(result["eligible"])
        self.assertEqual(result["authenticated_user"], "reviewer-bot")
        self.assertEqual(result["pr_author"], "author-bot")

    def test_ineligible_when_self_author(self):
        result = self._eligibility(
            self._stub_api("same-bot", author="same-bot", state="open"))
        self.assertFalse(result["eligible"])
        mentions_author = any("author" in reason for reason in result["reasons"])
        self.assertTrue(mentions_author)

    def test_ineligible_when_closed(self):
        result = self._eligibility(
            self._stub_api("reviewer-bot", author="author-bot", state="closed"))
        self.assertFalse(result["eligible"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UX: single-key nav, Enter back-out, profile chooser, clear, pause (#36)
|
|
# ---------------------------------------------------------------------------
|
|
class _FakeIO:
|
|
"""Duck-typed MenuIO: scripted keys/lines, captured output, clear counter.
|
|
|
|
read_key/read_line pop from their queues; when a queue is exhausted they
|
|
return "" (Enter/EOF → cancel/quit), so a script can never hang the loop.
|
|
"""
|
|
|
|
def __init__(self, keys=(), lines=(), is_tty=False, clear_enabled=None):
|
|
self.keys = list(keys)
|
|
self.lines = list(lines)
|
|
self.out_lines = []
|
|
self.clears = 0
|
|
self.is_tty = is_tty
|
|
self.clear_enabled = is_tty if clear_enabled is None else clear_enabled
|
|
|
|
def read_key(self, prompt=""):
|
|
return self.keys.pop(0) if self.keys else ""
|
|
|
|
def read_line(self, prompt=""):
|
|
return self.lines.pop(0) if self.lines else ""
|
|
|
|
def out(self, text=""):
|
|
self.out_lines.append(str(text))
|
|
|
|
def clear(self):
|
|
if self.clear_enabled:
|
|
self.clears += 1
|
|
|
|
@property
|
|
def text(self):
|
|
return "\n".join(self.out_lines)
|
|
|
|
|
|
def _cfg(*names):
    """Return a fresh config holding one minimal env-auth profile per name."""
    cfg = gitea_config.empty_config()
    cfg["profiles"].update(
        (name, {"base_url": "https://x", "auth": gitea_config.env_auth("V")})
        for name in names
    )
    return cfg
|
|
|
|
|
|
class TestChooser(unittest.TestCase):
    """choose_menu_option / choose_profile driven by scripted keypresses."""

    def test_menu_single_key_and_clears(self):
        fake = _FakeIO(keys=["3"], is_tty=True)
        picked = menu.choose_menu_option(fake)
        self.assertEqual(picked, "3")  # one keypress, no Enter
        self.assertEqual(fake.clears, 1)  # cleared before draw
        self.assertIn("Gitea MCP profile setup", fake.text)

    def test_profile_chooser_lists_and_selects_by_key(self):
        fake = _FakeIO(keys=["2"])
        picked = menu.choose_profile(fake, _cfg("aaa", "bbb", "ccc"))
        self.assertEqual(picked, "bbb")
        for listing in ("1) aaa", "2) bbb"):
            self.assertIn(listing, fake.text)

    def test_profile_chooser_enter_cancels(self):
        fake = _FakeIO(keys=[""])
        picked = menu.choose_profile(fake, _cfg("aaa"))
        self.assertIsNone(picked)

    def test_profile_chooser_no_profiles(self):
        fake = _FakeIO(keys=[])
        picked = menu.choose_profile(fake, gitea_config.empty_config())
        self.assertIsNone(picked)
        self.assertIn("no profiles", fake.text)

    def test_profile_chooser_manual_entry(self):
        # "m" switches to typed entry; the name then comes from the line queue.
        fake = _FakeIO(keys=["m"], lines=["typed-name"])
        picked = menu.choose_profile(fake, _cfg("aaa"))
        self.assertEqual(picked, "typed-name")

    def test_profile_chooser_out_of_range_key(self):
        fake = _FakeIO(keys=["9"])
        picked = menu.choose_profile(fake, _cfg("aaa"))
        self.assertIsNone(picked)
        self.assertIn("invalid selection", fake.text)
|
|
|
|
|
|
class TestClearAndPause(unittest.TestCase):
    """Screen clearing, pause-for-key, and the non-TTY line fallback."""

    def test_clear_only_when_enabled(self):
        # Same script twice; only the clear_enabled flag differs.
        for enabled, expected_clears in ((False, 0), (True, 1)):
            fake = _FakeIO(keys=["x"], clear_enabled=enabled)
            menu.choose_menu_option(fake)
            self.assertEqual(fake.clears, expected_clears)

    def test_pause_does_not_read_in_non_tty(self):
        fake = _FakeIO(keys=["a"], is_tty=False)
        menu.pause_for_key(fake)
        self.assertEqual(fake.keys, ["a"])  # nothing consumed, no hang

    def test_pause_reads_one_key_in_tty(self):
        fake = _FakeIO(keys=["a"], is_tty=True)
        menu.pause_for_key(fake)
        self.assertEqual(fake.keys, [])  # consumed the keypress

    def test_default_io_non_tty_line_fallback(self):
        import io as _io

        piped_stdin = _io.StringIO("7\nrest\n")
        # Swap stdin for an in-memory stream while default_io is built and used.
        with patch("gitea_config_menu.sys.stdin", piped_stdin):
            mio = menu.default_io()
            self.assertFalse(mio.is_tty)
            first_key = mio.read_key()
            self.assertEqual(first_key, "7")  # first char of the line
|
|
|
|
|
|
class TestMenuNav(unittest.TestCase):
    """Main-loop navigation: Enter/0 quit, and cancelling a submenu changes nothing."""

    @staticmethod
    def _run_menu(fake, config_path):
        # Invoke menu.main with the scripted IO and inert secret/keychain hooks.
        return menu.main(
            io=fake,
            config_path=config_path,
            secret_fn=lambda _p: SECRET,
            keychain_set=lambda *a: None,
        )

    def test_main_enter_quits(self):
        with tempfile.TemporaryDirectory() as tmp:
            fake = _FakeIO(keys=[""])  # Enter at main menu
            exit_code = self._run_menu(fake, os.path.join(tmp, "p.json"))
            self.assertEqual(exit_code, 0)
            self.assertIn("bye", fake.text)

    def test_main_zero_quits(self):
        with tempfile.TemporaryDirectory() as tmp:
            fake = _FakeIO(keys=["0"])
            exit_code = self._run_menu(fake, os.path.join(tmp, "p.json"))
            self.assertEqual(exit_code, 0)

    def test_submenu_enter_cancels_without_change(self):
        with tempfile.TemporaryDirectory() as tmp:
            config_path = os.path.join(tmp, "p.json")
            gitea_config.save_config(_cfg("keep-me"), config_path)
            # remove → chooser Enter cancels → quit
            fake = _FakeIO(keys=["4", ""])
            self._run_menu(fake, config_path)
            with open(config_path, encoding="utf-8") as handle:
                saved = handle.read()
            self.assertIn("keep-me", saved)  # not removed
|
|
|
|
|
|
class TestMenuAddFlow(unittest.TestCase):
    """Add-profile flow: the token value never reaches the config or the output."""

    def test_add_then_quit_writes_config_without_secrets(self):
        scripted_lines = [
            "prgs-reviewer", "https://gitea.prgs.cc", "jcwalker3",
            "Org", "Repo", "personal-prgs", "keychain",
            "prgs-reviewer-token", "y",
        ]
        with tempfile.TemporaryDirectory() as tmp:
            config_path = os.path.join(tmp, "profiles.json")
            # add profile, then Enter → quit
            fake = _FakeIO(keys=["2", ""], lines=scripted_lines)
            exit_code = menu.main(
                io=fake,
                config_path=config_path,
                secret_fn=lambda _p: SECRET,
                keychain_set=lambda *a: None,
            )
            self.assertEqual(exit_code, 0)
            with open(config_path, encoding="utf-8") as handle:
                saved = handle.read()
            self.assertIn("prgs-reviewer", saved)
            self.assertIn("prgs-reviewer-token", saved)  # keychain id (non-secret)
            self.assertNotIn(SECRET, saved)  # token value never on disk
            self.assertNotIn(SECRET, fake.text)  # nor printed

    def test_prompt_fields_keychain_no_token_leak(self):
        stored = {}
        fake = _FakeIO(lines=[
            "https://gitea.prgs.cc", "jcwalker3", "Org", "Repo",
            "personal-prgs", "keychain", "prgs-reviewer-token", "y",
        ])
        profile = menu._prompt_profile_fields(
            fake,
            secret_fn=lambda _p: SECRET,
            # Record what would have been written to the keychain.
            keychain_set=lambda i, t: stored.update({i: t}),
            name="prgs-reviewer",
        )
        expected_auth = {"type": "keychain", "id": "prgs-reviewer-token"}
        self.assertEqual(profile["auth"], expected_auth)
        self.assertNotIn(SECRET, json.dumps(profile))
        self.assertNotIn(SECRET, fake.text)
        self.assertEqual(stored, {"prgs-reviewer-token": SECRET})

    def test_prompt_fields_env_reference(self):
        fake = _FakeIO(lines=[
            "https://gitea.prgs.cc", "jcwalker3", "", "",
            "personal-prgs", "env", "",
        ])
        profile = menu._prompt_profile_fields(
            fake,
            secret_fn=lambda _p: SECRET,
            keychain_set=lambda *a: None,
            name="prgs-author",
        )
        expected_auth = {"type": "env", "name": "GITEA_TOKEN_PRGS_AUTHOR"}
        self.assertEqual(profile["auth"], expected_auth)

    def test_prompt_fields_blank_base_url_cancels(self):
        fake = _FakeIO(lines=[""])  # blank Base URL → cancel
        outcome = menu._prompt_profile_fields(
            fake,
            secret_fn=lambda _p: SECRET,
            keychain_set=lambda *a: None,
            name="x",
        )
        self.assertIsNone(outcome)
|
|
|
|
|
|
# Run the unittest runner when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|