Compare commits
8 Commits
9d6a2e0a5f
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 9c44fd6b27 | |||
| e880a210ec | |||
| 79450b57f5 | |||
| 23aa2fb192 | |||
| 9f75e28094 | |||
| c063842b2e | |||
| cd633e2c2b | |||
| e0861bcb03 |
@@ -134,8 +134,50 @@ Rules:
|
||||
appears in both, it is forbidden.
|
||||
- An operation not present in `allowed_operations` is treated as **not
|
||||
allowed** (deny by default).
|
||||
- These categories are descriptive for this issue. Their runtime enforcement is
|
||||
out of scope here (see roadmap links).
|
||||
|
||||
## Operation-name normalization (#106)
|
||||
|
||||
Canonical operation names are namespaced: `{service}.{area}.{verb}` (e.g.
|
||||
`gitea.pr.merge`, `jenkins.build.read`). Legacy unqualified spellings are
|
||||
accepted **only** through the explicit alias table below (the code of record
|
||||
is `GITEA_OPERATION_ALIASES` in `gitea_config.py`; the enforcement matrix is
|
||||
`tests/test_op_normalization.py`).
|
||||
|
||||
| Legacy spelling | Canonical operation |
|
||||
|-------------------|----------------------------|
|
||||
| `read` | `gitea.read` |
|
||||
| `review` | `gitea.pr.review` |
|
||||
| `comment` | `gitea.pr.comment` |
|
||||
| `approve` | `gitea.pr.approve` |
|
||||
| `request_changes` | `gitea.pr.request_changes` |
|
||||
| `merge` | `gitea.pr.merge` |
|
||||
| `pr.create` | `gitea.pr.create` |
|
||||
| `branch.push` | `gitea.branch.push` |
|
||||
| `branch` | `gitea.branch.create` |
|
||||
| `commit` | `gitea.repo.commit` |
|
||||
| `push` | `gitea.branch.push` |
|
||||
| `open_pr` | `gitea.pr.create` |
|
||||
|
||||
For non-Gitea services, a single unqualified word namespaces to the checked
|
||||
service (`read` → `jenkins.read` when checking Jenkins); names already
|
||||
prefixed with that service pass through unchanged.
|
||||
|
||||
Enforcement rules (`gitea_config.check_operation`, run **before** any
|
||||
allowed/forbidden membership check):
|
||||
|
||||
- Unknown operation names fail closed (denied).
|
||||
- Ambiguous names — dotted names that are neither service-prefixed nor in the
|
||||
alias table — fail closed.
|
||||
- Cross-service names are never accepted by the wrong service
|
||||
(`jenkins.read` never matches a Gitea check, and a Gitea alias is never
|
||||
applied to another service).
|
||||
- `forbidden_operations` overrides `allowed_operations` after both sides are
|
||||
normalized, so a legacy spelling can never bypass a canonical forbidden
|
||||
entry (or vice versa).
|
||||
- An allowed entry that cannot be normalized grants nothing; a forbidden
|
||||
entry that cannot be normalized denies the request. Normalization can
|
||||
therefore never silently widen permissions.
|
||||
- An empty or missing `allowed_operations` list denies everything.
|
||||
|
||||
## Identity and fail-closed rules
|
||||
|
||||
|
||||
+17
-1
@@ -473,13 +473,29 @@ def get_profile():
|
||||
token_source = (os.environ.get("GITEA_TOKEN_SOURCE") or "").strip() \
|
||||
or gitea_config.auth_source_name(jp)
|
||||
base_url = os.environ.get("GITEA_BASE_URL") or jp.get("base_url") or None
|
||||
auth_type = None
|
||||
if isinstance(jp.get("auth"), dict):
|
||||
auth_type = jp["auth"].get("type")
|
||||
elif token_source:
|
||||
if token_source.startswith("keychain:"):
|
||||
auth_type = "keychain"
|
||||
else:
|
||||
auth_type = "env"
|
||||
|
||||
return {
|
||||
"profile_name": name,
|
||||
"allowed_operations": ops,
|
||||
"forbidden_operations": forbidden,
|
||||
"audit_label": audit_label,
|
||||
"token_source_name": token_source,
|
||||
"auth_source_type": auth_type,
|
||||
"base_url": base_url,
|
||||
"username": jp.get("username") or None,
|
||||
"default_owner": jp.get("default_owner") or None,
|
||||
}
|
||||
"profile_path": jp.get("profile_path") or None,
|
||||
"environment": jp.get("environment") or None,
|
||||
"service": jp.get("service") or None,
|
||||
"identity": jp.get("identity") or None,
|
||||
"role": jp.get("role") or None,
|
||||
"execution_profile": jp.get("execution_profile") or None,
|
||||
}
|
||||
+72
-14
@@ -70,11 +70,13 @@ _TBD_RE = re.compile(r"(?i)^tbd(-|$)")
|
||||
# Keys that would mean an inline secret wherever they appear.
|
||||
_INLINE_SECRET_KEYS = ("token", "password", "secret")
|
||||
|
||||
# ── Minimal operation normalization (#103) ─────────────────────────────────────
|
||||
# Only what the #103 invariants need. The full normalization table, deprecation
|
||||
# handling, and enforcement test matrix belong to issue #106 — do not grow this
|
||||
# beyond invariant safety here.
|
||||
_MINIMAL_GITEA_OP_MAP = {
|
||||
# ── Operation-name normalization table (#106; minimal subset landed in #103) ───
|
||||
# Canonical operations are namespaced ({service}.{area}.{verb}). Legacy
|
||||
# unqualified spellings are accepted ONLY through this explicit table — never
|
||||
# by guessing. The same table is the documentation of record (see
|
||||
# docs/gitea-execution-profiles.md) and is exercised by
|
||||
# tests/test_op_normalization.py.
|
||||
GITEA_OPERATION_ALIASES = {
|
||||
"read": "gitea.read",
|
||||
"review": "gitea.pr.review",
|
||||
"comment": "gitea.pr.comment",
|
||||
@@ -94,27 +96,83 @@ _REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"})
|
||||
_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"})
|
||||
|
||||
|
||||
def _normalize_op(service, op, addr):
|
||||
"""Normalize *op* for *service*, or fail closed (#103 minimal subset).
|
||||
def normalize_operation(op, service="gitea"):
|
||||
"""Return the canonical namespaced name for *op*, or fail closed (#106).
|
||||
|
||||
- already namespaced for this service (``{service}.*``) → unchanged
|
||||
- known unqualified Gitea ops → mapped via ``_MINIMAL_GITEA_OP_MAP``
|
||||
- known unqualified Gitea ops → mapped via ``GITEA_OPERATION_ALIASES``
|
||||
- unqualified single-word ops on non-Gitea services → ``{service}.{op}``
|
||||
- anything else (foreign prefixes, unknown unqualified names) → ConfigError
|
||||
- anything else — foreign service prefixes, dotted names outside the
|
||||
table, unknown unqualified names — is unknown or ambiguous → ConfigError
|
||||
|
||||
Normalization never crosses services (a Gitea alias is never applied to
|
||||
another service) and never widens permissions: an operation that cannot
|
||||
be normalized grants and matches nothing.
|
||||
"""
|
||||
if not isinstance(op, str) or not op:
|
||||
raise ConfigError(f"identity '{addr}' has an empty or non-string operation")
|
||||
raise ConfigError("operation must be a non-empty string (fail closed)")
|
||||
if op.startswith(service + "."):
|
||||
return op
|
||||
if service == "gitea" and op in _MINIMAL_GITEA_OP_MAP:
|
||||
return _MINIMAL_GITEA_OP_MAP[op]
|
||||
if service == "gitea" and op in GITEA_OPERATION_ALIASES:
|
||||
return GITEA_OPERATION_ALIASES[op]
|
||||
if service != "gitea" and "." not in op:
|
||||
return f"{service}.{op}"
|
||||
raise ConfigError(
|
||||
f"identity '{addr}' has operation {op!r} that cannot be normalized "
|
||||
f"safely for service '{service}' (fail closed; full table is issue #106)"
|
||||
f"operation {op!r} cannot be normalized safely for service "
|
||||
f"'{service}' (unknown, ambiguous, or cross-service; fail closed)"
|
||||
)
|
||||
|
||||
|
||||
def check_operation(op, allowed, forbidden=(), service="gitea"):
|
||||
"""Decide whether *op* is permitted. Returns ``(bool, reason)`` (#106).
|
||||
|
||||
Everything is normalized via :func:`normalize_operation` BEFORE any
|
||||
membership check, so legacy and canonical spellings always compare equal.
|
||||
Reasons: ``allowed``, ``invalid-operation``, ``invalid-forbidden-entry``,
|
||||
``forbidden``, ``no-allowed-operations``, ``not-allowed``.
|
||||
|
||||
Fail-closed rules:
|
||||
- an *op* that cannot be normalized is denied (``invalid-operation``)
|
||||
- a forbidden entry that cannot be normalized denies the request
|
||||
(``invalid-forbidden-entry``) — dropping it would silently narrow the
|
||||
forbidden set, i.e. widen permissions
|
||||
- an allowed entry that cannot be normalized is ignored — it grants
|
||||
nothing, so permissions never widen
|
||||
- ``forbidden`` always overrides ``allowed``
|
||||
- an empty or missing allowed list denies everything
|
||||
"""
|
||||
try:
|
||||
op_n = normalize_operation(op, service)
|
||||
except ConfigError:
|
||||
return (False, "invalid-operation")
|
||||
forbidden_n = set()
|
||||
for entry in (forbidden or ()):
|
||||
try:
|
||||
forbidden_n.add(normalize_operation(entry, service))
|
||||
except ConfigError:
|
||||
return (False, "invalid-forbidden-entry")
|
||||
if op_n in forbidden_n:
|
||||
return (False, "forbidden")
|
||||
if not allowed:
|
||||
return (False, "no-allowed-operations")
|
||||
allowed_n = set()
|
||||
for entry in allowed:
|
||||
try:
|
||||
allowed_n.add(normalize_operation(entry, service))
|
||||
except ConfigError:
|
||||
continue
|
||||
if op_n in allowed_n:
|
||||
return (True, "allowed")
|
||||
return (False, "not-allowed")
|
||||
|
||||
|
||||
def _normalize_op(service, op, addr):
|
||||
"""Normalize *op* for identity *addr*, or fail closed with context."""
|
||||
try:
|
||||
return normalize_operation(op, service)
|
||||
except ConfigError as exc:
|
||||
raise ConfigError(f"identity '{addr}': {exc}") from None
|
||||
|
||||
# Default canonical config location (one file shared by all LLM launchers).
|
||||
DEFAULT_CONFIG_PATH = os.path.join(
|
||||
os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json"
|
||||
|
||||
+32
-6
@@ -521,14 +521,24 @@ def gitea_check_pr_eligibility(
|
||||
return result
|
||||
|
||||
# Profile capability check (metadata only; not enforcement of the action).
|
||||
# Both the action and the profile lists are normalized before comparison
|
||||
# (#106), so legacy spellings ("merge") and canonical namespaced ops
|
||||
# ("gitea.pr.merge") always match each other and never cross services.
|
||||
allowed = profile["allowed_operations"]
|
||||
forbidden = profile["forbidden_operations"]
|
||||
if not allowed:
|
||||
reasons.append("profile has no configured allowed operations (fail closed)")
|
||||
if action in forbidden:
|
||||
reasons.append(f"profile forbids '{action}'")
|
||||
elif action not in allowed:
|
||||
reasons.append(f"profile is not allowed to {action}")
|
||||
op_ok, op_reason = gitea_config.check_operation(action, allowed, forbidden)
|
||||
if not op_ok:
|
||||
if op_reason == "no-allowed-operations":
|
||||
reasons.append(
|
||||
"profile has no configured allowed operations (fail closed)")
|
||||
elif op_reason == "forbidden":
|
||||
reasons.append(f"profile forbids '{action}'")
|
||||
elif op_reason == "invalid-forbidden-entry":
|
||||
reasons.append(
|
||||
"profile has an unrecognized forbidden operation entry "
|
||||
"(fail closed)")
|
||||
else:
|
||||
reasons.append(f"profile is not allowed to {action}")
|
||||
|
||||
h, o, r = _resolve(remote, host, org, repo)
|
||||
|
||||
@@ -1407,6 +1417,15 @@ def gitea_whoami(
|
||||
"profile": {
|
||||
"profile_name": profile["profile_name"],
|
||||
"allowed_operations": profile["allowed_operations"],
|
||||
"forbidden_operations": profile["forbidden_operations"],
|
||||
"environment": profile.get("environment"),
|
||||
"service": profile.get("service"),
|
||||
"identity": profile.get("identity"),
|
||||
"role": profile.get("role"),
|
||||
"profile_address": profile.get("profile_path"),
|
||||
"execution_profile": profile.get("execution_profile"),
|
||||
"audit_label": profile.get("audit_label"),
|
||||
"auth_source_type": profile.get("auth_source_type"),
|
||||
},
|
||||
}
|
||||
if _reveal_endpoints():
|
||||
@@ -1470,6 +1489,13 @@ def gitea_get_profile(
|
||||
"allowed_operations": profile["allowed_operations"],
|
||||
"forbidden_operations": profile["forbidden_operations"],
|
||||
"audit_label": profile["audit_label"],
|
||||
"environment": profile.get("environment"),
|
||||
"service": profile.get("service"),
|
||||
"identity": profile.get("identity"),
|
||||
"role": profile.get("role"),
|
||||
"profile_address": profile.get("profile_path"),
|
||||
"execution_profile": profile.get("execution_profile"),
|
||||
"auth_source_type": profile.get("auth_source_type"),
|
||||
# Auth is reported as a status only (#120): the token source *name*
|
||||
# (env var name / keychain id) joins endpoint URLs behind the
|
||||
# GITEA_MCP_REVEAL_ENDPOINTS admin opt-in. Token values never appear.
|
||||
|
||||
Executable
+284
@@ -0,0 +1,284 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Migration helper to convert profiles.json from version 1 to version 2 environments shape.
|
||||
|
||||
This script preserves existing keychain references (auth.id) and maps old profile
|
||||
names as aliases so that existing IDE configurations continue to function.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
# Resolve path to import gitea_config
|
||||
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
|
||||
if PROJECT_ROOT not in sys.path:
|
||||
sys.path.insert(0, PROJECT_ROOT)
|
||||
|
||||
import gitea_config
|
||||
|
||||
|
||||
AUTHOR_DEFAULT_ALLOWED = ["read", "branch", "commit", "push", "open_pr", "comment"]
|
||||
AUTHOR_DEFAULT_FORBIDDEN = ["approve", "request_changes", "merge"]
|
||||
REVIEWER_DEFAULT_ALLOWED = [
|
||||
"read", "review", "comment", "approve", "request_changes", "merge"
|
||||
]
|
||||
REVIEWER_DEFAULT_FORBIDDEN = ["branch", "commit", "push", "open_pr"]
|
||||
|
||||
|
||||
def infer_role(name, execution_profile):
|
||||
"""Return the unambiguous role for a legacy profile name, or None."""
|
||||
haystack = f"{name} {execution_profile or ''}".lower()
|
||||
has_author = "author" in haystack
|
||||
has_reviewer = "reviewer" in haystack
|
||||
if has_author == has_reviewer:
|
||||
return None
|
||||
return "reviewer" if has_reviewer else "author"
|
||||
|
||||
|
||||
def migration_summary(v2_data):
|
||||
"""Return a redacted summary of the migrated config."""
|
||||
environments = v2_data.get("environments", {})
|
||||
service_count = 0
|
||||
identity_count = 0
|
||||
for env in environments.values():
|
||||
services = env.get("services", {})
|
||||
service_count += len(services)
|
||||
for service in services.values():
|
||||
identity_count += len(service.get("identities", {}))
|
||||
return {
|
||||
"version": v2_data.get("version"),
|
||||
"environments": len(environments),
|
||||
"services": service_count,
|
||||
"identities": identity_count,
|
||||
"aliases": len(v2_data.get("aliases", {})),
|
||||
}
|
||||
|
||||
|
||||
def migrate_v1_to_v2(v1_data):
|
||||
"""Convert version 1 profiles.json format to version 2 environments format."""
|
||||
environments = {}
|
||||
aliases = {}
|
||||
|
||||
profiles = v1_data.get("profiles", {})
|
||||
if not isinstance(profiles, dict):
|
||||
raise ValueError("Malformed input: 'profiles' field must be a JSON object")
|
||||
|
||||
for name, prof in profiles.items():
|
||||
if not isinstance(prof, dict):
|
||||
raise ValueError(f"Malformed input: profile '{name}' must be a JSON object")
|
||||
|
||||
# Infer environment and identity name
|
||||
if "-" in name:
|
||||
parts = name.split("-", 1)
|
||||
env_name = parts[0]
|
||||
ident_name = parts[1]
|
||||
else:
|
||||
env_name = name
|
||||
ident_name = "author"
|
||||
|
||||
# Determine role and identity based on name / execution_profile.
|
||||
# Ambiguous profiles may still migrate only when they carry explicit
|
||||
# permissions; otherwise role-based defaults could widen permissions.
|
||||
exec_prof = prof.get("execution_profile") or ""
|
||||
role = infer_role(name, exec_prof)
|
||||
if role == "reviewer":
|
||||
ident_name = "reviewer"
|
||||
elif role == "author":
|
||||
ident_name = "author"
|
||||
else:
|
||||
role = prof.get("role")
|
||||
if role not in (None, "author", "reviewer"):
|
||||
raise ValueError(
|
||||
f"Profile '{name}' has unsupported role {role!r}"
|
||||
)
|
||||
|
||||
# Construct identity block
|
||||
identity_data = {
|
||||
"username": prof.get("username"),
|
||||
"auth": prof.get("auth"),
|
||||
}
|
||||
if role:
|
||||
identity_data["role"] = role
|
||||
if prof.get("execution_profile"):
|
||||
identity_data["execution_profile"] = prof["execution_profile"]
|
||||
|
||||
# Set audit label (default to old name to preserve context)
|
||||
identity_data["audit_label"] = prof.get("audit_label") or name
|
||||
|
||||
has_allowed = "allowed_operations" in prof
|
||||
has_forbidden = "forbidden_operations" in prof
|
||||
if has_allowed != has_forbidden:
|
||||
raise ValueError(
|
||||
f"Profile '{name}' must define both allowed_operations and "
|
||||
"forbidden_operations, or neither (fail closed)"
|
||||
)
|
||||
if has_allowed:
|
||||
allowed = prof.get("allowed_operations")
|
||||
forbidden = prof.get("forbidden_operations")
|
||||
if not isinstance(allowed, list) or not isinstance(forbidden, list):
|
||||
raise ValueError(
|
||||
f"Profile '{name}' operation fields must be lists"
|
||||
)
|
||||
identity_data["allowed_operations"] = list(allowed)
|
||||
identity_data["forbidden_operations"] = list(forbidden)
|
||||
elif role == "author":
|
||||
identity_data["allowed_operations"] = list(AUTHOR_DEFAULT_ALLOWED)
|
||||
identity_data["forbidden_operations"] = list(AUTHOR_DEFAULT_FORBIDDEN)
|
||||
elif role == "reviewer":
|
||||
identity_data["allowed_operations"] = list(REVIEWER_DEFAULT_ALLOWED)
|
||||
identity_data["forbidden_operations"] = list(REVIEWER_DEFAULT_FORBIDDEN)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Profile '{name}' has no explicit operation lists and no "
|
||||
"unambiguous author/reviewer role marker (fail closed)"
|
||||
)
|
||||
|
||||
# Nest inside environments/services structure
|
||||
env = environments.setdefault(env_name, {})
|
||||
services = env.setdefault("services", {})
|
||||
gitea_svc = services.setdefault("gitea", {})
|
||||
|
||||
# Copy service-level attributes
|
||||
if prof.get("base_url"):
|
||||
gitea_svc["base_url"] = prof["base_url"]
|
||||
if prof.get("default_owner"):
|
||||
gitea_svc["default_owner"] = prof["default_owner"]
|
||||
if prof.get("default_repo"):
|
||||
gitea_svc["default_repo"] = prof["default_repo"]
|
||||
|
||||
identities = gitea_svc.setdefault("identities", {})
|
||||
identities[ident_name] = identity_data
|
||||
|
||||
# Alias resolution targets
|
||||
alias_target = f"{env_name}.gitea.{ident_name}"
|
||||
if name != alias_target:
|
||||
aliases[name] = alias_target
|
||||
|
||||
# Extra convenience alias for standard old-profile compatibility (e.g. prgs-author)
|
||||
convenience_alias = f"{env_name}-{ident_name}"
|
||||
if convenience_alias != alias_target and convenience_alias not in aliases:
|
||||
aliases[convenience_alias] = alias_target
|
||||
|
||||
v2_data = {
|
||||
"version": 2,
|
||||
"environments": environments,
|
||||
"aliases": aliases
|
||||
}
|
||||
return v2_data
|
||||
|
||||
|
||||
def validate_v2_data(v2_data):
|
||||
"""Validate generated v2 structure using gitea_config parser."""
|
||||
fd, temp_path = tempfile.mkstemp(suffix=".json")
|
||||
os.close(fd)
|
||||
try:
|
||||
with open(temp_path, "w") as f:
|
||||
json.dump(v2_data, f)
|
||||
# Attempt to load using load_config to run all validation rules
|
||||
gitea_config.load_config(temp_path)
|
||||
return True
|
||||
except Exception as e:
|
||||
raise ValueError(f"Generated v2 config failed validation: {e}")
|
||||
finally:
|
||||
try:
|
||||
os.remove(temp_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Migrate profiles.json from version 1 to version 2 environments shape."
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i", "--input",
|
||||
default=gitea_config.DEFAULT_CONFIG_PATH,
|
||||
help="Path to the version 1 profiles.json file (default: ~/.config/gitea-tools/profiles.json)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-o", "--output",
|
||||
help="Path to write the migrated version 2 profiles.json file (default: overwrite input)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-w", "--write",
|
||||
action="store_true",
|
||||
help="Actually write the migrated config and create a backup (default is dry-run)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--backup",
|
||||
help="Path to write the backup file (default: <input_path>.bak)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
input_path = os.path.abspath(args.input)
|
||||
output_path = os.path.abspath(args.output or input_path)
|
||||
backup_path = args.backup or f"{input_path}.bak"
|
||||
|
||||
if not os.path.isfile(input_path):
|
||||
print(f"Error: Input file not found: {input_path}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
with open(input_path, "r") as f:
|
||||
v1_data = json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error: Input file is not valid JSON: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
print(f"Error reading input file: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Validate version
|
||||
version = v1_data.get("version")
|
||||
if version is not None and version != 1:
|
||||
print(f"Error: Unsupported profiles.json version: {version}. Expected version 1.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
v2_data = migrate_v1_to_v2(v1_data)
|
||||
validate_v2_data(v2_data)
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
if not args.write:
|
||||
print("=== DRY-RUN MODE (No files modified) ===")
|
||||
print("Generated v2 config validated successfully.")
|
||||
print("Only aggregate counts are shown.")
|
||||
summary = migration_summary(v2_data)
|
||||
print("Summary:")
|
||||
print(f" version: {summary['version']}")
|
||||
print(f" environments: {summary['environments']}")
|
||||
print(f" services: {summary['services']}")
|
||||
print(f" identities: {summary['identities']}")
|
||||
print(f" aliases: {summary['aliases']}")
|
||||
sys.exit(0)
|
||||
|
||||
# Write Mode: Create Backup first
|
||||
try:
|
||||
print(f"Creating backup: {backup_path}")
|
||||
shutil.copy2(input_path, backup_path)
|
||||
except Exception as e:
|
||||
print(f"Error creating backup: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Write migrated config
|
||||
try:
|
||||
print(f"Writing migrated version 2 config: {output_path}")
|
||||
# Ensure target directory exists
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
with open(output_path, "w") as f:
|
||||
json.dump(v2_data, f, indent=2)
|
||||
f.write("\n")
|
||||
print("Migration completed successfully!")
|
||||
except Exception as e:
|
||||
print(f"Error writing output file: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -995,6 +995,65 @@ class TestRuntimeProfile(unittest.TestCase):
|
||||
for secret in ("super-secret-token", "token", "authorization", "basic "):
|
||||
self.assertNotIn(secret, blob)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_whoami_v2_metadata(self, _auth, mock_api):
|
||||
mock_api.return_value = {"id": 7, "login": "rev"}
|
||||
env = {
|
||||
"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve",
|
||||
"GITEA_FORBIDDEN_OPERATIONS": "merge",
|
||||
"GITEA_AUDIT_LABEL": "reviewer-runtime",
|
||||
"GITEA_TOKEN_SOURCE": "keychain:prgs-reviewer-token",
|
||||
}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
result = gitea_whoami(remote="prgs")
|
||||
profile = result["profile"]
|
||||
self.assertEqual(profile["environment"], None)
|
||||
self.assertEqual(profile["service"], None)
|
||||
self.assertEqual(profile["identity"], None)
|
||||
self.assertEqual(profile["role"], None)
|
||||
self.assertEqual(profile["profile_address"], None)
|
||||
self.assertEqual(profile["execution_profile"], None)
|
||||
self.assertEqual(profile["audit_label"], "reviewer-runtime")
|
||||
self.assertEqual(profile["auth_source_type"], "keychain")
|
||||
self.assertEqual(profile["forbidden_operations"], ["merge"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
@patch("mcp_server.get_profile")
|
||||
def test_whoami_v2_resolved_metadata(self, mock_get_profile, _auth, mock_api):
|
||||
mock_api.return_value = {"id": 7, "login": "rev"}
|
||||
mock_get_profile.return_value = {
|
||||
"profile_name": "prgs.gitea.reviewer",
|
||||
"allowed_operations": ["read", "review"],
|
||||
"forbidden_operations": ["merge"],
|
||||
"audit_label": "rev-audit",
|
||||
"token_source_name": "keychain:prgs-reviewer-token",
|
||||
"auth_source_type": "keychain",
|
||||
"base_url": "https://gitea.prgs.cc",
|
||||
"username": "sysadmin",
|
||||
"default_owner": "Scaled-Tech-Consulting",
|
||||
"profile_path": "prgs.gitea.reviewer",
|
||||
"environment": "prgs",
|
||||
"service": "gitea",
|
||||
"identity": "reviewer",
|
||||
"role": "reviewer",
|
||||
"execution_profile": "reviewer-profile",
|
||||
}
|
||||
result = gitea_whoami(remote="prgs")
|
||||
profile = result["profile"]
|
||||
self.assertEqual(profile["environment"], "prgs")
|
||||
self.assertEqual(profile["service"], "gitea")
|
||||
self.assertEqual(profile["identity"], "reviewer")
|
||||
self.assertEqual(profile["role"], "reviewer")
|
||||
self.assertEqual(profile["profile_address"], "prgs.gitea.reviewer")
|
||||
self.assertEqual(profile["execution_profile"], "reviewer-profile")
|
||||
self.assertEqual(profile["audit_label"], "rev-audit")
|
||||
self.assertEqual(profile["auth_source_type"], "keychain")
|
||||
self.assertEqual(profile["forbidden_operations"], ["merge"])
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Profile discovery (read-only) — issue #13
|
||||
@@ -1082,6 +1141,39 @@ class TestProfileDiscovery(unittest.TestCase):
|
||||
self.assertIsNone(result["remote"])
|
||||
self.assertIn("remote_error", result)
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
@patch("mcp_server.get_profile")
|
||||
def test_get_profile_v2_resolved_metadata(self, mock_get_profile, _auth, mock_api):
|
||||
mock_api.return_value = {"id": 7, "login": "rev"}
|
||||
mock_get_profile.return_value = {
|
||||
"profile_name": "prgs.gitea.reviewer",
|
||||
"allowed_operations": ["read", "review"],
|
||||
"forbidden_operations": ["merge"],
|
||||
"audit_label": "rev-audit",
|
||||
"token_source_name": "keychain:prgs-reviewer-token",
|
||||
"auth_source_type": "keychain",
|
||||
"base_url": "https://gitea.prgs.cc",
|
||||
"username": "sysadmin",
|
||||
"default_owner": "Scaled-Tech-Consulting",
|
||||
"profile_path": "prgs.gitea.reviewer",
|
||||
"environment": "prgs",
|
||||
"service": "gitea",
|
||||
"identity": "reviewer",
|
||||
"role": "reviewer",
|
||||
"execution_profile": "reviewer-profile",
|
||||
}
|
||||
result = gitea_get_profile(remote="prgs")
|
||||
self.assertEqual(result["environment"], "prgs")
|
||||
self.assertEqual(result["service"], "gitea")
|
||||
self.assertEqual(result["identity"], "reviewer")
|
||||
self.assertEqual(result["role"], "reviewer")
|
||||
self.assertEqual(result["profile_address"], "prgs.gitea.reviewer")
|
||||
self.assertEqual(result["execution_profile"], "reviewer-profile")
|
||||
self.assertEqual(result["auth_source_type"], "keychain")
|
||||
self.assertEqual(result["forbidden_operations"], ["merge"])
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PR eligibility checks (read-only) — issue #14
|
||||
|
||||
@@ -0,0 +1,299 @@
|
||||
"""Unit tests for migrate_profiles.py migration helper."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import unittest
|
||||
import tempfile
|
||||
import shutil
|
||||
from unittest.mock import patch
|
||||
from io import StringIO
|
||||
|
||||
# Add project root to sys.path
|
||||
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
if PROJECT_ROOT not in sys.path:
|
||||
sys.path.insert(0, PROJECT_ROOT)
|
||||
|
||||
import migrate_profiles
|
||||
|
||||
|
||||
class TestMigrateProfiles(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.v1_content = {
|
||||
"version": 1,
|
||||
"profiles": {
|
||||
"prgs": {
|
||||
"base_url": "redacted-prgs-service",
|
||||
"username": "jcwalker3",
|
||||
"auth": {
|
||||
"type": "keychain",
|
||||
"id": "redacted-author-ref"
|
||||
},
|
||||
"default_owner": "Scaled-Tech-Consulting",
|
||||
"execution_profile": "personal-prgs",
|
||||
"allowed_operations": ["read", "comment"],
|
||||
"forbidden_operations": ["approve", "merge"]
|
||||
},
|
||||
"mdcps": {
|
||||
"base_url": "redacted-mdcps-service",
|
||||
"username": "913443",
|
||||
"auth": {
|
||||
"type": "keychain",
|
||||
"id": "redacted-mdcps-ref"
|
||||
},
|
||||
"default_owner": "Contractor",
|
||||
"execution_profile": "mdcps",
|
||||
"allowed_operations": ["read"],
|
||||
"forbidden_operations": ["merge"]
|
||||
},
|
||||
"prgs-reviewer": {
|
||||
"base_url": "redacted-prgs-service",
|
||||
"username": "sysadmin",
|
||||
"auth": {
|
||||
"type": "keychain",
|
||||
"id": "redacted-reviewer-ref"
|
||||
},
|
||||
"default_owner": "Scaled-Tech-Consulting",
|
||||
"execution_profile": "prgs-reviewer",
|
||||
"allowed_operations": [
|
||||
"read", "review", "comment", "approve",
|
||||
"request_changes", "merge"
|
||||
],
|
||||
"forbidden_operations": ["branch", "commit", "push", "open_pr"]
|
||||
}
|
||||
}
|
||||
}
|
||||
self.input_file = os.path.join(self.temp_dir, "profiles.json")
|
||||
with open(self.input_file, "w") as f:
|
||||
json.dump(self.v1_content, f)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_migration_logic(self):
|
||||
"""Test the structural transformation and capability mapping."""
|
||||
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
|
||||
self.assertEqual(v2_data["version"], 2)
|
||||
|
||||
# Check environment structure
|
||||
envs = v2_data["environments"]
|
||||
self.assertIn("prgs", envs)
|
||||
self.assertIn("mdcps", envs)
|
||||
|
||||
# Check service and identity structure
|
||||
prgs_gitea = envs["prgs"]["services"]["gitea"]
|
||||
self.assertEqual(prgs_gitea["base_url"], "redacted-prgs-service")
|
||||
self.assertEqual(prgs_gitea["default_owner"], "Scaled-Tech-Consulting")
|
||||
|
||||
author = prgs_gitea["identities"]["author"]
|
||||
self.assertEqual(author["username"], "jcwalker3")
|
||||
self.assertEqual(author["auth"]["id"], "redacted-author-ref")
|
||||
self.assertEqual(author["allowed_operations"], ["read", "comment"])
|
||||
self.assertEqual(author["forbidden_operations"], ["approve", "merge"])
|
||||
|
||||
reviewer = prgs_gitea["identities"]["reviewer"]
|
||||
self.assertEqual(reviewer["role"], "reviewer")
|
||||
self.assertEqual(reviewer["username"], "sysadmin")
|
||||
self.assertEqual(reviewer["auth"]["id"], "redacted-reviewer-ref")
|
||||
self.assertIn("merge", reviewer["allowed_operations"])
|
||||
|
||||
def test_alias_generation(self):
|
||||
"""Test that aliases are correctly generated to support old profile names."""
|
||||
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
|
||||
aliases = v2_data["aliases"]
|
||||
|
||||
self.assertEqual(aliases["prgs"], "prgs.gitea.author")
|
||||
self.assertEqual(aliases["prgs-author"], "prgs.gitea.author")
|
||||
self.assertEqual(aliases["prgs-reviewer"], "prgs.gitea.reviewer")
|
||||
self.assertEqual(aliases["mdcps"], "mdcps.gitea.author")
|
||||
|
||||
def test_no_secret_behavior(self):
|
||||
"""Ensure secrets are never extracted, printed, or processed."""
|
||||
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
|
||||
# Check that auth structures only contain keychain references, not credentials
|
||||
for env in v2_data["environments"].values():
|
||||
for svc in env["services"].values():
|
||||
for ident in svc["identities"].values():
|
||||
auth = ident["auth"]
|
||||
self.assertEqual(auth["type"], "keychain")
|
||||
self.assertIn("id", auth)
|
||||
self.assertNotIn("token", auth)
|
||||
self.assertNotIn("password", auth)
|
||||
|
||||
def test_validation(self):
|
||||
"""Test that the generated v2 configuration validates against Gitea-Tools v2 loader."""
|
||||
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
|
||||
self.assertTrue(migrate_profiles.validate_v2_data(v2_data))
|
||||
|
||||
@patch("sys.stdout", new_callable=StringIO)
|
||||
def test_dry_run_default(self, mock_stdout):
|
||||
"""Verify that running without -w prints generated config without modifying files."""
|
||||
output_file = os.path.join(self.temp_dir, "migrated_dry.json")
|
||||
test_args = [
|
||||
"migrate_profiles.py",
|
||||
"-i", self.input_file,
|
||||
"-o", output_file
|
||||
]
|
||||
with patch.object(sys, "argv", test_args):
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
migrate_profiles.main()
|
||||
self.assertEqual(cm.exception.code, 0)
|
||||
|
||||
self.assertFalse(os.path.exists(output_file))
|
||||
self.assertFalse(os.path.exists(f"{self.input_file}.bak"))
|
||||
|
||||
stdout_output = mock_stdout.getvalue()
|
||||
self.assertIn("DRY-RUN MODE", stdout_output)
|
||||
self.assertIn("version", stdout_output)
|
||||
self.assertIn("identities", stdout_output)
|
||||
self.assertIn("aliases", stdout_output)
|
||||
self.assertNotIn("redacted-prgs-service", stdout_output)
|
||||
self.assertNotIn("redacted-mdcps-service", stdout_output)
|
||||
self.assertNotIn("redacted-author-ref", stdout_output)
|
||||
self.assertNotIn("redacted-mdcps-ref", stdout_output)
|
||||
self.assertNotIn("redacted-reviewer-ref", stdout_output)
|
||||
self.assertNotIn("keychain", stdout_output)
|
||||
self.assertNotIn("auth", stdout_output)
|
||||
|
||||
def test_dry_run_hides_token_like_values(self):
|
||||
"""Verify dry-run summary does not expose token-like auth metadata."""
|
||||
sensitive = json.loads(json.dumps(self.v1_content))
|
||||
sensitive["profiles"]["prgs"]["auth"]["id"] = "super-secret-token-value"
|
||||
with open(self.input_file, "w") as f:
|
||||
json.dump(sensitive, f)
|
||||
|
||||
test_args = ["migrate_profiles.py", "-i", self.input_file]
|
||||
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
|
||||
with patch.object(sys, "argv", test_args):
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
migrate_profiles.main()
|
||||
self.assertEqual(cm.exception.code, 0)
|
||||
|
||||
stdout_output = mock_stdout.getvalue()
|
||||
self.assertNotIn("super-secret-token-value", stdout_output)
|
||||
self.assertNotIn("token", stdout_output.lower())
|
||||
|
||||
def test_explicit_operations_are_preserved(self):
|
||||
"""Explicit v1 permissions must not be replaced by role defaults."""
|
||||
v1_data = json.loads(json.dumps(self.v1_content))
|
||||
v1_data["profiles"]["prgs-reviewer"]["allowed_operations"] = ["read"]
|
||||
v1_data["profiles"]["prgs-reviewer"]["forbidden_operations"] = ["merge"]
|
||||
|
||||
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
|
||||
reviewer = (
|
||||
v2_data["environments"]["prgs"]["services"]["gitea"]
|
||||
["identities"]["reviewer"]
|
||||
)
|
||||
self.assertEqual(reviewer["allowed_operations"], ["read"])
|
||||
self.assertEqual(reviewer["forbidden_operations"], ["merge"])
|
||||
|
||||
def test_inferred_role_defaults_only_when_unambiguous(self):
|
||||
"""Role defaults are allowed only for clear author/reviewer profiles."""
|
||||
v1_data = {
|
||||
"version": 1,
|
||||
"profiles": {
|
||||
"prgs-author": {
|
||||
"base_url": "redacted-prgs-service",
|
||||
"username": "jcwalker3",
|
||||
"auth": {"type": "keychain", "id": "hidden-author-ref"},
|
||||
"execution_profile": "prgs-author",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
|
||||
author = (
|
||||
v2_data["environments"]["prgs"]["services"]["gitea"]
|
||||
["identities"]["author"]
|
||||
)
|
||||
self.assertEqual(
|
||||
author["allowed_operations"],
|
||||
migrate_profiles.AUTHOR_DEFAULT_ALLOWED,
|
||||
)
|
||||
self.assertEqual(
|
||||
author["forbidden_operations"],
|
||||
migrate_profiles.AUTHOR_DEFAULT_FORBIDDEN,
|
||||
)
|
||||
|
||||
def test_ambiguous_permission_source_fails_closed(self):
|
||||
"""A profile without explicit permissions or clear role must not widen."""
|
||||
v1_data = {
|
||||
"version": 1,
|
||||
"profiles": {
|
||||
"prgs": {
|
||||
"base_url": "redacted-prgs-service",
|
||||
"username": "jcwalker3",
|
||||
"auth": {"type": "keychain", "id": "hidden-author-ref"},
|
||||
"execution_profile": "personal-prgs",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
with self.assertRaisesRegex(ValueError, "fail closed"):
|
||||
migrate_profiles.migrate_v1_to_v2(v1_data)
|
||||
|
||||
def test_partial_permission_source_fails_closed(self):
|
||||
"""Allowed without forbidden, or vice versa, is ambiguous."""
|
||||
v1_data = json.loads(json.dumps(self.v1_content))
|
||||
del v1_data["profiles"]["prgs"]["forbidden_operations"]
|
||||
|
||||
with self.assertRaisesRegex(ValueError, "fail closed"):
|
||||
migrate_profiles.migrate_v1_to_v2(v1_data)
|
||||
|
||||
def test_write_mode_and_backup(self):
|
||||
"""Verify that write mode creates a backup and correctly saves the validated config."""
|
||||
output_file = os.path.join(self.temp_dir, "migrated.json")
|
||||
backup_file = os.path.join(self.temp_dir, "profiles_backup.json.bak")
|
||||
|
||||
test_args = [
|
||||
"migrate_profiles.py",
|
||||
"-i", self.input_file,
|
||||
"-o", output_file,
|
||||
"--backup", backup_file,
|
||||
"-w"
|
||||
]
|
||||
with patch.object(sys, "argv", test_args):
|
||||
migrate_profiles.main()
|
||||
|
||||
# Verify backup exists and matches original v1 config
|
||||
self.assertTrue(os.path.exists(backup_file))
|
||||
with open(backup_file, "r") as f:
|
||||
backup_data = json.load(f)
|
||||
self.assertEqual(backup_data["version"], 1)
|
||||
self.assertIn("prgs", backup_data["profiles"])
|
||||
|
||||
# Verify migrated v2 config exists and validates
|
||||
self.assertTrue(os.path.exists(output_file))
|
||||
with open(output_file, "r") as f:
|
||||
v2_data = json.load(f)
|
||||
self.assertEqual(v2_data["version"], 2)
|
||||
self.assertIn("environments", v2_data)
|
||||
self.assertEqual(v2_data["aliases"]["prgs"], "prgs.gitea.author")
|
||||
|
||||
def test_malformed_input_fails_safely(self):
|
||||
"""Test that malformed JSON or invalid version numbers cause a clean exit with code 1."""
|
||||
bad_json_file = os.path.join(self.temp_dir, "bad.json")
|
||||
with open(bad_json_file, "w") as f:
|
||||
f.write("{invalid-json}")
|
||||
|
||||
test_args = ["migrate_profiles.py", "-i", bad_json_file]
|
||||
with patch.object(sys, "argv", test_args):
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
migrate_profiles.main()
|
||||
self.assertEqual(cm.exception.code, 1)
|
||||
|
||||
bad_version_file = os.path.join(self.temp_dir, "bad_version.json")
|
||||
with open(bad_version_file, "w") as f:
|
||||
json.dump({"version": 3, "profiles": {}}, f)
|
||||
|
||||
test_args = ["migrate_profiles.py", "-i", bad_version_file]
|
||||
with patch.object(sys, "argv", test_args):
|
||||
with self.assertRaises(SystemExit) as cm:
|
||||
migrate_profiles.main()
|
||||
self.assertEqual(cm.exception.code, 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,239 @@
|
||||
"""Operation-name normalization table and enforcement tests — issue #106.
|
||||
|
||||
Covers the required matrix from #106:
|
||||
|
||||
- fully qualified allowed / forbidden operations
|
||||
- legacy unqualified allowed / forbidden operations
|
||||
- unknown operations (fail closed)
|
||||
- ambiguous operations (fail closed)
|
||||
- service mismatch (cross-service names never accepted by the wrong service)
|
||||
- forbidden-overrides-allowed
|
||||
- empty / missing allowed list
|
||||
- duplicate operations after normalization
|
||||
- no silent permission widening
|
||||
- eligibility enforcement normalizes before checking
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
|
||||
|
||||
import gitea_config # noqa: E402
|
||||
from gitea_config import ( # noqa: E402
|
||||
ConfigError,
|
||||
check_operation,
|
||||
normalize_operation,
|
||||
)
|
||||
from mcp_server import gitea_check_pr_eligibility # noqa: E402
|
||||
|
||||
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# normalize_operation — canonical table
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestNormalizeOperation(unittest.TestCase):
|
||||
|
||||
def test_fully_qualified_gitea_op_unchanged(self):
|
||||
self.assertEqual(normalize_operation("gitea.pr.merge"), "gitea.pr.merge")
|
||||
|
||||
def test_legacy_aliases_map_to_canonical_names(self):
|
||||
expected = {
|
||||
"merge": "gitea.pr.merge",
|
||||
"approve": "gitea.pr.approve",
|
||||
"request_changes": "gitea.pr.request_changes",
|
||||
"review": "gitea.pr.review",
|
||||
"comment": "gitea.pr.comment",
|
||||
"read": "gitea.read",
|
||||
}
|
||||
for legacy, canonical in expected.items():
|
||||
self.assertEqual(normalize_operation(legacy), canonical)
|
||||
|
||||
def test_contexts_shape_author_verbs(self):
|
||||
self.assertEqual(normalize_operation("branch"), "gitea.branch.create")
|
||||
self.assertEqual(normalize_operation("commit"), "gitea.repo.commit")
|
||||
self.assertEqual(normalize_operation("push"), "gitea.branch.push")
|
||||
self.assertEqual(normalize_operation("open_pr"), "gitea.pr.create")
|
||||
|
||||
def test_unknown_unqualified_op_fails_closed(self):
|
||||
with self.assertRaises(ConfigError):
|
||||
normalize_operation("frobnicate")
|
||||
|
||||
def test_ambiguous_dotted_op_fails_closed(self):
|
||||
# Dotted but neither gitea-prefixed nor an explicit alias: refuse to
|
||||
# guess which namespace was meant.
|
||||
with self.assertRaises(ConfigError):
|
||||
normalize_operation("build.read")
|
||||
|
||||
def test_cross_service_name_rejected_by_wrong_service(self):
|
||||
with self.assertRaises(ConfigError):
|
||||
normalize_operation("jenkins.read", service="gitea")
|
||||
with self.assertRaises(ConfigError):
|
||||
normalize_operation("gitea.read", service="jenkins")
|
||||
|
||||
def test_non_gitea_single_word_namespaced_to_service(self):
|
||||
self.assertEqual(normalize_operation("read", service="jenkins"),
|
||||
"jenkins.read")
|
||||
|
||||
def test_non_gitea_qualified_own_prefix_unchanged(self):
|
||||
self.assertEqual(
|
||||
normalize_operation("jenkins.build.read", service="jenkins"),
|
||||
"jenkins.build.read",
|
||||
)
|
||||
|
||||
def test_empty_and_non_string_fail_closed(self):
|
||||
for bad in ("", None, 3, ["merge"]):
|
||||
with self.assertRaises(ConfigError):
|
||||
normalize_operation(bad)
|
||||
|
||||
def test_gitea_alias_not_applied_to_other_services(self):
|
||||
# "merge" on jenkins must not resolve to the *gitea* merge permission.
|
||||
self.assertEqual(normalize_operation("merge", service="jenkins"),
|
||||
"jenkins.merge")
|
||||
|
||||
def test_table_is_documented_and_matches_normalization(self):
|
||||
table = gitea_config.GITEA_OPERATION_ALIASES
|
||||
self.assertIsInstance(table, dict)
|
||||
self.assertTrue(table)
|
||||
for legacy, canonical in table.items():
|
||||
self.assertEqual(normalize_operation(legacy), canonical)
|
||||
self.assertTrue(canonical.startswith("gitea."))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# check_operation — enforcement semantics (normalize BEFORE checking)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestCheckOperation(unittest.TestCase):
|
||||
|
||||
def test_fully_qualified_allowed(self):
|
||||
ok, reason = check_operation("gitea.pr.merge", ["gitea.pr.merge"])
|
||||
self.assertTrue(ok)
|
||||
self.assertEqual(reason, "allowed")
|
||||
|
||||
def test_fully_qualified_forbidden(self):
|
||||
ok, reason = check_operation(
|
||||
"gitea.pr.merge", ["gitea.pr.merge"], ["gitea.pr.merge"])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "forbidden")
|
||||
|
||||
def test_legacy_unqualified_allowed(self):
|
||||
ok, reason = check_operation("merge", ["gitea.pr.merge"])
|
||||
self.assertTrue(ok)
|
||||
self.assertEqual(reason, "allowed")
|
||||
|
||||
def test_legacy_unqualified_forbidden(self):
|
||||
ok, reason = check_operation("merge", ["gitea.pr.merge"], ["merge"])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "forbidden")
|
||||
|
||||
def test_unknown_operation_fails_closed(self):
|
||||
ok, reason = check_operation("frobnicate", ["gitea.read"])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "invalid-operation")
|
||||
|
||||
def test_ambiguous_operation_fails_closed(self):
|
||||
ok, reason = check_operation("build.read", ["gitea.read"])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "invalid-operation")
|
||||
|
||||
def test_service_mismatch_rejected(self):
|
||||
ok, reason = check_operation("jenkins.read", ["gitea.read"])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "invalid-operation")
|
||||
|
||||
def test_forbidden_overrides_allowed_across_spellings(self):
|
||||
# Allowed via legacy spelling, forbidden via canonical spelling: the
|
||||
# forbidden entry must win after both normalize to the same op.
|
||||
ok, reason = check_operation("merge", ["merge"], ["gitea.pr.merge"])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "forbidden")
|
||||
|
||||
def test_empty_allowed_list_denies(self):
|
||||
ok, reason = check_operation("gitea.read", [])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "no-allowed-operations")
|
||||
|
||||
def test_missing_allowed_list_denies(self):
|
||||
ok, reason = check_operation("gitea.read", None)
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "no-allowed-operations")
|
||||
|
||||
def test_duplicates_after_normalization_are_harmless(self):
|
||||
ok, reason = check_operation(
|
||||
"merge", ["merge", "gitea.pr.merge", "merge"])
|
||||
self.assertTrue(ok)
|
||||
self.assertEqual(reason, "allowed")
|
||||
|
||||
def test_unnormalizable_allowed_entry_grants_nothing(self):
|
||||
# A junk allowed entry must not widen permissions to anything.
|
||||
ok, reason = check_operation("gitea.read", ["frobnicate"])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "not-allowed")
|
||||
|
||||
def test_unnormalizable_forbidden_entry_fails_closed(self):
|
||||
# If a forbidden entry cannot be understood, deny rather than risk
|
||||
# silently narrowing the forbidden set (which would widen permissions).
|
||||
ok, reason = check_operation(
|
||||
"gitea.read", ["gitea.read"], ["frobnicate"])
|
||||
self.assertFalse(ok)
|
||||
self.assertEqual(reason, "invalid-forbidden-entry")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Eligibility enforcement — normalization happens before checking (#106)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestEligibilityNormalizesOperations(unittest.TestCase):
|
||||
|
||||
def _pr(self, author, state="open", sha="abc123", mergeable=True):
|
||||
return {
|
||||
"user": {"login": author},
|
||||
"state": state,
|
||||
"head": {"sha": sha},
|
||||
"mergeable": mergeable,
|
||||
}
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_namespaced_profile_ops_allow_legacy_action(self, _auth, mock_api):
|
||||
# JSON-config profiles carry canonical namespaced ops; the raw action
|
||||
# "merge" must still match them after normalization.
|
||||
mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-merger",
|
||||
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=9, action="merge",
|
||||
remote="prgs")
|
||||
self.assertTrue(r["eligible"])
|
||||
self.assertNotIn("profile is not allowed to merge", r["reasons"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_namespaced_forbidden_op_blocks_legacy_action(self, _auth, mock_api):
|
||||
mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-merger",
|
||||
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge",
|
||||
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.merge"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=9, action="merge",
|
||||
remote="prgs")
|
||||
self.assertFalse(r["eligible"])
|
||||
self.assertIn("profile forbids 'merge'", r["reasons"])
|
||||
|
||||
@patch("mcp_server.api_request")
|
||||
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
|
||||
def test_legacy_env_ops_still_work(self, _auth, mock_api):
|
||||
# v1/env behaviour stays compatible: unqualified env ops keep working.
|
||||
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
|
||||
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
|
||||
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
|
||||
with patch.dict(os.environ, env, clear=True):
|
||||
r = gitea_check_pr_eligibility(pr_number=5, action="review",
|
||||
remote="prgs")
|
||||
self.assertTrue(r["eligible"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user