13 Commits

Author SHA1 Message Date
sysadmin 349bc06da7 Merge pull request 'feat: add Gitea issue comment list/create MCP tools (#126)' (#127) from feat/issue-126-issue-comment-tools into master 2026-07-03 18:40:38 -05:00
sysadmin 5aeb51f132 feat: add Gitea issue comment list/create MCP tools (#126)
Add gitea_list_issue_comments and gitea_create_issue_comment so
discussion/design workflows can read and post issue comments through
the MCP layer instead of direct API scripts.

- List requires gitea.read; create requires gitea.issue.comment —
  gated separately from the gitea.pr.* review/merge family, fail closed.
- Issue comments never touch PR review endpoints.
- LLM-safe output: comment id/author/timestamps/body only; web links
  appear solely under the GITEA_MCP_REVEAL_ENDPOINTS admin opt-in.
- Create operations are audit-logged (create_issue_comment) and errors
  are redacted before being raised.
- Tests cover list/create success, permission blocks (including PR
  review permissions not granting issue comments), forbidden-overrides,
  empty body, missing issue with redacted error, endpoint separation,
  and reveal opt-in.
- Document issue comments versus PR reviews in
  docs/gitea-execution-profiles.md.

Closes #126

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 19:07:36 -04:00
sysadmin 9c44fd6b27 Merge pull request 'feat: extend whoami and profile metadata for environments (#104)' (#124) from feat/issue-104-whoami-metadata into master 2026-07-03 17:15:33 -05:00
sysadmin e880a210ec feat: extend whoami and profile metadata for environments (#104)
Closes #104
2026-07-03 18:11:35 -04:00
sysadmin 79450b57f5 Merge pull request 'feat(config): add v1-to-v2 profiles.json migration helper (#105)' (#123) from feat/issue-105-profiles-migration into master 2026-07-03 17:05:05 -05:00
sysadmin 23aa2fb192 fix: harden profiles migration helper 2026-07-03 17:53:07 -04:00
sysadmin 9f75e28094 Merge pull request 'docs: re-land release version SOP with v1.1.0 audit lessons (#111)' (#119) from docs/issue-111-release-version-sop into master 2026-07-03 16:09:40 -05:00
sysadmin c063842b2e Merge pull request 'feat: operation-name normalization table with fail-closed enforcement (#106)' (#122) from feat/issue-106-op-normalization into master 2026-07-03 15:24:34 -05:00
sysadmin cd633e2c2b feat(config): add v1-to-v2 profiles.json migration helper (#105) 2026-07-03 04:24:45 -04:00
sysadmin e0861bcb03 feat: operation-name normalization table with fail-closed enforcement (#106)
Promote the #103 minimal alias map to the documented public table
GITEA_OPERATION_ALIASES and add the #106 enforcement layer:

- normalize_operation(op, service): canonical namespaced names; legacy
  spellings accepted only via the explicit table; unknown, ambiguous,
  and cross-service names fail closed.
- check_operation(op, allowed, forbidden, service): normalizes BOTH the
  requested operation and the profile lists before any membership
  check; forbidden always overrides allowed; unnormalizable allowed
  entries grant nothing and unnormalizable forbidden entries deny the
  request, so normalization can never silently widen permissions;
  empty/missing allowed list denies everything.
- gitea_check_pr_eligibility now routes its capability check through
  check_operation, fixing the mismatch where canonical namespaced
  profile ops (gitea.pr.merge) never matched the raw action (merge)
  and namespaced forbidden entries were never enforced.
- Document the normalization table and enforcement rules in
  docs/gitea-execution-profiles.md, replacing the stale 'enforcement
  out of scope' caveat.
- tests/test_op_normalization.py: full #106 matrix (27 tests) —
  qualified/legacy allowed and forbidden, unknown, ambiguous, service
  mismatch, forbidden-overrides-allowed, empty/missing allowed,
  duplicates after normalization, no permission widening, and
  eligibility integration proving normalization happens before
  enforcement. Existing v1/env unqualified behaviour stays compatible.

Closes #106

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 03:35:03 -04:00
sysadmin 9d6a2e0a5f docs: re-land release version SOP with v1.1.0 audit lessons (#111) 2026-07-03 03:24:26 -04:00
sysadmin 205f089c44 Merge pull request 'feat: profiles.json v2 contexts shape with enabled enforcement and LLM-safe output (#120)' (#121) from feat/issue-120-profiles-v2-contexts into master 2026-07-03 01:36:21 -05:00
sysadmin ff920a6496 feat: load profiles.json v2 contexts shape with enabled enforcement and LLM-safe output (#120)
Support the canonical contexts-shape version 2 config (contexts / profiles /
projects / rules) alongside the existing environments shape and v1:

- Require a boolean 'enabled' on every context, profile, service, and
  project. Disabled entries are surfaced in audits but fail closed at
  selection/resolution — never a silent fallback to another profile,
  service, or credential source.
- Resolve the active identity from GITEA_MCP_PROFILE via the existing
  select_profile path; profile base_url falls back to the context's enabled
  gitea block.
- Add resolve_service() and project_for_path() for context service and
  project-to-context resolution (internal use; fail closed on disabled).
- get_auth_header now propagates ConfigError when GITEA_MCP_CONFIG is set
  instead of silently degrading to Basic auth.
- Hide endpoint URLs and keychain ids from normal LLM-facing output:
  gitea_whoami / gitea_get_profile report logical names and auth status
  only; new gitea_audit_config tool reports enabled/disabled state and safe
  one-line service summaries. The GITEA_MCP_REVEAL_ENDPOINTS opt-in (and
  'python3 gitea_config.py audit --reveal-endpoints' locally) restores
  endpoints and auth source names for admin diagnostics; token values are
  never printed on any path.
- Ship gitea-mcp.v2-contexts.example.json (synthetic values) and validate
  it in tests.

Implements #120

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 02:19:39 -04:00
14 changed files with 2759 additions and 45 deletions
+1
View File
@@ -6,6 +6,7 @@ __pycache__/
# Real JSON runtime-profile configs may reference private hosts; keep only the example. # Real JSON runtime-profile configs may reference private hosts; keep only the example.
gitea-mcp*.json gitea-mcp*.json
!gitea-mcp.example.json !gitea-mcp.example.json
!gitea-mcp.v2-contexts.example.json
.vscode/ .vscode/
graphify-out/ graphify-out/
branches/ branches/
+68 -2
View File
@@ -134,8 +134,74 @@ Rules:
appears in both, it is forbidden. appears in both, it is forbidden.
- An operation not present in `allowed_operations` is treated as **not - An operation not present in `allowed_operations` is treated as **not
allowed** (deny by default). allowed** (deny by default).
- These categories are descriptive for this issue. Their runtime enforcement is
out of scope here (see roadmap links). ## Operation-name normalization (#106)
Canonical operation names are namespaced: `{service}.{area}.{verb}` (e.g.
`gitea.pr.merge`, `jenkins.build.read`). Legacy unqualified spellings are
accepted **only** through the explicit alias table below (the code of record
is `GITEA_OPERATION_ALIASES` in `gitea_config.py`; the enforcement matrix is
`tests/test_op_normalization.py`).
| Legacy spelling | Canonical operation |
|-------------------|----------------------------|
| `read` | `gitea.read` |
| `review` | `gitea.pr.review` |
| `comment` | `gitea.pr.comment` |
| `approve` | `gitea.pr.approve` |
| `request_changes` | `gitea.pr.request_changes` |
| `merge` | `gitea.pr.merge` |
| `pr.create` | `gitea.pr.create` |
| `branch.push` | `gitea.branch.push` |
| `branch` | `gitea.branch.create` |
| `commit` | `gitea.repo.commit` |
| `push` | `gitea.branch.push` |
| `open_pr` | `gitea.pr.create` |
For non-Gitea services, a single unqualified word namespaces to the checked
service (`read``jenkins.read` when checking Jenkins); names already
prefixed with that service pass through unchanged.
Enforcement rules (`gitea_config.check_operation`, run **before** any
allowed/forbidden membership check):
- Unknown operation names fail closed (denied).
- Ambiguous names — dotted names that are neither service-prefixed nor in the
alias table — fail closed.
- Cross-service names are never accepted by the wrong service
(`jenkins.read` never matches a Gitea check, and a Gitea alias is never
applied to another service).
- `forbidden_operations` overrides `allowed_operations` after both sides are
normalized, so a legacy spelling can never bypass a canonical forbidden
entry (or vice versa).
- An allowed entry that cannot be normalized grants nothing; a forbidden
entry that cannot be normalized denies the request. Normalization can
therefore never silently widen permissions.
- An empty or missing `allowed_operations` list denies everything.
## Issue comments versus PR reviews (#126)
Issue discussion comments and PR reviews are different capabilities and are
gated by different operations:
- **Issue comments** (`gitea_list_issue_comments`, `gitea_create_issue_comment`)
post to and read from an issue's discussion thread
(`/issues/{n}/comments`). Listing requires `gitea.read`; creating requires
`gitea.issue.comment`. They never submit review verdicts.
- **PR reviews** (`gitea_review_pr`, `gitea_submit_pr_review`) submit
approve/request-changes/comment verdicts on pull requests
(`/pulls/{n}/reviews`) and are gated by the `gitea.pr.*` family
(`gitea.pr.review`, `gitea.pr.approve`, `gitea.pr.request_changes`,
`gitea.pr.comment`).
A profile holding the full PR review/merge set still cannot post issue
discussion comments unless it also allows `gitea.issue.comment`, and vice
versa — neither family implies the other. Both comment tools require an
explicit issue number; the target repo comes only from the standard
remote/org/repo arguments. Create operations are audit-logged
(`create_issue_comment`) when `GITEA_AUDIT_LOG` is configured, errors are
redacted, and normal output contains no endpoint URLs
(`GITEA_MCP_REVEAL_ENDPOINTS=1` is the local admin opt-in for web links).
## Identity and fail-closed rules ## Identity and fail-closed rules
+3
View File
@@ -423,6 +423,9 @@ with the profile and authenticated user when `GITEA_AUDIT_LOG` is set (see
## Releases and version tags ## Releases and version tags
All release tagging, version bumps, and validation must comply with the [Release / Version Process SOP](release-version-sop.md).
Versions follow SemVer — **`vMAJOR.MINOR.PATCH`**, using **`v0.x.y`** while Versions follow SemVer — **`vMAJOR.MINOR.PATCH`**, using **`v0.x.y`** while
unstable. Pick the bump by the largest change since the last tag: unstable. Pick the bump by the largest change since the last tag:
+189
View File
@@ -0,0 +1,189 @@
# Release / Version Process SOP
Operator standard operating procedure for cutting a versioned release of
Gitea-Tools: version bump, checks, merge, tag, and cleanup.
> **Scope.** This is the **human/operator** SOP. It is deliberately distinct
> from [`release-workflows.md`](release-workflows.md), which describes the
> **future `release-mcp` orchestrator** boundary (a coordination concept), not
> the day-to-day tagging process. When they disagree, this document governs how
> a release is actually cut today.
---
## 1. Branch flow
The repo is **`master`-based**. Releases are cut from `master`; there is no
separate `dev`/`release` branch unless and until that is explicitly introduced
and this SOP is updated to match. All work lands on `master` via reviewed PRs
from short-lived, issue-linked branches (e.g. `docs/issue-68-...`).
## 2. Where "the version" lives
There is **no `VERSION` file and no `CHANGELOG` file** in the repo today. The
released version is expressed **only as an annotated git tag** of the form
`vMAJOR.MINOR.PATCH` (existing tags: `v1.0.0`, `v1.0.1`). Release notes are
carried as the **annotated tag's message** (via `--notes-file`), not a tracked
changelog.
> Do **not** confuse this with `SUPPORTED_VERSION` in `gitea_config.py` — that is
> the **config-schema** version, unrelated to the application release version.
If a `VERSION`/`CHANGELOG` file is added later, update this SOP to list it under
"files to update".
## 3. Deciding the version bump (SemVer)
Pick the bump against the last tag using semantic-versioning intent:
* **PATCH** (`v1.0.1 → v1.0.2`): bug fixes, docs, tests, internal cleanups — no
change to tool names, parameters, return payloads, or behavior.
* **MINOR** (`v1.0.1 → v1.1.0`): backward-compatible additions — new MCP tool,
new optional parameter, new script, additive behavior.
* **MAJOR** (`v1.1.0 → v2.0.0`): backward-**incompatible** changes — renamed or
removed tools, changed return-payload shape, changed default behavior, or a
tightened safety gate that rejects previously-accepted input.
When unsure between two levels, choose the higher one.
## 4. Preparing a version-bump / release PR
Releases are still gated by the normal issue-first, PR-reviewed flow.
1. Open (or use) a tracking issue for the release and **claim it** with
`status:in-progress` (see §9).
2. Create an isolated, issue-linked branch + worktree from latest `master`
(e.g. `chore/issue-63-v1.1.0`). Never commit directly to `master`.
3. Include in the PR:
* Any code/docs changes that belong to the release.
* The **release notes** for the annotated tag (draft them in the PR body or a
notes file you will pass to `scripts/release-tag --notes-file`).
* If a `VERSION`/`CHANGELOG` file exists at that time, its update.
4. Open the PR **targeting `master`**.
The tag is **not** created in the PR. Tagging happens only after merge (§6).
## 5. Required checks before release
Run all of these green before merging the release PR and before tagging:
```bash
python3 -m py_compile mcp_server.py
python3 -m py_compile manage_labels.py
bash -n scripts/clear-provenance
./venv/bin/python -m pytest tests/ -q
git diff --check
```
Plus a secret sweep (there is no third-party scanner wired in; do a staged-diff
sweep — see [`developer-testing-guidelines.md`](developer-testing-guidelines.md)
§7):
```bash
git diff --cached | grep -nEi "authorization: (basic|bearer)|password[:=]|token=[A-Za-z0-9]" || echo "clean"
```
`scripts/release-tag` **also** runs the test suite itself before tagging (unless
`--skip-tests` is passed), so tests are enforced twice by default.
## 6. Running `scripts/release-tag`
Tag **only after** the release PR is merged to `master`. `scripts/release-tag`
enforces the tagging policy and is **safe by default** (creates nothing on a
dry-run; never pushes without `--push`).
Before it tags, it requires **all** of:
* version matches `vMAJOR.MINOR.PATCH` (SemVer);
* `fetch --prune` has run;
* you are **on `master`**;
* the worktree is **clean** (no uncommitted changes);
* local `master` **equals** `<remote>/master`;
* `HEAD` is that same commit (the commit is present on remote master);
* the tag does **not** already exist locally or on the remote;
* the test suite passes (unless `--skip-tests`, which warns).
Typical sequence:
```bash
# 1. Dry-run to confirm the plan (changes nothing)
scripts/release-tag --dry-run v1.1.0
# 2. Create the annotated tag locally, with release notes
scripts/release-tag v1.1.0 --notes-file /path/to/release-notes.md
# 3. Push the tag only when ready
scripts/release-tag v1.1.0 --notes-file /path/to/release-notes.md --push
```
Env injection points (mainly for CI/tests):
`RELEASE_TAG_REMOTE` (default `prgs`), `RELEASE_TAG_TEST_CMD`
(default `./venv/bin/python -m pytest tests/ -q`).
## 7. Who may merge / tag
* The release PR must be **merged by someone other than its author** — the
author-cannot-merge safety gate applies to releases exactly as to any other PR.
* Merge uses the gated `gitea_merge_pr` workflow; CLI/legacy merge is disabled.
* Whoever tags must operate on clean master synced to the remote (enforced by
`scripts/release-tag`). Tagging is an operator action performed after merge.
## 8. Self-review / self-merge restrictions
Release PRs are **not** exempt from the safety model:
* No self-review — the author may not approve their own release PR.
* No self-merge — a different eligible identity merges.
* These gates are enforced by the MCP tooling and must not be bypassed.
## 9. Handling `status:in-progress` during release work
* **Claim** the release tracking issue with `status:in-progress` before starting.
* Keep it claimed while the release PR is open and under review.
* On merge/close, the tracker-hygiene automation releases `status:in-progress`
for issues the PR closes; if it remains after the release lands, release it
explicitly. Do not leave a shipped release issue marked in-progress.
## 10. Branch / worktree cleanup after merge
After the release PR merges and the tag is pushed:
* Delete the remote release branch (if repo policy allows).
* Remove the local worktree and delete the local branch:
```bash
git worktree remove branches/<release-worktree>
git branch -d <release-branch>
git worktree prune
```
* Confirm the root repo is clean and on `master` synced to the remote.
## 11. What NOT to do
* **No direct commits to `master`.** All changes land via reviewed PRs.
* **No force-push** (to `master` or to tags).
* **No self-merge** of a release PR.
* **No tagging before merge** — tag only commits already on remote `master`.
* **No release from a dirty worktree** — `scripts/release-tag` refuses, and so
should you.
* **No `--skip-tests`** for a real release unless there is an explicit,
documented reason.
* **No re-tagging / moving an existing tag** — pick the next version instead.
## 12. Post-Merge Verification & Audit Lessons (v1.1.0)
During the v1.1.0 release audit, we identified a critical reconciliation issue (captured in historical PRs/issues #68 and #82):
* **The "Closed" State Trap:** Gitea PRs marked as `closed` are not guaranteed to be `merged` (they can be closed without merging, leading to silent omissions of code/documentation changes).
* **Mandatory Post-Merge File/Commit Presence Probe:** Reviewers/mergers must perform explicit post-merge validation. Do not assume a merge succeeded.
- Check that the merged branch head is an ancestor of the target branch (`master`):
```bash
git fetch <remote> --prune
git merge-base --is-ancestor <pr-head-sha> <remote>/master
```
- Probe file presence for expected modifications/additions:
```bash
git log --oneline -- <expected-file>
# and confirm file presence:
ls -la docs/release-version-sop.md
```
* **Verify in Handoff:** Final report blocks must explicitly document the verification method and probe results.
+80
View File
@@ -0,0 +1,80 @@
{
"version": 2,
"contexts": {
"example-context": {
"enabled": true,
"label": "Example environment",
"description": "One deployment environment: its Gitea plus non-Gitea services.",
"default_owner": "Example-Org",
"gitea": {
"enabled": true,
"kind": "gitea",
"base_url": "https://gitea.example.invalid"
},
"services": {
"jenkins": {
"enabled": true,
"kind": "jenkins",
"label": "Example Jenkins",
"base_url": "https://jenkins.example.invalid",
"auth": { "type": "keychain", "id": "example-jenkins-token" },
"capabilities": ["read"]
},
"glitchtip": {
"enabled": false,
"kind": "glitchtip",
"label": "Example GlitchTip (disabled: defined but unavailable)",
"base_url": "",
"auth": { "type": "keychain", "id": "example-glitchtip-token" },
"capabilities": ["read"],
"allow_raw_events": false
}
}
}
},
"profiles": {
"example-author": {
"enabled": true,
"context": "example-context",
"role": "author",
"username": "author-user",
"execution_profile": "example-author",
"audit_label": "example-author",
"auth": { "type": "keychain", "id": "example-gitea-author-token" },
"allowed_operations": ["read", "branch", "commit", "push", "open_pr", "comment"],
"forbidden_operations": ["approve", "request_changes", "merge"]
},
"example-reviewer": {
"enabled": true,
"context": "example-context",
"role": "reviewer",
"username": "reviewer-user",
"execution_profile": "example-reviewer",
"audit_label": "example-reviewer",
"auth": { "type": "keychain", "id": "example-gitea-reviewer-token" },
"allowed_operations": ["read", "review", "comment", "approve", "request_changes", "merge"],
"forbidden_operations": ["branch", "commit", "push", "open_pr"]
}
},
"projects": {
"/absolute/path/to/local/repo": {
"enabled": true,
"context": "example-context",
"default_owner": "Example-Org",
"default_repo": "Example-Repo",
"default_author_profile": "example-author",
"default_reviewer_profile": "example-reviewer"
}
},
"rules": {
"disabled_behavior": "Defined but unavailable for action. MCP tools may report disabled entries during audits, but must not use them automatically.",
"no_silent_fallback": true,
"tokens_in_json": false,
"token_storage": "keychain",
"identity_must_match_task": true,
"same_username_cannot_review_own_pr": true,
"hide_service_urls_from_llm": true,
"hide_keychain_ids_from_llm": true,
"mcp_resolves_endpoints": true
}
}
+24 -4
View File
@@ -123,13 +123,17 @@ def get_auth_header(host):
token = os.environ.get("GITEA_TOKEN") token = os.environ.get("GITEA_TOKEN")
# 3. Fall back to a JSON runtime-profile token reference (token_env). # 3. Fall back to a JSON runtime-profile token reference (token_env).
# Explicit env tokens above take precedence. A broken config never breaks # Explicit env tokens above take precedence. When GITEA_MCP_CONFIG is
# auth here — it fails closed to "no token"; the clear error surfaces via # configured, a broken config or unresolvable profile/credential fails
# get_profile() / startup instead. # closed here (no silent fallback to Basic auth or another source,
# #120). Without a configured JSON layer, env-only behaviour is
# unchanged.
if not token: if not token:
try: try:
token = gitea_config.resolve_token(gitea_config.resolve_profile()) token = gitea_config.resolve_token(gitea_config.resolve_profile())
except gitea_config.ConfigError: except gitea_config.ConfigError:
if gitea_config.config_path():
raise
token = None token = None
if token: if token:
@@ -469,13 +473,29 @@ def get_profile():
token_source = (os.environ.get("GITEA_TOKEN_SOURCE") or "").strip() \ token_source = (os.environ.get("GITEA_TOKEN_SOURCE") or "").strip() \
or gitea_config.auth_source_name(jp) or gitea_config.auth_source_name(jp)
base_url = os.environ.get("GITEA_BASE_URL") or jp.get("base_url") or None base_url = os.environ.get("GITEA_BASE_URL") or jp.get("base_url") or None
auth_type = None
if isinstance(jp.get("auth"), dict):
auth_type = jp["auth"].get("type")
elif token_source:
if token_source.startswith("keychain:"):
auth_type = "keychain"
else:
auth_type = "env"
return { return {
"profile_name": name, "profile_name": name,
"allowed_operations": ops, "allowed_operations": ops,
"forbidden_operations": forbidden, "forbidden_operations": forbidden,
"audit_label": audit_label, "audit_label": audit_label,
"token_source_name": token_source, "token_source_name": token_source,
"auth_source_type": auth_type,
"base_url": base_url, "base_url": base_url,
"username": jp.get("username") or None, "username": jp.get("username") or None,
"default_owner": jp.get("default_owner") or None, "default_owner": jp.get("default_owner") or None,
} "profile_path": jp.get("profile_path") or None,
"environment": jp.get("environment") or None,
"service": jp.get("service") or None,
"identity": jp.get("identity") or None,
"role": jp.get("role") or None,
"execution_profile": jp.get("execution_profile") or None,
}
+453 -17
View File
@@ -70,11 +70,13 @@ _TBD_RE = re.compile(r"(?i)^tbd(-|$)")
# Keys that would mean an inline secret wherever they appear. # Keys that would mean an inline secret wherever they appear.
_INLINE_SECRET_KEYS = ("token", "password", "secret") _INLINE_SECRET_KEYS = ("token", "password", "secret")
# ── Minimal operation normalization (#103) ───────────────────────────────────── # ── Operation-name normalization table (#106; minimal subset landed in #103) ───
# Only what the #103 invariants need. The full normalization table, deprecation # Canonical operations are namespaced ({service}.{area}.{verb}). Legacy
# handling, and enforcement test matrix belong to issue #106 — do not grow this # unqualified spellings are accepted ONLY through this explicit table — never
# beyond invariant safety here. # by guessing. The same table is the documentation of record (see
_MINIMAL_GITEA_OP_MAP = { # docs/gitea-execution-profiles.md) and is exercised by
# tests/test_op_normalization.py.
GITEA_OPERATION_ALIASES = {
"read": "gitea.read", "read": "gitea.read",
"review": "gitea.pr.review", "review": "gitea.pr.review",
"comment": "gitea.pr.comment", "comment": "gitea.pr.comment",
@@ -83,32 +85,94 @@ _MINIMAL_GITEA_OP_MAP = {
"merge": "gitea.pr.merge", "merge": "gitea.pr.merge",
"pr.create": "gitea.pr.create", "pr.create": "gitea.pr.create",
"branch.push": "gitea.branch.push", "branch.push": "gitea.branch.push",
# Contexts-shape author verbs (#120) — the invariant checks below depend on
# "push"/"open_pr" normalizing to the two author-only ops.
"branch": "gitea.branch.create",
"commit": "gitea.repo.commit",
"push": "gitea.branch.push",
"open_pr": "gitea.pr.create",
} }
_REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"}) _REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"})
_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"}) _AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"})
def _normalize_op(service, op, addr): def normalize_operation(op, service="gitea"):
"""Normalize *op* for *service*, or fail closed (#103 minimal subset). """Return the canonical namespaced name for *op*, or fail closed (#106).
- already namespaced for this service (``{service}.*``) → unchanged - already namespaced for this service (``{service}.*``) → unchanged
- known unqualified Gitea ops → mapped via ``_MINIMAL_GITEA_OP_MAP`` - known unqualified Gitea ops → mapped via ``GITEA_OPERATION_ALIASES``
- unqualified single-word ops on non-Gitea services → ``{service}.{op}`` - unqualified single-word ops on non-Gitea services → ``{service}.{op}``
- anything else (foreign prefixes, unknown unqualified names) → ConfigError - anything else foreign service prefixes, dotted names outside the
table, unknown unqualified names — is unknown or ambiguous → ConfigError
Normalization never crosses services (a Gitea alias is never applied to
another service) and never widens permissions: an operation that cannot
be normalized grants and matches nothing.
""" """
if not isinstance(op, str) or not op: if not isinstance(op, str) or not op:
raise ConfigError(f"identity '{addr}' has an empty or non-string operation") raise ConfigError("operation must be a non-empty string (fail closed)")
if op.startswith(service + "."): if op.startswith(service + "."):
return op return op
if service == "gitea" and op in _MINIMAL_GITEA_OP_MAP: if service == "gitea" and op in GITEA_OPERATION_ALIASES:
return _MINIMAL_GITEA_OP_MAP[op] return GITEA_OPERATION_ALIASES[op]
if service != "gitea" and "." not in op: if service != "gitea" and "." not in op:
return f"{service}.{op}" return f"{service}.{op}"
raise ConfigError( raise ConfigError(
f"identity '{addr}' has operation {op!r} that cannot be normalized " f"operation {op!r} cannot be normalized safely for service "
f"safely for service '{service}' (fail closed; full table is issue #106)" f"'{service}' (unknown, ambiguous, or cross-service; fail closed)"
) )
def check_operation(op, allowed, forbidden=(), service="gitea"):
"""Decide whether *op* is permitted. Returns ``(bool, reason)`` (#106).
Everything is normalized via :func:`normalize_operation` BEFORE any
membership check, so legacy and canonical spellings always compare equal.
Reasons: ``allowed``, ``invalid-operation``, ``invalid-forbidden-entry``,
``forbidden``, ``no-allowed-operations``, ``not-allowed``.
Fail-closed rules:
- an *op* that cannot be normalized is denied (``invalid-operation``)
- a forbidden entry that cannot be normalized denies the request
(``invalid-forbidden-entry``) — dropping it would silently narrow the
forbidden set, i.e. widen permissions
- an allowed entry that cannot be normalized is ignored — it grants
nothing, so permissions never widen
- ``forbidden`` always overrides ``allowed``
- an empty or missing allowed list denies everything
"""
try:
op_n = normalize_operation(op, service)
except ConfigError:
return (False, "invalid-operation")
forbidden_n = set()
for entry in (forbidden or ()):
try:
forbidden_n.add(normalize_operation(entry, service))
except ConfigError:
return (False, "invalid-forbidden-entry")
if op_n in forbidden_n:
return (False, "forbidden")
if not allowed:
return (False, "no-allowed-operations")
allowed_n = set()
for entry in allowed:
try:
allowed_n.add(normalize_operation(entry, service))
except ConfigError:
continue
if op_n in allowed_n:
return (True, "allowed")
return (False, "not-allowed")
def _normalize_op(service, op, addr):
"""Normalize *op* for identity *addr*, or fail closed with context."""
try:
return normalize_operation(op, service)
except ConfigError as exc:
raise ConfigError(f"identity '{addr}': {exc}") from None
# Default canonical config location (one file shared by all LLM launchers). # Default canonical config location (one file shared by all LLM launchers).
DEFAULT_CONFIG_PATH = os.path.join( DEFAULT_CONFIG_PATH = os.path.join(
os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json" os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json"
@@ -169,7 +233,7 @@ def load_config(path=None):
f"expected one of {list(SUPPORTED_VERSIONS)}" f"expected one of {list(SUPPORTED_VERSIONS)}"
) )
if version == 2: if version == 2:
return _load_v2(data, path) return _load_v2_any(data, path)
if version != SUPPORTED_VERSION: if version != SUPPORTED_VERSION:
raise ConfigError( raise ConfigError(
f"{path} has unsupported version {version!r}; " f"{path} has unsupported version {version!r}; "
@@ -345,6 +409,363 @@ def _load_v2(data, path):
} }
# ── profiles.json version 2 *contexts* shape (#120) ───────────────────────────
# The canonical machine config groups everything by context: top-level
# "contexts" (each with a gitea block and non-Gitea "services"), flat
# "profiles" (Gitea identities pointing at a context), "projects" (local repo
# paths mapped to a context), and "rules". Every context/profile/service/
# project carries a required boolean "enabled": disabled entries are surfaced
# in audits but fail closed at selection — never a silent fallback. Loading
# flattens profiles into the same {"profiles": {...}, "unavailable": {...}}
# model v1 consumers and select_profile() already understand, and carries the
# validated "contexts"/"projects"/"rules" through for service resolution.
def _load_v2_any(data, path):
"""Dispatch a version-2 file to its shape loader; ambiguity fails closed."""
has_contexts = "contexts" in data
has_environments = "environments" in data
if has_contexts and has_environments:
raise ConfigError(
f"{path} version 2 config must not mix 'contexts' and "
"'environments' shapes (ambiguous; fail closed)"
)
if has_contexts:
return _load_v2_contexts(data, path)
return _load_v2(data, path)
def _require_enabled(kind, name, obj):
"""Return the required boolean ``enabled`` flag, failing closed."""
enabled = obj.get("enabled")
if not isinstance(enabled, bool):
raise ConfigError(
f"{kind} '{name}' requires a boolean 'enabled' flag (fail closed)"
)
return enabled
def _reject_inline_secrets(kind, name, obj):
for key in _INLINE_SECRET_KEYS:
if key in obj:
raise ConfigError(
f"{kind} '{name}' must not contain an inline '{key}'; "
"store secrets in the keychain and reference them by id"
)
def _validate_context_service(ctx_name, svc_name, svc):
"""Validate one context service entry (auth reference only, no secrets)."""
addr = f"{ctx_name}.{svc_name}"
if not isinstance(svc, dict):
raise ConfigError(f"service '{addr}' must be a JSON object")
_require_enabled("service", addr, svc)
_reject_inline_secrets("service", addr, svc)
if "auth" in svc:
_validate_auth(addr, svc["auth"])
def _load_v2_contexts(data, path):
"""Validate a v2 contexts-shape config and return the resolvable structure."""
contexts = data.get("contexts")
if not isinstance(contexts, dict) or not contexts:
raise ConfigError(
f"{path} version 2 contexts config requires a non-empty "
"'contexts' object"
)
for ctx_name, ctx in contexts.items():
if not _PROFILE_NAME_RE.match(ctx_name or ""):
raise ConfigError(f"invalid context name {ctx_name!r}")
if not isinstance(ctx, dict):
raise ConfigError(f"context '{ctx_name}' must be a JSON object")
_require_enabled("context", ctx_name, ctx)
gitea = ctx.get("gitea")
if gitea is not None:
if not isinstance(gitea, dict):
raise ConfigError(
f"context '{ctx_name}' has a non-object 'gitea' block")
_require_enabled("service", f"{ctx_name}.gitea", gitea)
_reject_inline_secrets("service", f"{ctx_name}.gitea", gitea)
services = ctx.get("services") or {}
if not isinstance(services, dict):
raise ConfigError(
f"context '{ctx_name}' has a non-object 'services' block")
for svc_name, svc in services.items():
_validate_context_service(ctx_name, svc_name, svc)
raw_profiles = data.get("profiles")
if not isinstance(raw_profiles, dict) or not raw_profiles:
raise ConfigError(
f"{path} version 2 contexts config requires a non-empty "
"'profiles' object"
)
profiles = {}
unavailable = {}
for name, raw in raw_profiles.items():
if not is_valid_profile_name(name):
raise ConfigError(f"invalid profile name {name!r}")
if not isinstance(raw, dict):
raise ConfigError(f"profile '{name}' must be a JSON object")
enabled = _require_enabled("profile", name, raw)
_reject_inline_secrets("profile", name, raw)
_validate_identity_auth(name, raw.get("auth"))
ctx_name = raw.get("context")
if ctx_name not in contexts:
raise ConfigError(
f"profile '{name}' references unknown context {ctx_name!r}")
context = contexts[ctx_name]
allowed = raw.get("allowed_operations") or []
forbidden = raw.get("forbidden_operations") or []
if not isinstance(allowed, list) or not isinstance(forbidden, list):
raise ConfigError(f"profile '{name}' operation fields must be lists")
allowed_n = {_normalize_op("gitea", op, name) for op in allowed}
forbidden_n = {_normalize_op("gitea", op, name) for op in forbidden}
# Reviewer-identity deadlock rule (#100/#103) applies here unchanged.
if allowed_n & _REVIEW_MERGE_OPS:
missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n)
if missing:
raise ConfigError(
f"profile '{name}' allows PR approve/merge but does not "
f"forbid {missing}; reviewer identities must forbid "
"gitea.pr.create and gitea.branch.push "
"(reviewer-identity deadlock rule)"
)
profile = dict(raw)
profile["allowed_operations"] = sorted(allowed_n)
profile["forbidden_operations"] = sorted(forbidden_n)
gitea = context.get("gitea") or {}
if not profile.get("base_url") and gitea.get("enabled"):
profile["base_url"] = gitea.get("base_url")
username = profile.get("username") or ""
if not enabled:
unavailable[name] = (
f"profile '{name}' is disabled (enabled: false); defined but "
"unavailable for action — refusing, no fallback"
)
elif not context.get("enabled"):
unavailable[name] = (
f"profile '{name}' belongs to context '{ctx_name}' which is "
"disabled (enabled: false); refusing, no fallback"
)
elif not profile.get("base_url"):
unavailable[name] = (
f"profile '{name}' has no usable base_url (none set and the "
f"context '{ctx_name}' gitea service is disabled or has none); "
"fail closed"
)
elif _TBD_RE.match(username):
unavailable[name] = (
f"profile '{name}' username {username!r} is a TBD placeholder; "
"provision the account before use (fail closed)"
)
else:
profiles[name] = profile
continue
# Unavailable profiles keep their (secret-free) body for audits only.
profile["_unavailable_reason"] = unavailable[name]
profiles.setdefault("_audit_only", {})
profiles["_audit_only"][name] = profile
projects = data.get("projects") or {}
if not isinstance(projects, dict):
raise ConfigError(f"{path} 'projects' must be a JSON object")
for proj_path, proj in projects.items():
if not isinstance(proj, dict):
raise ConfigError(f"project '{proj_path}' must be a JSON object")
_require_enabled("project", proj_path, proj)
if proj.get("context") not in contexts:
raise ConfigError(
f"project '{proj_path}' references unknown context "
f"{proj.get('context')!r}"
)
rules = data.get("rules") or {}
if not isinstance(rules, dict):
raise ConfigError(f"{path} 'rules' must be a JSON object")
audit_only = profiles.pop("_audit_only", {})
return {
"version": 2,
"shape": "contexts",
"profiles": profiles,
"unavailable": unavailable,
"audit_only_profiles": audit_only,
"contexts": contexts,
"projects": projects,
"rules": rules,
}
def resolve_service(config, context_name, service_name):
"""Return one context service's config for *internal* MCP use.
The returned dict includes the endpoint base_url and the keychain auth
*reference* — both are for MCP-internal resolution only and must never be
echoed into normal LLM-facing output (see audit_config/service_summaries).
Fails closed on an unknown or disabled context/service; never falls back
to another service.
"""
contexts = (config or {}).get("contexts")
if not isinstance(contexts, dict):
raise ConfigError(
"service resolution requires a version 2 contexts config")
ctx = contexts.get(context_name)
if ctx is None:
raise ConfigError(
f"unknown context '{context_name}' (fail closed, no fallback)")
if not ctx.get("enabled"):
raise ConfigError(
f"context '{context_name}' is disabled; its services are defined "
"but unavailable for action (no fallback)"
)
if service_name == "gitea":
service = ctx.get("gitea")
else:
service = (ctx.get("services") or {}).get(service_name)
if service is None:
raise ConfigError(
f"unknown service '{service_name}' in context '{context_name}' "
"(fail closed, no fallback)"
)
if not service.get("enabled"):
raise ConfigError(
f"service '{context_name}.{service_name}' is disabled; defined "
"but unavailable for action — refusing, no fallback"
)
return dict(service)
def project_for_path(config, path):
"""Map a local project *path* to its context entry, failing closed.
Returns None when the path is not configured (feature off for that repo).
Raises :class:`ConfigError` when the project or its context is disabled —
a configured-but-disabled project must never be acted on.
"""
projects = (config or {}).get("projects") or {}
project = projects.get(path)
if project is None:
return None
if not project.get("enabled"):
raise ConfigError(
f"project '{path}' is disabled (enabled: false); refusing, "
"no fallback"
)
contexts = (config or {}).get("contexts") or {}
ctx = contexts.get(project.get("context")) or {}
if not ctx.get("enabled"):
raise ConfigError(
f"project '{path}' maps to context '{project.get('context')}' "
"which is disabled; refusing, no fallback"
)
return dict(project)
def _audit_profile_entry(name, profile, enabled, reveal_endpoints):
"""One LLM-safe audit row: no endpoint URLs, no keychain ids, no tokens."""
auth = profile.get("auth") if isinstance(profile, dict) else None
entry = {
"name": name,
"enabled": enabled,
"context": profile.get("context") or profile.get("environment"),
"role": profile.get("role"),
"username": profile.get("username"),
"auth": (auth or {}).get("type") if isinstance(auth, dict) else None,
}
reason = profile.get("_unavailable_reason")
if reason:
entry["reason"] = reason
if reveal_endpoints:
entry["base_url"] = profile.get("base_url")
entry["auth_source"] = auth_source_name(profile)
return entry
def audit_config(config, reveal_endpoints=False):
"""Report enabled/disabled profiles and services without secrets.
Default output is LLM-safe: names, contexts, enabled state, capability
labels, and the auth *type* only — never endpoint URLs, keychain ids,
token values, or auth source names. ``reveal_endpoints=True`` is the
explicit admin/debug opt-in for local diagnostics: it adds base URLs and
non-secret auth source names (``keychain:<id>`` / env var name). Token
values are never included on any path.
"""
if config is None:
return {"version": None, "profiles": [], "services": []}
report = {
"version": config.get("version"),
"shape": config.get("shape") or ("environments"
if config.get("aliases") is not None
else "profiles"),
"profiles": [],
"services": [],
}
for name, profile in (config.get("profiles") or {}).items():
if not isinstance(profile, dict):
continue
report["profiles"].append(_audit_profile_entry(
name, profile, True, reveal_endpoints))
for name, profile in (config.get("audit_only_profiles") or {}).items():
report["profiles"].append(_audit_profile_entry(
name, profile, False, reveal_endpoints))
for ctx_name, ctx in (config.get("contexts") or {}).items():
ctx_enabled = bool(ctx.get("enabled"))
for svc_name, svc in (ctx.get("services") or {}).items():
entry = {
"context": ctx_name,
"name": svc_name,
"kind": svc.get("kind"),
"label": svc.get("label"),
"enabled": ctx_enabled and bool(svc.get("enabled")),
"capabilities": list(svc.get("capabilities") or []),
"auth": (svc.get("auth") or {}).get("type"),
}
if reveal_endpoints:
entry["base_url"] = svc.get("base_url")
entry["auth_source"] = auth_source_name(svc)
report["services"].append(entry)
return report
def service_summaries(config, auth_check=None):
"""Safe one-line service summaries for LLM sessions.
Each line reports label + state only (e.g. ``PRGS Jenkins: enabled,
read-only, authenticated`` / ``PRGS Sentry: disabled``) — never endpoint
URLs, keychain ids, or token values. *auth_check* is a callable taking the
service dict and returning True when its credential resolves; it defaults
to a local keychain presence check and its result is reported only as
``authenticated`` / ``no credential``.
"""
if auth_check is None:
def auth_check(service):
auth = service.get("auth") or {}
if auth.get("type") == "keychain":
return _keychain_token(auth.get("id")) is not None
if auth.get("type") == "env":
return bool(os.environ.get(auth.get("name") or ""))
return False
lines = []
for ctx_name, ctx in (config.get("contexts") or {}).items():
ctx_enabled = bool(ctx.get("enabled"))
for svc_name, svc in (ctx.get("services") or {}).items():
label = svc.get("label") or f"{ctx_name} {svc_name}"
if not (ctx_enabled and svc.get("enabled")):
lines.append(f"{label}: disabled")
continue
caps = list(svc.get("capabilities") or [])
cap_part = "read-only" if caps == ["read"] else ", ".join(caps)
auth_part = "authenticated" if auth_check(svc) else "no credential"
parts = ["enabled"] + ([cap_part] if cap_part else []) + [auth_part]
lines.append(f"{label}: " + ", ".join(parts))
return lines
def _validate_auth(name, auth): def _validate_auth(name, auth):
"""Validate a profile's optional ``auth`` reference. Never echoes secrets.""" """Validate a profile's optional ``auth`` reference. Never echoes secrets."""
if auth is None: if auth is None:
@@ -534,7 +955,7 @@ def validate_config(config):
elif version == 2: elif version == 2:
# v2 validation is all-or-nothing via the loader's invariants. # v2 validation is all-or-nothing via the loader's invariants.
try: try:
_load_v2(config, "<config>") _load_v2_any(config, "<config>")
except ConfigError as exc: except ConfigError as exc:
problems.append(str(exc)) problems.append(str(exc))
return problems return problems
@@ -691,5 +1112,20 @@ if __name__ == "__main__": # pragma: no cover - thin CLI dispatch
if len(sys.argv) > 1 and sys.argv[1] == "menu": if len(sys.argv) > 1 and sys.argv[1] == "menu":
import gitea_config_menu import gitea_config_menu
raise SystemExit(gitea_config_menu.main(sys.argv[2:])) raise SystemExit(gitea_config_menu.main(sys.argv[2:]))
print("usage: python gitea_config.py menu", file=sys.stderr) if len(sys.argv) > 1 and sys.argv[1] == "audit":
# Local admin/debug diagnostics (#120). --reveal-endpoints is the
# explicit opt-in that adds base URLs and non-secret auth source
# names; token values are never printed on any path.
try:
config = load_config(config_path() or DEFAULT_CONFIG_PATH)
report = audit_config(
config, reveal_endpoints="--reveal-endpoints" in sys.argv[2:])
report["summaries"] = service_summaries(config)
except ConfigError as exc:
print(f"config error: {exc}", file=sys.stderr)
raise SystemExit(1)
print(json.dumps(report, indent=2))
raise SystemExit(0)
print("usage: python gitea_config.py menu | audit [--reveal-endpoints]",
file=sys.stderr)
raise SystemExit(2) raise SystemExit(2)
+233 -16
View File
@@ -43,6 +43,16 @@ from gitea_auth import ( # noqa: E402
get_profile, get_profile,
) )
import gitea_audit # noqa: E402 import gitea_audit # noqa: E402
import gitea_config # noqa: E402
def _reveal_endpoints() -> bool:
"""Admin/debug opt-in (#120): include endpoint URLs and token source
names in tool output. Off by default so normal LLM-facing responses
expose only logical names and status. Never affects token values, which
are excluded on every path."""
return (os.environ.get("GITEA_MCP_REVEAL_ENDPOINTS") or "").strip().lower() \
in ("1", "true", "yes")
mcp = FastMCP("gitea-tools", instructions=( mcp = FastMCP("gitea-tools", instructions=(
"Gitea issue tracker and PR management for dadeschools and prgs instances. " "Gitea issue tracker and PR management for dadeschools and prgs instances. "
@@ -511,14 +521,24 @@ def gitea_check_pr_eligibility(
return result return result
# Profile capability check (metadata only; not enforcement of the action). # Profile capability check (metadata only; not enforcement of the action).
# Both the action and the profile lists are normalized before comparison
# (#106), so legacy spellings ("merge") and canonical namespaced ops
# ("gitea.pr.merge") always match each other and never cross services.
allowed = profile["allowed_operations"] allowed = profile["allowed_operations"]
forbidden = profile["forbidden_operations"] forbidden = profile["forbidden_operations"]
if not allowed: op_ok, op_reason = gitea_config.check_operation(action, allowed, forbidden)
reasons.append("profile has no configured allowed operations (fail closed)") if not op_ok:
if action in forbidden: if op_reason == "no-allowed-operations":
reasons.append(f"profile forbids '{action}'") reasons.append(
elif action not in allowed: "profile has no configured allowed operations (fail closed)")
reasons.append(f"profile is not allowed to {action}") elif op_reason == "forbidden":
reasons.append(f"profile forbids '{action}'")
elif op_reason == "invalid-forbidden-entry":
reasons.append(
"profile has an unrecognized forbidden operation entry "
"(fail closed)")
else:
reasons.append(f"profile is not allowed to {action}")
h, o, r = _resolve(remote, host, org, repo) h, o, r = _resolve(remote, host, org, repo)
@@ -1344,6 +1364,145 @@ def gitea_view_issue(
} }
def _issue_comment_gate(op: str) -> list[str]:
"""Profile permission check for issue-comment tools (#126).
Issue discussion comments are gated separately from the gitea.pr.*
review/merge family: listing requires ``gitea.read``, creating requires
``gitea.issue.comment``. Returns a list of block reasons (empty = allowed);
an unreadable profile fails closed.
"""
try:
profile = get_profile()
except Exception as exc:
return [f"profile could not be resolved (fail closed): {_redact(str(exc))}"]
op_ok, op_reason = gitea_config.check_operation(
op, profile["allowed_operations"], profile["forbidden_operations"])
if op_ok:
return []
if op_reason == "no-allowed-operations":
return ["profile has no configured allowed operations (fail closed)"]
if op_reason == "forbidden":
return [f"profile forbids '{op}'"]
if op_reason == "invalid-forbidden-entry":
return ["profile has an unrecognized forbidden operation entry (fail closed)"]
return [f"profile is not allowed to {op}"]
@mcp.tool()
def gitea_list_issue_comments(
issue_number: int,
limit: int = 50,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""List discussion comments on a Gitea issue.
Read-only. Issue discussion comments are distinct from PR reviews: this
reads the issue comment thread and never touches review endpoints. The
profile must allow ``gitea.read`` (fail closed otherwise).
Normal output is LLM-safe: no endpoint URLs. Set
GITEA_MCP_REVEAL_ENDPOINTS=1 (admin/debug opt-in) to include each
comment's web link.
Args:
issue_number: The issue number whose comments to list (required).
limit: Max number of comments to return (default: 50).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success', 'issue_number', and 'comments' (each with 'id',
'author', 'created_at', 'updated_at', 'body'); on a permission block,
'success' False and 'reasons' with no API call made.
"""
reasons = _issue_comment_gate("gitea.read")
if reasons:
return {"success": False, "issue_number": issue_number,
"reasons": reasons}
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
api = f"{repo_api_url(h, o, r)}/issues/{issue_number}/comments"
comments = api_request("GET", api, auth) or []
reveal = _reveal_endpoints()
out = []
for c in comments[:limit]:
entry = {
"id": c["id"],
"author": (c.get("user") or {}).get("login", ""),
"created_at": c.get("created_at"),
"updated_at": c.get("updated_at"),
"body": c.get("body", ""),
}
if reveal:
entry["url"] = c.get("html_url")
out.append(entry)
return {"success": True, "issue_number": issue_number, "comments": out}
@mcp.tool()
def gitea_create_issue_comment(
issue_number: int,
body: str,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Post a markdown comment to a Gitea issue's discussion thread.
Issue discussion comments are distinct from PR reviews: this posts to the
issue comment thread only and never submits review verdicts. The profile
must allow ``gitea.issue.comment`` — gated separately from the gitea.pr.*
review/merge operations (fail closed otherwise). The target issue is
always explicit; there is no inference beyond the standard remote
defaults used by every tool.
Normal output is LLM-safe: comment id + issue number, no endpoint URLs.
Set GITEA_MCP_REVEAL_ENDPOINTS=1 (admin/debug opt-in) to include the
comment's web link. Errors are redacted before being raised.
Args:
issue_number: The issue number to comment on (required).
body: Markdown comment body (required, non-empty).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success', 'comment_id', and 'issue_number' ('url' only
with the reveal opt-in); on a permission block or empty body,
'success'/'performed' False and 'reasons' with no API call made.
"""
reasons = _issue_comment_gate("gitea.issue.comment")
if not (body or "").strip():
reasons.append("comment body must be a non-empty string")
if reasons:
return {"success": False, "performed": False,
"issue_number": issue_number, "reasons": reasons}
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
api = f"{repo_api_url(h, o, r)}/issues/{issue_number}/comments"
try:
with _audited("create_issue_comment", host=h, remote=remote, org=o,
repo=r, issue_number=issue_number,
request_metadata={"body_chars": len(body)}):
data = api_request("POST", api, auth, {"body": body})
except Exception as exc:
raise RuntimeError(_redact(str(exc))) from None
result = {"success": True, "performed": True,
"comment_id": data["id"], "issue_number": issue_number}
if _reveal_endpoints():
result["url"] = data.get("html_url")
return result
@mcp.tool() @mcp.tool()
def gitea_whoami( def gitea_whoami(
remote: str = "dadeschools", remote: str = "dadeschools",
@@ -1382,21 +1541,35 @@ def gitea_whoami(
"Verify the configured token is valid for this instance." "Verify the configured token is valid for this instance."
) )
# Runtime profile metadata is non-secret (name + allowed op categories). # Runtime profile metadata is non-secret (name + allowed op categories).
# The token is resolved separately and is never included here. # The token is resolved separately and is never included here. Endpoint
# URLs stay out of normal LLM-facing output (#120): the logical remote
# name is the addressing surface; 'server' appears only under the
# GITEA_MCP_REVEAL_ENDPOINTS admin opt-in.
profile = get_profile() profile = get_profile()
return { result = {
"authenticated": True, "authenticated": True,
"username": data.get("login"), "username": data.get("login"),
"display_name": data.get("full_name") or None, "display_name": data.get("full_name") or None,
"user_id": data.get("id"), "user_id": data.get("id"),
"email": data.get("email") or None, "email": data.get("email") or None,
"server": f"https://{h}",
"remote": remote, "remote": remote,
"profile": { "profile": {
"profile_name": profile["profile_name"], "profile_name": profile["profile_name"],
"allowed_operations": profile["allowed_operations"], "allowed_operations": profile["allowed_operations"],
"forbidden_operations": profile["forbidden_operations"],
"environment": profile.get("environment"),
"service": profile.get("service"),
"identity": profile.get("identity"),
"role": profile.get("role"),
"profile_address": profile.get("profile_path"),
"execution_profile": profile.get("execution_profile"),
"audit_label": profile.get("audit_label"),
"auth_source_type": profile.get("auth_source_type"),
}, },
} }
if _reveal_endpoints():
result["server"] = f"https://{h}"
return result
@@ -1427,9 +1600,11 @@ def gitea_get_profile(
Read-only. Reports the non-secret configuration of the running MCP Read-only. Reports the non-secret configuration of the running MCP
process (profile name, allowed/forbidden operation categories, audit process (profile name, allowed/forbidden operation categories, audit
label, token *source name*, base URL) plus the resolved server for the label, auth *status*). Endpoint URLs and token source names are hidden
given remote. Optionally resolves the authenticated username via from normal output (#120) and appear only under the
``gitea_whoami``'s endpoint so an LLM can see who this runtime acts as. GITEA_MCP_REVEAL_ENDPOINTS admin opt-in. Optionally resolves the
authenticated username via ``gitea_whoami``'s endpoint so an LLM can see
who this runtime acts as.
This tool never mutates Gitea and never approves, merges, comments, or This tool never mutates Gitea and never approves, merges, comments, or
creates anything. It never returns the token value, Authorization header, creates anything. It never returns the token value, Authorization header,
@@ -1447,18 +1622,32 @@ def gitea_get_profile(
'verified', 'unknown', 'unavailable', or 'not_resolved'. 'verified', 'unknown', 'unavailable', or 'not_resolved'.
""" """
profile = get_profile() profile = get_profile()
reveal = _reveal_endpoints()
result = { result = {
"profile_name": profile["profile_name"], "profile_name": profile["profile_name"],
"allowed_operations": profile["allowed_operations"], "allowed_operations": profile["allowed_operations"],
"forbidden_operations": profile["forbidden_operations"], "forbidden_operations": profile["forbidden_operations"],
"audit_label": profile["audit_label"], "audit_label": profile["audit_label"],
"token_source_name": profile["token_source_name"], "environment": profile.get("environment"),
"base_url": profile["base_url"], "service": profile.get("service"),
"identity": profile.get("identity"),
"role": profile.get("role"),
"profile_address": profile.get("profile_path"),
"execution_profile": profile.get("execution_profile"),
"auth_source_type": profile.get("auth_source_type"),
# Auth is reported as a status only (#120): the token source *name*
# (env var name / keychain id) joins endpoint URLs behind the
# GITEA_MCP_REVEAL_ENDPOINTS admin opt-in. Token values never appear.
"auth_status": ("configured" if profile["token_source_name"]
else "unconfigured"),
"remote": remote if remote in REMOTES else None, "remote": remote if remote in REMOTES else None,
"server": None,
"authenticated_username": None, "authenticated_username": None,
"identity_status": "not_resolved", "identity_status": "not_resolved",
} }
if reveal:
result["token_source_name"] = profile["token_source_name"]
result["base_url"] = profile["base_url"]
result["server"] = None
if remote not in REMOTES: if remote not in REMOTES:
# Mark ambiguity rather than raising: the tool stays inspectable. # Mark ambiguity rather than raising: the tool stays inspectable.
@@ -1467,7 +1656,8 @@ def gitea_get_profile(
return result return result
h = host or REMOTES[remote]["host"] h = host or REMOTES[remote]["host"]
result["server"] = f"https://{h}" if reveal:
result["server"] = f"https://{h}"
if resolve_identity: if resolve_identity:
try: try:
@@ -1487,6 +1677,33 @@ def gitea_get_profile(
return result return result
@mcp.tool()
def gitea_audit_config() -> dict:
"""Audit the configured profiles/services: enabled state, no secrets.
Read-only and local-only: loads the canonical profiles.json named by
GITEA_MCP_CONFIG and reports profile/service names, contexts, enabled
state, capabilities, auth *status*, and one-line service summaries (e.g.
``PRGS Jenkins: enabled, read-only, authenticated``). Disabled entries
are listed so they can be audited, but the server refuses to act with
them and never falls back to another profile or service.
Never includes endpoint URLs, keychain ids, token source names, or token
values. Endpoint-revealing diagnostics exist only in the local admin CLI
(``python3 gitea_config.py audit --reveal-endpoints``), never over MCP.
"""
config = gitea_config.load_config()
if config is None:
return {
"configured": False,
"message": "No GITEA_MCP_CONFIG configured; env-only mode.",
}
report = gitea_config.audit_config(config)
report["configured"] = True
report["summaries"] = gitea_config.service_summaries(config)
return report
@mcp.tool() @mcp.tool()
def gitea_mark_issue( def gitea_mark_issue(
issue_number: int, issue_number: int,
+284
View File
@@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""Migration helper to convert profiles.json from version 1 to version 2 environments shape.
This script preserves existing keychain references (auth.id) and maps old profile
names as aliases so that existing IDE configurations continue to function.
"""
import os
import sys
import json
import argparse
import shutil
import tempfile
# Resolve path to import gitea_config
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import gitea_config
AUTHOR_DEFAULT_ALLOWED = ["read", "branch", "commit", "push", "open_pr", "comment"]
AUTHOR_DEFAULT_FORBIDDEN = ["approve", "request_changes", "merge"]
REVIEWER_DEFAULT_ALLOWED = [
"read", "review", "comment", "approve", "request_changes", "merge"
]
REVIEWER_DEFAULT_FORBIDDEN = ["branch", "commit", "push", "open_pr"]
def infer_role(name, execution_profile):
"""Return the unambiguous role for a legacy profile name, or None."""
haystack = f"{name} {execution_profile or ''}".lower()
has_author = "author" in haystack
has_reviewer = "reviewer" in haystack
if has_author == has_reviewer:
return None
return "reviewer" if has_reviewer else "author"
def migration_summary(v2_data):
"""Return a redacted summary of the migrated config."""
environments = v2_data.get("environments", {})
service_count = 0
identity_count = 0
for env in environments.values():
services = env.get("services", {})
service_count += len(services)
for service in services.values():
identity_count += len(service.get("identities", {}))
return {
"version": v2_data.get("version"),
"environments": len(environments),
"services": service_count,
"identities": identity_count,
"aliases": len(v2_data.get("aliases", {})),
}
def migrate_v1_to_v2(v1_data):
"""Convert version 1 profiles.json format to version 2 environments format."""
environments = {}
aliases = {}
profiles = v1_data.get("profiles", {})
if not isinstance(profiles, dict):
raise ValueError("Malformed input: 'profiles' field must be a JSON object")
for name, prof in profiles.items():
if not isinstance(prof, dict):
raise ValueError(f"Malformed input: profile '{name}' must be a JSON object")
# Infer environment and identity name
if "-" in name:
parts = name.split("-", 1)
env_name = parts[0]
ident_name = parts[1]
else:
env_name = name
ident_name = "author"
# Determine role and identity based on name / execution_profile.
# Ambiguous profiles may still migrate only when they carry explicit
# permissions; otherwise role-based defaults could widen permissions.
exec_prof = prof.get("execution_profile") or ""
role = infer_role(name, exec_prof)
if role == "reviewer":
ident_name = "reviewer"
elif role == "author":
ident_name = "author"
else:
role = prof.get("role")
if role not in (None, "author", "reviewer"):
raise ValueError(
f"Profile '{name}' has unsupported role {role!r}"
)
# Construct identity block
identity_data = {
"username": prof.get("username"),
"auth": prof.get("auth"),
}
if role:
identity_data["role"] = role
if prof.get("execution_profile"):
identity_data["execution_profile"] = prof["execution_profile"]
# Set audit label (default to old name to preserve context)
identity_data["audit_label"] = prof.get("audit_label") or name
has_allowed = "allowed_operations" in prof
has_forbidden = "forbidden_operations" in prof
if has_allowed != has_forbidden:
raise ValueError(
f"Profile '{name}' must define both allowed_operations and "
"forbidden_operations, or neither (fail closed)"
)
if has_allowed:
allowed = prof.get("allowed_operations")
forbidden = prof.get("forbidden_operations")
if not isinstance(allowed, list) or not isinstance(forbidden, list):
raise ValueError(
f"Profile '{name}' operation fields must be lists"
)
identity_data["allowed_operations"] = list(allowed)
identity_data["forbidden_operations"] = list(forbidden)
elif role == "author":
identity_data["allowed_operations"] = list(AUTHOR_DEFAULT_ALLOWED)
identity_data["forbidden_operations"] = list(AUTHOR_DEFAULT_FORBIDDEN)
elif role == "reviewer":
identity_data["allowed_operations"] = list(REVIEWER_DEFAULT_ALLOWED)
identity_data["forbidden_operations"] = list(REVIEWER_DEFAULT_FORBIDDEN)
else:
raise ValueError(
f"Profile '{name}' has no explicit operation lists and no "
"unambiguous author/reviewer role marker (fail closed)"
)
# Nest inside environments/services structure
env = environments.setdefault(env_name, {})
services = env.setdefault("services", {})
gitea_svc = services.setdefault("gitea", {})
# Copy service-level attributes
if prof.get("base_url"):
gitea_svc["base_url"] = prof["base_url"]
if prof.get("default_owner"):
gitea_svc["default_owner"] = prof["default_owner"]
if prof.get("default_repo"):
gitea_svc["default_repo"] = prof["default_repo"]
identities = gitea_svc.setdefault("identities", {})
identities[ident_name] = identity_data
# Alias resolution targets
alias_target = f"{env_name}.gitea.{ident_name}"
if name != alias_target:
aliases[name] = alias_target
# Extra convenience alias for standard old-profile compatibility (e.g. prgs-author)
convenience_alias = f"{env_name}-{ident_name}"
if convenience_alias != alias_target and convenience_alias not in aliases:
aliases[convenience_alias] = alias_target
v2_data = {
"version": 2,
"environments": environments,
"aliases": aliases
}
return v2_data
def validate_v2_data(v2_data):
"""Validate generated v2 structure using gitea_config parser."""
fd, temp_path = tempfile.mkstemp(suffix=".json")
os.close(fd)
try:
with open(temp_path, "w") as f:
json.dump(v2_data, f)
# Attempt to load using load_config to run all validation rules
gitea_config.load_config(temp_path)
return True
except Exception as e:
raise ValueError(f"Generated v2 config failed validation: {e}")
finally:
try:
os.remove(temp_path)
except OSError:
pass
def main():
parser = argparse.ArgumentParser(
description="Migrate profiles.json from version 1 to version 2 environments shape."
)
parser.add_argument(
"-i", "--input",
default=gitea_config.DEFAULT_CONFIG_PATH,
help="Path to the version 1 profiles.json file (default: ~/.config/gitea-tools/profiles.json)"
)
parser.add_argument(
"-o", "--output",
help="Path to write the migrated version 2 profiles.json file (default: overwrite input)"
)
parser.add_argument(
"-w", "--write",
action="store_true",
help="Actually write the migrated config and create a backup (default is dry-run)"
)
parser.add_argument(
"--backup",
help="Path to write the backup file (default: <input_path>.bak)"
)
args = parser.parse_args()
input_path = os.path.abspath(args.input)
output_path = os.path.abspath(args.output or input_path)
backup_path = args.backup or f"{input_path}.bak"
if not os.path.isfile(input_path):
print(f"Error: Input file not found: {input_path}", file=sys.stderr)
sys.exit(1)
try:
with open(input_path, "r") as f:
v1_data = json.load(f)
except json.JSONDecodeError as e:
print(f"Error: Input file is not valid JSON: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error reading input file: {e}", file=sys.stderr)
sys.exit(1)
# Validate version
version = v1_data.get("version")
if version is not None and version != 1:
print(f"Error: Unsupported profiles.json version: {version}. Expected version 1.", file=sys.stderr)
sys.exit(1)
try:
v2_data = migrate_v1_to_v2(v1_data)
validate_v2_data(v2_data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not args.write:
print("=== DRY-RUN MODE (No files modified) ===")
print("Generated v2 config validated successfully.")
print("Only aggregate counts are shown.")
summary = migration_summary(v2_data)
print("Summary:")
print(f" version: {summary['version']}")
print(f" environments: {summary['environments']}")
print(f" services: {summary['services']}")
print(f" identities: {summary['identities']}")
print(f" aliases: {summary['aliases']}")
sys.exit(0)
# Write Mode: Create Backup first
try:
print(f"Creating backup: {backup_path}")
shutil.copy2(input_path, backup_path)
except Exception as e:
print(f"Error creating backup: {e}", file=sys.stderr)
sys.exit(1)
# Write migrated config
try:
print(f"Writing migrated version 2 config: {output_path}")
# Ensure target directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w") as f:
json.dump(v2_data, f, indent=2)
f.write("\n")
print("Migration completed successfully!")
except Exception as e:
print(f"Error writing output file: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
+5 -3
View File
@@ -284,11 +284,13 @@ class TestAuthIntegration(_ConfigBase):
self.assertEqual(header, "token process-token") self.assertEqual(header, "token process-token")
def test_auth_header_unresolvable_ref_fails_closed(self): def test_auth_header_unresolvable_ref_fails_closed(self):
# env token ref points at an unset var -> ConfigError inside resolve is # env token ref points at an unset var -> with GITEA_MCP_CONFIG set the
# swallowed to "no token"; auth falls through to (mocked-empty) basic. # ConfigError propagates (fail closed, #120): no silent fallback to
# Basic auth or another credential source.
with patch.dict(os.environ, self._env("mdcps-env"), clear=True): with patch.dict(os.environ, self._env("mdcps-env"), clear=True):
with patch("gitea_auth.get_credentials", return_value=("", "")): with patch("gitea_auth.get_credentials", return_value=("", "")):
self.assertIsNone(gitea_auth.get_auth_header("gitea.example.com")) with self.assertRaises(gitea_config.ConfigError):
gitea_auth.get_auth_header("gitea.example.com")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
+446
View File
@@ -0,0 +1,446 @@
"""Tests for profiles.json version 2 *contexts* shape (#120).
The canonical machine config uses ``contexts`` / ``profiles`` / ``projects`` /
``rules`` with explicit ``enabled`` flags. Covers: loading + active-profile
resolution via GITEA_MCP_PROFILE, fail-closed refusal of disabled profiles /
contexts / services / projects, project-to-context mapping, base-URL fallback
from the context's gitea block, keychain-only auth references, LLM-safe audit
output (no endpoint URLs, no keychain ids, no tokens) with an explicit
admin/debug opt-in, v1 compatibility, and the no-silent-fallback rule in
gitea_auth.get_auth_header. No network, no real secrets.
"""
import json
import os
import sys
import tempfile
import unittest
from unittest.mock import patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import gitea_config # noqa: E402
import gitea_auth # noqa: E402
FAKE_TOKEN = "fake-token-for-tests" # not a real credential
def contexts_config():
"""A fresh, valid v2 contexts-shape config with enabled/disabled entries."""
return {
"version": 2,
"contexts": {
"prgs": {
"enabled": True,
"label": "Local / PRGS",
"default_owner": "Scaled-Tech-Consulting",
"gitea": {
"enabled": True,
"kind": "gitea",
"base_url": "https://gitea.prgs.cc",
},
"services": {
"jenkins": {
"enabled": True,
"kind": "jenkins",
"label": "PRGS Jenkins",
"base_url": "https://jenkins.prgs.cc",
"auth": {"type": "keychain", "id": "prgs-jenkins-token"},
"capabilities": ["read"],
},
"sentry": {
"enabled": False,
"kind": "sentry",
"label": "PRGS Sentry",
"base_url": "",
"auth": {"type": "keychain", "id": "prgs-sentry-token"},
"capabilities": ["read"],
},
},
},
"lab": {
"enabled": False,
"gitea": {"enabled": False, "kind": "gitea", "base_url": ""},
"services": {
"jenkins": {
"enabled": False,
"kind": "jenkins",
"label": "Lab Jenkins",
"base_url": "http://localhost:8080",
"auth": {"type": "keychain", "id": "lab-jenkins-token"},
"capabilities": ["read"],
},
},
},
},
"profiles": {
"prgs-author": {
"enabled": True,
"context": "prgs",
"role": "author",
"username": "jcwalker3",
"execution_profile": "prgs-author",
"audit_label": "prgs-author",
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain", "id": "prgs-gitea-author-token"},
"allowed_operations": [
"read", "branch", "commit", "push", "open_pr", "comment",
],
"forbidden_operations": [
"approve", "request_changes", "merge",
],
},
"prgs-reviewer": {
"enabled": True,
"context": "prgs",
"role": "reviewer",
"username": "sysadmin",
"execution_profile": "prgs-reviewer",
"audit_label": "prgs-reviewer",
# no base_url on purpose: must fall back to context gitea
"auth": {"type": "keychain", "id": "prgs-gitea-reviewer-token"},
"allowed_operations": [
"read", "review", "comment", "approve",
"request_changes", "merge",
],
"forbidden_operations": [
"branch", "commit", "push", "open_pr",
],
},
"retired-author": {
"enabled": False,
"context": "prgs",
"role": "author",
"username": "jcwalker3",
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain", "id": "retired-token-ref"},
"allowed_operations": ["read"],
"forbidden_operations": [],
},
"lab-author": {
"enabled": True,
"context": "lab",
"role": "author",
"username": "jcwalker3",
"base_url": "http://localhost:3000",
"auth": {"type": "keychain", "id": "lab-gitea-author-token"},
"allowed_operations": ["read"],
"forbidden_operations": [],
},
},
"projects": {
"/repo/one": {
"enabled": True,
"context": "prgs",
"default_owner": "Scaled-Tech-Consulting",
"default_repo": "One",
"default_author_profile": "prgs-author",
"default_reviewer_profile": "prgs-reviewer",
},
"/repo/lab": {
"enabled": False,
"context": "lab",
},
},
"rules": {
"disabled_behavior": "report in audits, never act",
"no_silent_fallback": True,
"tokens_in_json": False,
"token_storage": "keychain",
"hide_service_urls_from_llm": True,
"hide_keychain_ids_from_llm": True,
"mcp_resolves_endpoints": True,
},
}
def write_config(data):
"""Write *data* to a temp JSON file and return its path."""
fd, path = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(data, fh)
return path
def load(data):
"""Load *data* through gitea_config via a temp file, then clean up."""
path = write_config(data)
try:
return gitea_config.load_config(path)
finally:
os.unlink(path)
class LoadContextsShapeTests(unittest.TestCase):
def test_contexts_shape_loads(self):
config = load(contexts_config())
self.assertEqual(config["version"], 2)
self.assertIn("prgs-author", config["profiles"])
self.assertIn("prgs-reviewer", config["profiles"])
def test_active_profile_resolved_from_env(self):
path = write_config(contexts_config())
try:
with patch.dict(os.environ, {
gitea_config.ENV_CONFIG_PATH: path,
gitea_config.ENV_PROFILE: "prgs-author",
}):
profile = gitea_config.resolve_profile()
finally:
os.unlink(path)
self.assertEqual(profile["username"], "jcwalker3")
self.assertEqual(profile["base_url"], "https://gitea.prgs.cc")
self.assertEqual(profile["context"], "prgs")
def test_base_url_falls_back_to_context_gitea(self):
profile = gitea_config.select_profile(load(contexts_config()),
"prgs-reviewer")
self.assertEqual(profile["base_url"], "https://gitea.prgs.cc")
def test_profile_without_any_base_url_is_refused(self):
data = contexts_config()
del data["profiles"]["prgs-author"]["base_url"]
data["contexts"]["prgs"]["gitea"]["enabled"] = False
config = load(data)
with self.assertRaises(gitea_config.ConfigError):
gitea_config.select_profile(config, "prgs-author")
def test_v1_config_still_loads(self):
config = load({
"version": 1,
"profiles": {
"prgs": {
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain", "id": "prgs-gitea-token"},
},
},
})
profile = gitea_config.select_profile(config, "prgs")
self.assertEqual(profile["base_url"], "https://gitea.prgs.cc")
def test_mixed_contexts_and_environments_rejected(self):
data = contexts_config()
data["environments"] = {"x": {"services": {}}}
with self.assertRaises(gitea_config.ConfigError):
load(data)
def test_missing_enabled_flag_is_refused(self):
data = contexts_config()
del data["profiles"]["prgs-author"]["enabled"]
with self.assertRaises(gitea_config.ConfigError) as ctx:
load(data)
self.assertIn("enabled", str(ctx.exception))
class DisabledRefusalTests(unittest.TestCase):
def setUp(self):
self.config = load(contexts_config())
def test_disabled_profile_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.select_profile(self.config, "retired-author")
self.assertIn("disabled", str(ctx.exception))
def test_profile_in_disabled_context_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.select_profile(self.config, "lab-author")
self.assertIn("disabled", str(ctx.exception))
def test_enabled_profile_still_selectable(self):
profile = gitea_config.select_profile(self.config, "prgs-author")
self.assertEqual(profile["context"], "prgs")
def test_disabled_service_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.resolve_service(self.config, "prgs", "sentry")
self.assertIn("disabled", str(ctx.exception))
def test_enabled_service_resolves_internally_with_auth_reference(self):
# Internal resolution keeps the URL + auth reference for MCP's own use;
# they must never appear in LLM-facing (audit/summary) output.
service = gitea_config.resolve_service(self.config, "prgs", "jenkins")
self.assertEqual(service["base_url"], "https://jenkins.prgs.cc")
self.assertEqual(service["auth"], {"type": "keychain",
"id": "prgs-jenkins-token"})
self.assertNotIn("token", service)
def test_service_in_disabled_context_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.resolve_service(self.config, "lab", "jenkins")
self.assertIn("disabled", str(ctx.exception))
def test_unknown_service_fails_closed(self):
with self.assertRaises(gitea_config.ConfigError):
gitea_config.resolve_service(self.config, "prgs", "nope")
class ProjectMappingTests(unittest.TestCase):
def setUp(self):
self.config = load(contexts_config())
def test_project_maps_to_context(self):
project = gitea_config.project_for_path(self.config, "/repo/one")
self.assertEqual(project["context"], "prgs")
self.assertEqual(project["default_reviewer_profile"], "prgs-reviewer")
def test_unknown_project_returns_none(self):
self.assertIsNone(
gitea_config.project_for_path(self.config, "/repo/unknown"))
def test_disabled_project_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.project_for_path(self.config, "/repo/lab")
self.assertIn("disabled", str(ctx.exception))
class SecretHandlingTests(unittest.TestCase):
def test_inline_profile_token_rejected(self):
data = contexts_config()
data["profiles"]["prgs-author"]["token"] = FAKE_TOKEN
with self.assertRaises(gitea_config.ConfigError) as ctx:
load(data)
self.assertNotIn(FAKE_TOKEN, str(ctx.exception))
def test_inline_service_token_rejected(self):
data = contexts_config()
data["contexts"]["prgs"]["services"]["jenkins"]["token"] = FAKE_TOKEN
with self.assertRaises(gitea_config.ConfigError) as ctx:
load(data)
self.assertNotIn(FAKE_TOKEN, str(ctx.exception))
def test_selected_profile_resolves_token_via_keychain(self):
profile = gitea_config.select_profile(load(contexts_config()),
"prgs-author")
token = gitea_config.resolve_token(
profile, keychain_lookup=lambda item_id: FAKE_TOKEN
if item_id == "prgs-gitea-author-token" else None)
self.assertEqual(token, FAKE_TOKEN)
class AuditTests(unittest.TestCase):
"""LLM-facing audit output: enabled/disabled state only — no endpoint
URLs, no keychain ids, no token values. Admin opt-in reveals endpoints
and auth source names (never token values)."""
def setUp(self):
self.config = load(contexts_config())
def test_audit_reports_enabled_and_disabled(self):
report = gitea_config.audit_config(self.config)
profiles = {p["name"]: p for p in report["profiles"]}
self.assertTrue(profiles["prgs-author"]["enabled"])
self.assertFalse(profiles["retired-author"]["enabled"])
services = {(s["context"], s["name"]): s for s in report["services"]}
self.assertTrue(services[("prgs", "jenkins")]["enabled"])
self.assertFalse(services[("prgs", "sentry")]["enabled"])
self.assertFalse(services[("lab", "jenkins")]["enabled"])
def test_audit_hides_urls_keychain_ids_and_tokens_by_default(self):
rendered = json.dumps(gitea_config.audit_config(self.config))
for leaked in ("https://", "http://", "prgs-gitea-author-token",
"prgs-jenkins-token", "base_url", FAKE_TOKEN):
self.assertNotIn(leaked, rendered)
# Auth is reported as a status, not a reference.
report = gitea_config.audit_config(self.config)
profiles = {p["name"]: p for p in report["profiles"]}
self.assertEqual(profiles["prgs-author"]["auth"], "keychain")
def test_audit_admin_optin_reveals_endpoints_but_never_tokens(self):
report = gitea_config.audit_config(self.config, reveal_endpoints=True)
rendered = json.dumps(report)
self.assertIn("https://jenkins.prgs.cc", rendered)
self.assertIn("keychain:prgs-gitea-author-token", rendered)
self.assertNotIn(FAKE_TOKEN, rendered)
def test_audit_works_for_v1_config(self):
report = gitea_config.audit_config({
"version": 1,
"profiles": {
"prgs": {
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain", "id": "prgs-gitea-token"},
},
},
})
profiles = {p["name"]: p for p in report["profiles"]}
self.assertTrue(profiles["prgs"]["enabled"])
self.assertEqual(profiles["prgs"]["auth"], "keychain")
self.assertNotIn("https://", json.dumps(report))
class ServiceSummaryTests(unittest.TestCase):
"""Safe one-line summaries for LLM sessions: label + state only."""
def setUp(self):
self.config = load(contexts_config())
def test_summaries_show_state_without_urls_or_ids(self):
lines = gitea_config.service_summaries(
self.config, auth_check=lambda service: True)
text = "\n".join(lines)
self.assertIn("PRGS Jenkins: enabled, read-only, authenticated", text)
self.assertIn("PRGS Sentry: disabled", text)
self.assertIn("Lab Jenkins: disabled", text)
for leaked in ("https://", "http://", "keychain",
"prgs-jenkins-token"):
self.assertNotIn(leaked, text)
def test_summary_reports_missing_auth_without_secrets(self):
lines = gitea_config.service_summaries(
self.config, auth_check=lambda service: False)
text = "\n".join(lines)
self.assertIn("PRGS Jenkins: enabled, read-only, no credential", text)
class NoSilentFallbackTests(unittest.TestCase):
def test_broken_config_fails_auth_instead_of_falling_back(self):
"""With GITEA_MCP_CONFIG set but unloadable, auth must fail closed."""
path = write_config({"version": 2}) # invalid: no contexts/environments
env = {
gitea_config.ENV_CONFIG_PATH: path,
gitea_config.ENV_PROFILE: "prgs-author",
}
try:
with patch.dict(os.environ, env, clear=False), \
patch.object(gitea_auth, "get_credentials",
return_value=(None, None)):
for var in ("GITEA_TOKEN", "GITEA_TOKEN_PRGS",
"GITEA_TOKEN_DADESCHOOLS"):
os.environ.pop(var, None)
with self.assertRaises(gitea_config.ConfigError):
gitea_auth.get_auth_header("https://gitea.prgs.cc")
finally:
os.unlink(path)
def test_env_only_users_unaffected(self):
"""Without GITEA_MCP_CONFIG, a missing token still degrades quietly."""
env = dict(os.environ)
env.pop(gitea_config.ENV_CONFIG_PATH, None)
with patch.dict(os.environ, env, clear=True), \
patch.object(gitea_auth, "get_credentials",
return_value=(None, None)):
for var in ("GITEA_TOKEN", "GITEA_TOKEN_PRGS",
"GITEA_TOKEN_DADESCHOOLS"):
os.environ.pop(var, None)
self.assertIsNone(
gitea_auth.get_auth_header("https://gitea.prgs.cc"))
class ValidateConfigTests(unittest.TestCase):
def test_valid_contexts_config_has_no_problems(self):
self.assertEqual(gitea_config.validate_config(contexts_config()), [])
def test_repo_example_file_validates(self):
example = __import__("pathlib").Path(__file__).resolve().parent.parent \
/ "gitea-mcp.v2-contexts.example.json"
with open(example, encoding="utf-8") as fh:
self.assertEqual(gitea_config.validate_config(json.load(fh)), [])
def test_broken_contexts_config_reports_problems(self):
data = contexts_config()
data["profiles"]["prgs-author"]["context"] = "nope"
problems = gitea_config.validate_config(data)
self.assertTrue(problems)
if __name__ == "__main__":
unittest.main()
+435 -3
View File
@@ -3,6 +3,7 @@
Each tool is tested by calling the underlying function directly (not through Each tool is tested by calling the underlying function directly (not through
the MCP protocol) with mocked API responses. the MCP protocol) with mocked API responses.
""" """
import json
import os import os
import sys import sys
import unittest import unittest
@@ -30,6 +31,8 @@ from mcp_server import ( # noqa: E402
gitea_get_profile, gitea_get_profile,
gitea_check_pr_eligibility, gitea_check_pr_eligibility,
gitea_submit_pr_review, gitea_submit_pr_review,
gitea_list_issue_comments,
gitea_create_issue_comment,
) )
from gitea_auth import get_profile # noqa: E402 from gitea_auth import get_profile # noqa: E402
@@ -880,7 +883,9 @@ class TestWhoami(unittest.TestCase):
self.assertEqual(result["username"], "reviewer-bot") self.assertEqual(result["username"], "reviewer-bot")
self.assertEqual(result["display_name"], "Reviewer Bot") self.assertEqual(result["display_name"], "Reviewer Bot")
self.assertEqual(result["user_id"], 42) self.assertEqual(result["user_id"], 42)
self.assertEqual(result["server"], "https://gitea.prgs.cc") # Endpoint URLs are hidden from normal LLM-facing output (#120);
# the logical remote name is the addressing surface.
self.assertNotIn("server", result)
self.assertEqual(result["remote"], "prgs") self.assertEqual(result["remote"], "prgs")
# Read-only: GET against the authenticated-user endpoint. # Read-only: GET against the authenticated-user endpoint.
call_args = mock_api.call_args call_args = mock_api.call_args
@@ -992,6 +997,65 @@ class TestRuntimeProfile(unittest.TestCase):
for secret in ("super-secret-token", "token", "authorization", "basic "): for secret in ("super-secret-token", "token", "authorization", "basic "):
self.assertNotIn(secret, blob) self.assertNotIn(secret, blob)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_whoami_v2_metadata(self, _auth, mock_api):
mock_api.return_value = {"id": 7, "login": "rev"}
env = {
"GITEA_PROFILE_NAME": "gitea-reviewer",
"GITEA_ALLOWED_OPERATIONS": "read,review,approve",
"GITEA_FORBIDDEN_OPERATIONS": "merge",
"GITEA_AUDIT_LABEL": "reviewer-runtime",
"GITEA_TOKEN_SOURCE": "keychain:prgs-reviewer-token",
}
with patch.dict(os.environ, env, clear=True):
result = gitea_whoami(remote="prgs")
profile = result["profile"]
self.assertEqual(profile["environment"], None)
self.assertEqual(profile["service"], None)
self.assertEqual(profile["identity"], None)
self.assertEqual(profile["role"], None)
self.assertEqual(profile["profile_address"], None)
self.assertEqual(profile["execution_profile"], None)
self.assertEqual(profile["audit_label"], "reviewer-runtime")
self.assertEqual(profile["auth_source_type"], "keychain")
self.assertEqual(profile["forbidden_operations"], ["merge"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
@patch("mcp_server.get_profile")
def test_whoami_v2_resolved_metadata(self, mock_get_profile, _auth, mock_api):
mock_api.return_value = {"id": 7, "login": "rev"}
mock_get_profile.return_value = {
"profile_name": "prgs.gitea.reviewer",
"allowed_operations": ["read", "review"],
"forbidden_operations": ["merge"],
"audit_label": "rev-audit",
"token_source_name": "keychain:prgs-reviewer-token",
"auth_source_type": "keychain",
"base_url": "https://gitea.prgs.cc",
"username": "sysadmin",
"default_owner": "Scaled-Tech-Consulting",
"profile_path": "prgs.gitea.reviewer",
"environment": "prgs",
"service": "gitea",
"identity": "reviewer",
"role": "reviewer",
"execution_profile": "reviewer-profile",
}
result = gitea_whoami(remote="prgs")
profile = result["profile"]
self.assertEqual(profile["environment"], "prgs")
self.assertEqual(profile["service"], "gitea")
self.assertEqual(profile["identity"], "reviewer")
self.assertEqual(profile["role"], "reviewer")
self.assertEqual(profile["profile_address"], "prgs.gitea.reviewer")
self.assertEqual(profile["execution_profile"], "reviewer-profile")
self.assertEqual(profile["audit_label"], "rev-audit")
self.assertEqual(profile["auth_source_type"], "keychain")
self.assertEqual(profile["forbidden_operations"], ["merge"])
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Profile discovery (read-only) — issue #13 # Profile discovery (read-only) — issue #13
@@ -1035,8 +1099,12 @@ class TestProfileDiscovery(unittest.TestCase):
self.assertEqual(result["allowed_operations"], ["read", "review", "approve"]) self.assertEqual(result["allowed_operations"], ["read", "review", "approve"])
self.assertEqual(result["authenticated_username"], "reviewer-bot") self.assertEqual(result["authenticated_username"], "reviewer-bot")
self.assertEqual(result["identity_status"], "verified") self.assertEqual(result["identity_status"], "verified")
self.assertEqual(result["server"], "https://gitea.prgs.cc") # Endpoint URLs and token source names are hidden from normal
self.assertEqual(result["token_source_name"], "GITEA_TOKEN") # LLM-facing output (#120); auth is reported as a status only.
self.assertNotIn("server", result)
self.assertNotIn("base_url", result)
self.assertNotIn("token_source_name", result)
self.assertEqual(result["auth_status"], "configured")
# Read-only: only a GET to the user endpoint was issued. # Read-only: only a GET to the user endpoint was issued.
self.assertEqual(mock_api.call_args[0][0], "GET") self.assertEqual(mock_api.call_args[0][0], "GET")
self.assertTrue(mock_api.call_args[0][1].endswith("/api/v1/user")) self.assertTrue(mock_api.call_args[0][1].endswith("/api/v1/user"))
@@ -1075,6 +1143,39 @@ class TestProfileDiscovery(unittest.TestCase):
self.assertIsNone(result["remote"]) self.assertIsNone(result["remote"])
self.assertIn("remote_error", result) self.assertIn("remote_error", result)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
@patch("mcp_server.get_profile")
def test_get_profile_v2_resolved_metadata(self, mock_get_profile, _auth, mock_api):
mock_api.return_value = {"id": 7, "login": "rev"}
mock_get_profile.return_value = {
"profile_name": "prgs.gitea.reviewer",
"allowed_operations": ["read", "review"],
"forbidden_operations": ["merge"],
"audit_label": "rev-audit",
"token_source_name": "keychain:prgs-reviewer-token",
"auth_source_type": "keychain",
"base_url": "https://gitea.prgs.cc",
"username": "sysadmin",
"default_owner": "Scaled-Tech-Consulting",
"profile_path": "prgs.gitea.reviewer",
"environment": "prgs",
"service": "gitea",
"identity": "reviewer",
"role": "reviewer",
"execution_profile": "reviewer-profile",
}
result = gitea_get_profile(remote="prgs")
self.assertEqual(result["environment"], "prgs")
self.assertEqual(result["service"], "gitea")
self.assertEqual(result["identity"], "reviewer")
self.assertEqual(result["role"], "reviewer")
self.assertEqual(result["profile_address"], "prgs.gitea.reviewer")
self.assertEqual(result["execution_profile"], "reviewer-profile")
self.assertEqual(result["auth_source_type"], "keychain")
self.assertEqual(result["forbidden_operations"], ["merge"])
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# PR eligibility checks (read-only) — issue #14 # PR eligibility checks (read-only) — issue #14
@@ -1669,3 +1770,334 @@ class TestTrackerHygieneCleanup(unittest.TestCase):
# branch name fallback # branch name fallback
self.assertEqual(extract_linked_issue_numbers("", branch_name="issue-123"), [123]) self.assertEqual(extract_linked_issue_numbers("", branch_name="issue-123"), [123])
self.assertEqual(extract_linked_issue_numbers("", branch_name="feat/issue-123-foo"), [123]) self.assertEqual(extract_linked_issue_numbers("", branch_name="feat/issue-123-foo"), [123])
# ---------------------------------------------------------------------------
# Endpoint/keychain redaction in LLM-facing output — issue #120
# ---------------------------------------------------------------------------
class TestEndpointRedaction(unittest.TestCase):
"""Normal MCP output hides endpoint URLs and keychain ids; the admin
opt-in (GITEA_MCP_REVEAL_ENDPOINTS) restores them for local diagnostics
without ever revealing token values."""
def _contexts_config_file(self):
import tempfile
config = {
"version": 2,
"contexts": {
"prgs": {
"enabled": True,
"gitea": {"enabled": True, "kind": "gitea",
"base_url": "https://gitea.prgs.cc"},
"services": {
"jenkins": {
"enabled": True, "kind": "jenkins",
"label": "PRGS Jenkins",
"base_url": "https://jenkins.prgs.cc",
"auth": {"type": "keychain",
"id": "prgs-jenkins-token"},
"capabilities": ["read"],
},
"sentry": {
"enabled": False, "kind": "sentry",
"label": "PRGS Sentry", "base_url": "",
"auth": {"type": "keychain",
"id": "prgs-sentry-token"},
"capabilities": ["read"],
},
},
},
},
"profiles": {
"prgs-author": {
"enabled": True, "context": "prgs", "role": "author",
"username": "jcwalker3",
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain",
"id": "prgs-gitea-author-token"},
"allowed_operations": ["read"],
"forbidden_operations": [],
},
},
"projects": {},
"rules": {"hide_service_urls_from_llm": True,
"hide_keychain_ids_from_llm": True,
"mcp_resolves_endpoints": True},
}
fd, path = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(config, fh)
return path
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_whoami_hides_endpoint_url_by_default(self, _auth, mock_api):
mock_api.return_value = {"id": 1, "login": "someone"}
with patch.dict(os.environ, {}, clear=True):
result = gitea_whoami(remote="prgs")
self.assertNotIn("server", result)
self.assertNotIn("https://", repr(result))
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_whoami_reveals_endpoint_with_admin_optin(self, _auth, mock_api):
mock_api.return_value = {"id": 1, "login": "someone"}
with patch.dict(os.environ,
{"GITEA_MCP_REVEAL_ENDPOINTS": "1"}, clear=True):
result = gitea_whoami(remote="prgs")
self.assertEqual(result["server"], "https://gitea.prgs.cc")
def test_get_profile_hides_url_and_token_source_by_default(self):
env = {
"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_BASE_URL": "https://gitea.example.invalid",
"GITEA_TOKEN_SOURCE": "keychain:some-item-id",
}
with patch.dict(os.environ, env, clear=True):
result = gitea_get_profile(remote="prgs",
resolve_identity=False)
blob = repr(result)
for leaked in ("https://", "keychain:", "some-item-id",
"base_url", "server", "token_source_name"):
self.assertNotIn(leaked, blob)
self.assertEqual(result["auth_status"], "configured")
def test_get_profile_reports_unconfigured_auth(self):
with patch.dict(os.environ,
{"GITEA_PROFILE_NAME": "gitea-author"}, clear=True):
result = gitea_get_profile(remote="prgs",
resolve_identity=False)
self.assertEqual(result["auth_status"], "unconfigured")
def test_get_profile_reveals_with_admin_optin(self):
env = {
"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_TOKEN_SOURCE": "keychain:some-item-id",
"GITEA_MCP_REVEAL_ENDPOINTS": "1",
}
with patch.dict(os.environ, env, clear=True):
result = gitea_get_profile(remote="prgs",
resolve_identity=False)
self.assertEqual(result["server"], "https://gitea.prgs.cc")
self.assertEqual(result["token_source_name"], "keychain:some-item-id")
def test_audit_tool_reports_state_without_urls_or_ids(self):
from mcp_server import gitea_audit_config
path = self._contexts_config_file()
try:
env = {"GITEA_MCP_CONFIG": path,
"GITEA_MCP_PROFILE": "prgs-author"}
with patch.dict(os.environ, env, clear=True), \
patch("gitea_config._keychain_token", return_value="x"):
result = gitea_audit_config()
finally:
os.unlink(path)
blob = json.dumps(result)
self.assertIn("PRGS Jenkins: enabled, read-only, authenticated",
result["summaries"])
self.assertIn("PRGS Sentry: disabled", result["summaries"])
for leaked in ("https://", "http://", "prgs-jenkins-token",
"prgs-gitea-author-token", "base_url"):
self.assertNotIn(leaked, blob)
def test_audit_tool_without_config_reports_off(self):
with patch.dict(os.environ, {}, clear=True):
from mcp_server import gitea_audit_config
result = gitea_audit_config()
self.assertFalse(result["configured"])
# ---------------------------------------------------------------------------
# Issue comment tools (#126)
# ---------------------------------------------------------------------------
class TestIssueCommentTools(unittest.TestCase):
"""gitea_list_issue_comments / gitea_create_issue_comment (#126).
Issue discussion comments are distinct from PR reviews: they hit the
/issues/{n}/comments endpoint, are gated on gitea.read (list) and
gitea.issue.comment (create) — never on the gitea.pr.* review/merge
family — and their normal output is LLM-safe (no endpoint URLs; the
GITEA_MCP_REVEAL_ENDPOINTS opt-in restores links for local diagnostics).
"""
AUTHOR_ENV = {
"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.issue.comment",
}
@staticmethod
def _comment(cid=101, login="alice", body="hello world"):
return {
"id": cid,
"user": {"login": login},
"body": body,
"created_at": "2026-07-03T00:00:00Z",
"updated_at": "2026-07-03T01:00:00Z",
"html_url": (
"https://gitea.example.com/o/r/issues/9#issuecomment-%d" % cid
),
}
# -- list ----------------------------------------------------------------
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_lists_comments(self, _auth, mock_api):
mock_api.return_value = [self._comment(101), self._comment(102, "bob")]
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
self.assertTrue(result["success"])
self.assertEqual(result["issue_number"], 9)
self.assertEqual(len(result["comments"]), 2)
first = result["comments"][0]
self.assertEqual(first["id"], 101)
self.assertEqual(first["author"], "alice")
self.assertEqual(first["body"], "hello world")
self.assertEqual(first["created_at"], "2026-07-03T00:00:00Z")
self.assertEqual(first["updated_at"], "2026-07-03T01:00:00Z")
url = mock_api.call_args[0][1]
self.assertIn("/issues/9/comments", url)
self.assertEqual(mock_api.call_args[0][0], "GET")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_list_output_has_no_urls_by_default(self, _auth, mock_api):
mock_api.return_value = [self._comment()]
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
blob = json.dumps(result)
self.assertNotIn("https://", blob)
self.assertNotIn("http://", blob)
self.assertNotIn("url", blob)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_list_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = [self._comment()]
env = dict(self.AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
with patch.dict(os.environ, env, clear=True):
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
self.assertIn("issuecomment-101", result["comments"][0]["url"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_list_blocked_without_read_permission(self, _auth, mock_api):
env = {"GITEA_PROFILE_NAME": "gitea-writer-only",
"GITEA_ALLOWED_OPERATIONS": "gitea.issue.comment"}
with patch.dict(os.environ, env, clear=True):
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
self.assertFalse(result["success"])
self.assertTrue(result["reasons"])
mock_api.assert_not_called()
def test_list_unknown_remote_raises(self):
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
with self.assertRaises(ValueError):
gitea_list_issue_comments(issue_number=9, remote="nope")
# -- create ----------------------------------------------------------------
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_creates_comment(self, _auth, mock_api):
mock_api.return_value = self._comment(555, "gitea-author", "posted")
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
self.assertTrue(result["success"])
self.assertEqual(result["comment_id"], 555)
self.assertEqual(result["issue_number"], 9)
self.assertEqual(mock_api.call_args[0][0], "POST")
url = mock_api.call_args[0][1]
self.assertIn("/issues/9/comments", url)
self.assertIn("gitea.prgs.cc", url)
self.assertIn("Scaled-Tech-Consulting", url)
self.assertEqual(mock_api.call_args[0][3], {"body": "posted"})
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_output_has_no_urls_by_default(self, _auth, mock_api):
mock_api.return_value = self._comment(555)
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
blob = json.dumps(result)
self.assertNotIn("https://", blob)
self.assertNotIn("http://", blob)
self.assertNotIn("url", blob)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = self._comment(555)
env = dict(self.AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
with patch.dict(os.environ, env, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
self.assertIn("issuecomment-555", result["url"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_review_permissions_do_not_grant_issue_comments(self, _auth, mock_api):
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
"GITEA_ALLOWED_OPERATIONS":
"gitea.read,gitea.pr.review,gitea.pr.comment,"
"gitea.pr.approve,gitea.pr.merge"}
with patch.dict(os.environ, env, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
self.assertFalse(result["success"])
self.assertFalse(result["performed"])
self.assertTrue(result["reasons"])
mock_api.assert_not_called()
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_forbidden_overrides_allowed(self, _auth, mock_api):
env = {"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.issue.comment",
"GITEA_FORBIDDEN_OPERATIONS": "gitea.issue.comment"}
with patch.dict(os.environ, env, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
self.assertFalse(result["success"])
mock_api.assert_not_called()
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_empty_body_blocked(self, _auth, mock_api):
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body=" ", remote="prgs")
self.assertFalse(result["success"])
mock_api.assert_not_called()
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_missing_issue_error_is_redacted(self, _auth, mock_api):
mock_api.side_effect = RuntimeError(
"Gitea API error 404 for issue (auth was token abc123secret)")
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
with self.assertRaises(RuntimeError) as cm:
gitea_create_issue_comment(
issue_number=99999, body="posted", remote="prgs")
msg = str(cm.exception)
self.assertNotIn("abc123secret", msg)
self.assertIn("404", msg)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_never_touches_review_endpoints(self, _auth, mock_api):
mock_api.return_value = self._comment(555)
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
for call in mock_api.call_args_list:
self.assertNotIn("reviews", call[0][1])
self.assertNotIn("/pulls/", call[0][1])
def test_create_unknown_remote_raises(self):
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
with self.assertRaises(ValueError):
gitea_create_issue_comment(
issue_number=9, body="x", remote="nope")
+299
View File
@@ -0,0 +1,299 @@
"""Unit tests for migrate_profiles.py migration helper."""
import os
import sys
import json
import unittest
import tempfile
import shutil
from unittest.mock import patch
from io import StringIO
# Add project root to sys.path
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import migrate_profiles
class TestMigrateProfiles(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.v1_content = {
"version": 1,
"profiles": {
"prgs": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {
"type": "keychain",
"id": "redacted-author-ref"
},
"default_owner": "Scaled-Tech-Consulting",
"execution_profile": "personal-prgs",
"allowed_operations": ["read", "comment"],
"forbidden_operations": ["approve", "merge"]
},
"mdcps": {
"base_url": "redacted-mdcps-service",
"username": "913443",
"auth": {
"type": "keychain",
"id": "redacted-mdcps-ref"
},
"default_owner": "Contractor",
"execution_profile": "mdcps",
"allowed_operations": ["read"],
"forbidden_operations": ["merge"]
},
"prgs-reviewer": {
"base_url": "redacted-prgs-service",
"username": "sysadmin",
"auth": {
"type": "keychain",
"id": "redacted-reviewer-ref"
},
"default_owner": "Scaled-Tech-Consulting",
"execution_profile": "prgs-reviewer",
"allowed_operations": [
"read", "review", "comment", "approve",
"request_changes", "merge"
],
"forbidden_operations": ["branch", "commit", "push", "open_pr"]
}
}
}
self.input_file = os.path.join(self.temp_dir, "profiles.json")
with open(self.input_file, "w") as f:
json.dump(self.v1_content, f)
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_migration_logic(self):
"""Test the structural transformation and capability mapping."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
self.assertEqual(v2_data["version"], 2)
# Check environment structure
envs = v2_data["environments"]
self.assertIn("prgs", envs)
self.assertIn("mdcps", envs)
# Check service and identity structure
prgs_gitea = envs["prgs"]["services"]["gitea"]
self.assertEqual(prgs_gitea["base_url"], "redacted-prgs-service")
self.assertEqual(prgs_gitea["default_owner"], "Scaled-Tech-Consulting")
author = prgs_gitea["identities"]["author"]
self.assertEqual(author["username"], "jcwalker3")
self.assertEqual(author["auth"]["id"], "redacted-author-ref")
self.assertEqual(author["allowed_operations"], ["read", "comment"])
self.assertEqual(author["forbidden_operations"], ["approve", "merge"])
reviewer = prgs_gitea["identities"]["reviewer"]
self.assertEqual(reviewer["role"], "reviewer")
self.assertEqual(reviewer["username"], "sysadmin")
self.assertEqual(reviewer["auth"]["id"], "redacted-reviewer-ref")
self.assertIn("merge", reviewer["allowed_operations"])
def test_alias_generation(self):
"""Test that aliases are correctly generated to support old profile names."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
aliases = v2_data["aliases"]
self.assertEqual(aliases["prgs"], "prgs.gitea.author")
self.assertEqual(aliases["prgs-author"], "prgs.gitea.author")
self.assertEqual(aliases["prgs-reviewer"], "prgs.gitea.reviewer")
self.assertEqual(aliases["mdcps"], "mdcps.gitea.author")
def test_no_secret_behavior(self):
"""Ensure secrets are never extracted, printed, or processed."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
# Check that auth structures only contain keychain references, not credentials
for env in v2_data["environments"].values():
for svc in env["services"].values():
for ident in svc["identities"].values():
auth = ident["auth"]
self.assertEqual(auth["type"], "keychain")
self.assertIn("id", auth)
self.assertNotIn("token", auth)
self.assertNotIn("password", auth)
def test_validation(self):
"""Test that the generated v2 configuration validates against Gitea-Tools v2 loader."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
self.assertTrue(migrate_profiles.validate_v2_data(v2_data))
@patch("sys.stdout", new_callable=StringIO)
def test_dry_run_default(self, mock_stdout):
"""Verify that running without -w prints generated config without modifying files."""
output_file = os.path.join(self.temp_dir, "migrated_dry.json")
test_args = [
"migrate_profiles.py",
"-i", self.input_file,
"-o", output_file
]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 0)
self.assertFalse(os.path.exists(output_file))
self.assertFalse(os.path.exists(f"{self.input_file}.bak"))
stdout_output = mock_stdout.getvalue()
self.assertIn("DRY-RUN MODE", stdout_output)
self.assertIn("version", stdout_output)
self.assertIn("identities", stdout_output)
self.assertIn("aliases", stdout_output)
self.assertNotIn("redacted-prgs-service", stdout_output)
self.assertNotIn("redacted-mdcps-service", stdout_output)
self.assertNotIn("redacted-author-ref", stdout_output)
self.assertNotIn("redacted-mdcps-ref", stdout_output)
self.assertNotIn("redacted-reviewer-ref", stdout_output)
self.assertNotIn("keychain", stdout_output)
self.assertNotIn("auth", stdout_output)
def test_dry_run_hides_token_like_values(self):
"""Verify dry-run summary does not expose token-like auth metadata."""
sensitive = json.loads(json.dumps(self.v1_content))
sensitive["profiles"]["prgs"]["auth"]["id"] = "super-secret-token-value"
with open(self.input_file, "w") as f:
json.dump(sensitive, f)
test_args = ["migrate_profiles.py", "-i", self.input_file]
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 0)
stdout_output = mock_stdout.getvalue()
self.assertNotIn("super-secret-token-value", stdout_output)
self.assertNotIn("token", stdout_output.lower())
def test_explicit_operations_are_preserved(self):
"""Explicit v1 permissions must not be replaced by role defaults."""
v1_data = json.loads(json.dumps(self.v1_content))
v1_data["profiles"]["prgs-reviewer"]["allowed_operations"] = ["read"]
v1_data["profiles"]["prgs-reviewer"]["forbidden_operations"] = ["merge"]
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
reviewer = (
v2_data["environments"]["prgs"]["services"]["gitea"]
["identities"]["reviewer"]
)
self.assertEqual(reviewer["allowed_operations"], ["read"])
self.assertEqual(reviewer["forbidden_operations"], ["merge"])
def test_inferred_role_defaults_only_when_unambiguous(self):
"""Role defaults are allowed only for clear author/reviewer profiles."""
v1_data = {
"version": 1,
"profiles": {
"prgs-author": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {"type": "keychain", "id": "hidden-author-ref"},
"execution_profile": "prgs-author",
}
}
}
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
author = (
v2_data["environments"]["prgs"]["services"]["gitea"]
["identities"]["author"]
)
self.assertEqual(
author["allowed_operations"],
migrate_profiles.AUTHOR_DEFAULT_ALLOWED,
)
self.assertEqual(
author["forbidden_operations"],
migrate_profiles.AUTHOR_DEFAULT_FORBIDDEN,
)
def test_ambiguous_permission_source_fails_closed(self):
"""A profile without explicit permissions or clear role must not widen."""
v1_data = {
"version": 1,
"profiles": {
"prgs": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {"type": "keychain", "id": "hidden-author-ref"},
"execution_profile": "personal-prgs",
}
}
}
with self.assertRaisesRegex(ValueError, "fail closed"):
migrate_profiles.migrate_v1_to_v2(v1_data)
def test_partial_permission_source_fails_closed(self):
"""Allowed without forbidden, or vice versa, is ambiguous."""
v1_data = json.loads(json.dumps(self.v1_content))
del v1_data["profiles"]["prgs"]["forbidden_operations"]
with self.assertRaisesRegex(ValueError, "fail closed"):
migrate_profiles.migrate_v1_to_v2(v1_data)
def test_write_mode_and_backup(self):
"""Verify that write mode creates a backup and correctly saves the validated config."""
output_file = os.path.join(self.temp_dir, "migrated.json")
backup_file = os.path.join(self.temp_dir, "profiles_backup.json.bak")
test_args = [
"migrate_profiles.py",
"-i", self.input_file,
"-o", output_file,
"--backup", backup_file,
"-w"
]
with patch.object(sys, "argv", test_args):
migrate_profiles.main()
# Verify backup exists and matches original v1 config
self.assertTrue(os.path.exists(backup_file))
with open(backup_file, "r") as f:
backup_data = json.load(f)
self.assertEqual(backup_data["version"], 1)
self.assertIn("prgs", backup_data["profiles"])
# Verify migrated v2 config exists and validates
self.assertTrue(os.path.exists(output_file))
with open(output_file, "r") as f:
v2_data = json.load(f)
self.assertEqual(v2_data["version"], 2)
self.assertIn("environments", v2_data)
self.assertEqual(v2_data["aliases"]["prgs"], "prgs.gitea.author")
def test_malformed_input_fails_safely(self):
"""Test that malformed JSON or invalid version numbers cause a clean exit with code 1."""
bad_json_file = os.path.join(self.temp_dir, "bad.json")
with open(bad_json_file, "w") as f:
f.write("{invalid-json}")
test_args = ["migrate_profiles.py", "-i", bad_json_file]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 1)
bad_version_file = os.path.join(self.temp_dir, "bad_version.json")
with open(bad_version_file, "w") as f:
json.dump({"version": 3, "profiles": {}}, f)
test_args = ["migrate_profiles.py", "-i", bad_version_file]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 1)
if __name__ == "__main__":
unittest.main()
+239
View File
@@ -0,0 +1,239 @@
"""Operation-name normalization table and enforcement tests — issue #106.
Covers the required matrix from #106:
- fully qualified allowed / forbidden operations
- legacy unqualified allowed / forbidden operations
- unknown operations (fail closed)
- ambiguous operations (fail closed)
- service mismatch (cross-service names never accepted by the wrong service)
- forbidden-overrides-allowed
- empty / missing allowed list
- duplicate operations after normalization
- no silent permission widening
- eligibility enforcement normalizes before checking
"""
import os
import sys
import unittest
from unittest.mock import patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import gitea_config # noqa: E402
from gitea_config import ( # noqa: E402
ConfigError,
check_operation,
normalize_operation,
)
from mcp_server import gitea_check_pr_eligibility # noqa: E402
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
# ---------------------------------------------------------------------------
# normalize_operation — canonical table
# ---------------------------------------------------------------------------
class TestNormalizeOperation(unittest.TestCase):
def test_fully_qualified_gitea_op_unchanged(self):
self.assertEqual(normalize_operation("gitea.pr.merge"), "gitea.pr.merge")
def test_legacy_aliases_map_to_canonical_names(self):
expected = {
"merge": "gitea.pr.merge",
"approve": "gitea.pr.approve",
"request_changes": "gitea.pr.request_changes",
"review": "gitea.pr.review",
"comment": "gitea.pr.comment",
"read": "gitea.read",
}
for legacy, canonical in expected.items():
self.assertEqual(normalize_operation(legacy), canonical)
def test_contexts_shape_author_verbs(self):
self.assertEqual(normalize_operation("branch"), "gitea.branch.create")
self.assertEqual(normalize_operation("commit"), "gitea.repo.commit")
self.assertEqual(normalize_operation("push"), "gitea.branch.push")
self.assertEqual(normalize_operation("open_pr"), "gitea.pr.create")
def test_unknown_unqualified_op_fails_closed(self):
with self.assertRaises(ConfigError):
normalize_operation("frobnicate")
def test_ambiguous_dotted_op_fails_closed(self):
# Dotted but neither gitea-prefixed nor an explicit alias: refuse to
# guess which namespace was meant.
with self.assertRaises(ConfigError):
normalize_operation("build.read")
def test_cross_service_name_rejected_by_wrong_service(self):
with self.assertRaises(ConfigError):
normalize_operation("jenkins.read", service="gitea")
with self.assertRaises(ConfigError):
normalize_operation("gitea.read", service="jenkins")
def test_non_gitea_single_word_namespaced_to_service(self):
self.assertEqual(normalize_operation("read", service="jenkins"),
"jenkins.read")
def test_non_gitea_qualified_own_prefix_unchanged(self):
self.assertEqual(
normalize_operation("jenkins.build.read", service="jenkins"),
"jenkins.build.read",
)
def test_empty_and_non_string_fail_closed(self):
for bad in ("", None, 3, ["merge"]):
with self.assertRaises(ConfigError):
normalize_operation(bad)
def test_gitea_alias_not_applied_to_other_services(self):
# "merge" on jenkins must not resolve to the *gitea* merge permission.
self.assertEqual(normalize_operation("merge", service="jenkins"),
"jenkins.merge")
def test_table_is_documented_and_matches_normalization(self):
table = gitea_config.GITEA_OPERATION_ALIASES
self.assertIsInstance(table, dict)
self.assertTrue(table)
for legacy, canonical in table.items():
self.assertEqual(normalize_operation(legacy), canonical)
self.assertTrue(canonical.startswith("gitea."))
# ---------------------------------------------------------------------------
# check_operation — enforcement semantics (normalize BEFORE checking)
# ---------------------------------------------------------------------------
class TestCheckOperation(unittest.TestCase):
def test_fully_qualified_allowed(self):
ok, reason = check_operation("gitea.pr.merge", ["gitea.pr.merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_fully_qualified_forbidden(self):
ok, reason = check_operation(
"gitea.pr.merge", ["gitea.pr.merge"], ["gitea.pr.merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_legacy_unqualified_allowed(self):
ok, reason = check_operation("merge", ["gitea.pr.merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_legacy_unqualified_forbidden(self):
ok, reason = check_operation("merge", ["gitea.pr.merge"], ["merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_unknown_operation_fails_closed(self):
ok, reason = check_operation("frobnicate", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_ambiguous_operation_fails_closed(self):
ok, reason = check_operation("build.read", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_service_mismatch_rejected(self):
ok, reason = check_operation("jenkins.read", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_forbidden_overrides_allowed_across_spellings(self):
# Allowed via legacy spelling, forbidden via canonical spelling: the
# forbidden entry must win after both normalize to the same op.
ok, reason = check_operation("merge", ["merge"], ["gitea.pr.merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_empty_allowed_list_denies(self):
ok, reason = check_operation("gitea.read", [])
self.assertFalse(ok)
self.assertEqual(reason, "no-allowed-operations")
def test_missing_allowed_list_denies(self):
ok, reason = check_operation("gitea.read", None)
self.assertFalse(ok)
self.assertEqual(reason, "no-allowed-operations")
def test_duplicates_after_normalization_are_harmless(self):
ok, reason = check_operation(
"merge", ["merge", "gitea.pr.merge", "merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_unnormalizable_allowed_entry_grants_nothing(self):
# A junk allowed entry must not widen permissions to anything.
ok, reason = check_operation("gitea.read", ["frobnicate"])
self.assertFalse(ok)
self.assertEqual(reason, "not-allowed")
def test_unnormalizable_forbidden_entry_fails_closed(self):
# If a forbidden entry cannot be understood, deny rather than risk
# silently narrowing the forbidden set (which would widen permissions).
ok, reason = check_operation(
"gitea.read", ["gitea.read"], ["frobnicate"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-forbidden-entry")
# ---------------------------------------------------------------------------
# Eligibility enforcement — normalization happens before checking (#106)
# ---------------------------------------------------------------------------
class TestEligibilityNormalizesOperations(unittest.TestCase):
def _pr(self, author, state="open", sha="abc123", mergeable=True):
return {
"user": {"login": author},
"state": state,
"head": {"sha": sha},
"mergeable": mergeable,
}
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_namespaced_profile_ops_allow_legacy_action(self, _auth, mock_api):
# JSON-config profiles carry canonical namespaced ops; the raw action
# "merge" must still match them after normalization.
mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-merger",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=9, action="merge",
remote="prgs")
self.assertTrue(r["eligible"])
self.assertNotIn("profile is not allowed to merge", r["reasons"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_namespaced_forbidden_op_blocks_legacy_action(self, _auth, mock_api):
mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-merger",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge",
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.merge"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=9, action="merge",
remote="prgs")
self.assertFalse(r["eligible"])
self.assertIn("profile forbids 'merge'", r["reasons"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_legacy_env_ops_still_work(self, _auth, mock_api):
# v1/env behaviour stays compatible: unqualified env ops keep working.
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=5, action="review",
remote="prgs")
self.assertTrue(r["eligible"])
if __name__ == "__main__":
unittest.main()