From 4c64fc7117d80caec10c177594fd48cc8a629c40 Mon Sep 17 00:00:00 2001
From: prabathbr <60564552+prabathbr@users.noreply.github.com>
Date: Sat, 10 Jan 2026 12:17:07 +1100
Subject: [PATCH] Add serial_rss_bot example

serial_rss_bot polls the Victorian emergency incident RSS feed and broadcasts
bushfire warnings for VIC, Australia to one or more MeshCore channels.
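
Usage: set SERIAL_PORT and CHANNEL_IDXS at the top of the script, then run
"python examples/serial_rss_bot.py".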
---
examples/serial_rss_bot.py | 251 +++++++++++++++++++++++++++++++++++++
1 file changed, 251 insertions(+)
create mode 100644 examples/serial_rss_bot.py
diff --git a/examples/serial_rss_bot.py b/examples/serial_rss_bot.py
new file mode 100644
index 0000000..5a36fea
--- /dev/null
+++ b/examples/serial_rss_bot.py
@@ -0,0 +1,251 @@
+import asyncio
+import hashlib
+import logging
+import re
+import urllib.request
+import xml.etree.ElementTree as ET
+from html import unescape
+from html.parser import HTMLParser
+
+from meshcore import MeshCore, EventType
+
+# =====================
+# Config
+# =====================
+SERIAL_PORT = "COM4" # change this to your serial port
+CHANNEL_IDXS = [6]  # change this to the index (or indices) of your target channel(s)
+
+RSS_URL = "https://data.emergency.vic.gov.au/Show?pageId=getIncidentRSS"
+POLL_INTERVAL_SEC = 300 # poll interval (seconds)
+
+# Only send items whose plain-text description contains at least one of these keywords (case-insensitive)
+KEYWORDS_REQUIRED = ["BUSHFIRE"]
+
+# Only send these fields from the RSS description
+FIELDS_TO_SEND = [
+ "Type",
+ "Fire District",
+ "Location",
+ "Latitude",
+ "Longitude",
+ "Status",
+ "Size",
+]
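+# The feed's item descriptions carry "Field: value" lines (e.g. "Type: Bushfire");
+# the names above must match those labels, so adjust them if the feed format changes.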
+
+logging.basicConfig(level=logging.INFO)
+_LOGGER = logging.getLogger("serial_rss_bot")
+
+
+class _HTMLTextExtractor(HTMLParser):
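+    """Minimal HTMLParser subclass that collects text nodes and discards all tags."""
+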
+ def __init__(self):
+ super().__init__()
+ self._chunks: list[str] = []
+
+ def handle_data(self, data: str) -> None:
+ if data:
+ self._chunks.append(data)
+
+ def get_text(self) -> str:
+ return "".join(self._chunks)
+
+
+def _strip_html(html: str) -> str:
+ """Convert HTML-ish fragments into plain text."""
+ if not html:
+ return ""
+
+    # Normalize <br> tags into newlines so field parsing is reliable.
+ html = re.sub(r"<\s*br\s*/?\s*>", "\n", html, flags=re.IGNORECASE)
+
+ parser = _HTMLTextExtractor()
+ try:
+ parser.feed(html)
+ parser.close()
+ text = parser.get_text()
+ except Exception:
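+        # Fallback: crude regex tag-stripping if the parser chokes on malformed markup.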
+ text = re.sub(r"<[^>]+>", "\n", html)
+
+ text = unescape(text)
+ # Normalize whitespace but keep newlines as separators.
+ text = text.replace("\r", "\n")
+ text = re.sub(r"[ \t\f\v]+", " ", text)
+ text = re.sub(r"\n+", "\n", text)
+ return text.strip()
+
+
+def _fetch_rss_bytes(url: str) -> bytes:
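+    """Blocking HTTP GET of the feed; run it via asyncio.to_thread() from async code."""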
+ req = urllib.request.Request(
+ url,
+ headers={
+ "User-Agent": "meshcore-rss-bot/1.0",
+ "Accept": "application/rss+xml, application/xml;q=0.9, */*;q=0.8",
+ },
+ method="GET",
+ )
+ with urllib.request.urlopen(req, timeout=30) as resp:
+ return resp.read()
+
+
+def extract_fields(raw_html: str) -> dict[str, str]:
+ """
+ Extract desired fields from the HTML description.
+ We strip HTML to text with newlines, then parse lines like "Field: value".
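+    For example, the line "Type: Bushfire" yields {"Type": "Bushfire"}.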
+ """
+ text = _strip_html(raw_html)
+ lines = [ln.strip() for ln in text.split("\n") if ln.strip()]
+
+ result: dict[str, str] = {}
+ for ln in lines:
+ m = re.match(r"^([^:]+):\s*(.*)$", ln)
+ if not m:
+ continue
+ key = m.group(1).strip()
+ val = m.group(2).strip()
+
+ # Match desired fields case-insensitively
+ for wanted in FIELDS_TO_SEND:
+ if key.lower() == wanted.lower():
+ result[wanted] = val
+ break
+
+ return result
+
+
+def _parse_rss_items(xml_bytes: bytes) -> list[dict[str, str]]:
+ """Return list of items with keys: id, description, raw_description."""
+ items: list[dict[str, str]] = []
+
+ root = ET.fromstring(xml_bytes)
+
+    # RSS 2.0: <rss><channel><item>...</item></channel></rss>
+ channel = root.find("channel") if root.tag.lower().endswith("rss") else root
+ if channel is None:
+ return items
+
+ for it in channel.findall("item"):
+ title = (it.findtext("title") or "").strip()
+ link = (it.findtext("link") or "").strip()
+ guid = (it.findtext("guid") or "").strip()
+ pub_date = (it.findtext("pubDate") or "").strip()
+
+ desc_html = it.findtext("description") or ""
+ desc_text = _strip_html(desc_html)
+
+ base_id = (guid or link or f"{title}|{pub_date}").strip()
+
+        # Fingerprint only the fields we broadcast.
+ fields = extract_fields(desc_html)
+ parts = []
+ for k in FIELDS_TO_SEND:
+ v = fields.get(k, "")
+ parts.append(f"{k}={v}")
+ fingerprint = hashlib.sha1("|".join(parts).encode("utf-8")).hexdigest()[:12]
+
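+        # Any change to a broadcast field alters the fingerprint, so updated incidents are re-sent.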
+ stable_id = f"{base_id}|{fingerprint}"
+
+ items.append(
+ {
+ "id": stable_id,
+ "description": desc_text, # plain text for keyword filtering
+ "raw_description": desc_html, # original HTML-ish description
+ }
+ )
+
+ return items
+
+
+
+def build_filtered_message(item: dict[str, str]) -> str | None:
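+    """Return the outgoing text ("value1 | value2 | ...") or None if the item should be skipped."""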
+ # Keyword filter (case-insensitive) on plain text description
+ desc_text_lower = (item.get("description") or "").lower()
+ if not any(k.lower() in desc_text_lower for k in KEYWORDS_REQUIRED):
+ return None
+
+ fields = extract_fields(item.get("raw_description") or "")
+ if not fields:
+ return None
+
+ # Keep order exactly as FIELDS_TO_SEND, skip missing
+ parts = []
+ for k in FIELDS_TO_SEND:
+ v = fields.get(k)
+ if v:
+            parts.append(v)
+
+ if not parts:
+ return None
+
+ return " | ".join(parts)
+
+
+async def _fetch_items_async() -> list[dict[str, str]]:
+ xml_bytes = await asyncio.to_thread(_fetch_rss_bytes, RSS_URL)
+ return _parse_rss_items(xml_bytes)
+
+
+async def main() -> None:
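+    # Connect to the MeshCore node over the configured serial port.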
+ meshcore = await MeshCore.create_serial(SERIAL_PORT, debug=True)
+ print(f"Connected on {SERIAL_PORT}")
+
+ seen_ids: set[str] = set()
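+    # seen_ids is in-memory only, so it is re-primed from the live feed on every start.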
+
+ # Prime seen set so we do not spam old items on startup
+ try:
+ initial_items = await _fetch_items_async()
+ for it in initial_items:
+ if it.get("id"):
+ seen_ids.add(it["id"])
+ _LOGGER.info("Primed %d existing RSS items as seen.", len(seen_ids))
+ except Exception as ex:
+ _LOGGER.warning("Failed to prime RSS items: %s", ex)
+
+ try:
+ while True:
+ try:
+ items = await _fetch_items_async()
+
+ # Feeds are usually newest-first. Send unseen in chronological order.
+ new_items = [it for it in items if it.get("id") and it["id"] not in seen_ids]
+ for it in reversed(new_items):
+ msg_text = build_filtered_message(it)
+
+ # Mark as seen even if it does not match filter, so we do not re-check forever
+ seen_ids.add(it["id"])
+
+ if not msg_text:
+ continue
+
+                    for ch in CHANNEL_IDXS:
+                        _LOGGER.info("Sending filtered RSS item to channel %s", ch)
+                        result = await meshcore.commands.send_chan_msg(ch, msg_text)
+                        if result.type == EventType.ERROR:
+                            _LOGGER.error("Error sending RSS message to channel %s: %s", ch, result.payload)
+                        await asyncio.sleep(5)  # pace between channels
+                    await asyncio.sleep(5)  # pace between outgoing items
+
+ except Exception as ex:
+ _LOGGER.warning("RSS poll failed: %s", ex)
+
+ await asyncio.sleep(POLL_INTERVAL_SEC)
+
+    except (KeyboardInterrupt, asyncio.CancelledError):
+        # Under asyncio.run(), Ctrl+C usually arrives here as task cancellation.
+        print("Stopping...")
+ finally:
+ await meshcore.disconnect()
+ print("Disconnected")
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        pass