diff --git a/gitea_config.py b/gitea_config.py index 69ac7d1..82e9aa7 100644 --- a/gitea_config.py +++ b/gitea_config.py @@ -54,11 +54,61 @@ ENV_CONFIG_PATH = "GITEA_MCP_CONFIG" ENV_PROFILE = "GITEA_MCP_PROFILE" SUPPORTED_VERSION = 1 +SUPPORTED_VERSIONS = (1, 2) _AUTH_TYPES = ("keychain", "env") # Profile names go into env vars, keychain ids, and JSON keys — keep them tame. _PROFILE_NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$") +# v2 address segments (environment / service / identity) must be dot-free so +# the dotted profile address {env}.{service}.{identity} stays unambiguous. +_SEGMENT_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]*$") + +# Placeholder usernames must never activate (fail closed until provisioned). +_TBD_RE = re.compile(r"(?i)^tbd(-|$)") + +# Keys that would mean an inline secret wherever they appear. +_INLINE_SECRET_KEYS = ("token", "password", "secret") + +# ── Minimal operation normalization (#103) ───────────────────────────────────── +# Only what the #103 invariants need. The full normalization table, deprecation +# handling, and enforcement test matrix belong to issue #106 — do not grow this +# beyond invariant safety here. +_MINIMAL_GITEA_OP_MAP = { + "read": "gitea.read", + "review": "gitea.pr.review", + "comment": "gitea.pr.comment", + "approve": "gitea.pr.approve", + "request_changes": "gitea.pr.request_changes", + "merge": "gitea.pr.merge", + "pr.create": "gitea.pr.create", + "branch.push": "gitea.branch.push", +} +_REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"}) +_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"}) + + +def _normalize_op(service, op, addr): + """Normalize *op* for *service*, or fail closed (#103 minimal subset). + + - already namespaced for this service (``{service}.*``) → unchanged + - known unqualified Gitea ops → mapped via ``_MINIMAL_GITEA_OP_MAP`` + - unqualified single-word ops on non-Gitea services → ``{service}.{op}`` + - anything else (foreign prefixes, unknown unqualified names) → ConfigError + """ + if not isinstance(op, str) or not op: + raise ConfigError(f"identity '{addr}' has an empty or non-string operation") + if op.startswith(service + "."): + return op + if service == "gitea" and op in _MINIMAL_GITEA_OP_MAP: + return _MINIMAL_GITEA_OP_MAP[op] + if service != "gitea" and "." not in op: + return f"{service}.{op}" + raise ConfigError( + f"identity '{addr}' has operation {op!r} that cannot be normalized " + f"safely for service '{service}' (fail closed; full table is issue #106)" + ) + # Default canonical config location (one file shared by all LLM launchers). DEFAULT_CONFIG_PATH = os.path.join( os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json" @@ -108,16 +158,193 @@ def load_config(path=None): ) from None except OSError as exc: raise ConfigError(f"could not read {path}: {exc.strerror}") from None - if not isinstance(data, dict) or not isinstance(data.get("profiles"), dict): - raise ConfigError(f"{path} must be a JSON object with a 'profiles' object") - version = data.get("version", SUPPORTED_VERSION) + if not isinstance(data, dict): + raise ConfigError(f"{path} must be a JSON object") + version = data.get("version") + if version is None: + # Fail closed (#103): an unversioned config is ambiguous between v1 and + # v2 shapes, so it is refused rather than guessed. + raise ConfigError( + f"{path} is missing the required 'version' field; " + f"expected one of {list(SUPPORTED_VERSIONS)}" + ) + if version == 2: + return _load_v2(data, path) if version != SUPPORTED_VERSION: raise ConfigError( - f"{path} has unsupported version {version!r}; expected {SUPPORTED_VERSION}" + f"{path} has unsupported version {version!r}; " + f"expected one of {list(SUPPORTED_VERSIONS)}" ) + if not isinstance(data.get("profiles"), dict): + raise ConfigError(f"{path} must be a JSON object with a 'profiles' object") return data +# ── profiles.json version 2 (#103): environment → service → identity ────────── +# v2 files are validated and *flattened* at load time into the same +# {"profiles": {...}} shape v1 consumers already understand, keyed by the +# canonical dotted address {environment}.{service}.{identity}. Two extra +# top-level keys are carried: "aliases" (exact-name compatibility selectors) +# and "unavailable" (addresses that fail closed at selection, e.g. TBD users). + +def _validate_identity_auth(addr, auth): + """Require and validate an identity 'auth' reference. Rejects inline secrets.""" + if auth is None: + raise ConfigError(f"identity '{addr}' is missing an 'auth' reference") + if not isinstance(auth, dict): + raise ConfigError(f"identity '{addr}' has a non-object 'auth'") + for key in _INLINE_SECRET_KEYS: + if key in auth: + raise ConfigError( + f"identity '{addr}' auth must not contain an inline '{key}'; " + "store secrets in the keychain and reference them by id" + ) + _validate_auth(addr, auth) + + +def _flatten_identity(env_name, svc_name, svc, ident_name, ident): + """Validate one v2 identity and return (addr, flattened_profile). + + The flattened profile is v1-shaped (base_url/auth/username/defaults) plus + v2 metadata (profile_path, environment, service, identity, role) and + normalized operation lists. Raises ConfigError on any invariant violation. + """ + addr = f"{env_name}.{svc_name}.{ident_name}" + if not isinstance(ident, dict): + raise ConfigError(f"identity '{addr}' must be a JSON object") + for key in _INLINE_SECRET_KEYS: + if key in ident: + raise ConfigError( + f"identity '{addr}' must not contain an inline '{key}'; " + "use an 'auth' reference instead" + ) + _validate_identity_auth(addr, ident.get("auth")) + + base_url = ident.get("base_url") or svc.get("base_url") + if not base_url: + raise ConfigError( + f"identity '{addr}' has no 'base_url' at identity or service level" + ) + + allowed = ident.get("allowed_operations") or [] + forbidden = ident.get("forbidden_operations") or [] + if not isinstance(allowed, list) or not isinstance(forbidden, list): + raise ConfigError(f"identity '{addr}' operation fields must be lists") + allowed_n = {_normalize_op(svc_name, op, addr) for op in allowed} + forbidden_n = {_normalize_op(svc_name, op, addr) for op in forbidden} + + # Reviewer-identity deadlock rule (#100/#103): an identity that may approve + # or merge PRs must explicitly forbid creating PRs and pushing branches, + # so the reviewer identity can never author the PR it must review. + if allowed_n & _REVIEW_MERGE_OPS: + missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n) + if missing: + raise ConfigError( + f"identity '{addr}' allows PR approve/merge but does not forbid " + f"{missing}; reviewer identities must forbid gitea.pr.create and " + "gitea.branch.push (reviewer-identity deadlock rule)" + ) + + profile = { + "profile_path": addr, + "environment": env_name, + "service": svc_name, + "identity": ident_name, + "base_url": base_url, + "auth": ident["auth"], + "allowed_operations": sorted(allowed_n), + "forbidden_operations": sorted(forbidden_n), + } + # Service-level defaults inherit unless the identity overrides them. + for key in ("default_owner", "default_repo", "default_org"): + value = ident.get(key, svc.get(key)) + if value: + profile[key] = value + for key in ("role", "username", "execution_profile", "audit_label"): + if ident.get(key): + profile[key] = ident[key] + return addr, profile + + +def _load_v2(data, path): + """Validate a v2 config and return the flattened, resolvable structure.""" + environments = data.get("environments") + if not isinstance(environments, dict) or not environments: + raise ConfigError( + f"{path} version 2 config requires a non-empty 'environments' object" + ) + profiles = {} + unavailable = {} + for env_name, env in environments.items(): + if not _SEGMENT_RE.match(env_name or ""): + raise ConfigError(f"invalid environment name {env_name!r} (no dots)") + if not isinstance(env, dict): + raise ConfigError(f"environment '{env_name}' must be a JSON object") + services = env.get("services") + if not isinstance(services, dict) or not services: + raise ConfigError( + f"environment '{env_name}' requires a non-empty 'services' object" + ) + for svc_name, svc in services.items(): + if not _SEGMENT_RE.match(svc_name or ""): + raise ConfigError( + f"invalid service name {svc_name!r} in '{env_name}' (no dots)" + ) + if not isinstance(svc, dict): + raise ConfigError( + f"service '{env_name}.{svc_name}' must be a JSON object" + ) + identities = svc.get("identities") + if not isinstance(identities, dict) or not identities: + raise ConfigError( + f"service '{env_name}.{svc_name}' requires a non-empty " + "'identities' object" + ) + for ident_name, ident in identities.items(): + if not _SEGMENT_RE.match(ident_name or ""): + raise ConfigError( + f"invalid identity name {ident_name!r} in " + f"'{env_name}.{svc_name}' (no dots)" + ) + addr, profile = _flatten_identity( + env_name, svc_name, svc, ident_name, ident + ) + username = profile.get("username") or "" + if _TBD_RE.match(username): + # Fail closed at selection, without blocking every other + # identity in the file (see #103 acceptance criteria). + unavailable[addr] = ( + f"identity '{addr}' username {username!r} is a TBD " + "placeholder; provision the account before use " + "(fail closed)" + ) + else: + profiles[addr] = profile + + aliases = data.get("aliases") or {} + if not isinstance(aliases, dict): + raise ConfigError(f"{path} 'aliases' must be a JSON object") + known = set(profiles) | set(unavailable) + for alias, target in aliases.items(): + if not isinstance(target, str) or not target: + raise ConfigError(f"alias '{alias}' target must be a non-empty string") + if alias in known and alias != target: + raise ConfigError( + f"selector '{alias}' is both an alias and a profile address " + "with a different target (conflicting selector; fail closed)" + ) + if target not in known: + raise ConfigError( + f"alias '{alias}' points to unknown profile '{target}'" + ) + return { + "version": 2, + "profiles": profiles, + "aliases": dict(aliases), + "unavailable": unavailable, + } + + def _validate_auth(name, auth): """Validate a profile's optional ``auth`` reference. Never echoes secrets.""" if auth is None: @@ -147,18 +374,25 @@ def select_profile(config, name=None): if config is None: return None profiles = config.get("profiles", {}) + aliases = config.get("aliases") or {} + unavailable = config.get("unavailable") or {} name = name or selected_profile_name() - available = sorted(profiles) + available = sorted(set(profiles) | set(aliases)) if not name: raise ConfigError( f"{ENV_CONFIG_PATH} is set but {ENV_PROFILE} is not; " f"available profiles: {available}" ) - if name not in profiles: + # Strict resolution order (#103): exact alias → exact profile address → + # fail closed. No fuzzy matching, no partial matches, no defaults. + resolved = aliases.get(name, name) + if resolved in unavailable: + raise ConfigError(unavailable[resolved]) + if resolved not in profiles: raise ConfigError( f"profile '{name}' not found in config; available profiles: {available}" ) - profile = profiles[name] + profile = profiles[resolved] if not isinstance(profile, dict): raise ConfigError(f"profile '{name}' must be a JSON object") for secret_key in ("token", "password"): @@ -292,9 +526,21 @@ def validate_config(config): problems = [] if not isinstance(config, dict): return ["config is not a JSON object"] - if config.get("version", SUPPORTED_VERSION) != SUPPORTED_VERSION: + version = config.get("version") + if version is None: problems.append( - f"unsupported version {config.get('version')!r} (expected {SUPPORTED_VERSION})" + f"missing required 'version' (expected one of {list(SUPPORTED_VERSIONS)})" + ) + elif version == 2: + # v2 validation is all-or-nothing via the loader's invariants. + try: + _load_v2(config, "") + except ConfigError as exc: + problems.append(str(exc)) + return problems + elif version != SUPPORTED_VERSION: + problems.append( + f"unsupported version {version!r} (expected one of {list(SUPPORTED_VERSIONS)})" ) profiles = config.get("profiles") if not isinstance(profiles, dict): diff --git a/tests/test_config.py b/tests/test_config.py index 30dc6e3..fa02082 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -127,11 +127,14 @@ class TestLoadSelect(_ConfigBase): gitea_config.resolve_profile() self.assertIn("version", str(ctx.exception)) - def test_missing_version_defaults_ok(self): + def test_missing_version_fails_closed(self): + # Changed by #103: an unversioned config is ambiguous between the v1 + # and v2 shapes, so the loader now refuses to guess. self._write({"profiles": {"prgs": {"base_url": "https://x"}}}) with patch.dict(os.environ, self._env("prgs"), clear=True): - self.assertEqual( - gitea_config.resolve_profile()["base_url"], "https://x") + with self.assertRaises(gitea_config.ConfigError) as ctx: + gitea_config.resolve_profile() + self.assertIn("version", str(ctx.exception)) # --------------------------------------------------------------------------- diff --git a/tests/test_config_v2.py b/tests/test_config_v2.py new file mode 100644 index 0000000..1b951c0 --- /dev/null +++ b/tests/test_config_v2.py @@ -0,0 +1,367 @@ +"""Tests for profiles.json version 2 (#103): environment → service → identity. + +Covers: v2 loading + flattening, dotted-path and alias resolution with strict +order (exact alias → exact address → fail closed), legacy v1 names via aliases, +fail-closed validation (missing/unknown version, malformed hierarchy, ambiguous +selectors, TBD-* usernames, reviewer-identity deadlock rule, inline secrets, +missing auth, unnormalizable operations), service-default inheritance, and that +flattened v2 profiles still work with resolve_token. No network, no secrets. +""" +import os +import sys +import copy +import json +import tempfile +import unittest +from unittest.mock import patch + +sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) + +import gitea_config # noqa: E402 + +FAKE_TOKEN = "fake-token-for-tests" # not a real credential + + +def v2_config(): + """A fresh, valid v2 config exercising both environments.""" + return { + "version": 2, + "environments": { + "prgs": { + "services": { + "gitea": { + "base_url": "https://gitea.prgs.cc", + "default_owner": "Scaled-Tech-Consulting", + "identities": { + "author": { + "role": "author", + "username": "jcwalker3", + "auth": {"type": "keychain", + "id": "prgs.gitea.author.token"}, + "execution_profile": "prgs-author", + "audit_label": "prgs-author", + "allowed_operations": [ + "gitea.read", "gitea.issue.create", + "gitea.branch.push", "gitea.pr.create", + ], + "forbidden_operations": [ + "gitea.pr.approve", "gitea.pr.merge", + ], + }, + "reviewer": { + "role": "reviewer", + "username": "sysadmin", + "auth": {"type": "env", + "name": "PRGS_REVIEWER_TOKEN"}, + "execution_profile": "prgs-reviewer", + "audit_label": "prgs-reviewer", + "default_repo": "Gitea-Tools", + "allowed_operations": [ + "read", "review", "comment", "approve", + "request_changes", "merge", + ], + "forbidden_operations": [ + "gitea.pr.create", "gitea.branch.push", + ], + }, + }, + }, + }, + }, + "mdcps": { + "services": { + "gitea": { + "base_url": "https://gitea.dadeschools.net", + "identities": { + "author": { + "role": "author", + "username": "913443", + "auth": {"type": "keychain", + "id": "mdcps.gitea.author.token"}, + "allowed_operations": ["gitea.read"], + "forbidden_operations": [ + "gitea.pr.approve", "gitea.pr.merge", + ], + }, + "reviewer": { + "role": "reviewer", + "username": "TBD-second-mdcps-user", + "auth": {"type": "keychain", + "id": "mdcps.gitea.reviewer.token"}, + "allowed_operations": [ + "gitea.read", "gitea.pr.approve", + "gitea.pr.merge", + ], + "forbidden_operations": [ + "gitea.pr.create", "gitea.branch.push", + ], + }, + }, + }, + "jenkins": { + "base_url": "https://jenkins.dadeschools.net", + "identities": { + "reader": { + "role": "reader", + "username": "svc-jenkins-read", + "auth": {"type": "keychain", + "id": "mdcps.jenkins.reader.token"}, + "allowed_operations": ["read", "jenkins.build.read"], + "forbidden_operations": ["jenkins.build.trigger"], + }, + }, + }, + }, + }, + }, + "aliases": { + "mdcps": "mdcps.gitea.author", + "prgs-author": "prgs.gitea.author", + "prgs-reviewer": "prgs.gitea.reviewer", + }, + } + + +class _V2Base(unittest.TestCase): + def setUp(self): + self._dir = tempfile.TemporaryDirectory() + self.path = os.path.join(self._dir.name, "profiles.json") + self._write(v2_config()) + + def tearDown(self): + self._dir.cleanup() + + def _write(self, obj): + with open(self.path, "w", encoding="utf-8") as fh: + fh.write(obj if isinstance(obj, str) else json.dumps(obj)) + + def _env(self, profile, **extra): + env = {"GITEA_MCP_CONFIG": self.path, "GITEA_MCP_PROFILE": profile} + env.update(extra) + return env + + def _resolve(self, profile): + with patch.dict(os.environ, self._env(profile), clear=True): + return gitea_config.resolve_profile() + + def _load_raises(self, mutate, needle): + cfg = v2_config() + mutate(cfg) + self._write(cfg) + with patch.dict(os.environ, self._env("prgs.gitea.author"), clear=True): + with self.assertRaises(gitea_config.ConfigError) as ctx: + gitea_config.resolve_profile() + self.assertIn(needle, str(ctx.exception)) + return str(ctx.exception) + + +# --------------------------------------------------------------------------- +# Happy path: loading, dotted paths, aliases, inheritance +# --------------------------------------------------------------------------- +class TestV2Loads(_V2Base): + + def test_dotted_path_resolution(self): + p = self._resolve("prgs.gitea.author") + self.assertEqual(p["base_url"], "https://gitea.prgs.cc") + self.assertEqual(p["username"], "jcwalker3") + self.assertEqual(p["profile_path"], "prgs.gitea.author") + self.assertEqual(p["environment"], "prgs") + self.assertEqual(p["service"], "gitea") + self.assertEqual(p["identity"], "author") + self.assertEqual(p["role"], "author") + + def test_alias_resolution_legacy_names(self): + for legacy, addr in ( + ("mdcps", "mdcps.gitea.author"), + ("prgs-author", "prgs.gitea.author"), + ("prgs-reviewer", "prgs.gitea.reviewer"), + ): + p = self._resolve(legacy) + self.assertEqual(p["profile_path"], addr, legacy) + + def test_service_defaults_inherit_and_identity_overrides(self): + author = self._resolve("prgs.gitea.author") + self.assertEqual(author["default_owner"], "Scaled-Tech-Consulting") + self.assertNotIn("default_repo", author) + reviewer = self._resolve("prgs.gitea.reviewer") + self.assertEqual(reviewer["default_owner"], "Scaled-Tech-Consulting") + self.assertEqual(reviewer["default_repo"], "Gitea-Tools") + + def test_unqualified_ops_normalized_minimally(self): + reviewer = self._resolve("prgs.gitea.reviewer") + self.assertIn("gitea.pr.merge", reviewer["allowed_operations"]) + self.assertIn("gitea.read", reviewer["allowed_operations"]) + self.assertNotIn("merge", reviewer["allowed_operations"]) + jenkins = self._resolve("mdcps.jenkins.reader") + self.assertIn("jenkins.read", jenkins["allowed_operations"]) + self.assertIn("jenkins.build.read", jenkins["allowed_operations"]) + + def test_resolve_token_works_on_flattened_profile(self): + with patch.dict( + os.environ, + self._env("prgs.gitea.reviewer", PRGS_REVIEWER_TOKEN=FAKE_TOKEN), + clear=True, + ): + profile = gitea_config.resolve_profile() + self.assertEqual(gitea_config.resolve_token(profile), FAKE_TOKEN) + + def test_auth_source_name_on_flattened_profile(self): + p = self._resolve("mdcps.gitea.author") + self.assertEqual( + gitea_config.auth_source_name(p), "keychain:mdcps.gitea.author.token" + ) + + def test_v1_config_still_loads(self): + self._write({ + "version": 1, + "profiles": {"prgs": { + "base_url": "https://gitea.prgs.cc", + "auth": {"type": "keychain", "id": "prgs-gitea-token"}, + }}, + }) + p = self._resolve("prgs") + self.assertEqual(p["base_url"], "https://gitea.prgs.cc") + + def test_validate_config_accepts_valid_v2(self): + self.assertEqual(gitea_config.validate_config(v2_config()), []) + + +# --------------------------------------------------------------------------- +# Fail-closed: selectors +# --------------------------------------------------------------------------- +class TestV2Selectors(_V2Base): + + def test_unknown_selector_fails_closed(self): + with self.assertRaises(gitea_config.ConfigError) as ctx: + self._resolve("prgs.gitea") # partial address — no fuzzy matching + self.assertIn("not found", str(ctx.exception)) + + def test_no_fuzzy_matching_on_near_miss(self): + with self.assertRaises(gitea_config.ConfigError): + self._resolve("prgs-reviewers") + + def test_conflicting_alias_and_address_fails_closed(self): + def mutate(cfg): + cfg["aliases"]["prgs.gitea.author"] = "prgs.gitea.reviewer" + self._load_raises(mutate, "conflicting selector") + + def test_alias_to_unknown_target_fails_closed(self): + def mutate(cfg): + cfg["aliases"]["ghost"] = "prgs.gitea.nope" + self._load_raises(mutate, "unknown profile") + + def test_tbd_username_fails_closed_on_selection(self): + with self.assertRaises(gitea_config.ConfigError) as ctx: + self._resolve("mdcps.gitea.reviewer") + msg = str(ctx.exception) + self.assertIn("TBD", msg) + self.assertIn("provision", msg) + + def test_tbd_identity_does_not_block_other_identities(self): + # Same file contains the TBD reviewer; author still resolves. + p = self._resolve("mdcps.gitea.author") + self.assertEqual(p["username"], "913443") + + +# --------------------------------------------------------------------------- +# Fail-closed: structure and versions +# --------------------------------------------------------------------------- +class TestV2Structure(_V2Base): + + def test_missing_version_fails_closed(self): + def mutate(cfg): + del cfg["version"] + self._load_raises(mutate, "version") + + def test_unknown_version_fails_closed(self): + def mutate(cfg): + cfg["version"] = 3 + self._load_raises(mutate, "unsupported version") + + def test_missing_environments_fails_closed(self): + def mutate(cfg): + del cfg["environments"] + self._load_raises(mutate, "environments") + + def test_malformed_environment_fails_closed(self): + def mutate(cfg): + cfg["environments"]["prgs"] = "not-an-object" + self._load_raises(mutate, "must be a JSON object") + + def test_missing_services_fails_closed(self): + def mutate(cfg): + cfg["environments"]["prgs"]["services"] = {} + self._load_raises(mutate, "services") + + def test_missing_identities_fails_closed(self): + def mutate(cfg): + cfg["environments"]["prgs"]["services"]["gitea"]["identities"] = {} + self._load_raises(mutate, "identities") + + def test_dotted_segment_name_fails_closed(self): + def mutate(cfg): + envs = cfg["environments"] + envs["bad.env"] = copy.deepcopy(envs["prgs"]) + self._load_raises(mutate, "invalid environment name") + + def test_missing_base_url_fails_closed(self): + def mutate(cfg): + svc = cfg["environments"]["prgs"]["services"]["gitea"] + del svc["base_url"] + self._load_raises(mutate, "base_url") + + +# --------------------------------------------------------------------------- +# Fail-closed: identity invariants +# --------------------------------------------------------------------------- +class TestV2IdentityInvariants(_V2Base): + + def _ident(self, cfg, addr="prgs.gitea.author"): + env, svc, ident = addr.split(".") + return cfg["environments"][env]["services"][svc]["identities"][ident] + + def test_missing_auth_fails_closed(self): + def mutate(cfg): + del self._ident(cfg)["auth"] + self._load_raises(mutate, "missing an 'auth' reference") + + def test_inline_secret_in_identity_rejected(self): + def mutate(cfg): + self._ident(cfg)["token"] = "oops-not-a-real-secret" + msg = self._load_raises(mutate, "inline 'token'") + self.assertNotIn("oops-not-a-real-secret", msg) + + def test_inline_secret_in_auth_rejected(self): + def mutate(cfg): + self._ident(cfg)["auth"]["password"] = "oops-not-a-real-secret" + msg = self._load_raises(mutate, "inline 'password'") + self.assertNotIn("oops-not-a-real-secret", msg) + + def test_reviewer_deadlock_invariant_enforced(self): + def mutate(cfg): + reviewer = self._ident(cfg, "prgs.gitea.reviewer") + reviewer["forbidden_operations"] = [] # can approve/merge AND create + msg = self._load_raises(mutate, "deadlock") + self.assertIn("gitea.pr.create", msg) + + def test_reviewer_deadlock_applies_to_unqualified_merge(self): + def mutate(cfg): + author = self._ident(cfg) + author["allowed_operations"] = ["merge"] # normalized to gitea.pr.merge + author["forbidden_operations"] = [] + self._load_raises(mutate, "deadlock") + + def test_unnormalizable_operation_fails_closed(self): + def mutate(cfg): + self._ident(cfg)["allowed_operations"] = ["frobnicate"] + self._load_raises(mutate, "cannot be normalized") + + def test_foreign_namespace_operation_fails_closed(self): + def mutate(cfg): + reader = self._ident(cfg, "mdcps.jenkins.reader") + reader["allowed_operations"] = ["gitea.pr.merge"] + self._load_raises(mutate, "cannot be normalized") + + +if __name__ == "__main__": + unittest.main()