From f05e58c84734c079c909a99dd686c3e5d702c45b Mon Sep 17 00:00:00 2001 From: Jason Walker <913443@dadeschools.net> Date: Wed, 1 Jul 2026 14:31:34 -0400 Subject: [PATCH] feat: add gated gitea_submit_pr_review review actions (#15) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add gitea_submit_pr_review, the only tool that submits a Gitea PR review. It performs a review mutation (comment / approve / request_changes) only after every safety gate passes, and never merges. Gates (fail-closed at each step): 1. Validate action is comment | approve | request_changes. 2. Reuse gitea_check_pr_eligibility (#14) for authenticated-user lookup, active-profile lookup, PR-author lookup, self-approval block, and the profile-allowed-operation check. approve requires 'approve' eligibility, request_changes requires 'request_changes', comment requires 'review'. 3. Redundant self-approval block (auth user == PR author). 4. Optional expected_head_sha: refuse if the PR head has moved. 5. Only then POST /repos/{owner}/{repo}/pulls/{n}/reviews (formal review endpoint, so approvals/change-requests carry real review state). Output reports action, whether performed, authenticated user, profile name, PR author, PR number, head SHA checked, and reasons — never a token, auth header, or credential. Error text is scrubbed via _redact as defence in depth. Merge is intentionally not implemented (belongs to #16). Tests cover: self-author approve blocked, approve/request_changes/comment succeed only when eligible, unknown identity fail-closed, disallowed profile op blocked, head-SHA mismatch blocked, no mutation when gates fail, invalid action rejected, and secret redaction in output and error paths. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 1 + mcp_server.py | 178 +++++++++++++++++++++++++++++++ tests/test_mcp_server.py | 225 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 404 insertions(+) diff --git a/README.md b/README.md index 8db69e4..d8c93e0 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools n | `gitea_whoami` | Read-only: identify the authenticated Gitea account (safe metadata only) | | `gitea_get_profile` | Read-only: describe the active runtime execution profile (safe metadata only) | | `gitea_check_pr_eligibility` | Read-only: check if the current identity/profile may review/approve/request_changes/merge a PR | +| `gitea_submit_pr_review` | Gated review mutation: comment/approve/request_changes, only after identity+profile+eligibility gates pass (no merge, no self-approval) | | `gitea_mark_issue` | Claim/release an issue (start/done) | | `gitea_list_labels` | List all available labels in a repository | | `gitea_create_label` | Create a new label with custom color | diff --git a/mcp_server.py b/mcp_server.py index 9b6ade1..17154f2 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -353,6 +353,184 @@ def gitea_check_pr_eligibility( return result +# Review actions this gated tool can perform, mapped to (eligibility action, +# Gitea review *event*). The eligibility action is fed to +# ``gitea_check_pr_eligibility`` (#14) so every mutation reuses the same +# identity/profile/author gates. Note: 'merge' is deliberately absent — merge +# belongs to a separate tool/issue and is never performed here. +_REVIEW_ACTIONS = { + # 'comment' posts review findings without an approval/rejection state. + # #14 names this eligibility category 'review'. + "comment": ("review", "COMMENT"), + "approve": ("approve", "APPROVE"), + "request_changes": ("request_changes", "REQUEST_CHANGES"), +} + +# Patterns scrubbed from any surfaced error text so a credential can never leak. +_SECRET_PREFIXES = ("token ", "Basic ") + + +def _redact(text: str) -> str: + """Strip anything that looks like an Authorization credential from *text*. + + Errors raised by ``api_request`` echo the server response body, not the + request headers, so a token should never appear — this is defence in depth + so a future change can't leak ``token …`` / ``Basic …`` material into a + tool result or log line. + """ + if not text: + return text + out = text + for prefix in _SECRET_PREFIXES: + idx = 0 + while True: + i = out.find(prefix, idx) + if i == -1: + break + j = i + len(prefix) + while j < len(out) and not out[j].isspace(): + j += 1 + out = out[:i] + prefix + "[REDACTED]" + out[j:] + idx = i + len(prefix) + len("[REDACTED]") + return out + + +@mcp.tool() +def gitea_submit_pr_review( + pr_number: int, + action: str, + body: str = "", + expected_head_sha: str | None = None, + remote: str = "dadeschools", + host: str | None = None, + org: str | None = None, + repo: str | None = None, +) -> dict: + """Gated PR review mutation: comment findings, request changes, or approve. + + This is the only tool that submits a Gitea PR *review*. It performs a + mutation **only after every safety gate passes**; if any gate fails it + returns ``performed=False`` and never calls the mutating endpoint. + + Gate order (fail-closed at each step): + + 1. Validate ``action`` is one of 'comment', 'approve', 'request_changes'. + 2. Reuse ``gitea_check_pr_eligibility`` (#14), which runs the authenticated + -user lookup, active-profile lookup, PR-author lookup, self-approval + block, and profile-allowed-operation check. ``approve`` requires + eligibility for 'approve', ``request_changes`` requires + 'request_changes', and ``comment`` requires 'review'. + 3. Redundantly block self-approval (authenticated user == PR author). + 4. If ``expected_head_sha`` is supplied and the PR head has moved, abort. + 5. Only then POST the review. + + Endpoint: ``POST /repos/{owner}/{repo}/pulls/{n}/reviews``. This is the + *formal review* API (it records an APPROVE / COMMENT / REQUEST_CHANGES + review state tied to the head commit), chosen over the plain issue-comment + endpoint (``/issues/{n}/comments``) so that approvals and change requests + carry real review state — a plain comment cannot approve or block a PR. + + Merge is intentionally NOT implemented here. + + Never returns the token, Authorization header, or any credential material. + + Args: + pr_number: Target PR number. + action: 'comment', 'approve', or 'request_changes'. + body: Review body / finding text. + expected_head_sha: Optional. If given and the PR head SHA differs, the + review is refused (guards against reviewing a changed PR). + remote: Known instance — 'dadeschools' or 'prgs'. + host: Override the Gitea host. + org: Override the owner/organization. + repo: Override the repository name. + + Returns: + dict describing the attempt: action, whether it was performed, the + authenticated user, profile name, PR author, PR number, head SHA + checked, and the reasons/gates passed or blocked. Never secrets. + """ + action = (action or "").strip().lower() + result = { + "requested_action": action, + "performed": False, + "authenticated_user": None, + "profile_name": get_profile()["profile_name"], + "pr_author": None, + "pr_number": pr_number, + "head_sha": None, + "expected_head_sha": expected_head_sha, + "remote": remote if remote in REMOTES else None, + "reasons": [], + } + reasons = result["reasons"] + + # Gate 1 — valid review action (no mutation on unknown action). + if action not in _REVIEW_ACTIONS: + reasons.append( + f"unknown review action '{action}'; expected one of " + f"{sorted(_REVIEW_ACTIONS)}" + ) + return result + eligibility_action, event = _REVIEW_ACTIONS[action] + + # Gate 2 — reuse #14 eligibility (identity + profile + author + self-approve + # + profile-allowed). This performs only read-only GETs. + elig = gitea_check_pr_eligibility( + pr_number=pr_number, + action=eligibility_action, + remote=remote, + host=host, + org=org, + repo=repo, + ) + result["authenticated_user"] = elig.get("authenticated_user") + result["profile_name"] = elig.get("profile_name", result["profile_name"]) + result["pr_author"] = elig.get("pr_author") + result["head_sha"] = elig.get("head_sha") + if not elig.get("eligible"): + reasons.append( + f"eligibility check for '{eligibility_action}' failed (fail closed)" + ) + reasons.extend(elig.get("reasons", [])) + return result + + # Gate 3 — redundant self-approval block (belt-and-suspenders over #14). + auth_user = result["authenticated_user"] + pr_author = result["pr_author"] + if action == "approve" and auth_user and pr_author and auth_user == pr_author: + reasons.append("self-approval blocked (authenticated user is PR author)") + return result + + # Gate 4 — head SHA must match if the caller pinned one. + actual_sha = result["head_sha"] + if expected_head_sha and actual_sha and expected_head_sha != actual_sha: + reasons.append( + "expected head SHA does not match current PR head (fail closed)" + ) + return result + if not actual_sha: + # Should be unreachable — eligibility fails closed without a head SHA — + # but never submit a review without a commit to pin it to. + reasons.append("PR head SHA unavailable (fail closed)") + return result + + # All gates passed — perform the single mutating call. + h, o, r = _resolve(remote, host, org, repo) + try: + auth = _auth(h) + review_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/reviews" + payload = {"body": body, "event": event, "commit_id": actual_sha} + api_request("POST", review_url, auth, payload) + except Exception as exc: # noqa: BLE001 — redact before surfacing + reasons.append(f"review submission failed: {_redact(str(exc))}") + return result + + result["performed"] = True + reasons.append(f"all gates passed; submitted '{event}' review on PR #{pr_number}") + return result + + @mcp.tool() def gitea_edit_pr( pr_number: int, diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 90b4142..993e3b6 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -29,6 +29,7 @@ from mcp_server import ( # noqa: E402 gitea_whoami, gitea_get_profile, gitea_check_pr_eligibility, + gitea_submit_pr_review, ) from gitea_auth import get_profile # noqa: E402 @@ -785,5 +786,229 @@ class TestPrEligibility(unittest.TestCase): self.assertNotIn(secret, blob) +class TestSubmitPrReview(unittest.TestCase): + """Gated review-mutation tool (#15).""" + + def _pr(self, author, state="open", sha="abc123", mergeable=True): + return { + "user": {"login": author}, + "state": state, + "head": {"sha": sha}, + "mergeable": mergeable, + } + + def _methods(self, mock_api): + return [c.args[0] for c in mock_api.call_args_list] + + def _assert_no_mutation(self, mock_api): + # A review mutation is POST .../reviews; eligibility only ever GETs. + for c in mock_api.call_args_list: + method, url = c.args[0], c.args[1] + self.assertFalse( + method == "POST" and url.endswith("/reviews"), + f"unexpected review mutation: {method} {url}", + ) + + # -- approve -------------------------------------------------------------- + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_approve_blocked_when_author(self, _auth, mock_api): + mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review(pr_number=8, action="approve", remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn("authenticated user is PR author", r["reasons"]) + self._assert_no_mutation(mock_api) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_approve_succeeds_when_eligible(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 7}, + ] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review( + pr_number=8, action="approve", body="LGTM", remote="prgs") + self.assertTrue(r["performed"]) + self.assertEqual(r["authenticated_user"], "reviewer-bot") + self.assertEqual(r["pr_author"], "author-bot") + self.assertEqual(r["head_sha"], "abc123") + method, url = mock_api.call_args.args[0], mock_api.call_args.args[1] + self.assertEqual(method, "POST") + self.assertTrue(url.endswith("/pulls/8/reviews")) + payload = mock_api.call_args.args[3] + self.assertEqual(payload["event"], "APPROVE") + self.assertEqual(payload["commit_id"], "abc123") + + # -- request_changes ------------------------------------------------------ + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_request_changes_succeeds_when_eligible(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 9}, + ] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,request_changes"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review( + pr_number=8, action="request_changes", + body="needs work", remote="prgs") + self.assertTrue(r["performed"]) + self.assertEqual(mock_api.call_args.args[3]["event"], "REQUEST_CHANGES") + + def test_request_changes_blocked_without_eligibility(self): + with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) as _a, \ + patch("mcp_server.api_request") as mock_api: + mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review"} # no request_changes + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review( + pr_number=8, action="request_changes", remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn("profile is not allowed to request_changes", r["reasons"]) + self._assert_no_mutation(mock_api) + + # -- comment -------------------------------------------------------------- + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_comment_succeeds_when_review_eligible(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 3}, + ] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review( + pr_number=8, action="comment", body="finding", remote="prgs") + self.assertTrue(r["performed"]) + self.assertEqual(mock_api.call_args.args[3]["event"], "COMMENT") + + def test_comment_by_author_allowed(self): + # Commenting on your own PR is fine — only approve is self-blocked. + with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \ + patch("mcp_server.api_request") as mock_api: + mock_api.side_effect = [ + {"login": "jcwalker3"}, self._pr("jcwalker3"), {"id": 4}, + ] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review( + pr_number=8, action="comment", body="note", remote="prgs") + self.assertTrue(r["performed"]) + + # -- identity / profile fail-closed --------------------------------------- + + @patch("mcp_server.get_auth_header", return_value=None) + def test_unknown_identity_blocks(self, _auth): + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review(pr_number=8, action="approve", remote="prgs") + self.assertFalse(r["performed"]) + self.assertIsNone(r["authenticated_user"]) + self.assertIn("authenticated identity could not be determined", r["reasons"]) + + def test_disallowed_profile_operation_blocks(self): + with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \ + patch("mcp_server.api_request") as mock_api: + mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-author", + "GITEA_ALLOWED_OPERATIONS": "read,pr.create"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review( + pr_number=8, action="approve", remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn("profile is not allowed to approve", r["reasons"]) + self._assert_no_mutation(mock_api) + + # -- head SHA guard ------------------------------------------------------- + + def test_head_sha_mismatch_blocks(self): + with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \ + patch("mcp_server.api_request") as mock_api: + mock_api.side_effect = [ + {"login": "reviewer-bot"}, self._pr("author-bot", sha="abc123"), + ] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review( + pr_number=8, action="approve", + expected_head_sha="deadbeef", remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn( + "expected head SHA does not match current PR head (fail closed)", + r["reasons"]) + self._assert_no_mutation(mock_api) + + def test_head_sha_match_allows(self): + with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \ + patch("mcp_server.api_request") as mock_api: + mock_api.side_effect = [ + {"login": "reviewer-bot"}, self._pr("author-bot", sha="abc123"), + {"id": 5}, + ] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review( + pr_number=8, action="approve", + expected_head_sha="abc123", remote="prgs") + self.assertTrue(r["performed"]) + + # -- invalid action ------------------------------------------------------- + + @patch("mcp_server.api_request") + def test_invalid_review_action_rejected(self, mock_api): + with patch.dict(os.environ, {}, clear=True): + r = gitea_submit_pr_review(pr_number=8, action="delete", remote="prgs") + self.assertFalse(r["performed"]) + self.assertTrue(any("unknown review action" in x for x in r["reasons"])) + mock_api.assert_not_called() # never touches the API on a bad action + + # -- redaction ------------------------------------------------------------ + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_output_redacts_secrets(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 1}, + ] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve", + "GITEA_TOKEN": "super-secret-token"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review(pr_number=5, action="approve", remote="prgs") + blob = repr(r).lower() + for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()): + self.assertNotIn(secret, blob) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_error_message_redacts_credential(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "reviewer-bot"}, + self._pr("author-bot"), + RuntimeError("HTTP 500: token abc-secret-xyz rejected"), + ] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_submit_pr_review(pr_number=5, action="approve", remote="prgs") + self.assertFalse(r["performed"]) + blob = repr(r) + self.assertIn("[REDACTED]", blob) + self.assertNotIn("abc-secret-xyz", blob) + + if __name__ == "__main__": unittest.main()