feat: add gated Gitea PR review actions (#15) #25
@@ -45,7 +45,7 @@ Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools n
|
|||||||
| `gitea_list_prs` | List pull requests with state/remote |
|
| `gitea_list_prs` | List pull requests with state/remote |
|
||||||
| `gitea_view_pr` | Get full details of a single pull request |
|
| `gitea_view_pr` | Get full details of a single pull request |
|
||||||
| `gitea_merge_pr` | Merge a pull request (merge, squash, or rebase) |
|
| `gitea_merge_pr` | Merge a pull request (merge, squash, or rebase) |
|
||||||
| `gitea_review_pr` | Submit a review on a pull request and optionally merge it |
|
| `gitea_review_pr` | Legacy wrapper for `gitea_submit_pr_review` (merging disabled) |
|
||||||
| `gitea_delete_branch` | Delete a remote branch |
|
| `gitea_delete_branch` | Delete a remote branch |
|
||||||
| `gitea_close_issue` | Close an issue by number |
|
| `gitea_close_issue` | Close an issue by number |
|
||||||
| `gitea_list_issues` | List issues with state/label filters |
|
| `gitea_list_issues` | List issues with state/label filters |
|
||||||
@@ -53,6 +53,7 @@ Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools n
|
|||||||
| `gitea_whoami` | Read-only: identify the authenticated Gitea account (safe metadata only) |
|
| `gitea_whoami` | Read-only: identify the authenticated Gitea account (safe metadata only) |
|
||||||
| `gitea_get_profile` | Read-only: describe the active runtime execution profile (safe metadata only) |
|
| `gitea_get_profile` | Read-only: describe the active runtime execution profile (safe metadata only) |
|
||||||
| `gitea_check_pr_eligibility` | Read-only: check if the current identity/profile may review/approve/request_changes/merge a PR |
|
| `gitea_check_pr_eligibility` | Read-only: check if the current identity/profile may review/approve/request_changes/merge a PR |
|
||||||
|
| `gitea_submit_pr_review` | Gated review mutation: comment/approve/request_changes, only after identity+profile+eligibility gates pass (no merge, no self-approval) |
|
||||||
| `gitea_mark_issue` | Claim/release an issue (start/done) |
|
| `gitea_mark_issue` | Claim/release an issue (start/done) |
|
||||||
| `gitea_list_labels` | List all available labels in a repository |
|
| `gitea_list_labels` | List all available labels in a repository |
|
||||||
| `gitea_create_label` | Create a new label with custom color |
|
| `gitea_create_label` | Create a new label with custom color |
|
||||||
|
|||||||
+212
-36
@@ -353,6 +353,184 @@ def gitea_check_pr_eligibility(
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
# Review actions this gated tool can perform, mapped to (eligibility action,
|
||||||
|
# Gitea review *event*). The eligibility action is fed to
|
||||||
|
# ``gitea_check_pr_eligibility`` (#14) so every mutation reuses the same
|
||||||
|
# identity/profile/author gates. Note: 'merge' is deliberately absent — merge
|
||||||
|
# belongs to a separate tool/issue and is never performed here.
|
||||||
|
_REVIEW_ACTIONS = {
|
||||||
|
# 'comment' posts review findings without an approval/rejection state.
|
||||||
|
# #14 names this eligibility category 'review'.
|
||||||
|
"comment": ("review", "COMMENT"),
|
||||||
|
"approve": ("approve", "APPROVE"),
|
||||||
|
"request_changes": ("request_changes", "REQUEST_CHANGES"),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Patterns scrubbed from any surfaced error text so a credential can never leak.
|
||||||
|
_SECRET_PREFIXES = ("token ", "Basic ")
|
||||||
|
|
||||||
|
|
||||||
|
def _redact(text: str) -> str:
|
||||||
|
"""Strip anything that looks like an Authorization credential from *text*.
|
||||||
|
|
||||||
|
Errors raised by ``api_request`` echo the server response body, not the
|
||||||
|
request headers, so a token should never appear — this is defence in depth
|
||||||
|
so a future change can't leak ``token …`` / ``Basic …`` material into a
|
||||||
|
tool result or log line.
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return text
|
||||||
|
out = text
|
||||||
|
for prefix in _SECRET_PREFIXES:
|
||||||
|
idx = 0
|
||||||
|
while True:
|
||||||
|
i = out.find(prefix, idx)
|
||||||
|
if i == -1:
|
||||||
|
break
|
||||||
|
j = i + len(prefix)
|
||||||
|
while j < len(out) and not out[j].isspace():
|
||||||
|
j += 1
|
||||||
|
out = out[:i] + prefix + "[REDACTED]" + out[j:]
|
||||||
|
idx = i + len(prefix) + len("[REDACTED]")
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def gitea_submit_pr_review(
|
||||||
|
pr_number: int,
|
||||||
|
action: str,
|
||||||
|
body: str = "",
|
||||||
|
expected_head_sha: str | None = None,
|
||||||
|
remote: str = "dadeschools",
|
||||||
|
host: str | None = None,
|
||||||
|
org: str | None = None,
|
||||||
|
repo: str | None = None,
|
||||||
|
) -> dict:
|
||||||
|
"""Gated PR review mutation: comment findings, request changes, or approve.
|
||||||
|
|
||||||
|
This is the only tool that submits a Gitea PR *review*. It performs a
|
||||||
|
mutation **only after every safety gate passes**; if any gate fails it
|
||||||
|
returns ``performed=False`` and never calls the mutating endpoint.
|
||||||
|
|
||||||
|
Gate order (fail-closed at each step):
|
||||||
|
|
||||||
|
1. Validate ``action`` is one of 'comment', 'approve', 'request_changes'.
|
||||||
|
2. Reuse ``gitea_check_pr_eligibility`` (#14), which runs the authenticated
|
||||||
|
-user lookup, active-profile lookup, PR-author lookup, self-approval
|
||||||
|
block, and profile-allowed-operation check. ``approve`` requires
|
||||||
|
eligibility for 'approve', ``request_changes`` requires
|
||||||
|
'request_changes', and ``comment`` requires 'review'.
|
||||||
|
3. Redundantly block self-approval (authenticated user == PR author).
|
||||||
|
4. If ``expected_head_sha`` is supplied and the PR head has moved, abort.
|
||||||
|
5. Only then POST the review.
|
||||||
|
|
||||||
|
Endpoint: ``POST /repos/{owner}/{repo}/pulls/{n}/reviews``. This is the
|
||||||
|
*formal review* API (it records an APPROVE / COMMENT / REQUEST_CHANGES
|
||||||
|
review state tied to the head commit), chosen over the plain issue-comment
|
||||||
|
endpoint (``/issues/{n}/comments``) so that approvals and change requests
|
||||||
|
carry real review state — a plain comment cannot approve or block a PR.
|
||||||
|
|
||||||
|
Merge is intentionally NOT implemented here.
|
||||||
|
|
||||||
|
Never returns the token, Authorization header, or any credential material.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
pr_number: Target PR number.
|
||||||
|
action: 'comment', 'approve', or 'request_changes'.
|
||||||
|
body: Review body / finding text.
|
||||||
|
expected_head_sha: Optional. If given and the PR head SHA differs, the
|
||||||
|
review is refused (guards against reviewing a changed PR).
|
||||||
|
remote: Known instance — 'dadeschools' or 'prgs'.
|
||||||
|
host: Override the Gitea host.
|
||||||
|
org: Override the owner/organization.
|
||||||
|
repo: Override the repository name.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict describing the attempt: action, whether it was performed, the
|
||||||
|
authenticated user, profile name, PR author, PR number, head SHA
|
||||||
|
checked, and the reasons/gates passed or blocked. Never secrets.
|
||||||
|
"""
|
||||||
|
action = (action or "").strip().lower()
|
||||||
|
result = {
|
||||||
|
"requested_action": action,
|
||||||
|
"performed": False,
|
||||||
|
"authenticated_user": None,
|
||||||
|
"profile_name": get_profile()["profile_name"],
|
||||||
|
"pr_author": None,
|
||||||
|
"pr_number": pr_number,
|
||||||
|
"head_sha": None,
|
||||||
|
"expected_head_sha": expected_head_sha,
|
||||||
|
"remote": remote if remote in REMOTES else None,
|
||||||
|
"reasons": [],
|
||||||
|
}
|
||||||
|
reasons = result["reasons"]
|
||||||
|
|
||||||
|
# Gate 1 — valid review action (no mutation on unknown action).
|
||||||
|
if action not in _REVIEW_ACTIONS:
|
||||||
|
reasons.append(
|
||||||
|
f"unknown review action '{action}'; expected one of "
|
||||||
|
f"{sorted(_REVIEW_ACTIONS)}"
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
eligibility_action, event = _REVIEW_ACTIONS[action]
|
||||||
|
|
||||||
|
# Gate 2 — reuse #14 eligibility (identity + profile + author + self-approve
|
||||||
|
# + profile-allowed). This performs only read-only GETs.
|
||||||
|
elig = gitea_check_pr_eligibility(
|
||||||
|
pr_number=pr_number,
|
||||||
|
action=eligibility_action,
|
||||||
|
remote=remote,
|
||||||
|
host=host,
|
||||||
|
org=org,
|
||||||
|
repo=repo,
|
||||||
|
)
|
||||||
|
result["authenticated_user"] = elig.get("authenticated_user")
|
||||||
|
result["profile_name"] = elig.get("profile_name", result["profile_name"])
|
||||||
|
result["pr_author"] = elig.get("pr_author")
|
||||||
|
result["head_sha"] = elig.get("head_sha")
|
||||||
|
if not elig.get("eligible"):
|
||||||
|
reasons.append(
|
||||||
|
f"eligibility check for '{eligibility_action}' failed (fail closed)"
|
||||||
|
)
|
||||||
|
reasons.extend(elig.get("reasons", []))
|
||||||
|
return result
|
||||||
|
|
||||||
|
# Gate 3 — redundant self-approval block (belt-and-suspenders over #14).
|
||||||
|
auth_user = result["authenticated_user"]
|
||||||
|
pr_author = result["pr_author"]
|
||||||
|
if action == "approve" and auth_user and pr_author and auth_user == pr_author:
|
||||||
|
reasons.append("self-approval blocked (authenticated user is PR author)")
|
||||||
|
return result
|
||||||
|
|
||||||
|
# Gate 4 — head SHA must match if the caller pinned one.
|
||||||
|
actual_sha = result["head_sha"]
|
||||||
|
if expected_head_sha and actual_sha and expected_head_sha != actual_sha:
|
||||||
|
reasons.append(
|
||||||
|
"expected head SHA does not match current PR head (fail closed)"
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
if not actual_sha:
|
||||||
|
# Should be unreachable — eligibility fails closed without a head SHA —
|
||||||
|
# but never submit a review without a commit to pin it to.
|
||||||
|
reasons.append("PR head SHA unavailable (fail closed)")
|
||||||
|
return result
|
||||||
|
|
||||||
|
# All gates passed — perform the single mutating call.
|
||||||
|
h, o, r = _resolve(remote, host, org, repo)
|
||||||
|
try:
|
||||||
|
auth = _auth(h)
|
||||||
|
review_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/reviews"
|
||||||
|
payload = {"body": body, "event": event, "commit_id": actual_sha}
|
||||||
|
api_request("POST", review_url, auth, payload)
|
||||||
|
except Exception as exc: # noqa: BLE001 — redact before surfacing
|
||||||
|
reasons.append(f"review submission failed: {_redact(str(exc))}")
|
||||||
|
return result
|
||||||
|
|
||||||
|
result["performed"] = True
|
||||||
|
reasons.append(f"all gates passed; submitted '{event}' review on PR #{pr_number}")
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def gitea_edit_pr(
|
def gitea_edit_pr(
|
||||||
pr_number: int,
|
pr_number: int,
|
||||||
@@ -548,14 +726,18 @@ def gitea_review_pr(
|
|||||||
org: str | None = None,
|
org: str | None = None,
|
||||||
repo: str | None = None,
|
repo: str | None = None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Submit a review on a Gitea pull request and optionally merge it.
|
"""Submit a review on a Gitea pull request (Legacy wrapper).
|
||||||
|
|
||||||
|
This tool is a compatibility wrapper around the safe `gitea_submit_pr_review`.
|
||||||
|
It uses the same #14 eligibility gates.
|
||||||
|
Merging via this tool is no longer supported and will fail closed (see #16).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
pr_number: The PR number to review.
|
pr_number: The PR number to review.
|
||||||
event: Review type — 'APPROVE', 'COMMENT', or 'REQUEST_CHANGES'.
|
event: Review type — 'APPROVE', 'COMMENT', or 'REQUEST_CHANGES'.
|
||||||
body: Review body text / comment.
|
body: Review body text / comment.
|
||||||
merge: If True and event is 'APPROVE', automatically merge the PR.
|
merge: Merging is disabled; if True, the tool fails closed.
|
||||||
merge_method: Merge style to use if merging — 'merge', 'squash', or 'rebase'.
|
merge_method: Ignored.
|
||||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
remote: Known instance — 'dadeschools' or 'prgs'.
|
||||||
host: Override the Gitea host.
|
host: Override the Gitea host.
|
||||||
org: Override the owner/organization.
|
org: Override the owner/organization.
|
||||||
@@ -564,45 +746,39 @@ def gitea_review_pr(
|
|||||||
Returns:
|
Returns:
|
||||||
dict with success status and message.
|
dict with success status and message.
|
||||||
"""
|
"""
|
||||||
|
if merge:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"message": "merge=True is no longer supported in this tool (belongs to #16)."
|
||||||
|
}
|
||||||
|
|
||||||
if event not in ["APPROVE", "COMMENT", "REQUEST_CHANGES"]:
|
if event not in ["APPROVE", "COMMENT", "REQUEST_CHANGES"]:
|
||||||
raise ValueError(f"Invalid review event: '{event}'. Choose from 'APPROVE', 'COMMENT', 'REQUEST_CHANGES'.")
|
raise ValueError(f"Invalid review event: '{event}'. Choose from 'APPROVE', 'COMMENT', 'REQUEST_CHANGES'.")
|
||||||
if merge_method not in ["merge", "squash", "rebase"]:
|
|
||||||
raise ValueError(f"Invalid merge method: '{merge_method}'. Choose from 'merge', 'squash', 'rebase'.")
|
|
||||||
|
|
||||||
h, o, r = _resolve(remote, host, org, repo)
|
# Map legacy event string to the action expected by gitea_submit_pr_review
|
||||||
auth = _auth(h)
|
event_map = {
|
||||||
|
"APPROVE": "approve",
|
||||||
# 1. Fetch PR to get the latest head commit SHA (required for review payload)
|
"COMMENT": "comment",
|
||||||
pr_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}"
|
"REQUEST_CHANGES": "request_changes"
|
||||||
pr_data = api_request("GET", pr_url, auth)
|
|
||||||
commit_sha = pr_data.get("head", {}).get("sha")
|
|
||||||
if not commit_sha:
|
|
||||||
raise RuntimeError(f"Could not find head commit SHA for PR #{pr_number}.")
|
|
||||||
|
|
||||||
# 2. Submit the PR review
|
|
||||||
review_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/reviews"
|
|
||||||
payload = {
|
|
||||||
"body": body,
|
|
||||||
"event": event,
|
|
||||||
"commit_id": commit_sha
|
|
||||||
}
|
}
|
||||||
api_request("POST", review_url, auth, payload)
|
action = event_map[event]
|
||||||
msg = f"Successfully submitted review for PR #{pr_number} with event '{event}'."
|
|
||||||
|
|
||||||
# 3. Merge PR if merge is True and event is APPROVE
|
result = gitea_submit_pr_review(
|
||||||
if merge:
|
pr_number=pr_number,
|
||||||
if event != "APPROVE":
|
action=action,
|
||||||
msg += " Warning: Skipping merge because review event is not 'APPROVE'."
|
body=body,
|
||||||
|
expected_head_sha=None,
|
||||||
|
remote=remote,
|
||||||
|
host=host,
|
||||||
|
org=org,
|
||||||
|
repo=repo
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.get("performed"):
|
||||||
|
return {"success": True, "message": f"Successfully submitted review for PR #{pr_number} with event '{event}'."}
|
||||||
else:
|
else:
|
||||||
merge_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge"
|
reasons = result.get("reasons", [])
|
||||||
merge_payload = {
|
return {"success": False, "message": f"Review submission failed eligibility gates: {reasons}"}
|
||||||
"Do": merge_method,
|
|
||||||
"force_merge": False
|
|
||||||
}
|
|
||||||
api_request("POST", merge_url, auth, merge_payload)
|
|
||||||
msg += f" Successfully merged PR #{pr_number} using '{merge_method}' method."
|
|
||||||
|
|
||||||
return {"success": True, "message": msg}
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
|
|||||||
+278
-24
@@ -29,6 +29,7 @@ from mcp_server import ( # noqa: E402
|
|||||||
gitea_whoami,
|
gitea_whoami,
|
||||||
gitea_get_profile,
|
gitea_get_profile,
|
||||||
gitea_check_pr_eligibility,
|
gitea_check_pr_eligibility,
|
||||||
|
gitea_submit_pr_review,
|
||||||
)
|
)
|
||||||
from gitea_auth import get_profile # noqa: E402
|
from gitea_auth import get_profile # noqa: E402
|
||||||
|
|
||||||
@@ -320,38 +321,67 @@ class TestReviewPR(unittest.TestCase):
|
|||||||
|
|
||||||
@patch("mcp_server.api_request")
|
@patch("mcp_server.api_request")
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
def test_review_pr_and_merge(self, _auth, mock_api):
|
def test_legacy_review_pr_merge_fails_closed(self, _auth, mock_api):
|
||||||
# GET PR response (fetch head SHA)
|
result = gitea_review_pr(
|
||||||
|
pr_number=1,
|
||||||
|
event="APPROVE",
|
||||||
|
body="Looks good",
|
||||||
|
merge=True
|
||||||
|
)
|
||||||
|
self.assertFalse(result["success"])
|
||||||
|
self.assertIn("no longer supported", result["message"])
|
||||||
|
self.assertEqual(mock_api.call_count, 0)
|
||||||
|
|
||||||
|
@patch("mcp_server.api_request")
|
||||||
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
|
@patch("mcp_server.get_profile")
|
||||||
|
def test_legacy_review_pr_uses_gates(self, mock_get_profile, _auth, mock_api):
|
||||||
|
# Mock profile to lack approve capability (fails gate)
|
||||||
|
mock_get_profile.return_value = {
|
||||||
|
"profile_name": "gitea-readonly",
|
||||||
|
"allowed_operations": ["read"],
|
||||||
|
"forbidden_operations": [],
|
||||||
|
"base_url": None,
|
||||||
|
}
|
||||||
|
# mock_api responses for auth_user and pr_author
|
||||||
mock_api.side_effect = [
|
mock_api.side_effect = [
|
||||||
{"head": {"sha": "sha-val-123"}}, # GET PR pulls/1
|
{"login": "reviewer1"}, # /api/v1/user
|
||||||
{}, # POST review
|
{"state": "open", "head": {"sha": "abc1234"}, "mergeable": True, "user": {"login": "author1"}}, # /pulls/1
|
||||||
{}, # POST merge
|
|
||||||
]
|
]
|
||||||
result = gitea_review_pr(
|
result = gitea_review_pr(
|
||||||
pr_number=1,
|
pr_number=1,
|
||||||
event="APPROVE",
|
event="APPROVE",
|
||||||
body="Looks good",
|
body="Looks good",
|
||||||
merge=True,
|
merge=False
|
||||||
merge_method="squash"
|
|
||||||
)
|
)
|
||||||
self.assertTrue(result["success"])
|
self.assertFalse(result["success"])
|
||||||
self.assertIn("Successfully submitted review", result["message"])
|
self.assertIn("Review submission failed eligibility gates", result["message"])
|
||||||
self.assertIn("Successfully merged", result["message"])
|
self.assertIn("not allowed to approve", result["message"])
|
||||||
|
|
||||||
# Check call counts and arguments
|
@patch("mcp_server.api_request")
|
||||||
self.assertEqual(mock_api.call_count, 3)
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
|
@patch("mcp_server.get_profile")
|
||||||
# Verify GET PR
|
def test_legacy_review_pr_self_approval_blocked(self, mock_get_profile, _auth, mock_api):
|
||||||
self.assertEqual(mock_api.call_args_list[0][0][0], "GET")
|
mock_get_profile.return_value = {
|
||||||
|
"profile_name": "gitea-reviewer",
|
||||||
# Verify POST review
|
"allowed_operations": ["read", "approve"],
|
||||||
self.assertEqual(mock_api.call_args_list[1][0][0], "POST")
|
"forbidden_operations": [],
|
||||||
self.assertEqual(mock_api.call_args_list[1][0][3]["event"], "APPROVE")
|
"base_url": None,
|
||||||
self.assertEqual(mock_api.call_args_list[1][0][3]["commit_id"], "sha-val-123")
|
}
|
||||||
|
# mock_api responses for auth_user and pr_author
|
||||||
# Verify POST merge
|
mock_api.side_effect = [
|
||||||
self.assertEqual(mock_api.call_args_list[2][0][0], "POST")
|
{"login": "jcwalker3"}, # /api/v1/user
|
||||||
self.assertEqual(mock_api.call_args_list[2][0][3]["Do"], "squash")
|
{"state": "open", "head": {"sha": "abc1234"}, "mergeable": True, "user": {"login": "jcwalker3"}}, # /pulls/1
|
||||||
|
]
|
||||||
|
result = gitea_review_pr(
|
||||||
|
pr_number=1,
|
||||||
|
event="APPROVE",
|
||||||
|
body="Self approve",
|
||||||
|
merge=False
|
||||||
|
)
|
||||||
|
self.assertFalse(result["success"])
|
||||||
|
self.assertIn("Review submission failed eligibility gates", result["message"])
|
||||||
|
self.assertIn("authenticated user is PR author", result["message"])
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -785,5 +815,229 @@ class TestPrEligibility(unittest.TestCase):
|
|||||||
self.assertNotIn(secret, blob)
|
self.assertNotIn(secret, blob)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSubmitPrReview(unittest.TestCase):
|
||||||
|
"""Gated review-mutation tool (#15)."""
|
||||||
|
|
||||||
|
def _pr(self, author, state="open", sha="abc123", mergeable=True):
|
||||||
|
return {
|
||||||
|
"user": {"login": author},
|
||||||
|
"state": state,
|
||||||
|
"head": {"sha": sha},
|
||||||
|
"mergeable": mergeable,
|
||||||
|
}
|
||||||
|
|
||||||
|
def _methods(self, mock_api):
|
||||||
|
return [c.args[0] for c in mock_api.call_args_list]
|
||||||
|
|
||||||
|
def _assert_no_mutation(self, mock_api):
|
||||||
|
# A review mutation is POST .../reviews; eligibility only ever GETs.
|
||||||
|
for c in mock_api.call_args_list:
|
||||||
|
method, url = c.args[0], c.args[1]
|
||||||
|
self.assertFalse(
|
||||||
|
method == "POST" and url.endswith("/reviews"),
|
||||||
|
f"unexpected review mutation: {method} {url}",
|
||||||
|
)
|
||||||
|
|
||||||
|
# -- approve --------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("mcp_server.api_request")
|
||||||
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
|
def test_approve_blocked_when_author(self, _auth, mock_api):
|
||||||
|
mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(pr_number=8, action="approve", remote="prgs")
|
||||||
|
self.assertFalse(r["performed"])
|
||||||
|
self.assertIn("authenticated user is PR author", r["reasons"])
|
||||||
|
self._assert_no_mutation(mock_api)
|
||||||
|
|
||||||
|
@patch("mcp_server.api_request")
|
||||||
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
|
def test_approve_succeeds_when_eligible(self, _auth, mock_api):
|
||||||
|
mock_api.side_effect = [
|
||||||
|
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 7},
|
||||||
|
]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(
|
||||||
|
pr_number=8, action="approve", body="LGTM", remote="prgs")
|
||||||
|
self.assertTrue(r["performed"])
|
||||||
|
self.assertEqual(r["authenticated_user"], "reviewer-bot")
|
||||||
|
self.assertEqual(r["pr_author"], "author-bot")
|
||||||
|
self.assertEqual(r["head_sha"], "abc123")
|
||||||
|
method, url = mock_api.call_args.args[0], mock_api.call_args.args[1]
|
||||||
|
self.assertEqual(method, "POST")
|
||||||
|
self.assertTrue(url.endswith("/pulls/8/reviews"))
|
||||||
|
payload = mock_api.call_args.args[3]
|
||||||
|
self.assertEqual(payload["event"], "APPROVE")
|
||||||
|
self.assertEqual(payload["commit_id"], "abc123")
|
||||||
|
|
||||||
|
# -- request_changes ------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("mcp_server.api_request")
|
||||||
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
|
def test_request_changes_succeeds_when_eligible(self, _auth, mock_api):
|
||||||
|
mock_api.side_effect = [
|
||||||
|
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 9},
|
||||||
|
]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review,request_changes"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(
|
||||||
|
pr_number=8, action="request_changes",
|
||||||
|
body="needs work", remote="prgs")
|
||||||
|
self.assertTrue(r["performed"])
|
||||||
|
self.assertEqual(mock_api.call_args.args[3]["event"], "REQUEST_CHANGES")
|
||||||
|
|
||||||
|
def test_request_changes_blocked_without_eligibility(self):
|
||||||
|
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) as _a, \
|
||||||
|
patch("mcp_server.api_request") as mock_api:
|
||||||
|
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review"} # no request_changes
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(
|
||||||
|
pr_number=8, action="request_changes", remote="prgs")
|
||||||
|
self.assertFalse(r["performed"])
|
||||||
|
self.assertIn("profile is not allowed to request_changes", r["reasons"])
|
||||||
|
self._assert_no_mutation(mock_api)
|
||||||
|
|
||||||
|
# -- comment --------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("mcp_server.api_request")
|
||||||
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
|
def test_comment_succeeds_when_review_eligible(self, _auth, mock_api):
|
||||||
|
mock_api.side_effect = [
|
||||||
|
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 3},
|
||||||
|
]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(
|
||||||
|
pr_number=8, action="comment", body="finding", remote="prgs")
|
||||||
|
self.assertTrue(r["performed"])
|
||||||
|
self.assertEqual(mock_api.call_args.args[3]["event"], "COMMENT")
|
||||||
|
|
||||||
|
def test_comment_by_author_allowed(self):
|
||||||
|
# Commenting on your own PR is fine — only approve is self-blocked.
|
||||||
|
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
|
||||||
|
patch("mcp_server.api_request") as mock_api:
|
||||||
|
mock_api.side_effect = [
|
||||||
|
{"login": "jcwalker3"}, self._pr("jcwalker3"), {"id": 4},
|
||||||
|
]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(
|
||||||
|
pr_number=8, action="comment", body="note", remote="prgs")
|
||||||
|
self.assertTrue(r["performed"])
|
||||||
|
|
||||||
|
# -- identity / profile fail-closed ---------------------------------------
|
||||||
|
|
||||||
|
@patch("mcp_server.get_auth_header", return_value=None)
|
||||||
|
def test_unknown_identity_blocks(self, _auth):
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(pr_number=8, action="approve", remote="prgs")
|
||||||
|
self.assertFalse(r["performed"])
|
||||||
|
self.assertIsNone(r["authenticated_user"])
|
||||||
|
self.assertIn("authenticated identity could not be determined", r["reasons"])
|
||||||
|
|
||||||
|
def test_disallowed_profile_operation_blocks(self):
|
||||||
|
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
|
||||||
|
patch("mcp_server.api_request") as mock_api:
|
||||||
|
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-author",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,pr.create"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(
|
||||||
|
pr_number=8, action="approve", remote="prgs")
|
||||||
|
self.assertFalse(r["performed"])
|
||||||
|
self.assertIn("profile is not allowed to approve", r["reasons"])
|
||||||
|
self._assert_no_mutation(mock_api)
|
||||||
|
|
||||||
|
# -- head SHA guard -------------------------------------------------------
|
||||||
|
|
||||||
|
def test_head_sha_mismatch_blocks(self):
|
||||||
|
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
|
||||||
|
patch("mcp_server.api_request") as mock_api:
|
||||||
|
mock_api.side_effect = [
|
||||||
|
{"login": "reviewer-bot"}, self._pr("author-bot", sha="abc123"),
|
||||||
|
]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(
|
||||||
|
pr_number=8, action="approve",
|
||||||
|
expected_head_sha="deadbeef", remote="prgs")
|
||||||
|
self.assertFalse(r["performed"])
|
||||||
|
self.assertIn(
|
||||||
|
"expected head SHA does not match current PR head (fail closed)",
|
||||||
|
r["reasons"])
|
||||||
|
self._assert_no_mutation(mock_api)
|
||||||
|
|
||||||
|
def test_head_sha_match_allows(self):
|
||||||
|
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
|
||||||
|
patch("mcp_server.api_request") as mock_api:
|
||||||
|
mock_api.side_effect = [
|
||||||
|
{"login": "reviewer-bot"}, self._pr("author-bot", sha="abc123"),
|
||||||
|
{"id": 5},
|
||||||
|
]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(
|
||||||
|
pr_number=8, action="approve",
|
||||||
|
expected_head_sha="abc123", remote="prgs")
|
||||||
|
self.assertTrue(r["performed"])
|
||||||
|
|
||||||
|
# -- invalid action -------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("mcp_server.api_request")
|
||||||
|
def test_invalid_review_action_rejected(self, mock_api):
|
||||||
|
with patch.dict(os.environ, {}, clear=True):
|
||||||
|
r = gitea_submit_pr_review(pr_number=8, action="delete", remote="prgs")
|
||||||
|
self.assertFalse(r["performed"])
|
||||||
|
self.assertTrue(any("unknown review action" in x for x in r["reasons"]))
|
||||||
|
mock_api.assert_not_called() # never touches the API on a bad action
|
||||||
|
|
||||||
|
# -- redaction ------------------------------------------------------------
|
||||||
|
|
||||||
|
@patch("mcp_server.api_request")
|
||||||
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
|
def test_output_redacts_secrets(self, _auth, mock_api):
|
||||||
|
mock_api.side_effect = [
|
||||||
|
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 1},
|
||||||
|
]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review,approve",
|
||||||
|
"GITEA_TOKEN": "super-secret-token"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(pr_number=5, action="approve", remote="prgs")
|
||||||
|
blob = repr(r).lower()
|
||||||
|
for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()):
|
||||||
|
self.assertNotIn(secret, blob)
|
||||||
|
|
||||||
|
@patch("mcp_server.api_request")
|
||||||
|
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||||
|
def test_error_message_redacts_credential(self, _auth, mock_api):
|
||||||
|
mock_api.side_effect = [
|
||||||
|
{"login": "reviewer-bot"},
|
||||||
|
self._pr("author-bot"),
|
||||||
|
RuntimeError("HTTP 500: token abc-secret-xyz rejected"),
|
||||||
|
]
|
||||||
|
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||||
|
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||||
|
with patch.dict(os.environ, env, clear=True):
|
||||||
|
r = gitea_submit_pr_review(pr_number=5, action="approve", remote="prgs")
|
||||||
|
self.assertFalse(r["performed"])
|
||||||
|
blob = repr(r)
|
||||||
|
self.assertIn("[REDACTED]", blob)
|
||||||
|
self.assertNotIn("abc-secret-xyz", blob)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user