387 lines
13 KiB
Python
387 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""Exoscale MCP server — hybrid GET/CLI approach."""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
# Shared FastMCP server object; the @mcp.tool() decorators in this file
# attach their functions to it and the entrypoint calls mcp.run().
mcp = FastMCP("exoscale", instructions="Exoscale Swiss cloud API")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Credentials for direct API calls; an unset variable yields an empty string,
# which _get_headers() rejects at request time.
API_KEY = os.getenv("EXOSCALE_API_KEY", "")
API_SECRET = os.getenv("EXOSCALE_API_SECRET", "")

# Config file handed to the exo CLI; EXOSCALE_CONFIG overrides the per-user default.
EXOSCALE_CONFIG = os.getenv(
    "EXOSCALE_CONFIG",
    os.path.expanduser("~/.config/exoscale/exoscale.toml"),
)
|
|
|
|
|
|
def _base_url(zone: str) -> str:
    """Return the API v2 base URL for *zone*.

    The zone becomes part of the hostname, and requests built on this URL
    carry the API credentials. A value such as ``"x.evil.test/"`` would
    therefore send those credentials to a different host, so only plain zone
    labels (ASCII letters, digits and hyphens) are accepted.

    Raises:
        ValueError: if *zone* is empty or contains any other character.
    """
    is_label = bool(zone) and all(
        ch.isascii() and (ch.isalnum() or ch == "-") for ch in zone
    )
    if not is_label:
        raise ValueError(f"invalid Exoscale zone: {zone!r}")
    return f"https://api-{zone}.exoscale.com/v2"
|
|
|
|
|
|
def _get_headers() -> dict:
    """Build the credential headers attached to direct API requests.

    Raises:
        RuntimeError: if EXOSCALE_API_KEY or EXOSCALE_API_SECRET is empty.
    """
    # NOTE(review): Exoscale's public API v2 reference describes a signed
    # "EXO2-HMAC-SHA256" Authorization header; confirm that these plain
    # key/secret headers are actually accepted by the endpoint.
    if not (API_KEY and API_SECRET):
        raise RuntimeError("EXOSCALE_API_KEY and EXOSCALE_API_SECRET must be set")
    headers = {}
    headers["Exoscale-Api-Key"] = API_KEY
    headers["Exoscale-Api-Secret"] = API_SECRET
    return headers
|
|
|
|
|
|
def _api_get(path: str, zone: str = "ch-gva-2", params: Optional[dict] = None) -> dict:
    """GET *path* from the zone's API endpoint and return the decoded JSON body.

    The request uses a 30-second timeout; an HTTP error status is raised by
    ``raise_for_status()``.
    """
    response = httpx.get(
        _base_url(zone) + path,
        headers=_get_headers(),
        params=params,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _cli(*args: str, check: bool = True) -> subprocess.CompletedProcess:
    """Run the ``exo`` CLI with the configured config file.

    Args:
        *args: arguments appended after ``exo --config <EXOSCALE_CONFIG>``.
        check: when true (the default), a non-zero exit status raises.

    Returns:
        The completed process, with stdout and stderr captured as text.

    Raises:
        RuntimeError: if *check* is true and the command exits non-zero.
    """
    cmd = ["exo", "--config", EXOSCALE_CONFIG, *args]
    # Do not let the child inherit our stdin: if this server is served over a
    # stdio transport, stdin is the protocol stream, and a CLI that prompts
    # would read from it or block forever. DEVNULL gives it an immediate EOF.
    result = subprocess.run(
        cmd,
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
    )
    if check and result.returncode != 0:
        raise RuntimeError(
            f"exo command failed (exit {result.returncode}):\n"
            f"cmd: {' '.join(cmd)}\n"
            f"stderr: {result.stderr.strip()}\n"
            f"stdout: {result.stdout.strip()}"
        )
    return result
|
|
|
|
|
|
def _cli_json(*args: str) -> object:
    """Run an exo command with ``-O json`` appended and decode its stdout.

    Blank output is reported as an empty list.
    """
    raw = _cli(*args, "-O", "json").stdout
    return json.loads(raw) if raw.strip() else []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Zones & infrastructure
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def list_zones() -> str:
    """List all available Exoscale zones."""
    # One bullet per zone entry reported by `exo zone list`.
    bullets = []
    for zone in _cli_json("zone", "list"):
        bullets.append(f" • {zone['name']}")
    return "Exoscale zones:\n" + "\n".join(bullets)
|
|
|
|
|
|
@mcp.tool()
def list_instance_types(zone: str = "ch-gva-2") -> str:
    """List available compute instance types in a zone."""
    data = _api_get("/instance-type", zone=zone)

    # Bucket the types by family so each family prints under one heading.
    groups: dict[str, list] = {}
    for t in data.get("instance-types", []):
        groups.setdefault(t.get("family", "other"), []).append(t)

    lines = [f"Instance types in {zone}:\n"]
    for fam, types in sorted(groups.items()):
        lines.append(f" [{fam}]")
        for t in types:
            # "memory" is treated as a byte count, as the original code did
            # (TODO confirm against the API schema). A missing or null value
            # counts as 0 instead of crashing the division.
            mem_gb = round((t.get("memory") or 0) / 1024**3, 2)
            auth = "" if t.get("authorized", True) else " (not authorized)"
            # ":g" keeps sub-GiB sizes visible (0.5 GB); the previous ".0f"
            # rendered a 512 MiB type as "0 GB". Whole sizes print unchanged.
            lines.append(
                f" {t.get('family', '')}.{t.get('size', t.get('name', '?'))}"
                f" — {t.get('cpus', '?')} vCPU, {mem_gb:g} GB RAM{auth}"
            )
    return "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
def list_templates(zone: str = "ch-gva-2") -> str:
    """List available OS templates in a zone."""
    response = _api_get("/template", zone=zone, params={"visibility": "public"})

    # Group the public templates by their "family" field ("other" when absent).
    by_family: dict[str, list] = {}
    for template in response.get("templates", []):
        family = template.get("family", "other")
        if family not in by_family:
            by_family[family] = []
        by_family[family].append(template)

    out = [f"Templates in {zone}:\n"]
    for family in sorted(by_family):
        out.append(f" [{family}]")
        # Only the first 8 characters of each template id are shown.
        out.extend(
            f" {entry['name']} (id: {entry['id'][:8]}...)"
            for entry in by_family[family]
        )
    return "\n".join(out)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Compute instances
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def list_instances(zone: str = "ch-gva-2") -> str:
    """List compute instances in a zone."""
    instances = _cli_json("compute", "instance", "list", "--zone", zone)
    if not instances:
        return f"No instances found in {zone}."

    # One fixed-width row per instance: name, state, type, IP address.
    rows = []
    for inst in instances:
        label = inst["name"]
        state = inst.get("state", "?")
        kind = inst.get("type", "?")
        address = inst.get("ip_address", "-")
        rows.append(f" {label:<30} {state:<10} {kind:<20} {address}")
    return "\n".join([f"Instances in {zone}:\n", *rows])
|
|
|
|
|
|
@mcp.tool()
def get_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Get details for a specific compute instance."""
    data = _cli_json("compute", "instance", "show", name, "--zone", zone)
    if not data:
        return f"Instance '{name}' not found in {zone}."

    # The CLI output is used as a single object; a list is reduced to its
    # first element.
    d = data if isinstance(data, dict) else data[0]

    # A key that is present with a JSON null value bypasses .get()'s default,
    # so normalise to an empty list before joining; otherwise
    # ', '.join(None) raises TypeError.
    groups = d.get("security_groups") or []
    lines = [
        f"Instance: {d.get('name')}",
        f" ID: {d.get('id')}",
        f" Zone: {d.get('zone')}",
        f" State: {d.get('state')}",
        f" Type: {d.get('instance_type')}",
        f" Template: {d.get('template')}",
        f" Disk: {d.get('disk_size')}",
        f" Public IP: {d.get('ip_address', '-')}",
        f" IPv6: {d.get('ipv6_address', '-')}",
        f" SSH key: {d.get('ssh_key', '-')}",
        f" Security grps:{', '.join(str(g) for g in groups) or '-'}",
        f" Created: {d.get('creation_date', '-')}",
    ]
    return "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
