feat: add MCP server + shared auth module (#7, #1)

- New: mcp_server.py — FastMCP stdio server exposing 7 tools:
  gitea_create_issue, gitea_create_pr, gitea_close_issue,
  gitea_list_issues, gitea_view_issue, gitea_mark_issue,
  gitea_mirror_refs
- New: auth.py — shared authentication and API helpers
  (get_credentials, get_auth_header, api_request, repo_api_url)
- Refactored: create_pr.py, create_issue.py, manage_labels.py
  to use shared auth module (eliminates credential duplication)
- New: tests/test_mcp_server.py — 17 tests for all MCP tools
- Updated: tests/test_credentials.py — now tests auth.py directly
- Updated: tests/test_create_issue.py — adapted for refactored imports
- New: requirements.txt — frozen venv deps (mcp[cli], pytest)
- Updated: README.md — MCP server as primary interface
- Config: added gitea-tools to mcp_config.json

Closes #1. Resolves #2, #5. Relates to #7.
This commit is contained in:
2026-06-21 20:08:07 -04:00
parent dd6f1308c1
commit b7e195e426
11 changed files with 978 additions and 214 deletions
+51 -16
View File
@@ -1,6 +1,6 @@
# Gitea Tools # Gitea Tools
A collection of Python and Bash scripts to automate interactions with Gitea instances. A collection of Python scripts and an MCP server to automate interactions with Gitea instances.
## Supported Instances ## Supported Instances
@@ -18,7 +18,38 @@ Scripts extract credentials from the macOS keychain automatically — no tokens
Ensure you've logged in via Git over HTTPS at least once so the keychain caches your credentials. Ensure you've logged in via Git over HTTPS at least once so the keychain caches your credentials.
## Available Scripts ## MCP Server (Recommended)
The Gitea-Tools MCP server exposes all functionality as structured tool calls.
Any MCP-compatible agent (Antigravity, Claude Code, etc.) can call these tools natively.
### Available Tools
| Tool | Description |
|------|-------------|
| `gitea_create_issue` | Create an issue with title, body, remote |
| `gitea_create_pr` | Open a pull request with title, head, base |
| `gitea_close_issue` | Close an issue by number |
| `gitea_list_issues` | List issues with state/label filters |
| `gitea_view_issue` | Get full details of a single issue |
| `gitea_mark_issue` | Claim/release an issue (start/done) |
| `gitea_mirror_refs` | Mirror branches + tags between instances |
### Setup
```bash
# Install dependencies
cd /Users/jasonwalker/Development/Gitea-Tools
source venv/bin/activate
pip install "mcp[cli]"
```
The server is configured in `mcp_config.json` and runs automatically when Antigravity starts.
Restart Antigravity to load the server after first setup.
## CLI Scripts
The MCP tools can also be used as standalone CLI scripts:
| Script | Description | | Script | Description |
|---------------------|--------------------------------------------------------------------| |---------------------|--------------------------------------------------------------------|
@@ -59,30 +90,34 @@ Ensure you've logged in via Git over HTTPS at least once so the keychain caches
Use `--help` on any Python script or shell script for full usage details. Use `--help` on any Python script or shell script for full usage details.
## Mirror Refs ## Architecture
`mirror_refs.sh` keeps branches and tags in sync between dadeschools and prgs: ```
auth.py ← shared auth & API helpers (get_credentials, api_request)
- **Additive only** — never deletes branches or tags mcp_server.py ← MCP server (FastMCP, stdio transport)
- **Dry-run by default** — pass `--apply` to actually push create_issue.py ← CLI: create issues
- **Divergence protection** — shared branches that have diverged are skipped with a warning; pass `--force` to override create_pr.py ← CLI: create PRs
- Uses a bare repo cache in `/tmp/gitea-mirror-Timesheet` for isolation manage_labels.py ← CLI: label management
- Won't auto-close or merge anything — just ref mirroring close_issue.sh ← CLI: close issues
mark_issue.sh ← CLI: claim/release issues
mirror_refs.sh ← CLI: ref mirroring
```
## Tests ## Tests
Run the full test suite:
```bash ```bash
# Run with the venv (includes MCP SDK)
source venv/bin/activate
python3 -m pytest tests/ -v python3 -m pytest tests/ -v
``` ```
| Test file | Covers | | Test file | Covers |
|--------------------------|---------------------------------------------------------| |--------------------------|---------------------------------------------------------|
| `test_create_issue.py` | Arg parsing, remote resolution, payload, auth, errors | | `test_mcp_server.py` | All 7 MCP tools: create, list, view, close, mark, PR, mirror |
| `test_create_pr.py` | Arg parsing, remote resolution, payload, auth, errors | | `test_create_issue.py` | CLI arg parsing, remote resolution, payload, auth, errors |
| `test_credentials.py` | `get_credentials()` parsing edge cases | | `test_create_pr.py` | CLI arg parsing, remote resolution, payload, auth, errors |
| `test_manage_labels.py` | Label create/skip, dry run, mapping, constant validation| | `test_credentials.py` | `get_credentials()`, `get_auth_header()`, `repo_api_url()` |
| `test_manage_labels.py` | Label create/skip, dry run, mapping, constant validation |
| `test_shell_scripts.py` | `close_issue.sh` + `mark_issue.sh` arg validation | | `test_shell_scripts.py` | `close_issue.sh` + `mark_issue.sh` arg validation |
| `test_mirror_refs.py` | Flags, safety defaults, local integration tests | | `test_mirror_refs.py` | Flags, safety defaults, local integration tests |
+93
View File
@@ -0,0 +1,93 @@
"""Shared authentication and API helper for Gitea scripts.
Pulls credentials from the macOS keychain via `git credential fill`
so no tokens appear on the command line.
"""
import json
import base64
import subprocess
import urllib.request
import urllib.error
# Known Gitea instances — shared by all scripts.
REMOTES = {
"dadeschools": {
"host": "gitea.dadeschools.net",
"org": "Contractor",
"repo": "Timesheet",
},
"prgs": {
"host": "gitea.prgs.cc",
"org": "Scaled-Tech-Consulting",
"repo": "Timesheet",
},
}
def get_credentials(host):
"""Return (user, password) for *host* via ``git credential fill``."""
p = subprocess.Popen(
["git", "credential", "fill"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
)
out, _ = p.communicate(f"protocol=https\nhost={host}\n\n")
user = password = ""
for line in out.splitlines():
if line.startswith("username="):
user = line.split("=", 1)[1]
elif line.startswith("password="):
password = line.split("=", 1)[1]
return user, password
def get_auth_header(host):
"""Return an ``Authorization: Basic …`` header value for *host*."""
user, password = get_credentials(host)
if not user or not password:
return None
token = base64.b64encode(f"{user}:{password}".encode()).decode()
return f"Basic {token}"
def resolve_remote(args):
"""Given parsed argparse args with --remote/--host/--org/--repo,
return (host, org, repo) with overrides applied."""
profile = REMOTES[args.remote]
host = args.host or profile["host"]
org = args.org or profile["org"]
repo = args.repo or profile["repo"]
return host, org, repo
def add_remote_args(parser):
"""Add the standard --remote/--host/--org/--repo arguments to a parser."""
parser.add_argument(
"--remote", choices=sorted(REMOTES), default="dadeschools",
help="Known Gitea instance (default: dadeschools).",
)
parser.add_argument("--host", help="Override the Gitea host.")
parser.add_argument("--org", help="Override the owner/org.")
parser.add_argument("--repo", help="Override the repository.")
def api_request(method, url, auth_header, payload=None):
"""Make an authenticated JSON request to the Gitea API.
Returns parsed JSON on success, raises on HTTP errors.
"""
data = json.dumps(payload).encode("utf-8") if payload is not None else None
req = urllib.request.Request(url, data=data, method=method)
req.add_header("Authorization", auth_header)
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req) as resp:
body = resp.read().decode("utf-8")
return json.loads(body) if body else None
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"HTTP {e.code}: {error_body}") from e
def repo_api_url(host, org, repo):
"""Return the base API URL for a repo: https://host/api/v1/repos/org/repo"""
return f"https://{host}/api/v1/repos/{org}/{repo}"
Executable → Regular
+19 -56
View File
@@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Create a Gitea issue. """Create a Gitea issue.
Parameterized over title/body and the target Gitea instance, mirroring Parameterized over title/body and the target Gitea instance.
create_pr.py. Two instances are known out of the box: Two instances are known out of the box:
dadeschools -> gitea.dadeschools.net / Contractor / Timesheet dadeschools -> gitea.dadeschools.net / Contractor / Timesheet
prgs -> gitea.prgs.cc / Scaled-Tech-Consulting / Timesheet prgs -> gitea.prgs.cc / Scaled-Tech-Consulting / Timesheet
@@ -11,60 +11,29 @@ Auth is pulled from the macOS keychain via `git credential fill` for the
chosen host -- no tokens on the command line. chosen host -- no tokens on the command line.
Examples: Examples:
create_issue.py --title "PDF: open after creation" \\ create_issue.py --title "Bug: blank PDF" --body "Blank on Safari"
--body "Auto-open the generated PDF in the default viewer." create_issue.py --remote prgs --title "Add tests" --body-file desc.md
create_issue.py --host gitea.example.com --org Foo --repo Bar --title "..."
create_issue.py --remote prgs --title "Fix X" --body-file desc.md
""" """
import sys import sys
import json
import base64
import argparse import argparse
import subprocess
import urllib.request
import urllib.error
REMOTES = { from auth import (
"dadeschools": {"host": "gitea.dadeschools.net", "org": "Contractor", get_credentials, resolve_remote, add_remote_args,
"repo": "Timesheet"}, api_request, repo_api_url,
"prgs": {"host": "gitea.prgs.cc", "org": "Scaled-Tech-Consulting", )
"repo": "Timesheet"},
}
def get_credentials(host):
"""Return (user, password) for `host` via `git credential fill`."""
p = subprocess.Popen(
["git", "credential", "fill"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
)
out, _ = p.communicate(f"protocol=https\nhost={host}\n\n")
user = password = ""
for line in out.splitlines():
# split(maxsplit=1): tokens/passwords can themselves contain '='.
if line.startswith("username="):
user = line.split("=", 1)[1]
elif line.startswith("password="):
password = line.split("=", 1)[1]
return user, password
def main(argv=None): def main(argv=None):
parser = argparse.ArgumentParser(description="Create a Gitea issue.") parser = argparse.ArgumentParser(description="Create a Gitea issue.")
parser.add_argument("--remote", choices=sorted(REMOTES), default="dadeschools", add_remote_args(parser)
help="Known Gitea instance (default: dadeschools).")
parser.add_argument("--host", help="Override the Gitea host.")
parser.add_argument("--org", help="Override the owner/org.")
parser.add_argument("--repo", help="Override the repository.")
parser.add_argument("--title", required=True, help="Issue title.") parser.add_argument("--title", required=True, help="Issue title.")
parser.add_argument("--body", default="", help="Issue body text.") parser.add_argument("--body", default="", help="Issue body text.")
parser.add_argument("--body-file", help="Read issue body from this file ('-' for stdin).") parser.add_argument("--body-file",
help="Read issue body from this file ('-' for stdin).")
args = parser.parse_args(argv) args = parser.parse_args(argv)
profile = REMOTES[args.remote] host, org, repo = resolve_remote(args)
host = args.host or profile["host"]
org = args.org or profile["org"]
repo = args.repo or profile["repo"]
body = args.body body = args.body
if args.body_file: if args.body_file:
@@ -81,22 +50,16 @@ def main(argv=None):
file=sys.stderr) file=sys.stderr)
return 1 return 1
url = f"https://{host}/api/v1/repos/{org}/{repo}/issues" import base64
payload = {"title": args.title, "body": body} auth = f"Basic {base64.b64encode(f'{user}:{password}'.encode()).decode()}"
req = urllib.request.Request( url = f"{repo_api_url(host, org, repo)}/issues"
url, data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
)
auth_b64 = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("utf-8")
req.add_header("Authorization", f"Basic {auth_b64}")
try: try:
with urllib.request.urlopen(req) as response: data = api_request("POST", url, auth, {"title": args.title, "body": body})
data = json.load(response)
print(f"Issue #{data.get('number')}: {data.get('html_url')}") print(f"Issue #{data.get('number')}: {data.get('html_url')}")
return 0 return 0
except urllib.error.HTTPError as e: except RuntimeError as e:
print(f"Error {e.code}: {e.read().decode()}", file=sys.stderr) print(f"Error: {e}", file=sys.stderr)
return 1 return 1
+4 -33
View File
@@ -26,42 +26,15 @@ import sys
import json import json
import base64 import base64
import argparse import argparse
import subprocess
import urllib.request import urllib.request
import urllib.error import urllib.error
REMOTES = { from auth import get_credentials, resolve_remote, add_remote_args
"dadeschools": {"host": "gitea.dadeschools.net", "org": "Contractor",
"repo": "Timesheet"},
"prgs": {"host": "gitea.prgs.cc", "org": "Scaled-Tech-Consulting",
"repo": "Timesheet"},
}
def get_credentials(host):
"""Return (user, password) for `host` via `git credential fill`."""
p = subprocess.Popen(
["git", "credential", "fill"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
)
out, _ = p.communicate(f"protocol=https\nhost={host}\n\n")
user = password = ""
for line in out.splitlines():
# split(maxsplit=1): tokens/passwords can themselves contain '='.
if line.startswith("username="):
user = line.split("=", 1)[1]
elif line.startswith("password="):
password = line.split("=", 1)[1]
return user, password
def main(argv=None): def main(argv=None):
parser = argparse.ArgumentParser(description="Create a Gitea pull request.") parser = argparse.ArgumentParser(description="Create a Gitea pull request.")
parser.add_argument("--remote", choices=sorted(REMOTES), default="dadeschools", add_remote_args(parser)
help="Known Gitea instance (default: dadeschools).")
parser.add_argument("--host", help="Override the Gitea host.")
parser.add_argument("--org", help="Override the owner/org.")
parser.add_argument("--repo", help="Override the repository.")
parser.add_argument("--title", required=True, help="PR title.") parser.add_argument("--title", required=True, help="PR title.")
parser.add_argument("--head", required=True, help="Source branch.") parser.add_argument("--head", required=True, help="Source branch.")
parser.add_argument("--base", default="main", help="Target branch (default: main).") parser.add_argument("--base", default="main", help="Target branch (default: main).")
@@ -69,10 +42,7 @@ def main(argv=None):
parser.add_argument("--body-file", help="Read PR body from this file ('-' for stdin).") parser.add_argument("--body-file", help="Read PR body from this file ('-' for stdin).")
args = parser.parse_args(argv) args = parser.parse_args(argv)
profile = REMOTES[args.remote] host, org, repo = resolve_remote(args)
host = args.host or profile["host"]
org = args.org or profile["org"]
repo = args.repo or profile["repo"]
body = args.body body = args.body
if args.body_file: if args.body_file:
@@ -110,3 +80,4 @@ def main(argv=None):
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(main()) sys.exit(main())
+14 -36
View File
@@ -9,11 +9,8 @@ Usage:
./manage_labels.py --dry # print actions without writing ./manage_labels.py --dry # print actions without writing
""" """
import sys import sys
import json
import base64 from auth import get_auth_header, api_request, repo_api_url
import subprocess
import urllib.request
import urllib.error
HOST = "gitea.dadeschools.net" HOST = "gitea.dadeschools.net"
ORG = "Contractor" ORG = "Contractor"
@@ -50,46 +47,27 @@ MAPPING = {
1: ["nice-to-have"], 1: ["nice-to-have"],
} }
API = f"https://{HOST}/api/v1/repos/{ORG}/{REPO}" BASE_URL = repo_api_url(HOST, ORG, REPO)
def get_auth_header():
p = subprocess.Popen(
["git", "credential", "fill"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
)
out, _ = p.communicate(f"protocol=https\nhost={HOST}\n\n")
user = password = ""
for line in out.splitlines():
if line.startswith("username="):
user = line.split("=", 1)[1]
if line.startswith("password="):
password = line.split("=", 1)[1]
if not user or not password:
print("Could not get credentials from git credential fill", file=sys.stderr)
sys.exit(1)
token = base64.b64encode(f"{user}:{password}".encode()).decode()
return f"Basic {token}"
def api(method, path, auth, payload=None): def api(method, path, auth, payload=None):
url = f"{API}{path}" """Thin wrapper around auth.api_request that prepends BASE_URL and
data = json.dumps(payload).encode() if payload is not None else None handles errors gracefully (returns None instead of raising)."""
req = urllib.request.Request(url, data=data, method=method) url = f"{BASE_URL}{path}"
req.add_header("Authorization", auth)
req.add_header("Content-Type", "application/json")
try: try:
with urllib.request.urlopen(req) as r: return api_request(method, url, auth, payload)
body = r.read().decode() except RuntimeError as e:
return json.loads(body) if body else None print(f" {e}", file=sys.stderr)
except urllib.error.HTTPError as e:
print(f" HTTP {e.code} on {method} {path}: {e.read().decode()}", file=sys.stderr)
return None return None
def main(): def main():
dry = "--dry" in sys.argv dry = "--dry" in sys.argv
auth = get_auth_header() auth = get_auth_header(HOST)
if auth is None:
print("Could not get credentials from git credential fill",
file=sys.stderr)
sys.exit(1)
# 1. Existing labels -> name:id # 1. Existing labels -> name:id
existing = api("GET", "/labels?limit=100", auth) or [] existing = api("GET", "/labels?limit=100", auth) or []
+327
View File
@@ -0,0 +1,327 @@
#!/usr/bin/env python3
"""Gitea MCP Server — exposes Gitea operations as MCP tools.
Runs over stdio. All tools authenticate via macOS keychain (git credential fill).
Usage (standalone test):
python3 mcp_server.py
Configuration (mcp_config.json):
"gitea-tools": {
"command": "/Users/jasonwalker/Development/Gitea-Tools/venv/bin/python3",
"args": ["/Users/jasonwalker/Development/Gitea-Tools/mcp_server.py"],
"env": {}
}
"""
import os
import sys
import subprocess
# Ensure the project root is on the path so auth.py can be imported.
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from mcp.server.fastmcp import FastMCP # noqa: E402
from auth import ( # noqa: E402
REMOTES,
get_credentials,
get_auth_header,
api_request,
repo_api_url,
)
mcp = FastMCP("gitea-tools", instructions=(
"Gitea issue tracker and PR management for dadeschools and prgs instances. "
"Use the gitea_ prefixed tools to create issues, PRs, list issues, etc."
))
# ── Helpers ───────────────────────────────────────────────────────────────────
def _resolve(remote: str, host: str | None, org: str | None, repo: str | None):
"""Resolve remote + overrides to (host, org, repo)."""
if remote not in REMOTES:
raise ValueError(f"Unknown remote '{remote}'. Choose from: {list(REMOTES)}")
profile = REMOTES[remote]
return (
host or profile["host"],
org or profile["org"],
repo or profile["repo"],
)
def _auth(host: str) -> str:
"""Get auth header, raise if unavailable."""
header = get_auth_header(host)
if header is None:
raise RuntimeError(
f"No credentials for {host}. "
"Ensure you've logged in via HTTPS at least once."
)
return header
# ── Tools ─────────────────────────────────────────────────────────────────────
@mcp.tool()
def gitea_create_issue(
title: str,
body: str = "",
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Create a new issue on a Gitea repository.
Args:
title: Issue title (required).
body: Issue body text.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'number' and 'url' of the created issue.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues"
data = api_request("POST", url, auth, {"title": title, "body": body})
return {"number": data["number"], "url": data["html_url"]}
@mcp.tool()
def gitea_create_pr(
title: str,
head: str,
base: str = "main",
body: str = "",
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Create a pull request on a Gitea repository.
Args:
title: PR title (required).
head: Source branch name (required).
base: Target branch (default: 'main').
body: PR description.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'number' and 'url' of the created PR.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/pulls"
payload = {"title": title, "body": body, "head": head, "base": base}
data = api_request("POST", url, auth, payload)
return {"number": data["number"], "url": data["html_url"]}
@mcp.tool()
def gitea_close_issue(
issue_number: int,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Close an issue by setting its state to 'closed'.
Args:
issue_number: The issue number to close.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success' boolean and 'message'.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues/{issue_number}"
api_request("PATCH", url, auth, {"state": "closed"})
return {"success": True, "message": f"Issue #{issue_number} closed."}
@mcp.tool()
def gitea_list_issues(
state: str = "open",
label: str | None = None,
limit: int = 50,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> list[dict]:
"""List issues on a Gitea repository with optional filters.
Args:
state: Filter by state — 'open', 'closed', or 'all'.
label: Filter by label name (e.g. 'important').
limit: Max number of issues to return (default: 50).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
List of dicts with 'number', 'title', 'state', 'labels', 'assignee'.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
params = f"state={state}&limit={limit}&type=issues"
if label:
params += f"&labels={label}"
url = f"{repo_api_url(h, o, r)}/issues?{params}"
issues = api_request("GET", url, auth)
return [
{
"number": i["number"],
"title": i["title"],
"state": i["state"],
"labels": [lb["name"] for lb in i.get("labels", [])],
"assignee": (i.get("assignee") or {}).get("login", ""),
}
for i in issues
]
@mcp.tool()
def gitea_view_issue(
issue_number: int,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Get full details of a single issue.
Args:
issue_number: The issue number to view.
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'number', 'title', 'body', 'state', 'labels', 'assignee', 'url'.
"""
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
url = f"{repo_api_url(h, o, r)}/issues/{issue_number}"
i = api_request("GET", url, auth)
return {
"number": i["number"],
"title": i["title"],
"body": i.get("body", ""),
"state": i["state"],
"labels": [lb["name"] for lb in i.get("labels", [])],
"assignee": (i.get("assignee") or {}).get("login", ""),
"url": i["html_url"],
}
@mcp.tool()
def gitea_mark_issue(
issue_number: int,
action: str,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Claim or release an issue via the status:in-progress label.
This is the cross-agent lock mechanism. Check before starting work.
Args:
issue_number: The issue number.
action: 'start' to claim (add label) or 'done' to release (remove label).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success' boolean and 'message'.
"""
if action not in ("start", "done"):
raise ValueError(f"action must be 'start' or 'done', got '{action}'")
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
base = repo_api_url(h, o, r)
# Find the status:in-progress label id
labels = api_request("GET", f"{base}/labels?limit=100", auth)
label_id = None
for lb in labels:
if lb["name"] == "status:in-progress":
label_id = lb["id"]
break
if label_id is None:
raise RuntimeError(
"Label 'status:in-progress' not found. "
"Run manage_labels.py to create it first."
)
if action == "start":
api_request("POST", f"{base}/issues/{issue_number}/labels", auth,
{"labels": [label_id]})
return {"success": True, "message": f"Issue #{issue_number} claimed."}
else:
api_request("DELETE",
f"{base}/issues/{issue_number}/labels/{label_id}", auth)
return {"success": True, "message": f"Issue #{issue_number} released."}
@mcp.tool()
def gitea_mirror_refs(
apply: bool = False,
force: bool = False,
) -> dict:
"""Mirror branches and tags between dadeschools and prgs Timesheet repos.
Additive only — never deletes branches or tags. Diverged branches are
skipped unless force is True.
Args:
apply: If True, actually push. If False (default), dry-run only.
force: If True, force-push diverged branches.
Returns:
dict with 'output' (script stdout) and 'return_code'.
"""
script = os.path.join(PROJECT_ROOT, "mirror_refs.sh")
args = [script]
if apply:
args.append("--apply")
if force:
args.append("--force")
result = subprocess.run(
args, capture_output=True, text=True, timeout=120,
)
return {
"output": result.stdout + result.stderr,
"return_code": result.returncode,
}
# ── Entry point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
mcp.run(transport="stdio")
+40
View File
@@ -0,0 +1,40 @@
annotated-doc==0.0.4
annotated-types==0.7.0
anyio==4.14.0
attrs==26.1.0
certifi==2026.6.17
cffi==2.0.0
click==8.4.1
cryptography==49.0.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
httpx-sse==0.4.3
idna==3.18
iniconfig==2.3.0
jsonschema==4.26.0
jsonschema-specifications==2025.9.1
markdown-it-py==4.2.0
mcp==1.28.0
mdurl==0.1.2
packaging==26.2
pluggy==1.6.0
pycparser==3.0
pydantic==2.13.4
pydantic-settings==2.14.2
pydantic_core==2.46.4
Pygments==2.20.0
PyJWT==2.13.0
pytest==9.1.1
python-dotenv==1.2.2
python-multipart==0.0.32
referencing==0.37.0
rich==15.0.0
rpds-py==2026.5.1
shellingham==1.5.4
sse-starlette==3.4.5
starlette==1.3.1
typer==0.26.7
typing-inspection==0.4.2
typing_extensions==4.15.0
uvicorn==0.49.0
Executable
+117
View File
@@ -0,0 +1,117 @@
#!/usr/bin/env bash
# sync_repos.sh — mirror branches + tags between the two Gitea instances that
# host the Timesheet repo, in BOTH directions.
#
# dadeschools : gitea.dadeschools.net / Contractor/Timesheet (HTTPS — SSH:2222 is flaky)
# prgs : gitea-ssh.prgs.cc:2222 / Scaled-Tech-Consulting/Timesheet (SSH — HTTPS host 404s)
#
# Safety model:
# * ADDITIVE by default. A branch on only one side is pushed to the other.
# * A shared branch where one side is strictly ahead is fast-forwarded.
# * A shared branch that has DIVERGED is skipped with a loud warning
# (never auto-overwritten). Resolve those by hand.
# * Dry-run by default; pass --apply to actually push. --force lets the
# fast-forward pushes use --force (still skips diverged branches).
#
# Auth is automatic via the macOS keychain (`git credential fill`), same as the
# other Gitea-Tools scripts. Run it from inside any clone of the repo, or set
# REPO=/path/to/clone.
#
# Usage:
# ./sync_repos.sh # dry run — show what WOULD sync
# ./sync_repos.sh --apply # perform the sync
# ./sync_repos.sh --apply --force
set -euo pipefail
# --- config ------------------------------------------------------------------
DADE_URL="https://gitea.dadeschools.net/Contractor/Timesheet.git"
PRGS_URL="ssh://git@gitea-ssh.prgs.cc:2222/Scaled-Tech-Consulting/Timesheet.git"
REPO="${REPO:-$(pwd)}"
APPLY=0
FORCE=0
for arg in "$@"; do
case "$arg" in
--apply) APPLY=1 ;;
--force) FORCE=1 ;;
-h|--help) sed -n '2,30p' "$0"; exit 0 ;;
*) echo "Unknown flag: $arg" >&2; exit 2 ;;
esac
done
cd "$REPO"
git rev-parse --git-dir >/dev/null 2>&1 || { echo "Not a git repo: $REPO" >&2; exit 1; }
note() { printf '%s\n' "$*"; }
action() { if [ "$APPLY" -eq 1 ]; then printf ' ✓ %s\n' "$*"; else printf ' [dry] %s\n' "$*"; fi; }
# --- fetch both sides into private namespaces --------------------------------
note "==> Fetching both remotes..."
git fetch --prune "$DADE_URL" '+refs/heads/*:refs/sync/dade/*' 'refs/tags/*:refs/tags/*' >/dev/null 2>&1 \
|| { echo "ERROR: cannot fetch dadeschools (check VPN/keychain)" >&2; exit 1; }
git fetch --prune "$PRGS_URL" '+refs/heads/*:refs/sync/prgs/*' >/dev/null 2>&1 \
|| { echo "ERROR: cannot fetch prgs (check SSH access)" >&2; exit 1; }
# --- reconcile branches ------------------------------------------------------
# push <dest_url> <sha> <branch> <label>
push_branch() {
local url="$1" sha="$2" b="$3" label="$4" ff_flag=""
[ "$FORCE" -eq 1 ] && ff_flag="--force"
action "push '${b}' -> ${label}"
if [ "$APPLY" -eq 1 ]; then
git push $ff_flag "$url" "${sha}:refs/heads/${b}" >/dev/null 2>&1 \
&& note " done." \
|| note " FAILED (see: git push $ff_flag $url ${sha}:refs/heads/${b})"
fi
}
branches=$(git for-each-ref --format='%(refname:lstrip=3)' refs/sync/dade refs/sync/prgs | sort -u)
note "==> Reconciling branches..."
synced=0; pushed=0; diverged=0
for b in $branches; do
d=$(git rev-parse -q --verify "refs/sync/dade/${b}" || true)
p=$(git rev-parse -q --verify "refs/sync/prgs/${b}" || true)
if [ -n "$d" ] && [ -z "$p" ]; then
note "branch '${b}': only on dadeschools"; push_branch "$PRGS_URL" "$d" "$b" "prgs"; pushed=$((pushed+1)); continue
fi
if [ -z "$d" ] && [ -n "$p" ]; then
note "branch '${b}': only on prgs"; push_branch "$DADE_URL" "$p" "$b" "dadeschools"; pushed=$((pushed+1)); continue
fi
# on both
if [ "$d" = "$p" ]; then synced=$((synced+1)); continue; fi
if git merge-base --is-ancestor "$p" "$d"; then
note "branch '${b}': dadeschools is ahead (fast-forward)"; push_branch "$PRGS_URL" "$d" "$b" "prgs"; pushed=$((pushed+1))
elif git merge-base --is-ancestor "$d" "$p"; then
note "branch '${b}': prgs is ahead (fast-forward)"; push_branch "$DADE_URL" "$p" "$b" "dadeschools"; pushed=$((pushed+1))
else
note "branch '${b}': ⚠ DIVERGED — skipped (resolve manually; --force will not auto-pick a winner)"; diverged=$((diverged+1))
fi
done
# --- tags (additive both ways) ----------------------------------------------
note "==> Syncing tags..."
sync_tags() {
local url="$1" label="$2"
local remote_tags local_tags missing
remote_tags=$(git ls-remote --tags "$url" 2>/dev/null | awk '{print $2}' | grep -v '\^{}' | sed 's#refs/tags/##' | sort || true)
local_tags=$(git tag -l | sort)
missing=$(comm -23 <(printf '%s\n' "$local_tags") <(printf '%s\n' "$remote_tags"))
if [ -z "$missing" ]; then note " ${label}: tags up to date"; return; fi
for t in $missing; do
action "push tag '${t}' -> ${label}"
[ "$APPLY" -eq 1 ] && git push "$url" "refs/tags/${t}" >/dev/null 2>&1 && note " done."
done
}
sync_tags "$DADE_URL" "dadeschools"
sync_tags "$PRGS_URL" "prgs"
# --- cleanup private namespace ----------------------------------------------
git for-each-ref --format='%(refname)' refs/sync | while read -r r; do git update-ref -d "$r"; done
note ""
note "==> Summary: ${synced} already in sync, ${pushed} branch push(es), ${diverged} diverged/skipped."
[ "$APPLY" -eq 0 ] && note " (dry run — re-run with --apply to perform the sync)"
[ "$diverged" -gt 0 ] && note "${diverged} diverged branch(es) need manual reconciliation."
exit 0
+30 -56
View File
@@ -1,7 +1,7 @@
"""Tests for create_issue.py. """Tests for create_issue.py.
Every test mocks `get_credentials` and `urllib.request.urlopen` so no real Every test mocks auth functions so no real network calls or keychain
network calls or keychain access are performed. access are performed.
Note: create_issue.py may be inaccessible due to macOS sandbox restrictions. Note: create_issue.py may be inaccessible due to macOS sandbox restrictions.
If so, these tests are automatically skipped. If so, these tests are automatically skipped.
@@ -28,17 +28,6 @@ except (ImportError, PermissionError, OSError) as exc:
FAKE_CREDS = ("testuser", "testpass") FAKE_CREDS = ("testuser", "testpass")
def _mock_urlopen(status=200, body=None):
"""Return a mock context-manager for urllib.request.urlopen."""
if body is None:
body = {"number": 42, "html_url": "https://gitea.example.com/issues/42"}
resp = MagicMock()
resp.read.return_value = json.dumps(body).encode("utf-8")
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Argument parsing # Argument parsing
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -46,9 +35,9 @@ def _mock_urlopen(status=200, body=None):
class TestArgParsing(unittest.TestCase): class TestArgParsing(unittest.TestCase):
"""Ensure argparse accepts the expected flags and rejects bad input.""" """Ensure argparse accepts the expected flags and rejects bad input."""
@patch("create_issue.urllib.request.urlopen", return_value=_mock_urlopen()) @patch("create_issue.api_request", return_value={"number": 1, "html_url": "http://x/1"})
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_minimal_args(self, _cred, _url): def test_minimal_args(self, _cred, _api):
rc = create_issue.main(["--title", "Hello"]) rc = create_issue.main(["--title", "Hello"])
self.assertEqual(rc, 0) self.assertEqual(rc, 0)
@@ -57,9 +46,9 @@ class TestArgParsing(unittest.TestCase):
create_issue.main(["--body", "no title given"]) create_issue.main(["--body", "no title given"])
self.assertNotEqual(ctx.exception.code, 0) self.assertNotEqual(ctx.exception.code, 0)
@patch("create_issue.urllib.request.urlopen", return_value=_mock_urlopen()) @patch("create_issue.api_request", return_value={"number": 1, "html_url": "http://x/1"})
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_remote_choices(self, _cred, _url): def test_remote_choices(self, _cred, _api):
for remote in ("dadeschools", "prgs"): for remote in ("dadeschools", "prgs"):
rc = create_issue.main(["--remote", remote, "--title", "X"]) rc = create_issue.main(["--remote", remote, "--title", "X"])
self.assertEqual(rc, 0, f"--remote {remote} should be accepted") self.assertEqual(rc, 0, f"--remote {remote} should be accepted")
@@ -76,21 +65,21 @@ class TestArgParsing(unittest.TestCase):
class TestRemoteResolution(unittest.TestCase): class TestRemoteResolution(unittest.TestCase):
"""Verify the correct host/org/repo are selected per remote.""" """Verify the correct host/org/repo are selected per remote."""
@patch("create_issue.urllib.request.urlopen", return_value=_mock_urlopen()) @patch("create_issue.api_request", return_value={"number": 1, "html_url": "http://x/1"})
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_dadeschools_default(self, mock_cred, mock_url): def test_dadeschools_default(self, mock_cred, _api):
create_issue.main(["--title", "T"]) create_issue.main(["--title", "T"])
mock_cred.assert_called_with("gitea.dadeschools.net") mock_cred.assert_called_with("gitea.dadeschools.net")
@patch("create_issue.urllib.request.urlopen", return_value=_mock_urlopen()) @patch("create_issue.api_request", return_value={"number": 1, "html_url": "http://x/1"})
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_prgs_remote(self, mock_cred, mock_url): def test_prgs_remote(self, mock_cred, _api):
create_issue.main(["--remote", "prgs", "--title", "T"]) create_issue.main(["--remote", "prgs", "--title", "T"])
mock_cred.assert_called_with("gitea.prgs.cc") mock_cred.assert_called_with("gitea.prgs.cc")
@patch("create_issue.urllib.request.urlopen", return_value=_mock_urlopen()) @patch("create_issue.api_request", return_value={"number": 1, "html_url": "http://x/1"})
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_host_override(self, mock_cred, mock_url): def test_host_override(self, mock_cred, _api):
create_issue.main(["--host", "custom.example.com", "--title", "T"]) create_issue.main(["--host", "custom.example.com", "--title", "T"])
mock_cred.assert_called_with("custom.example.com") mock_cred.assert_called_with("custom.example.com")
@@ -104,27 +93,19 @@ class TestAPIPayload(unittest.TestCase):
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_payload_title_and_body(self, _cred): def test_payload_title_and_body(self, _cred):
with patch("create_issue.urllib.request.Request") as MockReq: with patch("create_issue.api_request",
MockReq.return_value = MagicMock() return_value={"number": 1, "html_url": "http://x/1"}) as mock_api:
with patch("create_issue.urllib.request.urlopen",
return_value=_mock_urlopen()):
create_issue.main(["--title", "My Title", "--body", "My Body"]) create_issue.main(["--title", "My Title", "--body", "My Body"])
payload = mock_api.call_args[0][3]
# Inspect the data kwarg passed to Request(url, data=..., ...) self.assertEqual(payload["title"], "My Title")
call_kwargs = MockReq.call_args self.assertEqual(payload["body"], "My Body")
data = json.loads(call_kwargs[1]["data"].decode("utf-8"))
self.assertEqual(data["title"], "My Title")
self.assertEqual(data["body"], "My Body")
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_url_construction(self, _cred): def test_url_construction(self, _cred):
with patch("create_issue.urllib.request.Request") as MockReq: with patch("create_issue.api_request",
MockReq.return_value = MagicMock() return_value={"number": 1, "html_url": "http://x/1"}) as mock_api:
with patch("create_issue.urllib.request.urlopen",
return_value=_mock_urlopen()):
create_issue.main(["--remote", "prgs", "--title", "X"]) create_issue.main(["--remote", "prgs", "--title", "X"])
url = mock_api.call_args[0][1]
url = MockReq.call_args[0][0]
self.assertEqual( self.assertEqual(
url, url,
"https://gitea.prgs.cc/api/v1/repos/Scaled-Tech-Consulting/Timesheet/issues", "https://gitea.prgs.cc/api/v1/repos/Scaled-Tech-Consulting/Timesheet/issues",
@@ -137,19 +118,16 @@ class TestAPIPayload(unittest.TestCase):
@unittest.skipIf(_SKIP, _REASON) @unittest.skipIf(_SKIP, _REASON)
class TestBodyFile(unittest.TestCase): class TestBodyFile(unittest.TestCase):
@patch("create_issue.urllib.request.urlopen", return_value=_mock_urlopen())
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_body_from_file(self, _cred, _url): def test_body_from_file(self, _cred):
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f: with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f:
f.write("File body content") f.write("File body content")
f.flush() f.flush()
with patch("create_issue.urllib.request.Request") as MockReq: with patch("create_issue.api_request",
MockReq.return_value = MagicMock() return_value={"number": 1, "html_url": "http://x/1"}) as mock_api:
with patch("create_issue.urllib.request.urlopen",
return_value=_mock_urlopen()):
create_issue.main(["--title", "T", "--body-file", f.name]) create_issue.main(["--title", "T", "--body-file", f.name])
data = json.loads(MockReq.call_args[1]["data"].decode("utf-8")) payload = mock_api.call_args[0][3]
self.assertEqual(data["body"], "File body content") self.assertEqual(payload["body"], "File body content")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -165,19 +143,15 @@ class TestAuthFailure(unittest.TestCase):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# HTTP error handling # API error handling
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@unittest.skipIf(_SKIP, _REASON) @unittest.skipIf(_SKIP, _REASON)
class TestHTTPError(unittest.TestCase): class TestAPIError(unittest.TestCase):
@patch("create_issue.get_credentials", return_value=FAKE_CREDS) @patch("create_issue.get_credentials", return_value=FAKE_CREDS)
def test_http_error_returns_1(self, _cred): def test_api_error_returns_1(self, _cred):
import urllib.error with patch("create_issue.api_request",
err = urllib.error.HTTPError( side_effect=RuntimeError("HTTP 422: duplicate")):
url="https://example.com", code=422, msg="Unprocessable",
hdrs=None, fp=io.BytesIO(b'{"message":"duplicate"}'),
)
with patch("create_issue.urllib.request.urlopen", side_effect=err):
rc = create_issue.main(["--title", "Dup"]) rc = create_issue.main(["--title", "Dup"])
self.assertEqual(rc, 1) self.assertEqual(rc, 1)
+34 -12
View File
@@ -1,4 +1,4 @@
"""Tests for the shared get_credentials() function used by create_issue.py and create_pr.py. """Tests for the shared get_credentials() function in auth.py.
These test the credential parsing logic in isolation by mocking subprocess.Popen. These test the credential parsing logic in isolation by mocking subprocess.Popen.
""" """
@@ -7,7 +7,7 @@ import unittest
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent)) sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import create_pr # noqa: E402 (get_credentials is identical in create_issue.py and create_pr.py) import auth # noqa: E402
class TestGetCredentials(unittest.TestCase): class TestGetCredentials(unittest.TestCase):
@@ -19,45 +19,45 @@ class TestGetCredentials(unittest.TestCase):
mock_proc.communicate.return_value = (output_text, "") mock_proc.communicate.return_value = (output_text, "")
return mock_proc return mock_proc
@patch("create_pr.subprocess.Popen") @patch("auth.subprocess.Popen")
def test_parses_standard_output(self, mock_popen_cls): def test_parses_standard_output(self, mock_popen_cls):
mock_popen_cls.return_value = self._mock_popen( mock_popen_cls.return_value = self._mock_popen(
"protocol=https\nhost=gitea.example.com\nusername=admin\npassword=s3cret\n" "protocol=https\nhost=gitea.example.com\nusername=admin\npassword=s3cret\n"
) )
user, password = create_pr.get_credentials("gitea.example.com") user, password = auth.get_credentials("gitea.example.com")
self.assertEqual(user, "admin") self.assertEqual(user, "admin")
self.assertEqual(password, "s3cret") self.assertEqual(password, "s3cret")
@patch("create_pr.subprocess.Popen") @patch("auth.subprocess.Popen")
def test_handles_password_with_equals(self, mock_popen_cls): def test_handles_password_with_equals(self, mock_popen_cls):
# Tokens often contain '=' characters # Tokens often contain '=' characters
mock_popen_cls.return_value = self._mock_popen( mock_popen_cls.return_value = self._mock_popen(
"username=bot\npassword=abc=def=ghi\n" "username=bot\npassword=abc=def=ghi\n"
) )
user, password = create_pr.get_credentials("example.com") user, password = auth.get_credentials("example.com")
self.assertEqual(user, "bot") self.assertEqual(user, "bot")
self.assertEqual(password, "abc=def=ghi") self.assertEqual(password, "abc=def=ghi")
@patch("create_pr.subprocess.Popen") @patch("auth.subprocess.Popen")
def test_empty_output_returns_empty(self, mock_popen_cls): def test_empty_output_returns_empty(self, mock_popen_cls):
mock_popen_cls.return_value = self._mock_popen("") mock_popen_cls.return_value = self._mock_popen("")
user, password = create_pr.get_credentials("example.com") user, password = auth.get_credentials("example.com")
self.assertEqual(user, "") self.assertEqual(user, "")
self.assertEqual(password, "") self.assertEqual(password, "")
@patch("create_pr.subprocess.Popen") @patch("auth.subprocess.Popen")
def test_missing_password_returns_empty(self, mock_popen_cls): def test_missing_password_returns_empty(self, mock_popen_cls):
mock_popen_cls.return_value = self._mock_popen("username=admin\n") mock_popen_cls.return_value = self._mock_popen("username=admin\n")
user, password = create_pr.get_credentials("example.com") user, password = auth.get_credentials("example.com")
self.assertEqual(user, "admin") self.assertEqual(user, "admin")
self.assertEqual(password, "") self.assertEqual(password, "")
@patch("create_pr.subprocess.Popen") @patch("auth.subprocess.Popen")
def test_sends_correct_stdin(self, mock_popen_cls): def test_sends_correct_stdin(self, mock_popen_cls):
mock_proc = self._mock_popen("username=u\npassword=p\n") mock_proc = self._mock_popen("username=u\npassword=p\n")
mock_popen_cls.return_value = mock_proc mock_popen_cls.return_value = mock_proc
create_pr.get_credentials("gitea.prgs.cc") auth.get_credentials("gitea.prgs.cc")
# Verify the correct input was sent to git credential fill # Verify the correct input was sent to git credential fill
mock_proc.communicate.assert_called_once_with( mock_proc.communicate.assert_called_once_with(
@@ -65,5 +65,27 @@ class TestGetCredentials(unittest.TestCase):
) )
class TestGetAuthHeader(unittest.TestCase):
"""Test the get_auth_header function."""
@patch("auth.get_credentials", return_value=("user", "pass"))
def test_returns_basic_header(self, _cred):
header = auth.get_auth_header("example.com")
self.assertIsNotNone(header)
self.assertTrue(header.startswith("Basic "))
@patch("auth.get_credentials", return_value=("", ""))
def test_returns_none_for_missing_creds(self, _cred):
header = auth.get_auth_header("example.com")
self.assertIsNone(header)
class TestRepoApiUrl(unittest.TestCase):
def test_url_format(self):
url = auth.repo_api_url("gitea.prgs.cc", "Org", "Repo")
self.assertEqual(url, "https://gitea.prgs.cc/api/v1/repos/Org/Repo")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
+244
View File
@@ -0,0 +1,244 @@
"""Tests for the MCP server tool functions.
Each tool is tested by calling the underlying function directly (not through
the MCP protocol) with mocked API responses.
"""
import os
import sys
import unittest
from unittest.mock import patch, MagicMock
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
from mcp_server import ( # noqa: E402
gitea_create_issue,
gitea_create_pr,
gitea_close_issue,
gitea_list_issues,
gitea_view_issue,
gitea_mark_issue,
gitea_mirror_refs,
)
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
# ---------------------------------------------------------------------------
# Create Issue
# ---------------------------------------------------------------------------
class TestCreateIssue(unittest.TestCase):
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_creates_issue(self, _auth, mock_api):
mock_api.return_value = {"number": 1, "html_url": "https://gitea.example.com/issues/1"}
result = gitea_create_issue(title="Test issue", body="body text")
self.assertEqual(result["number"], 1)
self.assertIn("issues/1", result["url"])
mock_api.assert_called_once()
# Verify payload
call_args = mock_api.call_args
self.assertEqual(call_args[0][0], "POST")
self.assertEqual(call_args[0][3]["title"], "Test issue")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_creates_on_prgs(self, _auth, mock_api):
mock_api.return_value = {"number": 5, "html_url": "https://gitea.prgs.cc/issues/5"}
result = gitea_create_issue(title="Test", remote="prgs")
self.assertEqual(result["number"], 5)
url = mock_api.call_args[0][1]
self.assertIn("gitea.prgs.cc", url)
self.assertIn("Scaled-Tech-Consulting", url)
# ---------------------------------------------------------------------------
# Create PR
# ---------------------------------------------------------------------------
class TestCreatePR(unittest.TestCase):
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_creates_pr(self, _auth, mock_api):
mock_api.return_value = {"number": 3, "html_url": "https://example.com/pulls/3"}
result = gitea_create_pr(title="feat: X", head="feat/x", base="main")
self.assertEqual(result["number"], 3)
payload = mock_api.call_args[0][3]
self.assertEqual(payload["head"], "feat/x")
self.assertEqual(payload["base"], "main")
# ---------------------------------------------------------------------------
# Close Issue
# ---------------------------------------------------------------------------
class TestCloseIssue(unittest.TestCase):
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_closes_issue(self, _auth, mock_api):
mock_api.return_value = {"state": "closed"}
result = gitea_close_issue(issue_number=42)
self.assertTrue(result["success"])
self.assertIn("42", result["message"])
payload = mock_api.call_args[0][3]
self.assertEqual(payload["state"], "closed")
# ---------------------------------------------------------------------------
# List Issues
# ---------------------------------------------------------------------------
class TestListIssues(unittest.TestCase):
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_returns_formatted_list(self, _auth, mock_api):
mock_api.return_value = [
{
"number": 1, "title": "Bug", "state": "open",
"labels": [{"name": "bug"}],
"assignee": {"login": "alice"},
},
{
"number": 2, "title": "Feature", "state": "open",
"labels": [], "assignee": None,
},
]
result = gitea_list_issues()
self.assertEqual(len(result), 2)
self.assertEqual(result[0]["number"], 1)
self.assertEqual(result[0]["labels"], ["bug"])
self.assertEqual(result[0]["assignee"], "alice")
self.assertEqual(result[1]["assignee"], "")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_passes_label_filter(self, _auth, mock_api):
mock_api.return_value = []
gitea_list_issues(label="important")
url = mock_api.call_args[0][1]
self.assertIn("labels=important", url)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_passes_state_filter(self, _auth, mock_api):
mock_api.return_value = []
gitea_list_issues(state="closed")
url = mock_api.call_args[0][1]
self.assertIn("state=closed", url)
# ---------------------------------------------------------------------------
# View Issue
# ---------------------------------------------------------------------------
class TestViewIssue(unittest.TestCase):
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_returns_full_details(self, _auth, mock_api):
mock_api.return_value = {
"number": 7, "title": "MCP server", "body": "Build it",
"state": "open", "labels": [{"name": "important"}],
"assignee": {"login": "jason"},
"html_url": "https://gitea.prgs.cc/issues/7",
}
result = gitea_view_issue(issue_number=7, remote="prgs")
self.assertEqual(result["number"], 7)
self.assertEqual(result["body"], "Build it")
self.assertEqual(result["labels"], ["important"])
self.assertEqual(result["assignee"], "jason")
# ---------------------------------------------------------------------------
# Mark Issue
# ---------------------------------------------------------------------------
class TestMarkIssue(unittest.TestCase):
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_start_adds_label(self, _auth, mock_api):
# First call: get labels; second call: add label
mock_api.side_effect = [
[{"id": 10, "name": "status:in-progress"}],
[{"name": "status:in-progress"}],
]
result = gitea_mark_issue(issue_number=5, action="start")
self.assertTrue(result["success"])
self.assertIn("claimed", result["message"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_done_removes_label(self, _auth, mock_api):
mock_api.side_effect = [
[{"id": 10, "name": "status:in-progress"}],
None,
]
result = gitea_mark_issue(issue_number=5, action="done")
self.assertTrue(result["success"])
self.assertIn("released", result["message"])
def test_invalid_action_raises(self):
with self.assertRaises(ValueError):
gitea_mark_issue(issue_number=5, action="pause")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_missing_label_raises(self, _auth, mock_api):
mock_api.return_value = [] # no labels exist
with self.assertRaises(RuntimeError):
gitea_mark_issue(issue_number=5, action="start")
# ---------------------------------------------------------------------------
# Auth errors
# ---------------------------------------------------------------------------
class TestAuthErrors(unittest.TestCase):
@patch("mcp_server.get_auth_header", return_value=None)
def test_no_credentials_raises(self, _auth):
with self.assertRaises(RuntimeError):
gitea_create_issue(title="test")
def test_unknown_remote_raises(self):
with self.assertRaises(ValueError):
gitea_create_issue(title="test", remote="nonexistent")
# ---------------------------------------------------------------------------
# Mirror Refs
# ---------------------------------------------------------------------------
class TestMirrorRefs(unittest.TestCase):
@patch("mcp_server.subprocess.run")
def test_dry_run_default(self, mock_run):
mock_run.return_value = MagicMock(
stdout="DRY RUN\n", stderr="", returncode=0
)
result = gitea_mirror_refs()
self.assertEqual(result["return_code"], 0)
# Should NOT have --apply
args = mock_run.call_args[0][0]
self.assertNotIn("--apply", args)
self.assertNotIn("--force", args)
@patch("mcp_server.subprocess.run")
def test_apply_flag(self, mock_run):
mock_run.return_value = MagicMock(
stdout="done\n", stderr="", returncode=0
)
result = gitea_mirror_refs(apply=True)
args = mock_run.call_args[0][0]
self.assertIn("--apply", args)
@patch("mcp_server.subprocess.run")
def test_force_flag(self, mock_run):
mock_run.return_value = MagicMock(
stdout="", stderr="", returncode=0
)
gitea_mirror_refs(apply=True, force=True)
args = mock_run.call_args[0][0]
self.assertIn("--apply", args)
self.assertIn("--force", args)
if __name__ == "__main__":
unittest.main()