Files
Gitea-Tools/tests/test_mcp_server.py
sysadmin 7bcdd44fe5 fix: validate gitea_edit_pr no-fields before authentication (#43)
gitea_edit_pr called _auth() (and resolved the remote) before checking whether
any editable field was provided, so a pure validation error (no fields) surfaced
as a RuntimeError "no credentials" in environments without Gitea auth — making
test_edit_pr_no_fields_raises depend on credentials/network/env.

Move the payload build + no-fields ValueError ahead of _resolve/_auth/URL setup.
Behavior is unchanged when fields are provided (same _resolve → _auth → audited
PATCH path). No change to auth, retry/backoff, audit, config profiles, or
worktree helpers.

Tests: add test_edit_pr_no_fields_validates_before_auth asserting the no-fields
path raises ValueError and calls neither get_auth_header nor api_request (even
with auth mocked to None). Existing edit-PR tests unchanged.

Full suite passes with no Gitea credentials (287 passed, 0 failures) — the
no-fields test no longer depends on the environment.

Closes #43.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-07-02 03:08:39 -04:00

1355 lines
60 KiB
Python

"""Tests for the MCP server tool functions.
Each tool is tested by calling the underlying function directly (not through
the MCP protocol) with mocked API responses.
"""
import os
import sys
import unittest
from unittest.mock import patch, MagicMock
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
from mcp_server import ( # noqa: E402
gitea_create_issue,
gitea_create_pr,
gitea_close_issue,
gitea_list_issues,
gitea_view_issue,
gitea_mark_issue,
gitea_mirror_refs,
gitea_list_prs,
gitea_view_pr,
gitea_merge_pr,
gitea_review_pr,
gitea_delete_branch,
gitea_edit_pr,
gitea_get_file,
gitea_commit_files,
gitea_whoami,
gitea_get_profile,
gitea_check_pr_eligibility,
gitea_submit_pr_review,
)
from gitea_auth import get_profile # noqa: E402
# Stand-in Authorization header value that the patched get_auth_header
# returns throughout this module (HTTP Basic form; base64 of "test:test").
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
# ---------------------------------------------------------------------------
# Create Issue
# ---------------------------------------------------------------------------
class TestCreateIssue(unittest.TestCase):
    """gitea_create_issue, with credentials and the HTTP call both mocked."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_creates_issue(self, _auth, mock_api):
        mock_api.return_value = {
            "number": 1,
            "html_url": "https://gitea.example.com/issues/1",
        }

        created = gitea_create_issue(title="Test issue", body="body text")

        self.assertEqual(created["number"], 1)
        self.assertIn("issues/1", created["url"])
        mock_api.assert_called_once()
        # The single request is a POST whose payload (4th positional
        # argument) carries the requested title.
        positional = mock_api.call_args[0]
        self.assertEqual(positional[0], "POST")
        self.assertEqual(positional[3]["title"], "Test issue")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_creates_on_prgs(self, _auth, mock_api):
        mock_api.return_value = {
            "number": 5,
            "html_url": "https://gitea.prgs.cc/issues/5",
        }

        created = gitea_create_issue(title="Test", remote="prgs")

        self.assertEqual(created["number"], 5)
        # Selecting remote="prgs" must show up in the request URL.
        request_url = mock_api.call_args[0][1]
        for fragment in ("gitea.prgs.cc", "Scaled-Tech-Consulting"):
            self.assertIn(fragment, request_url)
# ---------------------------------------------------------------------------
# Create PR
# ---------------------------------------------------------------------------
class TestCreatePR(unittest.TestCase):
    """gitea_create_pr forwards head/base and returns the new PR number."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_creates_pr(self, _auth, mock_api):
        mock_api.return_value = {
            "number": 3,
            "html_url": "https://example.com/pulls/3",
        }

        created = gitea_create_pr(title="feat: X", head="feat/x", base="main")

        self.assertEqual(created["number"], 3)
        # The payload is the 4th positional argument of api_request.
        request_body = mock_api.call_args[0][3]
        self.assertEqual(request_body["head"], "feat/x")
        self.assertEqual(request_body["base"], "main")
# ---------------------------------------------------------------------------
# Close Issue
# ---------------------------------------------------------------------------
class TestCloseIssue(unittest.TestCase):
    """gitea_close_issue reports success and sends state=closed."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_closes_issue(self, _auth, mock_api):
        mock_api.return_value = {"state": "closed"}

        outcome = gitea_close_issue(issue_number=42)

        self.assertTrue(outcome["success"])
        # The issue number is echoed back in the human-readable message.
        self.assertIn("42", outcome["message"])
        request_body = mock_api.call_args[0][3]
        self.assertEqual(request_body["state"], "closed")
# ---------------------------------------------------------------------------
# List Issues
# ---------------------------------------------------------------------------
class TestListIssues(unittest.TestCase):
    """gitea_list_issues: result flattening and query-string filters."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_returns_formatted_list(self, _auth, mock_api):
        labelled_and_assigned = {
            "number": 1, "title": "Bug", "state": "open",
            "labels": [{"name": "bug"}],
            "assignee": {"login": "alice"},
        }
        bare = {
            "number": 2, "title": "Feature", "state": "open",
            "labels": [], "assignee": None,
        }
        mock_api.return_value = [labelled_and_assigned, bare]

        issues = gitea_list_issues()

        self.assertEqual(len(issues), 2)
        first, second = issues[0], issues[1]
        self.assertEqual(first["number"], 1)
        # Label objects collapse to names; the assignee collapses to a login.
        self.assertEqual(first["labels"], ["bug"])
        self.assertEqual(first["assignee"], "alice")
        # A missing assignee is reported as an empty string, not None.
        self.assertEqual(second["assignee"], "")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_passes_label_filter(self, _auth, mock_api):
        mock_api.return_value = []

        gitea_list_issues(label="important")

        request_url = mock_api.call_args[0][1]
        self.assertIn("labels=important", request_url)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_passes_state_filter(self, _auth, mock_api):
        mock_api.return_value = []

        gitea_list_issues(state="closed")

        request_url = mock_api.call_args[0][1]
        self.assertIn("state=closed", request_url)
# ---------------------------------------------------------------------------
# View Issue
# ---------------------------------------------------------------------------
class TestViewIssue(unittest.TestCase):
    """gitea_view_issue returns number, body, label names and assignee login."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_returns_full_details(self, _auth, mock_api):
        mock_api.return_value = {
            "number": 7,
            "title": "MCP server",
            "body": "Build it",
            "state": "open",
            "labels": [{"name": "important"}],
            "assignee": {"login": "jason"},
            "html_url": "https://gitea.prgs.cc/issues/7",
        }

        issue = gitea_view_issue(issue_number=7, remote="prgs")

        expected = {
            "number": 7,
            "body": "Build it",
            "labels": ["important"],
            "assignee": "jason",
        }
        for field, value in expected.items():
            self.assertEqual(issue[field], value)
# ---------------------------------------------------------------------------
# Mark Issue
# ---------------------------------------------------------------------------
class TestMarkIssue(unittest.TestCase):
    """gitea_mark_issue: start/done label handling and input validation."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_start_adds_label(self, _auth, mock_api):
        # Two API responses are queued: the label listing, then the result
        # of adding the label to the issue.
        label_listing = [{"id": 10, "name": "status:in-progress"}]
        add_response = [{"name": "status:in-progress"}]
        mock_api.side_effect = [label_listing, add_response]

        outcome = gitea_mark_issue(issue_number=5, action="start")

        self.assertTrue(outcome["success"])
        self.assertIn("claimed", outcome["message"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_done_removes_label(self, _auth, mock_api):
        # Label listing first; the follow-up call yields no body (None).
        label_listing = [{"id": 10, "name": "status:in-progress"}]
        mock_api.side_effect = [label_listing, None]

        outcome = gitea_mark_issue(issue_number=5, action="done")

        self.assertTrue(outcome["success"])
        self.assertIn("released", outcome["message"])

    def test_invalid_action_raises(self):
        # Nothing is patched here: an unsupported action must be rejected
        # with ValueError.
        with self.assertRaises(ValueError):
            gitea_mark_issue(issue_number=5, action="pause")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_missing_label_raises(self, _auth, mock_api):
        mock_api.return_value = []  # no labels exist
        with self.assertRaises(RuntimeError):
            gitea_mark_issue(issue_number=5, action="start")
# ---------------------------------------------------------------------------
# Auth errors
# ---------------------------------------------------------------------------
class TestAuthErrors(unittest.TestCase):
    """Failure modes of gitea_create_issue around credentials and remotes."""

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_no_credentials_raises(self, _auth):
        # get_auth_header yielding None stands for "no credentials".
        self.assertRaises(RuntimeError, gitea_create_issue, title="test")

    def test_unknown_remote_raises(self):
        self.assertRaises(
            ValueError, gitea_create_issue, title="test", remote="nonexistent"
        )
# ---------------------------------------------------------------------------
# Mirror Refs
# ---------------------------------------------------------------------------
class TestMirrorRefs(unittest.TestCase):
    """gitea_mirror_refs: flags forwarded to the mocked subprocess.run.

    Each test replaces mcp_server.subprocess.run with a mock and inspects
    the first positional argument it was called with (the command line).
    """

    @patch("mcp_server.subprocess.run")
    def test_dry_run_default(self, mock_run):
        mock_run.return_value = MagicMock(
            stdout="DRY RUN\n", stderr="", returncode=0
        )
        result = gitea_mirror_refs()
        self.assertEqual(result["return_code"], 0)
        # With no arguments neither mutating flag may be passed along.
        args = mock_run.call_args[0][0]
        self.assertNotIn("--apply", args)
        self.assertNotIn("--force", args)

    @patch("mcp_server.subprocess.run")
    def test_apply_flag(self, mock_run):
        mock_run.return_value = MagicMock(
            stdout="done\n", stderr="", returncode=0
        )
        # The return value is not inspected here; only the command matters.
        gitea_mirror_refs(apply=True)
        args = mock_run.call_args[0][0]
        self.assertIn("--apply", args)

    @patch("mcp_server.subprocess.run")
    def test_force_flag(self, mock_run):
        mock_run.return_value = MagicMock(
            stdout="", stderr="", returncode=0
        )
        gitea_mirror_refs(apply=True, force=True)
        args = mock_run.call_args[0][0]
        self.assertIn("--apply", args)
        self.assertIn("--force", args)
# ---------------------------------------------------------------------------
# List PRs
# ---------------------------------------------------------------------------
class TestListPRs(unittest.TestCase):
    """gitea_list_prs flattens each PR's head ref into a plain string."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_list_prs(self, _auth, mock_api):
        only_pr = {
            "number": 1, "title": "PR 1", "state": "open",
            "head": {"ref": "branch1"}, "base": {"ref": "main"},
            "html_url": "http://url1", "mergeable": True
        }
        mock_api.return_value = [only_pr]

        listed = gitea_list_prs()

        self.assertEqual(len(listed), 1)
        entry = listed[0]
        self.assertEqual(entry["number"], 1)
        self.assertEqual(entry["head"], "branch1")
# ---------------------------------------------------------------------------
# View PR
# ---------------------------------------------------------------------------
class TestViewPR(unittest.TestCase):
    """gitea_view_pr exposes the body and the author's login."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_view_pr(self, _auth, mock_api):
        mock_api.return_value = {
            "number": 1, "title": "PR 1", "state": "open",
            "head": {"ref": "branch1"}, "base": {"ref": "main"},
            "html_url": "http://url1", "mergeable": True,
            "body": "description",
            "user": {"login": "user1"}
        }

        viewed = gitea_view_pr(pr_number=1)

        self.assertEqual(viewed["number"], 1)
        self.assertEqual(viewed["body"], "description")
        # The nested user object is reduced to its login.
        self.assertEqual(viewed["user"], "user1")
# ---------------------------------------------------------------------------
# Merge PR
# ---------------------------------------------------------------------------
class TestMergePR(unittest.TestCase):
    """Gated merge workflow (#16): each gate of gitea_merge_pr is exercised.

    Every call to gitea_merge_pr runs with os.environ replaced wholesale
    (patch.dict with clear=True), so only the profile variables a test lists
    are visible. api_request / get_auth_header are patched per test.
    """

    # -- fixtures / helpers ---------------------------------------------------

    def _pr(self, author, state="open", sha="abc123", mergeable=True):
        # Minimal PR payload carrying only the fields these tests vary.
        return {
            "user": {"login": author},
            "state": state,
            "head": {"sha": sha},
            "mergeable": mergeable,
        }

    def _confirm(self, n):
        # Confirmation phrase for PR number n.
        return f"MERGE PR {n}"

    def _assert_no_merge_call(self, mock_api):
        # Walk every recorded API call and fail on a POST to a /merge URL.
        for c in mock_api.call_args_list:
            method = c.args[0]
            url = c.args[1]
            is_merge_post = method == "POST" and url.endswith("/merge")
            self.assertFalse(
                is_merge_post, f"unexpected merge mutation: {method} {url}"
            )

    @staticmethod
    def _merger_env(**extra):
        # Environment naming a profile that lists "merge" among its allowed
        # operations; keyword arguments add further variables.
        env = {
            "GITEA_PROFILE_NAME": "gitea-merger",
            "GITEA_ALLOWED_OPERATIONS": "read,merge",
        }
        env.update(extra)
        return env

    @staticmethod
    def _merge(env, **kwargs):
        # Invoke gitea_merge_pr while os.environ holds exactly `env`.
        with patch.dict(os.environ, env, clear=True):
            return gitea_merge_pr(**kwargs)

    def _assert_blocked(self, r, reason, mock_api):
        # Common shape of a refused merge: not performed, the given reason is
        # listed, and no merge POST was ever issued.
        self.assertFalse(r["performed"])
        self.assertIn(reason, r["reasons"])
        self._assert_no_merge_call(mock_api)

    # -- success --------------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_merge_succeeds_when_all_gates_pass(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot"),
            {},  # merge POST
            {"merged_commit_sha": "mergecommit99"},  # read-back
        ]

        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8),
            expected_head_sha="abc123", do="squash",
            title="T", message="M", remote="prgs",
        )

        self.assertTrue(r["performed"])
        self.assertEqual(r["authenticated_user"], "merger-bot")
        self.assertEqual(r["pr_author"], "author-bot")
        self.assertEqual(r["head_sha"], "abc123")
        self.assertEqual(r["merge_method"], "squash")
        self.assertEqual(r["merge_commit"], "mergecommit99")
        # 3rd call is the merge POST with the requested method/title/message.
        merge_call = mock_api.call_args_list[2]
        self.assertEqual(merge_call.args[0], "POST")
        self.assertTrue(merge_call.args[1].endswith("/pulls/8/merge"))
        payload = merge_call.args[3]
        self.assertEqual(payload["Do"], "squash")
        self.assertEqual(payload["MergeTitleField"], "T")
        self.assertEqual(payload["MergeMessageField"], "M")
        self.assertNotIn("force_merge", payload)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_expected_changed_files_match_allows(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot"),
            [{"filename": "a.py"}, {"filename": "b.py"}],  # files
            {},  # merge POST
            {"merged_commit_sha": "c1"},  # read-back
        ]

        # The expected list is given in a different order than the API's.
        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8),
            expected_changed_files=["b.py", "a.py"], remote="prgs",
        )

        self.assertTrue(r["performed"])

    # -- confirmation ---------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_missing_confirmation_blocks_with_no_api_call(self, _auth, mock_api):
        r = self._merge(
            self._merger_env(), pr_number=8, confirmation="", remote="prgs"
        )

        self.assertFalse(r["performed"])
        self.assertTrue(
            any("explicit confirmation required" in x for x in r["reasons"])
        )
        mock_api.assert_not_called()

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_wrong_confirmation_blocks(self, _auth, mock_api):
        # The phrase names PR 9 while the call targets PR 8.
        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation="MERGE PR 9", remote="prgs",
        )

        self.assertFalse(r["performed"])
        mock_api.assert_not_called()

    # -- identity / profile / eligibility fail-closed -------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_self_author_cannot_merge(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]

        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8), remote="prgs",
        )

        self._assert_blocked(r, "authenticated user is PR author", mock_api)

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_unknown_identity_blocks(self, _auth):
        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8), remote="prgs",
        )

        self.assertFalse(r["performed"])
        self.assertIsNone(r["authenticated_user"])
        self.assertIn(
            "authenticated identity could not be determined", r["reasons"]
        )

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_unknown_profile_blocks(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]

        # no GITEA_ALLOWED_OPERATIONS → empty allowed ops
        r = self._merge(
            {}, pr_number=8, confirmation=self._confirm(8), remote="prgs"
        )

        self._assert_blocked(
            r,
            "profile has no configured allowed operations (fail closed)",
            mock_api,
        )

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_profile_without_merge_permission_blocks(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
        reviewer_env = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
        }

        r = self._merge(
            reviewer_env,
            pr_number=8, confirmation=self._confirm(8), remote="prgs",
        )

        self._assert_blocked(r, "profile is not allowed to merge", mock_api)

    # -- PR state / mergeability ----------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_closed_pr_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot", state="closed"),
        ]

        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8), remote="prgs",
        )

        self._assert_blocked(r, "PR is not open (state=closed)", mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_non_mergeable_pr_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot", mergeable=False),
        ]

        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8), remote="prgs",
        )

        self._assert_blocked(r, "PR is not mergeable", mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_unknown_mergeability_fails_closed(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot", mergeable=None),
        ]

        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8), remote="prgs",
        )

        self._assert_blocked(r, "PR mergeability unknown", mock_api)

    # -- head SHA / changed files ---------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_head_sha_mismatch_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot", sha="abc123"),
        ]

        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8),
            expected_head_sha="deadbeef", remote="prgs",
        )

        self._assert_blocked(
            r,
            "expected head SHA does not match current PR head (fail closed)",
            mock_api,
        )

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_changed_files_mismatch_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot"),
            [{"filename": "a.py"}, {"filename": "c.py"}],  # actual files
        ]

        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8),
            expected_changed_files=["a.py", "b.py"], remote="prgs",
        )

        self._assert_blocked(
            r,
            "PR changed files do not match expected_changed_files (fail closed)",
            mock_api,
        )

    # -- misc -----------------------------------------------------------------

    @patch("mcp_server.api_request")
    def test_invalid_merge_method_rejected(self, mock_api):
        r = self._merge(
            {},
            pr_number=8, confirmation="MERGE PR 8", do="octopus",
            remote="prgs",
        )

        self.assertFalse(r["performed"])
        self.assertTrue(any("unknown merge method" in x for x in r["reasons"]))
        mock_api.assert_not_called()

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_output_redacts_secrets(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot"),
            {},
            {"merged_commit_sha": "c1"},
        ]

        r = self._merge(
            self._merger_env(GITEA_TOKEN="super-secret-token"),
            pr_number=8, confirmation=self._confirm(8), remote="prgs",
        )

        # Compare case-insensitively against the whole rendered result.
        blob = repr(r).lower()
        forbidden = (
            "super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()
        )
        for secret in forbidden:
            self.assertNotIn(secret, blob)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_merge_error_message_redacts_credential(self, _auth, mock_api):
        # The third API call (the merge itself) raises with a token-like
        # string embedded in its message.
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot"),
            RuntimeError("HTTP 500: token abc-secret-xyz rejected"),
        ]

        r = self._merge(
            self._merger_env(),
            pr_number=8, confirmation=self._confirm(8), remote="prgs",
        )

        self.assertFalse(r["performed"])
        blob = repr(r)
        self.assertIn("[REDACTED]", blob)
        self.assertNotIn("abc-secret-xyz", blob)