def create_instance(
    name: str,
    zone: str,
    instance_type: str,
    template: str,
    disk_size: int = 50,
    ssh_key: str = "",
    security_group: str = "default",
) -> str:
    """Create a new compute instance.

    instance_type format: standard.micro, standard.small, cpu.large, etc.
    template: template name (e.g. "Linux Ubuntu 24.04 LTS 64-bit") or ID.
    disk_size: in GiB (default 50).
    ssh_key: name of registered SSH key to deploy.
    security_group: security group name (default: "default").
    """
    # Flag/value pairs in the order they are passed to the CLI; --ssh-key is
    # only added when a key name was supplied.
    options = {
        "--zone": zone,
        "--instance-type": instance_type,
        "--template": template,
        "--disk-size": str(disk_size),
        "--security-group": security_group,
    }
    if ssh_key:
        options["--ssh-key"] = ssh_key

    cli_args = ["compute", "instance", "create", name]
    for flag, value in options.items():
        cli_args += [flag, value]

    created = _cli_json(*cli_args)
    # Accept an object, a non-empty list (first element), or nothing at all.
    if isinstance(created, dict):
        info = created
    elif created:
        info = created[0]
    else:
        info = {}

    # Fields missing from the CLI output fall back to the requested values.
    summary = [
        "Instance created:",
        f" Name: {info.get('name', name)}",
        f" ID: {info.get('id', '?')}",
        f" Zone: {info.get('zone', zone)}",
        f" Type: {info.get('instance_type', instance_type)}",
        f" IP: {info.get('ip_address', 'pending')}",
        f" State: {info.get('state', '?')}",
    ]
    return "\n".join(summary)
