#!/usr/bin/env python3
"""Exoscale MCP server — hybrid GET/CLI approach.

Read-only catalogue lookups (instance types, templates) go through the HTTP
API with ``httpx``; everything else shells out to the ``exo`` CLI and parses
its JSON output.
"""

import json
import os
import re
import subprocess
from typing import Optional

import httpx
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("exoscale", instructions="Exoscale Swiss cloud API")

# ---------------------------------------------------------------------------
# Config helpers
# ---------------------------------------------------------------------------

API_KEY = os.environ.get("EXOSCALE_API_KEY", "")
API_SECRET = os.environ.get("EXOSCALE_API_SECRET", "")
EXOSCALE_CONFIG = os.environ.get(
    "EXOSCALE_CONFIG", os.path.expanduser("~/.config/exoscale/exoscale.toml")
)

# A zone becomes part of the API hostname, so only hostname-label characters
# are accepted (e.g. "ch-gva-2", "ch-dk-2", "de-fra-1").
_ZONE_RE = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def _base_url(zone: str) -> str:
    """Return the v2 API base URL for *zone*.

    Raises:
        ValueError: if *zone* is not a plain hostname label. Without this
            check a crafted zone (e.g. ``"evil.example/#"``) would redirect
            the request, and the credential headers, to another host.
    """
    if not isinstance(zone, str) or not _ZONE_RE.fullmatch(zone):
        raise ValueError(f"invalid zone name: {zone!r}")
    return f"https://api-{zone}.exoscale.com/v2"


def _get_headers() -> dict:
    """Return the credential headers sent with every API request.

    Raises:
        RuntimeError: if either credential environment variable is unset.
    """
    if not API_KEY or not API_SECRET:
        raise RuntimeError("EXOSCALE_API_KEY and EXOSCALE_API_SECRET must be set")
    # NOTE(review): Exoscale's public v2 API reference describes a signed
    # "EXO2-HMAC-SHA256" Authorization header; confirm that these plain
    # key/secret headers are actually accepted by the endpoint.
    return {
        "Exoscale-Api-Key": API_KEY,
        "Exoscale-Api-Secret": API_SECRET,
    }


def _api_get(path: str, zone: str = "ch-gva-2", params: Optional[dict] = None) -> dict:
    """GET *path* from the zone's API endpoint and return the decoded JSON.

    Raises:
        ValueError: if *zone* is not a valid zone name.
        RuntimeError: if credentials are not configured.
        httpx.HTTPStatusError: on a non-2xx response.
    """
    url = f"{_base_url(zone)}{path}"
    resp = httpx.get(url, headers=_get_headers(), params=params, timeout=30)
    resp.raise_for_status()
    return resp.json()


# ---------------------------------------------------------------------------
# CLI helpers
# ---------------------------------------------------------------------------


def _cli(*args: str, check: bool = True) -> subprocess.CompletedProcess:
    """Run exo CLI with the configured config file.

    Args:
        *args: arguments appended after ``exo --config <file>``.
        check: when true, a non-zero exit status raises ``RuntimeError``.

    Raises:
        RuntimeError: if the ``exo`` binary cannot be found, or if the
            command fails while *check* is true.
    """
    cmd = ["exo", "--config", EXOSCALE_CONFIG] + list(args)
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError as err:
        # Report a missing binary the same way as any other CLI failure.
        raise RuntimeError(
            "exo command not found; is the Exoscale CLI installed and on PATH?"
        ) from err
    if check and result.returncode != 0:
        raise RuntimeError(
            f"exo command failed (exit {result.returncode}):\n"
            f"cmd: {' '.join(cmd)}\n"
            f"stderr: {result.stderr.strip()}\n"
            f"stdout: {result.stdout.strip()}"
        )
    return result


def _cli_json(*args: str) -> object:
    """Run exo CLI and parse JSON output.

    Returns an empty list when the command prints nothing.

    Raises:
        RuntimeError: if the command fails or its output is not valid JSON.
    """
    result = _cli(*args, "-O", "json")
    if not result.stdout.strip():
        return []
    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError as err:
        raise RuntimeError(
            f"exo returned non-JSON output: {result.stdout.strip()[:200]}"
        ) from err


# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------


def _first_record(data: object) -> dict:
    """Return *data* if it is a dict, else its first element, else ``{}``."""
    if isinstance(data, dict):
        return data
    return data[0] if data else {}


def _group_by_family(items: list) -> dict[str, list]:
    """Group API records by their ``family`` key (``"other"`` when absent)."""
    groups: dict[str, list] = {}
    for item in items:
        groups.setdefault(item.get("family", "other"), []).append(item)
    return groups


def _format_memory(num_bytes: float) -> str:
    """Render a byte count as whole GB, or as MB when below one GiB.

    Rounding a sub-GiB size to whole GB would print "0 GB".
    """
    gib = num_bytes / 1024 / 1024 / 1024
    if 0 < gib < 1:
        return f"{num_bytes / 1024 / 1024:.0f} MB"
    return f"{gib:.0f} GB"


