diff --git a/docs/gitea-execution-profiles.md b/docs/gitea-execution-profiles.md index 19a6c7b..42b671e 100644 --- a/docs/gitea-execution-profiles.md +++ b/docs/gitea-execution-profiles.md @@ -252,6 +252,26 @@ the "one server per trust boundary" model described in [`tool-boundaries.md`](tool-boundaries.md) and [`credential-isolation.md`](credential-isolation.md). +## Profile Activation and Runtime Identity Clarity (#131) + +To make Gitea MCP profile activation and runtime identity state explicit, the following mechanisms are supported: + +### 1. Static-Profile vs. Dynamic-Profile Mode +- **Static-Profile Mode (Default):** The active profile is fixed at server launch based on the `GITEA_MCP_PROFILE` environment variable (with `GITEA_MCP_CONFIG` pointing to the config path). Local environment variables are static once a subprocess is spawned by the host. Modifying the environment variables on the host does not dynamically update an already-connected MCP server process. +- **Dynamic-Profile Mode:** Profile switching via the `gitea_activate_profile` tool is supported **only** if the configuration JSON explicitly opts in by setting `"allow_runtime_switching": true` under rules or top-level keys. Otherwise, attempting to switch profiles dynamically will fail closed. + +### 2. Dual MCP Namespaces Recommendation +For security-sensitive or high-risk tasks, the preferred safety model uses separate, isolated MCP server instances (namespaces/sessions) launched with static profiles: +- `gitea-author`: Exposes tools configured with author permissions; cannot perform approvals or merges. +- `gitea-reviewer`: Exposes tools configured with reviewer permissions; used for PR reviews and merges. +This layout maintains physical separation of credentials and prevents privilege escalation within a single session. + +### 3. Verification Post-Switching +When dynamic profile switching is enabled and a profile is activated via `gitea_activate_profile`, the session MUST immediately: +1. Clear the cached identity. +2. Call `gitea_whoami` with the target remote to prove and verify the fresh Gitea authenticated identity. +This guarantees the active profile operations align with the actual Gitea authenticated user credential. + ## Relationship to roadmap issues This document defines the **model only**. Related work is tracked separately diff --git a/gitea_config.py b/gitea_config.py index b029478..38f7b33 100644 --- a/gitea_config.py +++ b/gitea_config.py @@ -192,11 +192,32 @@ def config_path(): return (os.environ.get(ENV_CONFIG_PATH) or "").strip() or None +_active_profile_override = None + + def selected_profile_name(): """Return the selected profile name from the environment, or None.""" + if _active_profile_override is not None: + return _active_profile_override return (os.environ.get(ENV_PROFILE) or "").strip() or None +def is_runtime_switching_enabled(path=None): + """Check if runtime profile switching is explicitly enabled in config.""" + try: + config = load_config(path) + except Exception: + return False + if not config: + return False + rules = config.get("rules") or {} + if rules.get("allow_runtime_switching") is True: + return True + if config.get("allow_runtime_switching") is True: + return True + return False + + def load_config(path=None): """Load and minimally validate the canonical JSON config. diff --git a/mcp_server.py b/mcp_server.py index 45ba28b..b480686 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -597,6 +597,83 @@ def gitea_check_pr_eligibility( result["eligible"] = len(reasons) == 0 if result["eligible"]: reasons.append("all eligibility checks passed") + + # Add enhanced error clarity details (#131) + auth_user = result["authenticated_user"] + pr_author = result["pr_author"] + profile_name = result["profile_name"] + is_self = bool(auth_user and pr_author and auth_user == pr_author) + + result["active_identity"] = auth_user + result["active_profile"] = profile_name + result["self_author"] = is_self + + # Determine missing permission + missing_perm = None + if not op_ok: + missing_perm = f"gitea.pr.{action}" if action in ("approve", "request_changes", "merge") else f"gitea.{action}" + + result["missing_permission"] = missing_perm + + # Determine required profile + req_profile = None + if action in ("approve", "request_changes", "review"): + req_profile = "A profile with reviewer role permissions (allowing approve/merge/review, and forbidding author operations)" + elif action == "merge": + req_profile = "A profile with reviewer role permissions and explicit merge permission" + result["required_profile"] = req_profile + + # Determine required identity + req_identity = None + if is_self: + req_identity = f"Any Gitea user other than PR author '{pr_author}'" + result["required_identity"] = req_identity + + # Determine if fixable by profile switch vs requires different namespace + switching_supported = gitea_config.is_runtime_switching_enabled() + fixable_by_switch = False + req_different_ns = False + + if not op_ok: + config = gitea_config.load_config() + has_capable_profile = False + if config: + for name, p in (config.get("profiles") or {}).items(): + p_allowed = p.get("allowed_operations", []) + p_forbidden = p.get("forbidden_operations", []) + ok, _ = gitea_config.check_operation(action, p_allowed, p_forbidden) + if ok: + has_capable_profile = True + break + if switching_supported and has_capable_profile: + fixable_by_switch = True + else: + req_different_ns = True + + result["fixable_by_profile_switch"] = fixable_by_switch + result["requires_different_namespace"] = req_different_ns + + # Determine safe next step + safe_step = "Ready." + if not result["eligible"]: + if is_self: + safe_step = "Self-review/self-merge is forbidden. Ask a different operator/reviewer to review and merge this PR." + elif not auth_user: + safe_step = f"Ask the operator to configure valid credentials/token for profile '{profile_name}'." + elif not op_ok: + if fixable_by_switch: + safe_step = f"Switch to a reviewer profile by calling gitea_activate_profile with a profile that allows {action}." + else: + safe_step = "Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile." + elif result["pr_state"] != "open": + safe_step = "No action can be taken on a closed/merged PR." + elif action == "merge": + if result["mergeable"] is False: + safe_step = "Address conflicts in the PR branches first, or wait for CI checks to pass before merging." + elif result["mergeable"] is None: + safe_step = "Wait for Gitea to finish calculating mergeability or trigger a re-check." + + result["safe_next_step"] = safe_step return result @@ -1574,13 +1651,11 @@ _GUIDE_RULES = { "(gitea.issue.comment) are gated separately from PR reviews " "(gitea.pr.*)."), "profile_switching": [ - "The active profile comes from GITEA_MCP_PROFILE (with " - "GITEA_MCP_CONFIG pointing at the profiles config).", - "Switching profiles requires changing the launcher environment and " - "reconnecting the MCP server (operator action, e.g. /mcp); the " - "server process cannot switch identities in place.", - "After any switch, verify with gitea_whoami before doing anything " - "else.", + "The active profile is selected via GITEA_MCP_PROFILE (with GITEA_MCP_CONFIG pointing at the config path).", + "By default, static-profile mode is active: changing local environment variables does not dynamically update the already-connected MCP server process.", + "If runtime profile switching is explicitly enabled in the config (allow_runtime_switching: true), use gitea_activate_profile to switch profiles in-place.", + "If switching is disabled, you must change the launcher configuration/environment (dual-namespace) and reconnect/restart the MCP session.", + "After any profile switch or startup, you must verify the new runtime identity with a fresh gitea_whoami call before taking action." ], "identity_verification": [ "Call gitea_whoami with an explicit remote first; confirm the " @@ -1703,10 +1778,12 @@ _PROJECT_SKILLS = { "required_operations": [], "status": "available", "steps": [ - "Ask the operator to update GITEA_MCP_PROFILE in the launcher " - "environment.", - "Operator reconnects the MCP server (e.g. /mcp).", - "Verify the new identity with gitea_whoami before acting.", + "Check profile mode using gitea_get_runtime_context.", + "If dynamic-profile mode is enabled (allow_runtime_switching: true), " + "call gitea_activate_profile to switch in-place.", + "If static-profile mode (default), ask the operator to update GITEA_MCP_PROFILE " + "in the launcher environment and reconnect/restart the MCP server.", + "Immediately verify the new identity with a fresh gitea_whoami call before taking action.", "If identity does not match the expected role, STOP.", ], }, @@ -2115,6 +2192,306 @@ def gitea_get_profile( return result +@mcp.tool() +def gitea_get_runtime_context( + remote: str = "dadeschools", + host: str | None = None, +) -> dict: + """Read-only: explicit visibility into active profile, configuration model, and eligibility. + + Reports config model shape, profile mode (static vs dynamic), and detailed blocks. + + Args: + remote: Known instance — 'dadeschools' or 'prgs'. + host: Override the Gitea host. + """ + profile = get_profile() + config = gitea_config.load_config() + reveal = _reveal_endpoints() + + # Determine config model/version + if config is None: + config_model = "env-only" + elif config.get("version") == 1: + config_model = "v1" + elif config.get("version") == 2: + if "contexts" in config or config.get("shape") == "contexts": + config_model = "v2-contexts" + else: + config_model = "v2-environments" + else: + config_model = f"unknown-version-{config.get('version')}" + + # Determine profile source + if os.environ.get("GITEA_PROFILE_NAME"): + profile_source = "env var" + elif gitea_config.selected_profile_name(): + profile_source = "config file profile" + else: + profile_source = "default" + + h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None) + username = _authenticated_username(h) if h else None + + switching = gitea_config.is_runtime_switching_enabled() + profile_mode = "dynamic-profile" if switching else "static-profile" + + # Evaluate review/merge eligibility + allowed = profile["allowed_operations"] or [] + forbidden = profile["forbidden_operations"] or [] + + # Check approve + approve_ok, _ = gitea_config.check_operation("approve", allowed, forbidden) + # Check merge + merge_ok, _ = gitea_config.check_operation("merge", allowed, forbidden) + + review_merge_allowed = False + blocked_reasons = [] + suggested_fix = "none" + + if not username: + blocked_reasons.append("Authenticated identity is unresolved. Gitea credentials are missing or invalid.") + suggested_fix = "operator action" + elif not (approve_ok or merge_ok): + blocked_reasons.append( + f"Active profile '{profile['profile_name']}' does not permit review or merge operations." + ) + if switching: + suggested_fix = "profile switch" + else: + suggested_fix = "reviewer namespace" + else: + review_merge_allowed = True + + # Note that self-review and self-merge are blocked regardless of profile roles + blocked_reasons.append( + "Note: self-review and self-merge are always blocked at runtime for any PR authored by the active user." + ) + + safe_next_action = "None; ready for operations." + if suggested_fix == "operator action": + safe_next_action = f"Ask the operator to configure valid credentials/token for profile '{profile['profile_name']}'." + elif suggested_fix == "profile switch": + safe_next_action = "Switch to a reviewer profile by calling gitea_activate_profile with a reviewer profile." + elif suggested_fix == "reviewer namespace": + safe_next_action = ( + "Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, " + "or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile." + ) + + result = { + "active_profile": profile["profile_name"], + "authenticated_username": username, + "remote": remote if remote in REMOTES else None, + "config_model": config_model, + "profile_source": profile_source, + "allowed_operations": allowed, + "forbidden_operations": forbidden, + "runtime_switching_supported": switching, + "profile_mode": profile_mode, + "review_merge_allowed": review_merge_allowed, + "review_merge_blocked_reasons": blocked_reasons, + "suggested_fix": suggested_fix, + "safe_next_action": safe_next_action, + } + + if reveal and h: + result["server"] = f"https://{h}" + + return result + + +@mcp.tool() +def gitea_list_profiles() -> dict: + """Read-only: list all Gitea MCP profiles with redacted metadata. + + Exposes names, role kinds, enabled state, allowed/forbidden operations, and active status. + Token values, keychain IDs, and raw URLs are hidden unless GITEA_MCP_REVEAL_ENDPOINTS=1 is enabled. + """ + config = gitea_config.load_config() + reveal = _reveal_endpoints() + active_name = gitea_config.selected_profile_name() or get_profile()["profile_name"] + + profiles_out = [] + + if config is None: + # Env-only: return the active profile + p = get_profile() + role = _role_kind(p["allowed_operations"], p["forbidden_operations"]) + h = REMOTES.get("dadeschools", {}).get("host") # default host + username = _authenticated_username(h) if h else None + + prof = { + "name": p["profile_name"], + "role_kind": role, + "allowed_operations": p["allowed_operations"], + "forbidden_operations": p["forbidden_operations"], + "identity_status": "verified" if username else "unresolved", + "is_active": True, + "enabled": True, + } + if reveal: + prof["token_source_name"] = p["token_source_name"] + prof["base_url"] = p["base_url"] + profiles_out.append(prof) + else: + # Load from config profiles + profiles_dict = config.get("profiles") or {} + unavailable_dict = config.get("unavailable") or {} + audit_only_profiles = config.get("audit_only_profiles") or {} + + # First, process available profiles + for name, p in profiles_dict.items(): + role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", [])) + is_active = (name == active_name) + + # Identity status lookup + if is_active: + h = REMOTES.get("dadeschools", {}).get("host") # default host + username = _authenticated_username(h) if h else None + identity_status = "verified" if username else "unresolved" + else: + # Safely resolve if credentials are present without networking + try: + tok = gitea_config.resolve_token(p) + identity_status = "credentials present" if tok else "missing credentials" + except Exception: + identity_status = "missing credentials" + + prof = { + "name": name, + "role_kind": role, + "allowed_operations": p.get("allowed_operations", []), + "forbidden_operations": p.get("forbidden_operations", []), + "identity_status": identity_status, + "is_active": is_active, + "enabled": p.get("enabled", True), + } + if reveal: + if isinstance(p.get("auth"), dict): + prof["auth"] = p["auth"] + prof["base_url"] = p.get("base_url") + else: + if isinstance(p.get("auth"), dict): + prof["auth"] = {k: ("" if k != "type" else v) for k, v in p["auth"].items()} + prof["base_url"] = "" + profiles_out.append(prof) + + # Process unavailable/disabled/audit-only profiles + for name, p in audit_only_profiles.items(): + role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", [])) + is_active = (name == active_name) + prof = { + "name": name, + "role_kind": role, + "allowed_operations": p.get("allowed_operations", []), + "forbidden_operations": p.get("forbidden_operations", []), + "identity_status": "unavailable (disabled)", + "is_active": is_active, + "enabled": p.get("enabled", True), + "unavailable_reason": p.get("_unavailable_reason"), + } + if reveal: + if isinstance(p.get("auth"), dict): + prof["auth"] = p["auth"] + prof["base_url"] = p.get("base_url") + else: + if isinstance(p.get("auth"), dict): + prof["auth"] = {k: ("" if k != "type" else v) for k, v in p["auth"].items()} + prof["base_url"] = "" + profiles_out.append(prof) + + # Handle unavailable mappings that are not in audit_only + for name, reason in unavailable_dict.items(): + if not any(x["name"] == name for x in profiles_out): + profiles_out.append({ + "name": name, + "role_kind": "limited", + "allowed_operations": [], + "forbidden_operations": [], + "identity_status": "unavailable", + "is_active": (name == active_name), + "enabled": False, + "unavailable_reason": reason, + }) + + return {"profiles": profiles_out} + + +@mcp.tool() +def gitea_activate_profile( + profile_name: str, + remote: str = "dadeschools", + host: str | None = None, +) -> dict: + """Gated profile activation. Switch the active profile in-place for this runtime session. + + Only allowed if "allow_runtime_switching": true is explicitly configured. + + Args: + profile_name: Profile to activate. + remote: Known instance — 'dadeschools' or 'prgs'. + host: Override the Gitea host. + """ + if not gitea_config.is_runtime_switching_enabled(): + return { + "success": False, + "message": ( + "Runtime profile switching is disabled in this configuration. Static profile mode is active. " + "To enable switching, set 'allow_runtime_switching': true in the config rules." + ) + } + + config = gitea_config.load_config() + if not config: + return { + "success": False, + "message": "No profiles configuration file is loaded. Switching is not supported in env-only mode." + } + + profiles_dict = config.get("profiles") or {} + if profile_name not in profiles_dict: + return { + "success": False, + "message": f"Profile '{profile_name}' not found in loaded config." + } + + h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None) + + # 1. Record before state + before_profile = get_profile()["profile_name"] + before_identity = _authenticated_username(h) if h else None + + # 2. Perform switch + gitea_config._active_profile_override = profile_name + + # 3. Clear identity cache to force a fresh verification + if h: + _IDENTITY_CACHE.pop(h, None) + + # 4. Resolve fresh identity + after_profile = get_profile()["profile_name"] + after_identity = _authenticated_username(h) if h else None + + # 5. Audit the switch if auditing is on + _audit( + "activate_profile", + host=h, + remote=remote, + result={"success": True, "before": before_profile, "after": after_profile}, + username=after_identity, + ) + + return { + "success": True, + "message": f"Successfully activated profile '{profile_name}' (fresh identity verification complete).", + "before_profile": before_profile, + "before_identity": before_identity, + "after_profile": after_profile, + "after_identity": after_identity, + } + + @mcp.tool() def gitea_audit_config() -> dict: """Audit the configured profiles/services: enabled state, no secrets. diff --git a/tests/test_runtime_clarity.py b/tests/test_runtime_clarity.py new file mode 100644 index 0000000..0f78259 --- /dev/null +++ b/tests/test_runtime_clarity.py @@ -0,0 +1,256 @@ +"""Tests for runtime context, profile activation, profile listing, and enhanced error clarity. + +Covers Issue #131 requirements. +""" +import os +import sys +import json +import tempfile +import unittest +from unittest.mock import patch, MagicMock + +sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) + +import gitea_config +import gitea_auth +import mcp_server + +CONFIG_SWITCHING_DISABLED = { + "version": 2, + "contexts": { + "ctx": { + "enabled": True, + "gitea": { + "enabled": True, + "base_url": "https://gitea.example.com" + } + } + }, + "profiles": { + "author-profile": { + "enabled": True, + "context": "ctx", + "role": "author", + "username": "author-user", + "auth": {"type": "env", "name": "GITEA_TOKEN_AUTHOR"}, + "allowed_operations": ["gitea.read", "gitea.pr.create", "gitea.branch.push"], + "forbidden_operations": ["gitea.pr.approve", "gitea.pr.merge"], + "execution_profile": "author-profile" + }, + "reviewer-profile": { + "enabled": True, + "context": "ctx", + "role": "reviewer", + "username": "reviewer-user", + "auth": {"type": "env", "name": "GITEA_TOKEN_REVIEWER"}, + "allowed_operations": ["gitea.read", "gitea.pr.approve", "gitea.pr.merge"], + "forbidden_operations": ["gitea.pr.create", "gitea.branch.push"], + "execution_profile": "reviewer-profile" + } + }, + "rules": { + "allow_runtime_switching": False + } +} + +CONFIG_SWITCHING_ENABLED = { + **CONFIG_SWITCHING_DISABLED, + "rules": { + "allow_runtime_switching": True + } +} + +class TestRuntimeClarity(unittest.TestCase): + def setUp(self): + self._remotes_patch = patch.dict(mcp_server.REMOTES, { + "dadeschools": {"host": "gitea.example.com", "org": "Example-Org", "repo": "Example-Repo"}, + "prgs": {"host": "gitea.example.com", "org": "Example-Org", "repo": "Example-Repo"} + }) + self._remotes_patch.start() + mcp_server._IDENTITY_CACHE.clear() + gitea_config._active_profile_override = None + self._dir = tempfile.TemporaryDirectory() + self.config_path = os.path.join(self._dir.name, "profiles.json") + self._write_config(CONFIG_SWITCHING_DISABLED) + + def tearDown(self): + self._remotes_patch.stop() + mcp_server._IDENTITY_CACHE.clear() + gitea_config._active_profile_override = None + self._dir.cleanup() + + def _write_config(self, obj): + with open(self.config_path, "w", encoding="utf-8") as fh: + fh.write(json.dumps(obj)) + + def _env(self, profile="author-profile", reveal="0"): + return { + "GITEA_MCP_CONFIG": self.config_path, + "GITEA_MCP_PROFILE": profile, + "GITEA_MCP_REVEAL_ENDPOINTS": reveal, + "GITEA_TOKEN_AUTHOR": "author-pass", + "GITEA_TOKEN_REVIEWER": "reviewer-pass", + } + + # ------------------------------------------------------------------------- + # gitea_get_runtime_context + # ------------------------------------------------------------------------- + @patch("mcp_server.api_request", return_value={"login": "author-user"}) + @patch("mcp_server.get_auth_header", return_value="token author-pass") + def test_get_runtime_context_author(self, _auth, _api): + with patch.dict(os.environ, self._env("author-profile"), clear=True): + ctx = mcp_server.gitea_get_runtime_context(remote="dadeschools") + self.assertEqual(ctx["active_profile"], "author-profile") + self.assertEqual(ctx["authenticated_username"], "author-user") + self.assertEqual(ctx["config_model"], "v2-contexts") + self.assertEqual(ctx["profile_source"], "config file profile") + self.assertFalse(ctx["runtime_switching_supported"]) + self.assertEqual(ctx["profile_mode"], "static-profile") + self.assertFalse(ctx["review_merge_allowed"]) + self.assertEqual(ctx["suggested_fix"], "reviewer namespace") + self.assertIn("does not permit review or merge", ctx["review_merge_blocked_reasons"][0]) + self.assertIn("Switch to the reviewer MCP session", ctx["safe_next_action"]) + + @patch("mcp_server.api_request", return_value={"login": "reviewer-user"}) + @patch("mcp_server.get_auth_header", return_value="token reviewer-pass") + def test_get_runtime_context_reviewer(self, _auth, _api): + with patch.dict(os.environ, self._env("reviewer-profile"), clear=True): + ctx = mcp_server.gitea_get_runtime_context(remote="dadeschools") + self.assertEqual(ctx["active_profile"], "reviewer-profile") + self.assertEqual(ctx["authenticated_username"], "reviewer-user") + self.assertTrue(ctx["review_merge_allowed"]) + self.assertEqual(ctx["suggested_fix"], "none") + self.assertEqual(ctx["safe_next_action"], "None; ready for operations.") + + # ------------------------------------------------------------------------- + # gitea_list_profiles + # ------------------------------------------------------------------------- + @patch("mcp_server.api_request", return_value={"login": "author-user"}) + @patch("mcp_server.get_auth_header", return_value="token author-pass") + def test_list_profiles_redacted_by_default(self, _auth, _api): + with patch.dict(os.environ, self._env("author-profile", reveal="0"), clear=True): + res = mcp_server.gitea_list_profiles() + profiles = res["profiles"] + self.assertEqual(len(profiles), 2) + + author_prof = next(p for p in profiles if p["name"] == "author-profile") + self.assertTrue(author_prof["is_active"]) + self.assertEqual(author_prof["role_kind"], "author") + self.assertEqual(author_prof["auth"]["name"], "") + self.assertEqual(author_prof["base_url"], "") + self.assertEqual(author_prof["identity_status"], "verified") + + reviewer_prof = next(p for p in profiles if p["name"] == "reviewer-profile") + self.assertFalse(reviewer_prof["is_active"]) + self.assertEqual(reviewer_prof["role_kind"], "reviewer") + self.assertEqual(reviewer_prof["auth"]["name"], "") + self.assertEqual(reviewer_prof["base_url"], "") + self.assertEqual(reviewer_prof["identity_status"], "credentials present") + + @patch("mcp_server.api_request", return_value={"login": "author-user"}) + @patch("mcp_server.get_auth_header", return_value="token author-pass") + def test_list_profiles_revealed_under_opt_in(self, _auth, _api): + with patch.dict(os.environ, self._env("author-profile", reveal="1"), clear=True): + res = mcp_server.gitea_list_profiles() + profiles = res["profiles"] + + author_prof = next(p for p in profiles if p["name"] == "author-profile") + self.assertEqual(author_prof["auth"]["name"], "GITEA_TOKEN_AUTHOR") + self.assertEqual(author_prof["base_url"], "https://gitea.example.com") + + # ------------------------------------------------------------------------- + # gitea_activate_profile + # ------------------------------------------------------------------------- + def test_activate_profile_fails_when_disabled(self): + self._write_config(CONFIG_SWITCHING_DISABLED) + with patch.dict(os.environ, self._env("author-profile"), clear=True): + res = mcp_server.gitea_activate_profile(profile_name="reviewer-profile") + self.assertFalse(res["success"]) + self.assertIn("switching is disabled", res["message"].lower()) + self.assertIsNone(gitea_config._active_profile_override) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header") + def test_activate_profile_succeeds_when_enabled(self, mock_auth, mock_api): + self._write_config(CONFIG_SWITCHING_ENABLED) + + # Setup mock responses for whoami checks + mock_auth.side_effect = ["token author-pass", "token reviewer-pass"] + mock_api.side_effect = [{"login": "author-user"}, {"login": "reviewer-user"}] + + with patch.dict(os.environ, self._env("author-profile"), clear=True): + # Check before state + self.assertEqual(gitea_config.selected_profile_name(), "author-profile") + + res = mcp_server.gitea_activate_profile(profile_name="reviewer-profile") + + self.assertTrue(res["success"]) + self.assertEqual(res["before_profile"], "author-profile") + self.assertEqual(res["before_identity"], "author-user") + self.assertEqual(res["after_profile"], "reviewer-profile") + self.assertEqual(res["after_identity"], "reviewer-user") + + # Global variable override should be set + self.assertEqual(gitea_config._active_profile_override, "reviewer-profile") + self.assertEqual(gitea_config.selected_profile_name(), "reviewer-profile") + + # ------------------------------------------------------------------------- + # gitea_check_pr_eligibility enhanced error clarity + # ------------------------------------------------------------------------- + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value="token reviewer-pass") + def test_eligibility_failure_self_author(self, _auth, mock_api): + # PR is authored by "reviewer-user" and reviewer-user is trying to approve it. + mock_api.side_effect = [ + {"login": "reviewer-user"}, # user whoami lookup + {"user": {"login": "reviewer-user"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details + ] + + with patch.dict(os.environ, self._env("reviewer-profile"), clear=True): + res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve") + + self.assertFalse(res["eligible"]) + self.assertEqual(res["active_identity"], "reviewer-user") + self.assertTrue(res["self_author"]) + self.assertEqual(res["required_identity"], "Any Gitea user other than PR author 'reviewer-user'") + self.assertIn("Self-review/self-merge is forbidden", res["safe_next_step"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value="token author-pass") + def test_eligibility_failure_missing_permissions(self, _auth, mock_api): + # PR is authored by "someone-else" and author-user (who lacks approve) is trying to approve it. + mock_api.side_effect = [ + {"login": "author-user"}, # user whoami lookup + {"user": {"login": "someone-else"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details + ] + + self._write_config(CONFIG_SWITCHING_ENABLED) # Enable switching to verify fixable_by_profile_switch + + with patch.dict(os.environ, self._env("author-profile"), clear=True): + res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve") + + self.assertFalse(res["eligible"]) + self.assertEqual(res["missing_permission"], "gitea.pr.approve") + self.assertTrue(res["fixable_by_profile_switch"]) + self.assertFalse(res["requires_different_namespace"]) + self.assertIn("Switch to a reviewer profile by calling gitea_activate_profile", res["safe_next_step"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value="token author-pass") + def test_eligibility_failure_missing_permissions_switching_disabled(self, _auth, mock_api): + # PR is authored by "someone-else" and author-user (lacks approve) tries to approve it when switching is disabled. + mock_api.side_effect = [ + {"login": "author-user"}, # user whoami lookup + {"user": {"login": "someone-else"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details + ] + + self._write_config(CONFIG_SWITCHING_DISABLED) # Disable switching + + with patch.dict(os.environ, self._env("author-profile"), clear=True): + res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve") + + self.assertFalse(res["eligible"]) + self.assertEqual(res["missing_permission"], "gitea.pr.approve") + self.assertFalse(res["fixable_by_profile_switch"]) + self.assertTrue(res["requires_different_namespace"]) + self.assertIn("Switch to the reviewer MCP session", res["safe_next_step"])