"""Tests for the interactive profile-setup menu (#31): gitea_config authoring helpers + gitea_config_menu, all exercised without a real keychain, terminal, or network via injected IO/auth/HTTP. """ import os import sys import json import tempfile import unittest from unittest.mock import patch sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) import gitea_config # noqa: E402 import gitea_config_menu as menu # noqa: E402 SECRET = "super-secret-token-value" class _InputQueue: """Callable that returns queued answers for successive input() prompts.""" def __init__(self, answers): self._answers = list(answers) def __call__(self, _prompt=""): return self._answers.pop(0) class _Out: """Capture out() lines.""" def __init__(self): self.lines = [] def __call__(self, *args): self.lines.append(" ".join(str(a) for a in args)) @property def text(self): return "\n".join(self.lines) # --------------------------------------------------------------------------- # Config authoring helpers # --------------------------------------------------------------------------- class TestAuthoringHelpers(unittest.TestCase): def test_profile_name_validation(self): self.assertTrue(gitea_config.is_valid_profile_name("prgs-reviewer")) self.assertTrue(gitea_config.is_valid_profile_name("prgs.author_1")) for bad in ("", "-leading", "has space", "a/b", "b@d"): self.assertFalse(gitea_config.is_valid_profile_name(bad), bad) def test_add_profile_preserves_existing(self): cfg = gitea_config.empty_config() cfg = gitea_config.add_profile(cfg, "prgs-author", gitea_config.build_profile( base_url="https://gitea.prgs.cc", auth=gitea_config.env_auth("GITEA_TOKEN_A"))) cfg = gitea_config.add_profile(cfg, "prgs-reviewer", gitea_config.build_profile( base_url="https://gitea.prgs.cc", auth=gitea_config.keychain_auth("prgs-reviewer-token"))) self.assertEqual(sorted(cfg["profiles"]), ["prgs-author", "prgs-reviewer"]) def test_add_duplicate_rejected(self): cfg = gitea_config.add_profile(gitea_config.empty_config(), "p", gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V"))) with self.assertRaises(gitea_config.ConfigError): gitea_config.add_profile(cfg, "p", gitea_config.build_profile( base_url="https://y", auth=gitea_config.env_auth("W"))) def test_add_invalid_name_rejected(self): with self.assertRaises(gitea_config.ConfigError): gitea_config.add_profile(gitea_config.empty_config(), "bad name", gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V"))) def test_add_missing_base_url_rejected(self): with self.assertRaises(gitea_config.ConfigError): gitea_config.add_profile(gitea_config.empty_config(), "p", {"auth": gitea_config.env_auth("V")}) def test_validate_reports_missing_fields(self): cfg = {"version": 1, "profiles": { "noauth": {"base_url": "https://x"}, "nobase": {"auth": gitea_config.env_auth("V")}, "inline": {"base_url": "https://x", "auth": gitea_config.env_auth("V"), "token": "leak"}, }} problems = gitea_config.validate_config(cfg) joined = "\n".join(problems) self.assertIn("noauth", joined) self.assertIn("nobase", joined) self.assertIn("inline", joined) self.assertNotIn("leak", joined) # inline secret value never echoed def test_build_profile_omits_empty(self): p = gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V"), username="") self.assertNotIn("username", p) self.assertNotIn("default_owner", p) class TestAtomicSave(unittest.TestCase): def test_save_creates_parents_and_pretty_json(self): with tempfile.TemporaryDirectory() as d: path = os.path.join(d, "nested", "dir", "profiles.json") cfg = gitea_config.add_profile(gitea_config.empty_config(), "p", gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V"))) gitea_config.save_config(cfg, path) self.assertTrue(os.path.isfile(path)) with open(path, encoding="utf-8") as fh: body = fh.read() self.assertEqual(json.loads(body), cfg) self.assertIn("\n ", body) # pretty-printed # No leftover temp files in the directory. leftovers = [n for n in os.listdir(os.path.dirname(path)) if n.startswith(".profiles-")] self.assertEqual(leftovers, []) def test_save_is_atomic_via_replace(self): with tempfile.TemporaryDirectory() as d: path = os.path.join(d, "profiles.json") gitea_config.save_config(gitea_config.empty_config(), path) with patch("gitea_config.os.replace", side_effect=OSError("boom")): with self.assertRaises(OSError): gitea_config.save_config( gitea_config.add_profile(gitea_config.empty_config(), "p", gitea_config.build_profile(base_url="https://x", auth=gitea_config.env_auth("V"))), path) # Original file intact; no temp debris. with open(path, encoding="utf-8") as fh: self.assertEqual(json.load(fh), gitea_config.empty_config()) leftovers = [n for n in os.listdir(d) if n.startswith(".profiles-")] self.assertEqual(leftovers, []) class TestKeychainSet(unittest.TestCase): def test_stores_token_via_security_without_printing(self): calls = {} class _Proc: returncode = 0 def runner(cmd, **kw): calls["cmd"] = cmd return _Proc() out = _Out() rv = gitea_config.keychain_set("prgs-reviewer-token", SECRET, runner=runner) self.assertEqual(rv, "prgs-reviewer-token") self.assertEqual(calls["cmd"][0], "security") self.assertIn(SECRET, calls["cmd"]) # token handed to security... self.assertNotIn(SECRET, out.text) # ...never printed by us def test_failure_raises_without_token(self): class _Proc: returncode = 1 with self.assertRaises(gitea_config.ConfigError) as ctx: gitea_config.keychain_set("id", SECRET, runner=lambda *a, **k: _Proc()) self.assertNotIn(SECRET, str(ctx.exception)) def test_empty_token_rejected(self): with self.assertRaises(gitea_config.ConfigError): gitea_config.keychain_set("id", "", runner=lambda *a, **k: None) # --------------------------------------------------------------------------- # Launcher snippets # --------------------------------------------------------------------------- class TestLauncherSnippets(unittest.TestCase): def test_only_safe_keys_no_secrets(self): entry = gitea_config.launcher_entry("prgs", "/cfg/profiles.json")["gitea-tools"] self.assertEqual(set(entry), {"command", "args", "env"}) self.assertEqual(set(entry["env"]), {"GITEA_MCP_CONFIG", "GITEA_MCP_PROFILE"}) self.assertEqual(entry["env"]["GITEA_MCP_PROFILE"], "prgs") blob = json.dumps(entry).lower() for word in ("token", "password", "secret"): self.assertNotIn(word, blob) def test_snippets_for_all_clients(self): snips = menu.launcher_snippets("prgs", "/cfg/profiles.json") self.assertEqual(set(snips), {"claude", "gemini", "codex"}) for text in snips.values(): self.assertIn("GITEA_MCP_PROFILE", text) self.assertNotIn("token", text.lower()) # --------------------------------------------------------------------------- # Auth test + reviewer eligibility (mocked API + token resolution) # --------------------------------------------------------------------------- class TestIdentityAndEligibility(unittest.TestCase): def _profile(self): return {"base_url": "https://gitea.prgs.cc", "default_owner": "Org", "default_repo": "Repo", "auth": gitea_config.env_auth("GITEA_TOKEN_X")} def test_resolve_identity(self): def api(m, url, auth): return {"login": "reviewer-bot"} if url.endswith("/user") else {} who = menu.resolve_identity(self._profile(), resolve_token=lambda p: "tok", api_request=api) self.assertEqual(who, "reviewer-bot") def test_eligible_when_open_and_not_author(self): def api(m, url, auth): if url.endswith("/user"): return {"login": "reviewer-bot"} return {"user": {"login": "author-bot"}, "state": "open"} r = menu.check_eligibility(self._profile(), 8, resolve_token=lambda p: "tok", api_request=api) self.assertTrue(r["eligible"]) self.assertEqual(r["authenticated_user"], "reviewer-bot") self.assertEqual(r["pr_author"], "author-bot") def test_ineligible_when_self_author(self): def api(m, url, auth): if url.endswith("/user"): return {"login": "same-bot"} return {"user": {"login": "same-bot"}, "state": "open"} r = menu.check_eligibility(self._profile(), 8, resolve_token=lambda p: "tok", api_request=api) self.assertFalse(r["eligible"]) self.assertTrue(any("author" in x for x in r["reasons"])) def test_ineligible_when_closed(self): def api(m, url, auth): if url.endswith("/user"): return {"login": "reviewer-bot"} return {"user": {"login": "author-bot"}, "state": "closed"} r = menu.check_eligibility(self._profile(), 8, resolve_token=lambda p: "tok", api_request=api) self.assertFalse(r["eligible"]) # --------------------------------------------------------------------------- # UX: single-key nav, Enter back-out, profile chooser, clear, pause (#36) # --------------------------------------------------------------------------- class _FakeIO: """Duck-typed MenuIO: scripted keys/lines, captured output, clear counter. read_key/read_line pop from their queues; when a queue is exhausted they return "" (Enter/EOF → cancel/quit), so a script can never hang the loop. """ def __init__(self, keys=(), lines=(), is_tty=False, clear_enabled=None): self.keys = list(keys) self.lines = list(lines) self.out_lines = [] self.clears = 0 self.is_tty = is_tty self.clear_enabled = is_tty if clear_enabled is None else clear_enabled def read_key(self, prompt=""): return self.keys.pop(0) if self.keys else "" def read_line(self, prompt=""): return self.lines.pop(0) if self.lines else "" def out(self, text=""): self.out_lines.append(str(text)) def clear(self): if self.clear_enabled: self.clears += 1 @property def text(self): return "\n".join(self.out_lines) def _cfg(*names): cfg = gitea_config.empty_config() for n in names: cfg["profiles"][n] = {"base_url": "https://x", "auth": gitea_config.env_auth("V")} return cfg class TestChooser(unittest.TestCase): def test_menu_single_key_and_clears(self): io = _FakeIO(keys=["3"], is_tty=True) self.assertEqual(menu.choose_menu_option(io), "3") # one keypress, no Enter self.assertEqual(io.clears, 1) # cleared before draw self.assertIn("Gitea MCP profile setup", io.text) def test_profile_chooser_lists_and_selects_by_key(self): io = _FakeIO(keys=["2"]) self.assertEqual(menu.choose_profile(io, _cfg("aaa", "bbb", "ccc")), "bbb") self.assertIn("1) aaa", io.text) self.assertIn("2) bbb", io.text) def test_profile_chooser_enter_cancels(self): io = _FakeIO(keys=[""]) self.assertIsNone(menu.choose_profile(io, _cfg("aaa"))) def test_profile_chooser_no_profiles(self): io = _FakeIO(keys=[]) self.assertIsNone(menu.choose_profile(io, gitea_config.empty_config())) self.assertIn("no profiles", io.text) def test_profile_chooser_manual_entry(self): io = _FakeIO(keys=["m"], lines=["typed-name"]) self.assertEqual(menu.choose_profile(io, _cfg("aaa")), "typed-name") def test_profile_chooser_out_of_range_key(self): io = _FakeIO(keys=["9"]) self.assertIsNone(menu.choose_profile(io, _cfg("aaa"))) self.assertIn("invalid selection", io.text) class TestClearAndPause(unittest.TestCase): def test_clear_only_when_enabled(self): off = _FakeIO(keys=["x"], clear_enabled=False) menu.choose_menu_option(off) self.assertEqual(off.clears, 0) on = _FakeIO(keys=["x"], clear_enabled=True) menu.choose_menu_option(on) self.assertEqual(on.clears, 1) def test_pause_does_not_read_in_non_tty(self): io = _FakeIO(keys=["a"], is_tty=False) menu.pause_for_key(io) self.assertEqual(io.keys, ["a"]) # nothing consumed, no hang def test_pause_reads_one_key_in_tty(self): io = _FakeIO(keys=["a"], is_tty=True) menu.pause_for_key(io) self.assertEqual(io.keys, []) # consumed the keypress def test_default_io_non_tty_line_fallback(self): import io as _io with patch("gitea_config_menu.sys.stdin", _io.StringIO("7\nrest\n")): mio = menu.default_io() self.assertFalse(mio.is_tty) self.assertEqual(mio.read_key(), "7") # first char of the line class TestMenuNav(unittest.TestCase): def test_main_enter_quits(self): with tempfile.TemporaryDirectory() as d: io = _FakeIO(keys=[""]) # Enter at main menu rc = menu.main(io=io, config_path=os.path.join(d, "p.json"), secret_fn=lambda _p: SECRET, keychain_set=lambda *a: None) self.assertEqual(rc, 0) self.assertIn("bye", io.text) def test_main_zero_quits(self): with tempfile.TemporaryDirectory() as d: io = _FakeIO(keys=["0"]) self.assertEqual( menu.main(io=io, config_path=os.path.join(d, "p.json"), secret_fn=lambda _p: SECRET, keychain_set=lambda *a: None), 0) def test_submenu_enter_cancels_without_change(self): with tempfile.TemporaryDirectory() as d: path = os.path.join(d, "p.json") gitea_config.save_config(_cfg("keep-me"), path) io = _FakeIO(keys=["4", ""]) # remove → chooser Enter cancels → quit menu.main(io=io, config_path=path, secret_fn=lambda _p: SECRET, keychain_set=lambda *a: None) with open(path, encoding="utf-8") as fh: self.assertIn("keep-me", fh.read()) # not removed class TestMenuAddFlow(unittest.TestCase): def test_add_then_quit_writes_config_without_secrets(self): with tempfile.TemporaryDirectory() as d: path = os.path.join(d, "profiles.json") io = _FakeIO( keys=["2", ""], # add profile, then Enter → quit lines=["prgs-reviewer", "https://gitea.prgs.cc", "jcwalker3", "Org", "Repo", "personal-prgs", "keychain", "prgs-reviewer-token", "y"], ) rc = menu.main(io=io, config_path=path, secret_fn=lambda _p: SECRET, keychain_set=lambda *a: None) self.assertEqual(rc, 0) with open(path, encoding="utf-8") as fh: body = fh.read() self.assertIn("prgs-reviewer", body) self.assertIn("prgs-reviewer-token", body) # keychain id (non-secret) self.assertNotIn(SECRET, body) # token value never on disk self.assertNotIn(SECRET, io.text) # nor printed def test_prompt_fields_keychain_no_token_leak(self): stored = {} io = _FakeIO(lines=["https://gitea.prgs.cc", "jcwalker3", "Org", "Repo", "personal-prgs", "keychain", "prgs-reviewer-token", "y"]) profile = menu._prompt_profile_fields( io, secret_fn=lambda _p: SECRET, keychain_set=lambda i, t: stored.update({i: t}), name="prgs-reviewer") self.assertEqual(profile["auth"], {"type": "keychain", "id": "prgs-reviewer-token"}) self.assertNotIn(SECRET, json.dumps(profile)) self.assertNotIn(SECRET, io.text) self.assertEqual(stored, {"prgs-reviewer-token": SECRET}) def test_prompt_fields_env_reference(self): io = _FakeIO(lines=["https://gitea.prgs.cc", "jcwalker3", "", "", "personal-prgs", "env", ""]) profile = menu._prompt_profile_fields( io, secret_fn=lambda _p: SECRET, keychain_set=lambda *a: None, name="prgs-author") self.assertEqual(profile["auth"], {"type": "env", "name": "GITEA_TOKEN_PRGS_AUTHOR"}) def test_prompt_fields_blank_base_url_cancels(self): io = _FakeIO(lines=[""]) # blank Base URL → cancel self.assertIsNone(menu._prompt_profile_fields( io, secret_fn=lambda _p: SECRET, keychain_set=lambda *a: None, name="x")) if __name__ == "__main__": unittest.main()