ff920a6496
Support the canonical contexts-shape version 2 config (contexts / profiles / projects / rules) alongside the existing environments shape and v1: - Require a boolean 'enabled' on every context, profile, service, and project. Disabled entries are surfaced in audits but fail closed at selection/resolution — never a silent fallback to another profile, service, or credential source. - Resolve the active identity from GITEA_MCP_PROFILE via the existing select_profile path; profile base_url falls back to the context's enabled gitea block. - Add resolve_service() and project_for_path() for context service and project-to-context resolution (internal use; fail closed on disabled). - get_auth_header now propagates ConfigError when GITEA_MCP_CONFIG is set instead of silently degrading to Basic auth. - Hide endpoint URLs and keychain ids from normal LLM-facing output: gitea_whoami / gitea_get_profile report logical names and auth status only; new gitea_audit_config tool reports enabled/disabled state and safe one-line service summaries. The GITEA_MCP_REVEAL_ENDPOINTS opt-in (and 'python3 gitea_config.py audit --reveal-endpoints' locally) restores endpoints and auth source names for admin diagnostics; token values are never printed on any path. - Ship gitea-mcp.v2-contexts.example.json (synthetic values) and validate it in tests. Implements #120 Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
1074 lines
43 KiB
Python
1074 lines
43 KiB
Python
"""Canonical JSON runtime-profile configuration for Gitea MCP (issue #19).
|
|
|
|
One canonical config file defines every Gitea runtime profile. Each LLM MCP
|
|
launcher (Claude / Gemini / Codex) stays a *thin* launcher that only points at
|
|
that file and names a profile — no duplicated ``GITEA_USER_*`` / ``GITEA_PASS_*``
|
|
blocks, no raw tokens in client configs:
|
|
|
|
GITEA_MCP_CONFIG=/path/to/profiles.json # the canonical file
|
|
GITEA_MCP_PROFILE=prgs # which named profile to use
|
|
|
|
File shape (see ``gitea-mcp.example.json``)::
|
|
|
|
{
|
|
"version": 1,
|
|
"profiles": {
|
|
"prgs": {
|
|
"base_url": "https://gitea.prgs.cc",
|
|
"username": "jcwalker3",
|
|
"auth": {"type": "keychain", "id": "prgs-gitea-token"},
|
|
"default_owner": "Scaled-Tech-Consulting",
|
|
"execution_profile": "personal-prgs"
|
|
}
|
|
}
|
|
}
|
|
|
|
Auth is referenced *indirectly*, never inline:
|
|
|
|
- ``{"type": "keychain", "id": "prgs-gitea-token"}`` — resolved from the macOS
|
|
keychain at token-resolution time.
|
|
- ``{"type": "env", "name": "GITEA_TOKEN_PRGS"}`` — read from that env var.
|
|
|
|
Design constraints:
|
|
|
|
- **Backwards compatible.** With ``GITEA_MCP_CONFIG`` unset, every entry point
|
|
returns ``None`` and callers fall back to pure environment behaviour.
|
|
- **No inline secrets.** A raw ``token``/``password`` key is rejected. Token
|
|
values are resolved only via an auth *reference* and are never stored in, or
|
|
returned as, profile metadata.
|
|
- **No network.** Parsing only reads and decodes a local file. Token resolution
|
|
(keychain/env) happens separately, on demand.
|
|
- **Fail safely.** A missing file, invalid JSON, unsupported version, unknown/
|
|
unset selected profile, or an unresolvable secret reference raises
|
|
:class:`ConfigError` with a message that never includes file contents,
|
|
tokens, or passwords.
|
|
"""
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import tempfile
|
|
import subprocess
|
|
|
|
# Environment variables: where the canonical file lives, and which profile in
# it is active.
ENV_CONFIG_PATH = "GITEA_MCP_CONFIG"
ENV_PROFILE = "GITEA_MCP_PROFILE"

# SUPPORTED_VERSION is the version-1 literal; SUPPORTED_VERSIONS is the full
# set quoted in "expected one of ..." error messages.
SUPPORTED_VERSION = 1
SUPPORTED_VERSIONS = (1, 2)

# The only auth reference kinds a profile or service may declare.
_AUTH_TYPES = ("keychain", "env")

# Profile names go into env vars, keychain ids, and JSON keys — keep them tame.
_PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")

# v2 address segments (environment / service / identity) must be dot-free so
# the dotted profile address {env}.{service}.{identity} stays unambiguous.
_SEGMENT_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]*$")

# Placeholder usernames ("tbd", "TBD-something") must never activate
# (fail closed until provisioned).
_TBD_RE = re.compile(r"(?i)^tbd(-|$)")

# Keys that would mean an inline secret wherever they appear.
_INLINE_SECRET_KEYS = ("token", "password", "secret")
|
|
|
|
# ── Minimal operation normalization (#103) ─────────────────────────────────────
|
|
# Only what the #103 invariants need. The full normalization table, deprecation
|
|
# handling, and enforcement test matrix belong to issue #106 — do not grow this
|
|
# beyond invariant safety here.
|
|
# Unqualified Gitea operation name -> canonical "gitea.*" operation.
_MINIMAL_GITEA_OP_MAP = {
    "read": "gitea.read",
    "review": "gitea.pr.review",
    "comment": "gitea.pr.comment",
    "approve": "gitea.pr.approve",
    "request_changes": "gitea.pr.request_changes",
    "merge": "gitea.pr.merge",
    "pr.create": "gitea.pr.create",
    "branch.push": "gitea.branch.push",
    # Contexts-shape author verbs (#120) — the invariant checks below depend on
    # "push"/"open_pr" normalizing to the two author-only ops.
    "branch": "gitea.branch.create",
    "commit": "gitea.repo.commit",
    "push": "gitea.branch.push",
    "open_pr": "gitea.pr.create",
}

# Operations that mark an identity as a reviewer, and the author-only
# operations such an identity must explicitly forbid.
_REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"})
_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"})
|
|
|
|
|
|
def _normalize_op(service, op, addr):
    """Return the canonical form of *op* for *service*, or fail closed.

    Resolution rules (#103 minimal subset):

    - an op already prefixed with ``{service}.`` is returned untouched;
    - on the ``gitea`` service, a known unqualified op is translated through
      ``_MINIMAL_GITEA_OP_MAP``;
    - on any other service, a dot-free op becomes ``{service}.{op}``;
    - everything else (foreign prefixes, unknown unqualified Gitea names)
      raises :class:`ConfigError`.

    *addr* is only used to name the offending identity in error messages.
    """
    if not (isinstance(op, str) and op):
        raise ConfigError(f"identity '{addr}' has an empty or non-string operation")

    own_prefix = service + "."
    if op.startswith(own_prefix):
        return op

    if service == "gitea":
        if op in _MINIMAL_GITEA_OP_MAP:
            return _MINIMAL_GITEA_OP_MAP[op]
    elif "." not in op:
        return f"{service}.{op}"

    raise ConfigError(
        f"identity '{addr}' has operation {op!r} that cannot be normalized "
        f"safely for service '{service}' (fail closed; full table is issue #106)"
    )
