Subversion Repositories ESP8266_P1_Meter

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 raymond 1
// SPDX-License-Identifier: LGPL-3.0-or-later
2
// Copyright 2016-2026 Hristo Gochkov, Mathieu Carbou, Emil Muratov, Will Miles
3
 
4
#pragma once
5
 
6
#include <Arduino.h>
7
 
8
// Select the async TCP backend header for the target platform and define the SSE tunables.
// SSE_MAX_QUEUED_MESSAGES may be predefined (e.g. from build flags); the defaults below apply otherwise.
// SSE_MIN_INFLIGH / SSE_MAX_INFLIGH are byte counts; their replacement lists are parenthesized
// so that the macros expand correctly when used inside larger expressions.
#if defined(ESP32) || defined(LIBRETINY) || defined(HOST)
#include <AsyncTCP.h>
#ifdef LIBRETINY
#ifdef round
#undef round
#endif
#endif
#include <mutex>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 32
#endif
#define SSE_MIN_INFLIGH (2 * 1460)   // allow 2 MSS packets
#define SSE_MAX_INFLIGH (16 * 1024)  // but no more than 16k, no need to blow it, since same data is kept in local Q
#elif defined(ESP8266)
#include <ESPAsyncTCP.h>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 8
#endif
#define SSE_MIN_INFLIGH (2 * 1460)  // allow 2 MSS packets
#define SSE_MAX_INFLIGH (8 * 1024)  // but no more than 8k, no need to blow it, since same data is kept in local Q
#elif defined(TARGET_RP2040) || defined(TARGET_RP2350) || defined(PICO_RP2040) || defined(PICO_RP2350)
#include <RPAsyncTCP.h>
#ifndef SSE_MAX_QUEUED_MESSAGES
#define SSE_MAX_QUEUED_MESSAGES 32
#endif
#define SSE_MIN_INFLIGH (2 * 1460)   // allow 2 MSS packets
#define SSE_MAX_INFLIGH (16 * 1024)  // but no more than 16k, no need to blow it, since same data is kept in local Q
#endif
36
 
37
#include <ESPAsyncWebServer.h>
38
 
39
#ifdef ESP8266
40
#include <Hash.h>
41
#ifdef CRYPTO_HASH_h  // include Hash.h from espressif framework if the first include was from the crypto library
42
#include <../src/Hash.h>
43
#endif
44
#endif
45
 
46
#include <list>
47
#include <memory>
48
#include <utility>
49
 
50
class AsyncEventSource;
51
class AsyncEventSourceResponse;
52
class AsyncEventSourceClient;
53
using ArEventHandlerFunction = std::function<void(AsyncEventSourceClient *client)>;
54
using ArAuthorizeConnectHandler = ArAuthorizeFunction;
55
// shared message object container
56
using AsyncEvent_SharedData_t = std::shared_ptr<String>;
57
 
58
/**
59
 * @brief Async Event Message container with shared message content data
60
 *
61
 */
62
class AsyncEventSourceMessage {
63
 
64
private:
65
  const AsyncEvent_SharedData_t _data;
66
  size_t _sent{0};   // num of bytes already sent
67
  size_t _acked{0};  // num of bytes acked
68
 
69
public:
70
  AsyncEventSourceMessage(AsyncEvent_SharedData_t data) : _data(data){};
71
#if defined(ESP32)
72
  AsyncEventSourceMessage(const char *data, size_t len) : _data(std::make_shared<String>(data, len)){};
73
#elif defined(TARGET_RP2040) || defined(TARGET_RP2350) || defined(PICO_RP2040) || defined(PICO_RP2350)
74
  AsyncEventSourceMessage(const char *data, size_t len) : _data(std::make_shared<String>()) {
75
    if (data && len > 0) {
76
      _data->concat(data, len);
77
    }
78
  };
79
#else
80
  // esp8266's String does not have constructor with data/length arguments. Use a concat method here
81
  AsyncEventSourceMessage(const char *data, size_t len) {
82
    _data->concat(data, len);
83
  };
84
#endif
85
 
86
  /**
87
     * @brief acknowledge sending len bytes of data
88
     * @note if num of bytes to ack is larger then the unacknowledged message length the number of carried over bytes are returned
89
     *
90
     * @param len bytes to acknowledge
91
     * @param time
92
     * @return size_t number of extra bytes carried over
93
     */
94
  size_t ack(size_t len, uint32_t time = 0);
95
 
96
  /**
97
     * @brief write message data to client's buffer
98
     * @note this method does NOT call client's send
99
     *
100
     * @param client
101
     * @return size_t number of bytes written
102
     */
103
  size_t write(AsyncClient *client);
104
 
105
  /**
106
     * @brief writes message data to client's buffer and calls client's send method
107
     *
108
     * @param client
109
     * @return size_t returns num of bytes the clien was able to send()
110
     */
111
  size_t send(AsyncClient *client);
112
 
113
  // returns true if full message's length were acked
114
  bool finished() {
115
    return _acked == _data->length();
116
  }
117
 
118
  /**
119
     * @brief returns true if all data has been sent already
120
     *
121
     */
122
  bool sent() {
123
    return _sent == _data->length();
124
  }
125
};
126
 
