diff --git a/README.md b/README.md index b32f68c..8db69e4 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools n | `gitea_view_issue` | Get full details of a single issue | | `gitea_whoami` | Read-only: identify the authenticated Gitea account (safe metadata only) | | `gitea_get_profile` | Read-only: describe the active runtime execution profile (safe metadata only) | +| `gitea_check_pr_eligibility` | Read-only: check if the current identity/profile may review/approve/request_changes/merge a PR | | `gitea_mark_issue` | Claim/release an issue (start/done) | | `gitea_list_labels` | List all available labels in a repository | | `gitea_create_label` | Create a new label with custom color | diff --git a/mcp_server.py b/mcp_server.py index 44b736a..9b6ade1 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -210,6 +210,149 @@ def gitea_view_pr( } +# Actions whose eligibility this tool can evaluate. +_ELIGIBILITY_ACTIONS = ("review", "approve", "request_changes", "merge") + + +@mcp.tool() +def gitea_check_pr_eligibility( + pr_number: int, + action: str, + remote: str = "dadeschools", + host: str | None = None, + org: str | None = None, + repo: str | None = None, +) -> dict: + """Read-only: is the current identity/profile eligible to perform *action* + on a PR? + + Evaluates eligibility only — it NEVER reviews, approves, requests changes, + merges, or mutates anything. It inspects the authenticated identity + (via the /user endpoint), the active runtime profile metadata + (``get_profile``), and the target PR (author, state, head SHA, + mergeability), then returns a decision with clear reasons. + + Fail-closed rules: + - Unknown action or unknown remote → not eligible. + - Profile has no configured allowed operations, or the action is not in + the profile's allowed operations (or is forbidden) → not eligible. + - Authenticated identity cannot be determined → not eligible. + - Authenticated user equals the PR author → not eligible to ``approve`` or + ``merge``. + - PR is not open → not eligible. + - For ``merge``, PR must be reported mergeable. + + Never returns the token, Authorization header, or any credential material. + + Args: + pr_number: Target PR number. + action: One of 'review', 'approve', 'request_changes', 'merge'. + remote: Known instance — 'dadeschools' or 'prgs'. + host: Override the Gitea host. + org: Override the owner/organization. + repo: Override the repository name. + + Returns: + dict with 'eligible' (bool), the inputs inspected, and 'reasons'. + """ + action = (action or "").strip().lower() + profile = get_profile() + result = { + "eligible": False, + "requested_action": action, + "authenticated_user": None, + "profile_name": profile["profile_name"], + "allowed_operations": profile["allowed_operations"], + "pr_author": None, + "pr_number": pr_number, + "pr_state": None, + "head_sha": None, + "mergeable": None, + "remote": remote if remote in REMOTES else None, + "reasons": [], + } + reasons = result["reasons"] + + if action not in _ELIGIBILITY_ACTIONS: + reasons.append( + f"unknown action '{action}'; expected one of {list(_ELIGIBILITY_ACTIONS)}" + ) + return result + + if remote not in REMOTES: + reasons.append(f"unknown remote '{remote}'") + return result + + # Profile capability check (metadata only; not enforcement of the action). + allowed = profile["allowed_operations"] + forbidden = profile["forbidden_operations"] + if not allowed: + reasons.append("profile has no configured allowed operations (fail closed)") + if action in forbidden: + reasons.append(f"profile forbids '{action}'") + elif action not in allowed: + reasons.append(f"profile is not allowed to {action}") + + h, o, r = _resolve(remote, host, org, repo) + + # Authenticated identity (read-only). Fail soft; never leak error/secret. + try: + auth = _auth(h) + except Exception: + auth = None + auth_user = None + if auth: + try: + who = api_request("GET", f"https://{h}/api/v1/user", auth) + auth_user = (who or {}).get("login") + except Exception: + auth_user = None + result["authenticated_user"] = auth_user + if not auth_user: + reasons.append("authenticated identity could not be determined") + + # PR facts (read-only GET; no mutation). + pr_author = None + pr_state = None + if auth: + try: + pr = api_request( + "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth + ) + pr_author = (pr or {}).get("user", {}).get("login") + pr_state = (pr or {}).get("state") + result["head_sha"] = ((pr or {}).get("head") or {}).get("sha") + result["mergeable"] = (pr or {}).get("mergeable") + except Exception: + reasons.append("PR details could not be retrieved") + else: + reasons.append("PR details could not be retrieved (no credentials)") + result["pr_author"] = pr_author + result["pr_state"] = pr_state + + # PR must be open to act on. + if pr_state is None: + reasons.append("PR state unknown") + elif pr_state != "open": + reasons.append(f"PR is not open (state={pr_state})") + + # Self-author must not approve or merge their own PR. + if auth_user and pr_author and auth_user == pr_author and action in ("approve", "merge"): + reasons.append("authenticated user is PR author") + + # Merge needs a positive mergeability signal. + if action == "merge": + if result["mergeable"] is False: + reasons.append("PR is not mergeable") + elif result["mergeable"] is None: + reasons.append("PR mergeability unknown") + + result["eligible"] = len(reasons) == 0 + if result["eligible"]: + reasons.append("all eligibility checks passed") + return result + + @mcp.tool() def gitea_edit_pr( pr_number: int, diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 86ead57..ee44c0f 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -28,6 +28,7 @@ from mcp_server import ( # noqa: E402 gitea_commit_files, gitea_whoami, gitea_get_profile, + gitea_check_pr_eligibility, ) from gitea_auth import get_profile # noqa: E402 @@ -656,5 +657,106 @@ class TestProfileDiscovery(unittest.TestCase): self.assertIn("remote_error", result) +# --------------------------------------------------------------------------- +# PR eligibility checks (read-only) — issue #14 +# --------------------------------------------------------------------------- +class TestPrEligibility(unittest.TestCase): + + def _pr(self, author, state="open", sha="abc123", mergeable=True): + return { + "user": {"login": author}, + "state": state, + "head": {"sha": sha}, + "mergeable": mergeable, + } + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_reviewer_eligible_to_review(self, _auth, mock_api): + mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=5, action="review", remote="prgs") + self.assertTrue(r["eligible"]) + self.assertEqual(r["authenticated_user"], "reviewer-bot") + self.assertEqual(r["pr_author"], "author-bot") + self.assertEqual(r["head_sha"], "abc123") + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_self_author_cannot_merge(self, _auth, mock_api): + mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs") + self.assertFalse(r["eligible"]) + self.assertIn("authenticated user is PR author", r["reasons"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_profile_not_allowed_to_merge(self, _auth, mock_api): + mock_api.side_effect = [{"login": "author-bot"}, self._pr("someone-else")] + env = {"GITEA_PROFILE_NAME": "gitea-author", + "GITEA_ALLOWED_OPERATIONS": "read,pr.create"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs") + self.assertFalse(r["eligible"]) + self.assertIn("profile is not allowed to merge", r["reasons"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_forbidden_operation_blocks(self, _auth, mock_api): + mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review", + "GITEA_FORBIDDEN_OPERATIONS": "merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs") + self.assertFalse(r["eligible"]) + self.assertIn("profile forbids 'merge'", r["reasons"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_closed_pr_not_eligible(self, _auth, mock_api): + mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot", state="closed")] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=8, action="approve", remote="prgs") + self.assertFalse(r["eligible"]) + self.assertIn("PR is not open (state=closed)", r["reasons"]) + + @patch("mcp_server.get_auth_header", return_value=None) + def test_unknown_identity_fails_closed(self, _auth): + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs") + self.assertFalse(r["eligible"]) + self.assertIn("authenticated identity could not be determined", r["reasons"]) + self.assertIsNone(r["authenticated_user"]) + + def test_unknown_action_rejected(self): + with patch.dict(os.environ, {}, clear=True): + r = gitea_check_pr_eligibility(pr_number=8, action="delete", remote="prgs") + self.assertFalse(r["eligible"]) + self.assertTrue(any("unknown action" in x for x in r["reasons"])) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_never_exposes_secrets(self, _auth, mock_api): + mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review", + "GITEA_TOKEN": "super-secret-token"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=5, action="review", remote="prgs") + blob = repr(r).lower() + for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()): + self.assertNotIn(secret, blob) + + if __name__ == "__main__": unittest.main()