class TestNoUngatedMergePath(unittest.TestCase):
    """Surface audit for #16: nothing except gitea_merge_pr may merge."""

    def test_submit_pr_review_has_no_merge(self):
        import inspect
        import mcp_server

        # The review-mutation tool must not accept a 'merge' parameter ...
        signature = inspect.signature(mcp_server.gitea_submit_pr_review)
        self.assertNotIn("merge", signature.parameters)
        # ... and 'merge' must not be among the review actions either.
        self.assertNotIn("merge", mcp_server._REVIEW_ACTIONS)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_review_pr_merge_true_makes_no_api_call(self, _auth, mock_api):
        outcome = gitea_review_pr(
            pr_number=1, event="APPROVE", body="x", merge=True
        )

        self.assertFalse(outcome["success"])
        self.assertEqual(mock_api.call_count, 0)

    def test_only_merge_endpoint_is_in_gated_tool(self):
        # Source-level audit: the merge endpoint literal occurs exactly once
        # in the module, and that occurrence sits inside gitea_merge_pr.
        import inspect
        import mcp_server

        endpoint_literal = "/merge\""
        module_source = inspect.getsource(mcp_server)
        self.assertEqual(module_source.count(endpoint_literal), 1)
        gated_source = inspect.getsource(mcp_server.gitea_merge_pr)
        self.assertIn(endpoint_literal, gated_source)

    def test_readme_no_longer_advertises_ungated_cli_merge(self):
        import pathlib

        # README.md lives one directory above this test file's directory.
        project_dir = pathlib.Path(__file__).resolve().parent.parent
        readme = (project_dir / "README.md").read_text(encoding="utf-8")
        # The old ungated example command must be gone.
        self.assertNotIn('--body "Approved" --merge', readme)
        # And the gated messaging / audit deferral must be present.
        self.assertIn("disabled", readme.lower())
        for marker in ("gitea_merge_pr", "#18"):
            self.assertIn(marker, readme)
