1564 lines
69 KiB
Python
1564 lines
69 KiB
Python
"""Tests for the MCP server tool functions.

Each tool is tested by calling the underlying function directly (not through
the MCP protocol) with mocked API responses.
"""
|
|
import os
|
|
import sys
|
|
import unittest
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
|
|
|
from mcp_server import ( # noqa: E402
|
|
gitea_create_issue,
|
|
gitea_create_pr,
|
|
gitea_close_issue,
|
|
gitea_list_issues,
|
|
gitea_view_issue,
|
|
gitea_mark_issue,
|
|
gitea_mirror_refs,
|
|
gitea_list_prs,
|
|
gitea_view_pr,
|
|
gitea_merge_pr,
|
|
gitea_review_pr,
|
|
gitea_delete_branch,
|
|
gitea_edit_pr,
|
|
gitea_get_file,
|
|
gitea_commit_files,
|
|
gitea_whoami,
|
|
gitea_get_profile,
|
|
gitea_check_pr_eligibility,
|
|
gitea_submit_pr_review,
|
|
)
|
|
from gitea_auth import get_profile # noqa: E402
|
|
|
|
# Dummy HTTP Basic credential ("test:test", base64-encoded) handed back by the
# patched ``mcp_server.get_auth_header`` so no real secret is needed in tests.
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Create Issue
|
|
# ---------------------------------------------------------------------------
|
|
class TestCreateIssue(unittest.TestCase):
    """gitea_create_issue: result mapping, request payload and remote routing."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_creates_issue(self, _auth, api_mock):
        api_mock.return_value = {
            "number": 1,
            "html_url": "https://gitea.example.com/issues/1",
        }

        created = gitea_create_issue(title="Test issue", body="body text")

        self.assertEqual(created["number"], 1)
        self.assertIn("issues/1", created["url"])
        api_mock.assert_called_once()
        # Positional arguments of the single API call: the HTTP method comes
        # first and the JSON payload fourth.
        positional = api_mock.call_args[0]
        self.assertEqual(positional[0], "POST")
        self.assertEqual(positional[3]["title"], "Test issue")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_creates_on_prgs(self, _auth, api_mock):
        api_mock.return_value = {
            "number": 5,
            "html_url": "https://gitea.prgs.cc/issues/5",
        }

        created = gitea_create_issue(title="Test", remote="prgs")

        self.assertEqual(created["number"], 5)
        # The request URL (second positional argument) must target the prgs
        # host and its organisation.
        endpoint = api_mock.call_args[0][1]
        for fragment in ("gitea.prgs.cc", "Scaled-Tech-Consulting"):
            self.assertIn(fragment, endpoint)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Create PR
|
|
# ---------------------------------------------------------------------------
|
|
class TestCreatePR(unittest.TestCase):
    """gitea_create_pr: the head/base branches reach the API payload."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_creates_pr(self, _auth, api_mock):
        api_mock.return_value = {
            "number": 3,
            "html_url": "https://example.com/pulls/3",
        }

        created = gitea_create_pr(title="feat: X", head="feat/x", base="main")

        self.assertEqual(created["number"], 3)
        # Fourth positional argument of the API call is the JSON body.
        body = api_mock.call_args[0][3]
        self.assertEqual(body["head"], "feat/x")
        self.assertEqual(body["base"], "main")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Close Issue
|
|
# ---------------------------------------------------------------------------
|
|
class TestCloseIssue(unittest.TestCase):
    """gitea_close_issue: reports success and PATCHes state=closed."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_closes_issue(self, _auth, api_mock):
        api_mock.return_value = {"state": "closed"}

        outcome = gitea_close_issue(issue_number=42)

        self.assertTrue(outcome["success"])
        self.assertIn("42", outcome["message"])
        # The tool may issue several requests; pick the first PATCH among them
        # and inspect its payload.
        patch_requests = (
            recorded
            for recorded in api_mock.call_args_list
            if recorded[0][0] == "PATCH"
        )
        first_patch = next(patch_requests)
        self.assertEqual(first_patch[0][3]["state"], "closed")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# List Issues
|
|
# ---------------------------------------------------------------------------
|
|
class TestListIssues(unittest.TestCase):
    """gitea_list_issues: flattening of API records and query-string filters."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_returns_formatted_list(self, _auth, api_mock):
        labelled_and_assigned = {
            "number": 1,
            "title": "Bug",
            "state": "open",
            "labels": [{"name": "bug"}],
            "assignee": {"login": "alice"},
        }
        bare = {
            "number": 2,
            "title": "Feature",
            "state": "open",
            "labels": [],
            "assignee": None,
        }
        api_mock.return_value = [labelled_and_assigned, bare]

        listing = gitea_list_issues()

        self.assertEqual(len(listing), 2)
        first, second = listing[0], listing[1]
        self.assertEqual(first["number"], 1)
        # Label objects collapse to names; the assignee collapses to a login.
        self.assertEqual(first["labels"], ["bug"])
        self.assertEqual(first["assignee"], "alice")
        # A missing assignee is rendered as an empty string.
        self.assertEqual(second["assignee"], "")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_passes_label_filter(self, _auth, api_mock):
        api_mock.return_value = []

        gitea_list_issues(label="important")

        requested_url = api_mock.call_args[0][1]
        self.assertIn("labels=important", requested_url)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_passes_state_filter(self, _auth, api_mock):
        api_mock.return_value = []

        gitea_list_issues(state="closed")

        requested_url = api_mock.call_args[0][1]
        self.assertIn("state=closed", requested_url)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# View Issue