|
|
|
|
|
|
@mcp.tool()
def destroy_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Permanently destroy a compute instance (irreversible)."""
    # --force presumably suppresses the CLI's confirmation prompt; verify
    # against the exo CLI reference.
    command = ("compute", "instance", "delete", name, "--zone", zone, "--force")
    _cli(*command)
    return f"Instance '{name}' in {zone} has been destroyed."
|
|
|
|
|
|
@mcp.tool()
def start_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Start a stopped compute instance."""
    # A failing command raises from _cli(), so reaching the return means the
    # CLI exited with status 0.
    command = ("compute", "instance", "start", name, "--zone", zone, "--force")
    _cli(*command)
    return f"Instance '{name}' in {zone} started."
|
|
|
|
|
|
@mcp.tool()
def stop_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Stop a running compute instance."""
    # A failing command raises from _cli(), so reaching the return means the
    # CLI exited with status 0.
    command = ("compute", "instance", "stop", name, "--zone", zone, "--force")
    _cli(*command)
    return f"Instance '{name}' in {zone} stopped."
|
|
|
|
|
|
@mcp.tool()
def reboot_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Reboot a compute instance."""
    # A failing command raises from _cli(), so reaching the return means the
    # CLI exited with status 0.
    command = ("compute", "instance", "reboot", name, "--zone", zone, "--force")
    _cli(*command)
    return f"Instance '{name}' in {zone} rebooted."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSH keys
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def list_ssh_keys() -> str:
    """List registered SSH keys (global, not zone-specific)."""
    keys = _cli_json("compute", "ssh-key", "list")
    if not keys:
        return "No SSH keys registered."

    # Key name padded to 30 columns, followed by its fingerprint ("-" if absent).
    rows = [
        f" {entry['name']:<30} {entry.get('fingerprint', '-')}" for entry in keys
    ]
    return "\n".join(["SSH keys:\n", *rows])
|
|
|
|
|
|
@mcp.tool()
def create_ssh_key(name: str, public_key_path: str) -> str:
    """Register a new SSH public key.

    public_key_path: path to the .pub file on the local machine.
    """
    key_file = os.path.expanduser(public_key_path)
    if os.path.exists(key_file):
        registered = _cli_json("compute", "ssh-key", "register", name, key_file)
        # Accept an object, a non-empty list (first element), or nothing at all.
        if isinstance(registered, dict):
            info = registered
        elif registered:
            info = registered[0]
        else:
            info = {}
        summary = [
            "SSH key registered:",
            f" Name: {info.get('name', name)}",
            f" Fingerprint: {info.get('fingerprint', '?')}",
        ]
        return "\n".join(summary)

    # A missing file is reported as a message, not raised, and the CLI is
    # never invoked.
    return f"Error: file not found: {key_file}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Security groups
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@mcp.tool()
def list_security_groups(zone: str = "ch-gva-2") -> str:
    """List security groups (global, zone param unused but kept for API consistency)."""
    found = _cli_json("compute", "security-group", "list")
    if not found:
        return "No security groups found."

    # Each row shows the first 8 characters of the group id and the group name.
    rows = []
    for group in found:
        short_id = group.get("id", "?")[:8]
        rows.append(f" {short_id}... {group['name']}")
    return "\n".join(["Security groups:\n", *rows])