127
/**
128
 * @brief class holds a sse messages queue for a particular client's connection
129
 *
130
 */
131
class AsyncEventSourceClient {
132
private:
133
  AsyncClient *_client;
134
  AsyncEventSource *_server;
135
  uint32_t _lastId{0};
136
  size_t _inflight{0};                    // num of unacknowledged bytes that has been written to socket buffer
137
  size_t _max_inflight{SSE_MAX_INFLIGH};  // max num of unacknowledged bytes that could be written to socket buffer
138
  std::list<AsyncEventSourceMessage> _messageQueue;
139
  mutable asyncsrv::mutex_type _lockmq;
140
  bool _queueMessage(const char *message, size_t len);
141
  bool _queueMessage(AsyncEvent_SharedData_t &&msg);
142
  void _runQueue();
143
 
144
public:
145
  /**
146
   * @brief Construct a new Async Event Source Client object
147
   * @note constructor would take the ownership of of AsyncTCP's client pointer from `request` parameter and call delete on it!
148
   *
149
   * @param request
150
   * @param server
151
   */
152
  AsyncEventSourceClient(AsyncWebServerRequest *request, AsyncEventSource *server);
153
  ~AsyncEventSourceClient();
154
 
155
  /**
156
     * @brief Send an SSE message to client
157
     * it will craft an SSE message and place it to client's message queue
158
     *
159
     * @param message body string, could be single or multi-line string sepprated by \n, \r, \r\n
160
     * @param event body string, a sinle line string
161
     * @param id sequence id
162
     * @param reconnect client's reconnect timeout
163
     * @return true if message was placed in a queue
164
     * @return false if queue is full
165
     */
166
  bool send(const char *message, const char *event = NULL, uint32_t id = 0, uint32_t reconnect = 0);
167
  bool send(const String &message, const String &event, uint32_t id = 0, uint32_t reconnect = 0) {
168
    return send(message.c_str(), event.c_str(), id, reconnect);
169
  }
170
  bool send(const String &message, const char *event, uint32_t id = 0, uint32_t reconnect = 0) {
171
    return send(message.c_str(), event, id, reconnect);
172
  }
173
 
174
  /**
175
     * @brief place supplied preformatted SSE message to the message queue
176
     * @note message must a properly formatted SSE string according to https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
177
     *
178
     * @param message data
179
     * @return true on success
180
     * @return false on queue overflow or no client connected
181
     */
182
  bool write(AsyncEvent_SharedData_t message) {
183
    return connected() && _queueMessage(std::move(message));
184
  };
185
 
186
  [[deprecated("Use _write(AsyncEvent_SharedData_t message) instead to share same data with multiple SSE clients")]]
187
  bool write(const char *message, size_t len) {
188
    return connected() && _queueMessage(message, len);
189
  };
190
 
191
  // close client's connection
192
  void close();
193
 
194
  // getters
195
 
196
  AsyncClient *client() {
197
    return _client;
198
  }
199
  bool connected() const {
200
    return _client && _client->connected();
201
  }
202
  uint32_t lastId() const {
203
    return _lastId;
204
  }
205
  size_t packetsWaiting() const {
206
    asyncsrv::lock_guard_type lock(_lockmq);
207
    return _messageQueue.size();
208
  };
209
 
210
  /**
211
     * @brief Sets max amount of bytes that could be written to client's socket while awaiting delivery acknowledge
212
     * used to throttle message delivery length to tradeoff memory consumption
213
     * @note actual amount of data written could possible be a bit larger but no more than available socket buff space
214
     *
215
     * @param value
216
     */
217
  void set_max_inflight_bytes(size_t value);
218
 
219
  /**
220
     * @brief Get current max inflight bytes value
221
     *
222
     * @return size_t
223
     */
224
  size_t get_max_inflight_bytes() const {
225
    return _max_inflight;
226
  }
227
 
228
  // system callbacks (do not call if from user code!)
229
  void _onAck(size_t len, uint32_t time);
230
  void _onPoll();
231
  void _onTimeout(uint32_t time);
232
  void _onDisconnect();
233
};
234
 
