Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a0e7d3360e | |||
| b1256d73b4 | |||
| 6a8a9d99b7 | |||
| 349bc06da7 | |||
| 5aeb51f132 | |||
| 9c44fd6b27 | |||
| e880a210ec |
@@ -179,6 +179,30 @@ allowed/forbidden membership check):
|
||||
therefore never silently widen permissions.
|
||||
- An empty or missing `allowed_operations` list denies everything.
|
||||
|
||||
## Issue comments versus PR reviews (#126)
|
||||
|
||||
Issue discussion comments and PR reviews are different capabilities and are
|
||||
gated by different operations:
|
||||
|
||||
- **Issue comments** (`gitea_list_issue_comments`, `gitea_create_issue_comment`)
|
||||
post to and read from an issue's discussion thread
|
||||
(`/issues/{n}/comments`). Listing requires `gitea.read`; creating requires
|
||||
`gitea.issue.comment`. They never submit review verdicts.
|
||||
- **PR reviews** (`gitea_review_pr`, `gitea_submit_pr_review`) submit
|
||||
approve/request-changes/comment verdicts on pull requests
|
||||
(`/pulls/{n}/reviews`) and are gated by the `gitea.pr.*` family
|
||||
(`gitea.pr.review`, `gitea.pr.approve`, `gitea.pr.request_changes`,
|
||||
`gitea.pr.comment`).
|
||||
|
||||
A profile holding the full PR review/merge set still cannot post issue
|
||||
discussion comments unless it also allows `gitea.issue.comment`, and vice
|
||||
versa — neither family implies the other. Both comment tools require an
|
||||
explicit issue number; the target repo comes only from the standard
|
||||
remote/org/repo arguments. Create operations are audit-logged
|
||||
(`create_issue_comment`) when `GITEA_AUDIT_LOG` is configured, errors are
|
||||
redacted, and normal output contains no endpoint URLs
|
||||
(`GITEA_MCP_REVEAL_ENDPOINTS=1` is the local admin opt-in for web links).
|
||||
|
||||
## Identity and fail-closed rules
|
||||
|
||||
Before **any** mutating action, a workflow must know both:
|
||||
@@ -228,6 +252,26 @@ the "one server per trust boundary" model described in
|
||||
[`tool-boundaries.md`](tool-boundaries.md) and
|
||||
[`credential-isolation.md`](credential-isolation.md).
|
||||
|
||||
## Profile Activation and Runtime Identity Clarity (#131)
|
||||
|
||||
To make Gitea MCP profile activation and runtime identity state explicit, the following mechanisms are supported:
|
||||
|
||||
### 1. Static-Profile vs. Dynamic-Profile Mode
|
||||
- **Static-Profile Mode (Default):** The active profile is fixed at server launch based on the `GITEA_MCP_PROFILE` environment variable (with `GITEA_MCP_CONFIG` pointing to the config path). Local environment variables are static once a subprocess is spawned by the host. Modifying the environment variables on the host does not dynamically update an already-connected MCP server process.
|
||||
- **Dynamic-Profile Mode:** Profile switching via the `gitea_activate_profile` tool is supported **only** if the configuration JSON explicitly opts in by setting `"allow_runtime_switching": true` under rules or top-level keys. Otherwise, attempting to switch profiles dynamically will fail closed.
|
||||
|
||||
### 2. Dual MCP Namespaces Recommendation
|
||||
For security-sensitive or high-risk tasks, the preferred safety model uses separate, isolated MCP server instances (namespaces/sessions) launched with static profiles:
|
||||
- `gitea-author`: Exposes tools configured with author permissions; cannot perform approvals or merges.
|
||||
- `gitea-reviewer`: Exposes tools configured with reviewer permissions; used for PR reviews and merges.
|
||||
This layout maintains physical separation of credentials and prevents privilege escalation within a single session.
|
||||
|
||||
### 3. Verification Post-Switching
|
||||
When dynamic profile switching is enabled and a profile is activated via `gitea_activate_profile`, the session MUST immediately:
|
||||
1. Clear the cached identity.
|
||||
2. Call `gitea_whoami` with the target remote to prove and verify the fresh Gitea authenticated identity.
|
||||
This guarantees the active profile operations align with the actual Gitea authenticated user credential.
|
||||
|
||||
## Relationship to roadmap issues
|
||||
|
||||
This document defines the **model only**. Related work is tracked separately
|
||||
|
||||
@@ -18,6 +18,17 @@ behavior they rely on already exists (canonical runtime profiles, the
|
||||
interactive setup menu, identity/eligibility checks, gated review/merge, and
|
||||
audit logging). See [Related documents](#related-documents).
|
||||
|
||||
> **New session? Call the guide tools first (#128).** Before using any other
|
||||
> Gitea MCP tool in a fresh session, call `mcp_get_control_plane_guide`
|
||||
> (read-only): it reports the active profile, authenticated identity,
|
||||
> allowed/forbidden operations, profile-aware do/don't guidance, and the
|
||||
> non-negotiable rules (hard stops, fail-closed behavior, head-SHA pinning,
|
||||
> merge confirmation, redaction, author/reviewer separation, profile
|
||||
> switching). Then call `mcp_list_project_skills` to discover the available
|
||||
> project workflows and `mcp_get_skill_guide(<name>)` for step-by-step
|
||||
> instructions. This replaces long pasted operator prompts for the standard
|
||||
> rules; operator prompts still control task-specific scope.
|
||||
|
||||
For cross-project use, copy the portable workflow skill at
|
||||
[`../skills/llm-project-workflow/SKILL.md`](../skills/llm-project-workflow/SKILL.md).
|
||||
It extracts the issue-first, isolated-worktree, no-self-review, profile-safety,
|
||||
|
||||
+17
-1
@@ -473,13 +473,29 @@ def get_profile():
|
||||
token_source = (os.environ.get("GITEA_TOKEN_SOURCE") or "").strip() \
|
||||
or gitea_config.auth_source_name(jp)
|
||||
base_url = os.environ.get("GITEA_BASE_URL") or jp.get("base_url") or None
|
||||
auth_type = None
|
||||
if isinstance(jp.get("auth"), dict):
|
||||
auth_type = jp["auth"].get("type")
|
||||
elif token_source:
|
||||
if token_source.startswith("keychain:"):
|
||||
auth_type = "keychain"
|
||||
else:
|
||||
auth_type = "env"
|
||||
|
||||
return {
|
||||
"profile_name": name,
|
||||
"allowed_operations": ops,
|
||||
"forbidden_operations": forbidden,
|
||||
"audit_label": audit_label,
|
||||
"token_source_name": token_source,
|
||||
"auth_source_type": auth_type,
|
||||
"base_url": base_url,
|
||||
"username": jp.get("username") or None,
|
||||
"default_owner": jp.get("default_owner") or None,
|
||||
}
|
||||
"profile_path": jp.get("profile_path") or None,
|
||||
"environment": jp.get("environment") or None,
|
||||
"service": jp.get("service") or None,
|
||||
"identity": jp.get("identity") or None,
|
||||
"role": jp.get("role") or None,
|
||||
"execution_profile": jp.get("execution_profile") or None,
|
||||
}
|
||||
@@ -192,11 +192,32 @@ def config_path():
|
||||
return (os.environ.get(ENV_CONFIG_PATH) or "").strip() or None
|
||||
|
||||
|
||||
_active_profile_override = None
|
||||
|
||||
|
||||
def selected_profile_name():
|
||||
"""Return the selected profile name from the environment, or None."""
|
||||
if _active_profile_override is not None:
|
||||
return _active_profile_override
|
||||
return (os.environ.get(ENV_PROFILE) or "").strip() or None
|
||||
|
||||
|
||||
def is_runtime_switching_enabled(path=None):
|
||||
"""Check if runtime profile switching is explicitly enabled in config."""
|
||||
try:
|
||||
config = load_config(path)
|
||||
except Exception:
|
||||
return False
|
||||
if not config:
|
||||
return False
|
||||
rules = config.get("rules") or {}
|
||||
if rules.get("allow_runtime_switching") is True:
|
||||
return True
|
||||
if config.get("allow_runtime_switching") is True:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def load_config(path=None):
|
||||
"""Load and minimally validate the canonical JSON config.
|
||||
|
||||
|
||||
+970
File diff suppressed because it is too large
Load Diff
@@ -31,6 +31,8 @@ from mcp_server import ( # noqa: E402
|
||||
gitea_get_profile,
|
||||
gitea_check_pr_eligibility,
|
||||
gitea_submit_pr_review,
|
||||
gitea_list_issue_comments,
|
||||
gitea_create_issue_comment,
|
||||
)
|
||||
from gitea_auth import get_profile # noqa: E402
|
||||
|
||||
@@ -995,6 +997,65 @@ class TestRuntimeProfile(unittest.TestCase):
|
||||
for secret in ("super-secret-token", "token", "authorization", "basic "):
|
||||
self.assertNotIn(secret, blob)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_whoami_v2_metadata(self, _auth, mock_api):
|
||||
mock_api.return_value = {"id": 7, "login": "rev"}
|
||||
env = {
|
||||
"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve",
|
||||
"GITEA_FORBIDDEN_OPERATIONS": "merge",
|
||||
"GITEA_AUDIT_LABEL": "reviewer-runtime",
|
||||
"GITEA_TOKEN_SOURCE": "keychain:prgs-reviewer-token",
|
||||
}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
result = gitea_whoami(remote="prgs")
|
||||
profile = result["profile"]
|
||||
self.assertEqual(profile["environment"], None)
|
||||
self.assertEqual(profile["service"], None)
|
||||
self.assertEqual(profile["identity"], None)
|
||||
self.assertEqual(profile["role"], None)
|
||||
self.assertEqual(profile["profile_address"], None)
|
||||
self.assertEqual(profile["execution_profile"], None)
|
||||
self.assertEqual(profile["audit_label"], "reviewer-runtime")
|
||||
self.assertEqual(profile["auth_source_type"], "keychain")
|
||||
self.assertEqual(profile["forbidden_operations"], ["merge"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
@patch("mcp_server.get_profile")
|
||||
def test_whoami_v2_resolved_metadata(self, mock_get_profile, _auth, mock_api):
|
||||
mock_api.return_value = {"id": 7, "login": "rev"}
|
||||
mock_get_profile.return_value = {
|
||||
"profile_name": "prgs.gitea.reviewer",
|
||||
"allowed_operations": ["read", "review"],
|
||||
"forbidden_operations": ["merge"],
|
||||
"audit_label": "rev-audit",
|
||||
"token_source_name": "keychain:prgs-reviewer-token",
|
||||
"auth_source_type": "keychain",
|
||||
"base_url": "https://gitea.prgs.cc",
|
||||
"username": "sysadmin",
|
||||
"default_owner": "Scaled-Tech-Consulting",
|
||||
"profile_path": "prgs.gitea.reviewer",
|
||||
"environment": "prgs",
|
||||
"service": "gitea",
|
||||
"identity": "reviewer",
|
||||
"role": "reviewer",
|
||||
"execution_profile": "reviewer-profile",
|
||||
}
|
||||
result = gitea_whoami(remote="prgs")
|
||||
profile = result["profile"]
|
||||
self.assertEqual(profile["environment"], "prgs")
|
||||
self.assertEqual(profile["service"], "gitea")
|
||||
self.assertEqual(profile["identity"], "reviewer")
|
||||
self.assertEqual(profile["role"], "reviewer")
|
||||
self.assertEqual(profile["profile_address"], "prgs.gitea.reviewer")
|
||||
self.assertEqual(profile["execution_profile"], "reviewer-profile")
|
||||
self.assertEqual(profile["audit_label"], "rev-audit")
|
||||
self.assertEqual(profile["auth_source_type"], "keychain")
|
||||
self.assertEqual(profile["forbidden_operations"], ["merge"])
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Profile discovery (read-only) — issue #13
|
||||
@@ -1082,6 +1143,39 @@ class TestProfileDiscovery(unittest.TestCase):
|
||||
self.assertIsNone(result["remote"])
|
||||
self.assertIn("remote_error", result)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
@patch("mcp_server.get_profile")
|
||||
def test_get_profile_v2_resolved_metadata(self, mock_get_profile, _auth, mock_api):
|
||||
mock_api.return_value = {"id": 7, "login": "rev"}
|
||||
mock_get_profile.return_value = {
|
||||
"profile_name": "prgs.gitea.reviewer",
|
||||
"allowed_operations": ["read", "review"],
|
||||
"forbidden_operations": ["merge"],
|
||||
"audit_label": "rev-audit",
|
||||
"token_source_name": "keychain:prgs-reviewer-token",
|
||||
"auth_source_type": "keychain",
|
||||
"base_url": "https://gitea.prgs.cc",
|
||||
"username": "sysadmin",
|
||||
"default_owner": "Scaled-Tech-Consulting",
|
||||
"profile_path": "prgs.gitea.reviewer",
|
||||
"environment": "prgs",
|
||||
"service": "gitea",
|
||||
"identity": "reviewer",
|
||||
"role": "reviewer",
|
||||
"execution_profile": "reviewer-profile",
|
||||
}
|
||||
result = gitea_get_profile(remote="prgs")
|
||||
self.assertEqual(result["environment"], "prgs")
|
||||
self.assertEqual(result["service"], "gitea")
|
||||
self.assertEqual(result["identity"], "reviewer")
|
||||
self.assertEqual(result["role"], "reviewer")
|
||||
self.assertEqual(result["profile_address"], "prgs.gitea.reviewer")
|
||||
self.assertEqual(result["execution_profile"], "reviewer-profile")
|
||||
self.assertEqual(result["auth_source_type"], "keychain")
|
||||
self.assertEqual(result["forbidden_operations"], ["merge"])
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PR eligibility checks (read-only) — issue #14
|
||||
@@ -1811,3 +1905,199 @@ class TestEndpointRedaction(unittest.TestCase):
|
||||
from mcp_server import gitea_audit_config
|
||||
result = gitea_audit_config()
|
||||
self.assertFalse(result["configured"])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Issue comment tools (#126)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestIssueCommentTools(unittest.TestCase):
|
||||
"""gitea_list_issue_comments / gitea_create_issue_comment (#126).
|
||||
|
||||
Issue discussion comments are distinct from PR reviews: they hit the
|
||||
/issues/{n}/comments endpoint, are gated on gitea.read (list) and
|
||||
gitea.issue.comment (create) — never on the gitea.pr.* review/merge
|
||||
family — and their normal output is LLM-safe (no endpoint URLs; the
|
||||
GITEA_MCP_REVEAL_ENDPOINTS opt-in restores links for local diagnostics).
|
||||
"""
|
||||
|
||||
AUTHOR_ENV = {
|
||||
"GITEA_PROFILE_NAME": "gitea-author",
|
||||
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.issue.comment",
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _comment(cid=101, login="alice", body="hello world"):
|
||||
return {
|
||||
"id": cid,
|
||||
"user": {"login": login},
|
||||
"body": body,
|
||||
"created_at": "2026-07-03T00:00:00Z",
|
||||
"updated_at": "2026-07-03T01:00:00Z",
|
||||
"html_url": (
|
||||
"https://gitea.example.com/o/r/issues/9#issuecomment-%d" % cid
|
||||
),
|
||||
}
|
||||
|
||||
# -- list ----------------------------------------------------------------
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_lists_comments(self, _auth, mock_api):
|
||||
mock_api.return_value = [self._comment(101), self._comment(102, "bob")]
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
|
||||
self.assertTrue(result["success"])
|
||||
self.assertEqual(result["issue_number"], 9)
|
||||
self.assertEqual(len(result["comments"]), 2)
|
||||
first = result["comments"][0]
|
||||
self.assertEqual(first["id"], 101)
|
||||
self.assertEqual(first["author"], "alice")
|
||||
self.assertEqual(first["body"], "hello world")
|
||||
self.assertEqual(first["created_at"], "2026-07-03T00:00:00Z")
|
||||
self.assertEqual(first["updated_at"], "2026-07-03T01:00:00Z")
|
||||
url = mock_api.call_args[0][1]
|
||||
self.assertIn("/issues/9/comments", url)
|
||||
self.assertEqual(mock_api.call_args[0][0], "GET")
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_list_output_has_no_urls_by_default(self, _auth, mock_api):
|
||||
mock_api.return_value = [self._comment()]
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
|
||||
blob = json.dumps(result)
|
||||
self.assertNotIn("https://", blob)
|
||||
self.assertNotIn("http://", blob)
|
||||
self.assertNotIn("url", blob)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_list_reveal_opt_in_includes_url(self, _auth, mock_api):
|
||||
mock_api.return_value = [self._comment()]
|
||||
env = dict(self.AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
|
||||
self.assertIn("issuecomment-101", result["comments"][0]["url"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_list_blocked_without_read_permission(self, _auth, mock_api):
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-writer-only",
|
||||
"GITEA_ALLOWED_OPERATIONS": "gitea.issue.comment"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
|
||||
self.assertFalse(result["success"])
|
||||
self.assertTrue(result["reasons"])
|
||||
mock_api.assert_not_called()
|
||||
|
||||
def test_list_unknown_remote_raises(self):
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
with self.assertRaises(ValueError):
|
||||
gitea_list_issue_comments(issue_number=9, remote="nope")
|
||||
|
||||
# -- create ----------------------------------------------------------------
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_creates_comment(self, _auth, mock_api):
|
||||
mock_api.return_value = self._comment(555, "gitea-author", "posted")
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
result = gitea_create_issue_comment(
|
||||
issue_number=9, body="posted", remote="prgs")
|
||||
self.assertTrue(result["success"])
|
||||
self.assertEqual(result["comment_id"], 555)
|
||||
self.assertEqual(result["issue_number"], 9)
|
||||
self.assertEqual(mock_api.call_args[0][0], "POST")
|
||||
url = mock_api.call_args[0][1]
|
||||
self.assertIn("/issues/9/comments", url)
|
||||
self.assertIn("gitea.prgs.cc", url)
|
||||
self.assertIn("Scaled-Tech-Consulting", url)
|
||||
self.assertEqual(mock_api.call_args[0][3], {"body": "posted"})
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_create_output_has_no_urls_by_default(self, _auth, mock_api):
|
||||
mock_api.return_value = self._comment(555)
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
result = gitea_create_issue_comment(
|
||||
issue_number=9, body="posted", remote="prgs")
|
||||
blob = json.dumps(result)
|
||||
self.assertNotIn("https://", blob)
|
||||
self.assertNotIn("http://", blob)
|
||||
self.assertNotIn("url", blob)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_create_reveal_opt_in_includes_url(self, _auth, mock_api):
|
||||
mock_api.return_value = self._comment(555)
|
||||
env = dict(self.AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
result = gitea_create_issue_comment(
|
||||
issue_number=9, body="posted", remote="prgs")
|
||||
self.assertIn("issuecomment-555", result["url"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_review_permissions_do_not_grant_issue_comments(self, _auth, mock_api):
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS":
|
||||
"gitea.read,gitea.pr.review,gitea.pr.comment,"
|
||||
"gitea.pr.approve,gitea.pr.merge"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
result = gitea_create_issue_comment(
|
||||
issue_number=9, body="posted", remote="prgs")
|
||||
self.assertFalse(result["success"])
|
||||
self.assertFalse(result["performed"])
|
||||
self.assertTrue(result["reasons"])
|
||||
mock_api.assert_not_called()
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_create_forbidden_overrides_allowed(self, _auth, mock_api):
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-author",
|
||||
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.issue.comment",
|
||||
"GITEA_FORBIDDEN_OPERATIONS": "gitea.issue.comment"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
result = gitea_create_issue_comment(
|
||||
issue_number=9, body="posted", remote="prgs")
|
||||
self.assertFalse(result["success"])
|
||||
mock_api.assert_not_called()
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_create_empty_body_blocked(self, _auth, mock_api):
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
result = gitea_create_issue_comment(
|
||||
issue_number=9, body=" ", remote="prgs")
|
||||
self.assertFalse(result["success"])
|
||||
mock_api.assert_not_called()
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_create_missing_issue_error_is_redacted(self, _auth, mock_api):
|
||||
mock_api.side_effect = RuntimeError(
|
||||
"Gitea API error 404 for issue (auth was token abc123secret)")
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
with self.assertRaises(RuntimeError) as cm:
|
||||
gitea_create_issue_comment(
|
||||
issue_number=99999, body="posted", remote="prgs")
|
||||
msg = str(cm.exception)
|
||||
self.assertNotIn("abc123secret", msg)
|
||||
self.assertIn("404", msg)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_create_never_touches_review_endpoints(self, _auth, mock_api):
|
||||
mock_api.return_value = self._comment(555)
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
gitea_create_issue_comment(
|
||||
issue_number=9, body="posted", remote="prgs")
|
||||
for call in mock_api.call_args_list:
|
||||
self.assertNotIn("reviews", call[0][1])
|
||||
self.assertNotIn("/pulls/", call[0][1])
|
||||
|
||||
def test_create_unknown_remote_raises(self):
|
||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
||||
with self.assertRaises(ValueError):
|
||||
gitea_create_issue_comment(
|
||||
issue_number=9, body="x", remote="nope")
|
||||
|
||||
@@ -0,0 +1,232 @@
|
||||
"""Tests for the operator guide / project skills MCP tools (#128).
|
||||
|
||||
Read-only capability-discovery tools: mcp_get_control_plane_guide,
|
||||
mcp_list_project_skills, mcp_get_skill_guide. Each is tested by calling the
|
||||
underlying function directly with mocked API responses.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
||||
|
||||
import mcp_server # noqa: E402
|
||||
from mcp_server import ( # noqa: E402
|
||||
mcp_get_control_plane_guide,
|
||||
mcp_list_project_skills,
|
||||
mcp_get_skill_guide,
|
||||
)
|
||||
|
||||
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
|
||||
|
||||
AUTHOR_ENV = {
|
||||
"GITEA_PROFILE_NAME": "author-test",
|
||||
"GITEA_ALLOWED_OPERATIONS":
|
||||
"gitea.read,gitea.repo.commit,gitea.branch.create,"
|
||||
"gitea.branch.push,gitea.pr.create,gitea.pr.comment",
|
||||
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.approve,gitea.pr.merge",
|
||||
}
|
||||
|
||||
REVIEWER_ENV = {
|
||||
"GITEA_PROFILE_NAME": "reviewer-test",
|
||||
"GITEA_ALLOWED_OPERATIONS":
|
||||
"gitea.read,gitea.pr.review,gitea.pr.comment,gitea.pr.approve,"
|
||||
"gitea.pr.request_changes,gitea.pr.merge",
|
||||
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.create,gitea.branch.push",
|
||||
}
|
||||
|
||||
EXPECTED_SKILLS = [
|
||||
"gitea-issue-authoring",
|
||||
"gitea-pr-creation",
|
||||
"gitea-pr-review",
|
||||
"gitea-pr-merge",
|
||||
"gitea-issue-comments",
|
||||
"profile-switching",
|
||||
"redaction-security-review",
|
||||
"jenkins-readonly",
|
||||
"glitchtip-readonly",
|
||||
"release-operator",
|
||||
]
|
||||
|
||||
|
||||
class GuideTestBase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
mcp_server._IDENTITY_CACHE.clear()
|
||||
|
||||
def tearDown(self):
|
||||
mcp_server._IDENTITY_CACHE.clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# mcp_get_control_plane_guide
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestControlPlaneGuide(GuideTestBase):
|
||||
|
||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_author_profile_guidance(self, _auth, _api):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
g = mcp_get_control_plane_guide(remote="prgs")
|
||||
self.assertTrue(g["read_only"])
|
||||
self.assertEqual(g["profile"]["role_kind"], "author")
|
||||
self.assertEqual(g["identity"]["authenticated_username"], "author-bot")
|
||||
self.assertEqual(g["identity"]["status"], "verified")
|
||||
blob = " ".join(g["guidance"]).lower()
|
||||
self.assertIn("forbidden", blob)
|
||||
self.assertIn("review", blob)
|
||||
|
||||
@patch("mcp_server.api_request", return_value={"login": "reviewer-bot"})
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_reviewer_profile_guidance(self, _auth, _api):
|
||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
||||
g = mcp_get_control_plane_guide(remote="prgs")
|
||||
self.assertEqual(g["profile"]["role_kind"], "reviewer")
|
||||
blob = " ".join(g["guidance"]).lower()
|
||||
self.assertIn("eligibility", blob)
|
||||
self.assertIn("pinned", blob)
|
||||
|
||||
@patch("mcp_server.get_auth_header", return_value=None)
|
||||
def test_unresolved_identity_instructs_stop(self, _auth):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
g = mcp_get_control_plane_guide(remote="prgs")
|
||||
self.assertEqual(g["identity"]["status"], "unresolved")
|
||||
self.assertIsNone(g["identity"]["authenticated_username"])
|
||||
self.assertIn("STOP", g["identity"]["instruction"])
|
||||
|
||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_guide_is_read_only(self, _auth, mock_api):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
mcp_get_control_plane_guide(remote="prgs")
|
||||
for call in mock_api.call_args_list:
|
||||
self.assertEqual(call[0][0], "GET")
|
||||
|
||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_no_urls_or_keychain_ids_by_default(self, _auth, _api):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
g = mcp_get_control_plane_guide(remote="prgs")
|
||||
blob = json.dumps(g)
|
||||
self.assertNotIn("https://", blob)
|
||||
self.assertNotIn("http://", blob)
|
||||
self.assertNotIn("keychain:", blob)
|
||||
self.assertNotIn(FAKE_AUTH, blob)
|
||||
self.assertNotIn("server", g["identity"])
|
||||
|
||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_reveal_opt_in_includes_server(self, _auth, _api):
|
||||
env = dict(AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
g = mcp_get_control_plane_guide(remote="prgs")
|
||||
self.assertIn("gitea.prgs.cc", g["identity"]["server"])
|
||||
|
||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_rules_cover_required_topics(self, _auth, _api):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
g = mcp_get_control_plane_guide(remote="prgs")
|
||||
rules = g["rules"]
|
||||
for key in ("hard_stops", "fail_closed", "head_sha_pinning",
|
||||
"merge_confirmation", "redaction", "separation",
|
||||
"profile_switching", "identity_verification"):
|
||||
self.assertIn(key, rules)
|
||||
self.assertIn("MERGE PR", json.dumps(rules["merge_confirmation"]))
|
||||
self.assertTrue(rules["hard_stops"])
|
||||
|
||||
def test_unknown_remote_raises(self):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
with self.assertRaises(ValueError):
|
||||
mcp_get_control_plane_guide(remote="nope")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# mcp_list_project_skills
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestProjectSkills(GuideTestBase):
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
def test_registry_complete_and_no_api_calls(self, mock_api):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
r = mcp_list_project_skills()
|
||||
self.assertTrue(r["read_only"])
|
||||
names = [s["name"] for s in r["skills"]]
|
||||
for expected in EXPECTED_SKILLS:
|
||||
self.assertIn(expected, names)
|
||||
self.assertEqual(r["count"], len(r["skills"]))
|
||||
for s in r["skills"]:
|
||||
self.assertTrue(s["description"])
|
||||
self.assertTrue(s["when_to_use"])
|
||||
self.assertIn("required_operations", s)
|
||||
self.assertIn("status", s)
|
||||
self.assertIn("available_to_current_profile", s)
|
||||
mock_api.assert_not_called()
|
||||
|
||||
def test_profile_aware_availability(self):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
r = mcp_list_project_skills()
|
||||
by_name = {s["name"]: s for s in r["skills"]}
|
||||
self.assertTrue(by_name["gitea-pr-creation"]["available_to_current_profile"])
|
||||
self.assertFalse(by_name["gitea-pr-merge"]["available_to_current_profile"])
|
||||
|
||||
def test_unimplemented_services_marked(self):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
r = mcp_list_project_skills()
|
||||
by_name = {s["name"]: s for s in r["skills"]}
|
||||
self.assertNotEqual(by_name["jenkins-readonly"]["status"], "available")
|
||||
self.assertNotEqual(by_name["glitchtip-readonly"]["status"], "available")
|
||||
|
||||
def test_no_urls_in_registry(self):
|
||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
||||
r = mcp_list_project_skills()
|
||||
blob = json.dumps(r)
|
||||
self.assertNotIn("https://", blob)
|
||||
self.assertNotIn("http://", blob)
|
||||
self.assertNotIn("keychain:", blob)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# mcp_get_skill_guide
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestSkillGuide(GuideTestBase):
|
||||
|
||||
def test_known_skill_returns_steps(self):
|
||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
||||
r = mcp_get_skill_guide("gitea-pr-merge")
|
||||
self.assertTrue(r["success"])
|
||||
self.assertEqual(r["skill"]["name"], "gitea-pr-merge")
|
||||
self.assertTrue(r["steps"])
|
||||
self.assertIn("MERGE PR", " ".join(r["steps"]))
|
||||
|
||||
def test_case_and_whitespace_normalized(self):
|
||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
||||
r = mcp_get_skill_guide(" Gitea-PR-Merge ")
|
||||
self.assertTrue(r["success"])
|
||||
|
||||
def test_unknown_skill_fails_closed(self):
|
||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
||||
r = mcp_get_skill_guide("no-such-skill")
|
||||
self.assertFalse(r["success"])
|
||||
self.assertTrue(r["reasons"])
|
||||
for expected in EXPECTED_SKILLS:
|
||||
self.assertIn(expected, r["valid_skills"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
def test_read_only_no_api_calls(self, mock_api):
|
||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
||||
mcp_get_skill_guide("gitea-pr-review")
|
||||
mock_api.assert_not_called()
|
||||
|
||||
def test_no_urls_in_skill_guides(self):
|
||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
||||
for name in EXPECTED_SKILLS:
|
||||
r = mcp_get_skill_guide(name)
|
||||
blob = json.dumps(r)
|
||||
self.assertNotIn("https://", blob)
|
||||
self.assertNotIn("keychain:", blob)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,256 @@
|
||||
"""Tests for runtime context, profile activation, profile listing, and enhanced error clarity.
|
||||
|
||||
Covers Issue #131 requirements.
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import tempfile
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
||||
|
||||
import gitea_config
|
||||
import gitea_auth
|
||||
import mcp_server
|
||||
|
||||
CONFIG_SWITCHING_DISABLED = {
|
||||
"version": 2,
|
||||
"contexts": {
|
||||
"ctx": {
|
||||
"enabled": True,
|
||||
"gitea": {
|
||||
"enabled": True,
|
||||
"base_url": "https://gitea.example.com"
|
||||
}
|
||||
}
|
||||
},
|
||||
"profiles": {
|
||||
"author-profile": {
|
||||
"enabled": True,
|
||||
"context": "ctx",
|
||||
"role": "author",
|
||||
"username": "author-user",
|
||||
"auth": {"type": "env", "name": "GITEA_TOKEN_AUTHOR"},
|
||||
"allowed_operations": ["gitea.read", "gitea.pr.create", "gitea.branch.push"],
|
||||
"forbidden_operations": ["gitea.pr.approve", "gitea.pr.merge"],
|
||||
"execution_profile": "author-profile"
|
||||
},
|
||||
"reviewer-profile": {
|
||||
"enabled": True,
|
||||
"context": "ctx",
|
||||
"role": "reviewer",
|
||||
"username": "reviewer-user",
|
||||
"auth": {"type": "env", "name": "GITEA_TOKEN_REVIEWER"},
|
||||
"allowed_operations": ["gitea.read", "gitea.pr.approve", "gitea.pr.merge"],
|
||||
"forbidden_operations": ["gitea.pr.create", "gitea.branch.push"],
|
||||
"execution_profile": "reviewer-profile"
|
||||
}
|
||||
},
|
||||
"rules": {
|
||||
"allow_runtime_switching": False
|
||||
}
|
||||
}
|
||||
|
||||
CONFIG_SWITCHING_ENABLED = {
|
||||
**CONFIG_SWITCHING_DISABLED,
|
||||
"rules": {
|
||||
"allow_runtime_switching": True
|
||||
}
|
||||
}
|
||||
|
||||
class TestRuntimeClarity(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._remotes_patch = patch.dict(mcp_server.REMOTES, {
|
||||
"dadeschools": {"host": "gitea.example.com", "org": "Example-Org", "repo": "Example-Repo"},
|
||||
"prgs": {"host": "gitea.example.com", "org": "Example-Org", "repo": "Example-Repo"}
|
||||
})
|
||||
self._remotes_patch.start()
|
||||
mcp_server._IDENTITY_CACHE.clear()
|
||||
gitea_config._active_profile_override = None
|
||||
self._dir = tempfile.TemporaryDirectory()
|
||||
self.config_path = os.path.join(self._dir.name, "profiles.json")
|
||||
self._write_config(CONFIG_SWITCHING_DISABLED)
|
||||
|
||||
def tearDown(self):
|
||||
self._remotes_patch.stop()
|
||||
mcp_server._IDENTITY_CACHE.clear()
|
||||
gitea_config._active_profile_override = None
|
||||
self._dir.cleanup()
|
||||
|
||||
def _write_config(self, obj):
|
||||
with open(self.config_path, "w", encoding="utf-8") as fh:
|
||||
fh.write(json.dumps(obj))
|
||||
|
||||
def _env(self, profile="author-profile", reveal="0"):
|
||||
return {
|
||||
"GITEA_MCP_CONFIG": self.config_path,
|
||||
"GITEA_MCP_PROFILE": profile,
|
||||
"GITEA_MCP_REVEAL_ENDPOINTS": reveal,
|
||||
"GITEA_TOKEN_AUTHOR": "author-pass",
|
||||
"GITEA_TOKEN_REVIEWER": "reviewer-pass",
|
||||
}
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# gitea_get_runtime_context
|
||||
# -------------------------------------------------------------------------
|
||||
@patch("mcp_server.api_request", return_value={"login": "author-user"})
|
||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
||||
def test_get_runtime_context_author(self, _auth, _api):
|
||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
||||
ctx = mcp_server.gitea_get_runtime_context(remote="dadeschools")
|
||||
self.assertEqual(ctx["active_profile"], "author-profile")
|
||||
self.assertEqual(ctx["authenticated_username"], "author-user")
|
||||
self.assertEqual(ctx["config_model"], "v2-contexts")
|
||||
self.assertEqual(ctx["profile_source"], "config file profile")
|
||||
self.assertFalse(ctx["runtime_switching_supported"])
|
||||
self.assertEqual(ctx["profile_mode"], "static-profile")
|
||||
self.assertFalse(ctx["review_merge_allowed"])
|
||||
self.assertEqual(ctx["suggested_fix"], "reviewer namespace")
|
||||
self.assertIn("does not permit review or merge", ctx["review_merge_blocked_reasons"][0])
|
||||
self.assertIn("Switch to the reviewer MCP session", ctx["safe_next_action"])
|
||||
|
||||
@patch("mcp_server.api_request", return_value={"login": "reviewer-user"})
|
||||
@patch("mcp_server.get_auth_header", return_value="token reviewer-pass")
|
||||
def test_get_runtime_context_reviewer(self, _auth, _api):
|
||||
with patch.dict(os.environ, self._env("reviewer-profile"), clear=True):
|
||||
ctx = mcp_server.gitea_get_runtime_context(remote="dadeschools")
|
||||
self.assertEqual(ctx["active_profile"], "reviewer-profile")
|
||||
self.assertEqual(ctx["authenticated_username"], "reviewer-user")
|
||||
self.assertTrue(ctx["review_merge_allowed"])
|
||||
self.assertEqual(ctx["suggested_fix"], "none")
|
||||
self.assertEqual(ctx["safe_next_action"], "None; ready for operations.")
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# gitea_list_profiles
|
||||
# -------------------------------------------------------------------------
|
||||
@patch("mcp_server.api_request", return_value={"login": "author-user"})
|
||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
||||
def test_list_profiles_redacted_by_default(self, _auth, _api):
|
||||
with patch.dict(os.environ, self._env("author-profile", reveal="0"), clear=True):
|
||||
res = mcp_server.gitea_list_profiles()
|
||||
profiles = res["profiles"]
|
||||
self.assertEqual(len(profiles), 2)
|
||||
|
||||
author_prof = next(p for p in profiles if p["name"] == "author-profile")
|
||||
self.assertTrue(author_prof["is_active"])
|
||||
self.assertEqual(author_prof["role_kind"], "author")
|
||||
self.assertEqual(author_prof["auth"]["name"], "<redacted>")
|
||||
self.assertEqual(author_prof["base_url"], "<redacted>")
|
||||
self.assertEqual(author_prof["identity_status"], "verified")
|
||||
|
||||
reviewer_prof = next(p for p in profiles if p["name"] == "reviewer-profile")
|
||||
self.assertFalse(reviewer_prof["is_active"])
|
||||
self.assertEqual(reviewer_prof["role_kind"], "reviewer")
|
||||
self.assertEqual(reviewer_prof["auth"]["name"], "<redacted>")
|
||||
self.assertEqual(reviewer_prof["base_url"], "<redacted>")
|
||||
self.assertEqual(reviewer_prof["identity_status"], "credentials present")
|
||||
|
||||
@patch("mcp_server.api_request", return_value={"login": "author-user"})
|
||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
||||
def test_list_profiles_revealed_under_opt_in(self, _auth, _api):
|
||||
with patch.dict(os.environ, self._env("author-profile", reveal="1"), clear=True):
|
||||
res = mcp_server.gitea_list_profiles()
|
||||
profiles = res["profiles"]
|
||||
|
||||
author_prof = next(p for p in profiles if p["name"] == "author-profile")
|
||||
self.assertEqual(author_prof["auth"]["name"], "GITEA_TOKEN_AUTHOR")
|
||||
self.assertEqual(author_prof["base_url"], "https://gitea.example.com")
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# gitea_activate_profile
|
||||
# -------------------------------------------------------------------------
|
||||
def test_activate_profile_fails_when_disabled(self):
|
||||
self._write_config(CONFIG_SWITCHING_DISABLED)
|
||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
||||
res = mcp_server.gitea_activate_profile(profile_name="reviewer-profile")
|
||||
self.assertFalse(res["success"])
|
||||
self.assertIn("switching is disabled", res["message"].lower())
|
||||
self.assertIsNone(gitea_config._active_profile_override)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header")
|
||||
def test_activate_profile_succeeds_when_enabled(self, mock_auth, mock_api):
|
||||
self._write_config(CONFIG_SWITCHING_ENABLED)
|
||||
|
||||
# Setup mock responses for whoami checks
|
||||
mock_auth.side_effect = ["token author-pass", "token reviewer-pass"]
|
||||
mock_api.side_effect = [{"login": "author-user"}, {"login": "reviewer-user"}]
|
||||
|
||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
||||
# Check before state
|
||||
self.assertEqual(gitea_config.selected_profile_name(), "author-profile")
|
||||
|
||||
res = mcp_server.gitea_activate_profile(profile_name="reviewer-profile")
|
||||
|
||||
self.assertTrue(res["success"])
|
||||
self.assertEqual(res["before_profile"], "author-profile")
|
||||
self.assertEqual(res["before_identity"], "author-user")
|
||||
self.assertEqual(res["after_profile"], "reviewer-profile")
|
||||
self.assertEqual(res["after_identity"], "reviewer-user")
|
||||
|
||||
# Global variable override should be set
|
||||
self.assertEqual(gitea_config._active_profile_override, "reviewer-profile")
|
||||
self.assertEqual(gitea_config.selected_profile_name(), "reviewer-profile")
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# gitea_check_pr_eligibility enhanced error clarity
|
||||
# -------------------------------------------------------------------------
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value="token reviewer-pass")
|
||||
def test_eligibility_failure_self_author(self, _auth, mock_api):
|
||||
# PR is authored by "reviewer-user" and reviewer-user is trying to approve it.
|
||||
mock_api.side_effect = [
|
||||
{"login": "reviewer-user"}, # user whoami lookup
|
||||
{"user": {"login": "reviewer-user"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
|
||||
]
|
||||
|
||||
with patch.dict(os.environ, self._env("reviewer-profile"), clear=True):
|
||||
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
|
||||
|
||||
self.assertFalse(res["eligible"])
|
||||
self.assertEqual(res["active_identity"], "reviewer-user")
|
||||
self.assertTrue(res["self_author"])
|
||||
self.assertEqual(res["required_identity"], "Any Gitea user other than PR author 'reviewer-user'")
|
||||
self.assertIn("Self-review/self-merge is forbidden", res["safe_next_step"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
||||
def test_eligibility_failure_missing_permissions(self, _auth, mock_api):
|
||||
# PR is authored by "someone-else" and author-user (who lacks approve) is trying to approve it.
|
||||
mock_api.side_effect = [
|
||||
{"login": "author-user"}, # user whoami lookup
|
||||
{"user": {"login": "someone-else"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
|
||||
]
|
||||
|
||||
self._write_config(CONFIG_SWITCHING_ENABLED) # Enable switching to verify fixable_by_profile_switch
|
||||
|
||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
||||
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
|
||||
|
||||
self.assertFalse(res["eligible"])
|
||||
self.assertEqual(res["missing_permission"], "gitea.pr.approve")
|
||||
self.assertTrue(res["fixable_by_profile_switch"])
|
||||
self.assertFalse(res["requires_different_namespace"])
|
||||
self.assertIn("Switch to a reviewer profile by calling gitea_activate_profile", res["safe_next_step"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
||||
def test_eligibility_failure_missing_permissions_switching_disabled(self, _auth, mock_api):
|
||||
# PR is authored by "someone-else" and author-user (lacks approve) tries to approve it when switching is disabled.
|
||||
mock_api.side_effect = [
|
||||
{"login": "author-user"}, # user whoami lookup
|
||||
{"user": {"login": "someone-else"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
|
||||
]
|
||||
|
||||
self._write_config(CONFIG_SWITCHING_DISABLED) # Disable switching
|
||||
|
||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
||||
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
|
||||
|
||||
self.assertFalse(res["eligible"])
|
||||
self.assertEqual(res["missing_permission"], "gitea.pr.approve")
|
||||
self.assertFalse(res["fixable_by_profile_switch"])
|
||||
self.assertTrue(res["requires_different_namespace"])
|
||||
self.assertIn("Switch to the reviewer MCP session", res["safe_next_step"])
|
||||
Reference in New Issue
Block a user