Merge pull request 'feat: add read-only gitea_check_pr_eligibility (#14)' (#24) from feature/14-gitea-pr-eligibility-checks into master
This commit was merged in pull request #24.
This commit is contained in:
@@ -52,6 +52,7 @@ Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools n
|
||||
| `gitea_view_issue` | Get full details of a single issue |
|
||||
| `gitea_whoami` | Read-only: identify the authenticated Gitea account (safe metadata only) |
|
||||
| `gitea_get_profile` | Read-only: describe the active runtime execution profile (safe metadata only) |
|
||||
| `gitea_check_pr_eligibility` | Read-only: check if the current identity/profile may review/approve/request_changes/merge a PR |
|
||||
| `gitea_mark_issue` | Claim/release an issue (start/done) |
|
||||
| `gitea_list_labels` | List all available labels in a repository |
|
||||
| `gitea_create_label` | Create a new label with custom color |
|
||||
|
||||
+143
@@ -210,6 +210,149 @@ def gitea_view_pr(
|
||||
}
|
||||
|
||||
|
||||
# Actions whose eligibility this tool can evaluate.
|
||||
_ELIGIBILITY_ACTIONS = ("review", "approve", "request_changes", "merge")
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def gitea_check_pr_eligibility(
|
||||
pr_number: int,
|
||||
action: str,
|
||||
remote: str = "dadeschools",
|
||||
host: str | None = None,
|
||||
org: str | None = None,
|
||||
repo: str | None = None,
|
||||
) -> dict:
|
||||
"""Read-only: is the current identity/profile eligible to perform *action*
|
||||
on a PR?
|
||||
|
||||
Evaluates eligibility only — it NEVER reviews, approves, requests changes,
|
||||
merges, or mutates anything. It inspects the authenticated identity
|
||||
(via the /user endpoint), the active runtime profile metadata
|
||||
(``get_profile``), and the target PR (author, state, head SHA,
|
||||
mergeability), then returns a decision with clear reasons.
|
||||
|
||||
Fail-closed rules:
|
||||
- Unknown action or unknown remote → not eligible.
|
||||
- Profile has no configured allowed operations, or the action is not in
|
||||
the profile's allowed operations (or is forbidden) → not eligible.
|
||||
- Authenticated identity cannot be determined → not eligible.
|
||||
- Authenticated user equals the PR author → not eligible to ``approve`` or
|
||||
``merge``.
|
||||
- PR is not open → not eligible.
|
||||
- For ``merge``, PR must be reported mergeable.
|
||||
|
||||
Never returns the token, Authorization header, or any credential material.
|
||||
|
||||
Args:
|
||||
pr_number: Target PR number.
|
||||
action: One of 'review', 'approve', 'request_changes', 'merge'.
|
||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
||||
host: Override the Gitea host.
|
||||
org: Override the owner/organization.
|
||||
repo: Override the repository name.
|
||||
|
||||
Returns:
|
||||
dict with 'eligible' (bool), the inputs inspected, and 'reasons'.
|
||||
"""
|
||||
action = (action or "").strip().lower()
|
||||
profile = get_profile()
|
||||
result = {
|
||||
"eligible": False,
|
||||
"requested_action": action,
|
||||
"authenticated_user": None,
|
||||
"profile_name": profile["profile_name"],
|
||||
"allowed_operations": profile["allowed_operations"],
|
||||
"pr_author": None,
|
||||
"pr_number": pr_number,
|
||||
"pr_state": None,
|
||||
"head_sha": None,
|
||||
"mergeable": None,
|
||||
"remote": remote if remote in REMOTES else None,
|
||||
"reasons": [],
|
||||
}
|
||||
reasons = result["reasons"]
|
||||
|
||||
if action not in _ELIGIBILITY_ACTIONS:
|
||||
reasons.append(
|
||||
f"unknown action '{action}'; expected one of {list(_ELIGIBILITY_ACTIONS)}"
|
||||
)
|
||||
return result
|
||||
|
||||
if remote not in REMOTES:
|
||||
reasons.append(f"unknown remote '{remote}'")
|
||||
return result
|
||||
|
||||
# Profile capability check (metadata only; not enforcement of the action).
|
||||
allowed = profile["allowed_operations"]
|
||||
forbidden = profile["forbidden_operations"]
|
||||
if not allowed:
|
||||
reasons.append("profile has no configured allowed operations (fail closed)")
|
||||
if action in forbidden:
|
||||
reasons.append(f"profile forbids '{action}'")
|
||||
elif action not in allowed:
|
||||
reasons.append(f"profile is not allowed to {action}")
|
||||
|
||||
h, o, r = _resolve(remote, host, org, repo)
|
||||
|
||||
# Authenticated identity (read-only). Fail soft; never leak error/secret.
|
||||
try:
|
||||
auth = _auth(h)
|
||||
except Exception:
|
||||
auth = None
|
||||
auth_user = None
|
||||
if auth:
|
||||
try:
|
||||
who = api_request("GET", f"https://{h}/api/v1/user", auth)
|
||||
auth_user = (who or {}).get("login")
|
||||
except Exception:
|
||||
auth_user = None
|
||||
result["authenticated_user"] = auth_user
|
||||
if not auth_user:
|
||||
reasons.append("authenticated identity could not be determined")
|
||||
|
||||
# PR facts (read-only GET; no mutation).
|
||||
pr_author = None
|
||||
pr_state = None
|
||||
if auth:
|
||||
try:
|
||||
pr = api_request(
|
||||
"GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth
|
||||
)
|
||||
pr_author = (pr or {}).get("user", {}).get("login")
|
||||
pr_state = (pr or {}).get("state")
|
||||
result["head_sha"] = ((pr or {}).get("head") or {}).get("sha")
|
||||
result["mergeable"] = (pr or {}).get("mergeable")
|
||||
except Exception:
|
||||
reasons.append("PR details could not be retrieved")
|
||||
else:
|
||||
reasons.append("PR details could not be retrieved (no credentials)")
|
||||
result["pr_author"] = pr_author
|
||||
result["pr_state"] = pr_state
|
||||
|
||||
# PR must be open to act on.
|
||||
if pr_state is None:
|
||||
reasons.append("PR state unknown")
|
||||
elif pr_state != "open":
|
||||
reasons.append(f"PR is not open (state={pr_state})")
|
||||
|
||||
# Self-author must not approve or merge their own PR.
|
||||
if auth_user and pr_author and auth_user == pr_author and action in ("approve", "merge"):
|
||||
reasons.append("authenticated user is PR author")
|
||||
|
||||
# Merge needs a positive mergeability signal.
|
||||
if action == "merge":
|
||||
if result["mergeable"] is False:
|
||||
reasons.append("PR is not mergeable")
|
||||
elif result["mergeable"] is None:
|
||||
reasons.append("PR mergeability unknown")
|
||||
|
||||
result["eligible"] = len(reasons) == 0
|
||||
if result["eligible"]:
|
||||
reasons.append("all eligibility checks passed")
|
||||
return result
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def gitea_edit_pr(
|
||||
pr_number: int,
|
||||
|
||||
@@ -28,6 +28,7 @@ from mcp_server import ( # noqa: E402
|
||||
gitea_commit_files,
|
||||
gitea_whoami,
|
||||
gitea_get_profile,
|
||||
gitea_check_pr_eligibility,
|
||||
)
|
||||
from gitea_auth import get_profile # noqa: E402
|
||||
|
||||
@@ -656,5 +657,133 @@ class TestProfileDiscovery(unittest.TestCase):
|
||||
self.assertIn("remote_error", result)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PR eligibility checks (read-only) — issue #14
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestPrEligibility(unittest.TestCase):
|
||||
|
||||
def _pr(self, author, state="open", sha="abc123", mergeable=True):
|
||||
return {
|
||||
"user": {"login": author},
|
||||
"state": state,
|
||||
"head": {"sha": sha},
|
||||
"mergeable": mergeable,
|
||||
}
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_reviewer_eligible_to_review(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=5, action="review", remote="prgs")
|
||||
self.assertTrue(r["eligible"])
|
||||
self.assertEqual(r["authenticated_user"], "reviewer-bot")
|
||||
self.assertEqual(r["pr_author"], "author-bot")
|
||||
self.assertEqual(r["head_sha"], "abc123")
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_self_author_cannot_merge(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-merger",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,merge"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertIn("authenticated user is PR author", r["reasons"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_self_author_cannot_approve(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=8, action="approve", remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertIn("authenticated user is PR author", r["reasons"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_merge_fails_closed_when_mergeability_unknown(self, _auth, mock_api):
|
||||
# Gitea reports mergeable as None/null (not yet computed).
|
||||
mock_api.side_effect = [
|
||||
{"login": "merger-bot"},
|
||||
self._pr("author-bot", mergeable=None),
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-merger",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,merge"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertIsNone(r["mergeable"])
|
||||
self.assertIn("PR mergeability unknown", r["reasons"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_profile_not_allowed_to_merge(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "author-bot"}, self._pr("someone-else")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-author",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,pr.create"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertIn("profile is not allowed to merge", r["reasons"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_forbidden_operation_blocks(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review",
|
||||
"GITEA_FORBIDDEN_OPERATIONS": "merge"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertIn("profile forbids 'merge'", r["reasons"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_closed_pr_not_eligible(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot", state="closed")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=8, action="approve", remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertIn("PR is not open (state=closed)", r["reasons"])
|
||||
|
||||
@patch("mcp_server.get_auth_header", return_value=None)
|
||||
def test_unknown_identity_fails_closed(self, _auth):
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-merger",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,merge"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=8, action="merge", remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertIn("authenticated identity could not be determined", r["reasons"])
|
||||
self.assertIsNone(r["authenticated_user"])
|
||||
|
||||
def test_unknown_action_rejected(self):
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=8, action="delete", remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertTrue(any("unknown action" in x for x in r["reasons"]))
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_never_exposes_secrets(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review",
|
||||
"GITEA_TOKEN": "super-secret-token"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=5, action="review", remote="prgs")
|
||||
blob = repr(r).lower()
|
||||
for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()):
|
||||
self.assertNotIn(secret, blob)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user