Subversion Repositories ESP8266_P1_Meter

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 raymond 1
// SPDX-License-Identifier: LGPL-3.0-or-later
2
// Copyright 2016-2026 Hristo Gochkov, Mathieu Carbou, Emil Muratov, Will Miles
3
 
4
#include "AsyncEventSource.h"
5
#include "AsyncWebServerLogging.h"
6
 
7
#include <algorithm>
8
#include <memory>
9
#include <utility>
10
 
11
#define ASYNC_SSE_NEW_LINE_CHAR (char)0xa
12
 
13
using namespace asyncsrv;
14
 
15
static String generateEventMessage(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
16
  String str;
17
  size_t len{0};
18
  if (message) {
19
    len += strlen(message);
20
  }
21
 
22
  if (event) {
23
    len += strlen(event);
24
  }
25
 
26
  len += 42;  // give it some overhead
27
 
28
  if (!str.reserve(len)) {
29
    async_ws_log_e("Failed to allocate");
30
    return asyncsrv::emptyString;
31
  }
32
 
33
  if (reconnect) {
34
    str += T_retry_;
35
    str += reconnect;
36
    str += ASYNC_SSE_NEW_LINE_CHAR;  // '\n'
37
  }
38
 
39
  if (id) {
40
    str += T_id__;
41
    str += id;
42
    str += ASYNC_SSE_NEW_LINE_CHAR;  // '\n'
43
  }
44
 
45
  if (event != NULL) {
46
    str += T_event_;
47
    str += event;
48
    str += ASYNC_SSE_NEW_LINE_CHAR;  // '\n'
49
  }
50
 
51
  if (!message) {
52
    return str;
53
  }
54
 
55
  size_t messageLen = strlen(message);
56
  char *lineStart = (char *)message;
57
  char *lineEnd;
58
  do {
59
    char *nextN = strchr(lineStart, '\n');
60
    char *nextR = strchr(lineStart, '\r');
61
    if (nextN == NULL && nextR == NULL) {
62
      // a message is a single-line string
63
      str += T_data_;
64
      str += message;
65
      str += T_nn;
66
      return str;
67
    }
68
 
69
    // a message is a multi-line string
70
    char *nextLine = NULL;
71
    if (nextN != NULL && nextR != NULL) {  // windows line-ending \r\n
72
      if (nextR + 1 == nextN) {
73
        // normal \r\n sequence
74
        lineEnd = nextR;
75
        nextLine = nextN + 1;
76
      } else {
77
        // some abnormal \n \r mixed sequence
78
        lineEnd = std::min(nextR, nextN);
79
        nextLine = lineEnd + 1;
80
      }
81
    } else if (nextN != NULL) {  // Unix/Mac OS X LF
82
      lineEnd = nextN;
83
      nextLine = nextN + 1;
84
    } else {  // some ancient garbage
85
      lineEnd = nextR;
86
      nextLine = nextR + 1;
87
    }
88
 
89
    str += T_data_;
90
    str.concat(lineStart, lineEnd - lineStart);
91
    str += ASYNC_SSE_NEW_LINE_CHAR;  // \n
92
 
93
    lineStart = nextLine;
94
  } while (lineStart < ((char *)message + messageLen));
95
 
96
  // append another \n to terminate message
97
  str += ASYNC_SSE_NEW_LINE_CHAR;  // '\n'
98
 
99
  return str;
100
}
101
 
102
// Message
103
 
104
size_t AsyncEventSourceMessage::ack(size_t len, __attribute__((unused)) uint32_t time) {
105
  // If the whole message is now acked...
106
  if (_acked + len > _data->length()) {
107
    // Return the number of extra bytes acked (they will be carried on to the next message)
108
    const size_t extra = _acked + len - _data->length();
109
    _acked = _data->length();
110
    return extra;
111
  }
112
  // Return that no extra bytes left.
113
  _acked += len;
114
  return 0;
115
}
116
 
