diff --git a/docs/gitea-execution-profiles.md b/docs/gitea-execution-profiles.md index b2c0497..9f3e5b1 100644 --- a/docs/gitea-execution-profiles.md +++ b/docs/gitea-execution-profiles.md @@ -134,8 +134,50 @@ Rules: appears in both, it is forbidden. - An operation not present in `allowed_operations` is treated as **not allowed** (deny by default). -- These categories are descriptive for this issue. Their runtime enforcement is - out of scope here (see roadmap links). + +## Operation-name normalization (#106) + +Canonical operation names are namespaced: `{service}.{area}.{verb}` (e.g. +`gitea.pr.merge`, `jenkins.build.read`). Legacy unqualified spellings are +accepted **only** through the explicit alias table below (the code of record +is `GITEA_OPERATION_ALIASES` in `gitea_config.py`; the enforcement matrix is +`tests/test_op_normalization.py`). + +| Legacy spelling | Canonical operation | +|-------------------|----------------------------| +| `read` | `gitea.read` | +| `review` | `gitea.pr.review` | +| `comment` | `gitea.pr.comment` | +| `approve` | `gitea.pr.approve` | +| `request_changes` | `gitea.pr.request_changes` | +| `merge` | `gitea.pr.merge` | +| `pr.create` | `gitea.pr.create` | +| `branch.push` | `gitea.branch.push` | +| `branch` | `gitea.branch.create` | +| `commit` | `gitea.repo.commit` | +| `push` | `gitea.branch.push` | +| `open_pr` | `gitea.pr.create` | + +For non-Gitea services, a single unqualified word namespaces to the checked +service (`read` → `jenkins.read` when checking Jenkins); names already +prefixed with that service pass through unchanged. + +Enforcement rules (`gitea_config.check_operation`, run **before** any +allowed/forbidden membership check): + +- Unknown operation names fail closed (denied). +- Ambiguous names — dotted names that are neither service-prefixed nor in the + alias table — fail closed. +- Cross-service names are never accepted by the wrong service + (`jenkins.read` never matches a Gitea check, and a Gitea alias is never + applied to another service). +- `forbidden_operations` overrides `allowed_operations` after both sides are + normalized, so a legacy spelling can never bypass a canonical forbidden + entry (or vice versa). +- An allowed entry that cannot be normalized grants nothing; a forbidden + entry that cannot be normalized denies the request. Normalization can + therefore never silently widen permissions. +- An empty or missing `allowed_operations` list denies everything. ## Identity and fail-closed rules diff --git a/gitea_config.py b/gitea_config.py index f36b835..b029478 100644 --- a/gitea_config.py +++ b/gitea_config.py @@ -70,11 +70,13 @@ _TBD_RE = re.compile(r"(?i)^tbd(-|$)") # Keys that would mean an inline secret wherever they appear. _INLINE_SECRET_KEYS = ("token", "password", "secret") -# ── Minimal operation normalization (#103) ───────────────────────────────────── -# Only what the #103 invariants need. The full normalization table, deprecation -# handling, and enforcement test matrix belong to issue #106 — do not grow this -# beyond invariant safety here. -_MINIMAL_GITEA_OP_MAP = { +# ── Operation-name normalization table (#106; minimal subset landed in #103) ─── +# Canonical operations are namespaced ({service}.{area}.{verb}). Legacy +# unqualified spellings are accepted ONLY through this explicit table — never +# by guessing. The same table is the documentation of record (see +# docs/gitea-execution-profiles.md) and is exercised by +# tests/test_op_normalization.py. +GITEA_OPERATION_ALIASES = { "read": "gitea.read", "review": "gitea.pr.review", "comment": "gitea.pr.comment", @@ -94,27 +96,83 @@ _REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"}) _AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"}) -def _normalize_op(service, op, addr): - """Normalize *op* for *service*, or fail closed (#103 minimal subset). +def normalize_operation(op, service="gitea"): + """Return the canonical namespaced name for *op*, or fail closed (#106). - already namespaced for this service (``{service}.*``) → unchanged - - known unqualified Gitea ops → mapped via ``_MINIMAL_GITEA_OP_MAP`` + - known unqualified Gitea ops → mapped via ``GITEA_OPERATION_ALIASES`` - unqualified single-word ops on non-Gitea services → ``{service}.{op}`` - - anything else (foreign prefixes, unknown unqualified names) → ConfigError + - anything else — foreign service prefixes, dotted names outside the + table, unknown unqualified names — is unknown or ambiguous → ConfigError + + Normalization never crosses services (a Gitea alias is never applied to + another service) and never widens permissions: an operation that cannot + be normalized grants and matches nothing. """ if not isinstance(op, str) or not op: - raise ConfigError(f"identity '{addr}' has an empty or non-string operation") + raise ConfigError("operation must be a non-empty string (fail closed)") if op.startswith(service + "."): return op - if service == "gitea" and op in _MINIMAL_GITEA_OP_MAP: - return _MINIMAL_GITEA_OP_MAP[op] + if service == "gitea" and op in GITEA_OPERATION_ALIASES: + return GITEA_OPERATION_ALIASES[op] if service != "gitea" and "." not in op: return f"{service}.{op}" raise ConfigError( - f"identity '{addr}' has operation {op!r} that cannot be normalized " - f"safely for service '{service}' (fail closed; full table is issue #106)" + f"operation {op!r} cannot be normalized safely for service " + f"'{service}' (unknown, ambiguous, or cross-service; fail closed)" ) + +def check_operation(op, allowed, forbidden=(), service="gitea"): + """Decide whether *op* is permitted. Returns ``(bool, reason)`` (#106). + + Everything is normalized via :func:`normalize_operation` BEFORE any + membership check, so legacy and canonical spellings always compare equal. + Reasons: ``allowed``, ``invalid-operation``, ``invalid-forbidden-entry``, + ``forbidden``, ``no-allowed-operations``, ``not-allowed``. + + Fail-closed rules: + - an *op* that cannot be normalized is denied (``invalid-operation``) + - a forbidden entry that cannot be normalized denies the request + (``invalid-forbidden-entry``) — dropping it would silently narrow the + forbidden set, i.e. widen permissions + - an allowed entry that cannot be normalized is ignored — it grants + nothing, so permissions never widen + - ``forbidden`` always overrides ``allowed`` + - an empty or missing allowed list denies everything + """ + try: + op_n = normalize_operation(op, service) + except ConfigError: + return (False, "invalid-operation") + forbidden_n = set() + for entry in (forbidden or ()): + try: + forbidden_n.add(normalize_operation(entry, service)) + except ConfigError: + return (False, "invalid-forbidden-entry") + if op_n in forbidden_n: + return (False, "forbidden") + if not allowed: + return (False, "no-allowed-operations") + allowed_n = set() + for entry in allowed: + try: + allowed_n.add(normalize_operation(entry, service)) + except ConfigError: + continue + if op_n in allowed_n: + return (True, "allowed") + return (False, "not-allowed") + + +def _normalize_op(service, op, addr): + """Normalize *op* for identity *addr*, or fail closed with context.""" + try: + return normalize_operation(op, service) + except ConfigError as exc: + raise ConfigError(f"identity '{addr}': {exc}") from None + # Default canonical config location (one file shared by all LLM launchers). DEFAULT_CONFIG_PATH = os.path.join( os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json" diff --git a/mcp_server.py b/mcp_server.py index 3d6d26b..076abd8 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -521,14 +521,24 @@ def gitea_check_pr_eligibility( return result # Profile capability check (metadata only; not enforcement of the action). + # Both the action and the profile lists are normalized before comparison + # (#106), so legacy spellings ("merge") and canonical namespaced ops + # ("gitea.pr.merge") always match each other and never cross services. allowed = profile["allowed_operations"] forbidden = profile["forbidden_operations"] - if not allowed: - reasons.append("profile has no configured allowed operations (fail closed)") - if action in forbidden: - reasons.append(f"profile forbids '{action}'") - elif action not in allowed: - reasons.append(f"profile is not allowed to {action}") + op_ok, op_reason = gitea_config.check_operation(action, allowed, forbidden) + if not op_ok: + if op_reason == "no-allowed-operations": + reasons.append( + "profile has no configured allowed operations (fail closed)") + elif op_reason == "forbidden": + reasons.append(f"profile forbids '{action}'") + elif op_reason == "invalid-forbidden-entry": + reasons.append( + "profile has an unrecognized forbidden operation entry " + "(fail closed)") + else: + reasons.append(f"profile is not allowed to {action}") h, o, r = _resolve(remote, host, org, repo) diff --git a/tests/test_op_normalization.py b/tests/test_op_normalization.py new file mode 100644 index 0000000..70c4ec5 --- /dev/null +++ b/tests/test_op_normalization.py @@ -0,0 +1,239 @@ +"""Operation-name normalization table and enforcement tests — issue #106. + +Covers the required matrix from #106: + +- fully qualified allowed / forbidden operations +- legacy unqualified allowed / forbidden operations +- unknown operations (fail closed) +- ambiguous operations (fail closed) +- service mismatch (cross-service names never accepted by the wrong service) +- forbidden-overrides-allowed +- empty / missing allowed list +- duplicate operations after normalization +- no silent permission widening +- eligibility enforcement normalizes before checking +""" + +import os +import sys +import unittest +from unittest.mock import patch + +sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) + +import gitea_config # noqa: E402 +from gitea_config import ( # noqa: E402 + ConfigError, + check_operation, + normalize_operation, +) +from mcp_server import gitea_check_pr_eligibility # noqa: E402 + +FAKE_AUTH = "Basic dGVzdDp0ZXN0" + + +# --------------------------------------------------------------------------- +# normalize_operation — canonical table +# --------------------------------------------------------------------------- +class TestNormalizeOperation(unittest.TestCase): + + def test_fully_qualified_gitea_op_unchanged(self): + self.assertEqual(normalize_operation("gitea.pr.merge"), "gitea.pr.merge") + + def test_legacy_aliases_map_to_canonical_names(self): + expected = { + "merge": "gitea.pr.merge", + "approve": "gitea.pr.approve", + "request_changes": "gitea.pr.request_changes", + "review": "gitea.pr.review", + "comment": "gitea.pr.comment", + "read": "gitea.read", + } + for legacy, canonical in expected.items(): + self.assertEqual(normalize_operation(legacy), canonical) + + def test_contexts_shape_author_verbs(self): + self.assertEqual(normalize_operation("branch"), "gitea.branch.create") + self.assertEqual(normalize_operation("commit"), "gitea.repo.commit") + self.assertEqual(normalize_operation("push"), "gitea.branch.push") + self.assertEqual(normalize_operation("open_pr"), "gitea.pr.create") + + def test_unknown_unqualified_op_fails_closed(self): + with self.assertRaises(ConfigError): + normalize_operation("frobnicate") + + def test_ambiguous_dotted_op_fails_closed(self): + # Dotted but neither gitea-prefixed nor an explicit alias: refuse to + # guess which namespace was meant. + with self.assertRaises(ConfigError): + normalize_operation("build.read") + + def test_cross_service_name_rejected_by_wrong_service(self): + with self.assertRaises(ConfigError): + normalize_operation("jenkins.read", service="gitea") + with self.assertRaises(ConfigError): + normalize_operation("gitea.read", service="jenkins") + + def test_non_gitea_single_word_namespaced_to_service(self): + self.assertEqual(normalize_operation("read", service="jenkins"), + "jenkins.read") + + def test_non_gitea_qualified_own_prefix_unchanged(self): + self.assertEqual( + normalize_operation("jenkins.build.read", service="jenkins"), + "jenkins.build.read", + ) + + def test_empty_and_non_string_fail_closed(self): + for bad in ("", None, 3, ["merge"]): + with self.assertRaises(ConfigError): + normalize_operation(bad) + + def test_gitea_alias_not_applied_to_other_services(self): + # "merge" on jenkins must not resolve to the *gitea* merge permission. + self.assertEqual(normalize_operation("merge", service="jenkins"), + "jenkins.merge") + + def test_table_is_documented_and_matches_normalization(self): + table = gitea_config.GITEA_OPERATION_ALIASES + self.assertIsInstance(table, dict) + self.assertTrue(table) + for legacy, canonical in table.items(): + self.assertEqual(normalize_operation(legacy), canonical) + self.assertTrue(canonical.startswith("gitea.")) + + +# --------------------------------------------------------------------------- +# check_operation — enforcement semantics (normalize BEFORE checking) +# --------------------------------------------------------------------------- +class TestCheckOperation(unittest.TestCase): + + def test_fully_qualified_allowed(self): + ok, reason = check_operation("gitea.pr.merge", ["gitea.pr.merge"]) + self.assertTrue(ok) + self.assertEqual(reason, "allowed") + + def test_fully_qualified_forbidden(self): + ok, reason = check_operation( + "gitea.pr.merge", ["gitea.pr.merge"], ["gitea.pr.merge"]) + self.assertFalse(ok) + self.assertEqual(reason, "forbidden") + + def test_legacy_unqualified_allowed(self): + ok, reason = check_operation("merge", ["gitea.pr.merge"]) + self.assertTrue(ok) + self.assertEqual(reason, "allowed") + + def test_legacy_unqualified_forbidden(self): + ok, reason = check_operation("merge", ["gitea.pr.merge"], ["merge"]) + self.assertFalse(ok) + self.assertEqual(reason, "forbidden") + + def test_unknown_operation_fails_closed(self): + ok, reason = check_operation("frobnicate", ["gitea.read"]) + self.assertFalse(ok) + self.assertEqual(reason, "invalid-operation") + + def test_ambiguous_operation_fails_closed(self): + ok, reason = check_operation("build.read", ["gitea.read"]) + self.assertFalse(ok) + self.assertEqual(reason, "invalid-operation") + + def test_service_mismatch_rejected(self): + ok, reason = check_operation("jenkins.read", ["gitea.read"]) + self.assertFalse(ok) + self.assertEqual(reason, "invalid-operation") + + def test_forbidden_overrides_allowed_across_spellings(self): + # Allowed via legacy spelling, forbidden via canonical spelling: the + # forbidden entry must win after both normalize to the same op. + ok, reason = check_operation("merge", ["merge"], ["gitea.pr.merge"]) + self.assertFalse(ok) + self.assertEqual(reason, "forbidden") + + def test_empty_allowed_list_denies(self): + ok, reason = check_operation("gitea.read", []) + self.assertFalse(ok) + self.assertEqual(reason, "no-allowed-operations") + + def test_missing_allowed_list_denies(self): + ok, reason = check_operation("gitea.read", None) + self.assertFalse(ok) + self.assertEqual(reason, "no-allowed-operations") + + def test_duplicates_after_normalization_are_harmless(self): + ok, reason = check_operation( + "merge", ["merge", "gitea.pr.merge", "merge"]) + self.assertTrue(ok) + self.assertEqual(reason, "allowed") + + def test_unnormalizable_allowed_entry_grants_nothing(self): + # A junk allowed entry must not widen permissions to anything. + ok, reason = check_operation("gitea.read", ["frobnicate"]) + self.assertFalse(ok) + self.assertEqual(reason, "not-allowed") + + def test_unnormalizable_forbidden_entry_fails_closed(self): + # If a forbidden entry cannot be understood, deny rather than risk + # silently narrowing the forbidden set (which would widen permissions). + ok, reason = check_operation( + "gitea.read", ["gitea.read"], ["frobnicate"]) + self.assertFalse(ok) + self.assertEqual(reason, "invalid-forbidden-entry") + + +# --------------------------------------------------------------------------- +# Eligibility enforcement — normalization happens before checking (#106) +# --------------------------------------------------------------------------- +class TestEligibilityNormalizesOperations(unittest.TestCase): + + def _pr(self, author, state="open", sha="abc123", mergeable=True): + return { + "user": {"login": author}, + "state": state, + "head": {"sha": sha}, + "mergeable": mergeable, + } + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_namespaced_profile_ops_allow_legacy_action(self, _auth, mock_api): + # JSON-config profiles carry canonical namespaced ops; the raw action + # "merge" must still match them after normalization. + mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=9, action="merge", + remote="prgs") + self.assertTrue(r["eligible"]) + self.assertNotIn("profile is not allowed to merge", r["reasons"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_namespaced_forbidden_op_blocks_legacy_action(self, _auth, mock_api): + mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-merger", + "GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge", + "GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.merge"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=9, action="merge", + remote="prgs") + self.assertFalse(r["eligible"]) + self.assertIn("profile forbids 'merge'", r["reasons"]) + + @patch("mcp_server.api_request") + @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) + def test_legacy_env_ops_still_work(self, _auth, mock_api): + # v1/env behaviour stays compatible: unqualified env ops keep working. + mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")] + env = {"GITEA_PROFILE_NAME": "gitea-reviewer", + "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} + with patch.dict(os.environ, env, clear=True): + r = gitea_check_pr_eligibility(pr_number=5, action="review", + remote="prgs") + self.assertTrue(r["eligible"]) + + +if __name__ == "__main__": + unittest.main()