|
|
|
|
# Default canonical config location (one file shared by all LLM launchers).
|
|
# Resolved once at import time: <home>/.config/gitea-tools/profiles.json
DEFAULT_CONFIG_PATH = os.path.join(
    os.path.expanduser("~"),
    ".config",
    "gitea-tools",
    "profiles.json",
)
|
|
|
|
|
|
class ConfigError(Exception):
    """Signal an unusable config file, profile selection, or secret reference.

    The message is intended to be shown as-is: it carries only non-secret
    names, ids, and positions — never file contents, tokens, or passwords.
    """
|
|
|
|
|
|
def config_path():
    """Return the JSON config path from the environment.

    Surrounding whitespace is stripped; an unset, empty, or blank value yields
    None, which means the JSON layer is switched off.
    """
    raw = os.environ.get(ENV_CONFIG_PATH) or ""
    cleaned = raw.strip()
    return cleaned if cleaned else None
|
|
|
|
|
|
def selected_profile_name():
    """Return the profile name chosen via the environment.

    Surrounding whitespace is stripped; an unset, empty, or blank value yields
    None.
    """
    raw = os.environ.get(ENV_PROFILE) or ""
    cleaned = raw.strip()
    return cleaned if cleaned else None
|
|
|
|
|
|
def load_config(path=None):
    """Load and minimally validate the canonical JSON config.

    Args:
        path: Config file path. When falsy, the path from ``config_path()``
            (the ``GITEA_MCP_CONFIG`` environment variable) is used.

    Returns:
        None when no config path is configured (so env-only usage is never
        broken). For a version 1 file, the parsed dict. For a version 2 file,
        the structure produced by ``_load_v2_any``.

    Raises:
        ConfigError: when a path is configured but the file is missing,
            unreadable, not UTF-8, not valid JSON, not a JSON object, lacks or
            declares an unsupported ``version``, or (version 1) lacks a
            ``profiles`` object. Messages never include file contents.
    """
    path = path or config_path()
    if not path:
        return None
    if not os.path.isfile(path):
        raise ConfigError(f"{ENV_CONFIG_PATH} points to a missing file: {path}")
    try:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    except json.JSONDecodeError as exc:
        # Report position only (msg/line/col). `from None` suppresses the
        # original exception, whose `.doc` holds the raw file text — which could
        # contain a token — so it never reaches a traceback.
        raise ConfigError(
            f"invalid JSON in {path} (line {exc.lineno}, column {exc.colno}): {exc.msg}"
        ) from None
    except UnicodeDecodeError:
        # A non-UTF-8 file fails while decoding, before JSON parsing. The
        # exception's `.object` carries the raw bytes read from the file, so
        # it is suppressed for the same reason as above.
        raise ConfigError(f"{path} is not valid UTF-8 text") from None
    except OSError as exc:
        # strerror can be None for some OSError instances; fall back to the
        # exception class name rather than printing "None".
        reason = exc.strerror or type(exc).__name__
        raise ConfigError(f"could not read {path}: {reason}") from None
    if not isinstance(data, dict):
        raise ConfigError(f"{path} must be a JSON object")
    version = data.get("version")
    if version is None:
        # Fail closed (#103): an unversioned config is ambiguous between v1 and
        # v2 shapes, so it is refused rather than guessed.
        raise ConfigError(
            f"{path} is missing the required 'version' field; "
            f"expected one of {list(SUPPORTED_VERSIONS)}"
        )
    unsupported = (
        f"{path} has unsupported version {version!r}; "
        f"expected one of {list(SUPPORTED_VERSIONS)}"
    )
    # JSON `true` compares equal to 1 in Python; it is not a version number.
    if isinstance(version, bool):
        raise ConfigError(unsupported)
    if version == 2:
        return _load_v2_any(data, path)
    if version != SUPPORTED_VERSION:
        raise ConfigError(unsupported)
    if not isinstance(data.get("profiles"), dict):
        raise ConfigError(f"{path} must be a JSON object with a 'profiles' object")
    return data
|
|
|
|
|
|
# ── profiles.json version 2 (#103): environment → service → identity ──────────
|
|
# v2 files are validated and *flattened* at load time into the same
|
|
# {"profiles": {...}} shape v1 consumers already understand, keyed by the
|
|
# canonical dotted address {environment}.{service}.{identity}. Two extra
|
|
# top-level keys are carried: "aliases" (exact-name compatibility selectors)
|
|
# and "unavailable" (addresses that fail closed at selection, e.g. TBD users).
|
|
|
|
def _validate_identity_auth(addr, auth):
    """Check that an identity carries a well-formed, secret-free 'auth' object.

    Unlike ``_validate_auth``, a missing reference is an error here. Any key
    listed in ``_INLINE_SECRET_KEYS`` inside *auth* is rejected before the
    type-specific checks run. *addr* names the identity in error messages.
    """
    if auth is None:
        raise ConfigError(f"identity '{addr}' is missing an 'auth' reference")
    if not isinstance(auth, dict):
        raise ConfigError(f"identity '{addr}' has a non-object 'auth'")

    offending = next((k for k in _INLINE_SECRET_KEYS if k in auth), None)
    if offending is not None:
        raise ConfigError(
            f"identity '{addr}' auth must not contain an inline '{offending}'; "
            "store secrets in the keychain and reference them by id"
        )

    _validate_auth(addr, auth)
