netatmo-mcp/app/server.py
mineracks 9efe05eca0 Initial commit: Netatmo MCP + REST bridge for DC environmental monitoring
Deployed on integr8 VM (pve01/700) at environment.mineracks.com.
Polls Netatmo every 5 min, stores in SQLite, exposes via FastAPI REST + MCP.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 12:14:18 +00:00

277 lines
8.9 KiB
Python

"""
Combined service: FastAPI REST API + MCP server (Streamable HTTP transport),
sharing one process and one Netatmo client.
REST is at /api/...
MCP is at /mcp/ (Streamable HTTP)
Health at /health
Auth: a single bearer token (env NETATMO_MCP_API_KEY) protects both surfaces.
Set it to a long random string and pass it as `Authorization: Bearer <token>`.
"""
from __future__ import annotations
import contextlib
import logging
import os
import time
from pathlib import Path
from typing import Any, Optional
from fastapi import Depends, FastAPI, Header, HTTPException, Query
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.server import TransportSecuritySettings
from .netatmo import NetatmoClient
from .poller import Poller
from .store import Store
# LOG_LEVEL is normalised before use: the logging module only recognises
# upper-case level names ("DEBUG", "INFO", ...), so a value such as "debug"
# would otherwise raise ValueError at import time. An empty or whitespace-only
# value falls back to "INFO".
logging.basicConfig(level=(os.getenv("LOG_LEVEL") or "INFO").strip().upper() or "INFO")
log = logging.getLogger(__name__)
# ---- config -----------------------------------------------------------------
# Everything in this section is resolved once, at import time, from the
# process environment.

# Directory holding the SQLite database and the token file; created if absent.
DATA_DIR = Path(os.environ.get("DATA_DIR", "/data"))
DATA_DIR.mkdir(parents=True, exist_ok=True)

# The shared bearer token is mandatory: refuse to import without it.
API_KEY = os.environ.get("NETATMO_MCP_API_KEY")
if not API_KEY:
    raise RuntimeError("NETATMO_MCP_API_KEY env var is required")

# Netatmo client credentials: a missing variable raises KeyError right here.
CLIENT_ID = os.environ["NETATMO_CLIENT_ID"]
CLIENT_SECRET = os.environ["NETATMO_CLIENT_SECRET"]
# Optional; None when the variable is not set.
INITIAL_REFRESH = os.getenv("NETATMO_REFRESH_TOKEN")
# Passed to the poller as `interval_sec`; defaults to 300.
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL_SEC", "300"))

# Process-wide singletons shared by the REST handlers and the MCP tools.
store = Store(DATA_DIR / "netatmo.db")
client = NetatmoClient(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    token_path=DATA_DIR / "tokens.json",
    initial_refresh_token=INITIAL_REFRESH,
)
poller = Poller(client, store, interval_sec=POLL_INTERVAL)
# ---- MCP server -------------------------------------------------------------
# Host header values accepted while DNS-rebinding protection is enabled.
_MCP_ALLOWED_HOSTS = [
    "environment.mineracks.com",
    "100.90.183.109:8088",
    "localhost:8088",
]
_MCP_TRANSPORT_SECURITY = TransportSecuritySettings(
    enable_dns_rebinding_protection=True,
    allowed_hosts=_MCP_ALLOWED_HOSTS,
)

# stateless_http=True lets this single instance serve many clients.
# json_response=True answers each call with one JSON response instead of an
# SSE stream, which is simpler for tools that just return data.
mcp = FastMCP(
    name="netatmo-datacentre",
    stateless_http=True,
    json_response=True,
    transport_security=_MCP_TRANSPORT_SECURITY,
)
@mcp.tool()
def list_modules() -> list[dict[str, Any]]:
    """List every Netatmo station and module known to this server, including
    module_id (use this for history queries), module_type, and what data_types
    each module reports."""
    # NOTE: the docstring above is what MCP clients see as the tool description,
    # so its wording is kept stable. The work itself is delegated to the store.
    known_modules = store.list_modules()
    return known_modules
@mcp.tool()
def current_readings(module_id: Optional[str] = None) -> list[dict[str, Any]]:
    """Get the most recent reading for every metric. If module_id is given,
    restrict to that one module. Each row has: station_name, module_name,
    module_id, metric, value, ts (unix seconds)."""
    # module_id=None is forwarded unchanged; filtering happens in the store.
    latest_rows = store.latest(module_id=module_id)
    return latest_rows
