Files
logwatch/filter.py
LogWatch 42534e0894
Some checks failed
Build & Push Docker Image / build (push) Failing after 6s
fix: use (connect, None) timeout for streaming connection
A read timeout of 90s caused unnecessary reconnects during idle periods.
(10, None) = 10s connect timeout, no read timeout on the open stream.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 09:10:34 +01:00

169 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
logwatch - Watchtower ntfy filter
Subscribes to an ntfy channel, filters Watchtower notifications,
and forwards only important ones (updates, failures, errors) to another channel.
"""
import json
import logging
import os
import re
import time
import requests
# --- Configuration (read once from the environment at import time) ---
SOURCE_URL = os.getenv("SOURCE_URL", "http://192.168.123.77/albert")  # channel to subscribe to
TARGET_URL = os.getenv("TARGET_URL", "")  # e.g. http://192.168.123.77/wichtig; empty = dry-run
TARGET_TOKEN = os.getenv("TARGET_TOKEN", "")  # optional ntfy auth token
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
RECONNECT_DELAY = int(os.getenv("RECONNECT_DELAY", "10"))  # seconds slept before reconnecting

# An unknown LOG_LEVEL name falls back to INFO instead of raising.
logging.basicConfig(
    format="%(asctime)s %(levelname)-7s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)
log = logging.getLogger("logwatch")
# ---------------------------------------------------------------------------
# Filtering logic
# ---------------------------------------------------------------------------
# Regex sources that mark a message as IMPORTANT. They are compiled
# case-insensitively and searched anywhere in the text (no anchoring).
_IMPORTANT_SOURCES = (
    r"updated?\s+\d+",      # "Updated 3 containers"
    r"\d+\s+updated",       # "3 updated"
    r"fail(ed|ure)?",       # failures
    r"error",               # errors
    r"could not",
    r"unable to",
    r"pull(ed)?\s+\w+",     # pulled image
    r"restart(ed|ing)?",    # restarts
    r"stopped",             # stopped containers
    r"warn(ing)?",          # warnings
)
IMPORTANT_PATTERNS = [re.compile(source, flags=re.I) for source in _IMPORTANT_SOURCES]
# Keywords that mark a message as NOT important (skip even if an important
# keyword also matches). All patterns are compiled case-insensitively.
NOISE_PATTERNS = [
    re.compile(source, re.IGNORECASE)
    for source in (
        r"no\s+(new\s+)?updates?\s+(found|available)",  # "No updates found"
        r"checked\s+\d+\s+container",                   # "Checked 42 containers"
        r"session\s+done",
        # is_important() joins title and message with a space, so an empty
        # title leaves a leading space; allow it before the anchored keyword.
        r"^\s*watchtower\s+started",
        r"scanning\s+for\s+updates",
        # Nothing happened. The word boundaries keep counts that merely end
        # in zero ("10 updated, 20 failed") from being discarded as noise.
        r"\b0\s+updated.*\b0\s+failed",
    )
]
def is_important(title: str, message: str) -> bool:
    """Decide whether a notification should be forwarded.

    Title and message are joined with a single space and searched as one
    string. Any NOISE_PATTERNS hit vetoes the message (and is logged at
    debug level); otherwise the message is important when at least one of
    IMPORTANT_PATTERNS matches.
    """
    combined = f"{title} {message}"
    # Noise is checked first so it overrides any important keyword.
    if any(noise.search(combined) for noise in NOISE_PATTERNS):
        log.debug("NOISE | %s", combined[:120])
        return False
    return any(marker.search(combined) for marker in IMPORTANT_PATTERNS)
# ---------------------------------------------------------------------------
# Forwarding
# ---------------------------------------------------------------------------
def forward(title: str, message: str, priority: str = "default", tags: list[str] | None = None) -> None:
    """POST a notification to TARGET_URL.

    Args:
        title: Sent as the ``Title`` header.
        message: Sent UTF-8 encoded as the request body.
        priority: Sent as the ``Priority`` header.
        tags: Optional tag names, sent comma-joined as the ``Tags`` header.

    When TARGET_URL is empty nothing is sent; the would-be notification is
    only logged as a warning. ``requests`` errors (connection problems,
    non-2xx responses) are logged and swallowed so that a failed forward
    does not interrupt the caller.
    """
    if not TARGET_URL:
        log.warning("TARGET_URL not set would forward: [%s] %s", title, message)
        return

    # A header value containing line breaks or leading whitespace is rejected
    # by requests, which would drop the notification; fold it onto one line.
    single_line_title = " ".join(title.splitlines()).strip()

    # str header values are encoded as Latin-1 by http.client, which raises
    # UnicodeEncodeError (not a RequestException) for anything outside that
    # range. Passing UTF-8 bytes avoids the crash; for ASCII-only values the
    # bytes on the wire are unchanged.
    headers: dict[str, str | bytes] = {
        "Title": single_line_title.encode("utf-8"),
        "Priority": priority,
    }
    if tags:
        headers["Tags"] = ",".join(tags).encode("utf-8")
    if TARGET_TOKEN:
        headers["Authorization"] = f"Bearer {TARGET_TOKEN}"

    try:
        resp = requests.post(TARGET_URL, data=message.encode(), headers=headers, timeout=10)
        resp.raise_for_status()
        log.info("FORWARDED | [%s] %s", title, message[:100])
    except requests.RequestException as e:
        log.error("Forward failed: %s", e)
def classify(title: str, message: str) -> tuple[str, list[str]]:
    """Return (priority, tags) for the ntfy message.

    The title and message are lower-cased and searched together:

    * failure keywords (fail, error, could not, unable) -> ``"high"``
    * update keywords (updated, pulled, restart)        -> ``"default"``
    * anything else                                      -> ``"low"``

    Zero counts such as "0 failed" or "0 errors" are ignored when looking
    for failure keywords, so a summary like "3 updated, 0 failed" is rated
    as an update rather than as a failure.
    """
    text = f"{title} {message}".lower()

    # Drop explicit zero counts before the failure check. The leading \b
    # keeps real counts that end in zero ("10 failed") intact.
    failure_text = re.sub(r"\b0\s+(?:fail(?:ed|ures?)?|errors?)\b", " ", text)
    if re.search(r"fail|error|could not|unable", failure_text):
        return "high", ["rotating_light", "watchtower"]
    if re.search(r"updated|pulled|restart", text):
        return "default", ["arrow_up", "watchtower"]
    return "low", ["watchtower"]
# ---------------------------------------------------------------------------
# SSE consumer
# ---------------------------------------------------------------------------
def listen_once():
    """Open the source stream once and process events until it ends.

    Connects to ``SOURCE_URL + "/json"`` and reads the response line by
    line, expecting one JSON object per line. Events whose type is not
    "message", and events whose non-empty title does not contain
    "watchtower", are skipped. Remaining events go through is_important();
    important ones are classified and forwarded.

    Lines that cannot be decoded or that do not have the expected shape are
    logged at debug level and skipped, so one bad line cannot end the stream.

    Raises:
        requests.RequestException: on connection failure, a non-2xx
            response, or a broken stream. The caller is expected to retry.
    """
    sse_url = SOURCE_URL.rstrip("/") + "/json"
    log.info("Connecting to %s", sse_url)
    # (connect, read) timeouts: 10 s to connect, no read timeout, so an idle
    # but healthy stream is not torn down.
    with requests.get(sse_url, stream=True, timeout=(10, None)) as resp:
        resp.raise_for_status()
        for raw_line in resp.iter_lines():
            if not raw_line:
                continue
            try:
                event = json.loads(raw_line)
            except ValueError:
                # Covers json.JSONDecodeError and the UnicodeDecodeError that
                # json.loads raises for bytes that are not valid UTF-8.
                log.debug("Non-JSON line: %s", raw_line[:80])
                continue
            if not isinstance(event, dict):
                # Valid JSON, but not an object (e.g. a bare list or number).
                log.debug("Non-object JSON line: %s", raw_line[:80])
                continue

            event_type = event.get("event", "message")
            if event_type != "message":
                continue

            # A missing or null field is treated as an empty string.
            title = event.get("title") or ""
            message = event.get("message") or ""
            if not (isinstance(title, str) and isinstance(message, str)):
                log.debug("Malformed event (non-text title/message): %s", raw_line[:80])
                continue

            # Only process Watchtower messages (by title tag); events with
            # an empty title are let through.
            if title and "watchtower" not in title.lower():
                log.debug("SKIP (not watchtower) | [%s]", title)
                continue

            log.debug("RECEIVED | [%s] %s", title, message[:120])
            if is_important(title, message):
                priority, tags = classify(title, message)
                forward(title, message, priority, tags)
            else:
                log.debug("FILTERED | [%s] %s", title, message[:80])
def main():
    """Run the filter until interrupted.

    Calls listen_once() in a loop. When the stream ends or a ``requests``
    error occurs, the event is logged and the loop sleeps RECONNECT_DELAY
    seconds before reconnecting. Ctrl-C (KeyboardInterrupt) stops the loop
    cleanly, whether it arrives while streaming or during the delay.
    """
    log.info("logwatch started source: %s target: %s", SOURCE_URL, TARGET_URL or "(dry-run)")
    try:
        while True:
            try:
                listen_once()
                log.warning("Stream ended, reconnecting in %ds…", RECONNECT_DELAY)
            except requests.RequestException as e:
                log.error("Connection error: %s retry in %ds", e, RECONNECT_DELAY)
            # The sleep sits inside the outer try so that an interrupt during
            # the reconnect delay is handled like one during streaming.
            time.sleep(RECONNECT_DELAY)
    except KeyboardInterrupt:
        log.info("Stopped.")


if __name__ == "__main__":
    main()