From 1a4063bbe8378878363c360c2b5d43cadcf91498 Mon Sep 17 00:00:00 2001 From: Scott Powell Date: Sun, 2 Mar 2025 20:15:13 +1100 Subject: [PATCH] * companion radio: connection status now supported (Keep_alive pings, etc) --- examples/companion_radio/main.cpp | 25 +++++- examples/simple_room_server/main.cpp | 10 +-- src/helpers/BaseChatMesh.cpp | 117 ++++++++++++++++++++++++++- src/helpers/BaseChatMesh.h | 25 +++++- 4 files changed, 168 insertions(+), 9 deletions(-) diff --git a/examples/companion_radio/main.cpp b/examples/companion_radio/main.cpp index 912f9593..95034110 100644 --- a/examples/companion_radio/main.cpp +++ b/examples/companion_radio/main.cpp @@ -126,6 +126,8 @@ static uint32_t _atoi(const char* sp) { #define CMD_SEND_RAW_DATA 25 #define CMD_SEND_LOGIN 26 #define CMD_SEND_STATUS_REQ 27 +#define CMD_HAS_CONNECTION 28 +#define CMD_LOGOUT 29 // 'Disconnect' #define RESP_CODE_OK 0 #define RESP_CODE_ERR 1 @@ -435,7 +437,7 @@ protected: expected_ack_crc = 0; // reset our expected hash, now that we have received ACK return true; } - return false; + return checkConnectionsAck(data); } void queueMessage(const ContactInfo& from, uint8_t txt_type, uint8_t path_len, uint32_t sender_timestamp, const uint8_t* extra, int extra_len, const char *text) { @@ -465,14 +467,17 @@ protected: } void onMessageRecv(const ContactInfo& from, uint8_t path_len, uint32_t sender_timestamp, const char *text) override { + markConnectionActive(from); // in case this is from a server, and we have a connection queueMessage(from, TXT_TYPE_PLAIN, path_len, sender_timestamp, NULL, 0, text); } void onCommandDataRecv(const ContactInfo& from, uint8_t path_len, uint32_t sender_timestamp, const char *text) override { + markConnectionActive(from); // in case this is from a server, and we have a connection queueMessage(from, TXT_TYPE_CLI_DATA, path_len, sender_timestamp, NULL, 0, text); } void onSignedMessageRecv(const ContactInfo& from, uint8_t path_len, uint32_t sender_timestamp, const uint8_t *sender_prefix, const char *text) override { + markConnectionActive(from); saveContacts(); // from.sync_since change needs to be persisted queueMessage(from, TXT_TYPE_SIGNED_PLAIN, path_len, sender_timestamp, sender_prefix, 4, text); } @@ -513,7 +518,10 @@ protected: out_frame[i++] = PUSH_CODE_LOGIN_SUCCESS; out_frame[i++] = 0; // legacy: is_admin = false } else if (data[4] == RESP_SERVER_LOGIN_OK) { // new login response - // keep_alive_interval = data[5] * 16 + uint16_t keep_alive_secs = ((uint16_t)data[5]) * 16; + if (keep_alive_secs > 0) { + startConnection(contact, keep_alive_secs); + } out_frame[i++] = PUSH_CODE_LOGIN_SUCCESS; out_frame[i++] = data[6]; // permissions (eg. is_admin) } else { @@ -1021,6 +1029,17 @@ public: } else { writeErrFrame(); // contact not found } + } else if (cmd_frame[0] == CMD_HAS_CONNECTION && len >= 1+PUB_KEY_SIZE) { + uint8_t* pub_key = &cmd_frame[1]; + if (hasConnectionTo(pub_key)) { + writeOKFrame(); + } else { + writeErrFrame(); + } + } else if (cmd_frame[0] == CMD_LOGOUT && len >= 1+PUB_KEY_SIZE) { + uint8_t* pub_key = &cmd_frame[1]; + stopConnection(pub_key); + writeOKFrame(); } else { writeErrFrame(); MESH_DEBUG_PRINTLN("ERROR: unknown command: %02X", cmd_frame[0]); @@ -1050,6 +1069,8 @@ public: _serial->writeFrame(out_frame, 5); _iter_started = false; } + } else if (!_serial->isWriteBusy()) { + checkConnections(); } } }; diff --git a/examples/simple_room_server/main.cpp b/examples/simple_room_server/main.cpp index 95de6343..2c05ad4b 100644 --- a/examples/simple_room_server/main.cpp +++ b/examples/simple_room_server/main.cpp @@ -117,7 +117,8 @@ struct PostInfo { #define CLIENT_KEEP_ALIVE_SECS 128 -#define REQ_TYPE_KEEP_ALIVE 1 +#define REQ_TYPE_GET_STATUS 0x01 // same as _GET_STATS +#define REQ_TYPE_KEEP_ALIVE 0x02 #define RESP_SERVER_LOGIN_OK 0 // response to ANON_REQ @@ -436,12 +437,11 @@ protected: uint32_t forceSince = 0; if (len >= 9) { // optional - last post_timestamp client received memcpy(&forceSince, &data[5], 4); // NOTE: this may be 0, if part of decrypted PADDING! + } else { + memcpy(&data[5], &forceSince, 4); // make sure there are zeroes in payload (for ack_hash calc below) } if (forceSince > 0) { client->sync_since = forceSince; // force-update the 'sync since' - len = 9; // for ACK hash calc below - } else { - len = 5; // for ACK hash calc below } uint32_t now = getRTCClock()->getCurrentTime(); @@ -455,7 +455,7 @@ protected: // RULE: only send keep_alive response DIRECT! if (client->out_path_len >= 0) { uint32_t ack_hash; // calc ACK to prove to sender that we got request - mesh::Utils::sha256((uint8_t *) &ack_hash, 4, data, len, client->id.pub_key, PUB_KEY_SIZE); + mesh::Utils::sha256((uint8_t *) &ack_hash, 4, data, 9, client->id.pub_key, PUB_KEY_SIZE); auto reply = createAck(ack_hash); if (reply) { diff --git a/src/helpers/BaseChatMesh.cpp b/src/helpers/BaseChatMesh.cpp index 8b494ef4..9835bb7f 100644 --- a/src/helpers/BaseChatMesh.cpp +++ b/src/helpers/BaseChatMesh.cpp @@ -374,7 +374,7 @@ int BaseChatMesh::sendStatusRequest(const ContactInfo& recipient, uint32_t& est uint8_t temp[13]; uint32_t now = getRTCClock()->getCurrentTimeUnique(); memcpy(temp, &now, 4); // mostly an extra blob to help make packet_hash unique - temp[4] = CMD_GET_STATUS; + temp[4] = REQ_TYPE_GET_STATUS; memset(&temp[5], 0, 4); // reserved (possibly for 'since' param) getRNG()->random(&temp[9], 4); // random blob to help make packet-hash unique @@ -394,6 +394,121 @@ int BaseChatMesh::sendStatusRequest(const ContactInfo& recipient, uint32_t& est return MSG_SEND_FAILED; } +bool BaseChatMesh::startConnection(const ContactInfo& contact, uint16_t keep_alive_secs) { + int use_idx = -1; + for (int i = 0; i < MAX_CONNECTIONS; i++) { + if (connections[i].keep_alive_millis == 0) { // free slot? + use_idx = i; + } else if (connections[i].server_id.matches(contact.id)) { // already in table? + use_idx = i; + break; + } + } + if (use_idx < 0) { + return false; // table is full + } + connections[use_idx].server_id = contact.id; + uint32_t interval = connections[use_idx].keep_alive_millis = ((uint32_t)keep_alive_secs)*1000; + connections[use_idx].next_ping = futureMillis(interval); + connections[use_idx].expected_ack = 0; + connections[use_idx].last_activity = getRTCClock()->getCurrentTime(); + return true; // success +} + +void BaseChatMesh::stopConnection(const uint8_t* pub_key) { + for (int i = 0; i < MAX_CONNECTIONS; i++) { + if (connections[i].server_id.matches(pub_key)) { + connections[i].keep_alive_millis = 0; // mark slot as now free + connections[i].next_ping = 0; + connections[i].expected_ack = 0; + connections[i].last_activity = 0; + break; + } + } +} + +bool BaseChatMesh::hasConnectionTo(const uint8_t* pub_key) { + for (int i = 0; i < MAX_CONNECTIONS; i++) { + if (connections[i].keep_alive_millis > 0 && connections[i].server_id.matches(pub_key)) return true; + } + return false; +} + +void BaseChatMesh::markConnectionActive(const ContactInfo& contact) { + for (int i = 0; i < MAX_CONNECTIONS; i++) { + if (connections[i].keep_alive_millis > 0 && connections[i].server_id.matches(contact.id)) { + connections[i].last_activity = getRTCClock()->getCurrentTime(); + + // re-schedule next KEEP_ALIVE, now that we have heard from server + connections[i].next_ping = futureMillis(connections[i].keep_alive_millis); + break; + } + } +} + +bool BaseChatMesh::checkConnectionsAck(const uint8_t* data) { + for (int i = 0; i < MAX_CONNECTIONS; i++) { + if (connections[i].keep_alive_millis > 0 && memcmp(&connections[i].expected_ack, data, 4) == 0) { + // yes, got an ack for our keep_alive request! + connections[i].expected_ack = 0; + connections[i].last_activity = getRTCClock()->getCurrentTime(); + + // re-schedule next KEEP_ALIVE, now that we have heard from server + connections[i].next_ping = futureMillis(connections[i].keep_alive_millis); + return true; // yes, a match + } + } + return false; /// no match +} + +void BaseChatMesh::checkConnections() { + // scan connections[] table, send KEEP_ALIVE requests + for (int i = 0; i < MAX_CONNECTIONS; i++) { + if (connections[i].keep_alive_millis == 0) continue; // unused slot + + uint32_t now = getRTCClock()->getCurrentTime(); + uint32_t expire_secs = (connections[i].keep_alive_millis / 1000) * 5 / 2; // 2.5 x keep_alive interval + if (now >= connections[i].last_activity + expire_secs) { + // connection now lost + connections[i].keep_alive_millis = 0; + connections[i].next_ping = 0; + connections[i].expected_ack = 0; + connections[i].last_activity = 0; + continue; + } + + if (millisHasNowPassed(connections[i].next_ping)) { + auto contact = lookupContactByPubKey(connections[i].server_id.pub_key, PUB_KEY_SIZE); + if (contact == NULL) { + MESH_DEBUG_PRINTLN("checkConnections(): Keep_alive contact not found!"); + continue; + } + if (contact->out_path_len < 0) { + MESH_DEBUG_PRINTLN("checkConnections(): Keep_alive contact, no out_path!"); + continue; + } + + // send KEEP_ALIVE request + uint8_t data[9]; + uint32_t now = getRTCClock()->getCurrentTimeUnique(); + memcpy(data, &now, 4); + data[4] = REQ_TYPE_KEEP_ALIVE; + memcpy(&data[5], &contact->sync_since, 4); + + // calc expected ACK reply + mesh::Utils::sha256((uint8_t *)&connections[i].expected_ack, 4, data, 9, self_id.pub_key, PUB_KEY_SIZE); + + auto pkt = createDatagram(PAYLOAD_TYPE_REQ, contact->id, contact->shared_secret, data, 9); + if (pkt) { + sendDirect(pkt, contact->out_path, contact->out_path_len); + } + + // schedule next KEEP_ALIVE + connections[i].next_ping = futureMillis(connections[i].keep_alive_millis); + } + } +} + void BaseChatMesh::resetPathTo(ContactInfo& recipient) { recipient.out_path_len = -1; } diff --git a/src/helpers/BaseChatMesh.h b/src/helpers/BaseChatMesh.h index afc75bbe..0f2e0283 100644 --- a/src/helpers/BaseChatMesh.h +++ b/src/helpers/BaseChatMesh.h @@ -27,7 +27,8 @@ struct ContactInfo { #define MSG_SEND_SENT_FLOOD 1 #define MSG_SEND_SENT_DIRECT 2 -#define CMD_GET_STATUS 0x01 // same as _GET_STATS +#define REQ_TYPE_GET_STATUS 0x01 // same as _GET_STATS +#define REQ_TYPE_KEEP_ALIVE 0x02 #define RESP_SERVER_LOGIN_OK 0 // response to ANON_REQ @@ -48,6 +49,18 @@ public: #define MAX_CONTACTS 32 #endif +#ifndef MAX_CONNECTIONS + #define MAX_CONNECTIONS 16 +#endif + +struct ConnectionInfo { + mesh::Identity server_id; + unsigned long next_ping; + uint32_t last_activity; + uint32_t keep_alive_millis; + uint32_t expected_ack; +}; + /** * \brief abstract Mesh class for common 'chat' client */ @@ -66,6 +79,7 @@ class BaseChatMesh : public mesh::Mesh { #endif mesh::Packet* _pendingLoopback; uint8_t temp_buf[MAX_TRANS_UNIT]; + ConnectionInfo connections[MAX_CONNECTIONS]; mesh::Packet* composeMsgPacket(const ContactInfo& recipient, uint32_t timestamp, uint8_t attempt, const char *text, uint32_t& expected_ack); @@ -79,6 +93,7 @@ protected: #endif txt_send_timeout = 0; _pendingLoopback = NULL; + memset(connections, 0, sizeof(connections)); } // 'UI' concepts, for sub-classes to implement @@ -110,6 +125,14 @@ protected: #endif void onGroupDataRecv(mesh::Packet* packet, uint8_t type, const mesh::GroupChannel& channel, uint8_t* data, size_t len) override; + // Connections + bool startConnection(const ContactInfo& contact, uint16_t keep_alive_secs); + void stopConnection(const uint8_t* pub_key); + bool hasConnectionTo(const uint8_t* pub_key); + void markConnectionActive(const ContactInfo& contact); + bool checkConnectionsAck(const uint8_t* data); + void checkConnections(); + public: mesh::Packet* createSelfAdvert(const char* name, double lat=0.0, double lon=0.0); int sendMessage(const ContactInfo& recipient, uint32_t timestamp, uint8_t attempt, const char* text, uint32_t& expected_ack, uint32_t& est_timeout);