"""Canonical JSON runtime-profile configuration for Gitea MCP (issue #19).

One canonical config file defines every Gitea runtime profile. Each LLM MCP
launcher (Claude / Gemini / Codex) stays a *thin* launcher that only points at
that file and names a profile — no duplicated ``GITEA_USER_*`` /
``GITEA_PASS_*`` blocks, no raw tokens in client configs:

    GITEA_MCP_CONFIG=/path/to/profiles.json   # the canonical file
    GITEA_MCP_PROFILE=prgs                    # which named profile to use

File shape (see ``gitea-mcp.example.json``)::

    {
      "version": 1,
      "profiles": {
        "prgs": {
          "base_url": "https://gitea.prgs.cc",
          "username": "jcwalker3",
          "auth": {"type": "keychain", "id": "prgs-gitea-token"},
          "default_owner": "Scaled-Tech-Consulting",
          "execution_profile": "personal-prgs"
        }
      }
    }

Auth is referenced *indirectly*, never inline:

- ``{"type": "keychain", "id": "prgs-gitea-token"}`` — resolved from the macOS
  keychain at token-resolution time.
- ``{"type": "env", "name": "GITEA_TOKEN_PRGS"}`` — read from that env var.

Design constraints:

- **Backwards compatible.** With ``GITEA_MCP_CONFIG`` unset, every entry point
  returns ``None`` and callers fall back to pure environment behaviour.
- **No inline secrets.** A raw ``token``/``password`` key is rejected. Token
  values are resolved only via an auth *reference* and are never stored in, or
  returned as, profile metadata.
- **No network.** Parsing only reads and decodes a local file. Token resolution
  (keychain/env) happens separately, on demand.
- **Fail safely.** A missing file, invalid JSON, unsupported version, unknown/
  unset selected profile, or an unresolvable secret reference raises
  :class:`ConfigError` with a message that never includes file contents,
  tokens, or passwords.
"""

import os
import re
import sys
import json
import tempfile
import subprocess

ENV_CONFIG_PATH = "GITEA_MCP_CONFIG"
ENV_PROFILE = "GITEA_MCP_PROFILE"
SUPPORTED_VERSION = 1
SUPPORTED_VERSIONS = (1, 2)
_AUTH_TYPES = ("keychain", "env")

# Profile names go into env vars, keychain ids, and JSON keys — keep them tame.
# NOTE: always apply these with ``fullmatch``; ``match`` + ``$`` would accept a
# trailing newline.
_PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")
# v2 address segments (environment / service / identity) must be dot-free so
# the dotted profile address {env}.{service}.{identity} stays unambiguous.
_SEGMENT_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]*$")
# Placeholder usernames must never activate (fail closed until provisioned).
_TBD_RE = re.compile(r"(?i)^tbd(-|$)")
# Keys that would mean an inline secret wherever they appear.
_INLINE_SECRET_KEYS = ("token", "password", "secret")

# ── Minimal operation normalization (#103) ─────────────────────────────────────
# Only what the #103 invariants need. The full normalization table, deprecation
# handling, and enforcement test matrix belong to issue #106 — do not grow this
# beyond invariant safety here.
_MINIMAL_GITEA_OP_MAP = {
    "read": "gitea.read",
    "review": "gitea.pr.review",
    "comment": "gitea.pr.comment",
    "approve": "gitea.pr.approve",
    "request_changes": "gitea.pr.request_changes",
    "merge": "gitea.pr.merge",
    "pr.create": "gitea.pr.create",
    "branch.push": "gitea.branch.push",
}
_REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"})
_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"})


