Subversion Repositories ESP8266_P1_Meter

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 raymond 1
#include "MQTTRemote.h"
2
#include <algorithm>
3
#include <esp_err.h>
4
#include <esp_log.h>
5
#include <freertos/FreeRTOS.h>
6
#include <freertos/task.h>
7
 
8
#define RETRY_CONNECT_WAIT_MS 3000
9
 
10
#define LAST_WILL_MSG "offline"
11
 
12
void MQTTRemote::onMqttEvent(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
13
  MQTTRemote *_this = static_cast<MQTTRemote *>(handler_args);
14
  esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t)event_data;
15
  esp_mqtt_client_handle_t client = event->client;
16
 
17
  switch ((esp_mqtt_event_id_t)event_id) {
18
  case MQTT_EVENT_CONNECTED:
19
    ESP_LOGI(MQTTRemoteLog::TAG, "Connected!");
20
    _this->_connected = true;
21
    xEventGroupSetBits(_this->_connection_state_changed_event_group, ConnectionState::Connected);
22
 
23
    // And publish that we are now online.
24
    _this->publishMessageVerbose(_this->_last_will_topic, "online", true);
25
 
26
    // Subscribe to all topics.
27
    for (const auto &subscription : _this->_subscriptions) {
28
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 1, 0)
29
      esp_mqtt_client_subscribe_single(client, subscription.first.c_str(), 0);
30
#else
31
      esp_mqtt_client_subscribe(client, subscription.first.c_str(), 0);
32
#endif
33
    }
34
 
35
    break;
36
 
37
  case MQTT_EVENT_DISCONNECTED:
38
    ESP_LOGW(MQTTRemoteLog::TAG, "Disconnected.");
39
    _this->_connected = false;
40
    xEventGroupSetBits(_this->_connection_state_changed_event_group, ConnectionState::Disconnected);
41
    break;
42
 
43
  case MQTT_EVENT_ERROR:
44
    ESP_LOGE(MQTTRemoteLog::TAG, "MQTT_EVENT_ERROR: %s", strerror(event->error_handle->esp_transport_sock_errno));
45
    break;
46
 
47
  case MQTT_EVENT_SUBSCRIBED:
48
    ESP_LOGV(MQTTRemoteLog::TAG, "MQTT_EVENT_SUBSCRIBED");
49
    break;
50
 
51
  case MQTT_EVENT_UNSUBSCRIBED:
52
    ESP_LOGV(MQTTRemoteLog::TAG, "MQTT_EVENT_UNSUBSCRIBED");
53
    break;
54
 
55
  case MQTT_EVENT_PUBLISHED:
56
    ESP_LOGV(MQTTRemoteLog::TAG, "MQTT_EVENT_PUBLISHED");
57
    break;
58
 
59
  case MQTT_EVENT_DATA: {
60
    std::string topic = std::string(event->topic, event->topic_len);
61
    std::string msg = std::string(event->data, event->data_len);
62
    ESP_LOGV(MQTTRemoteLog::TAG, "Received message with topic %s and payload size %d", topic.c_str(), event->data_len);
63
    if (auto subscription = _this->_subscriptions.find(topic); subscription != _this->_subscriptions.end()) {
64
      ESP_LOGV(MQTTRemoteLog::TAG, "callback found");
65
      subscription->second(topic, msg);
66
    } else {
67
      ESP_LOGV(MQTTRemoteLog::TAG, "NO callback found");
68
    }
69
    break;
70
  }
71
 
72
  case MQTT_EVENT_BEFORE_CONNECT:
73
    ESP_LOGV(MQTTRemoteLog::TAG, "Trying to connect...");
74
    break;
75
 
76
  case MQTT_EVENT_DELETED:
77
    ESP_LOGV(MQTTRemoteLog::TAG, "MQTT_EVENT_DELETED");
78
    break;
79
 
80
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 1, 0)
81
  case MQTT_USER_EVENT:
