Merge pull request 'feat(config): add v1-to-v2 profiles.json migration helper (#105)' (#123) from feat/issue-105-profiles-migration into master

This commit was merged in pull request #123.
This commit is contained in:
2026-07-03 17:05:05 -05:00
2 changed files with 583 additions and 0 deletions
+284
View File
@@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""Migration helper to convert profiles.json from version 1 to version 2 environments shape.
This script preserves existing keychain references (auth.id) and maps old profile
names as aliases so that existing IDE configurations continue to function.
"""
import os
import sys
import json
import argparse
import shutil
import tempfile
# Resolve path to import gitea_config
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import gitea_config
AUTHOR_DEFAULT_ALLOWED = ["read", "branch", "commit", "push", "open_pr", "comment"]
AUTHOR_DEFAULT_FORBIDDEN = ["approve", "request_changes", "merge"]
REVIEWER_DEFAULT_ALLOWED = [
"read", "review", "comment", "approve", "request_changes", "merge"
]
REVIEWER_DEFAULT_FORBIDDEN = ["branch", "commit", "push", "open_pr"]
def infer_role(name, execution_profile):
"""Return the unambiguous role for a legacy profile name, or None."""
haystack = f"{name} {execution_profile or ''}".lower()
has_author = "author" in haystack
has_reviewer = "reviewer" in haystack
if has_author == has_reviewer:
return None
return "reviewer" if has_reviewer else "author"
def migration_summary(v2_data):
"""Return a redacted summary of the migrated config."""
environments = v2_data.get("environments", {})
service_count = 0
identity_count = 0
for env in environments.values():
services = env.get("services", {})
service_count += len(services)
for service in services.values():
identity_count += len(service.get("identities", {}))
return {
"version": v2_data.get("version"),
"environments": len(environments),
"services": service_count,
"identities": identity_count,
"aliases": len(v2_data.get("aliases", {})),
}
def migrate_v1_to_v2(v1_data):
"""Convert version 1 profiles.json format to version 2 environments format."""
environments = {}
aliases = {}
profiles = v1_data.get("profiles", {})
if not isinstance(profiles, dict):
raise ValueError("Malformed input: 'profiles' field must be a JSON object")
for name, prof in profiles.items():
if not isinstance(prof, dict):
raise ValueError(f"Malformed input: profile '{name}' must be a JSON object")
# Infer environment and identity name
if "-" in name:
parts = name.split("-", 1)
env_name = parts[0]
ident_name = parts[1]
else:
env_name = name
ident_name = "author"
# Determine role and identity based on name / execution_profile.
# Ambiguous profiles may still migrate only when they carry explicit
# permissions; otherwise role-based defaults could widen permissions.
exec_prof = prof.get("execution_profile") or ""
role = infer_role(name, exec_prof)
if role == "reviewer":
ident_name = "reviewer"
elif role == "author":
ident_name = "author"
else:
role = prof.get("role")
if role not in (None, "author", "reviewer"):
raise ValueError(
f"Profile '{name}' has unsupported role {role!r}"
)
# Construct identity block
identity_data = {
"username": prof.get("username"),
"auth": prof.get("auth"),
}
if role:
identity_data["role"] = role
if prof.get("execution_profile"):
identity_data["execution_profile"] = prof["execution_profile"]
# Set audit label (default to old name to preserve context)
identity_data["audit_label"] = prof.get("audit_label") or name
has_allowed = "allowed_operations" in prof
has_forbidden = "forbidden_operations" in prof
if has_allowed != has_forbidden:
raise ValueError(
f"Profile '{name}' must define both allowed_operations and "
"forbidden_operations, or neither (fail closed)"
)
if has_allowed:
allowed = prof.get("allowed_operations")
forbidden = prof.get("forbidden_operations")
if not isinstance(allowed, list) or not isinstance(forbidden, list):
raise ValueError(
f"Profile '{name}' operation fields must be lists"
)
identity_data["allowed_operations"] = list(allowed)
identity_data["forbidden_operations"] = list(forbidden)
elif role == "author":
identity_data["allowed_operations"] = list(AUTHOR_DEFAULT_ALLOWED)
identity_data["forbidden_operations"] = list(AUTHOR_DEFAULT_FORBIDDEN)
elif role == "reviewer":
identity_data["allowed_operations"] = list(REVIEWER_DEFAULT_ALLOWED)
identity_data["forbidden_operations"] = list(REVIEWER_DEFAULT_FORBIDDEN)
else:
raise ValueError(
f"Profile '{name}' has no explicit operation lists and no "
"unambiguous author/reviewer role marker (fail closed)"
)
# Nest inside environments/services structure
env = environments.setdefault(env_name, {})
services = env.setdefault("services", {})
gitea_svc = services.setdefault("gitea", {})
# Copy service-level attributes
if prof.get("base_url"):
gitea_svc["base_url"] = prof["base_url"]
if prof.get("default_owner"):
gitea_svc["default_owner"] = prof["default_owner"]
if prof.get("default_repo"):
gitea_svc["default_repo"] = prof["default_repo"]
identities = gitea_svc.setdefault("identities", {})
identities[ident_name] = identity_data
# Alias resolution targets
alias_target = f"{env_name}.gitea.{ident_name}"
if name != alias_target:
aliases[name] = alias_target
# Extra convenience alias for standard old-profile compatibility (e.g. prgs-author)
convenience_alias = f"{env_name}-{ident_name}"
if convenience_alias != alias_target and convenience_alias not in aliases:
aliases[convenience_alias] = alias_target
v2_data = {
"version": 2,
"environments": environments,
"aliases": aliases
}
return v2_data
def validate_v2_data(v2_data):
"""Validate generated v2 structure using gitea_config parser."""
fd, temp_path = tempfile.mkstemp(suffix=".json")
os.close(fd)
try:
with open(temp_path, "w") as f:
json.dump(v2_data, f)
# Attempt to load using load_config to run all validation rules
gitea_config.load_config(temp_path)
return True
except Exception as e:
raise ValueError(f"Generated v2 config failed validation: {e}")
finally:
try:
os.remove(temp_path)
except OSError:
pass
def main():
parser = argparse.ArgumentParser(
description="Migrate profiles.json from version 1 to version 2 environments shape."
)
parser.add_argument(
"-i", "--input",
default=gitea_config.DEFAULT_CONFIG_PATH,
help="Path to the version 1 profiles.json file (default: ~/.config/gitea-tools/profiles.json)"
)
parser.add_argument(
"-o", "--output",
help="Path to write the migrated version 2 profiles.json file (default: overwrite input)"
)
parser.add_argument(
"-w", "--write",
action="store_true",
help="Actually write the migrated config and create a backup (default is dry-run)"
)
parser.add_argument(
"--backup",
help="Path to write the backup file (default: <input_path>.bak)"
)
args = parser.parse_args()
input_path = os.path.abspath(args.input)
output_path = os.path.abspath(args.output or input_path)
backup_path = args.backup or f"{input_path}.bak"
if not os.path.isfile(input_path):
print(f"Error: Input file not found: {input_path}", file=sys.stderr)
sys.exit(1)
try:
with open(input_path, "r") as f:
v1_data = json.load(f)
except json.JSONDecodeError as e:
print(f"Error: Input file is not valid JSON: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error reading input file: {e}", file=sys.stderr)
sys.exit(1)
# Validate version
version = v1_data.get("version")
if version is not None and version != 1:
print(f"Error: Unsupported profiles.json version: {version}. Expected version 1.", file=sys.stderr)
sys.exit(1)
try:
v2_data = migrate_v1_to_v2(v1_data)
validate_v2_data(v2_data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not args.write:
print("=== DRY-RUN MODE (No files modified) ===")
print("Generated v2 config validated successfully.")
print("Only aggregate counts are shown.")
summary = migration_summary(v2_data)
print("Summary:")
print(f" version: {summary['version']}")
print(f" environments: {summary['environments']}")
print(f" services: {summary['services']}")
print(f" identities: {summary['identities']}")
print(f" aliases: {summary['aliases']}")
sys.exit(0)
# Write Mode: Create Backup first
try:
print(f"Creating backup: {backup_path}")
shutil.copy2(input_path, backup_path)
except Exception as e:
print(f"Error creating backup: {e}", file=sys.stderr)
sys.exit(1)
# Write migrated config
try:
print(f"Writing migrated version 2 config: {output_path}")
# Ensure target directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w") as f:
json.dump(v2_data, f, indent=2)
f.write("\n")
print("Migration completed successfully!")
except Exception as e:
print(f"Error writing output file: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
+299
View File
@@ -0,0 +1,299 @@
"""Unit tests for migrate_profiles.py migration helper."""
import os
import sys
import json
import unittest
import tempfile
import shutil
from unittest.mock import patch
from io import StringIO
# Add project root to sys.path
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import migrate_profiles
class TestMigrateProfiles(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.v1_content = {
"version": 1,
"profiles": {
"prgs": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {
"type": "keychain",
"id": "redacted-author-ref"
},
"default_owner": "Scaled-Tech-Consulting",
"execution_profile": "personal-prgs",
"allowed_operations": ["read", "comment"],
"forbidden_operations": ["approve", "merge"]
},
"mdcps": {
"base_url": "redacted-mdcps-service",
"username": "913443",
"auth": {
"type": "keychain",
"id": "redacted-mdcps-ref"
},
"default_owner": "Contractor",
"execution_profile": "mdcps",
"allowed_operations": ["read"],
"forbidden_operations": ["merge"]
},
"prgs-reviewer": {
"base_url": "redacted-prgs-service",
"username": "sysadmin",
"auth": {
"type": "keychain",
"id": "redacted-reviewer-ref"
},
"default_owner": "Scaled-Tech-Consulting",
"execution_profile": "prgs-reviewer",
"allowed_operations": [
"read", "review", "comment", "approve",
"request_changes", "merge"
],
"forbidden_operations": ["branch", "commit", "push", "open_pr"]
}
}
}
self.input_file = os.path.join(self.temp_dir, "profiles.json")
with open(self.input_file, "w") as f:
json.dump(self.v1_content, f)
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_migration_logic(self):
"""Test the structural transformation and capability mapping."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
self.assertEqual(v2_data["version"], 2)
# Check environment structure
envs = v2_data["environments"]
self.assertIn("prgs", envs)
self.assertIn("mdcps", envs)
# Check service and identity structure
prgs_gitea = envs["prgs"]["services"]["gitea"]
self.assertEqual(prgs_gitea["base_url"], "redacted-prgs-service")
self.assertEqual(prgs_gitea["default_owner"], "Scaled-Tech-Consulting")
author = prgs_gitea["identities"]["author"]
self.assertEqual(author["username"], "jcwalker3")
self.assertEqual(author["auth"]["id"], "redacted-author-ref")
self.assertEqual(author["allowed_operations"], ["read", "comment"])
self.assertEqual(author["forbidden_operations"], ["approve", "merge"])
reviewer = prgs_gitea["identities"]["reviewer"]
self.assertEqual(reviewer["role"], "reviewer")
self.assertEqual(reviewer["username"], "sysadmin")
self.assertEqual(reviewer["auth"]["id"], "redacted-reviewer-ref")
self.assertIn("merge", reviewer["allowed_operations"])
def test_alias_generation(self):
"""Test that aliases are correctly generated to support old profile names."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
aliases = v2_data["aliases"]
self.assertEqual(aliases["prgs"], "prgs.gitea.author")
self.assertEqual(aliases["prgs-author"], "prgs.gitea.author")
self.assertEqual(aliases["prgs-reviewer"], "prgs.gitea.reviewer")
self.assertEqual(aliases["mdcps"], "mdcps.gitea.author")
def test_no_secret_behavior(self):
"""Ensure secrets are never extracted, printed, or processed."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
# Check that auth structures only contain keychain references, not credentials
for env in v2_data["environments"].values():
for svc in env["services"].values():
for ident in svc["identities"].values():
auth = ident["auth"]
self.assertEqual(auth["type"], "keychain")
self.assertIn("id", auth)
self.assertNotIn("token", auth)
self.assertNotIn("password", auth)
def test_validation(self):
"""Test that the generated v2 configuration validates against Gitea-Tools v2 loader."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
self.assertTrue(migrate_profiles.validate_v2_data(v2_data))
@patch("sys.stdout", new_callable=StringIO)
def test_dry_run_default(self, mock_stdout):
"""Verify that running without -w prints generated config without modifying files."""
output_file = os.path.join(self.temp_dir, "migrated_dry.json")
test_args = [
"migrate_profiles.py",
"-i", self.input_file,
"-o", output_file
]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 0)
self.assertFalse(os.path.exists(output_file))
self.assertFalse(os.path.exists(f"{self.input_file}.bak"))
stdout_output = mock_stdout.getvalue()
self.assertIn("DRY-RUN MODE", stdout_output)
self.assertIn("version", stdout_output)
self.assertIn("identities", stdout_output)
self.assertIn("aliases", stdout_output)
self.assertNotIn("redacted-prgs-service", stdout_output)
self.assertNotIn("redacted-mdcps-service", stdout_output)
self.assertNotIn("redacted-author-ref", stdout_output)
self.assertNotIn("redacted-mdcps-ref", stdout_output)
self.assertNotIn("redacted-reviewer-ref", stdout_output)
self.assertNotIn("keychain", stdout_output)
self.assertNotIn("auth", stdout_output)
def test_dry_run_hides_token_like_values(self):
"""Verify dry-run summary does not expose token-like auth metadata."""
sensitive = json.loads(json.dumps(self.v1_content))
sensitive["profiles"]["prgs"]["auth"]["id"] = "super-secret-token-value"
with open(self.input_file, "w") as f:
json.dump(sensitive, f)
test_args = ["migrate_profiles.py", "-i", self.input_file]
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 0)
stdout_output = mock_stdout.getvalue()
self.assertNotIn("super-secret-token-value", stdout_output)
self.assertNotIn("token", stdout_output.lower())
def test_explicit_operations_are_preserved(self):
"""Explicit v1 permissions must not be replaced by role defaults."""
v1_data = json.loads(json.dumps(self.v1_content))
v1_data["profiles"]["prgs-reviewer"]["allowed_operations"] = ["read"]
v1_data["profiles"]["prgs-reviewer"]["forbidden_operations"] = ["merge"]
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
reviewer = (
v2_data["environments"]["prgs"]["services"]["gitea"]
["identities"]["reviewer"]
)
self.assertEqual(reviewer["allowed_operations"], ["read"])
self.assertEqual(reviewer["forbidden_operations"], ["merge"])
def test_inferred_role_defaults_only_when_unambiguous(self):
"""Role defaults are allowed only for clear author/reviewer profiles."""
v1_data = {
"version": 1,
"profiles": {
"prgs-author": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {"type": "keychain", "id": "hidden-author-ref"},
"execution_profile": "prgs-author",
}
}
}
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
author = (
v2_data["environments"]["prgs"]["services"]["gitea"]
["identities"]["author"]
)
self.assertEqual(
author["allowed_operations"],
migrate_profiles.AUTHOR_DEFAULT_ALLOWED,
)
self.assertEqual(
author["forbidden_operations"],
migrate_profiles.AUTHOR_DEFAULT_FORBIDDEN,
)
def test_ambiguous_permission_source_fails_closed(self):
"""A profile without explicit permissions or clear role must not widen."""
v1_data = {
"version": 1,
"profiles": {
"prgs": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {"type": "keychain", "id": "hidden-author-ref"},
"execution_profile": "personal-prgs",
}
}
}
with self.assertRaisesRegex(ValueError, "fail closed"):
migrate_profiles.migrate_v1_to_v2(v1_data)
def test_partial_permission_source_fails_closed(self):
"""Allowed without forbidden, or vice versa, is ambiguous."""
v1_data = json.loads(json.dumps(self.v1_content))
del v1_data["profiles"]["prgs"]["forbidden_operations"]
with self.assertRaisesRegex(ValueError, "fail closed"):
migrate_profiles.migrate_v1_to_v2(v1_data)
def test_write_mode_and_backup(self):
"""Verify that write mode creates a backup and correctly saves the validated config."""
output_file = os.path.join(self.temp_dir, "migrated.json")
backup_file = os.path.join(self.temp_dir, "profiles_backup.json.bak")
test_args = [
"migrate_profiles.py",
"-i", self.input_file,
"-o", output_file,
"--backup", backup_file,
"-w"
]
with patch.object(sys, "argv", test_args):
migrate_profiles.main()
# Verify backup exists and matches original v1 config
self.assertTrue(os.path.exists(backup_file))
with open(backup_file, "r") as f:
backup_data = json.load(f)
self.assertEqual(backup_data["version"], 1)
self.assertIn("prgs", backup_data["profiles"])
# Verify migrated v2 config exists and validates
self.assertTrue(os.path.exists(output_file))
with open(output_file, "r") as f:
v2_data = json.load(f)
self.assertEqual(v2_data["version"], 2)
self.assertIn("environments", v2_data)
self.assertEqual(v2_data["aliases"]["prgs"], "prgs.gitea.author")
def test_malformed_input_fails_safely(self):
"""Test that malformed JSON or invalid version numbers cause a clean exit with code 1."""
bad_json_file = os.path.join(self.temp_dir, "bad.json")
with open(bad_json_file, "w") as f:
f.write("{invalid-json}")
test_args = ["migrate_profiles.py", "-i", bad_json_file]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 1)
bad_version_file = os.path.join(self.temp_dir, "bad_version.json")
with open(bad_version_file, "w") as f:
json.dump({"version": 3, "profiles": {}}, f)
test_args = ["migrate_profiles.py", "-i", bad_version_file]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 1)
if __name__ == "__main__":
unittest.main()