|
|
|
|
|
|
def _flatten_identity(env_name, svc_name, svc, ident_name, ident):
    """Check a single v2 identity and build its flattened profile.

    Returns ``(addr, profile)``. *addr* is the dotted
    ``{environment}.{service}.{identity}`` address. *profile* holds the
    address parts, the effective ``base_url`` (identity value, else the
    service value), the ``auth`` reference, sorted normalized operation lists,
    owner/repo/org defaults inherited from the service unless the identity
    sets the key, and any truthy role/username/execution_profile/audit_label.

    Raises ConfigError when the identity is not an object, embeds an inline
    secret key, has a bad ``auth``, has no ``base_url``, has non-list or
    un-normalizable operations, or breaks the reviewer-identity rule.
    """
    addr = f"{env_name}.{svc_name}.{ident_name}"
    if not isinstance(ident, dict):
        raise ConfigError(f"identity '{addr}' must be a JSON object")

    inline = next((k for k in _INLINE_SECRET_KEYS if k in ident), None)
    if inline is not None:
        raise ConfigError(
            f"identity '{addr}' must not contain an inline '{inline}'; "
            "use an 'auth' reference instead"
        )
    _validate_identity_auth(addr, ident.get("auth"))

    effective_url = ident.get("base_url") or svc.get("base_url")
    if not effective_url:
        raise ConfigError(
            f"identity '{addr}' has no 'base_url' at identity or service level"
        )

    allowed_raw = ident.get("allowed_operations") or []
    forbidden_raw = ident.get("forbidden_operations") or []
    if not (isinstance(allowed_raw, list) and isinstance(forbidden_raw, list)):
        raise ConfigError(f"identity '{addr}' operation fields must be lists")

    allowed_ops = set()
    for op in allowed_raw:
        allowed_ops.add(_normalize_op(svc_name, op, addr))
    forbidden_ops = set()
    for op in forbidden_raw:
        forbidden_ops.add(_normalize_op(svc_name, op, addr))

    # Reviewer-identity deadlock rule (#100/#103): whoever may approve or merge
    # PRs must explicitly forbid creating PRs and pushing branches, so the
    # reviewer identity can never author the PR it must review.
    if not allowed_ops.isdisjoint(_REVIEW_MERGE_OPS):
        not_forbidden = sorted(
            op for op in _AUTHOR_ONLY_OPS if op not in forbidden_ops
        )
        if not_forbidden:
            raise ConfigError(
                f"identity '{addr}' allows PR approve/merge but does not forbid "
                f"{not_forbidden}; reviewer identities must forbid gitea.pr.create and "
                "gitea.branch.push (reviewer-identity deadlock rule)"
            )

    flattened = {
        "profile_path": addr,
        "environment": env_name,
        "service": svc_name,
        "identity": ident_name,
        "base_url": effective_url,
        "auth": ident["auth"],
        "allowed_operations": sorted(allowed_ops),
        "forbidden_operations": sorted(forbidden_ops),
    }

    # The service value is used only when the identity does not define the key
    # at all; a falsy identity value suppresses the key rather than inheriting.
    for key in ("default_owner", "default_repo", "default_org"):
        inherited = ident[key] if key in ident else svc.get(key)
        if inherited:
            flattened[key] = inherited

    # Identity-only metadata: copied when present and truthy.
    for key in ("role", "username", "execution_profile", "audit_label"):
        own = ident.get(key)
        if own:
            flattened[key] = own

    return addr, flattened
|
|
|
|
|
|
def _load_v2(data, path):
    """Validate a v2 environments-shape config and flatten it.

    Walks ``environments -> services -> identities``, validating each name
    against ``_SEGMENT_RE`` and each identity via ``_flatten_identity``.

    Args:
        data: The parsed JSON object of the config file.
        path: The file path, used only in error messages.

    Returns:
        A dict with ``version`` (2), ``profiles`` (dotted address ->
        flattened profile), ``aliases`` (a copy of the file's alias map) and
        ``unavailable`` (dotted address -> refusal message for identities
        whose username is a TBD placeholder).

    Raises:
        ConfigError: on a missing/empty container, an invalid name, any
            identity invariant violation, a non-string username, or an alias
            that is malformed, conflicts with a profile address, or points at
            an unknown profile.
    """
    environments = data.get("environments")
    if not isinstance(environments, dict) or not environments:
        raise ConfigError(
            f"{path} version 2 config requires a non-empty 'environments' object"
        )
    profiles = {}
    unavailable = {}
    for env_name, env in environments.items():
        if not _SEGMENT_RE.match(env_name or ""):
            raise ConfigError(f"invalid environment name {env_name!r} (no dots)")
        if not isinstance(env, dict):
            raise ConfigError(f"environment '{env_name}' must be a JSON object")
        services = env.get("services")
        if not isinstance(services, dict) or not services:
            raise ConfigError(
                f"environment '{env_name}' requires a non-empty 'services' object"
            )
        for svc_name, svc in services.items():
            if not _SEGMENT_RE.match(svc_name or ""):
                raise ConfigError(
                    f"invalid service name {svc_name!r} in '{env_name}' (no dots)"
                )
            if not isinstance(svc, dict):
                raise ConfigError(
                    f"service '{env_name}.{svc_name}' must be a JSON object"
                )
            identities = svc.get("identities")
            if not isinstance(identities, dict) or not identities:
                raise ConfigError(
                    f"service '{env_name}.{svc_name}' requires a non-empty "
                    "'identities' object"
                )
            for ident_name, ident in identities.items():
                if not _SEGMENT_RE.match(ident_name or ""):
                    raise ConfigError(
                        f"invalid identity name {ident_name!r} in "
                        f"'{env_name}.{svc_name}' (no dots)"
                    )
                addr, profile = _flatten_identity(
                    env_name, svc_name, svc, ident_name, ident
                )
                username = profile.get("username") or ""
                # A truthy non-string username (number, list, ...) would make
                # the regex match below raise TypeError; refuse it as a
                # config error instead.
                if not isinstance(username, str):
                    raise ConfigError(
                        f"identity '{addr}' has a non-string 'username'"
                    )
                if _TBD_RE.match(username):
                    # Fail closed at selection, without blocking every other
                    # identity in the file (see #103 acceptance criteria).
                    unavailable[addr] = (
                        f"identity '{addr}' username {username!r} is a TBD "
                        "placeholder; provision the account before use "
                        "(fail closed)"
                    )
                else:
                    profiles[addr] = profile

    aliases = data.get("aliases") or {}
    if not isinstance(aliases, dict):
        raise ConfigError(f"{path} 'aliases' must be a JSON object")
    # An alias may target an available or an unavailable address; selecting an
    # unavailable one is refused later, at selection time.
    known = set(profiles) | set(unavailable)
    for alias, target in aliases.items():
        if not isinstance(target, str) or not target:
            raise ConfigError(f"alias '{alias}' target must be a non-empty string")
        if alias in known and alias != target:
            raise ConfigError(
                f"selector '{alias}' is both an alias and a profile address "
                "with a different target (conflicting selector; fail closed)"
            )
        if target not in known:
            raise ConfigError(
                f"alias '{alias}' points to unknown profile '{target}'"
            )
    return {
        "version": 2,
        "profiles": profiles,
        "aliases": dict(aliases),
        "unavailable": unavailable,
    }