def _format_port(rule: dict) -> str:
    """Return ``"N"``, ``"N-M"`` or ``""`` for a rule's port range."""
    start = rule.get("start_port")
    end = rule.get("end_port")
    # Tolerate rules that carry only one of the two bounds.
    if start is None:
        start = end
    if end is None:
        end = start
    if start is None:
        return ""  # port-less rule (e.g. ICMP)
    return str(start) if start == end else f"{start}-{end}"


def _format_rule(rule: dict, direction: str) -> str:
    """Format one security-group rule; *direction* is ``"from"`` or ``"to"``."""
    # Coalesce missing/null values before applying a width spec: formatting
    # None with ":<20" raises TypeError.
    protocol = str(rule.get("protocol") or "?").upper()
    peer = str(rule.get("network") or rule.get("security_group") or "-")
    return (
        f" [{protocol:>5}] "
        f"port {_format_port(rule):<12} {direction} {peer:<20}"
        f" # {rule.get('description') or ''}"
    )


# ---------------------------------------------------------------------------
# Zones & infrastructure
# ---------------------------------------------------------------------------


@mcp.tool()
def list_zones() -> str:
    """List all available Exoscale zones."""
    data = _cli_json("zone", "list")
    lines = [f" • {z['name']}" for z in data]
    return "Exoscale zones:\n" + "\n".join(lines)


@mcp.tool()
def list_instance_types(zone: str = "ch-gva-2") -> str:
    """List available compute instance types in a zone."""
    data = _api_get("/instance-type", zone=zone)
    groups = _group_by_family(data.get("instance-types", []))
    lines = [f"Instance types in {zone}:\n"]
    for fam, types in sorted(groups.items()):
        lines.append(f" [{fam}]")
        for t in types:
            memory = _format_memory(t.get("memory") or 0)
            auth = "" if t.get("authorized", True) else " (not authorized)"
            lines.append(
                f" {t.get('family', '')}.{t.get('size', t.get('name', '?'))}"
                f" — {t.get('cpus', '?')} vCPU, {memory} RAM{auth}"
            )
    return "\n".join(lines)


@mcp.tool()
def list_templates(zone: str = "ch-gva-2") -> str:
    """List available OS templates in a zone."""
    data = _api_get("/template", zone=zone, params={"visibility": "public"})
    groups = _group_by_family(data.get("templates", []))
    lines = [f"Templates in {zone}:\n"]
    for fam, templates in sorted(groups.items()):
        lines.append(f" [{fam}]")
        for t in templates:
            lines.append(f" {t['name']} (id: {t['id'][:8]}...)")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Compute instances
# ---------------------------------------------------------------------------


@mcp.tool()
def list_instances(zone: str = "ch-gva-2") -> str:
    """List compute instances in a zone."""
    data = _cli_json("compute", "instance", "list", "--zone", zone)
    if not data:
        return f"No instances found in {zone}."
    lines = [f"Instances in {zone}:\n"]
    for inst in data:
        # "or" also covers JSON nulls, which cannot take a width spec.
        state = str(inst.get("state") or "?")
        kind = str(inst.get("type") or "?")
        lines.append(
            f" {inst['name']:<30} {state:<10} "
            f"{kind:<20} {inst.get('ip_address') or '-'}"
        )
    return "\n".join(lines)


@mcp.tool()
def get_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Get details for a specific compute instance."""
    data = _cli_json("compute", "instance", "show", name, "--zone", zone)
    if not data:
        return f"Instance '{name}' not found in {zone}."
    d = _first_record(data)
    # Null or non-string entries would otherwise break str.join.
    groups = ", ".join(str(g) for g in d.get("security_groups") or [])
    lines = [
        f"Instance: {d.get('name')}",
        f" ID: {d.get('id')}",
        f" Zone: {d.get('zone')}",
        f" State: {d.get('state')}",
        f" Type: {d.get('instance_type')}",
        f" Template: {d.get('template')}",
        f" Disk: {d.get('disk_size')}",
        f" Public IP: {d.get('ip_address', '-')}",
        f" IPv6: {d.get('ipv6_address', '-')}",
        f" SSH key: {d.get('ssh_key', '-')}",
        f" Security grps:{groups or '-'}",
        f" Created: {d.get('creation_date', '-')}",
    ]
    return "\n".join(lines)


@mcp.tool()
def create_instance(
    name: str,
    zone: str,
    instance_type: str,
    template: str,
    disk_size: int = 50,
    ssh_key: str = "",
    security_group: str = "default",
) -> str:
    """Create a new compute instance.

    instance_type format: standard.micro, standard.small, cpu.large, etc.
    template: template name (e.g. "Linux Ubuntu 24.04 LTS 64-bit") or ID.
    disk_size: in GiB (default 50).
    ssh_key: name of registered SSH key to deploy.
    security_group: security group name (default: "default").
    """
    args = [
        "compute", "instance", "create", name,
        "--zone", zone,
        "--instance-type", instance_type,
        "--template", template,
        "--disk-size", str(disk_size),
        "--security-group", security_group,
    ]
    if ssh_key:
        args += ["--ssh-key", ssh_key]
    d = _first_record(_cli_json(*args))
    return (
        f"Instance created:\n"
        f" Name: {d.get('name', name)}\n"
        f" ID: {d.get('id', '?')}\n"
        f" Zone: {d.get('zone', zone)}\n"
        f" Type: {d.get('instance_type', instance_type)}\n"
        f" IP: {d.get('ip_address', 'pending')}\n"
        f" State: {d.get('state', '?')}"
    )


@mcp.tool()
def destroy_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Permanently destroy a compute instance (irreversible)."""
    _cli("compute", "instance", "delete", name, "--zone", zone, "--force")
    return f"Instance '{name}' in {zone} has been destroyed."


