Subversion Repositories ESP8266_P1_Meter

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 raymond 1
/*
2
 
3
  PubSubClient.cpp - A simple client for MQTT.
4
  Nick O'Leary
5
  http://knolleary.net
6
*/
7
 
8
#include "PubSubClient.h"
9
#include "Arduino.h"
10
 
11
/**
 * Default constructor: creates a client with no network transport, no
 * output stream, no message callback and no server configured.
 *
 * The transport and server can be supplied later with setClient() and
 * setServer().
 */
PubSubClient::PubSubClient() {
    this->_state = MQTT_DISCONNECTED;
    this->_client = NULL;
    this->stream = NULL;
    // connect() inspects domain/port; give them defined values so that a
    // connect() issued before setServer() does not read indeterminate data.
    this->domain = NULL;
    this->port = 0;
    setCallback(NULL);
    // bufferSize must be 0 here so setBufferSize() takes the malloc() path
    // instead of calling realloc() on a pointer that was never set.
    this->bufferSize = 0;
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
21
 
22
/**
 * Constructs a client that uses `client` as its network transport.
 *
 * No server, stream or message callback is configured; the server can be
 * supplied later with setServer().
 */
PubSubClient::PubSubClient(Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setClient(client);
    this->stream = NULL;
    // connect() inspects domain/port; give them defined values so that a
    // connect() issued before setServer() does not read indeterminate data.
    this->domain = NULL;
    this->port = 0;
    // loop() tests the callback before dispatching, so it must start out
    // cleared rather than uninitialised (same as the default constructor).
    setCallback(NULL);
    // bufferSize must be 0 here so setBufferSize() takes the malloc() path.
    this->bufferSize = 0;
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
31
 
32
/**
 * Constructs a client for the broker at IP address `addr` and `port`,
 * using `client` as the network transport.
 *
 * No stream or message callback is attached.
 */
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr, port);
    setClient(client);
    this->stream = NULL;
    // loop() tests the callback before dispatching, so it must start out
    // cleared rather than uninitialised (same as the default constructor).
    setCallback(NULL);
    // bufferSize must be 0 here so setBufferSize() takes the malloc() path.
    this->bufferSize = 0;
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
42
/**
 * Constructs a client for the broker at IP address `addr` and `port`,
 * using `client` as the network transport and `stream` as the sink that
 * readPacket() writes incoming PUBLISH payload bytes to.
 *
 * No message callback is attached.
 */
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(addr,port);
    setClient(client);
    setStream(stream);
    // loop() tests the callback before dispatching, so it must start out
    // cleared rather than uninitialised (same as the default constructor).
    setCallback(NULL);
    // bufferSize must be 0 here so setBufferSize() takes the malloc() path.
    this->bufferSize = 0;
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
52
/**
 * Constructs a client for the broker at IP address `addr` and `port`, with
 * a message callback (parameter `callback`, declared through the
 * MQTT_CALLBACK_SIGNATURE macro) and `client` as the network transport.
 *
 * No stream is attached.
 */
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    // Wiring supplied by the caller.
    setClient(client);
    setServer(addr, port);
    setCallback(callback);

    // Everything else starts from the library defaults.
    stream = NULL;
    _state = MQTT_DISCONNECTED;
    bufferSize = 0;   // zero makes setBufferSize() allocate a fresh buffer
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
63
/**
 * Constructs a client for the broker at IP address `addr` and `port`, with
 * a message callback (parameter `callback`, declared through the
 * MQTT_CALLBACK_SIGNATURE macro), `client` as the network transport and
 * `stream` as the sink for incoming PUBLISH payload bytes.
 */
PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    // Wiring supplied by the caller.
    setClient(client);
    setStream(stream);
    setServer(addr, port);
    setCallback(callback);

    // Everything else starts from the library defaults.
    _state = MQTT_DISCONNECTED;
    bufferSize = 0;   // zero makes setBufferSize() allocate a fresh buffer
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
74
 
75
/**
 * Constructs a client for the broker whose IPv4 address is given as the
 * four bytes `ip[0..3]`, on `port`, using `client` as the network
 * transport.
 *
 * No stream or message callback is attached.
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip, port);
    setClient(client);
    this->stream = NULL;
    // loop() tests the callback before dispatching, so it must start out
    // cleared rather than uninitialised (same as the default constructor).
    setCallback(NULL);
    // bufferSize must be 0 here so setBufferSize() takes the malloc() path.
    this->bufferSize = 0;
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
85
/**
 * Constructs a client for the broker whose IPv4 address is given as the
 * four bytes `ip[0..3]`, on `port`, using `client` as the network
 * transport and `stream` as the sink for incoming PUBLISH payload bytes.
 *
 * No message callback is attached.
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(ip,port);
    setClient(client);
    setStream(stream);
    // loop() tests the callback before dispatching, so it must start out
    // cleared rather than uninitialised (same as the default constructor).
    setCallback(NULL);
    // bufferSize must be 0 here so setBufferSize() takes the malloc() path.
    this->bufferSize = 0;
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
95
/**
 * Constructs a client for the broker whose IPv4 address is given as the
 * four bytes `ip[0..3]`, on `port`, with a message callback (parameter
 * `callback`, declared through the MQTT_CALLBACK_SIGNATURE macro) and
 * `client` as the network transport.
 *
 * No stream is attached.
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    // Wiring supplied by the caller.
    setClient(client);
    setServer(ip, port);
    setCallback(callback);

    // Everything else starts from the library defaults.
    stream = NULL;
    _state = MQTT_DISCONNECTED;
    bufferSize = 0;   // zero makes setBufferSize() allocate a fresh buffer
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
106
/**
 * Constructs a client for the broker whose IPv4 address is given as the
 * four bytes `ip[0..3]`, on `port`, with a message callback (parameter
 * `callback`, declared through the MQTT_CALLBACK_SIGNATURE macro),
 * `client` as the network transport and `stream` as the sink for incoming
 * PUBLISH payload bytes.
 */
PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    // Wiring supplied by the caller.
    setClient(client);
    setStream(stream);
    setServer(ip, port);
    setCallback(callback);

    // Everything else starts from the library defaults.
    _state = MQTT_DISCONNECTED;
    bufferSize = 0;   // zero makes setBufferSize() allocate a fresh buffer
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
117
 
118
/**
 * Constructs a client for the broker named `domain` on `port`, using
 * `client` as the network transport.
 *
 * The `domain` pointer is stored, not copied, so the string must outlive
 * this object. No stream or message callback is attached.
 */
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setClient(client);
    this->stream = NULL;
    // loop() tests the callback before dispatching, so it must start out
    // cleared rather than uninitialised (same as the default constructor).
    setCallback(NULL);
    // bufferSize must be 0 here so setBufferSize() takes the malloc() path.
    this->bufferSize = 0;
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
128
/**
 * Constructs a client for the broker named `domain` on `port`, using
 * `client` as the network transport and `stream` as the sink for incoming
 * PUBLISH payload bytes.
 *
 * The `domain` pointer is stored, not copied, so the string must outlive
 * this object. No message callback is attached.
 */
PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
    this->_state = MQTT_DISCONNECTED;
    setServer(domain,port);
    setClient(client);
    setStream(stream);
    // loop() tests the callback before dispatching, so it must start out
    // cleared rather than uninitialised (same as the default constructor).
    setCallback(NULL);
    // bufferSize must be 0 here so setBufferSize() takes the malloc() path.
    this->bufferSize = 0;
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
138
/**
 * Constructs a client for the broker named `domain` on `port`, with a
 * message callback (parameter `callback`, declared through the
 * MQTT_CALLBACK_SIGNATURE macro) and `client` as the network transport.
 *
 * The `domain` pointer is stored, not copied. No stream is attached.
 */
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
    // Wiring supplied by the caller.
    setClient(client);
    setServer(domain, port);
    setCallback(callback);

    // Everything else starts from the library defaults.
    stream = NULL;
    _state = MQTT_DISCONNECTED;
    bufferSize = 0;   // zero makes setBufferSize() allocate a fresh buffer
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
149
/**
 * Constructs a client for the broker named `domain` on `port`, with a
 * message callback (parameter `callback`, declared through the
 * MQTT_CALLBACK_SIGNATURE macro), `client` as the network transport and
 * `stream` as the sink for incoming PUBLISH payload bytes.
 *
 * The `domain` pointer is stored, not copied.
 */
PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
    // Wiring supplied by the caller.
    setClient(client);
    setStream(stream);
    setServer(domain, port);
    setCallback(callback);

    // Everything else starts from the library defaults.
    _state = MQTT_DISCONNECTED;
    bufferSize = 0;   // zero makes setBufferSize() allocate a fresh buffer
    setBufferSize(MQTT_MAX_PACKET_SIZE);
    setKeepAlive(MQTT_KEEPALIVE);
    setSocketTimeout(MQTT_SOCKET_TIMEOUT);
}
160
 
