From b099a2ecf8140c494be823f5d01de1f9a679abeb Mon Sep 17 00:00:00 2001 From: Manuel Schrape Date: Wed, 23 Feb 2022 19:40:31 +0100 Subject: [PATCH] Adding MQTT for local APRS Packet storage. --- data/is-cfg.json | 8 ++++ platformio.ini | 1 + src/LoRa_APRS_iGate.cpp | 11 ++++- src/Task.h | 2 + src/TaskMQTT.cpp | 88 +++++++++++++++++++++++++++++++++++ src/TaskMQTT.h | 22 +++++++++ src/TaskRouter.cpp | 7 ++- src/TaskRouter.h | 5 +- src/project_configuration.cpp | 14 ++++++ src/project_configuration.h | 11 +++++ 10 files changed, 165 insertions(+), 4 deletions(-) create mode 100644 src/TaskMQTT.cpp create mode 100644 src/TaskMQTT.h diff --git a/data/is-cfg.json b/data/is-cfg.json index bab9eae..190ad2c 100644 --- a/data/is-cfg.json +++ b/data/is-cfg.json @@ -65,5 +65,13 @@ } ] }, + "mqtt": { + "active": false, + "server": "", + "port": 1883, + "name": "", + "password": "", + "topic": "LoraAPRS/Data" + }, "ntp_server": "pool.ntp.org" } diff --git a/platformio.ini b/platformio.ini index c955992..ce8560b 100644 --- a/platformio.ini +++ b/platformio.ini @@ -13,6 +13,7 @@ lib_deps = peterus/APRS-Decoder-Lib @ 0.0.6 peterus/esp-logger @ 0.0.1 peterus/ESP-FTP-Server-Lib @ 0.9.5 + knolleary/PubSubClient@^2.8 check_tool = cppcheck check_flags = cppcheck: --suppress=*:*.pio\* --inline-suppr -DCPPCHECK --force lib -ilib/TimeLib -ilib/LoRa -ilib/NTPClient diff --git a/src/LoRa_APRS_iGate.cpp b/src/LoRa_APRS_iGate.cpp index 4f07a62..cdb7931 100644 --- a/src/LoRa_APRS_iGate.cpp +++ b/src/LoRa_APRS_iGate.cpp @@ -16,9 +16,10 @@ #include "TaskOTA.h" #include "TaskRouter.h" #include "TaskWifi.h" +#include "TaskMQTT.h" #include "project_configuration.h" -#define VERSION "22.7.0" +#define VERSION "22.7.0_zj" String create_lat_aprs(double lat); String create_long_aprs(double lng); @@ -26,6 +27,7 @@ String create_long_aprs(double lng); TaskQueue> toAprsIs; TaskQueue> fromModem; TaskQueue> toModem; +TaskQueue> toMQTT; System LoRaSystem; Configuration userConfig; @@ -37,8 +39,9 @@ WifiTask wifiTask; OTATask otaTask; NTPTask ntpTask; FTPTask ftpTask; +MQTTTask mqttTask(toMQTT); AprsIsTask aprsIsTask(toAprsIs); -RouterTask routerTask(fromModem, toModem, toAprsIs); +RouterTask routerTask(fromModem, toModem, toAprsIs, toMQTT); void setup() { Serial.begin(115200); @@ -114,6 +117,10 @@ void setup() { LoRaSystem.getTaskManager().addTask(&aprsIsTask); } + if (userConfig.mqtt.active) { + LoRaSystem.getTaskManager().addTask(&mqttTask); + } + LoRaSystem.getTaskManager().setup(LoRaSystem); LoRaSystem.getDisplay().showSpashScreen("LoRa APRS iGate", VERSION); diff --git a/src/Task.h b/src/Task.h index 4c8163b..3a06ab8 100644 --- a/src/Task.h +++ b/src/Task.h @@ -12,6 +12,7 @@ enum TaskNames TaskWifi, TaskRouter, TaskSize, + TaskMQTT, }; #define TASK_APRS_IS "AprsIsTask" @@ -22,5 +23,6 @@ enum TaskNames #define TASK_OTA "OTATask" #define TASK_WIFI "WifiTask" #define TASK_ROUTER "RouterTask" +#define TASK_MQTT "MQTTTask" #endif diff --git a/src/TaskMQTT.cpp b/src/TaskMQTT.cpp new file mode 100644 index 0000000..a808670 --- /dev/null +++ b/src/TaskMQTT.cpp @@ -0,0 +1,88 @@ +#include + +#include "Task.h" +#include "TaskMQTT.h" +#include "project_configuration.h" + +#include +#include + +#include + +WiFiClient wiFiClient; +PubSubClient _MQTT(wiFiClient); + + +MQTTTask::MQTTTask(TaskQueue> &toMQTT) : Task(TASK_MQTT, TaskMQTT), _beginCalled(false), _toMQTT(toMQTT) { +} + +MQTTTask::~MQTTTask() { +} + +bool MQTTTask::setup(System &system) { + _MQTT.setServer(system.getUserConfig()->mqtt.server.c_str(), system.getUserConfig()->mqtt.port); + + return true; +} + +bool MQTTTask::loop(System &system) { + if (!_beginCalled) { + _beginCalled = true; + } + + if (!system.isWifiEthConnected()) { + return false; + } + + if (!_MQTT.connected()) { + connect(system); + } + + if (!_toMQTT.empty()) { + std::shared_ptr msg = _toMQTT.getElement(); + + DynamicJsonDocument _Data(1024); + String _r; + + _Data["Source"] = msg->getSource(); + _Data["Destination"] = msg->getDestination(); + _Data["Path"] = msg->getPath(); + _Data["Type"] = msg->getType().toString(); + String _body = msg->getBody()->encode(); + _body.replace("\n", ""); + _Data["Data"] = _body; + + serializeJson(_Data, _r); + + logPrintD("Send MQTT: "); + logPrintlnD(_r); + + String _topic = String(system.getUserConfig()->mqtt.topic); + + if (!_topic.endsWith("/")) { + _topic = _topic + "/"; + } + _topic = _topic + system.getUserConfig()->callsign; + + _MQTT.publish(_topic.c_str(), _r.c_str()); + } + _MQTT.loop(); + return true; +} + +bool MQTTTask::connect(const System &system) { + logPrintI("Connecting to MQTT broker: "); + logPrintI(system.getUserConfig()->mqtt.server); + logPrintI(" on port "); + logPrintlnI(String(system.getUserConfig()->mqtt.port)); + + if (_MQTT.connect(system.getUserConfig()->callsign.c_str(),system.getUserConfig()->mqtt.name.c_str() ,system.getUserConfig()->mqtt.password.c_str() )) { + logPrintI("Connected to MQTT broker as: "); + logPrintlnI(system.getUserConfig()->callsign); + + return true; + } else { + logPrintlnI("Connecting to MQTT broker faild. Try again later."); + } + return false; +} diff --git a/src/TaskMQTT.h b/src/TaskMQTT.h new file mode 100644 index 0000000..ca3304c --- /dev/null +++ b/src/TaskMQTT.h @@ -0,0 +1,22 @@ +#ifndef TASK_MQTT_H_ +#define TASK_MQTT_H_ + +#include +#include +#include + +class MQTTTask : public Task { +public: + MQTTTask(TaskQueue> &toMQTT); + virtual ~MQTTTask(); + + virtual bool setup(System &system) override; + virtual bool loop(System &system) override; + +private: + bool _beginCalled; + TaskQueue> &_toMQTT; + bool connect(const System &system); +}; + +#endif diff --git a/src/TaskRouter.cpp b/src/TaskRouter.cpp index a1aa985..61d7c2c 100644 --- a/src/TaskRouter.cpp +++ b/src/TaskRouter.cpp @@ -9,7 +9,7 @@ String create_lat_aprs(double lat); String create_long_aprs(double lng); -RouterTask::RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs) : Task(TASK_ROUTER, TaskRouter), _fromModem(fromModem), _toModem(toModem), _toAprsIs(toAprsIs) { +RouterTask::RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs, TaskQueue> &toMQTT) : Task(TASK_ROUTER, TaskRouter), _fromModem(fromModem), _toModem(toModem), _toAprsIs(toAprsIs), _toMQTT(toMQTT) { } RouterTask::~RouterTask() { @@ -33,6 +33,11 @@ bool RouterTask::loop(System &system) { // do routing if (!_fromModem.empty()) { std::shared_ptr modemMsg = _fromModem.getElement(); + std::shared_ptr DataMQTT = modemMsg; + + if (system.getUserConfig()->mqtt.active) { + _toMQTT.addElement(DataMQTT); + } if (system.getUserConfig()->aprs_is.active && modemMsg->getSource() != system.getUserConfig()->callsign) { std::shared_ptr aprsIsMsg = std::make_shared(*modemMsg); diff --git a/src/TaskRouter.h b/src/TaskRouter.h index 8d8d1ac..617f476 100644 --- a/src/TaskRouter.h +++ b/src/TaskRouter.h @@ -3,10 +3,12 @@ #include #include +#include class RouterTask : public Task { public: - RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs); + RouterTask(TaskQueue> &fromModem, TaskQueue> &toModem, TaskQueue> &toAprsIs, TaskQueue> &toMQTT +); virtual ~RouterTask(); virtual bool setup(System &system) override; @@ -16,6 +18,7 @@ private: TaskQueue> &_fromModem; TaskQueue> &_toModem; TaskQueue> &_toAprsIs; + TaskQueue> &_toMQTT; std::shared_ptr _beaconMsg; Timer _beacon_timer; diff --git a/src/project_configuration.cpp b/src/project_configuration.cpp index 3ca11f2..78a2510 100644 --- a/src/project_configuration.cpp +++ b/src/project_configuration.cpp @@ -77,6 +77,14 @@ void ProjectConfigurationManagement::readProjectConfiguration(DynamicJsonDocumen us.password = "ftp"; conf.ftp.users.push_back(us); } + if (data.containsKey("mqtt")) { + conf.mqtt.active = data["mqtt"]["active"] | false; + conf.mqtt.server = data["mqtt"]["server"].as(); + conf.mqtt.port = data["mqtt"]["port"].as(); + conf.mqtt.name = data["mqtt"]["name"].as(); + conf.mqtt.password = data["mqtt"]["password"].as(); + conf.mqtt.topic = data["mqtt"]["topic"].as(); + } if (data.containsKey("ntp_server")) conf.ntpServer = data["ntp_server"].as(); @@ -133,6 +141,12 @@ void ProjectConfigurationManagement::writeProjectConfiguration(Configuration &co v["name"] = u.name; v["password"] = u.password; } + data["mqtt"]["active"] = conf.mqtt.active; + data["mqtt"]["server"] = conf.mqtt.server; + data["mqtt"]["port"] = conf.mqtt.port; + data["mqtt"]["name"] = conf.mqtt.name; + data["mqtt"]["password"] = conf.mqtt.password; + data["mqtt"]["topic"] = conf.mqtt.topic; data["ntp_server"] = conf.ntpServer; data["board"] = conf.board; diff --git a/src/project_configuration.h b/src/project_configuration.h index 09745c4..12e94a7 100644 --- a/src/project_configuration.h +++ b/src/project_configuration.h @@ -117,6 +117,16 @@ public: std::list users; }; + class MQTT { + public: + bool active; + String server; + uint16_t port; + String name; + String password; + String topic; + }; + Configuration() : callsign("NOCALL-10"), board(""), ntpServer("pool.ntp.org"){}; String callsign; @@ -128,6 +138,7 @@ public: LoRa lora; Display display; Ftp ftp; + MQTT mqtt; String board; String ntpServer; };