Initial commit: Netatmo MCP + REST bridge for DC environmental monitoring

Deployed on integr8 VM (pve01/700) at environment.mineracks.com.
Polls Netatmo every 5 min, stores in SQLite, exposes via FastAPI REST + MCP.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
mineracks 2026-04-20 12:14:18 +00:00
commit 9efe05eca0
11 changed files with 1066 additions and 0 deletions

14
.env.example Normal file
View File

@ -0,0 +1,14 @@
# Copy to .env and fill in. Never commit the real .env.
# From https://dev.netatmo.com/apps -> your app
NETATMO_CLIENT_ID=
NETATMO_CLIENT_SECRET=
# Generate ONCE via the app's "Token generator" panel with scope=read_station.
# After first successful refresh, /data/tokens.json takes over and you can
# leave this blank (or remove it) — Netatmo rotates the refresh token on
# every refresh call.
NETATMO_REFRESH_TOKEN=
# openssl rand -hex 32
NETATMO_MCP_API_KEY=

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
.env
data/
__pycache__/
*.pyc
*.db
tokens.json

20
Dockerfile Normal file
View File

@ -0,0 +1,20 @@
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1
WORKDIR /srv
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY app ./app
# Persistent state (sqlite + token cache) lives here. Mount a volume.
VOLUME ["/data"]
ENV DATA_DIR=/data
EXPOSE 8088
CMD ["uvicorn", "app.server:app", "--host", "0.0.0.0", "--port", "8088"]

161
README.md Normal file
View File