161
/** Destructor: hands the packet buffer back to the heap. */
PubSubClient::~PubSubClient() {
    free(buffer);
}
164
 
165
/**
 * Connects with client identifier `id` only: no credentials, no will
 * message, clean session requested.
 *
 * @return result of the full connect() overload.
 */
boolean PubSubClient::connect(const char *id) {
    const boolean cleanSession = 1;
    return connect(id, NULL, NULL, 0, 0, 0, 0, cleanSession);
}
168
 
169
/**
 * Connects with client identifier `id` and the given credentials: no will
 * message, clean session requested.
 *
 * @return result of the full connect() overload.
 */
boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
    const boolean cleanSession = 1;
    return connect(id, user, pass, 0, 0, 0, 0, cleanSession);
}
172
 
173
/**
 * Connects with client identifier `id` and a will message, without
 * credentials; clean session requested.
 *
 * @return result of the full connect() overload.
 */
boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
    const boolean cleanSession = 1;
    return connect(id, NULL, NULL, willTopic, willQos, willRetain, willMessage, cleanSession);
}
176
 
177
/**
 * Connects with client identifier `id`, credentials and a will message;
 * clean session requested.
 *
 * @return result of the full connect() overload.
 */
boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
    const boolean cleanSession = 1;
    return connect(id, user, pass, willTopic, willQos, willRetain, willMessage, cleanSession);
}
180
 