def _normalize_op(service, op, addr):
    """Normalize *op* for *service*, or fail closed (#103 minimal subset).

    - already namespaced for this service (``{service}.*``) → unchanged
    - known unqualified Gitea ops → mapped via ``_MINIMAL_GITEA_OP_MAP``
    - unqualified single-word ops on non-Gitea services → ``{service}.{op}``
    - anything else (foreign prefixes, unknown unqualified names) → ConfigError

    *addr* is the dotted identity address; it is used only in error messages.
    """
    if not isinstance(op, str) or not op:
        raise ConfigError(f"identity '{addr}' has an empty or non-string operation")
    if op.startswith(service + "."):
        return op
    if service == "gitea" and op in _MINIMAL_GITEA_OP_MAP:
        return _MINIMAL_GITEA_OP_MAP[op]
    if service != "gitea" and "." not in op:
        return f"{service}.{op}"
    raise ConfigError(
        f"identity '{addr}' has operation {op!r} that cannot be normalized "
        f"safely for service '{service}' (fail closed; full table is issue #106)"
    )


# Default canonical config location (one file shared by all LLM launchers).
DEFAULT_CONFIG_PATH = os.path.join(
    os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json"
)


class ConfigError(Exception):
    """Raised for a bad config file, profile selection, or secret reference.

    Messages are safe to surface: they never include file contents, tokens, or
    passwords — only non-secret names/ids/positions.
    """


def config_path():
    """Return the configured JSON path, or None when the JSON layer is off."""
    return (os.environ.get(ENV_CONFIG_PATH) or "").strip() or None


def selected_profile_name():
    """Return the selected profile name from the environment, or None."""
    return (os.environ.get(ENV_PROFILE) or "").strip() or None


def load_config(path=None):
    """Load and minimally validate the canonical JSON config.

    Returns the parsed dict, or None when no config path is configured (so
    env-only usage is never broken). A version 2 file is returned in its
    flattened form (see :func:`_load_v2`). Raises :class:`ConfigError` when a
    path is configured but the file is missing, unreadable, not valid UTF-8,
    not valid JSON, declares an unsupported ``version``, or lacks a
    ``profiles`` object.
    """
    path = path or config_path()
    if not path:
        return None
    if not os.path.isfile(path):
        raise ConfigError(f"{ENV_CONFIG_PATH} points to a missing file: {path}")
    try:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    except json.JSONDecodeError as exc:
        # Report position only (msg/line/col). `from None` suppresses the
        # original exception, whose `.doc` holds the raw file text — which could
        # contain a token — so it never reaches a traceback.
        raise ConfigError(
            f"invalid JSON in {path} (line {exc.lineno}, column {exc.colno}): {exc.msg}"
        ) from None
    except UnicodeDecodeError:
        # Same reasoning as above: the decode error's `.object` holds the raw
        # file bytes, so it is suppressed and replaced by a safe message.
        raise ConfigError(
            f"could not read {path}: file is not valid UTF-8 text"
        ) from None
    except OSError as exc:
        raise ConfigError(f"could not read {path}: {exc.strerror}") from None
    if not isinstance(data, dict):
        raise ConfigError(f"{path} must be a JSON object")
    version = data.get("version")
    if version is None:
        # Fail closed (#103): an unversioned config is ambiguous between v1 and
        # v2 shapes, so it is refused rather than guessed.
        raise ConfigError(
            f"{path} is missing the required 'version' field; "
            f"expected one of {list(SUPPORTED_VERSIONS)}"
        )
    if version == 2:
        return _load_v2(data, path)
    if version != SUPPORTED_VERSION:
        raise ConfigError(
            f"{path} has unsupported version {version!r}; "
            f"expected one of {list(SUPPORTED_VERSIONS)}"
        )
    if not isinstance(data.get("profiles"), dict):
        raise ConfigError(f"{path} must be a JSON object with a 'profiles' object")
    return data


# ── profiles.json version 2 (#103): environment → service → identity ──────────
# v2 files are validated and *flattened* at load time into the same
# {"profiles": {...}} shape v1 consumers already understand, keyed by the
# canonical dotted address {environment}.{service}.{identity}. Two extra
# top-level keys are carried: "aliases" (exact-name compatibility selectors)
# and "unavailable" (addresses that fail closed at selection, e.g. TBD users).


