Subversion Repositories ESP8266_P1_Meter

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 raymond 1
// SPDX-License-Identifier: LGPL-3.0-or-later
2
// Copyright 2016-2026 Hristo Gochkov, Mathieu Carbou, Emil Muratov, Will Miles
3
 
4
#include "AsyncTCP.h"
5
#include "AsyncTCPLogging.h"
6
#include "AsyncTCPSimpleIntrusiveList.h"
7
 
8
/**
9
 * LibreTiny specific configurations
10
 */
11
#if defined(LIBRETINY)
12
#include <Arduino.h>
13
// LibreTiny does not support IDF - disable code that expects it to be available
14
#define ESP_IDF_VERSION_MAJOR (0)
15
// xTaskCreatePinnedToCore is not available, force single-core operation
16
#define CONFIG_FREERTOS_UNICORE 1
17
// ESP watchdog is not available
18
#undef CONFIG_ASYNC_TCP_USE_WDT
19
#define CONFIG_ASYNC_TCP_USE_WDT 0
20
#endif  // LIBRETINY
21
 
22
/**
23
 * Arduino specific configurations
24
 */
25
#if defined(ARDUINO) && !defined(LIBRETINY)
26
#include <Arduino.h>
27
#include <esp_idf_version.h>
28
#if (ESP_IDF_VERSION_MAJOR >= 5)
29
#include <NetworkInterface.h>
30
#endif  // ESP_IDF_VERSION_MAJOR
31
#endif  // ARDUINO
32
 
33
/**
34
 * ESP-IDF specific configurations
35
 */
36
#if !defined(LIBRETINY) && !defined(ARDUINO)
37
#include "esp_timer.h"
38
static unsigned long millis() {
39
  return (unsigned long)(esp_timer_get_time() / 1000ULL);
40
}
41
#endif  // !LIBRETINY && !ARDUINO
42
 
43
extern "C" {
44
#include "lwip/dns.h"
45
#include "lwip/err.h"
46
#include "lwip/inet.h"
47
#include "lwip/opt.h"
48
#include "lwip/tcp.h"
49
#include "lwip/tcpip.h"
50
}
51
 
52
#if CONFIG_ASYNC_TCP_USE_WDT
53
#include "esp_task_wdt.h"
54
#endif
55
 
56
// Required for:
57
// https://github.com/espressif/arduino-esp32/blob/3.0.3/libraries/Network/src/NetworkInterface.cpp#L37-L47
58
 
59
#if CONFIG_ASYNC_TCP_USE_WDT
60
#include "esp_task_wdt.h"
61
#define ASYNC_TCP_MAX_TASK_SLEEP (pdMS_TO_TICKS(1000 * CONFIG_ESP_TASK_WDT_TIMEOUT_S) / 4)
62
#else
63
#define ASYNC_TCP_MAX_TASK_SLEEP portMAX_DELAY
64
#endif
65
 
66
// https://github.com/espressif/arduino-esp32/issues/10526
namespace {
#ifdef CONFIG_LWIP_TCPIP_CORE_LOCKING
// Scoped holder of the LwIP TCP/IP core lock.
// On construction the lock is taken only when
// sys_thread_tcpip(LWIP_CORE_LOCK_QUERY_HOLDER) returns false; the destructor
// releases it only if this object was the one that took it.
struct tcp_core_guard {
  bool do_lock;  // true when this guard took the lock and must release it

  // A guard is bound to the scope that created it: no copies, no moves.
  tcp_core_guard(const tcp_core_guard &) = delete;
  tcp_core_guard(tcp_core_guard &&) = delete;
  tcp_core_guard &operator=(const tcp_core_guard &) = delete;
  tcp_core_guard &operator=(tcp_core_guard &&) = delete;

  inline tcp_core_guard() : do_lock(!sys_thread_tcpip(LWIP_CORE_LOCK_QUERY_HOLDER)) {
    if (!do_lock) {
      return;
    }
    LOCK_TCPIP_CORE();
  }

  inline ~tcp_core_guard() {
    if (!do_lock) {
      return;
    }
    UNLOCK_TCPIP_CORE();
  }
} __attribute__((unused));
#else   // CONFIG_LWIP_TCPIP_CORE_LOCKING
// Core locking is not configured: the guard is an empty placeholder type.
struct tcp_core_guard {
} __attribute__((unused));
#endif  // CONFIG_LWIP_TCPIP_CORE_LOCKING
}  // anonymous namespace
91
 
92
#define INVALID_CLOSED_SLOT -1
93
 
94
/*
95
  TCP poll interval is specified in terms of the TCP coarse timer interval, which is called twice a second
96
  https://github.com/espressif/esp-lwip/blob/2acf959a2bb559313cd2bf9306c24612ba3d0e19/src/core/tcp.c#L1895
97
*/
98
#define CONFIG_ASYNC_TCP_POLL_TIMER 1
99
 
100
/*
101
 * TCP/IP Event Task
102
 * */
103
 
104
// Kinds of events carried by lwip_tcp_event_packet_t to the async service task.
enum lwip_tcp_event_t {
  LWIP_TCP_SENT = 0,
  LWIP_TCP_RECV,
  LWIP_TCP_FIN,
  LWIP_TCP_ERROR,
  LWIP_TCP_POLL,
  LWIP_TCP_ACCEPT,
  LWIP_TCP_CONNECTED,
  LWIP_TCP_DNS
};
114
 
115
struct lwip_tcp_event_packet_t {
116
  lwip_tcp_event_packet_t *next;
117
  lwip_tcp_event_t event;
118
  AsyncClient *client;
119
  union {
120
    struct {
121
      tcp_pcb *pcb;
122
      int8_t err;
123
    } connected;
124
    struct {
125
      int8_t err;
126
    } error;
127
    struct {
128
      tcp_pcb *pcb;
129
      uint16_t len;
130
    } sent;
131
    struct {
132
      tcp_pcb *pcb;
133
      pbuf *pb;
134
      int8_t err;
135
    } recv;
136
    struct {
137
      tcp_pcb *pcb;
138
      int8_t err;
139
    } fin;
140
    struct {
141
      tcp_pcb *pcb;
142
    } poll;
143
    struct {
144
      AsyncServer *server;
145
    } accept;
146
    struct {
147
      const char *name;
148
      ip_addr_t addr;
149
    } dns;
150
  };
151
 
152
  inline lwip_tcp_event_packet_t(lwip_tcp_event_t _event, AsyncClient *_client) : next(nullptr), event(_event), client(_client){};
153
};
154
 
155
// Detail class for interacting with AsyncClient internals, but without exposing the API
156
class AsyncTCP_detail {
157
public:
158
  // Helper functions
159
  static void __attribute__((visibility("internal"))) handle_async_event(lwip_tcp_event_packet_t *event);
160
 
161
  // LwIP TCP event callbacks that (will) require privileged access
162
  static int8_t __attribute__((visibility("internal"))) tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err);
163
  static int8_t __attribute__((visibility("internal"))) tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len);
164
  static void __attribute__((visibility("internal"))) tcp_error(void *arg, int8_t err);
165
  static int8_t __attribute__((visibility("internal"))) tcp_poll(void *arg, struct tcp_pcb *pcb);
166
  static int8_t __attribute__((visibility("internal"))) tcp_accept(void *arg, tcp_pcb *pcb, int8_t err);
167
};
168
 
169
// Guard class for the global queue
170
namespace {
171
 
172
static SemaphoreHandle_t _async_queue_mutex = nullptr;
173
 
174
class queue_mutex_guard {
175
  bool holds_mutex;
176
 
177
public:
178
  inline queue_mutex_guard() : holds_mutex(xSemaphoreTake(_async_queue_mutex, portMAX_DELAY)){};
179
  inline ~queue_mutex_guard() {
180
    if (holds_mutex) {
181
      xSemaphoreGive(_async_queue_mutex);
182
    }
183
  };
184
  inline explicit operator bool() const {
185
    return holds_mutex;
186
  };
187
};
188
}  // anonymous namespace
189
 
190
static SimpleIntrusiveList<lwip_tcp_event_packet_t> _async_queue;
191
static TaskHandle_t _async_service_task_handle = NULL;
192
 
193
// State of the 32-bit xorshift generator below; any nonzero seed will do.
static uint32_t _xor_shift_state = 31;

// Advances the generator by one step (shift constants 13, 17, 5) and returns the
// new state, which is also stored back into _xor_shift_state.
static uint32_t _xor_shift_next() {
  uint32_t s = _xor_shift_state;
  s = s ^ (s << 13);
  s = s ^ (s >> 17);
  s = s ^ (s << 5);
  _xor_shift_state = s;
  return s;
}
201
 