# Bounds mirror the REST /api/history endpoint (hours: 1..24*365,
# limit: 1..50000) so the MCP tool cannot request an inverted time window or
# an unbounded number of rows.
_HISTORY_MAX_HOURS = 24 * 365
_HISTORY_MAX_LIMIT = 50000


@mcp.tool()
def history(
    module_id: str,
    metric: str,
    hours: int = 24,
    limit: int = 5000,
) -> dict[str, Any]:
    """Return locally-stored historical readings for one metric on one module.
    metric: e.g. 'Temperature', 'Humidity', 'CO2', 'Noise', 'Pressure'.
    hours: look back this many hours from now (default 24; clamped to 1..8760).
    limit: max points returned (default 5000; clamped to 1..50000).
    The 'hours' field of the result reports the value actually used.
    """
    # Clamp rather than raise: out-of-range input still gets a usable answer,
    # and the effective window is echoed back in the result.
    hours = max(1, min(hours, _HISTORY_MAX_HOURS))
    limit = max(1, min(limit, _HISTORY_MAX_LIMIT))

    now = int(time.time())
    since = now - hours * 3600  # shared by both store queries below

    rows = store.history(module_id, metric, since=since, until=now, limit=limit)
    agg = store.aggregate(module_id, metric, since=since, until=now)
    return {
        "module_id": module_id,
        "metric": metric,
        "hours": hours,
        "points": rows,
        "summary": agg,
    }
def _reshape_measure_body(body: Any, metrics: list[str]) -> list[dict[str, Any]]:
    """Flatten a getmeasure ``body`` into ``[{"ts": ..., <metric>: value}, ...]``.

    Two body shapes are accepted:

    * a mapping ``{"<unix ts>": [v1, v2, ...]}``; keys that are not integers
      are skipped;
    * a list of chunks ``{"beg_time": t0, "step_time": dt, "value": [[v1, ...], ...]}``
      where row ``i`` of a chunk is stamped ``t0 + i * dt``.
      NOTE(review): this is the layout Netatmo documents for optimised
      responses; confirm which form ``client.get_measure`` actually requests.

    Values are matched to ``metrics`` by position; a metric with no
    corresponding value is simply left out of that point. The result is sorted
    by timestamp. Any other body type (including None) yields an empty list.
    """

    def make_point(ts: int, values: Any) -> dict[str, Any]:
        point: dict[str, Any] = {"ts": ts}
        if isinstance(values, (list, tuple)):
            # zip() stops at the shorter sequence, so missing trailing values
            # are omitted rather than raising.
            point.update(zip(metrics, values))
        return point

    series: list[dict[str, Any]] = []
    if isinstance(body, dict):
        for ts_str, values in body.items():
            try:
                ts = int(ts_str)
            except (TypeError, ValueError):
                continue
            series.append(make_point(ts, values))
    elif isinstance(body, list):
        for chunk in body:
            if not isinstance(chunk, dict):
                continue
            try:
                begin = int(chunk["beg_time"])
                # step_time may be absent when a chunk holds a single row.
                step = int(chunk.get("step_time") or 0)
            except (KeyError, TypeError, ValueError):
                continue
            for idx, values in enumerate(chunk.get("value") or []):
                series.append(make_point(begin + idx * step, values))

    series.sort(key=lambda p: p["ts"])
    return series


@mcp.tool()
async def history_from_netatmo(
    device_id: str,
    module_id: Optional[str],
    scale: str = "1hour",
    metrics: Optional[list[str]] = None,
    hours: int = 168,
) -> dict[str, Any]:
    """Fetch history directly from Netatmo's getmeasure endpoint — useful when
    you want a longer window than the local store has, or pre-aggregated values.
    scale: '30min','1hour','3hours','1day','1week','1month'
    metrics: defaults to ['Temperature','Humidity','CO2'] for indoor modules.
    """
    metrics = metrics or ["Temperature", "Humidity", "CO2"]
    now = int(time.time())
    raw = await client.get_measure(
        device_id=device_id,
        # A module_id equal to the device_id (or an empty one) is sent as None.
        module_id=module_id if module_id and module_id != device_id else None,
        scale=scale,
        types=metrics,
        date_begin=now - hours * 3600,
        date_end=now,
    )
    # Tolerate a missing/None payload or body instead of failing on `.items()`.
    body = (raw or {}).get("body") or {}
    series = _reshape_measure_body(body, metrics)
    return {
        "device_id": device_id,
        "module_id": module_id,
        "scale": scale,
        "metrics": metrics,
        "points": series,
    }
