feat: load profiles.json v2 contexts shape with enabled enforcement and LLM-safe output (#120)

Support the canonical contexts-shape version 2 config (contexts / profiles /
projects / rules) alongside the existing environments shape and v1:

- Require a boolean 'enabled' on every context, profile, service, and
  project. Disabled entries are surfaced in audits but fail closed at
  selection/resolution — never a silent fallback to another profile,
  service, or credential source.
- Resolve the active identity from GITEA_MCP_PROFILE via the existing
  select_profile path; profile base_url falls back to the context's enabled
  gitea block.
- Add resolve_service() and project_for_path() for context service and
  project-to-context resolution (internal use; fail closed on disabled).
- get_auth_header now propagates ConfigError when GITEA_MCP_CONFIG is set
  instead of silently degrading to Basic auth.
- Hide endpoint URLs and keychain ids from normal LLM-facing output:
  gitea_whoami / gitea_get_profile report logical names and auth status
  only; new gitea_audit_config tool reports enabled/disabled state and safe
  one-line service summaries. The GITEA_MCP_REVEAL_ENDPOINTS opt-in (and
  'python3 gitea_config.py audit --reveal-endpoints' locally) restores
  endpoints and auth source names for admin diagnostics; token values are
  never printed on any path.
- Ship gitea-mcp.v2-contexts.example.json (synthetic values) and validate
  it in tests.

Implements #120

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
2026-07-03 02:19:39 -04:00
parent fbf1bc5f5c
commit ff920a6496
8 changed files with 1127 additions and 22 deletions
+381 -3
View File
@@ -83,6 +83,12 @@ _MINIMAL_GITEA_OP_MAP = {
"merge": "gitea.pr.merge",
"pr.create": "gitea.pr.create",
"branch.push": "gitea.branch.push",
# Contexts-shape author verbs (#120) — the invariant checks below depend on
# "push"/"open_pr" normalizing to the two author-only ops.
"branch": "gitea.branch.create",
"commit": "gitea.repo.commit",
"push": "gitea.branch.push",
"open_pr": "gitea.pr.create",
}
_REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"})
_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"})
@@ -169,7 +175,7 @@ def load_config(path=None):
f"expected one of {list(SUPPORTED_VERSIONS)}"
)
if version == 2:
return _load_v2(data, path)
return _load_v2_any(data, path)
if version != SUPPORTED_VERSION:
raise ConfigError(
f"{path} has unsupported version {version!r}; "
@@ -345,6 +351,363 @@ def _load_v2(data, path):
}
# ── profiles.json version 2 *contexts* shape (#120) ───────────────────────────
# The canonical machine config groups everything by context: top-level
# "contexts" (each with a gitea block and non-Gitea "services"), flat
# "profiles" (Gitea identities pointing at a context), "projects" (local repo
# paths mapped to a context), and "rules". Every context/profile/service/
# project carries a required boolean "enabled": disabled entries are surfaced
# in audits but fail closed at selection — never a silent fallback. Loading
# flattens profiles into the same {"profiles": {...}, "unavailable": {...}}
# model v1 consumers and select_profile() already understand, and carries the
# validated "contexts"/"projects"/"rules" through for service resolution.
def _load_v2_any(data, path):
"""Dispatch a version-2 file to its shape loader; ambiguity fails closed."""
has_contexts = "contexts" in data
has_environments = "environments" in data
if has_contexts and has_environments:
raise ConfigError(
f"{path} version 2 config must not mix 'contexts' and "
"'environments' shapes (ambiguous; fail closed)"
)
if has_contexts:
return _load_v2_contexts(data, path)
return _load_v2(data, path)
def _require_enabled(kind, name, obj):
"""Return the required boolean ``enabled`` flag, failing closed."""
enabled = obj.get("enabled")
if not isinstance(enabled, bool):
raise ConfigError(
f"{kind} '{name}' requires a boolean 'enabled' flag (fail closed)"
)
return enabled
def _reject_inline_secrets(kind, name, obj):
for key in _INLINE_SECRET_KEYS:
if key in obj:
raise ConfigError(
f"{kind} '{name}' must not contain an inline '{key}'; "
"store secrets in the keychain and reference them by id"
)
def _validate_context_service(ctx_name, svc_name, svc):
"""Validate one context service entry (auth reference only, no secrets)."""
addr = f"{ctx_name}.{svc_name}"
if not isinstance(svc, dict):
raise ConfigError(f"service '{addr}' must be a JSON object")
_require_enabled("service", addr, svc)
_reject_inline_secrets("service", addr, svc)
if "auth" in svc:
_validate_auth(addr, svc["auth"])
def _load_v2_contexts(data, path):
"""Validate a v2 contexts-shape config and return the resolvable structure."""
contexts = data.get("contexts")
if not isinstance(contexts, dict) or not contexts:
raise ConfigError(
f"{path} version 2 contexts config requires a non-empty "
"'contexts' object"
)
for ctx_name, ctx in contexts.items():
if not _PROFILE_NAME_RE.match(ctx_name or ""):
raise ConfigError(f"invalid context name {ctx_name!r}")
if not isinstance(ctx, dict):
raise ConfigError(f"context '{ctx_name}' must be a JSON object")
_require_enabled("context", ctx_name, ctx)
gitea = ctx.get("gitea")
if gitea is not None:
if not isinstance(gitea, dict):
raise ConfigError(
f"context '{ctx_name}' has a non-object 'gitea' block")
_require_enabled("service", f"{ctx_name}.gitea", gitea)
_reject_inline_secrets("service", f"{ctx_name}.gitea", gitea)
services = ctx.get("services") or {}
if not isinstance(services, dict):
raise ConfigError(
f"context '{ctx_name}' has a non-object 'services' block")
for svc_name, svc in services.items():
_validate_context_service(ctx_name, svc_name, svc)
raw_profiles = data.get("profiles")
if not isinstance(raw_profiles, dict) or not raw_profiles:
raise ConfigError(
f"{path} version 2 contexts config requires a non-empty "
"'profiles' object"
)
profiles = {}
unavailable = {}
for name, raw in raw_profiles.items():
if not is_valid_profile_name(name):
raise ConfigError(f"invalid profile name {name!r}")
if not isinstance(raw, dict):
raise ConfigError(f"profile '{name}' must be a JSON object")
enabled = _require_enabled("profile", name, raw)
_reject_inline_secrets("profile", name, raw)
_validate_identity_auth(name, raw.get("auth"))
ctx_name = raw.get("context")
if ctx_name not in contexts:
raise ConfigError(
f"profile '{name}' references unknown context {ctx_name!r}")
context = contexts[ctx_name]
allowed = raw.get("allowed_operations") or []
forbidden = raw.get("forbidden_operations") or []
if not isinstance(allowed, list) or not isinstance(forbidden, list):
raise ConfigError(f"profile '{name}' operation fields must be lists")
allowed_n = {_normalize_op("gitea", op, name) for op in allowed}
forbidden_n = {_normalize_op("gitea", op, name) for op in forbidden}
# Reviewer-identity deadlock rule (#100/#103) applies here unchanged.
if allowed_n & _REVIEW_MERGE_OPS:
missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n)
if missing:
raise ConfigError(
f"profile '{name}' allows PR approve/merge but does not "
f"forbid {missing}; reviewer identities must forbid "
"gitea.pr.create and gitea.branch.push "
"(reviewer-identity deadlock rule)"
)
profile = dict(raw)
profile["allowed_operations"] = sorted(allowed_n)
profile["forbidden_operations"] = sorted(forbidden_n)
gitea = context.get("gitea") or {}
if not profile.get("base_url") and gitea.get("enabled"):
profile["base_url"] = gitea.get("base_url")
username = profile.get("username") or ""
if not enabled:
unavailable[name] = (
f"profile '{name}' is disabled (enabled: false); defined but "
"unavailable for action — refusing, no fallback"
)
elif not context.get("enabled"):
unavailable[name] = (
f"profile '{name}' belongs to context '{ctx_name}' which is "
"disabled (enabled: false); refusing, no fallback"
)
elif not profile.get("base_url"):
unavailable[name] = (
f"profile '{name}' has no usable base_url (none set and the "
f"context '{ctx_name}' gitea service is disabled or has none); "
"fail closed"
)
elif _TBD_RE.match(username):
unavailable[name] = (
f"profile '{name}' username {username!r} is a TBD placeholder; "
"provision the account before use (fail closed)"
)
else:
profiles[name] = profile
continue
# Unavailable profiles keep their (secret-free) body for audits only.
profile["_unavailable_reason"] = unavailable[name]
profiles.setdefault("_audit_only", {})
profiles["_audit_only"][name] = profile
projects = data.get("projects") or {}
if not isinstance(projects, dict):
raise ConfigError(f"{path} 'projects' must be a JSON object")
for proj_path, proj in projects.items():
if not isinstance(proj, dict):
raise ConfigError(f"project '{proj_path}' must be a JSON object")
_require_enabled("project", proj_path, proj)
if proj.get("context") not in contexts:
raise ConfigError(
f"project '{proj_path}' references unknown context "
f"{proj.get('context')!r}"
)
rules = data.get("rules") or {}
if not isinstance(rules, dict):
raise ConfigError(f"{path} 'rules' must be a JSON object")
audit_only = profiles.pop("_audit_only", {})
return {
"version": 2,
"shape": "contexts",
"profiles": profiles,
"unavailable": unavailable,
"audit_only_profiles": audit_only,
"contexts": contexts,
"projects": projects,
"rules": rules,
}
def resolve_service(config, context_name, service_name):
"""Return one context service's config for *internal* MCP use.
The returned dict includes the endpoint base_url and the keychain auth
*reference* — both are for MCP-internal resolution only and must never be
echoed into normal LLM-facing output (see audit_config/service_summaries).
Fails closed on an unknown or disabled context/service; never falls back
to another service.
"""
contexts = (config or {}).get("contexts")
if not isinstance(contexts, dict):
raise ConfigError(
"service resolution requires a version 2 contexts config")
ctx = contexts.get(context_name)
if ctx is None:
raise ConfigError(
f"unknown context '{context_name}' (fail closed, no fallback)")
if not ctx.get("enabled"):
raise ConfigError(
f"context '{context_name}' is disabled; its services are defined "
"but unavailable for action (no fallback)"
)
if service_name == "gitea":
service = ctx.get("gitea")
else:
service = (ctx.get("services") or {}).get(service_name)
if service is None:
raise ConfigError(
f"unknown service '{service_name}' in context '{context_name}' "
"(fail closed, no fallback)"
)
if not service.get("enabled"):
raise ConfigError(
f"service '{context_name}.{service_name}' is disabled; defined "
"but unavailable for action — refusing, no fallback"
)
return dict(service)
def project_for_path(config, path):
"""Map a local project *path* to its context entry, failing closed.
Returns None when the path is not configured (feature off for that repo).
Raises :class:`ConfigError` when the project or its context is disabled —
a configured-but-disabled project must never be acted on.
"""
projects = (config or {}).get("projects") or {}
project = projects.get(path)
if project is None:
return None
if not project.get("enabled"):
raise ConfigError(
f"project '{path}' is disabled (enabled: false); refusing, "
"no fallback"
)
contexts = (config or {}).get("contexts") or {}
ctx = contexts.get(project.get("context")) or {}
if not ctx.get("enabled"):
raise ConfigError(
f"project '{path}' maps to context '{project.get('context')}' "
"which is disabled; refusing, no fallback"
)
return dict(project)
def _audit_profile_entry(name, profile, enabled, reveal_endpoints):
"""One LLM-safe audit row: no endpoint URLs, no keychain ids, no tokens."""
auth = profile.get("auth") if isinstance(profile, dict) else None
entry = {
"name": name,
"enabled": enabled,
"context": profile.get("context") or profile.get("environment"),
"role": profile.get("role"),
"username": profile.get("username"),
"auth": (auth or {}).get("type") if isinstance(auth, dict) else None,
}
reason = profile.get("_unavailable_reason")
if reason:
entry["reason"] = reason
if reveal_endpoints:
entry["base_url"] = profile.get("base_url")
entry["auth_source"] = auth_source_name(profile)
return entry
def audit_config(config, reveal_endpoints=False):
"""Report enabled/disabled profiles and services without secrets.
Default output is LLM-safe: names, contexts, enabled state, capability
labels, and the auth *type* only — never endpoint URLs, keychain ids,
token values, or auth source names. ``reveal_endpoints=True`` is the
explicit admin/debug opt-in for local diagnostics: it adds base URLs and
non-secret auth source names (``keychain:<id>`` / env var name). Token
values are never included on any path.
"""
if config is None:
return {"version": None, "profiles": [], "services": []}
report = {
"version": config.get("version"),
"shape": config.get("shape") or ("environments"
if config.get("aliases") is not None
else "profiles"),
"profiles": [],
"services": [],
}
for name, profile in (config.get("profiles") or {}).items():
if not isinstance(profile, dict):
continue
report["profiles"].append(_audit_profile_entry(
name, profile, True, reveal_endpoints))
for name, profile in (config.get("audit_only_profiles") or {}).items():
report["profiles"].append(_audit_profile_entry(
name, profile, False, reveal_endpoints))
for ctx_name, ctx in (config.get("contexts") or {}).items():
ctx_enabled = bool(ctx.get("enabled"))
for svc_name, svc in (ctx.get("services") or {}).items():
entry = {
"context": ctx_name,
"name": svc_name,
"kind": svc.get("kind"),
"label": svc.get("label"),
"enabled": ctx_enabled and bool(svc.get("enabled")),
"capabilities": list(svc.get("capabilities") or []),
"auth": (svc.get("auth") or {}).get("type"),
}
if reveal_endpoints:
entry["base_url"] = svc.get("base_url")
entry["auth_source"] = auth_source_name(svc)
report["services"].append(entry)
return report
def service_summaries(config, auth_check=None):
"""Safe one-line service summaries for LLM sessions.
Each line reports label + state only (e.g. ``PRGS Jenkins: enabled,
read-only, authenticated`` / ``PRGS Sentry: disabled``) — never endpoint
URLs, keychain ids, or token values. *auth_check* is a callable taking the
service dict and returning True when its credential resolves; it defaults
to a local keychain presence check and its result is reported only as
``authenticated`` / ``no credential``.
"""
if auth_check is None:
def auth_check(service):
auth = service.get("auth") or {}
if auth.get("type") == "keychain":
return _keychain_token(auth.get("id")) is not None
if auth.get("type") == "env":
return bool(os.environ.get(auth.get("name") or ""))
return False
lines = []
for ctx_name, ctx in (config.get("contexts") or {}).items():
ctx_enabled = bool(ctx.get("enabled"))
for svc_name, svc in (ctx.get("services") or {}).items():
label = svc.get("label") or f"{ctx_name} {svc_name}"
if not (ctx_enabled and svc.get("enabled")):
lines.append(f"{label}: disabled")
continue
caps = list(svc.get("capabilities") or [])
cap_part = "read-only" if caps == ["read"] else ", ".join(caps)
auth_part = "authenticated" if auth_check(svc) else "no credential"
parts = ["enabled"] + ([cap_part] if cap_part else []) + [auth_part]
lines.append(f"{label}: " + ", ".join(parts))
return lines
def _validate_auth(name, auth):
"""Validate a profile's optional ``auth`` reference. Never echoes secrets."""
if auth is None:
@@ -534,7 +897,7 @@ def validate_config(config):
elif version == 2:
# v2 validation is all-or-nothing via the loader's invariants.
try:
_load_v2(config, "<config>")
_load_v2_any(config, "<config>")
except ConfigError as exc:
problems.append(str(exc))
return problems
@@ -691,5 +1054,20 @@ if __name__ == "__main__": # pragma: no cover - thin CLI dispatch
if len(sys.argv) > 1 and sys.argv[1] == "menu":
import gitea_config_menu
raise SystemExit(gitea_config_menu.main(sys.argv[2:]))
print("usage: python gitea_config.py menu", file=sys.stderr)
if len(sys.argv) > 1 and sys.argv[1] == "audit":
# Local admin/debug diagnostics (#120). --reveal-endpoints is the
# explicit opt-in that adds base URLs and non-secret auth source
# names; token values are never printed on any path.
try:
config = load_config(config_path() or DEFAULT_CONFIG_PATH)
report = audit_config(
config, reveal_endpoints="--reveal-endpoints" in sys.argv[2:])
report["summaries"] = service_summaries(config)
except ConfigError as exc:
print(f"config error: {exc}", file=sys.stderr)
raise SystemExit(1)
print(json.dumps(report, indent=2))
raise SystemExit(0)
print("usage: python gitea_config.py menu | audit [--reveal-endpoints]",
file=sys.stderr)
raise SystemExit(2)