"""Canonical JSON runtime-profile configuration for Gitea MCP (issue #19). One canonical config file defines every Gitea runtime profile. Each LLM MCP launcher (Claude / Gemini / Codex) stays a *thin* launcher that only points at that file and names a profile — no duplicated ``GITEA_USER_*`` / ``GITEA_PASS_*`` blocks, no raw tokens in client configs: GITEA_MCP_CONFIG=/path/to/profiles.json # the canonical file GITEA_MCP_PROFILE=prgs # which named profile to use File shape (see ``gitea-mcp.example.json``):: { "version": 1, "profiles": { "prgs": { "base_url": "https://gitea.prgs.cc", "username": "jcwalker3", "auth": {"type": "keychain", "id": "prgs-gitea-token"}, "default_owner": "Scaled-Tech-Consulting", "execution_profile": "personal-prgs" } } } Auth is referenced *indirectly*, never inline: - ``{"type": "keychain", "id": "prgs-gitea-token"}`` — resolved from the macOS keychain at token-resolution time. - ``{"type": "env", "name": "GITEA_TOKEN_PRGS"}`` — read from that env var. Design constraints: - **Backwards compatible.** With ``GITEA_MCP_CONFIG`` unset, every entry point returns ``None`` and callers fall back to pure environment behaviour. - **No inline secrets.** A raw ``token``/``password`` key is rejected. Token values are resolved only via an auth *reference* and are never stored in, or returned as, profile metadata. - **No network.** Parsing only reads and decodes a local file. Token resolution (keychain/env) happens separately, on demand. - **Fail safely.** A missing file, invalid JSON, unsupported version, unknown/ unset selected profile, or an unresolvable secret reference raises :class:`ConfigError` with a message that never includes file contents, tokens, or passwords. """ import os import re import sys import json import tempfile import subprocess ENV_CONFIG_PATH = "GITEA_MCP_CONFIG" ENV_PROFILE = "GITEA_MCP_PROFILE" SUPPORTED_VERSION = 1 SUPPORTED_VERSIONS = (1, 2) _AUTH_TYPES = ("keychain", "env") # Profile names go into env vars, keychain ids, and JSON keys — keep them tame. _PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$") # v2 address segments (environment / service / identity) must be dot-free so # the dotted profile address {env}.{service}.{identity} stays unambiguous. _SEGMENT_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]*$") # Placeholder usernames must never activate (fail closed until provisioned). _TBD_RE = re.compile(r"(?i)^tbd(-|$)") # Keys that would mean an inline secret wherever they appear. _INLINE_SECRET_KEYS = ("token", "password", "secret") # ── Operation-name normalization table (#106; minimal subset landed in #103) ─── # Canonical operations are namespaced ({service}.{area}.{verb}). Legacy # unqualified spellings are accepted ONLY through this explicit table — never # by guessing. The same table is the documentation of record (see # docs/gitea-execution-profiles.md) and is exercised by # tests/test_op_normalization.py. GITEA_OPERATION_ALIASES = { "read": "gitea.read", "review": "gitea.pr.review", "comment": "gitea.pr.comment", "approve": "gitea.pr.approve", "request_changes": "gitea.pr.request_changes", "merge": "gitea.pr.merge", "pr.create": "gitea.pr.create", "branch.push": "gitea.branch.push", # Contexts-shape author verbs (#120) — the invariant checks below depend on # "push"/"open_pr" normalizing to the two author-only ops. "branch": "gitea.branch.create", "commit": "gitea.repo.commit", "push": "gitea.branch.push", "open_pr": "gitea.pr.create", } _REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"}) _AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"}) def normalize_operation(op, service="gitea"): """Return the canonical namespaced name for *op*, or fail closed (#106). - already namespaced for this service (``{service}.*``) → unchanged - known unqualified Gitea ops → mapped via ``GITEA_OPERATION_ALIASES`` - unqualified single-word ops on non-Gitea services → ``{service}.{op}`` - anything else — foreign service prefixes, dotted names outside the table, unknown unqualified names — is unknown or ambiguous → ConfigError Normalization never crosses services (a Gitea alias is never applied to another service) and never widens permissions: an operation that cannot be normalized grants and matches nothing. """ if not isinstance(op, str) or not op: raise ConfigError("operation must be a non-empty string (fail closed)") if op.startswith(service + "."): return op if service == "gitea" and op in GITEA_OPERATION_ALIASES: return GITEA_OPERATION_ALIASES[op] if service != "gitea" and "." not in op: return f"{service}.{op}" raise ConfigError( f"operation {op!r} cannot be normalized safely for service " f"'{service}' (unknown, ambiguous, or cross-service; fail closed)" ) def check_operation(op, allowed, forbidden=(), service="gitea"): """Decide whether *op* is permitted. Returns ``(bool, reason)`` (#106). Everything is normalized via :func:`normalize_operation` BEFORE any membership check, so legacy and canonical spellings always compare equal. Reasons: ``allowed``, ``invalid-operation``, ``invalid-forbidden-entry``, ``forbidden``, ``no-allowed-operations``, ``not-allowed``. Fail-closed rules: - an *op* that cannot be normalized is denied (``invalid-operation``) - a forbidden entry that cannot be normalized denies the request (``invalid-forbidden-entry``) — dropping it would silently narrow the forbidden set, i.e. widen permissions - an allowed entry that cannot be normalized is ignored — it grants nothing, so permissions never widen - ``forbidden`` always overrides ``allowed`` - an empty or missing allowed list denies everything """ try: op_n = normalize_operation(op, service) except ConfigError: return (False, "invalid-operation") forbidden_n = set() for entry in (forbidden or ()): try: forbidden_n.add(normalize_operation(entry, service)) except ConfigError: return (False, "invalid-forbidden-entry") if op_n in forbidden_n: return (False, "forbidden") if not allowed: return (False, "no-allowed-operations") allowed_n = set() for entry in allowed: try: allowed_n.add(normalize_operation(entry, service)) except ConfigError: continue if op_n in allowed_n: return (True, "allowed") return (False, "not-allowed") def _normalize_op(service, op, addr): """Normalize *op* for identity *addr*, or fail closed with context.""" try: return normalize_operation(op, service) except ConfigError as exc: raise ConfigError(f"identity '{addr}': {exc}") from None # Default canonical config location (one file shared by all LLM launchers). DEFAULT_CONFIG_PATH = os.path.join( os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json" ) class ConfigError(Exception): """Raised for a bad config file, profile selection, or secret reference. Messages are safe to surface: they never include file contents, tokens, or passwords — only non-secret names/ids/positions. """ def config_path(): """Return the configured JSON path, or None when the JSON layer is off.""" return (os.environ.get(ENV_CONFIG_PATH) or "").strip() or None def selected_profile_name(): """Return the selected profile name from the environment, or None.""" return (os.environ.get(ENV_PROFILE) or "").strip() or None def load_config(path=None): """Load and minimally validate the canonical JSON config. Returns the parsed dict, or None when no config path is configured (so env-only usage is never broken). Raises :class:`ConfigError` when a path is configured but the file is missing, unreadable, not valid JSON, declares an unsupported ``version``, or lacks a ``profiles`` object. """ path = path or config_path() if not path: return None if not os.path.isfile(path): raise ConfigError(f"{ENV_CONFIG_PATH} points to a missing file: {path}") try: with open(path, encoding="utf-8") as fh: data = json.load(fh) except json.JSONDecodeError as exc: # Report position only (msg/line/col). `from None` suppresses the # original exception, whose `.doc` holds the raw file text — which could # contain a token — so it never reaches a traceback. raise ConfigError( f"invalid JSON in {path} (line {exc.lineno}, column {exc.colno}): {exc.msg}" ) from None except OSError as exc: raise ConfigError(f"could not read {path}: {exc.strerror}") from None if not isinstance(data, dict): raise ConfigError(f"{path} must be a JSON object") version = data.get("version") if version is None: # Fail closed (#103): an unversioned config is ambiguous between v1 and # v2 shapes, so it is refused rather than guessed. raise ConfigError( f"{path} is missing the required 'version' field; " f"expected one of {list(SUPPORTED_VERSIONS)}" ) if version == 2: return _load_v2_any(data, path) if version != SUPPORTED_VERSION: raise ConfigError( f"{path} has unsupported version {version!r}; " f"expected one of {list(SUPPORTED_VERSIONS)}" ) if not isinstance(data.get("profiles"), dict): raise ConfigError(f"{path} must be a JSON object with a 'profiles' object") return data # ── profiles.json version 2 (#103): environment → service → identity ────────── # v2 files are validated and *flattened* at load time into the same # {"profiles": {...}} shape v1 consumers already understand, keyed by the # canonical dotted address {environment}.{service}.{identity}. Two extra # top-level keys are carried: "aliases" (exact-name compatibility selectors) # and "unavailable" (addresses that fail closed at selection, e.g. TBD users). def _validate_identity_auth(addr, auth): """Require and validate an identity 'auth' reference. Rejects inline secrets.""" if auth is None: raise ConfigError(f"identity '{addr}' is missing an 'auth' reference") if not isinstance(auth, dict): raise ConfigError(f"identity '{addr}' has a non-object 'auth'") for key in _INLINE_SECRET_KEYS: if key in auth: raise ConfigError( f"identity '{addr}' auth must not contain an inline '{key}'; " "store secrets in the keychain and reference them by id" ) _validate_auth(addr, auth) def _flatten_identity(env_name, svc_name, svc, ident_name, ident): """Validate one v2 identity and return (addr, flattened_profile). The flattened profile is v1-shaped (base_url/auth/username/defaults) plus v2 metadata (profile_path, environment, service, identity, role) and normalized operation lists. Raises ConfigError on any invariant violation. """ addr = f"{env_name}.{svc_name}.{ident_name}" if not isinstance(ident, dict): raise ConfigError(f"identity '{addr}' must be a JSON object") for key in _INLINE_SECRET_KEYS: if key in ident: raise ConfigError( f"identity '{addr}' must not contain an inline '{key}'; " "use an 'auth' reference instead" ) _validate_identity_auth(addr, ident.get("auth")) base_url = ident.get("base_url") or svc.get("base_url") if not base_url: raise ConfigError( f"identity '{addr}' has no 'base_url' at identity or service level" ) allowed = ident.get("allowed_operations") or [] forbidden = ident.get("forbidden_operations") or [] if not isinstance(allowed, list) or not isinstance(forbidden, list): raise ConfigError(f"identity '{addr}' operation fields must be lists") allowed_n = {_normalize_op(svc_name, op, addr) for op in allowed} forbidden_n = {_normalize_op(svc_name, op, addr) for op in forbidden} # Reviewer-identity deadlock rule (#100/#103): an identity that may approve # or merge PRs must explicitly forbid creating PRs and pushing branches, # so the reviewer identity can never author the PR it must review. if allowed_n & _REVIEW_MERGE_OPS: missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n) if missing: raise ConfigError( f"identity '{addr}' allows PR approve/merge but does not forbid " f"{missing}; reviewer identities must forbid gitea.pr.create and " "gitea.branch.push (reviewer-identity deadlock rule)" ) profile = { "profile_path": addr, "environment": env_name, "service": svc_name, "identity": ident_name, "base_url": base_url, "auth": ident["auth"], "allowed_operations": sorted(allowed_n), "forbidden_operations": sorted(forbidden_n), } # Service-level defaults inherit unless the identity overrides them. for key in ("default_owner", "default_repo", "default_org"): value = ident.get(key, svc.get(key)) if value: profile[key] = value for key in ("role", "username", "execution_profile", "audit_label"): if ident.get(key): profile[key] = ident[key] return addr, profile def _load_v2(data, path): """Validate a v2 config and return the flattened, resolvable structure.""" environments = data.get("environments") if not isinstance(environments, dict) or not environments: raise ConfigError( f"{path} version 2 config requires a non-empty 'environments' object" ) profiles = {} unavailable = {} for env_name, env in environments.items(): if not _SEGMENT_RE.match(env_name or ""): raise ConfigError(f"invalid environment name {env_name!r} (no dots)") if not isinstance(env, dict): raise ConfigError(f"environment '{env_name}' must be a JSON object") services = env.get("services") if not isinstance(services, dict) or not services: raise ConfigError( f"environment '{env_name}' requires a non-empty 'services' object" ) for svc_name, svc in services.items(): if not _SEGMENT_RE.match(svc_name or ""): raise ConfigError( f"invalid service name {svc_name!r} in '{env_name}' (no dots)" ) if not isinstance(svc, dict): raise ConfigError( f"service '{env_name}.{svc_name}' must be a JSON object" ) identities = svc.get("identities") if not isinstance(identities, dict) or not identities: raise ConfigError( f"service '{env_name}.{svc_name}' requires a non-empty " "'identities' object" ) for ident_name, ident in identities.items(): if not _SEGMENT_RE.match(ident_name or ""): raise ConfigError( f"invalid identity name {ident_name!r} in " f"'{env_name}.{svc_name}' (no dots)" ) addr, profile = _flatten_identity( env_name, svc_name, svc, ident_name, ident ) username = profile.get("username") or "" if _TBD_RE.match(username): # Fail closed at selection, without blocking every other # identity in the file (see #103 acceptance criteria). unavailable[addr] = ( f"identity '{addr}' username {username!r} is a TBD " "placeholder; provision the account before use " "(fail closed)" ) else: profiles[addr] = profile aliases = data.get("aliases") or {} if not isinstance(aliases, dict): raise ConfigError(f"{path} 'aliases' must be a JSON object") known = set(profiles) | set(unavailable) for alias, target in aliases.items(): if not isinstance(target, str) or not target: raise ConfigError(f"alias '{alias}' target must be a non-empty string") if alias in known and alias != target: raise ConfigError( f"selector '{alias}' is both an alias and a profile address " "with a different target (conflicting selector; fail closed)" ) if target not in known: raise ConfigError( f"alias '{alias}' points to unknown profile '{target}'" ) return { "version": 2, "profiles": profiles, "aliases": dict(aliases), "unavailable": unavailable, } # ── profiles.json version 2 *contexts* shape (#120) ─────────────────────────── # The canonical machine config groups everything by context: top-level # "contexts" (each with a gitea block and non-Gitea "services"), flat # "profiles" (Gitea identities pointing at a context), "projects" (local repo # paths mapped to a context), and "rules". Every context/profile/service/ # project carries a required boolean "enabled": disabled entries are surfaced # in audits but fail closed at selection — never a silent fallback. Loading # flattens profiles into the same {"profiles": {...}, "unavailable": {...}} # model v1 consumers and select_profile() already understand, and carries the # validated "contexts"/"projects"/"rules" through for service resolution. def _load_v2_any(data, path): """Dispatch a version-2 file to its shape loader; ambiguity fails closed.""" has_contexts = "contexts" in data has_environments = "environments" in data if has_contexts and has_environments: raise ConfigError( f"{path} version 2 config must not mix 'contexts' and " "'environments' shapes (ambiguous; fail closed)" ) if has_contexts: return _load_v2_contexts(data, path) return _load_v2(data, path) def _require_enabled(kind, name, obj): """Return the required boolean ``enabled`` flag, failing closed.""" enabled = obj.get("enabled") if not isinstance(enabled, bool): raise ConfigError( f"{kind} '{name}' requires a boolean 'enabled' flag (fail closed)" ) return enabled def _reject_inline_secrets(kind, name, obj): for key in _INLINE_SECRET_KEYS: if key in obj: raise ConfigError( f"{kind} '{name}' must not contain an inline '{key}'; " "store secrets in the keychain and reference them by id" ) def _validate_context_service(ctx_name, svc_name, svc): """Validate one context service entry (auth reference only, no secrets).""" addr = f"{ctx_name}.{svc_name}" if not isinstance(svc, dict): raise ConfigError(f"service '{addr}' must be a JSON object") _require_enabled("service", addr, svc) _reject_inline_secrets("service", addr, svc) if "auth" in svc: _validate_auth(addr, svc["auth"]) def _load_v2_contexts(data, path): """Validate a v2 contexts-shape config and return the resolvable structure.""" contexts = data.get("contexts") if not isinstance(contexts, dict) or not contexts: raise ConfigError( f"{path} version 2 contexts config requires a non-empty " "'contexts' object" ) for ctx_name, ctx in contexts.items(): if not _PROFILE_NAME_RE.match(ctx_name or ""): raise ConfigError(f"invalid context name {ctx_name!r}") if not isinstance(ctx, dict): raise ConfigError(f"context '{ctx_name}' must be a JSON object") _require_enabled("context", ctx_name, ctx) gitea = ctx.get("gitea") if gitea is not None: if not isinstance(gitea, dict): raise ConfigError( f"context '{ctx_name}' has a non-object 'gitea' block") _require_enabled("service", f"{ctx_name}.gitea", gitea) _reject_inline_secrets("service", f"{ctx_name}.gitea", gitea) services = ctx.get("services") or {} if not isinstance(services, dict): raise ConfigError( f"context '{ctx_name}' has a non-object 'services' block") for svc_name, svc in services.items(): _validate_context_service(ctx_name, svc_name, svc) raw_profiles = data.get("profiles") if not isinstance(raw_profiles, dict) or not raw_profiles: raise ConfigError( f"{path} version 2 contexts config requires a non-empty " "'profiles' object" ) profiles = {} unavailable = {} for name, raw in raw_profiles.items(): if not is_valid_profile_name(name): raise ConfigError(f"invalid profile name {name!r}") if not isinstance(raw, dict): raise ConfigError(f"profile '{name}' must be a JSON object") enabled = _require_enabled("profile", name, raw) _reject_inline_secrets("profile", name, raw) _validate_identity_auth(name, raw.get("auth")) ctx_name = raw.get("context") if ctx_name not in contexts: raise ConfigError( f"profile '{name}' references unknown context {ctx_name!r}") context = contexts[ctx_name] allowed = raw.get("allowed_operations") or [] forbidden = raw.get("forbidden_operations") or [] if not isinstance(allowed, list) or not isinstance(forbidden, list): raise ConfigError(f"profile '{name}' operation fields must be lists") allowed_n = {_normalize_op("gitea", op, name) for op in allowed} forbidden_n = {_normalize_op("gitea", op, name) for op in forbidden} # Reviewer-identity deadlock rule (#100/#103) applies here unchanged. if allowed_n & _REVIEW_MERGE_OPS: missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n) if missing: raise ConfigError( f"profile '{name}' allows PR approve/merge but does not " f"forbid {missing}; reviewer identities must forbid " "gitea.pr.create and gitea.branch.push " "(reviewer-identity deadlock rule)" ) profile = dict(raw) profile["allowed_operations"] = sorted(allowed_n) profile["forbidden_operations"] = sorted(forbidden_n) gitea = context.get("gitea") or {} if not profile.get("base_url") and gitea.get("enabled"): profile["base_url"] = gitea.get("base_url") username = profile.get("username") or "" if not enabled: unavailable[name] = ( f"profile '{name}' is disabled (enabled: false); defined but " "unavailable for action — refusing, no fallback" ) elif not context.get("enabled"): unavailable[name] = ( f"profile '{name}' belongs to context '{ctx_name}' which is " "disabled (enabled: false); refusing, no fallback" ) elif not profile.get("base_url"): unavailable[name] = ( f"profile '{name}' has no usable base_url (none set and the " f"context '{ctx_name}' gitea service is disabled or has none); " "fail closed" ) elif _TBD_RE.match(username): unavailable[name] = ( f"profile '{name}' username {username!r} is a TBD placeholder; " "provision the account before use (fail closed)" ) else: profiles[name] = profile continue # Unavailable profiles keep their (secret-free) body for audits only. profile["_unavailable_reason"] = unavailable[name] profiles.setdefault("_audit_only", {}) profiles["_audit_only"][name] = profile projects = data.get("projects") or {} if not isinstance(projects, dict): raise ConfigError(f"{path} 'projects' must be a JSON object") for proj_path, proj in projects.items(): if not isinstance(proj, dict): raise ConfigError(f"project '{proj_path}' must be a JSON object") _require_enabled("project", proj_path, proj) if proj.get("context") not in contexts: raise ConfigError( f"project '{proj_path}' references unknown context " f"{proj.get('context')!r}" ) rules = data.get("rules") or {} if not isinstance(rules, dict): raise ConfigError(f"{path} 'rules' must be a JSON object") audit_only = profiles.pop("_audit_only", {}) return { "version": 2, "shape": "contexts", "profiles": profiles, "unavailable": unavailable, "audit_only_profiles": audit_only, "contexts": contexts, "projects": projects, "rules": rules, } def resolve_service(config, context_name, service_name): """Return one context service's config for *internal* MCP use. The returned dict includes the endpoint base_url and the keychain auth *reference* — both are for MCP-internal resolution only and must never be echoed into normal LLM-facing output (see audit_config/service_summaries). Fails closed on an unknown or disabled context/service; never falls back to another service. """ contexts = (config or {}).get("contexts") if not isinstance(contexts, dict): raise ConfigError( "service resolution requires a version 2 contexts config") ctx = contexts.get(context_name) if ctx is None: raise ConfigError( f"unknown context '{context_name}' (fail closed, no fallback)") if not ctx.get("enabled"): raise ConfigError( f"context '{context_name}' is disabled; its services are defined " "but unavailable for action (no fallback)" ) if service_name == "gitea": service = ctx.get("gitea") else: service = (ctx.get("services") or {}).get(service_name) if service is None: raise ConfigError( f"unknown service '{service_name}' in context '{context_name}' " "(fail closed, no fallback)" ) if not service.get("enabled"): raise ConfigError( f"service '{context_name}.{service_name}' is disabled; defined " "but unavailable for action — refusing, no fallback" ) return dict(service) def project_for_path(config, path): """Map a local project *path* to its context entry, failing closed. Returns None when the path is not configured (feature off for that repo). Raises :class:`ConfigError` when the project or its context is disabled — a configured-but-disabled project must never be acted on. """ projects = (config or {}).get("projects") or {} project = projects.get(path) if project is None: return None if not project.get("enabled"): raise ConfigError( f"project '{path}' is disabled (enabled: false); refusing, " "no fallback" ) contexts = (config or {}).get("contexts") or {} ctx = contexts.get(project.get("context")) or {} if not ctx.get("enabled"): raise ConfigError( f"project '{path}' maps to context '{project.get('context')}' " "which is disabled; refusing, no fallback" ) return dict(project) def _audit_profile_entry(name, profile, enabled, reveal_endpoints): """One LLM-safe audit row: no endpoint URLs, no keychain ids, no tokens.""" auth = profile.get("auth") if isinstance(profile, dict) else None entry = { "name": name, "enabled": enabled, "context": profile.get("context") or profile.get("environment"), "role": profile.get("role"), "username": profile.get("username"), "auth": (auth or {}).get("type") if isinstance(auth, dict) else None, } reason = profile.get("_unavailable_reason") if reason: entry["reason"] = reason if reveal_endpoints: entry["base_url"] = profile.get("base_url") entry["auth_source"] = auth_source_name(profile) return entry def audit_config(config, reveal_endpoints=False): """Report enabled/disabled profiles and services without secrets. Default output is LLM-safe: names, contexts, enabled state, capability labels, and the auth *type* only — never endpoint URLs, keychain ids, token values, or auth source names. ``reveal_endpoints=True`` is the explicit admin/debug opt-in for local diagnostics: it adds base URLs and non-secret auth source names (``keychain:`` / env var name). Token values are never included on any path. """ if config is None: return {"version": None, "profiles": [], "services": []} report = { "version": config.get("version"), "shape": config.get("shape") or ("environments" if config.get("aliases") is not None else "profiles"), "profiles": [], "services": [], } for name, profile in (config.get("profiles") or {}).items(): if not isinstance(profile, dict): continue report["profiles"].append(_audit_profile_entry( name, profile, True, reveal_endpoints)) for name, profile in (config.get("audit_only_profiles") or {}).items(): report["profiles"].append(_audit_profile_entry( name, profile, False, reveal_endpoints)) for ctx_name, ctx in (config.get("contexts") or {}).items(): ctx_enabled = bool(ctx.get("enabled")) for svc_name, svc in (ctx.get("services") or {}).items(): entry = { "context": ctx_name, "name": svc_name, "kind": svc.get("kind"), "label": svc.get("label"), "enabled": ctx_enabled and bool(svc.get("enabled")), "capabilities": list(svc.get("capabilities") or []), "auth": (svc.get("auth") or {}).get("type"), } if reveal_endpoints: entry["base_url"] = svc.get("base_url") entry["auth_source"] = auth_source_name(svc) report["services"].append(entry) return report def service_summaries(config, auth_check=None): """Safe one-line service summaries for LLM sessions. Each line reports label + state only (e.g. ``PRGS Jenkins: enabled, read-only, authenticated`` / ``PRGS Sentry: disabled``) — never endpoint URLs, keychain ids, or token values. *auth_check* is a callable taking the service dict and returning True when its credential resolves; it defaults to a local keychain presence check and its result is reported only as ``authenticated`` / ``no credential``. """ if auth_check is None: def auth_check(service): auth = service.get("auth") or {} if auth.get("type") == "keychain": return _keychain_token(auth.get("id")) is not None if auth.get("type") == "env": return bool(os.environ.get(auth.get("name") or "")) return False lines = [] for ctx_name, ctx in (config.get("contexts") or {}).items(): ctx_enabled = bool(ctx.get("enabled")) for svc_name, svc in (ctx.get("services") or {}).items(): label = svc.get("label") or f"{ctx_name} {svc_name}" if not (ctx_enabled and svc.get("enabled")): lines.append(f"{label}: disabled") continue caps = list(svc.get("capabilities") or []) cap_part = "read-only" if caps == ["read"] else ", ".join(caps) auth_part = "authenticated" if auth_check(svc) else "no credential" parts = ["enabled"] + ([cap_part] if cap_part else []) + [auth_part] lines.append(f"{label}: " + ", ".join(parts)) return lines def _validate_auth(name, auth): """Validate a profile's optional ``auth`` reference. Never echoes secrets.""" if auth is None: return if not isinstance(auth, dict): raise ConfigError(f"profile '{name}' has a non-object 'auth'") atype = auth.get("type") if atype not in _AUTH_TYPES: raise ConfigError( f"profile '{name}' has invalid auth type {atype!r}; " f"expected one of {list(_AUTH_TYPES)}" ) if atype == "keychain" and not auth.get("id"): raise ConfigError(f"profile '{name}' keychain auth requires an 'id'") if atype == "env" and not auth.get("name"): raise ConfigError(f"profile '{name}' env auth requires a 'name'") def select_profile(config, name=None): """Return the selected profile dict from a loaded *config*. Returns None when *config* is None. Raises :class:`ConfigError` when no profile is selected, the selected profile is unknown or not an object, the profile embeds a raw ``token``/``password``, or its ``auth`` reference is malformed. """ if config is None: return None profiles = config.get("profiles", {}) aliases = config.get("aliases") or {} unavailable = config.get("unavailable") or {} name = name or selected_profile_name() available = sorted(set(profiles) | set(aliases)) if not name: raise ConfigError( f"{ENV_CONFIG_PATH} is set but {ENV_PROFILE} is not; " f"available profiles: {available}" ) # Strict resolution order (#103): exact alias → exact profile address → # fail closed. No fuzzy matching, no partial matches, no defaults. resolved = aliases.get(name, name) if resolved in unavailable: raise ConfigError(unavailable[resolved]) if resolved not in profiles: raise ConfigError( f"profile '{name}' not found in config; available profiles: {available}" ) profile = profiles[resolved] if not isinstance(profile, dict): raise ConfigError(f"profile '{name}' must be a JSON object") for secret_key in ("token", "password"): if secret_key in profile: # Never accept (or echo) an inline secret; require an auth reference. raise ConfigError( f"profile '{name}' must not contain an inline '{secret_key}'; " "use an 'auth' reference ({\"type\": \"keychain\"|\"env\", ...}) instead" ) _validate_auth(name, profile.get("auth")) return profile def resolve_profile(path=None, name=None): """Load the config and return the selected profile dict, or None if off.""" return select_profile(load_config(path), name) def auth_source_name(profile): """Return a *non-secret* name for a profile's token source, or None. For env auth this is the env var name; for keychain auth, ``keychain:``. Safe to surface in profile metadata (never the token value). """ if not profile: return None auth = profile.get("auth") if not isinstance(auth, dict): return None if auth.get("type") == "env": return auth.get("name") if auth.get("type") == "keychain": return f"keychain:{auth.get('id')}" return None def _keychain_token(item_id): """Read a token from the macOS keychain by service *item_id*. Returns the secret string, or None if it cannot be found. Never logs the value; failures are swallowed so the caller can raise a safe error. """ try: proc = subprocess.run( ["security", "find-generic-password", "-s", item_id, "-w"], capture_output=True, text=True, ) except (OSError, subprocess.SubprocessError): return None if proc.returncode != 0: return None return proc.stdout.strip() or None def resolve_token(profile, keychain_lookup=_keychain_token): """Resolve the token for *profile* via its ``auth`` reference. Returns None when *profile* is None or has no ``auth``. Raises :class:`ConfigError` when the reference cannot be resolved (env var unset or keychain item missing). Never accepts an inline token and never logs, echoes, or includes the secret *value* in any error. *keychain_lookup* is injectable for testing. """ if not profile: return None auth = profile.get("auth") if not isinstance(auth, dict): return None atype = auth.get("type") if atype == "env": env_name = auth.get("name") value = os.environ.get(env_name) if env_name else None if not value: raise ConfigError( f"auth env var '{env_name}' referenced by the profile is not set" ) return value if atype == "keychain": item_id = auth.get("id") value = keychain_lookup(item_id) if item_id else None if not value: raise ConfigError( f"keychain item '{item_id}' referenced by the profile was not found" ) return value raise ConfigError(f"unsupported auth type {atype!r} in the selected profile") # ── Config authoring helpers (used by the interactive menu; pure + testable) ──── def is_valid_profile_name(name): """True if *name* is a safe profile key (alnum, dot, dash, underscore).""" return bool(name) and bool(_PROFILE_NAME_RE.match(name)) def keychain_auth(item_id): """Build a keychain auth reference.""" return {"type": "keychain", "id": item_id} def env_auth(var_name): """Build an env auth reference.""" return {"type": "env", "name": var_name} def build_profile(*, base_url, auth, username=None, default_owner=None, default_repo=None, execution_profile=None): """Assemble a profile dict, omitting empty optional fields. No secrets.""" profile = {"base_url": base_url, "auth": auth} if username: profile["username"] = username if default_owner: profile["default_owner"] = default_owner if default_repo: profile["default_repo"] = default_repo if execution_profile: profile["execution_profile"] = execution_profile return profile def empty_config(): """Return a fresh, valid canonical config with no profiles.""" return {"version": SUPPORTED_VERSION, "profiles": {}} def validate_config(config): """Return a list of human-readable problems with *config* (empty = valid). Never includes secret material — profiles carry only auth *references*. """ problems = [] if not isinstance(config, dict): return ["config is not a JSON object"] version = config.get("version") if version is None: problems.append( f"missing required 'version' (expected one of {list(SUPPORTED_VERSIONS)})" ) elif version == 2: # v2 validation is all-or-nothing via the loader's invariants. try: _load_v2_any(config, "") except ConfigError as exc: problems.append(str(exc)) return problems elif version != SUPPORTED_VERSION: problems.append( f"unsupported version {version!r} (expected one of {list(SUPPORTED_VERSIONS)})" ) profiles = config.get("profiles") if not isinstance(profiles, dict): problems.append("missing 'profiles' object") return problems for name, profile in profiles.items(): if not is_valid_profile_name(name): problems.append(f"invalid profile name {name!r}") if not isinstance(profile, dict): problems.append(f"profile '{name}' is not an object") continue if not profile.get("base_url"): problems.append(f"profile '{name}' is missing 'base_url'") for secret_key in ("token", "password"): if secret_key in profile: problems.append( f"profile '{name}' has an inline '{secret_key}' (use an auth reference)" ) try: _validate_auth(name, profile.get("auth")) except ConfigError as exc: problems.append(str(exc)) else: if profile.get("auth") is None: problems.append(f"profile '{name}' is missing an 'auth' reference") return problems def add_profile(config, name, profile): """Return a copy of *config* with *profile* added under *name*. Preserves existing profiles. Raises :class:`ConfigError` on an invalid name, a duplicate name, or an invalid profile. """ if not is_valid_profile_name(name): raise ConfigError( f"invalid profile name {name!r}; use letters, digits, '.', '-', '_'" ) profiles = dict(config.get("profiles") or {}) if name in profiles: raise ConfigError(f"profile '{name}' already exists; edit or remove it first") candidate = {"version": config.get("version", SUPPORTED_VERSION), "profiles": {**profiles, name: profile}} # Validate just this profile (reuse select_profile's checks). select_profile(candidate, name) if not profile.get("base_url"): raise ConfigError(f"profile '{name}' is missing 'base_url'") return candidate def upsert_profile(config, name, profile): """Like :func:`add_profile` but replaces an existing profile (for edits).""" if not is_valid_profile_name(name): raise ConfigError(f"invalid profile name {name!r}") profiles = dict(config.get("profiles") or {}) profiles[name] = profile candidate = {"version": config.get("version", SUPPORTED_VERSION), "profiles": profiles} select_profile(candidate, name) return candidate def remove_profile(config, name): """Return a copy of *config* without profile *name*.""" profiles = dict(config.get("profiles") or {}) if name not in profiles: raise ConfigError(f"profile '{name}' not found") del profiles[name] return {"version": config.get("version", SUPPORTED_VERSION), "profiles": profiles} def save_config(config, path=None): """Atomically write *config* to *path* as pretty JSON, creating parent dirs. Writes a temp file in the same directory then ``os.replace``s it into place, so a crash mid-write never truncates an existing config. """ path = path or config_path() or DEFAULT_CONFIG_PATH directory = os.path.dirname(os.path.abspath(path)) os.makedirs(directory, exist_ok=True) fd, tmp = tempfile.mkstemp(dir=directory, prefix=".profiles-", suffix=".json") try: with os.fdopen(fd, "w", encoding="utf-8") as fh: json.dump(config, fh, indent=2, sort_keys=True) fh.write("\n") os.replace(tmp, path) except BaseException: try: os.unlink(tmp) except OSError: pass raise return path def server_command(): """Return (command, args) that launch this repo's MCP server.""" root = os.path.dirname(os.path.abspath(__file__)) python = os.path.join(root, "venv", "bin", "python3") if not os.path.exists(python): python = sys.executable return python, [os.path.join(root, "mcp_server.py")] def launcher_entry(profile_name, config_path=None): """Return a thin MCP launcher entry for *profile_name*. Contains only command/args and the two GITEA_MCP_* env vars — never a token or password. Suitable for Claude / Gemini / Codex ``mcpServers`` blocks. """ command, args = server_command() return { "gitea-tools": { "command": command, "args": args, "env": { "GITEA_MCP_CONFIG": config_path or DEFAULT_CONFIG_PATH, "GITEA_MCP_PROFILE": profile_name, }, } } def keychain_set(item_id, token, account=None, runner=subprocess.run): """Store *token* in the macOS keychain under service *item_id*. The token is passed to ``security`` as an argument only; it is never returned, printed, or logged here. *runner* is injectable for testing. Raises :class:`ConfigError` on failure (without echoing the token). """ if not item_id: raise ConfigError("keychain item id is required") if not token: raise ConfigError("refusing to store an empty token") account = account or os.environ.get("USER") or "gitea-tools" cmd = ["security", "add-generic-password", "-U", "-s", item_id, "-a", account, "-w", token] try: proc = runner(cmd, capture_output=True, text=True) except (OSError, subprocess.SubprocessError) as exc: raise ConfigError(f"could not run keychain store for '{item_id}'") from exc if getattr(proc, "returncode", 1) != 0: raise ConfigError(f"keychain store failed for item '{item_id}'") return item_id if __name__ == "__main__": # pragma: no cover - thin CLI dispatch if len(sys.argv) > 1 and sys.argv[1] == "menu": import gitea_config_menu raise SystemExit(gitea_config_menu.main(sys.argv[2:])) if len(sys.argv) > 1 and sys.argv[1] == "audit": # Local admin/debug diagnostics (#120). --reveal-endpoints is the # explicit opt-in that adds base URLs and non-secret auth source # names; token values are never printed on any path. try: config = load_config(config_path() or DEFAULT_CONFIG_PATH) report = audit_config( config, reveal_endpoints="--reveal-endpoints" in sys.argv[2:]) report["summaries"] = service_summaries(config) except ConfigError as exc: print(f"config error: {exc}", file=sys.stderr) raise SystemExit(1) print(json.dumps(report, indent=2)) raise SystemExit(0) print("usage: python gitea_config.py menu | audit [--reveal-endpoints]", file=sys.stderr) raise SystemExit(2)