@mcp.tool()
def list_alerts() -> list[dict[str, Any]]:
    """List all configured threshold alerts and their current state."""
    # Straight pass-through to the store; no filtering or reshaping here.
    configured_alerts = store.list_alerts()
    return configured_alerts
@mcp.tool()
def create_alert(
    name: str,
    module_id: str,
    metric: str,
    op: str,
    threshold: float,
    notes: Optional[str] = None,
) -> dict[str, Any]:
    """Create a threshold alert. op must be one of '>', '<', '>=', '<='.
    Example: create_alert('dc_co2_high', 'aa:bb:..', 'CO2', '>', 1000)."""
    # Arguments are forwarded positionally, in the same order as this signature.
    new_alert_id = store.create_alert(name, module_id, metric, op, threshold, notes)
    # Echo the name back alongside whatever id the store returned.
    result: dict[str, Any] = {"id": new_alert_id, "name": name}
    return result
@mcp.tool()
def delete_alert(alert_id: int) -> dict[str, Any]:
    """Delete an alert by id."""
    # The store's return value is reported as-is under the "deleted" key.
    outcome = store.delete_alert(alert_id)
    return {"deleted": outcome}
@mcp.tool()
def recent_alert_events(limit: int = 50) -> list[dict[str, Any]]:
    """Recent alert state-change events (triggered / cleared)."""
    # `limit` is handed to the store unchanged.
    events = store.recent_alert_events(limit=limit)
    return events
@mcp.tool()
def poller_status() -> dict[str, Any]:
    """How is the background poller doing? Useful for diagnosing stale data."""
    # Reports whatever the shared Poller instance says about itself.
    status_snapshot = poller.status()
    return status_snapshot