202
// Destroys an event packet.  A LWIP_TCP_RECV packet that still carries a pbuf
// has that pbuf released with pbuf_free() first.
static void _free_event(lwip_tcp_event_packet_t *evpkt) {
  const bool carries_pbuf = (evpkt->event == LWIP_TCP_RECV) && (evpkt->recv.pb != nullptr);
  if (carries_pbuf) {
    pbuf_free(evpkt->recv.pb);
  }
  delete evpkt;
}
208
 
209
// Appends an event packet to the tail of the queue and notifies the async service task.
// A null packet is ignored.  This function does not take the queue mutex itself.
static inline void _send_async_event(lwip_tcp_event_packet_t *e) {
  if (e != nullptr) {
    _async_queue.push_back(e);
    xTaskNotifyGive(_async_service_task_handle);
  }
}
216
 
217
// Inserts an event packet at the head of the queue and notifies the async service task.
// A null packet is ignored.  This function does not take the queue mutex itself.
static inline void _prepend_async_event(lwip_tcp_event_packet_t *e) {
  if (e != nullptr) {
    _async_queue.push_front(e);
    xTaskNotifyGive(_async_service_task_handle);
  }
}
224
 
225
// Pops the next event to process while holding the queue mutex.
// Returns nullptr when the queue is empty.  Non-poll events are returned as-is;
// poll events are first coalesced and may be dropped when the queue is congested.
static inline lwip_tcp_event_packet_t *_get_async_event() {
  queue_mutex_guard guard;
  for (;;) {
    lwip_tcp_event_packet_t *head = _async_queue.pop_front();
    if (head == nullptr) {
      return head;
    }
    if (head->event != LWIP_TCP_POLL) {
      return head;
    }

    /*
      Coalesce consecutive poll events for the same client into one.
      Polls pile up when user callbacks run longer than the poll period; left alone
      they would keep filling the queue.  This only helps for back-to-back polls of
      one connection: interleaved events from several slow connections are not merged.
    */
    for (;;) {
      lwip_tcp_event_packet_t *upcoming = _async_queue.begin();
      const bool duplicate_poll = upcoming && (upcoming->client == head->client) && (upcoming->event == LWIP_TCP_POLL);
      if (!duplicate_poll) {
        break;
      }
      // the event at the front is another poll for the same connection: drop it
      _free_event(_async_queue.pop_front());
      async_tcp_log_d("coalescing polls, network congestion or async callbacks might be too slow!");
    }

    /*
      Decide whether this poll is processed or discarded.
      Poll events are periodic, so they are the safest ones to lose.  The threshold is
      randomised between 3/4 of CONFIG_ASYNC_TCP_QUEUE_SIZE and the full size, so the
      chance of discarding rises as the queue grows past 3/4.
    */
    const auto discard_threshold = _xor_shift_next() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4;
    if (_async_queue.size() > discard_threshold) {
      _free_event(head);
      async_tcp_log_d("discarding poll due to queue congestion");
      continue;
    }

    return head;
  }
}
267
 
268
// Unlinks every queued event addressed to `client` and frees it.
// The queue mutex is held only while unlinking; the packets are freed afterwards.
// Returns the number of events removed.
static size_t _remove_events_for_client(AsyncClient *client) {
  lwip_tcp_event_packet_t *detached;
  {
    queue_mutex_guard guard;
    detached = _async_queue.remove_if([client](lwip_tcp_event_packet_t &pkt) {
      return pkt.client == client;
    });
  }

  size_t freed = 0;
  for (lwip_tcp_event_packet_t *cur = detached; cur != nullptr; ++freed) {
    // read the link before the packet is destroyed
    lwip_tcp_event_packet_t *following = cur->next;
    _free_event(cur);
    cur = following;
  }
  return freed;
}
286
 
287
// Delivers one event packet to the matching AsyncClient/AsyncServer handler and
// frees the packet.  Packets whose client is NULL are freed without dispatch.
void AsyncTCP_detail::handle_async_event(lwip_tcp_event_packet_t *e) {
  if (e->client != NULL) {
    switch (e->event) {
      case LWIP_TCP_RECV:
        e->client->_recv(e->recv.pcb, e->recv.pb, e->recv.err);
        e->recv.pb = nullptr;  // given to client, so _free_event must not release it
        break;
      case LWIP_TCP_FIN:
        e->client->_fin(e->fin.pcb, e->fin.err);
        break;
      case LWIP_TCP_SENT:
        e->client->_sent(e->sent.pcb, e->sent.len);
        break;
      case LWIP_TCP_POLL:
        e->client->_poll(e->poll.pcb);
        break;
      case LWIP_TCP_ERROR:
        e->client->_error(e->error.err);
        break;
      case LWIP_TCP_CONNECTED:
        e->client->_connected(e->connected.pcb, e->connected.err);
        break;
      case LWIP_TCP_ACCEPT:
        e->accept.server->_accepted(e->client);
        break;
      case LWIP_TCP_DNS:
        e->client->_dns_found(&e->dns.addr);
        break;
      default:
        break;
    }
  }
  _free_event(e);
}
319
 
320
static void _async_service_task(void *pvParameters) {
321
#if CONFIG_ASYNC_TCP_USE_WDT
322
  if (esp_task_wdt_add(NULL) != ESP_OK) {
323
    async_tcp_log_w("Failed to add async task to WDT");
324
  }
325
#endif
326
  for (;;) {
327
    while (auto packet = _get_async_event()) {
328
      AsyncTCP_detail::handle_async_event(packet);
329
#if CONFIG_ASYNC_TCP_USE_WDT
330
      esp_task_wdt_reset();
331
#endif
332
    }
333
    // queue is empty
334
    // DEBUG_PRINTF("Async task waiting 0x%08",(intptr_t)_async_queue_head);
335
    ulTaskNotifyTake(pdTRUE, ASYNC_TCP_MAX_TASK_SLEEP);
336
    // DEBUG_PRINTF("Async task woke = %d 0x%08x",q, (intptr_t)_async_queue_head);
337
#if CONFIG_ASYNC_TCP_USE_WDT
338
    esp_task_wdt_reset();
339
#endif
340
  }
341
#if CONFIG_ASYNC_TCP_USE_WDT
342
  esp_task_wdt_delete(NULL);
343
#endif
344
  vTaskDelete(NULL);
345
  _async_service_task_handle = NULL;
346
}
347
 
348
/*
349
static void _stop_async_task(){
350
    if(_async_service_task_handle){
351
        vTaskDelete(_async_service_task_handle);
352
        _async_service_task_handle = NULL;
353
    }
354
}
355
*/
356
 
357
static bool customTaskCreateUniversal(
358
  TaskFunction_t pxTaskCode, const char *const pcName, const uint32_t usStackDepth, void *const pvParameters, UBaseType_t uxPriority,
359
  TaskHandle_t *const pxCreatedTask, const BaseType_t xCoreID
360
) {
361
#ifndef CONFIG_FREERTOS_UNICORE
362
  if (xCoreID >= 0 && xCoreID < 2) {
363
    return xTaskCreatePinnedToCore(pxTaskCode, pcName, usStackDepth, pvParameters, uxPriority, pxCreatedTask, xCoreID);
364
  } else {
365
#endif
366
    return xTaskCreate(pxTaskCode, pcName, usStackDepth, pvParameters, uxPriority, pxCreatedTask);
367
#ifndef CONFIG_FREERTOS_UNICORE
368
  }
369
#endif
370
}
371
 
372
static bool _start_async_task() {
373
  if (!_async_queue_mutex) {
374
    _async_queue_mutex = xSemaphoreCreateMutex();
375
    if (!_async_queue_mutex) {
376
      return false;
377
    }
378
  }
379
 
380
  if (!_async_service_task_handle) {
381
    customTaskCreateUniversal(
382
      _async_service_task, "async_tcp", CONFIG_ASYNC_TCP_STACK_SIZE, NULL, CONFIG_ASYNC_TCP_PRIORITY, &_async_service_task_handle, CONFIG_ASYNC_TCP_RUNNING_CORE
383
    );
384
    if (!_async_service_task_handle) {
385
      return false;
386
    }
387
  }
388
  return true;
389
}
390
 
391
/*
392
 * LwIP Callbacks
393
 * */
394
 