def _validate_identity_auth(addr, auth):
    """Require and validate an identity 'auth' reference. Rejects inline secrets."""
    if auth is None:
        raise ConfigError(f"identity '{addr}' is missing an 'auth' reference")
    if not isinstance(auth, dict):
        raise ConfigError(f"identity '{addr}' has a non-object 'auth'")
    for key in _INLINE_SECRET_KEYS:
        if key in auth:
            raise ConfigError(
                f"identity '{addr}' auth must not contain an inline '{key}'; "
                "store secrets in the keychain and reference them by id"
            )
    _validate_auth(addr, auth)


def _flatten_identity(env_name, svc_name, svc, ident_name, ident):
    """Validate one v2 identity and return (addr, flattened_profile).

    The flattened profile is v1-shaped (base_url/auth/username/defaults) plus
    v2 metadata (profile_path, environment, service, identity, role) and
    normalized operation lists. Raises ConfigError on any invariant violation.
    """
    addr = f"{env_name}.{svc_name}.{ident_name}"
    if not isinstance(ident, dict):
        raise ConfigError(f"identity '{addr}' must be a JSON object")
    for key in _INLINE_SECRET_KEYS:
        if key in ident:
            raise ConfigError(
                f"identity '{addr}' must not contain an inline '{key}'; "
                "use an 'auth' reference instead"
            )
    _validate_identity_auth(addr, ident.get("auth"))

    # The identity's own base_url wins; otherwise inherit the service's.
    base_url = ident.get("base_url") or svc.get("base_url")
    if not base_url:
        raise ConfigError(
            f"identity '{addr}' has no 'base_url' at identity or service level"
        )

    allowed = ident.get("allowed_operations") or []
    forbidden = ident.get("forbidden_operations") or []
    if not isinstance(allowed, list) or not isinstance(forbidden, list):
        raise ConfigError(f"identity '{addr}' operation fields must be lists")
    allowed_n = {_normalize_op(svc_name, op, addr) for op in allowed}
    forbidden_n = {_normalize_op(svc_name, op, addr) for op in forbidden}

    # Reviewer-identity deadlock rule (#100/#103): an identity that may approve
    # or merge PRs must explicitly forbid creating PRs and pushing branches,
    # so the reviewer identity can never author the PR it must review.
    if allowed_n & _REVIEW_MERGE_OPS:
        missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n)
        if missing:
            raise ConfigError(
                f"identity '{addr}' allows PR approve/merge but does not forbid "
                f"{missing}; reviewer identities must forbid gitea.pr.create and "
                "gitea.branch.push (reviewer-identity deadlock rule)"
            )

    profile = {
        "profile_path": addr,
        "environment": env_name,
        "service": svc_name,
        "identity": ident_name,
        "base_url": base_url,
        "auth": ident["auth"],
        "allowed_operations": sorted(allowed_n),
        "forbidden_operations": sorted(forbidden_n),
    }
    # Service-level defaults inherit unless the identity overrides them.
    for key in ("default_owner", "default_repo", "default_org"):
        value = ident.get(key, svc.get(key))
        if value:
            profile[key] = value
    # Identity-only metadata: copied when present, never inherited.
    for key in ("role", "username", "execution_profile", "audit_label"):
        if ident.get(key):
            profile[key] = ident[key]
    return addr, profile


