feat(config): add v1-to-v2 profiles.json migration helper (#105) #123
Executable
+284
@@ -0,0 +1,284 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Migration helper to convert profiles.json from version 1 to version 2 environments shape.
|
||||||
|
|
||||||
|
This script preserves existing keychain references (auth.id) and maps old profile
|
||||||
|
names as aliases so that existing IDE configurations continue to function.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import argparse
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
# Resolve path to import gitea_config
|
||||||
|
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
|
||||||
|
if PROJECT_ROOT not in sys.path:
|
||||||
|
sys.path.insert(0, PROJECT_ROOT)
|
||||||
|
|
||||||
|
import gitea_config
|
||||||
|
|
||||||
|
|
||||||
|
AUTHOR_DEFAULT_ALLOWED = ["read", "branch", "commit", "push", "open_pr", "comment"]
|
||||||
|
AUTHOR_DEFAULT_FORBIDDEN = ["approve", "request_changes", "merge"]
|
||||||
|
REVIEWER_DEFAULT_ALLOWED = [
|
||||||
|
"read", "review", "comment", "approve", "request_changes", "merge"
|
||||||
|
]
|
||||||
|
REVIEWER_DEFAULT_FORBIDDEN = ["branch", "commit", "push", "open_pr"]
|
||||||
|
|
||||||
|
|
||||||
|
def infer_role(name, execution_profile):
|
||||||
|
"""Return the unambiguous role for a legacy profile name, or None."""
|
||||||
|
haystack = f"{name} {execution_profile or ''}".lower()
|
||||||
|
has_author = "author" in haystack
|
||||||
|
has_reviewer = "reviewer" in haystack
|
||||||
|
if has_author == has_reviewer:
|
||||||
|
return None
|
||||||
|
return "reviewer" if has_reviewer else "author"
|
||||||
|
|
||||||
|
|
||||||
|
def migration_summary(v2_data):
|
||||||
|
"""Return a redacted summary of the migrated config."""
|
||||||
|
environments = v2_data.get("environments", {})
|
||||||
|
service_count = 0
|
||||||
|
identity_count = 0
|
||||||
|
for env in environments.values():
|
||||||
|
services = env.get("services", {})
|
||||||
|
service_count += len(services)
|
||||||
|
for service in services.values():
|
||||||
|
identity_count += len(service.get("identities", {}))
|
||||||
|
return {
|
||||||
|
"version": v2_data.get("version"),
|
||||||
|
"environments": len(environments),
|
||||||
|
"services": service_count,
|
||||||
|
"identities": identity_count,
|
||||||
|
"aliases": len(v2_data.get("aliases", {})),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def migrate_v1_to_v2(v1_data):
|
||||||
|
"""Convert version 1 profiles.json format to version 2 environments format."""
|
||||||
|
environments = {}
|
||||||
|
aliases = {}
|
||||||
|
|
||||||
|
profiles = v1_data.get("profiles", {})
|
||||||
|
if not isinstance(profiles, dict):
|
||||||
|
raise ValueError("Malformed input: 'profiles' field must be a JSON object")
|
||||||
|
|
||||||
|
for name, prof in profiles.items():
|
||||||
|
if not isinstance(prof, dict):
|
||||||
|
raise ValueError(f"Malformed input: profile '{name}' must be a JSON object")
|
||||||
|
|
||||||
|
# Infer environment and identity name
|
||||||
|
if "-" in name:
|
||||||
|
parts = name.split("-", 1)
|
||||||
|
env_name = parts[0]
|
||||||
|
ident_name = parts[1]
|
||||||
|
else:
|
||||||
|
env_name = name
|
||||||
|
ident_name = "author"
|
||||||
|
|
||||||
|
# Determine role and identity based on name / execution_profile.
|
||||||
|
# Ambiguous profiles may still migrate only when they carry explicit
|
||||||
|
# permissions; otherwise role-based defaults could widen permissions.
|
||||||
|
exec_prof = prof.get("execution_profile") or ""
|
||||||
|
role = infer_role(name, exec_prof)
|
||||||
|
if role == "reviewer":
|
||||||
|
ident_name = "reviewer"
|
||||||
|
elif role == "author":
|
||||||
|
ident_name = "author"
|
||||||
|
else:
|
||||||
|
role = prof.get("role")
|
||||||
|
if role not in (None, "author", "reviewer"):
|
||||||
|
raise ValueError(
|
||||||
|
f"Profile '{name}' has unsupported role {role!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Construct identity block
|
||||||
|
identity_data = {
|
||||||
|
"username": prof.get("username"),
|
||||||
|
"auth": prof.get("auth"),
|
||||||
|
}
|
||||||
|
if role:
|
||||||
|
identity_data["role"] = role
|
||||||
|
if prof.get("execution_profile"):
|
||||||
|
identity_data["execution_profile"] = prof["execution_profile"]
|
||||||
|
|
||||||
|
# Set audit label (default to old name to preserve context)
|
||||||
|
identity_data["audit_label"] = prof.get("audit_label") or name
|
||||||
|
|
||||||
|
has_allowed = "allowed_operations" in prof
|
||||||
|
has_forbidden = "forbidden_operations" in prof
|
||||||
|
if has_allowed != has_forbidden:
|
||||||
|
raise ValueError(
|
||||||
|
f"Profile '{name}' must define both allowed_operations and "
|
||||||
|
"forbidden_operations, or neither (fail closed)"
|
||||||
|
)
|
||||||
|
if has_allowed:
|
||||||
|
allowed = prof.get("allowed_operations")
|
||||||
|
forbidden = prof.get("forbidden_operations")
|
||||||
|
if not isinstance(allowed, list) or not isinstance(forbidden, list):
|
||||||
|
raise ValueError(
|
||||||
|
f"Profile '{name}' operation fields must be lists"
|
||||||
|
)
|
||||||
|
identity_data["allowed_operations"] = list(allowed)
|
||||||
|
identity_data["forbidden_operations"] = list(forbidden)
|
||||||
|
elif role == "author":
|
||||||
|
identity_data["allowed_operations"] = list(AUTHOR_DEFAULT_ALLOWED)
|
||||||
|
identity_data["forbidden_operations"] = list(AUTHOR_DEFAULT_FORBIDDEN)
|
||||||
|
elif role == "reviewer":
|
||||||
|
identity_data["allowed_operations"] = list(REVIEWER_DEFAULT_ALLOWED)
|
||||||
|
identity_data["forbidden_operations"] = list(REVIEWER_DEFAULT_FORBIDDEN)
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
f"Profile '{name}' has no explicit operation lists and no "
|
||||||
|
"unambiguous author/reviewer role marker (fail closed)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Nest inside environments/services structure
|
||||||
|
env = environments.setdefault(env_name, {})
|
||||||
|
services = env.setdefault("services", {})
|
||||||
|
gitea_svc = services.setdefault("gitea", {})
|
||||||
|
|
||||||
|
# Copy service-level attributes
|
||||||
|
if prof.get("base_url"):
|
||||||
|
gitea_svc["base_url"] = prof["base_url"]
|
||||||
|
if prof.get("default_owner"):
|
||||||
|
gitea_svc["default_owner"] = prof["default_owner"]
|
||||||
|
if prof.get("default_repo"):
|
||||||
|
gitea_svc["default_repo"] = prof["default_repo"]
|
||||||
|
|
||||||
|
identities = gitea_svc.setdefault("identities", {})
|
||||||
|
identities[ident_name] = identity_data
|
||||||
|
|
||||||
|
# Alias resolution targets
|
||||||
|
alias_target = f"{env_name}.gitea.{ident_name}"
|
||||||
|
if name != alias_target:
|
||||||
|
aliases[name] = alias_target
|
||||||
|
|
||||||
|
# Extra convenience alias for standard old-profile compatibility (e.g. prgs-author)
|
||||||
|
convenience_alias = f"{env_name}-{ident_name}"
|
||||||
|
if convenience_alias != alias_target and convenience_alias not in aliases:
|
||||||
|
aliases[convenience_alias] = alias_target
|
||||||
|
|
||||||
|
v2_data = {
|
||||||
|
"version": 2,
|
||||||
|
"environments": environments,
|
||||||
|
"aliases": aliases
|
||||||
|
}
|
||||||
|
return v2_data
|
||||||
|
|
||||||
|
|
||||||
|
def validate_v2_data(v2_data):
|
||||||
|
"""Validate generated v2 structure using gitea_config parser."""
|
||||||
|
fd, temp_path = tempfile.mkstemp(suffix=".json")
|
||||||
|
os.close(fd)
|
||||||
|
try:
|
||||||
|
with open(temp_path, "w") as f:
|
||||||
|
json.dump(v2_data, f)
|
||||||
|
# Attempt to load using load_config to run all validation rules
|
||||||
|
gitea_config.load_config(temp_path)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f"Generated v2 config failed validation: {e}")
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
os.remove(temp_path)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Migrate profiles.json from version 1 to version 2 environments shape."
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-i", "--input",
|
||||||
|
default=gitea_config.DEFAULT_CONFIG_PATH,
|
||||||
|
help="Path to the version 1 profiles.json file (default: ~/.config/gitea-tools/profiles.json)"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-o", "--output",
|
||||||
|
help="Path to write the migrated version 2 profiles.json file (default: overwrite input)"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-w", "--write",
|
||||||
|
action="store_true",
|
||||||
|
help="Actually write the migrated config and create a backup (default is dry-run)"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--backup",
|
||||||
|
help="Path to write the backup file (default: <input_path>.bak)"
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
input_path = os.path.abspath(args.input)
|
||||||
|
output_path = os.path.abspath(args.output or input_path)
|
||||||
|
backup_path = args.backup or f"{input_path}.bak"
|
||||||
|
|
||||||
|
if not os.path.isfile(input_path):
|
||||||
|
print(f"Error: Input file not found: {input_path}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(input_path, "r") as f:
|
||||||
|
v1_data = json.load(f)
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
print(f"Error: Input file is not valid JSON: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error reading input file: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Validate version
|
||||||
|
version = v1_data.get("version")
|
||||||
|
if version is not None and version != 1:
|
||||||
|
print(f"Error: Unsupported profiles.json version: {version}. Expected version 1.", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
v2_data = migrate_v1_to_v2(v1_data)
|
||||||
|
validate_v2_data(v2_data)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if not args.write:
|
||||||
|
print("=== DRY-RUN MODE (No files modified) ===")
|
||||||
|
print("Generated v2 config validated successfully.")
|
||||||
|
print("Only aggregate counts are shown.")
|
||||||
|
summary = migration_summary(v2_data)
|
||||||
|
print("Summary:")
|
||||||
|
print(f" version: {summary['version']}")
|
||||||
|
print(f" environments: {summary['environments']}")
|
||||||
|
print(f" services: {summary['services']}")
|
||||||
|
print(f" identities: {summary['identities']}")
|
||||||
|
print(f" aliases: {summary['aliases']}")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# Write Mode: Create Backup first
|
||||||
|
try:
|
||||||
|
print(f"Creating backup: {backup_path}")
|
||||||
|
shutil.copy2(input_path, backup_path)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error creating backup: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Write migrated config
|
||||||
|
try:
|
||||||
|
print(f"Writing migrated version 2 config: {output_path}")
|
||||||
|
# Ensure target directory exists
|
||||||
|
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||||
|
with open(output_path, "w") as f:
|
||||||
|
json.dump(v2_data, f, indent=2)
|
||||||
|
f.write("\n")
|
||||||
|
print("Migration completed successfully!")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error writing output file: {e}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,299 @@
|
|||||||
|
"""Unit tests for migrate_profiles.py migration helper."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import unittest
|
||||||
|
import tempfile
|
||||||
|
import shutil
|
||||||
|
from unittest.mock import patch
|
||||||
|
from io import StringIO
|
||||||
|
|
||||||
|
# Add project root to sys.path
|
||||||
|
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
if PROJECT_ROOT not in sys.path:
|
||||||
|
sys.path.insert(0, PROJECT_ROOT)
|
||||||
|
|
||||||
|
import migrate_profiles
|
||||||
|
|
||||||
|
|
||||||
|
class TestMigrateProfiles(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.temp_dir = tempfile.mkdtemp()
|
||||||
|
self.v1_content = {
|
||||||
|
"version": 1,
|
||||||
|
"profiles": {
|
||||||
|
"prgs": {
|
||||||
|
"base_url": "redacted-prgs-service",
|
||||||
|
"username": "jcwalker3",
|
||||||
|
"auth": {
|
||||||
|
"type": "keychain",
|
||||||
|
"id": "redacted-author-ref"
|
||||||
|
},
|
||||||
|
"default_owner": "Scaled-Tech-Consulting",
|
||||||
|
"execution_profile": "personal-prgs",
|
||||||
|
"allowed_operations": ["read", "comment"],
|
||||||
|
"forbidden_operations": ["approve", "merge"]
|
||||||
|
},
|
||||||
|
"mdcps": {
|
||||||
|
"base_url": "redacted-mdcps-service",
|
||||||
|
"username": "913443",
|
||||||
|
"auth": {
|
||||||
|
"type": "keychain",
|
||||||
|
"id": "redacted-mdcps-ref"
|
||||||
|
},
|
||||||
|
"default_owner": "Contractor",
|
||||||
|
"execution_profile": "mdcps",
|
||||||
|
"allowed_operations": ["read"],
|
||||||
|
"forbidden_operations": ["merge"]
|
||||||
|
},
|
||||||
|
"prgs-reviewer": {
|
||||||
|
"base_url": "redacted-prgs-service",
|
||||||
|
"username": "sysadmin",
|
||||||
|
"auth": {
|
||||||
|
"type": "keychain",
|
||||||
|
"id": "redacted-reviewer-ref"
|
||||||
|
},
|
||||||
|
"default_owner": "Scaled-Tech-Consulting",
|
||||||
|
"execution_profile": "prgs-reviewer",
|
||||||
|
"allowed_operations": [
|
||||||
|
"read", "review", "comment", "approve",
|
||||||
|
"request_changes", "merge"
|
||||||
|
],
|
||||||
|
"forbidden_operations": ["branch", "commit", "push", "open_pr"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.input_file = os.path.join(self.temp_dir, "profiles.json")
|
||||||
|
with open(self.input_file, "w") as f:
|
||||||
|
json.dump(self.v1_content, f)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
shutil.rmtree(self.temp_dir)
|
||||||
|
|
||||||
|
def test_migration_logic(self):
|
||||||
|
"""Test the structural transformation and capability mapping."""
|
||||||
|
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
|
||||||
|
self.assertEqual(v2_data["version"], 2)
|
||||||
|
|
||||||
|
# Check environment structure
|
||||||
|
envs = v2_data["environments"]
|
||||||
|
self.assertIn("prgs", envs)
|
||||||
|
self.assertIn("mdcps", envs)
|
||||||
|
|
||||||
|
# Check service and identity structure
|
||||||
|
prgs_gitea = envs["prgs"]["services"]["gitea"]
|
||||||
|
self.assertEqual(prgs_gitea["base_url"], "redacted-prgs-service")
|
||||||
|
self.assertEqual(prgs_gitea["default_owner"], "Scaled-Tech-Consulting")
|
||||||
|
|
||||||
|
author = prgs_gitea["identities"]["author"]
|
||||||
|
self.assertEqual(author["username"], "jcwalker3")
|
||||||
|
self.assertEqual(author["auth"]["id"], "redacted-author-ref")
|
||||||
|
self.assertEqual(author["allowed_operations"], ["read", "comment"])
|
||||||
|
self.assertEqual(author["forbidden_operations"], ["approve", "merge"])
|
||||||
|
|
||||||
|
reviewer = prgs_gitea["identities"]["reviewer"]
|
||||||
|
self.assertEqual(reviewer["role"], "reviewer")
|
||||||
|
self.assertEqual(reviewer["username"], "sysadmin")
|
||||||
|
self.assertEqual(reviewer["auth"]["id"], "redacted-reviewer-ref")
|
||||||
|
self.assertIn("merge", reviewer["allowed_operations"])
|
||||||
|
|
||||||
|
def test_alias_generation(self):
|
||||||
|
"""Test that aliases are correctly generated to support old profile names."""
|
||||||
|
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
|
||||||
|
aliases = v2_data["aliases"]
|
||||||
|
|
||||||
|
self.assertEqual(aliases["prgs"], "prgs.gitea.author")
|
||||||
|
self.assertEqual(aliases["prgs-author"], "prgs.gitea.author")
|
||||||
|
self.assertEqual(aliases["prgs-reviewer"], "prgs.gitea.reviewer")
|
||||||
|
self.assertEqual(aliases["mdcps"], "mdcps.gitea.author")
|
||||||
|
|
||||||
|
def test_no_secret_behavior(self):
|
||||||
|
"""Ensure secrets are never extracted, printed, or processed."""
|
||||||
|
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
|
||||||
|
# Check that auth structures only contain keychain references, not credentials
|
||||||
|
for env in v2_data["environments"].values():
|
||||||
|
for svc in env["services"].values():
|
||||||
|
for ident in svc["identities"].values():
|
||||||
|
auth = ident["auth"]
|
||||||
|
self.assertEqual(auth["type"], "keychain")
|
||||||
|
self.assertIn("id", auth)
|
||||||
|
self.assertNotIn("token", auth)
|
||||||
|
self.assertNotIn("password", auth)
|
||||||
|
|
||||||
|
def test_validation(self):
|
||||||
|
"""Test that the generated v2 configuration validates against Gitea-Tools v2 loader."""
|
||||||
|
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
|
||||||
|
self.assertTrue(migrate_profiles.validate_v2_data(v2_data))
|
||||||
|
|
||||||
|
@patch("sys.stdout", new_callable=StringIO)
|
||||||
|
def test_dry_run_default(self, mock_stdout):
|
||||||
|
"""Verify that running without -w prints generated config without modifying files."""
|
||||||
|
output_file = os.path.join(self.temp_dir, "migrated_dry.json")
|
||||||
|
test_args = [
|
||||||
|
"migrate_profiles.py",
|
||||||
|
"-i", self.input_file,
|
||||||
|
"-o", output_file
|
||||||
|
]
|
||||||
|
with patch.object(sys, "argv", test_args):
|
||||||
|
with self.assertRaises(SystemExit) as cm:
|
||||||
|
migrate_profiles.main()
|
||||||
|
self.assertEqual(cm.exception.code, 0)
|
||||||
|
|
||||||
|
self.assertFalse(os.path.exists(output_file))
|
||||||
|
self.assertFalse(os.path.exists(f"{self.input_file}.bak"))
|
||||||
|
|
||||||
|
stdout_output = mock_stdout.getvalue()
|
||||||
|
self.assertIn("DRY-RUN MODE", stdout_output)
|
||||||
|
self.assertIn("version", stdout_output)
|
||||||
|
self.assertIn("identities", stdout_output)
|
||||||
|
self.assertIn("aliases", stdout_output)
|
||||||
|
self.assertNotIn("redacted-prgs-service", stdout_output)
|
||||||
|
self.assertNotIn("redacted-mdcps-service", stdout_output)
|
||||||
|
self.assertNotIn("redacted-author-ref", stdout_output)
|
||||||
|
self.assertNotIn("redacted-mdcps-ref", stdout_output)
|
||||||
|
self.assertNotIn("redacted-reviewer-ref", stdout_output)
|
||||||
|
self.assertNotIn("keychain", stdout_output)
|
||||||
|
self.assertNotIn("auth", stdout_output)
|
||||||
|
|
||||||
|
def test_dry_run_hides_token_like_values(self):
|
||||||
|
"""Verify dry-run summary does not expose token-like auth metadata."""
|
||||||
|
sensitive = json.loads(json.dumps(self.v1_content))
|
||||||
|
sensitive["profiles"]["prgs"]["auth"]["id"] = "super-secret-token-value"
|
||||||
|
with open(self.input_file, "w") as f:
|
||||||
|
json.dump(sensitive, f)
|
||||||
|
|
||||||
|
test_args = ["migrate_profiles.py", "-i", self.input_file]
|
||||||
|
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
|
||||||
|
with patch.object(sys, "argv", test_args):
|
||||||
|
with self.assertRaises(SystemExit) as cm:
|
||||||
|
migrate_profiles.main()
|
||||||
|
self.assertEqual(cm.exception.code, 0)
|
||||||
|
|
||||||
|
stdout_output = mock_stdout.getvalue()
|
||||||
|
self.assertNotIn("super-secret-token-value", stdout_output)
|
||||||
|
self.assertNotIn("token", stdout_output.lower())
|
||||||
|
|
||||||
|
def test_explicit_operations_are_preserved(self):
|
||||||
|
"""Explicit v1 permissions must not be replaced by role defaults."""
|
||||||
|
v1_data = json.loads(json.dumps(self.v1_content))
|
||||||
|
v1_data["profiles"]["prgs-reviewer"]["allowed_operations"] = ["read"]
|
||||||
|
v1_data["profiles"]["prgs-reviewer"]["forbidden_operations"] = ["merge"]
|
||||||
|
|
||||||
|
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
|
||||||
|
reviewer = (
|
||||||
|
v2_data["environments"]["prgs"]["services"]["gitea"]
|
||||||
|
["identities"]["reviewer"]
|
||||||
|
)
|
||||||
|
self.assertEqual(reviewer["allowed_operations"], ["read"])
|
||||||
|
self.assertEqual(reviewer["forbidden_operations"], ["merge"])
|
||||||
|
|
||||||
|
def test_inferred_role_defaults_only_when_unambiguous(self):
|
||||||
|
"""Role defaults are allowed only for clear author/reviewer profiles."""
|
||||||
|
v1_data = {
|
||||||
|
"version": 1,
|
||||||
|
"profiles": {
|
||||||
|
"prgs-author": {
|
||||||
|
"base_url": "redacted-prgs-service",
|
||||||
|
"username": "jcwalker3",
|
||||||
|
"auth": {"type": "keychain", "id": "hidden-author-ref"},
|
||||||
|
"execution_profile": "prgs-author",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
|
||||||
|
author = (
|
||||||
|
v2_data["environments"]["prgs"]["services"]["gitea"]
|
||||||
|
["identities"]["author"]
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
author["allowed_operations"],
|
||||||
|
migrate_profiles.AUTHOR_DEFAULT_ALLOWED,
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
author["forbidden_operations"],
|
||||||
|
migrate_profiles.AUTHOR_DEFAULT_FORBIDDEN,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_ambiguous_permission_source_fails_closed(self):
|
||||||
|
"""A profile without explicit permissions or clear role must not widen."""
|
||||||
|
v1_data = {
|
||||||
|
"version": 1,
|
||||||
|
"profiles": {
|
||||||
|
"prgs": {
|
||||||
|
"base_url": "redacted-prgs-service",
|
||||||
|
"username": "jcwalker3",
|
||||||
|
"auth": {"type": "keychain", "id": "hidden-author-ref"},
|
||||||
|
"execution_profile": "personal-prgs",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
with self.assertRaisesRegex(ValueError, "fail closed"):
|
||||||
|
migrate_profiles.migrate_v1_to_v2(v1_data)
|
||||||
|
|
||||||
|
def test_partial_permission_source_fails_closed(self):
|
||||||
|
"""Allowed without forbidden, or vice versa, is ambiguous."""
|
||||||
|
v1_data = json.loads(json.dumps(self.v1_content))
|
||||||
|
del v1_data["profiles"]["prgs"]["forbidden_operations"]
|
||||||
|
|
||||||
|
with self.assertRaisesRegex(ValueError, "fail closed"):
|
||||||
|
migrate_profiles.migrate_v1_to_v2(v1_data)
|
||||||
|
|
||||||
|
def test_write_mode_and_backup(self):
|
||||||
|
"""Verify that write mode creates a backup and correctly saves the validated config."""
|
||||||
|
output_file = os.path.join(self.temp_dir, "migrated.json")
|
||||||
|
backup_file = os.path.join(self.temp_dir, "profiles_backup.json.bak")
|
||||||
|
|
||||||
|
test_args = [
|
||||||
|
"migrate_profiles.py",
|
||||||
|
"-i", self.input_file,
|
||||||
|
"-o", output_file,
|
||||||
|
"--backup", backup_file,
|
||||||
|
"-w"
|
||||||
|
]
|
||||||
|
with patch.object(sys, "argv", test_args):
|
||||||
|
migrate_profiles.main()
|
||||||
|
|
||||||
|
# Verify backup exists and matches original v1 config
|
||||||
|
self.assertTrue(os.path.exists(backup_file))
|
||||||
|
with open(backup_file, "r") as f:
|
||||||
|
backup_data = json.load(f)
|
||||||
|
self.assertEqual(backup_data["version"], 1)
|
||||||
|
self.assertIn("prgs", backup_data["profiles"])
|
||||||
|
|
||||||
|
# Verify migrated v2 config exists and validates
|
||||||
|
self.assertTrue(os.path.exists(output_file))
|
||||||
|
with open(output_file, "r") as f:
|
||||||
|
v2_data = json.load(f)
|
||||||
|
self.assertEqual(v2_data["version"], 2)
|
||||||
|
self.assertIn("environments", v2_data)
|
||||||
|
self.assertEqual(v2_data["aliases"]["prgs"], "prgs.gitea.author")
|
||||||
|
|
||||||
|
def test_malformed_input_fails_safely(self):
|
||||||
|
"""Test that malformed JSON or invalid version numbers cause a clean exit with code 1."""
|
||||||
|
bad_json_file = os.path.join(self.temp_dir, "bad.json")
|
||||||
|
with open(bad_json_file, "w") as f:
|
||||||
|
f.write("{invalid-json}")
|
||||||
|
|
||||||
|
test_args = ["migrate_profiles.py", "-i", bad_json_file]
|
||||||
|
with patch.object(sys, "argv", test_args):
|
||||||
|
with self.assertRaises(SystemExit) as cm:
|
||||||
|
migrate_profiles.main()
|
||||||
|
self.assertEqual(cm.exception.code, 1)
|
||||||
|
|
||||||
|
bad_version_file = os.path.join(self.temp_dir, "bad_version.json")
|
||||||
|
with open(bad_version_file, "w") as f:
|
||||||
|
json.dump({"version": 3, "profiles": {}}, f)
|
||||||
|
|
||||||
|
test_args = ["migrate_profiles.py", "-i", bad_version_file]
|
||||||
|
with patch.object(sys, "argv", test_args):
|
||||||
|
with self.assertRaises(SystemExit) as cm:
|
||||||
|
migrate_profiles.main()
|
||||||
|
self.assertEqual(cm.exception.code, 1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user