fix: harden profiles migration helper

This commit is contained in:
2026-07-03 17:53:07 -04:00
parent cd633e2c2b
commit 23aa2fb192
2 changed files with 232 additions and 70 deletions
+86 -26
View File
@@ -20,6 +20,43 @@ if PROJECT_ROOT not in sys.path:
import gitea_config import gitea_config
AUTHOR_DEFAULT_ALLOWED = ["read", "branch", "commit", "push", "open_pr", "comment"]
AUTHOR_DEFAULT_FORBIDDEN = ["approve", "request_changes", "merge"]
REVIEWER_DEFAULT_ALLOWED = [
"read", "review", "comment", "approve", "request_changes", "merge"
]
REVIEWER_DEFAULT_FORBIDDEN = ["branch", "commit", "push", "open_pr"]
def infer_role(name, execution_profile):
"""Return the unambiguous role for a legacy profile name, or None."""
haystack = f"{name} {execution_profile or ''}".lower()
has_author = "author" in haystack
has_reviewer = "reviewer" in haystack
if has_author == has_reviewer:
return None
return "reviewer" if has_reviewer else "author"
def migration_summary(v2_data):
"""Return a redacted summary of the migrated config."""
environments = v2_data.get("environments", {})
service_count = 0
identity_count = 0
for env in environments.values():
services = env.get("services", {})
service_count += len(services)
for service in services.values():
identity_count += len(service.get("identities", {}))
return {
"version": v2_data.get("version"),
"environments": len(environments),
"services": service_count,
"identities": identity_count,
"aliases": len(v2_data.get("aliases", {})),
}
def migrate_v1_to_v2(v1_data): def migrate_v1_to_v2(v1_data):
"""Convert version 1 profiles.json format to version 2 environments format.""" """Convert version 1 profiles.json format to version 2 environments format."""
environments = {} environments = {}
@@ -42,43 +79,62 @@ def migrate_v1_to_v2(v1_data):
env_name = name env_name = name
ident_name = "author" ident_name = "author"
# Determine role and identity based on name / execution_profile # Determine role and identity based on name / execution_profile.
role = "author" # Ambiguous profiles may still migrate only when they carry explicit
# permissions; otherwise role-based defaults could widen permissions.
exec_prof = prof.get("execution_profile") or "" exec_prof = prof.get("execution_profile") or ""
if "reviewer" in name or "reviewer" in exec_prof: role = infer_role(name, exec_prof)
role = "reviewer" if role == "reviewer":
ident_name = "reviewer" ident_name = "reviewer"
elif "author" in name or "author" in exec_prof: elif role == "author":
role = "author"
ident_name = "author" ident_name = "author"
else:
role = prof.get("role")
if role not in (None, "author", "reviewer"):
raise ValueError(
f"Profile '{name}' has unsupported role {role!r}"
)
# Construct identity block # Construct identity block
identity_data = { identity_data = {
"role": role,
"username": prof.get("username"), "username": prof.get("username"),
"auth": prof.get("auth"), "auth": prof.get("auth"),
} }
if role:
identity_data["role"] = role
if prof.get("execution_profile"): if prof.get("execution_profile"):
identity_data["execution_profile"] = prof["execution_profile"] identity_data["execution_profile"] = prof["execution_profile"]
# Set audit label (default to old name to preserve context) # Set audit label (default to old name to preserve context)
identity_data["audit_label"] = prof.get("audit_label") or name identity_data["audit_label"] = prof.get("audit_label") or name
# Populate capabilities based on role has_allowed = "allowed_operations" in prof
if role == "author": has_forbidden = "forbidden_operations" in prof
identity_data["allowed_operations"] = [ if has_allowed != has_forbidden:
"read", "branch", "commit", "push", "open_pr", "comment" raise ValueError(
] f"Profile '{name}' must define both allowed_operations and "
identity_data["forbidden_operations"] = [ "forbidden_operations, or neither (fail closed)"
"approve", "request_changes", "merge" )
] if has_allowed:
allowed = prof.get("allowed_operations")
forbidden = prof.get("forbidden_operations")
if not isinstance(allowed, list) or not isinstance(forbidden, list):
raise ValueError(
f"Profile '{name}' operation fields must be lists"
)
identity_data["allowed_operations"] = list(allowed)
identity_data["forbidden_operations"] = list(forbidden)
elif role == "author":
identity_data["allowed_operations"] = list(AUTHOR_DEFAULT_ALLOWED)
identity_data["forbidden_operations"] = list(AUTHOR_DEFAULT_FORBIDDEN)
elif role == "reviewer":
identity_data["allowed_operations"] = list(REVIEWER_DEFAULT_ALLOWED)
identity_data["forbidden_operations"] = list(REVIEWER_DEFAULT_FORBIDDEN)
else: else:
identity_data["allowed_operations"] = [ raise ValueError(
"read", "review", "comment", "approve", "request_changes", "merge" f"Profile '{name}' has no explicit operation lists and no "
] "unambiguous author/reviewer role marker (fail closed)"
identity_data["forbidden_operations"] = [ )
"branch", "commit", "push", "open_pr"
]
# Nest inside environments/services structure # Nest inside environments/services structure
env = environments.setdefault(env_name, {}) env = environments.setdefault(env_name, {})
@@ -191,11 +247,15 @@ def main():
if not args.write: if not args.write:
print("=== DRY-RUN MODE (No files modified) ===") print("=== DRY-RUN MODE (No files modified) ===")
print(f"Would read from: {input_path}") print("Generated v2 config validated successfully.")
print(f"Would create backup at: {backup_path}") print("Only aggregate counts are shown.")
print(f"Would write v2 config to: {output_path}") summary = migration_summary(v2_data)
print("\nGenerated v2 config:") print("Summary:")
print(json.dumps(v2_data, indent=2)) print(f" version: {summary['version']}")
print(f" environments: {summary['environments']}")
print(f" services: {summary['services']}")
print(f" identities: {summary['identities']}")
print(f" aliases: {summary['aliases']}")
sys.exit(0) sys.exit(0)
# Write Mode: Create Backup first # Write Mode: Create Backup first
+132 -30
View File
@@ -25,34 +25,43 @@ class TestMigrateProfiles(unittest.TestCase):
"version": 1, "version": 1,
"profiles": { "profiles": {
"prgs": { "prgs": {
"base_url": "https://gitea.prgs.cc", "base_url": "redacted-prgs-service",
"username": "jcwalker3", "username": "jcwalker3",
"auth": { "auth": {
"type": "keychain", "type": "keychain",
"id": "prgs-gitea-token" "id": "redacted-author-ref"
}, },
"default_owner": "Scaled-Tech-Consulting", "default_owner": "Scaled-Tech-Consulting",
"execution_profile": "personal-prgs" "execution_profile": "personal-prgs",
"allowed_operations": ["read", "comment"],
"forbidden_operations": ["approve", "merge"]
}, },
"mdcps": { "mdcps": {
"base_url": "https://gitea.dadeschools.net", "base_url": "redacted-mdcps-service",
"username": "913443", "username": "913443",
"auth": { "auth": {
"type": "keychain", "type": "keychain",
"id": "mdcps-gitea-token" "id": "redacted-mdcps-ref"
}, },
"default_owner": "Contractor", "default_owner": "Contractor",
"execution_profile": "mdcps" "execution_profile": "mdcps",
"allowed_operations": ["read"],
"forbidden_operations": ["merge"]
}, },
"prgs-reviewer": { "prgs-reviewer": {
"base_url": "https://gitea.prgs.cc", "base_url": "redacted-prgs-service",
"username": "sysadmin", "username": "sysadmin",
"auth": { "auth": {
"type": "keychain", "type": "keychain",
"id": "prgs-gitea-reviewer-token" "id": "redacted-reviewer-ref"
}, },
"default_owner": "Scaled-Tech-Consulting", "default_owner": "Scaled-Tech-Consulting",
"execution_profile": "prgs-reviewer" "execution_profile": "prgs-reviewer",
"allowed_operations": [
"read", "review", "comment", "approve",
"request_changes", "merge"
],
"forbidden_operations": ["branch", "commit", "push", "open_pr"]
} }
} }
} }
@@ -75,19 +84,19 @@ class TestMigrateProfiles(unittest.TestCase):
# Check service and identity structure # Check service and identity structure
prgs_gitea = envs["prgs"]["services"]["gitea"] prgs_gitea = envs["prgs"]["services"]["gitea"]
self.assertEqual(prgs_gitea["base_url"], "https://gitea.prgs.cc") self.assertEqual(prgs_gitea["base_url"], "redacted-prgs-service")
self.assertEqual(prgs_gitea["default_owner"], "Scaled-Tech-Consulting") self.assertEqual(prgs_gitea["default_owner"], "Scaled-Tech-Consulting")
author = prgs_gitea["identities"]["author"] author = prgs_gitea["identities"]["author"]
self.assertEqual(author["role"], "author")
self.assertEqual(author["username"], "jcwalker3") self.assertEqual(author["username"], "jcwalker3")
self.assertEqual(author["auth"]["id"], "prgs-gitea-token") self.assertEqual(author["auth"]["id"], "redacted-author-ref")
self.assertIn("push", author["allowed_operations"]) self.assertEqual(author["allowed_operations"], ["read", "comment"])
self.assertEqual(author["forbidden_operations"], ["approve", "merge"])
reviewer = prgs_gitea["identities"]["reviewer"] reviewer = prgs_gitea["identities"]["reviewer"]
self.assertEqual(reviewer["role"], "reviewer") self.assertEqual(reviewer["role"], "reviewer")
self.assertEqual(reviewer["username"], "sysadmin") self.assertEqual(reviewer["username"], "sysadmin")
self.assertEqual(reviewer["auth"]["id"], "prgs-gitea-reviewer-token") self.assertEqual(reviewer["auth"]["id"], "redacted-reviewer-ref")
self.assertIn("merge", reviewer["allowed_operations"]) self.assertIn("merge", reviewer["allowed_operations"])
def test_alias_generation(self): def test_alias_generation(self):
@@ -138,7 +147,100 @@ class TestMigrateProfiles(unittest.TestCase):
stdout_output = mock_stdout.getvalue() stdout_output = mock_stdout.getvalue()
self.assertIn("DRY-RUN MODE", stdout_output) self.assertIn("DRY-RUN MODE", stdout_output)
self.assertIn("version", stdout_output) self.assertIn("version", stdout_output)
self.assertIn("environments", stdout_output) self.assertIn("identities", stdout_output)
self.assertIn("aliases", stdout_output)
self.assertNotIn("redacted-prgs-service", stdout_output)
self.assertNotIn("redacted-mdcps-service", stdout_output)
self.assertNotIn("redacted-author-ref", stdout_output)
self.assertNotIn("redacted-mdcps-ref", stdout_output)
self.assertNotIn("redacted-reviewer-ref", stdout_output)
self.assertNotIn("keychain", stdout_output)
self.assertNotIn("auth", stdout_output)
def test_dry_run_hides_token_like_values(self):
"""Verify dry-run summary does not expose token-like auth metadata."""
sensitive = json.loads(json.dumps(self.v1_content))
sensitive["profiles"]["prgs"]["auth"]["id"] = "super-secret-token-value"
with open(self.input_file, "w") as f:
json.dump(sensitive, f)
test_args = ["migrate_profiles.py", "-i", self.input_file]
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 0)
stdout_output = mock_stdout.getvalue()
self.assertNotIn("super-secret-token-value", stdout_output)
self.assertNotIn("token", stdout_output.lower())
def test_explicit_operations_are_preserved(self):
"""Explicit v1 permissions must not be replaced by role defaults."""
v1_data = json.loads(json.dumps(self.v1_content))
v1_data["profiles"]["prgs-reviewer"]["allowed_operations"] = ["read"]
v1_data["profiles"]["prgs-reviewer"]["forbidden_operations"] = ["merge"]
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
reviewer = (
v2_data["environments"]["prgs"]["services"]["gitea"]
["identities"]["reviewer"]
)
self.assertEqual(reviewer["allowed_operations"], ["read"])
self.assertEqual(reviewer["forbidden_operations"], ["merge"])
def test_inferred_role_defaults_only_when_unambiguous(self):
"""Role defaults are allowed only for clear author/reviewer profiles."""
v1_data = {
"version": 1,
"profiles": {
"prgs-author": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {"type": "keychain", "id": "hidden-author-ref"},
"execution_profile": "prgs-author",
}
}
}
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
author = (
v2_data["environments"]["prgs"]["services"]["gitea"]
["identities"]["author"]
)
self.assertEqual(
author["allowed_operations"],
migrate_profiles.AUTHOR_DEFAULT_ALLOWED,
)
self.assertEqual(
author["forbidden_operations"],
migrate_profiles.AUTHOR_DEFAULT_FORBIDDEN,
)
def test_ambiguous_permission_source_fails_closed(self):
"""A profile without explicit permissions or clear role must not widen."""
v1_data = {
"version": 1,
"profiles": {
"prgs": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {"type": "keychain", "id": "hidden-author-ref"},
"execution_profile": "personal-prgs",
}
}
}
with self.assertRaisesRegex(ValueError, "fail closed"):
migrate_profiles.migrate_v1_to_v2(v1_data)
def test_partial_permission_source_fails_closed(self):
"""Allowed without forbidden, or vice versa, is ambiguous."""
v1_data = json.loads(json.dumps(self.v1_content))
del v1_data["profiles"]["prgs"]["forbidden_operations"]
with self.assertRaisesRegex(ValueError, "fail closed"):
migrate_profiles.migrate_v1_to_v2(v1_data)
def test_write_mode_and_backup(self): def test_write_mode_and_backup(self):
"""Verify that write mode creates a backup and correctly saves the validated config.""" """Verify that write mode creates a backup and correctly saves the validated config."""