Subversion Repositories ESP8266_P1_Meter

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 raymond 1
#include "MQTTRemote.h"
2
 
3
#define RETRY_CONNECT_WAIT_MS 3000
4
 
5
MQTTRemote::MQTTRemote(std::string client_id, std::string host, int port, std::string username, std::string password,
6
                       Configuration configuration)
7
    : _client_id(client_id), _host(host), _username(username), _password(password),
8
      _receive_verbose(configuration.receive_verbose), _mqtt_client(configuration.buffer_size) {
9
  _mqtt_client.begin(_host.c_str(), port, _wifi_client);
10
  _mqtt_client.setKeepAlive(configuration.keep_alive_s);
11
  std::function<void(MQTTClient * client, char topic[], char bytes[], int length)> callback =
12
      std::bind(&MQTTRemote::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
13
                std::placeholders::_4);
14
  _mqtt_client.onMessageAdvanced(callback);
15
}
16
 
17
void MQTTRemote::handle() {
18
  auto now = millis();
19
  auto connected = _mqtt_client.connected();
20
 
21
  if (!connected && (now - _last_connection_attempt_timestamp_ms > RETRY_CONNECT_WAIT_MS)) {
22
    Serial.print("MQTTRemote: Client not connected. Trying to connect... ");
23
    setupWill();
24
    auto r = _mqtt_client.connect(_client_id.c_str(), _username.c_str(), _password.c_str());
25
    if (r) {
26
      Serial.println("success!");
27
 
28
      // And publish that we are now online.
29
      publishMessageVerbose(_client_id + "/status", "online", true);
30
 
31
      // Subscribe to all topics.
32
      for (const auto &subscription : _subscriptions) {
33
        _mqtt_client.subscribe(subscription.first.c_str());
34
      }
35
    } else {
36
      Serial.println(("failed :(, rc=" + std::to_string(_mqtt_client.lastError())).c_str());
37
    }
38
    _last_connection_attempt_timestamp_ms = now;
39
  } else if (connected) {
40
    _mqtt_client.loop();
41
  }
42
 
43
  if (_on_connection_change && connected != _was_connected) {
44
    _on_connection_change(connected);
45
  }
46
  _was_connected = connected;
47
}
48
 
49
bool MQTTRemote::publishMessage(std::string topic, std::string message, bool retain, uint8_t qos) {
50
  if (!connected()) {
51
    Serial.println(("MQTTRemote: Wanted to publish to topic " + topic + ", but no connection to server.").c_str());
52
    return false;
53
  }
54
  return _mqtt_client.publish(topic.c_str(), message.c_str(), retain, qos);
55
}
56
 
57
bool MQTTRemote::publishMessageVerbose(std::string topic, std::string message, bool retain, uint8_t qos) {
58
  if (!connected()) {
59
    Serial.println(("MQTTRemote: Wanted to publish to topic " + topic + ", but no connection to server.").c_str());
60
    return false;
61
  }
62
  Serial.print(("MQTTRemote: About to publish message '" + message + "' on topic '" + topic + "'...: ").c_str());
63
  bool r = publishMessage(topic, message, retain, qos);
64
  Serial.println(std::to_string(r).c_str());
65
  return r;
66
}
67
 
68
bool MQTTRemote::subscribe(std::string topic, IMQTTRemote::SubscriptionCallback message_callback) {
69
  if (_subscriptions.count(topic) > 0) {
70
    Serial.println(("MQTTRemote: Warning: Topic " + topic + " is already subscribed to.").c_str());
71
    return false;
72
  }
73
 
74
  _subscriptions.emplace(topic, message_callback);
75
 
76
  if (!connected()) {
77
    Serial.println("MQTTRemote: Not connected. Will subscribe once connected.");
78
    return false;
79
  }
80
 
81
  return _mqtt_client.subscribe(topic.c_str());
82
}
83
 
84
bool MQTTRemote::unsubscribe(std::string topic) {
85
  _subscriptions.erase(topic);
86
  return _mqtt_client.unsubscribe(topic.c_str());
87
}
88
 
89
void MQTTRemote::onMessage(MQTTClient *client, char topic_cstr[], char message_cstr[], int message_size) {
90
  std::string topic = std::string(topic_cstr);
91
  if (_receive_verbose) {
92
    Serial.print(("Received message with topic " + topic).c_str());
93
  }
94
  if (auto subscription = _subscriptions.find(topic); subscription != _subscriptions.end()) {
95
    if (_receive_verbose) {
96
      Serial.print(" (callback found) ");
97
    }
98
    subscription->second(topic_cstr, message_cstr);
99
  } else {
100
    if (_receive_verbose) {
101
      Serial.print(" (NO callback found) ");
102
    }
103
  }
104
  if (_receive_verbose) {
105
    Serial.println(("and size: " + std::to_string(message_size)).c_str());
106
  }
107
}
108
 
109
void MQTTRemote::setupWill() { _mqtt_client.setWill(std::string(_client_id + "/status").c_str(), "offline", true, 0); }