1 Commits

Author SHA1 Message Date
sysadmin a6da65c82b docs: re-land release version SOP with v1.1.0 audit lessons (#111) 2026-07-02 20:53:22 -04:00
12 changed files with 45 additions and 2567 deletions
-1
View File
@@ -6,7 +6,6 @@ __pycache__/
# Real JSON runtime-profile configs may reference private hosts; keep only the example.
gitea-mcp*.json
!gitea-mcp.example.json
!gitea-mcp.v2-contexts.example.json
.vscode/
graphify-out/
branches/
+2 -68
View File
@@ -134,74 +134,8 @@ Rules:
appears in both, it is forbidden.
- An operation not present in `allowed_operations` is treated as **not
allowed** (deny by default).
## Operation-name normalization (#106)
Canonical operation names are namespaced: `{service}.{area}.{verb}` (e.g.
`gitea.pr.merge`, `jenkins.build.read`). Legacy unqualified spellings are
accepted **only** through the explicit alias table below (the code of record
is `GITEA_OPERATION_ALIASES` in `gitea_config.py`; the enforcement matrix is
`tests/test_op_normalization.py`).
| Legacy spelling | Canonical operation |
|-------------------|----------------------------|
| `read` | `gitea.read` |
| `review` | `gitea.pr.review` |
| `comment` | `gitea.pr.comment` |
| `approve` | `gitea.pr.approve` |
| `request_changes` | `gitea.pr.request_changes` |
| `merge` | `gitea.pr.merge` |
| `pr.create` | `gitea.pr.create` |
| `branch.push` | `gitea.branch.push` |
| `branch` | `gitea.branch.create` |
| `commit` | `gitea.repo.commit` |
| `push` | `gitea.branch.push` |
| `open_pr` | `gitea.pr.create` |
For non-Gitea services, a single unqualified word namespaces to the checked
service (`read``jenkins.read` when checking Jenkins); names already
prefixed with that service pass through unchanged.
Enforcement rules (`gitea_config.check_operation`, run **before** any
allowed/forbidden membership check):
- Unknown operation names fail closed (denied).
- Ambiguous names — dotted names that are neither service-prefixed nor in the
alias table — fail closed.
- Cross-service names are never accepted by the wrong service
(`jenkins.read` never matches a Gitea check, and a Gitea alias is never
applied to another service).
- `forbidden_operations` overrides `allowed_operations` after both sides are
normalized, so a legacy spelling can never bypass a canonical forbidden
entry (or vice versa).
- An allowed entry that cannot be normalized grants nothing; a forbidden
entry that cannot be normalized denies the request. Normalization can
therefore never silently widen permissions.
- An empty or missing `allowed_operations` list denies everything.
## Issue comments versus PR reviews (#126)
Issue discussion comments and PR reviews are different capabilities and are
gated by different operations:
- **Issue comments** (`gitea_list_issue_comments`, `gitea_create_issue_comment`)
post to and read from an issue's discussion thread
(`/issues/{n}/comments`). Listing requires `gitea.read`; creating requires
`gitea.issue.comment`. They never submit review verdicts.
- **PR reviews** (`gitea_review_pr`, `gitea_submit_pr_review`) submit
approve/request-changes/comment verdicts on pull requests
(`/pulls/{n}/reviews`) and are gated by the `gitea.pr.*` family
(`gitea.pr.review`, `gitea.pr.approve`, `gitea.pr.request_changes`,
`gitea.pr.comment`).
A profile holding the full PR review/merge set still cannot post issue
discussion comments unless it also allows `gitea.issue.comment`, and vice
versa — neither family implies the other. Both comment tools require an
explicit issue number; the target repo comes only from the standard
remote/org/repo arguments. Create operations are audit-logged
(`create_issue_comment`) when `GITEA_AUDIT_LOG` is configured, errors are
redacted, and normal output contains no endpoint URLs
(`GITEA_MCP_REVEAL_ENDPOINTS=1` is the local admin opt-in for web links).
- These categories are descriptive for this issue. Their runtime enforcement is
out of scope here (see roadmap links).
## Identity and fail-closed rules
-80
View File
@@ -1,80 +0,0 @@
{
"version": 2,
"contexts": {
"example-context": {
"enabled": true,
"label": "Example environment",
"description": "One deployment environment: its Gitea plus non-Gitea services.",
"default_owner": "Example-Org",
"gitea": {
"enabled": true,
"kind": "gitea",
"base_url": "https://gitea.example.invalid"
},
"services": {
"jenkins": {
"enabled": true,
"kind": "jenkins",
"label": "Example Jenkins",
"base_url": "https://jenkins.example.invalid",
"auth": { "type": "keychain", "id": "example-jenkins-token" },
"capabilities": ["read"]
},
"glitchtip": {
"enabled": false,
"kind": "glitchtip",
"label": "Example GlitchTip (disabled: defined but unavailable)",
"base_url": "",
"auth": { "type": "keychain", "id": "example-glitchtip-token" },
"capabilities": ["read"],
"allow_raw_events": false
}
}
}
},
"profiles": {
"example-author": {
"enabled": true,
"context": "example-context",
"role": "author",
"username": "author-user",
"execution_profile": "example-author",
"audit_label": "example-author",
"auth": { "type": "keychain", "id": "example-gitea-author-token" },
"allowed_operations": ["read", "branch", "commit", "push", "open_pr", "comment"],
"forbidden_operations": ["approve", "request_changes", "merge"]
},
"example-reviewer": {
"enabled": true,
"context": "example-context",
"role": "reviewer",
"username": "reviewer-user",
"execution_profile": "example-reviewer",
"audit_label": "example-reviewer",
"auth": { "type": "keychain", "id": "example-gitea-reviewer-token" },
"allowed_operations": ["read", "review", "comment", "approve", "request_changes", "merge"],
"forbidden_operations": ["branch", "commit", "push", "open_pr"]
}
},
"projects": {
"/absolute/path/to/local/repo": {
"enabled": true,
"context": "example-context",
"default_owner": "Example-Org",
"default_repo": "Example-Repo",
"default_author_profile": "example-author",
"default_reviewer_profile": "example-reviewer"
}
},
"rules": {
"disabled_behavior": "Defined but unavailable for action. MCP tools may report disabled entries during audits, but must not use them automatically.",
"no_silent_fallback": true,
"tokens_in_json": false,
"token_storage": "keychain",
"identity_must_match_task": true,
"same_username_cannot_review_own_pr": true,
"hide_service_urls_from_llm": true,
"hide_keychain_ids_from_llm": true,
"mcp_resolves_endpoints": true
}
}
+3 -23
View File
@@ -123,17 +123,13 @@ def get_auth_header(host):
token = os.environ.get("GITEA_TOKEN")
# 3. Fall back to a JSON runtime-profile token reference (token_env).
# Explicit env tokens above take precedence. When GITEA_MCP_CONFIG is
# configured, a broken config or unresolvable profile/credential fails
# closed here (no silent fallback to Basic auth or another source,
# #120). Without a configured JSON layer, env-only behaviour is
# unchanged.
# Explicit env tokens above take precedence. A broken config never breaks
# auth here — it fails closed to "no token"; the clear error surfaces via
# get_profile() / startup instead.
if not token:
try:
token = gitea_config.resolve_token(gitea_config.resolve_profile())
except gitea_config.ConfigError:
if gitea_config.config_path():
raise
token = None
if token:
@@ -473,29 +469,13 @@ def get_profile():
token_source = (os.environ.get("GITEA_TOKEN_SOURCE") or "").strip() \
or gitea_config.auth_source_name(jp)
base_url = os.environ.get("GITEA_BASE_URL") or jp.get("base_url") or None
auth_type = None
if isinstance(jp.get("auth"), dict):
auth_type = jp["auth"].get("type")
elif token_source:
if token_source.startswith("keychain:"):
auth_type = "keychain"
else:
auth_type = "env"
return {
"profile_name": name,
"allowed_operations": ops,
"forbidden_operations": forbidden,
"audit_label": audit_label,
"token_source_name": token_source,
"auth_source_type": auth_type,
"base_url": base_url,
"username": jp.get("username") or None,
"default_owner": jp.get("default_owner") or None,
"profile_path": jp.get("profile_path") or None,
"environment": jp.get("environment") or None,
"service": jp.get("service") or None,
"identity": jp.get("identity") or None,
"role": jp.get("role") or None,
"execution_profile": jp.get("execution_profile") or None,
}
+17 -453
View File
@@ -70,13 +70,11 @@ _TBD_RE = re.compile(r"(?i)^tbd(-|$)")
# Keys that would mean an inline secret wherever they appear.
_INLINE_SECRET_KEYS = ("token", "password", "secret")
# ── Operation-name normalization table (#106; minimal subset landed in #103) ───
# Canonical operations are namespaced ({service}.{area}.{verb}). Legacy
# unqualified spellings are accepted ONLY through this explicit table — never
# by guessing. The same table is the documentation of record (see
# docs/gitea-execution-profiles.md) and is exercised by
# tests/test_op_normalization.py.
GITEA_OPERATION_ALIASES = {
# ── Minimal operation normalization (#103) ─────────────────────────────────────
# Only what the #103 invariants need. The full normalization table, deprecation
# handling, and enforcement test matrix belong to issue #106 — do not grow this
# beyond invariant safety here.
_MINIMAL_GITEA_OP_MAP = {
"read": "gitea.read",
"review": "gitea.pr.review",
"comment": "gitea.pr.comment",
@@ -85,94 +83,32 @@ GITEA_OPERATION_ALIASES = {
"merge": "gitea.pr.merge",
"pr.create": "gitea.pr.create",
"branch.push": "gitea.branch.push",
# Contexts-shape author verbs (#120) — the invariant checks below depend on
# "push"/"open_pr" normalizing to the two author-only ops.
"branch": "gitea.branch.create",
"commit": "gitea.repo.commit",
"push": "gitea.branch.push",
"open_pr": "gitea.pr.create",
}
_REVIEW_MERGE_OPS = frozenset({"gitea.pr.approve", "gitea.pr.merge"})
_AUTHOR_ONLY_OPS = frozenset({"gitea.pr.create", "gitea.branch.push"})
def normalize_operation(op, service="gitea"):
"""Return the canonical namespaced name for *op*, or fail closed (#106).
def _normalize_op(service, op, addr):
"""Normalize *op* for *service*, or fail closed (#103 minimal subset).
- already namespaced for this service (``{service}.*``) → unchanged
- known unqualified Gitea ops → mapped via ``GITEA_OPERATION_ALIASES``
- known unqualified Gitea ops → mapped via ``_MINIMAL_GITEA_OP_MAP``
- unqualified single-word ops on non-Gitea services → ``{service}.{op}``
- anything else foreign service prefixes, dotted names outside the
table, unknown unqualified names — is unknown or ambiguous → ConfigError
Normalization never crosses services (a Gitea alias is never applied to
another service) and never widens permissions: an operation that cannot
be normalized grants and matches nothing.
- anything else (foreign prefixes, unknown unqualified names) → ConfigError
"""
if not isinstance(op, str) or not op:
raise ConfigError("operation must be a non-empty string (fail closed)")
raise ConfigError(f"identity '{addr}' has an empty or non-string operation")
if op.startswith(service + "."):
return op
if service == "gitea" and op in GITEA_OPERATION_ALIASES:
return GITEA_OPERATION_ALIASES[op]
if service == "gitea" and op in _MINIMAL_GITEA_OP_MAP:
return _MINIMAL_GITEA_OP_MAP[op]
if service != "gitea" and "." not in op:
return f"{service}.{op}"
raise ConfigError(
f"operation {op!r} cannot be normalized safely for service "
f"'{service}' (unknown, ambiguous, or cross-service; fail closed)"
f"identity '{addr}' has operation {op!r} that cannot be normalized "
f"safely for service '{service}' (fail closed; full table is issue #106)"
)
def check_operation(op, allowed, forbidden=(), service="gitea"):
"""Decide whether *op* is permitted. Returns ``(bool, reason)`` (#106).
Everything is normalized via :func:`normalize_operation` BEFORE any
membership check, so legacy and canonical spellings always compare equal.
Reasons: ``allowed``, ``invalid-operation``, ``invalid-forbidden-entry``,
``forbidden``, ``no-allowed-operations``, ``not-allowed``.
Fail-closed rules:
- an *op* that cannot be normalized is denied (``invalid-operation``)
- a forbidden entry that cannot be normalized denies the request
(``invalid-forbidden-entry``) — dropping it would silently narrow the
forbidden set, i.e. widen permissions
- an allowed entry that cannot be normalized is ignored — it grants
nothing, so permissions never widen
- ``forbidden`` always overrides ``allowed``
- an empty or missing allowed list denies everything
"""
try:
op_n = normalize_operation(op, service)
except ConfigError:
return (False, "invalid-operation")
forbidden_n = set()
for entry in (forbidden or ()):
try:
forbidden_n.add(normalize_operation(entry, service))
except ConfigError:
return (False, "invalid-forbidden-entry")
if op_n in forbidden_n:
return (False, "forbidden")
if not allowed:
return (False, "no-allowed-operations")
allowed_n = set()
for entry in allowed:
try:
allowed_n.add(normalize_operation(entry, service))
except ConfigError:
continue
if op_n in allowed_n:
return (True, "allowed")
return (False, "not-allowed")
def _normalize_op(service, op, addr):
"""Normalize *op* for identity *addr*, or fail closed with context."""
try:
return normalize_operation(op, service)
except ConfigError as exc:
raise ConfigError(f"identity '{addr}': {exc}") from None
# Default canonical config location (one file shared by all LLM launchers).
DEFAULT_CONFIG_PATH = os.path.join(
os.path.expanduser("~"), ".config", "gitea-tools", "profiles.json"
@@ -233,7 +169,7 @@ def load_config(path=None):
f"expected one of {list(SUPPORTED_VERSIONS)}"
)
if version == 2:
return _load_v2_any(data, path)
return _load_v2(data, path)
if version != SUPPORTED_VERSION:
raise ConfigError(
f"{path} has unsupported version {version!r}; "
@@ -409,363 +345,6 @@ def _load_v2(data, path):
}
# ── profiles.json version 2 *contexts* shape (#120) ───────────────────────────
# The canonical machine config groups everything by context: top-level
# "contexts" (each with a gitea block and non-Gitea "services"), flat
# "profiles" (Gitea identities pointing at a context), "projects" (local repo
# paths mapped to a context), and "rules". Every context/profile/service/
# project carries a required boolean "enabled": disabled entries are surfaced
# in audits but fail closed at selection — never a silent fallback. Loading
# flattens profiles into the same {"profiles": {...}, "unavailable": {...}}
# model v1 consumers and select_profile() already understand, and carries the
# validated "contexts"/"projects"/"rules" through for service resolution.
def _load_v2_any(data, path):
"""Dispatch a version-2 file to its shape loader; ambiguity fails closed."""
has_contexts = "contexts" in data
has_environments = "environments" in data
if has_contexts and has_environments:
raise ConfigError(
f"{path} version 2 config must not mix 'contexts' and "
"'environments' shapes (ambiguous; fail closed)"
)
if has_contexts:
return _load_v2_contexts(data, path)
return _load_v2(data, path)
def _require_enabled(kind, name, obj):
"""Return the required boolean ``enabled`` flag, failing closed."""
enabled = obj.get("enabled")
if not isinstance(enabled, bool):
raise ConfigError(
f"{kind} '{name}' requires a boolean 'enabled' flag (fail closed)"
)
return enabled
def _reject_inline_secrets(kind, name, obj):
for key in _INLINE_SECRET_KEYS:
if key in obj:
raise ConfigError(
f"{kind} '{name}' must not contain an inline '{key}'; "
"store secrets in the keychain and reference them by id"
)
def _validate_context_service(ctx_name, svc_name, svc):
"""Validate one context service entry (auth reference only, no secrets)."""
addr = f"{ctx_name}.{svc_name}"
if not isinstance(svc, dict):
raise ConfigError(f"service '{addr}' must be a JSON object")
_require_enabled("service", addr, svc)
_reject_inline_secrets("service", addr, svc)
if "auth" in svc:
_validate_auth(addr, svc["auth"])
def _load_v2_contexts(data, path):
"""Validate a v2 contexts-shape config and return the resolvable structure."""
contexts = data.get("contexts")
if not isinstance(contexts, dict) or not contexts:
raise ConfigError(
f"{path} version 2 contexts config requires a non-empty "
"'contexts' object"
)
for ctx_name, ctx in contexts.items():
if not _PROFILE_NAME_RE.match(ctx_name or ""):
raise ConfigError(f"invalid context name {ctx_name!r}")
if not isinstance(ctx, dict):
raise ConfigError(f"context '{ctx_name}' must be a JSON object")
_require_enabled("context", ctx_name, ctx)
gitea = ctx.get("gitea")
if gitea is not None:
if not isinstance(gitea, dict):
raise ConfigError(
f"context '{ctx_name}' has a non-object 'gitea' block")
_require_enabled("service", f"{ctx_name}.gitea", gitea)
_reject_inline_secrets("service", f"{ctx_name}.gitea", gitea)
services = ctx.get("services") or {}
if not isinstance(services, dict):
raise ConfigError(
f"context '{ctx_name}' has a non-object 'services' block")
for svc_name, svc in services.items():
_validate_context_service(ctx_name, svc_name, svc)
raw_profiles = data.get("profiles")
if not isinstance(raw_profiles, dict) or not raw_profiles:
raise ConfigError(
f"{path} version 2 contexts config requires a non-empty "
"'profiles' object"
)
profiles = {}
unavailable = {}
for name, raw in raw_profiles.items():
if not is_valid_profile_name(name):
raise ConfigError(f"invalid profile name {name!r}")
if not isinstance(raw, dict):
raise ConfigError(f"profile '{name}' must be a JSON object")
enabled = _require_enabled("profile", name, raw)
_reject_inline_secrets("profile", name, raw)
_validate_identity_auth(name, raw.get("auth"))
ctx_name = raw.get("context")
if ctx_name not in contexts:
raise ConfigError(
f"profile '{name}' references unknown context {ctx_name!r}")
context = contexts[ctx_name]
allowed = raw.get("allowed_operations") or []
forbidden = raw.get("forbidden_operations") or []
if not isinstance(allowed, list) or not isinstance(forbidden, list):
raise ConfigError(f"profile '{name}' operation fields must be lists")
allowed_n = {_normalize_op("gitea", op, name) for op in allowed}
forbidden_n = {_normalize_op("gitea", op, name) for op in forbidden}
# Reviewer-identity deadlock rule (#100/#103) applies here unchanged.
if allowed_n & _REVIEW_MERGE_OPS:
missing = sorted(_AUTHOR_ONLY_OPS - forbidden_n)
if missing:
raise ConfigError(
f"profile '{name}' allows PR approve/merge but does not "
f"forbid {missing}; reviewer identities must forbid "
"gitea.pr.create and gitea.branch.push "
"(reviewer-identity deadlock rule)"
)
profile = dict(raw)
profile["allowed_operations"] = sorted(allowed_n)
profile["forbidden_operations"] = sorted(forbidden_n)
gitea = context.get("gitea") or {}
if not profile.get("base_url") and gitea.get("enabled"):
profile["base_url"] = gitea.get("base_url")
username = profile.get("username") or ""
if not enabled:
unavailable[name] = (
f"profile '{name}' is disabled (enabled: false); defined but "
"unavailable for action — refusing, no fallback"
)
elif not context.get("enabled"):
unavailable[name] = (
f"profile '{name}' belongs to context '{ctx_name}' which is "
"disabled (enabled: false); refusing, no fallback"
)
elif not profile.get("base_url"):
unavailable[name] = (
f"profile '{name}' has no usable base_url (none set and the "
f"context '{ctx_name}' gitea service is disabled or has none); "
"fail closed"
)
elif _TBD_RE.match(username):
unavailable[name] = (
f"profile '{name}' username {username!r} is a TBD placeholder; "
"provision the account before use (fail closed)"
)
else:
profiles[name] = profile
continue
# Unavailable profiles keep their (secret-free) body for audits only.
profile["_unavailable_reason"] = unavailable[name]
profiles.setdefault("_audit_only", {})
profiles["_audit_only"][name] = profile
projects = data.get("projects") or {}
if not isinstance(projects, dict):
raise ConfigError(f"{path} 'projects' must be a JSON object")
for proj_path, proj in projects.items():
if not isinstance(proj, dict):
raise ConfigError(f"project '{proj_path}' must be a JSON object")
_require_enabled("project", proj_path, proj)
if proj.get("context") not in contexts:
raise ConfigError(
f"project '{proj_path}' references unknown context "
f"{proj.get('context')!r}"
)
rules = data.get("rules") or {}
if not isinstance(rules, dict):
raise ConfigError(f"{path} 'rules' must be a JSON object")
audit_only = profiles.pop("_audit_only", {})
return {
"version": 2,
"shape": "contexts",
"profiles": profiles,
"unavailable": unavailable,
"audit_only_profiles": audit_only,
"contexts": contexts,
"projects": projects,
"rules": rules,
}
def resolve_service(config, context_name, service_name):
"""Return one context service's config for *internal* MCP use.
The returned dict includes the endpoint base_url and the keychain auth
*reference* — both are for MCP-internal resolution only and must never be
echoed into normal LLM-facing output (see audit_config/service_summaries).
Fails closed on an unknown or disabled context/service; never falls back
to another service.
"""
contexts = (config or {}).get("contexts")
if not isinstance(contexts, dict):
raise ConfigError(
"service resolution requires a version 2 contexts config")
ctx = contexts.get(context_name)
if ctx is None:
raise ConfigError(
f"unknown context '{context_name}' (fail closed, no fallback)")
if not ctx.get("enabled"):
raise ConfigError(
f"context '{context_name}' is disabled; its services are defined "
"but unavailable for action (no fallback)"
)
if service_name == "gitea":
service = ctx.get("gitea")
else:
service = (ctx.get("services") or {}).get(service_name)
if service is None:
raise ConfigError(
f"unknown service '{service_name}' in context '{context_name}' "
"(fail closed, no fallback)"
)
if not service.get("enabled"):
raise ConfigError(
f"service '{context_name}.{service_name}' is disabled; defined "
"but unavailable for action — refusing, no fallback"
)
return dict(service)
def project_for_path(config, path):
"""Map a local project *path* to its context entry, failing closed.
Returns None when the path is not configured (feature off for that repo).
Raises :class:`ConfigError` when the project or its context is disabled —
a configured-but-disabled project must never be acted on.
"""
projects = (config or {}).get("projects") or {}
project = projects.get(path)
if project is None:
return None
if not project.get("enabled"):
raise ConfigError(
f"project '{path}' is disabled (enabled: false); refusing, "
"no fallback"
)
contexts = (config or {}).get("contexts") or {}
ctx = contexts.get(project.get("context")) or {}
if not ctx.get("enabled"):
raise ConfigError(
f"project '{path}' maps to context '{project.get('context')}' "
"which is disabled; refusing, no fallback"
)
return dict(project)
def _audit_profile_entry(name, profile, enabled, reveal_endpoints):
"""One LLM-safe audit row: no endpoint URLs, no keychain ids, no tokens."""
auth = profile.get("auth") if isinstance(profile, dict) else None
entry = {
"name": name,
"enabled": enabled,
"context": profile.get("context") or profile.get("environment"),
"role": profile.get("role"),
"username": profile.get("username"),
"auth": (auth or {}).get("type") if isinstance(auth, dict) else None,
}
reason = profile.get("_unavailable_reason")
if reason:
entry["reason"] = reason
if reveal_endpoints:
entry["base_url"] = profile.get("base_url")
entry["auth_source"] = auth_source_name(profile)
return entry
def audit_config(config, reveal_endpoints=False):
"""Report enabled/disabled profiles and services without secrets.
Default output is LLM-safe: names, contexts, enabled state, capability
labels, and the auth *type* only — never endpoint URLs, keychain ids,
token values, or auth source names. ``reveal_endpoints=True`` is the
explicit admin/debug opt-in for local diagnostics: it adds base URLs and
non-secret auth source names (``keychain:<id>`` / env var name). Token
values are never included on any path.
"""
if config is None:
return {"version": None, "profiles": [], "services": []}
report = {
"version": config.get("version"),
"shape": config.get("shape") or ("environments"
if config.get("aliases") is not None
else "profiles"),
"profiles": [],
"services": [],
}
for name, profile in (config.get("profiles") or {}).items():
if not isinstance(profile, dict):
continue
report["profiles"].append(_audit_profile_entry(
name, profile, True, reveal_endpoints))
for name, profile in (config.get("audit_only_profiles") or {}).items():
report["profiles"].append(_audit_profile_entry(
name, profile, False, reveal_endpoints))
for ctx_name, ctx in (config.get("contexts") or {}).items():
ctx_enabled = bool(ctx.get("enabled"))
for svc_name, svc in (ctx.get("services") or {}).items():
entry = {
"context": ctx_name,
"name": svc_name,
"kind": svc.get("kind"),
"label": svc.get("label"),
"enabled": ctx_enabled and bool(svc.get("enabled")),
"capabilities": list(svc.get("capabilities") or []),
"auth": (svc.get("auth") or {}).get("type"),
}
if reveal_endpoints:
entry["base_url"] = svc.get("base_url")
entry["auth_source"] = auth_source_name(svc)
report["services"].append(entry)
return report
def service_summaries(config, auth_check=None):
"""Safe one-line service summaries for LLM sessions.
Each line reports label + state only (e.g. ``PRGS Jenkins: enabled,
read-only, authenticated`` / ``PRGS Sentry: disabled``) — never endpoint
URLs, keychain ids, or token values. *auth_check* is a callable taking the
service dict and returning True when its credential resolves; it defaults
to a local keychain presence check and its result is reported only as
``authenticated`` / ``no credential``.
"""
if auth_check is None:
def auth_check(service):
auth = service.get("auth") or {}
if auth.get("type") == "keychain":
return _keychain_token(auth.get("id")) is not None
if auth.get("type") == "env":
return bool(os.environ.get(auth.get("name") or ""))
return False
lines = []
for ctx_name, ctx in (config.get("contexts") or {}).items():
ctx_enabled = bool(ctx.get("enabled"))
for svc_name, svc in (ctx.get("services") or {}).items():
label = svc.get("label") or f"{ctx_name} {svc_name}"
if not (ctx_enabled and svc.get("enabled")):
lines.append(f"{label}: disabled")
continue
caps = list(svc.get("capabilities") or [])
cap_part = "read-only" if caps == ["read"] else ", ".join(caps)
auth_part = "authenticated" if auth_check(svc) else "no credential"
parts = ["enabled"] + ([cap_part] if cap_part else []) + [auth_part]
lines.append(f"{label}: " + ", ".join(parts))
return lines
def _validate_auth(name, auth):
"""Validate a profile's optional ``auth`` reference. Never echoes secrets."""
if auth is None:
@@ -955,7 +534,7 @@ def validate_config(config):
elif version == 2:
# v2 validation is all-or-nothing via the loader's invariants.
try:
_load_v2_any(config, "<config>")
_load_v2(config, "<config>")
except ConfigError as exc:
problems.append(str(exc))
return problems
@@ -1112,20 +691,5 @@ if __name__ == "__main__": # pragma: no cover - thin CLI dispatch
if len(sys.argv) > 1 and sys.argv[1] == "menu":
import gitea_config_menu
raise SystemExit(gitea_config_menu.main(sys.argv[2:]))
if len(sys.argv) > 1 and sys.argv[1] == "audit":
# Local admin/debug diagnostics (#120). --reveal-endpoints is the
# explicit opt-in that adds base URLs and non-secret auth source
# names; token values are never printed on any path.
try:
config = load_config(config_path() or DEFAULT_CONFIG_PATH)
report = audit_config(
config, reveal_endpoints="--reveal-endpoints" in sys.argv[2:])
report["summaries"] = service_summaries(config)
except ConfigError as exc:
print(f"config error: {exc}", file=sys.stderr)
raise SystemExit(1)
print(json.dumps(report, indent=2))
raise SystemExit(0)
print("usage: python gitea_config.py menu | audit [--reveal-endpoints]",
file=sys.stderr)
print("usage: python gitea_config.py menu", file=sys.stderr)
raise SystemExit(2)
+16 -233
View File
@@ -43,16 +43,6 @@ from gitea_auth import ( # noqa: E402
get_profile,
)
import gitea_audit # noqa: E402
import gitea_config # noqa: E402
def _reveal_endpoints() -> bool:
"""Admin/debug opt-in (#120): include endpoint URLs and token source
names in tool output. Off by default so normal LLM-facing responses
expose only logical names and status. Never affects token values, which
are excluded on every path."""
return (os.environ.get("GITEA_MCP_REVEAL_ENDPOINTS") or "").strip().lower() \
in ("1", "true", "yes")
mcp = FastMCP("gitea-tools", instructions=(
"Gitea issue tracker and PR management for dadeschools and prgs instances. "
@@ -521,24 +511,14 @@ def gitea_check_pr_eligibility(
return result
# Profile capability check (metadata only; not enforcement of the action).
# Both the action and the profile lists are normalized before comparison
# (#106), so legacy spellings ("merge") and canonical namespaced ops
# ("gitea.pr.merge") always match each other and never cross services.
allowed = profile["allowed_operations"]
forbidden = profile["forbidden_operations"]
op_ok, op_reason = gitea_config.check_operation(action, allowed, forbidden)
if not op_ok:
if op_reason == "no-allowed-operations":
reasons.append(
"profile has no configured allowed operations (fail closed)")
elif op_reason == "forbidden":
reasons.append(f"profile forbids '{action}'")
elif op_reason == "invalid-forbidden-entry":
reasons.append(
"profile has an unrecognized forbidden operation entry "
"(fail closed)")
else:
reasons.append(f"profile is not allowed to {action}")
if not allowed:
reasons.append("profile has no configured allowed operations (fail closed)")
if action in forbidden:
reasons.append(f"profile forbids '{action}'")
elif action not in allowed:
reasons.append(f"profile is not allowed to {action}")
h, o, r = _resolve(remote, host, org, repo)
@@ -1364,145 +1344,6 @@ def gitea_view_issue(
}
def _issue_comment_gate(op: str) -> list[str]:
"""Profile permission check for issue-comment tools (#126).
Issue discussion comments are gated separately from the gitea.pr.*
review/merge family: listing requires ``gitea.read``, creating requires
``gitea.issue.comment``. Returns a list of block reasons (empty = allowed);
an unreadable profile fails closed.
"""
try:
profile = get_profile()
except Exception as exc:
return [f"profile could not be resolved (fail closed): {_redact(str(exc))}"]
op_ok, op_reason = gitea_config.check_operation(
op, profile["allowed_operations"], profile["forbidden_operations"])
if op_ok:
return []
if op_reason == "no-allowed-operations":
return ["profile has no configured allowed operations (fail closed)"]
if op_reason == "forbidden":
return [f"profile forbids '{op}'"]
if op_reason == "invalid-forbidden-entry":
return ["profile has an unrecognized forbidden operation entry (fail closed)"]
return [f"profile is not allowed to {op}"]
@mcp.tool()
def gitea_list_issue_comments(
issue_number: int,
limit: int = 50,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""List discussion comments on a Gitea issue.
Read-only. Issue discussion comments are distinct from PR reviews: this
reads the issue comment thread and never touches review endpoints. The
profile must allow ``gitea.read`` (fail closed otherwise).
Normal output is LLM-safe: no endpoint URLs. Set
GITEA_MCP_REVEAL_ENDPOINTS=1 (admin/debug opt-in) to include each
comment's web link.
Args:
issue_number: The issue number whose comments to list (required).
limit: Max number of comments to return (default: 50).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success', 'issue_number', and 'comments' (each with 'id',
'author', 'created_at', 'updated_at', 'body'); on a permission block,
'success' False and 'reasons' with no API call made.
"""
reasons = _issue_comment_gate("gitea.read")
if reasons:
return {"success": False, "issue_number": issue_number,
"reasons": reasons}
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
api = f"{repo_api_url(h, o, r)}/issues/{issue_number}/comments"
comments = api_request("GET", api, auth) or []
reveal = _reveal_endpoints()
out = []
for c in comments[:limit]:
entry = {
"id": c["id"],
"author": (c.get("user") or {}).get("login", ""),
"created_at": c.get("created_at"),
"updated_at": c.get("updated_at"),
"body": c.get("body", ""),
}
if reveal:
entry["url"] = c.get("html_url")
out.append(entry)
return {"success": True, "issue_number": issue_number, "comments": out}
@mcp.tool()
def gitea_create_issue_comment(
issue_number: int,
body: str,
remote: str = "dadeschools",
host: str | None = None,
org: str | None = None,
repo: str | None = None,
) -> dict:
"""Post a markdown comment to a Gitea issue's discussion thread.
Issue discussion comments are distinct from PR reviews: this posts to the
issue comment thread only and never submits review verdicts. The profile
must allow ``gitea.issue.comment`` — gated separately from the gitea.pr.*
review/merge operations (fail closed otherwise). The target issue is
always explicit; there is no inference beyond the standard remote
defaults used by every tool.
Normal output is LLM-safe: comment id + issue number, no endpoint URLs.
Set GITEA_MCP_REVEAL_ENDPOINTS=1 (admin/debug opt-in) to include the
comment's web link. Errors are redacted before being raised.
Args:
issue_number: The issue number to comment on (required).
body: Markdown comment body (required, non-empty).
remote: Known instance — 'dadeschools' or 'prgs'.
host: Override the Gitea host.
org: Override the owner/organization.
repo: Override the repository name.
Returns:
dict with 'success', 'comment_id', and 'issue_number' ('url' only
with the reveal opt-in); on a permission block or empty body,
'success'/'performed' False and 'reasons' with no API call made.
"""
reasons = _issue_comment_gate("gitea.issue.comment")
if not (body or "").strip():
reasons.append("comment body must be a non-empty string")
if reasons:
return {"success": False, "performed": False,
"issue_number": issue_number, "reasons": reasons}
h, o, r = _resolve(remote, host, org, repo)
auth = _auth(h)
api = f"{repo_api_url(h, o, r)}/issues/{issue_number}/comments"
try:
with _audited("create_issue_comment", host=h, remote=remote, org=o,
repo=r, issue_number=issue_number,
request_metadata={"body_chars": len(body)}):
data = api_request("POST", api, auth, {"body": body})
except Exception as exc:
raise RuntimeError(_redact(str(exc))) from None
result = {"success": True, "performed": True,
"comment_id": data["id"], "issue_number": issue_number}
if _reveal_endpoints():
result["url"] = data.get("html_url")
return result
@mcp.tool()
def gitea_whoami(
remote: str = "dadeschools",
@@ -1541,35 +1382,21 @@ def gitea_whoami(
"Verify the configured token is valid for this instance."
)
# Runtime profile metadata is non-secret (name + allowed op categories).
# The token is resolved separately and is never included here. Endpoint
# URLs stay out of normal LLM-facing output (#120): the logical remote
# name is the addressing surface; 'server' appears only under the
# GITEA_MCP_REVEAL_ENDPOINTS admin opt-in.
# The token is resolved separately and is never included here.
profile = get_profile()
result = {
return {
"authenticated": True,
"username": data.get("login"),
"display_name": data.get("full_name") or None,
"user_id": data.get("id"),
"email": data.get("email") or None,
"server": f"https://{h}",
"remote": remote,
"profile": {
"profile_name": profile["profile_name"],
"allowed_operations": profile["allowed_operations"],
"forbidden_operations": profile["forbidden_operations"],
"environment": profile.get("environment"),
"service": profile.get("service"),
"identity": profile.get("identity"),
"role": profile.get("role"),
"profile_address": profile.get("profile_path"),
"execution_profile": profile.get("execution_profile"),
"audit_label": profile.get("audit_label"),
"auth_source_type": profile.get("auth_source_type"),
},
}
if _reveal_endpoints():
result["server"] = f"https://{h}"
return result
@@ -1600,11 +1427,9 @@ def gitea_get_profile(
Read-only. Reports the non-secret configuration of the running MCP
process (profile name, allowed/forbidden operation categories, audit
label, auth *status*). Endpoint URLs and token source names are hidden
from normal output (#120) and appear only under the
GITEA_MCP_REVEAL_ENDPOINTS admin opt-in. Optionally resolves the
authenticated username via ``gitea_whoami``'s endpoint so an LLM can see
who this runtime acts as.
label, token *source name*, base URL) plus the resolved server for the
given remote. Optionally resolves the authenticated username via
``gitea_whoami``'s endpoint so an LLM can see who this runtime acts as.
This tool never mutates Gitea and never approves, merges, comments, or
creates anything. It never returns the token value, Authorization header,
@@ -1622,32 +1447,18 @@ def gitea_get_profile(
'verified', 'unknown', 'unavailable', or 'not_resolved'.
"""
profile = get_profile()
reveal = _reveal_endpoints()
result = {
"profile_name": profile["profile_name"],
"allowed_operations": profile["allowed_operations"],
"forbidden_operations": profile["forbidden_operations"],
"audit_label": profile["audit_label"],
"environment": profile.get("environment"),
"service": profile.get("service"),
"identity": profile.get("identity"),
"role": profile.get("role"),
"profile_address": profile.get("profile_path"),
"execution_profile": profile.get("execution_profile"),
"auth_source_type": profile.get("auth_source_type"),
# Auth is reported as a status only (#120): the token source *name*
# (env var name / keychain id) joins endpoint URLs behind the
# GITEA_MCP_REVEAL_ENDPOINTS admin opt-in. Token values never appear.
"auth_status": ("configured" if profile["token_source_name"]
else "unconfigured"),
"token_source_name": profile["token_source_name"],
"base_url": profile["base_url"],
"remote": remote if remote in REMOTES else None,
"server": None,
"authenticated_username": None,
"identity_status": "not_resolved",
}
if reveal:
result["token_source_name"] = profile["token_source_name"]
result["base_url"] = profile["base_url"]
result["server"] = None
if remote not in REMOTES:
# Mark ambiguity rather than raising: the tool stays inspectable.
@@ -1656,8 +1467,7 @@ def gitea_get_profile(
return result
h = host or REMOTES[remote]["host"]
if reveal:
result["server"] = f"https://{h}"
result["server"] = f"https://{h}"
if resolve_identity:
try:
@@ -1677,33 +1487,6 @@ def gitea_get_profile(
return result
@mcp.tool()
def gitea_audit_config() -> dict:
"""Audit the configured profiles/services: enabled state, no secrets.
Read-only and local-only: loads the canonical profiles.json named by
GITEA_MCP_CONFIG and reports profile/service names, contexts, enabled
state, capabilities, auth *status*, and one-line service summaries (e.g.
``PRGS Jenkins: enabled, read-only, authenticated``). Disabled entries
are listed so they can be audited, but the server refuses to act with
them and never falls back to another profile or service.
Never includes endpoint URLs, keychain ids, token source names, or token
values. Endpoint-revealing diagnostics exist only in the local admin CLI
(``python3 gitea_config.py audit --reveal-endpoints``), never over MCP.
"""
config = gitea_config.load_config()
if config is None:
return {
"configured": False,
"message": "No GITEA_MCP_CONFIG configured; env-only mode.",
}
report = gitea_config.audit_config(config)
report["configured"] = True
report["summaries"] = gitea_config.service_summaries(config)
return report
@mcp.tool()
def gitea_mark_issue(
issue_number: int,
-284
View File
@@ -1,284 +0,0 @@
#!/usr/bin/env python3
"""Migration helper to convert profiles.json from version 1 to version 2 environments shape.
This script preserves existing keychain references (auth.id) and maps old profile
names as aliases so that existing IDE configurations continue to function.
"""
import os
import sys
import json
import argparse
import shutil
import tempfile
# Resolve path to import gitea_config
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import gitea_config
AUTHOR_DEFAULT_ALLOWED = ["read", "branch", "commit", "push", "open_pr", "comment"]
AUTHOR_DEFAULT_FORBIDDEN = ["approve", "request_changes", "merge"]
REVIEWER_DEFAULT_ALLOWED = [
"read", "review", "comment", "approve", "request_changes", "merge"
]
REVIEWER_DEFAULT_FORBIDDEN = ["branch", "commit", "push", "open_pr"]
def infer_role(name, execution_profile):
"""Return the unambiguous role for a legacy profile name, or None."""
haystack = f"{name} {execution_profile or ''}".lower()
has_author = "author" in haystack
has_reviewer = "reviewer" in haystack
if has_author == has_reviewer:
return None
return "reviewer" if has_reviewer else "author"
def migration_summary(v2_data):
"""Return a redacted summary of the migrated config."""
environments = v2_data.get("environments", {})
service_count = 0
identity_count = 0
for env in environments.values():
services = env.get("services", {})
service_count += len(services)
for service in services.values():
identity_count += len(service.get("identities", {}))
return {
"version": v2_data.get("version"),
"environments": len(environments),
"services": service_count,
"identities": identity_count,
"aliases": len(v2_data.get("aliases", {})),
}
def migrate_v1_to_v2(v1_data):
"""Convert version 1 profiles.json format to version 2 environments format."""
environments = {}
aliases = {}
profiles = v1_data.get("profiles", {})
if not isinstance(profiles, dict):
raise ValueError("Malformed input: 'profiles' field must be a JSON object")
for name, prof in profiles.items():
if not isinstance(prof, dict):
raise ValueError(f"Malformed input: profile '{name}' must be a JSON object")
# Infer environment and identity name
if "-" in name:
parts = name.split("-", 1)
env_name = parts[0]
ident_name = parts[1]
else:
env_name = name
ident_name = "author"
# Determine role and identity based on name / execution_profile.
# Ambiguous profiles may still migrate only when they carry explicit
# permissions; otherwise role-based defaults could widen permissions.
exec_prof = prof.get("execution_profile") or ""
role = infer_role(name, exec_prof)
if role == "reviewer":
ident_name = "reviewer"
elif role == "author":
ident_name = "author"
else:
role = prof.get("role")
if role not in (None, "author", "reviewer"):
raise ValueError(
f"Profile '{name}' has unsupported role {role!r}"
)
# Construct identity block
identity_data = {
"username": prof.get("username"),
"auth": prof.get("auth"),
}
if role:
identity_data["role"] = role
if prof.get("execution_profile"):
identity_data["execution_profile"] = prof["execution_profile"]
# Set audit label (default to old name to preserve context)
identity_data["audit_label"] = prof.get("audit_label") or name
has_allowed = "allowed_operations" in prof
has_forbidden = "forbidden_operations" in prof
if has_allowed != has_forbidden:
raise ValueError(
f"Profile '{name}' must define both allowed_operations and "
"forbidden_operations, or neither (fail closed)"
)
if has_allowed:
allowed = prof.get("allowed_operations")
forbidden = prof.get("forbidden_operations")
if not isinstance(allowed, list) or not isinstance(forbidden, list):
raise ValueError(
f"Profile '{name}' operation fields must be lists"
)
identity_data["allowed_operations"] = list(allowed)
identity_data["forbidden_operations"] = list(forbidden)
elif role == "author":
identity_data["allowed_operations"] = list(AUTHOR_DEFAULT_ALLOWED)
identity_data["forbidden_operations"] = list(AUTHOR_DEFAULT_FORBIDDEN)
elif role == "reviewer":
identity_data["allowed_operations"] = list(REVIEWER_DEFAULT_ALLOWED)
identity_data["forbidden_operations"] = list(REVIEWER_DEFAULT_FORBIDDEN)
else:
raise ValueError(
f"Profile '{name}' has no explicit operation lists and no "
"unambiguous author/reviewer role marker (fail closed)"
)
# Nest inside environments/services structure
env = environments.setdefault(env_name, {})
services = env.setdefault("services", {})
gitea_svc = services.setdefault("gitea", {})
# Copy service-level attributes
if prof.get("base_url"):
gitea_svc["base_url"] = prof["base_url"]
if prof.get("default_owner"):
gitea_svc["default_owner"] = prof["default_owner"]
if prof.get("default_repo"):
gitea_svc["default_repo"] = prof["default_repo"]
identities = gitea_svc.setdefault("identities", {})
identities[ident_name] = identity_data
# Alias resolution targets
alias_target = f"{env_name}.gitea.{ident_name}"
if name != alias_target:
aliases[name] = alias_target
# Extra convenience alias for standard old-profile compatibility (e.g. prgs-author)
convenience_alias = f"{env_name}-{ident_name}"
if convenience_alias != alias_target and convenience_alias not in aliases:
aliases[convenience_alias] = alias_target
v2_data = {
"version": 2,
"environments": environments,
"aliases": aliases
}
return v2_data
def validate_v2_data(v2_data):
"""Validate generated v2 structure using gitea_config parser."""
fd, temp_path = tempfile.mkstemp(suffix=".json")
os.close(fd)
try:
with open(temp_path, "w") as f:
json.dump(v2_data, f)
# Attempt to load using load_config to run all validation rules
gitea_config.load_config(temp_path)
return True
except Exception as e:
raise ValueError(f"Generated v2 config failed validation: {e}")
finally:
try:
os.remove(temp_path)
except OSError:
pass
def main():
parser = argparse.ArgumentParser(
description="Migrate profiles.json from version 1 to version 2 environments shape."
)
parser.add_argument(
"-i", "--input",
default=gitea_config.DEFAULT_CONFIG_PATH,
help="Path to the version 1 profiles.json file (default: ~/.config/gitea-tools/profiles.json)"
)
parser.add_argument(
"-o", "--output",
help="Path to write the migrated version 2 profiles.json file (default: overwrite input)"
)
parser.add_argument(
"-w", "--write",
action="store_true",
help="Actually write the migrated config and create a backup (default is dry-run)"
)
parser.add_argument(
"--backup",
help="Path to write the backup file (default: <input_path>.bak)"
)
args = parser.parse_args()
input_path = os.path.abspath(args.input)
output_path = os.path.abspath(args.output or input_path)
backup_path = args.backup or f"{input_path}.bak"
if not os.path.isfile(input_path):
print(f"Error: Input file not found: {input_path}", file=sys.stderr)
sys.exit(1)
try:
with open(input_path, "r") as f:
v1_data = json.load(f)
except json.JSONDecodeError as e:
print(f"Error: Input file is not valid JSON: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error reading input file: {e}", file=sys.stderr)
sys.exit(1)
# Validate version
version = v1_data.get("version")
if version is not None and version != 1:
print(f"Error: Unsupported profiles.json version: {version}. Expected version 1.", file=sys.stderr)
sys.exit(1)
try:
v2_data = migrate_v1_to_v2(v1_data)
validate_v2_data(v2_data)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if not args.write:
print("=== DRY-RUN MODE (No files modified) ===")
print("Generated v2 config validated successfully.")
print("Only aggregate counts are shown.")
summary = migration_summary(v2_data)
print("Summary:")
print(f" version: {summary['version']}")
print(f" environments: {summary['environments']}")
print(f" services: {summary['services']}")
print(f" identities: {summary['identities']}")
print(f" aliases: {summary['aliases']}")
sys.exit(0)
# Write Mode: Create Backup first
try:
print(f"Creating backup: {backup_path}")
shutil.copy2(input_path, backup_path)
except Exception as e:
print(f"Error creating backup: {e}", file=sys.stderr)
sys.exit(1)
# Write migrated config
try:
print(f"Writing migrated version 2 config: {output_path}")
# Ensure target directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w") as f:
json.dump(v2_data, f, indent=2)
f.write("\n")
print("Migration completed successfully!")
except Exception as e:
print(f"Error writing output file: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
+3 -5
View File
@@ -284,13 +284,11 @@ class TestAuthIntegration(_ConfigBase):
self.assertEqual(header, "token process-token")
def test_auth_header_unresolvable_ref_fails_closed(self):
# env token ref points at an unset var -> with GITEA_MCP_CONFIG set the
# ConfigError propagates (fail closed, #120): no silent fallback to
# Basic auth or another credential source.
# env token ref points at an unset var -> ConfigError inside resolve is
# swallowed to "no token"; auth falls through to (mocked-empty) basic.
with patch.dict(os.environ, self._env("mdcps-env"), clear=True):
with patch("gitea_auth.get_credentials", return_value=("", "")):
with self.assertRaises(gitea_config.ConfigError):
gitea_auth.get_auth_header("gitea.example.com")
self.assertIsNone(gitea_auth.get_auth_header("gitea.example.com"))
# ---------------------------------------------------------------------------
-446
View File
@@ -1,446 +0,0 @@
"""Tests for profiles.json version 2 *contexts* shape (#120).
The canonical machine config uses ``contexts`` / ``profiles`` / ``projects`` /
``rules`` with explicit ``enabled`` flags. Covers: loading + active-profile
resolution via GITEA_MCP_PROFILE, fail-closed refusal of disabled profiles /
contexts / services / projects, project-to-context mapping, base-URL fallback
from the context's gitea block, keychain-only auth references, LLM-safe audit
output (no endpoint URLs, no keychain ids, no tokens) with an explicit
admin/debug opt-in, v1 compatibility, and the no-silent-fallback rule in
gitea_auth.get_auth_header. No network, no real secrets.
"""
import json
import os
import sys
import tempfile
import unittest
from unittest.mock import patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import gitea_config # noqa: E402
import gitea_auth # noqa: E402
FAKE_TOKEN = "fake-token-for-tests" # not a real credential
def contexts_config():
"""A fresh, valid v2 contexts-shape config with enabled/disabled entries."""
return {
"version": 2,
"contexts": {
"prgs": {
"enabled": True,
"label": "Local / PRGS",
"default_owner": "Scaled-Tech-Consulting",
"gitea": {
"enabled": True,
"kind": "gitea",
"base_url": "https://gitea.prgs.cc",
},
"services": {
"jenkins": {
"enabled": True,
"kind": "jenkins",
"label": "PRGS Jenkins",
"base_url": "https://jenkins.prgs.cc",
"auth": {"type": "keychain", "id": "prgs-jenkins-token"},
"capabilities": ["read"],
},
"sentry": {
"enabled": False,
"kind": "sentry",
"label": "PRGS Sentry",
"base_url": "",
"auth": {"type": "keychain", "id": "prgs-sentry-token"},
"capabilities": ["read"],
},
},
},
"lab": {
"enabled": False,
"gitea": {"enabled": False, "kind": "gitea", "base_url": ""},
"services": {
"jenkins": {
"enabled": False,
"kind": "jenkins",
"label": "Lab Jenkins",
"base_url": "http://localhost:8080",
"auth": {"type": "keychain", "id": "lab-jenkins-token"},
"capabilities": ["read"],
},
},
},
},
"profiles": {
"prgs-author": {
"enabled": True,
"context": "prgs",
"role": "author",
"username": "jcwalker3",
"execution_profile": "prgs-author",
"audit_label": "prgs-author",
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain", "id": "prgs-gitea-author-token"},
"allowed_operations": [
"read", "branch", "commit", "push", "open_pr", "comment",
],
"forbidden_operations": [
"approve", "request_changes", "merge",
],
},
"prgs-reviewer": {
"enabled": True,
"context": "prgs",
"role": "reviewer",
"username": "sysadmin",
"execution_profile": "prgs-reviewer",
"audit_label": "prgs-reviewer",
# no base_url on purpose: must fall back to context gitea
"auth": {"type": "keychain", "id": "prgs-gitea-reviewer-token"},
"allowed_operations": [
"read", "review", "comment", "approve",
"request_changes", "merge",
],
"forbidden_operations": [
"branch", "commit", "push", "open_pr",
],
},
"retired-author": {
"enabled": False,
"context": "prgs",
"role": "author",
"username": "jcwalker3",
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain", "id": "retired-token-ref"},
"allowed_operations": ["read"],
"forbidden_operations": [],
},
"lab-author": {
"enabled": True,
"context": "lab",
"role": "author",
"username": "jcwalker3",
"base_url": "http://localhost:3000",
"auth": {"type": "keychain", "id": "lab-gitea-author-token"},
"allowed_operations": ["read"],
"forbidden_operations": [],
},
},
"projects": {
"/repo/one": {
"enabled": True,
"context": "prgs",
"default_owner": "Scaled-Tech-Consulting",
"default_repo": "One",
"default_author_profile": "prgs-author",
"default_reviewer_profile": "prgs-reviewer",
},
"/repo/lab": {
"enabled": False,
"context": "lab",
},
},
"rules": {
"disabled_behavior": "report in audits, never act",
"no_silent_fallback": True,
"tokens_in_json": False,
"token_storage": "keychain",
"hide_service_urls_from_llm": True,
"hide_keychain_ids_from_llm": True,
"mcp_resolves_endpoints": True,
},
}
def write_config(data):
"""Write *data* to a temp JSON file and return its path."""
fd, path = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(data, fh)
return path
def load(data):
"""Load *data* through gitea_config via a temp file, then clean up."""
path = write_config(data)
try:
return gitea_config.load_config(path)
finally:
os.unlink(path)
class LoadContextsShapeTests(unittest.TestCase):
def test_contexts_shape_loads(self):
config = load(contexts_config())
self.assertEqual(config["version"], 2)
self.assertIn("prgs-author", config["profiles"])
self.assertIn("prgs-reviewer", config["profiles"])
def test_active_profile_resolved_from_env(self):
path = write_config(contexts_config())
try:
with patch.dict(os.environ, {
gitea_config.ENV_CONFIG_PATH: path,
gitea_config.ENV_PROFILE: "prgs-author",
}):
profile = gitea_config.resolve_profile()
finally:
os.unlink(path)
self.assertEqual(profile["username"], "jcwalker3")
self.assertEqual(profile["base_url"], "https://gitea.prgs.cc")
self.assertEqual(profile["context"], "prgs")
def test_base_url_falls_back_to_context_gitea(self):
profile = gitea_config.select_profile(load(contexts_config()),
"prgs-reviewer")
self.assertEqual(profile["base_url"], "https://gitea.prgs.cc")
def test_profile_without_any_base_url_is_refused(self):
data = contexts_config()
del data["profiles"]["prgs-author"]["base_url"]
data["contexts"]["prgs"]["gitea"]["enabled"] = False
config = load(data)
with self.assertRaises(gitea_config.ConfigError):
gitea_config.select_profile(config, "prgs-author")
def test_v1_config_still_loads(self):
config = load({
"version": 1,
"profiles": {
"prgs": {
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain", "id": "prgs-gitea-token"},
},
},
})
profile = gitea_config.select_profile(config, "prgs")
self.assertEqual(profile["base_url"], "https://gitea.prgs.cc")
def test_mixed_contexts_and_environments_rejected(self):
data = contexts_config()
data["environments"] = {"x": {"services": {}}}
with self.assertRaises(gitea_config.ConfigError):
load(data)
def test_missing_enabled_flag_is_refused(self):
data = contexts_config()
del data["profiles"]["prgs-author"]["enabled"]
with self.assertRaises(gitea_config.ConfigError) as ctx:
load(data)
self.assertIn("enabled", str(ctx.exception))
class DisabledRefusalTests(unittest.TestCase):
def setUp(self):
self.config = load(contexts_config())
def test_disabled_profile_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.select_profile(self.config, "retired-author")
self.assertIn("disabled", str(ctx.exception))
def test_profile_in_disabled_context_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.select_profile(self.config, "lab-author")
self.assertIn("disabled", str(ctx.exception))
def test_enabled_profile_still_selectable(self):
profile = gitea_config.select_profile(self.config, "prgs-author")
self.assertEqual(profile["context"], "prgs")
def test_disabled_service_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.resolve_service(self.config, "prgs", "sentry")
self.assertIn("disabled", str(ctx.exception))
def test_enabled_service_resolves_internally_with_auth_reference(self):
# Internal resolution keeps the URL + auth reference for MCP's own use;
# they must never appear in LLM-facing (audit/summary) output.
service = gitea_config.resolve_service(self.config, "prgs", "jenkins")
self.assertEqual(service["base_url"], "https://jenkins.prgs.cc")
self.assertEqual(service["auth"], {"type": "keychain",
"id": "prgs-jenkins-token"})
self.assertNotIn("token", service)
def test_service_in_disabled_context_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.resolve_service(self.config, "lab", "jenkins")
self.assertIn("disabled", str(ctx.exception))
def test_unknown_service_fails_closed(self):
with self.assertRaises(gitea_config.ConfigError):
gitea_config.resolve_service(self.config, "prgs", "nope")
class ProjectMappingTests(unittest.TestCase):
def setUp(self):
self.config = load(contexts_config())
def test_project_maps_to_context(self):
project = gitea_config.project_for_path(self.config, "/repo/one")
self.assertEqual(project["context"], "prgs")
self.assertEqual(project["default_reviewer_profile"], "prgs-reviewer")
def test_unknown_project_returns_none(self):
self.assertIsNone(
gitea_config.project_for_path(self.config, "/repo/unknown"))
def test_disabled_project_refused(self):
with self.assertRaises(gitea_config.ConfigError) as ctx:
gitea_config.project_for_path(self.config, "/repo/lab")
self.assertIn("disabled", str(ctx.exception))
class SecretHandlingTests(unittest.TestCase):
def test_inline_profile_token_rejected(self):
data = contexts_config()
data["profiles"]["prgs-author"]["token"] = FAKE_TOKEN
with self.assertRaises(gitea_config.ConfigError) as ctx:
load(data)
self.assertNotIn(FAKE_TOKEN, str(ctx.exception))
def test_inline_service_token_rejected(self):
data = contexts_config()
data["contexts"]["prgs"]["services"]["jenkins"]["token"] = FAKE_TOKEN
with self.assertRaises(gitea_config.ConfigError) as ctx:
load(data)
self.assertNotIn(FAKE_TOKEN, str(ctx.exception))
def test_selected_profile_resolves_token_via_keychain(self):
profile = gitea_config.select_profile(load(contexts_config()),
"prgs-author")
token = gitea_config.resolve_token(
profile, keychain_lookup=lambda item_id: FAKE_TOKEN
if item_id == "prgs-gitea-author-token" else None)
self.assertEqual(token, FAKE_TOKEN)
class AuditTests(unittest.TestCase):
"""LLM-facing audit output: enabled/disabled state only — no endpoint
URLs, no keychain ids, no token values. Admin opt-in reveals endpoints
and auth source names (never token values)."""
def setUp(self):
self.config = load(contexts_config())
def test_audit_reports_enabled_and_disabled(self):
report = gitea_config.audit_config(self.config)
profiles = {p["name"]: p for p in report["profiles"]}
self.assertTrue(profiles["prgs-author"]["enabled"])
self.assertFalse(profiles["retired-author"]["enabled"])
services = {(s["context"], s["name"]): s for s in report["services"]}
self.assertTrue(services[("prgs", "jenkins")]["enabled"])
self.assertFalse(services[("prgs", "sentry")]["enabled"])
self.assertFalse(services[("lab", "jenkins")]["enabled"])
def test_audit_hides_urls_keychain_ids_and_tokens_by_default(self):
rendered = json.dumps(gitea_config.audit_config(self.config))
for leaked in ("https://", "http://", "prgs-gitea-author-token",
"prgs-jenkins-token", "base_url", FAKE_TOKEN):
self.assertNotIn(leaked, rendered)
# Auth is reported as a status, not a reference.
report = gitea_config.audit_config(self.config)
profiles = {p["name"]: p for p in report["profiles"]}
self.assertEqual(profiles["prgs-author"]["auth"], "keychain")
def test_audit_admin_optin_reveals_endpoints_but_never_tokens(self):
report = gitea_config.audit_config(self.config, reveal_endpoints=True)
rendered = json.dumps(report)
self.assertIn("https://jenkins.prgs.cc", rendered)
self.assertIn("keychain:prgs-gitea-author-token", rendered)
self.assertNotIn(FAKE_TOKEN, rendered)
def test_audit_works_for_v1_config(self):
report = gitea_config.audit_config({
"version": 1,
"profiles": {
"prgs": {
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain", "id": "prgs-gitea-token"},
},
},
})
profiles = {p["name"]: p for p in report["profiles"]}
self.assertTrue(profiles["prgs"]["enabled"])
self.assertEqual(profiles["prgs"]["auth"], "keychain")
self.assertNotIn("https://", json.dumps(report))
class ServiceSummaryTests(unittest.TestCase):
"""Safe one-line summaries for LLM sessions: label + state only."""
def setUp(self):
self.config = load(contexts_config())
def test_summaries_show_state_without_urls_or_ids(self):
lines = gitea_config.service_summaries(
self.config, auth_check=lambda service: True)
text = "\n".join(lines)
self.assertIn("PRGS Jenkins: enabled, read-only, authenticated", text)
self.assertIn("PRGS Sentry: disabled", text)
self.assertIn("Lab Jenkins: disabled", text)
for leaked in ("https://", "http://", "keychain",
"prgs-jenkins-token"):
self.assertNotIn(leaked, text)
def test_summary_reports_missing_auth_without_secrets(self):
lines = gitea_config.service_summaries(
self.config, auth_check=lambda service: False)
text = "\n".join(lines)
self.assertIn("PRGS Jenkins: enabled, read-only, no credential", text)
class NoSilentFallbackTests(unittest.TestCase):
def test_broken_config_fails_auth_instead_of_falling_back(self):
"""With GITEA_MCP_CONFIG set but unloadable, auth must fail closed."""
path = write_config({"version": 2}) # invalid: no contexts/environments
env = {
gitea_config.ENV_CONFIG_PATH: path,
gitea_config.ENV_PROFILE: "prgs-author",
}
try:
with patch.dict(os.environ, env, clear=False), \
patch.object(gitea_auth, "get_credentials",
return_value=(None, None)):
for var in ("GITEA_TOKEN", "GITEA_TOKEN_PRGS",
"GITEA_TOKEN_DADESCHOOLS"):
os.environ.pop(var, None)
with self.assertRaises(gitea_config.ConfigError):
gitea_auth.get_auth_header("https://gitea.prgs.cc")
finally:
os.unlink(path)
def test_env_only_users_unaffected(self):
"""Without GITEA_MCP_CONFIG, a missing token still degrades quietly."""
env = dict(os.environ)
env.pop(gitea_config.ENV_CONFIG_PATH, None)
with patch.dict(os.environ, env, clear=True), \
patch.object(gitea_auth, "get_credentials",
return_value=(None, None)):
for var in ("GITEA_TOKEN", "GITEA_TOKEN_PRGS",
"GITEA_TOKEN_DADESCHOOLS"):
os.environ.pop(var, None)
self.assertIsNone(
gitea_auth.get_auth_header("https://gitea.prgs.cc"))
class ValidateConfigTests(unittest.TestCase):
def test_valid_contexts_config_has_no_problems(self):
self.assertEqual(gitea_config.validate_config(contexts_config()), [])
def test_repo_example_file_validates(self):
example = __import__("pathlib").Path(__file__).resolve().parent.parent \
/ "gitea-mcp.v2-contexts.example.json"
with open(example, encoding="utf-8") as fh:
self.assertEqual(gitea_config.validate_config(json.load(fh)), [])
def test_broken_contexts_config_reports_problems(self):
data = contexts_config()
data["profiles"]["prgs-author"]["context"] = "nope"
problems = gitea_config.validate_config(data)
self.assertTrue(problems)
if __name__ == "__main__":
unittest.main()
+3 -435
View File
@@ -3,7 +3,6 @@
Each tool is tested by calling the underlying function directly (not through
the MCP protocol) with mocked API responses.
"""
import json
import os
import sys
import unittest
@@ -31,8 +30,6 @@ from mcp_server import ( # noqa: E402
gitea_get_profile,
gitea_check_pr_eligibility,
gitea_submit_pr_review,
gitea_list_issue_comments,
gitea_create_issue_comment,
)
from gitea_auth import get_profile # noqa: E402
@@ -883,9 +880,7 @@ class TestWhoami(unittest.TestCase):
self.assertEqual(result["username"], "reviewer-bot")
self.assertEqual(result["display_name"], "Reviewer Bot")
self.assertEqual(result["user_id"], 42)
# Endpoint URLs are hidden from normal LLM-facing output (#120);
# the logical remote name is the addressing surface.
self.assertNotIn("server", result)
self.assertEqual(result["server"], "https://gitea.prgs.cc")
self.assertEqual(result["remote"], "prgs")
# Read-only: GET against the authenticated-user endpoint.
call_args = mock_api.call_args
@@ -997,65 +992,6 @@ class TestRuntimeProfile(unittest.TestCase):
for secret in ("super-secret-token", "token", "authorization", "basic "):
self.assertNotIn(secret, blob)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_whoami_v2_metadata(self, _auth, mock_api):
mock_api.return_value = {"id": 7, "login": "rev"}
env = {
"GITEA_PROFILE_NAME": "gitea-reviewer",
"GITEA_ALLOWED_OPERATIONS": "read,review,approve",
"GITEA_FORBIDDEN_OPERATIONS": "merge",
"GITEA_AUDIT_LABEL": "reviewer-runtime",
"GITEA_TOKEN_SOURCE": "keychain:prgs-reviewer-token",
}
with patch.dict(os.environ, env, clear=True):
result = gitea_whoami(remote="prgs")
profile = result["profile"]
self.assertEqual(profile["environment"], None)
self.assertEqual(profile["service"], None)
self.assertEqual(profile["identity"], None)
self.assertEqual(profile["role"], None)
self.assertEqual(profile["profile_address"], None)
self.assertEqual(profile["execution_profile"], None)
self.assertEqual(profile["audit_label"], "reviewer-runtime")
self.assertEqual(profile["auth_source_type"], "keychain")
self.assertEqual(profile["forbidden_operations"], ["merge"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
@patch("mcp_server.get_profile")
def test_whoami_v2_resolved_metadata(self, mock_get_profile, _auth, mock_api):
mock_api.return_value = {"id": 7, "login": "rev"}
mock_get_profile.return_value = {
"profile_name": "prgs.gitea.reviewer",
"allowed_operations": ["read", "review"],
"forbidden_operations": ["merge"],
"audit_label": "rev-audit",
"token_source_name": "keychain:prgs-reviewer-token",
"auth_source_type": "keychain",
"base_url": "https://gitea.prgs.cc",
"username": "sysadmin",
"default_owner": "Scaled-Tech-Consulting",
"profile_path": "prgs.gitea.reviewer",
"environment": "prgs",
"service": "gitea",
"identity": "reviewer",
"role": "reviewer",
"execution_profile": "reviewer-profile",
}
result = gitea_whoami(remote="prgs")
profile = result["profile"]
self.assertEqual(profile["environment"], "prgs")
self.assertEqual(profile["service"], "gitea")
self.assertEqual(profile["identity"], "reviewer")
self.assertEqual(profile["role"], "reviewer")
self.assertEqual(profile["profile_address"], "prgs.gitea.reviewer")
self.assertEqual(profile["execution_profile"], "reviewer-profile")
self.assertEqual(profile["audit_label"], "rev-audit")
self.assertEqual(profile["auth_source_type"], "keychain")
self.assertEqual(profile["forbidden_operations"], ["merge"])
# ---------------------------------------------------------------------------
# Profile discovery (read-only) — issue #13
@@ -1099,12 +1035,8 @@ class TestProfileDiscovery(unittest.TestCase):
self.assertEqual(result["allowed_operations"], ["read", "review", "approve"])
self.assertEqual(result["authenticated_username"], "reviewer-bot")
self.assertEqual(result["identity_status"], "verified")
# Endpoint URLs and token source names are hidden from normal
# LLM-facing output (#120); auth is reported as a status only.
self.assertNotIn("server", result)
self.assertNotIn("base_url", result)
self.assertNotIn("token_source_name", result)
self.assertEqual(result["auth_status"], "configured")
self.assertEqual(result["server"], "https://gitea.prgs.cc")
self.assertEqual(result["token_source_name"], "GITEA_TOKEN")
# Read-only: only a GET to the user endpoint was issued.
self.assertEqual(mock_api.call_args[0][0], "GET")
self.assertTrue(mock_api.call_args[0][1].endswith("/api/v1/user"))
@@ -1143,39 +1075,6 @@ class TestProfileDiscovery(unittest.TestCase):
self.assertIsNone(result["remote"])
self.assertIn("remote_error", result)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
@patch("mcp_server.get_profile")
def test_get_profile_v2_resolved_metadata(self, mock_get_profile, _auth, mock_api):
mock_api.return_value = {"id": 7, "login": "rev"}
mock_get_profile.return_value = {
"profile_name": "prgs.gitea.reviewer",
"allowed_operations": ["read", "review"],
"forbidden_operations": ["merge"],
"audit_label": "rev-audit",
"token_source_name": "keychain:prgs-reviewer-token",
"auth_source_type": "keychain",
"base_url": "https://gitea.prgs.cc",
"username": "sysadmin",
"default_owner": "Scaled-Tech-Consulting",
"profile_path": "prgs.gitea.reviewer",
"environment": "prgs",
"service": "gitea",
"identity": "reviewer",
"role": "reviewer",
"execution_profile": "reviewer-profile",
}
result = gitea_get_profile(remote="prgs")
self.assertEqual(result["environment"], "prgs")
self.assertEqual(result["service"], "gitea")
self.assertEqual(result["identity"], "reviewer")
self.assertEqual(result["role"], "reviewer")
self.assertEqual(result["profile_address"], "prgs.gitea.reviewer")
self.assertEqual(result["execution_profile"], "reviewer-profile")
self.assertEqual(result["auth_source_type"], "keychain")
self.assertEqual(result["forbidden_operations"], ["merge"])
# ---------------------------------------------------------------------------
# PR eligibility checks (read-only) — issue #14
@@ -1770,334 +1669,3 @@ class TestTrackerHygieneCleanup(unittest.TestCase):
# branch name fallback
self.assertEqual(extract_linked_issue_numbers("", branch_name="issue-123"), [123])
self.assertEqual(extract_linked_issue_numbers("", branch_name="feat/issue-123-foo"), [123])
# ---------------------------------------------------------------------------
# Endpoint/keychain redaction in LLM-facing output — issue #120
# ---------------------------------------------------------------------------
class TestEndpointRedaction(unittest.TestCase):
"""Normal MCP output hides endpoint URLs and keychain ids; the admin
opt-in (GITEA_MCP_REVEAL_ENDPOINTS) restores them for local diagnostics
without ever revealing token values."""
def _contexts_config_file(self):
import tempfile
config = {
"version": 2,
"contexts": {
"prgs": {
"enabled": True,
"gitea": {"enabled": True, "kind": "gitea",
"base_url": "https://gitea.prgs.cc"},
"services": {
"jenkins": {
"enabled": True, "kind": "jenkins",
"label": "PRGS Jenkins",
"base_url": "https://jenkins.prgs.cc",
"auth": {"type": "keychain",
"id": "prgs-jenkins-token"},
"capabilities": ["read"],
},
"sentry": {
"enabled": False, "kind": "sentry",
"label": "PRGS Sentry", "base_url": "",
"auth": {"type": "keychain",
"id": "prgs-sentry-token"},
"capabilities": ["read"],
},
},
},
},
"profiles": {
"prgs-author": {
"enabled": True, "context": "prgs", "role": "author",
"username": "jcwalker3",
"base_url": "https://gitea.prgs.cc",
"auth": {"type": "keychain",
"id": "prgs-gitea-author-token"},
"allowed_operations": ["read"],
"forbidden_operations": [],
},
},
"projects": {},
"rules": {"hide_service_urls_from_llm": True,
"hide_keychain_ids_from_llm": True,
"mcp_resolves_endpoints": True},
}
fd, path = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(config, fh)
return path
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_whoami_hides_endpoint_url_by_default(self, _auth, mock_api):
mock_api.return_value = {"id": 1, "login": "someone"}
with patch.dict(os.environ, {}, clear=True):
result = gitea_whoami(remote="prgs")
self.assertNotIn("server", result)
self.assertNotIn("https://", repr(result))
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_whoami_reveals_endpoint_with_admin_optin(self, _auth, mock_api):
mock_api.return_value = {"id": 1, "login": "someone"}
with patch.dict(os.environ,
{"GITEA_MCP_REVEAL_ENDPOINTS": "1"}, clear=True):
result = gitea_whoami(remote="prgs")
self.assertEqual(result["server"], "https://gitea.prgs.cc")
def test_get_profile_hides_url_and_token_source_by_default(self):
env = {
"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_BASE_URL": "https://gitea.example.invalid",
"GITEA_TOKEN_SOURCE": "keychain:some-item-id",
}
with patch.dict(os.environ, env, clear=True):
result = gitea_get_profile(remote="prgs",
resolve_identity=False)
blob = repr(result)
for leaked in ("https://", "keychain:", "some-item-id",
"base_url", "server", "token_source_name"):
self.assertNotIn(leaked, blob)
self.assertEqual(result["auth_status"], "configured")
def test_get_profile_reports_unconfigured_auth(self):
with patch.dict(os.environ,
{"GITEA_PROFILE_NAME": "gitea-author"}, clear=True):
result = gitea_get_profile(remote="prgs",
resolve_identity=False)
self.assertEqual(result["auth_status"], "unconfigured")
def test_get_profile_reveals_with_admin_optin(self):
env = {
"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_TOKEN_SOURCE": "keychain:some-item-id",
"GITEA_MCP_REVEAL_ENDPOINTS": "1",
}
with patch.dict(os.environ, env, clear=True):
result = gitea_get_profile(remote="prgs",
resolve_identity=False)
self.assertEqual(result["server"], "https://gitea.prgs.cc")
self.assertEqual(result["token_source_name"], "keychain:some-item-id")
def test_audit_tool_reports_state_without_urls_or_ids(self):
from mcp_server import gitea_audit_config
path = self._contexts_config_file()
try:
env = {"GITEA_MCP_CONFIG": path,
"GITEA_MCP_PROFILE": "prgs-author"}
with patch.dict(os.environ, env, clear=True), \
patch("gitea_config._keychain_token", return_value="x"):
result = gitea_audit_config()
finally:
os.unlink(path)
blob = json.dumps(result)
self.assertIn("PRGS Jenkins: enabled, read-only, authenticated",
result["summaries"])
self.assertIn("PRGS Sentry: disabled", result["summaries"])
for leaked in ("https://", "http://", "prgs-jenkins-token",
"prgs-gitea-author-token", "base_url"):
self.assertNotIn(leaked, blob)
def test_audit_tool_without_config_reports_off(self):
with patch.dict(os.environ, {}, clear=True):
from mcp_server import gitea_audit_config
result = gitea_audit_config()
self.assertFalse(result["configured"])
# ---------------------------------------------------------------------------
# Issue comment tools (#126)
# ---------------------------------------------------------------------------
class TestIssueCommentTools(unittest.TestCase):
"""gitea_list_issue_comments / gitea_create_issue_comment (#126).
Issue discussion comments are distinct from PR reviews: they hit the
/issues/{n}/comments endpoint, are gated on gitea.read (list) and
gitea.issue.comment (create) — never on the gitea.pr.* review/merge
family — and their normal output is LLM-safe (no endpoint URLs; the
GITEA_MCP_REVEAL_ENDPOINTS opt-in restores links for local diagnostics).
"""
AUTHOR_ENV = {
"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.issue.comment",
}
@staticmethod
def _comment(cid=101, login="alice", body="hello world"):
return {
"id": cid,
"user": {"login": login},
"body": body,
"created_at": "2026-07-03T00:00:00Z",
"updated_at": "2026-07-03T01:00:00Z",
"html_url": (
"https://gitea.example.com/o/r/issues/9#issuecomment-%d" % cid
),
}
# -- list ----------------------------------------------------------------
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_lists_comments(self, _auth, mock_api):
mock_api.return_value = [self._comment(101), self._comment(102, "bob")]
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
self.assertTrue(result["success"])
self.assertEqual(result["issue_number"], 9)
self.assertEqual(len(result["comments"]), 2)
first = result["comments"][0]
self.assertEqual(first["id"], 101)
self.assertEqual(first["author"], "alice")
self.assertEqual(first["body"], "hello world")
self.assertEqual(first["created_at"], "2026-07-03T00:00:00Z")
self.assertEqual(first["updated_at"], "2026-07-03T01:00:00Z")
url = mock_api.call_args[0][1]
self.assertIn("/issues/9/comments", url)
self.assertEqual(mock_api.call_args[0][0], "GET")
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_list_output_has_no_urls_by_default(self, _auth, mock_api):
mock_api.return_value = [self._comment()]
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
blob = json.dumps(result)
self.assertNotIn("https://", blob)
self.assertNotIn("http://", blob)
self.assertNotIn("url", blob)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_list_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = [self._comment()]
env = dict(self.AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
with patch.dict(os.environ, env, clear=True):
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
self.assertIn("issuecomment-101", result["comments"][0]["url"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_list_blocked_without_read_permission(self, _auth, mock_api):
env = {"GITEA_PROFILE_NAME": "gitea-writer-only",
"GITEA_ALLOWED_OPERATIONS": "gitea.issue.comment"}
with patch.dict(os.environ, env, clear=True):
result = gitea_list_issue_comments(issue_number=9, remote="prgs")
self.assertFalse(result["success"])
self.assertTrue(result["reasons"])
mock_api.assert_not_called()
def test_list_unknown_remote_raises(self):
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
with self.assertRaises(ValueError):
gitea_list_issue_comments(issue_number=9, remote="nope")
# -- create ----------------------------------------------------------------
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_creates_comment(self, _auth, mock_api):
mock_api.return_value = self._comment(555, "gitea-author", "posted")
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
self.assertTrue(result["success"])
self.assertEqual(result["comment_id"], 555)
self.assertEqual(result["issue_number"], 9)
self.assertEqual(mock_api.call_args[0][0], "POST")
url = mock_api.call_args[0][1]
self.assertIn("/issues/9/comments", url)
self.assertIn("gitea.prgs.cc", url)
self.assertIn("Scaled-Tech-Consulting", url)
self.assertEqual(mock_api.call_args[0][3], {"body": "posted"})
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_output_has_no_urls_by_default(self, _auth, mock_api):
mock_api.return_value = self._comment(555)
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
blob = json.dumps(result)
self.assertNotIn("https://", blob)
self.assertNotIn("http://", blob)
self.assertNotIn("url", blob)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_reveal_opt_in_includes_url(self, _auth, mock_api):
mock_api.return_value = self._comment(555)
env = dict(self.AUTHOR_ENV, GITEA_MCP_REVEAL_ENDPOINTS="1")
with patch.dict(os.environ, env, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
self.assertIn("issuecomment-555", result["url"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_review_permissions_do_not_grant_issue_comments(self, _auth, mock_api):
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
"GITEA_ALLOWED_OPERATIONS":
"gitea.read,gitea.pr.review,gitea.pr.comment,"
"gitea.pr.approve,gitea.pr.merge"}
with patch.dict(os.environ, env, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
self.assertFalse(result["success"])
self.assertFalse(result["performed"])
self.assertTrue(result["reasons"])
mock_api.assert_not_called()
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_forbidden_overrides_allowed(self, _auth, mock_api):
env = {"GITEA_PROFILE_NAME": "gitea-author",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.issue.comment",
"GITEA_FORBIDDEN_OPERATIONS": "gitea.issue.comment"}
with patch.dict(os.environ, env, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
self.assertFalse(result["success"])
mock_api.assert_not_called()
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_empty_body_blocked(self, _auth, mock_api):
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
result = gitea_create_issue_comment(
issue_number=9, body=" ", remote="prgs")
self.assertFalse(result["success"])
mock_api.assert_not_called()
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_missing_issue_error_is_redacted(self, _auth, mock_api):
mock_api.side_effect = RuntimeError(
"Gitea API error 404 for issue (auth was token abc123secret)")
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
with self.assertRaises(RuntimeError) as cm:
gitea_create_issue_comment(
issue_number=99999, body="posted", remote="prgs")
msg = str(cm.exception)
self.assertNotIn("abc123secret", msg)
self.assertIn("404", msg)
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_create_never_touches_review_endpoints(self, _auth, mock_api):
mock_api.return_value = self._comment(555)
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
gitea_create_issue_comment(
issue_number=9, body="posted", remote="prgs")
for call in mock_api.call_args_list:
self.assertNotIn("reviews", call[0][1])
self.assertNotIn("/pulls/", call[0][1])
def test_create_unknown_remote_raises(self):
with patch.dict(os.environ, self.AUTHOR_ENV, clear=True):
with self.assertRaises(ValueError):
gitea_create_issue_comment(
issue_number=9, body="x", remote="nope")
-299
View File
@@ -1,299 +0,0 @@
"""Unit tests for migrate_profiles.py migration helper."""
import os
import sys
import json
import unittest
import tempfile
import shutil
from unittest.mock import patch
from io import StringIO
# Add project root to sys.path
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
import migrate_profiles
class TestMigrateProfiles(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.v1_content = {
"version": 1,
"profiles": {
"prgs": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {
"type": "keychain",
"id": "redacted-author-ref"
},
"default_owner": "Scaled-Tech-Consulting",
"execution_profile": "personal-prgs",
"allowed_operations": ["read", "comment"],
"forbidden_operations": ["approve", "merge"]
},
"mdcps": {
"base_url": "redacted-mdcps-service",
"username": "913443",
"auth": {
"type": "keychain",
"id": "redacted-mdcps-ref"
},
"default_owner": "Contractor",
"execution_profile": "mdcps",
"allowed_operations": ["read"],
"forbidden_operations": ["merge"]
},
"prgs-reviewer": {
"base_url": "redacted-prgs-service",
"username": "sysadmin",
"auth": {
"type": "keychain",
"id": "redacted-reviewer-ref"
},
"default_owner": "Scaled-Tech-Consulting",
"execution_profile": "prgs-reviewer",
"allowed_operations": [
"read", "review", "comment", "approve",
"request_changes", "merge"
],
"forbidden_operations": ["branch", "commit", "push", "open_pr"]
}
}
}
self.input_file = os.path.join(self.temp_dir, "profiles.json")
with open(self.input_file, "w") as f:
json.dump(self.v1_content, f)
def tearDown(self):
shutil.rmtree(self.temp_dir)
def test_migration_logic(self):
"""Test the structural transformation and capability mapping."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
self.assertEqual(v2_data["version"], 2)
# Check environment structure
envs = v2_data["environments"]
self.assertIn("prgs", envs)
self.assertIn("mdcps", envs)
# Check service and identity structure
prgs_gitea = envs["prgs"]["services"]["gitea"]
self.assertEqual(prgs_gitea["base_url"], "redacted-prgs-service")
self.assertEqual(prgs_gitea["default_owner"], "Scaled-Tech-Consulting")
author = prgs_gitea["identities"]["author"]
self.assertEqual(author["username"], "jcwalker3")
self.assertEqual(author["auth"]["id"], "redacted-author-ref")
self.assertEqual(author["allowed_operations"], ["read", "comment"])
self.assertEqual(author["forbidden_operations"], ["approve", "merge"])
reviewer = prgs_gitea["identities"]["reviewer"]
self.assertEqual(reviewer["role"], "reviewer")
self.assertEqual(reviewer["username"], "sysadmin")
self.assertEqual(reviewer["auth"]["id"], "redacted-reviewer-ref")
self.assertIn("merge", reviewer["allowed_operations"])
def test_alias_generation(self):
"""Test that aliases are correctly generated to support old profile names."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
aliases = v2_data["aliases"]
self.assertEqual(aliases["prgs"], "prgs.gitea.author")
self.assertEqual(aliases["prgs-author"], "prgs.gitea.author")
self.assertEqual(aliases["prgs-reviewer"], "prgs.gitea.reviewer")
self.assertEqual(aliases["mdcps"], "mdcps.gitea.author")
def test_no_secret_behavior(self):
"""Ensure secrets are never extracted, printed, or processed."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
# Check that auth structures only contain keychain references, not credentials
for env in v2_data["environments"].values():
for svc in env["services"].values():
for ident in svc["identities"].values():
auth = ident["auth"]
self.assertEqual(auth["type"], "keychain")
self.assertIn("id", auth)
self.assertNotIn("token", auth)
self.assertNotIn("password", auth)
def test_validation(self):
"""Test that the generated v2 configuration validates against Gitea-Tools v2 loader."""
v2_data = migrate_profiles.migrate_v1_to_v2(self.v1_content)
self.assertTrue(migrate_profiles.validate_v2_data(v2_data))
@patch("sys.stdout", new_callable=StringIO)
def test_dry_run_default(self, mock_stdout):
"""Verify that running without -w prints generated config without modifying files."""
output_file = os.path.join(self.temp_dir, "migrated_dry.json")
test_args = [
"migrate_profiles.py",
"-i", self.input_file,
"-o", output_file
]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 0)
self.assertFalse(os.path.exists(output_file))
self.assertFalse(os.path.exists(f"{self.input_file}.bak"))
stdout_output = mock_stdout.getvalue()
self.assertIn("DRY-RUN MODE", stdout_output)
self.assertIn("version", stdout_output)
self.assertIn("identities", stdout_output)
self.assertIn("aliases", stdout_output)
self.assertNotIn("redacted-prgs-service", stdout_output)
self.assertNotIn("redacted-mdcps-service", stdout_output)
self.assertNotIn("redacted-author-ref", stdout_output)
self.assertNotIn("redacted-mdcps-ref", stdout_output)
self.assertNotIn("redacted-reviewer-ref", stdout_output)
self.assertNotIn("keychain", stdout_output)
self.assertNotIn("auth", stdout_output)
def test_dry_run_hides_token_like_values(self):
"""Verify dry-run summary does not expose token-like auth metadata."""
sensitive = json.loads(json.dumps(self.v1_content))
sensitive["profiles"]["prgs"]["auth"]["id"] = "super-secret-token-value"
with open(self.input_file, "w") as f:
json.dump(sensitive, f)
test_args = ["migrate_profiles.py", "-i", self.input_file]
with patch("sys.stdout", new_callable=StringIO) as mock_stdout:
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 0)
stdout_output = mock_stdout.getvalue()
self.assertNotIn("super-secret-token-value", stdout_output)
self.assertNotIn("token", stdout_output.lower())
def test_explicit_operations_are_preserved(self):
"""Explicit v1 permissions must not be replaced by role defaults."""
v1_data = json.loads(json.dumps(self.v1_content))
v1_data["profiles"]["prgs-reviewer"]["allowed_operations"] = ["read"]
v1_data["profiles"]["prgs-reviewer"]["forbidden_operations"] = ["merge"]
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
reviewer = (
v2_data["environments"]["prgs"]["services"]["gitea"]
["identities"]["reviewer"]
)
self.assertEqual(reviewer["allowed_operations"], ["read"])
self.assertEqual(reviewer["forbidden_operations"], ["merge"])
def test_inferred_role_defaults_only_when_unambiguous(self):
"""Role defaults are allowed only for clear author/reviewer profiles."""
v1_data = {
"version": 1,
"profiles": {
"prgs-author": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {"type": "keychain", "id": "hidden-author-ref"},
"execution_profile": "prgs-author",
}
}
}
v2_data = migrate_profiles.migrate_v1_to_v2(v1_data)
author = (
v2_data["environments"]["prgs"]["services"]["gitea"]
["identities"]["author"]
)
self.assertEqual(
author["allowed_operations"],
migrate_profiles.AUTHOR_DEFAULT_ALLOWED,
)
self.assertEqual(
author["forbidden_operations"],
migrate_profiles.AUTHOR_DEFAULT_FORBIDDEN,
)
def test_ambiguous_permission_source_fails_closed(self):
"""A profile without explicit permissions or clear role must not widen."""
v1_data = {
"version": 1,
"profiles": {
"prgs": {
"base_url": "redacted-prgs-service",
"username": "jcwalker3",
"auth": {"type": "keychain", "id": "hidden-author-ref"},
"execution_profile": "personal-prgs",
}
}
}
with self.assertRaisesRegex(ValueError, "fail closed"):
migrate_profiles.migrate_v1_to_v2(v1_data)
def test_partial_permission_source_fails_closed(self):
"""Allowed without forbidden, or vice versa, is ambiguous."""
v1_data = json.loads(json.dumps(self.v1_content))
del v1_data["profiles"]["prgs"]["forbidden_operations"]
with self.assertRaisesRegex(ValueError, "fail closed"):
migrate_profiles.migrate_v1_to_v2(v1_data)
def test_write_mode_and_backup(self):
"""Verify that write mode creates a backup and correctly saves the validated config."""
output_file = os.path.join(self.temp_dir, "migrated.json")
backup_file = os.path.join(self.temp_dir, "profiles_backup.json.bak")
test_args = [
"migrate_profiles.py",
"-i", self.input_file,
"-o", output_file,
"--backup", backup_file,
"-w"
]
with patch.object(sys, "argv", test_args):
migrate_profiles.main()
# Verify backup exists and matches original v1 config
self.assertTrue(os.path.exists(backup_file))
with open(backup_file, "r") as f:
backup_data = json.load(f)
self.assertEqual(backup_data["version"], 1)
self.assertIn("prgs", backup_data["profiles"])
# Verify migrated v2 config exists and validates
self.assertTrue(os.path.exists(output_file))
with open(output_file, "r") as f:
v2_data = json.load(f)
self.assertEqual(v2_data["version"], 2)
self.assertIn("environments", v2_data)
self.assertEqual(v2_data["aliases"]["prgs"], "prgs.gitea.author")
def test_malformed_input_fails_safely(self):
"""Test that malformed JSON or invalid version numbers cause a clean exit with code 1."""
bad_json_file = os.path.join(self.temp_dir, "bad.json")
with open(bad_json_file, "w") as f:
f.write("{invalid-json}")
test_args = ["migrate_profiles.py", "-i", bad_json_file]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 1)
bad_version_file = os.path.join(self.temp_dir, "bad_version.json")
with open(bad_version_file, "w") as f:
json.dump({"version": 3, "profiles": {}}, f)
test_args = ["migrate_profiles.py", "-i", bad_version_file]
with patch.object(sys, "argv", test_args):
with self.assertRaises(SystemExit) as cm:
migrate_profiles.main()
self.assertEqual(cm.exception.code, 1)
if __name__ == "__main__":
unittest.main()
-239
View File
@@ -1,239 +0,0 @@
"""Operation-name normalization table and enforcement tests — issue #106.
Covers the required matrix from #106:
- fully qualified allowed / forbidden operations
- legacy unqualified allowed / forbidden operations
- unknown operations (fail closed)
- ambiguous operations (fail closed)
- service mismatch (cross-service names never accepted by the wrong service)
- forbidden-overrides-allowed
- empty / missing allowed list
- duplicate operations after normalization
- no silent permission widening
- eligibility enforcement normalizes before checking
"""
import os
import sys
import unittest
from unittest.mock import patch
sys.path.insert(0, str(__import__("pathlib").Path(__file__).resolve().parent.parent))
import gitea_config # noqa: E402
from gitea_config import ( # noqa: E402
ConfigError,
check_operation,
normalize_operation,
)
from mcp_server import gitea_check_pr_eligibility # noqa: E402
FAKE_AUTH = "Basic dGVzdDp0ZXN0"
# ---------------------------------------------------------------------------
# normalize_operation — canonical table
# ---------------------------------------------------------------------------
class TestNormalizeOperation(unittest.TestCase):
def test_fully_qualified_gitea_op_unchanged(self):
self.assertEqual(normalize_operation("gitea.pr.merge"), "gitea.pr.merge")
def test_legacy_aliases_map_to_canonical_names(self):
expected = {
"merge": "gitea.pr.merge",
"approve": "gitea.pr.approve",
"request_changes": "gitea.pr.request_changes",
"review": "gitea.pr.review",
"comment": "gitea.pr.comment",
"read": "gitea.read",
}
for legacy, canonical in expected.items():
self.assertEqual(normalize_operation(legacy), canonical)
def test_contexts_shape_author_verbs(self):
self.assertEqual(normalize_operation("branch"), "gitea.branch.create")
self.assertEqual(normalize_operation("commit"), "gitea.repo.commit")
self.assertEqual(normalize_operation("push"), "gitea.branch.push")
self.assertEqual(normalize_operation("open_pr"), "gitea.pr.create")
def test_unknown_unqualified_op_fails_closed(self):
with self.assertRaises(ConfigError):
normalize_operation("frobnicate")
def test_ambiguous_dotted_op_fails_closed(self):
# Dotted but neither gitea-prefixed nor an explicit alias: refuse to
# guess which namespace was meant.
with self.assertRaises(ConfigError):
normalize_operation("build.read")
def test_cross_service_name_rejected_by_wrong_service(self):
with self.assertRaises(ConfigError):
normalize_operation("jenkins.read", service="gitea")
with self.assertRaises(ConfigError):
normalize_operation("gitea.read", service="jenkins")
def test_non_gitea_single_word_namespaced_to_service(self):
self.assertEqual(normalize_operation("read", service="jenkins"),
"jenkins.read")
def test_non_gitea_qualified_own_prefix_unchanged(self):
self.assertEqual(
normalize_operation("jenkins.build.read", service="jenkins"),
"jenkins.build.read",
)
def test_empty_and_non_string_fail_closed(self):
for bad in ("", None, 3, ["merge"]):
with self.assertRaises(ConfigError):
normalize_operation(bad)
def test_gitea_alias_not_applied_to_other_services(self):
# "merge" on jenkins must not resolve to the *gitea* merge permission.
self.assertEqual(normalize_operation("merge", service="jenkins"),
"jenkins.merge")
def test_table_is_documented_and_matches_normalization(self):
table = gitea_config.GITEA_OPERATION_ALIASES
self.assertIsInstance(table, dict)
self.assertTrue(table)
for legacy, canonical in table.items():
self.assertEqual(normalize_operation(legacy), canonical)
self.assertTrue(canonical.startswith("gitea."))
# ---------------------------------------------------------------------------
# check_operation — enforcement semantics (normalize BEFORE checking)
# ---------------------------------------------------------------------------
class TestCheckOperation(unittest.TestCase):
def test_fully_qualified_allowed(self):
ok, reason = check_operation("gitea.pr.merge", ["gitea.pr.merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_fully_qualified_forbidden(self):
ok, reason = check_operation(
"gitea.pr.merge", ["gitea.pr.merge"], ["gitea.pr.merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_legacy_unqualified_allowed(self):
ok, reason = check_operation("merge", ["gitea.pr.merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_legacy_unqualified_forbidden(self):
ok, reason = check_operation("merge", ["gitea.pr.merge"], ["merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_unknown_operation_fails_closed(self):
ok, reason = check_operation("frobnicate", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_ambiguous_operation_fails_closed(self):
ok, reason = check_operation("build.read", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_service_mismatch_rejected(self):
ok, reason = check_operation("jenkins.read", ["gitea.read"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-operation")
def test_forbidden_overrides_allowed_across_spellings(self):
# Allowed via legacy spelling, forbidden via canonical spelling: the
# forbidden entry must win after both normalize to the same op.
ok, reason = check_operation("merge", ["merge"], ["gitea.pr.merge"])
self.assertFalse(ok)
self.assertEqual(reason, "forbidden")
def test_empty_allowed_list_denies(self):
ok, reason = check_operation("gitea.read", [])
self.assertFalse(ok)
self.assertEqual(reason, "no-allowed-operations")
def test_missing_allowed_list_denies(self):
ok, reason = check_operation("gitea.read", None)
self.assertFalse(ok)
self.assertEqual(reason, "no-allowed-operations")
def test_duplicates_after_normalization_are_harmless(self):
ok, reason = check_operation(
"merge", ["merge", "gitea.pr.merge", "merge"])
self.assertTrue(ok)
self.assertEqual(reason, "allowed")
def test_unnormalizable_allowed_entry_grants_nothing(self):
# A junk allowed entry must not widen permissions to anything.
ok, reason = check_operation("gitea.read", ["frobnicate"])
self.assertFalse(ok)
self.assertEqual(reason, "not-allowed")
def test_unnormalizable_forbidden_entry_fails_closed(self):
# If a forbidden entry cannot be understood, deny rather than risk
# silently narrowing the forbidden set (which would widen permissions).
ok, reason = check_operation(
"gitea.read", ["gitea.read"], ["frobnicate"])
self.assertFalse(ok)
self.assertEqual(reason, "invalid-forbidden-entry")
# ---------------------------------------------------------------------------
# Eligibility enforcement — normalization happens before checking (#106)
# ---------------------------------------------------------------------------
class TestEligibilityNormalizesOperations(unittest.TestCase):
def _pr(self, author, state="open", sha="abc123", mergeable=True):
return {
"user": {"login": author},
"state": state,
"head": {"sha": sha},
"mergeable": mergeable,
}
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_namespaced_profile_ops_allow_legacy_action(self, _auth, mock_api):
# JSON-config profiles carry canonical namespaced ops; the raw action
# "merge" must still match them after normalization.
mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-merger",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=9, action="merge",
remote="prgs")
self.assertTrue(r["eligible"])
self.assertNotIn("profile is not allowed to merge", r["reasons"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_namespaced_forbidden_op_blocks_legacy_action(self, _auth, mock_api):
mock_api.side_effect = [{"login": "merger-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-merger",
"GITEA_ALLOWED_OPERATIONS": "gitea.read,gitea.pr.merge",
"GITEA_FORBIDDEN_OPERATIONS": "gitea.pr.merge"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=9, action="merge",
remote="prgs")
self.assertFalse(r["eligible"])
self.assertIn("profile forbids 'merge'", r["reasons"])
@patch("mcp_server.api_request")
@patch("mcp_server.get_auth_header", return_value=FAKE_AUTH)
def test_legacy_env_ops_still_work(self, _auth, mock_api):
# v1/env behaviour stays compatible: unqualified env ops keep working.
mock_api.side_effect = [{"login": "reviewer-bot"}, self._pr("author-bot")]
env = {"GITEA_PROFILE_NAME": "gitea-reviewer",
"GITEA_ALLOWED_OPERATIONS": "read,review,approve"}
with patch.dict(os.environ, env, clear=True):
r = gitea_check_pr_eligibility(pr_number=5, action="review",
remote="prgs")
self.assertTrue(r["eligible"])
if __name__ == "__main__":
unittest.main()