diff --git a/Dockerfile b/Dockerfile index fb6c5f1..9263ca8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,12 +5,18 @@ WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -COPY filter.py . +COPY app.py filter.py ./ +COPY static/ static/ + +VOLUME ["/data"] ENV SOURCE_URL=http://192.168.123.77/albert ENV TARGET_URL= ENV TARGET_TOKEN= ENV LOG_LEVEL=INFO ENV RECONNECT_DELAY=10 +ENV DB_PATH=/data/logwatch.db -CMD ["python", "-u", "filter.py"] +EXPOSE 8000 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/app.py b/app.py new file mode 100644 index 0000000..a0ef677 --- /dev/null +++ b/app.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +""" +logwatch – unified app +- background thread: subscribes to ntfy, filters, stores, forwards +- FastAPI: REST API + static file serving +""" + +import json +import logging +import os +import re +import sqlite3 +import threading +import time +from contextlib import asynccontextmanager +from pathlib import Path + +import requests +from fastapi import FastAPI +from fastapi.responses import FileResponse, JSONResponse +from fastapi.staticfiles import StaticFiles + +# ── Config ──────────────────────────────────────────────── +SOURCE_URL = os.environ.get("SOURCE_URL", "http://192.168.123.77/albert") +TARGET_URL = os.environ.get("TARGET_URL", "") +TARGET_TOKEN = os.environ.get("TARGET_TOKEN", "") +LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() +RECONNECT_DELAY = int(os.environ.get("RECONNECT_DELAY", "10")) +DB_PATH = os.environ.get("DB_PATH", "/data/logwatch.db") + +logging.basicConfig( + level=getattr(logging, LOG_LEVEL, logging.INFO), + format="%(asctime)s %(levelname)-7s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger("logwatch") + +# ── Database ────────────────────────────────────────────── +def get_db(): + Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(DB_PATH, check_same_thread=False) + conn.row_factory = sqlite3.Row + return conn + +def init_db(): + db = get_db() + db.execute(""" + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ntfy_id TEXT, + received_at TEXT NOT NULL, + ntfy_time INTEGER, + topic TEXT, + title TEXT, + message TEXT, + priority INTEGER, + tags TEXT, + forwarded INTEGER DEFAULT 0 + ) + """) + db.commit() + db.close() + +_db_lock = threading.Lock() + +def store_message(event: dict, forwarded: bool) -> None: + received_at = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime()) + tags = json.dumps(event.get("tags", [])) + with _db_lock: + db = get_db() + db.execute(""" + INSERT INTO messages + (ntfy_id, received_at, ntfy_time, topic, title, message, priority, tags, forwarded) + VALUES (?,?,?,?,?,?,?,?,?) + """, ( + event.get("id"), + received_at, + event.get("time"), + event.get("topic"), + event.get("title", ""), + event.get("message", ""), + event.get("priority", 3), + tags, + 1 if forwarded else 0, + )) + db.commit() + db.close() + +# ── Filter logic ────────────────────────────────────────── +IMPORTANT_PATTERNS = [re.compile(r, re.IGNORECASE) for r in [ + r"updated?\s+\d+", + r"\d+\s+updated", + r"fail(ed|ure)?", + r"error", + r"could not", + r"unable to", + r"pull(ed)?\s+\w+", + r"restart(ed|ing)?", + r"stopped", + r"warn(ing)?", +]] + +NOISE_PATTERNS = [re.compile(r, re.IGNORECASE) for r in [ + r"no\s+(new\s+)?updates?\s+(found|available)", + r"checked\s+\d+\s+container", + r"session\s+done", + r"^watchtower\s+started", + r"scanning\s+for\s+updates", + r"0\s+updated.*0\s+failed", +]] + +def is_important(title: str, message: str) -> bool: + text = f"{title} {message}" + for p in NOISE_PATTERNS: + if p.search(text): + return False + for p in IMPORTANT_PATTERNS: + if p.search(text): + return True + return False + +def classify(title: str, message: str) -> tuple[str, list[str]]: + text = f"{title} {message}".lower() + if re.search(r"fail|error|could not|unable", text): + return "high", ["rotating_light", "watchtower"] + if re.search(r"updated|pulled|restart", text): + return "default", ["arrow_up", "watchtower"] + return "low", ["watchtower"] + +def forward(title: str, message: str, priority: str, tags: list[str]) -> None: + if not TARGET_URL: + return + headers = {"Title": title, "Priority": priority, "Tags": ",".join(tags)} + if TARGET_TOKEN: + headers["Authorization"] = f"Bearer {TARGET_TOKEN}" + try: + resp = requests.post(TARGET_URL, data=message.encode(), headers=headers, timeout=10) + resp.raise_for_status() + log.info("FORWARDED | [%s] %s", title, message[:100]) + except requests.RequestException as e: + log.error("Forward failed: %s", e) + +# ── Subscriber thread ───────────────────────────────────── +def listen_once(): + sse_url = SOURCE_URL.rstrip("/") + "/json" + log.info("Connecting to %s", sse_url) + with requests.get(sse_url, stream=True, timeout=(10, None)) as resp: + resp.raise_for_status() + for raw_line in resp.iter_lines(): + if not raw_line: + continue + try: + event = json.loads(raw_line) + except json.JSONDecodeError: + continue + + if event.get("event", "message") != "message": + continue + + title = event.get("title", "") + message = event.get("message", "") + + if title and "watchtower" not in title.lower(): + log.debug("SKIP (not watchtower) | [%s]", title) + continue + + log.debug("RECEIVED | [%s] %s", title, message[:120]) + + important = is_important(title, message) + store_message(event, forwarded=important) + + if important: + priority, tags = classify(title, message) + forward(title, message, priority, tags) + else: + log.debug("FILTERED | [%s] %s", title, message[:80]) + +def subscriber_loop(): + log.info("Subscriber started – source: %s target: %s", + SOURCE_URL, TARGET_URL or "(dry-run)") + while True: + try: + listen_once() + log.warning("Stream ended, reconnecting in %ds…", RECONNECT_DELAY) + except requests.RequestException as e: + log.error("Connection error: %s – retry in %ds", e, RECONNECT_DELAY) + time.sleep(RECONNECT_DELAY) + +# ── FastAPI ─────────────────────────────────────────────── +@asynccontextmanager +async def lifespan(app: FastAPI): + init_db() + t = threading.Thread(target=subscriber_loop, daemon=True, name="subscriber") + t.start() + yield + +app = FastAPI(title="logwatch", lifespan=lifespan) + +@app.get("/api/messages") +def api_messages(): + with _db_lock: + db = get_db() + rows = db.execute( + "SELECT * FROM messages ORDER BY id DESC LIMIT 1000" + ).fetchall() + db.close() + result = [] + for r in rows: + result.append({ + "id": r["id"], + "ntfy_id": r["ntfy_id"], + "received_at": r["received_at"], + "ntfy_time": r["ntfy_time"], + "topic": r["topic"], + "title": r["title"], + "message": r["message"], + "priority": r["priority"], + "tags": json.loads(r["tags"] or "[]"), + "forwarded": bool(r["forwarded"]), + }) + return JSONResponse(result) + +@app.get("/api/health") +def health(): + return {"status": "ok"} + +# static files +STATIC_DIR = Path(__file__).parent / "static" +app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="static") diff --git a/docker-compose.yml b/docker-compose.yml index 564cad5..ece6d70 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,9 +4,17 @@ services: image: logwatch:latest container_name: logwatch restart: unless-stopped + ports: + - "${PORT:-8000}:8000" + volumes: + - logwatch_data:/data environment: - SOURCE_URL=${SOURCE_URL:-https://ntfy.albert-zangerl.com/albert} - TARGET_URL=${TARGET_URL:-https://ntfy.albert-zangerl.com/wichtig} - TARGET_TOKEN=${TARGET_TOKEN:-} - LOG_LEVEL=${LOG_LEVEL:-INFO} - RECONNECT_DELAY=${RECONNECT_DELAY:-10} + - DB_PATH=/data/logwatch.db + +volumes: + logwatch_data: diff --git a/portainer-stack.yml b/portainer-stack.yml index d8d3648..f6d28da 100644 --- a/portainer-stack.yml +++ b/portainer-stack.yml @@ -3,5 +3,17 @@ services: image: git.albert-zangerl.com/al/logwatch:latest container_name: logwatch restart: unless-stopped - env_file: - - stack.env + ports: + - "8000:8000" + volumes: + - logwatch_data:/data + environment: + - SOURCE_URL=https://ntfy.albert-zangerl.com/albert + - TARGET_URL=https://ntfy.albert-zangerl.com/wichtig + # - TARGET_TOKEN= + - LOG_LEVEL=INFO + - RECONNECT_DELAY=10 + - DB_PATH=/data/logwatch.db + +volumes: + logwatch_data: diff --git a/requirements.txt b/requirements.txt index 0eb8cae..b242f2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ requests>=2.31.0 +fastapi>=0.110.0 +uvicorn>=0.29.0 diff --git a/stack.env.example b/stack.env.example index d4a5587..61c01da 100644 --- a/stack.env.example +++ b/stack.env.example @@ -9,3 +9,4 @@ TARGET_URL=https://ntfy.albert-zangerl.com/wichtig LOG_LEVEL=INFO RECONNECT_DELAY=10 +PORT=8000 diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..10ddb0f --- /dev/null +++ b/static/index.html @@ -0,0 +1,694 @@ + + + + + +logwatch + + + + +
+ + + connecting… + + +
+ +
+
+ +
+ + +
+ +
+
show / hide
+
+
+
+ +
+ + + +
+
+ +
+ + + +