181
/**
 * Opens the transport (unless it is already open), sends an MQTT CONNECT
 * packet and waits for the broker's reply.
 *
 * @param id            client identifier written into the packet
 * @param user          user name, or NULL to send none
 * @param pass          password, or NULL to send none; only sent when
 *                      `user` is also given
 * @param willTopic     will topic, or NULL for no will
 * @param willQos       will QoS, placed in connect-flag bits 3-4
 * @param willRetain    will retain flag, placed in connect-flag bit 5
 * @param willMessage   will payload; only read when `willTopic` is set
 * @param cleanSession  when true, sets connect-flag bit 1
 * @return true if the client was already connected or the broker replied
 *         with return code 0; false otherwise, with the reason available
 *         through state().
 */
boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage, boolean cleanSession) {
    if (connected()) {
        return true;
    }

    // A default-constructed object has no transport until setClient() is
    // called; fail cleanly instead of dereferencing a null pointer.
    if (_client == NULL) {
        _state = MQTT_CONNECT_FAILED;
        return false;
    }

    int result = 0;
    if (_client->connected()) {
        // The transport is already open; reuse it.
        result = 1;
    } else if (domain != NULL) {
        result = _client->connect(this->domain, this->port);
    } else {
        result = _client->connect(this->ip, this->port);
    }

    if (result != 1) {
        _state = MQTT_CONNECT_FAILED;
        return false;
    }

    nextMsgId = 1;
    // Leave room in the buffer for header and variable length field
    uint16_t length = MQTT_MAX_HEADER_SIZE;
    unsigned int j;

    // Protocol name and level depend on the MQTT version compiled in.
#if MQTT_VERSION == MQTT_VERSION_3_1
    uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
#define MQTT_HEADER_VERSION_LENGTH 9
#elif MQTT_VERSION == MQTT_VERSION_3_1_1
    uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
#define MQTT_HEADER_VERSION_LENGTH 7
#endif
    for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
        this->buffer[length++] = d[j];
    }

    // Connect flags byte.
    uint8_t v;
    if (willTopic) {
        v = 0x04|(willQos<<3)|(willRetain<<5);
    } else {
        v = 0x00;
    }
    if (cleanSession) {
        v = v|0x02;
    }

    if(user != NULL) {
        v = v|0x80;

        if(pass != NULL) {
            v = v|(0x80>>1);
        }
    }
    this->buffer[length++] = v;

    // Keep-alive interval, high byte first.
    this->buffer[length++] = ((this->keepAlive) >> 8);
    this->buffer[length++] = ((this->keepAlive) & 0xFF);

    // Payload strings. CHECK_STRING_LENGTH is defined outside this file;
    // it is invoked before every writeString() call.
    CHECK_STRING_LENGTH(length,id)
    length = writeString(id,this->buffer,length);
    if (willTopic) {
        CHECK_STRING_LENGTH(length,willTopic)
        length = writeString(willTopic,this->buffer,length);
        CHECK_STRING_LENGTH(length,willMessage)
        length = writeString(willMessage,this->buffer,length);
    }

    if(user != NULL) {
        CHECK_STRING_LENGTH(length,user)
        length = writeString(user,this->buffer,length);
        if(pass != NULL) {
            CHECK_STRING_LENGTH(length,pass)
            length = writeString(pass,this->buffer,length);
        }
    }

    write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE);

    lastInActivity = lastOutActivity = millis();

    // Wait for the first byte of the reply, giving up after socketTimeout
    // seconds.
    while (!_client->available()) {
        // Same cooperative yield that readByte() performs while waiting.
        yield();
        unsigned long t = millis();
        if (t-lastInActivity >= ((int32_t) this->socketTimeout*1000UL)) {
            _state = MQTT_CONNECTION_TIMEOUT;
            _client->stop();
            return false;
        }
    }
    uint8_t llen;
    uint32_t len = readPacket(&llen);

    // The reply is expected to be exactly four bytes, with the return code
    // in the last one.
    if (len == 4) {
        if (buffer[3] == 0) {
            lastInActivity = millis();
            pingOutstanding = false;
            _state = MQTT_CONNECTED;
            return true;
        }
        // Non-zero return code: expose it through state().
        _state = buffer[3];
    }
    _client->stop();
    return false;
}
286
 
