#!/usr/bin/env python3 """ logwatch - Watchtower ntfy filter Subscribes to an ntfy channel, filters Watchtower notifications, and forwards only important ones (updates, failures, errors) to another channel. """ import json import logging import os import re import time import requests # --- Configuration (from environment) --- SOURCE_URL = os.environ.get("SOURCE_URL", "http://192.168.123.77/albert") TARGET_URL = os.environ.get("TARGET_URL", "") # e.g. http://192.168.123.77/wichtig TARGET_TOKEN = os.environ.get("TARGET_TOKEN", "") # optional ntfy auth token LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper() RECONNECT_DELAY = int(os.environ.get("RECONNECT_DELAY", "10")) logging.basicConfig( level=getattr(logging, LOG_LEVEL, logging.INFO), format="%(asctime)s %(levelname)-7s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) log = logging.getLogger("logwatch") # --------------------------------------------------------------------------- # Filtering logic # --------------------------------------------------------------------------- # Keywords that make a message IMPORTANT (case-insensitive) IMPORTANT_PATTERNS = [ re.compile(r, re.IGNORECASE) for r in [ r"updated?\s+\d+", # "Updated 3 containers" r"\d+\s+updated", # "3 updated" r"fail(ed|ure)?", # failures r"error", # errors r"could not", r"unable to", r"pull(ed)?\s+\w+", # pulled image r"restart(ed|ing)?", # restarts r"stopped", # stopped containers r"warn(ing)?", # warnings ] ] # Keywords that mark a message as NOT important (skip even if above matches) NOISE_PATTERNS = [ re.compile(r, re.IGNORECASE) for r in [ r"no\s+(new\s+)?updates?\s+(found|available)", # "No updates found" r"checked\s+\d+\s+container", # "Checked 42 containers" r"session\s+done", r"^watchtower\s+started", r"scanning\s+for\s+updates", r"0\s+updated.*0\s+failed", # nothing happened ] ] def is_important(title: str, message: str) -> bool: text = f"{title} {message}" for p in NOISE_PATTERNS: if p.search(text): log.debug("NOISE | %s", text[:120]) return False for p in IMPORTANT_PATTERNS: if p.search(text): return True return False # --------------------------------------------------------------------------- # Forwarding # --------------------------------------------------------------------------- def forward(title: str, message: str, priority: str = "default", tags: list[str] | None = None): if not TARGET_URL: log.warning("TARGET_URL not set – would forward: [%s] %s", title, message) return headers = { "Title": title, "Priority": priority, } if tags: headers["Tags"] = ",".join(tags) if TARGET_TOKEN: headers["Authorization"] = f"Bearer {TARGET_TOKEN}" try: resp = requests.post(TARGET_URL, data=message.encode(), headers=headers, timeout=10) resp.raise_for_status() log.info("FORWARDED | [%s] %s", title, message[:100]) except requests.RequestException as e: log.error("Forward failed: %s", e) def classify(title: str, message: str) -> tuple[str, list[str]]: """Return (priority, tags) for the ntfy message.""" text = f"{title} {message}".lower() if re.search(r"fail|error|could not|unable", text): return "high", ["rotating_light", "watchtower"] if re.search(r"updated|pulled|restart", text): return "default", ["arrow_up", "watchtower"] return "low", ["watchtower"] # --------------------------------------------------------------------------- # SSE consumer # --------------------------------------------------------------------------- def listen_once(): sse_url = SOURCE_URL.rstrip("/") + "/json" log.info("Connecting to %s", sse_url) with requests.get(sse_url, stream=True, timeout=90) as resp: resp.raise_for_status() for raw_line in resp.iter_lines(): if not raw_line: continue try: event = json.loads(raw_line) except json.JSONDecodeError: log.debug("Non-JSON line: %s", raw_line[:80]) continue event_type = event.get("event", "message") if event_type != "message": continue title = event.get("title", "") message = event.get("message", "") # Only process Watchtower messages (by title tag) if title and "watchtower" not in title.lower(): log.debug("SKIP (not watchtower) | [%s]", title) continue log.debug("RECEIVED | [%s] %s", title, message[:120]) if is_important(title, message): priority, tags = classify(title, message) forward(title, message, priority, tags) else: log.debug("FILTERED | [%s] %s", title, message[:80]) def main(): log.info("logwatch started – source: %s target: %s", SOURCE_URL, TARGET_URL or "(dry-run)") while True: try: listen_once() log.warning("Stream ended, reconnecting in %ds…", RECONNECT_DELAY) except requests.RequestException as e: log.error("Connection error: %s – retry in %ds", e, RECONNECT_DELAY) except KeyboardInterrupt: log.info("Stopped.") break time.sleep(RECONNECT_DELAY) if __name__ == "__main__": main()