Subversion Repositories ESP8266_P1_Meter

Rev

Blame | Last modification | View Log | RSS feed

/**
 * @file  ESPAsyncTCPbuffer.cpp
 * @date  22.01.2016
 * @author Markus Sattler
 *
 * Copyright (c) 2015 Markus Sattler. All rights reserved.
 * This file is part of the Asynv TCP for ESP.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 *
 */


#include <Arduino.h>
#include <debug.h>

#include "ESPAsyncTCPbuffer.h"


AsyncTCPbuffer::AsyncTCPbuffer(AsyncClient* client) {
    if(client == NULL) {
        DEBUG_ASYNC_TCP("[A-TCP] client is null!!!\n");
        panic();
    }

    _client = client;
    _TXbufferWrite = new (std::nothrow) cbuf(TCP_MSS);
    _TXbufferRead = _TXbufferWrite;
    _RXbuffer = new (std::nothrow) cbuf(100);
    _RXmode = ATB_RX_MODE_FREE;
    _rxSize = 0;
    _rxTerminator = 0x00;
    _rxReadBytesPtr = NULL;
    _rxReadStringPtr = NULL;
    _cbDisconnect = NULL;

    _cbRX = NULL;
    _cbDone = NULL;
    _attachCallbacks();
}

AsyncTCPbuffer::~AsyncTCPbuffer() {
    if(_client) {
        _client->close();
    }

    if(_RXbuffer) {
        delete _RXbuffer;
        _RXbuffer = NULL;
    }

    if(_TXbufferWrite) {
        // will be deleted in _TXbufferRead chain
        _TXbufferWrite = NULL;
    }

    if(_TXbufferRead) {
        cbuf * next = _TXbufferRead->next;
        delete _TXbufferRead;
        while(next != NULL) {
            _TXbufferRead = next;
            next = _TXbufferRead->next;
            delete _TXbufferRead;
        }
        _TXbufferRead = NULL;
    }
}

size_t AsyncTCPbuffer::write(String & data) {
    return write(data.c_str(), data.length());
}

size_t AsyncTCPbuffer::write(uint8_t data) {
    return write(&data, 1);
}

size_t AsyncTCPbuffer::write(const char* data) {
    return write((const uint8_t *) data, strlen(data));
}

size_t AsyncTCPbuffer::write(const char *data, size_t len) {
    return write((const uint8_t *) data, len);
}

/**
 * write data in to buffer and try to send the data
 * @param data
 * @param len
 * @return
 */
size_t AsyncTCPbuffer::write(const uint8_t *data, size_t len) {
    if(_TXbufferWrite == NULL || _client == NULL || !_client->connected() || data == NULL || len == 0) {
        return 0;
    }

    size_t bytesLeft = len;
    while(bytesLeft) {
        size_t w = _TXbufferWrite->write((const char*) data, bytesLeft);
        bytesLeft -= w;
        data += w;
        _sendBuffer();

        // add new buffer since we have more data
        if(_TXbufferWrite->full() && bytesLeft > 0) {

            // to less ram!!!
            if(ESP.getFreeHeap() < 4096) {
                DEBUG_ASYNC_TCP("[A-TCP] run out of Heap can not send all Data!\n");
                return (len - bytesLeft);
            }

            cbuf * next = new (std::nothrow) cbuf(TCP_MSS);
            if(next == NULL) {
                DEBUG_ASYNC_TCP("[A-TCP] run out of Heap!\n");
                panic();
            } else {
                DEBUG_ASYNC_TCP("[A-TCP] new cbuf\n");
            }

            // add new buffer to chain (current cbuf)
            _TXbufferWrite->next = next;

            // move ptr for next data
            _TXbufferWrite = next;
        }
    }

    return len;

}

/**
 * wait until all data has send out
 */
void AsyncTCPbuffer::flush() {
    while(!_TXbufferWrite->empty()) {
        while(connected() && !_client->canSend()) {
          delay(0);
        }
        if(!connected())
          return;
        _sendBuffer();
    }
}

void AsyncTCPbuffer::noCallback() {
    _RXmode = ATB_RX_MODE_NONE;
}

