Subversion Repositories ESP8266_P1_Meter

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 raymond 1
/**
2
 * @file  ESPAsyncTCPbuffer.cpp
3
 * @date  22.01.2016
4
 * @author Markus Sattler
5
 *
6
 * Copyright (c) 2015 Markus Sattler. All rights reserved.
7
 * This file is part of the Asynv TCP for ESP.
8
 *
9
 * This library is free software; you can redistribute it and/or
10
 * modify it under the terms of the GNU Lesser General Public
11
 * License as published by the Free Software Foundation; either
12
 * version 2.1 of the License, or (at your option) any later version.
13
 *
14
 * This library is distributed in the hope that it will be useful,
15
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17
 * Lesser General Public License for more details.
18
 *
19
 * You should have received a copy of the GNU Lesser General Public
20
 * License along with this library; if not, write to the Free Software
21
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
22
 *
23
 */
24
 
25
 
26
#include <Arduino.h>
27
#include <debug.h>
28
 
29
#include "ESPAsyncTCPbuffer.h"
30
 
31
 
32
AsyncTCPbuffer::AsyncTCPbuffer(AsyncClient* client) {
33
    if(client == NULL) {
34
        DEBUG_ASYNC_TCP("[A-TCP] client is null!!!\n");
35
        panic();
36
    }
37
 
38
    _client = client;
39
    _TXbufferWrite = new (std::nothrow) cbuf(TCP_MSS);
40
    _TXbufferRead = _TXbufferWrite;
41
    _RXbuffer = new (std::nothrow) cbuf(100);
42
    _RXmode = ATB_RX_MODE_FREE;
43
    _rxSize = 0;
44
    _rxTerminator = 0x00;
45
    _rxReadBytesPtr = NULL;
46
    _rxReadStringPtr = NULL;
47
    _cbDisconnect = NULL;
48
 
49
    _cbRX = NULL;
50
    _cbDone = NULL;
51
    _attachCallbacks();
52
}
53
 
54
AsyncTCPbuffer::~AsyncTCPbuffer() {
55
    if(_client) {
56
        _client->close();
57
    }
58
 
59
    if(_RXbuffer) {
60
        delete _RXbuffer;
61
        _RXbuffer = NULL;
62
    }
63
 
64
    if(_TXbufferWrite) {
65
        // will be deleted in _TXbufferRead chain
66
        _TXbufferWrite = NULL;
67
    }
68
 
69
    if(_TXbufferRead) {
70
        cbuf * next = _TXbufferRead->next;
71
        delete _TXbufferRead;
72
        while(next != NULL) {
73
            _TXbufferRead = next;
74
            next = _TXbufferRead->next;
75
            delete _TXbufferRead;
76
        }
77
        _TXbufferRead = NULL;
78
    }
79
}
80
 
81
size_t AsyncTCPbuffer::write(String & data) {
82
    return write(data.c_str(), data.length());
83
}
84
 
85
size_t AsyncTCPbuffer::write(uint8_t data) {
86
    return write(&data, 1);
87
}
88
 
89
size_t AsyncTCPbuffer::write(const char* data) {
90
    return write((const uint8_t *) data, strlen(data));
91
}
92
 
93
size_t AsyncTCPbuffer::write(const char *data, size_t len) {
94
    return write((const uint8_t *) data, len);
95
}
96
 
97
/**
98
 * write data in to buffer and try to send the data
99
 * @param data
100
 * @param len
101
 * @return
102
 */