# ---------------------------------------------------------------------------
# Review PR
# ---------------------------------------------------------------------------
class TestReviewPR(unittest.TestCase):
    """Legacy gitea_review_pr: merge flag refused, eligibility gates applied."""

    @staticmethod
    def _profile(name, allowed):
        # Profile dict in the shape these tests feed to the patched
        # get_profile.
        return {
            "profile_name": name,
            "allowed_operations": allowed,
            "forbidden_operations": [],
            "base_url": None,
        }

    @staticmethod
    def _open_pr(author):
        # Open, mergeable PR payload written by `author`.
        return {
            "state": "open",
            "head": {"sha": "abc1234"},
            "mergeable": True,
            "user": {"login": author},
        }

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_legacy_review_pr_merge_fails_closed(self, _auth, mock_api):
        outcome = gitea_review_pr(
            pr_number=1, event="APPROVE", body="Looks good", merge=True
        )

        self.assertFalse(outcome["success"])
        self.assertIn("no longer supported", outcome["message"])
        self.assertEqual(mock_api.call_count, 0)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    @patch("mcp_server.get_profile")
    def test_legacy_review_pr_uses_gates(self, mock_get_profile, _auth, mock_api):
        # Profile lacks the approve capability, so the gate must fail.
        mock_get_profile.return_value = self._profile("gitea-readonly", ["read"])
        mock_api.side_effect = [
            {"login": "reviewer1"},  # /api/v1/user
            self._open_pr("author1"),  # /pulls/1
        ]

        outcome = gitea_review_pr(
            pr_number=1, event="APPROVE", body="Looks good", merge=False
        )

        self.assertFalse(outcome["success"])
        message = outcome["message"]
        self.assertIn("Review submission failed eligibility gates", message)
        self.assertIn("not allowed to approve", message)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    @patch("mcp_server.get_profile")
    def test_legacy_review_pr_self_approval_blocked(self, mock_get_profile, _auth, mock_api):
        mock_get_profile.return_value = self._profile(
            "gitea-reviewer", ["read", "approve"]
        )
        # The authenticated user and the PR author are the same login.
        mock_api.side_effect = [
            {"login": "jcwalker3"},  # /api/v1/user
            self._open_pr("jcwalker3"),  # /pulls/1
        ]

        outcome = gitea_review_pr(
            pr_number=1, event="APPROVE", body="Self approve", merge=False
        )

        self.assertFalse(outcome["success"])
        message = outcome["message"]
        self.assertIn("Review submission failed eligibility gates", message)
        self.assertIn("authenticated user is PR author", message)
