# Deployed on integr8 VM (pve01/700) at environment.mineracks.com.
# Polls Netatmo every 5 min, stores in SQLite, exposes via FastAPI REST + MCP.
# Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
# File listing metadata: 100 lines, 3.4 KiB, Python.
"""
Background poller. Pulls live snapshot every N seconds and persists any
unseen readings. Runs in the same asyncio loop as the FastAPI app.
"""

from __future__ import annotations

import asyncio
import logging
import time
from typing import Any

from .netatmo import NetatmoClient, flatten_stations
from .store import Store

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Metrics we care about — anything else in dashboard_data is metadata.
# Only dashboard keys listed here are turned into stored readings; the
# membership test is done per key on every poll, hence a set.
NUMERIC_METRICS: set[str] = {
    "Temperature", "Humidity", "CO2", "Noise", "Pressure", "AbsolutePressure",
    "Rain", "sum_rain_1", "sum_rain_24", "WindStrength", "WindAngle",
    "GustStrength", "GustAngle", "min_temp", "max_temp", "health_idx",
}


class Poller:
    """Periodic background task that copies Netatmo readings into the store.

    Each cycle fetches the station snapshot from ``client``, upserts the
    module list, inserts the numeric dashboard readings and asks the store
    to evaluate alerts against the values just seen. The loop runs as an
    asyncio task created by :meth:`start` and ends when :meth:`stop` is
    awaited.
    """

    def __init__(self, client: NetatmoClient, store: Store, interval_sec: int = 300):
        """Set up the poller without starting it.

        Args:
            client: Netatmo client; must provide an awaitable
                ``get_stations_data()``.
            store: Persistence layer; must provide ``upsert_modules``,
                ``insert_readings`` and ``evaluate_alerts``.
            interval_sec: Seconds to wait between two polls.
        """
        self.client = client
        self.store = store
        self.interval = interval_sec
        # Handle of the running loop task; None until start() is called.
        self._task: asyncio.Task | None = None
        # Set by stop() to end the loop and cut the inter-poll wait short.
        self._stop = asyncio.Event()
        # Unix timestamp of the last poll that completed without raising.
        self.last_run_ts: int | None = None
        # str() of the exception from the most recent failed poll, else None.
        self.last_error: str | None = None
        # Value returned by the store for the most recent insert (0 if the
        # poll produced no rows) — presumably the number of new rows.
        self.last_inserted: int = 0

    @staticmethod
    def _collect_readings(
        modules: list[dict[str, Any]],
        metrics: set[str] | None = None,
    ) -> tuple[list[tuple[str, str, str, int, float]], dict[tuple[str, str], float]]:
        """Turn flattened module dicts into reading rows and a latest-value map.

        Args:
            modules: Module dicts carrying ``station_id``, ``module_id`` and
                an optional ``dashboard`` mapping.
            metrics: Dashboard keys to keep; defaults to ``NUMERIC_METRICS``.

        Returns:
            ``(rows, latest)`` where ``rows`` holds
            ``(station_id, module_id, metric, time_utc, value)`` tuples and
            ``latest`` maps ``(module_id, metric)`` to the value seen.
        """
        allowed = NUMERIC_METRICS if metrics is None else metrics
        rows: list[tuple[str, str, str, int, float]] = []
        latest: dict[tuple[str, str], float] = {}
        for module in modules:
            dashboard = module.get("dashboard") or {}
            ts = dashboard.get("time_utc")
            if not ts:
                # No timestamp means there is nothing to key a reading on.
                continue
            for metric, value in dashboard.items():
                if metric not in allowed:
                    continue
                # bool is a subclass of int; a True/False flag must not be
                # stored as the reading 1.0/0.0.
                if isinstance(value, bool) or not isinstance(value, (int, float)):
                    continue
                reading = float(value)
                rows.append(
                    (module["station_id"], module["module_id"], metric, int(ts), reading)
                )
                latest[(module["module_id"], metric)] = reading
        return rows, latest

    async def _tick(self) -> None:
        """Run one poll: fetch, persist modules and readings, evaluate alerts.

        Exceptions from the client or the store propagate to the caller.
        """
        raw = await self.client.get_stations_data()
        modules = flatten_stations(raw)
        self.store.upsert_modules(modules)

        rows, latest = self._collect_readings(modules)
        # Skip the store call entirely when the snapshot held no readings.
        self.last_inserted = self.store.insert_readings(rows) if rows else 0

        # Evaluate alerts against the freshest snapshot.
        fired = self.store.evaluate_alerts(int(time.time()), latest)
        for alert in fired:
            log.warning(
                "ALERT %s: %s.%s = %s %s %s",
                alert["name"], alert["module_id"], alert["metric"],
                alert["value"], alert["op"], alert["threshold"],
            )

    async def _run(self) -> None:
        """Poll until the stop event is set, sleeping ``interval`` between polls.

        A failing poll is logged and recorded in ``last_error``; it does not
        end the loop.
        """
        log.info("Poller starting (interval=%ss)", self.interval)
        while not self._stop.is_set():
            try:
                await self._tick()
                self.last_run_ts = int(time.time())
                self.last_error = None
            except Exception as e:
                # Loop boundary: keep polling, but surface the failure
                # through status() and the log.
                self.last_error = str(e)
                log.exception("Poll failed")
            try:
                # Interruptible sleep: returns as soon as stop() sets the
                # event, otherwise times out after one interval.
                await asyncio.wait_for(self._stop.wait(), timeout=self.interval)
            except asyncio.TimeoutError:
                pass

    def start(self) -> None:
        """Start the polling task unless one is already running.

        Must be called while an event loop is running, because it uses
        ``asyncio.create_task``.
        """
        if self._task is None or self._task.done():
            self._stop.clear()
            self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Signal the loop to stop and wait for the task to finish."""
        self._stop.set()
        if self._task is not None:
            await self._task

    def status(self) -> dict[str, Any]:
        """Return a snapshot of the poller state as a plain dict."""
        return {
            "running": self._task is not None and not self._task.done(),
            "interval_sec": self.interval,
            "last_run_ts": self.last_run_ts,
            "last_inserted": self.last_inserted,
            "last_error": self.last_error,
        }