commit 9efe05eca075a8955e380b98980e9c93a7da5f72 Author: mineracks Date: Mon Apr 20 12:14:18 2026 +0000 Initial commit: Netatmo MCP + REST bridge for DC environmental monitoring Deployed on integr8 VM (pve01/700) at environment.mineracks.com. Polls Netatmo every 5 min, stores in SQLite, exposes via FastAPI REST + MCP. Co-Authored-By: Claude Opus 4.6 (1M context) diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..41b2fc2 --- /dev/null +++ b/.env.example @@ -0,0 +1,14 @@ +# Copy to .env and fill in. Never commit the real .env. + +# From https://dev.netatmo.com/apps -> your app +NETATMO_CLIENT_ID= +NETATMO_CLIENT_SECRET= + +# Generate ONCE via the app's "Token generator" panel with scope=read_station. +# After first successful refresh, /data/tokens.json takes over and you can +# leave this blank (or remove it) — Netatmo rotates the refresh token on +# every refresh call. +NETATMO_REFRESH_TOKEN= + +# openssl rand -hex 32 +NETATMO_MCP_API_KEY= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8b476c9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +data/ +__pycache__/ +*.pyc +*.db +tokens.json diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b036e72 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.12-slim + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_NO_CACHE_DIR=1 + +WORKDIR /srv + +COPY requirements.txt . +RUN pip install -r requirements.txt + +COPY app ./app + +# Persistent state (sqlite + token cache) lives here. Mount a volume. +VOLUME ["/data"] +ENV DATA_DIR=/data + +EXPOSE 8088 + +CMD ["uvicorn", "app.server:app", "--host", "0.0.0.0", "--port", "8088"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..423c4ff --- /dev/null +++ b/README.md @@ -0,0 +1,161 @@ +# netatmo-mcp + +Self-hosted bridge that exposes a Netatmo Weather Station to Claude (via MCP) +**and** to anything else (via REST), with a local SQLite store for history and +threshold-based alerts. Designed to run as a Docker container on Proxmox. + +## What you get + +One process serving three things on port `8088`: + +| Path | What it is | +|-------------|-----------------------------------------------------------------| +| `/mcp/` | MCP server (Streamable HTTP transport). Add to Claude as a connector. | +| `/api/...` | REST API — same data, for n8n / Grafana / cron / curl. | +| `/health` | Unauthenticated liveness + poller status. | + +A background poller hits Netatmo every 5 minutes (configurable), stores every +new reading in SQLite, and evaluates any alerts you've configured. + +## Capabilities + +- **Current readings** for every station and module (`current_readings` tool / `GET /api/current`) +- **Local historical queries** at native ~5min resolution (`history` tool / `GET /api/history`) +- **Direct Netatmo historical pulls** for longer windows or pre-aggregated scales — 1hour, 1day, 1week, 1month (`history_from_netatmo` tool) +- **Threshold alerts** on any metric — e.g. CO₂ > 1000 ppm, temp > 28 °C (`create_alert` tool / `POST /api/alerts`) +- **Multi-station / multi-module** — handled natively; everything is keyed by `module_id` + +## One-time Netatmo setup + +1. Go to **https://dev.netatmo.com/apps** and create an app. Note the + **client_id** and **client_secret**. +2. On the app page, scroll to **Token generator**. Tick the `read_station` + scope and click "Generate token". Copy the **refresh token** (you don't + need the access token — it only lasts 3 hours). +3. Make sure your Netatmo account has the data centre station added. + +> **Important:** Netatmo rotates the refresh token on every refresh call. This +> service persists the rotated token to `/data/tokens.json` so it survives +> restarts. Don't share that file or the refresh chain breaks. + +## Deploy on Proxmox + +On your Ubuntu VM with Docker: + +```bash +git clone netatmo-mcp && cd netatmo-mcp +cp .env.example .env +# fill in NETATMO_CLIENT_ID, NETATMO_CLIENT_SECRET, NETATMO_REFRESH_TOKEN, +# and NETATMO_MCP_API_KEY=$(openssl rand -hex 32) +docker compose up -d --build +docker compose logs -f +``` + +You should see `Poller starting` and, within a minute or so, `Refreshing +Netatmo access token` followed by the first poll. Hit `http://:8088/health` +to confirm. + +After ~10 minutes there'll be a few rows in the DB and you can query history. + +### Putting it behind your reverse proxy + +For Claude (web/app) connectors you need a **public HTTPS URL**. If you're on +Caddy: + +``` +netatmo.mineracks.ai { + reverse_proxy :8088 +} +``` + +Or stick it on Cloudflare Tunnel — same deal. Bearer auth is enforced at the +app, so exposing it is fine as long as your `NETATMO_MCP_API_KEY` is strong. + +## Connecting Claude + +In **claude.ai → Settings → Connectors → Add custom connector**: + +- **Name:** Netatmo Datacentre +- **URL:** `https://netatmo.mineracks.ai/mcp/` +- **Auth:** Bearer token → paste your `NETATMO_MCP_API_KEY` + +For **Claude Desktop** (`~/Library/Application Support/Claude/claude_desktop_config.json` +on macOS) or Claude Code (`~/.claude/settings.json`): + +```json +{ + "mcpServers": { + "netatmo": { + "transport": { + "type": "streamable-http", + "url": "https://netatmo.mineracks.ai/mcp/", + "headers": { + "Authorization": "Bearer YOUR_NETATMO_MCP_API_KEY" + } + } + } + } +} +``` + +Then ask Claude: "What's the current CO2 in the data centre?" or "Plot the +last 48 hours of temperature for the rack module." + +## Using the REST API + +```bash +KEY=your_api_key +BASE=https://netatmo.mineracks.ai + +# All modules (find your module_id here) +curl -H "Authorization: Bearer $KEY" $BASE/api/modules + +# Current snapshot +curl -H "Authorization: Bearer $KEY" $BASE/api/current + +# Last 24h of CO2 for a specific module +curl -H "Authorization: Bearer $KEY" \ + "$BASE/api/history?module_id=02:00:00:aa:bb:cc&metric=CO2&hours=24" + +# Create an alert: CO2 > 1000 ppm on the rack module +curl -X POST -H "Authorization: Bearer $KEY" -H "Content-Type: application/json" \ + -d '{"name":"dc_co2_high","module_id":"02:00:00:aa:bb:cc","metric":"CO2","op":">","threshold":1000}' \ + $BASE/api/alerts +``` + +## Metrics reference + +The fields exposed depend on module type: + +| Module | Metrics | +|------------------|----------------------------------------------------------------| +| Indoor (NAMain / NAModule4) | `Temperature`, `Humidity`, `CO2`, `Noise`, `Pressure` | +| Outdoor (NAModule1) | `Temperature`, `Humidity` | +| Rain (NAModule3) | `Rain`, `sum_rain_1`, `sum_rain_24` | +| Wind (NAModule2) | `WindStrength`, `WindAngle`, `GustStrength`, `GustAngle` | + +For a data centre you almost certainly care about indoor `Temperature`, +`Humidity`, and `CO2` — the noise sensor is also a surprisingly useful +"is the AC running differently?" canary. + +## Files / state + +- `./data/netatmo.db` — SQLite history + alerts. Back it up. +- `./data/tokens.json` — rotating refresh token. **Do not delete** without + generating a new bootstrap refresh token. + +## Notes & gotchas + +- **Netatmo rate limits:** 50 requests / 10 seconds and 500 / hour per user. + At 5-minute poll interval you use ~12/hour, so you're fine — but don't + drop the interval below 60s. +- **First poll is slow** because it does a token refresh. Subsequent polls + reuse the access token until it nears expiry. +- **History granularity:** the local store records whatever Netatmo returns, + which is roughly one point per metric per ~5 min. For longer windows + (weeks/months) use `history_from_netatmo` with `scale="1day"` to get + Netatmo's pre-aggregated values. +- **Alerts are evaluated each poll**, so worst-case detection latency = + `POLL_INTERVAL_SEC`. To wire alerts to email/Slack/ntfy, point an n8n + workflow at `GET /api/alert-events`, or extend `Poller._tick` to POST + to a webhook. diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/netatmo.py b/app/netatmo.py new file mode 100644 index 0000000..0245a9a --- /dev/null +++ b/app/netatmo.py @@ -0,0 +1,223 @@ +""" +Netatmo Weather Station API client. + +Handles OAuth refresh-token flow (the only sane long-lived auth path) and +exposes the two endpoints we actually need: getstationsdata (live snapshot) +and getmeasure (historical time series). + +Token state is persisted to disk so refreshes survive restarts. +""" +from __future__ import annotations + +import asyncio +import json +import logging +import time +from dataclasses import dataclass, asdict +from pathlib import Path +from typing import Any + +import httpx + +log = logging.getLogger(__name__) + +NETATMO_BASE = "https://api.netatmo.com" +TOKEN_URL = f"{NETATMO_BASE}/oauth2/token" +STATIONS_URL = f"{NETATMO_BASE}/api/getstationsdata" +MEASURE_URL = f"{NETATMO_BASE}/api/getmeasure" + + +@dataclass +class TokenState: + access_token: str + refresh_token: str + expires_at: float # unix ts + + @classmethod + def load(cls, path: Path) -> "TokenState | None": + if not path.exists(): + return None + try: + return cls(**json.loads(path.read_text())) + except Exception as e: + log.warning("Could not load token state: %s", e) + return None + + def save(self, path: Path) -> None: + path.write_text(json.dumps(asdict(self), indent=2)) + + +class NetatmoClient: + """Thin async client around the Weather Station endpoints.""" + + def __init__( + self, + client_id: str, + client_secret: str, + token_path: Path, + initial_refresh_token: str | None = None, + ) -> None: + self._client_id = client_id + self._client_secret = client_secret + self._token_path = token_path + self._lock = asyncio.Lock() + + state = TokenState.load(token_path) + if state is None: + if not initial_refresh_token: + raise RuntimeError( + f"No token state at {token_path} and no NETATMO_REFRESH_TOKEN provided. " + "Generate one via the dev portal Token Generator and set the env var." + ) + # Bootstrap with whatever we were given; first refresh will fix expiry. + state = TokenState( + access_token="", + refresh_token=initial_refresh_token, + expires_at=0, + ) + state.save(token_path) + self._state = state + + # ---- auth ------------------------------------------------------------- + + async def _refresh(self) -> None: + """Exchange refresh_token for a new access_token. Netatmo rotates the refresh + token on every call — we MUST persist the new one or we'll be locked out.""" + log.info("Refreshing Netatmo access token") + async with httpx.AsyncClient(timeout=20) as c: + r = await c.post( + TOKEN_URL, + data={ + "grant_type": "refresh_token", + "refresh_token": self._state.refresh_token, + "client_id": self._client_id, + "client_secret": self._client_secret, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + if r.status_code != 200: + raise RuntimeError(f"Token refresh failed {r.status_code}: {r.text}") + body = r.json() + self._state = TokenState( + access_token=body["access_token"], + refresh_token=body.get("refresh_token", self._state.refresh_token), + # Refresh 60s early to be safe. + expires_at=time.time() + int(body.get("expires_in", 10800)) - 60, + ) + self._state.save(self._token_path) + + async def _ensure_token(self) -> str: + async with self._lock: + if not self._state.access_token or time.time() >= self._state.expires_at: + await self._refresh() + return self._state.access_token + + async def _get(self, url: str, params: dict[str, Any]) -> dict[str, Any]: + token = await self._ensure_token() + async with httpx.AsyncClient(timeout=20) as c: + r = await c.get( + url, + params=params, + headers={"Authorization": f"Bearer {token}"}, + ) + if r.status_code == 403: + # Token might have been invalidated — force a refresh and retry once. + log.warning("403 from Netatmo; forcing token refresh and retrying") + async with self._lock: + await self._refresh() + token = self._state.access_token + async with httpx.AsyncClient(timeout=20) as c: + r = await c.get( + url, + params=params, + headers={"Authorization": f"Bearer {token}"}, + ) + if r.status_code != 200: + raise RuntimeError(f"Netatmo {url} -> {r.status_code}: {r.text}") + return r.json() + + # ---- public API ------------------------------------------------------- + + async def get_stations_data(self) -> dict[str, Any]: + """Live snapshot: every station + module the account can see, with the + most recent dashboard_data values.""" + return await self._get(STATIONS_URL, {"get_favorites": "false"}) + + async def get_measure( + self, + device_id: str, + module_id: str | None, + scale: str, + types: list[str], + date_begin: int | None = None, + date_end: int | None = None, + limit: int | None = None, + optimize: bool = False, + real_time: bool = True, + ) -> dict[str, Any]: + """Historical time-series. + + scale: '30min','1hour','3hours','1day','1week','1month' (or 'max' for raw 5-min) + types: e.g. ['Temperature','Humidity','CO2','Noise','Pressure', + 'min_temp','max_temp','date_min_temp','date_max_temp'] + """ + params: dict[str, Any] = { + "device_id": device_id, + "scale": scale, + "type": ",".join(types), + "optimize": "true" if optimize else "false", + "real_time": "true" if real_time else "false", + } + if module_id: + params["module_id"] = module_id + if date_begin is not None: + params["date_begin"] = date_begin + if date_end is not None: + params["date_end"] = date_end + if limit is not None: + params["limit"] = limit + return await self._get(MEASURE_URL, params) + + +def flatten_stations(raw: dict[str, Any]) -> list[dict[str, Any]]: + """Reshape getstationsdata into a flat list of {station, module} sensor records. + + Netatmo's response nests modules under devices and puts the indoor module's + readings on the device itself, which is awkward. We normalise it. + """ + out: list[dict[str, Any]] = [] + for dev in raw.get("body", {}).get("devices", []): + station_name = dev.get("station_name") or dev.get("home_name") or dev["_id"] + # The base station is itself a module ("NAMain") with dashboard_data on the device. + out.append({ + "station_id": dev["_id"], + "station_name": station_name, + "module_id": dev["_id"], + "module_name": dev.get("module_name", "Indoor"), + "module_type": dev.get("type", "NAMain"), + "data_types": dev.get("data_type", []), + "last_seen": dev.get("last_status_store"), + "reachable": dev.get("reachable", True), + "battery_percent": None, # base station is mains powered + "rf_status": None, + "wifi_status": dev.get("wifi_status"), + "place": dev.get("place", {}), + "dashboard": dev.get("dashboard_data", {}), + }) + for mod in dev.get("modules", []): + out.append({ + "station_id": dev["_id"], + "station_name": station_name, + "module_id": mod["_id"], + "module_name": mod.get("module_name", mod["_id"]), + "module_type": mod.get("type"), + "data_types": mod.get("data_type", []), + "last_seen": mod.get("last_seen") or mod.get("last_message"), + "reachable": mod.get("reachable", True), + "battery_percent": mod.get("battery_percent"), + "rf_status": mod.get("rf_status"), + "wifi_status": None, + "place": dev.get("place", {}), + "dashboard": mod.get("dashboard_data", {}), + }) + return out diff --git a/app/poller.py b/app/poller.py new file mode 100644 index 0000000..875a57c --- /dev/null +++ b/app/poller.py @@ -0,0 +1,99 @@ +""" +Background poller. Pulls live snapshot every N seconds and persists any +unseen readings. Runs in the same asyncio loop as the FastAPI app. +""" +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from .netatmo import NetatmoClient, flatten_stations +from .store import Store + +log = logging.getLogger(__name__) + +# Metrics we care about — anything else in dashboard_data is metadata. +NUMERIC_METRICS = { + "Temperature", "Humidity", "CO2", "Noise", "Pressure", "AbsolutePressure", + "Rain", "sum_rain_1", "sum_rain_24", "WindStrength", "WindAngle", + "GustStrength", "GustAngle", "min_temp", "max_temp", "health_idx", +} + + +class Poller: + def __init__(self, client: NetatmoClient, store: Store, interval_sec: int = 300): + self.client = client + self.store = store + self.interval = interval_sec + self._task: asyncio.Task | None = None + self._stop = asyncio.Event() + self.last_run_ts: int | None = None + self.last_error: str | None = None + self.last_inserted: int = 0 + + async def _tick(self) -> None: + raw = await self.client.get_stations_data() + modules = flatten_stations(raw) + self.store.upsert_modules(modules) + + rows: list[tuple[str, str, str, int, float]] = [] + latest: dict[tuple[str, str], float] = {} + for m in modules: + d = m.get("dashboard") or {} + ts = d.get("time_utc") + if not ts: + continue + for k, v in d.items(): + if k not in NUMERIC_METRICS: + continue + if not isinstance(v, (int, float)): + continue + rows.append((m["station_id"], m["module_id"], k, int(ts), float(v))) + latest[(m["module_id"], k)] = float(v) + + inserted = self.store.insert_readings(rows) if rows else 0 + self.last_inserted = inserted + + # Evaluate alerts against the freshest snapshot. + import time as _t + fired = self.store.evaluate_alerts(int(_t.time()), latest) + for f in fired: + log.warning("ALERT %s: %s.%s = %s %s %s", + f["name"], f["module_id"], f["metric"], + f["value"], f["op"], f["threshold"]) + + async def _run(self) -> None: + log.info("Poller starting (interval=%ss)", self.interval) + while not self._stop.is_set(): + import time as _t + try: + await self._tick() + self.last_run_ts = int(_t.time()) + self.last_error = None + except Exception as e: + self.last_error = str(e) + log.exception("Poll failed") + try: + await asyncio.wait_for(self._stop.wait(), timeout=self.interval) + except asyncio.TimeoutError: + pass + + def start(self) -> None: + if self._task is None or self._task.done(): + self._stop.clear() + self._task = asyncio.create_task(self._run()) + + async def stop(self) -> None: + self._stop.set() + if self._task: + await self._task + + def status(self) -> dict[str, Any]: + return { + "running": self._task is not None and not self._task.done(), + "interval_sec": self.interval, + "last_run_ts": self.last_run_ts, + "last_inserted": self.last_inserted, + "last_error": self.last_error, + } diff --git a/app/server.py b/app/server.py new file mode 100644 index 0000000..9912a93 --- /dev/null +++ b/app/server.py @@ -0,0 +1,276 @@ +""" +Combined service: FastAPI REST API + MCP server (Streamable HTTP transport), +sharing one process and one Netatmo client. + +REST is at /api/... +MCP is at /mcp/ (Streamable HTTP) +Health at /health + +Auth: a single bearer token (env NETATMO_MCP_API_KEY) protects both surfaces. +Set it to a long random string and pass it as `Authorization: Bearer `. +""" +from __future__ import annotations + +import contextlib +import logging +import os +import time +from pathlib import Path +from typing import Any, Optional + +from fastapi import Depends, FastAPI, Header, HTTPException, Query +from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp.server import TransportSecuritySettings + +from .netatmo import NetatmoClient +from .poller import Poller +from .store import Store + +logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO")) +log = logging.getLogger(__name__) + +# ---- config ----------------------------------------------------------------- + +DATA_DIR = Path(os.getenv("DATA_DIR", "/data")) +DATA_DIR.mkdir(parents=True, exist_ok=True) + +API_KEY = os.getenv("NETATMO_MCP_API_KEY") +if not API_KEY: + raise RuntimeError("NETATMO_MCP_API_KEY env var is required") + +CLIENT_ID = os.environ["NETATMO_CLIENT_ID"] +CLIENT_SECRET = os.environ["NETATMO_CLIENT_SECRET"] +INITIAL_REFRESH = os.environ.get("NETATMO_REFRESH_TOKEN") +POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SEC", "300")) + +store = Store(DATA_DIR / "netatmo.db") +client = NetatmoClient( + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET, + token_path=DATA_DIR / "tokens.json", + initial_refresh_token=INITIAL_REFRESH, +) +poller = Poller(client, store, interval_sec=POLL_INTERVAL) + +# ---- MCP server ------------------------------------------------------------- +# stateless_http=True so the same instance can serve many clients; json_response=True +# returns a single JSON response per call rather than an SSE stream — simpler for +# tools that just return data. + +mcp = FastMCP( + name="netatmo-datacentre", + stateless_http=True, + json_response=True, + transport_security=TransportSecuritySettings( + enable_dns_rebinding_protection=True, + allowed_hosts=["environment.mineracks.com", "100.90.183.109:8088", "localhost:8088"], + ), +) + + +@mcp.tool() +def list_modules() -> list[dict[str, Any]]: + """List every Netatmo station and module known to this server, including + module_id (use this for history queries), module_type, and what data_types + each module reports.""" + return store.list_modules() + + +@mcp.tool() +def current_readings(module_id: Optional[str] = None) -> list[dict[str, Any]]: + """Get the most recent reading for every metric. If module_id is given, + restrict to that one module. Each row has: station_name, module_name, + module_id, metric, value, ts (unix seconds).""" + return store.latest(module_id=module_id) + + +@mcp.tool() +def history( + module_id: str, + metric: str, + hours: int = 24, + limit: int = 5000, +) -> dict[str, Any]: + """Return locally-stored historical readings for one metric on one module. + + metric: e.g. 'Temperature', 'Humidity', 'CO2', 'Noise', 'Pressure'. + hours: look back this many hours from now (default 24). + limit: max points returned (default 5000). + """ + now = int(time.time()) + rows = store.history(module_id, metric, since=now - hours * 3600, + until=now, limit=limit) + agg = store.aggregate(module_id, metric, since=now - hours * 3600, until=now) + return {"module_id": module_id, "metric": metric, "hours": hours, + "points": rows, "summary": agg} + + +@mcp.tool() +async def history_from_netatmo( + device_id: str, + module_id: Optional[str], + scale: str = "1hour", + metrics: Optional[list[str]] = None, + hours: int = 168, +) -> dict[str, Any]: + """Fetch history directly from Netatmo's getmeasure endpoint — useful when + you want a longer window than the local store has, or pre-aggregated values. + + scale: '30min','1hour','3hours','1day','1week','1month' + metrics: defaults to ['Temperature','Humidity','CO2'] for indoor modules. + """ + metrics = metrics or ["Temperature", "Humidity", "CO2"] + now = int(time.time()) + raw = await client.get_measure( + device_id=device_id, + module_id=module_id if module_id and module_id != device_id else None, + scale=scale, + types=metrics, + date_begin=now - hours * 3600, + date_end=now, + ) + # Reshape Netatmo's awkward {beg_time, step_time, value: [[v1,v2,...]]} format. + body = raw.get("body", {}) + series: list[dict[str, Any]] = [] + for ts_str, values in body.items(): + try: + ts = int(ts_str) + except ValueError: + continue + point = {"ts": ts} + for i, m in enumerate(metrics): + if i < len(values): + point[m] = values[i] + series.append(point) + series.sort(key=lambda p: p["ts"]) + return {"device_id": device_id, "module_id": module_id, "scale": scale, + "metrics": metrics, "points": series} + + +@mcp.tool() +def list_alerts() -> list[dict[str, Any]]: + """List all configured threshold alerts and their current state.""" + return store.list_alerts() + + +@mcp.tool() +def create_alert( + name: str, module_id: str, metric: str, op: str, threshold: float, + notes: Optional[str] = None, +) -> dict[str, Any]: + """Create a threshold alert. op must be one of '>', '<', '>=', '<='. + Example: create_alert('dc_co2_high', 'aa:bb:..', 'CO2', '>', 1000).""" + aid = store.create_alert(name, module_id, metric, op, threshold, notes) + return {"id": aid, "name": name} + + +@mcp.tool() +def delete_alert(alert_id: int) -> dict[str, Any]: + """Delete an alert by id.""" + return {"deleted": store.delete_alert(alert_id)} + + +@mcp.tool() +def recent_alert_events(limit: int = 50) -> list[dict[str, Any]]: + """Recent alert state-change events (triggered / cleared).""" + return store.recent_alert_events(limit=limit) + + +@mcp.tool() +def poller_status() -> dict[str, Any]: + """How is the background poller doing? Useful for diagnosing stale data.""" + return poller.status() + + +# ---- FastAPI REST ----------------------------------------------------------- + +def require_key(authorization: str = Header(default="")) -> None: + if not authorization.startswith("Bearer ") or authorization[7:] != API_KEY: + raise HTTPException(401, "Invalid or missing bearer token") + + +@contextlib.asynccontextmanager +async def lifespan(app: FastAPI): + poller.start() + # Critical: MCP session manager must be running for the mounted app to work. + async with mcp.session_manager.run(): + yield + await poller.stop() + + +app = FastAPI(title="Netatmo MCP/API", lifespan=lifespan) + + +@app.get("/health") +def health() -> dict[str, Any]: + return {"ok": True, "poller": poller.status()} + + +@app.get("/api/modules", dependencies=[Depends(require_key)]) +def api_modules(): + return store.list_modules() + + +@app.get("/api/current", dependencies=[Depends(require_key)]) +def api_current(module_id: Optional[str] = None): + return store.latest(module_id=module_id) + + +@app.get("/api/history", dependencies=[Depends(require_key)]) +def api_history( + module_id: str = Query(...), + metric: str = Query(...), + hours: int = Query(24, ge=1, le=24 * 365), + limit: int = Query(5000, ge=1, le=50000), +): + now = int(time.time()) + return { + "module_id": module_id, "metric": metric, "hours": hours, + "points": store.history(module_id, metric, since=now - hours * 3600, + until=now, limit=limit), + "summary": store.aggregate(module_id, metric, since=now - hours * 3600, + until=now), + } + + +@app.get("/api/alerts", dependencies=[Depends(require_key)]) +def api_alerts(): + return store.list_alerts() + + +@app.post("/api/alerts", dependencies=[Depends(require_key)]) +def api_create_alert(payload: dict[str, Any]): + aid = store.create_alert( + name=payload["name"], module_id=payload["module_id"], + metric=payload["metric"], op=payload["op"], + threshold=float(payload["threshold"]), notes=payload.get("notes"), + ) + return {"id": aid} + + +@app.delete("/api/alerts/{alert_id}", dependencies=[Depends(require_key)]) +def api_delete_alert(alert_id: int): + return {"deleted": store.delete_alert(alert_id)} + + +@app.get("/api/alert-events", dependencies=[Depends(require_key)]) +def api_alert_events(limit: int = 50): + return store.recent_alert_events(limit=limit) + + +# Mount the MCP Streamable HTTP app at /mcp. +# Note: MCP clients should use bearer auth too — we wrap with a tiny middleware. +mcp_app = mcp.streamable_http_app() + + +@app.middleware("http") +async def mcp_auth(request, call_next): + if request.url.path.startswith("/mcp"): + auth = request.headers.get("authorization", "") + if not auth.startswith("Bearer ") or auth[7:] != API_KEY: + from starlette.responses import JSONResponse + return JSONResponse({"error": "unauthorized"}, status_code=401) + return await call_next(request) + + +app.mount("/mcp", mcp_app) diff --git a/app/store.py b/app/store.py new file mode 100644 index 0000000..ddc8ef8 --- /dev/null +++ b/app/store.py @@ -0,0 +1,235 @@ +""" +SQLite-backed store for historical sensor readings and alert state. + +We poll the live snapshot every N seconds (default 5min — Netatmo only updates +that often anyway) and write one row per (station, module, metric, ts). +This gives us our own historical series independent of Netatmo's retention, +plus fast local queries for Claude. +""" +from __future__ import annotations + +import sqlite3 +import threading +import time +from contextlib import contextmanager +from pathlib import Path +from typing import Any, Iterable + +SCHEMA = """ +CREATE TABLE IF NOT EXISTS readings ( + station_id TEXT NOT NULL, + module_id TEXT NOT NULL, + metric TEXT NOT NULL, + ts INTEGER NOT NULL, -- unix seconds + value REAL NOT NULL, + PRIMARY KEY (station_id, module_id, metric, ts) +); +CREATE INDEX IF NOT EXISTS idx_readings_ts ON readings(ts); +CREATE INDEX IF NOT EXISTS idx_readings_lookup ON readings(module_id, metric, ts); + +CREATE TABLE IF NOT EXISTS modules ( + module_id TEXT PRIMARY KEY, + station_id TEXT NOT NULL, + station_name TEXT, + module_name TEXT, + module_type TEXT, + data_types TEXT, -- json array + last_seen INTEGER +); + +CREATE TABLE IF NOT EXISTS alerts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE, + module_id TEXT NOT NULL, + metric TEXT NOT NULL, + op TEXT NOT NULL, -- '>', '<', '>=', '<=' + threshold REAL NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + last_triggered INTEGER, + last_value REAL, + notes TEXT +); + +CREATE TABLE IF NOT EXISTS alert_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + alert_id INTEGER NOT NULL, + ts INTEGER NOT NULL, + value REAL NOT NULL, + state TEXT NOT NULL, -- 'triggered' | 'cleared' + FOREIGN KEY(alert_id) REFERENCES alerts(id) +); +""" + + +class Store: + def __init__(self, path: Path) -> None: + self.path = path + self._lock = threading.Lock() + with self._conn() as c: + c.executescript(SCHEMA) + + @contextmanager + def _conn(self): + conn = sqlite3.connect(self.path, timeout=10) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + try: + yield conn + conn.commit() + finally: + conn.close() + + # ---- writes ----------------------------------------------------------- + + def upsert_modules(self, modules: Iterable[dict[str, Any]]) -> None: + import json + with self._lock, self._conn() as c: + for m in modules: + c.execute( + """INSERT INTO modules(module_id, station_id, station_name, + module_name, module_type, data_types, last_seen) + VALUES(?,?,?,?,?,?,?) + ON CONFLICT(module_id) DO UPDATE SET + station_id=excluded.station_id, + station_name=excluded.station_name, + module_name=excluded.module_name, + module_type=excluded.module_type, + data_types=excluded.data_types, + last_seen=excluded.last_seen""", + ( + m["module_id"], m["station_id"], m.get("station_name"), + m.get("module_name"), m.get("module_type"), + json.dumps(m.get("data_types", [])), m.get("last_seen"), + ), + ) + + def insert_readings(self, rows: Iterable[tuple[str, str, str, int, float]]) -> int: + """rows: (station_id, module_id, metric, ts, value)""" + with self._lock, self._conn() as c: + cur = c.executemany( + "INSERT OR IGNORE INTO readings VALUES (?,?,?,?,?)", + list(rows), + ) + return cur.rowcount + + # ---- reads ------------------------------------------------------------ + + def list_modules(self) -> list[dict[str, Any]]: + with self._conn() as c: + return [dict(r) for r in c.execute("SELECT * FROM modules ORDER BY station_name, module_name")] + + def latest(self, module_id: str | None = None) -> list[dict[str, Any]]: + sql = """ + SELECT r.station_id, r.module_id, r.metric, r.ts, r.value, + m.station_name, m.module_name + FROM readings r + JOIN modules m ON m.module_id = r.module_id + JOIN (SELECT module_id, metric, MAX(ts) AS mts + FROM readings GROUP BY module_id, metric) lx + ON lx.module_id = r.module_id AND lx.metric = r.metric AND lx.mts = r.ts + """ + params: list[Any] = [] + if module_id: + sql += " WHERE r.module_id = ?" + params.append(module_id) + sql += " ORDER BY m.station_name, m.module_name, r.metric" + with self._conn() as c: + return [dict(r) for r in c.execute(sql, params)] + + def history( + self, module_id: str, metric: str, since: int, until: int | None = None, + limit: int = 5000, + ) -> list[dict[str, Any]]: + until = until or int(time.time()) + with self._conn() as c: + rows = c.execute( + """SELECT ts, value FROM readings + WHERE module_id=? AND metric=? AND ts BETWEEN ? AND ? + ORDER BY ts ASC LIMIT ?""", + (module_id, metric, since, until, limit), + ) + return [dict(r) for r in rows] + + def aggregate( + self, module_id: str, metric: str, since: int, until: int | None = None, + ) -> dict[str, float | int | None]: + until = until or int(time.time()) + with self._conn() as c: + r = c.execute( + """SELECT COUNT(*) n, MIN(value) lo, MAX(value) hi, AVG(value) avg + FROM readings WHERE module_id=? AND metric=? AND ts BETWEEN ? AND ?""", + (module_id, metric, since, until), + ).fetchone() + return {"count": r["n"], "min": r["lo"], "max": r["hi"], "avg": r["avg"]} + + # ---- alerts ----------------------------------------------------------- + + def create_alert( + self, name: str, module_id: str, metric: str, op: str, threshold: float, + notes: str | None = None, + ) -> int: + if op not in (">", "<", ">=", "<="): + raise ValueError(f"Invalid operator {op!r}") + with self._lock, self._conn() as c: + cur = c.execute( + """INSERT INTO alerts(name, module_id, metric, op, threshold, notes) + VALUES(?,?,?,?,?,?)""", + (name, module_id, metric, op, threshold, notes), + ) + return cur.lastrowid + + def list_alerts(self) -> list[dict[str, Any]]: + with self._conn() as c: + return [dict(r) for r in c.execute("SELECT * FROM alerts ORDER BY name")] + + def delete_alert(self, alert_id: int) -> bool: + with self._lock, self._conn() as c: + cur = c.execute("DELETE FROM alerts WHERE id=?", (alert_id,)) + return cur.rowcount > 0 + + def evaluate_alerts(self, now: int, latest: dict[tuple[str, str], float]) -> list[dict[str, Any]]: + """Check every enabled alert against the latest readings. + latest is keyed by (module_id, metric) -> value. Returns newly-fired events.""" + fired: list[dict[str, Any]] = [] + with self._lock, self._conn() as c: + alerts = list(c.execute("SELECT * FROM alerts WHERE enabled=1")) + for a in alerts: + v = latest.get((a["module_id"], a["metric"])) + if v is None: + continue + op, t = a["op"], a["threshold"] + triggered = ( + (op == ">" and v > t) or + (op == "<" and v < t) or + (op == ">=" and v >= t) or + (op == "<=" and v <= t) + ) + prev_state = "triggered" if a["last_triggered"] else "cleared" + new_state = "triggered" if triggered else "cleared" + if new_state != prev_state: + c.execute( + "INSERT INTO alert_events(alert_id, ts, value, state) VALUES (?,?,?,?)", + (a["id"], now, v, new_state), + ) + c.execute( + "UPDATE alerts SET last_triggered=?, last_value=? WHERE id=?", + (now if triggered else None, v, a["id"]), + ) + if triggered: + fired.append({ + "alert_id": a["id"], "name": a["name"], + "module_id": a["module_id"], "metric": a["metric"], + "value": v, "op": op, "threshold": t, "ts": now, + }) + return fired + + def recent_alert_events(self, limit: int = 50) -> list[dict[str, Any]]: + with self._conn() as c: + rows = c.execute( + """SELECT e.*, a.name, a.module_id, a.metric, a.op, a.threshold + FROM alert_events e JOIN alerts a ON a.id = e.alert_id + ORDER BY e.ts DESC LIMIT ?""", + (limit,), + ) + return [dict(r) for r in rows] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..980de5b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,27 @@ +services: + netatmo-mcp: + build: . + image: netatmo-mcp:latest + container_name: netatmo-mcp + restart: unless-stopped + ports: + - "8088:8088" + environment: + # --- required --- + NETATMO_CLIENT_ID: "${NETATMO_CLIENT_ID}" + NETATMO_CLIENT_SECRET: "${NETATMO_CLIENT_SECRET}" + # Only required on first boot — afterwards tokens.json takes over. + NETATMO_REFRESH_TOKEN: "${NETATMO_REFRESH_TOKEN:-}" + # Long random string. Used for both REST and MCP auth. + NETATMO_MCP_API_KEY: "${NETATMO_MCP_API_KEY}" + + # --- optional --- + POLL_INTERVAL_SEC: "300" # Netatmo updates roughly every 5 min anyway + LOG_LEVEL: "INFO" + volumes: + - ./data:/data + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8088/health').status==200 else 1)"] + interval: 30s + timeout: 5s + retries: 3 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f67113b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +fastapi>=0.115 +uvicorn[standard]>=0.32 +httpx>=0.27 +mcp>=1.2 +pydantic>=2.7