|
|
|
|
|
|
# ── profiles.json version 2 *contexts* shape (#120) ───────────────────────────
|
|
# The canonical machine config groups everything by context: top-level
|
|
# "contexts" (each with a gitea block and non-Gitea "services"), flat
|
|
# "profiles" (Gitea identities pointing at a context), "projects" (local repo
|
|
# paths mapped to a context), and "rules". Every context/profile/service/
|
|
# project carries a required boolean "enabled": disabled entries are surfaced
|
|
# in audits but fail closed at selection — never a silent fallback. Loading
|
|
# flattens profiles into the same {"profiles": {...}, "unavailable": {...}}
|
|
# model v1 consumers and select_profile() already understand, and carries the
|
|
# validated "contexts"/"projects"/"rules" through for service resolution.
|
|
|
|
def _load_v2_any(data, path):
    """Route a version-2 file to the loader for its shape.

    A file with a top-level ``contexts`` key goes to ``_load_v2_contexts``;
    anything else goes to ``_load_v2`` (environments shape). A file carrying
    both ``contexts`` and ``environments`` is ambiguous and is refused.
    """
    if "contexts" in data:
        if "environments" in data:
            raise ConfigError(
                f"{path} version 2 config must not mix 'contexts' and "
                "'environments' shapes (ambiguous; fail closed)"
            )
        return _load_v2_contexts(data, path)
    return _load_v2(data, path)
|
|
|
|
|
|
def _require_enabled(kind, name, obj):
    """Fetch the mandatory ``enabled`` flag of *obj*.

    Only a real boolean is accepted; a missing key or any other type raises
    :class:`ConfigError`. *kind* and *name* label the entry in the message.
    """
    flag = obj.get("enabled")
    if isinstance(flag, bool):
        return flag
    raise ConfigError(
        f"{kind} '{name}' requires a boolean 'enabled' flag (fail closed)"
    )
|
|
|
|
|
|
def _reject_inline_secrets(kind, name, obj):
    """Raise ConfigError if *obj* has any key from ``_INLINE_SECRET_KEYS``.

    The first offending key, in ``_INLINE_SECRET_KEYS`` order, is named in the
    message; *kind* and *name* label the entry. Returns None otherwise.
    """
    offending = next((k for k in _INLINE_SECRET_KEYS if k in obj), None)
    if offending is None:
        return
    raise ConfigError(
        f"{kind} '{name}' must not contain an inline '{offending}'; "
        "store secrets in the keychain and reference them by id"
    )
|
|
|
|
|
|
def _validate_context_service(ctx_name, svc_name, svc):
    """Validate one context service entry (auth reference only, no secrets).

    Checks that *svc* is an object with a boolean ``enabled`` flag, that
    neither the service nor its optional ``auth`` object carries an inline
    secret key, and that ``auth`` (when present) is a well-formed reference.

    Raises:
        ConfigError: on any of the violations above. The service is named as
            ``{ctx_name}.{svc_name}`` in the message.
    """
    addr = f"{ctx_name}.{svc_name}"
    if not isinstance(svc, dict):
        raise ConfigError(f"service '{addr}' must be a JSON object")
    _require_enabled("service", addr, svc)
    _reject_inline_secrets("service", addr, svc)
    auth = svc.get("auth")
    if auth is None:
        # Service auth is optional; an absent or null reference is accepted.
        return
    if isinstance(auth, dict):
        # The auth object is handed back whole by resolve_service(), so a
        # secret nested inside it must be refused just like a top-level one.
        _reject_inline_secrets("service auth", addr, auth)
    _validate_auth(addr, auth)
|
|
|
|
|
|
def _load_v2_contexts(data, path):
    """Validate a v2 contexts-shape config and return the resolvable structure.

    Args:
        data: The parsed JSON object of the config file.
        path: The file path, used only in error messages.

    Returns:
        A dict with ``version`` (2), ``shape`` ("contexts"), ``profiles``
        (selectable profiles by name), ``unavailable`` (name -> refusal
        message), ``audit_only_profiles`` (bodies of unavailable profiles,
        each tagged with ``_unavailable_reason``), and the validated
        ``contexts``, ``projects`` and ``rules`` objects.

    Raises:
        ConfigError: on a missing/empty ``contexts`` or ``profiles`` object,
            an invalid name, a missing boolean ``enabled`` flag, an inline
            secret, a bad ``auth`` reference, a non-string or unknown context
            reference, un-normalizable operations, a reviewer-rule violation,
            a non-string username, or a non-object ``projects``/``rules``.
    """
    contexts = data.get("contexts")
    if not isinstance(contexts, dict) or not contexts:
        raise ConfigError(
            f"{path} version 2 contexts config requires a non-empty "
            "'contexts' object"
        )
    for ctx_name, ctx in contexts.items():
        if not _PROFILE_NAME_RE.match(ctx_name or ""):
            raise ConfigError(f"invalid context name {ctx_name!r}")
        if not isinstance(ctx, dict):
            raise ConfigError(f"context '{ctx_name}' must be a JSON object")
        _require_enabled("context", ctx_name, ctx)
        gitea = ctx.get("gitea")
        if gitea is not None:
            if not isinstance(gitea, dict):
                raise ConfigError(
                    f"context '{ctx_name}' has a non-object 'gitea' block")
            _require_enabled("service", f"{ctx_name}.gitea", gitea)
            _reject_inline_secrets("service", f"{ctx_name}.gitea", gitea)
        services = ctx.get("services") or {}
        if not isinstance(services, dict):
            raise ConfigError(
                f"context '{ctx_name}' has a non-object 'services' block")
        for svc_name, svc in services.items():
            _validate_context_service(ctx_name, svc_name, svc)

    raw_profiles = data.get("profiles")
    if not isinstance(raw_profiles, dict) or not raw_profiles:
        raise ConfigError(
            f"{path} version 2 contexts config requires a non-empty "
            "'profiles' object"
        )
    profiles = {}      # selectable profiles
    unavailable = {}   # name -> refusal message
    audit_only = {}    # bodies of unavailable profiles, for audits only
    for name, raw in raw_profiles.items():
        if not is_valid_profile_name(name):
            raise ConfigError(f"invalid profile name {name!r}")
        if not isinstance(raw, dict):
            raise ConfigError(f"profile '{name}' must be a JSON object")
        enabled = _require_enabled("profile", name, raw)
        _reject_inline_secrets("profile", name, raw)
        _validate_identity_auth(name, raw.get("auth"))
        ctx_name = raw.get("context")
        # The isinstance guard keeps an unhashable reference (list/object)
        # from raising TypeError in the membership test.
        if not isinstance(ctx_name, str) or ctx_name not in contexts:
            raise ConfigError(
                f"profile '{name}' references unknown context {ctx_name!r}")
        context = contexts[ctx_name]

        allowed = raw.get("allowed_operations") or []
        forbidden = raw.get("forbidden_operations") or []
        if not isinstance(allowed, list) or not isinstance(forbidden, list):
            raise ConfigError(f"profile '{name}' operation fields must be lists")
        allowed_n = {_normalize_op("gitea", op, name) for op in allowed}
        forbidden_n = {_normalize_op("gitea", op, name) for op in forbidden}
        # Reviewer-identity deadlock rule (#100/#103) applies here unchanged.
        if allowed_n & _REVIEW_MERGE_OPS:
            missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n)
            if missing:
                raise ConfigError(
                    f"profile '{name}' allows PR approve/merge but does not "
                    f"forbid {missing}; reviewer identities must forbid "
                    "gitea.pr.create and gitea.branch.push "
                    "(reviewer-identity deadlock rule)"
                )

        profile = dict(raw)
        profile["allowed_operations"] = sorted(allowed_n)
        profile["forbidden_operations"] = sorted(forbidden_n)
        # base_url falls back to the context's gitea block only when that
        # block is enabled.
        gitea = context.get("gitea") or {}
        if not profile.get("base_url") and gitea.get("enabled"):
            profile["base_url"] = gitea.get("base_url")

        username = profile.get("username") or ""
        if not enabled:
            reason = (
                f"profile '{name}' is disabled (enabled: false); defined but "
                "unavailable for action — refusing, no fallback"
            )
        elif not context.get("enabled"):
            reason = (
                f"profile '{name}' belongs to context '{ctx_name}' which is "
                "disabled (enabled: false); refusing, no fallback"
            )
        elif not profile.get("base_url"):
            reason = (
                f"profile '{name}' has no usable base_url (none set and the "
                f"context '{ctx_name}' gitea service is disabled or has none); "
                "fail closed"
            )
        elif not isinstance(username, str):
            # A truthy non-string username would make the regex match below
            # raise TypeError; refuse it as a config error instead.
            raise ConfigError(f"profile '{name}' has a non-string 'username'")
        elif _TBD_RE.match(username):
            reason = (
                f"profile '{name}' username {username!r} is a TBD placeholder; "
                "provision the account before use (fail closed)"
            )
        else:
            profiles[name] = profile
            continue
        # Unavailable profiles keep their (secret-free) body for audits only.
        unavailable[name] = reason
        profile["_unavailable_reason"] = reason
        audit_only[name] = profile

    projects = data.get("projects") or {}
    if not isinstance(projects, dict):
        raise ConfigError(f"{path} 'projects' must be a JSON object")
    for proj_path, proj in projects.items():
        if not isinstance(proj, dict):
            raise ConfigError(f"project '{proj_path}' must be a JSON object")
        _require_enabled("project", proj_path, proj)
        proj_ctx = proj.get("context")
        # Same unhashable-reference guard as for profiles above.
        if not isinstance(proj_ctx, str) or proj_ctx not in contexts:
            raise ConfigError(
                f"project '{proj_path}' references unknown context "
                f"{proj_ctx!r}"
            )

    rules = data.get("rules") or {}
    if not isinstance(rules, dict):
        raise ConfigError(f"{path} 'rules' must be a JSON object")

    return {
        "version": 2,
        "shape": "contexts",
        "profiles": profiles,
        "unavailable": unavailable,
        "audit_only_profiles": audit_only,
        "contexts": contexts,
        "projects": projects,
        "rules": rules,
    }
