From 0d1b5b17d3dcebbe79c731c62bad444a59e2a450 Mon Sep 17 00:00:00 2001 From: Scott Powell Date: Sat, 12 Jul 2025 12:26:16 +1000 Subject: [PATCH] * simple_sensor: added alert send queue, with retries, checks for ACKs, etc. Low pri alerts only 1 send attempt, otherwise 4 attempts --- examples/simple_sensor/SensorMesh.cpp | 130 +++++++++++++++++----- examples/simple_sensor/SensorMesh.h | 27 +++-- examples/simple_sensor/TimeSeriesData.cpp | 4 +- examples/simple_sensor/TimeSeriesData.h | 2 +- examples/simple_sensor/main.cpp | 7 +- 5 files changed, 126 insertions(+), 44 deletions(-) diff --git a/examples/simple_sensor/SensorMesh.cpp b/examples/simple_sensor/SensorMesh.cpp index 61444b2f..d542407d 100644 --- a/examples/simple_sensor/SensorMesh.cpp +++ b/examples/simple_sensor/SensorMesh.cpp @@ -58,6 +58,8 @@ #define LAZY_CONTACTS_WRITE_DELAY 5000 +#define ALERT_ACK_EXPIRY_MILLIS 6000 // wait 6 secs for ACKs to alert messages + static File openAppend(FILESYSTEM* _fs, const char* fname) { #if defined(NRF52_PLATFORM) || defined(STM32_PLATFORM) return _fs->open(fname, FILE_O_WRITE); @@ -163,6 +165,7 @@ static uint8_t getDataSize(uint8_t type) { case LPP_TEMPERATURE: case LPP_CONCENTRATION: case LPP_BAROMETRIC_PRESSURE: + case LPP_RELATIVE_HUMIDITY: case LPP_ALTITUDE: case LPP_VOLTAGE: case LPP_CURRENT: @@ -185,6 +188,7 @@ static uint32_t getMultiplier(uint8_t type) { return 100; case LPP_TEMPERATURE: case LPP_BAROMETRIC_PRESSURE: + case LPP_RELATIVE_HUMIDITY: return 10; } return 1; @@ -332,46 +336,54 @@ void SensorMesh::applyContactPermissions(const uint8_t* pubkey, uint16_t perms) dirty_contacts_expiry = futureMillis(LAZY_CONTACTS_WRITE_DELAY); // trigger saveContacts() } -void SensorMesh::sendAlert(AlertPriority pri, const char* text) { - int text_len = strlen(text); - uint16_t pri_mask = (pri == HIGH_PRI_ALERT) ? PERM_RECV_ALERTS_HI : PERM_RECV_ALERTS_LO; +void SensorMesh::sendAlert(ContactInfo* c, Trigger* t) { + int text_len = strlen(t->text); - // send text message to all contacts with RECV_ALERT permission - for (int i = 0; i < num_contacts; i++) { - auto c = &contacts[i]; - if ((c->permissions & pri_mask) == 0) continue; // contact does NOT want alert + uint8_t data[MAX_PACKET_PAYLOAD]; + memcpy(data, &t->timestamp, 4); + data[4] = (TXT_TYPE_PLAIN << 2) | t->attempt; // attempt and flags + memcpy(&data[5], t->text, text_len); - uint8_t data[MAX_PACKET_PAYLOAD]; - uint32_t now = getRTCClock()->getCurrentTimeUnique(); // need different timestamp per packet - memcpy(data, &now, 4); - data[4] = (TXT_TYPE_PLAIN << 2); // attempt and flags - memcpy(&data[5], text, text_len); - // calc expected ACK reply - // uint32_t expected_ack; - // mesh::Utils::sha256((uint8_t *)&expected_ack, 4, data, 5 + text_len, self_id.pub_key, PUB_KEY_SIZE); + // calc expected ACK reply + mesh::Utils::sha256((uint8_t *)&t->expected_acks[t->attempt], 4, data, 5 + text_len, self_id.pub_key, PUB_KEY_SIZE); + t->attempt++; - auto pkt = createDatagram(PAYLOAD_TYPE_TXT_MSG, c->id, c->shared_secret, data, 5 + text_len); - if (pkt) { - if (c->out_path_len >= 0) { // we have an out_path, so send DIRECT - sendDirect(pkt, c->out_path, c->out_path_len); - } else { - sendFlood(pkt); - } + auto pkt = createDatagram(PAYLOAD_TYPE_TXT_MSG, c->id, c->shared_secret, data, 5 + text_len); + if (pkt) { + if (c->out_path_len >= 0) { // we have an out_path, so send DIRECT + sendDirect(pkt, c->out_path, c->out_path_len); + } else { + sendFlood(pkt); } } + t->send_expiry = futureMillis(ALERT_ACK_EXPIRY_MILLIS); } void SensorMesh::alertIf(bool condition, Trigger& t, AlertPriority pri, const char* text) { if (condition) { - if (!t.triggered) { - t.triggered = true; - t.time = getRTCClock()->getCurrentTime(); - sendAlert(pri, text); + if (!t.isTriggered() && num_alert_tasks < MAX_CONCURRENT_ALERTS) { + StrHelper::strncpy(t.text, text, sizeof(t.text)); + t.pri = pri; + t.send_expiry = 0; // signal that initial send is needed + t.attempt = 4; + t.curr_contact_idx = -1; // start iterating thru contacts[] + + alert_tasks[num_alert_tasks++] = &t; // add to queue } } else { - if (t.triggered) { - t.triggered = false; - // TODO: apply debounce logic + if (t.isTriggered()) { + t.text[0] = 0; + // remove 't' from alert queue + int i = 0; + while (i < num_alert_tasks && alert_tasks[i] != &t) i++; + + if (i < num_alert_tasks) { // found, now delete from array + num_alert_tasks--; + while (i < num_alert_tasks) { + alert_tasks[i] = alert_tasks[i + 1]; + i++; + } + } } } } @@ -629,6 +641,20 @@ bool SensorMesh::onPeerPathRecv(mesh::Packet* packet, int sender_idx, const uint return false; } +void SensorMesh::onAckRecv(mesh::Packet* packet, uint32_t ack_crc) { + if (num_alert_tasks > 0) { + auto t = alert_tasks[0]; // check current alert task + for (int i = 0; i < t->attempt; i++) { + if (ack_crc == t->expected_acks[i]) { // matching ACK! + t->attempt = 4; // signal to move to next contact + t->send_expiry = 0; + packet->markDoNotRetransmit(); // ACK was for this node, so don't retransmit + return; + } + } + } +} + SensorMesh::SensorMesh(mesh::MainBoard& board, mesh::Radio& radio, mesh::MillisecondClock& ms, mesh::RNG& rng, mesh::RTCClock& rtc, mesh::MeshTables& tables) : mesh::Mesh(radio, ms, rng, rtc, *new StaticPoolPacketManager(32), tables), _cli(board, rtc, &_prefs, this), telemetry(MAX_PACKET_PAYLOAD - 4) @@ -637,6 +663,7 @@ SensorMesh::SensorMesh(mesh::MainBoard& board, mesh::Radio& radio, mesh::Millise next_local_advert = next_flood_advert = 0; dirty_contacts_expiry = 0; last_read_time = 0; + num_alert_tasks = 0; // defaults memset(&_prefs, 0, sizeof(_prefs)); @@ -736,7 +763,14 @@ float SensorMesh::getTelemValue(uint8_t channel, uint8_t type) { } bool SensorMesh::getGPS(uint8_t channel, float& lat, float& lon, float& alt) { - return false; // TODO + if (channel == TELEM_CHANNEL_SELF) { + lat = sensors.node_lat; + lon = sensors.node_lon; + alt = sensors.node_altitude; + return true; + } + // REVISIT: custom GPS channels?? + return false; } void SensorMesh::loop() { @@ -767,6 +801,42 @@ void SensorMesh::loop() { last_read_time = curr; } + // check the alert send queue + if (num_alert_tasks > 0) { + auto t = alert_tasks[0]; // process head of queue + + if (millisHasNowPassed(t->send_expiry)) { // next send needed? + if (t->attempt >= 4) { // max attempts reached, try next contact + t->curr_contact_idx++; + if (t->curr_contact_idx >= num_contacts) { // no more contacts to try? + num_alert_tasks--; // remove t from queue + for (int i = 0; i < num_alert_tasks; i++) { + alert_tasks[i] = alert_tasks[i + 1]; + } + } else { + auto c = &contacts[t->curr_contact_idx]; + uint16_t pri_mask = (t->pri == HIGH_PRI_ALERT) ? PERM_RECV_ALERTS_HI : PERM_RECV_ALERTS_LO; + + if (c->permissions & pri_mask) { // contact wants alert + // reset attempts + t->attempt = (t->pri == LOW_PRI_ALERT) ? 3 : 0; // Low pri alerts, start at attempt #3 (ie. only make ONE attempt) + t->timestamp = getRTCClock()->getCurrentTimeUnique(); // need unique timestamp per contact + + sendAlert(c, t); // NOTE: modifies attempt, expected_acks[] and send_expiry + } else { + // next contact tested in next ::loop() + } + } + } else if (t->curr_contact_idx < num_contacts) { + auto c = &contacts[t->curr_contact_idx]; // send next attempt + sendAlert(c, t); // NOTE: modifies attempt, expected_acks[] and send_expiry + } else { + // contact list has likely been modified while waiting for alert ACK, cancel this task + t->attempt = 4; // next ::loop() will remove t from queue + } + } + } + // is there are pending dirty contacts write needed? if (dirty_contacts_expiry && millisHasNowPassed(dirty_contacts_expiry)) { saveContacts(); diff --git a/examples/simple_sensor/SensorMesh.h b/examples/simple_sensor/SensorMesh.h index e92c163f..e0527fa0 100644 --- a/examples/simple_sensor/SensorMesh.h +++ b/examples/simple_sensor/SensorMesh.h @@ -55,7 +55,8 @@ struct ContactInfo { #define MAX_CONTACTS 32 #endif -#define MAX_SEARCH_RESULTS 8 +#define MAX_SEARCH_RESULTS 8 +#define MAX_CONCURRENT_ALERTS 4 class SensorMesh : public mesh::Mesh, public CommonCLICallbacks { public: @@ -99,13 +100,20 @@ protected: bool getGPS(uint8_t channel, float& lat, float& lon, float& alt); // alerts - struct Trigger { - bool triggered; - uint32_t time; - - Trigger() { triggered = false; time = 0; } - }; enum AlertPriority { LOW_PRI_ALERT, HIGH_PRI_ALERT }; + + struct Trigger { + uint32_t timestamp; + AlertPriority pri; + uint32_t expected_acks[4]; + int8_t curr_contact_idx; + uint8_t attempt; + unsigned long send_expiry; + char text[MAX_PACKET_PAYLOAD]; + + Trigger() { text[0] = 0; } + bool isTriggered() const { return text[0] != 0; } + }; void alertIf(bool condition, Trigger& t, AlertPriority pri, const char* text); virtual void onSensorDataRead() = 0; // for app to implement @@ -124,6 +132,7 @@ protected: void getPeerSharedSecret(uint8_t* dest_secret, int peer_idx) override; void onPeerDataRecv(mesh::Packet* packet, uint8_t type, int sender_idx, const uint8_t* secret, uint8_t* data, size_t len) override; bool onPeerPathRecv(mesh::Packet* packet, int sender_idx, const uint8_t* secret, uint8_t* path, uint8_t path_len, uint8_t extra_type, uint8_t* extra, uint8_t extra_len) override; + void onAckRecv(mesh::Packet* packet, uint32_t ack_crc) override; private: FILESYSTEM* _fs; @@ -137,6 +146,8 @@ private: CayenneLPP telemetry; uint32_t last_read_time; int matching_peer_indexes[MAX_SEARCH_RESULTS]; + int num_alert_tasks; + Trigger* alert_tasks[MAX_CONCURRENT_ALERTS]; void loadContacts(); void saveContacts(); @@ -146,6 +157,6 @@ private: ContactInfo* putContact(const mesh::Identity& id); void applyContactPermissions(const uint8_t* pubkey, uint16_t perms); - void sendAlert(AlertPriority pri, const char* text); + void sendAlert(ContactInfo* c, Trigger* t); }; diff --git a/examples/simple_sensor/TimeSeriesData.cpp b/examples/simple_sensor/TimeSeriesData.cpp index ff7daa25..f6157f9a 100644 --- a/examples/simple_sensor/TimeSeriesData.cpp +++ b/examples/simple_sensor/TimeSeriesData.cpp @@ -10,7 +10,7 @@ void TimeSeriesData::recordData(mesh::RTCClock* clock, float value) { } } -void TimeSeriesData::calcDataMinMaxAvg(mesh::RTCClock* clock, uint32_t start_secs_ago, uint32_t end_secs_ago, MinMaxAvg* dest, uint8_t channel, uint8_t lpp_type) const { +void TimeSeriesData::calcMinMaxAvg(mesh::RTCClock* clock, uint32_t start_secs_ago, uint32_t end_secs_ago, MinMaxAvg* dest, uint8_t channel, uint8_t lpp_type) const { int i = next, n = num_slots; uint32_t ago = clock->getCurrentTime() - last_timestamp; int num_values = 0; @@ -40,6 +40,6 @@ void TimeSeriesData::calcDataMinMaxAvg(mesh::RTCClock* clock, uint32_t start_sec if (num_values > 0) { dest->_avg = total / num_values; } else { - dest->_avg = NAN; + dest->_max = dest->_min = dest->_avg = NAN; } } diff --git a/examples/simple_sensor/TimeSeriesData.h b/examples/simple_sensor/TimeSeriesData.h index ea9e823b..6efa7834 100644 --- a/examples/simple_sensor/TimeSeriesData.h +++ b/examples/simple_sensor/TimeSeriesData.h @@ -24,6 +24,6 @@ public: } void recordData(mesh::RTCClock* clock, float value); - void calcDataMinMaxAvg(mesh::RTCClock* clock, uint32_t start_secs_ago, uint32_t end_secs_ago, MinMaxAvg* dest, uint8_t channel, uint8_t lpp_type) const; + void calcMinMaxAvg(mesh::RTCClock* clock, uint32_t start_secs_ago, uint32_t end_secs_ago, MinMaxAvg* dest, uint8_t channel, uint8_t lpp_type) const; }; diff --git a/examples/simple_sensor/main.cpp b/examples/simple_sensor/main.cpp index 792790d8..d1dd67c5 100644 --- a/examples/simple_sensor/main.cpp +++ b/examples/simple_sensor/main.cpp @@ -15,18 +15,19 @@ public: protected: /* ========================== custom logic here ========================== */ - Trigger low_batt; + Trigger low_batt, critical_batt; TimeSeriesData battery_data; void onSensorDataRead() override { float batt_voltage = getVoltage(TELEM_CHANNEL_SELF); battery_data.recordData(getRTCClock(), batt_voltage); // record battery - alertIf(batt_voltage < 3.4f, low_batt, HIGH_PRI_ALERT, "Battery low!"); + alertIf(batt_voltage < 3.4f, critical_batt, HIGH_PRI_ALERT, "Battery is critical!"); + alertIf(batt_voltage < 3.6f, low_batt, LOW_PRI_ALERT, "Battery is low"); } int querySeriesData(uint32_t start_secs_ago, uint32_t end_secs_ago, MinMaxAvg dest[], int max_num) override { - battery_data.calcDataMinMaxAvg(getRTCClock(), start_secs_ago, end_secs_ago, &dest[0], TELEM_CHANNEL_SELF, LPP_VOLTAGE); + battery_data.calcMinMaxAvg(getRTCClock(), start_secs_ago, end_secs_ago, &dest[0], TELEM_CHANNEL_SELF, LPP_VOLTAGE); return 1; } /* ======================================================================= */