287
// reads a byte into result
288
boolean PubSubClient::readByte(uint8_t * result) {
289
   uint32_t previousMillis = millis();
290
   while(!_client->available()) {
291
     yield();
292
     uint32_t currentMillis = millis();
293
     if(currentMillis - previousMillis >= ((int32_t) this->socketTimeout * 1000)){
294
       return false;
295
     }
296
   }
297
   *result = _client->read();
298
   return true;
299
}
300
 
301
// reads a byte into result[*index] and increments index
302
boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
303
  uint16_t current_index = *index;
304
  uint8_t * write_address = &(result[current_index]);
305
  if(readByte(write_address)){
306
    *index = current_index + 1;
307
    return true;
308
  }
309
  return false;
310
}
311
 
312
uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
313
    uint16_t len = 0;
314
    if(!readByte(this->buffer, &len)) return 0;
315
    bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH;
316
    uint32_t multiplier = 1;
317
    uint32_t length = 0;
318
    uint8_t digit = 0;
319
    uint16_t skip = 0;
320
    uint32_t start = 0;
321
 
322
    do {
323
        if (len == 5) {
324
            // Invalid remaining length encoding - kill the connection
325
            _state = MQTT_DISCONNECTED;
326
            _client->stop();
327
            return 0;
328
        }
329
        if(!readByte(&digit)) return 0;
330
        this->buffer[len++] = digit;
331
        length += (digit & 127) * multiplier;
332
        multiplier <<=7; //multiplier *= 128
333
    } while ((digit & 128) != 0);
334
    *lengthLength = len-1;
335
 
336
    if (isPublish) {
337
        // Read in topic length to calculate bytes to skip over for Stream writing
338
        if(!readByte(this->buffer, &len)) return 0;
339
        if(!readByte(this->buffer, &len)) return 0;
340
        skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2];
341
        start = 2;
342
        if (this->buffer[0]&MQTTQOS1) {
343
            // skip message id
344
            skip += 2;
345
        }
346
    }
347
    uint32_t idx = len;
348
 
349
    for (uint32_t i = start;i<length;i++) {
350
        if(!readByte(&digit)) return 0;
351
        if (this->stream) {
352
            if (isPublish && idx-*lengthLength-2>skip) {
353
                this->stream->write(digit);
354
            }
355
        }
356
 
357
        if (len < this->bufferSize) {
358
            this->buffer[len] = digit;
359
            len++;
360
        }
361
        idx++;
362
    }
363
 
364
    if (!this->stream && idx > this->bufferSize) {
365
        len = 0; // This will cause the packet to be ignored.
366
    }
367
    return len;
368
}
369
 
370
/**
 * Services the connection: sends a keep-alive ping when due, and reads and
 * handles at most one incoming packet.
 *
 * Incoming PUBLISH packets are passed to the callback (when one is set)
 * and, for QoS 1, acknowledged with PUBACK. PINGREQ is answered with
 * PINGRESP, and PINGRESP clears the outstanding-ping flag.
 *
 * A PUBLISH whose declared topic length does not fit inside the bytes that
 * were actually received is ignored: no callback and no PUBACK.
 *
 * @return false if the client is not connected, if a previous ping went
 *         unanswered for a full keep-alive interval, or if reading a
 *         packet left the client disconnected; true otherwise.
 */