# ---------------------------------------------------------------------------
# Delete Branch
# ---------------------------------------------------------------------------
class TestDeleteBranch(unittest.TestCase):
    """gitea_delete_branch percent-encodes the branch name in the URL."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_delete_branch(self, _auth, mock_api):
        mock_api.return_value = {}

        outcome = gitea_delete_branch(branch="feat/branch")

        self.assertTrue(outcome["success"])
        self.assertIn("deleted", outcome["message"])
        # The slash in the branch name must arrive URL-encoded as %2F.
        request_url = mock_api.call_args[0][1]
        self.assertIn("feat%2Fbranch", request_url)
# ---------------------------------------------------------------------------
# Edit PR
# ---------------------------------------------------------------------------
class TestEditPR(unittest.TestCase):
    """gitea_edit_pr: PATCH payload and the no-fields validation error."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_edit_pr_success(self, _auth, mock_api):
        mock_api.return_value = {
            "number": 1,
            "title": "New Title",
            "body": "New description",
            "state": "open",
            "html_url": "https://gitea.example.com/pulls/1"
        }

        outcome = gitea_edit_pr(
            pr_number=1,
            title="New Title",
            body="New description",
            state="open",
        )

        self.assertTrue(outcome["success"])
        self.assertEqual(outcome["title"], "New Title")
        self.assertEqual(outcome["body"], "New description")
        # The request is a PATCH whose payload carries all three fields.
        positional = mock_api.call_args[0]
        self.assertEqual(positional[0], "PATCH")
        sent = positional[3]
        self.assertEqual(sent["title"], "New Title")
        self.assertEqual(sent["body"], "New description")
        self.assertEqual(sent["state"], "open")

    def test_edit_pr_no_fields_raises(self):
        self.assertRaises(ValueError, gitea_edit_pr, pr_number=1)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=None)
    def test_edit_pr_no_fields_validates_before_auth(self, mock_auth, mock_api):
        # No-fields validation must not depend on credentials/network: it
        # raises ValueError before touching auth or the API. The auth mock
        # returns None to simulate an unauthenticated environment.
        with self.assertRaises(ValueError):
            gitea_edit_pr(pr_number=1)

        mock_auth.assert_not_called()
        mock_api.assert_not_called()