235
/**
236
 * @brief a class that maintains all connected HTTP clients subscribed to SSE delivery
237
 * dispatches supplied messages to the client's queues
238
 *
239
 */
240
class AsyncEventSource : public AsyncWebHandler {
241
private:
242
  String _url;
243
  std::list<std::unique_ptr<AsyncEventSourceClient>> _clients;
244
  // Same as for individual messages, protect mutations of _clients list
245
  // since simultaneous access from different tasks is possible
246
  mutable asyncsrv::mutex_type _client_queue_lock;
247
  ArEventHandlerFunction _connectcb = nullptr;
248
  ArEventHandlerFunction _disconnectcb = nullptr;
249
 
250
  // this method manipulates in-fligh data size for connected client depending on number of active connections
251
  void _adjust_inflight_window();
252
 
253
public:
254
  typedef enum {
255
    DISCARDED = 0,
256
    ENQUEUED = 1,
257
    PARTIALLY_ENQUEUED = 2,
258
  } SendStatus;
259
 
260
  AsyncEventSource(const char *url) : _url(url){};
261
  AsyncEventSource(const String &url) : _url(url){};
262
  ~AsyncEventSource() {
263
    close();
264
  };
265
 
266
  const char *url() const {
267
    return _url.c_str();
268
  }
269
  // close all connected clients
270
  void close();
271
 
272
  /**
273
     * @brief set on-connect callback for the client
274
     * used to deliver messages to client on first connect
275
     *
276
     * @param cb
277
     */
278
  void onConnect(ArEventHandlerFunction cb) {
279
    _connectcb = cb;
280
  }
281
 
282
  /**
283
     * @brief Send an SSE message to client
284
     * it will craft an SSE message and place it to all connected client's message queues
285
     *
286
     * @param message body string, could be single or multi-line string sepprated by \n, \r, \r\n
287
     * @param event body string, a sinle line string
288
     * @param id sequence id
289
     * @param reconnect client's reconnect timeout
290
     * @return SendStatus if message was placed in any/all/part of the client's queues
291
     */
292
  SendStatus send(const char *message, const char *event = NULL, uint32_t id = 0, uint32_t reconnect = 0);
293
  SendStatus send(const String &message, const String &event, uint32_t id = 0, uint32_t reconnect = 0) {
294
    return send(message.c_str(), event.c_str(), id, reconnect);
295
  }
296
  SendStatus send(const String &message, const char *event, uint32_t id = 0, uint32_t reconnect = 0) {
297
    return send(message.c_str(), event, id, reconnect);
298
  }
299
 
300
  // The client pointer sent to the callback is only for reference purposes. DO NOT CALL ANY METHOD ON IT !
301
  void onDisconnect(ArEventHandlerFunction cb) {
302
    _disconnectcb = cb;
303
  }
304
  void authorizeConnect(ArAuthorizeConnectHandler cb);
305
 
306
  // returns number of connected clients
307
  size_t count() const;
308
 
309
  // returns average number of messages pending in all client's queues
310
  size_t avgPacketsWaiting() const;
311
 
312
  // system callbacks (do not call from user code!)
313
  void _addClient(AsyncEventSourceClient *client);
314
  void _handleDisconnect(AsyncEventSourceClient *client);
315
  bool canHandle(AsyncWebServerRequest *request) const final;
316
  void handleRequest(AsyncWebServerRequest *request) final;
317
};
318
 
319
class AsyncEventSourceResponse : public AsyncWebServerResponse {
320
private:
321
  AsyncEventSource *_server;
322
  AsyncWebServerRequest *_request;
323
  // this call back will switch AsyncTCP client to SSE
324
  void _switchClient();
325
 
326
public:
327
  AsyncEventSourceResponse(AsyncEventSource *server);
328
  void _respond(AsyncWebServerRequest *request) override;
329
  size_t _ack(AsyncWebServerRequest *request, size_t len, uint32_t time) override {
330
    return 0;
331
  };
332
  bool _sourceValid() const override {
333
    return true;
334
  }
335
};