feat: initial Exoscale MCP server — 15 tools for Swiss cloud management

This commit is contained in:
mineracks 2026-04-26 09:34:36 +10:00
commit 18f4576059
5 changed files with 481 additions and 0 deletions

3
.env.example Normal file
View File

@ -0,0 +1,3 @@
EXOSCALE_API_KEY=EXOxxxxx
EXOSCALE_API_SECRET=xxxxx
EXOSCALE_CONFIG=~/.config/exoscale/exoscale.toml

90
README.md Normal file
View File

@ -0,0 +1,90 @@
# exoscale-mcp
MCP server for the Exoscale Swiss cloud platform.
Uses a hybrid approach:
- **GET** operations hit the Exoscale v2 REST API directly with simple header auth.
- **Write** operations (create/delete/start/stop/reboot/rules) shell out to the `exo` CLI,
which handles HMAC-SHA256 request signing correctly.
## Prerequisites
1. **Python 3.11+** and pip
2. **`exo` CLI** installed (`brew install exoscale/tap/exo` or https://community.exoscale.com/documentation/tools/exoscale-command-line-interface/)
and logged in (`exo config add`)
3. An Exoscale API key/secret (IAM → API keys in the console)
## Setup
```bash
cd /tmp/exoscale-mcp
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
## Environment variables
| Variable | Required | Default | Description |
|---|---|---|---|
| `EXOSCALE_API_KEY` | Yes | — | IAM API key (starts with `EXO`) |
| `EXOSCALE_API_SECRET` | Yes | — | IAM API secret |
| `EXOSCALE_CONFIG` | No | `~/.config/exoscale/exoscale.toml` | Path to exo CLI config file |
Copy `.env.example` to `.env` and fill in your values.
## Running standalone
```bash
source .venv/bin/activate
export EXOSCALE_API_KEY=EXO...
export EXOSCALE_API_SECRET=...
python server.py
```
## Claude Code MCP config
Add to `~/.claude/settings.json` (or `settings.local.json`):
```json
{
"mcpServers": {
"exoscale": {
"command": "/tmp/exoscale-mcp/.venv/bin/python",
"args": ["/tmp/exoscale-mcp/server.py"],
"env": {
"EXOSCALE_API_KEY": "EXO...",
"EXOSCALE_API_SECRET": "...",
"EXOSCALE_CONFIG": "/Users/you/.config/exoscale/exoscale.toml"
}
}
}
}
```
## Available tools
| Tool | Method | Description |
|---|---|---|
| `list_zones` | CLI | List all Exoscale zones |
| `list_instance_types` | GET | List instance types in a zone |
| `list_templates` | GET | List OS templates in a zone |
| `list_instances` | CLI | List compute instances in a zone |
| `get_instance` | CLI | Get full details for an instance |
| `create_instance` | CLI | Create a new instance |
| `destroy_instance` | CLI | Permanently destroy an instance |
| `start_instance` | CLI | Start a stopped instance |
| `stop_instance` | CLI | Stop a running instance |
| `reboot_instance` | CLI | Reboot an instance |
| `list_ssh_keys` | CLI | List registered SSH keys |
| `create_ssh_key` | CLI | Register a new SSH public key |
| `list_security_groups` | CLI | List security groups in a zone |
| `get_security_group` | CLI | Get security group details and rules |
| `add_security_group_rule` | CLI | Add an ingress/egress firewall rule |
## Notes
- `destroy_instance` is irreversible. The CLI `--force` flag skips the confirmation prompt.
- SSH keys are global (not zone-scoped) on Exoscale.
- Security group rules support protocols: `tcp`, `udp`, `icmp`, `icmpv6`, `gre`, `esp`, `ah`, `ipip`.
- The `exo` CLI must be authenticated for the same account as the API key.

Binary file not shown.

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
mcp>=1.0.0
httpx>=0.27.0

386
server.py Normal file
View File

@ -0,0 +1,386 @@
#!/usr/bin/env python3
"""Exoscale MCP server — hybrid GET/CLI approach."""
import json
import os
import subprocess
from typing import Optional
import httpx
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("exoscale", instructions="Exoscale Swiss cloud API")
# ---------------------------------------------------------------------------
# Config helpers
# ---------------------------------------------------------------------------
API_KEY = os.environ.get("EXOSCALE_API_KEY", "")
API_SECRET = os.environ.get("EXOSCALE_API_SECRET", "")
EXOSCALE_CONFIG = os.environ.get(
"EXOSCALE_CONFIG", os.path.expanduser("~/.config/exoscale/exoscale.toml")
)
def _base_url(zone: str) -> str:
return f"https://api-{zone}.exoscale.com/v2"
def _get_headers() -> dict:
if not API_KEY or not API_SECRET:
raise RuntimeError("EXOSCALE_API_KEY and EXOSCALE_API_SECRET must be set")
return {
"Exoscale-Api-Key": API_KEY,
"Exoscale-Api-Secret": API_SECRET,
}
def _api_get(path: str, zone: str = "ch-gva-2", params: Optional[dict] = None) -> dict:
url = f"{_base_url(zone)}{path}"
resp = httpx.get(url, headers=_get_headers(), params=params, timeout=30)
resp.raise_for_status()
return resp.json()
# ---------------------------------------------------------------------------
# CLI helpers
# ---------------------------------------------------------------------------
def _cli(*args: str, check: bool = True) -> subprocess.CompletedProcess:
"""Run exo CLI with the configured config file."""
cmd = ["exo", "--config", EXOSCALE_CONFIG] + list(args)
result = subprocess.run(cmd, capture_output=True, text=True)
if check and result.returncode != 0:
raise RuntimeError(
f"exo command failed (exit {result.returncode}):\n"
f"cmd: {' '.join(cmd)}\n"
f"stderr: {result.stderr.strip()}\n"
f"stdout: {result.stdout.strip()}"
)
return result
def _cli_json(*args: str) -> object:
"""Run exo CLI and parse JSON output."""
result = _cli(*args, "-O", "json")
if not result.stdout.strip():
return []
return json.loads(result.stdout)
# ---------------------------------------------------------------------------
# Zones & infrastructure
# ---------------------------------------------------------------------------
@mcp.tool()
def list_zones() -> str:
"""List all available Exoscale zones."""
data = _cli_json("zone", "list")
lines = [f"{z['name']}" for z in data]
return "Exoscale zones:\n" + "\n".join(lines)
@mcp.tool()
def list_instance_types(zone: str = "ch-gva-2") -> str:
"""List available compute instance types in a zone."""
data = _api_get("/instance-type", zone=zone)
items = data.get("instance-types", [])
groups: dict[str, list] = {}
for t in items:
fam = t.get("family", "other")
groups.setdefault(fam, []).append(t)
lines = [f"Instance types in {zone}:\n"]
for fam, types in sorted(groups.items()):
lines.append(f" [{fam}]")
for t in types:
mem_gb = t.get("memory", 0) / 1024 / 1024 / 1024
auth = "" if t.get("authorized", True) else " (not authorized)"
lines.append(
f" {t.get('family', '')}.{t.get('size', t.get('name', '?'))}"
f"{t.get('cpus', '?')} vCPU, {mem_gb:.0f} GB RAM{auth}"
)
return "\n".join(lines)
@mcp.tool()
def list_templates(zone: str = "ch-gva-2") -> str:
"""List available OS templates in a zone."""
data = _api_get("/template", zone=zone, params={"visibility": "public"})
items = data.get("templates", [])
groups: dict[str, list] = {}
for t in items:
fam = t.get("family", "other")
groups.setdefault(fam, []).append(t)
lines = [f"Templates in {zone}:\n"]
for fam, templates in sorted(groups.items()):
lines.append(f" [{fam}]")
for t in templates:
lines.append(f" {t['name']} (id: {t['id'][:8]}...)")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Compute instances
# ---------------------------------------------------------------------------
@mcp.tool()
def list_instances(zone: str = "ch-gva-2") -> str:
"""List compute instances in a zone."""
data = _cli_json("compute", "instance", "list", "--zone", zone)
if not data:
return f"No instances found in {zone}."
lines = [f"Instances in {zone}:\n"]
for inst in data:
lines.append(
f" {inst['name']:<30} {inst.get('state', '?'):<10} "
f"{inst.get('type', '?'):<20} {inst.get('ip_address', '-')}"
)
return "\n".join(lines)
@mcp.tool()
def get_instance(name: str, zone: str = "ch-gva-2") -> str:
"""Get details for a specific compute instance."""
data = _cli_json("compute", "instance", "show", name, "--zone", zone)
if not data:
return f"Instance '{name}' not found in {zone}."
d = data if isinstance(data, dict) else data[0]
lines = [
f"Instance: {d.get('name')}",
f" ID: {d.get('id')}",
f" Zone: {d.get('zone')}",
f" State: {d.get('state')}",
f" Type: {d.get('instance_type')}",
f" Template: {d.get('template')}",
f" Disk: {d.get('disk_size')}",
f" Public IP: {d.get('ip_address', '-')}",
f" IPv6: {d.get('ipv6_address', '-')}",
f" SSH key: {d.get('ssh_key', '-')}",
f" Security grps:{', '.join(d.get('security_groups', [])) or '-'}",
f" Created: {d.get('creation_date', '-')}",
]
return "\n".join(lines)
@mcp.tool()
def create_instance(
name: str,
zone: str,
instance_type: str,
template: str,
disk_size: int = 50,
ssh_key: str = "",
security_group: str = "default",
) -> str:
"""Create a new compute instance.
instance_type format: standard.micro, standard.small, cpu.large, etc.
template: template name (e.g. "Linux Ubuntu 24.04 LTS 64-bit") or ID.
disk_size: in GiB (default 50).
ssh_key: name of registered SSH key to deploy.
security_group: security group name (default: "default").
"""
args = [
"compute", "instance", "create", name,
"--zone", zone,
"--instance-type", instance_type,
"--template", template,
"--disk-size", str(disk_size),
"--security-group", security_group,
]
if ssh_key:
args += ["--ssh-key", ssh_key]
data = _cli_json(*args)
d = data if isinstance(data, dict) else (data[0] if data else {})
return (
f"Instance created:\n"
f" Name: {d.get('name', name)}\n"
f" ID: {d.get('id', '?')}\n"
f" Zone: {d.get('zone', zone)}\n"
f" Type: {d.get('instance_type', instance_type)}\n"
f" IP: {d.get('ip_address', 'pending')}\n"
f" State: {d.get('state', '?')}"
)
@mcp.tool()
def destroy_instance(name: str, zone: str = "ch-gva-2") -> str:
"""Permanently destroy a compute instance (irreversible)."""
_cli("compute", "instance", "delete", name, "--zone", zone, "--force")
return f"Instance '{name}' in {zone} has been destroyed."
@mcp.tool()
def start_instance(name: str, zone: str = "ch-gva-2") -> str:
"""Start a stopped compute instance."""
_cli("compute", "instance", "start", name, "--zone", zone, "--force")
return f"Instance '{name}' in {zone} started."
@mcp.tool()
def stop_instance(name: str, zone: str = "ch-gva-2") -> str:
"""Stop a running compute instance."""
_cli("compute", "instance", "stop", name, "--zone", zone, "--force")
return f"Instance '{name}' in {zone} stopped."
@mcp.tool()
def reboot_instance(name: str, zone: str = "ch-gva-2") -> str:
"""Reboot a compute instance."""
_cli("compute", "instance", "reboot", name, "--zone", zone, "--force")
return f"Instance '{name}' in {zone} rebooted."
# ---------------------------------------------------------------------------
# SSH keys
# ---------------------------------------------------------------------------
@mcp.tool()
def list_ssh_keys() -> str:
"""List registered SSH keys (global, not zone-specific)."""
data = _cli_json("compute", "ssh-key", "list")
if not data:
return "No SSH keys registered."
lines = ["SSH keys:\n"]
for key in data:
lines.append(f" {key['name']:<30} {key.get('fingerprint', '-')}")
return "\n".join(lines)
@mcp.tool()
def create_ssh_key(name: str, public_key_path: str) -> str:
"""Register a new SSH public key.
public_key_path: path to the .pub file on the local machine.
"""
expanded = os.path.expanduser(public_key_path)
if not os.path.exists(expanded):
return f"Error: file not found: {expanded}"
data = _cli_json("compute", "ssh-key", "register", name, expanded)
d = data if isinstance(data, dict) else (data[0] if data else {})
return (
f"SSH key registered:\n"
f" Name: {d.get('name', name)}\n"
f" Fingerprint: {d.get('fingerprint', '?')}"
)
# ---------------------------------------------------------------------------
# Security groups
# ---------------------------------------------------------------------------
@mcp.tool()
def list_security_groups(zone: str = "ch-gva-2") -> str:
"""List security groups (global, zone param unused but kept for API consistency)."""
data = _cli_json("compute", "security-group", "list")
if not data:
return "No security groups found."
lines = ["Security groups:\n"]
for sg in data:
lines.append(f" {sg.get('id', '?')[:8]}... {sg['name']}")
return "\n".join(lines)
@mcp.tool()
def get_security_group(name: str, zone: str = "ch-gva-2") -> str:
"""Get details and rules for a security group."""
data = _cli_json("compute", "security-group", "show", name)
d = data if isinstance(data, dict) else (data[0] if data else {})
if not d:
return f"Security group '{name}' not found."
lines = [
f"Security group: {d.get('name')}",
f" ID: {d.get('id')}",
f" Description: {d.get('description', '-')}",
]
ingress = d.get("ingress_rules", [])
if ingress:
lines.append("\n Ingress rules:")
for r in ingress:
port = (
f"{r['start_port']}-{r['end_port']}"
if r.get("start_port") != r.get("end_port")
else str(r.get("start_port", ""))
)
lines.append(
f" [{r.get('protocol','?').upper():>5}] "
f"port {port:<12} from {r.get('network', r.get('security_group', '-')):<20}"
f" # {r.get('description', '')}"
)
else:
lines.append("\n Ingress rules: (none)")
egress = d.get("egress_rules", [])
if egress:
lines.append("\n Egress rules:")
for r in egress:
port = (
f"{r['start_port']}-{r['end_port']}"
if r.get("start_port") != r.get("end_port")
else str(r.get("start_port", ""))
)
lines.append(
f" [{r.get('protocol','?').upper():>5}] "
f"port {port:<12} to {r.get('network', r.get('security_group', '-')):<20}"
f" # {r.get('description', '')}"
)
else:
lines.append("\n Egress rules: (none — all outbound allowed by default)")
instances = d.get("instances", [])
if instances:
lines.append(f"\n Attached instances ({len(instances)}):")
for inst in instances:
lines.append(f" {inst.get('name')} {inst.get('public_ip', '-')}")
return "\n".join(lines)
@mcp.tool()
def add_security_group_rule(
group_name: str,
protocol: str,
port: str,
network: str = "0.0.0.0/0",
description: str = "",
zone: str = "ch-gva-2",
flow: str = "ingress",
) -> str:
"""Add a firewall rule to a security group.
protocol: tcp, udp, icmp, etc.
port: single port (e.g. "443") or range (e.g. "8000-8080").
network: CIDR block (default: 0.0.0.0/0).
flow: ingress or egress (default: ingress).
"""
args = [
"compute", "security-group", "rule", "add", group_name,
"--protocol", protocol,
"--port", port,
"--network", network,
"--flow", flow,
]
if description:
args += ["--description", description]
_cli(*args)
return (
f"Rule added to security group '{group_name}':\n"
f" Flow: {flow}\n"
f" Protocol: {protocol.upper()}\n"
f" Port: {port}\n"
f" Network: {network}\n"
f" Desc: {description or '(none)'}"
)
# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------
if __name__ == "__main__":
mcp.run()