Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ebdfd62e56 |
@@ -179,30 +179,6 @@ allowed/forbidden membership check):
|
|||||||
therefore never silently widen permissions.
|
therefore never silently widen permissions.
|
||||||
- An empty or missing `allowed_operations` list denies everything.
|
- An empty or missing `allowed_operations` list denies everything.
|
||||||
|
|
||||||
## Issue comments versus PR reviews (#126)
|
|
||||||
|
|
||||||
Issue discussion comments and PR reviews are different capabilities and are
|
|
||||||
gated by different operations:
|
|
||||||
|
|
||||||
- **Issue comments** (`gitea_list_issue_comments`, `gitea_create_issue_comment`)
|
|
||||||
post to and read from an issue's discussion thread
|
|
||||||
(`/issues/{n}/comments`). Listing requires `gitea.read`; creating requires
|
|
||||||
`gitea.issue.comment`. They never submit review verdicts.
|
|
||||||
- **PR reviews** (`gitea_review_pr`, `gitea_submit_pr_review`) submit
|
|
||||||
approve/request-changes/comment verdicts on pull requests
|
|
||||||
(`/pulls/{n}/reviews`) and are gated by the `gitea.pr.*` family
|
|
||||||
(`gitea.pr.review`, `gitea.pr.approve`, `gitea.pr.request_changes`,
|
|
||||||
`gitea.pr.comment`).
|
|
||||||
|
|
||||||
A profile holding the full PR review/merge set still cannot post issue
|
|
||||||
discussion comments unless it also allows `gitea.issue.comment`, and vice
|
|
||||||
versa — neither family implies the other. Both comment tools require an
|
|
||||||
explicit issue number; the target repo comes only from the standard
|
|
||||||
remote/org/repo arguments. Create operations are audit-logged
|
|
||||||
(`create_issue_comment`) when `GITEA_AUDIT_LOG` is configured, errors are
|
|
||||||
redacted, and normal output contains no endpoint URLs
|
|
||||||
(`GITEA_MCP_REVEAL_ENDPOINTS=1` is the local admin opt-in for web links).
|
|
||||||
|
|
||||||
## Identity and fail-closed rules
|
## Identity and fail-closed rules
|
||||||
|
|
||||||
Before **any** mutating action, a workflow must know both:
|
Before **any** mutating action, a workflow must know both:
|
||||||
@@ -252,26 +228,6 @@ the "one server per trust boundary" model described in
|
|||||||
[`tool-boundaries.md`](tool-boundaries.md) and
|
[`tool-boundaries.md`](tool-boundaries.md) and
|
||||||
[`credential-isolation.md`](credential-isolation.md).
|
[`credential-isolation.md`](credential-isolation.md).
|
||||||
|
|
||||||
## Profile Activation and Runtime Identity Clarity (#131)
|
|
||||||
|
|
||||||
To make Gitea MCP profile activation and runtime identity state explicit, the following mechanisms are supported:
|
|
||||||
|
|
||||||
### 1. Static-Profile vs. Dynamic-Profile Mode
|
|
||||||
- **Static-Profile Mode (Default):** The active profile is fixed at server launch based on the `GITEA_MCP_PROFILE` environment variable (with `GITEA_MCP_CONFIG` pointing to the config path). Local environment variables are static once a subprocess is spawned by the host. Modifying the environment variables on the host does not dynamically update an already-connected MCP server process.
|
|
||||||
- **Dynamic-Profile Mode:** Profile switching via the `gitea_activate_profile` tool is supported **only** if the configuration JSON explicitly opts in by setting `"allow_runtime_switching": true` under rules or top-level keys. Otherwise, attempting to switch profiles dynamically will fail closed.
|
|
||||||
|
|
||||||
### 2. Dual MCP Namespaces Recommendation
|
|
||||||
For security-sensitive or high-risk tasks, the preferred safety model uses separate, isolated MCP server instances (namespaces/sessions) launched with static profiles:
|
|
||||||
- `gitea-author`: Exposes tools configured with author permissions; cannot perform approvals or merges.
|
|
||||||
- `gitea-reviewer`: Exposes tools configured with reviewer permissions; used for PR reviews and merges.
|
|
||||||
This layout maintains physical separation of credentials and prevents privilege escalation within a single session.
|
|
||||||
|
|
||||||
### 3. Verification Post-Switching
|
|
||||||
When dynamic profile switching is enabled and a profile is activated via `gitea_activate_profile`, the session MUST immediately:
|
|
||||||
1. Clear the cached identity.
|
|
||||||
2. Call `gitea_whoami` with the target remote to prove and verify the fresh Gitea authenticated identity.
|
|
||||||
This guarantees the active profile operations align with the actual Gitea authenticated user credential.
|
|
||||||
|
|
||||||
## Relationship to roadmap issues
|
## Relationship to roadmap issues
|
||||||
|
|
||||||
This document defines the **model only**. Related work is tracked separately
|
This document defines the **model only**. Related work is tracked separately
|
||||||
|
|||||||
@@ -18,17 +18,6 @@ behavior they rely on already exists (canonical runtime profiles, the
|
|||||||
interactive setup menu, identity/eligibility checks, gated review/merge, and
|
interactive setup menu, identity/eligibility checks, gated review/merge, and
|
||||||
audit logging). See [Related documents](#related-documents).
|
audit logging). See [Related documents](#related-documents).
|
||||||
|
|
||||||
> **New session? Call the guide tools first (#128).** Before using any other
|
|
||||||
> Gitea MCP tool in a fresh session, call `mcp_get_control_plane_guide`
|
|
||||||
> (read-only): it reports the active profile, authenticated identity,
|
|
||||||
> allowed/forbidden operations, profile-aware do/don't guidance, and the
|
|
||||||
> non-negotiable rules (hard stops, fail-closed behavior, head-SHA pinning,
|
|
||||||
> merge confirmation, redaction, author/reviewer separation, profile
|
|
||||||
> switching). Then call `mcp_list_project_skills` to discover the available
|
|
||||||
> project workflows and `mcp_get_skill_guide(<name>)` for step-by-step
|
|
||||||
> instructions. This replaces long pasted operator prompts for the standard
|
|
||||||
> rules; operator prompts still control task-specific scope.
|
|
||||||
|
|
||||||
For cross-project use, copy the portable workflow skill at
|
For cross-project use, copy the portable workflow skill at
|
||||||
[`../skills/llm-project-workflow/SKILL.md`](../skills/llm-project-workflow/SKILL.md).
|
[`../skills/llm-project-workflow/SKILL.md`](../skills/llm-project-workflow/SKILL.md).
|
||||||
It extracts the issue-first, isolated-worktree, no-self-review, profile-safety,
|
It extracts the issue-first, isolated-worktree, no-self-review, profile-safety,
|
||||||
|
|||||||
+2
-1
@@ -498,4 +498,5 @@ def get_profile():
|
|||||||
"identity": jp.get("identity") or None,
|
"identity": jp.get("identity") or None,
|
||||||
"role": jp.get("role") or None,
|
"role": jp.get("role") or None,
|
||||||
"execution_profile": jp.get("execution_profile") or None,
|
"execution_profile": jp.get("execution_profile") or None,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -192,32 +192,11 @@ def config_path():
|
|||||||
return (os.environ.get(ENV_CONFIG_PATH) or "").strip() or None
|
return (os.environ.get(ENV_CONFIG_PATH) or "").strip() or None
|
||||||
|
|
||||||
|
|
||||||
_active_profile_override = None
|
|
||||||
|
|
||||||
|
|
||||||
def selected_profile_name():
|
def selected_profile_name():
|
||||||
"""Return the selected profile name from the environment, or None."""
|
"""Return the selected profile name from the environment, or None."""
|
||||||
if _active_profile_override is not None:
|
|
||||||
return _active_profile_override
|
|
||||||
return (os.environ.get(ENV_PROFILE) or "").strip() or None
|
return (os.environ.get(ENV_PROFILE) or "").strip() or None
|
||||||
|
|
||||||
|
|
||||||
def is_runtime_switching_enabled(path=None):
|
|
||||||
"""Check if runtime profile switching is explicitly enabled in config."""
|
|
||||||
try:
|
|
||||||
config = load_config(path)
|
|
||||||
except Exception:
|
|
||||||
return False
|
|
||||||
if not config:
|
|
||||||
return False
|
|
||||||
rules = config.get("rules") or {}
|
|
||||||
if rules.get("allow_runtime_switching") is True:
|
|
||||||
return True
|
|
||||||
if config.get("allow_runtime_switching") is True:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def load_config(path=None):
|
def load_config(path=None):
|
||||||
"""Load and minimally validate the canonical JSON config.
|
"""Load and minimally validate the canonical JSON config.
|
||||||
|
|
||||||
|
|||||||
-954
@@ -597,83 +597,6 @@ def gitea_check_pr_eligibility(
|
|||||||
result["eligible"] = len(reasons) == 0
|
result["eligible"] = len(reasons) == 0
|
||||||
if result["eligible"]:
|
if result["eligible"]:
|
||||||
reasons.append("all eligibility checks passed")
|
reasons.append("all eligibility checks passed")
|
||||||
|
|
||||||
# Add enhanced error clarity details (#131)
|
|
||||||
auth_user = result["authenticated_user"]
|
|
||||||
pr_author = result["pr_author"]
|
|
||||||
profile_name = result["profile_name"]
|
|
||||||
is_self = bool(auth_user and pr_author and auth_user == pr_author)
|
|
||||||
|
|
||||||
result["active_identity"] = auth_user
|
|
||||||
result["active_profile"] = profile_name
|
|
||||||
result["self_author"] = is_self
|
|
||||||
|
|
||||||
# Determine missing permission
|
|
||||||
missing_perm = None
|
|
||||||
if not op_ok:
|
|
||||||
missing_perm = f"gitea.pr.{action}" if action in ("approve", "request_changes", "merge") else f"gitea.{action}"
|
|
||||||
|
|
||||||
result["missing_permission"] = missing_perm
|
|
||||||
|
|
||||||
# Determine required profile
|
|
||||||
req_profile = None
|
|
||||||
if action in ("approve", "request_changes", "review"):
|
|
||||||
req_profile = "A profile with reviewer role permissions (allowing approve/merge/review, and forbidding author operations)"
|
|
||||||
elif action == "merge":
|
|
||||||
req_profile = "A profile with reviewer role permissions and explicit merge permission"
|
|
||||||
result["required_profile"] = req_profile
|
|
||||||
|
|
||||||
# Determine required identity
|
|
||||||
req_identity = None
|
|
||||||
if is_self:
|
|
||||||
req_identity = f"Any Gitea user other than PR author '{pr_author}'"
|
|
||||||
result["required_identity"] = req_identity
|
|
||||||
|
|
||||||
# Determine if fixable by profile switch vs requires different namespace
|
|
||||||
switching_supported = gitea_config.is_runtime_switching_enabled()
|
|
||||||
fixable_by_switch = False
|
|
||||||
req_different_ns = False
|
|
||||||
|
|
||||||
if not op_ok:
|
|
||||||
config = gitea_config.load_config()
|
|
||||||
has_capable_profile = False
|
|
||||||
if config:
|
|
||||||
for name, p in (config.get("profiles") or {}).items():
|
|
||||||
p_allowed = p.get("allowed_operations", [])
|
|
||||||
p_forbidden = p.get("forbidden_operations", [])
|
|
||||||
ok, _ = gitea_config.check_operation(action, p_allowed, p_forbidden)
|
|
||||||
if ok:
|
|
||||||
has_capable_profile = True
|
|
||||||
break
|
|
||||||
if switching_supported and has_capable_profile:
|
|
||||||
fixable_by_switch = True
|
|
||||||
else:
|
|
||||||
req_different_ns = True
|
|
||||||
|
|
||||||
result["fixable_by_profile_switch"] = fixable_by_switch
|
|
||||||
result["requires_different_namespace"] = req_different_ns
|
|
||||||
|
|
||||||
# Determine safe next step
|
|
||||||
safe_step = "Ready."
|
|
||||||
if not result["eligible"]:
|
|
||||||
if is_self:
|
|
||||||
safe_step = "Self-review/self-merge is forbidden. Ask a different operator/reviewer to review and merge this PR."
|
|
||||||
elif not auth_user:
|
|
||||||
safe_step = f"Ask the operator to configure valid credentials/token for profile '{profile_name}'."
|
|
||||||
elif not op_ok:
|
|
||||||
if fixable_by_switch:
|
|
||||||
safe_step = f"Switch to a reviewer profile by calling gitea_activate_profile with a profile that allows {action}."
|
|
||||||
else:
|
|
||||||
safe_step = "Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile."
|
|
||||||
elif result["pr_state"] != "open":
|
|
||||||
safe_step = "No action can be taken on a closed/merged PR."
|
|
||||||
elif action == "merge":
|
|
||||||
if result["mergeable"] is False:
|
|
||||||
safe_step = "Address conflicts in the PR branches first, or wait for CI checks to pass before merging."
|
|
||||||
elif result["mergeable"] is None:
|
|
||||||
safe_step = "Wait for Gitea to finish calculating mergeability or trigger a re-check."
|
|
||||||
|
|
||||||
result["safe_next_step"] = safe_step
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@@ -1441,583 +1364,6 @@ def gitea_view_issue(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def _issue_comment_gate(op: str) -> list[str]:
|
|
||||||
"""Profile permission check for issue-comment tools (#126).
|
|
||||||
|
|
||||||
Issue discussion comments are gated separately from the gitea.pr.*
|
|
||||||
review/merge family: listing requires ``gitea.read``, creating requires
|
|
||||||
``gitea.issue.comment``. Returns a list of block reasons (empty = allowed);
|
|
||||||
an unreadable profile fails closed.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
profile = get_profile()
|
|
||||||
except Exception as exc:
|
|
||||||
return [f"profile could not be resolved (fail closed): {_redact(str(exc))}"]
|
|
||||||
op_ok, op_reason = gitea_config.check_operation(
|
|
||||||
op, profile["allowed_operations"], profile["forbidden_operations"])
|
|
||||||
if op_ok:
|
|
||||||
return []
|
|
||||||
if op_reason == "no-allowed-operations":
|
|
||||||
return ["profile has no configured allowed operations (fail closed)"]
|
|
||||||
if op_reason == "forbidden":
|
|
||||||
return [f"profile forbids '{op}'"]
|
|
||||||
if op_reason == "invalid-forbidden-entry":
|
|
||||||
return ["profile has an unrecognized forbidden operation entry (fail closed)"]
|
|
||||||
return [f"profile is not allowed to {op}"]
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def gitea_list_issue_comments(
|
|
||||||
issue_number: int,
|
|
||||||
limit: int = 50,
|
|
||||||
remote: str = "dadeschools",
|
|
||||||
host: str | None = None,
|
|
||||||
org: str | None = None,
|
|
||||||
repo: str | None = None,
|
|
||||||
) -> dict:
|
|
||||||
"""List discussion comments on a Gitea issue.
|
|
||||||
|
|
||||||
Read-only. Issue discussion comments are distinct from PR reviews: this
|
|
||||||
reads the issue comment thread and never touches review endpoints. The
|
|
||||||
profile must allow ``gitea.read`` (fail closed otherwise).
|
|
||||||
|
|
||||||
Normal output is LLM-safe: no endpoint URLs. Set
|
|
||||||
GITEA_MCP_REVEAL_ENDPOINTS=1 (admin/debug opt-in) to include each
|
|
||||||
comment's web link.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
issue_number: The issue number whose comments to list (required).
|
|
||||||
limit: Max number of comments to return (default: 50).
|
|
||||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
|
||||||
host: Override the Gitea host.
|
|
||||||
org: Override the owner/organization.
|
|
||||||
repo: Override the repository name.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict with 'success', 'issue_number', and 'comments' (each with 'id',
|
|
||||||
'author', 'created_at', 'updated_at', 'body'); on a permission block,
|
|
||||||
'success' False and 'reasons' with no API call made.
|
|
||||||
"""
|
|
||||||
reasons = _issue_comment_gate("gitea.read")
|
|
||||||
if reasons:
|
|
||||||
return {"success": False, "issue_number": issue_number,
|
|
||||||
"reasons": reasons}
|
|
||||||
h, o, r = _resolve(remote, host, org, repo)
|
|
||||||
auth = _auth(h)
|
|
||||||
api = f"{repo_api_url(h, o, r)}/issues/{issue_number}/comments"
|
|
||||||
comments = api_request("GET", api, auth) or []
|
|
||||||
reveal = _reveal_endpoints()
|
|
||||||
out = []
|
|
||||||
for c in comments[:limit]:
|
|
||||||
entry = {
|
|
||||||
"id": c["id"],
|
|
||||||
"author": (c.get("user") or {}).get("login", ""),
|
|
||||||
"created_at": c.get("created_at"),
|
|
||||||
"updated_at": c.get("updated_at"),
|
|
||||||
"body": c.get("body", ""),
|
|
||||||
}
|
|
||||||
if reveal:
|
|
||||||
entry["url"] = c.get("html_url")
|
|
||||||
out.append(entry)
|
|
||||||
return {"success": True, "issue_number": issue_number, "comments": out}
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def gitea_create_issue_comment(
|
|
||||||
issue_number: int,
|
|
||||||
body: str,
|
|
||||||
remote: str = "dadeschools",
|
|
||||||
host: str | None = None,
|
|
||||||
org: str | None = None,
|
|
||||||
repo: str | None = None,
|
|
||||||
) -> dict:
|
|
||||||
"""Post a markdown comment to a Gitea issue's discussion thread.
|
|
||||||
|
|
||||||
Issue discussion comments are distinct from PR reviews: this posts to the
|
|
||||||
issue comment thread only and never submits review verdicts. The profile
|
|
||||||
must allow ``gitea.issue.comment`` — gated separately from the gitea.pr.*
|
|
||||||
review/merge operations (fail closed otherwise). The target issue is
|
|
||||||
always explicit; there is no inference beyond the standard remote
|
|
||||||
defaults used by every tool.
|
|
||||||
|
|
||||||
Normal output is LLM-safe: comment id + issue number, no endpoint URLs.
|
|
||||||
Set GITEA_MCP_REVEAL_ENDPOINTS=1 (admin/debug opt-in) to include the
|
|
||||||
comment's web link. Errors are redacted before being raised.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
issue_number: The issue number to comment on (required).
|
|
||||||
body: Markdown comment body (required, non-empty).
|
|
||||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
|
||||||
host: Override the Gitea host.
|
|
||||||
org: Override the owner/organization.
|
|
||||||
repo: Override the repository name.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict with 'success', 'comment_id', and 'issue_number' ('url' only
|
|
||||||
with the reveal opt-in); on a permission block or empty body,
|
|
||||||
'success'/'performed' False and 'reasons' with no API call made.
|
|
||||||
"""
|
|
||||||
reasons = _issue_comment_gate("gitea.issue.comment")
|
|
||||||
if not (body or "").strip():
|
|
||||||
reasons.append("comment body must be a non-empty string")
|
|
||||||
if reasons:
|
|
||||||
return {"success": False, "performed": False,
|
|
||||||
"issue_number": issue_number, "reasons": reasons}
|
|
||||||
h, o, r = _resolve(remote, host, org, repo)
|
|
||||||
auth = _auth(h)
|
|
||||||
api = f"{repo_api_url(h, o, r)}/issues/{issue_number}/comments"
|
|
||||||
try:
|
|
||||||
with _audited("create_issue_comment", host=h, remote=remote, org=o,
|
|
||||||
repo=r, issue_number=issue_number,
|
|
||||||
request_metadata={"body_chars": len(body)}):
|
|
||||||
data = api_request("POST", api, auth, {"body": body})
|
|
||||||
except Exception as exc:
|
|
||||||
raise RuntimeError(_redact(str(exc))) from None
|
|
||||||
result = {"success": True, "performed": True,
|
|
||||||
"comment_id": data["id"], "issue_number": issue_number}
|
|
||||||
if _reveal_endpoints():
|
|
||||||
result["url"] = data.get("html_url")
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
# ── Operator guide / project skills (#128) ───────────────────────────────────
|
|
||||||
# Read-only capability discovery for new LLM sessions: what the active
|
|
||||||
# profile may do, how identity is verified, the non-negotiable safety rules,
|
|
||||||
# and which project workflows ("skills") exist. Nothing here mutates state or
|
|
||||||
# widens any permission — the guide describes the gates, it never bypasses
|
|
||||||
# them.
|
|
||||||
|
|
||||||
def _role_kind(allowed, forbidden) -> str:
|
|
||||||
"""Classify the active profile from its normalized operations.
|
|
||||||
|
|
||||||
'author' can create PRs / push branches; 'reviewer' can approve/merge;
|
|
||||||
'mixed' can do both (a config smell — the reviewer-deadlock invariant
|
|
||||||
forbids it in v2 configs); 'limited' can do neither.
|
|
||||||
"""
|
|
||||||
def can(op):
|
|
||||||
return gitea_config.check_operation(op, allowed, forbidden)[0]
|
|
||||||
review = can("gitea.pr.approve") or can("gitea.pr.merge")
|
|
||||||
author = can("gitea.pr.create") or can("gitea.branch.push")
|
|
||||||
if review and author:
|
|
||||||
return "mixed"
|
|
||||||
if review:
|
|
||||||
return "reviewer"
|
|
||||||
if author:
|
|
||||||
return "author"
|
|
||||||
return "limited"
|
|
||||||
|
|
||||||
|
|
||||||
_HARD_STOPS = [
|
|
||||||
"No self-review and no self-merge: the authenticated user must differ "
|
|
||||||
"from the PR author for review/approve/merge.",
|
|
||||||
"No tags or releases without an explicit operator instruction.",
|
|
||||||
"Never print token values, keychain IDs, passwords, or raw service "
|
|
||||||
"URLs in normal output.",
|
|
||||||
"No production access or production mutations.",
|
|
||||||
"No force-merge and no bypassing of merge gates; Gitea's own "
|
|
||||||
"mergeable signal is honoured, never overridden.",
|
|
||||||
"Do not rewrite the live profiles config; config changes are "
|
|
||||||
"operator-owned.",
|
|
||||||
]
|
|
||||||
|
|
||||||
_GUIDE_RULES = {
|
|
||||||
"hard_stops": _HARD_STOPS,
|
|
||||||
"fail_closed": (
|
|
||||||
"Every gate fails closed: unknown/disabled profiles, unknown "
|
|
||||||
"operations, empty allowed-operation lists, unresolved identity, "
|
|
||||||
"and unparseable config all deny the action instead of guessing. "
|
|
||||||
"Disabled profiles/services are never silently substituted."),
|
|
||||||
"head_sha_pinning": (
|
|
||||||
"Before reviewing or merging, record the PR head SHA and pass it as "
|
|
||||||
"expected_head_sha; if the head moves between review and merge, the "
|
|
||||||
"tool refuses and the review must be redone."),
|
|
||||||
"merge_confirmation": (
|
|
||||||
"gitea_merge_pr requires confirmation to equal 'MERGE PR <n>' "
|
|
||||||
"exactly (e.g. 'MERGE PR 42'); without it no API call is made. "
|
|
||||||
"Merge only when the operator has explicitly authorized it."),
|
|
||||||
"redaction": [
|
|
||||||
"Normal output contains no endpoint URLs, keychain IDs, or token "
|
|
||||||
"values; auth is reported as type/status only.",
|
|
||||||
"GITEA_MCP_REVEAL_ENDPOINTS=1 is the local admin/debug opt-in for "
|
|
||||||
"web links and endpoint names; it never reveals token values.",
|
|
||||||
"Errors are redacted before being surfaced.",
|
|
||||||
],
|
|
||||||
"separation": (
|
|
||||||
"Author, reviewer, and merger are separate identities. An author "
|
|
||||||
"profile creates branches/commits/PRs and must never "
|
|
||||||
"review/approve/merge its own work; a reviewer/merger profile "
|
|
||||||
"reviews and merges and must never create PRs or push branches "
|
|
||||||
"(reviewer-deadlock invariant). Issue discussion comments "
|
|
||||||
"(gitea.issue.comment) are gated separately from PR reviews "
|
|
||||||
"(gitea.pr.*)."),
|
|
||||||
"profile_switching": [
|
|
||||||
"The active profile is selected via GITEA_MCP_PROFILE (with GITEA_MCP_CONFIG pointing at the config path).",
|
|
||||||
"By default, static-profile mode is active: changing local environment variables does not dynamically update the already-connected MCP server process.",
|
|
||||||
"If runtime profile switching is explicitly enabled in the config (allow_runtime_switching: true), use gitea_activate_profile to switch profiles in-place.",
|
|
||||||
"If switching is disabled, you must change the launcher configuration/environment (dual-namespace) and reconnect/restart the MCP session.",
|
|
||||||
"After any profile switch or startup, you must verify the new runtime identity with a fresh gitea_whoami call before taking action."
|
|
||||||
],
|
|
||||||
"identity_verification": [
|
|
||||||
"Call gitea_whoami with an explicit remote first; confirm the "
|
|
||||||
"authenticated username and profile match the role you were asked "
|
|
||||||
"to perform.",
|
|
||||||
"If the username, profile, or allowed operations do not match the "
|
|
||||||
"task, STOP and report instead of proceeding.",
|
|
||||||
"Runtime identity (whoami) is the source of truth; config-declared "
|
|
||||||
"roles are metadata only.",
|
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
_COMMON_WORKFLOWS = [
|
|
||||||
"issue authoring: verify identity, create/claim the issue, keep scope "
|
|
||||||
"explicit (remote/org/repo).",
|
|
||||||
"implementation: claim issue, branch from fresh master, implement only "
|
|
||||||
"the issue scope, test, open a PR referencing the issue, stop.",
|
|
||||||
"PR review: verify reviewer identity, pin the head SHA, validate "
|
|
||||||
"independently, post a verdict; never review your own work.",
|
|
||||||
"PR merge: reviewer identity + eligibility check + pinned head + "
|
|
||||||
"explicit 'MERGE PR <n>' confirmation, only with operator "
|
|
||||||
"authorization.",
|
|
||||||
]
|
|
||||||
|
|
||||||
# Skill registry (#128). status: 'available' = backed by tools in this
|
|
||||||
# server; 'designed-not-implemented' = design exists but no MCP tools yet
|
|
||||||
# (listed rather than omitted so sessions know the boundary); 'operator-only'
|
|
||||||
# = never performed by an LLM session.
|
|
||||||
_PROJECT_SKILLS = {
|
|
||||||
"gitea-issue-authoring": {
|
|
||||||
"description": "Create, view, label, claim, and close Gitea issues.",
|
|
||||||
"when_to_use": "Filing new work, updating issue state, claiming an "
|
|
||||||
"issue before implementation (gitea_mark_issue).",
|
|
||||||
"required_operations": ["gitea.read"],
|
|
||||||
"status": "available",
|
|
||||||
"notes": "Always pass remote/org/repo explicitly.",
|
|
||||||
"steps": [
|
|
||||||
"Verify identity with gitea_whoami (explicit remote).",
|
|
||||||
"Check the issue queue with gitea_list_issues.",
|
|
||||||
"Create with gitea_create_issue or claim with gitea_mark_issue "
|
|
||||||
"action='start'.",
|
|
||||||
"Keep issue bodies free of secrets, tokens, and raw service "
|
|
||||||
"URLs.",
|
|
||||||
"Release the claim (action='done') or close when finished.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"gitea-pr-creation": {
|
|
||||||
"description": "Author a feature branch and open a pull request.",
|
|
||||||
"when_to_use": "After implementing a claimed issue on a feature "
|
|
||||||
"branch; author profiles only.",
|
|
||||||
"required_operations": ["gitea.pr.create"],
|
|
||||||
"status": "available",
|
|
||||||
"steps": [
|
|
||||||
"Branch from fresh master (git pull first).",
|
|
||||||
"Implement only the claimed issue's scope; stage files "
|
|
||||||
"explicitly.",
|
|
||||||
"Run targeted tests, the full suite, py_compile, git diff "
|
|
||||||
"--check, and a secret sweep before committing.",
|
|
||||||
"Open the PR with gitea_create_pr referencing the issue "
|
|
||||||
"('Closes #<n>').",
|
|
||||||
"Stop: do not review or merge your own PR.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"gitea-pr-review": {
|
|
||||||
"description": "Independently review a pull request and post a "
|
|
||||||
"verdict.",
|
|
||||||
"when_to_use": "Reviewer profile asked to evaluate someone else's "
|
|
||||||
"PR.",
|
|
||||||
"required_operations": ["gitea.pr.review"],
|
|
||||||
"status": "available",
|
|
||||||
"steps": [
|
|
||||||
"Verify reviewer identity with gitea_whoami; the PR author "
|
|
||||||
"must be a different user.",
|
|
||||||
"Pin the PR head SHA (gitea_view_pr) before validating.",
|
|
||||||
"Validate independently: scope vs the linked issue, tests, "
|
|
||||||
"diff check, secret sweep.",
|
|
||||||
"Post the verdict with gitea_review_pr / "
|
|
||||||
"gitea_submit_pr_review.",
|
|
||||||
"Do not merge unless the operator explicitly authorizes it.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"gitea-pr-merge": {
|
|
||||||
"description": "Gated merge of an approved pull request.",
|
|
||||||
"when_to_use": "Reviewer/merger profile with explicit operator "
|
|
||||||
"authorization to merge.",
|
|
||||||
"required_operations": ["gitea.pr.merge"],
|
|
||||||
"status": "available",
|
|
||||||
"steps": [
|
|
||||||
"Confirm operator authorization for this specific merge.",
|
|
||||||
"Run gitea_check_pr_eligibility action='merge' and resolve "
|
|
||||||
"every reason it reports.",
|
|
||||||
"Call gitea_merge_pr with expected_head_sha pinned and "
|
|
||||||
"confirmation 'MERGE PR <n>'.",
|
|
||||||
"Confirm linked issues auto-closed and the in-progress label "
|
|
||||||
"was released.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"gitea-issue-comments": {
|
|
||||||
"description": "Read and post issue discussion comments (distinct "
|
|
||||||
"from PR reviews).",
|
|
||||||
"when_to_use": "Design discussions, review notes on issues, "
|
|
||||||
"decision records.",
|
|
||||||
"required_operations": ["gitea.issue.comment"],
|
|
||||||
"status": "available",
|
|
||||||
"notes": "Listing needs only gitea.read; posting needs "
|
|
||||||
"gitea.issue.comment, gated separately from gitea.pr.*.",
|
|
||||||
"steps": [
|
|
||||||
"List with gitea_list_issue_comments (explicit issue number).",
|
|
||||||
"Post with gitea_create_issue_comment; markdown body, no "
|
|
||||||
"secrets or raw URLs.",
|
|
||||||
"Never use PR review tools for issue discussion or vice "
|
|
||||||
"versa.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"profile-switching": {
|
|
||||||
"description": "Change the active MCP identity between author and "
|
|
||||||
"reviewer profiles.",
|
|
||||||
"when_to_use": "A task requires a role the current profile "
|
|
||||||
"forbids (e.g. moving from authoring to review).",
|
|
||||||
"required_operations": [],
|
|
||||||
"status": "available",
|
|
||||||
"steps": [
|
|
||||||
"Check profile mode using gitea_get_runtime_context.",
|
|
||||||
"If dynamic-profile mode is enabled (allow_runtime_switching: true), "
|
|
||||||
"call gitea_activate_profile to switch in-place.",
|
|
||||||
"If static-profile mode (default), ask the operator to update GITEA_MCP_PROFILE "
|
|
||||||
"in the launcher environment and reconnect/restart the MCP server.",
|
|
||||||
"Immediately verify the new identity with a fresh gitea_whoami call before taking action.",
|
|
||||||
"If identity does not match the expected role, STOP.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"redaction-security-review": {
|
|
||||||
"description": "Sweep changes and tool output for secrets, "
|
|
||||||
"keychain IDs, token values, and raw service URLs.",
|
|
||||||
"when_to_use": "Before every commit/PR and when reviewing any "
|
|
||||||
"diff.",
|
|
||||||
"required_operations": ["gitea.read"],
|
|
||||||
"status": "available",
|
|
||||||
"steps": [
|
|
||||||
"Run git diff --check for whitespace damage.",
|
|
||||||
"Grep the diff for token/password/secret/keychain patterns; "
|
|
||||||
"only synthetic fixtures may match.",
|
|
||||||
"Confirm normal tool output contains no endpoint URLs or "
|
|
||||||
"keychain IDs (reveal opt-in excepted).",
|
|
||||||
"Confirm no live config or private machine paths are being "
|
|
||||||
"committed.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"jenkins-readonly": {
|
|
||||||
"description": "Read-only Jenkins CI inspection (jobs, builds, "
|
|
||||||
"logs).",
|
|
||||||
"when_to_use": "Checking CI state once Jenkins MCP tools exist.",
|
|
||||||
"required_operations": ["jenkins.read"],
|
|
||||||
"status": "designed-not-implemented",
|
|
||||||
"notes": "Design exists (issues #72/#77); no Jenkins MCP server is "
|
|
||||||
"connected yet. Report SKIPPED rather than substituting "
|
|
||||||
"shell or direct API calls.",
|
|
||||||
"steps": [
|
|
||||||
"Confirm a Jenkins MCP server is connected; if not, report "
|
|
||||||
"SKIPPED.",
|
|
||||||
"Use read-only operations only; never trigger, cancel, or "
|
|
||||||
"configure builds.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"glitchtip-readonly": {
|
|
||||||
"description": "Read-only GlitchTip error/event inspection.",
|
|
||||||
"when_to_use": "Investigating reported errors once GlitchTip MCP "
|
|
||||||
"tools exist.",
|
|
||||||
"required_operations": ["glitchtip.read"],
|
|
||||||
"status": "designed-not-implemented",
|
|
||||||
"notes": "Design exists (issue #73); no GlitchTip MCP server is "
|
|
||||||
"connected yet. Report SKIPPED rather than substituting "
|
|
||||||
"shell or direct API calls.",
|
|
||||||
"steps": [
|
|
||||||
"Confirm a GlitchTip MCP server is connected; if not, report "
|
|
||||||
"SKIPPED.",
|
|
||||||
"Use read-only operations only; never mutate issues or "
|
|
||||||
"settings.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
"release-operator": {
|
|
||||||
"description": "Release, tag, and version workflows.",
|
|
||||||
"when_to_use": "Never by an LLM session on its own: releases and "
|
|
||||||
"tags are operator-owned.",
|
|
||||||
"required_operations": [],
|
|
||||||
"status": "operator-only",
|
|
||||||
"notes": "See the release SOP docs. Hard stop: no tags or "
|
|
||||||
"releases without an explicit operator instruction.",
|
|
||||||
"steps": [
|
|
||||||
"Stop and hand off to the operator; do not create tags or "
|
|
||||||
"releases.",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def mcp_get_control_plane_guide(
|
|
||||||
remote: str = "dadeschools",
|
|
||||||
host: str | None = None,
|
|
||||||
) -> dict:
|
|
||||||
"""Structured operator guide for the current Gitea MCP session (#128).
|
|
||||||
|
|
||||||
Read-only. Call this first in a new session: it reports the active
|
|
||||||
profile, the authenticated identity (fail-soft), what this profile may
|
|
||||||
and may not do, and the non-negotiable workflow rules (hard stops,
|
|
||||||
fail-closed behavior, head-SHA pinning, merge confirmation, redaction,
|
|
||||||
author/reviewer separation, profile switching). Guidance is
|
|
||||||
profile-aware; if identity cannot be resolved it instructs the session
|
|
||||||
to STOP.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
|
||||||
host: Override the Gitea host.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict with 'read_only', 'remote', 'profile' (name, operations,
|
|
||||||
role_kind), 'identity' (username/status/instruction; 'server' only
|
|
||||||
with the reveal opt-in), 'guidance' (profile-specific), 'rules',
|
|
||||||
'workflows', and 'skills_tool'. Never secrets.
|
|
||||||
"""
|
|
||||||
h, _, _ = _resolve(remote, host, None, None)
|
|
||||||
profile = get_profile()
|
|
||||||
allowed = profile["allowed_operations"]
|
|
||||||
forbidden = profile["forbidden_operations"]
|
|
||||||
role = _role_kind(allowed, forbidden)
|
|
||||||
username = _authenticated_username(h)
|
|
||||||
|
|
||||||
identity = {
|
|
||||||
"authenticated_username": username,
|
|
||||||
"remote": remote,
|
|
||||||
"status": "verified" if username else "unresolved",
|
|
||||||
"instruction": (
|
|
||||||
"Identity verified — confirm it matches the role this task "
|
|
||||||
"requires before acting."
|
|
||||||
if username else
|
|
||||||
"STOP: the authenticated identity could not be resolved. Do "
|
|
||||||
"not perform any mutating operation; report to the operator "
|
|
||||||
"and wait."),
|
|
||||||
}
|
|
||||||
if _reveal_endpoints():
|
|
||||||
identity["server"] = h
|
|
||||||
|
|
||||||
guidance = []
|
|
||||||
if not username:
|
|
||||||
guidance.append(
|
|
||||||
"STOP: identity unresolved — no mutating operations until the "
|
|
||||||
"operator fixes credentials/profile and gitea_whoami verifies.")
|
|
||||||
if role == "author":
|
|
||||||
guidance.append(
|
|
||||||
"Author profile: creating branches, commits, issues, and PRs "
|
|
||||||
"is allowed; review, approve, and merge are forbidden for this "
|
|
||||||
"profile — never attempt them, and never review or merge your "
|
|
||||||
"own PR from any profile.")
|
|
||||||
elif role == "reviewer":
|
|
||||||
guidance.append(
|
|
||||||
"Reviewer profile: review/approve/merge may proceed ONLY after "
|
|
||||||
"gitea_check_pr_eligibility passes and the PR head SHA is "
|
|
||||||
"pinned (expected_head_sha); the PR author must be a different "
|
|
||||||
"user, and merging additionally requires explicit operator "
|
|
||||||
"authorization plus the 'MERGE PR <n>' confirmation.")
|
|
||||||
elif role == "mixed":
|
|
||||||
guidance.append(
|
|
||||||
"WARNING: this profile allows both authoring and "
|
|
||||||
"review/merge, which defeats two-party review. Treat it as "
|
|
||||||
"misconfigured: STOP and report to the operator before any "
|
|
||||||
"review or merge.")
|
|
||||||
else:
|
|
||||||
guidance.append(
|
|
||||||
"Limited profile: no authoring and no review/merge "
|
|
||||||
"operations are allowed. Read-only work only; anything else "
|
|
||||||
"fails closed.")
|
|
||||||
|
|
||||||
return {
|
|
||||||
"read_only": True,
|
|
||||||
"remote": remote,
|
|
||||||
"profile": {
|
|
||||||
"name": profile["profile_name"],
|
|
||||||
"allowed_operations": allowed,
|
|
||||||
"forbidden_operations": forbidden,
|
|
||||||
"role_kind": role,
|
|
||||||
},
|
|
||||||
"identity": identity,
|
|
||||||
"guidance": guidance,
|
|
||||||
"rules": _GUIDE_RULES,
|
|
||||||
"workflows": _COMMON_WORKFLOWS,
|
|
||||||
"skills_tool": "mcp_list_project_skills",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def mcp_list_project_skills() -> dict:
|
|
||||||
"""List the project's workflow skills and when to use them (#128).
|
|
||||||
|
|
||||||
Read-only; makes no API calls. Each skill reports its name,
|
|
||||||
description, when-to-use guidance, required operations, status
|
|
||||||
('available', 'designed-not-implemented', or 'operator-only'), and
|
|
||||||
whether the current profile's operations permit it. Use
|
|
||||||
mcp_get_skill_guide(name) for the step-by-step guide.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict with 'read_only', 'count', and 'skills' (list). Never secrets.
|
|
||||||
"""
|
|
||||||
profile = get_profile()
|
|
||||||
allowed = profile["allowed_operations"]
|
|
||||||
forbidden = profile["forbidden_operations"]
|
|
||||||
skills = []
|
|
||||||
for name, meta in _PROJECT_SKILLS.items():
|
|
||||||
ops = meta["required_operations"]
|
|
||||||
available = all(
|
|
||||||
gitea_config.check_operation(op, allowed, forbidden)[0]
|
|
||||||
for op in ops
|
|
||||||
) if ops else True
|
|
||||||
entry = {
|
|
||||||
"name": name,
|
|
||||||
"description": meta["description"],
|
|
||||||
"when_to_use": meta["when_to_use"],
|
|
||||||
"required_operations": ops,
|
|
||||||
"status": meta["status"],
|
|
||||||
"available_to_current_profile": (
|
|
||||||
available and meta["status"] == "available"),
|
|
||||||
}
|
|
||||||
if meta.get("notes"):
|
|
||||||
entry["notes"] = meta["notes"]
|
|
||||||
skills.append(entry)
|
|
||||||
return {"read_only": True, "count": len(skills), "skills": skills}
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def mcp_get_skill_guide(skill_name: str) -> dict:
|
|
||||||
"""Step-by-step guide for one named project skill (#128).
|
|
||||||
|
|
||||||
Read-only; makes no API calls. Unknown skill names fail closed with
|
|
||||||
the list of valid names instead of guessing.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
skill_name: A name from mcp_list_project_skills (case-insensitive).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
dict with 'success', 'skill' metadata, and 'steps'; on an unknown
|
|
||||||
name, 'success' False with 'reasons' and 'valid_skills'.
|
|
||||||
"""
|
|
||||||
key = (skill_name or "").strip().lower()
|
|
||||||
meta = _PROJECT_SKILLS.get(key)
|
|
||||||
if meta is None:
|
|
||||||
return {
|
|
||||||
"success": False,
|
|
||||||
"reasons": [f"unknown skill '{skill_name}' (fail closed)"],
|
|
||||||
"valid_skills": sorted(_PROJECT_SKILLS),
|
|
||||||
}
|
|
||||||
skill = {
|
|
||||||
"name": key,
|
|
||||||
"description": meta["description"],
|
|
||||||
"when_to_use": meta["when_to_use"],
|
|
||||||
"required_operations": meta["required_operations"],
|
|
||||||
"status": meta["status"],
|
|
||||||
}
|
|
||||||
if meta.get("notes"):
|
|
||||||
skill["notes"] = meta["notes"]
|
|
||||||
return {"success": True, "skill": skill, "steps": list(meta["steps"])}
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def gitea_whoami(
|
def gitea_whoami(
|
||||||
remote: str = "dadeschools",
|
remote: str = "dadeschools",
|
||||||
@@ -2192,306 +1538,6 @@ def gitea_get_profile(
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def gitea_get_runtime_context(
|
|
||||||
remote: str = "dadeschools",
|
|
||||||
host: str | None = None,
|
|
||||||
) -> dict:
|
|
||||||
"""Read-only: explicit visibility into active profile, configuration model, and eligibility.
|
|
||||||
|
|
||||||
Reports config model shape, profile mode (static vs dynamic), and detailed blocks.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
|
||||||
host: Override the Gitea host.
|
|
||||||
"""
|
|
||||||
profile = get_profile()
|
|
||||||
config = gitea_config.load_config()
|
|
||||||
reveal = _reveal_endpoints()
|
|
||||||
|
|
||||||
# Determine config model/version
|
|
||||||
if config is None:
|
|
||||||
config_model = "env-only"
|
|
||||||
elif config.get("version") == 1:
|
|
||||||
config_model = "v1"
|
|
||||||
elif config.get("version") == 2:
|
|
||||||
if "contexts" in config or config.get("shape") == "contexts":
|
|
||||||
config_model = "v2-contexts"
|
|
||||||
else:
|
|
||||||
config_model = "v2-environments"
|
|
||||||
else:
|
|
||||||
config_model = f"unknown-version-{config.get('version')}"
|
|
||||||
|
|
||||||
# Determine profile source
|
|
||||||
if os.environ.get("GITEA_PROFILE_NAME"):
|
|
||||||
profile_source = "env var"
|
|
||||||
elif gitea_config.selected_profile_name():
|
|
||||||
profile_source = "config file profile"
|
|
||||||
else:
|
|
||||||
profile_source = "default"
|
|
||||||
|
|
||||||
h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None)
|
|
||||||
username = _authenticated_username(h) if h else None
|
|
||||||
|
|
||||||
switching = gitea_config.is_runtime_switching_enabled()
|
|
||||||
profile_mode = "dynamic-profile" if switching else "static-profile"
|
|
||||||
|
|
||||||
# Evaluate review/merge eligibility
|
|
||||||
allowed = profile["allowed_operations"] or []
|
|
||||||
forbidden = profile["forbidden_operations"] or []
|
|
||||||
|
|
||||||
# Check approve
|
|
||||||
approve_ok, _ = gitea_config.check_operation("approve", allowed, forbidden)
|
|
||||||
# Check merge
|
|
||||||
merge_ok, _ = gitea_config.check_operation("merge", allowed, forbidden)
|
|
||||||
|
|
||||||
review_merge_allowed = False
|
|
||||||
blocked_reasons = []
|
|
||||||
suggested_fix = "none"
|
|
||||||
|
|
||||||
if not username:
|
|
||||||
blocked_reasons.append("Authenticated identity is unresolved. Gitea credentials are missing or invalid.")
|
|
||||||
suggested_fix = "operator action"
|
|
||||||
elif not (approve_ok or merge_ok):
|
|
||||||
blocked_reasons.append(
|
|
||||||
f"Active profile '{profile['profile_name']}' does not permit review or merge operations."
|
|
||||||
)
|
|
||||||
if switching:
|
|
||||||
suggested_fix = "profile switch"
|
|
||||||
else:
|
|
||||||
suggested_fix = "reviewer namespace"
|
|
||||||
else:
|
|
||||||
review_merge_allowed = True
|
|
||||||
|
|
||||||
# Note that self-review and self-merge are blocked regardless of profile roles
|
|
||||||
blocked_reasons.append(
|
|
||||||
"Note: self-review and self-merge are always blocked at runtime for any PR authored by the active user."
|
|
||||||
)
|
|
||||||
|
|
||||||
safe_next_action = "None; ready for operations."
|
|
||||||
if suggested_fix == "operator action":
|
|
||||||
safe_next_action = f"Ask the operator to configure valid credentials/token for profile '{profile['profile_name']}'."
|
|
||||||
elif suggested_fix == "profile switch":
|
|
||||||
safe_next_action = "Switch to a reviewer profile by calling gitea_activate_profile with a reviewer profile."
|
|
||||||
elif suggested_fix == "reviewer namespace":
|
|
||||||
safe_next_action = (
|
|
||||||
"Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, "
|
|
||||||
"or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile."
|
|
||||||
)
|
|
||||||
|
|
||||||
result = {
|
|
||||||
"active_profile": profile["profile_name"],
|
|
||||||
"authenticated_username": username,
|
|
||||||
"remote": remote if remote in REMOTES else None,
|
|
||||||
"config_model": config_model,
|
|
||||||
"profile_source": profile_source,
|
|
||||||
"allowed_operations": allowed,
|
|
||||||
"forbidden_operations": forbidden,
|
|
||||||
"runtime_switching_supported": switching,
|
|
||||||
"profile_mode": profile_mode,
|
|
||||||
"review_merge_allowed": review_merge_allowed,
|
|
||||||
"review_merge_blocked_reasons": blocked_reasons,
|
|
||||||
"suggested_fix": suggested_fix,
|
|
||||||
"safe_next_action": safe_next_action,
|
|
||||||
}
|
|
||||||
|
|
||||||
if reveal and h:
|
|
||||||
result["server"] = f"https://{h}"
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def gitea_list_profiles() -> dict:
|
|
||||||
"""Read-only: list all Gitea MCP profiles with redacted metadata.
|
|
||||||
|
|
||||||
Exposes names, role kinds, enabled state, allowed/forbidden operations, and active status.
|
|
||||||
Token values, keychain IDs, and raw URLs are hidden unless GITEA_MCP_REVEAL_ENDPOINTS=1 is enabled.
|
|
||||||
"""
|
|
||||||
config = gitea_config.load_config()
|
|
||||||
reveal = _reveal_endpoints()
|
|
||||||
active_name = gitea_config.selected_profile_name() or get_profile()["profile_name"]
|
|
||||||
|
|
||||||
profiles_out = []
|
|
||||||
|
|
||||||
if config is None:
|
|
||||||
# Env-only: return the active profile
|
|
||||||
p = get_profile()
|
|
||||||
role = _role_kind(p["allowed_operations"], p["forbidden_operations"])
|
|
||||||
h = REMOTES.get("dadeschools", {}).get("host") # default host
|
|
||||||
username = _authenticated_username(h) if h else None
|
|
||||||
|
|
||||||
prof = {
|
|
||||||
"name": p["profile_name"],
|
|
||||||
"role_kind": role,
|
|
||||||
"allowed_operations": p["allowed_operations"],
|
|
||||||
"forbidden_operations": p["forbidden_operations"],
|
|
||||||
"identity_status": "verified" if username else "unresolved",
|
|
||||||
"is_active": True,
|
|
||||||
"enabled": True,
|
|
||||||
}
|
|
||||||
if reveal:
|
|
||||||
prof["token_source_name"] = p["token_source_name"]
|
|
||||||
prof["base_url"] = p["base_url"]
|
|
||||||
profiles_out.append(prof)
|
|
||||||
else:
|
|
||||||
# Load from config profiles
|
|
||||||
profiles_dict = config.get("profiles") or {}
|
|
||||||
unavailable_dict = config.get("unavailable") or {}
|
|
||||||
audit_only_profiles = config.get("audit_only_profiles") or {}
|
|
||||||
|
|
||||||
# First, process available profiles
|
|
||||||
for name, p in profiles_dict.items():
|
|
||||||
role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", []))
|
|
||||||
is_active = (name == active_name)
|
|
||||||
|
|
||||||
# Identity status lookup
|
|
||||||
if is_active:
|
|
||||||
h = REMOTES.get("dadeschools", {}).get("host") # default host
|
|
||||||
username = _authenticated_username(h) if h else None
|
|
||||||
identity_status = "verified" if username else "unresolved"
|
|
||||||
else:
|
|
||||||
# Safely resolve if credentials are present without networking
|
|
||||||
try:
|
|
||||||
tok = gitea_config.resolve_token(p)
|
|
||||||
identity_status = "credentials present" if tok else "missing credentials"
|
|
||||||
except Exception:
|
|
||||||
identity_status = "missing credentials"
|
|
||||||
|
|
||||||
prof = {
|
|
||||||
"name": name,
|
|
||||||
"role_kind": role,
|
|
||||||
"allowed_operations": p.get("allowed_operations", []),
|
|
||||||
"forbidden_operations": p.get("forbidden_operations", []),
|
|
||||||
"identity_status": identity_status,
|
|
||||||
"is_active": is_active,
|
|
||||||
"enabled": p.get("enabled", True),
|
|
||||||
}
|
|
||||||
if reveal:
|
|
||||||
if isinstance(p.get("auth"), dict):
|
|
||||||
prof["auth"] = p["auth"]
|
|
||||||
prof["base_url"] = p.get("base_url")
|
|
||||||
else:
|
|
||||||
if isinstance(p.get("auth"), dict):
|
|
||||||
prof["auth"] = {k: ("<redacted>" if k != "type" else v) for k, v in p["auth"].items()}
|
|
||||||
prof["base_url"] = "<redacted>"
|
|
||||||
profiles_out.append(prof)
|
|
||||||
|
|
||||||
# Process unavailable/disabled/audit-only profiles
|
|
||||||
for name, p in audit_only_profiles.items():
|
|
||||||
role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", []))
|
|
||||||
is_active = (name == active_name)
|
|
||||||
prof = {
|
|
||||||
"name": name,
|
|
||||||
"role_kind": role,
|
|
||||||
"allowed_operations": p.get("allowed_operations", []),
|
|
||||||
"forbidden_operations": p.get("forbidden_operations", []),
|
|
||||||
"identity_status": "unavailable (disabled)",
|
|
||||||
"is_active": is_active,
|
|
||||||
"enabled": p.get("enabled", True),
|
|
||||||
"unavailable_reason": p.get("_unavailable_reason"),
|
|
||||||
}
|
|
||||||
if reveal:
|
|
||||||
if isinstance(p.get("auth"), dict):
|
|
||||||
prof["auth"] = p["auth"]
|
|
||||||
prof["base_url"] = p.get("base_url")
|
|
||||||
else:
|
|
||||||
if isinstance(p.get("auth"), dict):
|
|
||||||
prof["auth"] = {k: ("<redacted>" if k != "type" else v) for k, v in p["auth"].items()}
|
|
||||||
prof["base_url"] = "<redacted>"
|
|
||||||
profiles_out.append(prof)
|
|
||||||
|
|
||||||
# Handle unavailable mappings that are not in audit_only
|
|
||||||
for name, reason in unavailable_dict.items():
|
|
||||||
if not any(x["name"] == name for x in profiles_out):
|
|
||||||
profiles_out.append({
|
|
||||||
"name": name,
|
|
||||||
"role_kind": "limited",
|
|
||||||
"allowed_operations": [],
|
|
||||||
"forbidden_operations": [],
|
|
||||||
"identity_status": "unavailable",
|
|
||||||
"is_active": (name == active_name),
|
|
||||||
"enabled": False,
|
|
||||||
"unavailable_reason": reason,
|
|
||||||
})
|
|
||||||
|
|
||||||
return {"profiles": profiles_out}
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def gitea_activate_profile(
|
|
||||||
profile_name: str,
|
|
||||||
remote: str = "dadeschools",
|
|
||||||
host: str | None = None,
|
|
||||||
) -> dict:
|
|
||||||
"""Gated profile activation. Switch the active profile in-place for this runtime session.
|
|
||||||
|
|
||||||
Only allowed if "allow_runtime_switching": true is explicitly configured.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
profile_name: Profile to activate.
|
|
||||||
remote: Known instance — 'dadeschools' or 'prgs'.
|
|
||||||
host: Override the Gitea host.
|
|
||||||
"""
|
|
||||||
if not gitea_config.is_runtime_switching_enabled():
|
|
||||||
return {
|
|
||||||
"success": False,
|
|
||||||
"message": (
|
|
||||||
"Runtime profile switching is disabled in this configuration. Static profile mode is active. "
|
|
||||||
"To enable switching, set 'allow_runtime_switching': true in the config rules."
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
config = gitea_config.load_config()
|
|
||||||
if not config:
|
|
||||||
return {
|
|
||||||
"success": False,
|
|
||||||
"message": "No profiles configuration file is loaded. Switching is not supported in env-only mode."
|
|
||||||
}
|
|
||||||
|
|
||||||
profiles_dict = config.get("profiles") or {}
|
|
||||||
if profile_name not in profiles_dict:
|
|
||||||
return {
|
|
||||||
"success": False,
|
|
||||||
"message": f"Profile '{profile_name}' not found in loaded config."
|
|
||||||
}
|
|
||||||
|
|
||||||
h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None)
|
|
||||||
|
|
||||||
# 1. Record before state
|
|
||||||
before_profile = get_profile()["profile_name"]
|
|
||||||
before_identity = _authenticated_username(h) if h else None
|
|
||||||
|
|
||||||
# 2. Perform switch
|
|
||||||
gitea_config._active_profile_override = profile_name
|
|
||||||
|
|
||||||
# 3. Clear identity cache to force a fresh verification
|
|
||||||
if h:
|
|
||||||
_IDENTITY_CACHE.pop(h, None)
|
|
||||||
|
|
||||||
# 4. Resolve fresh identity
|
|
||||||
after_profile = get_profile()["profile_name"]
|
|
||||||
after_identity = _authenticated_username(h) if h else None
|
|
||||||
|
|
||||||
# 5. Audit the switch if auditing is on
|
|
||||||
_audit(
|
|
||||||
"activate_profile",
|
|
||||||
host=h,
|
|
||||||
remote=remote,
|
|
||||||
result={"success": True, "before": before_profile, "after": after_profile},
|
|
||||||
username=after_identity,
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"success": True,
|
|
||||||
"message": f"Successfully activated profile '{profile_name}' (fresh identity verification complete).",
|
|
||||||
"before_profile": before_profile,
|
|
||||||
"before_identity": before_identity,
|
|
||||||
"after_profile": after_profile,
|
|
||||||
"after_identity": after_identity,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def gitea_audit_config() -> dict:
|
def gitea_audit_config() -> dict:
|
||||||
"""Audit the configured profiles/services: enabled state, no secrets.
|
"""Audit the configured profiles/services: enabled state, no secrets.
|
||||||
|
|||||||
@@ -31,8 +31,6 @@ from mcp_server import ( # noqa: E402
|
|||||||
gitea_get_profile,
|
gitea_get_profile,
|
||||||
gitea_check_pr_eligibility,
|
gitea_check_pr_eligibility,
|
||||||
gitea_submit_pr_review,
|
gitea_submit_pr_review,
|
||||||
gitea_list_issue_comments,
|
|
||||||
gitea_create_issue_comment,
|
|
||||||
)
|
)
|
||||||
from gitea_auth import get_profile # noqa: E402
|
from gitea_auth import get_profile # noqa: E402
|
||||||
|
|
||||||
@@ -1905,199 +1903,3 @@ class TestEndpointRedaction(unittest.TestCase):
|
|||||||
from mcp_server import gitea_audit_config
|
from mcp_server import gitea_audit_config
|
||||||
result = gitea_audit_config()
|
result = gitea_audit_config()
|
||||||
self.assertFalse(result["configured"])
|
self.assertFalse(result["configured"])
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Issue comment tools (#126)
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
class TestIssueCommentTools(unittest.TestCase):
|
|
||||||
"""gitea_list_issue_comments / gitea_create_issue_comment (#126).
|
|
||||||
|
|
||||||
Issue discussion comments are distinct from PR reviews: they hit the
|
|
||||||
/issues/{n}/comments endpoint, are gated on gitea.read (list) and
|
|
||||||
gitea.issue.comment (create) — never on the gitea.pr.* review/merge
|
|
||||||
family — and their normal output is LLM-safe (no endpoint URLs; the
|
|
||||||
GITEA_MCP_REVEAL_ENDPOINTS opt-in restores links for local diagnostics).
|
|
||||||
"""
|
|
||||||
|
|
||||||
AUTHOR_ENV = {
|
|
||||||
"GITEA_PROFILE_NAME": "gitea-author",
|
|
||||||
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.issue.comment",
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _comment(cid=101, login="alice", body="hello world"):
|
|
||||||
return {
|
|
||||||
"id": cid,
|
|
||||||
"user": {"login": login},
|
|
||||||
"body": body,
|
|
||||||
"created_at": "2026-07-03T00:00:00Z",
|
|
||||||
"updated_at": "2026-07-03T01:00:00Z",
|
|
||||||
"html_url": (
|
|
||||||
"https://gitea.example.com/o/r/issues/9#issuecomment-%d" % cid
|
|
||||||
),
|
|
||||||
}
|
|
||||||
|
|
||||||
# -- list ----------------------------------------------------------------
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_lists_comments(self, _auth, mock_api):
|
|
||||||
mock_api.return_value = [self._comment(101), self._comment(102, "bob")]
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
|
|
||||||
self.assertTrue(result["success"])
|
|
||||||
self.assertEqual(result["issue_number"], 9)
|
|
||||||
self.assertEqual(len(result["comments"]), 2)
|
|
||||||
first = result["comments"][0]
|
|
||||||
self.assertEqual(first["id"], 101)
|
|
||||||
self.assertEqual(first["author"], "alice")
|
|
||||||
self.assertEqual(first["body"], "hello world")
|
|
||||||
self.assertEqual(first["created_at"], "2026-07-03T00:00:00Z")
|
|
||||||
self.assertEqual(first["updated_at"], "2026-07-03T01:00:00Z")
|
|
||||||
url = mock_api.call_args[0][1]
|
|
||||||
self.assertIn("/issues/9/comments", url)
|
|
||||||
self.assertEqual(mock_api.call_args[0][0], "GET")
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_list_output_has_no_urls_by_default(self, _auth, mock_api):
|
|
||||||
mock_api.return_value = [self._comment()]
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
|
|
||||||
blob = json.dumps(result)
|
|
||||||
self.assertNotIn("https://", blob)
|
|
||||||
self.assertNotIn("http://", blob)
|
|
||||||
self.assertNotIn("url", blob)
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_list_reveal_opt_in_includes_url(self, _auth, mock_api):
|
|
||||||
mock_api.return_value = [self._comment()]
|
|
||||||
env = dict(self.AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
|
|
||||||
with patch.dict(os.environ, env, clear=True):
|
|
||||||
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
|
|
||||||
self.assertIn("issuecomment-101", result["comments"][0]["url"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_list_blocked_without_read_permission(self, _auth, mock_api):
|
|
||||||
env = {"GITEA_PROFILE_NAME": "gitea-writer-only",
|
|
||||||
"GITEA_ALLOWED_OPERATIONS": "gitea.issue.comment"}
|
|
||||||
with patch.dict(os.environ, env, clear=True):
|
|
||||||
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
|
|
||||||
self.assertFalse(result["success"])
|
|
||||||
self.assertTrue(result["reasons"])
|
|
||||||
mock_api.assert_not_called()
|
|
||||||
|
|
||||||
def test_list_unknown_remote_raises(self):
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
gitea_list_issue_comments(issue_number=9, remote="nope")
|
|
||||||
|
|
||||||
# -- create ----------------------------------------------------------------
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_creates_comment(self, _auth, mock_api):
|
|
||||||
mock_api.return_value = self._comment(555, "gitea-author", "posted")
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
result = gitea_create_issue_comment(
|
|
||||||
issue_number=9, body="posted", remote="prgs")
|
|
||||||
self.assertTrue(result["success"])
|
|
||||||
self.assertEqual(result["comment_id"], 555)
|
|
||||||
self.assertEqual(result["issue_number"], 9)
|
|
||||||
self.assertEqual(mock_api.call_args[0][0], "POST")
|
|
||||||
url = mock_api.call_args[0][1]
|
|
||||||
self.assertIn("/issues/9/comments", url)
|
|
||||||
self.assertIn("gitea.prgs.cc", url)
|
|
||||||
self.assertIn("Scaled-Tech-Consulting", url)
|
|
||||||
self.assertEqual(mock_api.call_args[0][3], {"body": "posted"})
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_create_output_has_no_urls_by_default(self, _auth, mock_api):
|
|
||||||
mock_api.return_value = self._comment(555)
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
result = gitea_create_issue_comment(
|
|
||||||
issue_number=9, body="posted", remote="prgs")
|
|
||||||
blob = json.dumps(result)
|
|
||||||
self.assertNotIn("https://", blob)
|
|
||||||
self.assertNotIn("http://", blob)
|
|
||||||
self.assertNotIn("url", blob)
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_create_reveal_opt_in_includes_url(self, _auth, mock_api):
|
|
||||||
mock_api.return_value = self._comment(555)
|
|
||||||
env = dict(self.AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
|
|
||||||
with patch.dict(os.environ, env, clear=True):
|
|
||||||
result = gitea_create_issue_comment(
|
|
||||||
issue_number=9, body="posted", remote="prgs")
|
|
||||||
self.assertIn("issuecomment-555", result["url"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_review_permissions_do_not_grant_issue_comments(self, _auth, mock_api):
|
|
||||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
|
||||||
"GITEA_ALLOWED_OPERATIONS":
|
|
||||||
"gitea.read,gitea.pr.review,gitea.pr.comment,"
|
|
||||||
"gitea.pr.approve,gitea.pr.merge"}
|
|
||||||
with patch.dict(os.environ, env, clear=True):
|
|
||||||
result = gitea_create_issue_comment(
|
|
||||||
issue_number=9, body="posted", remote="prgs")
|
|
||||||
self.assertFalse(result["success"])
|
|
||||||
self.assertFalse(result["performed"])
|
|
||||||
self.assertTrue(result["reasons"])
|
|
||||||
mock_api.assert_not_called()
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_create_forbidden_overrides_allowed(self, _auth, mock_api):
|
|
||||||
env = {"GITEA_PROFILE_NAME": "gitea-author",
|
|
||||||
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.issue.comment",
|
|
||||||
"GITEA_FORBIDDEN_OPERATIONS": "gitea.issue.comment"}
|
|
||||||
with patch.dict(os.environ, env, clear=True):
|
|
||||||
result = gitea_create_issue_comment(
|
|
||||||
issue_number=9, body="posted", remote="prgs")
|
|
||||||
self.assertFalse(result["success"])
|
|
||||||
mock_api.assert_not_called()
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_create_empty_body_blocked(self, _auth, mock_api):
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
result = gitea_create_issue_comment(
|
|
||||||
issue_number=9, body=" ", remote="prgs")
|
|
||||||
self.assertFalse(result["success"])
|
|
||||||
mock_api.assert_not_called()
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_create_missing_issue_error_is_redacted(self, _auth, mock_api):
|
|
||||||
mock_api.side_effect = RuntimeError(
|
|
||||||
"Gitea API error 404 for issue (auth was token abc123secret)")
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
with self.assertRaises(RuntimeError) as cm:
|
|
||||||
gitea_create_issue_comment(
|
|
||||||
issue_number=99999, body="posted", remote="prgs")
|
|
||||||
msg = str(cm.exception)
|
|
||||||
self.assertNotIn("abc123secret", msg)
|
|
||||||
self.assertIn("404", msg)
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_create_never_touches_review_endpoints(self, _auth, mock_api):
|
|
||||||
mock_api.return_value = self._comment(555)
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
gitea_create_issue_comment(
|
|
||||||
issue_number=9, body="posted", remote="prgs")
|
|
||||||
for call in mock_api.call_args_list:
|
|
||||||
self.assertNotIn("reviews", call[0][1])
|
|
||||||
self.assertNotIn("/pulls/", call[0][1])
|
|
||||||
|
|
||||||
def test_create_unknown_remote_raises(self):
|
|
||||||
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
gitea_create_issue_comment(
|
|
||||||
issue_number=9, body="x", remote="nope")
|
|
||||||
|
|||||||
@@ -1,232 +0,0 @@
|
|||||||
"""Tests for the operator guide / project skills MCP tools (#128).
|
|
||||||
|
|
||||||
Read-only capability-discovery tools: mcp_get_control_plane_guide,
|
|
||||||
mcp_list_project_skills, mcp_get_skill_guide. Each is tested by calling the
|
|
||||||
underlying function directly with mocked API responses.
|
|
||||||
"""
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import unittest
|
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
|
||||||
|
|
||||||
import mcp_server # noqa: E402
|
|
||||||
from mcp_server import ( # noqa: E402
|
|
||||||
mcp_get_control_plane_guide,
|
|
||||||
mcp_list_project_skills,
|
|
||||||
mcp_get_skill_guide,
|
|
||||||
)
|
|
||||||
|
|
||||||
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
|
|
||||||
|
|
||||||
AUTHOR_ENV = {
|
|
||||||
"GITEA_PROFILE_NAME": "author-test",
|
|
||||||
"GITEA_ALLOWED_OPERATIONS":
|
|
||||||
"gitea.read,gitea.repo.commit,gitea.branch.create,"
|
|
||||||
"gitea.branch.push,gitea.pr.create,gitea.pr.comment",
|
|
||||||
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.approve,gitea.pr.merge",
|
|
||||||
}
|
|
||||||
|
|
||||||
REVIEWER_ENV = {
|
|
||||||
"GITEA_PROFILE_NAME": "reviewer-test",
|
|
||||||
"GITEA_ALLOWED_OPERATIONS":
|
|
||||||
"gitea.read,gitea.pr.review,gitea.pr.comment,gitea.pr.approve,"
|
|
||||||
"gitea.pr.request_changes,gitea.pr.merge",
|
|
||||||
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.create,gitea.branch.push",
|
|
||||||
}
|
|
||||||
|
|
||||||
EXPECTED_SKILLS = [
|
|
||||||
"gitea-issue-authoring",
|
|
||||||
"gitea-pr-creation",
|
|
||||||
"gitea-pr-review",
|
|
||||||
"gitea-pr-merge",
|
|
||||||
"gitea-issue-comments",
|
|
||||||
"profile-switching",
|
|
||||||
"redaction-security-review",
|
|
||||||
"jenkins-readonly",
|
|
||||||
"glitchtip-readonly",
|
|
||||||
"release-operator",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class GuideTestBase(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
mcp_server._IDENTITY_CACHE.clear()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
mcp_server._IDENTITY_CACHE.clear()
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# mcp_get_control_plane_guide
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
class TestControlPlaneGuide(GuideTestBase):
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_author_profile_guidance(self, _auth, _api):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
g = mcp_get_control_plane_guide(remote="prgs")
|
|
||||||
self.assertTrue(g["read_only"])
|
|
||||||
self.assertEqual(g["profile"]["role_kind"], "author")
|
|
||||||
self.assertEqual(g["identity"]["authenticated_username"], "author-bot")
|
|
||||||
self.assertEqual(g["identity"]["status"], "verified")
|
|
||||||
blob = " ".join(g["guidance"]).lower()
|
|
||||||
self.assertIn("forbidden", blob)
|
|
||||||
self.assertIn("review", blob)
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "reviewer-bot"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_reviewer_profile_guidance(self, _auth, _api):
|
|
||||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
|
||||||
g = mcp_get_control_plane_guide(remote="prgs")
|
|
||||||
self.assertEqual(g["profile"]["role_kind"], "reviewer")
|
|
||||||
blob = " ".join(g["guidance"]).lower()
|
|
||||||
self.assertIn("eligibility", blob)
|
|
||||||
self.assertIn("pinned", blob)
|
|
||||||
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=None)
|
|
||||||
def test_unresolved_identity_instructs_stop(self, _auth):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
g = mcp_get_control_plane_guide(remote="prgs")
|
|
||||||
self.assertEqual(g["identity"]["status"], "unresolved")
|
|
||||||
self.assertIsNone(g["identity"]["authenticated_username"])
|
|
||||||
self.assertIn("STOP", g["identity"]["instruction"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_guide_is_read_only(self, _auth, mock_api):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
mcp_get_control_plane_guide(remote="prgs")
|
|
||||||
for call in mock_api.call_args_list:
|
|
||||||
self.assertEqual(call[0][0], "GET")
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_no_urls_or_keychain_ids_by_default(self, _auth, _api):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
g = mcp_get_control_plane_guide(remote="prgs")
|
|
||||||
blob = json.dumps(g)
|
|
||||||
self.assertNotIn("https://", blob)
|
|
||||||
self.assertNotIn("http://", blob)
|
|
||||||
self.assertNotIn("keychain:", blob)
|
|
||||||
self.assertNotIn(FAKE_AUTH, blob)
|
|
||||||
self.assertNotIn("server", g["identity"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_reveal_opt_in_includes_server(self, _auth, _api):
|
|
||||||
env = dict(AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
|
|
||||||
with patch.dict(os.environ, env, clear=True):
|
|
||||||
g = mcp_get_control_plane_guide(remote="prgs")
|
|
||||||
self.assertIn("gitea.prgs.cc", g["identity"]["server"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
|
||||||
def test_rules_cover_required_topics(self, _auth, _api):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
g = mcp_get_control_plane_guide(remote="prgs")
|
|
||||||
rules = g["rules"]
|
|
||||||
for key in ("hard_stops", "fail_closed", "head_sha_pinning",
|
|
||||||
"merge_confirmation", "redaction", "separation",
|
|
||||||
"profile_switching", "identity_verification"):
|
|
||||||
self.assertIn(key, rules)
|
|
||||||
self.assertIn("MERGE PR", json.dumps(rules["merge_confirmation"]))
|
|
||||||
self.assertTrue(rules["hard_stops"])
|
|
||||||
|
|
||||||
def test_unknown_remote_raises(self):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
mcp_get_control_plane_guide(remote="nope")
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# mcp_list_project_skills
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
class TestProjectSkills(GuideTestBase):
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
def test_registry_complete_and_no_api_calls(self, mock_api):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
r = mcp_list_project_skills()
|
|
||||||
self.assertTrue(r["read_only"])
|
|
||||||
names = [s["name"] for s in r["skills"]]
|
|
||||||
for expected in EXPECTED_SKILLS:
|
|
||||||
self.assertIn(expected, names)
|
|
||||||
self.assertEqual(r["count"], len(r["skills"]))
|
|
||||||
for s in r["skills"]:
|
|
||||||
self.assertTrue(s["description"])
|
|
||||||
self.assertTrue(s["when_to_use"])
|
|
||||||
self.assertIn("required_operations", s)
|
|
||||||
self.assertIn("status", s)
|
|
||||||
self.assertIn("available_to_current_profile", s)
|
|
||||||
mock_api.assert_not_called()
|
|
||||||
|
|
||||||
def test_profile_aware_availability(self):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
r = mcp_list_project_skills()
|
|
||||||
by_name = {s["name"]: s for s in r["skills"]}
|
|
||||||
self.assertTrue(by_name["gitea-pr-creation"]["available_to_current_profile"])
|
|
||||||
self.assertFalse(by_name["gitea-pr-merge"]["available_to_current_profile"])
|
|
||||||
|
|
||||||
def test_unimplemented_services_marked(self):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
r = mcp_list_project_skills()
|
|
||||||
by_name = {s["name"]: s for s in r["skills"]}
|
|
||||||
self.assertNotEqual(by_name["jenkins-readonly"]["status"], "available")
|
|
||||||
self.assertNotEqual(by_name["glitchtip-readonly"]["status"], "available")
|
|
||||||
|
|
||||||
def test_no_urls_in_registry(self):
|
|
||||||
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
|
|
||||||
r = mcp_list_project_skills()
|
|
||||||
blob = json.dumps(r)
|
|
||||||
self.assertNotIn("https://", blob)
|
|
||||||
self.assertNotIn("http://", blob)
|
|
||||||
self.assertNotIn("keychain:", blob)
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# mcp_get_skill_guide
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
class TestSkillGuide(GuideTestBase):
|
|
||||||
|
|
||||||
def test_known_skill_returns_steps(self):
|
|
||||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
|
||||||
r = mcp_get_skill_guide("gitea-pr-merge")
|
|
||||||
self.assertTrue(r["success"])
|
|
||||||
self.assertEqual(r["skill"]["name"], "gitea-pr-merge")
|
|
||||||
self.assertTrue(r["steps"])
|
|
||||||
self.assertIn("MERGE PR", " ".join(r["steps"]))
|
|
||||||
|
|
||||||
def test_case_and_whitespace_normalized(self):
|
|
||||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
|
||||||
r = mcp_get_skill_guide(" Gitea-PR-Merge ")
|
|
||||||
self.assertTrue(r["success"])
|
|
||||||
|
|
||||||
def test_unknown_skill_fails_closed(self):
|
|
||||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
|
||||||
r = mcp_get_skill_guide("no-such-skill")
|
|
||||||
self.assertFalse(r["success"])
|
|
||||||
self.assertTrue(r["reasons"])
|
|
||||||
for expected in EXPECTED_SKILLS:
|
|
||||||
self.assertIn(expected, r["valid_skills"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
def test_read_only_no_api_calls(self, mock_api):
|
|
||||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
|
||||||
mcp_get_skill_guide("gitea-pr-review")
|
|
||||||
mock_api.assert_not_called()
|
|
||||||
|
|
||||||
def test_no_urls_in_skill_guides(self):
|
|
||||||
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
|
|
||||||
for name in EXPECTED_SKILLS:
|
|
||||||
r = mcp_get_skill_guide(name)
|
|
||||||
blob = json.dumps(r)
|
|
||||||
self.assertNotIn("https://", blob)
|
|
||||||
self.assertNotIn("keychain:", blob)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
@@ -1,256 +0,0 @@
|
|||||||
"""Tests for runtime context, profile activation, profile listing, and enhanced error clarity.
|
|
||||||
|
|
||||||
Covers Issue #131 requirements.
|
|
||||||
"""
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import json
|
|
||||||
import tempfile
|
|
||||||
import unittest
|
|
||||||
from unittest.mock import patch, MagicMock
|
|
||||||
|
|
||||||
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
|
||||||
|
|
||||||
import gitea_config
|
|
||||||
import gitea_auth
|
|
||||||
import mcp_server
|
|
||||||
|
|
||||||
CONFIG_SWITCHING_DISABLED = {
|
|
||||||
"version": 2,
|
|
||||||
"contexts": {
|
|
||||||
"ctx": {
|
|
||||||
"enabled": True,
|
|
||||||
"gitea": {
|
|
||||||
"enabled": True,
|
|
||||||
"base_url": "https://gitea.example.com"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"profiles": {
|
|
||||||
"author-profile": {
|
|
||||||
"enabled": True,
|
|
||||||
"context": "ctx",
|
|
||||||
"role": "author",
|
|
||||||
"username": "author-user",
|
|
||||||
"auth": {"type": "env", "name": "GITEA_TOKEN_AUTHOR"},
|
|
||||||
"allowed_operations": ["gitea.read", "gitea.pr.create", "gitea.branch.push"],
|
|
||||||
"forbidden_operations": ["gitea.pr.approve", "gitea.pr.merge"],
|
|
||||||
"execution_profile": "author-profile"
|
|
||||||
},
|
|
||||||
"reviewer-profile": {
|
|
||||||
"enabled": True,
|
|
||||||
"context": "ctx",
|
|
||||||
"role": "reviewer",
|
|
||||||
"username": "reviewer-user",
|
|
||||||
"auth": {"type": "env", "name": "GITEA_TOKEN_REVIEWER"},
|
|
||||||
"allowed_operations": ["gitea.read", "gitea.pr.approve", "gitea.pr.merge"],
|
|
||||||
"forbidden_operations": ["gitea.pr.create", "gitea.branch.push"],
|
|
||||||
"execution_profile": "reviewer-profile"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"rules": {
|
|
||||||
"allow_runtime_switching": False
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
CONFIG_SWITCHING_ENABLED = {
|
|
||||||
**CONFIG_SWITCHING_DISABLED,
|
|
||||||
"rules": {
|
|
||||||
"allow_runtime_switching": True
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class TestRuntimeClarity(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
self._remotes_patch = patch.dict(mcp_server.REMOTES, {
|
|
||||||
"dadeschools": {"host": "gitea.example.com", "org": "Example-Org", "repo": "Example-Repo"},
|
|
||||||
"prgs": {"host": "gitea.example.com", "org": "Example-Org", "repo": "Example-Repo"}
|
|
||||||
})
|
|
||||||
self._remotes_patch.start()
|
|
||||||
mcp_server._IDENTITY_CACHE.clear()
|
|
||||||
gitea_config._active_profile_override = None
|
|
||||||
self._dir = tempfile.TemporaryDirectory()
|
|
||||||
self.config_path = os.path.join(self._dir.name, "profiles.json")
|
|
||||||
self._write_config(CONFIG_SWITCHING_DISABLED)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
self._remotes_patch.stop()
|
|
||||||
mcp_server._IDENTITY_CACHE.clear()
|
|
||||||
gitea_config._active_profile_override = None
|
|
||||||
self._dir.cleanup()
|
|
||||||
|
|
||||||
def _write_config(self, obj):
|
|
||||||
with open(self.config_path, "w", encoding="utf-8") as fh:
|
|
||||||
fh.write(json.dumps(obj))
|
|
||||||
|
|
||||||
def _env(self, profile="author-profile", reveal="0"):
|
|
||||||
return {
|
|
||||||
"GITEA_MCP_CONFIG": self.config_path,
|
|
||||||
"GITEA_MCP_PROFILE": profile,
|
|
||||||
"GITEA_MCP_REVEAL_ENDPOINTS": reveal,
|
|
||||||
"GITEA_TOKEN_AUTHOR": "author-pass",
|
|
||||||
"GITEA_TOKEN_REVIEWER": "reviewer-pass",
|
|
||||||
}
|
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
# gitea_get_runtime_context
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "author-user"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
|
||||||
def test_get_runtime_context_author(self, _auth, _api):
|
|
||||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
|
||||||
ctx = mcp_server.gitea_get_runtime_context(remote="dadeschools")
|
|
||||||
self.assertEqual(ctx["active_profile"], "author-profile")
|
|
||||||
self.assertEqual(ctx["authenticated_username"], "author-user")
|
|
||||||
self.assertEqual(ctx["config_model"], "v2-contexts")
|
|
||||||
self.assertEqual(ctx["profile_source"], "config file profile")
|
|
||||||
self.assertFalse(ctx["runtime_switching_supported"])
|
|
||||||
self.assertEqual(ctx["profile_mode"], "static-profile")
|
|
||||||
self.assertFalse(ctx["review_merge_allowed"])
|
|
||||||
self.assertEqual(ctx["suggested_fix"], "reviewer namespace")
|
|
||||||
self.assertIn("does not permit review or merge", ctx["review_merge_blocked_reasons"][0])
|
|
||||||
self.assertIn("Switch to the reviewer MCP session", ctx["safe_next_action"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "reviewer-user"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value="token reviewer-pass")
|
|
||||||
def test_get_runtime_context_reviewer(self, _auth, _api):
|
|
||||||
with patch.dict(os.environ, self._env("reviewer-profile"), clear=True):
|
|
||||||
ctx = mcp_server.gitea_get_runtime_context(remote="dadeschools")
|
|
||||||
self.assertEqual(ctx["active_profile"], "reviewer-profile")
|
|
||||||
self.assertEqual(ctx["authenticated_username"], "reviewer-user")
|
|
||||||
self.assertTrue(ctx["review_merge_allowed"])
|
|
||||||
self.assertEqual(ctx["suggested_fix"], "none")
|
|
||||||
self.assertEqual(ctx["safe_next_action"], "None; ready for operations.")
|
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
# gitea_list_profiles
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "author-user"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
|
||||||
def test_list_profiles_redacted_by_default(self, _auth, _api):
|
|
||||||
with patch.dict(os.environ, self._env("author-profile", reveal="0"), clear=True):
|
|
||||||
res = mcp_server.gitea_list_profiles()
|
|
||||||
profiles = res["profiles"]
|
|
||||||
self.assertEqual(len(profiles), 2)
|
|
||||||
|
|
||||||
author_prof = next(p for p in profiles if p["name"] == "author-profile")
|
|
||||||
self.assertTrue(author_prof["is_active"])
|
|
||||||
self.assertEqual(author_prof["role_kind"], "author")
|
|
||||||
self.assertEqual(author_prof["auth"]["name"], "<redacted>")
|
|
||||||
self.assertEqual(author_prof["base_url"], "<redacted>")
|
|
||||||
self.assertEqual(author_prof["identity_status"], "verified")
|
|
||||||
|
|
||||||
reviewer_prof = next(p for p in profiles if p["name"] == "reviewer-profile")
|
|
||||||
self.assertFalse(reviewer_prof["is_active"])
|
|
||||||
self.assertEqual(reviewer_prof["role_kind"], "reviewer")
|
|
||||||
self.assertEqual(reviewer_prof["auth"]["name"], "<redacted>")
|
|
||||||
self.assertEqual(reviewer_prof["base_url"], "<redacted>")
|
|
||||||
self.assertEqual(reviewer_prof["identity_status"], "credentials present")
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request", return_value={"login": "author-user"})
|
|
||||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
|
||||||
def test_list_profiles_revealed_under_opt_in(self, _auth, _api):
|
|
||||||
with patch.dict(os.environ, self._env("author-profile", reveal="1"), clear=True):
|
|
||||||
res = mcp_server.gitea_list_profiles()
|
|
||||||
profiles = res["profiles"]
|
|
||||||
|
|
||||||
author_prof = next(p for p in profiles if p["name"] == "author-profile")
|
|
||||||
self.assertEqual(author_prof["auth"]["name"], "GITEA_TOKEN_AUTHOR")
|
|
||||||
self.assertEqual(author_prof["base_url"], "https://gitea.example.com")
|
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
# gitea_activate_profile
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
def test_activate_profile_fails_when_disabled(self):
|
|
||||||
self._write_config(CONFIG_SWITCHING_DISABLED)
|
|
||||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
|
||||||
res = mcp_server.gitea_activate_profile(profile_name="reviewer-profile")
|
|
||||||
self.assertFalse(res["success"])
|
|
||||||
self.assertIn("switching is disabled", res["message"].lower())
|
|
||||||
self.assertIsNone(gitea_config._active_profile_override)
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header")
|
|
||||||
def test_activate_profile_succeeds_when_enabled(self, mock_auth, mock_api):
|
|
||||||
self._write_config(CONFIG_SWITCHING_ENABLED)
|
|
||||||
|
|
||||||
# Setup mock responses for whoami checks
|
|
||||||
mock_auth.side_effect = ["token author-pass", "token reviewer-pass"]
|
|
||||||
mock_api.side_effect = [{"login": "author-user"}, {"login": "reviewer-user"}]
|
|
||||||
|
|
||||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
|
||||||
# Check before state
|
|
||||||
self.assertEqual(gitea_config.selected_profile_name(), "author-profile")
|
|
||||||
|
|
||||||
res = mcp_server.gitea_activate_profile(profile_name="reviewer-profile")
|
|
||||||
|
|
||||||
self.assertTrue(res["success"])
|
|
||||||
self.assertEqual(res["before_profile"], "author-profile")
|
|
||||||
self.assertEqual(res["before_identity"], "author-user")
|
|
||||||
self.assertEqual(res["after_profile"], "reviewer-profile")
|
|
||||||
self.assertEqual(res["after_identity"], "reviewer-user")
|
|
||||||
|
|
||||||
# Global variable override should be set
|
|
||||||
self.assertEqual(gitea_config._active_profile_override, "reviewer-profile")
|
|
||||||
self.assertEqual(gitea_config.selected_profile_name(), "reviewer-profile")
|
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
# gitea_check_pr_eligibility enhanced error clarity
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value="token reviewer-pass")
|
|
||||||
def test_eligibility_failure_self_author(self, _auth, mock_api):
|
|
||||||
# PR is authored by "reviewer-user" and reviewer-user is trying to approve it.
|
|
||||||
mock_api.side_effect = [
|
|
||||||
{"login": "reviewer-user"}, # user whoami lookup
|
|
||||||
{"user": {"login": "reviewer-user"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
|
|
||||||
]
|
|
||||||
|
|
||||||
with patch.dict(os.environ, self._env("reviewer-profile"), clear=True):
|
|
||||||
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
|
|
||||||
|
|
||||||
self.assertFalse(res["eligible"])
|
|
||||||
self.assertEqual(res["active_identity"], "reviewer-user")
|
|
||||||
self.assertTrue(res["self_author"])
|
|
||||||
self.assertEqual(res["required_identity"], "Any Gitea user other than PR author 'reviewer-user'")
|
|
||||||
self.assertIn("Self-review/self-merge is forbidden", res["safe_next_step"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
|
||||||
def test_eligibility_failure_missing_permissions(self, _auth, mock_api):
|
|
||||||
# PR is authored by "someone-else" and author-user (who lacks approve) is trying to approve it.
|
|
||||||
mock_api.side_effect = [
|
|
||||||
{"login": "author-user"}, # user whoami lookup
|
|
||||||
{"user": {"login": "someone-else"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
|
|
||||||
]
|
|
||||||
|
|
||||||
self._write_config(CONFIG_SWITCHING_ENABLED) # Enable switching to verify fixable_by_profile_switch
|
|
||||||
|
|
||||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
|
||||||
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
|
|
||||||
|
|
||||||
self.assertFalse(res["eligible"])
|
|
||||||
self.assertEqual(res["missing_permission"], "gitea.pr.approve")
|
|
||||||
self.assertTrue(res["fixable_by_profile_switch"])
|
|
||||||
self.assertFalse(res["requires_different_namespace"])
|
|
||||||
self.assertIn("Switch to a reviewer profile by calling gitea_activate_profile", res["safe_next_step"])
|
|
||||||
|
|
||||||
@patch("mcp_server.api_request")
|
|
||||||
@patch("mcp_server.get_auth_header", return_value="token author-pass")
|
|
||||||
def test_eligibility_failure_missing_permissions_switching_disabled(self, _auth, mock_api):
|
|
||||||
# PR is authored by "someone-else" and author-user (lacks approve) tries to approve it when switching is disabled.
|
|
||||||
mock_api.side_effect = [
|
|
||||||
{"login": "author-user"}, # user whoami lookup
|
|
||||||
{"user": {"login": "someone-else"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
|
|
||||||
]
|
|
||||||
|
|
||||||
self._write_config(CONFIG_SWITCHING_DISABLED) # Disable switching
|
|
||||||
|
|
||||||
with patch.dict(os.environ, self._env("author-profile"), clear=True):
|
|
||||||
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
|
|
||||||
|
|
||||||
self.assertFalse(res["eligible"])
|
|
||||||
self.assertEqual(res["missing_permission"], "gitea.pr.approve")
|
|
||||||
self.assertFalse(res["fixable_by_profile_switch"])
|
|
||||||
self.assertTrue(res["requires_different_namespace"])
|
|
||||||
self.assertIn("Switch to the reviewer MCP session", res["safe_next_step"])
|
|
||||||
Reference in New Issue
Block a user