Subversion Repositories ESP8266_P1_Meter

Rev

Blame | Last modification | View Log | RSS feed

/*
  Asynchronous TCP library for Espressif MCUs

  Copyright (c) 2016 Hristo Gochkov. All rights reserved.
  This file is part of the esp8266 core for Arduino environment.

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License as published by the Free Software Foundation; either
  version 2.1 of the License, or (at your option) any later version.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*/
#include "Arduino.h"
#include "SyncClient.h"
#include "ESPAsyncTCP.h"
#include "cbuf.h"
#include <interrupts.h>

// Debug macro setup for this translation unit.
// DEBUG_ESP_SYNC_CLIENT is defined unconditionally here, so unless
// SYNC_CLIENT_DEBUG was already defined elsewhere it is mapped onto
// DEBUG_GENERIC_P with a "[SYNC_CLIENT]" prefix.
// NOTE(review): DEBUG_GENERIC_P is not defined in this file; presumably it is
// provided by one of the included headers - confirm.
#define DEBUG_ESP_SYNC_CLIENT
#if defined(DEBUG_ESP_SYNC_CLIENT) && !defined(SYNC_CLIENT_DEBUG)
#define SYNC_CLIENT_DEBUG( format, ...) DEBUG_GENERIC_P("[SYNC_CLIENT]", format, ##__VA_ARGS__)
#endif
// Fallback: when no debug macro has been defined, SYNC_CLIENT_DEBUG(...)
// expands to a statement that does nothing.
#ifndef SYNC_CLIENT_DEBUG
#define SYNC_CLIENT_DEBUG(...) do { (void)0;} while(false)
#endif

/*
  Without LWIP_NETIF_TX_SINGLE_PBUF, all tcp_writes default to "no copy".
  Referenced data must then be preserved and freed from the specified
  tcp_sent() callback. Alternatively, tcp_writes need to use the
  TCP_WRITE_FLAG_COPY attribute.

  This file requires the copying behavior, so the build is stopped at compile
  time when LWIP_NETIF_TX_SINGLE_PBUF is not enabled.
*/
static_assert(LWIP_NETIF_TX_SINGLE_PBUF, "Required, tcp_write() must always copy.");

SyncClient::SyncClient(size_t txBufLen)
  : _client(NULL)
  , _tx_buffer(NULL)
  , _tx_buffer_size(txBufLen)
  , _rx_buffer(NULL)
  , _ref(NULL)
{
  ref();
}

SyncClient::SyncClient(AsyncClient *client, size_t txBufLen)
  : _client(client)
  , _tx_buffer(new (std::nothrow) cbuf(txBufLen))
  , _tx_buffer_size(txBufLen)
  , _rx_buffer(NULL)
  , _ref(NULL)
{
  if(ref() > 0 && _client != NULL)
    _attachCallbacks();
}

/*
  Drop this object's reference; the connection and buffers are released only
  when unref() reports that the count reached zero.
*/
SyncClient::~SyncClient(){
  const int remaining = unref();
  if(remaining != 0)
    return;
  _release();
}

void SyncClient::_release(){
  if(_client != NULL){
    _client->onData(NULL, NULL);
    _client->onAck(NULL, NULL);
    _client->onPoll(NULL, NULL);
    _client->abort();
    _client = NULL;
  }
  if(_tx_buffer != NULL){
    cbuf *b = _tx_buffer;
    _tx_buffer = NULL;
    delete b;
  }
  while(_rx_buffer != NULL){
    cbuf *b = _rx_buffer;
    _rx_buffer = _rx_buffer->next;
    delete b;
  }
}

/*
  Increment the reference counter, creating it on first use.

  Returns the new count, or -1 when the counter could not be allocated.
*/
int SyncClient::ref(){
  if(_ref != nullptr)
    return ++(*_ref);

  // First reference: allocate the counter without throwing.
  _ref = new (std::nothrow) int;
  if(_ref == nullptr)
    return -1;

  *_ref = 1;
  return *_ref;
}

/*
  Decrement the reference counter.

  Returns the remaining count, or -1 when there is no counter. When the count
  reaches zero the counter itself is freed and _ref is cleared.
*/
int SyncClient::unref(){
  if(_ref == nullptr)
    return -1;

  const int remaining = --(*_ref);
  if(remaining == 0){
    delete _ref;
    _ref = nullptr;
  }
  return remaining;
}

