Deployed on integr8 VM (pve01/700) at environment.mineracks.com. Polls Netatmo every 5 min, stores in SQLite, exposes via FastAPI REST + MCP. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
277 lines
8.9 KiB
Python
277 lines
8.9 KiB
Python
"""
|
|
Combined service: FastAPI REST API + MCP server (Streamable HTTP transport),
|
|
sharing one process and one Netatmo client.
|
|
|
|
REST is at /api/...
|
|
MCP is at /mcp/ (Streamable HTTP)
|
|
Health at /health
|
|
|
|
Auth: a single bearer token (env NETATMO_MCP_API_KEY) protects both surfaces.
|
|
Set it to a long random string and pass it as `Authorization: Bearer <token>`.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import logging
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
from fastapi import Depends, FastAPI, Header, HTTPException, Query
|
|
from mcp.server.fastmcp import FastMCP
|
|
from mcp.server.fastmcp.server import TransportSecuritySettings
|
|
|
|
from .netatmo import NetatmoClient
|
|
from .poller import Poller
|
|
from .store import Store
|
|
|
|
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
|
|
log = logging.getLogger(__name__)
|
|
|
|
# ---- config -----------------------------------------------------------------
|
|
|
|
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
|
|
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
API_KEY = os.getenv("NETATMO_MCP_API_KEY")
|
|
if not API_KEY:
|
|
raise RuntimeError("NETATMO_MCP_API_KEY env var is required")
|
|
|
|
CLIENT_ID = os.environ["NETATMO_CLIENT_ID"]
|
|
CLIENT_SECRET = os.environ["NETATMO_CLIENT_SECRET"]
|
|
INITIAL_REFRESH = os.environ.get("NETATMO_REFRESH_TOKEN")
|
|
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SEC", "300"))
|
|
|
|
store = Store(DATA_DIR / "netatmo.db")
|
|
client = NetatmoClient(
|
|
client_id=CLIENT_ID,
|
|
client_secret=CLIENT_SECRET,
|
|
token_path=DATA_DIR / "tokens.json",
|
|
initial_refresh_token=INITIAL_REFRESH,
|
|
)
|
|
poller = Poller(client, store, interval_sec=POLL_INTERVAL)
|
|
|
|
# ---- MCP server -------------------------------------------------------------
|
|
# stateless_http=True so the same instance can serve many clients; json_response=True
|
|
# returns a single JSON response per call rather than an SSE stream — simpler for
|
|
# tools that just return data.
|
|
|
|
mcp = FastMCP(
|
|
name="netatmo-datacentre",
|
|
stateless_http=True,
|
|
json_response=True,
|
|
transport_security=TransportSecuritySettings(
|
|
enable_dns_rebinding_protection=True,
|
|
allowed_hosts=["environment.mineracks.com", "100.90.183.109:8088", "localhost:8088"],
|
|
),
|
|
)
|
|
|
|
|
|
@mcp.tool()
|
|
def list_modules() -> list[dict[str, Any]]:
|
|
"""List every Netatmo station and module known to this server, including
|
|
module_id (use this for history queries), module_type, and what data_types
|
|
each module reports."""
|
|
return store.list_modules()
|
|
|
|
|
|
@mcp.tool()
|
|
def current_readings(module_id: Optional[str] = None) -> list[dict[str, Any]]:
|
|
"""Get the most recent reading for every metric. If module_id is given,
|
|
restrict to that one module. Each row has: station_name, module_name,
|
|
module_id, metric, value, ts (unix seconds)."""
|
|
return store.latest(module_id=module_id)
|
|
|
|
|
|
@mcp.tool()
|
|
def history(
|
|
module_id: str,
|
|
metric: str,
|
|
hours: int = 24,
|
|
limit: int = 5000,
|
|
) -> dict[str, Any]:
|
|
"""Return locally-stored historical readings for one metric on one module.
|
|
|
|
metric: e.g. 'Temperature', 'Humidity', 'CO2', 'Noise', 'Pressure'.
|
|
hours: look back this many hours from now (default 24).
|
|
limit: max points returned (default 5000).
|
|
"""
|
|
now = int(time.time())
|
|
rows = store.history(module_id, metric, since=now - hours * 3600,
|
|
until=now, limit=limit)
|
|
agg = store.aggregate(module_id, metric, since=now - hours * 3600, until=now)
|
|
return {"module_id": module_id, "metric": metric, "hours": hours,
|
|
"points": rows, "summary": agg}
|
|
|
|
|
|
@mcp.tool()
|
|
async def history_from_netatmo(
|
|
device_id: str,
|
|
module_id: Optional[str],
|
|
scale: str = "1hour",
|
|
metrics: Optional[list[str]] = None,
|
|
hours: int = 168,
|
|
) -> dict[str, Any]:
|
|
"""Fetch history directly from Netatmo's getmeasure endpoint — useful when
|
|
you want a longer window than the local store has, or pre-aggregated values.
|
|
|
|
scale: '30min','1hour','3hours','1day','1week','1month'
|
|
metrics: defaults to ['Temperature','Humidity','CO2'] for indoor modules.
|
|
"""
|
|
metrics = metrics or ["Temperature", "Humidity", "CO2"]
|
|
now = int(time.time())
|
|
raw = await client.get_measure(
|
|
device_id=device_id,
|
|
module_id=module_id if module_id and module_id != device_id else None,
|
|
scale=scale,
|
|
types=metrics,
|
|
date_begin=now - hours * 3600,
|
|
date_end=now,
|
|
)
|
|
# Reshape Netatmo's awkward {beg_time, step_time, value: [[v1,v2,...]]} format.
|
|
body = raw.get("body", {})
|
|
series: list[dict[str, Any]] = []
|
|
for ts_str, values in body.items():
|
|
try:
|
|
ts = int(ts_str)
|
|
except ValueError:
|
|
continue
|
|
point = {"ts": ts}
|
|
for i, m in enumerate(metrics):
|
|
if i < len(values):
|
|
point[m] = values[i]
|
|
series.append(point)
|
|
series.sort(key=lambda p: p["ts"])
|
|
return {"device_id": device_id, "module_id": module_id, "scale": scale,
|
|
"metrics": metrics, "points": series}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_alerts() -> list[dict[str, Any]]:
|
|
"""List all configured threshold alerts and their current state."""
|
|
return store.list_alerts()
|
|
|
|
|
|
@mcp.tool()
|
|
def create_alert(
|
|
name: str, module_id: str, metric: str, op: str, threshold: float,
|
|
notes: Optional[str] = None,
|
|
) -> dict[str, Any]:
|
|
"""Create a threshold alert. op must be one of '>', '<', '>=', '<='.
|
|
Example: create_alert('dc_co2_high', 'aa:bb:..', 'CO2', '>', 1000)."""
|
|
aid = store.create_alert(name, module_id, metric, op, threshold, notes)
|
|
return {"id": aid, "name": name}
|
|
|
|
|
|
@mcp.tool()
|
|
def delete_alert(alert_id: int) -> dict[str, Any]:
|
|
"""Delete an alert by id."""
|
|
return {"deleted": store.delete_alert(alert_id)}
|
|
|
|
|
|
@mcp.tool()
|
|
def recent_alert_events(limit: int = 50) -> list[dict[str, Any]]:
|
|
"""Recent alert state-change events (triggered / cleared)."""
|
|
return store.recent_alert_events(limit=limit)
|
|
|
|
|
|
@mcp.tool()
|
|
def poller_status() -> dict[str, Any]:
|
|
"""How is the background poller doing? Useful for diagnosing stale data."""
|
|
return poller.status()
|
|
|
|
|
|
# ---- FastAPI REST -----------------------------------------------------------
|
|
|
|
def require_key(authorization: str = Header(default="")) -> None:
|
|
if not authorization.startswith("Bearer ") or authorization[7:] != API_KEY:
|
|
raise HTTPException(401, "Invalid or missing bearer token")
|
|
|
|
|
|
@contextlib.asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
poller.start()
|
|
# Critical: MCP session manager must be running for the mounted app to work.
|
|
async with mcp.session_manager.run():
|
|
yield
|
|
await poller.stop()
|
|
|
|
|
|
app = FastAPI(title="Netatmo MCP/API", lifespan=lifespan)
|
|
|
|
|
|
@app.get("/health")
|
|
def health() -> dict[str, Any]:
|
|
return {"ok": True, "poller": poller.status()}
|
|
|
|
|
|
@app.get("/api/modules", dependencies=[Depends(require_key)])
|
|
def api_modules():
|
|
return store.list_modules()
|
|
|
|
|
|
@app.get("/api/current", dependencies=[Depends(require_key)])
|
|
def api_current(module_id: Optional[str] = None):
|
|
return store.latest(module_id=module_id)
|
|
|
|
|
|
@app.get("/api/history", dependencies=[Depends(require_key)])
|
|
def api_history(
|
|
module_id: str = Query(...),
|
|
metric: str = Query(...),
|
|
hours: int = Query(24, ge=1, le=24 * 365),
|
|
limit: int = Query(5000, ge=1, le=50000),
|
|
):
|
|
now = int(time.time())
|
|
return {
|
|
"module_id": module_id, "metric": metric, "hours": hours,
|
|
"points": store.history(module_id, metric, since=now - hours * 3600,
|
|
until=now, limit=limit),
|
|
"summary": store.aggregate(module_id, metric, since=now - hours * 3600,
|
|
until=now),
|
|
}
|
|
|
|
|
|
@app.get("/api/alerts", dependencies=[Depends(require_key)])
|
|
def api_alerts():
|
|
return store.list_alerts()
|
|
|
|
|
|
@app.post("/api/alerts", dependencies=[Depends(require_key)])
|
|
def api_create_alert(payload: dict[str, Any]):
|
|
aid = store.create_alert(
|
|
name=payload["name"], module_id=payload["module_id"],
|
|
metric=payload["metric"], op=payload["op"],
|
|
threshold=float(payload["threshold"]), notes=payload.get("notes"),
|
|
)
|
|
return {"id": aid}
|
|
|
|
|
|
@app.delete("/api/alerts/{alert_id}", dependencies=[Depends(require_key)])
|
|
def api_delete_alert(alert_id: int):
|
|
return {"deleted": store.delete_alert(alert_id)}
|
|
|
|
|
|
@app.get("/api/alert-events", dependencies=[Depends(require_key)])
|
|
def api_alert_events(limit: int = 50):
|
|
return store.recent_alert_events(limit=limit)
|
|
|
|
|
|
# Mount the MCP Streamable HTTP app at /mcp.
|
|
# Note: MCP clients should use bearer auth too — we wrap with a tiny middleware.
|
|
mcp_app = mcp.streamable_http_app()
|
|
|
|
|
|
@app.middleware("http")
|
|
async def mcp_auth(request, call_next):
|
|
if request.url.path.startswith("/mcp"):
|
|
auth = request.headers.get("authorization", "")
|
|
if not auth.startswith("Bearer ") or auth[7:] != API_KEY:
|
|
from starlette.responses import JSONResponse
|
|
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
|
return await call_next(request)
|
|
|
|
|
|
app.mount("/mcp", mcp_app)
|