103
size_t AsyncTCPbuffer::write(const uint8_t *data, size_t len) {
104
    if(_TXbufferWrite == NULL || _client == NULL || !_client->connected() || data == NULL || len == 0) {
105
        return 0;
106
    }
107
 
108
    size_t bytesLeft = len;
109
    while(bytesLeft) {
110
        size_t w = _TXbufferWrite->write((const char*) data, bytesLeft);
111
        bytesLeft -= w;
112
        data += w;
113
        _sendBuffer();
114
 
115
        // add new buffer since we have more data
116
        if(_TXbufferWrite->full() && bytesLeft > 0) {
117
 
118
            // to less ram!!!
119
            if(ESP.getFreeHeap() < 4096) {
120
                DEBUG_ASYNC_TCP("[A-TCP] run out of Heap can not send all Data!\n");
121
                return (len - bytesLeft);
122
            }
123
 
124
            cbuf * next = new (std::nothrow) cbuf(TCP_MSS);
125
            if(next == NULL) {
126
                DEBUG_ASYNC_TCP("[A-TCP] run out of Heap!\n");
127
                panic();
128
            } else {
129
                DEBUG_ASYNC_TCP("[A-TCP] new cbuf\n");
130
            }
131
 
132
            // add new buffer to chain (current cbuf)
133
            _TXbufferWrite->next = next;
134
 
135
            // move ptr for next data
136
            _TXbufferWrite = next;
137
        }
138
    }
139
 
140
    return len;
141
 
142
}
143
 
144
/**
145
 * wait until all data has send out
146
 */
147
void AsyncTCPbuffer::flush() {
148
    while(!_TXbufferWrite->empty()) {
149
        while(connected() && !_client->canSend()) {
150
          delay(0);
151
        }
152
        if(!connected())
153
          return;
154
        _sendBuffer();
155
    }
156
}
157
 
158
void AsyncTCPbuffer::noCallback() {
159
    _RXmode = ATB_RX_MODE_NONE;
160
}
161
 
162
void AsyncTCPbuffer::readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done) {
163
    if(_client == NULL) {
164
        return;
165
    }
166
    DEBUG_ASYNC_TCP("[A-TCP] readStringUntil terminator: %02X\n", terminator);
167
    _RXmode = ATB_RX_MODE_NONE;
168
    _cbDone = done;
169
    _rxReadStringPtr = str;
170
    _rxTerminator = terminator;
171
    _rxSize = 0;
172
    _RXmode = ATB_RX_MODE_TERMINATOR_STRING;
173
}
174
 
175
/*
176
 void AsyncTCPbuffer::readBytesUntil(char terminator, char *buffer, size_t length, AsyncTCPbufferDoneCb done) {
177
 _RXmode = ATB_RX_MODE_NONE;
178
 _cbDone = done;
179
 _rxReadBytesPtr = (uint8_t *) buffer;
180
 _rxTerminator = terminator;
181
 _rxSize = length;
182
 _RXmode = ATB_RX_MODE_TERMINATOR;
183
 _handleRxBuffer(NULL, 0);
184
 }
185
 
186
 void AsyncTCPbuffer::readBytesUntil(char terminator, uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) {
187
 readBytesUntil(terminator, (char *) buffer, length, done);
188
 }
189
 */
190
 
191
void AsyncTCPbuffer::readBytes(char *buffer, size_t length, AsyncTCPbufferDoneCb done) {
192
    if(_client == NULL) {
193
        return;
194
    }
195
    DEBUG_ASYNC_TCP("[A-TCP] readBytes length: %d\n", length);
196
    _RXmode = ATB_RX_MODE_NONE;
197
    _cbDone = done;
198
    _rxReadBytesPtr = (uint8_t *) buffer;
199
    _rxSize = length;
200
    _RXmode = ATB_RX_MODE_READ_BYTES;
201
}
202
 
203
void AsyncTCPbuffer::readBytes(uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) {
204
    readBytes((char *) buffer, length, done);
205
}
206
 
207
void AsyncTCPbuffer::onData(AsyncTCPbufferDataCb cb) {
208
    if(_client == NULL) {
209
        return;
210
    }
211
    DEBUG_ASYNC_TCP("[A-TCP] onData\n");
212
    _RXmode = ATB_RX_MODE_NONE;
213
    _cbDone = NULL;
214
    _cbRX = cb;
215
    _RXmode = ATB_RX_MODE_FREE;
216
}
217
 
