12 Commits

Author SHA1 Message Date
sysadmin 87172229aa feat: support and test MDCPS reviewer identity per #107
- Update v2 test config: mdcps reviewer username '913443' (distinct from author 'jcwalker3'), author 'jcwalker3'
- Adjust TBD negative test to use mutate (post-provisioning)
- Add mdcps-reviewer example to gitea-mcp.example.json and README
- Verifies distinct identities, reviewer cannot create/push, author cannot review/merge

Closes #107

Checks:
- config tests pass
- no secrets in changes (usernames only)
- py_compile and diff clean
2026-07-04 18:35:45 -04:00
sysadmin e88ca1d64b Merge pull request 'feat: complete operator guide and skill registry requirements (#129)' (#134) from feat/issue-129-operator-guide-skills into master 2026-07-04 16:49:45 -05:00
sysadmin 4253f8a52a feat: satisfy Issue #129 operator guide and skill registry requirements 2026-07-04 17:46:19 -04:00
sysadmin cd1d8d71a2 Merge pull request 'fix: redact Gitea web links from PR/issue MCP tool output (#125)' (#133) from feat/issue-125-pr-url-redaction into master 2026-07-04 16:44:24 -05:00
sysadmin c349b98206 docs: note in-place mutation contract on _with_optional_url
Subagent review (read-only) found no blockers; this addresses its one
LOW note by documenting that the helper mutates the passed dict and
must receive a freshly-built one.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-04 17:06:41 -04:00
sysadmin 552f538d97 fix: redact Gitea web links from MCP PR output 2026-07-04 16:46:55 -04:00
sysadmin 2beeeceb90 Merge pull request 'feat: make Gitea MCP profile activation and runtime identity state explicit (#131)' (#132) from feat/issue-131-profile-identity-clarity into master 2026-07-04 13:59:10 -05:00
sysadmin 10a29d1bd5 style: remove trailing whitespace in mcp_server.py and test_runtime_clarity.py 2026-07-04 14:57:21 -04:00
sysadmin a0e7d3360e feat: implement profile activation and runtime identity clarity (#131) 2026-07-04 02:04:07 -04:00
sysadmin b1256d73b4 Merge pull request 'feat: add operator guide and project skills discovery MCP tools (#128)' (#130) from feat/issue-128-operator-guide-skills into master 2026-07-04 00:06:04 -05:00
sysadmin 6a8a9d99b7 feat: add operator guide and project skills discovery MCP tools (#128)
Add three read-only capability-discovery tools so new LLM sessions can
learn the workflow rules and available project skills from the MCP
server instead of long pasted operator prompts:

- mcp_get_control_plane_guide: active profile, authenticated identity
  (fail-soft; unresolved identity returns STOP instructions),
  allowed/forbidden operations, profile-aware guidance (author profiles
  are told review/approve/merge is forbidden; reviewer profiles are told
  review/merge requires eligibility checks and a pinned head SHA; mixed
  profiles get a misconfiguration warning), and the standing rules: hard
  stops, fail-closed behavior, head-SHA pinning, merge confirmation,
  redaction, author/reviewer/merger separation, profile switching, and
  identity verification.
- mcp_list_project_skills: registry of ten project workflows (issue
  authoring, PR creation, PR review, PR merge, issue comments, profile
  switching, redaction/security review, Jenkins read-only, GlitchTip
  read-only, release/operator) with description, when-to-use, required
  operations, status, and per-profile availability. Unimplemented
  services are listed as designed-not-implemented rather than omitted.
- mcp_get_skill_guide: step-by-step guide per skill; unknown names fail
  closed with the list of valid names.

All three are read-only and change no existing gate or permission.
Normal output contains no endpoint URLs or keychain IDs; the guide
includes the server host only under GITEA_MCP_REVEAL_ENDPOINTS=1.

Tests (tests/test_operator_guide.py, 17 new): profile-aware guidance
for author/reviewer, unresolved-identity STOP, read-only behavior,
redaction defaults and reveal opt-in, rules coverage, registry
completeness and profile awareness, unimplemented-service marking,
fail-closed unknown skill names.

Docs: llm-workflow-runbooks.md now tells new sessions to call the guide
tools first.

Closes #128

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 19:49:11 -04:00
sysadmin 349bc06da7 Merge pull request 'feat: add Gitea issue comment list/create MCP tools (#126)' (#127) from feat/issue-126-issue-comment-tools into master 2026-07-03 18:40:38 -05:00
10 changed files with 1522 additions and 30 deletions
+6
View File
@@ -221,6 +221,12 @@ Canonical profile file (e.g. `~/.config/gitea-tools/profiles.json`):
"username": "913443",
"auth": { "type": "env", "name": "GITEA_TOKEN_MDCPS" },
"execution_profile": "mdcps"
},
"mdcps-reviewer": {
"base_url": "https://gitea.dadeschools.net",
"username": "913443",
"auth": { "type": "keychain", "id": "mdcps.gitea.reviewer.token" },
"execution_profile": "mdcps-reviewer"
}
}
}
+20
View File
@@ -252,6 +252,26 @@ the "one server per trust boundary" model described in
[`tool-boundaries.md`](tool-boundaries.md) and
[`credential-isolation.md`](credential-isolation.md).
## Profile Activation and Runtime Identity Clarity (#131)
To make Gitea MCP profile activation and runtime identity state explicit, the following mechanisms are supported:
### 1. Static-Profile vs. Dynamic-Profile Mode
- **Static-Profile Mode (Default):** The active profile is fixed at server launch based on the `GITEA_MCP_PROFILE` environment variable (with `GITEA_MCP_CONFIG` pointing to the config path). Local environment variables are static once a subprocess is spawned by the host. Modifying the environment variables on the host does not dynamically update an already-connected MCP server process.
- **Dynamic-Profile Mode:** Profile switching via the `gitea_activate_profile` tool is supported **only** if the configuration JSON explicitly opts in by setting `"allow_runtime_switching": true` under rules or top-level keys. Otherwise, attempting to switch profiles dynamically will fail closed.
### 2. Dual MCP Namespaces Recommendation
For security-sensitive or high-risk tasks, the preferred safety model uses separate, isolated MCP server instances (namespaces/sessions) launched with static profiles:
- `gitea-author`: Exposes tools configured with author permissions; cannot perform approvals or merges.
- `gitea-reviewer`: Exposes tools configured with reviewer permissions; used for PR reviews and merges.
This layout maintains physical separation of credentials and prevents privilege escalation within a single session.
### 3. Verification Post-Switching
When dynamic profile switching is enabled and a profile is activated via `gitea_activate_profile`, the session MUST immediately:
1. Clear the cached identity.
2. Call `gitea_whoami` with the target remote to prove and verify the fresh Gitea authenticated identity.
This guarantees the active profile operations align with the actual Gitea authenticated user credential.
## Relationship to roadmap issues
This document defines the **model only**. Related work is tracked separately
+12
View File
@@ -18,6 +18,18 @@ behavior they rely on already exists (canonical runtime profiles, the
interactive setup menu, identity/eligibility checks, gated review/merge, and
audit logging). See [Related documents](#related-documents).
> **New session? Call the guide tools first (#128 / #129).** Before using any other
> Gitea MCP tool in a fresh session, call `mcp_get_control_plane_guide`
> (read-only): it reports the active profile, authenticated identity,
> allowed/forbidden operations, profile-aware do/don't guidance, and the
> non-negotiable rules (hard stops, fail-closed behavior, head-SHA pinning,
> merge confirmation, redaction, author/reviewer separation, profile
> switching). Also call `gitea_get_runtime_context` and `mcp_list_project_skills`
> to discover the available project workflows and `mcp_get_skill_guide(<name>)`
> for step-by-step instructions. This replaces long pasted operator prompts for
> the standard rules; operator prompts still control task-specific scope.
> See issue #129 for the skill registry design.
For cross-project use, copy the portable workflow skill at
[`../skills/llm-project-workflow/SKILL.md`](../skills/llm-project-workflow/SKILL.md).
It extracts the issue-first, isolated-worktree, no-self-review, profile-safety,
+12
View File
@@ -21,6 +21,18 @@
"default_owner": "Contractor",
"execution_profile": "mdcps"
},
"mdcps-reviewer": {
"base_url": "https://gitea.dadeschools.net",
"username": "913443",
"auth": {
"type": "keychain",
"id": "mdcps.gitea.reviewer.token"
},
"default_owner": "MDCPS",
"execution_profile": "mdcps-reviewer",
"allowed_operations": ["read", "review", "approve", "merge"],
"forbidden_operations": ["branch.push", "pr.create"]
},
"prgs-env": {
"base_url": "https://gitea.prgs.cc",
"username": "jcwalker3",
+21
View File
@@ -192,11 +192,32 @@ def config_path():
return (os.environ.get(ENV_CONFIG_PATH) or "").strip() or None
_active_profile_override = None
def selected_profile_name():
"""Return the selected profile name from the environment, or None."""
if _active_profile_override is not None:
return _active_profile_override
return (os.environ.get(ENV_PROFILE) or "").strip() or None
def is_runtime_switching_enabled(path=None):
"""Check if runtime profile switching is explicitly enabled in config."""
try:
config = load_config(path)
except Exception:
return False
if not config:
return False
rules = config.get("rules") or {}
if rules.get("allow_runtime_switching") is True:
return True
if config.get("allow_runtime_switching") is True:
return True
return False
def load_config(path=None):
"""Load and minimally validate the canonical JSON config.
+845 -17
View File
@@ -54,6 +54,17 @@ def _reveal_endpoints() -> bool:
return (os.environ.get("GITEA_MCP_REVEAL_ENDPOINTS") or "").strip().lower() \
in ("1", "true", "yes")
def _with_optional_url(result: dict, url: str | None) -> dict:
"""Attach web links only under the explicit endpoint reveal opt-in.
Mutates *result* in place and returns it; pass a freshly-built dict,
never a shared/aliased one.
"""
if _reveal_endpoints() and url:
result["url"] = url
return result
mcp = FastMCP("gitea-tools", instructions=(
"Gitea issue tracker and PR management for dadeschools and prgs instances. "
"Use the gitea_ prefixed tools to create issues, PRs, list issues, etc."
@@ -310,7 +321,7 @@ def gitea_create_issue(
repo: Override the repository name.
Returns:
dict with 'number' and 'url' of the created issue.
dict with 'number' of the created issue ('url' only with the reveal opt-in).
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
@@ -325,7 +336,7 @@ def gitea_create_issue(
_audit("create_issue", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.SUCCEEDED, issue_number=data["number"],
request_metadata={"title": title})
return {"number": data["number"], "url": data["html_url"]}
return _with_optional_url({"number": data["number"]}, data.get("html_url"))
@mcp.tool()
@@ -352,7 +363,7 @@ def gitea_create_pr(
repo: Override the repository name.
Returns:
dict with 'number' and 'url' of the created PR.
dict with 'number' of the created PR ('url' only with the reveal opt-in).
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
@@ -369,7 +380,7 @@ def gitea_create_pr(
_audit("create_pr", host=h, remote=remote, org=o, repo=r,
result=gitea_audit.SUCCEEDED, pr_number=data["number"],
target_branch=head, request_metadata=meta)
return {"number": data["number"], "url": data["html_url"]}
return _with_optional_url({"number": data["number"]}, data.get("html_url"))
@mcp.tool()
@@ -390,24 +401,25 @@ def gitea_list_prs(
repo: Override the repository name.
Returns:
List of dicts with 'number', 'title', 'state', 'head', 'base', 'url', 'mergeable'.
List of dicts with 'number', 'title', 'state', 'head', 'base', and
'mergeable' ('url' only with the reveal opt-in).
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/pulls?state={state}"
prs = api_get_all(url, auth)
return [
{
results = []
for pr in prs:
entry = {
"number": pr["number"],
"title": pr["title"],
"state": pr["state"],
"head": pr["head"]["ref"],
"base": pr["base"]["ref"],
"url": pr["html_url"],
"mergeable": pr.get("mergeable"),
}
for pr in prs
]
results.append(_with_optional_url(entry, pr.get("html_url")))
return results
@mcp.tool()
@@ -434,17 +446,17 @@ def gitea_view_pr(
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/pulls/{pr_number}"
pr = api_request("GET", url, auth)
return {
result = {
"number": pr["number"],
"title": pr["title"],
"body": pr.get("body", ""),
"state": pr["state"],
"head": pr["head"]["ref"],
"base": pr["base"]["ref"],
"url": pr["html_url"],
"mergeable": pr.get("mergeable"),
"user": pr.get("user", {}).get("login", ""),
}
return _with_optional_url(result, pr.get("html_url"))
# Actions whose eligibility this tool can evaluate.
@@ -597,6 +609,83 @@ def gitea_check_pr_eligibility(
result["eligible"] = len(reasons) == 0
if result["eligible"]:
reasons.append("all eligibility checks passed")
# Add enhanced error clarity details (#131)
auth_user = result["authenticated_user"]
pr_author = result["pr_author"]
profile_name = result["profile_name"]
is_self = bool(auth_user and pr_author and auth_user == pr_author)
result["active_identity"] = auth_user
result["active_profile"] = profile_name
result["self_author"] = is_self
# Determine missing permission
missing_perm = None
if not op_ok:
missing_perm = f"gitea.pr.{action}" if action in ("approve", "request_changes", "merge") else f"gitea.{action}"
result["missing_permission"] = missing_perm
# Determine required profile
req_profile = None
if action in ("approve", "request_changes", "review"):
req_profile = "A profile with reviewer role permissions (allowing approve/merge/review, and forbidding author operations)"
elif action == "merge":
req_profile = "A profile with reviewer role permissions and explicit merge permission"
result["required_profile"] = req_profile
# Determine required identity
req_identity = None
if is_self:
req_identity = f"Any Gitea user other than PR author '{pr_author}'"
result["required_identity"] = req_identity
# Determine if fixable by profile switch vs requires different namespace
switching_supported = gitea_config.is_runtime_switching_enabled()
fixable_by_switch = False
req_different_ns = False
if not op_ok:
config = gitea_config.load_config()
has_capable_profile = False
if config:
for name, p in (config.get("profiles") or {}).items():
p_allowed = p.get("allowed_operations", [])
p_forbidden = p.get("forbidden_operations", [])
ok, _ = gitea_config.check_operation(action, p_allowed, p_forbidden)
if ok:
has_capable_profile = True
break
if switching_supported and has_capable_profile:
fixable_by_switch = True
else:
req_different_ns = True
result["fixable_by_profile_switch"] = fixable_by_switch
result["requires_different_namespace"] = req_different_ns
# Determine safe next step
safe_step = "Ready."
if not result["eligible"]:
if is_self:
safe_step = "Self-review/self-merge is forbidden. Ask a different operator/reviewer to review and merge this PR."
elif not auth_user:
safe_step = f"Ask the operator to configure valid credentials/token for profile '{profile_name}'."
elif not op_ok:
if fixable_by_switch:
safe_step = f"Switch to a reviewer profile by calling gitea_activate_profile with a profile that allows {action}."
else:
safe_step = "Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile."
elif result["pr_state"] != "open":
safe_step = "No action can be taken on a closed/merged PR."
elif action == "merge":
if result["mergeable"] is False:
safe_step = "Address conflicts in the PR branches first, or wait for CI checks to pass before merging."
elif result["mergeable"] is None:
safe_step = "Wait for Gitea to finish calculating mergeability or trigger a re-check."
result["safe_next_step"] = safe_step
return result
@@ -844,15 +933,15 @@ def gitea_edit_pr(
except Exception:
pass
return {
result = {
"success": True,
"number": data["number"],
"title": data["title"],
"body": data.get("body", ""),
"state": data["state"],
"url": data["html_url"],
"cleanup_status": cleanup_status,
}
return _with_optional_url(result, data.get("html_url"))
@mcp.tool()
@@ -1347,21 +1436,22 @@ def gitea_view_issue(
repo: Override the repository name.
Returns:
dict with 'number', 'title', 'body', 'state', 'labels', 'assignee', 'url'.
dict with 'number', 'title', 'body', 'state', 'labels', and 'assignee'
('url' only with the reveal opt-in).
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues/{issue_number}"
i = api_request("GET", url, auth)
return {
result = {
"number": i["number"],
"title": i["title"],
"body": i.get("body", ""),
"state": i["state"],
"labels": [lb["name"] for lb in i.get("labels", [])],
"assignee": (i.get("assignee") or {}).get("login", ""),
"url": i["html_url"],
}
return _with_optional_url(result, i.get("html_url"))
def _issue_comment_gate(op: str) -> list[str]:
@@ -1503,6 +1593,444 @@ def gitea_create_issue_comment(
return result
# ── Operator guide / project skills (#128) ───────────────────────────────────
# Read-only capability discovery for new LLM sessions: what the active
# profile may do, how identity is verified, the non-negotiable safety rules,
# and which project workflows ("skills") exist. Nothing here mutates state or
# widens any permission — the guide describes the gates, it never bypasses
# them.
def _role_kind(allowed, forbidden) -> str:
"""Classify the active profile from its normalized operations.
'author' can create PRs / push branches; 'reviewer' can approve/merge;
'mixed' can do both (a config smell — the reviewer-deadlock invariant
forbids it in v2 configs); 'limited' can do neither.
"""
def can(op):
return gitea_config.check_operation(op, allowed, forbidden)[0]
review = can("gitea.pr.approve") or can("gitea.pr.merge")
author = can("gitea.pr.create") or can("gitea.branch.push")
if review and author:
return "mixed"
if review:
return "reviewer"
if author:
return "author"
return "limited"
_HARD_STOPS = [
"No self-review and no self-merge: the authenticated user must differ "
"from the PR author for review/approve/merge.",
"No tags or releases without an explicit operator instruction.",
"Never print token values, keychain IDs, passwords, or raw service "
"URLs in normal output.",
"No production access or production mutations.",
"No force-merge and no bypassing of merge gates; Gitea's own "
"mergeable signal is honoured, never overridden.",
"Do not rewrite the live profiles config; config changes are "
"operator-owned.",
]
_GUIDE_RULES = {
"hard_stops": _HARD_STOPS,
"fail_closed": (
"Every gate fails closed: unknown/disabled profiles, unknown "
"operations, empty allowed-operation lists, unresolved identity, "
"and unparseable config all deny the action instead of guessing. "
"Disabled profiles/services are never silently substituted."),
"head_sha_pinning": (
"Before reviewing or merging, record the PR head SHA and pass it as "
"expected_head_sha; if the head moves between review and merge, the "
"tool refuses and the review must be redone."),
"merge_confirmation": (
"gitea_merge_pr requires confirmation to equal 'MERGE PR <n>' "
"exactly (e.g. 'MERGE PR 42'); without it no API call is made. "
"Merge only when the operator has explicitly authorized it."),
"redaction": [
"Normal output contains no endpoint URLs, keychain IDs, or token "
"values; auth is reported as type/status only.",
"GITEA_MCP_REVEAL_ENDPOINTS=1 is the local admin/debug opt-in for "
"web links and endpoint names; it never reveals token values.",
"Errors are redacted before being surfaced.",
],
"separation": (
"Author, reviewer, and merger are separate identities. An author "
"profile creates branches/commits/PRs and must never "
"review/approve/merge its own work; a reviewer/merger profile "
"reviews and merges and must never create PRs or push branches "
"(reviewer-deadlock invariant). Issue discussion comments "
"(gitea.issue.comment) are gated separately from PR reviews "
"(gitea.pr.*)."),
"profile_switching": [
"The active profile is selected via GITEA_MCP_PROFILE (with GITEA_MCP_CONFIG pointing at the config path).",
"By default, static-profile mode is active: changing local environment variables does not dynamically update the already-connected MCP server process.",
"If runtime profile switching is explicitly enabled in the config (allow_runtime_switching: true), use gitea_activate_profile to switch profiles in-place.",
"If switching is disabled, you must change the launcher configuration/environment (dual-namespace) and reconnect/restart the MCP session.",
"After any profile switch or startup, you must verify the new runtime identity with a fresh gitea_whoami call before taking action."
],
"identity_verification": [
"Call gitea_whoami with an explicit remote first; confirm the "
"authenticated username and profile match the role you were asked "
"to perform.",
"If the username, profile, or allowed operations do not match the "
"task, STOP and report instead of proceeding.",
"Runtime identity (whoami) is the source of truth; config-declared "
"roles are metadata only.",
],
}
_COMMON_WORKFLOWS = [
"issue authoring: verify identity, create/claim the issue, keep scope "
"explicit (remote/org/repo).",
"implementation: claim issue, branch from fresh master, implement only "
"the issue scope, test, open a PR referencing the issue, stop.",
"PR review: verify reviewer identity, pin the head SHA, validate "
"independently, post a verdict; never review your own work.",
"PR merge: reviewer identity + eligibility check + pinned head + "
"explicit 'MERGE PR <n>' confirmation, only with operator "
"authorization.",
]
# Skill registry (#128). status: 'available' = backed by tools in this
# server; 'designed-not-implemented' = design exists but no MCP tools yet
# (listed rather than omitted so sessions know the boundary); 'operator-only'
# = never performed by an LLM session.
_PROJECT_SKILLS = {
"gitea-issue-authoring": {
"description": "Create, view, label, claim, and close Gitea issues.",
"when_to_use": "Filing new work, updating issue state, claiming an "
"issue before implementation (gitea_mark_issue).",
"required_operations": ["gitea.read"],
"status": "available",
"notes": "Always pass remote/org/repo explicitly.",
"steps": [
"Verify identity with gitea_whoami (explicit remote).",
"Check the issue queue with gitea_list_issues.",
"Create with gitea_create_issue or claim with gitea_mark_issue "
"action='start'.",
"Keep issue bodies free of secrets, tokens, and raw service "
"URLs.",
"Release the claim (action='done') or close when finished.",
],
},
"gitea-pr-creation": {
"description": "Author a feature branch and open a pull request.",
"when_to_use": "After implementing a claimed issue on a feature "
"branch; author profiles only.",
"required_operations": ["gitea.pr.create"],
"status": "available",
"steps": [
"Branch from fresh master (git pull first).",
"Implement only the claimed issue's scope; stage files "
"explicitly.",
"Run targeted tests, the full suite, py_compile, git diff "
"--check, and a secret sweep before committing.",
"Open the PR with gitea_create_pr referencing the issue "
"('Closes #<n>').",
"Stop: do not review or merge your own PR.",
],
},
"gitea-pr-review": {
"description": "Independently review a pull request and post a "
"verdict.",
"when_to_use": "Reviewer profile asked to evaluate someone else's "
"PR.",
"required_operations": ["gitea.pr.review"],
"status": "available",
"steps": [
"Verify reviewer identity with gitea_whoami; the PR author "
"must be a different user.",
"Pin the PR head SHA (gitea_view_pr) before validating.",
"Validate independently: scope vs the linked issue, tests, "
"diff check, secret sweep.",
"Post the verdict with gitea_review_pr / "
"gitea_submit_pr_review.",
"Do not merge unless the operator explicitly authorizes it.",
],
},
"gitea-pr-merge": {
"description": "Gated merge of an approved pull request.",
"when_to_use": "Reviewer/merger profile with explicit operator "
"authorization to merge.",
"required_operations": ["gitea.pr.merge"],
"status": "available",
"steps": [
"Confirm operator authorization for this specific merge.",
"Run gitea_check_pr_eligibility action='merge' and resolve "
"every reason it reports.",
"Call gitea_merge_pr with expected_head_sha pinned and "
"confirmation 'MERGE PR <n>'.",
"Confirm linked issues auto-closed and the in-progress label "
"was released.",
],
},
"gitea-issue-comments": {
"description": "Read and post issue discussion comments (distinct "
"from PR reviews).",
"when_to_use": "Design discussions, review notes on issues, "
"decision records.",
"required_operations": ["gitea.issue.comment"],
"status": "available",
"notes": "Listing needs only gitea.read; posting needs "
"gitea.issue.comment, gated separately from gitea.pr.*.",
"steps": [
"List with gitea_list_issue_comments (explicit issue number).",
"Post with gitea_create_issue_comment; markdown body, no "
"secrets or raw URLs.",
"Never use PR review tools for issue discussion or vice "
"versa.",
],
},
"profile-switching": {
"description": "Change the active MCP identity between author and "
"reviewer profiles.",
"when_to_use": "A task requires a role the current profile "
"forbids (e.g. moving from authoring to review).",
"required_operations": [],
"status": "available",
"steps": [
"Check profile mode using gitea_get_runtime_context.",
"If dynamic-profile mode is enabled (allow_runtime_switching: true), "
"call gitea_activate_profile to switch in-place.",
"If static-profile mode (default), ask the operator to update GITEA_MCP_PROFILE "
"in the launcher environment and reconnect/restart the MCP server.",
"Immediately verify the new identity with a fresh gitea_whoami call before taking action.",
"If identity does not match the expected role, STOP.",
],
},
"redaction-security-review": {
"description": "Sweep changes and tool output for secrets, "
"keychain IDs, token values, and raw service URLs.",
"when_to_use": "Before every commit/PR and when reviewing any "
"diff.",
"required_operations": ["gitea.read"],
"status": "available",
"steps": [
"Run git diff --check for whitespace damage.",
"Grep the diff for token/password/secret/keychain patterns; "
"only synthetic fixtures may match.",
"Confirm normal tool output contains no endpoint URLs or "
"keychain IDs (reveal opt-in excepted).",
"Confirm no live config or private machine paths are being "
"committed.",
],
},
"jenkins-readonly": {
"description": "Read-only Jenkins CI inspection (jobs, builds, "
"logs).",
"when_to_use": "Checking CI state once Jenkins MCP tools exist.",
"required_operations": ["jenkins.read"],
"status": "designed-not-implemented",
"notes": "Design exists (issues #72/#77); no Jenkins MCP server is "
"connected yet. Report SKIPPED rather than substituting "
"shell or direct API calls.",
"steps": [
"Confirm a Jenkins MCP server is connected; if not, report "
"SKIPPED.",
"Use read-only operations only; never trigger, cancel, or "
"configure builds.",
],
},
"glitchtip-readonly": {
"description": "Read-only GlitchTip error/event inspection.",
"when_to_use": "Investigating reported errors once GlitchTip MCP "
"tools exist.",
"required_operations": ["glitchtip.read"],
"status": "designed-not-implemented",
"notes": "Design exists (issue #73); no GlitchTip MCP server is "
"connected yet. Report SKIPPED rather than substituting "
"shell or direct API calls.",
"steps": [
"Confirm a GlitchTip MCP server is connected; if not, report "
"SKIPPED.",
"Use read-only operations only; never mutate issues or "
"settings.",
],
},
"release-operator": {
"description": "Release, tag, and version workflows.",
"when_to_use": "Never by an LLM session on its own: releases and "
"tags are operator-owned.",
"required_operations": [],
"status": "operator-only",
"notes": "See the release SOP docs. Hard stop: no tags or "
"releases without an explicit operator instruction.",
"steps": [
"Stop and hand off to the operator; do not create tags or "
"releases.",
],
},
}
@mcp.tool()
def mcp_get_control_plane_guide(
remote: str = "dadeschools",
host: str | None = None,
) -> dict:
"""Structured operator guide for the current Gitea MCP session (#128).
Read-only. Call this first in a new session: it reports the active
profile, the authenticated identity (fail-soft), what this profile may
and may not do, and the non-negotiable workflow rules (hard stops,
fail-closed behavior, head-SHA pinning, merge confirmation, redaction,
author/reviewer separation, profile switching). Guidance is
profile-aware; if identity cannot be resolved it instructs the session
to STOP.
Args:
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
Returns:
dict with 'read_only', 'remote', 'profile' (name, operations,
role_kind), 'identity' (username/status/instruction; 'server' only
with the reveal opt-in), 'guidance' (profile-specific), 'rules',
'workflows', and 'skills_tool'. Never secrets.
"""
h, _, _ = _resolve(remote, host, None, None)
profile = get_profile()
allowed = profile["allowed_operations"]
forbidden = profile["forbidden_operations"]
role = _role_kind(allowed, forbidden)
username = _authenticated_username(h)
identity = {
"authenticated_username": username,
"remote": remote,
"status": "verified" if username else "unresolved",
"instruction": (
"Identity verified — confirm it matches the role this task "
"requires before acting."
if username else
"STOP: the authenticated identity could not be resolved. Do "
"not perform any mutating operation; report to the operator "
"and wait."),
}
if _reveal_endpoints():
identity["server"] = h
guidance = []
if not username:
guidance.append(
"STOP: identity unresolved — no mutating operations until the "
"operator fixes credentials/profile and gitea_whoami verifies.")
if role == "author":
guidance.append(
"Author profile: creating branches, commits, issues, and PRs "
"is allowed; review, approve, and merge are forbidden for this "
"profile — never attempt them, and never review or merge your "
"own PR from any profile.")
elif role == "reviewer":
guidance.append(
"Reviewer profile: review/approve/merge may proceed ONLY after "
"gitea_check_pr_eligibility passes and the PR head SHA is "
"pinned (expected_head_sha); the PR author must be a different "
"user, and merging additionally requires explicit operator "
"authorization plus the 'MERGE PR <n>' confirmation.")
elif role == "mixed":
guidance.append(
"WARNING: this profile allows both authoring and "
"review/merge, which defeats two-party review. Treat it as "
"misconfigured: STOP and report to the operator before any "
"review or merge.")
else:
guidance.append(
"Limited profile: no authoring and no review/merge "
"operations are allowed. Read-only work only; anything else "
"fails closed.")
return {
"read_only": True,
"remote": remote,
"profile": {
"name": profile["profile_name"],
"allowed_operations": allowed,
"forbidden_operations": forbidden,
"role_kind": role,
},
"identity": identity,
"guidance": guidance,
"rules": _GUIDE_RULES,
"workflows": _COMMON_WORKFLOWS,
"skills_tool": "mcp_list_project_skills",
}
@mcp.tool()
def mcp_list_project_skills() -> dict:
"""List the project's workflow skills and when to use them (#128).
Read-only; makes no API calls. Each skill reports its name,
description, when-to-use guidance, required operations, status
('available', 'designed-not-implemented', or 'operator-only'), and
whether the current profile's operations permit it. Use
mcp_get_skill_guide(name) for the step-by-step guide.
Returns:
dict with 'read_only', 'count', and 'skills' (list). Never secrets.
"""
profile = get_profile()
allowed = profile["allowed_operations"]
forbidden = profile["forbidden_operations"]
skills = []
for name, meta in _PROJECT_SKILLS.items():
ops = meta["required_operations"]
available = all(
gitea_config.check_operation(op, allowed, forbidden)[0]
for op in ops
) if ops else True
entry = {
"name": name,
"description": meta["description"],
"when_to_use": meta["when_to_use"],
"required_operations": ops,
"status": meta["status"],
"available_to_current_profile": (
available and meta["status"] == "available"),
}
if meta.get("notes"):
entry["notes"] = meta["notes"]
skills.append(entry)
return {"read_only": True, "count": len(skills), "skills": skills}
@mcp.tool()
def mcp_get_skill_guide(skill_name: str) -> dict:
"""Step-by-step guide for one named project skill (#128).
Read-only; makes no API calls. Unknown skill names fail closed with
the list of valid names instead of guessing.
Args:
skill_name: A name from mcp_list_project_skills (case-insensitive).
Returns:
dict with 'success', 'skill' metadata, and 'steps'; on an unknown
name, 'success' False with 'reasons' and 'valid_skills'.
"""
key = (skill_name or "").strip().lower()
meta = _PROJECT_SKILLS.get(key)
if meta is None:
return {
"success": False,
"reasons": [f"unknown skill '{skill_name}' (fail closed)"],
"valid_skills": sorted(_PROJECT_SKILLS),
}
skill = {
"name": key,
"description": meta["description"],
"when_to_use": meta["when_to_use"],
"required_operations": meta["required_operations"],
"status": meta["status"],
}
if meta.get("notes"):
skill["notes"] = meta["notes"]
return {"success": True, "skill": skill, "steps": list(meta["steps"])}
@mcp.tool()
def gitea_whoami(
remote: str = "dadeschools",
@@ -1677,6 +2205,306 @@ def gitea_get_profile(
return result
@mcp.tool()
def gitea_get_runtime_context(
remote: str = "dadeschools",
host: str | None = None,
) -> dict:
"""Read-only: explicit visibility into active profile, configuration model, and eligibility.
Reports config model shape, profile mode (static vs dynamic), and detailed blocks.
Args:
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
"""
profile = get_profile()
config = gitea_config.load_config()
reveal = _reveal_endpoints()
# Determine config model/version
if config is None:
config_model = "env-only"
elif config.get("version") == 1:
config_model = "v1"
elif config.get("version") == 2:
if "contexts" in config or config.get("shape") == "contexts":
config_model = "v2-contexts"
else:
config_model = "v2-environments"
else:
config_model = f"unknown-version-{config.get('version')}"
# Determine profile source
if os.environ.get("GITEA_PROFILE_NAME"):
profile_source = "env var"
elif gitea_config.selected_profile_name():
profile_source = "config file profile"
else:
profile_source = "default"
h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None)
username = _authenticated_username(h) if h else None
switching = gitea_config.is_runtime_switching_enabled()
profile_mode = "dynamic-profile" if switching else "static-profile"
# Evaluate review/merge eligibility
allowed = profile["allowed_operations"] or []
forbidden = profile["forbidden_operations"] or []
# Check approve
approve_ok, _ = gitea_config.check_operation("approve", allowed, forbidden)
# Check merge
merge_ok, _ = gitea_config.check_operation("merge", allowed, forbidden)
review_merge_allowed = False
blocked_reasons = []
suggested_fix = "none"
if not username:
blocked_reasons.append("Authenticated identity is unresolved. Gitea credentials are missing or invalid.")
suggested_fix = "operator action"
elif not (approve_ok or merge_ok):
blocked_reasons.append(
f"Active profile '{profile['profile_name']}' does not permit review or merge operations."
)
if switching:
suggested_fix = "profile switch"
else:
suggested_fix = "reviewer namespace"
else:
review_merge_allowed = True
# Note that self-review and self-merge are blocked regardless of profile roles
blocked_reasons.append(
"Note: self-review and self-merge are always blocked at runtime for any PR authored by the active user."
)
safe_next_action = "None; ready for operations."
if suggested_fix == "operator action":
safe_next_action = f"Ask the operator to configure valid credentials/token for profile '{profile['profile_name']}'."
elif suggested_fix == "profile switch":
safe_next_action = "Switch to a reviewer profile by calling gitea_activate_profile with a reviewer profile."
elif suggested_fix == "reviewer namespace":
safe_next_action = (
"Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, "
"or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile."
)
result = {
"active_profile": profile["profile_name"],
"authenticated_username": username,
"remote": remote if remote in REMOTES else None,
"config_model": config_model,
"profile_source": profile_source,
"allowed_operations": allowed,
"forbidden_operations": forbidden,
"runtime_switching_supported": switching,
"profile_mode": profile_mode,
"review_merge_allowed": review_merge_allowed,
"review_merge_blocked_reasons": blocked_reasons,
"suggested_fix": suggested_fix,
"safe_next_action": safe_next_action,
}
if reveal and h:
result["server"] = f"https://{h}"
return result
@mcp.tool()
def gitea_list_profiles() -> dict:
"""Read-only: list all Gitea MCP profiles with redacted metadata.
Exposes names, role kinds, enabled state, allowed/forbidden operations, and active status.
Token values, keychain IDs, and raw URLs are hidden unless GITEA_MCP_REVEAL_ENDPOINTS=1 is enabled.
"""
config = gitea_config.load_config()
reveal = _reveal_endpoints()
active_name = gitea_config.selected_profile_name() or get_profile()["profile_name"]
profiles_out = []
if config is None:
# Env-only: return the active profile
p = get_profile()
role = _role_kind(p["allowed_operations"], p["forbidden_operations"])
h = REMOTES.get("dadeschools", {}).get("host") # default host
username = _authenticated_username(h) if h else None
prof = {
"name": p["profile_name"],
"role_kind": role,
"allowed_operations": p["allowed_operations"],
"forbidden_operations": p["forbidden_operations"],
"identity_status": "verified" if username else "unresolved",
"is_active": True,
"enabled": True,
}
if reveal:
prof["token_source_name"] = p["token_source_name"]
prof["base_url"] = p["base_url"]
profiles_out.append(prof)
else:
# Load from config profiles
profiles_dict = config.get("profiles") or {}
unavailable_dict = config.get("unavailable") or {}
audit_only_profiles = config.get("audit_only_profiles") or {}
# First, process available profiles
for name, p in profiles_dict.items():
role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", []))
is_active = (name == active_name)
# Identity status lookup
if is_active:
h = REMOTES.get("dadeschools", {}).get("host") # default host
username = _authenticated_username(h) if h else None
identity_status = "verified" if username else "unresolved"
else:
# Safely resolve if credentials are present without networking
try:
tok = gitea_config.resolve_token(p)
identity_status = "credentials present" if tok else "missing credentials"
except Exception:
identity_status = "missing credentials"
prof = {
"name": name,
"role_kind": role,
"allowed_operations": p.get("allowed_operations", []),
"forbidden_operations": p.get("forbidden_operations", []),
"identity_status": identity_status,
"is_active": is_active,
"enabled": p.get("enabled", True),
}
if reveal:
if isinstance(p.get("auth"), dict):
prof["auth"] = p["auth"]
prof["base_url"] = p.get("base_url")
else:
if isinstance(p.get("auth"), dict):
prof["auth"] = {k: ("<redacted>" if k != "type" else v) for k, v in p["auth"].items()}
prof["base_url"] = "<redacted>"
profiles_out.append(prof)
# Process unavailable/disabled/audit-only profiles
for name, p in audit_only_profiles.items():
role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", []))
is_active = (name == active_name)
prof = {
"name": name,
"role_kind": role,
"allowed_operations": p.get("allowed_operations", []),
"forbidden_operations": p.get("forbidden_operations", []),
"identity_status": "unavailable (disabled)",
"is_active": is_active,
"enabled": p.get("enabled", True),
"unavailable_reason": p.get("_unavailable_reason"),
}
if reveal:
if isinstance(p.get("auth"), dict):
prof["auth"] = p["auth"]
prof["base_url"] = p.get("base_url")
else:
if isinstance(p.get("auth"), dict):
prof["auth"] = {k: ("<redacted>" if k != "type" else v) for k, v in p["auth"].items()}
prof["base_url"] = "<redacted>"
profiles_out.append(prof)
# Handle unavailable mappings that are not in audit_only
for name, reason in unavailable_dict.items():
if not any(x["name"] == name for x in profiles_out):
profiles_out.append({
"name": name,
"role_kind": "limited",
"allowed_operations": [],
"forbidden_operations": [],
"identity_status": "unavailable",
"is_active": (name == active_name),
"enabled": False,
"unavailable_reason": reason,
})
return {"profiles": profiles_out}
@mcp.tool()
def gitea_activate_profile(
profile_name: str,
remote: str = "dadeschools",
host: str | None = None,
) -> dict:
"""Gated profile activation. Switch the active profile in-place for this runtime session.
Only allowed if "allow_runtime_switching": true is explicitly configured.
Args:
profile_name: Profile to activate.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
"""
if not gitea_config.is_runtime_switching_enabled():
return {
"success": False,
"message": (
"Runtime profile switching is disabled in this configuration. Static profile mode is active. "
"To enable switching, set 'allow_runtime_switching': true in the config rules."
)
}
config = gitea_config.load_config()
if not config:
return {
"success": False,
"message": "No profiles configuration file is loaded. Switching is not supported in env-only mode."
}
profiles_dict = config.get("profiles") or {}
if profile_name not in profiles_dict:
return {
"success": False,
"message": f"Profile '{profile_name}' not found in loaded config."
}
h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None)
# 1. Record before state
before_profile = get_profile()["profile_name"]
before_identity = _authenticated_username(h) if h else None
# 2. Perform switch
gitea_config._active_profile_override = profile_name
# 3. Clear identity cache to force a fresh verification
if h:
_IDENTITY_CACHE.pop(h, None)
# 4. Resolve fresh identity
after_profile = get_profile()["profile_name"]
after_identity = _authenticated_username(h) if h else None
# 5. Audit the switch if auditing is on
_audit(
"activate_profile",
host=h,
remote=remote,
result={"success": True, "before": before_profile, "after": after_profile},
username=after_identity,
)
return {
"success": True,
"message": f"Successfully activated profile '{profile_name}' (fresh identity verification complete).",
"before_profile": before_profile,
"before_identity": before_identity,
"after_profile": after_profile,
"after_identity": after_identity,
}
@mcp.tool()
def gitea_audit_config() -> dict:
"""Audit the configured profiles/services: enabled state, no secrets.
+11 -5
View File
@@ -75,7 +75,7 @@ def v2_config():
"identities": {
"author": {
"role": "author",
"username": "913443",
"username": "jcwalker3",
"auth": {"type": "keychain",
"id": "mdcps.gitea.author.token"},
"allowed_operations": ["gitea.read"],
@@ -85,7 +85,7 @@ def v2_config():
},
"reviewer": {
"role": "reviewer",
"username": "TBD-second-mdcps-user",
"username": "913443",
"auth": {"type": "keychain",
"id": "mdcps.gitea.reviewer.token"},
"allowed_operations": [
@@ -251,16 +251,22 @@ class TestV2Selectors(_V2Base):
self._load_raises(mutate, "unknown profile")
def test_tbd_username_fails_closed_on_selection(self):
def mutate(cfg):
cfg["environments"]["mdcps"]["services"]["gitea"]["identities"]["reviewer"]["username"] = "TBD-second-mdcps-user"
cfg = v2_config()
mutate(cfg)
self._write(cfg)
with patch.dict(os.environ, self._env("mdcps.gitea.reviewer"), clear=True):
with self.assertRaises(gitea_config.ConfigError) as ctx:
self._resolve("mdcps.gitea.reviewer")
gitea_config.resolve_profile()
msg = str(ctx.exception)
self.assertIn("TBD", msg)
self.assertIn("provision", msg)
# Note: after #107 provisioning, real username "913443" is used in live config and happy-path tests.
def test_tbd_identity_does_not_block_other_identities(self):
# Same file contains the TBD reviewer; author still resolves.
p = self._resolve("mdcps.gitea.author")
self.assertEqual(p["username"], "913443")
self.assertEqual(p["username"], "jcwalker3")
# ---------------------------------------------------------------------------
+86 -1
View File
@@ -48,15 +48,24 @@ class TestCreateIssue(unittest.TestCase):
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_creates_issue(self, _auth, mock_api):
mock_api.return_value = {"number": 1, "html_url": "https://gitea.example.com/issues/1"}
with patch.dict(os.environ, {}, clear=True):
result = gitea_create_issue(title="Test issue", body="body text")
self.assertEqual(result["number"], 1)
self.assertIn("issues/1", result["url"])
self.assertNotIn("url", result)
mock_api.assert_called_once()
# Verify payload
call_args = mock_api.call_args
self.assertEqual(call_args[0][0], "POST")
self.assertEqual(call_args[0][3]["title"], "Test issue")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_issue_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = {"number": 1, "html_url": "https://gitea.example.com/issues/1"}
with patch.dict(os.environ, {"GITEA_MCP_REVEAL_ENDPOINTS": "1"}, clear=True):
result = gitea_create_issue(title="Test issue", body="body text")
self.assertIn("issues/1", result["url"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_creates_on_prgs(self, _auth, mock_api):
@@ -77,12 +86,22 @@ class TestCreatePR(unittest.TestCase):
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_creates_pr(self, _auth, mock_api):
mock_api.return_value = {"number": 3, "html_url": "https://example.com/pulls/3"}
with patch.dict(os.environ, {}, clear=True):
result = gitea_create_pr(title="feat: X", head="feat/x", base="main")
self.assertEqual(result["number"], 3)
self.assertNotIn("url", result)
payload = mock_api.call_args[0][3]
self.assertEqual(payload["head"], "feat/x")
self.assertEqual(payload["base"], "main")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_pr_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = {"number": 3, "html_url": "https://example.com/pulls/3"}
with patch.dict(os.environ, {"GITEA_MCP_REVEAL_ENDPOINTS": "1"}, clear=True):
result = gitea_create_pr(title="feat: X", head="feat/x", base="main")
self.assertIn("pulls/3", result["url"])
# ---------------------------------------------------------------------------
# Close Issue
@@ -158,11 +177,25 @@ class TestViewIssue(unittest.TestCase):
"assignee": {"login": "jason"},
"html_url": "https://gitea.prgs.cc/issues/7",
}
with patch.dict(os.environ, {}, clear=True):
result = gitea_view_issue(issue_number=7, remote="prgs")
self.assertEqual(result["number"], 7)
self.assertEqual(result["body"], "Build it")
self.assertEqual(result["labels"], ["important"])
self.assertEqual(result["assignee"], "jason")
self.assertNotIn("url", result)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_view_issue_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = {
"number": 7, "title": "MCP server", "body": "Build it",
"state": "open", "labels": [], "assignee": None,
"html_url": "https://gitea.prgs.cc/issues/7",
}
with patch.dict(os.environ, {"GITEA_MCP_REVEAL_ENDPOINTS": "1"}, clear=True):
result = gitea_view_issue(issue_number=7, remote="prgs")
self.assertIn("issues/7", result["url"])
# ---------------------------------------------------------------------------
@@ -272,10 +305,26 @@ class TestListPRs(unittest.TestCase):
"html_url": "http://url1", "mergeable": True
}
]
with patch.dict(os.environ, {}, clear=True):
result = gitea_list_prs()
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["number"], 1)
self.assertEqual(result[0]["head"], "branch1")
self.assertNotIn("url", result[0])
@patch("mcp_server.api_get_all")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_list_prs_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = [
{
"number": 1, "title": "PR 1", "state": "open",
"head": {"ref": "branch1"}, "base": {"ref": "main"},
"html_url": "http://url1", "mergeable": True
}
]
with patch.dict(os.environ, {"GITEA_MCP_REVEAL_ENDPOINTS": "1"}, clear=True):
result = gitea_list_prs()
self.assertEqual(result[0]["url"], "http://url1")
# ---------------------------------------------------------------------------
@@ -292,10 +341,25 @@ class TestViewPR(unittest.TestCase):
"html_url": "http://url1", "mergeable": True, "body": "description",
"user": {"login": "user1"}
}
with patch.dict(os.environ, {}, clear=True):
result = gitea_view_pr(pr_number=1)
self.assertEqual(result["number"], 1)
self.assertEqual(result["body"], "description")
self.assertEqual(result["user"], "user1")
self.assertNotIn("url", result)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_view_pr_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = {
"number": 1, "title": "PR 1", "state": "open",
"head": {"ref": "branch1"}, "base": {"ref": "main"},
"html_url": "http://url1", "mergeable": True, "body": "description",
"user": {"login": "user1"}
}
with patch.dict(os.environ, {"GITEA_MCP_REVEAL_ENDPOINTS": "1"}, clear=True):
result = gitea_view_pr(pr_number=1)
self.assertEqual(result["url"], "http://url1")
# ---------------------------------------------------------------------------
@@ -1674,9 +1738,30 @@ class TestTrackerHygieneCleanup(unittest.TestCase):
return {}
self.mock_api.side_effect = api_side_effect
with patch.dict(os.environ, {}, clear=True):
res = gitea_edit_pr(pr_number=1, state="closed")
self.assertTrue(res["success"])
self.assertEqual(res["cleanup_status"].get(123), "released")
self.assertNotIn("url", res)
def test_edit_pr_reveal_opt_in_includes_url(self):
def api_side_effect(method, url, auth, payload=None):
if method == "PATCH" and "pulls/1" in url:
return {
"number": 1,
"title": "My PR",
"state": "open",
"html_url": "http://url1",
"body": "No issue link",
"head": {"ref": "feat/my-branch"}
}
return {}
self.mock_api.side_effect = api_side_effect
with patch.dict(os.environ, {"GITEA_MCP_REVEAL_ENDPOINTS": "1"}, clear=True):
res = gitea_edit_pr(pr_number=1, title="Updated")
self.assertTrue(res["success"])
self.assertEqual(res["url"], "http://url1")
def test_multiple_linked_issues(self):
def api_side_effect(method, url, auth, payload=None):
+246
View File
@@ -0,0 +1,246 @@
"""Tests for the operator guide / project skills MCP tools (#128).
Read-only capability-discovery tools: mcp_get_control_plane_guide,
mcp_list_project_skills, mcp_get_skill_guide. Each is tested by calling the
underlying function directly with mocked API responses.
"""
import json
import os
import sys
import unittest
from unittest.mock import patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import mcp_server # noqa: E402
from mcp_server import ( # noqa: E402
mcp_get_control_plane_guide,
mcp_list_project_skills,
mcp_get_skill_guide,
)
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
AUTHOR_ENV = {
"GITEA_PROFILE_NAME": "author-test",
"GITEA_ALLOWED_OPERATIONS":
"gitea.read,gitea.repo.commit,gitea.branch.create,"
"gitea.branch.push,gitea.pr.create,gitea.pr.comment",
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.approve,gitea.pr.merge",
}
REVIEWER_ENV = {
"GITEA_PROFILE_NAME": "reviewer-test",
"GITEA_ALLOWED_OPERATIONS":
"gitea.read,gitea.pr.review,gitea.pr.comment,gitea.pr.approve,"
"gitea.pr.request_changes,gitea.pr.merge",
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.create,gitea.branch.push",
}
EXPECTED_SKILLS = [
"gitea-issue-authoring",
"gitea-pr-creation",
"gitea-pr-review",
"gitea-pr-merge",
"gitea-issue-comments",
"profile-switching",
"redaction-security-review",
"jenkins-readonly",
"glitchtip-readonly",
"release-operator",
]
class GuideTestBase(unittest.TestCase):
def setUp(self):
mcp_server._IDENTITY_CACHE.clear()
def tearDown(self):
mcp_server._IDENTITY_CACHE.clear()
# ---------------------------------------------------------------------------
# mcp_get_control_plane_guide
# ---------------------------------------------------------------------------
class TestControlPlaneGuide(GuideTestBase):
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_author_profile_guidance(self, _auth, _api):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
g = mcp_get_control_plane_guide(remote="prgs")
self.assertTrue(g["read_only"])
self.assertEqual(g["profile"]["role_kind"], "author")
self.assertEqual(g["identity"]["authenticated_username"], "author-bot")
self.assertEqual(g["identity"]["status"], "verified")
blob = " ".join(g["guidance"]).lower()
self.assertIn("forbidden", blob)
self.assertIn("review", blob)
@patch("mcp_server.api_request", return_value={"login": "reviewer-bot"})
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_reviewer_profile_guidance(self, _auth, _api):
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
g = mcp_get_control_plane_guide(remote="prgs")
self.assertEqual(g["profile"]["role_kind"], "reviewer")
blob = " ".join(g["guidance"]).lower()
self.assertIn("eligibility", blob)
self.assertIn("pinned", blob)
@patch("mcp_server.get_auth_header", return_value=None)
def test_unresolved_identity_instructs_stop(self, _auth):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
g = mcp_get_control_plane_guide(remote="prgs")
self.assertEqual(g["identity"]["status"], "unresolved")
self.assertIsNone(g["identity"]["authenticated_username"])
self.assertIn("STOP", g["identity"]["instruction"])
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_guide_is_read_only(self, _auth, mock_api):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
mcp_get_control_plane_guide(remote="prgs")
for call in mock_api.call_args_list:
self.assertEqual(call[0][0], "GET")
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_no_urls_or_keychain_ids_by_default(self, _auth, _api):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
g = mcp_get_control_plane_guide(remote="prgs")
blob = json.dumps(g)
self.assertNotIn("https://", blob)
self.assertNotIn("http://", blob)
self.assertNotIn("keychain:", blob)
self.assertNotIn(FAKE_AUTH, blob)
self.assertNotIn("server", g["identity"])
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_reveal_opt_in_includes_server(self, _auth, _api):
env = dict(AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
with patch.dict(os.environ, env, clear=True):
g = mcp_get_control_plane_guide(remote="prgs")
self.assertIn("gitea.prgs.cc", g["identity"]["server"])
@patch("mcp_server.api_request", return_value={"login": "author-bot"})
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_rules_cover_required_topics(self, _auth, _api):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
g = mcp_get_control_plane_guide(remote="prgs")
rules = g["rules"]
for key in ("hard_stops", "fail_closed", "head_sha_pinning",
"merge_confirmation", "redaction", "separation",
"profile_switching", "identity_verification"):
self.assertIn(key, rules)
self.assertIn("MERGE PR", json.dumps(rules["merge_confirmation"]))
self.assertTrue(rules["hard_stops"])
def test_unknown_remote_raises(self):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
with self.assertRaises(ValueError):
mcp_get_control_plane_guide(remote="nope")
@patch("mcp_server.api_request", return_value={"login": "new-session-bot"})
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_new_session_can_call_guide_for_operating_model(self, _auth, _api):
"""Covers #129 AC: New LLM sessions can call one guide tool to understand the MCP Control Plane operating model."""
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
g = mcp_get_control_plane_guide(remote="prgs")
self.assertTrue(g["read_only"])
self.assertIn("profile", g)
self.assertIn("identity", g)
self.assertIn("guidance", g)
self.assertIn("rules", g)
self.assertIn("workflows", g)
self.assertEqual(g["skills_tool"], "mcp_list_project_skills")
# ---------------------------------------------------------------------------
# mcp_list_project_skills
# ---------------------------------------------------------------------------
class TestProjectSkills(GuideTestBase):
@patch("mcp_server.api_request")
def test_registry_complete_and_no_api_calls(self, mock_api):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
r = mcp_list_project_skills()
self.assertTrue(r["read_only"])
names = [s["name"] for s in r["skills"]]
for expected in EXPECTED_SKILLS:
self.assertIn(expected, names)
self.assertEqual(r["count"], len(r["skills"]))
for s in r["skills"]:
self.assertTrue(s["description"])
self.assertTrue(s["when_to_use"])
self.assertIn("required_operations", s)
self.assertIn("status", s)
self.assertIn("available_to_current_profile", s)
mock_api.assert_not_called()
def test_profile_aware_availability(self):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
r = mcp_list_project_skills()
by_name = {s["name"]: s for s in r["skills"]}
self.assertTrue(by_name["gitea-pr-creation"]["available_to_current_profile"])
self.assertFalse(by_name["gitea-pr-merge"]["available_to_current_profile"])
def test_unimplemented_services_marked(self):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
r = mcp_list_project_skills()
by_name = {s["name"]: s for s in r["skills"]}
self.assertNotEqual(by_name["jenkins-readonly"]["status"], "available")
self.assertNotEqual(by_name["glitchtip-readonly"]["status"], "available")
def test_no_urls_in_registry(self):
with patch.dict(os.environ, AUTHOR_ENV, clear=True):
r = mcp_list_project_skills()
blob = json.dumps(r)
self.assertNotIn("https://", blob)
self.assertNotIn("http://", blob)
self.assertNotIn("keychain:", blob)
# ---------------------------------------------------------------------------
# mcp_get_skill_guide
# ---------------------------------------------------------------------------
class TestSkillGuide(GuideTestBase):
def test_known_skill_returns_steps(self):
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
r = mcp_get_skill_guide("gitea-pr-merge")
self.assertTrue(r["success"])
self.assertEqual(r["skill"]["name"], "gitea-pr-merge")
self.assertTrue(r["steps"])
self.assertIn("MERGE PR", " ".join(r["steps"]))
def test_case_and_whitespace_normalized(self):
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
r = mcp_get_skill_guide(" Gitea-PR-Merge ")
self.assertTrue(r["success"])
def test_unknown_skill_fails_closed(self):
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
r = mcp_get_skill_guide("no-such-skill")
self.assertFalse(r["success"])
self.assertTrue(r["reasons"])
for expected in EXPECTED_SKILLS:
self.assertIn(expected, r["valid_skills"])
@patch("mcp_server.api_request")
def test_read_only_no_api_calls(self, mock_api):
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
mcp_get_skill_guide("gitea-pr-review")
mock_api.assert_not_called()
def test_no_urls_in_skill_guides(self):
with patch.dict(os.environ, REVIEWER_ENV, clear=True):
for name in EXPECTED_SKILLS:
r = mcp_get_skill_guide(name)
blob = json.dumps(r)
self.assertNotIn("https://", blob)
self.assertNotIn("keychain:", blob)
if __name__ == "__main__":
unittest.main()
+256
View File
@@ -0,0 +1,256 @@
"""Tests for runtime context, profile activation, profile listing, and enhanced error clarity.
Covers Issue #131 requirements.
"""
import os
import sys
import json
import tempfile
import unittest
from unittest.mock import patch, MagicMock
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import gitea_config
import gitea_auth
import mcp_server
CONFIG_SWITCHING_DISABLED = {
"version": 2,
"contexts": {
"ctx": {
"enabled": True,
"gitea": {
"enabled": True,
"base_url": "https://gitea.example.com"
}
}
},
"profiles": {
"author-profile": {
"enabled": True,
"context": "ctx",
"role": "author",
"username": "author-user",
"auth": {"type": "env", "name": "GITEA_TOKEN_AUTHOR"},
"allowed_operations": ["gitea.read", "gitea.pr.create", "gitea.branch.push"],
"forbidden_operations": ["gitea.pr.approve", "gitea.pr.merge"],
"execution_profile": "author-profile"
},
"reviewer-profile": {
"enabled": True,
"context": "ctx",
"role": "reviewer",
"username": "reviewer-user",
"auth": {"type": "env", "name": "GITEA_TOKEN_REVIEWER"},
"allowed_operations": ["gitea.read", "gitea.pr.approve", "gitea.pr.merge"],
"forbidden_operations": ["gitea.pr.create", "gitea.branch.push"],
"execution_profile": "reviewer-profile"
}
},
"rules": {
"allow_runtime_switching": False
}
}
CONFIG_SWITCHING_ENABLED = {
**CONFIG_SWITCHING_DISABLED,
"rules": {
"allow_runtime_switching": True
}
}
class TestRuntimeClarity(unittest.TestCase):
def setUp(self):
self._remotes_patch = patch.dict(mcp_server.REMOTES, {
"dadeschools": {"host": "gitea.example.com", "org": "Example-Org", "repo": "Example-Repo"},
"prgs": {"host": "gitea.example.com", "org": "Example-Org", "repo": "Example-Repo"}
})
self._remotes_patch.start()
mcp_server._IDENTITY_CACHE.clear()
gitea_config._active_profile_override = None
self._dir = tempfile.TemporaryDirectory()
self.config_path = os.path.join(self._dir.name, "profiles.json")
self._write_config(CONFIG_SWITCHING_DISABLED)
def tearDown(self):
self._remotes_patch.stop()
mcp_server._IDENTITY_CACHE.clear()
gitea_config._active_profile_override = None
self._dir.cleanup()
def _write_config(self, obj):
with open(self.config_path, "w", encoding="utf-8") as fh:
fh.write(json.dumps(obj))
def _env(self, profile="author-profile", reveal="0"):
return {
"GITEA_MCP_CONFIG": self.config_path,
"GITEA_MCP_PROFILE": profile,
"GITEA_MCP_REVEAL_ENDPOINTS": reveal,
"GITEA_TOKEN_AUTHOR": "author-pass",
"GITEA_TOKEN_REVIEWER": "reviewer-pass",
}
# -------------------------------------------------------------------------
# gitea_get_runtime_context
# -------------------------------------------------------------------------
@patch("mcp_server.api_request", return_value={"login": "author-user"})
@patch("mcp_server.get_auth_header", return_value="token author-pass")
def test_get_runtime_context_author(self, _auth, _api):
with patch.dict(os.environ, self._env("author-profile"), clear=True):
ctx = mcp_server.gitea_get_runtime_context(remote="dadeschools")
self.assertEqual(ctx["active_profile"], "author-profile")
self.assertEqual(ctx["authenticated_username"], "author-user")
self.assertEqual(ctx["config_model"], "v2-contexts")
self.assertEqual(ctx["profile_source"], "config file profile")
self.assertFalse(ctx["runtime_switching_supported"])
self.assertEqual(ctx["profile_mode"], "static-profile")
self.assertFalse(ctx["review_merge_allowed"])
self.assertEqual(ctx["suggested_fix"], "reviewer namespace")
self.assertIn("does not permit review or merge", ctx["review_merge_blocked_reasons"][0])
self.assertIn("Switch to the reviewer MCP session", ctx["safe_next_action"])
@patch("mcp_server.api_request", return_value={"login": "reviewer-user"})
@patch("mcp_server.get_auth_header", return_value="token reviewer-pass")
def test_get_runtime_context_reviewer(self, _auth, _api):
with patch.dict(os.environ, self._env("reviewer-profile"), clear=True):
ctx = mcp_server.gitea_get_runtime_context(remote="dadeschools")
self.assertEqual(ctx["active_profile"], "reviewer-profile")
self.assertEqual(ctx["authenticated_username"], "reviewer-user")
self.assertTrue(ctx["review_merge_allowed"])
self.assertEqual(ctx["suggested_fix"], "none")
self.assertEqual(ctx["safe_next_action"], "None; ready for operations.")
# -------------------------------------------------------------------------
# gitea_list_profiles
# -------------------------------------------------------------------------
@patch("mcp_server.api_request", return_value={"login": "author-user"})
@patch("mcp_server.get_auth_header", return_value="token author-pass")
def test_list_profiles_redacted_by_default(self, _auth, _api):
with patch.dict(os.environ, self._env("author-profile", reveal="0"), clear=True):
res = mcp_server.gitea_list_profiles()
profiles = res["profiles"]
self.assertEqual(len(profiles), 2)
author_prof = next(p for p in profiles if p["name"] == "author-profile")
self.assertTrue(author_prof["is_active"])
self.assertEqual(author_prof["role_kind"], "author")
self.assertEqual(author_prof["auth"]["name"], "<redacted>")
self.assertEqual(author_prof["base_url"], "<redacted>")
self.assertEqual(author_prof["identity_status"], "verified")
reviewer_prof = next(p for p in profiles if p["name"] == "reviewer-profile")
self.assertFalse(reviewer_prof["is_active"])
self.assertEqual(reviewer_prof["role_kind"], "reviewer")
self.assertEqual(reviewer_prof["auth"]["name"], "<redacted>")
self.assertEqual(reviewer_prof["base_url"], "<redacted>")
self.assertEqual(reviewer_prof["identity_status"], "credentials present")
@patch("mcp_server.api_request", return_value={"login": "author-user"})
@patch("mcp_server.get_auth_header", return_value="token author-pass")
def test_list_profiles_revealed_under_opt_in(self, _auth, _api):
with patch.dict(os.environ, self._env("author-profile", reveal="1"), clear=True):
res = mcp_server.gitea_list_profiles()
profiles = res["profiles"]
author_prof = next(p for p in profiles if p["name"] == "author-profile")
self.assertEqual(author_prof["auth"]["name"], "GITEA_TOKEN_AUTHOR")
self.assertEqual(author_prof["base_url"], "https://gitea.example.com")
# -------------------------------------------------------------------------
# gitea_activate_profile
# -------------------------------------------------------------------------
def test_activate_profile_fails_when_disabled(self):
self._write_config(CONFIG_SWITCHING_DISABLED)
with patch.dict(os.environ, self._env("author-profile"), clear=True):
res = mcp_server.gitea_activate_profile(profile_name="reviewer-profile")
self.assertFalse(res["success"])
self.assertIn("switching is disabled", res["message"].lower())
self.assertIsNone(gitea_config._active_profile_override)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header")
def test_activate_profile_succeeds_when_enabled(self, mock_auth, mock_api):
self._write_config(CONFIG_SWITCHING_ENABLED)
# Setup mock responses for whoami checks
mock_auth.side_effect = ["token author-pass", "token reviewer-pass"]
mock_api.side_effect = [{"login": "author-user"}, {"login": "reviewer-user"}]
with patch.dict(os.environ, self._env("author-profile"), clear=True):
# Check before state
self.assertEqual(gitea_config.selected_profile_name(), "author-profile")
res = mcp_server.gitea_activate_profile(profile_name="reviewer-profile")
self.assertTrue(res["success"])
self.assertEqual(res["before_profile"], "author-profile")
self.assertEqual(res["before_identity"], "author-user")
self.assertEqual(res["after_profile"], "reviewer-profile")
self.assertEqual(res["after_identity"], "reviewer-user")
# Global variable override should be set
self.assertEqual(gitea_config._active_profile_override, "reviewer-profile")
self.assertEqual(gitea_config.selected_profile_name(), "reviewer-profile")
# -------------------------------------------------------------------------
# gitea_check_pr_eligibility enhanced error clarity
# -------------------------------------------------------------------------
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value="token reviewer-pass")
def test_eligibility_failure_self_author(self, _auth, mock_api):
# PR is authored by "reviewer-user" and reviewer-user is trying to approve it.
mock_api.side_effect = [
{"login": "reviewer-user"}, # user whoami lookup
{"user": {"login": "reviewer-user"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
]
with patch.dict(os.environ, self._env("reviewer-profile"), clear=True):
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
self.assertFalse(res["eligible"])
self.assertEqual(res["active_identity"], "reviewer-user")
self.assertTrue(res["self_author"])
self.assertEqual(res["required_identity"], "Any Gitea user other than PR author 'reviewer-user'")
self.assertIn("Self-review/self-merge is forbidden", res["safe_next_step"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value="token author-pass")
def test_eligibility_failure_missing_permissions(self, _auth, mock_api):
# PR is authored by "someone-else" and author-user (who lacks approve) is trying to approve it.
mock_api.side_effect = [
{"login": "author-user"}, # user whoami lookup
{"user": {"login": "someone-else"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
]
self._write_config(CONFIG_SWITCHING_ENABLED) # Enable switching to verify fixable_by_profile_switch
with patch.dict(os.environ, self._env("author-profile"), clear=True):
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
self.assertFalse(res["eligible"])
self.assertEqual(res["missing_permission"], "gitea.pr.approve")
self.assertTrue(res["fixable_by_profile_switch"])
self.assertFalse(res["requires_different_namespace"])
self.assertIn("Switch to a reviewer profile by calling gitea_activate_profile", res["safe_next_step"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value="token author-pass")
def test_eligibility_failure_missing_permissions_switching_disabled(self, _auth, mock_api):
# PR is authored by "someone-else" and author-user (lacks approve) tries to approve it when switching is disabled.
mock_api.side_effect = [
{"login": "author-user"}, # user whoami lookup
{"user": {"login": "someone-else"}, "state": "open", "head": {"sha": "abc123sha"}, "mergeable": True} # PR details
]
self._write_config(CONFIG_SWITCHING_DISABLED) # Disable switching
with patch.dict(os.environ, self._env("author-profile"), clear=True):
res = mcp_server.gitea_check_pr_eligibility(pr_number=42, action="approve")
self.assertFalse(res["eligible"])
self.assertEqual(res["missing_permission"], "gitea.pr.approve")
self.assertFalse(res["fixable_by_profile_switch"])
self.assertTrue(res["requires_different_namespace"])
self.assertIn("Switch to the reviewer MCP session", res["safe_next_step"])