|
|
|
|
|
|
def resolve_service(config, context_name, service_name):
    """Look up one service of one context, for *internal* MCP use.

    ``service_name == "gitea"`` selects the context's ``gitea`` block; any
    other name is looked up in the context's ``services`` map. The result is
    a shallow copy of the service entry, so it can include the endpoint
    base_url and the auth *reference* — both are for MCP-internal resolution
    only and must never be echoed into normal LLM-facing output.

    Raises ConfigError when *config* has no ``contexts`` object, or when the
    context or service is unknown or disabled. There is no fallback to any
    other context or service.
    """
    contexts = (config or {}).get("contexts")
    if not isinstance(contexts, dict):
        raise ConfigError(
            "service resolution requires a version 2 contexts config")

    context = contexts.get(context_name)
    if context is None:
        raise ConfigError(
            f"unknown context '{context_name}' (fail closed, no fallback)")
    if not context.get("enabled"):
        raise ConfigError(
            f"context '{context_name}' is disabled; its services are defined "
            "but unavailable for action (no fallback)"
        )

    entry = (
        context.get("gitea")
        if service_name == "gitea"
        else (context.get("services") or {}).get(service_name)
    )
    if entry is None:
        raise ConfigError(
            f"unknown service '{service_name}' in context '{context_name}' "
            "(fail closed, no fallback)"
        )
    if not entry.get("enabled"):
        raise ConfigError(
            f"service '{context_name}.{service_name}' is disabled; defined "
            "but unavailable for action — refusing, no fallback"
        )
    return dict(entry)
|
|
|
|
|
|
def project_for_path(config, path):
    """Return the project entry configured for *path*, failing closed.

    *path* is looked up as an exact key of the config's ``projects`` map.
    Returns None when there is no such key (feature off for that repo), and a
    shallow copy of the entry otherwise.

    Raises ConfigError when the project itself, or the context it points to,
    is not enabled — a configured-but-disabled project must never be acted on.
    """
    source = config or {}
    entry = (source.get("projects") or {}).get(path)
    if entry is None:
        return None
    if not entry.get("enabled"):
        raise ConfigError(
            f"project '{path}' is disabled (enabled: false); refusing, "
            "no fallback"
        )

    # A context that is missing from the map counts as disabled.
    context = (source.get("contexts") or {}).get(entry.get("context")) or {}
    if not context.get("enabled"):
        raise ConfigError(
            f"project '{path}' maps to context '{entry.get('context')}' "
            "which is disabled; refusing, no fallback"
        )
    return dict(entry)
|
|
|
|
|
|
def _audit_profile_entry(name, profile, enabled, reveal_endpoints):
    """Build one audit row for a profile.

    The row always has name, enabled, context (the profile's ``context``,
    else its ``environment``), role, username and the auth *type*. A stored
    ``_unavailable_reason`` is added as ``reason``. Only when
    *reveal_endpoints* is true are ``base_url`` and the auth source name
    added; token values are never part of the row.
    """
    auth_ref = profile.get("auth") if isinstance(profile, dict) else None
    auth_type = auth_ref.get("type") if isinstance(auth_ref, dict) else None

    row = {"name": name, "enabled": enabled}
    row["context"] = profile.get("context") or profile.get("environment")
    row["role"] = profile.get("role")
    row["username"] = profile.get("username")
    row["auth"] = auth_type

    why = profile.get("_unavailable_reason")
    if why:
        row["reason"] = why

    if reveal_endpoints:
        row["base_url"] = profile.get("base_url")
        row["auth_source"] = auth_source_name(profile)
    return row