void AsyncTCPbuffer::readStringUntil(char terminator, String * str, AsyncTCPbufferDoneCb done) {
    if(_client == NULL) {
        return;
    }
    DEBUG_ASYNC_TCP("[A-TCP] readStringUntil terminator: %02X\n", terminator);
    _RXmode = ATB_RX_MODE_NONE;
    _cbDone = done;
    _rxReadStringPtr = str;
    _rxTerminator = terminator;
    _rxSize = 0;
    _RXmode = ATB_RX_MODE_TERMINATOR_STRING;
}

/*
 void AsyncTCPbuffer::readBytesUntil(char terminator, char *buffer, size_t length, AsyncTCPbufferDoneCb done) {
 _RXmode = ATB_RX_MODE_NONE;
 _cbDone = done;
 _rxReadBytesPtr = (uint8_t *) buffer;
 _rxTerminator = terminator;
 _rxSize = length;
 _RXmode = ATB_RX_MODE_TERMINATOR;
 _handleRxBuffer(NULL, 0);
 }

 void AsyncTCPbuffer::readBytesUntil(char terminator, uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) {
 readBytesUntil(terminator, (char *) buffer, length, done);
 }
 */

void AsyncTCPbuffer::readBytes(char *buffer, size_t length, AsyncTCPbufferDoneCb done) {
    if(_client == NULL) {
        return;
    }
    DEBUG_ASYNC_TCP("[A-TCP] readBytes length: %d\n", length);
    _RXmode = ATB_RX_MODE_NONE;
    _cbDone = done;
    _rxReadBytesPtr = (uint8_t *) buffer;
    _rxSize = length;
    _RXmode = ATB_RX_MODE_READ_BYTES;
}

void AsyncTCPbuffer::readBytes(uint8_t *buffer, size_t length, AsyncTCPbufferDoneCb done) {
    readBytes((char *) buffer, length, done);
}

void AsyncTCPbuffer::onData(AsyncTCPbufferDataCb cb) {
    if(_client == NULL) {
        return;
    }
    DEBUG_ASYNC_TCP("[A-TCP] onData\n");
    _RXmode = ATB_RX_MODE_NONE;
    _cbDone = NULL;
    _cbRX = cb;
    _RXmode = ATB_RX_MODE_FREE;
}

void AsyncTCPbuffer::onDisconnect(AsyncTCPbufferDisconnectCb cb) {
    _cbDisconnect = cb;
}

IPAddress AsyncTCPbuffer::remoteIP() {
    if(!_client) {
        return IPAddress(0U);
    }
    return _client->remoteIP();
}

uint16_t AsyncTCPbuffer::remotePort() {
    if(!_client) {
        return 0;
    }
    return _client->remotePort();
}

bool AsyncTCPbuffer::connected() {
    if(!_client) {
        return false;
    }
    return _client->connected();
}

void AsyncTCPbuffer::stop() {

    if(!_client) {
        return;
    }
    _client->stop();
    _client = NULL;

    if(_cbDone) {
        switch(_RXmode) {
            case ATB_RX_MODE_READ_BYTES:
            case ATB_RX_MODE_TERMINATOR:
            case ATB_RX_MODE_TERMINATOR_STRING:
                _RXmode = ATB_RX_MODE_NONE;
                _cbDone(false, NULL);
                break;
            default:
              break;
        }
    }
    _RXmode = ATB_RX_MODE_NONE;
}

void AsyncTCPbuffer::close() {
    stop();
}


///--------------------------------

/**
 * attachCallbacks to AsyncClient class
 */
