diff --git a/mcp_server.py b/mcp_server.py index f67f0c1..f5109ac 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -48,6 +48,74 @@ mcp = FastMCP("gitea-tools", instructions=( )) + +import re + +def extract_linked_issue_numbers(text: str | None, branch_name: str | None = None) -> list[int]: + issues = set() + if text: + pattern = re.compile(r'(?i)(?:close[sd]?|fix(?:e[sd])?|resolve[sd]?|ref[s]?)\s+#(\d+)') + issues.update(int(m) for m in pattern.findall(text)) + if branch_name: + pattern = re.compile(r'(?i)issue-(\d+)') + issues.update(int(m) for m in pattern.findall(branch_name)) + return sorted(list(issues)) + +def release_in_progress_label(issue_numbers: list[int], remote: str, host: str | None, org: str | None, repo: str | None) -> dict: + if not issue_numbers: + return {} + + h, o, r = _resolve(remote, host, org, repo) + auth = _auth(h) + base = repo_api_url(h, o, r) + + try: + labels = api_request("GET", f"{base}/labels?limit=100", auth) + label_id = None + for lb in labels: + if lb["name"] == "status:in-progress": + label_id = lb["id"] + break + except Exception as exc: + return {num: f"error fetching repo labels: {_redact(str(exc))}" for num in issue_numbers} + + results = {} + if label_id is None: + for num in issue_numbers: + results[num] = "not present" + return results + + for num in issue_numbers: + try: + url = f"{base}/issues/{num}" + issue_data = api_request("GET", url, auth) + issue_labels = [lb["name"] for lb in issue_data.get("labels", [])] + + if "status:in-progress" in issue_labels: + with _audited("release_in_progress_label", host=h, remote=remote, org=o, repo=r, issue_number=num, request_metadata={"action": "remove status:in-progress"}): + api_request("DELETE", f"{url}/labels/{label_id}", auth) + results[num] = "released" + else: + results[num] = "not present" + except Exception as exc: + results[num] = f"error: {_redact(str(exc))}" + + return results + +def cleanup_in_progress_for_pr(pr_payload: dict, remote: str, host: str | None, org: str | None, repo: str | None) -> dict: + body = pr_payload.get("body") or "" + title = pr_payload.get("title") or "" + branch = pr_payload.get("head", {}).get("ref") or "" + + text = f"{title}\n{body}" + issues = extract_linked_issue_numbers(text, branch) + + if not issues: + return {"cleanup_status": "no linked issue found"} + + results = release_in_progress_label(issues, remote, host, org, repo) + return {"cleanup_status": results} + # ── Helpers ─────────────────────────────────────────────────────────────────── def _resolve(remote: str, host: str | None, org: str | None, repo: str | None): @@ -1603,63 +1671,3 @@ def gitea_mirror_refs( if __name__ == "__main__": mcp.run(transport="stdio") -import re - -def extract_linked_issue_numbers(text: str | None, branch_name: str | None = None) -> list[int]: - issues = set() - if text: - pattern = re.compile(r'(?i)(?:close[sd]?|fix(?:e[sd])?|resolve[sd]?|ref[s]?)\s+#(\d+)') - issues.update(int(m) for m in pattern.findall(text)) - if branch_name: - pattern = re.compile(r'(?i)issue-(\d+)') - issues.update(int(m) for m in pattern.findall(branch_name)) - return sorted(list(issues)) - -def release_in_progress_label(issue_numbers: list[int], remote: str, host: str | None, org: str | None, repo: str | None) -> dict: - if not issue_numbers: - return {} - - h, o, r = _resolve(remote, host, org, repo) - auth = _auth(h) - base = repo_api_url(h, o, r) - - try: - existing = api_request("GET", f"{base}/labels?limit=100", auth) - name_to_id = {lb["name"]: lb["id"] for lb in existing} - except Exception as exc: - return {num: f"error fetching repo labels: {_redact(str(exc))}" for num in issue_numbers} - - results = {} - for num in issue_numbers: - try: - url = f"{base}/issues/{num}" - issue_data = api_request("GET", url, auth) - labels = [lb["name"] for lb in issue_data.get("labels", [])] - - if "status:in-progress" in labels: - new_labels = [lb for lb in labels if lb != "status:in-progress"] - label_ids = [name_to_id[name] for name in new_labels if name in name_to_id] - - with _audited("release_in_progress_label", host=h, remote=remote, org=o, repo=r, issue_number=num, request_metadata={"action": "remove status:in-progress"}): - api_request("PUT", f"{url}/labels", auth, {"labels": label_ids}) - results[num] = "released" - else: - results[num] = "not present" - except Exception as exc: - results[num] = f"error: {_redact(str(exc))}" - - return results - -def cleanup_in_progress_for_pr(pr_payload: dict, remote: str, host: str | None, org: str | None, repo: str | None) -> dict: - body = pr_payload.get("body") or "" - title = pr_payload.get("title") or "" - branch = pr_payload.get("head", {}).get("ref") or "" - - text = f"{title}\n{body}" - issues = extract_linked_issue_numbers(text, branch) - - if not issues: - return {"cleanup_status": "no linked issue found"} - - results = release_in_progress_label(issues, remote, host, org, repo) - return {"cleanup_status": results} diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index ecedcd8..4e457ac 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -1377,9 +1377,8 @@ class TestTrackerHygieneCleanup(unittest.TestCase): return [{"name": "status:in-progress", "id": 1}, {"name": "bug", "id": 2}] if method == "GET" and "issues/1" in url: return {"labels": [{"name": "status:in-progress"}, {"name": "bug"}]} - if method == "PUT" and "labels" in url: - self.assertEqual(payload["labels"], [2]) - return [] + if method == "DELETE" and "labels/1" in url: + return {} return {} self.mock_api.side_effect = api_side_effect @@ -1396,8 +1395,8 @@ class TestTrackerHygieneCleanup(unittest.TestCase): return [{"name": "status:in-progress", "id": 1}, {"name": "bug", "id": 2}] if method == "GET" and "issues/1" in url: return {"labels": [{"name": "bug"}]} - if method == "PUT" and "labels" in url: - self.fail("Should not PUT labels") + if method == "DELETE" and "labels/1" in url: + self.fail("Should not DELETE labels") return {} self.mock_api.side_effect = api_side_effect @@ -1426,9 +1425,8 @@ class TestTrackerHygieneCleanup(unittest.TestCase): return [{"name": "status:in-progress", "id": 1}, {"name": "bug", "id": 2}] if method == "GET" and "issues/123" in url: return {"labels": [{"name": "status:in-progress"}, {"name": "bug"}]} - if method == "PUT" and "labels" in url: - self.assertEqual(payload["labels"], [2]) - return [] + if method == "DELETE" and "labels/1" in url: + return {} return {} self.mock_api.side_effect = api_side_effect @@ -1457,9 +1455,8 @@ class TestTrackerHygieneCleanup(unittest.TestCase): return [{"name": "status:in-progress", "id": 1}, {"name": "bug", "id": 2}] if method == "GET" and "issues/123" in url: return {"labels": [{"name": "status:in-progress"}, {"name": "bug"}]} - if method == "PUT" and "labels" in url: - self.assertEqual(payload["labels"], [2]) - return [] + if method == "DELETE" and "labels/1" in url: + return {} return {} self.mock_api.side_effect = api_side_effect @@ -1482,9 +1479,8 @@ class TestTrackerHygieneCleanup(unittest.TestCase): return [{"name": "status:in-progress", "id": 1}] if method == "GET" and "issues/123" in url: return {"labels": [{"name": "status:in-progress"}]} - if method == "PUT" and "labels" in url: - self.assertEqual(payload["labels"], []) - return [] + if method == "DELETE" and "labels/1" in url: + return {} if method == "POST" and "comments" in url: return {} return {} @@ -1513,9 +1509,8 @@ class TestTrackerHygieneCleanup(unittest.TestCase): return {"labels": [{"name": "status:in-progress"}]} if method == "GET" and "issues/125" in url: return {"labels": []} - if method == "PUT" and "labels" in url: - self.assertEqual(payload["labels"], []) - return [] + if method == "DELETE" and "labels/1" in url: + return {} if method == "POST" and "comments" in url: return {} return {} @@ -1553,7 +1548,7 @@ class TestTrackerHygieneCleanup(unittest.TestCase): return [{"name": "status:in-progress", "id": 1}] if method == "GET" and "issues/1" in url: return {"labels": [{"name": "status:in-progress"}]} - if method == "PUT" and "labels" in url: + if method == "DELETE" and "labels/1" in url: raise RuntimeError("API failure") return {} self.mock_api.side_effect = api_side_effect