|
|
|
|
|
|
def audit_config(config, reveal_endpoints=False):
    """Summarize profiles and context services without secrets.

    The default report is LLM-safe: names, contexts, enabled state,
    capability labels, and the auth *type* only — never endpoint URLs,
    keychain ids, token values, or auth source names. Passing
    ``reveal_endpoints=True`` is the explicit admin/debug opt-in: it adds
    base URLs and non-secret auth source names (``keychain:<id>`` / env var
    name). Token values are never included on any path.

    A None *config* yields an empty report. Entries from ``profiles`` are
    reported as enabled, entries from ``audit_only_profiles`` as disabled,
    and a service counts as enabled only when its context is enabled too.
    """
    if config is None:
        return {"version": None, "profiles": [], "services": []}

    # Configs without an explicit shape are told apart by the presence of an
    # "aliases" key.
    shape = config.get("shape")
    if not shape:
        shape = "profiles" if config.get("aliases") is None else "environments"

    profile_rows = []
    for name, profile in (config.get("profiles") or {}).items():
        if isinstance(profile, dict):
            profile_rows.append(
                _audit_profile_entry(name, profile, True, reveal_endpoints))
    for name, profile in (config.get("audit_only_profiles") or {}).items():
        profile_rows.append(
            _audit_profile_entry(name, profile, False, reveal_endpoints))

    service_rows = []
    for ctx_name, ctx in (config.get("contexts") or {}).items():
        context_on = bool(ctx.get("enabled"))
        for svc_name, svc in (ctx.get("services") or {}).items():
            auth_ref = svc.get("auth") or {}
            row = {
                "context": ctx_name,
                "name": svc_name,
                "kind": svc.get("kind"),
                "label": svc.get("label"),
                "enabled": context_on and bool(svc.get("enabled")),
                "capabilities": list(svc.get("capabilities") or []),
                "auth": auth_ref.get("type"),
            }
            if reveal_endpoints:
                row["base_url"] = svc.get("base_url")
                row["auth_source"] = auth_source_name(svc)
            service_rows.append(row)

    return {
        "version": config.get("version"),
        "shape": shape,
        "profiles": profile_rows,
        "services": service_rows,
    }
|
|
|
|
|
|
def service_summaries(config, auth_check=None):
    """Safe one-line service summaries for LLM sessions.

    Each line reports label + state only (e.g. ``PRGS Jenkins: enabled,
    read-only, authenticated`` / ``PRGS Sentry: disabled``) — never endpoint
    URLs, keychain ids, or token values.

    Args:
        config: A loaded contexts-shape config, or None. None (or a config
            without ``contexts``) yields an empty list.
        auth_check: Callable taking the service dict and returning True when
            its credential resolves. Defaults to a local presence check
            (keychain lookup or environment variable). Its result is reported
            only as ``authenticated`` / ``no credential``.

    Returns:
        A list of summary strings, one per context service. Services named
        only under a context's ``gitea`` block are not listed.
    """
    if auth_check is None:
        def auth_check(service):
            auth = service.get("auth") or {}
            if auth.get("type") == "keychain":
                # A reference without an id can never resolve; skip the lookup.
                item_id = auth.get("id")
                return bool(item_id) and _keychain_token(item_id) is not None
            if auth.get("type") == "env":
                return bool(os.environ.get(auth.get("name") or ""))
            return False

    lines = []
    # `config or {}` keeps a None config from raising AttributeError, matching
    # how the other config readers in this module treat a missing config.
    for ctx_name, ctx in ((config or {}).get("contexts") or {}).items():
        ctx_enabled = bool(ctx.get("enabled"))
        for svc_name, svc in (ctx.get("services") or {}).items():
            label = svc.get("label") or f"{ctx_name} {svc_name}"
            # A service in a disabled context is reported as disabled too.
            if not (ctx_enabled and svc.get("enabled")):
                lines.append(f"{label}: disabled")
                continue
            caps = list(svc.get("capabilities") or [])
            cap_part = "read-only" if caps == ["read"] else ", ".join(caps)
            auth_part = "authenticated" if auth_check(svc) else "no credential"
            parts = ["enabled"] + ([cap_part] if cap_part else []) + [auth_part]
            lines.append(f"{label}: " + ", ".join(parts))
    return lines
|
|
|
|
|
|
def _validate_auth(name, auth):
    """Check the shape of an optional ``auth`` reference.

    None is accepted (no reference configured). Otherwise *auth* must be an
    object whose ``type`` is one of ``_AUTH_TYPES``, with a truthy ``id`` for
    keychain references or a truthy ``name`` for env references. Error
    messages mention only *name* and the auth type, never any value that
    could be a secret.
    """
    if auth is None:
        return
    if not isinstance(auth, dict):
        raise ConfigError(f"profile '{name}' has a non-object 'auth'")

    kind = auth.get("type")
    if kind not in _AUTH_TYPES:
        raise ConfigError(
            f"profile '{name}' has invalid auth type {kind!r}; "
            f"expected one of {list(_AUTH_TYPES)}"
        )

    if kind == "keychain":
        if not auth.get("id"):
            raise ConfigError(f"profile '{name}' keychain auth requires an 'id'")
    elif kind == "env":
        if not auth.get("name"):
            raise ConfigError(f"profile '{name}' env auth requires a 'name'")
|
|
|
|
|
|
def select_profile(config, name=None):
    """Return the selected profile dict from a loaded *config*.

    Args:
        config: A loaded config dict, or None when the JSON layer is off.
        name: Profile name or alias. When falsy, the name from
            ``selected_profile_name()`` is used.

    Returns:
        None when *config* is None, otherwise the selected profile dict.

    Raises:
        ConfigError: when no profile is selected, the selection is marked
            unavailable, the profile is unknown or not an object, it embeds
            an inline secret key (any of ``_INLINE_SECRET_KEYS``), or its
            ``auth`` reference is malformed.
    """
    if config is None:
        return None
    profiles = config.get("profiles", {})
    aliases = config.get("aliases") or {}
    unavailable = config.get("unavailable") or {}
    name = name or selected_profile_name()
    # Only selectable names are advertised; unavailable ones are left out.
    available = sorted(set(profiles) | set(aliases))
    if not name:
        raise ConfigError(
            f"{ENV_CONFIG_PATH} is set but {ENV_PROFILE} is not; "
            f"available profiles: {available}"
        )
    # Strict resolution order (#103): exact alias → exact profile address →
    # fail closed. No fuzzy matching, no partial matches, no defaults.
    resolved = aliases.get(name, name)
    if resolved in unavailable:
        raise ConfigError(unavailable[resolved])
    if resolved not in profiles:
        raise ConfigError(
            f"profile '{name}' not found in config; available profiles: {available}"
        )
    profile = profiles[resolved]
    if not isinstance(profile, dict):
        raise ConfigError(f"profile '{name}' must be a JSON object")
    # Use the shared key list so this check agrees with the load-time
    # validators (which also refuse an inline "secret").
    for secret_key in _INLINE_SECRET_KEYS:
        if secret_key in profile:
            # Never accept (or echo) an inline secret; require an auth reference.
            raise ConfigError(
                f"profile '{name}' must not contain an inline '{secret_key}'; "
                "use an 'auth' reference ({\"type\": \"keychain\"|\"env\", ...}) instead"
            )
    _validate_auth(name, profile.get("auth"))
    return profile
