#!/usr/bin/env python3 """Migration helper to convert profiles.json from version 1 to version 2 environments shape. This script preserves existing keychain references (auth.id) and maps old profile names as aliases so that existing IDE configurations continue to function. """ import os import sys import json import argparse import shutil import tempfile # Resolve path to import gitea_config PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) if PROJECT_ROOT not in sys.path: sys.path.insert(0, PROJECT_ROOT) import gitea_config AUTHOR_DEFAULT_ALLOWED = ["read", "branch", "commit", "push", "open_pr", "comment"] AUTHOR_DEFAULT_FORBIDDEN = ["approve", "request_changes", "merge"] REVIEWER_DEFAULT_ALLOWED = [ "read", "review", "comment", "approve", "request_changes", "merge" ] REVIEWER_DEFAULT_FORBIDDEN = ["branch", "commit", "push", "open_pr"] def infer_role(name, execution_profile): """Return the unambiguous role for a legacy profile name, or None.""" haystack = f"{name} {execution_profile or ''}".lower() has_author = "author" in haystack has_reviewer = "reviewer" in haystack if has_author == has_reviewer: return None return "reviewer" if has_reviewer else "author" def migration_summary(v2_data): """Return a redacted summary of the migrated config.""" environments = v2_data.get("environments", {}) service_count = 0 identity_count = 0 for env in environments.values(): services = env.get("services", {}) service_count += len(services) for service in services.values(): identity_count += len(service.get("identities", {})) return { "version": v2_data.get("version"), "environments": len(environments), "services": service_count, "identities": identity_count, "aliases": len(v2_data.get("aliases", {})), } def migrate_v1_to_v2(v1_data): """Convert version 1 profiles.json format to version 2 environments format.""" environments = {} aliases = {} profiles = v1_data.get("profiles", {}) if not isinstance(profiles, dict): raise ValueError("Malformed input: 'profiles' field must be a JSON object") for name, prof in profiles.items(): if not isinstance(prof, dict): raise ValueError(f"Malformed input: profile '{name}' must be a JSON object") # Infer environment and identity name if "-" in name: parts = name.split("-", 1) env_name = parts[0] ident_name = parts[1] else: env_name = name ident_name = "author" # Determine role and identity based on name / execution_profile. # Ambiguous profiles may still migrate only when they carry explicit # permissions; otherwise role-based defaults could widen permissions. exec_prof = prof.get("execution_profile") or "" role = infer_role(name, exec_prof) if role == "reviewer": ident_name = "reviewer" elif role == "author": ident_name = "author" else: role = prof.get("role") if role not in (None, "author", "reviewer"): raise ValueError( f"Profile '{name}' has unsupported role {role!r}" ) # Construct identity block identity_data = { "username": prof.get("username"), "auth": prof.get("auth"), } if role: identity_data["role"] = role if prof.get("execution_profile"): identity_data["execution_profile"] = prof["execution_profile"] # Set audit label (default to old name to preserve context) identity_data["audit_label"] = prof.get("audit_label") or name has_allowed = "allowed_operations" in prof has_forbidden = "forbidden_operations" in prof if has_allowed != has_forbidden: raise ValueError( f"Profile '{name}' must define both allowed_operations and " "forbidden_operations, or neither (fail closed)" ) if has_allowed: allowed = prof.get("allowed_operations") forbidden = prof.get("forbidden_operations") if not isinstance(allowed, list) or not isinstance(forbidden, list): raise ValueError( f"Profile '{name}' operation fields must be lists" ) identity_data["allowed_operations"] = list(allowed) identity_data["forbidden_operations"] = list(forbidden) elif role == "author": identity_data["allowed_operations"] = list(AUTHOR_DEFAULT_ALLOWED) identity_data["forbidden_operations"] = list(AUTHOR_DEFAULT_FORBIDDEN) elif role == "reviewer": identity_data["allowed_operations"] = list(REVIEWER_DEFAULT_ALLOWED) identity_data["forbidden_operations"] = list(REVIEWER_DEFAULT_FORBIDDEN) else: raise ValueError( f"Profile '{name}' has no explicit operation lists and no " "unambiguous author/reviewer role marker (fail closed)" ) # Nest inside environments/services structure env = environments.setdefault(env_name, {}) services = env.setdefault("services", {}) gitea_svc = services.setdefault("gitea", {}) # Copy service-level attributes if prof.get("base_url"): gitea_svc["base_url"] = prof["base_url"] if prof.get("default_owner"): gitea_svc["default_owner"] = prof["default_owner"] if prof.get("default_repo"): gitea_svc["default_repo"] = prof["default_repo"] identities = gitea_svc.setdefault("identities", {}) identities[ident_name] = identity_data # Alias resolution targets alias_target = f"{env_name}.gitea.{ident_name}" if name != alias_target: aliases[name] = alias_target # Extra convenience alias for standard old-profile compatibility (e.g. prgs-author) convenience_alias = f"{env_name}-{ident_name}" if convenience_alias != alias_target and convenience_alias not in aliases: aliases[convenience_alias] = alias_target v2_data = { "version": 2, "environments": environments, "aliases": aliases } return v2_data def validate_v2_data(v2_data): """Validate generated v2 structure using gitea_config parser.""" fd, temp_path = tempfile.mkstemp(suffix=".json") os.close(fd) try: with open(temp_path, "w") as f: json.dump(v2_data, f) # Attempt to load using load_config to run all validation rules gitea_config.load_config(temp_path) return True except Exception as e: raise ValueError(f"Generated v2 config failed validation: {e}") finally: try: os.remove(temp_path) except OSError: pass def main(): parser = argparse.ArgumentParser( description="Migrate profiles.json from version 1 to version 2 environments shape." ) parser.add_argument( "-i", "--input", default=gitea_config.DEFAULT_CONFIG_PATH, help="Path to the version 1 profiles.json file (default: ~/.config/gitea-tools/profiles.json)" ) parser.add_argument( "-o", "--output", help="Path to write the migrated version 2 profiles.json file (default: overwrite input)" ) parser.add_argument( "-w", "--write", action="store_true", help="Actually write the migrated config and create a backup (default is dry-run)" ) parser.add_argument( "--backup", help="Path to write the backup file (default: .bak)" ) args = parser.parse_args() input_path = os.path.abspath(args.input) output_path = os.path.abspath(args.output or input_path) backup_path = args.backup or f"{input_path}.bak" if not os.path.isfile(input_path): print(f"Error: Input file not found: {input_path}", file=sys.stderr) sys.exit(1) try: with open(input_path, "r") as f: v1_data = json.load(f) except json.JSONDecodeError as e: print(f"Error: Input file is not valid JSON: {e}", file=sys.stderr) sys.exit(1) except Exception as e: print(f"Error reading input file: {e}", file=sys.stderr) sys.exit(1) # Validate version version = v1_data.get("version") if version is not None and version != 1: print(f"Error: Unsupported profiles.json version: {version}. Expected version 1.", file=sys.stderr) sys.exit(1) try: v2_data = migrate_v1_to_v2(v1_data) validate_v2_data(v2_data) except Exception as e: print(f"Error: {e}", file=sys.stderr) sys.exit(1) if not args.write: print("=== DRY-RUN MODE (No files modified) ===") print("Generated v2 config validated successfully.") print("Only aggregate counts are shown.") summary = migration_summary(v2_data) print("Summary:") print(f" version: {summary['version']}") print(f" environments: {summary['environments']}") print(f" services: {summary['services']}") print(f" identities: {summary['identities']}") print(f" aliases: {summary['aliases']}") sys.exit(0) # Write Mode: Create Backup first try: print(f"Creating backup: {backup_path}") shutil.copy2(input_path, backup_path) except Exception as e: print(f"Error creating backup: {e}", file=sys.stderr) sys.exit(1) # Write migrated config try: print(f"Writing migrated version 2 config: {output_path}") # Ensure target directory exists os.makedirs(os.path.dirname(output_path), exist_ok=True) with open(output_path, "w") as f: json.dump(v2_data, f, indent=2) f.write("\n") print("Migration completed successfully!") except Exception as e: print(f"Error writing output file: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()