395
// Registers `client` as the callback argument of `pcb` and installs the
// AsyncTCP_detail recv/sent/error/poll handlers on it.
static void _bind_tcp_callbacks(tcp_pcb *pcb, AsyncClient *client) {
  tcp_arg(pcb, client);
  tcp_err(pcb, &AsyncTCP_detail::tcp_error);
  tcp_recv(pcb, &AsyncTCP_detail::tcp_recv);
  tcp_sent(pcb, &AsyncTCP_detail::tcp_sent);
  tcp_poll(pcb, &AsyncTCP_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
}
402
 
403
// Detaches every callback and the callback argument from `pcb`.  When `client`
// is non-null, events already queued for that client are removed as well.
static void _reset_tcp_callbacks(tcp_pcb *pcb, AsyncClient *client) {
  tcp_arg(pcb, nullptr);
  tcp_recv(pcb, nullptr);
  tcp_sent(pcb, nullptr);
  tcp_err(pcb, nullptr);
  tcp_poll(pcb, nullptr, 0);
  if (client == nullptr) {
    return;
  }
  _remove_events_for_client(client);
}
413
 
414
static int8_t _tcp_connected(void *arg, tcp_pcb *pcb, int8_t err) {
415
  // ets_printf("+C: 0x%08x\n", pcb);
416
  AsyncClient *client = reinterpret_cast<AsyncClient *>(arg);
417
  lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_CONNECTED, client};
418
  if (!e) {
419
    async_tcp_log_e("Failed to allocate event packet");
420
    return ERR_MEM;
421
  }
422
  e->connected.pcb = pcb;
423
  e->connected.err = err;
424
  queue_mutex_guard guard;
425
  _send_async_event(e);
426
  return ERR_OK;
427
}
428
 
429
int8_t AsyncTCP_detail::tcp_poll(void *arg, struct tcp_pcb *pcb) {
430
  // throttle polling events queueing when event queue is getting filled up, let it handle _onack's
431
  {
432
    queue_mutex_guard guard;
433
    // async_tcp_log_d("qs:%u", _async_queue.size());
434
    if (_async_queue.size() > (_xor_shift_next() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) {
435
      async_tcp_log_d("throttling");
436
      return ERR_OK;
437
    }
438
  }
439
 
440
  // ets_printf("+P: 0x%08x\n", pcb);
441
  AsyncClient *client = reinterpret_cast<AsyncClient *>(arg);
442
  lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_POLL, client};
443
  if (!e) {
444
    async_tcp_log_e("Failed to allocate event packet");
445
    return ERR_MEM;
446
  }
447
  e->poll.pcb = pcb;
448
 
449
  queue_mutex_guard guard;
450
  _send_async_event(e);
451
  return ERR_OK;
452
}
453
 
454
int8_t AsyncTCP_detail::tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err) {
455
  AsyncClient *client = reinterpret_cast<AsyncClient *>(arg);
456
  lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_RECV, client};
457
  if (!e) {
458
    async_tcp_log_e("Failed to allocate event packet");
459
    return ERR_MEM;
460
  }
461
  if (pb) {
462
    // ets_printf("+R: 0x%08x\n", pcb);
463
    e->recv.pcb = pcb;
464
    e->recv.pb = pb;
465
    e->recv.err = err;
466
  } else {
467
    // ets_printf("+F: 0x%08x\n", pcb);
468
    e->event = LWIP_TCP_FIN;
469
    e->fin.pcb = pcb;
470
    e->fin.err = err;
471
  }
472
 
473
  queue_mutex_guard guard;
474
  _send_async_event(e);
475
  return ERR_OK;
476
}
477
 
478
int8_t AsyncTCP_detail::tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len) {
479
  // ets_printf("+S: 0x%08x\n", pcb);
480
  AsyncClient *client = reinterpret_cast<AsyncClient *>(arg);
481
  lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_SENT, client};
482
  if (!e) {
483
    async_tcp_log_e("Failed to allocate event packet");
484
    return ERR_MEM;
485
  }
486
  e->sent.pcb = pcb;
487
  e->sent.len = len;
488
 
489
  queue_mutex_guard guard;
490
  _send_async_event(e);
491
  return ERR_OK;
492
}
493
 
494
void AsyncTCP_detail::tcp_error(void *arg, int8_t err) {
495
  // ets_printf("+E: 0x%08x\n", arg);
496
  AsyncClient *client = reinterpret_cast<AsyncClient *>(arg);
497
  if (client && client->_pcb) {
498
    // The pcb has already been freed by LwIP; do not attempt to clear the callbacks!
499
    _remove_events_for_client(client);
500
    client->_pcb = nullptr;
501
  }
502
 
503
  // enqueue event to be processed in the async task for the user callback
504
  lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_ERROR, client};
505
  if (!e) {
506
    async_tcp_log_e("Failed to allocate event packet");
507
    return;
508
  }
509
  e->error.err = err;
510
 
511
  queue_mutex_guard guard;
512
  _send_async_event(e);
513
}
514
 
515
static void _tcp_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) {
516
  // ets_printf("+DNS: name=%s ipaddr=0x%08x arg=%x\n", name, ipaddr, arg);
517
  auto client = reinterpret_cast<AsyncClient *>(arg);
518
 
519
  lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_DNS, client};
520
  if (!e) {
521
    async_tcp_log_e("Failed to allocate event packet");
522
    return;
523
  }
524
 
525
  e->dns.name = name;
526
  if (ipaddr) {
527
    memcpy(&e->dns.addr, ipaddr, sizeof(ip_addr_t));
528
  } else {
529
    memset(&e->dns.addr, 0, sizeof(e->dns.addr));
530
  }
531
 
532
  queue_mutex_guard guard;
533
  _send_async_event(e);
534
}
535
 
536
/*
537
 * TCP/IP API Calls
538
 * */
539
 
540
#include "lwip/priv/tcpip_priv.h"
541
 
542
typedef struct {
543
  struct tcpip_api_call_data call;
544
  tcp_pcb **pcb;
545
  int8_t err;
546
  union {
547
    AsyncClient *close;
548
    struct {
549
      const char *data;
550
      size_t size;
551
      uint8_t apiflags;
552
    } write;
553
    size_t received;
554
    struct {
555
      ip_addr_t *addr;
556
      uint16_t port;
557
      tcp_connected_fn cb;
558
    } connect;
559
    struct {
560
      ip_addr_t *addr;
561
      uint16_t port;
562
    } bind;
563
    uint8_t backlog;
564
  };
565
} tcp_api_call_t;
566
 
567
// tcpip_api_call handler: runs tcp_output() on the referenced pcb.
// Stores and returns ERR_CONN when the pcb slot is empty.
static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg) {
  auto *req = reinterpret_cast<tcp_api_call_t *>(api_call_msg);
  if (*req->pcb) {
    req->err = tcp_output(*req->pcb);
  } else {
    req->err = ERR_CONN;
  }
  return req->err;
}
575
 
576
// Runs tcp_output() for *pcb through tcpip_api_call().
// Returns ERR_CONN without making the call when there is no pcb.
static esp_err_t _tcp_output(tcp_pcb **pcb) {
  const bool has_pcb = pcb && *pcb;
  if (!has_pcb) {
    return ERR_CONN;
  }
  tcp_api_call_t request;
  request.pcb = pcb;
  tcpip_api_call(_tcp_output_api, reinterpret_cast<struct tcpip_api_call_data *>(&request));
  return request.err;
}
585
 
586
// tcpip_api_call handler: runs tcp_write() with the data/size/flags from the request.
// Stores and returns ERR_CONN when the pcb slot is empty.
static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg) {
  auto *req = reinterpret_cast<tcp_api_call_t *>(api_call_msg);
  if (*req->pcb) {
    req->err = tcp_write(*req->pcb, req->write.data, req->write.size, req->write.apiflags);
  } else {
    req->err = ERR_CONN;
  }
  return req->err;
}
594
 
595
// Runs tcp_write() for *pcb through tcpip_api_call().
// Returns ERR_CONN without making the call when there is no pcb.
static esp_err_t _tcp_write(tcp_pcb **pcb, const char *data, size_t size, uint8_t apiflags) {
  const bool has_pcb = pcb && *pcb;
  if (!has_pcb) {
    return ERR_CONN;
  }
  tcp_api_call_t request;
  request.pcb = pcb;
  request.write.apiflags = apiflags;
  request.write.size = size;
  request.write.data = data;
  tcpip_api_call(_tcp_write_api, reinterpret_cast<struct tcpip_api_call_data *>(&request));
  return request.err;
}
607
 
608
// tcpip_api_call handler: reports `received` bytes as consumed via tcp_recved().
// Stores and returns 0 on success, ERR_CONN when the pcb slot is empty.
static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg) {
  auto *req = reinterpret_cast<tcp_api_call_t *>(api_call_msg);
  if (*req->pcb) {
    tcp_recved(*req->pcb, req->received);
    req->err = 0;
  } else {
    req->err = ERR_CONN;
  }
  return req->err;
}
617
 
