Files
logwatch/filter.py
LogWatch a62c16d708 feat: initial logwatch – Watchtower ntfy noise filter
Subscribes to an ntfy channel via JSON stream, filters Watchtower
notifications by importance (updates, failures, errors) and forwards
only the relevant ones to a second ntfy channel.

- filter.py: SSE consumer + regex-based importance filter + forwarder
- Dockerfile + docker-compose.yml for containerised deployment
- Configurable via env vars (SOURCE_URL, TARGET_URL, LOG_LEVEL, …)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 08:35:21 +01:00

169 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
logwatch - Watchtower ntfy filter
Subscribes to an ntfy channel, filters Watchtower notifications,
and forwards only important ones (updates, failures, errors) to another channel.
"""
import json
import logging
import os
import re
import time
import requests
# --- Configuration (from environment) ---
# All settings are read once at import time; an unset variable falls back to
# the default given here.

# Topic URL that is subscribed to.
SOURCE_URL = os.getenv("SOURCE_URL", "http://192.168.123.77/albert")
# Topic URL that receives the forwarded messages, e.g.
# http://192.168.123.77/wichtig. Left empty, nothing is sent (dry-run).
TARGET_URL = os.getenv("TARGET_URL", "")
# Optional ntfy auth token, sent as a Bearer token when set.
TARGET_TOKEN = os.getenv("TARGET_TOKEN", "")
# Name of a logging level; upper-cased so "debug" and "DEBUG" are equivalent.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
# Seconds to wait before reconnecting to the source stream.
RECONNECT_DELAY = int(os.getenv("RECONNECT_DELAY", "10"))

logging.basicConfig(
    format="%(asctime)s %(levelname)-7s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    # Names that are not attributes of the logging module fall back to INFO.
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)
log = logging.getLogger("logwatch")
# ---------------------------------------------------------------------------
# Filtering logic
# ---------------------------------------------------------------------------
# Regex sources whose match makes a message IMPORTANT.
_IMPORTANT_REGEXES = (
    r"updated?\s+\d+",  # "Updated 3 containers"
    r"\d+\s+updated",  # "3 updated"
    r"fail(ed|ure)?",  # failures
    r"error",  # errors
    r"could not",
    r"unable to",
    r"pull(ed)?\s+\w+",  # pulled image
    r"restart(ed|ing)?",  # restarts
    r"stopped",  # stopped containers
    r"warn(ing)?",  # warnings
)
# Compiled once at import time; every pattern is case-insensitive.
IMPORTANT_PATTERNS = [re.compile(rx, re.IGNORECASE) for rx in _IMPORTANT_REGEXES]
# Keywords that mark a message as NOT important (skip even if above matches).
# Compiled once at import time; every pattern is case-insensitive.
NOISE_PATTERNS = [
    re.compile(r, re.IGNORECASE)
    for r in [
        r"no\s+(new\s+)?updates?\s+(found|available)",  # "No updates found"
        r"checked\s+\d+\s+container",  # "Checked 42 containers"
        r"session\s+done",
        # No MULTILINE flag: "^" only matches at the very start of the text
        # that is searched.
        r"^watchtower\s+started",
        r"scanning\s+for\s+updates",
        # Nothing happened. The \b anchors make sure the counts really are
        # zero: without them "10 updated, 0 failed" and "0 updated, 10 failed"
        # would both be swallowed as noise.
        r"\b0\s+updated.*\b0\s+failed",
    ]
]
def is_important(title: str, message: str) -> bool:
    """Decide whether a notification is worth forwarding.

    Title and message are joined with a single space and searched as one
    string. A hit on any NOISE_PATTERNS entry rejects the notification, even
    if an important pattern would also match; otherwise at least one
    IMPORTANT_PATTERNS entry has to match.
    """
    combined = f"{title} {message}"
    # Noise takes precedence over importance.
    if any(rx.search(combined) for rx in NOISE_PATTERNS):
        log.debug("NOISE | %s", combined[:120])
        return False
    return any(rx.search(combined) for rx in IMPORTANT_PATTERNS)
# ---------------------------------------------------------------------------
# Forwarding
# ---------------------------------------------------------------------------
def _header_value(text: str) -> str:
    """Return *text* in a form that is safe to send as an HTTP header value.

    Runs of whitespace (including line breaks, which are not allowed inside a
    header value) are collapsed to single spaces and the ends are stripped.
    Text containing non-ASCII characters is wrapped as an RFC 2047 base64
    encoded-word, because HTTP header strings are encoded as Latin-1 on the
    wire and anything outside that range raises UnicodeEncodeError.
    """
    import base64

    flat = " ".join(text.split())
    if flat.isascii():
        return flat
    encoded = base64.b64encode(flat.encode("utf-8")).decode("ascii")
    return f"=?UTF-8?B?{encoded}?="


def forward(title: str, message: str, priority: str = "default", tags: list[str] | None = None):
    """Publish a notification to TARGET_URL.

    Args:
        title: Notification title, sent in the ``Title`` header.
        message: Notification body, sent UTF-8 encoded as the POST body.
        priority: Value for the ``Priority`` header.
        tags: Optional tags, sent comma-joined in the ``Tags`` header.

    When TARGET_URL is empty nothing is sent; the message is only logged
    (dry-run). Request failures are logged, not raised, so one failed forward
    does not interrupt the caller.
    """
    # Tolerate None so a missing field cannot crash encoding or slicing below.
    title = title or ""
    message = message or ""
    if not TARGET_URL:
        log.warning("TARGET_URL not set would forward: [%s] %s", title, message)
        return
    headers = {
        # NOTE(review): ntfy documents RFC 2047 encoded-words for UTF-8 header
        # values - confirm the target server version decodes them.
        "Title": _header_value(title),
        "Priority": priority,
    }
    if tags:
        headers["Tags"] = ",".join(tags)
    if TARGET_TOKEN:
        headers["Authorization"] = f"Bearer {TARGET_TOKEN}"
    try:
        resp = requests.post(TARGET_URL, data=message.encode(), headers=headers, timeout=10)
        resp.raise_for_status()
        log.info("FORWARDED | [%s] %s", title, message[:100])
    except requests.RequestException as e:
        log.error("Forward failed: %s", e)
# Zero counts such as "0 failed", "0 errors" or "Failed=0" report the absence
# of a problem and must not be mistaken for one.
_ZERO_FAILURE_COUNT = re.compile(
    r"\b0\s+(?:fail(?:ed|ures?)?|errors?)\b"
    r"|\b(?:fail(?:ed|ures?)?|errors?)\s*[=:]\s*0\b"
)


def classify(title: str, message: str) -> tuple[str, list[str]]:
    """Return (priority, tags) for the ntfy message.

    The lower-cased title and message are searched together:

    * failure wording ("fail", "error", "could not", "unable") gives
      ``("high", ["rotating_light", "watchtower"])``
    * update wording ("updated", "pulled", "restart") gives
      ``("default", ["arrow_up", "watchtower"])``
    * anything else gives ``("low", ["watchtower"])``

    Zero failure/error counts are removed before matching, so a successful
    report like "1 updated, 0 failed" is not escalated to high priority.
    """
    text = _ZERO_FAILURE_COUNT.sub(" ", f"{title} {message}".lower())
    if re.search(r"fail|error|could not|unable", text):
        return "high", ["rotating_light", "watchtower"]
    if re.search(r"updated|pulled|restart", text):
        return "default", ["arrow_up", "watchtower"]
    return "low", ["watchtower"]
# ---------------------------------------------------------------------------
# Stream consumer (ntfy /json endpoint: one JSON object per line, not SSE)
# ---------------------------------------------------------------------------
def _parse_event(raw_line: bytes) -> tuple[str, str] | None:
    """Decode one line of the JSON stream into ``(title, message)``.

    Returns None for every line that must be skipped: undecodable or invalid
    JSON, JSON that is not an object, and events whose type is not "message".
    """
    try:
        event = json.loads(raw_line)
    except (json.JSONDecodeError, UnicodeDecodeError):
        log.debug("Non-JSON line: %s", raw_line[:80])
        return None
    # Valid JSON that is not an object (e.g. a list or number) has no .get().
    if not isinstance(event, dict):
        log.debug("Non-object JSON line: %s", raw_line[:80])
        return None
    # An event without an explicit type is treated as a message.
    if event.get("event", "message") != "message":
        return None
    # `or ""` also covers an explicit JSON null, which .get(key, "") would
    # pass through as None and break the string handling downstream.
    return str(event.get("title") or ""), str(event.get("message") or "")


def listen_once():
    """Consume the source stream until it ends, forwarding important messages.

    Opens ``SOURCE_URL + "/json"`` as a streaming response and handles it line
    by line. Messages that have a title not containing "watchtower" are
    skipped; untitled messages are still evaluated. The rest go through
    is_important() and, when important, are classified and forwarded.

    Returns when the server closes the stream.

    Raises:
        requests.RequestException: on connection errors, HTTP error statuses,
            or when no data arrives within the timeout.
    """
    stream_url = SOURCE_URL.rstrip("/") + "/json"
    log.info("Connecting to %s", stream_url)
    with requests.get(stream_url, stream=True, timeout=90) as resp:
        resp.raise_for_status()
        for raw_line in resp.iter_lines():
            if not raw_line:
                continue
            parsed = _parse_event(raw_line)
            if parsed is None:
                continue
            title, message = parsed
            # Only process Watchtower messages (identified by their title).
            if title and "watchtower" not in title.lower():
                log.debug("SKIP (not watchtower) | [%s]", title)
                continue
            log.debug("RECEIVED | [%s] %s", title, message[:120])
            if is_important(title, message):
                priority, tags = classify(title, message)
                forward(title, message, priority, tags)
            else:
                log.debug("FILTERED | [%s] %s", title, message[:80])
def main():
    """Run the filter until interrupted.

    Calls listen_once() in a loop. After the stream ends or a
    requests.RequestException occurs, waits RECONNECT_DELAY seconds and
    connects again. Ctrl-C (KeyboardInterrupt) stops the loop cleanly at any
    point, including while waiting to reconnect.
    """
    log.info("logwatch started source: %s target: %s", SOURCE_URL, TARGET_URL or "(dry-run)")
    # The KeyboardInterrupt handler wraps the whole loop so that an interrupt
    # during time.sleep() is handled as well, not only one inside listen_once().
    try:
        while True:
            try:
                listen_once()
                log.warning("Stream ended, reconnecting in %ds…", RECONNECT_DELAY)
            except requests.RequestException as e:
                log.error("Connection error: %s retry in %ds", e, RECONNECT_DELAY)
            # time.sleep() rejects negative values; clamp a misconfigured delay.
            time.sleep(max(0, RECONNECT_DELAY))
    except KeyboardInterrupt:
        log.info("Stopped.")


if __name__ == "__main__":
    main()