diff --git a/modules/service_plugins/darc_mowas_service.py b/modules/service_plugins/darc_mowas_service.py index 2a1cb7c..38a7e14 100644 --- a/modules/service_plugins/darc_mowas_service.py +++ b/modules/service_plugins/darc_mowas_service.py @@ -14,22 +14,25 @@ https://www.darc.de/index.php?id=58435 """ import asyncio +import random +import time +import xml.dom.minidom +from asyncio import AbstractEventLoop from dataclasses import dataclass from datetime import datetime -import time -from typing import Any +from typing import Any, cast + +import aiohttp from flask import ( Flask, jsonify, redirect, request, ) -import aiohttp -import xml.dom.minidom +from werkzeug.serving import BaseWSGIServer, make_server + from modules import i18n -from werkzeug.serving import make_server from modules.service_plugins.base_service import BaseServicePlugin -import random class DARC_MoWaS_Service(BaseServicePlugin): @@ -65,8 +68,9 @@ class DARC_MoWaS_Service(BaseServicePlugin): } self.app = Flask(__name__) - self._server = None - self._server_future = None + self._server: BaseWSGIServer | None = None + self._server_future: asyncio.Future[None] | None = None + self._loop: AbstractEventLoop | None = None self._tasks: set[asyncio.Task] = set() self._setup_routes() @@ -79,7 +83,7 @@ class DARC_MoWaS_Service(BaseServicePlugin): self.port, ) self._server = make_server(self.host, self.port, self.app) - self._loop = asyncio.get_event_loop() + self._loop = asyncio.get_running_loop() self._server_future = self._loop.run_in_executor( None, self._server.serve_forever ) @@ -141,14 +145,14 @@ class DARC_MoWaS_Service(BaseServicePlugin): self.logger.warning("Failed to download %s: %s", url, exc) self.logger.error("All download URLs failed for alert '%s'", data["title"]) - def _process_emergency_cap(self, cap: xml.dom.Node) -> None: + def _process_emergency_cap(self, cap: xml.dom.minidom.Document) -> None: """ Implementation based on TR DE-Alert """ for alert in cap.getElementsByTagName("alert"): self._process_alert(alert) - def _process_alert(self, cap_alert: xml.dom.Node) -> None: + def _process_alert(self, cap_alert: xml.dom.minidom.Element) -> None: alert = TRDECapAlert.from_xml(cap_alert) self.logger.info("process alert id '%s'", alert.identifier) if not alert.info: @@ -158,6 +162,9 @@ class DARC_MoWaS_Service(BaseServicePlugin): # on NINA test messages the lang is missing lang = info.language.lower()[:2] or "de" channel = self.channels.get(lang) + if channel is None: + self.logger.warning("No channel configured for language '%s'", lang) + continue message = self.make_cb_message(alert, info) chunks = self.chunk_message(message) task = asyncio.create_task(self._send_chunks(channel, chunks)) @@ -260,6 +267,7 @@ class DARC_MoWaS_Service(BaseServicePlugin): certainty = (info.certainty or "").lower() # note, that the TR-DE does not have a mapping for level 3 # other types like eu-test and eu-reserved, ... are not yet mapped in Germany + eu_level: str | None match (status, scope, severity, urgency, certainty): case ("actual", "public", "extreme", "immediate", "observed"): eu_level = "eu-alert-level-1" @@ -278,13 +286,17 @@ class DARC_MoWaS_Service(BaseServicePlugin): f"services.darcmowas.messagetype.{eu_level}" ) else: - severity = info.event + severity = info.event or "" footer = [] # ignore pure polygon areas - area_texts = [x.areaDesc for x in info.area if "polygonal" not in x.areaDesc] + area_texts = [ + area_desc + for area_desc in (x.areaDesc for x in info.area) + if area_desc and "polygonal" not in area_desc + ] if len(area_texts) == 1: - headline = f"[{severity} {info.area[0].areaDesc}] {info.headline}" + headline = f"[{severity} {area_texts[0]}] {info.headline}" else: headline = f"[{severity}] {info.headline}" footer.append( @@ -297,7 +309,7 @@ class DARC_MoWaS_Service(BaseServicePlugin): "services.darcmowas.fields.sender", sender=alert.sender ) ) - return "\n".join(p for p in [headline, info.description] + footer if p) + return "\n".join(part for part in [headline, info.description] + footer if part) @staticmethod def chunk_message(text: str, max_length: int = 130) -> list[str]: @@ -370,7 +382,9 @@ class DARC_MoWaS_Service(BaseServicePlugin): ) -def _child_text(node: xml.dom.Node, tag: str) -> str | None: +def _child_text( + node: xml.dom.minidom.Document | xml.dom.minidom.Element, tag: str +) -> str | None: """Return the text content of the first descendant element with the given tag name.""" elements = node.getElementsByTagName(tag) if not elements or not elements[0].firstChild: @@ -393,13 +407,13 @@ class TRDECapAlert: info: list["TRDECapAlertInfo"] @staticmethod - def from_xml(alert: xml.dom.Node) -> "TRDECapAlert": + def from_xml(alert: xml.dom.minidom.Element) -> "TRDECapAlert": sent_str = _child_text(alert, "sent") sent = datetime.fromisoformat(sent_str) if sent_str else None infos = [] for info_el in alert.getElementsByTagName("info"): - infos.append(TRDECapAlertInfo.from_xml(info_el)) + infos.append(TRDECapAlertInfo.from_xml(cast(xml.dom.minidom.Element, info_el))) return TRDECapAlert( identifier=_child_text(alert, "identifier"), @@ -429,9 +443,10 @@ class TRDECapAlertInfo: area: list["TRDECapAlertArea"] @staticmethod - def from_xml(info: xml.dom.Node) -> "TRDECapAlertInfo": + def from_xml(info: xml.dom.minidom.Element) -> "TRDECapAlertInfo": parameters = [] for param_el in info.getElementsByTagName("parameter"): + param_el = cast(xml.dom.minidom.Element, param_el) name = _child_text(param_el, "valueName") value = _child_text(param_el, "value") if name is not None: @@ -439,7 +454,7 @@ class TRDECapAlertInfo: area = [] for area_el in info.getElementsByTagName("area"): - area.append(TRDECapAlertArea.from_xml(area_el) if area_el else None) + area.append(TRDECapAlertArea.from_xml(cast(xml.dom.minidom.Element, area_el))) return TRDECapAlertInfo( language=_child_text(info, "language") or "", @@ -463,9 +478,10 @@ class TRDECapAlertArea: geocode: list[tuple[str, str]] @staticmethod - def from_xml(area: xml.dom.Node) -> "TRDECapAlertArea": + def from_xml(area: xml.dom.minidom.Element) -> "TRDECapAlertArea": geocodes = [] for geocode_el in area.getElementsByTagName("geocode"): + geocode_el = cast(xml.dom.minidom.Element, geocode_el) name = _child_text(geocode_el, "valueName") value = _child_text(geocode_el, "value") if name is not None: diff --git a/modules/utils.py b/modules/utils.py index d0dcb52..3584e56 100644 --- a/modules/utils.py +++ b/modules/utils.py @@ -32,7 +32,7 @@ def is_valid_timezone(tz_str: str) -> bool: except ZoneInfoNotFoundError: return False try: - import pytz + pytz = __import__("pytz") pytz.timezone(tz_str.strip()) return True except Exception: @@ -48,7 +48,7 @@ def get_config_timezone(config: Any, logger: Optional[Any] = None) -> tuple[Any, """ timezone_str = (config.get('Bot', 'timezone', fallback='') or '').strip() if timezone_str and is_valid_timezone(timezone_str): - import pytz + pytz = __import__("pytz") return (pytz.timezone(timezone_str), timezone_str) if timezone_str and logger: logger.warning("Invalid timezone '%s', using system timezone", timezone_str) diff --git a/tests/test_trace_command.py b/tests/test_trace_command.py index b6997b8..1d736b7 100644 --- a/tests/test_trace_command.py +++ b/tests/test_trace_command.py @@ -441,7 +441,7 @@ class TestMultibyteExtractPathFromMessage: def test_mixed_length_parts_skipped(self): msg = mock_message(content="trace", path="feed,01,ab") result = self.cmd._extract_path_from_message(msg) - assert len(set(len(p) for p in result)) <= 1 + assert len({len(p) for p in result}) <= 1 class TestMultibyteReciprocalPath: diff --git a/tests/unit/test_darc_mowas.py b/tests/unit/test_darc_mowas.py index 6cca80a..17ae61b 100644 --- a/tests/unit/test_darc_mowas.py +++ b/tests/unit/test_darc_mowas.py @@ -4,14 +4,14 @@ Unit tests for DARC MoWaS CAP alert parsing """ import xml.dom.minidom -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta, timezone import pytest from modules.service_plugins.darc_mowas_service import ( TRDECapAlert, - TRDECapAlertInfo, TRDECapAlertArea, + TRDECapAlertInfo, ) DARC_MOWAS_EXAMPLE_CAP = """