diff --git a/README.md b/README.md index c124ee8..fb12133 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools n | `gitea_edit_pr` | Edit details of an existing pull request | | `gitea_list_prs` | List pull requests with state/remote | | `gitea_view_pr` | Get full details of a single pull request | -| `gitea_merge_pr` | Merge a pull request (merge, squash, or rebase) | +| `gitea_merge_pr` | Gated merge: merge/squash/rebase only after identity+profile+eligibility gates pass, explicit `confirmation="MERGE PR "`, optional head-SHA and changed-files pinning (no self-merge, no force) | | `gitea_review_pr` | Legacy wrapper for `gitea_submit_pr_review` (merging disabled) | | `gitea_delete_branch` | Delete a remote branch | | `gitea_close_issue` | Close an issue by number | @@ -180,6 +180,9 @@ Notes: can inspect which runtime it is talking to before deciding to act. - See [`docs/gitea-execution-profiles.md`](docs/gitea-execution-profiles.md) for the full profile model. +- **Audit logging:** #16 returns structured gate/merge results but does not add + durable audit logging. Durable audit logging for Gitea MCP mutating actions is + tracked by #18.
@@ -204,7 +207,7 @@ The MCP tools can also be used as standalone CLI scripts: | `create_issue.py` | Create an issue (`--remote`, `--title`, `--body`, `--body-file`) | | `create_pr.py` | Open a Pull Request (`--remote`, `--title`, `--head`, `--base`) | | `edit_pr.py` | Edit a Pull Request (`--title`, `--body`, `--body-file`, etc.) | -| `review_pr.py` | Review and sign-off on a pull request (with optional merge) | +| `review_pr.py` | Review/sign-off on a pull request (`--merge` is disabled — fails closed; merge only via gated `gitea_merge_pr`) | | `close_issue.py` | Close a specific issue | | `mark_issue.py` | Claim/release an issue via `status:in-progress` label | | `manage_labels.py` | Create label set and apply label mappings (`--dry` to preview) | @@ -225,8 +228,9 @@ The MCP tools can also be used as standalone CLI scripts: # Edit a PR's description or title ./edit_pr.py 155 --body "Updated description wording" -# Review and approve a PR, then automatically merge it -./review_pr.py --pr-number 12 --event APPROVE --body "Approved" --merge +# Review and approve a PR (review only — CLI merge is disabled; use the +# gated gitea_merge_pr MCP workflow to merge) +./review_pr.py --pr-number 12 --event APPROVE --body "Approved" # Close issue #5 ./close_issue.py 5 diff --git a/mcp_server.py b/mcp_server.py index 6ba1a81..3810c3f 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -671,47 +671,193 @@ def gitea_commit_files( } +# Merge methods supported by the Gitea merge API. +_MERGE_METHODS = ("merge", "squash", "rebase") + + @mcp.tool() def gitea_merge_pr( pr_number: int, + confirmation: str = "", + expected_head_sha: str | None = None, + expected_changed_files: list[str] | None = None, do: str = "merge", title: str | None = None, message: str | None = None, - force: bool = False, remote: str = "dadeschools", host: str | None = None, org: str | None = None, repo: str | None = None, ) -> dict: - """Merge a Gitea pull request. + """Gated merge of a Gitea pull request (#16). + + This is the ONLY merge path this server exposes, and it mutates only after + every safety gate passes. No ungated merge tool remains: legacy + ``gitea_review_pr`` fails closed on ``merge=True`` and + ``gitea_submit_pr_review`` never merges. + + Gate order (fail-closed at each step; the merge API is called only if all + gates pass): + + 1. Merge method (``do``) is 'merge', 'squash', or 'rebase'. + 2. Explicit confirmation: ``confirmation`` must equal ``"MERGE PR "``. + Without it, the tool makes no API calls at all. + 3. Reuse ``gitea_check_pr_eligibility`` (#14) with action 'merge': this + proves the authenticated identity, the active profile (and that it + allows merge), the PR author, blocks self-merge, requires the PR to be + open, and fails closed when the PR is not mergeable or mergeability is + unknown. + 4. If ``expected_head_sha`` is given and the PR head moved → refuse. + 5. If ``expected_changed_files`` is given and the PR's changed file set + differs → refuse. + 6. Redundant self-merge block (authenticated user == PR author). + + No force / ignore-checks option is exposed. Gitea's own ``mergeable`` signal + (which reflects branch-protection required reviews and status checks) must + be positive, so required approval/check state is honoured, never bypassed. + + Never returns the token, Authorization header, or any credential material. Args: pr_number: The PR number to merge. + confirmation: Must be exactly ``"MERGE PR "`` or merge is refused. + expected_head_sha: Strongly recommended. If set and the PR head differs, refuse. + expected_changed_files: Optional. If set and the PR's changed file set + differs, refuse. do: Merge style — 'merge', 'squash', or 'rebase'. - title: Optional merge title. - message: Optional merge message. - force: Force merge, ignoring status checks. + title: Optional merge commit title. + message: Optional merge commit message. remote: Known instance — 'dadeschools' or 'prgs'. host: Override the Gitea host. org: Override the owner/organization. repo: Override the repository name. Returns: - dict with 'success' and 'message'. + dict describing the attempt: performed, authenticated user, profile + name, PR author, PR number, head SHA checked, merge method, + reasons/gates passed or blocked, and merge result / merge commit if + available. Never secrets. """ - h, o, r = _resolve(remote, host, org, repo) - auth = _auth(h) - url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge" - payload = { - "Do": do, - "force_merge": force, + do = (do or "").strip().lower() + result = { + "performed": False, + "authenticated_user": None, + "profile_name": get_profile()["profile_name"], + "pr_author": None, + "pr_number": pr_number, + "head_sha": None, + "expected_head_sha": expected_head_sha, + "merge_method": do, + "mergeable": None, + "remote": remote if remote in REMOTES else None, + "merge_result": None, + "merge_commit": None, + "reasons": [], } - if title: - payload["MergeTitleField"] = title - if message: - payload["MergeMessageField"] = message - api_request("POST", url, auth, payload) - return {"success": True, "message": f"PR #{pr_number} merged via '{do}'."} + reasons = result["reasons"] + + # Gate 1 — valid merge method (no API call on a bad method). + if do not in _MERGE_METHODS: + reasons.append( + f"unknown merge method '{do}'; expected one of {list(_MERGE_METHODS)}" + ) + return result + + # Gate 2 — explicit confirmation (fail fast; zero API calls without it). + expected_confirmation = f"MERGE PR {pr_number}" + if (confirmation or "").strip() != expected_confirmation: + reasons.append( + f"explicit confirmation required: pass confirmation='{expected_confirmation}'" + ) + return result + + # Gate 3 — reuse #14 eligibility (identity + profile + merge-allowed + + # author + self-merge block + open + mergeable/unknown fail-closed). + # Read-only GETs only. + elig = gitea_check_pr_eligibility( + pr_number=pr_number, action="merge", remote=remote, + host=host, org=org, repo=repo, + ) + result["authenticated_user"] = elig.get("authenticated_user") + result["profile_name"] = elig.get("profile_name", result["profile_name"]) + result["pr_author"] = elig.get("pr_author") + result["head_sha"] = elig.get("head_sha") + result["mergeable"] = elig.get("mergeable") + if not elig.get("eligible"): + reasons.append("eligibility check for 'merge' failed (fail closed)") + reasons.extend(elig.get("reasons", [])) + return result + + # Gate 4 — head SHA must match if the caller pinned a reviewed SHA. + actual_sha = result["head_sha"] + if expected_head_sha and actual_sha and expected_head_sha != actual_sha: + reasons.append( + "expected head SHA does not match current PR head (fail closed)" + ) + return result + if not actual_sha: + # Unreachable — eligibility fails closed without a head SHA — but never + # merge a PR whose head commit we could not read. + reasons.append("PR head SHA unavailable (fail closed)") + return result + + h, o, r = _resolve(remote, host, org, repo) + + # Gate 5 — changed files must match the reviewed set, if provided. + if expected_changed_files is not None: + try: + auth = _auth(h) + files = api_request( + "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}/files", auth + ) + actual_files = sorted( + (f or {}).get("filename", "") for f in (files or []) + ) + except Exception as exc: # noqa: BLE001 — redact before surfacing + reasons.append( + f"could not verify changed files (fail closed): {_redact(str(exc))}" + ) + return result + result["changed_files"] = actual_files + if actual_files != sorted(expected_changed_files): + reasons.append( + "PR changed files do not match expected_changed_files (fail closed)" + ) + return result + + # Gate 6 — redundant self-merge block (belt-and-suspenders over #14). + auth_user = result["authenticated_user"] + pr_author = result["pr_author"] + if auth_user and pr_author and auth_user == pr_author: + reasons.append("self-merge blocked (authenticated user is PR author)") + return result + + # All gates passed — perform the single merge mutation. + try: + auth = _auth(h) + merge_url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}/merge" + payload = {"Do": do} + if title: + payload["MergeTitleField"] = title + if message: + payload["MergeMessageField"] = message + api_request("POST", merge_url, auth, payload) + # Best-effort read-back of the merge commit SHA (redacted on error). + try: + merged = api_request( + "GET", f"{repo_api_url(h, o, r)}/pulls/{pr_number}", auth + ) + result["merge_commit"] = (merged or {}).get("merged_commit_sha") + except Exception: + result["merge_commit"] = None + except Exception as exc: # noqa: BLE001 — redact before surfacing + reasons.append(f"merge failed: {_redact(str(exc))}") + return result + + result["performed"] = True + result["merge_result"] = f"PR #{pr_number} merged via '{do}'." + reasons.append(f"all gates passed; merged PR #{pr_number} via '{do}'") + return result @mcp.tool() diff --git a/merge_pr.py b/merge_pr.py index c34c8d4..6f5ded6 100755 --- a/merge_pr.py +++ b/merge_pr.py @@ -1,7 +1,13 @@ #!/usr/bin/env python3 -"""Merge a Gitea pull request. +"""Merge a Gitea pull request — DISABLED (#16). -Usage: +Direct CLI merge is a fail-closed no-op. LLM automations in this project were +using this script as an ungated merge bypass, so it no longer performs a merge. +Merge is only available through the gated `gitea_merge_pr` MCP workflow (#16), +which enforces identity/profile/eligibility, explicit confirmation, expected +head SHA checking, and self-merge protection. + +Usage (fails closed): merge_pr.py --pr-number 84 --do squash --remote dadeschools """ import os @@ -29,29 +35,20 @@ def main(argv=None): parser.add_argument("--force", action="store_true", help="Force merge, ignoring status checks.") args = parser.parse_args(argv) - host, org, repo = resolve_remote(args) - auth = get_auth_header(host) - if not auth: - print(f"Could not get credentials for {host}.", file=sys.stderr) - return 1 - - url = f"{repo_api_url(host, org, repo)}/pulls/{args.pr_number}/merge" - payload = { - "Do": args.do, - "force_merge": args.force, - } - if args.title: - payload["MergeTitleField"] = args.title - if args.message: - payload["MergeMessageField"] = args.message - - try: - api_request("POST", url, auth, payload) - print(f"Successfully merged PR #{args.pr_number} using '{args.do}' method.") - return 0 - except Exception as e: - print(f"Error merging PR #{args.pr_number}: {e}", file=sys.stderr) - return 1 + # Fail closed: direct CLI merge is disabled (#16). No credentials are read + # and no merge API call is made. Merge is only available through the gated + # `gitea_merge_pr` MCP workflow, which enforces identity/profile/eligibility, + # explicit confirmation, expected head SHA checking, and self-merge + # protection. + print( + f"Direct CLI merge is disabled (#16). PR #{args.pr_number} was NOT " + "merged. Merge is only available through the gated workflow (MCP tool " + "'gitea_merge_pr'), which enforces identity/profile/eligibility, " + "explicit confirmation, expected head SHA checking, and self-merge " + "protection.", + file=sys.stderr, + ) + return 2 if __name__ == "__main__": diff --git a/review_pr.py b/review_pr.py index 374b4e8..464d348 100755 --- a/review_pr.py +++ b/review_pr.py @@ -1,11 +1,14 @@ #!/usr/bin/env python3 """Review and sign-off on a Gitea pull request. -Supports submitting a review (APPROVE, COMMENT, REQUEST_CHANGES) and optionally -merging the pull request in one command. +Submits a review (APPROVE, COMMENT, REQUEST_CHANGES). CLI merge is disabled: +the `--merge` flag is retained only for compatibility and fails closed without +making any API call. Merge is handled solely by the gated `gitea_merge_pr` MCP +workflow (#16), which enforces identity/profile/eligibility, explicit +confirmation, expected head SHA checking, and self-merge protection. -Usage: - review_pr.py --pr-number 12 --event APPROVE --body "Approved and signed off" --merge +Usage (review only): + review_pr.py --pr-number 12 --event APPROVE --body "Approved and signed off" """ import os import sys @@ -32,11 +35,29 @@ def main(argv=None): help="Review event/action type (default: APPROVE).") parser.add_argument("--body", default="", help="Review body/comment text.") parser.add_argument("--body-file", help="Read review body from this file ('-' for stdin).") - parser.add_argument("--merge", action="store_true", help="Automatically merge the PR if approved.") + parser.add_argument("--merge", action="store_true", + help="DISABLED — fails closed with no API call. CLI merge is not " + "supported; use the gated gitea_merge_pr MCP workflow (#16).") parser.add_argument("--merge-method", choices=["merge", "squash", "rebase"], default="merge", - help="Merge method/style to use if merging (default: merge).") + help="Ignored — CLI merge is disabled (see --merge).") args = parser.parse_args(argv) + # Fail closed: direct CLI merge is disabled (#16). LLM automations were + # using this flag as an ungated merge bypass. Merge is only available via + # the gated `gitea_merge_pr` MCP workflow, which enforces + # identity/profile/eligibility, explicit confirmation, expected head SHA, + # and self-merge protection. No API call is made here. + if args.merge: + print( + "Direct CLI merge is disabled. Merge is only available through the " + "gated #16 workflow (MCP tool 'gitea_merge_pr'), which enforces " + "identity/profile/eligibility, explicit confirmation, expected head " + "SHA checking, and self-merge protection. Re-run without --merge to " + "submit a review only.", + file=sys.stderr, + ) + return 2 + host, org, repo = resolve_remote(args) body = args.body @@ -80,25 +101,8 @@ def main(argv=None): print(f"Error submitting review: {e}", file=sys.stderr) return 1 - # 3. Merge PR if --merge is requested and event is APPROVE - if args.merge: - if args.event != "APPROVE": - print("Warning: Skipping merge because review event is not 'APPROVE'.", file=sys.stderr) - return 0 - - merge_url = f"{repo_api_url(host, org, repo)}/pulls/{args.pr_number}/merge" - merge_payload = { - "Do": args.merge_method, - "force_merge": False - } - - try: - api_request("POST", merge_url, auth, merge_payload) - print(f"Successfully merged PR #{args.pr_number} using '{args.merge_method}' method.") - except Exception as e: - print(f"Error merging PR: {e}", file=sys.stderr) - return 1 - + # Merge is intentionally not performed here — see the fail-closed guard + # above. Use the gated `gitea_merge_pr` MCP workflow (#16) to merge. return 0 diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 3f86942..b62c2d3 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -298,20 +298,320 @@ class TestViewPR(unittest.TestCase): # Merge PR # --------------------------------------------------------------------------- class TestMergePR(unittest.TestCase): + """Gated merge workflow (#16). gitea_merge_pr is the only merge path.""" + + def _pr(self, author, state="open", sha="abc123", mergeable=True): + return { + "user": {"login": author}, + "state": state, + "head": {"sha": sha}, + "mergeable": mergeable, + } + + def _confirm(self, n): + return f"MERGE PR {n}" + + def _assert_no_merge_call(self, mock_api): + for c in mock_api.call_args_list: + method, url = c.args[0], c.args[1] + self.assertFalse( + method == "POST" and url.endswith("/merge"), + f"unexpected merge mutation: {method} {url}", + ) + + # -- success -------------------------------------------------------------- @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) - def test_merge_pr(self, _auth, mock_api): - mock_api.return_value = {} - result = gitea_merge_pr(pr_number=1, do="squash", title="T", message="M", force=True) - self.assertTrue(result["success"]) - self.assertIn("merged", result["message"]) - # Check payload - payload = mock_api.call_args[0][3] + def test_merge_succeeds_when_all_gates_pass(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot"), + {}, # merge POST + {"merged_commit_sha": "mergecommit99"}, # read-back + ] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), + expected_head_sha="abc123", do="squash", + title="T", message="M", remote="prgs") + self.assertTrue(r["performed"]) + self.assertEqual(r["authenticated_user"], "merger-bot") + self.assertEqual(r["pr_author"], "author-bot") + self.assertEqual(r["head_sha"], "abc123") + self.assertEqual(r["merge_method"], "squash") + self.assertEqual(r["merge_commit"], "mergecommit99") + # 3rd call is the merge POST with the requested method/title/message. + merge_call = mock_api.call_args_list[2] + self.assertEqual(merge_call.args[0], "POST") + self.assertTrue(merge_call.args[1].endswith("/pulls/8/merge")) + payload = merge_call.args[3] self.assertEqual(payload["Do"], "squash") self.assertEqual(payload["MergeTitleField"], "T") self.assertEqual(payload["MergeMessageField"], "M") - self.assertEqual(payload["force_merge"], True) + self.assertNotIn("force_merge", payload) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_expected_changed_files_match_allows(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot"), + [{"filename": "a.py"}, {"filename": "b.py"}], # files + {}, # merge POST + {"merged_commit_sha": "c1"}, # read-back + ] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), + expected_changed_files=["b.py", "a.py"], remote="prgs") + self.assertTrue(r["performed"]) + + # -- confirmation --------------------------------------------------------- + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_missing_confirmation_blocks_with_no_api_call(self, _auth, mock_api): + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr(pr_number=8, confirmation="", remote="prgs") + self.assertFalse(r["performed"]) + self.assertTrue(any("explicit confirmation required" in x for x in r["reasons"])) + mock_api.assert_not_called() + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_wrong_confirmation_blocks(self, _auth, mock_api): + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr(pr_number=8, confirmation="MERGE PR 9", remote="prgs") + self.assertFalse(r["performed"]) + mock_api.assert_not_called() + + # -- identity / profile / eligibility fail-closed ------------------------- + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_self_author_cannot_merge(self, _auth, mock_api): + mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn("authenticated user is PR author", r["reasons"]) + self._assert_no_merge_call(mock_api) + + @patch("mcp_server.get_auth_header", return_value=None) + def test_unknown_identity_blocks(self, _auth): + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + self.assertFalse(r["performed"]) + self.assertIsNone(r["authenticated_user"]) + self.assertIn("authenticated identity could not be determined", r["reasons"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_unknown_profile_blocks(self, _auth, mock_api): + mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")] + env = {} # no GITEA_ALLOWED_OPERATIONS → empty allowed ops + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn( + "profile has no configured allowed operations (fail closed)", + r["reasons"]) + self._assert_no_merge_call(mock_api) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_profile_without_merge_permission_blocks(self, _auth, mock_api): + mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn("profile is not allowed to merge", r["reasons"]) + self._assert_no_merge_call(mock_api) + + # -- PR state / mergeability ---------------------------------------------- + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_closed_pr_blocks(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot", state="closed")] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn("PR is not open (state=closed)", r["reasons"]) + self._assert_no_merge_call(mock_api) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_non_mergeable_pr_blocks(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot", mergeable=False)] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn("PR is not mergeable", r["reasons"]) + self._assert_no_merge_call(mock_api) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_unknown_mergeability_fails_closed(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot", mergeable=None)] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn("PR mergeability unknown", r["reasons"]) + self._assert_no_merge_call(mock_api) + + # -- head SHA / changed files --------------------------------------------- + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_head_sha_mismatch_blocks(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot", sha="abc123")] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), + expected_head_sha="deadbeef", remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn( + "expected head SHA does not match current PR head (fail closed)", + r["reasons"]) + self._assert_no_merge_call(mock_api) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_changed_files_mismatch_blocks(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot"), + [{"filename": "a.py"}, {"filename": "c.py"}], # actual files + ] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), + expected_changed_files=["a.py", "b.py"], remote="prgs") + self.assertFalse(r["performed"]) + self.assertIn( + "PR changed files do not match expected_changed_files (fail closed)", + r["reasons"]) + self._assert_no_merge_call(mock_api) + + # -- misc ----------------------------------------------------------------- + + @patch("mcp_server.api_request") + def test_invalid_merge_method_rejected(self, mock_api): + with patch.dict(os.environ, {}, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation="MERGE PR 8", do="octopus", remote="prgs") + self.assertFalse(r["performed"]) + self.assertTrue(any("unknown merge method" in x for x in r["reasons"])) + mock_api.assert_not_called() + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_output_redacts_secrets(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot"), + {}, {"merged_commit_sha": "c1"}, + ] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge", + "GITEA_TOKEN": "super-secret-token"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + blob = repr(r).lower() + for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()): + self.assertNotIn(secret, blob) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_merge_error_message_redacts_credential(self, _auth, mock_api): + mock_api.side_effect = [ + {"login": "merger-bot"}, self._pr("author-bot"), + RuntimeError("HTTP 500: token abc-secret-xyz rejected"), + ] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "read,merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_merge_pr( + pr_number=8, confirmation=self._confirm(8), remote="prgs") + self.assertFalse(r["performed"]) + blob = repr(r) + self.assertIn("[REDACTED]", blob) + self.assertNotIn("abc-secret-xyz", blob) + + +class TestNoUngatedMergePath(unittest.TestCase): + """Prove no other exposed tool can merge (#16 surface audit).""" + + def test_submit_pr_review_has_no_merge(self): + import inspect + import mcp_server + # No merge parameter on the review-mutation tool. + params = inspect.signature(mcp_server.gitea_submit_pr_review).parameters + self.assertNotIn("merge", params) + # And 'merge' is not a reviewable action. + self.assertNotIn("merge", mcp_server._REVIEW_ACTIONS) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_review_pr_merge_true_makes_no_api_call(self, _auth, mock_api): + r = gitea_review_pr(pr_number=1, event="APPROVE", body="x", merge=True) + self.assertFalse(r["success"]) + self.assertEqual(mock_api.call_count, 0) + + def test_only_merge_endpoint_is_in_gated_tool(self): + # Source-level audit: the merge endpoint appears only inside + # gitea_merge_pr, which is the gated path. + import inspect + import mcp_server + src = inspect.getsource(mcp_server) + self.assertEqual(src.count("/merge\""), 1) + merge_src = inspect.getsource(mcp_server.gitea_merge_pr) + self.assertIn("/merge\"", merge_src) + + def test_readme_no_longer_advertises_ungated_cli_merge(self): + import pathlib + readme = (pathlib.Path(__file__).resolve().parent.parent + / "README.md").read_text(encoding="utf-8") + # The old ungated example command must be gone. + self.assertNotIn('--body "Approved" --merge', readme) + # And the gated messaging / audit deferral must be present. + self.assertIn("disabled", readme.lower()) + self.assertIn("gitea_merge_pr", readme) + self.assertIn("#18", readme) # --------------------------------------------------------------------------- diff --git a/tests/test_merge_pr.py b/tests/test_merge_pr.py index 66a7ca9..4dca610 100644 --- a/tests/test_merge_pr.py +++ b/tests/test_merge_pr.py @@ -15,49 +15,48 @@ FAKE_CREDS = "Basic dGVzdHVzZXI6dGVzdHBhc3M=" class TestArgParsing(unittest.TestCase): - @patch("merge_pr.api_request") - @patch("merge_pr.get_auth_header", return_value=FAKE_CREDS) - def test_minimal_required_args(self, _auth, mock_api): - rc = merge_pr.main(["--pr-number", "81"]) - self.assertEqual(rc, 0) - mock_api.assert_called_once() - def test_missing_pr_number_exits(self): with self.assertRaises(SystemExit): merge_pr.main([]) -class TestAPIPayload(unittest.TestCase): +class TestMergeDisabled(unittest.TestCase): + """Direct CLI merge is disabled (#16) — fails closed, no API call.""" @patch("merge_pr.api_request") @patch("merge_pr.get_auth_header", return_value=FAKE_CREDS) - def test_payload_fields(self, _auth, mock_api): + def test_merge_fails_closed_without_api_call(self, _auth, mock_api): + rc = merge_pr.main(["--pr-number", "81"]) + self.assertEqual(rc, 2) + mock_api.assert_not_called() + + @patch("merge_pr.api_request") + @patch("merge_pr.get_auth_header", return_value=FAKE_CREDS) + def test_no_merge_even_with_force_and_method(self, _auth, mock_api): rc = merge_pr.main([ "--pr-number", "81", "--do", "squash", "--title", "Squash title", "--message", "Squash message", "--force", + "--remote", "prgs", ]) - self.assertEqual(rc, 0) - mock_api.assert_called_once() - method, url, auth, payload = mock_api.call_args[0] - self.assertEqual(method, "POST") - self.assertEqual(auth, FAKE_CREDS) - self.assertEqual(payload["Do"], "squash") - self.assertEqual(payload["MergeTitleField"], "Squash title") - self.assertEqual(payload["MergeMessageField"], "Squash message") - self.assertEqual(payload["force_merge"], True) + self.assertEqual(rc, 2) + mock_api.assert_not_called() - @patch("merge_pr.api_request") - @patch("merge_pr.get_auth_header", return_value=FAKE_CREDS) - def test_url_construction(self, _auth, mock_api): - merge_pr.main(["--pr-number", "81", "--remote", "prgs"]) - url = mock_api.call_args[0][1] - self.assertEqual( - url, - "https://gitea.prgs.cc/api/v1/repos/Scaled-Tech-Consulting/Timesheet/pulls/81/merge" - ) + def test_message_points_to_gated_workflow(self): + import io + import contextlib + with patch("merge_pr.get_auth_header", return_value=FAKE_CREDS), \ + patch("merge_pr.api_request") as mock_api: + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = merge_pr.main(["--pr-number", "81"]) + self.assertEqual(rc, 2) + mock_api.assert_not_called() + msg = buf.getvalue().lower() + self.assertIn("disabled", msg) + self.assertIn("gitea_merge_pr", msg) if __name__ == "__main__": diff --git a/tests/test_review_pr.py b/tests/test_review_pr.py index 0d0ac7f..119bb95 100644 --- a/tests/test_review_pr.py +++ b/tests/test_review_pr.py @@ -65,27 +65,34 @@ class TestAPIPayload(unittest.TestCase): @patch("review_pr.api_request") @patch("review_pr.get_auth_header", return_value=FAKE_CREDS) - def test_approve_and_merge_workflow(self, _auth, mock_api): - # Setup mock api_request to return PR details, review response, and merge response - mock_api.side_effect = [FAKE_PR_DATA, {}, {}] - + def test_merge_flag_fails_closed_without_api_call(self, _auth, mock_api): + # --merge is an ungated bypass and is disabled (#16). It must fail + # closed BEFORE any API call — no review, no merge. rc = review_pr.main([ "--pr-number", "81", "--event", "APPROVE", "--body", "Approved", "--merge", - "--merge-method", "squash" + "--merge-method", "squash", ]) - self.assertEqual(rc, 0) - self.assertEqual(mock_api.call_count, 3) + self.assertEqual(rc, 2) + self.assertEqual(mock_api.call_count, 0) - # Verify third call: POST merge - third_call_args = mock_api.call_args_list[2] - self.assertEqual(third_call_args[0][0], "POST") - self.assertEqual(third_call_args[0][1], "https://gitea.dadeschools.net/api/v1/repos/Contractor/Timesheet/pulls/81/merge") - payload = third_call_args[0][3] - self.assertEqual(payload["Do"], "squash") - self.assertEqual(payload["force_merge"], False) + def test_merge_flag_message_points_to_gated_workflow(self): + import io + import contextlib + with patch("review_pr.get_auth_header", return_value=FAKE_CREDS), \ + patch("review_pr.api_request") as mock_api: + buf = io.StringIO() + with contextlib.redirect_stderr(buf): + rc = review_pr.main([ + "--pr-number", "81", "--event", "APPROVE", "--merge", + ]) + self.assertEqual(rc, 2) + self.assertEqual(mock_api.call_count, 0) + msg = buf.getvalue().lower() + self.assertIn("disabled", msg) + self.assertIn("gitea_merge_pr", msg) if __name__ == "__main__":