|
|
|
|
|
|
def resolve_profile(path=None, name=None):
    """Load the config at *path* and pick profile *name* from it.

    Thin composition of :func:`load_config` and :func:`select_profile`; the
    result is None whenever :func:`select_profile` is handed a None config.
    """
    loaded = load_config(path)
    return select_profile(loaded, name)
|
|
|
|
|
|
def auth_source_name(profile):
    """Describe where a profile's token comes from, without the token itself.

    Env auth yields the referenced env var name; keychain auth yields
    ``keychain:<id>``. Anything else (no profile, no ``auth`` object, unknown
    auth type) yields None. Only reference names are returned, never a secret
    value.
    """
    reference = profile.get("auth") if profile else None
    if not isinstance(reference, dict):
        return None
    kind = reference.get("type")
    if kind == "env":
        return reference.get("name")
    if kind == "keychain":
        return "keychain:{}".format(reference.get("id"))
    return None
|
|
|
|
|
|
def _keychain_token(item_id):
|
|
"""Read a token from the macOS keychain by service *item_id*.
|
|
|
|
Returns the secret string, or None if it cannot be found. Never logs the
|
|
value; failures are swallowed so the caller can raise a safe error.
|
|
"""
|
|
try:
|
|
proc = subprocess.run(
|
|
["security", "find-generic-password", "-s", item_id, "-w"],
|
|
capture_output=True, text=True,
|
|
)
|
|
except (OSError, subprocess.SubprocessError):
|
|
return None
|
|
if proc.returncode != 0:
|
|
return None
|
|
return proc.stdout.strip() or None
|
|
|
|
|
|
def resolve_token(profile, keychain_lookup=_keychain_token):
    """Turn a profile's ``auth`` reference into the actual token.

    A missing profile, or a profile whose ``auth`` is not an object, resolves
    to None. ``env`` references read the named environment variable;
    ``keychain`` references go through *keychain_lookup* (injectable for
    testing). Raises :class:`ConfigError` when the referenced env var is unset
    or empty, when the keychain item cannot be found, or when the auth type is
    unsupported. Error messages name only the reference, never the secret
    value.
    """
    if not profile:
        return None
    reference = profile.get("auth")
    if not isinstance(reference, dict):
        return None

    kind = reference.get("type")

    if kind == "env":
        env_name = reference.get("name")
        secret = None
        if env_name:
            secret = os.environ.get(env_name)
        if secret:
            return secret
        raise ConfigError(
            f"auth env var '{env_name}' referenced by the profile is not set"
        )

    if kind == "keychain":
        item_id = reference.get("id")
        secret = None
        if item_id:
            secret = keychain_lookup(item_id)
        if secret:
            return secret
        raise ConfigError(
            f"keychain item '{item_id}' referenced by the profile was not found"
        )

    raise ConfigError(f"unsupported auth type {kind!r} in the selected profile")
|
|
|
|
|
|
# ── Config authoring helpers (used by the interactive menu; pure + testable) ────
|
|
|
|
def is_valid_profile_name(name):
    """Report whether *name* is non-empty and matches ``_PROFILE_NAME_RE``."""
    if not name:
        return False
    return _PROFILE_NAME_RE.match(name) is not None
|
|
|
|
|
|
def keychain_auth(item_id):
    """Return the auth reference dict for keychain item *item_id*."""
    return dict(type="keychain", id=item_id)
|
|
|
|
|
|
def env_auth(var_name):
    """Return the auth reference dict for environment variable *var_name*."""
    return dict(type="env", name=var_name)
|
|
|
|
|
|
def build_profile(*, base_url, auth, username=None, default_owner=None,
                  default_repo=None, execution_profile=None):
    """Build a profile dict from keyword arguments.

    ``base_url`` and ``auth`` are always present; each optional field is
    included only when its value is truthy. The result carries an auth
    *reference*, not a secret.
    """
    optional_fields = (
        ("username", username),
        ("default_owner", default_owner),
        ("default_repo", default_repo),
        ("execution_profile", execution_profile),
    )
    assembled = {"base_url": base_url, "auth": auth}
    assembled.update((key, value) for key, value in optional_fields if value)
    return assembled
|
|
|
|
|
|
def empty_config():
    """Create a new config dict at ``SUPPORTED_VERSION`` with no profiles."""
    fresh = {"version": SUPPORTED_VERSION}
    fresh["profiles"] = {}
    return fresh
|
|
|
|
|
|
def validate_config(config):
    """Collect human-readable problems found in *config*.

    An empty list means the config is valid. Version 2 configs are checked
    all-or-nothing by running the v2 loader and reporting its error, if any.
    Other configs are checked profile by profile: name, shape, ``base_url``,
    inline secrets and the ``auth`` reference. Messages describe only
    structure and auth *references*, never secret material.
    """
    if not isinstance(config, dict):
        return ["config is not a JSON object"]

    found = []
    version = config.get("version")

    if version == 2:
        # v2 validation is all-or-nothing via the loader's invariants.
        try:
            _load_v2_any(config, "<config>")
        except ConfigError as exc:
            found.append(str(exc))
        return found

    if version is None:
        found.append(
            f"missing required 'version' (expected one of {list(SUPPORTED_VERSIONS)})"
        )
    elif version != SUPPORTED_VERSION:
        found.append(
            f"unsupported version {version!r} (expected one of {list(SUPPORTED_VERSIONS)})"
        )

    profiles = config.get("profiles")
    if not isinstance(profiles, dict):
        found.append("missing 'profiles' object")
        return found

    for name, profile in profiles.items():
        if not is_valid_profile_name(name):
            found.append(f"invalid profile name {name!r}")
        if not isinstance(profile, dict):
            found.append(f"profile '{name}' is not an object")
            continue
        if not profile.get("base_url"):
            found.append(f"profile '{name}' is missing 'base_url'")
        found.extend(
            f"profile '{name}' has an inline '{secret_key}' (use an auth reference)"
            for secret_key in ("token", "password")
            if secret_key in profile
        )
        auth_ref = profile.get("auth")
        try:
            _validate_auth(name, auth_ref)
        except ConfigError as exc:
            found.append(str(exc))
            continue
        # Only reached when the reference itself raised no error.
        if auth_ref is None:
            found.append(f"profile '{name}' is missing an 'auth' reference")
    return found