def _load_v2(data, path):
    """Validate a v2 config and return the flattened, resolvable structure.

    Returns a dict with ``version``, ``profiles`` (usable identities keyed by
    dotted address), ``aliases`` and ``unavailable`` (address → reason for
    identities whose username is a TBD placeholder). *path* is used only in
    error messages. Raises :class:`ConfigError` on any structural problem.
    """
    environments = data.get("environments")
    if not isinstance(environments, dict) or not environments:
        raise ConfigError(
            f"{path} version 2 config requires a non-empty 'environments' object"
        )
    profiles = {}
    unavailable = {}
    for env_name, env in environments.items():
        # fullmatch: a trailing newline must not slip past the `$` anchor.
        if not _SEGMENT_RE.fullmatch(env_name or ""):
            raise ConfigError(f"invalid environment name {env_name!r} (no dots)")
        if not isinstance(env, dict):
            raise ConfigError(f"environment '{env_name}' must be a JSON object")
        services = env.get("services")
        if not isinstance(services, dict) or not services:
            raise ConfigError(
                f"environment '{env_name}' requires a non-empty 'services' object"
            )
        for svc_name, svc in services.items():
            if not _SEGMENT_RE.fullmatch(svc_name or ""):
                raise ConfigError(
                    f"invalid service name {svc_name!r} in '{env_name}' (no dots)"
                )
            if not isinstance(svc, dict):
                raise ConfigError(
                    f"service '{env_name}.{svc_name}' must be a JSON object"
                )
            identities = svc.get("identities")
            if not isinstance(identities, dict) or not identities:
                raise ConfigError(
                    f"service '{env_name}.{svc_name}' requires a non-empty "
                    "'identities' object"
                )
            for ident_name, ident in identities.items():
                if not _SEGMENT_RE.fullmatch(ident_name or ""):
                    raise ConfigError(
                        f"invalid identity name {ident_name!r} in "
                        f"'{env_name}.{svc_name}' (no dots)"
                    )
                addr, profile = _flatten_identity(
                    env_name, svc_name, svc, ident_name, ident
                )
                username = profile.get("username") or ""
                if _TBD_RE.match(username):
                    # Fail closed at selection, without blocking every other
                    # identity in the file (see #103 acceptance criteria).
                    unavailable[addr] = (
                        f"identity '{addr}' username {username!r} is a TBD "
                        "placeholder; provision the account before use "
                        "(fail closed)"
                    )
                else:
                    profiles[addr] = profile

    aliases = data.get("aliases") or {}
    if not isinstance(aliases, dict):
        raise ConfigError(f"{path} 'aliases' must be a JSON object")
    known = set(profiles) | set(unavailable)
    for alias, target in aliases.items():
        if not isinstance(target, str) or not target:
            raise ConfigError(f"alias '{alias}' target must be a non-empty string")
        if alias in known and alias != target:
            raise ConfigError(
                f"selector '{alias}' is both an alias and a profile address "
                "with a different target (conflicting selector; fail closed)"
            )
        if target not in known:
            raise ConfigError(
                f"alias '{alias}' points to unknown profile '{target}'"
            )
    return {
        "version": 2,
        "profiles": profiles,
        "aliases": dict(aliases),
        "unavailable": unavailable,
    }


def _validate_auth(name, auth):
    """Validate a profile's optional ``auth`` reference. Never echoes secrets.

    ``None`` is accepted (auth is optional here). Otherwise *auth* must be a
    dict with a supported ``type`` and a non-empty *string* ``id`` (keychain)
    or ``name`` (env). Raises :class:`ConfigError` on any violation.
    """
    if auth is None:
        return
    if not isinstance(auth, dict):
        raise ConfigError(f"profile '{name}' has a non-object 'auth'")
    atype = auth.get("type")
    if atype not in _AUTH_TYPES:
        raise ConfigError(
            f"profile '{name}' has invalid auth type {atype!r}; "
            f"expected one of {list(_AUTH_TYPES)}"
        )
    # The reference must be a string: it is later handed to `security` /
    # `os.environ`, where a non-string would surface as a raw TypeError instead
    # of a safe ConfigError.
    if atype == "keychain":
        item_id = auth.get("id")
        if not (isinstance(item_id, str) and item_id):
            raise ConfigError(f"profile '{name}' keychain auth requires an 'id'")
    if atype == "env":
        var_name = auth.get("name")
        if not (isinstance(var_name, str) and var_name):
            raise ConfigError(f"profile '{name}' env auth requires a 'name'")