82
    ESP_LOGV(MQTTRemoteLog::TAG, "MQTT_USER_EVENT");
83
    break;
84
#endif
85
 
86
  default:
87
    break;
88
  }
89
}
90
 
91
/**
 * Builds the ESP-MQTT client configuration and creates the client handle.
 *
 * @param client_id MQTT client id; also used as prefix for the last will topic ("<client_id>/status").
 * @param host broker host. When no transport is given in the configuration, the host is lower-cased
 *             and an optional "mqtt://", "mqtts://", "ws://" or "wss://" prefix selects the transport
 *             and is stripped from the host. Without a recognized prefix plain TCP is used.
 * @param port broker port.
 * @param username user name used when connecting.
 * @param password password used when connecting.
 * @param configuration optional transport, TLS verification, buffer sizes, keep alive and task size.
 */
MQTTRemote::MQTTRemote(std::string client_id, std::string host, int port, std::string username, std::string password,
                       Configuration configuration)
    : _client_id(client_id), _last_will_topic(_client_id + "/status") {

  esp_mqtt_client_config_t mqtt_cfg = {};

  esp_mqtt_transport_t transport = MQTT_TRANSPORT_OVER_TCP;
  if (configuration.transport) {
    transport = *configuration.transport;
  } else {
    // Try to deduce the transport from the scheme.
    // tolower() requires a value representable as unsigned char; converting first avoids
    // undefined behaviour for bytes outside the ASCII range.
    std::transform(host.begin(), host.end(), host.begin(),
                   [](unsigned char c) { return static_cast<char>(::tolower(c)); });

    struct Scheme {
      const char *prefix;
      esp_mqtt_transport_t transport;
    };
    static const Scheme schemes[] = {
        {"mqtt://", MQTT_TRANSPORT_OVER_TCP},
        {"mqtts://", MQTT_TRANSPORT_OVER_SSL},
        {"ws://", MQTT_TRANSPORT_OVER_WS},
        {"wss://", MQTT_TRANSPORT_OVER_WSS},
    };
    for (const auto &scheme : schemes) {
      const std::string prefix = scheme.prefix;
      if (host.rfind(prefix, 0) == 0) {
        transport = scheme.transport;
        host = host.substr(prefix.length());
        break;
      }
    }
  }

  const bool uses_tls = transport == MQTT_TRANSPORT_OVER_SSL || transport == MQTT_TRANSPORT_OVER_WSS;

  // NOTE(review): the c_str() pointers below refer to constructor parameters and members; this relies on
  // esp_mqtt_client_init() copying what it needs before this constructor returns - confirm.
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
  mqtt_cfg.broker.address.hostname = host.c_str();
  mqtt_cfg.broker.address.transport = transport;
  if (uses_tls) {
    memcpy(&mqtt_cfg.broker.verification, &configuration.verification, sizeof(configuration.verification));
    ESP_LOGI(MQTTRemoteLog::TAG, "Using TLS verification");
    ESP_LOGI(MQTTRemoteLog::TAG, " -- use_global_ca_store: %d", mqtt_cfg.broker.verification.use_global_ca_store);
    ESP_LOGI(MQTTRemoteLog::TAG, " -- skip_cert_common_name_check: %d",
             mqtt_cfg.broker.verification.skip_cert_common_name_check);
  }
  mqtt_cfg.broker.address.port = port;

  mqtt_cfg.buffer.size = configuration.rx_buffer_size;
  mqtt_cfg.buffer.out_size = configuration.tx_buffer_size;

  mqtt_cfg.credentials.username = username.c_str();
  mqtt_cfg.credentials.client_id = client_id.c_str();
  mqtt_cfg.credentials.authentication.password = password.c_str();

  mqtt_cfg.network.reconnect_timeout_ms = RETRY_CONNECT_WAIT_MS;
  mqtt_cfg.network.disable_auto_reconnect = false;

  mqtt_cfg.session.keepalive = configuration.keep_alive_s;
  mqtt_cfg.session.disable_keepalive = false;

  mqtt_cfg.session.last_will.topic = _last_will_topic.c_str();
  mqtt_cfg.session.last_will.msg = LAST_WILL_MSG;
  mqtt_cfg.session.last_will.qos = 0;
  mqtt_cfg.session.last_will.retain = 0;

  if (configuration.task_size) {
    mqtt_cfg.task.stack_size = *configuration.task_size;
  }
#else
  mqtt_cfg.host = host.c_str();
  mqtt_cfg.transport = transport;
  if (uses_tls) {
    mqtt_cfg.use_global_ca_store = configuration.verification.use_global_ca_store;
    mqtt_cfg.cert_pem = configuration.verification.certificate;
    mqtt_cfg.cert_len = configuration.verification.certificate_len;
    mqtt_cfg.skip_cert_common_name_check = configuration.verification.skip_cert_common_name_check;
    mqtt_cfg.psk_hint_key = configuration.verification.psk_hint_key;
    mqtt_cfg.alpn_protos = configuration.verification.alpn_protos;
    ESP_LOGI(MQTTRemoteLog::TAG, "Using TLS verification");
    ESP_LOGI(MQTTRemoteLog::TAG, " -- use_global_ca_store: %d", mqtt_cfg.use_global_ca_store);
    ESP_LOGI(MQTTRemoteLog::TAG, " -- skip_cert_common_name_check: %d", mqtt_cfg.skip_cert_common_name_check);
  }
  mqtt_cfg.port = port;

  mqtt_cfg.buffer_size = configuration.rx_buffer_size;
  mqtt_cfg.out_buffer_size = configuration.tx_buffer_size;

  mqtt_cfg.username = username.c_str();
  mqtt_cfg.client_id = client_id.c_str();
  mqtt_cfg.password = password.c_str();

  mqtt_cfg.reconnect_timeout_ms = RETRY_CONNECT_WAIT_MS;
  mqtt_cfg.disable_auto_reconnect = false;

  mqtt_cfg.keepalive = configuration.keep_alive_s;
  mqtt_cfg.disable_keepalive = false;

  mqtt_cfg.lwt_topic = _last_will_topic.c_str();
  mqtt_cfg.lwt_msg = LAST_WILL_MSG;
  mqtt_cfg.lwt_msg_len = sizeof(LAST_WILL_MSG) - 1;
  mqtt_cfg.lwt_qos = 0;
  mqtt_cfg.lwt_retain = 0;

  if (configuration.task_size) {
    mqtt_cfg.task_stack = *configuration.task_size;
  }
#endif

  _mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
  if (_mqtt_client == nullptr) {
    // Make the failure visible here; later calls using the handle would otherwise fail without context.
    ESP_LOGE(MQTTRemoteLog::TAG, "Failed to initialize MQTT client.");
  }
}
193
 
