Files
logwatch/app.py
LogWatch 02968c6288
Some checks failed
Build & Push Docker Image / build (push) Failing after 6s
feat: web UI + SQLite storage + FastAPI backend
- app.py: unified app – ntfy subscriber runs as background thread,
  FastAPI serves REST API (/api/messages) and static frontend
- static/index.html: terminal-style notification table with
  sortable columns, drag & drop column reorder, column visibility
  toggle (localStorage), text search, forwarded-only filter,
  auto-refresh every 30s
- All ntfy fields stored: id, received_at, ntfy_time, topic,
  title, message, priority, tags, forwarded
- /data volume for persistent SQLite DB
- Port 8000 exposed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 17:02:11 +01:00

230 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
logwatch unified app
- background thread: subscribes to ntfy, filters, stores, forwards
- FastAPI: REST API + static file serving
"""
import json
import logging
import os
import re
import sqlite3
import threading
import time
from contextlib import asynccontextmanager
from pathlib import Path
import requests
from fastapi import FastAPI
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
# ── Config ────────────────────────────────────────────────
# Upstream ntfy topic URL to subscribe to ("/json" is appended by the listener).
SOURCE_URL = os.getenv("SOURCE_URL", "http://192.168.123.77/albert")
# Destination URL for forwarded notifications; empty disables forwarding.
TARGET_URL = os.getenv("TARGET_URL", "")
# Optional bearer token sent along with forwarded notifications.
TARGET_TOKEN = os.getenv("TARGET_TOKEN", "")
# Name of a ``logging`` level; unknown names fall back to INFO below.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# Seconds to sleep between (re)connection attempts of the subscriber.
RECONNECT_DELAY = int(os.getenv("RECONNECT_DELAY", "10"))
# Location of the SQLite database file.
DB_PATH = os.getenv("DB_PATH", "/data/logwatch.db")

logging.basicConfig(
    format="%(asctime)s %(levelname)-7s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)
log = logging.getLogger("logwatch")
# ── Database ──────────────────────────────────────────────
def get_db():
    """Open and return a new SQLite connection to ``DB_PATH``.

    The parent directory of the database file is created on demand.  Rows
    are returned as ``sqlite3.Row`` so columns can be accessed by name, and
    ``check_same_thread`` is disabled so the connection may be used from a
    thread other than the one that created it.  The caller is responsible
    for closing the connection.
    """
    db_file = Path(DB_PATH)
    db_file.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(DB_PATH, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the ``messages`` table if it does not exist yet.

    Safe to call on every start-up: the statement is ``CREATE TABLE IF NOT
    EXISTS``.  The connection is always closed, even when the DDL fails.
    """
    db = get_db()
    try:
        db.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ntfy_id TEXT,
                received_at TEXT NOT NULL,
                ntfy_time INTEGER,
                topic TEXT,
                title TEXT,
                message TEXT,
                priority INTEGER,
                tags TEXT,
                forwarded INTEGER DEFAULT 0
            )
        """)
        db.commit()
    finally:
        # Release the handle even if execute()/commit() raised.
        db.close()
# Serialises database access; held by store_message() and the
# /api/messages handler while they use their connection.
_db_lock = threading.Lock()


def store_message(event: dict, forwarded: bool) -> None:
    """Persist one ntfy event in the ``messages`` table.

    Args:
        event: Decoded ntfy JSON event.  The keys ``id``, ``time``,
            ``topic``, ``title``, ``message``, ``priority`` and ``tags``
            are read; missing keys fall back to the defaults below.
        forwarded: Stored as 1/0 in the ``forwarded`` column.

    Raises:
        sqlite3.Error: If the insert or commit fails.  The connection is
            closed and the lock released in that case as well.
    """
    received_at = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime())
    # ``or []`` also covers an explicit JSON null, which would otherwise be
    # stored as "null" and read back as None instead of a list.
    tags = json.dumps(event.get("tags") or [])
    row = (
        event.get("id"),
        received_at,
        event.get("time"),
        event.get("topic"),
        event.get("title", ""),
        event.get("message", ""),
        event.get("priority", 3),
        tags,
        1 if forwarded else 0,
    )
    with _db_lock:
        db = get_db()
        try:
            db.execute("""
                INSERT INTO messages
                (ntfy_id, received_at, ntfy_time, topic, title, message, priority, tags, forwarded)
                VALUES (?,?,?,?,?,?,?,?,?)
            """, row)
            db.commit()
        finally:
            # Do not leak the connection when the insert fails.
            db.close()
# ── Filter logic ──────────────────────────────────────────
# Case-insensitive patterns that mark a notification as worth forwarding.
IMPORTANT_PATTERNS = [re.compile(r, re.IGNORECASE) for r in [
    r"updated?\s+\d+",
    r"\d+\s+updated",
    r"fail(ed|ure)?",
    r"error",
    r"could not",
    r"unable to",
    r"pull(ed)?\s+\w+",
    r"restart(ed|ing)?",
    r"stopped",
    r"warn(ing)?",
]]
# Case-insensitive patterns for routine chatter.  is_important() checks
# these first, so a match here suppresses the notification outright.
NOISE_PATTERNS = [re.compile(r, re.IGNORECASE) for r in [
    r"no\s+(new\s+)?updates?\s+(found|available)",
    r"checked\s+\d+\s+container",
    r"session\s+done",
    r"^watchtower\s+started",
    r"scanning\s+for\s+updates",
    # \b anchors both zeros: without it "10 updated, 0 failed" and
    # "0 updated, 10 failed" matched as an all-zero summary and were dropped.
    r"\b0\s+updated.*\b0\s+failed",
]]
def is_important(title: str, message: str) -> bool:
    """Decide whether a notification should be forwarded.

    Title and message are joined with a single space and searched as one
    string.  Any NOISE_PATTERNS hit wins and yields ``False``; otherwise the
    result is ``True`` exactly when some IMPORTANT_PATTERNS entry matches.
    """
    combined = "{} {}".format(title, message)
    if any(rx.search(combined) for rx in NOISE_PATTERNS):
        return False
    return any(rx.search(combined) for rx in IMPORTANT_PATTERNS)
def classify(title: str, message: str) -> tuple[str, list[str]]:
    """Pick a priority name and tag list for a notification.

    The lower-cased "title message" text is tested against the rules in
    order; the first rule that matches decides the result.

    Returns:
        ``(priority, tags)`` - one of ``"high"``, ``"default"`` or ``"low"``
        together with the tags to attach; ``"watchtower"`` is always the
        last tag.
    """
    haystack = "{} {}".format(title, message).lower()
    # (regex, priority, leading tag) - failures outrank update notices.
    rules = (
        (r"fail|error|could not|unable", "high", "rotating_light"),
        (r"updated|pulled|restart", "default", "arrow_up"),
    )
    for pattern, priority, lead_tag in rules:
        if re.search(pattern, haystack) is not None:
            return priority, [lead_tag, "watchtower"]
    return "low", ["watchtower"]
def forward(title: str, message: str, priority: str, tags: list[str]) -> None:
    """POST a notification to ``TARGET_URL``; no-op when it is unset.

    Title, priority and comma-joined tags travel as the ``Title``,
    ``Priority`` and ``Tags`` request headers, the message as the UTF-8
    request body.  A bearer token is added when ``TARGET_TOKEN`` is set.
    Request failures are logged and swallowed (best effort).

    Args:
        title: Notification title.
        message: Notification body text.
        priority: Priority name sent in the ``Priority`` header.
        tags: Tag names sent comma-separated in the ``Tags`` header.
    """
    if not TARGET_URL:
        return
    # Header values are sent as UTF-8 bytes.  As ``str`` they would be
    # Latin-1 encoded by http.client, which raises UnicodeEncodeError (not a
    # RequestException, so it escaped the handler below) for characters
    # outside Latin-1 and mis-encodes the rest for a UTF-8 reader.
    headers = {
        "Title": title.encode("utf-8"),
        "Priority": priority.encode("utf-8"),
        "Tags": ",".join(tags).encode("utf-8"),
    }
    if TARGET_TOKEN:
        headers["Authorization"] = f"Bearer {TARGET_TOKEN}"
    try:
        resp = requests.post(TARGET_URL, data=message.encode(), headers=headers, timeout=10)
        resp.raise_for_status()
        log.info("FORWARDED | [%s] %s", title, message[:100])
    except requests.RequestException as e:
        log.error("Forward failed: %s", e)
# ── Subscriber thread ─────────────────────────────────────
def listen_once():
    """Subscribe to the source topic and process events until the stream ends.

    Opens ``SOURCE_URL`` + ``/json`` as a streaming request and handles one
    JSON document per line: events other than ``message`` are ignored,
    titled messages whose title does not mention "watchtower" are skipped,
    and everything else is stored and - if is_important() says so -
    forwarded.

    Raises:
        requests.RequestException: On connection or HTTP status errors.
    """
    stream_url = SOURCE_URL.rstrip("/") + "/json"
    log.info("Connecting to %s", stream_url)
    # 10 s connect timeout, no read timeout: the stream is long-lived.
    with requests.get(stream_url, stream=True, timeout=(10, None)) as resp:
        resp.raise_for_status()
        for raw_line in resp.iter_lines():
            if not raw_line:
                continue
            try:
                event = json.loads(raw_line)
            except ValueError:
                # Covers JSONDecodeError and the UnicodeDecodeError raised
                # for lines that are not valid UTF-8.
                continue
            if not isinstance(event, dict):
                # Valid JSON but not an object (e.g. a bare list or number).
                continue
            if event.get("event", "message") != "message":
                continue
            # ``or ""`` also maps an explicit JSON null to an empty string.
            title = event.get("title") or ""
            message = event.get("message") or ""
            if title and "watchtower" not in title.lower():
                log.debug("SKIP (not watchtower) | [%s]", title)
                continue
            log.debug("RECEIVED | [%s] %s", title, message[:120])
            important = is_important(title, message)
            # NOTE(review): ``forwarded`` records the filter decision, not
            # the delivery result - it is set before forward() runs.
            store_message(event, forwarded=important)
            if important:
                priority, tags = classify(title, message)
                forward(title, message, priority, tags)
            else:
                log.debug("FILTERED | [%s] %s", title, message[:80])
def subscriber_loop():
    """Run listen_once() forever, reconnecting after every exit.

    Intended as the target of the background subscriber thread; it never
    returns.  After the stream ends or any error occurs, the loop sleeps
    ``RECONNECT_DELAY`` seconds and connects again.
    """
    log.info("Subscriber started source: %s target: %s",
             SOURCE_URL, TARGET_URL or "(dry-run)")
    while True:
        try:
            listen_once()
            log.warning("Stream ended, reconnecting in %ds…", RECONNECT_DELAY)
        except requests.RequestException as e:
            log.error("Connection error: %s retry in %ds", e, RECONNECT_DELAY)
        except Exception:
            # Thread boundary: anything else (database errors, malformed
            # events, ...) would otherwise end this thread silently while
            # the web app keeps running without a subscriber.
            log.exception("Unexpected subscriber error, retry in %ds", RECONNECT_DELAY)
        time.sleep(RECONNECT_DELAY)
# ── FastAPI ───────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: prepare the DB and start the subscriber.

    Runs init_db() and launches subscriber_loop() in a daemon thread named
    "subscriber" before yielding control.  There is no teardown step after
    the yield.
    """
    init_db()
    threading.Thread(
        name="subscriber",
        target=subscriber_loop,
        daemon=True,
    ).start()
    yield