boolean PubSubClient::loop() {
    if (!connected()) {
        return false;
    }

    unsigned long t = millis();
    if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) {
        if (pingOutstanding) {
            // The previous ping was never answered: give up.
            this->_state = MQTT_CONNECTION_TIMEOUT;
            _client->stop();
            return false;
        } else {
            this->buffer[0] = MQTTPINGREQ;
            this->buffer[1] = 0;
            _client->write(this->buffer,2);
            lastOutActivity = t;
            lastInActivity = t;
            pingOutstanding = true;
        }
    }

    if (_client->available()) {
        uint8_t llen;
        uint16_t len = readPacket(&llen);
        uint16_t msgId = 0;
        uint8_t *payload;
        if (len > 0) {
            lastInActivity = t;
            uint8_t type = this->buffer[0]&0xF0;
            if (type == MQTTPUBLISH) {
                // Layout: [0] header, [1..llen] remaining length,
                // [llen+1..llen+2] topic length, topic, optional 2-byte
                // message id, payload.
                if (callback && len >= (uint32_t)llen + 3) {
                    uint16_t tl = (this->buffer[llen+1]<<8)+this->buffer[llen+2]; /* topic length in bytes */
                    const boolean hasMsgId = (this->buffer[0]&0x06) == MQTTQOS1; // msgId only present for QOS>0
                    const uint32_t payloadStart = (uint32_t)llen + 3 + tl + (hasMsgId ? 2 : 0);

                    // The topic length comes from the network. Only touch
                    // the buffer when topic (and message id) lie within
                    // the bytes that were actually stored; otherwise the
                    // memmove/terminator below would run out of bounds and
                    // the payload length would underflow.
                    if (payloadStart <= len) {
                        memmove(this->buffer+llen+2,this->buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
                        this->buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
                        char *topic = (char*) this->buffer+llen+2;
                        payload = this->buffer+payloadStart;

                        if (hasMsgId) {
                            msgId = (this->buffer[llen+3+tl]<<8)+this->buffer[llen+3+tl+1];
                            callback(topic,payload,len-payloadStart);

                            this->buffer[0] = MQTTPUBACK;
                            this->buffer[1] = 2;
                            this->buffer[2] = (msgId >> 8);
                            this->buffer[3] = (msgId & 0xFF);
                            _client->write(this->buffer,4);
                            lastOutActivity = t;
                        } else {
                            callback(topic,payload,len-payloadStart);
                        }
                    }
                }
            } else if (type == MQTTPINGREQ) {
                this->buffer[0] = MQTTPINGRESP;
                this->buffer[1] = 0;
                _client->write(this->buffer,2);
            } else if (type == MQTTPINGRESP) {
                pingOutstanding = false;
            }
        } else if (!connected()) {
            // readPacket has closed the connection
            return false;
        }
    }
    return true;
}
435
 
436
/**
 * Publishes a NUL-terminated string payload, not retained.
 *
 * The payload length is measured with strnlen() capped at the buffer size;
 * a NULL payload is sent as an empty message.
 */
boolean PubSubClient::publish(const char* topic, const char* payload) {
    unsigned int plength = 0;
    if (payload) {
        plength = strnlen(payload, this->bufferSize);
    }
    return publish(topic, (const uint8_t*)payload, plength, false);
}
439
 
440
/**
 * Publishes a NUL-terminated string payload with the given retained flag.
 *
 * The payload length is measured with strnlen() capped at the buffer size;
 * a NULL payload is sent as an empty message.
 */
boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
    unsigned int plength = 0;
    if (payload) {
        plength = strnlen(payload, this->bufferSize);
    }
    return publish(topic, (const uint8_t*)payload, plength, retained);
}
443
 
444
/** Publishes `plength` bytes of binary payload, not retained. */
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
    const boolean retained = false;
    return publish(topic, payload, plength, retained);
}
447
 
448
/**
 * Builds a PUBLISH packet in the packet buffer and sends it.
 *
 * @param topic     NUL-terminated topic name
 * @param payload   payload bytes
 * @param plength   number of payload bytes
 * @param retained  sets bit 0 of the header byte when true
 * @return false when not connected or when header, topic and payload do
 *         not fit in the buffer; otherwise the result of write().
 */
boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
    if (!connected()) {
        return false;
    }
    if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2+strnlen(topic, this->bufferSize) + plength) {
        // Too long
        return false;
    }

    // The first MQTT_MAX_HEADER_SIZE bytes stay free for the header and
    // variable length field; the topic starts right after them.
    uint16_t cursor = writeString(topic, this->buffer, MQTT_MAX_HEADER_SIZE);

    // Payload follows the topic.
    for (uint16_t n = 0; n < plength; n++) {
        this->buffer[cursor++] = payload[n];
    }

    const uint8_t header = retained ? (MQTTPUBLISH | 1) : MQTTPUBLISH;
    return write(header, this->buffer, cursor - MQTT_MAX_HEADER_SIZE);
}
473
 
474
/**
 * String-payload convenience wrapper around the binary publish_P().
 *
 * The payload length is measured with strnlen() capped at the buffer size;
 * a NULL payload is sent as an empty message.
 */
boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean retained) {
    unsigned int plength = 0;
    if (payload) {
        plength = strnlen(payload, this->bufferSize);
    }
    return publish_P(topic, (const uint8_t*)payload, plength, retained);
}
477
 
478
/**
 * Publishes a payload that is read one byte at a time with
 * pgm_read_byte_near().
 *
 * Only the fixed header and the topic pass through the packet buffer; the
 * payload bytes are written to the client individually.
 *
 * @param topic     NUL-terminated topic name
 * @param payload   payload bytes, read via pgm_read_byte_near()
 * @param plength   number of payload bytes
 * @param retained  sets bit 0 of the header byte when true
 * @return false when not connected or when header and topic do not fit in
 *         the packet buffer; otherwise true only if the client accepted
 *         every byte.
 */
boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
    if (!connected()) {
        return false;
    }

    const uint16_t tlen = strnlen(topic, this->bufferSize);

    uint8_t header = MQTTPUBLISH;
    if (retained) {
        header |= 1;
    }

    // Encode the fixed header into a scratch array first, so the space it
    // needs is known before anything is written to the packet buffer.
    // Seven bits per digit: a 32-bit length needs at most five digits.
    uint8_t fixedHeader[1 + 5];
    uint8_t llen = 0;
    fixedHeader[0] = header;
    unsigned int len = plength + 2 + tlen;
    do {
        uint8_t digit = len & 127; //digit = len %128
        len >>= 7; //len = len / 128
        if (len > 0) {
            digit |= 0x80;
        }
        fixedHeader[1 + llen] = digit;
        llen++;
    } while (len > 0);

    // Header byte + length digits + 2-byte topic length + topic must fit.
    // Without this check writeString() would run past the end of the
    // buffer for a long topic.
    if ((uint32_t)1 + llen + 2 + tlen > this->bufferSize) {
        return false;
    }

    unsigned int pos = 0;
    for (; pos < 1u + llen; pos++) {
        this->buffer[pos] = fixedHeader[pos];
    }
    pos = writeString(topic,this->buffer,pos);

    unsigned int rc = 0;
    rc += _client->write(this->buffer,pos);

    for (unsigned int i = 0; i < plength; i++) {
        rc += _client->write((char)pgm_read_byte_near(payload + i));
    }

    lastOutActivity = millis();

    const unsigned int expectedLength = 1 + llen + 2 + tlen + plength;
    return (rc == expectedLength);
}
525
 
526
/**
 * Starts a PUBLISH whose payload is sent afterwards through write().
 *
 * Sends the fixed header, with a remaining length covering the topic plus
 * `plength` payload bytes, followed by the topic.
 *
 * @param topic     NUL-terminated topic name
 * @param plength   number of payload bytes that will follow
 * @param retained  sets bit 0 of the header byte when true
 * @return false when not connected or when the topic does not fit in the
 *         packet buffer; otherwise true only if the client accepted all
 *         header and topic bytes.
 */
boolean PubSubClient::beginPublish(const char* topic, unsigned int plength, boolean retained) {
    if (!connected()) {
        return false;
    }

    // The topic is staged in the packet buffer behind the reserved header
    // area; refuse topics that would overrun it.
    if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + strnlen(topic, this->bufferSize)) {
        return false;
    }

    // Send the header and variable length field
    uint16_t length = MQTT_MAX_HEADER_SIZE;
    length = writeString(topic,this->buffer,length);
    uint8_t header = MQTTPUBLISH;
    if (retained) {
        header |= 1;
    }
    size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE);
    // buildHeader() puts the header directly in front of the topic, so the
    // bytes to send start (MQTT_MAX_HEADER_SIZE - hlen) into the buffer.
    uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
    lastOutActivity = millis();
    return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
}
542
 
543
/**
 * Ends a publish started with beginPublish().
 *
 * Performs no work in this implementation.
 *
 * @return always 1.
 */
int PubSubClient::endPublish() {
    const int done = 1;
    return done;
}
546
 
547
size_t PubSubClient::write(uint8_t data) {
548
    lastOutActivity = millis();
549
    return _client->write(data);
550
}
551
 
552
size_t PubSubClient::write(const uint8_t *buffer, size_t size) {
553
    lastOutActivity = millis();
554
    return _client->write(buffer,size);
555
}
556
 
557
size_t PubSubClient::buildHeader(uint8_t header, uint8_t* buf, uint16_t length) {
558
    uint8_t lenBuf[4];
559
    uint8_t llen = 0;
560
    uint8_t digit;
561
    uint8_t pos = 0;
562
    uint16_t len = length;
563
    do {
564
 
565
        digit = len  & 127; //digit = len %128
566
        len >>= 7; //len = len / 128
567
        if (len > 0) {
568
            digit |= 0x80;
569
        }
570
        lenBuf[pos++] = digit;
571
        llen++;
572
    } while(len>0);
573
 
574
    buf[4-llen] = header;
575
    for (int i=0;i<llen;i++) {
576
        buf[MQTT_MAX_HEADER_SIZE-llen+i] = lenBuf[i];
577
    }
578
    return llen+1; // Full header size is variable length bit plus the 1-byte fixed header
579
}
580
 
581
/**
 * Prepends the fixed header to a packet body already staged in `buf` and
 * sends the whole packet to the client.
 *
 * The body must start at buf[MQTT_MAX_HEADER_SIZE]; the bytes before it
 * are overwritten with the header. When MQTT_MAX_TRANSFER_SIZE is defined
 * the packet is sent in chunks of at most that many bytes.
 *
 * @param header  first byte of the packet (type and flags)
 * @param buf     buffer holding the body behind the reserved header area
 * @param length  number of body bytes
 * @return true only if the client accepted every byte.
 */
boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
    uint16_t rc;
    uint8_t hlen = buildHeader(header, buf, length);

#ifdef MQTT_MAX_TRANSFER_SIZE
    uint8_t* writeBuf = buf+(MQTT_MAX_HEADER_SIZE-hlen);
    uint16_t bytesRemaining = length+hlen;  //Match the length type
    // Same width as bytesRemaining, so a transfer size above 255 is not
    // silently truncated.
    uint16_t bytesToWrite;
    boolean result = true;
    while((bytesRemaining > 0) && result) {
        bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
        rc = _client->write(writeBuf,bytesToWrite);
        // A short write ends the loop with a failure result.
        result = (rc == bytesToWrite);
        bytesRemaining -= rc;
        writeBuf += rc;
    }
    // Record the outbound activity, as the unchunked path below does, so
    // the keep-alive logic sees this traffic.
    lastOutActivity = millis();
    return result;
#else
    rc = _client->write(buf+(MQTT_MAX_HEADER_SIZE-hlen),length+hlen);
    lastOutActivity = millis();
    return (rc == hlen+length);
#endif
}
604
 
605
/** Subscribes to `topic` with QoS 0. */
boolean PubSubClient::subscribe(const char* topic) {
    const uint8_t qos = 0;
    return subscribe(topic, qos);
}
608
 
609
/**
 * Sends a SUBSCRIBE request for one topic.
 *
 * @param topic  NUL-terminated topic filter; NULL is rejected
 * @param qos    requested QoS; values above 1 are rejected
 * @return false for a NULL topic, an unsupported QoS, a topic that does
 *         not fit in the packet buffer, or when not connected; otherwise
 *         the result of write().
 */
boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
    // Reject NULL before measuring the string.
    if (topic == 0) {
        return false;
    }
    if (qos > 1) {
        return false;
    }
    const size_t topicLength = strnlen(topic, this->bufferSize);
    // Bytes written below: reserved header area, 2-byte message id,
    // 2-byte topic length, the topic, and the trailing QoS byte.
    if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLength + 1) {
        // Too long
        return false;
    }
    if (!connected()) {
        return false;
    }

    // Leave room in the buffer for header and variable length field
    uint16_t length = MQTT_MAX_HEADER_SIZE;
    // Message ids cycle through 1..65535; 0 is skipped.
    nextMsgId++;
    if (nextMsgId == 0) {
        nextMsgId = 1;
    }
    this->buffer[length++] = (nextMsgId >> 8);
    this->buffer[length++] = (nextMsgId & 0xFF);
    length = writeString((char*)topic, this->buffer,length);
    this->buffer[length++] = qos;
    return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
}
636
 
637
/**
 * Sends an UNSUBSCRIBE request for one topic.
 *
 * @param topic  NUL-terminated topic filter; NULL is rejected
 * @return false for a NULL topic, a topic that does not fit in the packet
 *         buffer, or when not connected; otherwise the result of write().
 */
boolean PubSubClient::unsubscribe(const char* topic) {
    // Reject NULL before measuring the string.
    if (topic == 0) {
        return false;
    }
    const size_t topicLength = strnlen(topic, this->bufferSize);
    // Bytes written below: reserved header area, 2-byte message id,
    // 2-byte topic length and the topic.
    if (this->bufferSize < MQTT_MAX_HEADER_SIZE + 2 + 2 + topicLength) {
        // Too long
        return false;
    }
    if (!connected()) {
        return false;
    }

    uint16_t length = MQTT_MAX_HEADER_SIZE;
    // Message ids cycle through 1..65535; 0 is skipped.
    nextMsgId++;
    if (nextMsgId == 0) {
        nextMsgId = 1;
    }
    this->buffer[length++] = (nextMsgId >> 8);
    this->buffer[length++] = (nextMsgId & 0xFF);
    length = writeString(topic, this->buffer,length);
    return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
}
659
 
660
void PubSubClient::disconnect() {
661
    this->buffer[0] = MQTTDISCONNECT;
662
    this->buffer[1] = 0;
663
    _client->write(this->buffer,2);
664
    _state = MQTT_DISCONNECTED;
665
    _client->flush();
666
    _client->stop();
667
    lastInActivity = lastOutActivity = millis();
668
}
669
 
670
/**
 * Writes `string` into `buf` at offset `pos` as a length-prefixed field:
 * two length bytes (high byte first) followed by the characters, without
 * the terminating NUL.
 *
 * No bounds checking is done here; callers must make sure the field fits.
 *
 * @return offset of the first byte after the written field.
 */
uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
    const uint16_t prefixAt = pos;
    uint16_t cursor = pos + 2;   // characters go after the 2-byte prefix
    uint16_t count = 0;

    for (const char* p = string; *p; ++p) {
        buf[cursor++] = *p;
        count++;
    }

    // The length is only known now; fill in the prefix.
    buf[prefixAt] = (count >> 8);
    buf[prefixAt + 1] = (count & 0xFF);
    return cursor;
}
682
 
