feat: add gated Gitea PR review actions (#15) #25
@@ -45,7 +45,7 @@ Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools n
|
||||
| `gitea_list_prs` | List pull requests with state/remote |
|
||||
| `gitea_view_pr` | Get full details of a single pull request |
|
||||
| `gitea_merge_pr` | Merge a pull request (merge, squash, or rebase) |
|
||||
| `gitea_review_pr` | Submit a review on a pull request and optionally merge it |
|
||||
| `gitea_review_pr` | Legacy wrapper for `gitea_submit_pr_review` (merging disabled) |
|
||||
| `gitea_delete_branch` | Delete a remote branch |
|
||||
| `gitea_close_issue` | Close an issue by number |
|
||||
| `gitea_list_issues` | List issues with state/label filters |
|
||||
@@ -53,6 +53,7 @@ Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools n
|
||||
| `gitea_whoami` | Read-only: identify the authenticated Gitea account (safe metadata only) |
|
||||
| `gitea_get_profile` | Read-only: describe the active runtime execution profile (safe metadata only) |
|
||||
| `gitea_check_pr_eligibility` | Read-only: check if the current identity/profile may review/approve/request_changes/merge a PR |
|
||||
| `gitea_submit_pr_review` | Gated review mutation: comment/approve/request_changes, only after identity+profile+eligibility gates pass (no merge, no self-approval) |
|
||||
| `gitea_mark_issue` | Claim/release an issue (start/done) |
|
||||
| `gitea_list_labels` | List all available labels in a repository |
|
||||
| `gitea_create_label` | Create a new label with custom color |
|
||||
|
||||
+212
-36
@@ -353,6 +353,184 @@ def gitea_check_pr_eligibility(
|
||||
return result
|
||||
|
||||
|
||||
# Review actions this gated tool can perform, mapped to (eligibility action,
|
||||
# Gitea review *event*). The eligibility action is fed to
|
||||
# ``gitea_check_pr_eligibility`` (#14) so every mutation reuses the same
|
||||
# identity/profile/author gates. Note: 'merge' is deliberately absent — merge
|
||||
# belongs to a separate tool/issue and is never performed here.
|
||||
_REVIEW_ACTIONS = {
|
||||
# 'comment' posts review findings without an approval/rejection state.
|
||||
# #14 names this eligibility category 'review'.
|
||||
"comment": ("review", "COMMENT"),
|
||||
"approve": ("approve", "APPROVE"),
|
||||
"request_changes": ("request_changes", "REQUEST_CHANGES"),
|
||||
}
|
||||
|
||||
# Patterns scrubbed from any surfaced error text so a credential can never leak.
|
||||
_SECRET_PREFIXES = ("token ", "Basic ")
|
||||
|
||||
|
||||
def _redact(text: str) -> str:
|
||||
"""Strip anything that looks like an Authorization credential from *text*.
|
||||
|
||||
Errors raised by ``api_request`` echo the server response body, not the
|
||||
request headers, so a token should never appear — this is defence in depth
|
||||
so a future change can't leak ``token …`` / ``Basic …`` material into a
|
||||
tool result or log line.
|
||||
"""
|
||||
if not text:
|
||||
return text
|
||||
out = text
|
||||
for prefix in _SECRET_PREFIXES:
|
||||
idx = 0
|
||||
while True:
|
||||
i = out.find(prefix, idx)
|
||||
if i == -1:
|
||||
break
|
||||
j = i + len(prefix)
|
||||
while j < len(out) and not out[j].isspace():
|
||||
j += 1
|
||||
out = out[:i] + prefix + "[REDACTED]" + out[j:]
|
||||
idx = i + len(prefix) + len("[REDACTED]")
|
||||
return out
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def gitea_submit_pr_review(
|
||||
pr_number: int,
|
||||
action: str,
|
||||
body: str = "",
|
||||
expected_head_sha: str | None = None,
|
||||
remote: str = "dadeschools",
|
||||
host: str | None = None,
|
||||
org: str | None = None,
|
||||
repo: str | None = None,
|
||||
) -> dict:
|
||||
"""Gated PR review mutation: comment findings, request changes, or approve.
|
||||
|
||||
This is the only tool that submits a Gitea PR *review*. It performs a
|
||||
mutation **only after every safety gate passes**; if any gate fails it
|
||||
returns ``performed=False`` and never calls the mutating endpoint.
|
||||
|
||||
Gate order (fail-closed at each step):
|
||||
|
||||
1. Validate ``action`` is one of 'comment', 'approve', 'request_changes'.
|
||||
2. Reuse ``gitea_check_pr_eligibility`` (#14), which runs the authenticated
|
||||
-user lookup, active-profile lookup, PR-author lookup, self-approval
|
||||
block, and profile-allowed-operation check. ``approve`` requires
|
||||
eligibility for 'approve', ``request_changes`` requires
|
||||
'request_changes', and ``comment`` requires 'review'.
|
||||
3. Redundantly block self-approval (authenticated user == PR author).
|
||||
4. If ``expected_head_sha`` is supplied and the PR head has moved, abort.
|
||||
5. Only then POST the review.
|
||||
|
||||
Endpoint: ``POST /repos/{owner}/{repo}/pulls/{n}/reviews``. This is the
|
||||
*formal review* API (it records an APPROVE / COMMENT / REQUEST_CHANGES
|
||||
review state tied to the head commit), chosen over the plain issue-comment
|
||||
endpoint (``/issues/{n}/comments``) so that approvals and change requests
|
||||
carry real review state — a plain comment cannot approve or block a PR.
|
||||
|
||||
Merge is intentionally NOT implemented here.
|
||||
|
||||
Never returns the token, Authorization header, or any credential material.
|
||||
|
||||
Args:
|
||||
pr_number: Target PR number.
|
||||
action: 'comment', 'approve', or 'request_changes'.
|
||||
body: Review body / finding text.
|
||||
expected_head_sha: Optional. If given and the PR head SHA differs, the
|
||||
review is refused (guards against reviewing a changed PR).
|
||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
||||
host: Override the Gitea host.
|
||||
org: Override the owner/organization.
|
||||
repo: Override the repository name.
|
||||
|
||||
Returns:
|
||||
dict describing the attempt: action, whether it was performed, the
|
||||
authenticated user, profile name, PR author, PR number, head SHA
|
||||
checked, and the reasons/gates passed or blocked. Never secrets.
|
||||
"""
|
||||
action = (action or "").strip().lower()
|
||||
result = {
|
||||
"requested_action": action,
|
||||
"performed": False,
|
||||
"authenticated_user": None,
|
||||
"profile_name": get_profile()["profile_name"],
|
||||
"pr_author": None,
|
||||
"pr_number": pr_number,
|
||||
"head_sha": None,
|
||||
"expected_head_sha": expected_head_sha,
|
||||
"remote": remote if remote in REMOTES else None,
|
||||
"reasons": [],
|
||||
}
|
||||
reasons = result["reasons"]
|
||||
|
||||
# Gate 1 — valid review action (no mutation on unknown action).
|
||||
if action not in _REVIEW_ACTIONS:
|
||||
reasons.append(
|
||||
f"unknown review action '{action}'; expected one of "
|
||||
f"{sorted(_REVIEW_ACTIONS)}"
|
||||
)
|
||||
return result
|
||||
eligibility_action, event = _REVIEW_ACTIONS[action]
|
||||
|
||||
# Gate 2 — reuse #14 eligibility (identity + profile + author + self-approve
|
||||
# + profile-allowed). This performs only read-only GETs.
|
||||
elig = gitea_check_pr_eligibility(
|
||||
pr_number=pr_number,
|
||||
action=eligibility_action,
|
||||
remote=remote,
|
||||
host=host,
|
||||
org=org,
|
||||
repo=repo,
|
||||
)
|
||||
result["authenticated_user"] = elig.get("authenticated_user")
|
||||
result["profile_name"] = elig.get("profile_name", result["profile_name"])
|
||||
result["pr_author"] = elig.get("pr_author")
|
||||
result["head_sha"] = elig.get("head_sha")
|
||||
if not elig.get("eligible"):
|
||||
reasons.append(
|
||||
f"eligibility check for '{eligibility_action}' failed (fail closed)"
|
||||
)
|
||||
reasons.extend(elig.get("reasons", []))
|
||||
return result
|
||||
|
||||
# Gate 3 — redundant self-approval block (belt-and-suspenders over #14).
|
||||
auth_user = result["authenticated_user"]
|
||||
pr_author = result["pr_author"]
|
||||
if action == "approve" and auth_user and pr_author and auth_user == pr_author:
|
||||
reasons.append("self-approval blocked (authenticated user is PR author)")
|
||||
return result
|
||||
|
||||
# Gate 4 — head SHA must match if the caller pinned one.
|
||||
actual_sha = result["head_sha"]
|
||||
if expected_head_sha and actual_sha and expected_head_sha != actual_sha:
|
||||
reasons.append(
|
||||
"expected head SHA does not match current PR head (fail closed)"
|
||||
)
|
||||
return result
|
||||
if not actual_sha:
|
||||
# Should be unreachable — eligibility fails closed without a head SHA —
|
||||
# but never submit a review without a commit to pin it to.
|
||||
reasons.append("PR head SHA unavailable (fail closed)")
|
||||
return result
|
||||
|
||||
# All gates passed — perform the single mutating call.
|
||||
h, o, r = _resolve(remote, host, org, repo)
|
||||
try:
|
||||
auth = _auth(h)
|
||||
review_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/reviews"
|
||||
payload = {"body": body, "event": event, "commit_id": actual_sha}
|
||||
api_request("POST", review_url, auth, payload)
|
||||
except Exception as exc: # noqa: BLE001 — redact before surfacing
|
||||
reasons.append(f"review submission failed: {_redact(str(exc))}")
|
||||
return result
|
||||
|
||||
result["performed"] = True
|
||||
reasons.append(f"all gates passed; submitted '{event}' review on PR #{pr_number}")
|
||||
return result
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
def gitea_edit_pr(
|
||||
pr_number: int,
|
||||
@@ -548,14 +726,18 @@ def gitea_review_pr(
|
||||
org: str | None = None,
|
||||
repo: str | None = None,
|
||||
) -> dict:
|
||||
"""Submit a review on a Gitea pull request and optionally merge it.
|
||||
"""Submit a review on a Gitea pull request (Legacy wrapper).
|
||||
|
||||
This tool is a compatibility wrapper around the safe `gitea_submit_pr_review`.
|
||||
It uses the same #14 eligibility gates.
|
||||
Merging via this tool is no longer supported and will fail closed (see #16).
|
||||
|
||||
Args:
|
||||
pr_number: The PR number to review.
|
||||
event: Review type — 'APPROVE', 'COMMENT', or 'REQUEST_CHANGES'.
|
||||
body: Review body text / comment.
|
||||
merge: If True and event is 'APPROVE', automatically merge the PR.
|
||||
merge_method: Merge style to use if merging — 'merge', 'squash', or 'rebase'.
|
||||
merge: Merging is disabled; if True, the tool fails closed.
|
||||
merge_method: Ignored.
|
||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
||||
host: Override the Gitea host.
|
||||
org: Override the owner/organization.
|
||||
@@ -564,45 +746,39 @@ def gitea_review_pr(
|
||||
Returns:
|
||||
dict with success status and message.
|
||||
"""
|
||||
if merge:
|
||||
return {
|
||||
"success": False,
|
||||
"message": "merge=True is no longer supported in this tool (belongs to #16)."
|
||||
}
|
||||
|
||||
if event not in ["APPROVE", "COMMENT", "REQUEST_CHANGES"]:
|
||||
raise ValueError(f"Invalid review event: '{event}'. Choose from 'APPROVE', 'COMMENT', 'REQUEST_CHANGES'.")
|
||||
if merge_method not in ["merge", "squash", "rebase"]:
|
||||
raise ValueError(f"Invalid merge method: '{merge_method}'. Choose from 'merge', 'squash', 'rebase'.")
|
||||
|
||||
h, o, r = _resolve(remote, host, org, repo)
|
||||
auth = _auth(h)
|
||||
|
||||
# 1. Fetch PR to get the latest head commit SHA (required for review payload)
|
||||
pr_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}"
|
||||
pr_data = api_request("GET", pr_url, auth)
|
||||
commit_sha = pr_data.get("head", {}).get("sha")
|
||||
if not commit_sha:
|
||||
raise RuntimeError(f"Could not find head commit SHA for PR #{pr_number}.")
|
||||
|
||||
# 2. Submit the PR review
|
||||
review_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/reviews"
|
||||
payload = {
|
||||
"body": body,
|
||||
"event": event,
|
||||
"commit_id": commit_sha
|
||||
# Map legacy event string to the action expected by gitea_submit_pr_review
|
||||
event_map = {
|
||||
"APPROVE": "approve",
|
||||
"COMMENT": "comment",
|
||||
"REQUEST_CHANGES": "request_changes"
|
||||
}
|
||||
api_request("POST", review_url, auth, payload)
|
||||
msg = f"Successfully submitted review for PR #{pr_number} with event '{event}'."
|
||||
action = event_map[event]
|
||||
|
||||
# 3. Merge PR if merge is True and event is APPROVE
|
||||
if merge:
|
||||
if event != "APPROVE":
|
||||
msg += " Warning: Skipping merge because review event is not 'APPROVE'."
|
||||
else:
|
||||
merge_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge"
|
||||
merge_payload = {
|
||||
"Do": merge_method,
|
||||
"force_merge": False
|
||||
}
|
||||
api_request("POST", merge_url, auth, merge_payload)
|
||||
msg += f" Successfully merged PR #{pr_number} using '{merge_method}' method."
|
||||
result = gitea_submit_pr_review(
|
||||
pr_number=pr_number,
|
||||
action=action,
|
||||
body=body,
|
||||
expected_head_sha=None,
|
||||
remote=remote,
|
||||
host=host,
|
||||
org=org,
|
||||
repo=repo
|
||||
)
|
||||
|
||||
return {"success": True, "message": msg}
|
||||
if result.get("performed"):
|
||||
return {"success": True, "message": f"Successfully submitted review for PR #{pr_number} with event '{event}'."}
|
||||
else:
|
||||
reasons = result.get("reasons", [])
|
||||
return {"success": False, "message": f"Review submission failed eligibility gates: {reasons}"}
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
|
||||
+278
-24
@@ -29,6 +29,7 @@ from mcp_server import ( # noqa: E402
|
||||
gitea_whoami,
|
||||
gitea_get_profile,
|
||||
gitea_check_pr_eligibility,
|
||||
gitea_submit_pr_review,
|
||||
)
|
||||
from gitea_auth import get_profile # noqa: E402
|
||||
|
||||
@@ -320,38 +321,67 @@ class TestReviewPR(unittest.TestCase):
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_review_pr_and_merge(self, _auth, mock_api):
|
||||
# GET PR response (fetch head SHA)
|
||||
def test_legacy_review_pr_merge_fails_closed(self, _auth, mock_api):
|
||||
result = gitea_review_pr(
|
||||
pr_number=1,
|
||||
event="APPROVE",
|
||||
body="Looks good",
|
||||
merge=True
|
||||
)
|
||||
self.assertFalse(result["success"])
|
||||
self.assertIn("no longer supported", result["message"])
|
||||
self.assertEqual(mock_api.call_count, 0)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
@patch("mcp_server.get_profile")
|
||||
def test_legacy_review_pr_uses_gates(self, mock_get_profile, _auth, mock_api):
|
||||
# Mock profile to lack approve capability (fails gate)
|
||||
mock_get_profile.return_value = {
|
||||
"profile_name": "gitea-readonly",
|
||||
"allowed_operations": ["read"],
|
||||
"forbidden_operations": [],
|
||||
"base_url": None,
|
||||
}
|
||||
# mock_api responses for auth_user and pr_author
|
||||
mock_api.side_effect = [
|
||||
{"head": {"sha": "sha-val-123"}}, # GET PR pulls/1
|
||||
{}, # POST review
|
||||
{}, # POST merge
|
||||
{"login": "reviewer1"}, # /api/v1/user
|
||||
{"state": "open", "head": {"sha": "abc1234"}, "mergeable": True, "user": {"login": "author1"}}, # /pulls/1
|
||||
]
|
||||
result = gitea_review_pr(
|
||||
pr_number=1,
|
||||
event="APPROVE",
|
||||
body="Looks good",
|
||||
merge=True,
|
||||
merge_method="squash"
|
||||
merge=False
|
||||
)
|
||||
self.assertTrue(result["success"])
|
||||
self.assertIn("Successfully submitted review", result["message"])
|
||||
self.assertIn("Successfully merged", result["message"])
|
||||
self.assertFalse(result["success"])
|
||||
self.assertIn("Review submission failed eligibility gates", result["message"])
|
||||
self.assertIn("not allowed to approve", result["message"])
|
||||
|
||||
# Check call counts and arguments
|
||||
self.assertEqual(mock_api.call_count, 3)
|
||||
|
||||
# Verify GET PR
|
||||
self.assertEqual(mock_api.call_args_list[0][0][0], "GET")
|
||||
|
||||
# Verify POST review
|
||||
self.assertEqual(mock_api.call_args_list[1][0][0], "POST")
|
||||
self.assertEqual(mock_api.call_args_list[1][0][3]["event"], "APPROVE")
|
||||
self.assertEqual(mock_api.call_args_list[1][0][3]["commit_id"], "sha-val-123")
|
||||
|
||||
# Verify POST merge
|
||||
self.assertEqual(mock_api.call_args_list[2][0][0], "POST")
|
||||
self.assertEqual(mock_api.call_args_list[2][0][3]["Do"], "squash")
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
@patch("mcp_server.get_profile")
|
||||
def test_legacy_review_pr_self_approval_blocked(self, mock_get_profile, _auth, mock_api):
|
||||
mock_get_profile.return_value = {
|
||||
"profile_name": "gitea-reviewer",
|
||||
"allowed_operations": ["read", "approve"],
|
||||
"forbidden_operations": [],
|
||||
"base_url": None,
|
||||
}
|
||||
# mock_api responses for auth_user and pr_author
|
||||
mock_api.side_effect = [
|
||||
{"login": "jcwalker3"}, # /api/v1/user
|
||||
{"state": "open", "head": {"sha": "abc1234"}, "mergeable": True, "user": {"login": "jcwalker3"}}, # /pulls/1
|
||||
]
|
||||
result = gitea_review_pr(
|
||||
pr_number=1,
|
||||
event="APPROVE",
|
||||
body="Self approve",
|
||||
merge=False
|
||||
)
|
||||
self.assertFalse(result["success"])
|
||||
self.assertIn("Review submission failed eligibility gates", result["message"])
|
||||
self.assertIn("authenticated user is PR author", result["message"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -785,5 +815,229 @@ class TestPrEligibility(unittest.TestCase):
|
||||
self.assertNotIn(secret, blob)
|
||||
|
||||
|
||||
class TestSubmitPrReview(unittest.TestCase):
|
||||
"""Gated review-mutation tool (#15)."""
|
||||
|
||||
def _pr(self, author, state="open", sha="abc123", mergeable=True):
|
||||
return {
|
||||
"user": {"login": author},
|
||||
"state": state,
|
||||
"head": {"sha": sha},
|
||||
"mergeable": mergeable,
|
||||
}
|
||||
|
||||
def _methods(self, mock_api):
|
||||
return [c.args[0] for c in mock_api.call_args_list]
|
||||
|
||||
def _assert_no_mutation(self, mock_api):
|
||||
# A review mutation is POST .../reviews; eligibility only ever GETs.
|
||||
for c in mock_api.call_args_list:
|
||||
method, url = c.args[0], c.args[1]
|
||||
self.assertFalse(
|
||||
method == "POST" and url.endswith("/reviews"),
|
||||
f"unexpected review mutation: {method} {url}",
|
||||
)
|
||||
|
||||
# -- approve --------------------------------------------------------------
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_approve_blocked_when_author(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(pr_number=8, action="approve", remote="prgs")
|
||||
self.assertFalse(r["performed"])
|
||||
self.assertIn("authenticated user is PR author", r["reasons"])
|
||||
self._assert_no_mutation(mock_api)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_approve_succeeds_when_eligible(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 7},
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(
|
||||
pr_number=8, action="approve", body="LGTM", remote="prgs")
|
||||
self.assertTrue(r["performed"])
|
||||
self.assertEqual(r["authenticated_user"], "reviewer-bot")
|
||||
self.assertEqual(r["pr_author"], "author-bot")
|
||||
self.assertEqual(r["head_sha"], "abc123")
|
||||
method, url = mock_api.call_args.args[0], mock_api.call_args.args[1]
|
||||
self.assertEqual(method, "POST")
|
||||
self.assertTrue(url.endswith("/pulls/8/reviews"))
|
||||
payload = mock_api.call_args.args[3]
|
||||
self.assertEqual(payload["event"], "APPROVE")
|
||||
self.assertEqual(payload["commit_id"], "abc123")
|
||||
|
||||
# -- request_changes ------------------------------------------------------
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_request_changes_succeeds_when_eligible(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 9},
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,request_changes"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(
|
||||
pr_number=8, action="request_changes",
|
||||
body="needs work", remote="prgs")
|
||||
self.assertTrue(r["performed"])
|
||||
self.assertEqual(mock_api.call_args.args[3]["event"], "REQUEST_CHANGES")
|
||||
|
||||
def test_request_changes_blocked_without_eligibility(self):
|
||||
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) as _a, \
|
||||
patch("mcp_server.api_request") as mock_api:
|
||||
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review"} # no request_changes
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(
|
||||
pr_number=8, action="request_changes", remote="prgs")
|
||||
self.assertFalse(r["performed"])
|
||||
self.assertIn("profile is not allowed to request_changes", r["reasons"])
|
||||
self._assert_no_mutation(mock_api)
|
||||
|
||||
# -- comment --------------------------------------------------------------
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_comment_succeeds_when_review_eligible(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 3},
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(
|
||||
pr_number=8, action="comment", body="finding", remote="prgs")
|
||||
self.assertTrue(r["performed"])
|
||||
self.assertEqual(mock_api.call_args.args[3]["event"], "COMMENT")
|
||||
|
||||
def test_comment_by_author_allowed(self):
|
||||
# Commenting on your own PR is fine — only approve is self-blocked.
|
||||
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
|
||||
patch("mcp_server.api_request") as mock_api:
|
||||
mock_api.side_effect = [
|
||||
{"login": "jcwalker3"}, self._pr("jcwalker3"), {"id": 4},
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(
|
||||
pr_number=8, action="comment", body="note", remote="prgs")
|
||||
self.assertTrue(r["performed"])
|
||||
|
||||
# -- identity / profile fail-closed ---------------------------------------
|
||||
|
||||
@patch("mcp_server.get_auth_header", return_value=None)
|
||||
def test_unknown_identity_blocks(self, _auth):
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(pr_number=8, action="approve", remote="prgs")
|
||||
self.assertFalse(r["performed"])
|
||||
self.assertIsNone(r["authenticated_user"])
|
||||
self.assertIn("authenticated identity could not be determined", r["reasons"])
|
||||
|
||||
def test_disallowed_profile_operation_blocks(self):
|
||||
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
|
||||
patch("mcp_server.api_request") as mock_api:
|
||||
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-author",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,pr.create"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(
|
||||
pr_number=8, action="approve", remote="prgs")
|
||||
self.assertFalse(r["performed"])
|
||||
self.assertIn("profile is not allowed to approve", r["reasons"])
|
||||
self._assert_no_mutation(mock_api)
|
||||
|
||||
# -- head SHA guard -------------------------------------------------------
|
||||
|
||||
def test_head_sha_mismatch_blocks(self):
|
||||
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
|
||||
patch("mcp_server.api_request") as mock_api:
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-bot"}, self._pr("author-bot", sha="abc123"),
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(
|
||||
pr_number=8, action="approve",
|
||||
expected_head_sha="deadbeef", remote="prgs")
|
||||
self.assertFalse(r["performed"])
|
||||
self.assertIn(
|
||||
"expected head SHA does not match current PR head (fail closed)",
|
||||
r["reasons"])
|
||||
self._assert_no_mutation(mock_api)
|
||||
|
||||
def test_head_sha_match_allows(self):
|
||||
with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
|
||||
patch("mcp_server.api_request") as mock_api:
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-bot"}, self._pr("author-bot", sha="abc123"),
|
||||
{"id": 5},
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(
|
||||
pr_number=8, action="approve",
|
||||
expected_head_sha="abc123", remote="prgs")
|
||||
self.assertTrue(r["performed"])
|
||||
|
||||
# -- invalid action -------------------------------------------------------
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
def test_invalid_review_action_rejected(self, mock_api):
|
||||
with patch.dict(os.environ, {}, clear=True):
|
||||
r = gitea_submit_pr_review(pr_number=8, action="delete", remote="prgs")
|
||||
self.assertFalse(r["performed"])
|
||||
self.assertTrue(any("unknown review action" in x for x in r["reasons"]))
|
||||
mock_api.assert_not_called() # never touches the API on a bad action
|
||||
|
||||
# -- redaction ------------------------------------------------------------
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_output_redacts_secrets(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-bot"}, self._pr("author-bot"), {"id": 1},
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve",
|
||||
"GITEA_TOKEN": "super-secret-token"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(pr_number=5, action="approve", remote="prgs")
|
||||
blob = repr(r).lower()
|
||||
for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()):
|
||||
self.assertNotIn(secret, blob)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_error_message_redacts_credential(self, _auth, mock_api):
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-bot"},
|
||||
self._pr("author-bot"),
|
||||
RuntimeError("HTTP 500: token abc-secret-xyz rejected"),
|
||||
]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_submit_pr_review(pr_number=5, action="approve", remote="prgs")
|
||||
self.assertFalse(r["performed"])
|
||||
blob = repr(r)
|
||||
self.assertIn("[REDACTED]", blob)
|
||||
self.assertNotIn("abc-secret-xyz", blob)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user