117
size_t AsyncEventSourceMessage::write(AsyncClient *client) {
118
  if (!client) {
119
    return 0;
120
  }
121
 
122
  if (_sent >= _data->length() || !client->canSend()) {
123
    return 0;
124
  }
125
 
126
  size_t len = std::min(_data->length() - _sent, client->space());
127
  /*
128
    add() would call lwip's tcp_write() under the AsyncTCP hood with apiflags argument.
129
    By default apiflags=ASYNC_WRITE_FLAG_COPY
130
    we could have used apiflags with this flag unset to pass data by reference and avoid copy to socket buffer,
131
    but looks like it does not work for Arduino's lwip in ESP32/IDF
132
    it is enforced in https://github.com/espressif/esp-lwip/blob/0606eed9d8b98a797514fdf6eabb4daf1c8c8cd9/src/core/tcp_out.c#L422C5-L422C30
133
    if LWIP_NETIF_TX_SINGLE_PBUF is set, and it is set indeed in IDF
134
    https://github.com/espressif/esp-idf/blob/a0f798cfc4bbd624aab52b2c194d219e242d80c1/components/lwip/port/include/lwipopts.h#L744
135
 
136
    So let's just keep it enforced ASYNC_WRITE_FLAG_COPY and keep in mind that there is no zero-copy
137
  */
138
  size_t written = client->add(_data->c_str() + _sent, len, ASYNC_WRITE_FLAG_COPY);  //  ASYNC_WRITE_FLAG_MORE
139
  _sent += written;
140
  return written;
141
}
142
 
143
size_t AsyncEventSourceMessage::send(AsyncClient *client) {
144
  size_t sent = write(client);
145
  return sent && client->send() ? sent : 0;
146
}
147
 
148
// Client
149
 
150
AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server) : _client(request->clientRelease()), _server(server) {
151
 
152
  if (request->hasHeader(T_Last_Event_ID)) {
153
    _lastId = atoi(request->getHeader(T_Last_Event_ID)->value().c_str());
154
  }
155
 
156
  _client->setRxTimeout(0);
157
  _client->onError(NULL, NULL);
158
  _client->onAck(
159
    [](void *r, AsyncClient *c, size_t len, uint32_t time) {
160
      (void)c;
161
      static_cast<AsyncEventSourceClient *>(r)->_onAck(len, time);
162
    },
163
    this
164
  );
165
  _client->onPoll(
166
    [](void *r, AsyncClient *c) {
167
      (void)c;
168
      static_cast<AsyncEventSourceClient *>(r)->_onPoll();
169
    },
170
    this
171
  );
172
  _client->onData(NULL, NULL);
173
  _client->onTimeout(
174
    [this](void *r, AsyncClient *c __attribute__((unused)), uint32_t time) {
175
      static_cast<AsyncEventSourceClient *>(r)->_onTimeout(time);
176
    },
177
    this
178
  );
179
  _client->onDisconnect(
180
    [this](void *r, AsyncClient *c) {
181
      static_cast<AsyncEventSourceClient *>(r)->_onDisconnect();
182
      delete c;
183
    },
184
    this
185
  );
186
 
187
  _server->_addClient(this);
188
  _client->setNoDelay(true);
189
  // delete AsyncWebServerRequest object (and bound response) since we have the ownership on client connection now
190
  delete request;
191
}
192
 
193
AsyncEventSourceClient::~AsyncEventSourceClient() {
194
  // Protect message queue access (size checks and modifications) which is not thread-safe.
195
  asyncsrv::lock_guard_type lock(_lockmq);
196
  _messageQueue.clear();
197
  close();
198
}
199
 
200
bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
201
  // Protect message queue access (size checks and modifications) which is not thread-safe.
202
  asyncsrv::lock_guard_type lock(_lockmq);
203
 
204
  if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
205
    async_ws_log_w("Event message queue overflow: discard message");
206
    return false;
207
  }
208
 
209
  if (_client) {
210
    _messageQueue.emplace_back(message, len);
211
  } else {
212
    _messageQueue.clear();
213
    return false;
214
  }
215
 
216
  /*
217
    throttle queue run
218
    if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff
219
    forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
220
    the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
221
  */
222
  if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
223
    _runQueue();
224
  }
225
 
226
  return true;
227
}
228
 
229
bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
230
  // Protect message queue access (size checks and modifications) which is not thread-safe.
231
  asyncsrv::lock_guard_type lock(_lockmq);
232
 
233
  if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
234
    async_ws_log_w("Event message queue overflow: discard message");
235
    return false;
236
  }
237
 
238
  if (_client) {
239
    _messageQueue.emplace_back(std::move(msg));
240
  } else {
241
    _messageQueue.clear();
242
    return false;
243
  }
244
 
245
  /*
246
    throttle queue run
247
    if Q is filled for >25% then network/CPU is congested, since there is no zero-copy mode for socket buff
248
    forcing Q run will only eat more heap ram and blow the buffer, let's just keep data in our own queue
249
    the queue will be processed at least on each onAck()/onPoll() call from AsyncTCP
250
  */
251
  if (_client && _client->canSend() && _messageQueue.size() < SSE_MAX_QUEUED_MESSAGES >> 2) {
252
    _runQueue();
253
  }