683
 
684
/**
 * Reports whether the MQTT session is up.
 *
 * If the transport has dropped while the state was still MQTT_CONNECTED,
 * the state becomes MQTT_CONNECTION_LOST and the client is flushed and
 * stopped.
 *
 * @return true only when the transport is connected and the state is
 *         MQTT_CONNECTED; false otherwise, including when no client is set.
 */
boolean PubSubClient::connected() {
    if (_client == NULL) {
        return false;
    }

    if (_client->connected()) {
        // The transport is up; the session counts only once the connect
        // handshake has set the state to MQTT_CONNECTED.
        return this->_state == MQTT_CONNECTED;
    }

    if (this->_state == MQTT_CONNECTED) {
        this->_state = MQTT_CONNECTION_LOST;
        _client->flush();
        _client->stop();
    }
    return false;
}
702
 
703
/**
 * Sets the broker address from four raw bytes `ip[0..3]` and a port.
 *
 * @return *this, to allow call chaining.
 */
PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
    return setServer(IPAddress(ip[0], ip[1], ip[2], ip[3]), port);
}
707
 
708
/**
 * Sets the broker address as an IP address and port.
 *
 * Any previously configured domain name is cleared, so connect() will use
 * the IP address.
 *
 * @return *this, to allow call chaining.
 */
PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
    PubSubClient& self = *this;
    self.domain = NULL;
    self.port = port;
    self.ip = ip;
    return self;
}
714
 
715
/**
 * Sets the broker address as a host name and port.
 *
 * Only the pointer is stored; the string is not copied, so it must stay
 * valid for as long as it may be used.
 *
 * @return *this, to allow call chaining.
 */
PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
    PubSubClient& self = *this;
    self.port = port;
    self.domain = domain;
    return self;
}
720
 
721
/**
 * Stores the message callback (parameter `callback`, declared through the
 * MQTT_CALLBACK_SIGNATURE macro) that loop() invokes for incoming PUBLISH
 * packets.
 *
 * @return *this, to allow call chaining.
 */
PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
    PubSubClient& self = *this;
    self.callback = callback;
    return self;
}
725
 
726
/**
 * Sets the network client used as transport.
 *
 * Only the address of `client` is kept, so the object must outlive its use
 * here.
 *
 * @return *this, to allow call chaining.
 */
PubSubClient& PubSubClient::setClient(Client& client){
    _client = &client;
    return *this;
}
730
 
731
/**
 * Sets the stream that readPacket() copies incoming PUBLISH payload bytes
 * to.
 *
 * Only the address of `stream` is kept, so the object must outlive its use
 * here.
 *
 * @return *this, to allow call chaining.
 */
PubSubClient& PubSubClient::setStream(Stream& stream){
    PubSubClient& self = *this;
    self.stream = &stream;
    return self;
}
735
 
736
int PubSubClient::state() {
737
    return this->_state;
738
}
739
 
740
/**
 * Allocates or resizes the packet buffer.
 *
 * @param size  new buffer size in bytes; 0 is rejected
 * @return true when the buffer now has `size` bytes; false when `size` is
 *         0 or the allocation failed. On failure the recorded buffer size
 *         is left unchanged, so it always matches the memory actually
 *         held.
 */
boolean PubSubClient::setBufferSize(uint16_t size) {
    if (size == 0) {
        // Cannot set it back to 0
        return false;
    }

    if (this->bufferSize == 0) {
        // Nothing allocated yet. The pointer is assigned even on failure
        // so that it is a well-defined NULL afterwards.
        this->buffer = (uint8_t*)malloc(size);
        if (this->buffer == NULL) {
            // bufferSize stays 0: the size checks elsewhere must not
            // believe there is memory behind a NULL buffer.
            return false;
        }
    } else {
        uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size);
        if (newBuffer == NULL) {
            // realloc() leaves the old block intact on failure.
            return false;
        }
        this->buffer = newBuffer;
    }

    this->bufferSize = size;
    return true;
}
758
 
759
uint16_t PubSubClient::getBufferSize() {
760
    return this->bufferSize;
761
}
762
/**
 * Sets the keep-alive interval in seconds. connect() sends it to the
 * broker and loop() uses it to decide when to ping.
 *
 * @return *this, to allow call chaining.
 */
PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) {
    PubSubClient& self = *this;
    self.keepAlive = keepAlive;
    return self;
}
766
/**
 * Sets how long, in seconds, connect() and readByte() wait for incoming
 * data before giving up.
 *
 * @return *this, to allow call chaining.
 */
PubSubClient& PubSubClient::setSocketTimeout(uint16_t timeout) {
    socketTimeout = timeout;
    return *this;
}