#!/usr/bin/env python3 """ logwatch – unified app - background thread: subscribes to ntfy, filters, stores, forwards - FastAPI: REST API + static file serving """ import json import logging import os import re import sqlite3 import threading import time from contextlib import asynccontextmanager from pathlib import Path import requests from fastapi import FastAPI from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles # ── Config ──────────────────────────────────────────────── SOURCE_URL = os.environ.get("SOURCE_URL", "http://192.168.123.77/albert") TARGET_URL = os.environ.get("TARGET_URL", "") TARGET_TOKEN = os.environ.get("TARGET_TOKEN", "") LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() RECONNECT_DELAY = int(os.environ.get("RECONNECT_DELAY", "10")) DB_PATH = os.environ.get("DB_PATH", "/data/logwatch.db") logging.basicConfig( level=getattr(logging, LOG_LEVEL, logging.INFO), format="%(asctime)s %(levelname)-7s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) log = logging.getLogger("logwatch") # ── Database ────────────────────────────────────────────── def get_db(): Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(DB_PATH, check_same_thread=False) conn.row_factory = sqlite3.Row return conn def init_db(): db = get_db() db.execute(""" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, ntfy_id TEXT, received_at TEXT NOT NULL, ntfy_time INTEGER, topic TEXT, title TEXT, message TEXT, priority INTEGER, tags TEXT, forwarded INTEGER DEFAULT 0 ) """) db.commit() db.close() _db_lock = threading.Lock() def store_message(event: dict, forwarded: bool) -> None: received_at = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) tags = json.dumps(event.get("tags", [])) with _db_lock: db = get_db() db.execute(""" INSERT INTO messages (ntfy_id, received_at, ntfy_time, topic, title, message, priority, tags, forwarded) VALUES (?,?,?,?,?,?,?,?,?) """, ( event.get("id"), received_at, event.get("time"), event.get("topic"), event.get("title", ""), event.get("message", ""), event.get("priority", 3), tags, 1 if forwarded else 0, )) db.commit() db.close() # ── Filter logic ────────────────────────────────────────── IMPORTANT_PATTERNS = [re.compile(r, re.IGNORECASE) for r in [ r"updated?\s+\d+", r"\d+\s+updated", r"fail(ed|ure)?", r"error", r"could not", r"unable to", r"pull(ed)?\s+\w+", r"restart(ed|ing)?", r"stopped", r"warn(ing)?", ]] NOISE_PATTERNS = [re.compile(r, re.IGNORECASE) for r in [ r"no\s+(new\s+)?updates?\s+(found|available)", r"checked\s+\d+\s+container", r"session\s+done", r"^watchtower\s+started", r"scanning\s+for\s+updates", r"0\s+updated.*0\s+failed", ]] def is_important(title: str, message: str) -> bool: text = f"{title} {message}" for p in NOISE_PATTERNS: if p.search(text): return False for p in IMPORTANT_PATTERNS: if p.search(text): return True return False def classify(title: str, message: str) -> tuple[str, list[str]]: text = f"{title} {message}".lower() if re.search(r"fail|error|could not|unable", text): return "high", ["rotating_light", "watchtower"] if re.search(r"updated|pulled|restart", text): return "default", ["arrow_up", "watchtower"] return "low", ["watchtower"] def forward(title: str, message: str, priority: str, tags: list[str]) -> None: if not TARGET_URL: return headers = {"Title": title, "Priority": priority, "Tags": ",".join(tags)} if TARGET_TOKEN: headers["Authorization"] = f"Bearer {TARGET_TOKEN}" try: resp = requests.post(TARGET_URL, data=message.encode(), headers=headers, timeout=10) resp.raise_for_status() log.info("FORWARDED | [%s] %s", title, message[:100]) except requests.RequestException as e: log.error("Forward failed: %s", e) # ── Subscriber thread ───────────────────────────────────── def listen_once(): sse_url = SOURCE_URL.rstrip("/") + "/json" log.info("Connecting to %s", sse_url) with requests.get(sse_url, stream=True, timeout=(10, None)) as resp: resp.raise_for_status() for raw_line in resp.iter_lines(): if not raw_line: continue try: event = json.loads(raw_line) except json.JSONDecodeError: continue if event.get("event", "message") != "message": continue title = event.get("title", "") message = event.get("message", "") if title and "watchtower" not in title.lower(): log.debug("SKIP (not watchtower) | [%s]", title) continue log.debug("RECEIVED | [%s] %s", title, message[:120]) important = is_important(title, message) store_message(event, forwarded=important) if important: priority, tags = classify(title, message) forward(title, message, priority, tags) else: log.debug("FILTERED | [%s] %s", title, message[:80]) def subscriber_loop(): log.info("Subscriber started – source: %s target: %s", SOURCE_URL, TARGET_URL or "(dry-run)") while True: try: listen_once() log.warning("Stream ended, reconnecting in %ds…", RECONNECT_DELAY) except requests.RequestException as e: log.error("Connection error: %s – retry in %ds", e, RECONNECT_DELAY) time.sleep(RECONNECT_DELAY) # ── FastAPI ─────────────────────────────────────────────── @asynccontextmanager async def lifespan(app: FastAPI): init_db() t = threading.Thread(target=subscriber_loop, daemon=True, name="subscriber") t.start() yield app = FastAPI(title="logwatch", lifespan=lifespan) @app.get("/api/messages") def api_messages(): with _db_lock: db = get_db() rows = db.execute( "SELECT * FROM messages ORDER BY id DESC LIMIT 1000" ).fetchall() db.close() result = [] for r in rows: result.append({ "id": r["id"], "ntfy_id": r["ntfy_id"], "received_at": r["received_at"], "ntfy_time": r["ntfy_time"], "topic": r["topic"], "title": r["title"], "message": r["message"], "priority": r["priority"], "tags": json.loads(r["tags"] or "[]"), "forwarded": bool(r["forwarded"]), }) return JSONResponse(result) @app.get("/api/health") def health(): return {"status": "ok"} # static files STATIC_DIR = Path(__file__).parent / "static" app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="static")