netatmo-mcp/app/poller.py
mineracks 9efe05eca0 Initial commit: Netatmo MCP + REST bridge for DC environmental monitoring
Deployed on integr8 VM (pve01/700) at environment.mineracks.com.
Polls Netatmo every 5 min, stores in SQLite, exposes via FastAPI REST + MCP.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-20 12:14:18 +00:00

100 lines
3.4 KiB
Python

"""
Background poller. Pulls live snapshot every N seconds and persists any
unseen readings. Runs in the same asyncio loop as the FastAPI app.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from .netatmo import NetatmoClient, flatten_stations
from .store import Store
log = logging.getLogger(__name__)
# Metrics we care about — anything else in dashboard_data is metadata.
NUMERIC_METRICS = {
"Temperature", "Humidity", "CO2", "Noise", "Pressure", "AbsolutePressure",
"Rain", "sum_rain_1", "sum_rain_24", "WindStrength", "WindAngle",
"GustStrength", "GustAngle", "min_temp", "max_temp", "health_idx",
}
class Poller:
def __init__(self, client: NetatmoClient, store: Store, interval_sec: int = 300):
self.client = client
self.store = store
self.interval = interval_sec
self._task: asyncio.Task | None = None
self._stop = asyncio.Event()
self.last_run_ts: int | None = None
self.last_error: str | None = None
self.last_inserted: int = 0
async def _tick(self) -> None:
raw = await self.client.get_stations_data()
modules = flatten_stations(raw)
self.store.upsert_modules(modules)
rows: list[tuple[str, str, str, int, float]] = []
latest: dict[tuple[str, str], float] = {}
for m in modules:
d = m.get("dashboard") or {}
ts = d.get("time_utc")
if not ts:
continue
for k, v in d.items():
if k not in NUMERIC_METRICS:
continue
if not isinstance(v, (int, float)):
continue
rows.append((m["station_id"], m["module_id"], k, int(ts), float(v)))
latest[(m["module_id"], k)] = float(v)
inserted = self.store.insert_readings(rows) if rows else 0
self.last_inserted = inserted
# Evaluate alerts against the freshest snapshot.
import time as _t
fired = self.store.evaluate_alerts(int(_t.time()), latest)
for f in fired:
log.warning("ALERT %s: %s.%s = %s %s %s",
f["name"], f["module_id"], f["metric"],
f["value"], f["op"], f["threshold"])
async def _run(self) -> None:
log.info("Poller starting (interval=%ss)", self.interval)
while not self._stop.is_set():
import time as _t
try:
await self._tick()
self.last_run_ts = int(_t.time())
self.last_error = None
except Exception as e:
self.last_error = str(e)
log.exception("Poll failed")
try:
await asyncio.wait_for(self._stop.wait(), timeout=self.interval)
except asyncio.TimeoutError:
pass
def start(self) -> None:
if self._task is None or self._task.done():
self._stop.clear()
self._task = asyncio.create_task(self._run())
async def stop(self) -> None:
self._stop.set()
if self._task:
await self._task
def status(self) -> dict[str, Any]:
return {
"running": self._task is not None and not self._task.done(),
"interval_sec": self.interval,
"last_run_ts": self.last_run_ts,
"last_inserted": self.last_inserted,
"last_error": self.last_error,
}