Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src/specify_cli/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
}
)
EXTENSION_COMMAND_NAME_PATTERN = re.compile(r"^speckit\.([a-z0-9-]+)\.([a-z0-9-]+)$")
EXTENSION_ID_PATTERN = re.compile(r"^[a-z0-9-]+$")

VALID_EFFECTS = frozenset({"read-only", "read-write"})

Expand Down Expand Up @@ -201,7 +202,7 @@ def _validate(self):
raise ValidationError(f"Missing extension.{field}")

# Validate extension ID format
if not re.match(r"^[a-z0-9-]+$", ext["id"]):
if not EXTENSION_ID_PATTERN.match(ext["id"]):
raise ValidationError(
f"Invalid extension ID '{ext['id']}': "
"must be lowercase alphanumeric with hyphens only"
Expand Down Expand Up @@ -2572,6 +2573,14 @@ def download_extension(
"""
import urllib.error

if not isinstance(extension_id, str) or not EXTENSION_ID_PATTERN.match(
extension_id
):
raise ExtensionError(
f"Invalid extension ID '{extension_id}': "
"must be lowercase alphanumeric with hyphens only"
)

# Get extension info from catalog
ext_info = self.get_extension_info(extension_id)
if not ext_info:
Expand Down Expand Up @@ -2607,6 +2616,10 @@ def download_extension(
version = ext_info.get("version", "unknown")
zip_filename = f"{extension_id}-{version}.zip"
zip_path = target_dir / zip_filename
try:
zip_path.resolve().relative_to(target_dir.resolve())
Comment on lines 2616 to +2620
except ValueError as e:
raise ExtensionError(f"Refusing unsafe extension ZIP path: {zip_filename}") from e

extra_headers = None
resolved_download_url = self._resolve_github_release_asset_api_url(download_url)
Expand Down
91 changes: 67 additions & 24 deletions src/specify_cli/extensions/_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def _resolve_catalog_extension(

# Try by display name - search using argument as query, then filter for exact match
search_results = catalog.search(query=argument)
name_matches = [ext for ext in search_results if ext["name"].lower() == argument.lower()]
name_matches = [ext for ext in search_results if str(ext.get("name", "")).lower() == argument.lower()]

if len(name_matches) == 1:
return (name_matches[0], None)
Expand Down Expand Up @@ -207,22 +207,30 @@ def extension_list(
available: bool = typer.Option(False, "--available", help="Show available extensions from catalog"),
all_extensions: bool = typer.Option(False, "--all", help="Show both installed and available"),
):
"""List installed extensions."""
from . import ExtensionManager
"""List installed extensions, and available catalog extensions with --available/--all."""
from . import ExtensionManager, ExtensionCatalog, ExtensionError

project_root = _require_specify_project()
manager = ExtensionManager(project_root)
installed = manager.list_installed()

if not installed and not (available or all_extensions):
# Default (no flags) lists installed; --all also lists installed.
# --available alone lists only catalog extensions, not installed.
show_installed = all_extensions or not available
show_available = available or all_extensions

if not installed and not show_available:
console.print("[yellow]No extensions installed.[/yellow]")
console.print("\nInstall an extension with:")
console.print(" specify extension add <extension-name>")
return

if installed:
if show_installed:
console.print("\n[bold cyan]Installed Extensions:[/bold cyan]\n")

if not installed:
console.print(" [dim]No extensions installed.[/dim]")
console.print()
for ext in installed:
status_icon = "✓" if ext["enabled"] else "✗"
status_color = "green" if ext["enabled"] else "red"
Expand All @@ -233,9 +241,41 @@ def extension_list(
console.print(f" Commands: {ext['command_count']} | Hooks: {ext['hook_count']} | Priority: {ext['priority']} | Status: {'Enabled' if ext['enabled'] else 'Disabled'}")
console.print()

if available or all_extensions:
console.print("\nInstall an extension:")
console.print(" [cyan]specify extension add <name>[/cyan]")
if show_available:
# Query the catalog and show extensions that are not already installed.
catalog = ExtensionCatalog(project_root)
installed_ids = {ext["id"] for ext in installed}

try:
results = catalog.search()
except ExtensionError as e:
console.print(f"\n[red]Error:[/red] Could not query extension catalog: {_escape_markup(str(e))}")
console.print("[dim]The catalog may be temporarily unavailable. Try again later.[/dim]")
raise typer.Exit(1)

available_exts = [ext for ext in results if ext.get("id") not in installed_ids]

console.print("\n[bold cyan]Available Extensions:[/bold cyan]\n")
if not available_exts:
console.print(" [dim]No additional extensions available in the catalog.[/dim]")
else:
for ext in available_exts:
# Catalog fields are untrusted (remote/community catalogs); escape
# before embedding in Rich markup to prevent markup injection.
safe_id = _escape_markup(str(ext.get("id", "")))
verified_badge = " [green]✓ Verified[/green]" if ext.get("verified") else ""
safe_name = _escape_markup(str(ext.get("name", "(unnamed)")))
safe_version = _escape_markup(str(ext.get("version", "?")))
console.print(f" [bold]{safe_name}[/bold] (v{safe_version}){verified_badge}")
console.print(f" [dim]{safe_id}[/dim]")
console.print(f" {_escape_markup(str(ext.get('description', '')))}")
install_allowed = ext.get("_install_allowed", True)
if install_allowed:
console.print(f" [cyan]Install:[/cyan] specify extension add {safe_id}")
else:
catalog_name = _escape_markup(str(ext.get("_catalog_name", "")))
console.print(f" [yellow]Discovery only — not installable from '{catalog_name}'[/yellow]")
console.print()
Comment on lines +262 to +278


@catalog_app.command("list")
Expand Down Expand Up @@ -577,7 +617,7 @@ def extension_add(

# Download extension ZIP (use resolved ID, not original argument which may be display name)
extension_id = ext_info['id']
console.print(f"Downloading {_escape_markup(str(ext_info['name']))} v{_escape_markup(str(ext_info.get('version', 'unknown')))}...")
console.print(f"Downloading {_escape_markup(str(ext_info.get('name', extension_id)))} v{_escape_markup(str(ext_info.get('version', 'unknown')))}...")
zip_path = catalog.download_extension(extension_id)
Comment on lines 618 to 621

try:
Expand Down Expand Up @@ -729,8 +769,8 @@ def extension_search(
for ext in results:
# Extension header
verified_badge = " [green]✓ Verified[/green]" if ext.get("verified") else ""
console.print(f"[bold]{_escape_markup(str(ext['name']))}[/bold] (v{_escape_markup(str(ext['version']))}){verified_badge}")
console.print(f" {_escape_markup(str(ext['description']))}")
console.print(f"[bold]{_escape_markup(str(ext.get('name', '(unnamed)')))}[/bold] (v{_escape_markup(str(ext.get('version', '?')))}){verified_badge}")
console.print(f" {_escape_markup(str(ext.get('description', '')))}")
Comment on lines +772 to +773

# Metadata
console.print(f"\n [dim]Author:[/dim] {_escape_markup(str(ext.get('author', 'Unknown')))}")
Expand All @@ -752,7 +792,7 @@ def extension_search(
if ext.get('downloads') is not None:
stats.append(f"Downloads: {ext['downloads']:,}")
if ext.get('stars') is not None:
stats.append(f"Stars: {ext['stars']}")
stats.append(f"Stars: {_escape_markup(str(ext['stars']))}")
if stats:
console.print(f" [dim]{' | '.join(stats)}[/dim]")

Expand Down Expand Up @@ -873,12 +913,12 @@ def _print_extension_info(ext_info: dict, manager):

# Header
verified_badge = " [green]✓ Verified[/green]" if ext_info.get("verified") else ""
console.print(f"\n[bold]{_escape_markup(str(ext_info['name']))}[/bold] (v{_escape_markup(str(ext_info['version']))}){verified_badge}")
console.print(f"\n[bold]{_escape_markup(str(ext_info.get('name', '(unnamed)')))}[/bold] (v{_escape_markup(str(ext_info.get('version', '?')))}){verified_badge}")
console.print(f"ID: {_escape_markup(str(ext_info['id']))}")
console.print()

# Description
console.print(f"{_escape_markup(str(ext_info['description']))}")
console.print(f"{_escape_markup(str(ext_info.get('description', '')))}")
console.print()

# Author and License
Expand All @@ -899,23 +939,26 @@ def _print_extension_info(ext_info: dict, manager):
console.print()

# Requirements
if ext_info.get('requires'):
reqs = ext_info.get('requires')
if isinstance(reqs, dict) and reqs:
console.print("[bold]Requirements:[/bold]")
reqs = ext_info['requires']
if reqs.get('speckit_version'):
console.print(f" • Spec Kit: {_escape_markup(str(reqs['speckit_version']))}")
if reqs.get('tools'):
for tool in reqs['tools']:
tool_name = _escape_markup(str(tool['name']))
tools = reqs.get('tools')
if isinstance(tools, list):
for tool in tools:
if not isinstance(tool, dict):
continue
tool_name = _escape_markup(str(tool.get('name', '(unnamed)')))
tool_version = _escape_markup(str(tool.get('version', 'any')))
required = " (required)" if tool.get('required') else " (optional)"
console.print(f" • {tool_name}: {tool_version}{required}")
console.print()

# Provides
if ext_info.get('provides'):
provides = ext_info.get('provides')
if isinstance(provides, dict) and provides:
console.print("[bold]Provides:[/bold]")
provides = ext_info['provides']
if provides.get('commands'):
console.print(f" • Commands: {_escape_markup(str(provides['commands']))}")
if provides.get('hooks'):
Expand All @@ -933,7 +976,7 @@ def _print_extension_info(ext_info: dict, manager):
if ext_info.get('downloads') is not None:
stats.append(f"Downloads: {ext_info['downloads']:,}")
if ext_info.get('stars') is not None:
stats.append(f"Stars: {ext_info['stars']}")
stats.append(f"Stars: {_escape_markup(str(ext_info['stars']))}")
if stats:
console.print(f"[bold]Statistics:[/bold] {' | '.join(stats)}")
console.print()
Expand Down Expand Up @@ -1040,8 +1083,8 @@ def extension_update(
continue

try:
catalog_version = pkg_version.Version(ext_info["version"])
except pkg_version.InvalidVersion:
catalog_version = pkg_version.Version(str(ext_info["version"]))
except (pkg_version.InvalidVersion, KeyError):
console.print(
f"⚠ {safe_ext_id}: Invalid catalog version '{_escape_markup(str(ext_info.get('version')))}' (skipping)"
)
Expand Down
71 changes: 71 additions & 0 deletions tests/test_extension_add_path_traversal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Security test for `specify extension add <label> --from <url>`.

A label containing path separators (e.g. "../../etc/evil") must not let the
download escape the downloads cache directory. The handler does not let the
label participate in the path at all: it downloads to a generated tempfile
(``extension-url-download-*.zip``) created with ``dir=download_dir``, so the
filename is machine-generated and confined to the downloads cache. This test
asserts the resulting path stays inside that directory and carries no path
separators from the raw label.
"""

import contextlib

import pytest
from typer.testing import CliRunner

from specify_cli import app
from specify_cli.extensions import ExtensionManager

runner = CliRunner()


@pytest.fixture
def project_dir(tmp_path):
proj_dir = tmp_path / "project"
proj_dir.mkdir()
(proj_dir / ".specify").mkdir()
(proj_dir / ".specify" / "config.toml").write_text("ai = 'claude'")
return proj_dir


def test_add_from_url_sanitizes_traversal_label(project_dir, monkeypatch):
monkeypatch.chdir(project_dir)

captured = {}

@contextlib.contextmanager
def fake_open_url(url, timeout=60):
class _Resp:
def read(self):
return b"not-a-real-zip"
yield _Resp()

monkeypatch.setattr("specify_cli.authentication.http.open_url", fake_open_url)

def fake_install_from_zip(self, zip_path, *args, **kwargs):
captured["zip_path"] = zip_path
# Stop the flow before real install/registration runs.
raise RuntimeError("stop after capture")

monkeypatch.setattr(ExtensionManager, "install_from_zip", fake_install_from_zip)

malicious = "../../../etc/evil"
runner.invoke(
app,
["extension", "add", malicious, "--from", "https://example.com/payload.zip"],
obj={"project_root": project_dir},
input="y\n", # confirm the "Untrusted Source" prompt for --from URLs
)

zip_path = captured.get("zip_path")
assert zip_path is not None, "install_from_zip was never reached"

download_dir = (project_dir / ".specify" / "extensions" / ".cache" / "downloads").resolve()
resolved = zip_path.resolve()

# The download must stay inside the downloads cache dir...
assert resolved.parent == download_dir
# ...and the filename must not carry path separators from the raw label.
assert "/" not in zip_path.name
assert ".." not in zip_path.name
Loading