618
// Runs tcp_recved() for *pcb through tcpip_api_call().
// Returns ERR_CONN without making the call when there is no pcb.
static esp_err_t _tcp_recved(tcp_pcb **pcb, size_t len) {
  const bool has_pcb = pcb && *pcb;
  if (!has_pcb) {
    return ERR_CONN;
  }
  tcp_api_call_t request;
  request.pcb = pcb;
  request.received = len;
  tcpip_api_call(_tcp_recved_api, reinterpret_cast<struct tcpip_api_call_data *>(&request));
  return request.err;
}
628
 
629
// tcpip_api_call handler that tears a client down, interlocked with the LwIP task.
//
// Unlike the other handlers this is not a plain wrapper around an LwIP function.
// Postcondition: no queued event references the client in req->close.  This matters
// because an error event may have been queued (clearing the pcb pointer) after the
// async thread already committed to closing/destroying the AsyncClient.
//
// Result: ERR_OK when a pcb was closed, or when there was no pcb but queued events
// for the client were removed; ERR_CONN when there was nothing to do.
static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg) {
  auto *req = reinterpret_cast<tcp_api_call_t *>(api_call_msg);
  tcp_pcb *pcb = *req->pcb;

  if (pcb == nullptr) {
    // No pcb: still make sure no (error) event stays queued for this client.
    // If any was removed, report ERR_OK so the caller runs its dispose step.
    req->err = _remove_events_for_client(req->close) ? ERR_OK : ERR_CONN;
    return req->err;
  }

  _reset_tcp_callbacks(pcb, req->close);
  if (tcp_close(pcb) != ERR_OK) {
    // We do not permit failure here: abandon the pcb anyways.
    tcp_abort(pcb);
  }
  *req->pcb = nullptr;  // PCB is now the property of LwIP
  req->err = ERR_OK;
  return req->err;
}
657
 
658
// Runs _tcp_close_api for `client` through tcpip_api_call() and returns its result.
// The call is made even when *pcb is null.
static esp_err_t _tcp_close(tcp_pcb **pcb, AsyncClient *client) {
  tcp_api_call_t request;
  request.close = client;
  request.pcb = pcb;
  tcpip_api_call(_tcp_close_api, reinterpret_cast<struct tcpip_api_call_data *>(&request));
  return request.err;
}
665
 
666
// tcpip_api_call handler: aborts the referenced pcb and clears the slot.
// Stores and returns ERR_ABRT after an abort, ERR_CONN when the slot was empty.
static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg) {
  auto *req = reinterpret_cast<tcp_api_call_t *>(api_call_msg);
  if (*req->pcb == nullptr) {
    req->err = ERR_CONN;
    return req->err;
  }
  tcp_abort(*req->pcb);
  *req->pcb = nullptr;  // PCB is now the property of LwIP
  req->err = ERR_ABRT;
  return req->err;
}
678
 
679
// Runs _tcp_abort_api for *pcb through tcpip_api_call().
// Returns ERR_CONN without making the call when there is no pcb.
static esp_err_t _tcp_abort(tcp_pcb **pcb, AsyncClient *client) {
  const bool has_pcb = pcb && *pcb;
  if (!has_pcb) {
    return ERR_CONN;
  }
  tcp_api_call_t request;
  request.close = client;
  request.pcb = pcb;
  tcpip_api_call(_tcp_abort_api, reinterpret_cast<struct tcpip_api_call_data *>(&request));
  return request.err;
}
689
 
690
// tcpip_api_call handler: runs tcp_connect() with the address, port and callback
// from the request; stores and returns its result.
static err_t _tcp_connect_api(struct tcpip_api_call_data *api_call_msg) {
  auto *req = reinterpret_cast<tcp_api_call_t *>(api_call_msg);
  const auto result = tcp_connect(*req->pcb, req->connect.addr, req->connect.port, req->connect.cb);
  req->err = result;
  return req->err;
}
695
 
696
// Runs tcp_connect() for `pcb` through tcpip_api_call().
// Returns ESP_FAIL without making the call when pcb is null.
static esp_err_t _tcp_connect(tcp_pcb *pcb, ip_addr_t *addr, uint16_t port, tcp_connected_fn cb) {
  if (pcb == nullptr) {
    return ESP_FAIL;
  }
  tcp_api_call_t request;
  request.pcb = &pcb;  // cannot be invalidated by LwIP at this point
  request.connect.cb = cb;
  request.connect.port = port;
  request.connect.addr = addr;
  tcpip_api_call(_tcp_connect_api, reinterpret_cast<struct tcpip_api_call_data *>(&request));
  return request.err;
}
708
 
709
// tcpip_api_call handler: binds the pcb to the requested address and port.
// On failure the pcb is closed (aborted if closing fails) and the slot is cleared.
// Stores and returns the tcp_bind() result.
static err_t _tcp_bind_api(struct tcpip_api_call_data *api_call_msg) {
  auto *req = reinterpret_cast<tcp_api_call_t *>(api_call_msg);
  tcp_pcb *target = *req->pcb;
  req->err = tcp_bind(target, req->bind.addr, req->bind.port);
  if (req->err == ERR_OK) {
    return req->err;
  }
  // Close the pcb on behalf of the server without an extra round-trip through the LwIP lock
  if (tcp_close(target) != ERR_OK) {
    tcp_abort(target);
  }
  *req->pcb = nullptr;  // PCB is now owned by LwIP
  return req->err;
}
722
 
723
// Runs _tcp_bind_api for *pcb through tcpip_api_call().
// Returns ESP_FAIL without making the call when there is no pcb.
static esp_err_t _tcp_bind(tcp_pcb **pcb, ip_addr_t *addr, uint16_t port) {
  const bool has_pcb = pcb && *pcb;
  if (!has_pcb) {
    return ESP_FAIL;
  }
  tcp_api_call_t request;
  request.pcb = pcb;
  request.bind.port = port;
  request.bind.addr = addr;
  tcpip_api_call(_tcp_bind_api, reinterpret_cast<struct tcpip_api_call_data *>(&request));
  return request.err;
}
734
 
735
// tcpip_api_call handler: replaces the pcb in the slot with the result of
// tcp_listen_with_backlog().  The stored and returned error is always 0.
static err_t _tcp_listen_api(struct tcpip_api_call_data *api_call_msg) {
  auto *req = reinterpret_cast<tcp_api_call_t *>(api_call_msg);
  *req->pcb = tcp_listen_with_backlog(*req->pcb, req->backlog);
  req->err = 0;
  return req->err;
}
741
 
742
// Runs _tcp_listen_api for `pcb` through tcpip_api_call() and returns the pcb
// it produced.  A backlog of 0 is replaced by 0xFF.  Returns NULL when pcb is null.
static tcp_pcb *_tcp_listen_with_backlog(tcp_pcb *pcb, uint8_t backlog) {
  if (pcb == nullptr) {
    return NULL;
  }
  tcp_api_call_t request;
  request.pcb = &pcb;  // the handler writes the listening pcb back through this pointer
  request.backlog = (backlog != 0) ? backlog : 0xFF;
  tcpip_api_call(_tcp_listen_api, reinterpret_cast<struct tcpip_api_call_data *>(&request));
  return pcb;
}
752
 
753
/*
754
  Async TCP Client
755
 */
756
 
757
// Builds a client with no user callbacks set.  When `pcb` is non-null the client
// adopts it: the receive timestamp is started and the LwIP callbacks are bound.
AsyncClient::AsyncClient(tcp_pcb *pcb)
  : _connect_cb(nullptr), _connect_cb_arg(nullptr), _discard_cb(nullptr), _discard_cb_arg(nullptr), _sent_cb(nullptr), _sent_cb_arg(nullptr),
    _error_cb(nullptr), _error_cb_arg(nullptr), _recv_cb(nullptr), _recv_cb_arg(nullptr), _pb_cb(nullptr), _pb_cb_arg(nullptr), _timeout_cb(nullptr),
    _timeout_cb_arg(nullptr), _poll_cb(nullptr), _poll_cb_arg(nullptr), _ack_pcb(true), _tx_last_packet(0), _rx_timeout(0), _rx_last_ack(0),
    _ack_timeout(CONFIG_ASYNC_TCP_MAX_ACK_TIME), _connect_port(0) {
  _pcb = pcb;
  if (_pcb == nullptr) {
    return;
  }
  _rx_last_packet = millis();
  _bind_tcp_callbacks(_pcb, this);
}
767
 
768
// Closes the connection if the client still owns a pcb.
AsyncClient::~AsyncClient() {
  if (!_pcb) {
    return;
  }
  _close();
}
773
 
774
/*
775
 * Operators
776
 * */
777
 