218
void AsyncTCPbuffer::onDisconnect(AsyncTCPbufferDisconnectCb cb) {
219
    _cbDisconnect = cb;
220
}
221
 
222
IPAddress AsyncTCPbuffer::remoteIP() {
223
    if(!_client) {
224
        return IPAddress(0U);
225
    }
226
    return _client->remoteIP();
227
}
228
 
229
uint16_t AsyncTCPbuffer::remotePort() {
230
    if(!_client) {
231
        return 0;
232
    }
233
    return _client->remotePort();
234
}
235
 
236
bool AsyncTCPbuffer::connected() {
237
    if(!_client) {
238
        return false;
239
    }
240
    return _client->connected();
241
}
242
 
243
void AsyncTCPbuffer::stop() {
244
 
245
    if(!_client) {
246
        return;
247
    }
248
    _client->stop();
249
    _client = NULL;
250
 
251
    if(_cbDone) {
252
        switch(_RXmode) {
253
            case ATB_RX_MODE_READ_BYTES:
254
            case ATB_RX_MODE_TERMINATOR:
255
            case ATB_RX_MODE_TERMINATOR_STRING:
256
                _RXmode = ATB_RX_MODE_NONE;
257
                _cbDone(false, NULL);
258
                break;
259
            default:
260
              break;
261
        }
262
    }
263
    _RXmode = ATB_RX_MODE_NONE;
264
}
265
 
266
void AsyncTCPbuffer::close() {
267
    stop();
268
}
269
 
270
 
271
///--------------------------------
272
 
273
/**
274
 * attachCallbacks to AsyncClient class
275
 */
276
void AsyncTCPbuffer::_attachCallbacks() {
277
    if(!_client) {
278
        return;
279
    }
280
    DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks\n");
281
 
282
    _client->onPoll([](void *obj, AsyncClient* c) {
283
        (void)c;
284
        AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj));
285
        if((b->_TXbufferRead != NULL) && !b->_TXbufferRead->empty()) {
286
            b->_sendBuffer();
287
        }
288
        //    if(!b->_RXbuffer->empty()) {
289
        //       b->_handleRxBuffer(NULL, 0);
290
        //   }
291
    }, this);
292
 
293
    _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time) {
294
        (void)c;
295
        (void)len;
296
        (void)time;
297
        DEBUG_ASYNC_TCP("[A-TCP] onAck\n");
298
        ((AsyncTCPbuffer*)(obj))->_sendBuffer();
299
    }, this);
300
 
301
    _client->onDisconnect([](void *obj, AsyncClient* c) {
302
        DEBUG_ASYNC_TCP("[A-TCP] onDisconnect\n");
303
        AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj));
304
        b->_client = NULL;
305
        bool del = true;
306
        if(b->_cbDisconnect) {
307
            del = b->_cbDisconnect(b);
308
        }
309
        delete c;
310
        if(del) {
311
            delete b;
312
        }
313
    }, this);
314
 
315
    _client->onData([](void *obj, AsyncClient* c, void *buf, size_t len) {
316
        (void)c;
317
        AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj));
318
        b->_rxData((uint8_t *)buf, len);
319
    }, this);
320
 
321
    _client->onTimeout([](void *obj, AsyncClient* c, uint32_t time){
322
        (void)obj;
323
        (void)time;
324
        DEBUG_ASYNC_TCP("[A-TCP] onTimeout\n");
325
        c->close();
326
    }, this);
327
 
328
    DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks Done.\n");
329
}
330
 
331
/**
332
 * send TX buffer if possible
333
 */