|
|
|
|
|
|
def add_profile(config, name, profile):
    """Build a new config that also contains *profile* under *name*.

    The input *config* is not modified; the result holds the version plus the
    existing profiles and the new one. Raises :class:`ConfigError` when *name*
    is not a valid profile name, when a profile with that name already exists,
    when :func:`select_profile` rejects the new profile, or when it has no
    ``base_url``.
    """
    if not is_valid_profile_name(name):
        raise ConfigError(
            f"invalid profile name {name!r}; use letters, digits, '.', '-', '_'"
        )
    existing = dict(config.get("profiles") or {})
    if name in existing:
        raise ConfigError(f"profile '{name}' already exists; edit or remove it first")
    version = config.get("version", SUPPORTED_VERSION)
    merged = dict(existing)
    merged[name] = profile
    candidate = {"version": version, "profiles": merged}
    # Validate just this profile (reuse select_profile's checks).
    select_profile(candidate, name)
    if not profile.get("base_url"):
        raise ConfigError(f"profile '{name}' is missing 'base_url'")
    return candidate
|
|
|
|
|
|
def upsert_profile(config, name, profile):
    """Build a new config with *profile* stored under *name*, replacing any old one.

    Used for edits. Unlike :func:`add_profile`, an existing entry with the
    same name is overwritten rather than rejected. Raises :class:`ConfigError`
    for an invalid name or when :func:`select_profile` rejects the profile.
    """
    if not is_valid_profile_name(name):
        raise ConfigError(f"invalid profile name {name!r}")
    updated = dict(config.get("profiles") or {})
    updated[name] = profile
    version = config.get("version", SUPPORTED_VERSION)
    candidate = {"version": version, "profiles": updated}
    select_profile(candidate, name)
    return candidate
|
|
|
|
|
|
def remove_profile(config, name):
    """Build a new config that no longer contains profile *name*.

    The input *config* is not modified. Raises :class:`ConfigError` when no
    profile with that name exists.
    """
    current = dict(config.get("profiles") or {})
    if name not in current:
        raise ConfigError(f"profile '{name}' not found")
    remaining = {key: value for key, value in current.items() if key != name}
    version = config.get("version", SUPPORTED_VERSION)
    return {"version": version, "profiles": remaining}
|
|
|
|
|
|
def save_config(config, path=None):
    """Write *config* as pretty-printed JSON and return the path written.

    The destination is *path*, else ``config_path()``, else
    ``DEFAULT_CONFIG_PATH``; missing parent directories are created. The JSON
    is first written to a temp file in the destination directory and then
    moved into place with ``os.replace``, so a crash mid-write never leaves a
    truncated config behind. If anything fails, the temp file is removed and
    the original exception is re-raised.
    """
    target = path or config_path() or DEFAULT_CONFIG_PATH
    parent = os.path.dirname(os.path.abspath(target))
    os.makedirs(parent, exist_ok=True)
    handle, scratch = tempfile.mkstemp(
        suffix=".json", prefix=".profiles-", dir=parent)
    try:
        with os.fdopen(handle, "w", encoding="utf-8") as out:
            json.dump(config, out, indent=2, sort_keys=True)
            out.write("\n")
        os.replace(scratch, target)
    except BaseException:
        # Best-effort cleanup of the scratch file; the original error wins.
        try:
            os.unlink(scratch)
        except OSError:
            pass
        raise
    return target
|
|
|
|
|
|
def server_command():
    """Return the ``(command, args)`` pair used to start the MCP server.

    The command is ``venv/bin/python3`` next to this file when that path
    exists, otherwise the running interpreter. The single argument is the
    path to ``mcp_server.py`` in this file's directory.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    venv_python = os.path.join(here, "venv", "bin", "python3")
    interpreter = venv_python if os.path.exists(venv_python) else sys.executable
    script = os.path.join(here, "mcp_server.py")
    return interpreter, [script]
|
|
|
|
|
|
def launcher_entry(profile_name, config_path=None):
    """Build the thin ``gitea-tools`` launcher entry for *profile_name*.

    The entry holds the server command/args from :func:`server_command` plus
    exactly two env vars: ``GITEA_MCP_CONFIG`` (*config_path*, or
    ``DEFAULT_CONFIG_PATH`` when falsy) and ``GITEA_MCP_PROFILE``. No token or
    password is ever included, so the entry can be placed in a Claude /
    Gemini / Codex ``mcpServers`` block.
    """
    command, args = server_command()
    environment = {
        "GITEA_MCP_CONFIG": config_path or DEFAULT_CONFIG_PATH,
        "GITEA_MCP_PROFILE": profile_name,
    }
    server = {"command": command, "args": args, "env": environment}
    return {"gitea-tools": server}
|
|
|
|
|
|
def keychain_set(item_id, token, account=None, runner=subprocess.run):
    """Save *token* in the macOS keychain as service *item_id*; return *item_id*.

    Runs ``security add-generic-password -U`` through *runner* (injectable for
    testing). The account defaults to ``$USER``, then ``"gitea-tools"``. The
    token is handed to ``security`` as a command argument only; this function
    never returns, prints or logs it. Raises :class:`ConfigError` for an empty
    item id or token, when the command cannot be run, or when it exits
    non-zero; the error messages never contain the token.
    """
    if not item_id:
        raise ConfigError("keychain item id is required")
    if not token:
        raise ConfigError("refusing to store an empty token")
    owner = account or os.environ.get("USER") or "gitea-tools"
    argv = ["security", "add-generic-password", "-U"]
    argv += ["-s", item_id, "-a", owner, "-w", token]
    try:
        outcome = runner(argv, capture_output=True, text=True)
    except (OSError, subprocess.SubprocessError) as exc:
        raise ConfigError(f"could not run keychain store for '{item_id}'") from exc
    # A result object without a returncode is treated as a failure.
    status = getattr(outcome, "returncode", 1)
    if status != 0:
        raise ConfigError(f"keychain store failed for item '{item_id}'")
    return item_id
|
|
|
|
|
|
if __name__ == "__main__":  # pragma: no cover - thin CLI dispatch
    # Two subcommands: "menu" hands off to the interactive menu module and
    # "audit" prints a JSON report. Anything else prints usage and exits 2.
    subcommand = sys.argv[1] if len(sys.argv) > 1 else None
    extra_args = sys.argv[2:]

    if subcommand == "menu":
        import gitea_config_menu
        raise SystemExit(gitea_config_menu.main(extra_args))

    if subcommand == "audit":
        # Local admin/debug diagnostics (#120). --reveal-endpoints is the
        # explicit opt-in that adds base URLs and non-secret auth source
        # names; token values are never printed on any path.
        reveal = "--reveal-endpoints" in extra_args
        try:
            config = load_config(config_path() or DEFAULT_CONFIG_PATH)
            report = audit_config(config, reveal_endpoints=reveal)
            report["summaries"] = service_summaries(config)
        except ConfigError as exc:
            print(f"config error: {exc}", file=sys.stderr)
            raise SystemExit(1)
        print(json.dumps(report, indent=2))
        raise SystemExit(0)

    print("usage: python gitea_config.py menu | audit [--reveal-endpoints]",
          file=sys.stderr)
    raise SystemExit(2)
|