| 2 |
raymond |
1 |
#include "MQTTRemote.h"
|
|
|
2 |
|
|
|
3 |
// Minimum time, in milliseconds, that must have elapsed since the previous connection attempt before
// handle() tries to connect again. Compared against differences of millis() timestamps.
#define RETRY_CONNECT_WAIT_MS 3000
|
|
|
4 |
|
|
|
5 |
// Creates the remote and prepares the underlying MQTT client. No connection is attempted here; connecting
// is done from handle().
//
// client_id: identifier used when connecting, and as prefix for the "<client_id>/status" topic.
// host, port: address of the MQTT server, handed to the MQTT client together with the WiFi client.
// username, password: credentials used by handle() when connecting.
// configuration: supplies receive_verbose, buffer_size and keep_alive_s.
MQTTRemote::MQTTRemote(std::string client_id, std::string host, int port, std::string username, std::string password,
                       Configuration configuration)
    : _client_id(client_id), _host(host), _username(username), _password(password),
      _receive_verbose(configuration.receive_verbose), _mqtt_client(configuration.buffer_size) {
  // The member copy of the host name is passed on, not the constructor parameter.
  // NOTE(review): presumably the client may keep this pointer around - confirm before changing.
  _mqtt_client.begin(_host.c_str(), port, _wifi_client);
  _mqtt_client.setKeepAlive(configuration.keep_alive_s);

  // Route every message received by the client to onMessage() of this instance.
  std::function<void(MQTTClient * client, char topic[], char bytes[], int length)> on_message =
      [this](MQTTClient *client, char topic[], char bytes[], int length) { onMessage(client, topic, bytes, length); };
  _mqtt_client.onMessageAdvanced(on_message);
}
|
|
|
16 |
|
|
|
17 |
void MQTTRemote::handle() {
|
|
|
18 |
auto now = millis();
|
|
|
19 |
auto connected = _mqtt_client.connected();
|
|
|
20 |
|
|
|
21 |
if (!connected && (now - _last_connection_attempt_timestamp_ms > RETRY_CONNECT_WAIT_MS)) {
|
|
|
22 |
Serial.print("MQTTRemote: Client not connected. Trying to connect... ");
|
|
|
23 |
setupWill();
|
|
|
24 |
auto r = _mqtt_client.connect(_client_id.c_str(), _username.c_str(), _password.c_str());
|
|
|
25 |
if (r) {
|
|
|
26 |
Serial.println("success!");
|
|
|
27 |
|
|
|
28 |
// And publish that we are now online.
|
|
|
29 |
publishMessageVerbose(_client_id + "/status", "online", true);
|
|
|
30 |
|
|
|
31 |
// Subscribe to all topics.
|
|
|
32 |
for (const auto &subscription : _subscriptions) {
|
|
|
33 |
_mqtt_client.subscribe(subscription.first.c_str());
|
|
|
34 |
}
|
|
|
35 |
} else {
|
|
|
36 |
Serial.println(("failed :(, rc=" + std::to_string(_mqtt_client.lastError())).c_str());
|
|
|
37 |
}
|
|
|
38 |
_last_connection_attempt_timestamp_ms = now;
|
|
|
39 |
} else if (connected) {
|
|
|
40 |
_mqtt_client.loop();
|
|
|
41 |
}
|
|
|
42 |
|
|
|
43 |
if (_on_connection_change && connected != _was_connected) {
|
|
|
44 |
_on_connection_change(connected);
|
|
|
45 |
}
|
|
|
46 |
_was_connected = connected;
|
|
|
47 |
}
|
|
|
48 |
|
|
|
49 |
bool MQTTRemote::publishMessage(std::string topic, std::string message, bool retain, uint8_t qos) {
|
|
|
50 |
if (!connected()) {
|
|
|
51 |
Serial.println(("MQTTRemote: Wanted to publish to topic " + topic + ", but no connection to server.").c_str());
|
|
|
52 |
return false;
|
|
|
53 |
}
|
|
|
54 |
return _mqtt_client.publish(topic.c_str(), message.c_str(), retain, qos);
|
|
|
55 |
}
|
|
|
56 |
|
|
|
57 |
bool MQTTRemote::publishMessageVerbose(std::string topic, std::string message, bool retain, uint8_t qos) {
|
|
|
58 |
if (!connected()) {
|
|
|
59 |
Serial.println(("MQTTRemote: Wanted to publish to topic " + topic + ", but no connection to server.").c_str());
|
|
|
60 |
return false;
|
|
|
61 |
}
|
|
|
62 |
Serial.print(("MQTTRemote: About to publish message '" + message + "' on topic '" + topic + "'...: ").c_str());
|
|
|
63 |
bool r = publishMessage(topic, message, retain, qos);
|
|
|
64 |
Serial.println(std::to_string(r).c_str());
|
|
|
65 |
return r;
|
|
|
66 |
}
|
|
|
67 |
|
|
|
68 |
bool MQTTRemote::subscribe(std::string topic, IMQTTRemote::SubscriptionCallback message_callback) {
|
|
|
69 |
if (_subscriptions.count(topic) > 0) {
|
|
|
70 |
Serial.println(("MQTTRemote: Warning: Topic " + topic + " is already subscribed to.").c_str());
|
|
|
71 |
return false;
|
|
|
72 |
}
|
|
|
73 |
|
|
|
74 |
_subscriptions.emplace(topic, message_callback);
|
|
|
75 |
|
|
|
76 |
if (!connected()) {
|
|
|
77 |
Serial.println("MQTTRemote: Not connected. Will subscribe once connected.");
|
|
|
78 |
return false;
|
|
|
79 |
}
|
|
|
80 |
|
|
|
81 |
return _mqtt_client.subscribe(topic.c_str());
|
|
|
82 |
}
|
|
|
83 |
|
|
|
84 |
bool MQTTRemote::unsubscribe(std::string topic) {
|
|
|
85 |
_subscriptions.erase(topic);
|
|
|
86 |
return _mqtt_client.unsubscribe(topic.c_str());
|
|
|
87 |
}
|
|
|
88 |
|
|
|
89 |
// Dispatches a received message to the callback registered for exactly that topic, if any.
// When _receive_verbose is set, the topic, whether a callback was found, and the message size are logged.
//
// client: the MQTT client that delivered the message (not used here).
// topic_cstr: topic of the message; used as the lookup key in _subscriptions and forwarded to the callback.
// message_cstr: message payload, forwarded unchanged to the callback.
// message_size: size of the payload; only used for logging.
void MQTTRemote::onMessage(MQTTClient *client, char topic_cstr[], char message_cstr[], int message_size) {
  const std::string topic(topic_cstr);
  if (_receive_verbose) {
    Serial.print(("Received message with topic " + topic).c_str());
  }

  if (auto subscription = _subscriptions.find(topic); subscription != _subscriptions.end()) {
    if (_receive_verbose) {
      Serial.print(" (callback found) ");
    }
    // Invoke a local copy instead of the stored callback: the callback may call unsubscribe() for its own
    // topic, which erases the stored entry. Running the stored callable while it is being destroyed would
    // leave the callback executing with destroyed state.
    const auto callback = subscription->second;
    callback(topic_cstr, message_cstr);
  } else if (_receive_verbose) {
    Serial.print(" (NO callback found) ");
  }

  if (_receive_verbose) {
    Serial.println(("and size: " + std::to_string(message_size)).c_str());
  }
}
|
|
|
108 |
|
|
|
109 |
// Configures the will of the MQTT client: a retained "offline" message with QoS 0 on "<client_id>/status".
// handle() calls this before every connection attempt.
void MQTTRemote::setupWill() {
  const std::string status_topic = _client_id + "/status";
  _mqtt_client.setWill(status_topic.c_str(), "offline", true, 0);
}
|