"""Operation-name normalization table and enforcement tests — issue #106. Covers the required matrix from #106: - fully qualified allowed / forbidden operations - legacy unqualified allowed / forbidden operations - unknown operations (fail closed) - ambiguous operations (fail closed) - service mismatch (cross-service names never accepted by the wrong service) - forbidden-overrides-allowed - empty / missing allowed list - duplicate operations after normalization - no silent permission widening - eligibility enforcement normalizes before checking """ import os import sys import unittest from unittest.mock import patch sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) import gitea_config # noqa: E402 from gitea_config import ( # noqa: E402 ConfigError, check_operation, normalize_operation, ) from mcp_server import gitea_check_pr_eligibility # noqa: E402 FAKE_AUTH = "Basic dGVzdDp0ZXN0" # --------------------------------------------------------------------------- # normalize_operation — canonical table # --------------------------------------------------------------------------- class TestNormalizeOperation(unittest.TestCase): def test_fully_qualified_gitea_op_unchanged(self): self.assertEqual(normalize_operation("gitea.pr.merge"), "gitea.pr.merge") def test_legacy_aliases_map_to_canonical_names(self): expected = { "merge": "gitea.pr.merge", "approve": "gitea.pr.approve", "request_changes": "gitea.pr.request_changes", "review": "gitea.pr.review", "comment": "gitea.pr.comment", "read": "gitea.read", } for legacy, canonical in expected.items(): self.assertEqual(normalize_operation(legacy), canonical) def test_contexts_shape_author_verbs(self): self.assertEqual(normalize_operation("branch"), "gitea.branch.create") self.assertEqual(normalize_operation("commit"), "gitea.repo.commit") self.assertEqual(normalize_operation("push"), "gitea.branch.push") self.assertEqual(normalize_operation("open_pr"), "gitea.pr.create") def test_unknown_unqualified_op_fails_closed(self): with self.assertRaises(ConfigError): normalize_operation("frobnicate") def test_ambiguous_dotted_op_fails_closed(self): # Dotted but neither gitea-prefixed nor an explicit alias: refuse to # guess which namespace was meant. with self.assertRaises(ConfigError): normalize_operation("build.read") def test_cross_service_name_rejected_by_wrong_service(self): with self.assertRaises(ConfigError): normalize_operation("jenkins.read", service="gitea") with self.assertRaises(ConfigError): normalize_operation("gitea.read", service="jenkins") def test_non_gitea_single_word_namespaced_to_service(self): self.assertEqual(normalize_operation("read", service="jenkins"), "jenkins.read") def test_non_gitea_qualified_own_prefix_unchanged(self): self.assertEqual( normalize_operation("jenkins.build.read", service="jenkins"), "jenkins.build.read", ) def test_empty_and_non_string_fail_closed(self): for bad in ("", None, 3, ["merge"]): with self.assertRaises(ConfigError): normalize_operation(bad) def test_gitea_alias_not_applied_to_other_services(self): # "merge" on jenkins must not resolve to the *gitea* merge permission. self.assertEqual(normalize_operation("merge", service="jenkins"), "jenkins.merge") def test_table_is_documented_and_matches_normalization(self): table = gitea_config.GITEA_OPERATION_ALIASES self.assertIsInstance(table, dict) self.assertTrue(table) for legacy, canonical in table.items(): self.assertEqual(normalize_operation(legacy), canonical) self.assertTrue(canonical.startswith("gitea.")) # --------------------------------------------------------------------------- # check_operation — enforcement semantics (normalize BEFORE checking) # --------------------------------------------------------------------------- class TestCheckOperation(unittest.TestCase): def test_fully_qualified_allowed(self): ok, reason = check_operation("gitea.pr.merge", ["gitea.pr.merge"]) self.assertTrue(ok) self.assertEqual(reason, "allowed") def test_fully_qualified_forbidden(self): ok, reason = check_operation( "gitea.pr.merge", ["gitea.pr.merge"], ["gitea.pr.merge"]) self.assertFalse(ok) self.assertEqual(reason, "forbidden") def test_legacy_unqualified_allowed(self): ok, reason = check_operation("merge", ["gitea.pr.merge"]) self.assertTrue(ok) self.assertEqual(reason, "allowed") def test_legacy_unqualified_forbidden(self): ok, reason = check_operation("merge", ["gitea.pr.merge"], ["merge"]) self.assertFalse(ok) self.assertEqual(reason, "forbidden") def test_unknown_operation_fails_closed(self): ok, reason = check_operation("frobnicate", ["gitea.read"]) self.assertFalse(ok) self.assertEqual(reason, "invalid-operation") def test_ambiguous_operation_fails_closed(self): ok, reason = check_operation("build.read", ["gitea.read"]) self.assertFalse(ok) self.assertEqual(reason, "invalid-operation") def test_service_mismatch_rejected(self): ok, reason = check_operation("jenkins.read", ["gitea.read"]) self.assertFalse(ok) self.assertEqual(reason, "invalid-operation") def test_forbidden_overrides_allowed_across_spellings(self): # Allowed via legacy spelling, forbidden via canonical spelling: the # forbidden entry must win after both normalize to the same op. ok, reason = check_operation("merge", ["merge"], ["gitea.pr.merge"]) self.assertFalse(ok) self.assertEqual(reason, "forbidden") def test_empty_allowed_list_denies(self): ok, reason = check_operation("gitea.read", []) self.assertFalse(ok) self.assertEqual(reason, "no-allowed-operations") def test_missing_allowed_list_denies(self): ok, reason = check_operation("gitea.read", None) self.assertFalse(ok) self.assertEqual(reason, "no-allowed-operations") def test_duplicates_after_normalization_are_harmless(self): ok, reason = check_operation( "merge", ["merge", "gitea.pr.merge", "merge"]) self.assertTrue(ok) self.assertEqual(reason, "allowed") def test_unnormalizable_allowed_entry_grants_nothing(self): # A junk allowed entry must not widen permissions to anything. ok, reason = check_operation("gitea.read", ["frobnicate"]) self.assertFalse(ok) self.assertEqual(reason, "not-allowed") def test_unnormalizable_forbidden_entry_fails_closed(self): # If a forbidden entry cannot be understood, deny rather than risk # silently narrowing the forbidden set (which would widen permissions). ok, reason = check_operation( "gitea.read", ["gitea.read"], ["frobnicate"]) self.assertFalse(ok) self.assertEqual(reason, "invalid-forbidden-entry") # --------------------------------------------------------------------------- # Eligibility enforcement — normalization happens before checking (#106) # --------------------------------------------------------------------------- class TestEligibilityNormalizesOperations(unittest.TestCase): def _pr(self, author, state="open", sha="abc123", mergeable=True): return { "user": {"login": author}, "state": state, "head": {"sha": sha}, "mergeable": mergeable, } @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_namespaced_profile_ops_allow_legacy_action(self, _auth, mock_api): # JSON-config profiles carry canonical namespaced ops; the raw action # "merge" must still match them after normalization. mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")] env = {"GITEA_PROFILE_NAME": "gitea-merger", "GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge"} with patch.dict(os.environ, env, clear=True): r = gitea_check_pr_eligibility(pr_number=9, action="merge", remote="prgs") self.assertTrue(r["eligible"]) self.assertNotIn("profile is not allowed to merge", r["reasons"]) @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_namespaced_forbidden_op_blocks_legacy_action(self, _auth, mock_api): mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")] env = {"GITEA_PROFILE_NAME": "gitea-merger", "GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge", "GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.merge"} with patch.dict(os.environ, env, clear=True): r = gitea_check_pr_eligibility(pr_number=9, action="merge", remote="prgs") self.assertFalse(r["eligible"]) self.assertIn("profile forbids 'merge'", r["reasons"]) @patch("mcp_server.api_request") @patch("mcp_server.get_auth_header", return_value=FAKE_AUTH) def test_legacy_env_ops_still_work(self, _auth, mock_api): # v1/env behaviour stays compatible: unqualified env ops keep working. mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")] env = {"GITEA_PROFILE_NAME": "gitea-reviewer", "GITEA_ALLOWED_OPERATIONS": "read,review,approve"} with patch.dict(os.environ, env, clear=True): r = gitea_check_pr_eligibility(pr_number=5, action="review", remote="prgs") self.assertTrue(r["eligible"]) if __name__ == "__main__": unittest.main()