|
|
|
|
|
|
def _format_sg_rule(rule: dict, direction_word: str) -> str:
    """Render one security-group rule as a single display line.

    direction_word is "from" for ingress rules and "to" for egress rules.
    Fields that are missing or null are rendered as placeholders instead of
    raising.
    """
    start = rule.get("start_port")
    end = rule.get("end_port")
    if start is None or end is None or start == end:
        # Single port, or a rule without complete port bounds.
        single = start if start is not None else end
        port = "" if single is None else str(single)
    else:
        port = f"{start}-{end}"

    protocol = str(rule.get("protocol") or "?").upper()
    # The peer is a CIDR network when present, otherwise a security-group reference.
    peer = str(rule.get("network") or rule.get("security_group") or "-")
    description = rule.get("description") or ""
    return (
        f" [{protocol:>5}] "
        f"port {port:<12} {direction_word} {peer:<20}"
        f" # {description}"
    )


@mcp.tool()
def get_security_group(name: str, zone: str = "ch-gva-2") -> str:
    """Get details and rules for a security group."""
    # `zone` is accepted but not passed to the CLI call below.
    data = _cli_json("compute", "security-group", "show", name)
    d = data if isinstance(data, dict) else (data[0] if data else {})
    if not d:
        return f"Security group '{name}' not found."

    lines = [
        f"Security group: {d.get('name')}",
        f" ID: {d.get('id')}",
        f" Description: {d.get('description', '-')}",
    ]

    # Both directions share one layout; only the heading, the source key, the
    # preposition and the "no rules" note differ.
    sections = (
        ("Ingress", "ingress_rules", "from", "(none)"),
        ("Egress", "egress_rules", "to", "(none — all outbound allowed by default)"),
    )
    for title, key, direction_word, empty_note in sections:
        rules = d.get(key) or []
        if rules:
            lines.append(f"\n {title} rules:")
            lines.extend(_format_sg_rule(r, direction_word) for r in rules)
        else:
            lines.append(f"\n {title} rules: {empty_note}")

    instances = d.get("instances") or []
    if instances:
        lines.append(f"\n Attached instances ({len(instances)}):")
        for inst in instances:
            lines.append(f" {inst.get('name')} {inst.get('public_ip', '-')}")

    return "\n".join(lines)
|
|
|
|
|
|
@mcp.tool()
def add_security_group_rule(
    group_name: str,
    protocol: str,
    port: str,
    network: str = "0.0.0.0/0",
    description: str = "",
    zone: str = "ch-gva-2",
    flow: str = "ingress",
) -> str:
    """Add a firewall rule to a security group.

    protocol: tcp, udp, icmp, etc.
    port: single port (e.g. "443") or range (e.g. "8000-8080").
    network: CIDR block (default: 0.0.0.0/0).
    flow: ingress or egress (default: ingress).
    """
    # `zone` is accepted but not passed to the CLI call below.
    cli_args = ["compute", "security-group", "rule", "add", group_name]
    required = (
        ("--protocol", protocol),
        ("--port", port),
        ("--network", network),
        ("--flow", flow),
    )
    for flag, value in required:
        cli_args.extend((flag, value))
    # --description is only passed when a description was given.
    if description:
        cli_args.extend(("--description", description))

    _cli(*cli_args)

    # The summary echoes the request; it is not read back from the CLI output.
    summary = [
        f"Rule added to security group '{group_name}':",
        f" Flow: {flow}",
        f" Protocol: {protocol.upper()}",
        f" Port: {port}",
        f" Network: {network}",
        f" Desc: {description or '(none)'}",
    ]
    return "\n".join(summary)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entrypoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Start the server only when this file is executed as a script, so importing
# the module registers the tools without running anything.
if __name__ == "__main__":
    mcp.run()
|