|
|
# ---------------------------------------------------------------------------
|
|
class TestViewIssue(unittest.TestCase):
    """gitea_view_issue: a single issue is returned with flattened fields."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_returns_full_details(self, _auth, api_mock):
        api_mock.return_value = {
            "number": 7,
            "title": "MCP server",
            "body": "Build it",
            "state": "open",
            "labels": [{"name": "important"}],
            "assignee": {"login": "jason"},
            "html_url": "https://gitea.prgs.cc/issues/7",
        }

        issue = gitea_view_issue(issue_number=7, remote="prgs")

        self.assertEqual(issue["number"], 7)
        self.assertEqual(issue["body"], "Build it")
        # Nested label/assignee objects come back as plain strings.
        self.assertEqual(issue["labels"], ["important"])
        self.assertEqual(issue["assignee"], "jason")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mark Issue
|
|
# ---------------------------------------------------------------------------
|
|
class TestMarkIssue(unittest.TestCase):
    """gitea_mark_issue: start/done transitions and their error paths."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_start_adds_label(self, _auth, api_mock):
        # Two API responses are queued: the label listing, then the result of
        # adding the label to the issue.
        label_listing = [{"id": 10, "name": "status:in-progress"}]
        add_response = [{"name": "status:in-progress"}]
        api_mock.side_effect = [label_listing, add_response]

        outcome = gitea_mark_issue(issue_number=5, action="start")

        self.assertTrue(outcome["success"])
        self.assertIn("claimed", outcome["message"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_done_removes_label(self, _auth, api_mock):
        # Label listing first; the second (removal) call yields no body.
        label_listing = [{"id": 10, "name": "status:in-progress"}]
        api_mock.side_effect = [label_listing, None]

        outcome = gitea_mark_issue(issue_number=5, action="done")

        self.assertTrue(outcome["success"])
        self.assertIn("released", outcome["message"])

    def test_invalid_action_raises(self):
        # An action other than the supported ones is rejected with ValueError.
        self.assertRaises(
            ValueError, gitea_mark_issue, issue_number=5, action="pause"
        )

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_missing_label_raises(self, _auth, api_mock):
        api_mock.return_value = []  # no labels exist

        self.assertRaises(
            RuntimeError, gitea_mark_issue, issue_number=5, action="start"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth errors
|
|
# ---------------------------------------------------------------------------
|
|
class TestAuthErrors(unittest.TestCase):
    """Failure modes when credentials or the remote cannot be resolved."""

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_no_credentials_raises(self, _auth):
        # get_auth_header yielding None must surface as RuntimeError.
        self.assertRaises(RuntimeError, gitea_create_issue, title="test")

    def test_unknown_remote_raises(self):
        # A remote name the tool does not know is a ValueError.
        self.assertRaises(
            ValueError, gitea_create_issue, title="test", remote="nonexistent"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mirror Refs
|
|
# ---------------------------------------------------------------------------
|
|
class TestMirrorRefs(unittest.TestCase):
    """gitea_mirror_refs: command-line flags handed to ``subprocess.run``.

    ``subprocess.run`` is replaced by a mock, so nothing is executed; each
    test inspects the argument list (first positional argument) of the call.
    """

    @patch("mcp_server.subprocess.run")
    def test_dry_run_default(self, mock_run):
        mock_run.return_value = MagicMock(
            stdout="DRY RUN\n", stderr="", returncode=0
        )
        result = gitea_mirror_refs()
        self.assertEqual(result["return_code"], 0)
        # Should NOT have --apply
        args = mock_run.call_args[0][0]
        self.assertNotIn("--apply", args)
        self.assertNotIn("--force", args)

    @patch("mcp_server.subprocess.run")
    def test_apply_flag(self, mock_run):
        mock_run.return_value = MagicMock(
            stdout="done\n", stderr="", returncode=0
        )
        # Only the command line is checked here, so the return value is not
        # bound (it was previously assigned to an unused local).
        gitea_mirror_refs(apply=True)
        args = mock_run.call_args[0][0]
        self.assertIn("--apply", args)

    @patch("mcp_server.subprocess.run")
    def test_force_flag(self, mock_run):
        mock_run.return_value = MagicMock(
            stdout="", stderr="", returncode=0
        )
        gitea_mirror_refs(apply=True, force=True)
        args = mock_run.call_args[0][0]
        self.assertIn("--apply", args)
        self.assertIn("--force", args)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# List PRs
|
|
# ---------------------------------------------------------------------------
|
|
class TestListPRs(unittest.TestCase):
    """gitea_list_prs: API pull-request records are flattened for the caller."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_list_prs(self, _auth, api_mock):
        only_pr = {
            "number": 1,
            "title": "PR 1",
            "state": "open",
            "head": {"ref": "branch1"},
            "base": {"ref": "main"},
            "html_url": "http://url1",
            "mergeable": True,
        }
        api_mock.return_value = [only_pr]

        listing = gitea_list_prs()

        self.assertEqual(len(listing), 1)
        entry = listing[0]
        self.assertEqual(entry["number"], 1)
        # The nested head object is reduced to its ref name.
        self.assertEqual(entry["head"], "branch1")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# View PR
|
|
# ---------------------------------------------------------------------------
|
|
class TestViewPR(unittest.TestCase):
    """gitea_view_pr: body and author of a single pull request are exposed."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_view_pr(self, _auth, api_mock):
        api_mock.return_value = {
            "number": 1,
            "title": "PR 1",
            "state": "open",
            "head": {"ref": "branch1"},
            "base": {"ref": "main"},
            "html_url": "http://url1",
            "mergeable": True,
            "body": "description",
            "user": {"login": "user1"},
        }

        details = gitea_view_pr(pr_number=1)

        self.assertEqual(details["number"], 1)
        self.assertEqual(details["body"], "description")
        # The user object is reduced to the login string.
        self.assertEqual(details["user"], "user1")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Merge PR
|
|
# ---------------------------------------------------------------------------
|
|
class TestMergePR(unittest.TestCase):
    """Gated merge workflow (#16). gitea_merge_pr is the only merge path.

    Every test drives ``gitea_merge_pr`` for PR 8 on the "prgs" remote with
    ``mcp_server.api_request`` mocked and ``os.environ`` replaced wholesale,
    so the profile the tool sees is exactly the one the test supplies.
    """

    # Environment describing a profile that is allowed to merge.  patch.dict
    # copies these entries into os.environ and does not mutate this mapping,
    # so a single shared instance is safe across tests.
    _MERGER_ENV = {
        "GITEA_PROFILE_NAME": "gitea-merger",
        "GITEA_ALLOWED_OPERATIONS": "read,merge",
    }

    def _pr(self, author, state="open", sha="abc123", mergeable=True):
        """Build the minimal pull-request payload these tests feed the tool."""
        return {
            "user": {"login": author},
            "state": state,
            "head": {"sha": sha},
            "mergeable": mergeable,
        }

    def _confirm(self, n):
        """Return the confirmation phrase for PR number *n*."""
        return f"MERGE PR {n}"

    def _assert_no_merge_call(self, mock_api):
        """Fail if any recorded API call is a POST to a ``/merge`` URL."""
        for c in mock_api.call_args_list:
            method, url = c.args[0], c.args[1]
            self.assertFalse(
                method == "POST" and url.endswith("/merge"),
                f"unexpected merge mutation: {method} {url}",
            )

    def _run_merge(self, env, **kwargs):
        """Call gitea_merge_pr for PR 8 on "prgs" with os.environ set to *env*.

        The environment is cleared and replaced by *env* for the duration of
        the call only.  ``confirmation`` defaults to the correct phrase for
        PR 8; any other keyword argument is forwarded unchanged.
        """
        kwargs.setdefault("confirmation", self._confirm(8))
        with patch.dict(os.environ, env, clear=True):
            return gitea_merge_pr(pr_number=8, remote="prgs", **kwargs)

    # -- success --------------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_merge_succeeds_when_all_gates_pass(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot"),
            {},  # merge POST
            {"merged_commit_sha": "mergecommit99"},  # read-back
        ]
        r = self._run_merge(
            self._MERGER_ENV,
            expected_head_sha="abc123", do="squash",
            title="T", message="M")
        self.assertTrue(r["performed"])
        self.assertEqual(r["authenticated_user"], "merger-bot")
        self.assertEqual(r["pr_author"], "author-bot")
        self.assertEqual(r["head_sha"], "abc123")
        self.assertEqual(r["merge_method"], "squash")
        self.assertEqual(r["merge_commit"], "mergecommit99")
        # 3rd call is the merge POST with the requested method/title/message.
        merge_call = mock_api.call_args_list[2]
        self.assertEqual(merge_call.args[0], "POST")
        self.assertTrue(merge_call.args[1].endswith("/pulls/8/merge"))
        payload = merge_call.args[3]
        self.assertEqual(payload["Do"], "squash")
        self.assertEqual(payload["MergeTitleField"], "T")
        self.assertEqual(payload["MergeMessageField"], "M")
        self.assertNotIn("force_merge", payload)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_expected_changed_files_match_allows(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot"),
            [{"filename": "a.py"}, {"filename": "b.py"}],  # files
            {},  # merge POST
            {"merged_commit_sha": "c1"},  # read-back
        ]
        # Expected files are given in a different order than the API returns.
        r = self._run_merge(
            self._MERGER_ENV, expected_changed_files=["b.py", "a.py"])
        self.assertTrue(r["performed"])

    # -- confirmation ---------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_missing_confirmation_blocks_with_no_api_call(self, _auth, mock_api):
        r = self._run_merge(self._MERGER_ENV, confirmation="")
        self.assertFalse(r["performed"])
        self.assertTrue(any("explicit confirmation required" in x for x in r["reasons"]))
        mock_api.assert_not_called()

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_wrong_confirmation_blocks(self, _auth, mock_api):
        # The phrase names PR 9 while the call targets PR 8.
        r = self._run_merge(self._MERGER_ENV, confirmation="MERGE PR 9")
        self.assertFalse(r["performed"])
        mock_api.assert_not_called()

    # -- identity / profile / eligibility fail-closed -------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_self_author_cannot_merge(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]
        r = self._run_merge(self._MERGER_ENV)
        self.assertFalse(r["performed"])
        self.assertIn("authenticated user is PR author", r["reasons"])
        self._assert_no_merge_call(mock_api)

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_unknown_identity_blocks(self, _auth):
        r = self._run_merge(self._MERGER_ENV)
        self.assertFalse(r["performed"])
        self.assertIsNone(r["authenticated_user"])
        self.assertIn("authenticated identity could not be determined", r["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_unknown_profile_blocks(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
        env = {}  # no GITEA_ALLOWED_OPERATIONS → empty allowed ops
        r = self._run_merge(env)
        self.assertFalse(r["performed"])
        self.assertIn(
            "profile has no configured allowed operations (fail closed)",
            r["reasons"])
        self._assert_no_merge_call(mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_profile_without_merge_permission_blocks(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
        env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
               "GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
        r = self._run_merge(env)
        self.assertFalse(r["performed"])
        self.assertIn("profile is not allowed to merge", r["reasons"])
        self._assert_no_merge_call(mock_api)

    # -- PR state / mergeability ----------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_closed_pr_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot", state="closed")]
        r = self._run_merge(self._MERGER_ENV)
        self.assertFalse(r["performed"])
        self.assertIn("PR is not open (state=closed)", r["reasons"])
        self._assert_no_merge_call(mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_non_mergeable_pr_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot", mergeable=False)]
        r = self._run_merge(self._MERGER_ENV)
        self.assertFalse(r["performed"])
        self.assertIn("PR is not mergeable", r["reasons"])
        self._assert_no_merge_call(mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_unknown_mergeability_fails_closed(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot", mergeable=None)]
        r = self._run_merge(self._MERGER_ENV)
        self.assertFalse(r["performed"])
        self.assertIn("PR mergeability unknown", r["reasons"])
        self._assert_no_merge_call(mock_api)

    # -- head SHA / changed files ---------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_head_sha_mismatch_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot", sha="abc123")]
        r = self._run_merge(self._MERGER_ENV, expected_head_sha="deadbeef")
        self.assertFalse(r["performed"])
        self.assertIn(
            "expected head SHA does not match current PR head (fail closed)",
            r["reasons"])
        self._assert_no_merge_call(mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_changed_files_mismatch_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot"),
            [{"filename": "a.py"}, {"filename": "c.py"}],  # actual files
        ]
        r = self._run_merge(
            self._MERGER_ENV, expected_changed_files=["a.py", "b.py"])
        self.assertFalse(r["performed"])
        self.assertIn(
            "PR changed files do not match expected_changed_files (fail closed)",
            r["reasons"])
        self._assert_no_merge_call(mock_api)

    # -- misc -----------------------------------------------------------------

    @patch("mcp_server.api_request")
    def test_invalid_merge_method_rejected(self, mock_api):
        # Runs with an empty environment and without patching auth: the bad
        # method must be rejected before any API call is made.
        r = self._run_merge({}, confirmation="MERGE PR 8", do="octopus")
        self.assertFalse(r["performed"])
        self.assertTrue(any("unknown merge method" in x for x in r["reasons"]))
        mock_api.assert_not_called()

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_output_redacts_secrets(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot"),
            {}, {"merged_commit_sha": "c1"},
        ]
        env = {**self._MERGER_ENV, "GITEA_TOKEN": "super-secret-token"}
        r = self._run_merge(env)
        # Compare case-insensitively: neither the token nor any auth-header
        # material may appear anywhere in the returned structure.
        blob = repr(r).lower()
        for secret in ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower()):
            self.assertNotIn(secret, blob)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_merge_error_message_redacts_credential(self, _auth, mock_api):
        # The third API call raises with a token embedded in its message.
        mock_api.side_effect = [
            {"login": "merger-bot"}, self._pr("author-bot"),
            RuntimeError("HTTP 500: token abc-secret-xyz rejected"),
        ]
        r = self._run_merge(self._MERGER_ENV)
        self.assertFalse(r["performed"])
        blob = repr(r)
        self.assertIn("[REDACTED]", blob)
        self.assertNotIn("abc-secret-xyz", blob)
|
|
|
|
|
|
class TestNoUngatedMergePath(unittest.TestCase):
    """Prove no other exposed tool can merge (#16 surface audit)."""

    def test_submit_pr_review_has_no_merge(self):
        import inspect
        import mcp_server

        # The review-mutation tool must not accept a 'merge' parameter ...
        signature = inspect.signature(mcp_server.gitea_submit_pr_review)
        self.assertNotIn("merge", signature.parameters)
        # ... and 'merge' must not be listed among the review actions.
        self.assertNotIn("merge", mcp_server._REVIEW_ACTIONS)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_review_pr_merge_true_makes_no_api_call(self, _auth, api_mock):
        outcome = gitea_review_pr(
            pr_number=1, event="APPROVE", body="x", merge=True
        )

        self.assertFalse(outcome["success"])
        self.assertEqual(api_mock.call_count, 0)

    def test_only_merge_endpoint_is_in_gated_tool(self):
        # Source-level audit: the literal merge endpoint suffix occurs exactly
        # once in the module, and that occurrence is inside gitea_merge_pr.
        import inspect
        import mcp_server

        module_source = inspect.getsource(mcp_server)
        gated_source = inspect.getsource(mcp_server.gitea_merge_pr)

        self.assertEqual(module_source.count("/merge\""), 1)
        self.assertIn("/merge\"", gated_source)

    def test_readme_no_longer_advertises_ungated_cli_merge(self):
        import pathlib

        repo_root = pathlib.Path(__file__).resolve().parent.parent
        readme = (repo_root / "README.md").read_text(encoding="utf-8")

        # The old ungated example command must be gone.
        self.assertNotIn('--body "Approved" --merge', readme)
        # And the gated messaging / audit deferral must be present.
        self.assertIn("disabled", readme.lower())
        self.assertIn("gitea_merge_pr", readme)
        self.assertIn("#18", readme)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Review PR
|
|
# ---------------------------------------------------------------------------
|
|
class TestReviewPR(unittest.TestCase):
    """Legacy gitea_review_pr: merge is refused and eligibility gates apply."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_legacy_review_pr_merge_fails_closed(self, _auth, api_mock):
        outcome = gitea_review_pr(
            pr_number=1, event="APPROVE", body="Looks good", merge=True
        )

        self.assertFalse(outcome["success"])
        self.assertIn("no longer supported", outcome["message"])
        self.assertEqual(api_mock.call_count, 0)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    @patch("mcp_server.get_profile")
    def test_legacy_review_pr_uses_gates(self, mock_get_profile, _auth, api_mock):
        # The active profile may only read, so an APPROVE must hit the gate.
        read_only_profile = {
            "profile_name": "gitea-readonly",
            "allowed_operations": ["read"],
            "forbidden_operations": [],
            "base_url": None,
        }
        mock_get_profile.return_value = read_only_profile

        whoami_reply = {"login": "reviewer1"}  # /api/v1/user
        pr_reply = {  # /pulls/1
            "state": "open",
            "head": {"sha": "abc1234"},
            "mergeable": True,
            "user": {"login": "author1"},
        }
        api_mock.side_effect = [whoami_reply, pr_reply]

        outcome = gitea_review_pr(
            pr_number=1, event="APPROVE", body="Looks good", merge=False
        )

        self.assertFalse(outcome["success"])
        message = outcome["message"]
        self.assertIn("Review submission failed eligibility gates", message)
        self.assertIn("not allowed to approve", message)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    @patch("mcp_server.get_profile")
    def test_legacy_review_pr_self_approval_blocked(self, mock_get_profile, _auth, api_mock):
        # The profile may approve, but reviewer and PR author are one account.
        approver_profile = {
            "profile_name": "gitea-reviewer",
            "allowed_operations": ["read", "approve"],
            "forbidden_operations": [],
            "base_url": None,
        }
        mock_get_profile.return_value = approver_profile

        whoami_reply = {"login": "jcwalker3"}  # /api/v1/user
        pr_reply = {  # /pulls/1
            "state": "open",
            "head": {"sha": "abc1234"},
            "mergeable": True,
            "user": {"login": "jcwalker3"},
        }
        api_mock.side_effect = [whoami_reply, pr_reply]

        outcome = gitea_review_pr(
            pr_number=1, event="APPROVE", body="Self approve", merge=False
        )

        self.assertFalse(outcome["success"])
        message = outcome["message"]
        self.assertIn("Review submission failed eligibility gates", message)
        self.assertIn("authenticated user is PR author", message)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Delete Branch
|
|
# ---------------------------------------------------------------------------
|
|
class TestDeleteBranch(unittest.TestCase):
    """gitea_delete_branch: success reporting and URL-encoding of the name."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_delete_branch(self, _auth, api_mock):
        api_mock.return_value = {}

        outcome = gitea_delete_branch(branch="feat/branch")

        self.assertTrue(outcome["success"])
        self.assertIn("deleted", outcome["message"])
        # The slash in the branch name must be percent-encoded in the URL.
        requested_url = api_mock.call_args[0][1]
        self.assertIn("feat%2Fbranch", requested_url)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Edit PR
|
|
# ---------------------------------------------------------------------------
|
|
class TestEditPR(unittest.TestCase):
    """gitea_edit_pr: PATCH payload on success, early validation on no-op."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_edit_pr_success(self, _auth, api_mock):
        api_mock.return_value = {
            "number": 1,
            "title": "New Title",
            "body": "New description",
            "state": "open",
            "html_url": "https://gitea.example.com/pulls/1",
        }

        outcome = gitea_edit_pr(
            pr_number=1,
            title="New Title",
            body="New description",
            state="open",
        )

        self.assertTrue(outcome["success"])
        self.assertEqual(outcome["title"], "New Title")
        self.assertEqual(outcome["body"], "New description")

        # The request must be a PATCH carrying all three edited fields.
        positional = api_mock.call_args[0]
        self.assertEqual(positional[0], "PATCH")
        sent = positional[3]
        self.assertEqual(sent["title"], "New Title")
        self.assertEqual(sent["body"], "New description")
        self.assertEqual(sent["state"], "open")

    def test_edit_pr_no_fields_raises(self):
        self.assertRaises(ValueError, gitea_edit_pr, pr_number=1)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header")
    def test_edit_pr_no_fields_validates_before_auth(self, mock_auth, api_mock):
        # No-fields validation must not depend on credentials/network: the
        # ValueError has to be raised before auth or the API is touched, even
        # when no credentials are available.
        mock_auth.return_value = None  # simulate an unauthenticated environment

        self.assertRaises(ValueError, gitea_edit_pr, pr_number=1)

        mock_auth.assert_not_called()
        api_mock.assert_not_called()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Get File
|
|
# ---------------------------------------------------------------------------
|
|
class TestGetFile(unittest.TestCase):
    """gitea_get_file: file metadata/content pass-through and request URL."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_get_file_success(self, _auth, api_mock):
        encoded_content = "SGVsbG8gV29ybGQ="
        api_mock.return_value = {
            "name": "README.md",
            "path": "README.md",
            "sha": "3a0b123",
            "size": 100,
            "encoding": "base64",
            "content": encoded_content,
        }

        fetched = gitea_get_file(filepath="README.md", ref="main")

        self.assertEqual(fetched["name"], "README.md")
        self.assertEqual(fetched["sha"], "3a0b123")
        self.assertEqual(fetched["content"], "SGVsbG8gV29ybGQ=")

        # The request is a GET on the contents endpoint with the ref as a
        # query parameter.
        positional = api_mock.call_args[0]
        self.assertEqual(positional[0], "GET")
        requested_url = positional[1]
        self.assertIn("contents/README.md", requested_url)
        self.assertIn("ref=main", requested_url)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Commit Files
|
|
# ---------------------------------------------------------------------------
|
|
class TestCommitFiles(unittest.TestCase):
    """gitea_commit_files: result mapping and the POSTed commit payload."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_commit_files_success(self, _auth, api_mock):
        api_mock.return_value = {
            "commit": {"sha": "commit-sha-123"},
            "branch": {"name": "test-branch"},
        }
        files = [
            {"operation": "create", "path": "test.txt", "content": "SGVsbG8="},
        ]

        outcome = gitea_commit_files(
            files=files,
            message="Initial commit",
            new_branch="test-branch",
        )

        self.assertTrue(outcome["success"])
        # Nested commit/branch objects are reduced to sha and name.
        self.assertEqual(outcome["commit"], "commit-sha-123")
        self.assertEqual(outcome["branch"], "test-branch")

        # The request is a POST to a contents URL carrying message, target
        # branch and the file operations unchanged.
        positional = api_mock.call_args[0]
        self.assertEqual(positional[0], "POST")
        self.assertIn("/contents", positional[1])
        sent = positional[3]
        self.assertEqual(sent["message"], "Initial commit")
        self.assertEqual(sent["new_branch"], "test-branch")
        self.assertEqual(sent["files"], files)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Whoami (authenticated-user identity lookup)
|
|
# ---------------------------------------------------------------------------
|
|
class TestWhoami(unittest.TestCase):
    """``gitea_whoami`` reports the identity behind the configured credentials."""

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_returns_safe_identity(self, _auth, mock_api):
        mock_api.return_value = {
            "id": 42,
            "login": "reviewer-bot",
            "full_name": "Reviewer Bot",
            "email": "reviewer@example.com",
        }

        identity = gitea_whoami(remote="prgs")

        self.assertTrue(identity["authenticated"])
        expectations = (
            ("username", "reviewer-bot"),
            ("display_name", "Reviewer Bot"),
            ("user_id", 42),
            ("server", "https://gitea.prgs.cc"),
            ("remote", "prgs"),
        )
        for field, wanted in expectations:
            self.assertEqual(identity[field], wanted)

        # Read-only: GET against the authenticated-user endpoint.
        sent = mock_api.call_args[0]
        self.assertEqual(sent[0], "GET")
        self.assertTrue(sent[1].endswith("/api/v1/user"))

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_never_exposes_secrets(self, _auth, mock_api):
        mock_api.return_value = {"id": 1, "login": "someone"}

        rendered = repr(gitea_whoami(remote="prgs")).lower()

        # Neither credential-looking words nor the auth header itself may
        # appear anywhere in the returned structure.
        banned = ("token", "authorization", "basic ", "password", FAKE_AUTH.lower())
        for fragment in banned:
            self.assertNotIn(fragment, rendered)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_fails_closed_without_login(self, _auth, mock_api):
        # A user payload lacking 'login' must be treated as an error.
        mock_api.return_value = {"id": 1}
        self.assertRaises(RuntimeError, gitea_whoami, remote="prgs")

    def test_rejects_unknown_remote(self):
        self.assertRaises(ValueError, gitea_whoami, remote="nope")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Runtime profile (env-configured profile metadata) — issue #19
|
|
# ---------------------------------------------------------------------------
|
|
class TestRuntimeProfile(unittest.TestCase):
    """Profile metadata is derived from ``GITEA_*`` environment variables."""

    def test_defaults_when_unset(self):
        # An empty environment yields the built-in default profile.
        with patch.dict(os.environ, {}, clear=True):
            profile = get_profile()
        self.assertEqual(profile["profile_name"], "gitea-default")
        self.assertEqual(profile["allowed_operations"], [])
        self.assertIsNone(profile["base_url"])

    def test_reads_env_metadata(self):
        # Note the irregular spacing in the operations list: entries are
        # expected back stripped of surrounding whitespace.
        settings = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read, review , approve",
            "GITEA_BASE_URL": "https://gitea.example.invalid",
        }
        with patch.dict(os.environ, settings, clear=True):
            profile = get_profile()
        self.assertEqual(profile["profile_name"], "gitea-reviewer")
        self.assertEqual(
            profile["allowed_operations"], ["read", "review", "approve"]
        )
        self.assertEqual(profile["base_url"], "https://gitea.example.invalid")

    def test_never_includes_token(self):
        settings = {
            "GITEA_PROFILE_NAME": "gitea-author",
            "GITEA_TOKEN": "super-secret-token",
        }
        with patch.dict(os.environ, settings, clear=True):
            profile = get_profile()
        rendered = repr(profile).lower()
        # The token VALUE must never appear. (The field name
        # 'token_source_name' is non-secret metadata and may exist.)
        self.assertNotIn("super-secret-token", rendered)
        # GITEA_TOKEN_SOURCE is unset, so no source name is reported.
        self.assertIsNone(profile.get("token_source_name"))

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_whoami_surfaces_profile_without_token(self, _auth, mock_api):
        mock_api.return_value = {"id": 7, "login": "rev"}
        settings = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
            "GITEA_TOKEN": "super-secret-token",
        }
        with patch.dict(os.environ, settings, clear=True):
            identity = gitea_whoami(remote="prgs")

        embedded = identity["profile"]
        self.assertEqual(embedded["profile_name"], "gitea-reviewer")
        self.assertEqual(
            embedded["allowed_operations"], ["read", "review", "approve"]
        )

        rendered = repr(identity).lower()
        banned = ("super-secret-token", "token", "authorization", "basic ")
        for fragment in banned:
            self.assertNotIn(fragment, rendered)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Profile discovery (read-only) — issue #13
|
|
# ---------------------------------------------------------------------------
|
|
class TestProfileDiscovery(unittest.TestCase):
    """Read-only profile discovery via ``get_profile`` / ``gitea_get_profile``."""

    def test_get_profile_new_fields_default_empty(self):
        with patch.dict(os.environ, {}, clear=True):
            profile = get_profile()
        self.assertEqual(profile["forbidden_operations"], [])
        self.assertIsNone(profile["audit_label"])
        self.assertIsNone(profile["token_source_name"])

    def test_get_profile_reads_all_metadata(self):
        settings = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
            "GITEA_FORBIDDEN_OPERATIONS": "merge, branch.push",
            "GITEA_AUDIT_LABEL": "reviewer-runtime",
            "GITEA_TOKEN_SOURCE": "GITEA_TOKEN",
        }
        with patch.dict(os.environ, settings, clear=True):
            profile = get_profile()
        self.assertEqual(
            profile["forbidden_operations"], ["merge", "branch.push"]
        )
        self.assertEqual(profile["audit_label"], "reviewer-runtime")
        self.assertEqual(profile["token_source_name"], "GITEA_TOKEN")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_discovery_verified_identity(self, _auth, mock_api):
        mock_api.return_value = {"id": 3, "login": "reviewer-bot"}
        settings = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
            "GITEA_TOKEN_SOURCE": "GITEA_TOKEN",
            "GITEA_TOKEN": "super-secret-token",
        }
        with patch.dict(os.environ, settings, clear=True):
            discovered = gitea_get_profile(remote="prgs")

        self.assertEqual(discovered["profile_name"], "gitea-reviewer")
        self.assertEqual(
            discovered["allowed_operations"], ["read", "review", "approve"]
        )
        self.assertEqual(discovered["authenticated_username"], "reviewer-bot")
        self.assertEqual(discovered["identity_status"], "verified")
        self.assertEqual(discovered["server"], "https://gitea.prgs.cc")
        self.assertEqual(discovered["token_source_name"], "GITEA_TOKEN")

        # Read-only: only a GET to the user endpoint was issued.
        self.assertEqual(mock_api.call_args[0][0], "GET")
        self.assertTrue(mock_api.call_args[0][1].endswith("/api/v1/user"))

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_discovery_never_exposes_secrets(self, _auth, mock_api):
        mock_api.return_value = {"id": 3, "login": "reviewer-bot"}
        settings = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_TOKEN": "super-secret-token",
        }
        with patch.dict(os.environ, settings, clear=True):
            discovered = gitea_get_profile(remote="prgs")

        rendered = repr(discovered).lower()
        banned = (
            "super-secret-token",
            "token dgvzd",
            "authorization",
            "basic ",
            FAKE_AUTH.lower(),
        )
        for fragment in banned:
            self.assertNotIn(fragment, rendered)

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_discovery_identity_unavailable_fails_soft(self, _auth):
        # No credentials -> _auth raises inside the tool; identity marked
        # unavailable but the profile config is still returned (not raised).
        settings = {"GITEA_PROFILE_NAME": "gitea-author"}
        with patch.dict(os.environ, settings, clear=True):
            discovered = gitea_get_profile(remote="prgs")
        self.assertEqual(discovered["profile_name"], "gitea-author")
        self.assertIsNone(discovered["authenticated_username"])
        self.assertEqual(discovered["identity_status"], "unavailable")

    def test_discovery_can_skip_identity(self):
        settings = {"GITEA_PROFILE_NAME": "gitea-author"}
        with patch.dict(os.environ, settings, clear=True):
            discovered = gitea_get_profile(remote="prgs", resolve_identity=False)
        self.assertEqual(discovered["identity_status"], "not_resolved")
        self.assertIsNone(discovered["authenticated_username"])

    def test_discovery_unknown_remote_marks_unknown(self):
        with patch.dict(os.environ, {}, clear=True):
            discovered = gitea_get_profile(remote="nope")
        self.assertEqual(discovered["identity_status"], "unknown")
        self.assertIsNone(discovered["remote"])
        self.assertIn("remote_error", discovered)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PR eligibility checks (read-only) — issue #14
|
|
# ---------------------------------------------------------------------------
|
|
class TestPrEligibility(unittest.TestCase):
    """Read-only eligibility checks performed by ``gitea_check_pr_eligibility``.

    In the mocked tests ``api_request`` answers twice, in order: first the
    authenticated-user payload, then the pull-request payload.
    """

    # Environment presets reused by several tests below.
    _REVIEWER_ENV = {
        "GITEA_PROFILE_NAME": "gitea-reviewer",
        "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
    }
    _MERGER_ENV = {
        "GITEA_PROFILE_NAME": "gitea-merger",
        "GITEA_ALLOWED_OPERATIONS": "read,merge",
    }

    def _pr(self, author, state="open", sha="abc123", mergeable=True):
        """Build the minimal pull-request payload used by these tests."""
        return dict(
            user={"login": author},
            state=state,
            head={"sha": sha},
            mergeable=mergeable,
        )

    def _check(self, env, **call_kwargs):
        """Run the eligibility check on remote "prgs" with *env* as the whole environment."""
        with patch.dict(os.environ, env, clear=True):
            return gitea_check_pr_eligibility(remote="prgs", **call_kwargs)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_reviewer_eligible_to_review(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]

        verdict = self._check(self._REVIEWER_ENV, pr_number=5, action="review")

        self.assertTrue(verdict["eligible"])
        self.assertEqual(verdict["authenticated_user"], "reviewer-bot")
        self.assertEqual(verdict["pr_author"], "author-bot")
        self.assertEqual(verdict["head_sha"], "abc123")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_self_author_cannot_merge(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]

        verdict = self._check(self._MERGER_ENV, pr_number=8, action="merge")

        self.assertFalse(verdict["eligible"])
        self.assertIn("authenticated user is PR author", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_self_author_cannot_approve(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]

        verdict = self._check(self._REVIEWER_ENV, pr_number=8, action="approve")

        self.assertFalse(verdict["eligible"])
        self.assertIn("authenticated user is PR author", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_merge_fails_closed_when_mergeability_unknown(self, _auth, mock_api):
        # Gitea reports mergeable as None/null (not yet computed).
        undecided = self._pr("author-bot", mergeable=None)
        mock_api.side_effect = [{"login": "merger-bot"}, undecided]

        verdict = self._check(self._MERGER_ENV, pr_number=8, action="merge")

        self.assertFalse(verdict["eligible"])
        self.assertIsNone(verdict["mergeable"])
        self.assertIn("PR mergeability unknown", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_profile_not_allowed_to_merge(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "author-bot"}, self._pr("someone-else")]
        author_env = {
            "GITEA_PROFILE_NAME": "gitea-author",
            "GITEA_ALLOWED_OPERATIONS": "read,pr.create",
        }

        verdict = self._check(author_env, pr_number=8, action="merge")

        self.assertFalse(verdict["eligible"])
        self.assertIn("profile is not allowed to merge", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_forbidden_operation_blocks(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
        restricted_env = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review",
            "GITEA_FORBIDDEN_OPERATIONS": "merge",
        }

        verdict = self._check(restricted_env, pr_number=8, action="merge")

        self.assertFalse(verdict["eligible"])
        self.assertIn("profile forbids 'merge'", verdict["reasons"])

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_closed_pr_not_eligible(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot", state="closed"),
        ]

        verdict = self._check(self._REVIEWER_ENV, pr_number=8, action="approve")

        self.assertFalse(verdict["eligible"])
        self.assertIn("PR is not open (state=closed)", verdict["reasons"])

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_unknown_identity_fails_closed(self, _auth):
        # The auth header lookup is patched to return None for this test.
        verdict = self._check(self._MERGER_ENV, pr_number=8, action="merge")

        self.assertFalse(verdict["eligible"])
        self.assertIn(
            "authenticated identity could not be determined", verdict["reasons"]
        )
        self.assertIsNone(verdict["authenticated_user"])

    def test_unknown_action_rejected(self):
        verdict = self._check({}, pr_number=8, action="delete")

        self.assertFalse(verdict["eligible"])
        self.assertTrue(any("unknown action" in why for why in verdict["reasons"]))

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_never_exposes_secrets(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
        env_with_token = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review",
            "GITEA_TOKEN": "super-secret-token",
        }

        verdict = self._check(env_with_token, pr_number=5, action="review")

        rendered = repr(verdict).lower()
        banned = ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower())
        for fragment in banned:
            self.assertNotIn(fragment, rendered)
|
|
|
|
|
|
class TestSubmitPrReview(unittest.TestCase):
    """Gated review-mutation tool (#15).

    In the mocked tests ``api_request`` answers in order: the
    authenticated-user payload, the pull-request payload, and (only when a
    review is expected to be submitted) the response to the review POST.
    """

    # Environment presets reused by several tests below.
    _APPROVER_ENV = {
        "GITEA_PROFILE_NAME": "gitea-reviewer",
        "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
    }
    _COMMENTER_ENV = {
        "GITEA_PROFILE_NAME": "gitea-reviewer",
        "GITEA_ALLOWED_OPERATIONS": "read,review",
    }

    def _pr(self, author, state="open", sha="abc123", mergeable=True):
        """Build the minimal pull-request payload used by these tests."""
        return dict(
            user={"login": author},
            state=state,
            head={"sha": sha},
            mergeable=mergeable,
        )

    def _methods(self, mock_api):
        """Return the HTTP verb (first positional arg) of every recorded call."""
        verbs = []
        for recorded in mock_api.call_args_list:
            verbs.append(recorded.args[0])
        return verbs

    def _assert_no_mutation(self, mock_api):
        """Fail if any recorded call was a POST to a ``/reviews`` URL."""
        # A review mutation is POST .../reviews; eligibility only ever GETs.
        for recorded in mock_api.call_args_list:
            method = recorded.args[0]
            url = recorded.args[1]
            is_review_post = method == "POST" and url.endswith("/reviews")
            self.assertFalse(
                is_review_post, f"unexpected review mutation: {method} {url}"
            )

    def _submit(self, env, **call_kwargs):
        """Call the review tool on remote "prgs" with *env* as the whole environment."""
        with patch.dict(os.environ, env, clear=True):
            return gitea_submit_pr_review(remote="prgs", **call_kwargs)

    # -- approve --------------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_approve_blocked_when_author(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "jcwalker3"}, self._pr("jcwalker3")]

        outcome = self._submit(self._APPROVER_ENV, pr_number=8, action="approve")

        self.assertFalse(outcome["performed"])
        self.assertIn("authenticated user is PR author", outcome["reasons"])
        self._assert_no_mutation(mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_approve_succeeds_when_eligible(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 7},
        ]

        outcome = self._submit(
            self._APPROVER_ENV, pr_number=8, action="approve", body="LGTM"
        )

        self.assertTrue(outcome["performed"])
        self.assertEqual(outcome["authenticated_user"], "reviewer-bot")
        self.assertEqual(outcome["pr_author"], "author-bot")
        self.assertEqual(outcome["head_sha"], "abc123")

        # The final call is the review POST, pinned to the PR's head commit.
        sent = mock_api.call_args.args
        method, url = sent[0], sent[1]
        self.assertEqual(method, "POST")
        self.assertTrue(url.endswith("/pulls/8/reviews"))
        review = mock_api.call_args.args[3]
        self.assertEqual(review["event"], "APPROVE")
        self.assertEqual(review["commit_id"], "abc123")

    # -- request_changes ------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_request_changes_succeeds_when_eligible(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 9},
        ]
        env = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,request_changes",
        }

        outcome = self._submit(
            env, pr_number=8, action="request_changes", body="needs work"
        )

        self.assertTrue(outcome["performed"])
        self.assertEqual(mock_api.call_args.args[3]["event"], "REQUEST_CHANGES")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_request_changes_blocked_without_eligibility(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]

        # The commenter preset does not list request_changes.
        outcome = self._submit(
            self._COMMENTER_ENV, pr_number=8, action="request_changes"
        )

        self.assertFalse(outcome["performed"])
        self.assertIn(
            "profile is not allowed to request_changes", outcome["reasons"]
        )
        self._assert_no_mutation(mock_api)

    # -- comment --------------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_comment_succeeds_when_review_eligible(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 3},
        ]

        outcome = self._submit(
            self._COMMENTER_ENV, pr_number=8, action="comment", body="finding"
        )

        self.assertTrue(outcome["performed"])
        self.assertEqual(mock_api.call_args.args[3]["event"], "COMMENT")

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_comment_by_author_allowed(self, _auth, mock_api):
        # Commenting on your own PR is fine — only approve is self-blocked.
        mock_api.side_effect = [
            {"login": "jcwalker3"},
            self._pr("jcwalker3"),
            {"id": 4},
        ]

        outcome = self._submit(
            self._COMMENTER_ENV, pr_number=8, action="comment", body="note"
        )

        self.assertTrue(outcome["performed"])

    # -- identity / profile fail-closed ---------------------------------------

    @patch("mcp_server.get_auth_header", return_value=None)
    def test_unknown_identity_blocks(self, _auth):
        outcome = self._submit(self._APPROVER_ENV, pr_number=8, action="approve")

        self.assertFalse(outcome["performed"])
        self.assertIsNone(outcome["authenticated_user"])
        self.assertIn(
            "authenticated identity could not be determined", outcome["reasons"]
        )

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_disallowed_profile_operation_blocks(self, _auth, mock_api):
        mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
        author_env = {
            "GITEA_PROFILE_NAME": "gitea-author",
            "GITEA_ALLOWED_OPERATIONS": "read,pr.create",
        }

        outcome = self._submit(author_env, pr_number=8, action="approve")

        self.assertFalse(outcome["performed"])
        self.assertIn("profile is not allowed to approve", outcome["reasons"])
        self._assert_no_mutation(mock_api)

    # -- head SHA guard -------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_head_sha_mismatch_blocks(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot", sha="abc123"),
        ]

        outcome = self._submit(
            self._APPROVER_ENV,
            pr_number=8,
            action="approve",
            expected_head_sha="deadbeef",
        )

        self.assertFalse(outcome["performed"])
        self.assertIn(
            "expected head SHA does not match current PR head (fail closed)",
            outcome["reasons"],
        )
        self._assert_no_mutation(mock_api)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_head_sha_match_allows(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot", sha="abc123"),
            {"id": 5},
        ]

        outcome = self._submit(
            self._APPROVER_ENV,
            pr_number=8,
            action="approve",
            expected_head_sha="abc123",
        )

        self.assertTrue(outcome["performed"])

    # -- invalid action -------------------------------------------------------

    @patch("mcp_server.api_request")
    def test_invalid_review_action_rejected(self, mock_api):
        outcome = self._submit({}, pr_number=8, action="delete")

        self.assertFalse(outcome["performed"])
        self.assertTrue(
            any("unknown review action" in why for why in outcome["reasons"])
        )
        # never touches the API on a bad action
        mock_api.assert_not_called()

    # -- redaction ------------------------------------------------------------

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_output_redacts_secrets(self, _auth, mock_api):
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            {"id": 1},
        ]
        env_with_token = {
            "GITEA_PROFILE_NAME": "gitea-reviewer",
            "GITEA_ALLOWED_OPERATIONS": "read,review,approve",
            "GITEA_TOKEN": "super-secret-token",
        }

        outcome = self._submit(env_with_token, pr_number=5, action="approve")

        rendered = repr(outcome).lower()
        banned = ("super-secret-token", "authorization", "basic ", FAKE_AUTH.lower())
        for fragment in banned:
            self.assertNotIn(fragment, rendered)

    @patch("mcp_server.api_request")
    @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
    def test_error_message_redacts_credential(self, _auth, mock_api):
        # The third call (the review POST) raises with a credential embedded
        # in its message.
        mock_api.side_effect = [
            {"login": "reviewer-bot"},
            self._pr("author-bot"),
            RuntimeError("HTTP 500: token abc-secret-xyz rejected"),
        ]

        outcome = self._submit(self._APPROVER_ENV, pr_number=5, action="approve")

        self.assertFalse(outcome["performed"])
        rendered = repr(outcome)
        self.assertIn("[REDACTED]", rendered)
        self.assertNotIn("abc-secret-xyz", rendered)
|
|
|
|
|
|
# Script entry point: run this module's tests when the file is executed directly.
# NOTE(review): unittest.main() exits the interpreter by default once the run
# finishes, so when this file is executed as a script any test classes defined
# BELOW this guard are never reached. Consider moving the guard to the very end
# of the file.
if __name__ == "__main__":
    unittest.main()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tracker Hygiene Cleanup Tests
|
|
# ---------------------------------------------------------------------------
|
|
class TestTrackerHygieneCleanup(unittest.TestCase):
|
|
|
|
def setUp(self):
    """Patch the API, auth, audit and profile layers for each test.

    Exposes ``self.mock_api`` (``mcp_server.api_request``),
    ``self.mock_auth`` (``mcp_server.get_auth_header``, returning
    ``FAKE_AUTH``) and ``self.mock_audit`` (``gitea_audit.write_event``).
    Auditing is forced on and ``mcp_server.get_profile`` returns a fixed
    profile allowing merge, edit and close.
    """
    # Register the global stop BEFORE starting anything: unittest skips
    # tearDown() when setUp() raises, but cleanups still run, so a failure
    # part-way through cannot leak active patches into later tests.
    self.addCleanup(patch.stopall)

    self.mock_api = patch("mcp_server.api_request").start()
    self.mock_auth = patch(
        "mcp_server.get_auth_header", return_value=FAKE_AUTH
    ).start()
    patch("gitea_audit.audit_enabled", return_value=True).start()
    self.mock_audit = patch("gitea_audit.write_event").start()
    patch(
        "mcp_server.get_profile",
        return_value={
            "profile_name": "test",
            "allowed_operations": ["merge", "edit", "close"],
            "audit_label": "test",
            "forbidden_operations": [],
        },
    ).start()
|
|
|
|
def tearDown(self):
    """Stop every patch that was activated with ``.start()``."""
    patch.stopall()
|
|
|
|
def test_close_issue_removes_in_progress(self):
    # Closing an issue labelled status:in-progress should write back only the
    # remaining label and record an audit event.
    def fake_api(method, url, auth, payload=None):
        if method == "PATCH":
            if "issues/1" in url:
                return {"state": "closed"}
        elif method == "GET":
            # "labels" without "issues" in the URL -- presumably the
            # repository-wide label listing.
            if "labels" in url and "issues" not in url:
                return [
                    {"name": "status:in-progress", "id": 1},
                    {"name": "bug", "id": 2},
                ]
            if "issues/1" in url:
                return {
                    "labels": [{"name": "status:in-progress"}, {"name": "bug"}]
                }
        elif method == "PUT":
            if "labels" in url:
                # Only the id of the surviving "bug" label may be written.
                self.assertEqual(payload["labels"], [2])
                return []
        return {}

    self.mock_api.side_effect = fake_api

    outcome = gitea_close_issue(issue_number=1)

    self.assertTrue(outcome["success"])
    self.assertEqual(outcome["cleanup_status"].get(1), "released")
    self.mock_audit.assert_called()
|
|
|
|
def test_close_issue_no_label_is_noop(self):
    # When the issue does not carry status:in-progress, no label write may
    # happen and the cleanup is reported as "not present".
    def fake_api(method, url, auth, payload=None):
        if method == "PATCH":
            if "issues/1" in url:
                return {"state": "closed"}
        elif method == "GET":
            if "labels" in url and "issues" not in url:
                return [
                    {"name": "status:in-progress", "id": 1},
                    {"name": "bug", "id": 2},
                ]
            if "issues/1" in url:
                return {"labels": [{"name": "bug"}]}
        elif method == "PUT":
            if "labels" in url:
                self.fail("Should not PUT labels")
        return {}

    self.mock_api.side_effect = fake_api

    outcome = gitea_close_issue(issue_number=1)

    self.assertTrue(outcome["success"])
    self.assertEqual(outcome["cleanup_status"].get(1), "not present")
|
|
|
|
def test_merge_pr_with_closes_removes_label(self):
    # The linked issue is named by "Closes #123" in the PR body; merging
    # should release the in-progress label on that issue.
    def fake_api(method, url, auth, payload=None):
        if method == "GET":
            if "/user" in url:
                return {"login": "merger"}
            if "pulls/1" in url and "/files" not in url:
                return {
                    "user": {"login": "author"},
                    "state": "open",
                    "head": {"sha": "sha123", "ref": "feat/my-branch"},
                    "base": {"ref": "main"},
                    "mergeable": True,
                    "merged_commit_sha": "merge123",
                    "title": "My PR",
                    "body": "Closes #123",
                }
            if "labels" in url and "issues" not in url:
                return [
                    {"name": "status:in-progress", "id": 1},
                    {"name": "bug", "id": 2},
                ]
            if "issues/123" in url:
                return {
                    "labels": [{"name": "status:in-progress"}, {"name": "bug"}]
                }
        elif method == "POST":
            if "merge" in url:
                return {}
        elif method == "PUT":
            if "labels" in url:
                # Only the id of the surviving "bug" label may be written.
                self.assertEqual(payload["labels"], [2])
                return []
        return {}

    self.mock_api.side_effect = fake_api

    outcome = gitea_merge_pr(pr_number=1, confirmation="MERGE PR 1", do="merge")

    self.assertTrue(outcome["performed"])
    self.assertEqual(outcome["cleanup_status"].get(123), "released")
|
|
|
|
def test_merge_pr_with_branch_name_removes_label(self):
    # The PR body has no closing keyword here; the only reference to issue
    # 123 is in the head branch name "fix/issue-123-slug".
    def fake_api(method, url, auth, payload=None):
        if method == "GET":
            if "/user" in url:
                return {"login": "merger"}
            if "pulls/1" in url and "/files" not in url:
                return {
                    "user": {"login": "author"},
                    "state": "open",
                    "head": {"sha": "sha123", "ref": "fix/issue-123-slug"},
                    "base": {"ref": "main"},
                    "mergeable": True,
                    "merged_commit_sha": "merge123",
                    "title": "My PR",
                    "body": "Fixing things",
                }
            if "labels" in url and "issues" not in url:
                return [
                    {"name": "status:in-progress", "id": 1},
                    {"name": "bug", "id": 2},
                ]
            if "issues/123" in url:
                return {
                    "labels": [{"name": "status:in-progress"}, {"name": "bug"}]
                }
        elif method == "POST":
            if "merge" in url:
                return {}
        elif method == "PUT":
            if "labels" in url:
                # Only the id of the surviving "bug" label may be written.
                self.assertEqual(payload["labels"], [2])
                return []
        return {}

    self.mock_api.side_effect = fake_api

    outcome = gitea_merge_pr(pr_number=1, confirmation="MERGE PR 1", do="merge")

    self.assertTrue(outcome["performed"])
    self.assertEqual(outcome["cleanup_status"].get(123), "released")
|
|
|
|
def test_close_pr_removes_label_but_does_not_close_issue(self):
    # Closing (not merging) a PR that says "Closes #123" should still release
    # the in-progress label on issue 123.
    def fake_api(method, url, auth, payload=None):
        if method == "PATCH":
            if "pulls/1" in url:
                return {
                    "number": 1,
                    "title": "My PR",
                    "state": "closed",
                    "html_url": "url",
                    "body": "Closes #123",
                    "head": {"ref": "feat/my-branch"},
                }
        elif method == "GET":
            if "labels" in url and "issues" not in url:
                return [{"name": "status:in-progress", "id": 1}]
            if "issues/123" in url:
                return {"labels": [{"name": "status:in-progress"}]}
        elif method == "PUT":
            if "labels" in url:
                # The in-progress label was the only one, so nothing remains.
                self.assertEqual(payload["labels"], [])
                return []
        elif method == "POST":
            if "comments" in url:
                return {}
        return {}

    self.mock_api.side_effect = fake_api

    outcome = gitea_edit_pr(pr_number=1, state="closed")

    self.assertTrue(outcome["success"])
    self.assertEqual(outcome["cleanup_status"].get(123), "released")
|
|
|
|
def test_multiple_linked_issues(self):
    # Issues 123 and 124 are named in the PR body and 125 in the branch name;
    # each gets its own entry in cleanup_status.
    def fake_api(method, url, auth, payload=None):
        if method == "PATCH":
            if "pulls/1" in url:
                return {
                    "number": 1,
                    "title": "My PR",
                    "state": "closed",
                    "html_url": "url",
                    "body": "Closes #123\nFixes #124",
                    "head": {"ref": "issue-125"},
                }
        elif method == "GET":
            if "labels" in url and "issues" not in url:
                return [{"name": "status:in-progress", "id": 1}]
            if "issues/123" in url:
                return {"labels": [{"name": "status:in-progress"}]}
            if "issues/124" in url:
                return {"labels": [{"name": "status:in-progress"}]}
            if "issues/125" in url:
                # Issue 125 carries no labels at all.
                return {"labels": []}
        elif method == "PUT":
            if "labels" in url:
                self.assertEqual(payload["labels"], [])
                return []
        elif method == "POST":
            if "comments" in url:
                return {}
        return {}

    self.mock_api.side_effect = fake_api

    outcome = gitea_edit_pr(pr_number=1, state="closed")

    self.assertTrue(outcome["success"])
    statuses = outcome["cleanup_status"]
    self.assertEqual(statuses.get(123), "released")
    self.assertEqual(statuses.get(124), "released")
    self.assertEqual(statuses.get(125), "not present")
|
|
|
|
def test_no_linked_issue_found(self):
    # Neither the PR body nor the branch name references an issue, so the
    # cleanup status is a plain message instead of a per-issue mapping.
    def fake_api(method, url, auth, payload=None):
        is_pr_patch = method == "PATCH" and "pulls/1" in url
        if not is_pr_patch:
            return {}
        return {
            "number": 1,
            "title": "My PR",
            "state": "closed",
            "html_url": "url",
            "body": "No issue link",
            "head": {"ref": "main"},
        }

    self.mock_api.side_effect = fake_api

    outcome = gitea_edit_pr(pr_number=1, state="closed")

    self.assertTrue(outcome["success"])
    self.assertEqual(outcome["cleanup_status"], "no linked issue found")
|
|
|
|
def test_label_removal_failure_reported(self):
    # A failing label write must not fail the close itself; the error is
    # reported in cleanup_status instead.
    def fake_api(method, url, auth, payload=None):
        if method == "PATCH":
            if "issues/1" in url:
                return {"state": "closed"}
        elif method == "GET":
            if "labels" in url and "issues" not in url:
                return [{"name": "status:in-progress", "id": 1}]
            if "issues/1" in url:
                return {"labels": [{"name": "status:in-progress"}]}
        elif method == "PUT":
            if "labels" in url:
                raise RuntimeError("API failure")
        return {}

    self.mock_api.side_effect = fake_api

    outcome = gitea_close_issue(issue_number=1)

    self.assertTrue(outcome["success"])
    self.assertIn("error:", outcome["cleanup_status"].get(1))
|