Some checks failed
Build & Push Docker Image / build (push) Failing after 6s
- app.py: unified app – ntfy subscriber runs as background thread, FastAPI serves REST API (/api/messages) and static frontend - static/index.html: terminal-style notification table with sortable columns, drag & drop column reorder, column visibility toggle (localStorage), text search, forwarded-only filter, auto-refresh every 30s - All ntfy fields stored: id, received_at, ntfy_time, topic, title, message, priority, tags, forwarded - /data volume for persistent SQLite DB - Port 8000 exposed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
230 lines
7.6 KiB
Python
230 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
logwatch – unified app
|
||
- background thread: subscribes to ntfy, filters, stores, forwards
|
||
- FastAPI: REST API + static file serving
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import threading
|
||
import time
|
||
from contextlib import asynccontextmanager
|
||
from pathlib import Path
|
||
|
||
import requests
|
||
from fastapi import FastAPI
|
||
from fastapi.responses import FileResponse, JSONResponse
|
||
from fastapi.staticfiles import StaticFiles
|
||
|
||
# ── Config ────────────────────────────────────────────────
|
||
SOURCE_URL = os.environ.get("SOURCE_URL", "http://192.168.123.77/albert")
|
||
TARGET_URL = os.environ.get("TARGET_URL", "")
|
||
TARGET_TOKEN = os.environ.get("TARGET_TOKEN", "")
|
||
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
|
||
RECONNECT_DELAY = int(os.environ.get("RECONNECT_DELAY", "10"))
|
||
DB_PATH = os.environ.get("DB_PATH", "/data/logwatch.db")
|
||
|
||
logging.basicConfig(
|
||
level=getattr(logging, LOG_LEVEL, logging.INFO),
|
||
format="%(asctime)s %(levelname)-7s %(message)s",
|
||
datefmt="%Y-%m-%d %H:%M:%S",
|
||
)
|
||
log = logging.getLogger("logwatch")
|
||
|
||
# ── Database ──────────────────────────────────────────────
|
||
def get_db():
|
||
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
|
||
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
|
||
conn.row_factory = sqlite3.Row
|
||
return conn
|
||
|
||
def init_db():
|
||
db = get_db()
|
||
db.execute("""
|
||
CREATE TABLE IF NOT EXISTS messages (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
ntfy_id TEXT,
|
||
received_at TEXT NOT NULL,
|
||
ntfy_time INTEGER,
|
||
topic TEXT,
|
||
title TEXT,
|
||
message TEXT,
|
||
priority INTEGER,
|
||
tags TEXT,
|
||
forwarded INTEGER DEFAULT 0
|
||
)
|
||
""")
|
||
db.commit()
|
||
db.close()
|
||
|
||
_db_lock = threading.Lock()
|
||
|
||
def store_message(event: dict, forwarded: bool) -> None:
|
||
received_at = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime())
|
||
tags = json.dumps(event.get("tags", []))
|
||
with _db_lock:
|
||
db = get_db()
|
||
db.execute("""
|
||
INSERT INTO messages
|
||
(ntfy_id, received_at, ntfy_time, topic, title, message, priority, tags, forwarded)
|
||
VALUES (?,?,?,?,?,?,?,?,?)
|
||
""", (
|
||
event.get("id"),
|
||
received_at,
|
||
event.get("time"),
|
||
event.get("topic"),
|
||
event.get("title", ""),
|
||
event.get("message", ""),
|
||
event.get("priority", 3),
|
||
tags,
|
||
1 if forwarded else 0,
|
||
))
|
||
db.commit()
|
||
db.close()
|
||
|
||
# ── Filter logic ──────────────────────────────────────────
|
||
IMPORTANT_PATTERNS = [re.compile(r, re.IGNORECASE) for r in [
|
||
r"updated?\s+\d+",
|
||
r"\d+\s+updated",
|
||
r"fail(ed|ure)?",
|
||
r"error",
|
||
r"could not",
|
||
r"unable to",
|
||
r"pull(ed)?\s+\w+",
|
||
r"restart(ed|ing)?",
|
||
r"stopped",
|
||
r"warn(ing)?",
|
||
]]
|
||
|
||
NOISE_PATTERNS = [re.compile(r, re.IGNORECASE) for r in [
|
||
r"no\s+(new\s+)?updates?\s+(found|available)",
|
||
r"checked\s+\d+\s+container",
|
||
r"session\s+done",
|
||
r"^watchtower\s+started",
|
||
r"scanning\s+for\s+updates",
|
||
r"0\s+updated.*0\s+failed",
|
||
]]
|
||
|
||
def is_important(title: str, message: str) -> bool:
|
||
text = f"{title} {message}"
|
||
for p in NOISE_PATTERNS:
|
||
if p.search(text):
|
||
return False
|
||
for p in IMPORTANT_PATTERNS:
|
||
if p.search(text):
|
||
return True
|
||
return False
|
||
|
||
def classify(title: str, message: str) -> tuple[str, list[str]]:
|
||
text = f"{title} {message}".lower()
|
||
if re.search(r"fail|error|could not|unable", text):
|
||
return "high", ["rotating_light", "watchtower"]
|
||
if re.search(r"updated|pulled|restart", text):
|
||
return "default", ["arrow_up", "watchtower"]
|
||
return "low", ["watchtower"]
|
||
|
||
def forward(title: str, message: str, priority: str, tags: list[str]) -> None:
|
||
if not TARGET_URL:
|
||
return
|
||
headers = {"Title": title, "Priority": priority, "Tags": ",".join(tags)}
|
||
if TARGET_TOKEN:
|
||
headers["Authorization"] = f"Bearer {TARGET_TOKEN}"
|
||
try:
|
||
resp = requests.post(TARGET_URL, data=message.encode(), headers=headers, timeout=10)
|
||
resp.raise_for_status()
|
||
log.info("FORWARDED | [%s] %s", title, message[:100])
|
||
except requests.RequestException as e:
|
||
log.error("Forward failed: %s", e)
|
||
|
||
# ── Subscriber thread ─────────────────────────────────────
|
||
def listen_once():
|
||
sse_url = SOURCE_URL.rstrip("/") + "/json"
|
||
log.info("Connecting to %s", sse_url)
|
||
with requests.get(sse_url, stream=True, timeout=(10, None)) as resp:
|
||
resp.raise_for_status()
|
||
for raw_line in resp.iter_lines():
|
||
if not raw_line:
|
||
continue
|
||
try:
|
||
event = json.loads(raw_line)
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
if event.get("event", "message") != "message":
|
||
continue
|
||
|
||
title = event.get("title", "")
|
||
message = event.get("message", "")
|
||
|
||
if title and "watchtower" not in title.lower():
|
||
log.debug("SKIP (not watchtower) | [%s]", title)
|
||
continue
|
||
|
||
log.debug("RECEIVED | [%s] %s", title, message[:120])
|
||
|
||
important = is_important(title, message)
|
||
store_message(event, forwarded=important)
|
||
|
||
if important:
|
||
priority, tags = classify(title, message)
|
||
forward(title, message, priority, tags)
|
||
else:
|
||
log.debug("FILTERED | [%s] %s", title, message[:80])
|
||
|
||
def subscriber_loop():
|
||
log.info("Subscriber started – source: %s target: %s",
|
||
SOURCE_URL, TARGET_URL or "(dry-run)")
|
||
while True:
|
||
try:
|
||
listen_once()
|
||
log.warning("Stream ended, reconnecting in %ds…", RECONNECT_DELAY)
|
||
except requests.RequestException as e:
|
||
log.error("Connection error: %s – retry in %ds", e, RECONNECT_DELAY)
|
||
time.sleep(RECONNECT_DELAY)
|
||
|
||
# ── FastAPI ───────────────────────────────────────────────
|
||
@asynccontextmanager
|
||
async def lifespan(app: FastAPI):
|
||
init_db()
|
||
t = threading.Thread(target=subscriber_loop, daemon=True, name="subscriber")
|
||
t.start()
|
||
yield
|
||
|
||
app = FastAPI(title="logwatch", lifespan=lifespan)
|
||
|
||
@app.get("/api/messages")
|
||
def api_messages():
|
||
with _db_lock:
|
||
db = get_db()
|
||
rows = db.execute(
|
||
"SELECT * FROM messages ORDER BY id DESC LIMIT 1000"
|
||
).fetchall()
|
||
db.close()
|
||
result = []
|
||
for r in rows:
|
||
result.append({
|
||
"id": r["id"],
|
||
"ntfy_id": r["ntfy_id"],
|
||
"received_at": r["received_at"],
|
||
"ntfy_time": r["ntfy_time"],
|
||
"topic": r["topic"],
|
||
"title": r["title"],
|
||
"message": r["message"],
|
||
"priority": r["priority"],
|
||
"tags": json.loads(r["tags"] or "[]"),
|
||
"forwarded": bool(r["forwarded"]),
|
||
})
|
||
return JSONResponse(result)
|
||
|
||
@app.get("/api/health")
|
||
def health():
|
||
return {"status": "ok"}
|
||
|
||
# static files
|
||
STATIC_DIR = Path(__file__).parent / "static"
|
||
app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="static")
|