778
// Two clients compare equal when they reference the same pcb.
bool AsyncClient::operator==(const AsyncClient &other) const {
  const bool same_pcb = (other._pcb == _pcb);
  return same_pcb;
}
781
 
782
/*
783
 * Callback Setters
784
 * */
785
 
786
// Stores the handler (and its user argument) invoked from _connected().
void AsyncClient::onConnect(AcConnectHandler cb, void *arg) {
  _connect_cb_arg = arg;
  _connect_cb = cb;
}
790
 
791
// Stores the handler (and its user argument) invoked after a close or an error.
void AsyncClient::onDisconnect(AcConnectHandler cb, void *arg) {
  _discard_cb_arg = arg;
  _discard_cb = cb;
}
795
 
796
// Stores the handler (and its user argument) invoked from _sent().
void AsyncClient::onAck(AcAckHandler cb, void *arg) {
  _sent_cb_arg = arg;
  _sent_cb = cb;
}
800
 
801
// Stores the handler (and its user argument) invoked from _error().
void AsyncClient::onError(AcErrorHandler cb, void *arg) {
  _error_cb_arg = arg;
  _error_cb = cb;
}
805
 
806
// Stores the data handler and its user argument (_recv_cb / _recv_cb_arg).
void AsyncClient::onData(AcDataHandler cb, void *arg) {
  _recv_cb_arg = arg;
  _recv_cb = cb;
}
810
 
811
// Stores the packet handler and its user argument (_pb_cb / _pb_cb_arg).
void AsyncClient::onPacket(AcPacketHandler cb, void *arg) {
  _pb_cb_arg = arg;
  _pb_cb = cb;
}
815
 
816
// Stores the timeout handler and its user argument (_timeout_cb / _timeout_cb_arg).
void AsyncClient::onTimeout(AcTimeoutHandler cb, void *arg) {
  _timeout_cb_arg = arg;
  _timeout_cb = cb;
}
820
 
821
// Stores the poll handler and its user argument (_poll_cb / _poll_cb_arg).
void AsyncClient::onPoll(AcConnectHandler cb, void *arg) {
  _poll_cb_arg = arg;
  _poll_cb = cb;
}
825
 
826
/*
827
 * Main Public Methods
828
 * */
829
 
830
// Starts an outgoing connection to addr:port.
// Returns false when the client already has a pcb, the async task cannot be
// started, no pcb can be allocated, or the connect request is rejected.
// A true result only means the attempt was started; the connect handler is
// invoked later, when the LWIP_TCP_CONNECTED event is processed.
bool AsyncClient::connect(ip_addr_t addr, uint16_t port) {
  if (_pcb) {
    async_tcp_log_d("already connected, state %d", _pcb->state);
    return false;
  }
  if (!_start_async_task()) {
    async_tcp_log_e("failed to start task");
    return false;
  }

  tcp_pcb *pcb;
  {
    tcp_core_guard tcg;
#if LWIP_IPV4 && LWIP_IPV6
    pcb = tcp_new_ip_type(addr.type);
#else
    pcb = tcp_new_ip_type(IPADDR_TYPE_V4);
#endif
    if (!pcb) {
      async_tcp_log_e("pcb == NULL");
      return false;
    }
    _bind_tcp_callbacks(pcb, this);
  }

  esp_err_t err = _tcp_connect(pcb, &addr, port, (tcp_connected_fn)&_tcp_connected);
  if (err != ESP_OK) {
    // The pcb allocated above is referenced only by this local variable, so it
    // must be released here or it is leaked.  Passing a null client makes the
    // close helper detach the callbacks without touching events that may still
    // be queued for this client.
    _tcp_close(&pcb, nullptr);
    return false;
  }
  return true;
}
858
 
859
#ifdef ARDUINO
860
bool AsyncClient::connect(const IPAddress &ip, uint16_t port) {
861
  ip_addr_t addr;
862
#if ESP_IDF_VERSION_MAJOR < 5
863
#if LWIP_IPV4 && LWIP_IPV6
864
  // if both IPv4 and IPv6 are enabled, ip_addr_t has a union field and the address type
865
  addr.u_addr.ip4.addr = ip;
866
  addr.type = IPADDR_TYPE_V4;
867
#else
868
  addr.addr = ip;
869
#endif
870
#else
871
  ip.to_ip_addr_t(&addr);
872
#endif
873
 
874
  return connect(addr, port);
875
}
876
#endif
877
 
878
#if LWIP_IPV6 && ESP_IDF_VERSION_MAJOR < 5
879
bool AsyncClient::connect(const IPv6Address &ip, uint16_t port) {
880
  auto ipaddr = static_cast<const uint32_t *>(ip);
881
  ip_addr_t addr = IPADDR6_INIT(ipaddr[0], ipaddr[1], ipaddr[2], ipaddr[3]);
882
 
883
  return connect(addr, port);
884
}
885
#endif
886
 
887
bool AsyncClient::connect(const char *host, uint16_t port) {
888
  ip_addr_t addr;
889
 
890
  if (!_start_async_task()) {
891
    async_tcp_log_e("failed to start task");
892
    return false;
893
  }
894
 
895
  err_t err;
896
  {
897
    tcp_core_guard tcg;
898
    err = dns_gethostbyname(host, &addr, (dns_found_callback)&_tcp_dns_found, this);
899
  }
900
 
901
  if (err == ERR_OK) {
902
#if ESP_IDF_VERSION_MAJOR < 5
903
#if LWIP_IPV6
904
    if (addr.type == IPADDR_TYPE_V6) {
905
      return connect(IPv6Address(addr.u_addr.ip6.addr), port);
906
    }
907
    return connect(IPAddress(addr.u_addr.ip4.addr), port);
908
#else
909
    return connect(IPAddress(addr.addr), port);
910
#endif
911
#else
912
    return connect(addr, port);
913
#endif
914
  } else if (err == ERR_INPROGRESS) {
915
    _connect_port = port;
916
    return true;
917
  }
918
  async_tcp_log_d("error: %d", err);
919
  return false;
920
}
921
 
922
void AsyncClient::close() {
923
  if (_pcb) {
924
    _tcp_recved(&_pcb, _rx_ack_len);
925
  }
926
  _close();
927
}
928
 
929
int8_t AsyncClient::abort() {
930
  return _tcp_abort(&_pcb, this);
931
  // _pcb is now NULL
932
}
933
 
934
size_t AsyncClient::space() const {
935
  if ((_pcb != NULL) && (_pcb->state == ESTABLISHED)) {
936
    return tcp_sndbuf(_pcb);
937
  }
938
  return 0;
939
}
940
 
941
size_t AsyncClient::add(const char *data, size_t size, uint8_t apiflags) {
942
  if (!_pcb || size == 0 || data == NULL) {
943
    return 0;
944
  }
945
  size_t room = space();
946
  if (!room) {
947
    return 0;
948
  }
949
  size_t will_send = (room < size) ? room : size;
950
  int8_t err = ERR_OK;
951
  err = _tcp_write(&_pcb, data, will_send, apiflags);
952
  if (err != ERR_OK) {
953
    return 0;
954
  }
955
  return will_send;
956
}
957
 
958
bool AsyncClient::send() {
959
  auto backup = _tx_last_packet;
960
  _tx_last_packet = millis();
961
  if (_tcp_output(&_pcb) == ERR_OK) {
962
    return true;
963
  }
964
  _tx_last_packet = backup;
965
  return false;
966
}
967
 
968
size_t AsyncClient::ack(size_t len) {
969
  if (len > _rx_ack_len) {
970
    len = _rx_ack_len;
971
  }
972
  if (len) {
973
    _tcp_recved(&_pcb, len);
974
  }
975
  _rx_ack_len -= len;
976
  return len;
977
}
978
 
979
void AsyncClient::ackPacket(struct pbuf *pb) {
980
  if (!pb) {
981
    return;
982
  }
983
  _tcp_recved(&_pcb, pb->len);
984
  pbuf_free(pb);
985
}
986
 
987
/*
988
 * Main Private Methods
989
 * */
990
 
991
int8_t AsyncClient::_close() {
992
  // ets_printf("X: 0x%08x\n", (uint32_t)this);
993
  int8_t err = _tcp_close(&_pcb, this);
994
  // _pcb is now NULL
995
  if ((err == ERR_OK) && _discard_cb) {
996
    // _pcb was closed here
997
    _discard_cb(_discard_cb_arg, this);
998
  }
999
  return err;
1000
}
1001
 
1002
/*
1003
 * Private Callbacks
1004
 * */
1005
 
