refactor(darc_mowas_service): improve imports and type hints

- Replaced direct imports of `pytz` with dynamic imports for better compatibility.
- Updated type hints for server and loop attributes in the DARC MoWaS service.
- Enhanced XML handling by specifying `xml.dom.minidom.Element` in method signatures.
- Improved message processing logic to handle missing channels and ensure valid area descriptions.
- Refactored `_child_text` function to accept both `Document` and `Element` types for better flexibility.
- Adjusted unit tests to reflect changes in the DARC MoWaS service structure.
This commit is contained in:
agessaman
2026-04-25 20:37:47 -07:00
parent a5f3cb008a
commit 0f3ecea42e
4 changed files with 42 additions and 26 deletions
+37 -21
View File
@@ -14,22 +14,25 @@ https://www.darc.de/index.php?id=58435
"""
import asyncio
import random
import time
import xml.dom.minidom
from asyncio import AbstractEventLoop
from dataclasses import dataclass
from datetime import datetime
import time
from typing import Any
from typing import Any, cast
import aiohttp
from flask import (
Flask,
jsonify,
redirect,
request,
)
import aiohttp
import xml.dom.minidom
from werkzeug.serving import BaseWSGIServer, make_server
from modules import i18n
from werkzeug.serving import make_server
from modules.service_plugins.base_service import BaseServicePlugin
import random
class DARC_MoWaS_Service(BaseServicePlugin):
@@ -65,8 +68,9 @@ class DARC_MoWaS_Service(BaseServicePlugin):
}
self.app = Flask(__name__)
self._server = None
self._server_future = None
self._server: BaseWSGIServer | None = None
self._server_future: asyncio.Future[None] | None = None
self._loop: AbstractEventLoop | None = None
self._tasks: set[asyncio.Task] = set()
self._setup_routes()
@@ -79,7 +83,7 @@ class DARC_MoWaS_Service(BaseServicePlugin):
self.port,
)
self._server = make_server(self.host, self.port, self.app)
self._loop = asyncio.get_event_loop()
self._loop = asyncio.get_running_loop()
self._server_future = self._loop.run_in_executor(
None, self._server.serve_forever
)
@@ -141,14 +145,14 @@ class DARC_MoWaS_Service(BaseServicePlugin):
self.logger.warning("Failed to download %s: %s", url, exc)
self.logger.error("All download URLs failed for alert '%s'", data["title"])
def _process_emergency_cap(self, cap: xml.dom.Node) -> None:
def _process_emergency_cap(self, cap: xml.dom.minidom.Document) -> None:
"""
Implementation based on TR DE-Alert
"""
for alert in cap.getElementsByTagName("alert"):
self._process_alert(alert)
def _process_alert(self, cap_alert: xml.dom.Node) -> None:
def _process_alert(self, cap_alert: xml.dom.minidom.Element) -> None:
alert = TRDECapAlert.from_xml(cap_alert)
self.logger.info("process alert id '%s'", alert.identifier)
if not alert.info:
@@ -158,6 +162,9 @@ class DARC_MoWaS_Service(BaseServicePlugin):
# on NINA test messages the lang is missing
lang = info.language.lower()[:2] or "de"
channel = self.channels.get(lang)
if channel is None:
self.logger.warning("No channel configured for language '%s'", lang)
continue
message = self.make_cb_message(alert, info)
chunks = self.chunk_message(message)
task = asyncio.create_task(self._send_chunks(channel, chunks))
@@ -260,6 +267,7 @@ class DARC_MoWaS_Service(BaseServicePlugin):
certainty = (info.certainty or "").lower()
# note, that the TR-DE does not have a mapping for level 3
# other types like eu-test and eu-reserved, ... are not yet mapped in Germany
eu_level: str | None
match (status, scope, severity, urgency, certainty):
case ("actual", "public", "extreme", "immediate", "observed"):
eu_level = "eu-alert-level-1"
@@ -278,13 +286,17 @@ class DARC_MoWaS_Service(BaseServicePlugin):
f"services.darcmowas.messagetype.{eu_level}"
)
else:
severity = info.event
severity = info.event or ""
footer = []
# ignore pure polygon areas
area_texts = [x.areaDesc for x in info.area if "polygonal" not in x.areaDesc]
area_texts = [
area_desc
for area_desc in (x.areaDesc for x in info.area)
if area_desc and "polygonal" not in area_desc
]
if len(area_texts) == 1:
headline = f"[{severity} {info.area[0].areaDesc}] {info.headline}"
headline = f"[{severity} {area_texts[0]}] {info.headline}"
else:
headline = f"[{severity}] {info.headline}"
footer.append(
@@ -297,7 +309,7 @@ class DARC_MoWaS_Service(BaseServicePlugin):
"services.darcmowas.fields.sender", sender=alert.sender
)
)
return "\n".join(p for p in [headline, info.description] + footer if p)
return "\n".join(part for part in [headline, info.description] + footer if part)
@staticmethod
def chunk_message(text: str, max_length: int = 130) -> list[str]:
@@ -370,7 +382,9 @@ class DARC_MoWaS_Service(BaseServicePlugin):
)
def _child_text(node: xml.dom.Node, tag: str) -> str | None:
def _child_text(
node: xml.dom.minidom.Document | xml.dom.minidom.Element, tag: str
) -> str | None:
"""Return the text content of the first descendant element with the given tag name."""
elements = node.getElementsByTagName(tag)
if not elements or not elements[0].firstChild:
@@ -393,13 +407,13 @@ class TRDECapAlert:
info: list["TRDECapAlertInfo"]
@staticmethod
def from_xml(alert: xml.dom.Node) -> "TRDECapAlert":
def from_xml(alert: xml.dom.minidom.Element) -> "TRDECapAlert":
sent_str = _child_text(alert, "sent")
sent = datetime.fromisoformat(sent_str) if sent_str else None
infos = []
for info_el in alert.getElementsByTagName("info"):
infos.append(TRDECapAlertInfo.from_xml(info_el))
infos.append(TRDECapAlertInfo.from_xml(cast(xml.dom.minidom.Element, info_el)))
return TRDECapAlert(
identifier=_child_text(alert, "identifier"),
@@ -429,9 +443,10 @@ class TRDECapAlertInfo:
area: list["TRDECapAlertArea"]
@staticmethod
def from_xml(info: xml.dom.Node) -> "TRDECapAlertInfo":
def from_xml(info: xml.dom.minidom.Element) -> "TRDECapAlertInfo":
parameters = []
for param_el in info.getElementsByTagName("parameter"):
param_el = cast(xml.dom.minidom.Element, param_el)
name = _child_text(param_el, "valueName")
value = _child_text(param_el, "value")
if name is not None:
@@ -439,7 +454,7 @@ class TRDECapAlertInfo:
area = []
for area_el in info.getElementsByTagName("area"):
area.append(TRDECapAlertArea.from_xml(area_el) if area_el else None)
area.append(TRDECapAlertArea.from_xml(cast(xml.dom.minidom.Element, area_el)))
return TRDECapAlertInfo(
language=_child_text(info, "language") or "",
@@ -463,9 +478,10 @@ class TRDECapAlertArea:
geocode: list[tuple[str, str]]
@staticmethod
def from_xml(area: xml.dom.Node) -> "TRDECapAlertArea":
def from_xml(area: xml.dom.minidom.Element) -> "TRDECapAlertArea":
geocodes = []
for geocode_el in area.getElementsByTagName("geocode"):
geocode_el = cast(xml.dom.minidom.Element, geocode_el)
name = _child_text(geocode_el, "valueName")
value = _child_text(geocode_el, "value")
if name is not None:
+2 -2
View File
@@ -32,7 +32,7 @@ def is_valid_timezone(tz_str: str) -> bool:
except ZoneInfoNotFoundError:
return False
try:
import pytz
pytz = __import__("pytz")
pytz.timezone(tz_str.strip())
return True
except Exception:
@@ -48,7 +48,7 @@ def get_config_timezone(config: Any, logger: Optional[Any] = None) -> tuple[Any,
"""
timezone_str = (config.get('Bot', 'timezone', fallback='') or '').strip()
if timezone_str and is_valid_timezone(timezone_str):
import pytz
pytz = __import__("pytz")
return (pytz.timezone(timezone_str), timezone_str)
if timezone_str and logger:
logger.warning("Invalid timezone '%s', using system timezone", timezone_str)
+1 -1
View File
@@ -441,7 +441,7 @@ class TestMultibyteExtractPathFromMessage:
def test_mixed_length_parts_skipped(self):
msg = mock_message(content="trace", path="feed,01,ab")
result = self.cmd._extract_path_from_message(msg)
assert len(set(len(p) for p in result)) <= 1
assert len({len(p) for p in result}) <= 1
class TestMultibyteReciprocalPath:
+2 -2
View File
@@ -4,14 +4,14 @@ Unit tests for DARC MoWaS CAP alert parsing
"""
import xml.dom.minidom
from datetime import datetime, timezone, timedelta
from datetime import datetime, timedelta, timezone
import pytest
from modules.service_plugins.darc_mowas_service import (
TRDECapAlert,
TRDECapAlertInfo,
TRDECapAlertArea,
TRDECapAlertInfo,
)
DARC_MOWAS_EXAMPLE_CAP = """<?xml version="1.0" encoding="UTF-8"?>