# ---------------------------------------------------------------------------
# Get File
# ---------------------------------------------------------------------------
class TestGetFile(unittest.TestCase):
    """gitea_get_file issues a GET on the contents endpoint with a ref."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_get_file_success(self, _auth, mock_api):
        api_payload = {
            "name": "README.md",
            "path": "README.md",
            "sha": "3a0b123",
            "size": 100,
            "encoding": "base64",
            "content": "SGVsbG8gV29ybGQ="
        }
        mock_api.return_value = api_payload

        fetched = gitea_get_file(filepath="README.md", ref="main")

        # name, sha and content come back unchanged from the API payload.
        for field in ("name", "sha", "content"):
            self.assertEqual(fetched[field], api_payload[field])
        # Verify the HTTP method and that path and ref both reach the URL.
        positional = mock_api.call_args[0]
        self.assertEqual(positional[0], "GET")
        request_url = positional[1]
        self.assertIn("contents/README.md", request_url)
        self.assertIn("ref=main", request_url)
# ---------------------------------------------------------------------------
# Commit Files
# ---------------------------------------------------------------------------
class TestCommitFiles(unittest.TestCase):
    """gitea_commit_files POSTs message, new_branch and files unchanged."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_commit_files_success(self, _auth, mock_api):
        mock_api.return_value = {
            "commit": {"sha": "commit-sha-123"},
            "branch": {"name": "test-branch"}
        }
        files = [
            {"operation": "create", "path": "test.txt", "content": "SGVsbG8="}
        ]

        outcome = gitea_commit_files(
            files=files, message="Initial commit", new_branch="test-branch"
        )

        self.assertTrue(outcome["success"])
        # Nested commit/branch objects are flattened to sha and name.
        self.assertEqual(outcome["commit"], "commit-sha-123")
        self.assertEqual(outcome["branch"], "test-branch")
        # Verify the HTTP method, endpoint and request payload.
        positional = mock_api.call_args[0]
        self.assertEqual(positional[0], "POST")
        self.assertIn("/contents", positional[1])
        sent = positional[3]
        self.assertEqual(sent["message"], "Initial commit")
        self.assertEqual(sent["new_branch"], "test-branch")
        self.assertEqual(sent["files"], files)