1006
// Connection-established handler: adopts `pcb`, resets the activity
// timestamps and invokes the connect callback if one is set.
// Always returns ERR_OK; `err` is not inspected.
int8_t AsyncClient::_connected(tcp_pcb *pcb, int8_t err) {
  _pcb = pcb;
  if (_pcb != nullptr) {
    // Start the RX timeout clock from the moment of connection.
    _rx_last_packet = millis();
  }
  _tx_last_packet = 0;
  _rx_last_ack = 0;

  if (_connect_cb) {
    _connect_cb(_connect_cb_arg, this);
  }
  return ERR_OK;
}
1018
 
1019
void AsyncClient::_error(int8_t err) {
1020
  if (_error_cb) {
1021
    _error_cb(_error_cb_arg, this, err);
1022
  }
1023
  if (_discard_cb) {
1024
    _discard_cb(_discard_cb_arg, this);
1025
  }
1026
}
1027
 
1028
// In LwIP Thread
1029
// FIN handling for the PCB owned by this client.
// Ignores the call (with a debug log) when there is no PCB or `pcb` is not
// ours; otherwise detaches the callbacks, closes the PCB (aborting it when
// tcp_close() fails) and clears _pcb. Always returns ERR_OK.
int8_t AsyncClient::_lwip_fin(tcp_pcb *pcb, int8_t err) {
  const bool is_ours = (_pcb != nullptr) && (pcb == _pcb);
  if (!is_ours) {
    async_tcp_log_d("0x%08" PRIx32 " != 0x%08" PRIx32, (uint32_t)pcb, (uint32_t)_pcb);
    return ERR_OK;
  }

  _reset_tcp_callbacks(_pcb, this);
  const bool closed = (tcp_close(_pcb) == ERR_OK);
  if (!closed) {
    tcp_abort(_pcb);
  }
  _pcb = nullptr;
  return ERR_OK;
}
1041
 
1042
// In Async Thread
1043
// FIN event handler: closes the client. Neither argument is used and the
// result is always ERR_OK.
int8_t AsyncClient::_fin(tcp_pcb *pcb, int8_t err) {
  (void)pcb;
  (void)err;
  close();
  return ERR_OK;
}
1047
 
1048
// Sent event handler: stamps both the last-RX and last-ACK times with the
// current millis() and forwards `len` plus the time elapsed since the last
// transmit to the sent callback, if set. Always returns ERR_OK.
int8_t AsyncClient::_sent(tcp_pcb *pcb, uint16_t len) {
  _rx_last_packet = millis();
  _rx_last_ack = _rx_last_packet;

  if (_sent_cb) {
    _sent_cb(_sent_cb_arg, this, len, (_rx_last_packet - _tx_last_packet));
  }
  return ERR_OK;
}
1055
 
1056
// Receive handler: walks the pbuf chain one segment at a time.
// Each segment is unlinked from the chain before being handed out. With a
// packet callback set, the segment is passed to it and neither acknowledged
// nor freed here. Otherwise the data callback (if any) sees the payload, the
// bytes are either acknowledged immediately or added to _rx_ack_len when
// _ack_pcb was cleared during the callback, and the segment is freed.
// Always returns ERR_OK.
int8_t AsyncClient::_recv(tcp_pcb *pcb, pbuf *pb, int8_t err) {
  pbuf *remaining = pb;
  while (remaining != nullptr) {
    // Detach the head segment so it can be handled independently.
    pbuf *segment = remaining;
    remaining = segment->next;
    segment->next = nullptr;

    _rx_last_packet = millis();
    // we should not ack before we assimilate the data
    _ack_pcb = true;

    if (_pb_cb) {
      _pb_cb(_pb_cb_arg, this, segment);
      continue;
    }

    if (_recv_cb) {
      _recv_cb(_recv_cb_arg, this, segment->payload, segment->len);
    }

    if (_ack_pcb) {
      if (_pcb) {
        _tcp_recved(&_pcb, segment->len);
      }
    } else {
      _rx_ack_len += segment->len;
    }
    pbuf_free(segment);
  }
  return ERR_OK;
}
1080
 
1081
// Periodic poll handler. Always returns ERR_OK.
// In order: bails out when there is no PCB or `pcb` is not ours; fires the
// timeout callback when an ACK is overdue; closes the connection when nothing
// was received for _rx_timeout seconds; otherwise invokes the poll callback.
int8_t AsyncClient::_poll(tcp_pcb *pcb) {
  if (_pcb == nullptr) {
    return ERR_OK;
  }
  if (_pcb != pcb) {
    async_tcp_log_d("0x%08" PRIx32 " != 0x%08" PRIx32, (uint32_t)pcb, (uint32_t)_pcb);
    return ERR_OK;
  }

  const uint32_t current = millis();

  // ACK timeout: only relevant while the last transmit is newer than the last ACK.
  if (_ack_timeout) {
    const uint32_t day_ms = 86400000;
    // Wrap-around-safe "ack happened before tx" test within a one day window.
    const bool awaiting_ack = (_rx_last_ack - _tx_last_packet + day_ms) < day_ms;
    if (awaiting_ack && (current - _tx_last_packet) >= _ack_timeout) {
      async_tcp_log_d("ack timeout %d", pcb->state);
      if (_timeout_cb) {
        _timeout_cb(_timeout_cb_arg, this, (current - _tx_last_packet));
      }
      return ERR_OK;
    }
  }

  // RX timeout: _rx_timeout is scaled by 1000 to compare against milliseconds.
  if (_rx_timeout && (current - _rx_last_packet) >= (_rx_timeout * 1000)) {
    async_tcp_log_d("rx timeout %d", pcb->state);
    _close();
    return ERR_OK;
  }

  // No timeout hit: let the user do periodic work.
  if (_poll_cb) {
    _poll_cb(_poll_cb_arg, this);
  }
  return ERR_OK;
}
1117
 
1118
// DNS result handler: connects to the resolved address on the stored
// _connect_port, or, when `ipaddr` is NULL, reports error -55 through the
// error callback and then invokes the discard callback.
void AsyncClient::_dns_found(ip_addr_t *ipaddr) {
  if (ipaddr != nullptr) {
    connect(*ipaddr, _connect_port);
    return;
  }

  if (_error_cb) {
    _error_cb(_error_cb_arg, this, -55);
  }
  if (_discard_cb) {
    _discard_cb(_discard_cb_arg, this);
  }
}
1130
 
1131
/*
1132
 * Public Helper Methods
1133
 * */
1134
 
1135
bool AsyncClient::free() {
1136
  if (!_pcb) {
1137
    return true;
1138
  }
1139
  if (_pcb->state == CLOSED || _pcb->state > ESTABLISHED) {
1140
    return true;
1141
  }
1142
  return false;
1143
}
1144
 
1145
size_t AsyncClient::write(const char *data, size_t size, uint8_t apiflags) {
1146
  size_t will_send = add(data, size, apiflags);
1147
  if (!will_send || !send()) {
1148
    return 0;
1149
  }
1150
  return will_send;
1151
}
1152
 
1153
void AsyncClient::setRxTimeout(uint32_t timeout) {
1154
  _rx_timeout = timeout;
1155
}
1156
 
1157
uint32_t AsyncClient::getRxTimeout() const {
1158
  return _rx_timeout;
1159
}
1160
 
1161
uint32_t AsyncClient::getAckTimeout() const {
1162
  return _ack_timeout;
1163
}
1164
 
1165
void AsyncClient::setAckTimeout(uint32_t timeout) {
1166
  _ack_timeout = timeout;
1167
}
1168
 
1169
void AsyncClient::setNoDelay(bool nodelay) const {
1170
  if (!_pcb) {
1171
    return;
1172
  }
1173
  if (nodelay) {
1174
    tcp_nagle_disable(_pcb);
1175
  } else {
1176
    tcp_nagle_enable(_pcb);
1177
  }
1178
}
1179
 
1180
bool AsyncClient::getNoDelay() {
1181
  if (!_pcb) {
1182
    return false;
1183
  }
1184
  return tcp_nagle_disabled(_pcb);
1185
}
1186
 
1187
void AsyncClient::setKeepAlive(uint32_t ms, uint8_t cnt) {
1188
  if (ms != 0) {
1189
    _pcb->so_options |= SOF_KEEPALIVE;  // Turn on TCP Keepalive for the given pcb
1190
    // Set the time between keepalive messages in milli-seconds
1191
    _pcb->keep_idle = ms;
1192
    _pcb->keep_intvl = ms;
1193
    _pcb->keep_cnt = cnt;  // The number of unanswered probes required to force closure of the socket
1194
  } else {
1195
    _pcb->so_options &= ~SOF_KEEPALIVE;  // Turn off TCP Keepalive for the given pcb
1196
  }
1197
}
1198
 