def select_profile(config, name=None):
    """Return the selected profile dict from a loaded *config*.

    Returns None when *config* is None. When *name* is not given, the profile
    named by the ``GITEA_MCP_PROFILE`` environment variable is used. Raises
    :class:`ConfigError` when no profile is selected, the selected profile is
    unknown, unavailable or not an object, the profile embeds a raw
    ``token``/``password``, or its ``auth`` reference is malformed.
    """
    if config is None:
        return None
    profiles = config.get("profiles", {})
    aliases = config.get("aliases") or {}
    unavailable = config.get("unavailable") or {}
    name = name or selected_profile_name()
    available = sorted(set(profiles) | set(aliases))
    if not name:
        raise ConfigError(
            f"{ENV_CONFIG_PATH} is set but {ENV_PROFILE} is not; "
            f"available profiles: {available}"
        )
    # Strict resolution order (#103): exact alias → exact profile address →
    # fail closed. No fuzzy matching, no partial matches, no defaults.
    resolved = aliases.get(name, name)
    if resolved in unavailable:
        raise ConfigError(unavailable[resolved])
    if resolved not in profiles:
        raise ConfigError(
            f"profile '{name}' not found in config; available profiles: {available}"
        )
    profile = profiles[resolved]
    if not isinstance(profile, dict):
        raise ConfigError(f"profile '{name}' must be a JSON object")
    for secret_key in ("token", "password"):
        if secret_key in profile:
            # Never accept (or echo) an inline secret; require an auth reference.
            raise ConfigError(
                f"profile '{name}' must not contain an inline '{secret_key}'; "
                "use an 'auth' reference ({\"type\": \"keychain\"|\"env\", ...}) instead"
            )
    _validate_auth(name, profile.get("auth"))
    return profile


def resolve_profile(path=None, name=None):
    """Load the config and return the selected profile dict, or None if off."""
    return select_profile(load_config(path), name)


def auth_source_name(profile):
    """Return a *non-secret* name for a profile's token source, or None.

    For env auth this is the env var name; for keychain auth,
    ``keychain:<id>``. Safe to surface in profile metadata (never the token
    value).
    """
    if not profile:
        return None
    auth = profile.get("auth")
    if not isinstance(auth, dict):
        return None
    if auth.get("type") == "env":
        return auth.get("name")
    if auth.get("type") == "keychain":
        return f"keychain:{auth.get('id')}"
    return None


