feat: implement profile activation and runtime identity clarity (#131)

This commit is contained in:
2026-07-04 02:04:07 -04:00
parent b1256d73b4
commit a0e7d3360e
4 changed files with 685 additions and 11 deletions
+388 -11
View File
@@ -597,6 +597,83 @@ def gitea_check_pr_eligibility(
result["eligible"] = len(reasons) == 0
if result["eligible"]:
reasons.append("all eligibility checks passed")
# Add enhanced error clarity details (#131)
auth_user = result["authenticated_user"]
pr_author = result["pr_author"]
profile_name = result["profile_name"]
is_self = bool(auth_user and pr_author and auth_user == pr_author)
result["active_identity"] = auth_user
result["active_profile"] = profile_name
result["self_author"] = is_self
# Determine missing permission
missing_perm = None
if not op_ok:
missing_perm = f"gitea.pr.{action}" if action in ("approve", "request_changes", "merge") else f"gitea.{action}"
result["missing_permission"] = missing_perm
# Determine required profile
req_profile = None
if action in ("approve", "request_changes", "review"):
req_profile = "A profile with reviewer role permissions (allowing approve/merge/review, and forbidding author operations)"
elif action == "merge":
req_profile = "A profile with reviewer role permissions and explicit merge permission"
result["required_profile"] = req_profile
# Determine required identity
req_identity = None
if is_self:
req_identity = f"Any Gitea user other than PR author '{pr_author}'"
result["required_identity"] = req_identity
# Determine if fixable by profile switch vs requires different namespace
switching_supported = gitea_config.is_runtime_switching_enabled()
fixable_by_switch = False
req_different_ns = False
if not op_ok:
config = gitea_config.load_config()
has_capable_profile = False
if config:
for name, p in (config.get("profiles") or {}).items():
p_allowed = p.get("allowed_operations", [])
p_forbidden = p.get("forbidden_operations", [])
ok, _ = gitea_config.check_operation(action, p_allowed, p_forbidden)
if ok:
has_capable_profile = True
break
if switching_supported and has_capable_profile:
fixable_by_switch = True
else:
req_different_ns = True
result["fixable_by_profile_switch"] = fixable_by_switch
result["requires_different_namespace"] = req_different_ns
# Determine safe next step
safe_step = "Ready."
if not result["eligible"]:
if is_self:
safe_step = "Self-review/self-merge is forbidden. Ask a different operator/reviewer to review and merge this PR."
elif not auth_user:
safe_step = f"Ask the operator to configure valid credentials/token for profile '{profile_name}'."
elif not op_ok:
if fixable_by_switch:
safe_step = f"Switch to a reviewer profile by calling gitea_activate_profile with a profile that allows {action}."
else:
safe_step = "Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile."
elif result["pr_state"] != "open":
safe_step = "No action can be taken on a closed/merged PR."
elif action == "merge":
if result["mergeable"] is False:
safe_step = "Address conflicts in the PR branches first, or wait for CI checks to pass before merging."
elif result["mergeable"] is None:
safe_step = "Wait for Gitea to finish calculating mergeability or trigger a re-check."
result["safe_next_step"] = safe_step
return result
@@ -1574,13 +1651,11 @@ _GUIDE_RULES = {
"(gitea.issue.comment) are gated separately from PR reviews "
"(gitea.pr.*)."),
"profile_switching": [
"The active profile comes from GITEA_MCP_PROFILE (with "
"GITEA_MCP_CONFIG pointing at the profiles config).",
"Switching profiles requires changing the launcher environment and "
"reconnecting the MCP server (operator action, e.g. /mcp); the "
"server process cannot switch identities in place.",
"After any switch, verify with gitea_whoami before doing anything "
"else.",
"The active profile is selected via GITEA_MCP_PROFILE (with GITEA_MCP_CONFIG pointing at the config path).",
"By default, static-profile mode is active: changing local environment variables does not dynamically update the already-connected MCP server process.",
"If runtime profile switching is explicitly enabled in the config (allow_runtime_switching: true), use gitea_activate_profile to switch profiles in-place.",
"If switching is disabled, you must change the launcher configuration/environment (dual-namespace) and reconnect/restart the MCP session.",
"After any profile switch or startup, you must verify the new runtime identity with a fresh gitea_whoami call before taking action."
],
"identity_verification": [
"Call gitea_whoami with an explicit remote first; confirm the "
@@ -1703,10 +1778,12 @@ _PROJECT_SKILLS = {
"required_operations": [],
"status": "available",
"steps": [
"Ask the operator to update GITEA_MCP_PROFILE in the launcher "
"environment.",
"Operator reconnects the MCP server (e.g. /mcp).",
"Verify the new identity with gitea_whoami before acting.",
"Check profile mode using gitea_get_runtime_context.",
"If dynamic-profile mode is enabled (allow_runtime_switching: true), "
"call gitea_activate_profile to switch in-place.",
"If static-profile mode (default), ask the operator to update GITEA_MCP_PROFILE "
"in the launcher environment and reconnect/restart the MCP server.",
"Immediately verify the new identity with a fresh gitea_whoami call before taking action.",
"If identity does not match the expected role, STOP.",
],
},
@@ -2115,6 +2192,306 @@ def gitea_get_profile(
return result
@mcp.tool()
def gitea_get_runtime_context(
remote: str = "dadeschools",
host: str | None = None,
) -> dict:
"""Read-only: explicit visibility into active profile, configuration model, and eligibility.
Reports config model shape, profile mode (static vs dynamic), and detailed blocks.
Args:
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
"""
profile = get_profile()
config = gitea_config.load_config()
reveal = _reveal_endpoints()
# Determine config model/version
if config is None:
config_model = "env-only"
elif config.get("version") == 1:
config_model = "v1"
elif config.get("version") == 2:
if "contexts" in config or config.get("shape") == "contexts":
config_model = "v2-contexts"
else:
config_model = "v2-environments"
else:
config_model = f"unknown-version-{config.get('version')}"
# Determine profile source
if os.environ.get("GITEA_PROFILE_NAME"):
profile_source = "env var"
elif gitea_config.selected_profile_name():
profile_source = "config file profile"
else:
profile_source = "default"
h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None)
username = _authenticated_username(h) if h else None
switching = gitea_config.is_runtime_switching_enabled()
profile_mode = "dynamic-profile" if switching else "static-profile"
# Evaluate review/merge eligibility
allowed = profile["allowed_operations"] or []
forbidden = profile["forbidden_operations"] or []
# Check approve
approve_ok, _ = gitea_config.check_operation("approve", allowed, forbidden)
# Check merge
merge_ok, _ = gitea_config.check_operation("merge", allowed, forbidden)
review_merge_allowed = False
blocked_reasons = []
suggested_fix = "none"
if not username:
blocked_reasons.append("Authenticated identity is unresolved. Gitea credentials are missing or invalid.")
suggested_fix = "operator action"
elif not (approve_ok or merge_ok):
blocked_reasons.append(
f"Active profile '{profile['profile_name']}' does not permit review or merge operations."
)
if switching:
suggested_fix = "profile switch"
else:
suggested_fix = "reviewer namespace"
else:
review_merge_allowed = True
# Note that self-review and self-merge are blocked regardless of profile roles
blocked_reasons.append(
"Note: self-review and self-merge are always blocked at runtime for any PR authored by the active user."
)
safe_next_action = "None; ready for operations."
if suggested_fix == "operator action":
safe_next_action = f"Ask the operator to configure valid credentials/token for profile '{profile['profile_name']}'."
elif suggested_fix == "profile switch":
safe_next_action = "Switch to a reviewer profile by calling gitea_activate_profile with a reviewer profile."
elif suggested_fix == "reviewer namespace":
safe_next_action = (
"Switch to the reviewer MCP session (e.g. gitea-reviewer) which has reviewer permissions configured, "
"or ask the operator to update GITEA_MCP_PROFILE to a reviewer profile."
)
result = {
"active_profile": profile["profile_name"],
"authenticated_username": username,
"remote": remote if remote in REMOTES else None,
"config_model": config_model,
"profile_source": profile_source,
"allowed_operations": allowed,
"forbidden_operations": forbidden,
"runtime_switching_supported": switching,
"profile_mode": profile_mode,
"review_merge_allowed": review_merge_allowed,
"review_merge_blocked_reasons": blocked_reasons,
"suggested_fix": suggested_fix,
"safe_next_action": safe_next_action,
}
if reveal and h:
result["server"] = f"https://{h}"
return result
@mcp.tool()
def gitea_list_profiles() -> dict:
"""Read-only: list all Gitea MCP profiles with redacted metadata.
Exposes names, role kinds, enabled state, allowed/forbidden operations, and active status.
Token values, keychain IDs, and raw URLs are hidden unless GITEA_MCP_REVEAL_ENDPOINTS=1 is enabled.
"""
config = gitea_config.load_config()
reveal = _reveal_endpoints()
active_name = gitea_config.selected_profile_name() or get_profile()["profile_name"]
profiles_out = []
if config is None:
# Env-only: return the active profile
p = get_profile()
role = _role_kind(p["allowed_operations"], p["forbidden_operations"])
h = REMOTES.get("dadeschools", {}).get("host") # default host
username = _authenticated_username(h) if h else None
prof = {
"name": p["profile_name"],
"role_kind": role,
"allowed_operations": p["allowed_operations"],
"forbidden_operations": p["forbidden_operations"],
"identity_status": "verified" if username else "unresolved",
"is_active": True,
"enabled": True,
}
if reveal:
prof["token_source_name"] = p["token_source_name"]
prof["base_url"] = p["base_url"]
profiles_out.append(prof)
else:
# Load from config profiles
profiles_dict = config.get("profiles") or {}
unavailable_dict = config.get("unavailable") or {}
audit_only_profiles = config.get("audit_only_profiles") or {}
# First, process available profiles
for name, p in profiles_dict.items():
role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", []))
is_active = (name == active_name)
# Identity status lookup
if is_active:
h = REMOTES.get("dadeschools", {}).get("host") # default host
username = _authenticated_username(h) if h else None
identity_status = "verified" if username else "unresolved"
else:
# Safely resolve if credentials are present without networking
try:
tok = gitea_config.resolve_token(p)
identity_status = "credentials present" if tok else "missing credentials"
except Exception:
identity_status = "missing credentials"
prof = {
"name": name,
"role_kind": role,
"allowed_operations": p.get("allowed_operations", []),
"forbidden_operations": p.get("forbidden_operations", []),
"identity_status": identity_status,
"is_active": is_active,
"enabled": p.get("enabled", True),
}
if reveal:
if isinstance(p.get("auth"), dict):
prof["auth"] = p["auth"]
prof["base_url"] = p.get("base_url")
else:
if isinstance(p.get("auth"), dict):
prof["auth"] = {k: ("<redacted>" if k != "type" else v) for k, v in p["auth"].items()}
prof["base_url"] = "<redacted>"
profiles_out.append(prof)
# Process unavailable/disabled/audit-only profiles
for name, p in audit_only_profiles.items():
role = _role_kind(p.get("allowed_operations", []), p.get("forbidden_operations", []))
is_active = (name == active_name)
prof = {
"name": name,
"role_kind": role,
"allowed_operations": p.get("allowed_operations", []),
"forbidden_operations": p.get("forbidden_operations", []),
"identity_status": "unavailable (disabled)",
"is_active": is_active,
"enabled": p.get("enabled", True),
"unavailable_reason": p.get("_unavailable_reason"),
}
if reveal:
if isinstance(p.get("auth"), dict):
prof["auth"] = p["auth"]
prof["base_url"] = p.get("base_url")
else:
if isinstance(p.get("auth"), dict):
prof["auth"] = {k: ("<redacted>" if k != "type" else v) for k, v in p["auth"].items()}
prof["base_url"] = "<redacted>"
profiles_out.append(prof)
# Handle unavailable mappings that are not in audit_only
for name, reason in unavailable_dict.items():
if not any(x["name"] == name for x in profiles_out):
profiles_out.append({
"name": name,
"role_kind": "limited",
"allowed_operations": [],
"forbidden_operations": [],
"identity_status": "unavailable",
"is_active": (name == active_name),
"enabled": False,
"unavailable_reason": reason,
})
return {"profiles": profiles_out}
@mcp.tool()
def gitea_activate_profile(
profile_name: str,
remote: str = "dadeschools",
host: str | None = None,
) -> dict:
"""Gated profile activation. Switch the active profile in-place for this runtime session.
Only allowed if "allow_runtime_switching": true is explicitly configured.
Args:
profile_name: Profile to activate.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
"""
if not gitea_config.is_runtime_switching_enabled():
return {
"success": False,
"message": (
"Runtime profile switching is disabled in this configuration. Static profile mode is active. "
"To enable switching, set 'allow_runtime_switching': true in the config rules."
)
}
config = gitea_config.load_config()
if not config:
return {
"success": False,
"message": "No profiles configuration file is loaded. Switching is not supported in env-only mode."
}
profiles_dict = config.get("profiles") or {}
if profile_name not in profiles_dict:
return {
"success": False,
"message": f"Profile '{profile_name}' not found in loaded config."
}
h = host or (REMOTES.get(remote, {}).get("host") if remote in REMOTES else None)
# 1. Record before state
before_profile = get_profile()["profile_name"]
before_identity = _authenticated_username(h) if h else None
# 2. Perform switch
gitea_config._active_profile_override = profile_name
# 3. Clear identity cache to force a fresh verification
if h:
_IDENTITY_CACHE.pop(h, None)
# 4. Resolve fresh identity
after_profile = get_profile()["profile_name"]
after_identity = _authenticated_username(h) if h else None
# 5. Audit the switch if auditing is on
_audit(
"activate_profile",
host=h,
remote=remote,
result={"success": True, "before": before_profile, "after": after_profile},
username=after_identity,
)
return {
"success": True,
"message": f"Successfully activated profile '{profile_name}' (fresh identity verification complete).",
"before_profile": before_profile,
"before_identity": before_identity,
"after_profile": after_profile,
"after_identity": after_identity,
}
@mcp.tool()
def gitea_audit_config() -> dict:
"""Audit the configured profiles/services: enabled state, no secrets.