void AsyncTCPbuffer::_attachCallbacks() {
    if(!_client) {
        return;
    }
    DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks\n");

    _client->onPoll([](void *obj, AsyncClient* c) {
        (void)c;
        AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj));
        if((b->_TXbufferRead != NULL) && !b->_TXbufferRead->empty()) {
            b->_sendBuffer();
        }
        //    if(!b->_RXbuffer->empty()) {
        //       b->_handleRxBuffer(NULL, 0);
        //   }
    }, this);

    _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time) {
        (void)c;
        (void)len;
        (void)time;
        DEBUG_ASYNC_TCP("[A-TCP] onAck\n");
        ((AsyncTCPbuffer*)(obj))->_sendBuffer();
    }, this);

    _client->onDisconnect([](void *obj, AsyncClient* c) {
        DEBUG_ASYNC_TCP("[A-TCP] onDisconnect\n");
        AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj));
        b->_client = NULL;
        bool del = true;
        if(b->_cbDisconnect) {
            del = b->_cbDisconnect(b);
        }
        delete c;
        if(del) {
            delete b;
        }
    }, this);

    _client->onData([](void *obj, AsyncClient* c, void *buf, size_t len) {
        (void)c;
        AsyncTCPbuffer* b = ((AsyncTCPbuffer*)(obj));
        b->_rxData((uint8_t *)buf, len);
    }, this);

    _client->onTimeout([](void *obj, AsyncClient* c, uint32_t time){
        (void)obj;
        (void)time;
        DEBUG_ASYNC_TCP("[A-TCP] onTimeout\n");
        c->close();
    }, this);

    DEBUG_ASYNC_TCP("[A-TCP] attachCallbacks Done.\n");
}

/**
 * send TX buffer if possible
 */
void AsyncTCPbuffer::_sendBuffer() {
    //DEBUG_ASYNC_TCP("[A-TCP] _sendBuffer...\n");
    size_t available = _TXbufferRead->available();
    if(available == 0 || _client == NULL || !_client->connected() || !_client->canSend()) {
        return;
    }

    while(connected() && (_client->space() > 0) && (_TXbufferRead->available() > 0) && _client->canSend()) {

        available = _TXbufferRead->available();

        if(available > _client->space()) {
            available = _client->space();
        }

        char *out = new (std::nothrow) char[available];
        if(out == NULL) {
            DEBUG_ASYNC_TCP("[A-TCP] to less heap, try later.\n");
            return;
        }

        // read data from buffer
        _TXbufferRead->peek(out, available);

        // send data
        size_t send = _client->write((const char*) out, available);
        if(send != available) {
            DEBUG_ASYNC_TCP("[A-TCP] write failed send: %d available: %d \n", send, available);
            if(!connected()) {
                DEBUG_ASYNC_TCP("[A-TCP] incomplete transfer, connection lost.\n");
            }
        }

        // remove really send data from buffer
        _TXbufferRead->remove(send);

        // if buffer is empty and there is a other buffer in chain delete the empty one
        if(_TXbufferRead->available() == 0 && _TXbufferRead->next != NULL) {
            cbuf * old = _TXbufferRead;
            _TXbufferRead = _TXbufferRead->next;
            delete old;
            DEBUG_ASYNC_TCP("[A-TCP] delete cbuf\n");
        }

        delete out;
    }

}

/**
 * called on incoming data
 * @param buf
 * @param len
 */
void AsyncTCPbuffer::_rxData(uint8_t *buf, size_t len) {
    if(!_client || !_client->connected()) {
        DEBUG_ASYNC_TCP("[A-TCP] not connected!\n");
        return;
    }
    if(!_RXbuffer) {
        DEBUG_ASYNC_TCP("[A-TCP] _rxData no _RXbuffer!\n");
        return;
    }
    DEBUG_ASYNC_TCP("[A-TCP] _rxData len: %d RXmode: %d\n", len, _RXmode);

    size_t handled = 0;

    if(_RXmode != ATB_RX_MODE_NONE) {
        handled = _handleRxBuffer((uint8_t *) buf, len);
        buf += handled;
        len -= handled;

        // handle as much as possible before using the buffer
        if(_RXbuffer->empty()) {
            while(_RXmode != ATB_RX_MODE_NONE && handled != 0 && len > 0) {
                handled = _handleRxBuffer(buf, len);
                buf += handled;
                len -= handled;
            }
        }
    }

    if(len > 0) {

        if(_RXbuffer->room() < len) {
            // to less space
            DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer full try resize\n");
            _RXbuffer->resizeAdd((len + _RXbuffer->room()));

            if(_RXbuffer->room() < len) {
                DEBUG_ASYNC_TCP("[A-TCP] _rxData buffer to full can only handle %d!!!\n", _RXbuffer->room());
            }
        }

        _RXbuffer->write((const char *) (buf), len);
    }

    if(!_RXbuffer->empty() && _RXmode != ATB_RX_MODE_NONE) {
        // handle as much as possible data in buffer
        handled = _handleRxBuffer(NULL, 0);
        while(_RXmode != ATB_RX_MODE_NONE && handled != 0) {
            handled = _handleRxBuffer(NULL, 0);
        }
    }

    // clean up ram
    if(_RXbuffer->empty() && _RXbuffer->room() != 100) {
        _RXbuffer->resize(100);
    }

}

