1 Commits

Author SHA1 Message Date
sysadmin e0861bcb03 feat: operation-name normalization table with fail-closed enforcement (#106)
Promote the #103 minimal alias map to the documented public table
GITEA_OPERATION_ALIASES and add the #106 enforcement layer:

- normalize_operation(op, service): canonical namespaced names; legacy
  spellings accepted only via the explicit table; unknown, ambiguous,
  and cross-service names fail closed.
- check_operation(op, allowed, forbidden, service): normalizes BOTH the
  requested operation and the profile lists before any membership
  check; forbidden always overrides allowed; unnormalizable allowed
  entries grant nothing and unnormalizable forbidden entries deny the
  request, so normalization can never silently widen permissions;
  empty/missing allowed list denies everything.
- gitea_check_pr_eligibility now routes its capability check through
  check_operation, fixing the mismatch where canonical namespaced
  profile ops (gitea.pr.merge) never matched the raw action (merge)
  and namespaced forbidden entries were never enforced.
- Document the normalization table and enforcement rules in
  docs/gitea-execution-profiles.md, replacing the stale 'enforcement
  out of scope' caveat.
- tests/test_op_normalization.py: full #106 matrix (27 tests) —
  qualified/legacy allowed and forbidden, unknown, ambiguous, service
  mismatch, forbidden-overrides-allowed, empty/missing allowed,
  duplicates after normalization, no permission widening, and
  eligibility integration proving normalization happens before
  enforcement. Existing v1/env unqualified behaviour stays compatible.

Closes #106

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 03:35:03 -04:00
6 changed files with 371 additions and 443 deletions
+44 -2
View File
@@ -134,8 +134,50 @@ Rules:
appears in both, it is forbidden.
- An operation not present in `allowed_operations` is treated as **not
allowed** (deny by default).
- These categories are descriptive for this issue. Their runtime enforcement is
out of scope here (see roadmap links).
## Operation-name normalization (#106)
Canonical operation names are namespaced: `{service}.{area}.{verb}` (e.g.
`gitea.pr.merge`, `jenkins.build.read`). Legacy unqualified spellings are
accepted **only** through the explicit alias table below (the code of record
is `GITEA_OPERATION_ALIASES` in `gitea_config.py`; the enforcement matrix is
`tests/test_op_normalization.py`).
| Legacy spelling | Canonical operation |
|-------------------|----------------------------|
| `read` | `gitea.read` |
| `review` | `gitea.pr.review` |
| `comment` | `gitea.pr.comment` |
| `approve` | `gitea.pr.approve` |
| `request_changes` | `gitea.pr.request_changes` |
| `merge` | `gitea.pr.merge` |
| `pr.create` | `gitea.pr.create` |
| `branch.push` | `gitea.branch.push` |
| `branch` | `gitea.branch.create` |
| `commit` | `gitea.repo.commit` |
| `push` | `gitea.branch.push` |
| `open_pr` | `gitea.pr.create` |
For non-Gitea services, a single unqualified word namespaces to the checked
service (`read``jenkins.read` when checking Jenkins); names already
prefixed with that service pass through unchanged.
Enforcement rules (`gitea_config.check_operation`, run **before** any
allowed/forbidden membership check):
- Unknown operation names fail closed (denied).
- Ambiguous names — dotted names that are neither service-prefixed nor in the
alias table — fail closed.
- Cross-service names are never accepted by the wrong service
(`jenkins.read` never matches a Gitea check, and a Gitea alias is never
applied to another service).
- `forbidden_operations` overrides `allowed_operations` after both sides are
normalized, so a legacy spelling can never bypass a canonical forbidden
entry (or vice versa).
- An allowed entry that cannot be normalized grants nothing; a forbidden
entry that cannot be normalized denies the request. Normalization can
therefore never silently widen permissions.
- An empty or missing `allowed_operations` list denies everything.
## Identity and fail-closed rules
+72 -14
View File
@@ -70,11 +70,13 @@ _TBD_RE = re.compile(r"(?i)^tbd(-|$)")
# Keys that would mean an inline secret wherever they appear.
_INLINE_SECRET_KEYS = ("token", "password", "secret")
# ── Minimal operation normalization (#103) ─────────────────────────────────────
# Only what the #103 invariants need. The full normalization table, deprecation
# handling, and enforcement test matrix belong to issue #106 — do not grow this
# beyond invariant safety here.
_MINIMAL_GITEA_OP_MAP = {
# ── Operation-name normalization table (#106; minimal subset landed in #103) ───
# Canonical operations are namespaced ({service}.{area}.{verb}). Legacy
# unqualified spellings are accepted ONLY through this explicit table — never
# by guessing. The same table is the documentation of record (see
# docs/gitea-execution-profiles.md) and is exercised by
# tests/test_op_normalization.py.
GITEA_OPERATION_ALIASES = {
"read": "gitea.read",
"review": "gitea.pr.review",
"comment": "gitea.pr.comment",
@@ -94,27 +96,83 @@ _REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"})
_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"})
def _normalize_op(service, op, addr):
"""Normalize *op* for *service*, or fail closed (#103 minimal subset).
def normalize_operation(op, service="gitea"):
"""Return the canonical namespaced name for *op*, or fail closed (#106).
- already namespaced for this service (``{service}.*``) → unchanged
- known unqualified Gitea ops → mapped via ``_MINIMAL_GITEA_OP_MAP``
- known unqualified Gitea ops → mapped via ``GITEA_OPERATION_ALIASES``
- unqualified single-word ops on non-Gitea services → ``{service}.{op}``
- anything else (foreign prefixes, unknown unqualified names) → ConfigError
- anything else foreign service prefixes, dotted names outside the
table, unknown unqualified names — is unknown or ambiguous → ConfigError
Normalization never crosses services (a Gitea alias is never applied to
another service) and never widens permissions: an operation that cannot
be normalized grants and matches nothing.
"""
if not isinstance(op, str) or not op:
raise ConfigError(f"identity '{addr}' has an empty or non-string operation")
raise ConfigError("operation must be a non-empty string (fail closed)")
if op.startswith(service + "."):
return op
if service == "gitea" and op in _MINIMAL_GITEA_OP_MAP:
return _MINIMAL_GITEA_OP_MAP[op]
if service == "gitea" and op in GITEA_OPERATION_ALIASES:
return GITEA_OPERATION_ALIASES[op]
if service != "gitea" and "." not in op:
return f"{service}.{op}"
raise ConfigError(
f"identity '{addr}' has operation {op!r} that cannot be normalized "
f"safely for service '{service}' (fail closed; full table is issue #106)"
f"operation {op!r} cannot be normalized safely for service "
f"'{service}' (unknown, ambiguous, or cross-service; fail closed)"
)
def check_operation(op, allowed, forbidden=(), service="gitea"):
"""Decide whether *op* is permitted. Returns ``(bool, reason)`` (#106).
Everything is normalized via :func:`normalize_operation` BEFORE any
membership check, so legacy and canonical spellings always compare equal.
Reasons: ``allowed``, ``invalid-operation``, ``invalid-forbidden-entry``,
``forbidden``, ``no-allowed-operations``, ``not-allowed``.
Fail-closed rules:
- an *op* that cannot be normalized is denied (``invalid-operation``)
- a forbidden entry that cannot be normalized denies the request
(``invalid-forbidden-entry``) — dropping it would silently narrow the
forbidden set, i.e. widen permissions
- an allowed entry that cannot be normalized is ignored — it grants
nothing, so permissions never widen
- ``forbidden`` always overrides ``allowed``
- an empty or missing allowed list denies everything
"""
try:
op_n = normalize_operation(op, service)
except ConfigError:
return (False, "invalid-operation")
forbidden_n = set()
for entry in (forbidden or ()):
try:
forbidden_n.add(normalize_operation(entry, service))
except ConfigError:
return (False, "invalid-forbidden-entry")
if op_n in forbidden_n:
return (False, "forbidden")
if not allowed:
return (False, "no-allowed-operations")
allowed_n = set()
for entry in allowed:
try:
allowed_n.add(normalize_operation(entry, service))
except ConfigError:
continue
if op_n in allowed_n:
return (True, "allowed")
return (False, "not-allowed")
def _normalize_op(service, op, addr):
"""Normalize *op* for identity *addr*, or fail closed with context."""
try:
return normalize_operation(op, service)
except ConfigError as exc:
raise ConfigError(f"identity '{addr}': {exc}") from None
# Default canonical config location (one file shared by all LLM launchers).
DEFAULT_CONFIG_PATH = os.path.join(
os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json"
+16 -6
View File
@@ -521,14 +521,24 @@ def gitea_check_pr_eligibility(
return result
# Profile capability check (metadata only; not enforcement of the action).
# Both the action and the profile lists are normalized before comparison
# (#106), so legacy spellings ("merge") and canonical namespaced ops
# ("gitea.pr.merge") always match each other and never cross services.
allowed = profile["allowed_operations"]
forbidden = profile["forbidden_operations"]
if not allowed:
reasons.append("profile has no configured allowed operations (fail closed)")
if action in forbidden:
reasons.append(f"profile forbids '{action}'")
elif action not in allowed:
reasons.append(f"profile is not allowed to {action}")
op_ok, op_reason = gitea_config.check_operation(action, allowed, forbidden)
if not op_ok:
if op_reason == "no-allowed-operations":
reasons.append(
"profile has no configured allowed operations (fail closed)")
elif op_reason == "forbidden":
reasons.append(f"profile forbids '{action}'")
elif op_reason == "invalid-forbidden-entry":
reasons.append(
"profile has an unrecognized forbidden operation entry "
"(fail closed)")
else:
reasons.append(f"profile is not allowed to {action}")
h, o, r = _resolve(remote, host, org, repo)
-224
View File
@@ -1,224 +0,0 @@
#!/usr/bin/env python3
"""Migration helper to convert profiles.json from version 1 to version 2 environments shape.
This script preserves existing keychain references (auth.id) and maps old profile
names as aliases so that existing IDE configurations continue to function.
"""
import os
import sys
import json
import argparse
import shutil
import tempfile
# Resolve path to import gitea_config
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import gitea_config
def migrate_v1_to_v2(v1_data):
"""Convert version 1 profiles.json format to version 2 environments format."""
environments = {}
aliases = {}
profiles = v1_data.get("profiles", {})
if not isinstance(profiles, dict):
raise ValueError("Malformed input: 'profiles' field must be a JSON object")
for name, prof in profiles.items():
if not isinstance(prof, dict):
raise ValueError(f"Malformed input: profile '{name}' must be a JSON object")
# Infer environment and identity name
if "-" in name:
parts = name.split("-", 1)
env_name = parts[0]
ident_name = parts[1]
else:
env_name = name
ident_name = "author"
# Determine role and identity based on name / execution_profile
role = "author"
exec_prof = prof.get("execution_profile") or ""
if "reviewer" in name or "reviewer" in exec_prof:
role = "reviewer"
ident_name = "reviewer"
elif "author" in name or "author" in exec_prof:
role = "author"
ident_name = "author"
# Construct identity block
identity_data = {
"role": role,
"username": prof.get("username"),
"auth": prof.get("auth"),
}
if prof.get("execution_profile"):
identity_data["execution_profile"] = prof["execution_profile"]
# Set audit label (default to old name to preserve context)
identity_data["audit_label"] = prof.get("audit_label") or name
# Populate capabilities based on role
if role == "author":
identity_data["allowed_operations"] = [
"read", "branch", "commit", "push", "open_pr", "comment"
]
identity_data["forbidden_operations"] = [
"approve", "request_changes", "merge"
]
else:
identity_data["allowed_operations"] = [
"read", "review", "comment", "approve", "request_changes", "merge"
]
identity_data["forbidden_operations"] = [
"branch", "commit", "push", "open_pr"
]
# Nest inside environments/services structure
env = environments.setdefault(env_name, {})
services = env.setdefault("services", {})
gitea_svc = services.setdefault("gitea", {})
# Copy service-level attributes
if prof.get("base_url"):
gitea_svc["base_url"] = prof["base_url"]
if prof.get("default_owner"):
gitea_svc["default_owner"] = prof["default_owner"]
if prof.get("default_repo"):
gitea_svc["default_repo"] = prof["default_repo"]
identities = gitea_svc.setdefault("identities", {})
identities[ident_name] = identity_data
# Alias resolution targets
alias_target = f"{env_name}.gitea.{ident_name}"
if name != alias_target:
aliases[name] = alias_target
# Extra convenience alias for standard old-profile compatibility (e.g. prgs-author)
convenience_alias = f"{env_name}-{ident_name}"
if convenience_alias != alias_target and convenience_alias not in aliases:
aliases[convenience_alias] = alias_target
v2_data = {
"version": 2,
"environments": environments,
"aliases": aliases
}
return v2_data
def validate_v2_data(v2_data):
"""Validate generated v2 structure using gitea_config parser."""
fd, temp_path = tempfile.mkstemp(suffix=".json")
os.close(fd)
try:
with open(temp_path, "w") as f:
json.dump(v2_data, f)
# Attempt to load using load_config to run all validation rules
gitea_config.load_config(temp_path)
return True
except Exception as e:
raise ValueError(f"Generated v2 config failed validation: {e}")
finally:
try:
os.remove(temp_path)
except OSError:
pass
def main():
parser = argparse.ArgumentParser(
description="Migrate profiles.json from version 1 to version 2 environments shape."
)
parser.add_argument(
"-i", "--input",
default=gitea_config.DEFAULT_CONFIG_PATH,
help="Path to the version 1 profiles.json file (default: ~/.config/gitea-tools/profiles.json)"
)
parser.add_argument(
"-o", "--output",
help="Path to write the migrated version 2 profiles.json file (default: overwrite input)"
)
parser.add_argument(
"-w", "--write",
action="store_true",
help="Actually write the migrated config and create a backup (default is dry-run)"
)
parser.add_argument(
"--backup",
help="Path to write the backup file (default: <input_path>.bak)"
)
args = parser.parse_args()
input_path = os.path.abspath(args.input)
output_path = os.path.abspath(args.output or input_path)
backup_path = args.backup or f"{input_path}.bak"
if not os.path.isfile(input_path):
print(f"Error: Input file not found: {input_path}", file=sys.stderr)
sys.exit(1)
try:
with open(input_path, "r") as f:
v1_data = json.load(f)
except json.JSONDecodeError as e:
print(f"Error: Input file is not valid JSON: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error reading input file: {e}", file=sys.stderr)
sys.exit(1)
# Validate version
version = v1_data.get("version")
if version is not None and version != 1:
print(f"Error: Unsupported profiles.json version: {version}. Expected version 1.", file=sys.stderr)
sys.exit(1)
try:
v2_data = migrate_v1_to_v2(v1_data)
validate_v2_data(v2_data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not args.write:
print("=== DRY-RUN MODE (No files modified) ===")
print(f"Would read from: {input_path}")
print(f"Would create backup at: {backup_path}")
print(f"Would write v2 config to: {output_path}")
print("\nGenerated v2 config:")
print(json.dumps(v2_data, indent=2))
sys.exit(0)
# Write Mode: Create Backup first
try:
print(f"Creating backup: {backup_path}")
shutil.copy2(input_path, backup_path)
except Exception as e:
print(f"Error creating backup: {e}", file=sys.stderr)
sys.exit(1)
# Write migrated config
try:
print(f"Writing migrated version 2 config: {output_path}")
# Ensure target directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w") as f:
json.dump(v2_data, f, indent=2)
f.write("\n")
print("Migration completed successfully!")
except Exception as e:
print(f"Error writing output file: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
-197
View File
@@ -1,197 +0,0 @@
"""Unit tests for migrate_profiles.py migration helper."""
import os
import sys
import json
import unittest
import tempfile
import shutil
from unittest.mock import patch
from io import StringIO
# Add project root to sys.path
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import migrate_profiles
class TestMigrateProfiles(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.v1_content = {
"version": 1,
"profiles": {
"prgs": {
"base_url": "https://gitea.prgs.cc",
"username": "jcwalker3",
"auth": {
"type": "keychain",
"id": "prgs-gitea-token"
},
"default_owner": "Scaled-Tech-Consulting",
"execution_profile": "personal-prgs"
},
"mdcps": {
"base_url": "https://gitea.dadeschools.net",
"username": "913443",
"auth": {
"type": "keychain",
"id": "mdcps-gitea-token"
},
"default_owner": "Contractor",
"execution_profile": "mdcps"
},
"prgs-reviewer": {
"base_url": "https://gitea.prgs.cc",
"username": "sysadmin",
"auth": {
"type": "keychain",
"id": "prgs-gitea-reviewer-token"
},
"default_owner": "Scaled-Tech-Consulting",
"execution_profile": "prgs-reviewer"
}
}
}
self.input_file = os.path.join(self.temp_dir, "profiles.json")
with open(self.input_file, "w") as f:
json.dump(self.v1_content, f)
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_migration_logic(self):
"""Test the structural transformation and capability mapping."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
self.assertEqual(v2_data["version"], 2)
# Check environment structure
envs = v2_data["environments"]
self.assertIn("prgs", envs)
self.assertIn("mdcps", envs)
# Check service and identity structure
prgs_gitea = envs["prgs"]["services"]["gitea"]
self.assertEqual(prgs_gitea["base_url"], "https://gitea.prgs.cc")
self.assertEqual(prgs_gitea["default_owner"], "Scaled-Tech-Consulting")
author = prgs_gitea["identities"]["author"]
self.assertEqual(author["role"], "author")
self.assertEqual(author["username"], "jcwalker3")
self.assertEqual(author["auth"]["id"], "prgs-gitea-token")
self.assertIn("push", author["allowed_operations"])
reviewer = prgs_gitea["identities"]["reviewer"]
self.assertEqual(reviewer["role"], "reviewer")
self.assertEqual(reviewer["username"], "sysadmin")
self.assertEqual(reviewer["auth"]["id"], "prgs-gitea-reviewer-token")
self.assertIn("merge", reviewer["allowed_operations"])
def test_alias_generation(self):
"""Test that aliases are correctly generated to support old profile names."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
aliases = v2_data["aliases"]
self.assertEqual(aliases["prgs"], "prgs.gitea.author")
self.assertEqual(aliases["prgs-author"], "prgs.gitea.author")
self.assertEqual(aliases["prgs-reviewer"], "prgs.gitea.reviewer")
self.assertEqual(aliases["mdcps"], "mdcps.gitea.author")
def test_no_secret_behavior(self):
"""Ensure secrets are never extracted, printed, or processed."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
# Check that auth structures only contain keychain references, not credentials
for env in v2_data["environments"].values():
for svc in env["services"].values():
for ident in svc["identities"].values():
auth = ident["auth"]
self.assertEqual(auth["type"], "keychain")
self.assertIn("id", auth)
self.assertNotIn("token", auth)
self.assertNotIn("password", auth)
def test_validation(self):
"""Test that the generated v2 configuration validates against Gitea-Tools v2 loader."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
self.assertTrue(migrate_profiles.validate_v2_data(v2_data))
@patch("sys.stdout", new_callable=StringIO)
def test_dry_run_default(self, mock_stdout):
"""Verify that running without -w prints generated config without modifying files."""
output_file = os.path.join(self.temp_dir, "migrated_dry.json")
test_args = [
"migrate_profiles.py",
"-i", self.input_file,
"-o", output_file
]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 0)
self.assertFalse(os.path.exists(output_file))
self.assertFalse(os.path.exists(f"{self.input_file}.bak"))
stdout_output = mock_stdout.getvalue()
self.assertIn("DRY-RUN MODE", stdout_output)
self.assertIn("version", stdout_output)
self.assertIn("environments", stdout_output)
def test_write_mode_and_backup(self):
"""Verify that write mode creates a backup and correctly saves the validated config."""
output_file = os.path.join(self.temp_dir, "migrated.json")
backup_file = os.path.join(self.temp_dir, "profiles_backup.json.bak")
test_args = [
"migrate_profiles.py",
"-i", self.input_file,
"-o", output_file,
"--backup", backup_file,
"-w"
]
with patch.object(sys, "argv", test_args):
migrate_profiles.main()
# Verify backup exists and matches original v1 config
self.assertTrue(os.path.exists(backup_file))
with open(backup_file, "r") as f:
backup_data = json.load(f)
self.assertEqual(backup_data["version"], 1)
self.assertIn("prgs", backup_data["profiles"])
# Verify migrated v2 config exists and validates
self.assertTrue(os.path.exists(output_file))
with open(output_file, "r") as f:
v2_data = json.load(f)
self.assertEqual(v2_data["version"], 2)
self.assertIn("environments", v2_data)
self.assertEqual(v2_data["aliases"]["prgs"], "prgs.gitea.author")
def test_malformed_input_fails_safely(self):
"""Test that malformed JSON or invalid version numbers cause a clean exit with code 1."""
bad_json_file = os.path.join(self.temp_dir, "bad.json")
with open(bad_json_file, "w") as f:
f.write("{invalid-json}")
test_args = ["migrate_profiles.py", "-i", bad_json_file]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 1)
bad_version_file = os.path.join(self.temp_dir, "bad_version.json")
with open(bad_version_file, "w") as f:
json.dump({"version": 3, "profiles": {}}, f)
test_args = ["migrate_profiles.py", "-i", bad_version_file]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 1)
if __name__ == "__main__":
unittest.main()
+239
View File
@@ -0,0 +1,239 @@
"""Operation-name normalization table and enforcement tests — issue #106.
Covers the required matrix from #106:
- fully qualified allowed / forbidden operations
- legacy unqualified allowed / forbidden operations
- unknown operations (fail closed)
- ambiguous operations (fail closed)
- service mismatch (cross-service names never accepted by the wrong service)
- forbidden-overrides-allowed
- empty / missing allowed list
- duplicate operations after normalization
- no silent permission widening
- eligibility enforcement normalizes before checking
"""
import os
import sys
import unittest
from unittest.mock import patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import gitea_config # noqa: E402
from gitea_config import ( # noqa: E402
ConfigError,
check_operation,
normalize_operation,
)
from mcp_server import gitea_check_pr_eligibility # noqa: E402
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
# ---------------------------------------------------------------------------
# normalize_operation — canonical table
# ---------------------------------------------------------------------------
class TestNormalizeOperation(unittest.TestCase):
def test_fully_qualified_gitea_op_unchanged(self):
self.assertEqual(normalize_operation("gitea.pr.merge"), "gitea.pr.merge")
def test_legacy_aliases_map_to_canonical_names(self):
expected = {
"merge": "gitea.pr.merge",
"approve": "gitea.pr.approve",
"request_changes": "gitea.pr.request_changes",
"review": "gitea.pr.review",
"comment": "gitea.pr.comment",
"read": "gitea.read",
}
for legacy, canonical in expected.items():
self.assertEqual(normalize_operation(legacy), canonical)
def test_contexts_shape_author_verbs(self):
self.assertEqual(normalize_operation("branch"), "gitea.branch.create")
self.assertEqual(normalize_operation("commit"), "gitea.repo.commit")
self.assertEqual(normalize_operation("push"), "gitea.branch.push")
self.assertEqual(normalize_operation("open_pr"), "gitea.pr.create")
def test_unknown_unqualified_op_fails_closed(self):
with self.assertRaises(ConfigError):
normalize_operation("frobnicate")
def test_ambiguous_dotted_op_fails_closed(self):
# Dotted but neither gitea-prefixed nor an explicit alias: refuse to
# guess which namespace was meant.
with self.assertRaises(ConfigError):
normalize_operation("build.read")
def test_cross_service_name_rejected_by_wrong_service(self):
with self.assertRaises(ConfigError):
normalize_operation("jenkins.read", service="gitea")
with self.assertRaises(ConfigError):
normalize_operation("gitea.read", service="jenkins")
def test_non_gitea_single_word_namespaced_to_service(self):
self.assertEqual(normalize_operation("read", service="jenkins"),
"jenkins.read")
def test_non_gitea_qualified_own_prefix_unchanged(self):
self.assertEqual(
normalize_operation("jenkins.build.read", service="jenkins"),
"jenkins.build.read",
)
def test_empty_and_non_string_fail_closed(self):
for bad in ("", None, 3, ["merge"]):
with self.assertRaises(ConfigError):
normalize_operation(bad)
def test_gitea_alias_not_applied_to_other_services(self):
# "merge" on jenkins must not resolve to the *gitea* merge permission.
self.assertEqual(normalize_operation("merge", service="jenkins"),
"jenkins.merge")
def test_table_is_documented_and_matches_normalization(self):
table = gitea_config.GITEA_OPERATION_ALIASES
self.assertIsInstance(table, dict)
self.assertTrue(table)
for legacy, canonical in table.items():
self.assertEqual(normalize_operation(legacy), canonical)
self.assertTrue(canonical.startswith("gitea."))
# ---------------------------------------------------------------------------
# check_operation — enforcement semantics (normalize BEFORE checking)
# ---------------------------------------------------------------------------
class TestCheckOperation(unittest.TestCase):
def test_fully_qualified_allowed(self):
ok, reason = check_operation("gitea.pr.merge", ["gitea.pr.merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_fully_qualified_forbidden(self):
ok, reason = check_operation(
"gitea.pr.merge", ["gitea.pr.merge"], ["gitea.pr.merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_legacy_unqualified_allowed(self):
ok, reason = check_operation("merge", ["gitea.pr.merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_legacy_unqualified_forbidden(self):
ok, reason = check_operation("merge", ["gitea.pr.merge"], ["merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_unknown_operation_fails_closed(self):
ok, reason = check_operation("frobnicate", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_ambiguous_operation_fails_closed(self):
ok, reason = check_operation("build.read", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_service_mismatch_rejected(self):
ok, reason = check_operation("jenkins.read", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_forbidden_overrides_allowed_across_spellings(self):
# Allowed via legacy spelling, forbidden via canonical spelling: the
# forbidden entry must win after both normalize to the same op.
ok, reason = check_operation("merge", ["merge"], ["gitea.pr.merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_empty_allowed_list_denies(self):
ok, reason = check_operation("gitea.read", [])
self.assertFalse(ok)
self.assertEqual(reason, "no-allowed-operations")
def test_missing_allowed_list_denies(self):
ok, reason = check_operation("gitea.read", None)
self.assertFalse(ok)
self.assertEqual(reason, "no-allowed-operations")
def test_duplicates_after_normalization_are_harmless(self):
ok, reason = check_operation(
"merge", ["merge", "gitea.pr.merge", "merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_unnormalizable_allowed_entry_grants_nothing(self):
# A junk allowed entry must not widen permissions to anything.
ok, reason = check_operation("gitea.read", ["frobnicate"])
self.assertFalse(ok)
self.assertEqual(reason, "not-allowed")
def test_unnormalizable_forbidden_entry_fails_closed(self):
# If a forbidden entry cannot be understood, deny rather than risk
# silently narrowing the forbidden set (which would widen permissions).
ok, reason = check_operation(
"gitea.read", ["gitea.read"], ["frobnicate"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-forbidden-entry")
# ---------------------------------------------------------------------------
# Eligibility enforcement — normalization happens before checking (#106)
# ---------------------------------------------------------------------------
class TestEligibilityNormalizesOperations(unittest.TestCase):
def _pr(self, author, state="open", sha="abc123", mergeable=True):
return {
"user": {"login": author},
"state": state,
"head": {"sha": sha},
"mergeable": mergeable,
}
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_namespaced_profile_ops_allow_legacy_action(self, _auth, mock_api):
# JSON-config profiles carry canonical namespaced ops; the raw action
# "merge" must still match them after normalization.
mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-merger",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=9, action="merge",
remote="prgs")
self.assertTrue(r["eligible"])
self.assertNotIn("profile is not allowed to merge", r["reasons"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_namespaced_forbidden_op_blocks_legacy_action(self, _auth, mock_api):
mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-merger",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge",
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.merge"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=9, action="merge",
remote="prgs")
self.assertFalse(r["eligible"])
self.assertIn("profile forbids 'merge'", r["reasons"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_legacy_env_ops_still_work(self, _auth, mock_api):
# v1/env behaviour stays compatible: unqualified env ops keep working.
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=5, action="review",
remote="prgs")
self.assertTrue(r["eligible"])
if __name__ == "__main__":
unittest.main()