From b099a2ecf8140c494be823f5d01de1f9a679abeb Mon Sep 17 00:00:00 2001 From: Manuel Schrape Date: Wed, 23 Feb 2022 19:40:31 +0100 Subject: [PATCH 1/7] Adding MQTT for local APRS Packet storage. --- data/is-cfg.json | 8 ++++ platformio.ini | 1 + src/LoRa_APRS_iGate.cpp | 11 ++++- src/Task.h | 2 + src/TaskMQTT.cpp | 88 +++++++++++++++++++++++++++++++++++ src/TaskMQTT.h | 22 +++++++++ src/TaskRouter.cpp | 7 ++- src/TaskRouter.h | 5 +- src/project_configuration.cpp | 14 ++++++ src/project_configuration.h | 11 +++++ 10 files changed, 165 insertions(+), 4 deletions(-) create mode 100644 src/TaskMQTT.cpp create mode 100644 src/TaskMQTT.h diff --git a/data/is-cfg.json b/data/is-cfg.json index bab9eae..190ad2c 100644 --- a/data/is-cfg.json +++ b/data/is-cfg.json @@ -65,5 +65,13 @@ } ] }, + "mqtt": { + "active": false, + "server": "", + "port": 1883, + "name": "", + "password": "", + "topic": "LoraAPRS/Data" + }, "ntp_server": "pool.ntp.org" } diff --git a/platformio.ini b/platformio.ini index c955992..ce8560b 100644 --- a/platformio.ini +++ b/platformio.ini @@ -13,6 +13,7 @@ lib_deps = peterus/APRS-Decoder-Lib @ 0.0.6 peterus/esp-logger @ 0.0.1 peterus/ESP-FTP-Server-Lib @ 0.9.5 + knolleary/PubSubClient@^2.8 check_tool = cppcheck check_flags = cppcheck: --suppress=*:*.pio\* --inline-suppr -DCPPCHECK --force lib -ilib/TimeLib -ilib/LoRa -ilib/NTPClient diff --git a/src/LoRa_APRS_iGate.cpp b/src/LoRa_APRS_iGate.cpp index 4f07a62..cdb7931 100644 --- a/src/LoRa_APRS_iGate.cpp +++ b/src/LoRa_APRS_iGate.cpp @@ -16,9 +16,10 @@ #include "TaskOTA.h" #include "TaskRouter.h" #include "TaskWifi.h" +#include "TaskMQTT.h" #include "project_configuration.h" -#define VERSION "22.7.0" +#define VERSION "22.7.0_zj" String create_lat_aprs(double lat); String create_long_aprs(double lng); @@ -26,6 +27,7 @@ String create_long_aprs(double lng); TaskQueue> toAprsIs; TaskQueue> fromModem; TaskQueue> toModem; +TaskQueue> toMQTT; System LoRaSystem; Configuration userConfig; @@ -37,8 +39,9 @@ WifiTask wifiTask; OTATask otaTask; NTPTask ntpTask; FTPTask ftpTask; +MQTTTask mqttTask(toMQTT); AprsIsTask aprsIsTask(toAprsIs); -RouterTask routerTask(fromModem, toModem, toAprsIs); +RouterTask routerTask(fromModem, toModem, toAprsIs, toMQTT); void setup() { Serial.begin(115200); @@ -114,6 +117,10 @@ void setup() { LoRaSystem.getTaskManager().addTask(&aprsIsTask); } + if (userConfig.mqtt.active) { + LoRaSystem.getTaskManager().addTask(&mqttTask); + } + LoRaSystem.getTaskManager().setup(LoRaSystem); LoRaSystem.getDisplay().showSpashScreen("LoRa APRS iGate", VERSION); diff --git a/src/Task.h b/src/Task.h index 4c8163b..3a06ab8 100644 --- a/src/Task.h +++ b/src/Task.h @@ -12,6 +12,7 @@ enum TaskNames TaskWifi, TaskRouter, TaskSize, + TaskMQTT, }; #define TASK_APRS_IS "AprsIsTask" @@ -22,5 +23,6 @@ enum TaskNames #define TASK_OTA "OTATask" #define TASK_WIFI "WifiTask" #define TASK_ROUTER "RouterTask" +#define TASK_MQTT "MQTTTask" #endif diff --git a/src/TaskMQTT.cpp b/src/TaskMQTT.cpp new file mode 100644 index 0000000..a808670 --- /dev/null +++ b/src/TaskMQTT.cpp @@ -0,0 +1,88 @@ +#include + +#include "Task.h" +#include "TaskMQTT.h" +#include "project_configuration.h" + +#include +#include + +#include + +WiFiClient wiFiClient; +PubSubClient _MQTT(wiFiClient); + + +MQTTTask::MQTTTask(TaskQueue> &toMQTT) : Task(TASK_MQTT, TaskMQTT), _beginCalled(false), _toMQTT(toMQTT) { +} + +MQTTTask::~MQTTTask() { +} + +bool MQTTTask::setup(System &system) { + _MQTT.setServer(system.getUserConfig()->mqtt.server.c_str(), system.getUserConfig()->mqtt.port); + + return true; +} + +bool MQTTTask::loop(System &system) { + if (!_beginCalled) { + _beginCalled = true; + } + + if (!system.isWifiEthConnected()) { + return false; + } + + if (!_MQTT.connected()) { + connect(system); + } + + if (!_toMQTT.empty()) { + std::shared_ptr msg = _toMQTT.getElement(); + + DynamicJsonDocument _Data(1024); + String _r; + + _Data["Source"] = msg->getSource(); + _Data["Destination"] = msg->getDestination(); + _Data["Path"] = msg->getPath(); + _Data["Type"] = msg->getType().toString(); + String _body = msg->getBody()->encode(); + _body.replace("\n", ""); + _Data["Data"] = _body; + + serializeJson(_Data, _r); + + logPrintD("Send MQTT: "); + logPrintlnD(_r); + + String _topic = String(system.getUserConfig()->mqtt.topic); + + if (!_topic.endsWith("/")) { + _topic = _topic + "/"; + } + _topic = _topic + system.getUserConfig()->callsign; + + _MQTT.publish(_topic.c_str(), _r.c_str()); + } + _MQTT.loop(); + return true; +} + +bool MQTTTask::connect(const System &system) { + logPrintI("Connecting to MQTT broker: "); + logPrintI(system.getUserConfig()->mqtt.server); + logPrintI(" on port "); + logPrintlnI(String(system.getUserConfig()->mqtt.port)); + + if (_MQTT.connect(system.getUserConfig()->callsign.c_str(),system.getUserConfig()->mqtt.name.c_str() ,system.getUserConfig()->mqtt.password.c_str() )) { + logPrintI("Connected to MQTT broker as: "); + logPrintlnI(system.getUserConfig()->callsign); + + return true; + } else { + logPrintlnI("Connecting to MQTT broker faild. Try again later."); + } + return false; +} diff --git a/src/TaskMQTT.h b/src/TaskMQTT.h new file mode 100644 index 0000000..ca3304c --- /dev/null +++ b/src/TaskMQTT.h @@ -0,0 +1,22 @@ +#ifndef TASK_MQTT_H_ +#define TASK_MQTT_H_ + +#include +#include +#include + +class MQTTTask : public Task { +public: + MQTTTask(TaskQueue> &toMQTT); + virtual ~MQTTTask(); + + virtual bool setup(System &system) override; + virtual bool loop(System &system) override; + +private: + bool _beginCalled; + TaskQueue> &_toMQTT; + bool connect(const System &system); +}; + +#endif diff --git a/src/TaskRouter.cpp b/src/TaskRouter.cpp index a1aa985..61d7c2c 100644 --- a/src/TaskRouter.cpp +++ b/src/TaskRouter.cpp @@ -9,7 +9,7 @@ String create_lat_aprs(double lat); String create_long_aprs(double lng); -RouterTask::RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs) : Task(TASK_ROUTER, TaskRouter), _fromModem(fromModem), _toModem(toModem), _toAprsIs(toAprsIs) { +RouterTask::RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs, TaskQueue> &toMQTT) : Task(TASK_ROUTER, TaskRouter), _fromModem(fromModem), _toModem(toModem), _toAprsIs(toAprsIs), _toMQTT(toMQTT) { } RouterTask::~RouterTask() { @@ -33,6 +33,11 @@ bool RouterTask::loop(System &system) { // do routing if (!_fromModem.empty()) { std::shared_ptr modemMsg = _fromModem.getElement(); + std::shared_ptr DataMQTT = modemMsg; + + if (system.getUserConfig()->mqtt.active) { + _toMQTT.addElement(DataMQTT); + } if (system.getUserConfig()->aprs_is.active && modemMsg->getSource() != system.getUserConfig()->callsign) { std::shared_ptr aprsIsMsg = std::make_shared(*modemMsg); diff --git a/src/TaskRouter.h b/src/TaskRouter.h index 8d8d1ac..617f476 100644 --- a/src/TaskRouter.h +++ b/src/TaskRouter.h @@ -3,10 +3,12 @@ #include #include +#include class RouterTask : public Task { public: - RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs); + RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs, TaskQueue> &toMQTT +); virtual ~RouterTask(); virtual bool setup(System &system) override; @@ -16,6 +18,7 @@ private: TaskQueue> &_fromModem; TaskQueue> &_toModem; TaskQueue> &_toAprsIs; + TaskQueue> &_toMQTT; std::shared_ptr _beaconMsg; Timer _beacon_timer; diff --git a/src/project_configuration.cpp b/src/project_configuration.cpp index 3ca11f2..78a2510 100644 --- a/src/project_configuration.cpp +++ b/src/project_configuration.cpp @@ -77,6 +77,14 @@ void ProjectConfigurationManagement::readProjectConfiguration(DynamicJsonDocumen us.password = "ftp"; conf.ftp.users.push_back(us); } + if (data.containsKey("mqtt")) { + conf.mqtt.active = data["mqtt"]["active"] | false; + conf.mqtt.server = data["mqtt"]["server"].as(); + conf.mqtt.port = data["mqtt"]["port"].as(); + conf.mqtt.name = data["mqtt"]["name"].as(); + conf.mqtt.password = data["mqtt"]["password"].as(); + conf.mqtt.topic = data["mqtt"]["topic"].as(); + } if (data.containsKey("ntp_server")) conf.ntpServer = data["ntp_server"].as(); @@ -133,6 +141,12 @@ void ProjectConfigurationManagement::writeProjectConfiguration(Configuration &co v["name"] = u.name; v["password"] = u.password; } + data["mqtt"]["active"] = conf.mqtt.active; + data["mqtt"]["server"] = conf.mqtt.server; + data["mqtt"]["port"] = conf.mqtt.port; + data["mqtt"]["name"] = conf.mqtt.name; + data["mqtt"]["password"] = conf.mqtt.password; + data["mqtt"]["topic"] = conf.mqtt.topic; data["ntp_server"] = conf.ntpServer; data["board"] = conf.board; diff --git a/src/project_configuration.h b/src/project_configuration.h index 09745c4..12e94a7 100644 --- a/src/project_configuration.h +++ b/src/project_configuration.h @@ -117,6 +117,16 @@ public: std::list users; }; + class MQTT { + public: + bool active; + String server; + uint16_t port; + String name; + String password; + String topic; + }; + Configuration() : callsign("NOCALL-10"), board(""), ntpServer("pool.ntp.org"){}; String callsign; @@ -128,6 +138,7 @@ public: LoRa lora; Display display; Ftp ftp; + MQTT mqtt; String board; String ntpServer; }; From e9158a44fdfef464aea56c58d6940b4a0b5a42d3 Mon Sep 17 00:00:00 2001 From: Manuel Schrape Date: Thu, 24 Feb 2022 15:36:17 +0100 Subject: [PATCH 2/7] Removed unnecessary variable DataMQTT. --- src/TaskRouter.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/TaskRouter.cpp b/src/TaskRouter.cpp index 61d7c2c..2ab3cd5 100644 --- a/src/TaskRouter.cpp +++ b/src/TaskRouter.cpp @@ -33,10 +33,9 @@ bool RouterTask::loop(System &system) { // do routing if (!_fromModem.empty()) { std::shared_ptr modemMsg = _fromModem.getElement(); - std::shared_ptr DataMQTT = modemMsg; if (system.getUserConfig()->mqtt.active) { - _toMQTT.addElement(DataMQTT); + _toMQTT.addElement(modemMsg); } if (system.getUserConfig()->aprs_is.active && modemMsg->getSource() != system.getUserConfig()->callsign) { From c66d7d7d8f86c39db9df424b01b0ed45dc4140db Mon Sep 17 00:00:00 2001 From: Peter Buchegger Date: Thu, 24 Feb 2022 16:02:02 +0100 Subject: [PATCH 3/7] refactor MQTT task --- src/TaskMQTT.cpp | 71 ++++++++++++++++++++---------------------------- src/TaskMQTT.h | 13 +++++---- 2 files changed, 38 insertions(+), 46 deletions(-) diff --git a/src/TaskMQTT.cpp b/src/TaskMQTT.cpp index a808670..a8115ac 100644 --- a/src/TaskMQTT.cpp +++ b/src/TaskMQTT.cpp @@ -9,31 +9,22 @@ #include -WiFiClient wiFiClient; -PubSubClient _MQTT(wiFiClient); - - -MQTTTask::MQTTTask(TaskQueue> &toMQTT) : Task(TASK_MQTT, TaskMQTT), _beginCalled(false), _toMQTT(toMQTT) { +MQTTTask::MQTTTask(TaskQueue> &toMQTT) : Task(TASK_MQTT, TaskMQTT), _toMQTT(toMQTT), _MQTT(_client) { } MQTTTask::~MQTTTask() { } -bool MQTTTask::setup(System &system) { +bool MQTTTask::setup(System &system) { _MQTT.setServer(system.getUserConfig()->mqtt.server.c_str(), system.getUserConfig()->mqtt.port); - return true; } bool MQTTTask::loop(System &system) { - if (!_beginCalled) { - _beginCalled = true; - } - if (!system.isWifiEthConnected()) { return false; } - + if (!_MQTT.connected()) { connect(system); } @@ -41,48 +32,46 @@ bool MQTTTask::loop(System &system) { if (!_toMQTT.empty()) { std::shared_ptr msg = _toMQTT.getElement(); - DynamicJsonDocument _Data(1024); - String _r; + DynamicJsonDocument data(1024); + data["Source"] = msg->getSource(); + data["Destination"] = msg->getDestination(); + data["Path"] = msg->getPath(); + data["Type"] = msg->getType().toString(); + String body = msg->getBody()->encode(); + body.replace("\n", ""); + data["Data"] = body; - _Data["Source"] = msg->getSource(); - _Data["Destination"] = msg->getDestination(); - _Data["Path"] = msg->getPath(); - _Data["Type"] = msg->getType().toString(); - String _body = msg->getBody()->encode(); - _body.replace("\n", ""); - _Data["Data"] = _body; + String r; + serializeJson(data, r); - serializeJson(_Data, _r); - - logPrintD("Send MQTT: "); - logPrintlnD(_r); - - String _topic = String(system.getUserConfig()->mqtt.topic); - - if (!_topic.endsWith("/")) { - _topic = _topic + "/"; + String topic = String(system.getUserConfig()->mqtt.topic); + if (!topic.endsWith("/")) { + topic = topic + "/"; } - _topic = _topic + system.getUserConfig()->callsign; + topic = topic + system.getUserConfig()->callsign; - _MQTT.publish(_topic.c_str(), _r.c_str()); + logPrintD("Send MQTT with topic: \""); + logPrintD(topic); + logPrintD("\", data: "); + logPrintlnD(r); + + _MQTT.publish(topic.c_str(), r.c_str()); } _MQTT.loop(); return true; } bool MQTTTask::connect(const System &system) { - logPrintI("Connecting to MQTT broker: "); + logPrintI("Connecting to MQTT broker: "); logPrintI(system.getUserConfig()->mqtt.server); - logPrintI(" on port "); + logPrintI(" on port "); logPrintlnI(String(system.getUserConfig()->mqtt.port)); - - if (_MQTT.connect(system.getUserConfig()->callsign.c_str(),system.getUserConfig()->mqtt.name.c_str() ,system.getUserConfig()->mqtt.password.c_str() )) { - logPrintI("Connected to MQTT broker as: "); - logPrintlnI(system.getUserConfig()->callsign); + if (_MQTT.connect(system.getUserConfig()->callsign.c_str(), system.getUserConfig()->mqtt.name.c_str(), system.getUserConfig()->mqtt.password.c_str())) { + logPrintI("Connected to MQTT broker as: "); + logPrintlnI(system.getUserConfig()->callsign); return true; - } else { - logPrintlnI("Connecting to MQTT broker faild. Try again later."); - } + } + logPrintlnI("Connecting to MQTT broker faild. Try again later."); return false; } diff --git a/src/TaskMQTT.h b/src/TaskMQTT.h index ca3304c..0f0524f 100644 --- a/src/TaskMQTT.h +++ b/src/TaskMQTT.h @@ -2,8 +2,8 @@ #define TASK_MQTT_H_ #include -#include #include +#include class MQTTTask : public Task { public: @@ -12,11 +12,14 @@ public: virtual bool setup(System &system) override; virtual bool loop(System &system) override; - + private: - bool _beginCalled; - TaskQueue> &_toMQTT; - bool connect(const System &system); + TaskQueue> &_toMQTT; + + WiFiClient _client; + PubSubClient _MQTT; + + bool connect(const System &system); }; #endif From af6fd114bca5bc461bcf0fb00d3d494f874fcc5b Mon Sep 17 00:00:00 2001 From: Peter Buchegger Date: Thu, 24 Feb 2022 16:02:28 +0100 Subject: [PATCH 4/7] fixing format check --- src/LoRa_APRS_iGate.cpp | 2 +- src/project_configuration.cpp | 24 ++++++++++++------------ src/project_configuration.h | 10 +++++----- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/LoRa_APRS_iGate.cpp b/src/LoRa_APRS_iGate.cpp index cdb7931..074c8fa 100644 --- a/src/LoRa_APRS_iGate.cpp +++ b/src/LoRa_APRS_iGate.cpp @@ -11,12 +11,12 @@ #include "TaskDisplay.h" #include "TaskEth.h" #include "TaskFTP.h" +#include "TaskMQTT.h" #include "TaskModem.h" #include "TaskNTP.h" #include "TaskOTA.h" #include "TaskRouter.h" #include "TaskWifi.h" -#include "TaskMQTT.h" #include "project_configuration.h" #define VERSION "22.7.0_zj" diff --git a/src/project_configuration.cpp b/src/project_configuration.cpp index 78a2510..061c531 100644 --- a/src/project_configuration.cpp +++ b/src/project_configuration.cpp @@ -78,12 +78,12 @@ void ProjectConfigurationManagement::readProjectConfiguration(DynamicJsonDocumen conf.ftp.users.push_back(us); } if (data.containsKey("mqtt")) { - conf.mqtt.active = data["mqtt"]["active"] | false; - conf.mqtt.server = data["mqtt"]["server"].as(); - conf.mqtt.port = data["mqtt"]["port"].as(); - conf.mqtt.name = data["mqtt"]["name"].as(); + conf.mqtt.active = data["mqtt"]["active"] | false; + conf.mqtt.server = data["mqtt"]["server"].as(); + conf.mqtt.port = data["mqtt"]["port"].as(); + conf.mqtt.name = data["mqtt"]["name"].as(); conf.mqtt.password = data["mqtt"]["password"].as(); - conf.mqtt.topic = data["mqtt"]["topic"].as(); + conf.mqtt.topic = data["mqtt"]["topic"].as(); } if (data.containsKey("ntp_server")) conf.ntpServer = data["ntp_server"].as(); @@ -141,13 +141,13 @@ void ProjectConfigurationManagement::writeProjectConfiguration(Configuration &co v["name"] = u.name; v["password"] = u.password; } - data["mqtt"]["active"] = conf.mqtt.active; - data["mqtt"]["server"] = conf.mqtt.server; - data["mqtt"]["port"] = conf.mqtt.port; - data["mqtt"]["name"] = conf.mqtt.name; - data["mqtt"]["password"] = conf.mqtt.password; - data["mqtt"]["topic"] = conf.mqtt.topic; - data["ntp_server"] = conf.ntpServer; + data["mqtt"]["active"] = conf.mqtt.active; + data["mqtt"]["server"] = conf.mqtt.server; + data["mqtt"]["port"] = conf.mqtt.port; + data["mqtt"]["name"] = conf.mqtt.name; + data["mqtt"]["password"] = conf.mqtt.password; + data["mqtt"]["topic"] = conf.mqtt.topic; + data["ntp_server"] = conf.ntpServer; data["board"] = conf.board; } diff --git a/src/project_configuration.h b/src/project_configuration.h index 12e94a7..5fe2fb3 100644 --- a/src/project_configuration.h +++ b/src/project_configuration.h @@ -119,12 +119,12 @@ public: class MQTT { public: - bool active; - String server; + bool active; + String server; uint16_t port; - String name; - String password; - String topic; + String name; + String password; + String topic; }; Configuration() : callsign("NOCALL-10"), board(""), ntpServer("pool.ntp.org"){}; From 6534055ac5299bc6d7f6fa1d682dbf9f4b1aac65 Mon Sep 17 00:00:00 2001 From: Peter Buchegger Date: Thu, 24 Feb 2022 16:21:07 +0100 Subject: [PATCH 5/7] format check fix --- src/TaskRouter.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/TaskRouter.h b/src/TaskRouter.h index 617f476..348cdff 100644 --- a/src/TaskRouter.h +++ b/src/TaskRouter.h @@ -2,13 +2,12 @@ #define TASK_ROUTER_H_ #include -#include #include +#include class RouterTask : public Task { public: - RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs, TaskQueue> &toMQTT -); + RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs, TaskQueue> &toMQTT); virtual ~RouterTask(); virtual bool setup(System &system) override; From f5b10a03af1bde6c4f80e8fdea400c0c1478cada Mon Sep 17 00:00:00 2001 From: Peter Buchegger Date: Thu, 24 Feb 2022 16:24:02 +0100 Subject: [PATCH 6/7] include cleanup --- src/TaskMQTT.cpp | 3 --- src/TaskMQTT.h | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/TaskMQTT.cpp b/src/TaskMQTT.cpp index a8115ac..1b7d6ee 100644 --- a/src/TaskMQTT.cpp +++ b/src/TaskMQTT.cpp @@ -4,9 +4,6 @@ #include "TaskMQTT.h" #include "project_configuration.h" -#include -#include - #include MQTTTask::MQTTTask(TaskQueue> &toMQTT) : Task(TASK_MQTT, TaskMQTT), _toMQTT(toMQTT), _MQTT(_client) { diff --git a/src/TaskMQTT.h b/src/TaskMQTT.h index 0f0524f..88fa35e 100644 --- a/src/TaskMQTT.h +++ b/src/TaskMQTT.h @@ -4,6 +4,7 @@ #include #include #include +#include class MQTTTask : public Task { public: From e1b3342e1251d987d64dee333ca7868c8728e252 Mon Sep 17 00:00:00 2001 From: Peter Buchegger Date: Thu, 24 Feb 2022 16:26:23 +0100 Subject: [PATCH 7/7] version update --- src/LoRa_APRS_iGate.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/LoRa_APRS_iGate.cpp b/src/LoRa_APRS_iGate.cpp index 074c8fa..0352a6b 100644 --- a/src/LoRa_APRS_iGate.cpp +++ b/src/LoRa_APRS_iGate.cpp @@ -19,7 +19,7 @@ #include "TaskWifi.h" #include "project_configuration.h" -#define VERSION "22.7.0_zj" +#define VERSION "22.8.0" String create_lat_aprs(double lat); String create_long_aprs(double lng);