Merge pull request 'feat: profiles.json v2 parser with validation invariants (#103)' (#114) from feat/issue-103-profiles-v2-parser into master
This commit was merged in pull request #114.
This commit is contained in:
+255
-9
@@ -54,11 +54,61 @@ ENV_CONFIG_PATH = "GITEA_MCP_CONFIG"
|
|||||||
ENV_PROFILE = "GITEA_MCP_PROFILE"
|
ENV_PROFILE = "GITEA_MCP_PROFILE"
|
||||||
|
|
||||||
SUPPORTED_VERSION = 1
|
SUPPORTED_VERSION = 1
|
||||||
|
SUPPORTED_VERSIONS = (1, 2)
|
||||||
_AUTH_TYPES = ("keychain", "env")
|
_AUTH_TYPES = ("keychain", "env")
|
||||||
|
|
||||||
# Profile names go into env vars, keychain ids, and JSON keys — keep them tame.
|
# Profile names go into env vars, keychain ids, and JSON keys — keep them tame.
|
||||||
_PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")
|
_PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")
|
||||||
|
|
||||||
|
# v2 address segments (environment / service / identity) must be dot-free so
|
||||||
|
# the dotted profile address {env}.{service}.{identity} stays unambiguous.
|
||||||
|
_SEGMENT_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]*$")
|
||||||
|
|
||||||
|
# Placeholder usernames must never activate (fail closed until provisioned).
|
||||||
|
_TBD_RE = re.compile(r"(?i)^tbd(-|$)")
|
||||||
|
|
||||||
|
# Keys that would mean an inline secret wherever they appear.
|
||||||
|
_INLINE_SECRET_KEYS = ("token", "password", "secret")
|
||||||
|
|
||||||
|
# ── Minimal operation normalization (#103) ─────────────────────────────────────
|
||||||
|
# Only what the #103 invariants need. The full normalization table, deprecation
|
||||||
|
# handling, and enforcement test matrix belong to issue #106 — do not grow this
|
||||||
|
# beyond invariant safety here.
|
||||||
|
_MINIMAL_GITEA_OP_MAP = {
|
||||||
|
"read": "gitea.read",
|
||||||
|
"review": "gitea.pr.review",
|
||||||
|
"comment": "gitea.pr.comment",
|
||||||
|
"approve": "gitea.pr.approve",
|
||||||
|
"request_changes": "gitea.pr.request_changes",
|
||||||
|
"merge": "gitea.pr.merge",
|
||||||
|
"pr.create": "gitea.pr.create",
|
||||||
|
"branch.push": "gitea.branch.push",
|
||||||
|
}
|
||||||
|
_REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"})
|
||||||
|
_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"})
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_op(service, op, addr):
|
||||||
|
"""Normalize *op* for *service*, or fail closed (#103 minimal subset).
|
||||||
|
|
||||||
|
- already namespaced for this service (``{service}.*``) → unchanged
|
||||||
|
- known unqualified Gitea ops → mapped via ``_MINIMAL_GITEA_OP_MAP``
|
||||||
|
- unqualified single-word ops on non-Gitea services → ``{service}.{op}``
|
||||||
|
- anything else (foreign prefixes, unknown unqualified names) → ConfigError
|
||||||
|
"""
|
||||||
|
if not isinstance(op, str) or not op:
|
||||||
|
raise ConfigError(f"identity '{addr}' has an empty or non-string operation")
|
||||||
|
if op.startswith(service + "."):
|
||||||
|
return op
|
||||||
|
if service == "gitea" and op in _MINIMAL_GITEA_OP_MAP:
|
||||||
|
return _MINIMAL_GITEA_OP_MAP[op]
|
||||||
|
if service != "gitea" and "." not in op:
|
||||||
|
return f"{service}.{op}"
|
||||||
|
raise ConfigError(
|
||||||
|
f"identity '{addr}' has operation {op!r} that cannot be normalized "
|
||||||
|
f"safely for service '{service}' (fail closed; full table is issue #106)"
|
||||||
|
)
|
||||||
|
|
||||||
# Default canonical config location (one file shared by all LLM launchers).
|
# Default canonical config location (one file shared by all LLM launchers).
|
||||||
DEFAULT_CONFIG_PATH = os.path.join(
|
DEFAULT_CONFIG_PATH = os.path.join(
|
||||||
os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json"
|
os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json"
|
||||||
@@ -108,16 +158,193 @@ def load_config(path=None):
|
|||||||
) from None
|
) from None
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
raise ConfigError(f"could not read {path}: {exc.strerror}") from None
|
raise ConfigError(f"could not read {path}: {exc.strerror}") from None
|
||||||
if not isinstance(data, dict) or not isinstance(data.get("profiles"), dict):
|
if not isinstance(data, dict):
|
||||||
raise ConfigError(f"{path} must be a JSON object with a 'profiles' object")
|
raise ConfigError(f"{path} must be a JSON object")
|
||||||
version = data.get("version", SUPPORTED_VERSION)
|
version = data.get("version")
|
||||||
|
if version is None:
|
||||||
|
# Fail closed (#103): an unversioned config is ambiguous between v1 and
|
||||||
|
# v2 shapes, so it is refused rather than guessed.
|
||||||
|
raise ConfigError(
|
||||||
|
f"{path} is missing the required 'version' field; "
|
||||||
|
f"expected one of {list(SUPPORTED_VERSIONS)}"
|
||||||
|
)
|
||||||
|
if version == 2:
|
||||||
|
return _load_v2(data, path)
|
||||||
if version != SUPPORTED_VERSION:
|
if version != SUPPORTED_VERSION:
|
||||||
raise ConfigError(
|
raise ConfigError(
|
||||||
f"{path} has unsupported version {version!r}; expected {SUPPORTED_VERSION}"
|
f"{path} has unsupported version {version!r}; "
|
||||||
|
f"expected one of {list(SUPPORTED_VERSIONS)}"
|
||||||
)
|
)
|
||||||
|
if not isinstance(data.get("profiles"), dict):
|
||||||
|
raise ConfigError(f"{path} must be a JSON object with a 'profiles' object")
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
# ── profiles.json version 2 (#103): environment → service → identity ──────────
|
||||||
|
# v2 files are validated and *flattened* at load time into the same
|
||||||
|
# {"profiles": {...}} shape v1 consumers already understand, keyed by the
|
||||||
|
# canonical dotted address {environment}.{service}.{identity}. Two extra
|
||||||
|
# top-level keys are carried: "aliases" (exact-name compatibility selectors)
|
||||||
|
# and "unavailable" (addresses that fail closed at selection, e.g. TBD users).
|
||||||
|
|
||||||
|
def _validate_identity_auth(addr, auth):
|
||||||
|
"""Require and validate an identity 'auth' reference. Rejects inline secrets."""
|
||||||
|
if auth is None:
|
||||||
|
raise ConfigError(f"identity '{addr}' is missing an 'auth' reference")
|
||||||
|
if not isinstance(auth, dict):
|
||||||
|
raise ConfigError(f"identity '{addr}' has a non-object 'auth'")
|
||||||
|
for key in _INLINE_SECRET_KEYS:
|
||||||
|
if key in auth:
|
||||||
|
raise ConfigError(
|
||||||
|
f"identity '{addr}' auth must not contain an inline '{key}'; "
|
||||||
|
"store secrets in the keychain and reference them by id"
|
||||||
|
)
|
||||||
|
_validate_auth(addr, auth)
|
||||||
|
|
||||||
|
|
||||||
|
def _flatten_identity(env_name, svc_name, svc, ident_name, ident):
|
||||||
|
"""Validate one v2 identity and return (addr, flattened_profile).
|
||||||
|
|
||||||
|
The flattened profile is v1-shaped (base_url/auth/username/defaults) plus
|
||||||
|
v2 metadata (profile_path, environment, service, identity, role) and
|
||||||
|
normalized operation lists. Raises ConfigError on any invariant violation.
|
||||||
|
"""
|
||||||
|
addr = f"{env_name}.{svc_name}.{ident_name}"
|
||||||
|
if not isinstance(ident, dict):
|
||||||
|
raise ConfigError(f"identity '{addr}' must be a JSON object")
|
||||||
|
for key in _INLINE_SECRET_KEYS:
|
||||||
|
if key in ident:
|
||||||
|
raise ConfigError(
|
||||||
|
f"identity '{addr}' must not contain an inline '{key}'; "
|
||||||
|
"use an 'auth' reference instead"
|
||||||
|
)
|
||||||
|
_validate_identity_auth(addr, ident.get("auth"))
|
||||||
|
|
||||||
|
base_url = ident.get("base_url") or svc.get("base_url")
|
||||||
|
if not base_url:
|
||||||
|
raise ConfigError(
|
||||||
|
f"identity '{addr}' has no 'base_url' at identity or service level"
|
||||||
|
)
|
||||||
|
|
||||||
|
allowed = ident.get("allowed_operations") or []
|
||||||
|
forbidden = ident.get("forbidden_operations") or []
|
||||||
|
if not isinstance(allowed, list) or not isinstance(forbidden, list):
|
||||||
|
raise ConfigError(f"identity '{addr}' operation fields must be lists")
|
||||||
|
allowed_n = {_normalize_op(svc_name, op, addr) for op in allowed}
|
||||||
|
forbidden_n = {_normalize_op(svc_name, op, addr) for op in forbidden}
|
||||||
|
|
||||||
|
# Reviewer-identity deadlock rule (#100/#103): an identity that may approve
|
||||||
|
# or merge PRs must explicitly forbid creating PRs and pushing branches,
|
||||||
|
# so the reviewer identity can never author the PR it must review.
|
||||||
|
if allowed_n & _REVIEW_MERGE_OPS:
|
||||||
|
missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n)
|
||||||
|
if missing:
|
||||||
|
raise ConfigError(
|
||||||
|
f"identity '{addr}' allows PR approve/merge but does not forbid "
|
||||||
|
f"{missing}; reviewer identities must forbid gitea.pr.create and "
|
||||||
|
"gitea.branch.push (reviewer-identity deadlock rule)"
|
||||||
|
)
|
||||||
|
|
||||||
|
profile = {
|
||||||
|
"profile_path": addr,
|
||||||
|
"environment": env_name,
|
||||||
|
"service": svc_name,
|
||||||
|
"identity": ident_name,
|
||||||
|
"base_url": base_url,
|
||||||
|
"auth": ident["auth"],
|
||||||
|
"allowed_operations": sorted(allowed_n),
|
||||||
|
"forbidden_operations": sorted(forbidden_n),
|
||||||
|
}
|
||||||
|
# Service-level defaults inherit unless the identity overrides them.
|
||||||
|
for key in ("default_owner", "default_repo", "default_org"):
|
||||||
|
value = ident.get(key, svc.get(key))
|
||||||
|
if value:
|
||||||
|
profile[key] = value
|
||||||
|
for key in ("role", "username", "execution_profile", "audit_label"):
|
||||||
|
if ident.get(key):
|
||||||
|
profile[key] = ident[key]
|
||||||
|
return addr, profile
|
||||||
|
|
||||||
|
|
||||||
|
def _load_v2(data, path):
|
||||||
|
"""Validate a v2 config and return the flattened, resolvable structure."""
|
||||||
|
environments = data.get("environments")
|
||||||
|
if not isinstance(environments, dict) or not environments:
|
||||||
|
raise ConfigError(
|
||||||
|
f"{path} version 2 config requires a non-empty 'environments' object"
|
||||||
|
)
|
||||||
|
profiles = {}
|
||||||
|
unavailable = {}
|
||||||
|
for env_name, env in environments.items():
|
||||||
|
if not _SEGMENT_RE.match(env_name or ""):
|
||||||
|
raise ConfigError(f"invalid environment name {env_name!r} (no dots)")
|
||||||
|
if not isinstance(env, dict):
|
||||||
|
raise ConfigError(f"environment '{env_name}' must be a JSON object")
|
||||||
|
services = env.get("services")
|
||||||
|
if not isinstance(services, dict) or not services:
|
||||||
|
raise ConfigError(
|
||||||
|
f"environment '{env_name}' requires a non-empty 'services' object"
|
||||||
|
)
|
||||||
|
for svc_name, svc in services.items():
|
||||||
|
if not _SEGMENT_RE.match(svc_name or ""):
|
||||||
|
raise ConfigError(
|
||||||
|
f"invalid service name {svc_name!r} in '{env_name}' (no dots)"
|
||||||
|
)
|
||||||
|
if not isinstance(svc, dict):
|
||||||
|
raise ConfigError(
|
||||||
|
f"service '{env_name}.{svc_name}' must be a JSON object"
|
||||||
|
)
|
||||||
|
identities = svc.get("identities")
|
||||||
|
if not isinstance(identities, dict) or not identities:
|
||||||
|
raise ConfigError(
|
||||||
|
f"service '{env_name}.{svc_name}' requires a non-empty "
|
||||||
|
"'identities' object"
|
||||||
|
)
|
||||||
|
for ident_name, ident in identities.items():
|
||||||
|
if not _SEGMENT_RE.match(ident_name or ""):
|
||||||
|
raise ConfigError(
|
||||||
|
f"invalid identity name {ident_name!r} in "
|
||||||
|
f"'{env_name}.{svc_name}' (no dots)"
|
||||||
|
)
|
||||||
|
addr, profile = _flatten_identity(
|
||||||
|
env_name, svc_name, svc, ident_name, ident
|
||||||
|
)
|
||||||
|
username = profile.get("username") or ""
|
||||||
|
if _TBD_RE.match(username):
|
||||||
|
# Fail closed at selection, without blocking every other
|
||||||
|
# identity in the file (see #103 acceptance criteria).
|
||||||
|
unavailable[addr] = (
|
||||||
|
f"identity '{addr}' username {username!r} is a TBD "
|
||||||
|
"placeholder; provision the account before use "
|
||||||
|
"(fail closed)"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
profiles[addr] = profile
|
||||||
|
|
||||||
|
aliases = data.get("aliases") or {}
|
||||||
|
if not isinstance(aliases, dict):
|
||||||
|
raise ConfigError(f"{path} 'aliases' must be a JSON object")
|
||||||
|
known = set(profiles) | set(unavailable)
|
||||||
|
for alias, target in aliases.items():
|
||||||
|
if not isinstance(target, str) or not target:
|
||||||
|
raise ConfigError(f"alias '{alias}' target must be a non-empty string")
|
||||||
|
if alias in known and alias != target:
|
||||||
|
raise ConfigError(
|
||||||
|
f"selector '{alias}' is both an alias and a profile address "
|
||||||
|
"with a different target (conflicting selector; fail closed)"
|
||||||
|
)
|
||||||
|
if target not in known:
|
||||||
|
raise ConfigError(
|
||||||
|
f"alias '{alias}' points to unknown profile '{target}'"
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"version": 2,
|
||||||
|
"profiles": profiles,
|
||||||
|
"aliases": dict(aliases),
|
||||||
|
"unavailable": unavailable,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def _validate_auth(name, auth):
|
def _validate_auth(name, auth):
|
||||||
"""Validate a profile's optional ``auth`` reference. Never echoes secrets."""
|
"""Validate a profile's optional ``auth`` reference. Never echoes secrets."""
|
||||||
if auth is None:
|
if auth is None:
|
||||||
@@ -147,18 +374,25 @@ def select_profile(config, name=None):
|
|||||||
if config is None:
|
if config is None:
|
||||||
return None
|
return None
|
||||||
profiles = config.get("profiles", {})
|
profiles = config.get("profiles", {})
|
||||||
|
aliases = config.get("aliases") or {}
|
||||||
|
unavailable = config.get("unavailable") or {}
|
||||||
name = name or selected_profile_name()
|
name = name or selected_profile_name()
|
||||||
available = sorted(profiles)
|
available = sorted(set(profiles) | set(aliases))
|
||||||
if not name:
|
if not name:
|
||||||
raise ConfigError(
|
raise ConfigError(
|
||||||
f"{ENV_CONFIG_PATH} is set but {ENV_PROFILE} is not; "
|
f"{ENV_CONFIG_PATH} is set but {ENV_PROFILE} is not; "
|
||||||
f"available profiles: {available}"
|
f"available profiles: {available}"
|
||||||
)
|
)
|
||||||
if name not in profiles:
|
# Strict resolution order (#103): exact alias → exact profile address →
|
||||||
|
# fail closed. No fuzzy matching, no partial matches, no defaults.
|
||||||
|
resolved = aliases.get(name, name)
|
||||||
|
if resolved in unavailable:
|
||||||
|
raise ConfigError(unavailable[resolved])
|
||||||
|
if resolved not in profiles:
|
||||||
raise ConfigError(
|
raise ConfigError(
|
||||||
f"profile '{name}' not found in config; available profiles: {available}"
|
f"profile '{name}' not found in config; available profiles: {available}"
|
||||||
)
|
)
|
||||||
profile = profiles[name]
|
profile = profiles[resolved]
|
||||||
if not isinstance(profile, dict):
|
if not isinstance(profile, dict):
|
||||||
raise ConfigError(f"profile '{name}' must be a JSON object")
|
raise ConfigError(f"profile '{name}' must be a JSON object")
|
||||||
for secret_key in ("token", "password"):
|
for secret_key in ("token", "password"):
|
||||||
@@ -292,9 +526,21 @@ def validate_config(config):
|
|||||||
problems = []
|
problems = []
|
||||||
if not isinstance(config, dict):
|
if not isinstance(config, dict):
|
||||||
return ["config is not a JSON object"]
|
return ["config is not a JSON object"]
|
||||||
if config.get("version", SUPPORTED_VERSION) != SUPPORTED_VERSION:
|
version = config.get("version")
|
||||||
|
if version is None:
|
||||||
problems.append(
|
problems.append(
|
||||||
f"unsupported version {config.get('version')!r} (expected {SUPPORTED_VERSION})"
|
f"missing required 'version' (expected one of {list(SUPPORTED_VERSIONS)})"
|
||||||
|
)
|
||||||
|
elif version == 2:
|
||||||
|
# v2 validation is all-or-nothing via the loader's invariants.
|
||||||
|
try:
|
||||||
|
_load_v2(config, "<config>")
|
||||||
|
except ConfigError as exc:
|
||||||
|
problems.append(str(exc))
|
||||||
|
return problems
|
||||||
|
elif version != SUPPORTED_VERSION:
|
||||||
|
problems.append(
|
||||||
|
f"unsupported version {version!r} (expected one of {list(SUPPORTED_VERSIONS)})"
|
||||||
)
|
)
|
||||||
profiles = config.get("profiles")
|
profiles = config.get("profiles")
|
||||||
if not isinstance(profiles, dict):
|
if not isinstance(profiles, dict):
|
||||||
|
|||||||
@@ -127,11 +127,14 @@ class TestLoadSelect(_ConfigBase):
|
|||||||
gitea_config.resolve_profile()
|
gitea_config.resolve_profile()
|
||||||
self.assertIn("version", str(ctx.exception))
|
self.assertIn("version", str(ctx.exception))
|
||||||
|
|
||||||
def test_missing_version_defaults_ok(self):
|
def test_missing_version_fails_closed(self):
|
||||||
|
# Changed by #103: an unversioned config is ambiguous between the v1
|
||||||
|
# and v2 shapes, so the loader now refuses to guess.
|
||||||
self._write({"profiles": {"prgs": {"base_url": "https://x"}}})
|
self._write({"profiles": {"prgs": {"base_url": "https://x"}}})
|
||||||
with patch.dict(os.environ, self._env("prgs"), clear=True):
|
with patch.dict(os.environ, self._env("prgs"), clear=True):
|
||||||
self.assertEqual(
|
with self.assertRaises(gitea_config.ConfigError) as ctx:
|
||||||
gitea_config.resolve_profile()["base_url"], "https://x")
|
gitea_config.resolve_profile()
|
||||||
|
self.assertIn("version", str(ctx.exception))
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -0,0 +1,367 @@
|
|||||||
|
"""Tests for profiles.json version 2 (#103): environment → service → identity.
|
||||||
|
|
||||||
|
Covers: v2 loading + flattening, dotted-path and alias resolution with strict
|
||||||
|
order (exact alias → exact address → fail closed), legacy v1 names via aliases,
|
||||||
|
fail-closed validation (missing/unknown version, malformed hierarchy, ambiguous
|
||||||
|
selectors, TBD-* usernames, reviewer-identity deadlock rule, inline secrets,
|
||||||
|
missing auth, unnormalizable operations), service-default inheritance, and that
|
||||||
|
flattened v2 profiles still work with resolve_token. No network, no secrets.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import copy
|
||||||
|
import json
|
||||||
|
import tempfile
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
||||||
|
|
||||||
|
import gitea_config # noqa: E402
|
||||||
|
|
||||||
|
FAKE_TOKEN = "fake-token-for-tests" # not a real credential
|
||||||
|
|
||||||
|
|
||||||
|
def v2_config():
|
||||||
|
"""A fresh, valid v2 config exercising both environments."""
|
||||||
|
return {
|
||||||
|
"version": 2,
|
||||||
|
"environments": {
|
||||||
|
"prgs": {
|
||||||
|
"services": {
|
||||||
|
"gitea": {
|
||||||
|
"base_url": "https://gitea.prgs.cc",
|
||||||
|
"default_owner": "Scaled-Tech-Consulting",
|
||||||
|
"identities": {
|
||||||
|
"author": {
|
||||||
|
"role": "author",
|
||||||
|
"username": "jcwalker3",
|
||||||
|
"auth": {"type": "keychain",
|
||||||
|
"id": "prgs.gitea.author.token"},
|
||||||
|
"execution_profile": "prgs-author",
|
||||||
|
"audit_label": "prgs-author",
|
||||||
|
"allowed_operations": [
|
||||||
|
"gitea.read", "gitea.issue.create",
|
||||||
|
"gitea.branch.push", "gitea.pr.create",
|
||||||
|
],
|
||||||
|
"forbidden_operations": [
|
||||||
|
"gitea.pr.approve", "gitea.pr.merge",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
"reviewer": {
|
||||||
|
"role": "reviewer",
|
||||||
|
"username": "sysadmin",
|
||||||
|
"auth": {"type": "env",
|
||||||
|
"name": "PRGS_REVIEWER_TOKEN"},
|
||||||
|
"execution_profile": "prgs-reviewer",
|
||||||
|
"audit_label": "prgs-reviewer",
|
||||||
|
"default_repo": "Gitea-Tools",
|
||||||
|
"allowed_operations": [
|
||||||
|
"read", "review", "comment", "approve",
|
||||||
|
"request_changes", "merge",
|
||||||
|
],
|
||||||
|
"forbidden_operations": [
|
||||||
|
"gitea.pr.create", "gitea.branch.push",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"mdcps": {
|
||||||
|
"services": {
|
||||||
|
"gitea": {
|
||||||
|
"base_url": "https://gitea.dadeschools.net",
|
||||||
|
"identities": {
|
||||||
|
"author": {
|
||||||
|
"role": "author",
|
||||||
|
"username": "913443",
|
||||||
|
"auth": {"type": "keychain",
|
||||||
|
"id": "mdcps.gitea.author.token"},
|
||||||
|
"allowed_operations": ["gitea.read"],
|
||||||
|
"forbidden_operations": [
|
||||||
|
"gitea.pr.approve", "gitea.pr.merge",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
"reviewer": {
|
||||||
|
"role": "reviewer",
|
||||||
|
"username": "TBD-second-mdcps-user",
|
||||||
|
"auth": {"type": "keychain",
|
||||||
|
"id": "mdcps.gitea.reviewer.token"},
|
||||||
|
"allowed_operations": [
|
||||||
|
"gitea.read", "gitea.pr.approve",
|
||||||
|
"gitea.pr.merge",
|
||||||
|
],
|
||||||
|
"forbidden_operations": [
|
||||||
|
"gitea.pr.create", "gitea.branch.push",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"jenkins": {
|
||||||
|
"base_url": "https://jenkins.dadeschools.net",
|
||||||
|
"identities": {
|
||||||
|
"reader": {
|
||||||
|
"role": "reader",
|
||||||
|
"username": "svc-jenkins-read",
|
||||||
|
"auth": {"type": "keychain",
|
||||||
|
"id": "mdcps.jenkins.reader.token"},
|
||||||
|
"allowed_operations": ["read", "jenkins.build.read"],
|
||||||
|
"forbidden_operations": ["jenkins.build.trigger"],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"aliases": {
|
||||||
|
"mdcps": "mdcps.gitea.author",
|
||||||
|
"prgs-author": "prgs.gitea.author",
|
||||||
|
"prgs-reviewer": "prgs.gitea.reviewer",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class _V2Base(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self._dir = tempfile.TemporaryDirectory()
|
||||||
|
self.path = os.path.join(self._dir.name, "profiles.json")
|
||||||
|
self._write(v2_config())
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self._dir.cleanup()
|
||||||
|
|
||||||
|
def _write(self, obj):
|
||||||
|
with open(self.path, "w", encoding="utf-8") as fh:
|
||||||
|
fh.write(obj if isinstance(obj, str) else json.dumps(obj))
|
||||||
|
|
||||||
|
def _env(self, profile, **extra):
|
||||||
|
env = {"GITEA_MCP_CONFIG": self.path, "GITEA_MCP_PROFILE": profile}
|
||||||
|
env.update(extra)
|
||||||
|
return env
|
||||||
|
|
||||||
|
def _resolve(self, profile):
|
||||||
|
with patch.dict(os.environ, self._env(profile), clear=True):
|
||||||
|
return gitea_config.resolve_profile()
|
||||||
|
|
||||||
|
def _load_raises(self, mutate, needle):
|
||||||
|
cfg = v2_config()
|
||||||
|
mutate(cfg)
|
||||||
|
self._write(cfg)
|
||||||
|
with patch.dict(os.environ, self._env("prgs.gitea.author"), clear=True):
|
||||||
|
with self.assertRaises(gitea_config.ConfigError) as ctx:
|
||||||
|
gitea_config.resolve_profile()
|
||||||
|
self.assertIn(needle, str(ctx.exception))
|
||||||
|
return str(ctx.exception)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Happy path: loading, dotted paths, aliases, inheritance
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestV2Loads(_V2Base):
|
||||||
|
|
||||||
|
def test_dotted_path_resolution(self):
|
||||||
|
p = self._resolve("prgs.gitea.author")
|
||||||
|
self.assertEqual(p["base_url"], "https://gitea.prgs.cc")
|
||||||
|
self.assertEqual(p["username"], "jcwalker3")
|
||||||
|
self.assertEqual(p["profile_path"], "prgs.gitea.author")
|
||||||
|
self.assertEqual(p["environment"], "prgs")
|
||||||
|
self.assertEqual(p["service"], "gitea")
|
||||||
|
self.assertEqual(p["identity"], "author")
|
||||||
|
self.assertEqual(p["role"], "author")
|
||||||
|
|
||||||
|
def test_alias_resolution_legacy_names(self):
|
||||||
|
for legacy, addr in (
|
||||||
|
("mdcps", "mdcps.gitea.author"),
|
||||||
|
("prgs-author", "prgs.gitea.author"),
|
||||||
|
("prgs-reviewer", "prgs.gitea.reviewer"),
|
||||||
|
):
|
||||||
|
p = self._resolve(legacy)
|
||||||
|
self.assertEqual(p["profile_path"], addr, legacy)
|
||||||
|
|
||||||
|
def test_service_defaults_inherit_and_identity_overrides(self):
|
||||||
|
author = self._resolve("prgs.gitea.author")
|
||||||
|
self.assertEqual(author["default_owner"], "Scaled-Tech-Consulting")
|
||||||
|
self.assertNotIn("default_repo", author)
|
||||||
|
reviewer = self._resolve("prgs.gitea.reviewer")
|
||||||
|
self.assertEqual(reviewer["default_owner"], "Scaled-Tech-Consulting")
|
||||||
|
self.assertEqual(reviewer["default_repo"], "Gitea-Tools")
|
||||||
|
|
||||||
|
def test_unqualified_ops_normalized_minimally(self):
|
||||||
|
reviewer = self._resolve("prgs.gitea.reviewer")
|
||||||
|
self.assertIn("gitea.pr.merge", reviewer["allowed_operations"])
|
||||||
|
self.assertIn("gitea.read", reviewer["allowed_operations"])
|
||||||
|
self.assertNotIn("merge", reviewer["allowed_operations"])
|
||||||
|
jenkins = self._resolve("mdcps.jenkins.reader")
|
||||||
|
self.assertIn("jenkins.read", jenkins["allowed_operations"])
|
||||||
|
self.assertIn("jenkins.build.read", jenkins["allowed_operations"])
|
||||||
|
|
||||||
|
def test_resolve_token_works_on_flattened_profile(self):
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
self._env("prgs.gitea.reviewer", PRGS_REVIEWER_TOKEN=FAKE_TOKEN),
|
||||||
|
clear=True,
|
||||||
|
):
|
||||||
|
profile = gitea_config.resolve_profile()
|
||||||
|
self.assertEqual(gitea_config.resolve_token(profile), FAKE_TOKEN)
|
||||||
|
|
||||||
|
def test_auth_source_name_on_flattened_profile(self):
|
||||||
|
p = self._resolve("mdcps.gitea.author")
|
||||||
|
self.assertEqual(
|
||||||
|
gitea_config.auth_source_name(p), "keychain:mdcps.gitea.author.token"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_v1_config_still_loads(self):
|
||||||
|
self._write({
|
||||||
|
"version": 1,
|
||||||
|
"profiles": {"prgs": {
|
||||||
|
"base_url": "https://gitea.prgs.cc",
|
||||||
|
"auth": {"type": "keychain", "id": "prgs-gitea-token"},
|
||||||
|
}},
|
||||||
|
})
|
||||||
|
p = self._resolve("prgs")
|
||||||
|
self.assertEqual(p["base_url"], "https://gitea.prgs.cc")
|
||||||
|
|
||||||
|
def test_validate_config_accepts_valid_v2(self):
|
||||||
|
self.assertEqual(gitea_config.validate_config(v2_config()), [])
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Fail-closed: selectors
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestV2Selectors(_V2Base):
|
||||||
|
|
||||||
|
def test_unknown_selector_fails_closed(self):
|
||||||
|
with self.assertRaises(gitea_config.ConfigError) as ctx:
|
||||||
|
self._resolve("prgs.gitea") # partial address — no fuzzy matching
|
||||||
|
self.assertIn("not found", str(ctx.exception))
|
||||||
|
|
||||||
|
def test_no_fuzzy_matching_on_near_miss(self):
|
||||||
|
with self.assertRaises(gitea_config.ConfigError):
|
||||||
|
self._resolve("prgs-reviewers")
|
||||||
|
|
||||||
|
def test_conflicting_alias_and_address_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
cfg["aliases"]["prgs.gitea.author"] = "prgs.gitea.reviewer"
|
||||||
|
self._load_raises(mutate, "conflicting selector")
|
||||||
|
|
||||||
|
def test_alias_to_unknown_target_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
cfg["aliases"]["ghost"] = "prgs.gitea.nope"
|
||||||
|
self._load_raises(mutate, "unknown profile")
|
||||||
|
|
||||||
|
def test_tbd_username_fails_closed_on_selection(self):
|
||||||
|
with self.assertRaises(gitea_config.ConfigError) as ctx:
|
||||||
|
self._resolve("mdcps.gitea.reviewer")
|
||||||
|
msg = str(ctx.exception)
|
||||||
|
self.assertIn("TBD", msg)
|
||||||
|
self.assertIn("provision", msg)
|
||||||
|
|
||||||
|
def test_tbd_identity_does_not_block_other_identities(self):
|
||||||
|
# Same file contains the TBD reviewer; author still resolves.
|
||||||
|
p = self._resolve("mdcps.gitea.author")
|
||||||
|
self.assertEqual(p["username"], "913443")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Fail-closed: structure and versions
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestV2Structure(_V2Base):
|
||||||
|
|
||||||
|
def test_missing_version_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
del cfg["version"]
|
||||||
|
self._load_raises(mutate, "version")
|
||||||
|
|
||||||
|
def test_unknown_version_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
cfg["version"] = 3
|
||||||
|
self._load_raises(mutate, "unsupported version")
|
||||||
|
|
||||||
|
def test_missing_environments_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
del cfg["environments"]
|
||||||
|
self._load_raises(mutate, "environments")
|
||||||
|
|
||||||
|
def test_malformed_environment_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
cfg["environments"]["prgs"] = "not-an-object"
|
||||||
|
self._load_raises(mutate, "must be a JSON object")
|
||||||
|
|
||||||
|
def test_missing_services_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
cfg["environments"]["prgs"]["services"] = {}
|
||||||
|
self._load_raises(mutate, "services")
|
||||||
|
|
||||||
|
def test_missing_identities_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
cfg["environments"]["prgs"]["services"]["gitea"]["identities"] = {}
|
||||||
|
self._load_raises(mutate, "identities")
|
||||||
|
|
||||||
|
def test_dotted_segment_name_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
envs = cfg["environments"]
|
||||||
|
envs["bad.env"] = copy.deepcopy(envs["prgs"])
|
||||||
|
self._load_raises(mutate, "invalid environment name")
|
||||||
|
|
||||||
|
def test_missing_base_url_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
svc = cfg["environments"]["prgs"]["services"]["gitea"]
|
||||||
|
del svc["base_url"]
|
||||||
|
self._load_raises(mutate, "base_url")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Fail-closed: identity invariants
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
class TestV2IdentityInvariants(_V2Base):
|
||||||
|
|
||||||
|
def _ident(self, cfg, addr="prgs.gitea.author"):
|
||||||
|
env, svc, ident = addr.split(".")
|
||||||
|
return cfg["environments"][env]["services"][svc]["identities"][ident]
|
||||||
|
|
||||||
|
def test_missing_auth_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
del self._ident(cfg)["auth"]
|
||||||
|
self._load_raises(mutate, "missing an 'auth' reference")
|
||||||
|
|
||||||
|
def test_inline_secret_in_identity_rejected(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
self._ident(cfg)["token"] = "oops-not-a-real-secret"
|
||||||
|
msg = self._load_raises(mutate, "inline 'token'")
|
||||||
|
self.assertNotIn("oops-not-a-real-secret", msg)
|
||||||
|
|
||||||
|
def test_inline_secret_in_auth_rejected(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
self._ident(cfg)["auth"]["password"] = "oops-not-a-real-secret"
|
||||||
|
msg = self._load_raises(mutate, "inline 'password'")
|
||||||
|
self.assertNotIn("oops-not-a-real-secret", msg)
|
||||||
|
|
||||||
|
def test_reviewer_deadlock_invariant_enforced(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
reviewer = self._ident(cfg, "prgs.gitea.reviewer")
|
||||||
|
reviewer["forbidden_operations"] = [] # can approve/merge AND create
|
||||||
|
msg = self._load_raises(mutate, "deadlock")
|
||||||
|
self.assertIn("gitea.pr.create", msg)
|
||||||
|
|
||||||
|
def test_reviewer_deadlock_applies_to_unqualified_merge(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
author = self._ident(cfg)
|
||||||
|
author["allowed_operations"] = ["merge"] # normalized to gitea.pr.merge
|
||||||
|
author["forbidden_operations"] = []
|
||||||
|
self._load_raises(mutate, "deadlock")
|
||||||
|
|
||||||
|
def test_unnormalizable_operation_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
self._ident(cfg)["allowed_operations"] = ["frobnicate"]
|
||||||
|
self._load_raises(mutate, "cannot be normalized")
|
||||||
|
|
||||||
|
def test_foreign_namespace_operation_fails_closed(self):
|
||||||
|
def mutate(cfg):
|
||||||
|
reader = self._ident(cfg, "mdcps.jenkins.reader")
|
||||||
|
reader["allowed_operations"] = ["gitea.pr.merge"]
|
||||||
|
self._load_raises(mutate, "cannot be normalized")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user