194
void MQTTRemote::start(std::function<void(bool)> on_connection_change, unsigned long task_size, uint8_t task_priority) {
195
  if (_started) {
196
    ESP_LOGW(MQTTRemoteLog::TAG, "Already started, cannot start again.");
197
    return;
198
  }
199
 
200
  _connection_state_changed_event_group = xEventGroupCreate();
201
 
202
  _on_connection_change = on_connection_change;
203
  if (_on_connection_change) {
204
    xTaskCreate(&runTask, "MQTTRemote_main_task", task_size, this, task_priority, NULL);
205
  }
206
 
207
  startInternal();
208
}
209
 
210
/**
 * Starts the MQTT client using a caller provided event group for connection state signalling.
 *
 * Does nothing but log a warning when already started. No reporting task is created by this overload.
 *
 * @param connection_state_changed_event_group event group in which the ConnectionState bits are set
 *                                             by the MQTT event handler. Must not be null.
 */
void MQTTRemote::start(EventGroupHandle_t connection_state_changed_event_group) {
  if (_started) {
    ESP_LOGW(MQTTRemoteLog::TAG, "Already started, cannot start again.");
    return;
  }

  if (connection_state_changed_event_group == nullptr) {
    // startInternal() and the event handler pass the handle straight to the event group API.
    ESP_LOGE(MQTTRemoteLog::TAG, "Connection state event group is null, not starting.");
    return;
  }

  _connection_state_changed_event_group = connection_state_changed_event_group;

  startInternal();
}
220
 