1199
uint16_t AsyncClient::getMss() const {
1200
  if (!_pcb) {
1201
    return 0;
1202
  }
1203
  return tcp_mss(_pcb);
1204
}
1205
 
1206
uint32_t AsyncClient::getRemoteAddress() const {
1207
  if (!_pcb) {
1208
    return 0;
1209
  }
1210
#if LWIP_IPV4 && LWIP_IPV6
1211
  return _pcb->remote_ip.u_addr.ip4.addr;
1212
#else
1213
  return _pcb->remote_ip.addr;
1214
#endif
1215
}
1216
 
1217
#if LWIP_IPV6
1218
// Remote IPv6 address of the connection; an all-zero address when there is
// no PCB or the remote address is not of type IPADDR_TYPE_V6.
ip6_addr_t AsyncClient::getRemoteAddress6() const {
  const bool is_v6 = (_pcb != nullptr) && (_pcb->remote_ip.type == IPADDR_TYPE_V6);
  if (!is_v6) {
    ip6_addr_t unspecified;
    ip6_addr_set_zero(&unspecified);
    return unspecified;
  }
  return _pcb->remote_ip.u_addr.ip6;
}
1227
 
1228
// Local IPv6 address of the connection; an all-zero address when there is
// no PCB or the local address is not of type IPADDR_TYPE_V6.
ip6_addr_t AsyncClient::getLocalAddress6() const {
  const bool is_v6 = (_pcb != nullptr) && (_pcb->local_ip.type == IPADDR_TYPE_V6);
  if (!is_v6) {
    ip6_addr_t unspecified;
    ip6_addr_set_zero(&unspecified);
    return unspecified;
  }
  return _pcb->local_ip.u_addr.ip6;
}
1237
#ifdef ARDUINO
1238
#if ESP_IDF_VERSION_MAJOR < 5
1239
// Remote address as an IPv6Address, built from getRemoteAddress6().
IPv6Address AsyncClient::remoteIP6() const {
  ip6_addr_t raw = getRemoteAddress6();
  return IPv6Address(raw.addr);
}
1242
 
1243
// Local address as an IPv6Address, built from getLocalAddress6().
IPv6Address AsyncClient::localIP6() const {
  ip6_addr_t raw = getLocalAddress6();
  return IPv6Address(raw.addr);
}
1246
#else
1247
// Remote address converted from the PCB's remote_ip; an empty IPv6-typed
// IPAddress when there is no PCB.
IPAddress AsyncClient::remoteIP6() const {
  if (_pcb != nullptr) {
    IPAddress resolved;
    resolved.from_ip_addr_t(&(_pcb->remote_ip));
    return resolved;
  }
  return IPAddress(IPType::IPv6);
}
1255
 
1256
// Local address converted from the PCB's local_ip; an empty IPv6-typed
// IPAddress when there is no PCB.
IPAddress AsyncClient::localIP6() const {
  if (_pcb != nullptr) {
    IPAddress resolved;
    resolved.from_ip_addr_t(&(_pcb->local_ip));
    return resolved;
  }
  return IPAddress(IPType::IPv6);
}
1264
#endif
1265
#endif
1266
#endif
1267
 
1268
uint16_t AsyncClient::getRemotePort() const {
1269
  if (!_pcb) {
1270
    return 0;
1271
  }
1272
  return _pcb->remote_port;
1273
}
1274
 
1275
uint32_t AsyncClient::getLocalAddress() const {
1276
  if (!_pcb) {
1277
    return 0;
1278
  }
1279
#if LWIP_IPV4 && LWIP_IPV6
1280
  return _pcb->local_ip.u_addr.ip4.addr;
1281
#else
1282
  return _pcb->local_ip.addr;
1283
#endif
1284
}
1285
 
1286
uint16_t AsyncClient::getLocalPort() const {
1287
  if (!_pcb) {
1288
    return 0;
1289
  }
1290
  return _pcb->local_port;
1291
}
1292
 
1293
// Remote IPv4 address of the connection. Falls back to an all-zero address
// when there is no PCB or, in dual-stack builds, when the remote address is
// not of type IPADDR_TYPE_V4.
ip4_addr_t AsyncClient::getRemoteAddress4() const {
#if LWIP_IPV4 && LWIP_IPV6
  const bool has_v4 = (_pcb != nullptr) && (_pcb->remote_ip.type == IPADDR_TYPE_V4);
  if (has_v4) {
    return _pcb->remote_ip.u_addr.ip4;
  }
#else
  if (_pcb != nullptr) {
    return _pcb->remote_ip;
  }
#endif
  ip4_addr_t unspecified;
  ip4_addr_set_zero(&unspecified);
  return unspecified;
}
1309
 
1310
// Local IPv4 address of the connection. Falls back to an all-zero address
// when there is no PCB or, in dual-stack builds, when the local address is
// not of type IPADDR_TYPE_V4.
ip4_addr_t AsyncClient::getLocalAddress4() const {
#if LWIP_IPV4 && LWIP_IPV6
  const bool has_v4 = (_pcb != nullptr) && (_pcb->local_ip.type == IPADDR_TYPE_V4);
  if (has_v4) {
    return _pcb->local_ip.u_addr.ip4;
  }
#else
  if (_pcb != nullptr) {
    return _pcb->local_ip;
  }
#endif
  ip4_addr_t unspecified;
  ip4_addr_set_zero(&unspecified);
  return unspecified;
}
1326
 
1327
#ifdef ARDUINO
1328
// Remote address as an Arduino IPAddress.
// Before IDF 5 it is built from the raw 32-bit value of getRemoteAddress();
// from IDF 5 on it is converted from the PCB's remote_ip, or default
// constructed when there is no PCB.
IPAddress AsyncClient::remoteIP() const {
#if ESP_IDF_VERSION_MAJOR < 5
  return IPAddress(getRemoteAddress());
#else
  if (_pcb != nullptr) {
    IPAddress resolved;
    resolved.from_ip_addr_t(&(_pcb->remote_ip));
    return resolved;
  }
  return IPAddress();
#endif
}
1340
 
1341
// Local address as an Arduino IPAddress.
// Before IDF 5 it is built from the raw 32-bit value of getLocalAddress();
// from IDF 5 on it is converted from the PCB's local_ip, or default
// constructed when there is no PCB.
IPAddress AsyncClient::localIP() const {
#if ESP_IDF_VERSION_MAJOR < 5
  return IPAddress(getLocalAddress());
#else
  if (_pcb != nullptr) {
    IPAddress resolved;
    resolved.from_ip_addr_t(&(_pcb->local_ip));
    return resolved;
  }
  return IPAddress();
#endif
}
1353
#endif
1354
 
1355
uint8_t AsyncClient::state() const {
1356
  if (!_pcb) {
1357
    return 0;
1358
  }
1359
  return _pcb->state;
1360
}
1361
 
1362
bool AsyncClient::connected() const {
1363
  if (!_pcb) {
1364
    return false;
1365
  }
1366
  return _pcb->state == ESTABLISHED;
1367
}
1368
 
1369
bool AsyncClient::connecting() const {
1370
  if (!_pcb) {
1371
    return false;
1372
  }
1373
  return _pcb->state > CLOSED && _pcb->state < ESTABLISHED;
1374
}
1375
 
1376
bool AsyncClient::disconnecting() const {
1377
  if (!_pcb) {
1378
    return false;
1379
  }
1380
  return _pcb->state > ESTABLISHED && _pcb->state < TIME_WAIT;
1381
}
1382
 
1383
bool AsyncClient::disconnected() const {
1384
  if (!_pcb) {
1385
    return true;
1386
  }
1387
  return _pcb->state == CLOSED || _pcb->state == TIME_WAIT;
1388
}
1389
 
1390
bool AsyncClient::freeable() const {
1391
  if (!_pcb) {
1392
    return true;
1393
  }
1394
  return _pcb->state == CLOSED || _pcb->state > ESTABLISHED;
1395
}
1396
 
1397
bool AsyncClient::canSend() const {
1398
  return space() > 0;
1399
}
1400
 
1401
const char *AsyncClient::errorToString(int8_t error) {
1402
  switch (error) {
1403
    case ERR_OK:         return "OK";
1404
    case ERR_MEM:        return "Out of memory error";
1405
    case ERR_BUF:        return "Buffer error";
1406
    case ERR_TIMEOUT:    return "Timeout";
1407
    case ERR_RTE:        return "Routing problem";
1408
    case ERR_INPROGRESS: return "Operation in progress";
1409
    case ERR_VAL:        return "Illegal value";
1410
    case ERR_WOULDBLOCK: return "Operation would block";
1411
    case ERR_USE:        return "Address in use";
1412
    case ERR_ALREADY:    return "Already connected";
1413
    case ERR_CONN:       return "Not connected";
1414
    case ERR_IF:         return "Low-level netif error";
1415
    case ERR_ABRT:       return "Connection aborted";
1416
    case ERR_RST:        return "Connection reset";
1417
    case ERR_CLSD:       return "Connection closed";
1418
    case ERR_ARG:        return "Illegal argument";
1419
    case -55:            return "DNS failed";
1420
    default:             return "UNKNOWN";
1421
  }
1422
}
1423
 