app = FastAPI(title="logwatch", lifespan=lifespan)
@app.get("/api/messages")
def api_messages():
    """Return the 1000 most recent stored messages, newest first.

    Each item carries the table columns; ``tags`` is decoded from its JSON
    text (NULL or empty becomes ``[]``) and ``forwarded`` is converted to a
    bool.
    """
    with _db_lock:
        db = get_db()
        try:
            rows = db.execute(
                "SELECT * FROM messages ORDER BY id DESC LIMIT 1000"
            ).fetchall()
        finally:
            # Close the connection even when the query fails.
            db.close()
    result = [
        {
            "id": r["id"],
            "ntfy_id": r["ntfy_id"],
            "received_at": r["received_at"],
            "ntfy_time": r["ntfy_time"],
            "topic": r["topic"],
            "title": r["title"],
            "message": r["message"],
            "priority": r["priority"],
            "tags": json.loads(r["tags"] or "[]"),
            "forwarded": bool(r["forwarded"]),
        }
        for r in rows
    ]
    return JSONResponse(result)
@app.get("/api/health")
def health():
    """Health endpoint; always reports ``{"status": "ok"}``."""
    payload = {"status": "ok"}
    return payload
# static files
# Frontend assets live in the "static" directory next to this module.
STATIC_DIR = Path(__file__).parent.joinpath("static")
# Mounted at "/" with html=True.
# NOTE(review): presumably registered last so the /api routes above are
# matched first - confirm against the framework's routing order.
app.mount(
    "/",
    StaticFiles(directory=str(STATIC_DIR), html=True),
    name="static",
)