#if ASYNC_TCP_SSL_ENABLED
int SyncClient::_connect(const IPAddress& ip, uint16_t port, bool secure){
#else
int SyncClient::_connect(const IPAddress& ip, uint16_t port){
#endif
  if(connected())
    return 0;
  if(_client != NULL)
    delete _client;

  _client = new (std::nothrow) AsyncClient();
  if (_client == NULL)
    return 0;

  _client->onConnect([](void *obj, AsyncClient *c){ ((SyncClient*)(obj))->_onConnect(c); }, this);
  _attachCallbacks_Disconnect();
#if ASYNC_TCP_SSL_ENABLED
  if(_client->connect(ip, port, secure)){
#else
  if(_client->connect(ip, port)){
#endif
    while(_client != NULL && !_client->connected() && !_client->disconnecting())
      delay(1);
    return connected();
  }
  return 0;
}

#if ASYNC_TCP_SSL_ENABLED
int SyncClient::connect(const char *host, uint16_t port, bool secure){
#else
int SyncClient::connect(const char *host, uint16_t port){
#endif
  if(connected())
    return 0;
  if(_client != NULL)
    delete _client;

  _client = new (std::nothrow) AsyncClient();
  if (_client == NULL)
    return 0;

  _client->onConnect([](void *obj, AsyncClient *c){ ((SyncClient*)(obj))->_onConnect(c); }, this);
  _attachCallbacks_Disconnect();
#if ASYNC_TCP_SSL_ENABLED
  if(_client->connect(host, port, secure)){
#else
  if(_client->connect(host, port)){
#endif
    while(_client != NULL && !_client->connected() && !_client->disconnecting())
      delay(1);
    return connected();
  }
  return 0;
}
//#define SYNCCLIENT_NEW_OPERATOR_EQUAL
#ifdef SYNCCLIENT_NEW_OPERATOR_EQUAL
/*
  New behavior for operator=

  Allow for the object to be placed on a queue and transferred to a new
  container with buffers still intact, avoiding receive data drops. Transfers
  rx and tx buffers. Supports return by value.

  Note, this is optional; the old behavior is the default.

*/
/*
  Copy-assignment that shares the connection state of `other`.

  The left-hand side gives up its current reference (releasing its client and
  buffers if that was the last one), then adopts other's reference counter,
  client, tx buffer and rx buffer chain, and re-attaches the callbacks so they
  point at this object.

  NOTE(review): other._ref is dereferenced without a null check; ref() can
  leave _ref null when its allocation fails - confirm callers cannot hit this.
*/
SyncClient & SyncClient::operator=(const SyncClient &other){
  int *rhsref = other._ref;
  ++*rhsref; // Just in case the left and right side are the same object with different containers
  if (0 == unref())
    _release();
  _ref = other._ref;
  ref();
  --*rhsref; // Undo the temporary bump taken above.
  // Why do I not test _tx_buffer for != NULL and free?
  // I allow for the lh target container, to be a copy of an active
  // connection. Thus we are just reusing the container.
  // The above unref() handles releasing the previous client of the container.
  _tx_buffer_size = other._tx_buffer_size;
  _tx_buffer = other._tx_buffer;
  _client = other._client;
  // A live client without a tx buffer gets a fresh one (may still be null if
  // the non-throwing allocation fails).
  if (_client != NULL && _tx_buffer == NULL)
    _tx_buffer = new (std::nothrow) cbuf(_tx_buffer_size);

  _rx_buffer = other._rx_buffer;
  if(_client)
    _attachCallbacks();
  return *this;
}
#else   // ! SYNCCLIENT_NEW_OPERATOR_EQUAL
// This is the original logic with null checks
/*
  Copy-assignment (legacy behavior).

  The current connection, if any, is aborted and both the transmit buffer and
  the receive buffer chain are freed. The object then takes over other's
  AsyncClient pointer and tx buffer size, allocates a fresh transmit buffer
  when other has a client, and re-attaches the callbacks so they target this
  object. Received data still queued in `other` is not transferred.

  Self-assignment is a no-op.
*/
SyncClient & SyncClient::operator=(const SyncClient &other){
  // Without this guard `x = x` would abort x's own connection and free its
  // buffers before "copying" the (now cleared) state back.
  if(this == &other)
    return *this;

  if(_client != NULL){
    _client->abort();
    _client->free();
    _client = NULL;
  }
  _tx_buffer_size = other._tx_buffer_size;
  if(_tx_buffer != NULL){
    cbuf *b = _tx_buffer;
    _tx_buffer = NULL;
    delete b;
  }
  // Drop any received-but-unread data held by this object.
  while(_rx_buffer != NULL){
    cbuf *b = _rx_buffer;
    _rx_buffer = b->next;
    delete b;
  }
  // Only allocate a transmit buffer when there is a client to use it.
  if(other._client != NULL)
    _tx_buffer = new (std::nothrow) cbuf(other._tx_buffer_size);

  _client = other._client;
  if(_client)
    _attachCallbacks();

  return *this;
}
#endif

/*
  Forward the receive timeout (in seconds) to the underlying client.
  Does nothing when no client is attached.
*/
void SyncClient::setTimeout(uint32_t seconds){
  if(_client == nullptr)
    return;
  _client->setRxTimeout(seconds);
}

/*
  Return the underlying client's state(), or 0 when no client is attached.
*/
uint8_t SyncClient::status(){
  return (_client != nullptr) ? _client->state() : 0;
}

/*
  Return 1 when a client is attached and reports itself connected,
  otherwise 0.
*/
uint8_t SyncClient::connected(){
  if(_client == nullptr)
    return 0;
  return _client->connected() ? 1 : 0;
}

/*
  Close the connection, if there is one. maxWaitMs is accepted for interface
  compatibility but is not used. Always returns true.
*/
bool SyncClient::stop(unsigned int maxWaitMs){
  (void)maxWaitMs;
  if(_client == nullptr)
    return true;
  _client->close(true);
  return true;
}

/*
  Move queued bytes from the transmit buffer to the client.

  At most min(queued bytes, client->space()) bytes are taken from the buffer
  and handed to client->write(). Returns the value write() reports, or 0 when
  there is no client/buffer, the client is not connected or cannot send,
  nothing is queued, or the staging allocation fails.

  NOTE(review): the bytes are read out of the buffer before write() is called;
  if write() accepts fewer than requested the remainder is presumably lost -
  confirm against cbuf::read semantics.
*/
size_t SyncClient::_sendBuffer(){
  if(_client == nullptr || _tx_buffer == nullptr)
    return 0;

  size_t chunk = _tx_buffer->available();
  if(!connected() || !_client->canSend() || chunk == 0)
    return 0;

  // Never hand over more than the client says it has room for.
  const size_t window = _client->space();
  if(window < chunk)
    chunk = window;

  char *staging = new (std::nothrow) char[chunk];
  if(staging == nullptr)
    return 0;

  _tx_buffer->read(staging, chunk);
  const size_t sent = _client->write(staging, chunk);
  delete[] staging;
  return sent;
}

void SyncClient::_onData(void *data, size_t len){
  _client->ackLater();
  cbuf *b = new (std::nothrow) cbuf(len+1);
  if(b != NULL){
    b->write((const char *)data, len);
    if(_rx_buffer == NULL)
      _rx_buffer = b;
    else {
      cbuf *p = _rx_buffer;
      while(p->next != NULL)
        p = p->next;
      p->next = b;
    }
  } else {
    // We ran out of memory. This fail causes lost receive data.
    // The connection should be closed in a manner that conveys something
    // bad/abnormal has happened to the connection. Hence, we abort the
    // connection to avoid possible data corruption.
    // Note, callbacks maybe called.
    _client->abort();
  }
}

void SyncClient::_onDisconnect(){
  if(_client != NULL){
    _client = NULL;
  }
  if(_tx_buffer != NULL){
    cbuf *b = _tx_buffer;
    _tx_buffer = NULL;
    delete b;
  }
}

/*
  Connect callback: adopt the connected client, replace the transmit buffer
  with a fresh one of _tx_buffer_size bytes, and attach the ack/data/timeout
  handlers.
*/
void SyncClient::_onConnect(AsyncClient *c){
  _client = c;

  // Throw away any transmit buffer left from an earlier connection
  // (deleting null is a no-op).
  cbuf *stale = _tx_buffer;
  _tx_buffer = nullptr;
  delete stale;

  _tx_buffer = new (std::nothrow) cbuf(_tx_buffer_size);
  _attachCallbacks_AfterConnected();
}

void SyncClient::_attachCallbacks(){
  _attachCallbacks_Disconnect();
  _attachCallbacks_AfterConnected();
}

void SyncClient::_attachCallbacks_AfterConnected(){
  _client->onAck([](void *obj, AsyncClient* c, size_t len, uint32_t time){ (void)c; (void)len; (void)time; ((SyncClient*)(obj))->_sendBuffer(); }, this);
  _client->onData([](void *obj, AsyncClient* c, void *data, size_t len){ (void)c; ((SyncClient*)(obj))->_onData(data, len); }, this);
  _client->onTimeout([](void *obj, AsyncClient* c, uint32_t time){ (void)obj; (void)time; c->close(); }, this);
}

void SyncClient::_attachCallbacks_Disconnect(){
  _client->onDisconnect([](void *obj, AsyncClient* c){ ((SyncClient*)(obj))->_onDisconnect(); delete c; }, this);
}

size_t SyncClient::write(uint8_t data){
  return write(&data, 1);
}

/*
  Queue len bytes from data for transmission, blocking while the transmit
  buffer is too small to take what is left.

  While the remaining data does not fit, the buffer is filled with the next
  unsent slice, the function waits until the client can send, and the buffer
  is pushed out with _sendBuffer(). The final slice is queued and sent
  immediately if the client can send.

  Returns len on success, or 0 when there is no transmit buffer, the client is
  not connected on entry, or the connection drops while waiting.
*/
size_t SyncClient::write(const uint8_t *data, size_t len){
  if(_tx_buffer == NULL || !connected()){
    return 0;
  }
  size_t toSend = len;  // bytes of data not yet queued
  while(_tx_buffer->room() < toSend){
    const size_t toWrite = _tx_buffer->room();
    // Queue the next unsent slice. The offset must advance with every pass;
    // always starting at `data` would re-queue the beginning of the payload.
    _tx_buffer->write((const char*)(data + (len - toSend)), toWrite);
    while(connected() && !_client->canSend())
      delay(0);
    // A disconnect frees the transmit buffer, so bail out before touching it.
    if(!connected())
      return 0;
    _sendBuffer();
    toSend -= toWrite;
  }
  _tx_buffer->write((const char*)(data + (len - toSend)), toSend);
  if(connected() && _client->canSend())
    _sendBuffer();
  return len;
}

int SyncClient::available(){
  if(_rx_buffer == NULL) return 0;
  size_t a = 0;
  cbuf *b = _rx_buffer;
  while(b != NULL){
    a += b->available();
    b = b->next;
  }
  return a;
}

/*
  Return the head receive buffer's peek() result, or -1 when nothing has
  been received.
*/
int SyncClient::peek(){
  return (_rx_buffer != nullptr) ? _rx_buffer->peek() : -1;
}

int SyncClient::read(uint8_t *data, size_t len){
  if(_rx_buffer == NULL) return -1;

  size_t readSoFar = 0;
  while(_rx_buffer != NULL && (len - readSoFar) >= _rx_buffer->available()){
    cbuf *b = _rx_buffer;
    _rx_buffer = _rx_buffer->next;
    size_t toRead = b->available();
    readSoFar += b->read((char*)(data+readSoFar), toRead);
    if(connected()){
        _client->ack(b->size() - 1);
    }
    delete b;
  }
  if(_rx_buffer != NULL && readSoFar < len){
    readSoFar += _rx_buffer->read((char*)(data+readSoFar), (len - readSoFar));
  }
  return readSoFar;
}

/*
  Read a single byte. Returns the byte value, or -1 when exactly one byte
  could not be read.
*/
int SyncClient::read(){
  uint8_t byte = 0;
  const int got = read(&byte, 1);
  return (got == 1) ? byte : -1;
}

/*
  Push any queued transmit data to the client.

  maxWaitMs is accepted for interface compatibility but is not used. Returns
  false when there is no transmit buffer or connection (on entry, or after
  waiting for the client to become ready); otherwise returns true.
*/
bool SyncClient::flush(unsigned int maxWaitMs){
  (void)maxWaitMs;
  if(_tx_buffer == nullptr || !connected())
    return false;

  // Nothing queued: already flushed.
  if(!_tx_buffer->available())
    return true;

  while(connected() && !_client->canSend())
    delay(0);

  // A disconnect while waiting clears both pointers.
  if(_client == nullptr || _tx_buffer == nullptr)
    return false;

  _sendBuffer();
  return true;
}