254
  return true;
255
}
256
 
257
void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t time __attribute__((unused))) {
258
  // Protect message queue access (size checks and modifications) which is not thread-safe.
259
  asyncsrv::lock_guard_type lock(_lockmq);
260
 
261
  // adjust in-flight len
262
  if (len < _inflight) {
263
    _inflight -= len;
264
  } else {
265
    _inflight = 0;
266
  }
267
 
268
  // acknowledge as much messages's data as we got confirmed len from a AsyncTCP
269
  while (len && _messageQueue.size()) {
270
    len = _messageQueue.front().ack(len);
271
    if (_messageQueue.front().finished()) {
272
      // now we could release full ack'ed messages, we were keeping it unless send confirmed from AsyncTCP
273
      _messageQueue.pop_front();
274
    }
275
  }
276
 
277
  // try to send another batch of data
278
  if (_messageQueue.size()) {
279
    _runQueue();
280
  }
281
}
282
 
283
void AsyncEventSourceClient::_onPoll() {
284
  // Protect message queue access (size checks and modifications) which is not thread-safe.
285
  asyncsrv::lock_guard_type lock(_lockmq);
286
  if (_messageQueue.size()) {
287
    _runQueue();
288
  }
289
}
290
 
291
void AsyncEventSourceClient::_onTimeout(uint32_t time __attribute__((unused))) {
292
  if (_client) {
293
    _client->close();
294
  }
295
}
296
 
297
void AsyncEventSourceClient::_onDisconnect() {
298
  if (!_client) {
299
    return;
300
  }
301
  _client = nullptr;
302
  _server->_handleDisconnect(this);
303
}
304
 
305
void AsyncEventSourceClient::close() {
306
  if (_client) {
307
    _client->close();
308
  }
309
}
310
 
311
bool AsyncEventSourceClient::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
312
  if (!connected()) {
313
    return false;
314
  }
315
  return _queueMessage(std::make_shared<String>(generateEventMessage(message, event, id, reconnect)));
316
}
317
 
318
void AsyncEventSourceClient::_runQueue() {
319
  if (!_client) {
320
    return;
321
  }
322
 
323
  // there is no need to lock the mutex here, 'cause all the calls to this method must be already lock'ed
324
  size_t total_bytes_written = 0;
325
  for (auto i = _messageQueue.begin(); i != _messageQueue.end(); ++i) {
326
    if (!i->sent()) {
327
      const size_t bytes_written = i->write(_client);
328
      total_bytes_written += bytes_written;
329
      _inflight += bytes_written;
330
      if (bytes_written == 0 || _inflight > _max_inflight) {
331
        // Serial.print("_");
332
        break;
333
      }
334
    }
335
  }
336
 
337
  // flush socket
338
  if (_client && total_bytes_written) {
339
    _client->send();
340
  }
341
}
342
 
343
void AsyncEventSourceClient::set_max_inflight_bytes(size_t value) {
344
  if (value >= SSE_MIN_INFLIGH && value <= SSE_MAX_INFLIGH) {
345
    _max_inflight = value;
346
  }
347
}
348
 
349
/*  AsyncEventSource  */
350
 
351
void AsyncEventSource::authorizeConnect(ArAuthorizeConnectHandler cb) {
352
  AsyncAuthorizationMiddleware *m = new AsyncAuthorizationMiddleware(401, cb);
353
  m->_freeOnRemoval = true;
354
  addMiddleware(m);
355
}
356
 
357
void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
358
  if (!client) {
359
    return;
360
  }
361
 
362
  if (_connectcb) {
363
    _connectcb(client);
364
  }
365
 
366
  asyncsrv::lock_guard_type lock(_client_queue_lock);
367
  _clients.emplace_back(client);
368
 
369
  _adjust_inflight_window();
370
}
371
 
372
void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
373
  if (_disconnectcb) {
374
    _disconnectcb(client);
375
  }
376
  asyncsrv::lock_guard_type lock(_client_queue_lock);
377
  for (auto i = _clients.begin(); i != _clients.end(); ++i) {
378
    if (i->get() == client) {
379
      _clients.erase(i);
380
      break;
381
    }
382
  }
383
  _adjust_inflight_window();
384
}
385
 