334
void AsyncTCPbuffer::_sendBuffer() {
335
    //DEBUG_ASYNC_TCP("[A-TCP] _sendBuffer...\n");
336
    size_t available = _TXbufferRead->available();
337
    if(available == 0 || _client == NULL || !_client->connected() || !_client->canSend()) {
338
        return;
339
    }
340
 
341
    while(connected() && (_client->space() > 0) && (_TXbufferRead->available() > 0) && _client->canSend()) {
342
 
343
        available = _TXbufferRead->available();
344
 
345
        if(available > _client->space()) {
346
            available = _client->space();
347
        }
348
 
349
        char *out = new (std::nothrow) char[available];
350
        if(out == NULL) {
351
            DEBUG_ASYNC_TCP("[A-TCP] to less heap, try later.\n");
352
            return;
353
        }
354
 
355
        // read data from buffer
356
        _TXbufferRead->peek(out, available);
357
 
358
        // send data
359
        size_t send = _client->write((const char*) out, available);
360
        if(send != available) {
361
            DEBUG_ASYNC_TCP("[A-TCP] write failed send: %d available: %d \n", send, available);
362
            if(!connected()) {
363
                DEBUG_ASYNC_TCP("[A-TCP] incomplete transfer, connection lost.\n");
364
            }
365
        }
366
 
367
        // remove really send data from buffer
368
        _TXbufferRead->remove(send);
369
 
370
        // if buffer is empty and there is a other buffer in chain delete the empty one
371
        if(_TXbufferRead->available() == 0 && _TXbufferRead->next != NULL) {
372
            cbuf * old = _TXbufferRead;
373
            _TXbufferRead = _TXbufferRead->next;
374
            delete old;
375
            DEBUG_ASYNC_TCP("[A-TCP] delete cbuf\n");
376
        }
377
 
378
        delete out;
379
    }
380
 
381
}
382
 
383
/**
384
 * called on incoming data
385
 * @param buf
386
 * @param len
387
 */
388
void AsyncTCPbuffer::_rxData(uint8_t *buf, size_t len) {
389
    if(!_client || !_client->connected()) {
390
        DEBUG_ASYNC_TCP("[A-TCP] not connected!\n");
391
        return;
392
    }
393
    if(!_RXbuffer) {
394
        DEBUG_ASYNC_TCP("[A-TCP] _rxData no _RXbuffer!\n");
395
        return;
396
    }
397
    DEBUG_ASYNC_TCP("[A-TCP] _rxData len: %d RXmode: %d\n", len, _RXmode);
398
 
399
    size_t handled = 0;
400
 
401
    if(_RXmode != ATB_RX_MODE_NONE) {
402
        handled = _handleRxBuffer((uint8_t *) buf, len);
403
        buf += handled;
404
        len -= handled;
405
 
406
        // handle as much as possible before using the buffer
407
        if(_RXbuffer->empty()) {
408
            while(_RXmode != ATB_RX_MODE_NONE && handled != 0 && len > 0) {
409
                handled = _handleRxBuffer(buf, len);
410
                buf += handled;
411
                len -= handled;
412
            }
413
        }
414
    }
415
 
416
    if(len > 0) {
417
 
418
        if(_RXbuffer->room() < len) {
419
            // to less space
420
            DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer full try resize\n");
421
            _RXbuffer->resizeAdd((len + _RXbuffer->room()));
422
 
423
            if(_RXbuffer->room() < len) {
424
                DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer to full can only handle %d!!!\n", _RXbuffer->room());
425
            }
426
        }
427
 
428
        _RXbuffer->write((const char *) (buf), len);
429
    }
430
 
431
    if(!_RXbuffer->empty() && _RXmode != ATB_RX_MODE_NONE) {
432
        // handle as much as possible data in buffer
433
        handled = _handleRxBuffer(NULL, 0);
434
        while(_RXmode != ATB_RX_MODE_NONE && handled != 0) {
435
            handled = _handleRxBuffer(NULL, 0);
436
        }
437
    }
438
 
439
    // clean up ram
440
    if(_RXbuffer->empty() && _RXbuffer->room() != 100) {
441
        _RXbuffer->resize(100);
442
    }
443
 
444
}
445
 
