feat: add mirror_refs.sh for bidirectional ref syncing
- mirror_refs.sh: additive branch+tag mirroring between dadeschools (HTTPS) and prgs (SSH:2222). Dry-run default, --apply to execute, --force for diverged branches. Uses bare repo cache for isolation. - test_mirror_refs.py: flag parsing, safety defaults, brace-delimited refspec validation, and local bare-repo integration tests (FF detection, branch/tag comparison). - README.md: document mirror_refs.sh, test suite, and multi-instance auth.
This commit is contained in:
@@ -0,0 +1,269 @@
|
||||
"""Tests for mirror_refs.sh.
|
||||
|
||||
These test the script's argument handling and dry-run output. Actual mirroring
|
||||
is not tested because it requires real git remotes with credentials.
|
||||
|
||||
For the mirroring logic, we create a local test harness with two bare "remote"
|
||||
repos and verify the script's branching/tagging comparison logic indirectly.
|
||||
"""
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
REPO_ROOT = str(__import__("pathlib").Path(__file__).resolve().parent.parent)
|
||||
SCRIPT = os.path.join(REPO_ROOT, "mirror_refs.sh")
|
||||
|
||||
|
||||
def _run(args=None, env_override=None):
|
||||
"""Run mirror_refs.sh with given args, return (rc, stdout, stderr)."""
|
||||
cmd = [SCRIPT]
|
||||
if args:
|
||||
cmd.extend(args)
|
||||
env = os.environ.copy()
|
||||
if env_override:
|
||||
env.update(env_override)
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, env=env, timeout=30,
|
||||
)
|
||||
return result.returncode, result.stdout, result.stderr
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Flag parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestFlagParsing(unittest.TestCase):
|
||||
"""Verify --help, unknown flags, and flag combinations."""
|
||||
|
||||
def test_help_flag(self):
|
||||
rc, stdout, stderr = _run(["--help"])
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertIn("mirror_refs.sh", stdout)
|
||||
self.assertIn("--apply", stdout)
|
||||
self.assertIn("--force", stdout)
|
||||
|
||||
def test_h_flag(self):
|
||||
rc, stdout, stderr = _run(["-h"])
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertIn("--apply", stdout)
|
||||
|
||||
def test_unknown_flag_fails(self):
|
||||
rc, stdout, stderr = _run(["--bogus"])
|
||||
self.assertNotEqual(rc, 0)
|
||||
self.assertIn("Unknown flag", stderr)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dry-run behavior
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestDryRunBanner(unittest.TestCase):
|
||||
"""The script should clearly indicate dry-run mode."""
|
||||
|
||||
def test_dry_run_banner_shown_by_default(self):
|
||||
"""Without --apply, the script prints a DRY RUN banner.
|
||||
|
||||
This test will fail if credentials are unavailable (expected in CI).
|
||||
We just check the banner appears before the credential check.
|
||||
"""
|
||||
rc, stdout, stderr = _run()
|
||||
# The dry-run banner is printed before any remote operations
|
||||
combined = stdout + stderr
|
||||
self.assertTrue(
|
||||
"DRY RUN" in combined or "credential" in combined.lower(),
|
||||
"Script should either show DRY RUN banner or fail at credentials"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Script structure
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestScriptStructure(unittest.TestCase):
|
||||
"""Verify the script has the expected structure and safety defaults."""
|
||||
|
||||
def test_shebang(self):
|
||||
with open(SCRIPT, "r") as f:
|
||||
first_line = f.readline().strip()
|
||||
self.assertEqual(first_line, "#!/usr/bin/env bash")
|
||||
|
||||
def test_set_euo_pipefail(self):
|
||||
with open(SCRIPT, "r") as f:
|
||||
content = f.read()
|
||||
self.assertIn("set -euo pipefail", content)
|
||||
|
||||
def test_dry_run_is_default(self):
|
||||
"""Verify DRY_RUN=true is the default in the script source."""
|
||||
with open(SCRIPT, "r") as f:
|
||||
content = f.read()
|
||||
self.assertIn("DRY_RUN=true", content)
|
||||
|
||||
def test_force_is_off_by_default(self):
|
||||
with open(SCRIPT, "r") as f:
|
||||
content = f.read()
|
||||
self.assertIn("FORCE=false", content)
|
||||
|
||||
def test_no_force_push_without_flag(self):
|
||||
"""The script should never force-push unless FORCE is true."""
|
||||
with open(SCRIPT, "r") as f:
|
||||
content = f.read()
|
||||
# All --force pushes should be gated by $FORCE
|
||||
self.assertIn("if $FORCE", content)
|
||||
|
||||
def test_uses_brace_delimited_refspecs(self):
|
||||
"""Verify refspecs use ${b} not $b to avoid zsh :r issues."""
|
||||
with open(SCRIPT, "r") as f:
|
||||
content = f.read()
|
||||
# The push commands should use ${...} brace-delimited variable refs
|
||||
# in refspec strings to prevent zsh's :r modifier from eating the colon
|
||||
lines_with_push_refspec = [
|
||||
line for line in content.splitlines()
|
||||
if "refs/heads/" in line and "push" in line
|
||||
]
|
||||
for line in lines_with_push_refspec:
|
||||
# Should not contain bare $b: or $t: patterns (without braces)
|
||||
self.assertNotRegex(
|
||||
line, r'\$[a-z]:',
|
||||
f"Unbraced variable before colon in refspec: {line.strip()}"
|
||||
)
|
||||
|
||||
def test_additive_only_no_delete(self):
|
||||
"""Script should never use --delete or push :refs/... (delete refspec)."""
|
||||
with open(SCRIPT, "r") as f:
|
||||
content = f.read()
|
||||
self.assertNotIn("--delete", content)
|
||||
# A delete refspec looks like ":refs/..." (colon with no left side)
|
||||
for line in content.splitlines():
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("#"):
|
||||
continue
|
||||
self.assertNotRegex(
|
||||
stripped, r'push\s.*\s":refs/',
|
||||
f"Found delete refspec: {stripped}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Local mirror logic (integration test with local bare repos)
|
||||
# ---------------------------------------------------------------------------
|
||||
class TestLocalMirrorLogic(unittest.TestCase):
|
||||
"""Test the core mirroring logic using local bare repos as stand-ins.
|
||||
|
||||
This creates two bare repos ("ds" and "prgs"), puts some branches/tags
|
||||
on them, and then runs a simplified version of the comparison logic
|
||||
to validate the algorithm.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.tmpdir = tempfile.mkdtemp(prefix="mirror-test-")
|
||||
self.ds_repo = os.path.join(self.tmpdir, "ds.git")
|
||||
self.prgs_repo = os.path.join(self.tmpdir, "prgs.git")
|
||||
self.work_repo = os.path.join(self.tmpdir, "work")
|
||||
|
||||
# Create two bare repos
|
||||
subprocess.run(["git", "init", "--bare", self.ds_repo], capture_output=True)
|
||||
subprocess.run(["git", "init", "--bare", self.prgs_repo], capture_output=True)
|
||||
|
||||
# Create a working repo to push some initial content
|
||||
subprocess.run(["git", "init", self.work_repo], capture_output=True)
|
||||
subprocess.run(
|
||||
["git", "-C", self.work_repo, "commit", "--allow-empty", "-m", "init"],
|
||||
capture_output=True,
|
||||
env={**os.environ, "GIT_AUTHOR_NAME": "test", "GIT_AUTHOR_EMAIL": "t@t",
|
||||
"GIT_COMMITTER_NAME": "test", "GIT_COMMITTER_EMAIL": "t@t"},
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
import shutil
|
||||
shutil.rmtree(self.tmpdir, ignore_errors=True)
|
||||
|
||||
def _git(self, *args, cwd=None):
|
||||
env = {
|
||||
**os.environ,
|
||||
"GIT_AUTHOR_NAME": "test", "GIT_AUTHOR_EMAIL": "t@t",
|
||||
"GIT_COMMITTER_NAME": "test", "GIT_COMMITTER_EMAIL": "t@t",
|
||||
}
|
||||
result = subprocess.run(
|
||||
["git"] + list(args), capture_output=True, text=True,
|
||||
cwd=cwd or self.work_repo, env=env,
|
||||
)
|
||||
return result.stdout.strip()
|
||||
|
||||
def test_branch_only_on_ds_detected(self):
|
||||
"""A branch only on ds should be identified as needing push to prgs."""
|
||||
# Push main to both, then a feature branch only to ds
|
||||
self._git("remote", "add", "ds", self.ds_repo)
|
||||
self._git("remote", "add", "prgs", self.prgs_repo)
|
||||
self._git("push", "ds", "HEAD:refs/heads/main")
|
||||
self._git("push", "prgs", "HEAD:refs/heads/main")
|
||||
self._git("push", "ds", "HEAD:refs/heads/feat/only-ds")
|
||||
|
||||
# Fetch both into a bare mirror
|
||||
mirror = os.path.join(self.tmpdir, "mirror.git")
|
||||
subprocess.run(["git", "init", "--bare", mirror], capture_output=True)
|
||||
self._git("remote", "add", "ds", self.ds_repo, cwd=mirror)
|
||||
self._git("remote", "add", "prgs", self.prgs_repo, cwd=mirror)
|
||||
self._git("fetch", "ds", cwd=mirror)
|
||||
self._git("fetch", "prgs", cwd=mirror)
|
||||
|
||||
# Check: ds has feat/only-ds, prgs does not
|
||||
ds_refs = self._git("for-each-ref", "--format=%(refname)",
|
||||
"refs/remotes/ds/", cwd=mirror)
|
||||
prgs_refs = self._git("for-each-ref", "--format=%(refname)",
|
||||
"refs/remotes/prgs/", cwd=mirror)
|
||||
|
||||
self.assertIn("feat/only-ds", ds_refs)
|
||||
self.assertNotIn("feat/only-ds", prgs_refs)
|
||||
|
||||
def test_tag_only_on_one_side_detected(self):
|
||||
"""A tag only on ds should be identified as needing push to prgs."""
|
||||
self._git("remote", "add", "ds", self.ds_repo)
|
||||
self._git("remote", "add", "prgs", self.prgs_repo)
|
||||
self._git("push", "ds", "HEAD:refs/heads/main")
|
||||
self._git("push", "prgs", "HEAD:refs/heads/main")
|
||||
self._git("tag", "v1.0.0")
|
||||
self._git("push", "ds", "v1.0.0")
|
||||
|
||||
# ls-remote to check tags
|
||||
ds_tags = self._git("ls-remote", "--tags", self.ds_repo)
|
||||
prgs_tags = self._git("ls-remote", "--tags", self.prgs_repo)
|
||||
|
||||
self.assertIn("v1.0.0", ds_tags)
|
||||
self.assertNotIn("v1.0.0", prgs_tags)
|
||||
|
||||
def test_in_sync_branches_need_no_push(self):
|
||||
"""Branches at the same commit on both sides need no action."""
|
||||
self._git("remote", "add", "ds", self.ds_repo)
|
||||
self._git("remote", "add", "prgs", self.prgs_repo)
|
||||
self._git("push", "ds", "HEAD:refs/heads/main")
|
||||
self._git("push", "prgs", "HEAD:refs/heads/main")
|
||||
|
||||
ds_sha = self._git("ls-remote", self.ds_repo, "refs/heads/main").split()[0]
|
||||
prgs_sha = self._git("ls-remote", self.prgs_repo, "refs/heads/main").split()[0]
|
||||
|
||||
self.assertEqual(ds_sha, prgs_sha)
|
||||
|
||||
def test_fast_forward_detected(self):
|
||||
"""When one side is ahead (FF), it should be pushable."""
|
||||
self._git("remote", "add", "ds", self.ds_repo)
|
||||
self._git("remote", "add", "prgs", self.prgs_repo)
|
||||
self._git("push", "ds", "HEAD:refs/heads/main")
|
||||
self._git("push", "prgs", "HEAD:refs/heads/main")
|
||||
|
||||
# Make a new commit and push only to ds
|
||||
self._git("commit", "--allow-empty", "-m", "advance")
|
||||
self._git("push", "ds", "HEAD:refs/heads/main")
|
||||
|
||||
ds_sha = self._git("ls-remote", self.ds_repo, "refs/heads/main").split()[0]
|
||||
prgs_sha = self._git("ls-remote", self.prgs_repo, "refs/heads/main").split()[0]
|
||||
|
||||
self.assertNotEqual(ds_sha, prgs_sha)
|
||||
# prgs_sha should be ancestor of ds_sha (fast-forward)
|
||||
rc = subprocess.run(
|
||||
["git", "-C", self.work_repo, "merge-base", "--is-ancestor",
|
||||
prgs_sha, ds_sha],
|
||||
capture_output=True,
|
||||
).returncode
|
||||
self.assertEqual(rc, 0, "prgs commit should be ancestor of ds commit (FF)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user