386
void AsyncEventSource::close() {
387
  // While the whole loop is not done, the linked list is locked and so the
388
  // iterator should remain valid even when AsyncEventSource::_handleDisconnect()
389
  // is called very early
390
  asyncsrv::lock_guard_type lock(_client_queue_lock);
391
  for (const auto &c : _clients) {
392
    if (c->connected()) {
393
      /**
394
       * @brief: Fix self-deadlock by using recursive_mutex instead.
395
       * Due to c->close() shall call the callback function _onDisconnect()
396
       * The calling flow _onDisconnect() --> _handleDisconnect() --> deadlock
397
      */
398
      c->close();
399
    }
400
  }
401
}
402
 
403
// pmb fix
404
size_t AsyncEventSource::avgPacketsWaiting() const {
405
  size_t aql = 0;
406
  uint32_t nConnectedClients = 0;
407
  asyncsrv::lock_guard_type lock(_client_queue_lock);
408
  for (const auto &c : _clients) {
409
    if (c->connected()) {
410
      aql += c->packetsWaiting();
411
      ++nConnectedClients;
412
    }
413
  }
414
  return nConnectedClients == 0 ? 0 : ((aql) + (nConnectedClients / 2)) / (nConnectedClients);  // round up
415
}
416
 
417
AsyncEventSource::SendStatus AsyncEventSource::send(const char *message, const char *event, uint32_t id, uint32_t reconnect) {
418
  AsyncEvent_SharedData_t shared_msg = std::make_shared<String>(generateEventMessage(message, event, id, reconnect));
419
  asyncsrv::lock_guard_type lock(_client_queue_lock);
420
  size_t hits = 0;
421
  size_t miss = 0;
422
  for (const auto &c : _clients) {
423
    if (c->connected()) {
424
      if (c->write(shared_msg)) {
425
        ++hits;
426
      } else {
427
        ++miss;
428
      }
429
    }
430
  }
431
  return hits == 0 ? DISCARDED : (miss == 0 ? ENQUEUED : PARTIALLY_ENQUEUED);
432
}
433
 
434
size_t AsyncEventSource::count() const {
435
  asyncsrv::lock_guard_type lock(_client_queue_lock);
436
  size_t n_clients{0};
437
  for (const auto &i : _clients) {
438
    if (i->connected()) {
439
      ++n_clients;
440
    }
441
  }
442
 
443
  return n_clients;
444
}
445
 
446
bool AsyncEventSource::canHandle(AsyncWebServerRequest *request) const {
447
  return request->isSSE() && request->url().equals(_url);
448
}
449
 
450
void AsyncEventSource::handleRequest(AsyncWebServerRequest *request) {
451
  request->send(new AsyncEventSourceResponse(this));
452
}
453
 
454
// list iteration protected by caller's lock
455
void AsyncEventSource::_adjust_inflight_window() {
456
  const size_t clientCount = count();
457
  if (clientCount) {
458
    size_t inflight = SSE_MAX_INFLIGH / clientCount;
459
    for (const auto &c : _clients) {
460
      if (c->connected()) {
461
        c->set_max_inflight_bytes(inflight);
462
      }
463
    }
464
    // Serial.printf("adjusted inflight to: %u\n", inflight);
465
  }
466
}
467
 
468
/*  Response  */
469
 
470
AsyncEventSourceResponse::AsyncEventSourceResponse(AsyncEventSource *server) : _server(server) {
471
  _code = 200;
472
  _contentType = T_text_event_stream;
473
  _sendContentLength = false;
474
  addHeader(T_Cache_Control, T_no_cache);
475
  addHeader(T_Connection, T_keep_alive);
476
}
477
 
478
void AsyncEventSourceResponse::_respond(AsyncWebServerRequest *request) {
479
  String out;
480
  _assembleHead(out, request->version());
481
  // unbind client's onAck callback from AsyncWebServerRequest's, we will destroy it on next callback and steal the client,
482
  // can't do it now 'cause now we are in AsyncWebServerRequest::_onAck 's stack actually
483
  // here we are loosing time on one RTT delay, but with current design we can't get rid of Req/Resp objects other way
484
  _request = request;
485
  request->client()->onAck(
486
    [](void *r, AsyncClient *c, size_t len, uint32_t time) {
487
      if (len) {
488
        static_cast<AsyncEventSourceResponse *>(r)->_switchClient();
489
      }
490
    },
491
    this
492
  );
493
  request->client()->write(out.c_str(), _headLength);
494
  _state = RESPONSE_WAIT_ACK;
495
}
496
 
497
void AsyncEventSourceResponse::_switchClient() {
498
  // AsyncEventSourceClient c-tor will take the ownership of AsyncTCP's client connection
499
  new AsyncEventSourceClient(_request, _server);
500
  // AsyncEventSourceClient c-tor would also delete _request and *this
501
};