@ -0,0 +1,161 @@
# netatmo-mcp
Self-hosted bridge that exposes a Netatmo Weather Station to Claude (via MCP)
**and** to anything else (via REST), with a local SQLite store for history and
threshold-based alerts. Designed to run as a Docker container on Proxmox.
## What you get
One process serving three things on port `8088`:
| Path | What it is |
|-------------|-----------------------------------------------------------------|
| `/mcp/` | MCP server (Streamable HTTP transport). Add to Claude as a connector. |
| `/api/...` | REST API — same data, for n8n / Grafana / cron / curl. |
| `/health` | Unauthenticated liveness + poller status. |
A background poller hits Netatmo every 5 minutes (configurable), stores every
new reading in SQLite, and evaluates any alerts you've configured.
## Capabilities
- **Current readings** for every station and module (`current_readings` tool / `GET /api/current`)
- **Local historical queries** at native ~5min resolution (`history` tool / `GET /api/history`)
- **Direct Netatmo historical pulls** for longer windows or pre-aggregated scales — 1hour, 1day, 1week, 1month (`history_from_netatmo` tool)
- **Threshold alerts** on any metric — e.g. CO₂ > 1000 ppm, temp > 28 °C (`create_alert` tool / `POST /api/alerts`)
- **Multi-station / multi-module** — handled natively; everything is keyed by `module_id`
## One-time Netatmo setup
1. Go to **https://dev.netatmo.com/apps** and create an app. Note the
**client_id** and **client_secret**.
2. On the app page, scroll to **Token generator**. Tick the `read_station`
scope and click "Generate token". Copy the **refresh token** (you don't
need the access token — it only lasts 3 hours).
3. Make sure your Netatmo account has the data centre station added.
> **Important:** Netatmo rotates the refresh token on every refresh call. This
> service persists the rotated token to `/data/tokens.json` so it survives
> restarts. Don't share that file or the refresh chain breaks.
## Deploy on Proxmox
On your Ubuntu VM with Docker:
```bash
git clone <this repo> netatmo-mcp && cd netatmo-mcp
cp .env.example .env
# fill in NETATMO_CLIENT_ID, NETATMO_CLIENT_SECRET, NETATMO_REFRESH_TOKEN,
# and NETATMO_MCP_API_KEY=$(openssl rand -hex 32)
docker compose up -d --build
docker compose logs -f
```
You should see `Poller starting` and, within a minute or so, `Refreshing
Netatmo access token` followed by the first poll. Hit `http://<vm>:8088/health`
to confirm.
After ~10 minutes there'll be a few rows in the DB and you can query history.
### Putting it behind your reverse proxy
For Claude (web/app) connectors you need a **public HTTPS URL**. If you're on
Caddy:
```
netatmo.mineracks.ai {
reverse_proxy <vm-ip>:8088
}
```
Or stick it on Cloudflare Tunnel — same deal. Bearer auth is enforced at the
app, so exposing it is fine as long as your `NETATMO_MCP_API_KEY` is strong.
## Connecting Claude
In **claude.ai → Settings → Connectors → Add custom connector**:
- **Name:** Netatmo Datacentre
- **URL:** `https://netatmo.mineracks.ai/mcp/`
- **Auth:** Bearer token → paste your `NETATMO_MCP_API_KEY`
For **Claude Desktop** (`~/Library/Application Support/Claude/claude_desktop_config.json`
on macOS) or Claude Code (`~/.claude/settings.json`):
```json
{
"mcpServers": {
"netatmo": {
"transport": {
"type": "streamable-http",
"url": "https://netatmo.mineracks.ai/mcp/",
"headers": {
"Authorization": "Bearer YOUR_NETATMO_MCP_API_KEY"
}
}
}
}
}
```
Then ask Claude: "What's the current CO2 in the data centre?" or "Plot the
last 48 hours of temperature for the rack module."
## Using the REST API
```bash
KEY=your_api_key
BASE=https://netatmo.mineracks.ai
# All modules (find your module_id here)
curl -H "Authorization: Bearer $KEY" $BASE/api/modules
# Current snapshot
curl -H "Authorization: Bearer $KEY" $BASE/api/current
# Last 24h of CO2 for a specific module
curl -H "Authorization: Bearer $KEY" \
"$BASE/api/history?module_id=02:00:00:aa:bb:cc&metric=CO2&hours=24"
# Create an alert: CO2 > 1000 ppm on the rack module
curl -X POST -H "Authorization: Bearer $KEY" -H "Content-Type: application/json" \
-d '{"name":"dc_co2_high","module_id":"02:00:00:aa:bb:cc","metric":"CO2","op":">","threshold":1000}' \
$BASE/api/alerts
```
## Metrics reference
The fields exposed depend on module type:
| Module | Metrics |
|------------------|----------------------------------------------------------------|
| Indoor (NAMain / NAModule4) | `Temperature`, `Humidity`, `CO2`, `Noise`, `Pressure` |
| Outdoor (NAModule1) | `Temperature`, `Humidity` |
| Rain (NAModule3) | `Rain`, `sum_rain_1`, `sum_rain_24` |
| Wind (NAModule2) | `WindStrength`, `WindAngle`, `GustStrength`, `GustAngle` |
For a data centre you almost certainly care about indoor `Temperature`,
`Humidity`, and `CO2` — the noise sensor is also a surprisingly useful
"is the AC running differently?" canary.
## Files / state
- `./data/netatmo.db` — SQLite history + alerts. Back it up.
- `./data/tokens.json` — rotating refresh token. **Do not delete** without
generating a new bootstrap refresh token.
## Notes & gotchas
- **Netatmo rate limits:** 50 requests / 10 seconds and 500 / hour per user.
At 5-minute poll interval you use ~12/hour, so you're fine — but don't
drop the interval below 60s.
- **First poll is slow** because it does a token refresh. Subsequent polls
reuse the access token until it nears expiry.
- **History granularity:** the local store records whatever Netatmo returns,
which is roughly one point per metric per ~5 min. For longer windows
(weeks/months) use `history_from_netatmo` with `scale="1day"` to get
Netatmo's pre-aggregated values.
- **Alerts are evaluated each poll**, so worst-case detection latency =
`POLL_INTERVAL_SEC`. To wire alerts to email/Slack/ntfy, point an n8n
workflow at `GET /api/alert-events`, or extend `Poller._tick` to POST
to a webhook.

0
app/__init__.py Normal file
View File

223
app/netatmo.py Normal file
View File