221
void MQTTRemote::startInternal() {
222
  xEventGroupClearBits(_connection_state_changed_event_group, 0xFF);
223
  ESP_ERROR_CHECK(esp_mqtt_client_register_event(_mqtt_client, MQTT_EVENT_ANY, onMqttEvent, this));
224
  ESP_ERROR_CHECK(esp_mqtt_client_start(_mqtt_client));
225
 
226
  _started = true;
227
}
228
 
229
void MQTTRemote::runTask(void *pvParams) {
230
  MQTTRemote *_this = static_cast<MQTTRemote *>(pvParams);
231
  while (1) {
232
    auto event_bits =
233
        xEventGroupWaitBits(_this->_connection_state_changed_event_group,
234
                            MQTTRemote::ConnectionState::Connected | MQTTRemote::ConnectionState::Disconnected, pdTRUE,
235
                            pdFALSE, portMAX_DELAY);
236
    if ((event_bits & MQTTRemote::ConnectionState::Connected) != 0) {
237
      _this->_on_connection_change(true);
238
    } else if ((event_bits & MQTTRemote::ConnectionState::Disconnected) != 0) {
239
      _this->_on_connection_change(false);
240
    }
241
  }
242
}
243
 
244
bool MQTTRemote::publishMessage(std::string topic, std::string message, bool retain, uint8_t qos) {
245
  if (!connected()) {
246
    ESP_LOGW(MQTTRemoteLog::TAG, "Not connected to server when trying to publish to topic %s.", topic.c_str());
247
    return false;
248
  }
249
  return esp_mqtt_client_publish(_mqtt_client, topic.c_str(), message.c_str(), message.length(), qos, retain) >= 0;
250
}
251
 
252
bool MQTTRemote::publishMessageVerbose(std::string topic, std::string message, bool retain, uint8_t qos) {
253
  if (!connected()) {
254
    ESP_LOGW(MQTTRemoteLog::TAG, "Not connected to server when trying to publish to topic %s.", topic.c_str());
255
    return false;
256
  }
257
 
258
  ESP_LOGI(MQTTRemoteLog::TAG, "About to publish message '%s' on topic '%s'...", message.c_str(), topic.c_str());
259
  bool r = publishMessage(topic, message, retain, qos);
260
  ESP_LOGI(MQTTRemoteLog::TAG, "Publish result: %s", (r ? "success" : "failure"));
261
  return r;
262
}
263
 
264
bool MQTTRemote::subscribe(std::string topic, IMQTTRemote::SubscriptionCallback message_callback) {
265
  if (_subscriptions.count(topic) > 0) {
266
    ESP_LOGW(MQTTRemoteLog::TAG, "Topic %s is already subscribed to.", topic.c_str());
267
    return false;
268
  }
269
 
270
  _subscriptions.emplace(topic, message_callback);
271
 
272
  if (!connected()) {
273
    ESP_LOGI(MQTTRemoteLog::TAG, "Not connected. Will subscribe once connected.");
274
    return false;
275
  }
276
 
277
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 1, 0)
278
  return esp_mqtt_client_subscribe_single(_mqtt_client, topic.c_str(), 0) >= 0;
279
#else
280
  return esp_mqtt_client_subscribe(_mqtt_client, topic.c_str(), 0) >= 0;
281
#endif
282
}
283
 
284
bool MQTTRemote::unsubscribe(std::string topic) {
285
  _subscriptions.erase(topic);
286
  return esp_mqtt_client_unsubscribe(_mqtt_client, topic.c_str()) >= 0;
287
}