feat: initial logwatch – Watchtower ntfy noise filter

Subscribes to an ntfy channel via JSON stream, filters Watchtower
notifications by importance (updates, failures, errors) and forwards
only the relevant ones to a second ntfy channel.

- filter.py: SSE consumer + regex-based importance filter + forwarder
- Dockerfile + docker-compose.yml for containerised deployment
- Configurable via env vars (SOURCE_URL, TARGET_URL, LOG_LEVEL, …)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
LogWatch
2026-03-22 08:35:21 +01:00
commit a62c16d708
5 changed files with 206 additions and 0 deletions

168
filter.py Normal file
View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""
logwatch - Watchtower ntfy filter
Subscribes to an ntfy channel, filters Watchtower notifications,
and forwards only important ones (updates, failures, errors) to another channel.
"""
import json
import logging
import os
import re
import time
import requests
# --- Configuration (from environment) ---
SOURCE_URL = os.environ.get("SOURCE_URL", "http://192.168.123.77/albert")
TARGET_URL = os.environ.get("TARGET_URL", "") # e.g. http://192.168.123.77/wichtig
TARGET_TOKEN = os.environ.get("TARGET_TOKEN", "") # optional ntfy auth token
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
RECONNECT_DELAY = int(os.environ.get("RECONNECT_DELAY", "10"))
logging.basicConfig(
level=getattr(logging, LOG_LEVEL, logging.INFO),
format="%(asctime)s %(levelname)-7s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("logwatch")
# ---------------------------------------------------------------------------
# Filtering logic
# ---------------------------------------------------------------------------
# Keywords that make a message IMPORTANT (case-insensitive)
IMPORTANT_PATTERNS = [
re.compile(r, re.IGNORECASE)
for r in [
r"updated?\s+\d+", # "Updated 3 containers"
r"\d+\s+updated", # "3 updated"
r"fail(ed|ure)?", # failures
r"error", # errors
r"could not",
r"unable to",
r"pull(ed)?\s+\w+", # pulled image
r"restart(ed|ing)?", # restarts
r"stopped", # stopped containers
r"warn(ing)?", # warnings
]
]
# Keywords that mark a message as NOT important (skip even if above matches)
NOISE_PATTERNS = [
re.compile(r, re.IGNORECASE)
for r in [
r"no\s+(new\s+)?updates?\s+(found|available)", # "No updates found"
r"checked\s+\d+\s+container", # "Checked 42 containers"
r"session\s+done",
r"^watchtower\s+started",
r"scanning\s+for\s+updates",
r"0\s+updated.*0\s+failed", # nothing happened
]
]
def is_important(title: str, message: str) -> bool:
text = f"{title} {message}"
for p in NOISE_PATTERNS:
if p.search(text):
log.debug("NOISE | %s", text[:120])
return False
for p in IMPORTANT_PATTERNS:
if p.search(text):
return True
return False
# ---------------------------------------------------------------------------
# Forwarding
# ---------------------------------------------------------------------------
def forward(title: str, message: str, priority: str = "default", tags: list[str] | None = None):
if not TARGET_URL:
log.warning("TARGET_URL not set would forward: [%s] %s", title, message)
return
headers = {
"Title": title,
"Priority": priority,
}
if tags:
headers["Tags"] = ",".join(tags)
if TARGET_TOKEN:
headers["Authorization"] = f"Bearer {TARGET_TOKEN}"
try:
resp = requests.post(TARGET_URL, data=message.encode(), headers=headers, timeout=10)
resp.raise_for_status()
log.info("FORWARDED | [%s] %s", title, message[:100])
except requests.RequestException as e:
log.error("Forward failed: %s", e)
def classify(title: str, message: str) -> tuple[str, list[str]]:
"""Return (priority, tags) for the ntfy message."""
text = f"{title} {message}".lower()
if re.search(r"fail|error|could not|unable", text):
return "high", ["rotating_light", "watchtower"]
if re.search(r"updated|pulled|restart", text):
return "default", ["arrow_up", "watchtower"]
return "low", ["watchtower"]
# ---------------------------------------------------------------------------
# SSE consumer
# ---------------------------------------------------------------------------
def listen_once():
sse_url = SOURCE_URL.rstrip("/") + "/json"
log.info("Connecting to %s", sse_url)
with requests.get(sse_url, stream=True, timeout=90) as resp:
resp.raise_for_status()
for raw_line in resp.iter_lines():
if not raw_line:
continue
try:
event = json.loads(raw_line)
except json.JSONDecodeError:
log.debug("Non-JSON line: %s", raw_line[:80])
continue
event_type = event.get("event", "message")
if event_type != "message":
continue
title = event.get("title", "")
message = event.get("message", "")
# Only process Watchtower messages (by title tag)
if title and "watchtower" not in title.lower():
log.debug("SKIP (not watchtower) | [%s]", title)
continue
log.debug("RECEIVED | [%s] %s", title, message[:120])
if is_important(title, message):
priority, tags = classify(title, message)
forward(title, message, priority, tags)
else:
log.debug("FILTERED | [%s] %s", title, message[:80])
def main():
log.info("logwatch started source: %s target: %s", SOURCE_URL, TARGET_URL or "(dry-run)")
while True:
try:
listen_once()
log.warning("Stream ended, reconnecting in %ds…", RECONNECT_DELAY)
except requests.RequestException as e:
log.error("Connection error: %s retry in %ds", e, RECONNECT_DELAY)
except KeyboardInterrupt:
log.info("Stopped.")
break
time.sleep(RECONNECT_DELAY)
if __name__ == "__main__":
main()