@mcp.tool()
def start_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Start a stopped compute instance."""
    _cli("compute", "instance", "start", name, "--zone", zone, "--force")
    return f"Instance '{name}' in {zone} started."


@mcp.tool()
def stop_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Stop a running compute instance."""
    _cli("compute", "instance", "stop", name, "--zone", zone, "--force")
    return f"Instance '{name}' in {zone} stopped."


@mcp.tool()
def reboot_instance(name: str, zone: str = "ch-gva-2") -> str:
    """Reboot a compute instance."""
    _cli("compute", "instance", "reboot", name, "--zone", zone, "--force")
    return f"Instance '{name}' in {zone} rebooted."


# ---------------------------------------------------------------------------
# SSH keys
# ---------------------------------------------------------------------------


@mcp.tool()
def list_ssh_keys() -> str:
    """List registered SSH keys (global, not zone-specific)."""
    data = _cli_json("compute", "ssh-key", "list")
    if not data:
        return "No SSH keys registered."
    lines = ["SSH keys:\n"]
    for key in data:
        lines.append(f" {key['name']:<30} {key.get('fingerprint') or '-'}")
    return "\n".join(lines)


@mcp.tool()
def create_ssh_key(name: str, public_key_path: str) -> str:
    """Register a new SSH public key.

    public_key_path: path to the .pub file on the local machine.
    """
    expanded = os.path.expanduser(public_key_path)
    # isfile rather than exists: a directory is not a usable key file.
    if not os.path.isfile(expanded):
        return f"Error: file not found: {expanded}"
    d = _first_record(_cli_json("compute", "ssh-key", "register", name, expanded))
    return (
        f"SSH key registered:\n"
        f" Name: {d.get('name', name)}\n"
        f" Fingerprint: {d.get('fingerprint', '?')}"
    )


# ---------------------------------------------------------------------------
# Security groups
# ---------------------------------------------------------------------------


@mcp.tool()
def list_security_groups(zone: str = "ch-gva-2") -> str:
    """List security groups (global, zone param unused but kept for API consistency)."""
    data = _cli_json("compute", "security-group", "list")
    if not data:
        return "No security groups found."
    lines = ["Security groups:\n"]
    for sg in data:
        lines.append(f" {str(sg.get('id') or '?')[:8]}... {sg['name']}")
    return "\n".join(lines)


@mcp.tool()
def get_security_group(name: str, zone: str = "ch-gva-2") -> str:
    """Get details and rules for a security group."""
    d = _first_record(_cli_json("compute", "security-group", "show", name))
    if not d:
        return f"Security group '{name}' not found."
    lines = [
        f"Security group: {d.get('name')}",
        f" ID: {d.get('id')}",
        f" Description: {d.get('description', '-')}",
    ]

    ingress = d.get("ingress_rules") or []
    if ingress:
        lines.append("\n Ingress rules:")
        lines.extend(_format_rule(rule, "from") for rule in ingress)
    else:
        lines.append("\n Ingress rules: (none)")

    egress = d.get("egress_rules") or []
    if egress:
        lines.append("\n Egress rules:")
        lines.extend(_format_rule(rule, "to") for rule in egress)
    else:
        lines.append("\n Egress rules: (none — all outbound allowed by default)")

    instances = d.get("instances") or []
    if instances:
        lines.append(f"\n Attached instances ({len(instances)}):")
        for inst in instances:
            lines.append(f" {inst.get('name')} {inst.get('public_ip', '-')}")
    return "\n".join(lines)


@mcp.tool()
def add_security_group_rule(
    group_name: str,
    protocol: str,
    port: str,
    network: str = "0.0.0.0/0",
    description: str = "",
    zone: str = "ch-gva-2",
    flow: str = "ingress",
) -> str:
    """Add a firewall rule to a security group.

    protocol: tcp, udp, icmp, etc.
    port: single port (e.g. "443") or range (e.g. "8000-8080").
    network: CIDR block (default: 0.0.0.0/0).
    flow: ingress or egress (default: ingress).
    """
    args = [
        "compute", "security-group", "rule", "add", group_name,
        "--protocol", protocol,
        "--port", port,
        "--network", network,
        "--flow", flow,
    ]
    if description:
        args += ["--description", description]
    _cli(*args)
    return (
        f"Rule added to security group '{group_name}':\n"
        f" Flow: {flow}\n"
        f" Protocol: {protocol.upper()}\n"
        f" Port: {port}\n"
        f" Network: {network}\n"
        f" Desc: {description or '(none)'}"
    )


# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    mcp.run()