1424
const char *AsyncClient::stateToString() const {
1425
  switch (state()) {
1426
    case 0:  return "Closed";
1427
    case 1:  return "Listen";
1428
    case 2:  return "SYN Sent";
1429
    case 3:  return "SYN Received";
1430
    case 4:  return "Established";
1431
    case 5:  return "FIN Wait 1";
1432
    case 6:  return "FIN Wait 2";
1433
    case 7:  return "Close Wait";
1434
    case 8:  return "Closing";
1435
    case 9:  return "Last ACK";
1436
    case 10: return "Time Wait";
1437
    default: return "UNKNOWN";
1438
  }
1439
}
1440
 
1441
/*
1442
  Async TCP Server
1443
 */
1444
 
1445
// Create a server bound to the given lwIP address and port.
// No PCB is created here; no-delay is off and no connect callback is set.
AsyncServer::AsyncServer(ip_addr_t addr, uint16_t port)
  : _port(port),
    _addr(addr),
    _noDelay(false),
    _pcb(nullptr),
    _connect_cb(nullptr),
    _connect_cb_arg(nullptr) {}
1447
 
1448
#ifdef ARDUINO
1449
// Create a server bound to an Arduino IPAddress and port.
// Before IDF 5 the address is stored as an IPv4 address; from IDF 5 on the
// IPAddress converts itself into the lwIP representation.
AsyncServer::AsyncServer(IPAddress addr, uint16_t port)
  : _port(port), _noDelay(false), _pcb(nullptr), _connect_cb(nullptr), _connect_cb_arg(nullptr) {
#if ESP_IDF_VERSION_MAJOR >= 5
  addr.to_ip_addr_t(&_addr);
#else
#if LWIP_IPV4 && LWIP_IPV6
  _addr.type = IPADDR_TYPE_V4;
  _addr.u_addr.ip4.addr = addr;
#else
  _addr.addr = addr;
#endif
#endif
}
1461
#if ESP_IDF_VERSION_MAJOR < 5 && __has_include(<IPv6Address.h>) && LWIP_IPV6
1462
// Create a server bound to an Arduino IPv6Address and port.
// The address is read as four 32-bit words and stored via IPADDR6_INIT.
AsyncServer::AsyncServer(IPv6Address addr, uint16_t port)
  : _port(port), _noDelay(false), _pcb(nullptr), _connect_cb(nullptr), _connect_cb_arg(nullptr) {
#if LWIP_IPV4 && LWIP_IPV6
  _addr.type = IPADDR_TYPE_V6;
#endif
  auto words = static_cast<const uint32_t *>(addr);
  _addr = IPADDR6_INIT(words[0], words[1], words[2], words[3]);
}
1469
#endif
1470
#endif
1471
 
1472
// Create a server listening on `port` on the wildcard address (INADDR_ANY;
// type IPADDR_TYPE_ANY in dual-stack builds).
AsyncServer::AsyncServer(uint16_t port)
  : _port(port), _noDelay(false), _pcb(nullptr), _connect_cb(nullptr), _connect_cb_arg(nullptr) {
#if LWIP_IPV4 && LWIP_IPV6
  _addr.type = IPADDR_TYPE_ANY;
  _addr.u_addr.ip4.addr = INADDR_ANY;
#else
  _addr.addr = INADDR_ANY;
#endif
}
1480
 
1481
// Stop listening (if still active) when the server object goes away.
AsyncServer::~AsyncServer() {
  this->end();
}
1484
 
1485
// Register the callback (and its user argument) that _accepted() invokes for
// each accepted client.
void AsyncServer::onClient(AcConnectHandler cb, void *arg) {
  _connect_cb_arg = arg;
  _connect_cb = cb;
}
1489
 
1490
void AsyncServer::begin() {
1491
  if (_pcb) {
1492
    return;
1493
  }
1494
 
1495
  if (!_start_async_task()) {
1496
    async_tcp_log_e("failed to start task");
1497
    return;
1498
  }
1499
  int8_t err;
1500
  {
1501
    tcp_core_guard tcg;
1502
#if LWIP_IPV4 && LWIP_IPV6
1503
    _pcb = tcp_new_ip_type(_addr.type);
1504
#else
1505
    _pcb = tcp_new_ip_type(IPADDR_TYPE_ANY);
1506
#endif
1507
  }
1508
  if (!_pcb) {
1509
    async_tcp_log_e("_pcb == NULL");
1510
    return;
1511
  }
1512
 
1513
  err = _tcp_bind(&_pcb, &_addr, _port);
1514
 
1515
  if (err != ERR_OK) {
1516
    // pcb was closed by _tcp_bind
1517
    async_tcp_log_e("bind error: %d", err);
1518
    return;
1519
  }
1520
 
1521
  static uint8_t backlog = 5;
1522
  _pcb = _tcp_listen_with_backlog(_pcb, backlog);
1523
  if (!_pcb) {
1524
    async_tcp_log_e("listen_pcb == NULL");
1525
    return;
1526
  }
1527
  tcp_core_guard tcg;
1528
  tcp_arg(_pcb, (void *)this);
1529
  tcp_accept(_pcb, &AsyncTCP_detail::tcp_accept);
1530
}
1531
 
1532
void AsyncServer::end() {
1533
  if (_pcb) {
1534
    tcp_core_guard tcg;
1535
    tcp_arg(_pcb, NULL);
1536
    tcp_accept(_pcb, NULL);
1537
    if (tcp_close(_pcb) != ERR_OK) {
1538
      tcp_abort(_pcb);
1539
    }
1540
    _pcb = NULL;
1541
  }
1542
}
1543
 
1544
// runs on LwIP thread
1545
int8_t AsyncTCP_detail::tcp_accept(void *arg, tcp_pcb *pcb, int8_t err) {
1546
  if (!pcb) {
1547
    async_tcp_log_e("_accept failed: pcb is NULL");
1548
    return ERR_ABRT;
1549
  }
1550
  auto server = reinterpret_cast<AsyncServer *>(arg);
1551
  if (server->_connect_cb) {
1552
    AsyncClient *c = new (std::nothrow) AsyncClient(pcb);
1553
    if (c && c->pcb()) {
1554
      c->setNoDelay(server->_noDelay);
1555
 
1556
      lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_ACCEPT, c};
1557
      if (e) {
1558
        e->accept.server = server;
1559
 
1560
        queue_mutex_guard guard;
1561
        _prepend_async_event(e);
1562
        return ERR_OK;  // success
1563
      }
1564
 
1565
      // Couldn't allocate accept event
1566
      // We can't let the client object call in to close, as we're on the LWIP thread; it could deadlock trying to RPC to itself
1567
      c->_pcb = nullptr;
1568
      tcp_abort(pcb);
1569
      async_tcp_log_e("_accept failed: couldn't accept client");
1570
      return ERR_ABRT;
1571
    }
1572
    if (c) {
1573
      // Couldn't complete setup
1574
      // pcb has already been aborted
1575
      delete c;
1576
      pcb = nullptr;
1577
      async_tcp_log_e("_accept failed: couldn't complete setup");
1578
      return ERR_ABRT;
1579
    }
1580
    async_tcp_log_e("_accept failed: couldn't allocate client");
1581
  } else {
1582
    async_tcp_log_e("_accept failed: no onConnect callback");
1583
  }
1584
  tcp_abort(pcb);
1585
  return ERR_OK;
1586
}
1587
 
1588
// Hand an accepted client to the registered connect callback, if any.
// Always returns ERR_OK.
int8_t AsyncServer::_accepted(AsyncClient *client) {
  if (!_connect_cb) {
    return ERR_OK;
  }
  _connect_cb(_connect_cb_arg, client);
  return ERR_OK;
}
1594
 
1595
void AsyncServer::setNoDelay(bool nodelay) {
1596
  _noDelay = nodelay;
1597
}
1598
 
1599
bool AsyncServer::getNoDelay() const {
1600
  return _noDelay;
1601
}
1602
 
1603
uint8_t AsyncServer::status() const {
1604
  if (!_pcb) {
1605
    return 0;
1606
  }
1607
  return _pcb->state;
1608
}