/**
 *
 */
size_t AsyncTCPbuffer::_handleRxBuffer(uint8_t *buf, size_t len) {
    if(!_client || !_client->connected() || _RXbuffer == NULL) {
        return 0;
    }

    DEBUG_ASYNC_TCP("[A-TCP] _handleRxBuffer len: %d RXmode: %d\n", len, _RXmode);

    size_t BufferAvailable = _RXbuffer->available();
    size_t r = 0;

    if(_RXmode == ATB_RX_MODE_NONE) {
        return 0;
    } else if(_RXmode == ATB_RX_MODE_FREE) {
        if(_cbRX == NULL) {
            return 0;
        }

        if(BufferAvailable > 0) {
            uint8_t * b = new (std::nothrow) uint8_t[BufferAvailable];
            if(b == NULL){
              panic(); //TODO: What action should this be ?
            }
            _RXbuffer->peek((char *) b, BufferAvailable);
            r = _cbRX(b, BufferAvailable);
            _RXbuffer->remove(r);
        }

        if(r == BufferAvailable && buf && (len > 0)) {
            return _cbRX(buf, len);
        } else {
            return 0;
        }

    } else if(_RXmode == ATB_RX_MODE_READ_BYTES) {
        if(_rxReadBytesPtr == NULL || _cbDone == NULL) {
            return 0;
        }

        size_t newReadCount = 0;

        if(BufferAvailable) {
            r = _RXbuffer->read((char *) _rxReadBytesPtr, _rxSize);
            _rxSize -= r;
            _rxReadBytesPtr += r;
        }

        if(_RXbuffer->empty() && (len > 0) && buf) {
            r = len;
            if(r > _rxSize) {
                r = _rxSize;
            }
            memcpy(_rxReadBytesPtr, buf, r);
            _rxReadBytesPtr += r;
            _rxSize -= r;
            newReadCount += r;
        }

        if(_rxSize == 0) {
            _RXmode = ATB_RX_MODE_NONE;
            _cbDone(true, NULL);
        }

        // add left over bytes to Buffer
        return newReadCount;

    } else if(_RXmode == ATB_RX_MODE_TERMINATOR) {
        // TODO implement read terminator non string

    } else if(_RXmode == ATB_RX_MODE_TERMINATOR_STRING) {
        if(_rxReadStringPtr == NULL || _cbDone == NULL) {
            return 0;
        }

        // handle Buffer
        if(BufferAvailable > 0) {
            while(!_RXbuffer->empty()) {
                char c = _RXbuffer->read();
                if(c == _rxTerminator || c == 0x00) {
                    _RXmode = ATB_RX_MODE_NONE;
                    _cbDone(true, _rxReadStringPtr);
                    return 0;
                } else {
                    (*_rxReadStringPtr) += c;
                }
            }
        }

        if(_RXbuffer->empty() && (len > 0) && buf) {
            size_t newReadCount = 0;
            while(newReadCount < len) {
                char c = (char) *buf;
                buf++;
                newReadCount++;
                if(c == _rxTerminator || c == 0x00) {
                    _RXmode = ATB_RX_MODE_NONE;
                    _cbDone(true, _rxReadStringPtr);
                    return newReadCount;
                } else {
                    (*_rxReadStringPtr) += c;
                }
            }
            return newReadCount;
        }
    }

    return 0;
}