@ -0,0 +1,223 @@
"""
Netatmo Weather Station API client.
Handles OAuth refresh-token flow (the only sane long-lived auth path) and
exposes the two endpoints we actually need: getstationsdata (live snapshot)
and getmeasure (historical time series).
Token state is persisted to disk so refreshes survive restarts.
"""
from __future__ import annotations
import asyncio
import json
import logging
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any
import httpx
log = logging.getLogger(__name__)
NETATMO_BASE = "https://api.netatmo.com"
TOKEN_URL = f"{NETATMO_BASE}/oauth2/token"
STATIONS_URL = f"{NETATMO_BASE}/api/getstationsdata"
MEASURE_URL = f"{NETATMO_BASE}/api/getmeasure"
@dataclass
class TokenState:
access_token: str
refresh_token: str
expires_at: float # unix ts
@classmethod
def load(cls, path: Path) -> "TokenState | None":
if not path.exists():
return None
try:
return cls(**json.loads(path.read_text()))
except Exception as e:
log.warning("Could not load token state: %s", e)
return None
def save(self, path: Path) -> None:
path.write_text(json.dumps(asdict(self), indent=2))
class NetatmoClient:
"""Thin async client around the Weather Station endpoints."""
def __init__(
self,
client_id: str,
client_secret: str,
token_path: Path,
initial_refresh_token: str | None = None,
) -> None:
self._client_id = client_id
self._client_secret = client_secret
self._token_path = token_path
self._lock = asyncio.Lock()
state = TokenState.load(token_path)
if state is None:
if not initial_refresh_token:
raise RuntimeError(
f"No token state at {token_path} and no NETATMO_REFRESH_TOKEN provided. "
"Generate one via the dev portal Token Generator and set the env var."
)
# Bootstrap with whatever we were given; first refresh will fix expiry.
state = TokenState(
access_token="",
refresh_token=initial_refresh_token,
expires_at=0,
)
state.save(token_path)
self._state = state
# ---- auth -------------------------------------------------------------
async def _refresh(self) -> None:
"""Exchange refresh_token for a new access_token. Netatmo rotates the refresh
token on every call we MUST persist the new one or we'll be locked out."""
log.info("Refreshing Netatmo access token")
async with httpx.AsyncClient(timeout=20) as c:
r = await c.post(
TOKEN_URL,
data={
"grant_type": "refresh_token",
"refresh_token": self._state.refresh_token,
"client_id": self._client_id,
"client_secret": self._client_secret,
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
if r.status_code != 200:
raise RuntimeError(f"Token refresh failed {r.status_code}: {r.text}")
body = r.json()
self._state = TokenState(
access_token=body["access_token"],
refresh_token=body.get("refresh_token", self._state.refresh_token),
# Refresh 60s early to be safe.
expires_at=time.time() + int(body.get("expires_in", 10800)) - 60,
)
self._state.save(self._token_path)
async def _ensure_token(self) -> str:
async with self._lock:
if not self._state.access_token or time.time() >= self._state.expires_at:
await self._refresh()
return self._state.access_token
async def _get(self, url: str, params: dict[str, Any]) -> dict[str, Any]:
token = await self._ensure_token()
async with httpx.AsyncClient(timeout=20) as c:
r = await c.get(
url,
params=params,
headers={"Authorization": f"Bearer {token}"},
)
if r.status_code == 403:
# Token might have been invalidated — force a refresh and retry once.
log.warning("403 from Netatmo; forcing token refresh and retrying")
async with self._lock:
await self._refresh()
token = self._state.access_token
async with httpx.AsyncClient(timeout=20) as c:
r = await c.get(
url,
params=params,
headers={"Authorization": f"Bearer {token}"},
)
if r.status_code != 200:
raise RuntimeError(f"Netatmo {url} -> {r.status_code}: {r.text}")
return r.json()
# ---- public API -------------------------------------------------------
async def get_stations_data(self) -> dict[str, Any]:
"""Live snapshot: every station + module the account can see, with the
most recent dashboard_data values."""
return await self._get(STATIONS_URL, {"get_favorites": "false"})
async def get_measure(
self,
device_id: str,
module_id: str | None,
scale: str,
types: list[str],
date_begin: int | None = None,
date_end: int | None = None,
limit: int | None = None,
optimize: bool = False,
real_time: bool = True,
) -> dict[str, Any]:
"""Historical time-series.
scale: '30min','1hour','3hours','1day','1week','1month' (or 'max' for raw 5-min)
types: e.g. ['Temperature','Humidity','CO2','Noise','Pressure',
'min_temp','max_temp','date_min_temp','date_max_temp']
"""
params: dict[str, Any] = {
"device_id": device_id,
"scale": scale,
"type": ",".join(types),
"optimize": "true" if optimize else "false",
"real_time": "true" if real_time else "false",
}
if module_id:
params["module_id"] = module_id
if date_begin is not None:
params["date_begin"] = date_begin
if date_end is not None:
params["date_end"] = date_end
if limit is not None:
params["limit"] = limit
return await self._get(MEASURE_URL, params)
def flatten_stations(raw: dict[str, Any]) -> list[dict[str, Any]]:
"""Reshape getstationsdata into a flat list of {station, module} sensor records.
Netatmo's response nests modules under devices and puts the indoor module's
readings on the device itself, which is awkward. We normalise it.
"""
out: list[dict[str, Any]] = []
for dev in raw.get("body", {}).get("devices", []):
station_name = dev.get("station_name") or dev.get("home_name") or dev["_id"]
# The base station is itself a module ("NAMain") with dashboard_data on the device.
out.append({
"station_id": dev["_id"],
"station_name": station_name,
"module_id": dev["_id"],
"module_name": dev.get("module_name", "Indoor"),
"module_type": dev.get("type", "NAMain"),
"data_types": dev.get("data_type", []),
"last_seen": dev.get("last_status_store"),
"reachable": dev.get("reachable", True),
"battery_percent": None, # base station is mains powered
"rf_status": None,
"wifi_status": dev.get("wifi_status"),
"place": dev.get("place", {}),
"dashboard": dev.get("dashboard_data", {}),
})
for mod in dev.get("modules", []):
out.append({
"station_id": dev["_id"],
"station_name": station_name,
"module_id": mod["_id"],
"module_name": mod.get("module_name", mod["_id"]),
"module_type": mod.get("type"),
"data_types": mod.get("data_type", []),
"last_seen": mod.get("last_seen") or mod.get("last_message"),
"reachable": mod.get("reachable", True),
"battery_percent": mod.get("battery_percent"),
"rf_status": mod.get("rf_status"),
"wifi_status": None,
"place": dev.get("place", {}),
"dashboard": mod.get("dashboard_data", {}),
})
return out

99
app/poller.py Normal file
View File

@ -0,0 +1,99 @@
"""
Background poller. Pulls live snapshot every N seconds and persists any
unseen readings. Runs in the same asyncio loop as the FastAPI app.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from .netatmo import NetatmoClient, flatten_stations
from .store import Store
log = logging.getLogger(__name__)
# Metrics we care about — anything else in dashboard_data is metadata.
NUMERIC_METRICS = {
"Temperature", "Humidity", "CO2", "Noise", "Pressure", "AbsolutePressure",
"Rain", "sum_rain_1", "sum_rain_24", "WindStrength", "WindAngle",
"GustStrength", "GustAngle", "min_temp", "max_temp", "health_idx",
}
class Poller:
def __init__(self, client: NetatmoClient, store: Store, interval_sec: int = 300):
self.client = client
self.store = store
self.interval = interval_sec
self._task: asyncio.Task | None = None
self._stop = asyncio.Event()
self.last_run_ts: int | None = None
self.last_error: str | None = None
self.last_inserted: int = 0
async def _tick(self) -> None:
raw = await self.client.get_stations_data()
modules = flatten_stations(raw)
self.store.upsert_modules(modules)
rows: list[tuple[str, str, str, int, float]] = []
latest: dict[tuple[str, str], float] = {}
for m in modules:
d = m.get("dashboard") or {}
ts = d.get("time_utc")
if not ts:
continue
for k, v in d.items():
if k not in NUMERIC_METRICS:
continue
if not isinstance(v, (int, float)):
continue
rows.append((m["station_id"], m["module_id"], k, int(ts), float(v)))
latest[(m["module_id"], k)] = float(v)
inserted = self.store.insert_readings(rows) if rows else 0
self.last_inserted = inserted
# Evaluate alerts against the freshest snapshot.
import time as _t
fired = self.store.evaluate_alerts(int(_t.time()), latest)
for f in fired:
log.warning("ALERT %s: %s.%s = %s %s %s",
f["name"], f["module_id"], f["metric"],
f["value"], f["op"], f["threshold"])
async def _run(self) -> None:
log.info("Poller starting (interval=%ss)", self.interval)
while not self._stop.is_set():
import time as _t
try:
await self._tick()
self.last_run_ts = int(_t.time())
self.last_error = None
except Exception as e:
self.last_error = str(e)
log.exception("Poll failed")
try:
await asyncio.wait_for(self._stop.wait(), timeout=self.interval)
except asyncio.TimeoutError:
pass
def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(self._run())
async def stop(self) -> None:
self._stop.set()
if self._task:
await self._task
def status(self) -> dict[str, Any]:
return {
"running": self._task is not None and not self._task.done(),
"interval_sec": self.interval,
"last_run_ts": self.last_run_ts,
"last_inserted": self.last_inserted,
"last_error": self.last_error,
}

276
app/server.py Normal file
View File

@ -0,0 +1,276 @@
"""
Combined service: FastAPI REST API + MCP server (Streamable HTTP transport),
sharing one process and one Netatmo client.
REST is at /api/...
MCP is at /mcp/ (Streamable HTTP)
Health at /health
Auth: a single bearer token (env NETATMO_MCP_API_KEY) protects both surfaces.
Set it to a long random string and pass it as `Authorization: Bearer <token>`.
"""
from __future__ import annotations
import contextlib
import logging
import os
import time
from pathlib import Path
from typing import Any, Optional
from fastapi import Depends, FastAPI, Header, HTTPException, Query
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.server import TransportSecuritySettings
from .netatmo import NetatmoClient
from .poller import Poller
from .store import Store
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO"))
log = logging.getLogger(__name__)
# ---- config -----------------------------------------------------------------
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
DATA_DIR.mkdir(parents=True, exist_ok=True)
API_KEY = os.getenv("NETATMO_MCP_API_KEY")
if not API_KEY:
raise RuntimeError("NETATMO_MCP_API_KEY env var is required")
CLIENT_ID = os.environ["NETATMO_CLIENT_ID"]
CLIENT_SECRET = os.environ["NETATMO_CLIENT_SECRET"]
INITIAL_REFRESH = os.environ.get("NETATMO_REFRESH_TOKEN")
POLL_INTERVAL = int(os.getenv("POLL_INTERVAL_SEC", "300"))
store = Store(DATA_DIR / "netatmo.db")
client = NetatmoClient(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
token_path=DATA_DIR / "tokens.json",
initial_refresh_token=INITIAL_REFRESH,
)
poller = Poller(client, store, interval_sec=POLL_INTERVAL)
# ---- MCP server -------------------------------------------------------------
# stateless_http=True so the same instance can serve many clients; json_response=True
# returns a single JSON response per call rather than an SSE stream — simpler for
# tools that just return data.
mcp = FastMCP(
name="netatmo-datacentre",
stateless_http=True,
json_response=True,
transport_security=TransportSecuritySettings(
enable_dns_rebinding_protection=True,
allowed_hosts=["environment.mineracks.com", "100.90.183.109:8088", "localhost:8088"],
),
)
@mcp.tool()
def list_modules() -> list[dict[str, Any]]:
"""List every Netatmo station and module known to this server, including
module_id (use this for history queries), module_type, and what data_types
each module reports."""
return store.list_modules()
@mcp.tool()
def current_readings(module_id: Optional[str] = None) -> list[dict[str, Any]]:
"""Get the most recent reading for every metric. If module_id is given,
restrict to that one module. Each row has: station_name, module_name,
module_id, metric, value, ts (unix seconds)."""
return store.latest(module_id=module_id)
@mcp.tool()
def history(
module_id: str,
metric: str,
hours: int = 24,
limit: int = 5000,
) -> dict[str, Any]:
"""Return locally-stored historical readings for one metric on one module.
metric: e.g. 'Temperature', 'Humidity', 'CO2', 'Noise', 'Pressure'.
hours: look back this many hours from now (default 24).
limit: max points returned (default 5000).
"""
now = int(time.time())
rows = store.history(module_id, metric, since=now - hours * 3600,
until=now, limit=limit)
agg = store.aggregate(module_id, metric, since=now - hours * 3600, until=now)
return {"module_id": module_id, "metric": metric, "hours": hours,
"points": rows, "summary": agg}
@mcp.tool()
async def history_from_netatmo(
device_id: str,
module_id: Optional[str],
scale: str = "1hour",
metrics: Optional[list[str]] = None,
hours: int = 168,
) -> dict[str, Any]:
"""Fetch history directly from Netatmo's getmeasure endpoint — useful when
you want a longer window than the local store has, or pre-aggregated values.
scale: '30min','1hour','3hours','1day','1week','1month'
metrics: defaults to ['Temperature','Humidity','CO2'] for indoor modules.
"""
metrics = metrics or ["Temperature", "Humidity", "CO2"]
now = int(time.time())
raw = await client.get_measure(
device_id=device_id,
module_id=module_id if module_id and module_id != device_id else None,
scale=scale,
types=metrics,
date_begin=now - hours * 3600,
date_end=now,
)
# Reshape Netatmo's awkward {beg_time, step_time, value: [[v1,v2,...]]} format.
body = raw.get("body", {})
series: list[dict[str, Any]] = []
for ts_str, values in body.items():
try:
ts = int(ts_str)
except ValueError:
continue
point = {"ts": ts}
for i, m in enumerate(metrics):
if i < len(values):
point[m] = values[i]
series.append(point)
series.sort(key=lambda p: p["ts"])
return {"device_id": device_id, "module_id": module_id, "scale": scale,
"metrics": metrics, "points": series}
@mcp.tool()
def list_alerts() -> list[dict[str, Any]]:
"""List all configured threshold alerts and their current state."""
return store.list_alerts()
@mcp.tool()
def create_alert(
name: str, module_id: str, metric: str, op: str, threshold: float,
notes: Optional[str] = None,
) -> dict[str, Any]:
"""Create a threshold alert. op must be one of '>', '<', '>=', '<='.
Example: create_alert('dc_co2_high', 'aa:bb:..', 'CO2', '>', 1000)."""
aid = store.create_alert(name, module_id, metric, op, threshold, notes)
return {"id": aid, "name": name}
@mcp.tool()
def delete_alert(alert_id: int) -> dict[str, Any]:
"""Delete an alert by id."""
return {"deleted": store.delete_alert(alert_id)}
@mcp.tool()
def recent_alert_events(limit: int = 50) -> list[dict[str, Any]]:
"""Recent alert state-change events (triggered / cleared)."""
return store.recent_alert_events(limit=limit)
@mcp.tool()
def poller_status() -> dict[str, Any]:
"""How is the background poller doing? Useful for diagnosing stale data."""
return poller.status()
# ---- FastAPI REST -----------------------------------------------------------
def require_key(authorization: str = Header(default="")) -> None:
if not authorization.startswith("Bearer ") or authorization[7:] != API_KEY:
raise HTTPException(401, "Invalid or missing bearer token")
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
poller.start()
# Critical: MCP session manager must be running for the mounted app to work.
async with mcp.session_manager.run():
yield
await poller.stop()
app = FastAPI(title="Netatmo MCP/API", lifespan=lifespan)
@app.get("/health")
def health() -> dict[str, Any]:
return {"ok": True, "poller": poller.status()}
@app.get("/api/modules", dependencies=[Depends(require_key)])
def api_modules():
return store.list_modules()
@app.get("/api/current", dependencies=[Depends(require_key)])
def api_current(module_id: Optional[str] = None):
return store.latest(module_id=module_id)
@app.get("/api/history", dependencies=[Depends(require_key)])
def api_history(
module_id: str = Query(...),
metric: str = Query(...),
hours: int = Query(24, ge=1, le=24 * 365),
limit: int = Query(5000, ge=1, le=50000),
):
now = int(time.time())
return {
"module_id": module_id, "metric": metric, "hours": hours,
"points": store.history(module_id, metric, since=now - hours * 3600,
until=now, limit=limit),
"summary": store.aggregate(module_id, metric, since=now - hours * 3600,
until=now),
}
@app.get("/api/alerts", dependencies=[Depends(require_key)])
def api_alerts():
return store.list_alerts()
@app.post("/api/alerts", dependencies=[Depends(require_key)])
def api_create_alert(payload: dict[str, Any]):
aid = store.create_alert(
name=payload["name"], module_id=payload["module_id"],
metric=payload["metric"], op=payload["op"],
threshold=float(payload["threshold"]), notes=payload.get("notes"),
)
return {"id": aid}
@app.delete("/api/alerts/{alert_id}", dependencies=[Depends(require_key)])
def api_delete_alert(alert_id: int):
return {"deleted": store.delete_alert(alert_id)}
@app.get("/api/alert-events", dependencies=[Depends(require_key)])
def api_alert_events(limit: int = 50):
return store.recent_alert_events(limit=limit)
# Mount the MCP Streamable HTTP app at /mcp.
# Note: MCP clients should use bearer auth too — we wrap with a tiny middleware.
mcp_app = mcp.streamable_http_app()
@app.middleware("http")
async def mcp_auth(request, call_next):
if request.url.path.startswith("/mcp"):
auth = request.headers.get("authorization", "")
if not auth.startswith("Bearer ") or auth[7:] != API_KEY:
from starlette.responses import JSONResponse
return JSONResponse({"error": "unauthorized"}, status_code=401)
return await call_next(request)
app.mount("/mcp", mcp_app)

235
app/store.py Normal file
View File

@ -0,0 +1,235 @@
"""
SQLite-backed store for historical sensor readings and alert state.
We poll the live snapshot every N seconds (default 5min Netatmo only updates
that often anyway) and write one row per (station, module, metric, ts).
This gives us our own historical series independent of Netatmo's retention,
plus fast local queries for Claude.
"""
from __future__ import annotations
import sqlite3
import threading
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterable
SCHEMA = """
CREATE TABLE IF NOT EXISTS readings (
station_id TEXT NOT NULL,
module_id TEXT NOT NULL,
metric TEXT NOT NULL,
ts INTEGER NOT NULL, -- unix seconds
value REAL NOT NULL,
PRIMARY KEY (station_id, module_id, metric, ts)
);
CREATE INDEX IF NOT EXISTS idx_readings_ts ON readings(ts);
CREATE INDEX IF NOT EXISTS idx_readings_lookup ON readings(module_id, metric, ts);
CREATE TABLE IF NOT EXISTS modules (
module_id TEXT PRIMARY KEY,
station_id TEXT NOT NULL,
station_name TEXT,
module_name TEXT,
module_type TEXT,
data_types TEXT, -- json array
last_seen INTEGER
);
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
module_id TEXT NOT NULL,
metric TEXT NOT NULL,
op TEXT NOT NULL, -- '>', '<', '>=', '<='
threshold REAL NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
last_triggered INTEGER,
last_value REAL,
notes TEXT
);
CREATE TABLE IF NOT EXISTS alert_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
alert_id INTEGER NOT NULL,
ts INTEGER NOT NULL,
value REAL NOT NULL,
state TEXT NOT NULL, -- 'triggered' | 'cleared'
FOREIGN KEY(alert_id) REFERENCES alerts(id)
);
"""
class Store:
def __init__(self, path: Path) -> None:
self.path = path
self._lock = threading.Lock()
with self._conn() as c:
c.executescript(SCHEMA)
@contextmanager
def _conn(self):
conn = sqlite3.connect(self.path, timeout=10)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
try:
yield conn
conn.commit()
finally:
conn.close()
# ---- writes -----------------------------------------------------------
def upsert_modules(self, modules: Iterable[dict[str, Any]]) -> None:
import json
with self._lock, self._conn() as c:
for m in modules:
c.execute(
"""INSERT INTO modules(module_id, station_id, station_name,
module_name, module_type, data_types, last_seen)
VALUES(?,?,?,?,?,?,?)
ON CONFLICT(module_id) DO UPDATE SET
station_id=excluded.station_id,
station_name=excluded.station_name,
module_name=excluded.module_name,
module_type=excluded.module_type,
data_types=excluded.data_types,
last_seen=excluded.last_seen""",
(
m["module_id"], m["station_id"], m.get("station_name"),
m.get("module_name"), m.get("module_type"),
json.dumps(m.get("data_types", [])), m.get("last_seen"),
),
)
def insert_readings(self, rows: Iterable[tuple[str, str, str, int, float]]) -> int:
"""rows: (station_id, module_id, metric, ts, value)"""
with self._lock, self._conn() as c:
cur = c.executemany(
"INSERT OR IGNORE INTO readings VALUES (?,?,?,?,?)",
list(rows),
)
return cur.rowcount
# ---- reads ------------------------------------------------------------
def list_modules(self) -> list[dict[str, Any]]:
with self._conn() as c:
return [dict(r) for r in c.execute("SELECT * FROM modules ORDER BY station_name, module_name")]
def latest(self, module_id: str | None = None) -> list[dict[str, Any]]:
sql = """
SELECT r.station_id, r.module_id, r.metric, r.ts, r.value,
m.station_name, m.module_name
FROM readings r
JOIN modules m ON m.module_id = r.module_id
JOIN (SELECT module_id, metric, MAX(ts) AS mts
FROM readings GROUP BY module_id, metric) lx
ON lx.module_id = r.module_id AND lx.metric = r.metric AND lx.mts = r.ts
"""
params: list[Any] = []
if module_id:
sql += " WHERE r.module_id = ?"
params.append(module_id)
sql += " ORDER BY m.station_name, m.module_name, r.metric"
with self._conn() as c:
return [dict(r) for r in c.execute(sql, params)]
def history(
self, module_id: str, metric: str, since: int, until: int | None = None,
limit: int = 5000,
) -> list[dict[str, Any]]:
until = until or int(time.time())
with self._conn() as c:
rows = c.execute(
"""SELECT ts, value FROM readings
WHERE module_id=? AND metric=? AND ts BETWEEN ? AND ?
ORDER BY ts ASC LIMIT ?""",
(module_id, metric, since, until, limit),
)
return [dict(r) for r in rows]
def aggregate(
self, module_id: str, metric: str, since: int, until: int | None = None,
) -> dict[str, float | int | None]:
until = until or int(time.time())
with self._conn() as c:
r = c.execute(
"""SELECT COUNT(*) n, MIN(value) lo, MAX(value) hi, AVG(value) avg
FROM readings WHERE module_id=? AND metric=? AND ts BETWEEN ? AND ?""",
(module_id, metric, since, until),
).fetchone()
return {"count": r["n"], "min": r["lo"], "max": r["hi"], "avg": r["avg"]}
# ---- alerts -----------------------------------------------------------
def create_alert(
self, name: str, module_id: str, metric: str, op: str, threshold: float,
notes: str | None = None,
) -> int:
if op not in (">", "<", ">=", "<="):
raise ValueError(f"Invalid operator {op!r}")
with self._lock, self._conn() as c:
cur = c.execute(
"""INSERT INTO alerts(name, module_id, metric, op, threshold, notes)
VALUES(?,?,?,?,?,?)""",
(name, module_id, metric, op, threshold, notes),
)
return cur.lastrowid
def list_alerts(self) -> list[dict[str, Any]]:
with self._conn() as c:
return [dict(r) for r in c.execute("SELECT * FROM alerts ORDER BY name")]
def delete_alert(self, alert_id: int) -> bool:
with self._lock, self._conn() as c:
cur = c.execute("DELETE FROM alerts WHERE id=?", (alert_id,))
return cur.rowcount > 0
def evaluate_alerts(self, now: int, latest: dict[tuple[str, str], float]) -> list[dict[str, Any]]:
"""Check every enabled alert against the latest readings.
latest is keyed by (module_id, metric) -> value. Returns newly-fired events."""
fired: list[dict[str, Any]] = []
with self._lock, self._conn() as c:
alerts = list(c.execute("SELECT * FROM alerts WHERE enabled=1"))
for a in alerts:
v = latest.get((a["module_id"], a["metric"]))
if v is None:
continue
op, t = a["op"], a["threshold"]
triggered = (
(op == ">" and v > t) or
(op == "<" and v < t) or
(op == ">=" and v >= t) or
(op == "<=" and v <= t)
)
prev_state = "triggered" if a["last_triggered"] else "cleared"
new_state = "triggered" if triggered else "cleared"
if new_state != prev_state:
c.execute(
"INSERT INTO alert_events(alert_id, ts, value, state) VALUES (?,?,?,?)",
(a["id"], now, v, new_state),
)
c.execute(
"UPDATE alerts SET last_triggered=?, last_value=? WHERE id=?",
(now if triggered else None, v, a["id"]),
)
if triggered:
fired.append({
"alert_id": a["id"], "name": a["name"],
"module_id": a["module_id"], "metric": a["metric"],
"value": v, "op": op, "threshold": t, "ts": now,
})
return fired
def recent_alert_events(self, limit: int = 50) -> list[dict[str, Any]]:
with self._conn() as c:
rows = c.execute(
"""SELECT e.*, a.name, a.module_id, a.metric, a.op, a.threshold
FROM alert_events e JOIN alerts a ON a.id = e.alert_id
ORDER BY e.ts DESC LIMIT ?""",
(limit,),
)
return [dict(r) for r in rows]

27
docker-compose.yml Normal file
View File

@ -0,0 +1,27 @@
services:
netatmo-mcp:
build: .
image: netatmo-mcp:latest
container_name: netatmo-mcp
restart: unless-stopped
ports:
- "8088:8088"
environment:
# --- required ---
NETATMO_CLIENT_ID: "${NETATMO_CLIENT_ID}"
NETATMO_CLIENT_SECRET: "${NETATMO_CLIENT_SECRET}"
# Only required on first boot — afterwards tokens.json takes over.
NETATMO_REFRESH_TOKEN: "${NETATMO_REFRESH_TOKEN:-}"
# Long random string. Used for both REST and MCP auth.
NETATMO_MCP_API_KEY: "${NETATMO_MCP_API_KEY}"
# --- optional ---
POLL_INTERVAL_SEC: "300" # Netatmo updates roughly every 5 min anyway
LOG_LEVEL: "INFO"
volumes:
- ./data:/data
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8088/health').status==200 else 1)"]
interval: 30s
timeout: 5s
retries: 3

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
fastapi>=0.115
uvicorn[standard]>=0.32
httpx>=0.27
mcp>=1.2
pydantic>=2.7