# ---- FastAPI REST -----------------------------------------------------------
def require_key(authorization: str = Header(default="")) -> None:
    """FastAPI dependency that enforces the shared bearer token.

    Reads the ``Authorization`` request header (empty string when absent) and
    accepts it only if it is exactly ``"Bearer "`` followed by ``API_KEY``.

    Raises:
        HTTPException: 401 when the header is missing, does not start with
            ``"Bearer "``, or carries a different token.
    """
    import hmac  # local import keeps this block self-contained

    prefix = "Bearer "
    presented = authorization[len(prefix):] if authorization.startswith(prefix) else None

    # compare_digest runs in time independent of where the strings differ, so
    # response timing does not leak how much of the token was guessed. Both
    # sides are encoded to bytes because compare_digest rejects non-ASCII str.
    if presented is None or not hmac.compare_digest(
        presented.encode("utf-8"), API_KEY.encode("utf-8")
    ):
        raise HTTPException(
            401,
            "Invalid or missing bearer token",
            headers={"WWW-Authenticate": "Bearer"},
        )
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: run the poller and the MCP session manager.

    Startup starts the background poller, then enters the MCP session manager.
    Shutdown leaves the session manager and stops the poller.
    """
    poller.start()
    try:
        # Critical: MCP session manager must be running for the mounted app to work.
        async with mcp.session_manager.run():
            yield
    finally:
        # Stop the poller even if the session manager failed to start or an
        # exception propagated through the yield; otherwise it is left running.
        await poller.stop()


app = FastAPI(title="Netatmo MCP/API", lifespan=lifespan)
# GET /health: no auth dependency is attached to this route. Always reports
# ok=True together with the poller's own status.
@app.get("/health")
def health() -> dict[str, Any]:
    poller_state = poller.status()
    return {"ok": True, "poller": poller_state}
# GET /api/modules (bearer token required): returns the store's module list.
@app.get("/api/modules", dependencies=[Depends(require_key)])
def api_modules():
    known_modules = store.list_modules()
    return known_modules
# GET /api/current (bearer token required): latest readings from the store,
# optionally restricted by the `module_id` query parameter.
@app.get("/api/current", dependencies=[Depends(require_key)])
def api_current(module_id: Optional[str] = None):
    latest_rows = store.latest(module_id=module_id)
    return latest_rows
# GET /api/history (bearer token required): stored points plus an aggregate
# summary for one metric on one module over the last `hours` hours.
# Query validation bounds: hours 1..24*365, limit 1..50000.
@app.get("/api/history", dependencies=[Depends(require_key)])
def api_history(
    module_id: str = Query(...),
    metric: str = Query(...),
    hours: int = Query(24, ge=1, le=24 * 365),
    limit: int = Query(5000, ge=1, le=50000),
):
    window_end = int(time.time())
    window_start = window_end - hours * 3600

    # Points are fetched first, then the summary, over the same window.
    points = store.history(
        module_id, metric, since=window_start, until=window_end, limit=limit
    )
    summary = store.aggregate(
        module_id, metric, since=window_start, until=window_end
    )
    return {
        "module_id": module_id,
        "metric": metric,
        "hours": hours,
        "points": points,
        "summary": summary,
    }
# GET /api/alerts (bearer token required): returns the store's alert list.
@app.get("/api/alerts", dependencies=[Depends(require_key)])
def api_alerts():
    configured_alerts = store.list_alerts()
    return configured_alerts
@app.post("/api/alerts", dependencies=[Depends(require_key)])
def api_create_alert(payload: dict[str, Any]):
    """Create a threshold alert from a JSON object body.

    Required keys: ``name``, ``module_id``, ``metric``, ``op``, ``threshold``
    (anything ``float()`` accepts). Optional key: ``notes``.

    Returns ``{"id": <value returned by store.create_alert>}``.

    Raises:
        HTTPException: 422 when a required key is missing or ``threshold`` is
            not numeric. Previously these surfaced as an unhandled
            KeyError/ValueError, i.e. an HTTP 500.
    """
    required = ("name", "module_id", "metric", "op", "threshold")
    missing = [key for key in required if key not in payload]
    if missing:
        raise HTTPException(422, f"Missing required field(s): {', '.join(missing)}")

    try:
        threshold = float(payload["threshold"])
    except (TypeError, ValueError):
        # TypeError covers null/list/object values; ValueError covers bad strings.
        raise HTTPException(422, "threshold must be a number") from None

    aid = store.create_alert(
        name=payload["name"],
        module_id=payload["module_id"],
        metric=payload["metric"],
        op=payload["op"],
        threshold=threshold,
        notes=payload.get("notes"),
    )
    return {"id": aid}
# DELETE /api/alerts/{alert_id} (bearer token required): the store's return
# value is reported under the "deleted" key.
@app.delete("/api/alerts/{alert_id}", dependencies=[Depends(require_key)])
def api_delete_alert(alert_id: int):
    outcome = store.delete_alert(alert_id)
    return {"deleted": outcome}
# GET /api/alert-events (bearer token required): recent alert events from the
# store; `limit` (default 50) is forwarded unchanged.
@app.get("/api/alert-events", dependencies=[Depends(require_key)])
def api_alert_events(limit: int = 50):
    events = store.recent_alert_events(limit=limit)
    return events
# Mount the MCP Streamable HTTP app at /mcp.
# MCP clients must present the same bearer token as REST callers. The mounted
# app is not a FastAPI route, so the `require_key` dependency does not apply to
# it; a small HTTP middleware enforces the token for every path under /mcp.
mcp_app = mcp.streamable_http_app()


@app.middleware("http")
async def mcp_auth(request, call_next):
    """Reject requests to paths starting with ``/mcp`` that lack the bearer token.

    Requests to any other path are passed through untouched. An unauthorised
    request gets a 401 JSON response ``{"error": "unauthorized"}`` and never
    reaches the mounted MCP app.
    """
    import hmac  # local imports keep this block self-contained

    from starlette.responses import JSONResponse

    if request.url.path.startswith("/mcp"):
        auth = request.headers.get("authorization", "")
        token = auth[7:] if auth.startswith("Bearer ") else None
        # Constant-time comparison so response timing does not reveal how much
        # of the token matched. Encoded to bytes because compare_digest rejects
        # non-ASCII str.
        if token is None or not hmac.compare_digest(
            token.encode("utf-8"), API_KEY.encode("utf-8")
        ):
            return JSONResponse(
                {"error": "unauthorized"},
                status_code=401,
                headers={"WWW-Authenticate": "Bearer"},
            )
    return await call_next(request)


app.mount("/mcp", mcp_app)