# ---------------------------------------------------------------------------
# Whoami (authenticated-user identity lookup)
# ---------------------------------------------------------------------------
class TestWhoami(unittest.TestCase):
    """gitea_whoami: identity fields, secret hygiene, and failure modes."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_returns_safe_identity(self, _auth, mock_api):
        mock_api.return_value = {
            "id": 42,
            "login": "reviewer-bot",
            "full_name": "Reviewer Bot",
            "email": "reviewer@example.com",
        }

        identity = gitea_whoami(remote="prgs")

        self.assertTrue(identity["authenticated"])
        expected = {
            "username": "reviewer-bot",
            "display_name": "Reviewer Bot",
            "user_id": 42,
            "server": "https://gitea.prgs.cc",
            "remote": "prgs",
        }
        for field, value in expected.items():
            self.assertEqual(identity[field], value)
        # Read-only: a GET against the authenticated-user endpoint.
        positional = mock_api.call_args[0]
        self.assertEqual(positional[0], "GET")
        self.assertTrue(positional[1].endswith("/api/v1/user"))

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_never_exposes_secrets(self, _auth, mock_api):
        mock_api.return_value = {"id": 1, "login": "someone"}

        identity = gitea_whoami(remote="prgs")

        # Scan the lower-cased repr so the check is case-insensitive.
        blob = repr(identity).lower()
        forbidden = (
            "token", "authorization", "basic ", "password", FAKE_AUTH.lower()
        )
        for secret in forbidden:
            self.assertNotIn(secret, blob)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_fails_closed_without_login(self, _auth, mock_api):
        mock_api.return_value = {"id": 1}  # no 'login'
        self.assertRaises(RuntimeError, gitea_whoami, remote="prgs")

    def test_rejects_unknown_remote(self):
        self.assertRaises(ValueError, gitea_whoami, remote="nope")
# ---------------------------------------------------------------------------
# Runtime profile (env-configured profile metadata) — issue #19
# ---------------------------------------------------------------------------
class TestRuntimeProfile(unittest.TestCase):
    """Check the env-derived metadata returned by ``get_profile``."""

    @staticmethod
    def _load_profile(environment):
        """Return ``get_profile()`` with ``os.environ`` replaced by *environment*."""
        with patch.dict(os.environ, environment, clear=True):
            return get_profile()

    def test_defaults_when_unset(self):
        """An empty environment yields the default profile values."""
        profile = self._load_profile({})
        self.assertEqual(profile["profile_name"], "gitea-default")
        self.assertEqual(profile["allowed_operations"], [])
        self.assertIsNone(profile["base_url"])

    def test_reads_env_metadata(self):
        """Profile fields are read from the env; operation names come back trimmed."""
        profile = self._load_profile({
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read, review , approve",
            "GITEA_BASE_URL": "https://gitea.example.invalid",
        })
        self.assertEqual(profile["profile_name"], "gitea-reviewer")
        self.assertEqual(profile["allowed_operations"], ["read", "review", "approve"])
        self.assertEqual(profile["base_url"], "https://gitea.example.invalid")

    def test_never_includes_token(self):
        """The token value from the environment is absent from the profile."""
        profile = self._load_profile({
            "GITEA_PROFILE_NAME": "gitea-author",
            "GITEA_TOKEN": "super-secret-token",
        })
        rendered = repr(profile).lower()
        # The token VALUE must never appear. (The field name
        # 'token_source_name' is non-secret metadata and may exist.)
        self.assertNotIn("super-secret-token", rendered)
        # GITEA_TOKEN_SOURCE is unset, so no source name is reported.
        self.assertIsNone(profile.get("token_source_name"))

    def test_whoami_surfaces_profile_without_token(self):
        """``gitea_whoami`` embeds the profile but nothing token-like."""
        environment = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
            "GITEA_TOKEN": "super-secret-token",
        }
        with patch("mcp_server.get_auth_header", return_value=FAKE_AUTH), \
                patch("mcp_server.api_request",
                      return_value={"id": 7, "login": "rev"}), \
                patch.dict(os.environ, environment, clear=True):
            identity = gitea_whoami(remote="prgs")

        surfaced = identity["profile"]
        self.assertEqual(surfaced["profile_name"], "gitea-reviewer")
        self.assertEqual(surfaced["allowed_operations"], ["read", "review", "approve"])

        rendered = repr(identity).lower()
        for needle in ("super-secret-token", "token", "authorization", "basic "):
            self.assertNotIn(needle, rendered)
# ---------------------------------------------------------------------------
# Profile discovery (read-only) — issue #13
# ---------------------------------------------------------------------------
class TestProfileDiscovery(unittest.TestCase):
    """Cover the extra ``get_profile`` fields and the ``gitea_get_profile`` tool."""

    @staticmethod
    def _raw_profile(environment):
        """Return ``get_profile()`` with ``os.environ`` replaced by *environment*."""
        with patch.dict(os.environ, environment, clear=True):
            return get_profile()

    @staticmethod
    def _discover(environment, remote="prgs", **options):
        """Call ``gitea_get_profile`` with ``os.environ`` replaced by *environment*."""
        with patch.dict(os.environ, environment, clear=True):
            return gitea_get_profile(remote=remote, **options)

    def test_get_profile_new_fields_default_empty(self):
        """With nothing set, the newer metadata fields are empty/None."""
        profile = self._raw_profile({})
        self.assertEqual(profile["forbidden_operations"], [])
        self.assertIsNone(profile["audit_label"])
        self.assertIsNone(profile["token_source_name"])

    def test_get_profile_reads_all_metadata(self):
        """Forbidden operations, audit label and token source come from the env."""
        profile = self._raw_profile({
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
            "GITEA_FORBIDDEN_OPERATIONS": "merge, branch.push",
            "GITEA_AUDIT_LABEL": "reviewer-runtime",
            "GITEA_TOKEN_SOURCE": "GITEA_TOKEN",
        })
        self.assertEqual(profile["forbidden_operations"], ["merge", "branch.push"])
        self.assertEqual(profile["audit_label"], "reviewer-runtime")
        self.assertEqual(profile["token_source_name"], "GITEA_TOKEN")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_discovery_verified_identity(self, _auth, mock_api):
        """With working auth, the profile is returned with a verified identity."""
        mock_api.return_value = {"id": 3, "login": "reviewer-bot"}
        discovered = self._discover({
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
            "GITEA_TOKEN_SOURCE": "GITEA_TOKEN",
            "GITEA_TOKEN": "super-secret-token",
        })
        expected_fields = (
            ("profile_name", "gitea-reviewer"),
            ("allowed_operations", ["read", "review", "approve"]),
            ("authenticated_username", "reviewer-bot"),
            ("identity_status", "verified"),
            ("server", "https://gitea.prgs.cc"),
            ("token_source_name", "GITEA_TOKEN"),
        )
        for key, value in expected_fields:
            self.assertEqual(discovered[key], value)

        # Read-only: only a GET to the user endpoint was issued.
        positional = mock_api.call_args[0]
        self.assertEqual(positional[0], "GET")
        self.assertTrue(positional[1].endswith("/api/v1/user"))

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_discovery_never_exposes_secrets(self, _auth, mock_api):
        """Neither the token value nor the auth header shows up in the result."""
        mock_api.return_value = {"id": 3, "login": "reviewer-bot"}
        discovered = self._discover({
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_TOKEN": "super-secret-token",
        })
        rendered = repr(discovered).lower()
        forbidden = (
            "super-secret-token",
            "token dgvzd",
            "authorization",
            "basic ",
            FAKE_AUTH.lower(),
        )
        for needle in forbidden:
            self.assertNotIn(needle, rendered)

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_discovery_identity_unavailable_fails_soft(self, _auth):
        """Missing credentials degrade the identity fields without raising."""
        # No credentials -> _auth raises inside the tool; identity marked
        # unavailable but the profile config is still returned (not raised).
        discovered = self._discover({"GITEA_PROFILE_NAME": "gitea-author"})
        self.assertEqual(discovered["profile_name"], "gitea-author")
        self.assertIsNone(discovered["authenticated_username"])
        self.assertEqual(discovered["identity_status"], "unavailable")

    def test_discovery_can_skip_identity(self):
        """``resolve_identity=False`` reports the identity as not resolved."""
        discovered = self._discover(
            {"GITEA_PROFILE_NAME": "gitea-author"}, resolve_identity=False)
        self.assertEqual(discovered["identity_status"], "not_resolved")
        self.assertIsNone(discovered["authenticated_username"])

    def test_discovery_unknown_remote_marks_unknown(self):
        """An unrecognised remote is reported in the result, not raised."""
        discovered = self._discover({}, remote="nope")
        self.assertEqual(discovered["identity_status"], "unknown")
        self.assertIsNone(discovered["remote"])
        self.assertIn("remote_error", discovered)
# ---------------------------------------------------------------------------
# PR eligibility checks (read-only) — issue #14
# ---------------------------------------------------------------------------
class TestPrEligibility(unittest.TestCase):
    """Read-only PR eligibility checks (``gitea_check_pr_eligibility``).

    Tests that mock ``api_request`` queue two responses in order: the
    authenticated-user payload, then the pull-request payload.
    """

    # Profile environments shared by several tests. ``patch.dict`` copies the
    # entries into ``os.environ``; these dicts themselves are never mutated.
    _REVIEWER_ENV = {
        "GITEA_PROFILE_NAME": "gitea-reviewer",
        "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
    }
    _MERGER_ENV = {
        "GITEA_PROFILE_NAME": "gitea-merger",
        "GITEA_ALLOWED_OPERATIONS": "read,merge",
    }

    def _pr(self, author, state="open", sha="abc123", mergeable=True):
        """Build the minimal PR payload these tests feed to the tool."""
        return {
            "user": {"login": author},
            "state": state,
            "head": {"sha": sha},
            "mergeable": mergeable,
        }

    def _check(self, environment, action, pr_number=8):
        """Run the eligibility check on remote 'prgs' under *environment*."""
        with patch.dict(os.environ, environment, clear=True):
            return gitea_check_pr_eligibility(
                pr_number=pr_number, action=action, remote="prgs")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_reviewer_eligible_to_review(self, _auth, mock_api):
        """A reviewer profile may review another user's open PR."""
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
        verdict = self._check(self._REVIEWER_ENV, "review", pr_number=5)
        self.assertTrue(verdict["eligible"])
        self.assertEqual(verdict["authenticated_user"], "reviewer-bot")
        self.assertEqual(verdict["pr_author"], "author-bot")
        self.assertEqual(verdict["head_sha"], "abc123")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_self_author_cannot_merge(self, _auth, mock_api):
        """The PR author is not eligible to merge their own PR."""
        mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
        verdict = self._check(self._MERGER_ENV, "merge")
        self.assertFalse(verdict["eligible"])
        self.assertIn("authenticated user is PR author", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_self_author_cannot_approve(self, _auth, mock_api):
        """The PR author is not eligible to approve their own PR."""
        mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
        verdict = self._check(self._REVIEWER_ENV, "approve")
        self.assertFalse(verdict["eligible"])
        self.assertIn("authenticated user is PR author", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_merge_fails_closed_when_mergeability_unknown(self, _auth, mock_api):
        """A PR whose 'mergeable' is None is not eligible to merge."""
        # Gitea reports mergeable as None/null (not yet computed).
        mock_api.side_effect = [
            {"login": "merger-bot"},
            self._pr("author-bot", mergeable=None),
        ]
        verdict = self._check(self._MERGER_ENV, "merge")
        self.assertFalse(verdict["eligible"])
        self.assertIsNone(verdict["mergeable"])
        self.assertIn("PR mergeability unknown", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_profile_not_allowed_to_merge(self, _auth, mock_api):
        """A profile without 'merge' in its allowed operations is rejected."""
        mock_api.side_effect = [{"login": "author-bot"}, self._pr("someone-else")]
        environment = {
            "GITEA_PROFILE_NAME": "gitea-author",
            "GITEA_ALLOWED_OPERATIONS": "read,pr.create",
        }
        verdict = self._check(environment, "merge")
        self.assertFalse(verdict["eligible"])
        self.assertIn("profile is not allowed to merge", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_forbidden_operation_blocks(self, _auth, mock_api):
        """An action listed in GITEA_FORBIDDEN_OPERATIONS is rejected."""
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
        environment = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review",
            "GITEA_FORBIDDEN_OPERATIONS": "merge",
        }
        verdict = self._check(environment, "merge")
        self.assertFalse(verdict["eligible"])
        self.assertIn("profile forbids 'merge'", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_closed_pr_not_eligible(self, _auth, mock_api):
        """A closed PR is not eligible for approval."""
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot", state="closed"),
        ]
        verdict = self._check(self._REVIEWER_ENV, "approve")
        self.assertFalse(verdict["eligible"])
        self.assertIn("PR is not open (state=closed)", verdict["reasons"])

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_unknown_identity_fails_closed(self, _auth):
        """Without credentials the check reports ineligible with no user."""
        verdict = self._check(self._MERGER_ENV, "merge")
        self.assertFalse(verdict["eligible"])
        self.assertIn(
            "authenticated identity could not be determined", verdict["reasons"])
        self.assertIsNone(verdict["authenticated_user"])

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_unknown_action_rejected(self, _auth):
        """An action outside the supported set is reported as unknown.

        The auth lookup is pinned to ``None`` so this test cannot pick up
        credentials from the machine running the suite. Whether the tool
        consults auth before validating the action is not visible from this
        file, so the outcome must not depend on it.
        """
        verdict = self._check({}, "delete")
        self.assertFalse(verdict["eligible"])
        self.assertTrue(
            any("unknown action" in reason for reason in verdict["reasons"]))

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_never_exposes_secrets(self, _auth, mock_api):
        """Neither the token value nor the auth header shows up in the result."""
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
        environment = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review",
            "GITEA_TOKEN": "super-secret-token",
        }
        verdict = self._check(environment, "review", pr_number=5)
        rendered = repr(verdict).lower()
        forbidden = ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower())
        for needle in forbidden:
            self.assertNotIn(needle, rendered)
class TestSubmitPrReview(unittest.TestCase):
    """Gated review submission through ``gitea_submit_pr_review`` (#15).

    Where ``api_request`` is mocked, responses are queued in call order:
    authenticated user, pull request, then (when a review is expected to be
    posted) the response to the review POST.
    """

    def _pr(self, author, state="open", sha="abc123", mergeable=True):
        """Build the minimal PR payload these tests feed to the tool."""
        return dict(
            user={"login": author},
            state=state,
            head={"sha": sha},
            mergeable=mergeable,
        )

    def _methods(self, mock_api):
        """Return the HTTP method of every recorded ``api_request`` call."""
        return [recorded.args[0] for recorded in mock_api.call_args_list]

    def _assert_no_mutation(self, mock_api):
        """Fail if any recorded call was a POST to a ``/reviews`` URL."""
        # A review mutation is POST .../reviews; eligibility only ever GETs.
        for recorded in mock_api.call_args_list:
            method, url = recorded.args[0], recorded.args[1]
            posted_review = method == "POST" and url.endswith("/reviews")
            self.assertFalse(
                posted_review, f"unexpected review mutation: {method} {url}")

    @staticmethod
    def _env(allowed, profile="gitea-reviewer"):
        """Build the profile environment for one test run."""
        return {
            "GITEA_PROFILE_NAME": profile,
            "GITEA_ALLOWED_OPERATIONS": allowed,
        }

    def _submit(self, environment, **review_args):
        """Submit a review on remote 'prgs' with ``os.environ`` replaced."""
        with patch.dict(os.environ, environment, clear=True):
            return gitea_submit_pr_review(remote="prgs", **review_args)

    # -- approve --------------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_approve_blocked_when_author(self, _auth, mock_api):
        """Approving your own PR is refused and nothing is posted."""
        mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
        outcome = self._submit(
            self._env("read,review,approve"), pr_number=8, action="approve")
        self.assertFalse(outcome["performed"])
        self.assertIn("authenticated user is PR author", outcome["reasons"])
        self._assert_no_mutation(mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_approve_succeeds_when_eligible(self, _auth, mock_api):
        """An eligible approval POSTs an APPROVE review pinned to the head SHA."""
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 7},
        ]
        outcome = self._submit(
            self._env("read,review,approve"),
            pr_number=8, action="approve", body="LGTM")
        self.assertTrue(outcome["performed"])
        self.assertEqual(outcome["authenticated_user"], "reviewer-bot")
        self.assertEqual(outcome["pr_author"], "author-bot")
        self.assertEqual(outcome["head_sha"], "abc123")

        last_call = mock_api.call_args.args
        self.assertEqual(last_call[0], "POST")
        self.assertTrue(last_call[1].endswith("/pulls/8/reviews"))
        review_payload = last_call[3]
        self.assertEqual(review_payload["event"], "APPROVE")
        self.assertEqual(review_payload["commit_id"], "abc123")

    # -- request_changes ------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_request_changes_succeeds_when_eligible(self, _auth, mock_api):
        """A profile allowed to request changes posts REQUEST_CHANGES."""
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 9},
        ]
        outcome = self._submit(
            self._env("read,review,request_changes"),
            pr_number=8, action="request_changes", body="needs work")
        self.assertTrue(outcome["performed"])
        self.assertEqual(mock_api.call_args.args[3]["event"], "REQUEST_CHANGES")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_request_changes_blocked_without_eligibility(self, _auth, mock_api):
        """Without 'request_changes' allowed, nothing is posted."""
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
        outcome = self._submit(
            self._env("read,review"),  # no request_changes
            pr_number=8, action="request_changes")
        self.assertFalse(outcome["performed"])
        self.assertIn(
            "profile is not allowed to request_changes", outcome["reasons"])
        self._assert_no_mutation(mock_api)

    # -- comment --------------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_comment_succeeds_when_review_eligible(self, _auth, mock_api):
        """A review-capable profile can post a COMMENT review."""
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 3},
        ]
        outcome = self._submit(
            self._env("read,review"),
            pr_number=8, action="comment", body="finding")
        self.assertTrue(outcome["performed"])
        self.assertEqual(mock_api.call_args.args[3]["event"], "COMMENT")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_comment_by_author_allowed(self, _auth, mock_api):
        """The PR author may still comment on their own PR."""
        # Commenting on your own PR is fine — only approve is self-blocked.
        mock_api.side_effect = [
            {"login": "jcwalker3"},
            self._pr("jcwalker3"),
            {"id": 4},
        ]
        outcome = self._submit(
            self._env("read,review"),
            pr_number=8, action="comment", body="note")
        self.assertTrue(outcome["performed"])

    # -- identity / profile fail-closed ---------------------------------------

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_unknown_identity_blocks(self, _auth):
        """Without credentials the review is not performed."""
        outcome = self._submit(
            self._env("read,review,approve"), pr_number=8, action="approve")
        self.assertFalse(outcome["performed"])
        self.assertIsNone(outcome["authenticated_user"])
        self.assertIn(
            "authenticated identity could not be determined", outcome["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_disallowed_profile_operation_blocks(self, _auth, mock_api):
        """A profile lacking 'approve' cannot approve; nothing is posted."""
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
        outcome = self._submit(
            self._env("read,pr.create", profile="gitea-author"),
            pr_number=8, action="approve")
        self.assertFalse(outcome["performed"])
        self.assertIn("profile is not allowed to approve", outcome["reasons"])
        self._assert_no_mutation(mock_api)

    # -- head SHA guard -------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_head_sha_mismatch_blocks(self, _auth, mock_api):
        """A stale ``expected_head_sha`` blocks the review."""
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot", sha="abc123"),
        ]
        outcome = self._submit(
            self._env("read,review,approve"),
            pr_number=8, action="approve", expected_head_sha="deadbeef")
        self.assertFalse(outcome["performed"])
        self.assertIn(
            "expected head SHA does not match current PR head (fail closed)",
            outcome["reasons"])
        self._assert_no_mutation(mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_head_sha_match_allows(self, _auth, mock_api):
        """A matching ``expected_head_sha`` lets the review through."""
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot", sha="abc123"),
            {"id": 5},
        ]
        outcome = self._submit(
            self._env("read,review,approve"),
            pr_number=8, action="approve", expected_head_sha="abc123")
        self.assertTrue(outcome["performed"])

    # -- invalid action -------------------------------------------------------

    @patch("mcp_server.api_request")
    def test_invalid_review_action_rejected(self, mock_api):
        """An unsupported action is refused without any API call."""
        outcome = self._submit({}, pr_number=8, action="delete")
        self.assertFalse(outcome["performed"])
        self.assertTrue(
            any("unknown review action" in reason for reason in outcome["reasons"]))
        mock_api.assert_not_called()  # never touches the API on a bad action

    # -- redaction ------------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_output_redacts_secrets(self, _auth, mock_api):
        """A successful result contains neither the token nor the auth header."""
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 1},
        ]
        environment = {
            **self._env("read,review,approve"),
            "GITEA_TOKEN": "super-secret-token",
        }
        outcome = self._submit(environment, pr_number=5, action="approve")
        rendered = repr(outcome).lower()
        forbidden = ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower())
        for needle in forbidden:
            self.assertNotIn(needle, rendered)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_error_message_redacts_credential(self, _auth, mock_api):
        """A credential inside an API error message is replaced by [REDACTED]."""
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            # The third call (the review POST) raises this error.
            RuntimeError("HTTP 500: token abc-secret-xyz rejected"),
        ]
        outcome = self._submit(
            self._env("read,review,approve"), pr_number=5, action="approve")
        self.assertFalse(outcome["performed"])
        rendered = repr(outcome)
        self.assertIn("[REDACTED]", rendered)
        self.assertNotIn("abc-secret-xyz", rendered)
# Run the suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()