def _keychain_token(item_id):
    """Read a token from the macOS keychain by service *item_id*.

    Returns the secret string, or None if it cannot be found. Never logs the
    value; failures are swallowed so the caller can raise a safe error.
    """
    try:
        proc = subprocess.run(
            ["security", "find-generic-password", "-s", item_id, "-w"],
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    if proc.returncode != 0:
        return None
    return proc.stdout.strip() or None


def resolve_token(profile, keychain_lookup=_keychain_token):
    """Resolve the token for *profile* via its ``auth`` reference.

    Returns None when *profile* is None or has no ``auth``. Raises
    :class:`ConfigError` when the reference cannot be resolved (env var unset
    or keychain item missing). Never accepts an inline token and never logs,
    echoes, or includes the secret *value* in any error. *keychain_lookup* is
    injectable for testing.
    """
    if not profile:
        return None
    auth = profile.get("auth")
    if not isinstance(auth, dict):
        return None
    atype = auth.get("type")
    if atype == "env":
        env_name = auth.get("name")
        value = os.environ.get(env_name) if env_name else None
        if not value:
            raise ConfigError(
                f"auth env var '{env_name}' referenced by the profile is not set"
            )
        return value
    if atype == "keychain":
        item_id = auth.get("id")
        value = keychain_lookup(item_id) if item_id else None
        if not value:
            raise ConfigError(
                f"keychain item '{item_id}' referenced by the profile was not found"
            )
        return value
    raise ConfigError(f"unsupported auth type {atype!r} in the selected profile")


# ── Config authoring helpers (used by the interactive menu; pure + testable) ────


def is_valid_profile_name(name):
    """True if *name* is a safe profile key (alnum, dot, dash, underscore).

    Non-strings, the empty string, and names with a trailing newline are all
    rejected (``fullmatch`` rather than ``match``, since ``$`` alone would
    tolerate a final ``"\\n"``).
    """
    return isinstance(name, str) and bool(_PROFILE_NAME_RE.fullmatch(name))


def keychain_auth(item_id):
    """Build a keychain auth reference."""
    return {"type": "keychain", "id": item_id}


def env_auth(var_name):
    """Build an env auth reference."""
    return {"type": "env", "name": var_name}


def build_profile(*, base_url, auth, username=None, default_owner=None,
                  default_repo=None, execution_profile=None):
    """Assemble a profile dict, omitting empty optional fields. No secrets."""
    profile = {"base_url": base_url, "auth": auth}
    if username:
        profile["username"] = username
    if default_owner:
        profile["default_owner"] = default_owner
    if default_repo:
        profile["default_repo"] = default_repo
    if execution_profile:
        profile["execution_profile"] = execution_profile
    return profile


def empty_config():
    """Return a fresh, valid canonical config with no profiles."""
    return {"version": SUPPORTED_VERSION, "profiles": {}}


def validate_config(config):
    """Return a list of human-readable problems with *config* (empty = valid).

    Never includes secret material — profiles carry only auth *references*.
    A version 2 config is checked through the loader's invariants and yields
    at most one problem (the first violation found).
    """
    problems = []
    if not isinstance(config, dict):
        return ["config is not a JSON object"]
    version = config.get("version")
    if version is None:
        problems.append(
            f"missing required 'version' (expected one of {list(SUPPORTED_VERSIONS)})"
        )
    elif version == 2:
        # v2 validation is all-or-nothing via the loader's invariants.
        try:
            _load_v2(config, "")
        except ConfigError as exc:
            problems.append(str(exc))
        return problems
    elif version != SUPPORTED_VERSION:
        problems.append(
            f"unsupported version {version!r} (expected one of {list(SUPPORTED_VERSIONS)})"
        )
    profiles = config.get("profiles")
    if not isinstance(profiles, dict):
        problems.append("missing 'profiles' object")
        return problems
    for name, profile in profiles.items():
        if not is_valid_profile_name(name):
            problems.append(f"invalid profile name {name!r}")
        if not isinstance(profile, dict):
            problems.append(f"profile '{name}' is not an object")
            continue
        if not profile.get("base_url"):
            problems.append(f"profile '{name}' is missing 'base_url'")
        for secret_key in ("token", "password"):
            if secret_key in profile:
                problems.append(
                    f"profile '{name}' has an inline '{secret_key}' (use an auth reference)"
                )
        try:
            _validate_auth(name, profile.get("auth"))
        except ConfigError as exc:
            problems.append(str(exc))
        else:
            # _validate_auth tolerates a missing auth; a saved profile must not.
            if profile.get("auth") is None:
                problems.append(f"profile '{name}' is missing an 'auth' reference")
    return problems


def add_profile(config, name, profile):
    """Return a copy of *config* with *profile* added under *name*.

    Preserves existing profiles. Raises :class:`ConfigError` on an invalid
    name, a duplicate name, or an invalid profile.
    """
    if not is_valid_profile_name(name):
        raise ConfigError(
            f"invalid profile name {name!r}; use letters, digits, '.', '-', '_'"
        )
    profiles = dict(config.get("profiles") or {})
    if name in profiles:
        raise ConfigError(f"profile '{name}' already exists; edit or remove it first")
    candidate = {"version": config.get("version", SUPPORTED_VERSION),
                 "profiles": {**profiles, name: profile}}
    # Validate just this profile (reuse select_profile's checks).
    select_profile(candidate, name)
    if not profile.get("base_url"):
        raise ConfigError(f"profile '{name}' is missing 'base_url'")
    return candidate


def upsert_profile(config, name, profile):
    """Like :func:`add_profile` but replaces an existing profile (for edits)."""
    if not is_valid_profile_name(name):
        raise ConfigError(f"invalid profile name {name!r}")
    profiles = dict(config.get("profiles") or {})
    profiles[name] = profile
    candidate = {"version": config.get("version", SUPPORTED_VERSION),
                 "profiles": profiles}
    select_profile(candidate, name)
    return candidate


def remove_profile(config, name):
    """Return a copy of *config* without profile *name*."""
    profiles = dict(config.get("profiles") or {})
    if name not in profiles:
        raise ConfigError(f"profile '{name}' not found")
    del profiles[name]
    return {"version": config.get("version", SUPPORTED_VERSION), "profiles": profiles}


def save_config(config, path=None):
    """Atomically write *config* to *path* as pretty JSON, creating parent dirs.

    Writes a temp file in the same directory then ``os.replace``s it into
    place, so a crash mid-write never truncates an existing config. When
    *path* is omitted, the ``GITEA_MCP_CONFIG`` path is used, falling back to
    ``DEFAULT_CONFIG_PATH``. Returns the path written.
    """
    path = path or config_path() or DEFAULT_CONFIG_PATH
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".profiles-", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(config, fh, indent=2, sort_keys=True)
            fh.write("\n")
        os.replace(tmp, path)
    except BaseException:
        # Best-effort cleanup of the temp file, then re-raise the real error.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
    return path


def server_command():
    """Return (command, args) that launch this repo's MCP server."""
    root = os.path.dirname(os.path.abspath(__file__))
    python = os.path.join(root, "venv", "bin", "python3")
    # Fall back to the running interpreter when the repo venv is absent.
    if not os.path.exists(python):
        python = sys.executable
    return python, [os.path.join(root, "mcp_server.py")]


def launcher_entry(profile_name, config_path=None):
    """Return a thin MCP launcher entry for *profile_name*.

    Contains only command/args and the two GITEA_MCP_* env vars — never a
    token or password. Suitable for Claude / Gemini / Codex ``mcpServers``
    blocks.
    """
    command, args = server_command()
    return {
        "gitea-tools": {
            "command": command,
            "args": args,
            "env": {
                "GITEA_MCP_CONFIG": config_path or DEFAULT_CONFIG_PATH,
                "GITEA_MCP_PROFILE": profile_name,
            },
        }
    }


def keychain_set(item_id, token, account=None, runner=subprocess.run):
    """Store *token* in the macOS keychain under service *item_id*.

    The token is passed to ``security`` as an argument only; it is never
    returned, printed, or logged here. *runner* is injectable for testing.
    Raises :class:`ConfigError` on failure (without echoing the token).
    Returns *item_id* on success.
    """
    if not item_id:
        raise ConfigError("keychain item id is required")
    if not token:
        raise ConfigError("refusing to store an empty token")
    account = account or os.environ.get("USER") or "gitea-tools"
    cmd = ["security", "add-generic-password", "-U",
           "-s", item_id, "-a", account, "-w", token]
    try:
        proc = runner(cmd, capture_output=True, text=True)
    except (OSError, subprocess.SubprocessError):
        # `from None`: subprocess errors can carry the full command line —
        # which includes the token — so the original exception must never be
        # chained into a traceback.
        raise ConfigError(f"could not run keychain store for '{item_id}'") from None
    if getattr(proc, "returncode", 1) != 0:
        raise ConfigError(f"keychain store failed for item '{item_id}'")
    return item_id


if __name__ == "__main__":  # pragma: no cover - thin CLI dispatch
    if len(sys.argv) > 1 and sys.argv[1] == "menu":
        import gitea_config_menu
        raise SystemExit(gitea_config_menu.main(sys.argv[2:]))
    print("usage: python gitea_config.py menu", file=sys.stderr)
    raise SystemExit(2)