446
/**
447
 *
448
 */
449
size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t *buf, size_t len) {
450
    if(!_client || !_client->connected() || _RXbuffer == NULL) {
451
        return 0;
452
    }
453
 
454
    DEBUG_ASYNC_TCP("[A-TCP] _handleRxBuffer len: %d RXmode: %d\n", len, _RXmode);
455
 
456
    size_t BufferAvailable = _RXbuffer->available();
457
    size_t r = 0;
458
 
459
    if(_RXmode == ATB_RX_MODE_NONE) {
460
        return 0;
461
    } else if(_RXmode == ATB_RX_MODE_FREE) {
462
        if(_cbRX == NULL) {
463
            return 0;
464
        }
465
 
466
        if(BufferAvailable > 0) {
467
            uint8_t * b = new (std::nothrow) uint8_t[BufferAvailable];
468
            if(b == NULL){
469
              panic(); //TODO: What action should this be ?
470
            }
471
            _RXbuffer->peek((char *) b, BufferAvailable);
472
            r = _cbRX(b, BufferAvailable);
473
            _RXbuffer->remove(r);
474
        }
475
 
476
        if(r == BufferAvailable && buf && (len > 0)) {
477
            return _cbRX(buf, len);
478
        } else {
479
            return 0;
480
        }
481
 
482
    } else if(_RXmode == ATB_RX_MODE_READ_BYTES) {
483
        if(_rxReadBytesPtr == NULL || _cbDone == NULL) {
484
            return 0;
485
        }
486
 
487
        size_t newReadCount = 0;
488
 
489
        if(BufferAvailable) {
490
            r = _RXbuffer->read((char *) _rxReadBytesPtr, _rxSize);
491
            _rxSize -= r;
492
            _rxReadBytesPtr += r;
493
        }
494
 
495
        if(_RXbuffer->empty() && (len > 0) && buf) {
496
            r = len;
497
            if(r > _rxSize) {
498
                r = _rxSize;
499
            }
500
            memcpy(_rxReadBytesPtr, buf, r);
501
            _rxReadBytesPtr += r;
502
            _rxSize -= r;
503
            newReadCount += r;
504
        }
505
 
506
        if(_rxSize == 0) {
507
            _RXmode = ATB_RX_MODE_NONE;
508
            _cbDone(true, NULL);
509
        }
510
 
511
        // add left over bytes to Buffer
512
        return newReadCount;
513
 
514
    } else if(_RXmode == ATB_RX_MODE_TERMINATOR) {
515
        // TODO implement read terminator non string
516
 
517
    } else if(_RXmode == ATB_RX_MODE_TERMINATOR_STRING) {
518
        if(_rxReadStringPtr == NULL || _cbDone == NULL) {
519
            return 0;
520
        }
521
 
522
        // handle Buffer
523
        if(BufferAvailable > 0) {
524
            while(!_RXbuffer->empty()) {
525
                char c = _RXbuffer->read();
526
                if(c == _rxTerminator || c == 0x00) {
527
                    _RXmode = ATB_RX_MODE_NONE;
528
                    _cbDone(true, _rxReadStringPtr);
529
                    return 0;
530
                } else {
531
                    (*_rxReadStringPtr) += c;
532
                }
533
            }
534
        }
535
 
536
        if(_RXbuffer->empty() && (len > 0) && buf) {
537
            size_t newReadCount = 0;
538
            while(newReadCount < len) {
539
                char c = (char) *buf;
540
                buf++;
541
                newReadCount++;
542
                if(c == _rxTerminator || c == 0x00) {
543
                    _RXmode = ATB_RX_MODE_NONE;
544
                    _cbDone(true, _rxReadStringPtr);
545
                    return newReadCount;
546
                } else {
547
                    (*_rxReadStringPtr) += c;
548
                }
549
            }
550
            return newReadCount;
551
        }
552
    }
553
 
554
    return 0;
555
}