RTU/claude/工程/websocket-server.md

599 lines
19 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# WebSocket 服务端深度分析
## 一、概述
Mongoose 的 WebSocket 实现位于 `src/ws.c`~302 行),基于 RFC 6455 规范,同时支持服务端和客户端。本文档聚焦**服务端**的使用和内部实现。
**核心流程**
```
HTTP 请求到达 (Upgrade: websocket)
→ 用户处理器检测到 WebSocket 升级请求
→ 调用 mg_ws_upgrade() 完成握手
→ pfn 切换为 mg_ws_cbWebSocket 协议处理器)
→ 后续消息通过 MG_EV_WS_MSG 事件传递
→ mg_ws_send() 发送帧
```
## 二、数据结构
### 帧操作码Opcode
```c
// 定义在 ws.h
#define WEBSOCKET_OP_CONTINUE 0 // 分片帧的后续帧
#define WEBSOCKET_OP_TEXT 1 // 文本帧UTF-8
#define WEBSOCKET_OP_BINARY 2 // 二进制帧
#define WEBSOCKET_OP_CLOSE 8 // 关闭连接
#define WEBSOCKET_OP_PING 9 // 心跳请求
#define WEBSOCKET_OP_PONG 10 // 心跳响应
```
### WebSocket 消息结构
```c
// 定义在 ws.h 第 12-15 行
struct mg_ws_message {
struct mg_str data; // 消息数据(引用 c->recv 缓冲区,零拷贝)
uint8_t flags; // 帧标志字节FIN + Opcode
};
```
`flags` 字节的高位是 FIN 标志bit 7低 4 位是操作码:
```
flags = 0b1xxx_xxxx → FIN=1最后一帧
flags = 0b0xxx_xxxx → FIN=0还有后续帧
flags & 15 → 操作码0-15
```
### 内部帧解析结构ws.c 第 12-16 行)
```c
struct ws_msg {
uint8_t flags; // 帧标志
size_t header_len; // 帧头长度(含掩码 key
size_t data_len; // 数据长度
};
```
## 三、WebSocket 服务端完整使用流程
### 3.1 最小示例
```c
#include "mongoose.h"
// 统一的 HTTP + WebSocket 事件处理函数
static void fn(struct mg_connection *c, int ev, void *ev_data) {
if (ev == MG_EV_HTTP_MSG) {
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
// 判断是否是 WebSocket 升级请求
if (mg_http_get_header(hm, "Sec-WebSocket-Key")) {
mg_ws_upgrade(c, hm, NULL); // 执行握手,切换到 WS 模式
} else {
// 普通 HTTP 请求处理
mg_http_reply(c, 200, "", "hello\n");
}
} else if (ev == MG_EV_WS_MSG) {
// WebSocket 消息到达(握手完成后)
struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
// 判断消息类型
if (wm->flags & WEBSOCKET_OP_TEXT) {
// 文本消息:回显
mg_ws_send(c, wm->data.buf, wm->data.len, WEBSOCKET_OP_TEXT);
} else if (wm->flags & WEBSOCKET_OP_BINARY) {
// 二进制消息处理
}
} else if (ev == MG_EV_CLOSE) {
// 连接关闭(包括 WebSocket 关闭)
}
}
int main() {
struct mg_mgr mgr;
mg_mgr_init(&mgr);
mg_http_listen(&mgr, "http://0.0.0.0:8000", fn, NULL);
for (;;) mg_mgr_poll(&mgr, 1000);
}
```
### 3.2 完整事件流
```
连接建立:
MG_EV_OPEN → 连接创建
HTTP 阶段:
MG_EV_ACCEPT → 连接被接受(可在此时初始化 TLS
MG_EV_READ → 数据到达
MG_EV_HTTP_MSG → HTTP 请求完整接收
→ 用户检测 Sec-WebSocket-Key 头部
→ 调用 mg_ws_upgrade(c, hm, NULL)
WebSocket 握手:
mg_ws_upgrade() 内部:
→ 计算 Sec-WebSocket-Accept
→ 发送 HTTP 101 响应
→ c->pfn = mg_ws_cb
→ c->is_websocket = 1
→ 触发 MG_EV_WS_OPEN
WebSocket 通信阶段:
MG_EV_WS_MSG → 文本/二进制消息到达
MG_EV_WS_CTL → 控制帧到达Ping/Pong/Close
MG_EV_READ → 原始数据到达ws_cb 内部处理)
MG_EV_WRITE → 数据写入完成
连接关闭:
MG_EV_CLOSE → 连接关闭
```
## 四、握手过程详解 (`mg_ws_upgrade`)
### 4.1 函数签名ws.c 第 269 行)
```c
void mg_ws_upgrade(struct mg_connection *c, struct mg_http_message *hm,
const char *fmt, ...);
```
### 4.2 内部实现
```
mg_ws_upgrade():
1. 从 HTTP 头部提取 "Sec-WebSocket-Key"
→ 如果不存在,返回 426 Upgrade Required 错误
2. 可选:提取 "Sec-WebSocket-Protocol"(子协议协商)
3. 调用 ws_handshake() 生成握手响应
4. 设置 c->pfn = mg_ws_cb切换协议处理器
5. 设置 c->is_websocket = 1
6. 设置 c->is_resp = 0标记响应完成
7. 触发 MG_EV_WS_OPEN 事件
```
### 4.3 握手计算 (`ws_handshake`, 第 35 行)
```c
static void ws_handshake(struct mg_connection *c, const struct mg_str *wskey,
const struct mg_str *wsproto, const char *fmt,
va_list *ap) {
const char *magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; // RFC 6455 魔数
unsigned char sha[20], b64_sha[30];
// 1. SHA1(client_key + magic)
mg_sha1_ctx sha_ctx;
mg_sha1_init(&sha_ctx);
mg_sha1_update(&sha_ctx, (unsigned char *) wskey->buf, wskey->len);
mg_sha1_update(&sha_ctx, (unsigned char *) magic, 36);
mg_sha1_final(sha, &sha_ctx);
// 2. Base64 编码 SHA1 结果
mg_base64_encode(sha, sizeof(sha), (char *) b64_sha, sizeof(b64_sha));
// 3. 构建 HTTP 101 响应
mg_xprintf(mg_pfn_iobuf, &c->send,
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n",
b64_sha);
// 4. 添加用户自定义响应头(通过 fmt 参数)
if (fmt != NULL) mg_vxprintf(mg_pfn_iobuf, &c->send, fmt, &ap);
// 5. 可选的子协议响应头
if (wsproto != NULL) {
mg_printf(c, "Sec-WebSocket-Protocol: %.*s\r\n", ...);
}
// 6. 结束响应头
mg_send(c, "\r\n", 2);
}
```
**关键常量**:魔数 `258EAFA5-E914-47DA-95CA-C5AB0DC85B11` 是 RFC 6455 第 4.2.2 节规定的固定值,用于防止跨协议攻击。
### 4.4 握手时的进阶用法
**添加自定义响应头**(如 Cookie、Token 等):
```c
mg_ws_upgrade(c, hm, "Set-Cookie: token=%s\r\nX-User: %s\r\n", token, username);
```
**子协议协商**
```c
struct mg_str *proto = mg_http_get_header(hm, "Sec-WebSocket-Protocol");
// Mongoose 会自动回显匹配的协议,也可手动处理
mg_ws_upgrade(c, hm, NULL);
```
## 五、帧格式详解
### 5.1 RFC 6455 帧结构
```
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
```
### 5.2 Mongoose 的帧头构建 (`mkhdr`, 第 96 行)
```c
static size_t mkhdr(size_t len, int op, bool is_client, uint8_t *buf) {
size_t n = 0;
buf[0] = (uint8_t) (op | 128); // FIN=1, Opcode=op
if (len < 126) { // 7位长度足够
buf[1] = (unsigned char) len;
n = 2;
} else if (len < 65536) { // 16位扩展长度
uint16_t tmp = mg_htons((uint16_t) len);
buf[1] = 126;
memcpy(&buf[2], &tmp, sizeof(tmp));
n = 4;
} else { // 64位扩展长度
buf[1] = 127;
// 先写高32位再写低32位
tmp = mg_htonl((uint32_t) (((uint64_t) len) >> 32));
memcpy(&buf[2], &tmp, sizeof(tmp));
tmp = mg_htonl((uint32_t) (len & 0xffffffffU));
memcpy(&buf[6], &tmp, sizeof(tmp));
n = 10;
}
// 客户端帧需要掩码
if (is_client) {
buf[1] |= 1 << 7; // 设置 MASK 位
mg_random(&buf[n], 4); // 生成随机掩码 key
n += 4;
}
return n;
}
```
**注意****服务端发送的帧不需要掩码**RFC 6455 第 5.1 节。只有客户端发送到服务端的帧才需要掩码。Mongoose 通过 `is_client` 标志来控制。
### 5.3 帧解析 (`ws_process`, 第 66 行)
```c
static size_t ws_process(uint8_t *buf, size_t len, struct ws_msg *msg) {
memset(msg, 0, sizeof(*msg));
if (len >= 2) {
n = buf[1] & 0x7f; // 7位载荷长度
mask_len = buf[1] & 128 ? 4 : 0; // MASK 位 → 掩码 key 长度
msg->flags = buf[0];
if (n < 126 && len >= mask_len) {
msg->data_len = n;
msg->header_len = 2 + mask_len;
} else if (n == 126 && len >= 4 + mask_len) {
msg->header_len = 4 + mask_len;
msg->data_len = (((size_t) buf[2]) << 8) | buf[3]; // 16位长度
} else if (len >= 10 + mask_len) {
msg->header_len = 10 + mask_len;
msg->data_len = be32(buf+2)<<32 | be32(buf+6); // 64位长度
}
}
// 安全检查:数据长度不能超过 1GB
if (msg->data_len > 1024 * 1024 * 1024) return 0;
if (msg->header_len + msg->data_len > len) return 0; // 数据不完整
// 如果有掩码,解码
if (mask_len > 0) {
uint8_t *p = buf + msg->header_len, *m = p - mask_len;
for (i = 0; i < msg->data_len; i++) p[i] ^= m[i & 3]; // XOR 解码
}
return msg->header_len + msg->data_len;
}
```
### 5.4 掩码处理
**为什么需要掩码**RFC 6455 要求客户端发往服务端的所有帧必须掩码。这是为了防止"缓存投毒攻击"——恶意脚本通过浏览器发送精心构造的 WebSocket 帧,可能被中间代理缓存误解为 HTTP 请求。
**解码算法**(异或):
```
for (i = 0; i < data_len; i++)
payload[i] = payload[i] ^ masking_key[i % 4]
```
**发送时的掩码**`mg_ws_mask`, ws.c 第 124 行):
```c
static void mg_ws_mask(struct mg_connection *c, size_t len) {
if (c->is_client && c->send.buf != NULL) {
uint8_t *p = c->send.buf + c->send.len - len, *mask = p - 4;
for (i = 0; i < len; i++) p[i] ^= mask[i & 3];
}
}
```
只在客户端模式(`c->is_client == true`)时对数据执行掩码。服务端不需要。
## 六、协议处理器 `mg_ws_cb` 详解ws.c 第 166 行)
这是 WebSocket 连接的核心处理器,注册为 `c->pfn`,处理所有接收到的帧:
```
mg_ws_cb 处理流程MG_EV_READ 事件时):
1. 客户端模式:检查握手是否完成
→ 未完成:调用 mg_ws_client_handshake() 验证 HTTP 101 响应
→ 完成:继续解析帧
2. 循环解析帧:
while (ws_process() 成功解析一帧) {
3. 根据操作码分类处理:
┌─ WEBSOCKET_OP_CONTINUE (0):
│ 分片帧 → 触发 MG_EV_WS_CTL
├─ WEBSOCKET_OP_TEXT (1) / WEBSOCKET_OP_BINARY (2):
│ 如果 FIN=1 (完整帧):
│ → 触发 MG_EV_WS_MSG用户在此接收消息
│ 如果 FIN=0 (分片开始):
│ → 不触发事件(等待后续帧组装)
├─ WEBSOCKET_OP_CLOSE (8):
│ → 触发 MG_EV_WS_CTL
│ → 回显 CLOSE 帧给对端
│ → 设置 c->is_draining = 1优雅关闭
├─ WEBSOCKET_OP_PING (9):
│ → 自动回复 PONG
│ → 触发 MG_EV_WS_CTL通知用户
├─ WEBSOCKET_OP_PONG (10):
│ → 触发 MG_EV_WS_CTL用户可检测心跳响应
└─ 未知操作码:
→ mg_error() 关闭连接
4. 分片帧处理ws.c 第 215-233 行):
如果 FIN=0 或 op=CONTINUE:
→ 第一条分片帧:保留 1 字节 op 标记
→ 后续帧:剥离帧头,保留数据
→ 所有分片数据累积在 c->recv 中
如果 FIN=1 且 op=CONTINUE:
→ 分片结束,触发 MG_EV_WS_MSG
→ 从 c->recv 中删除完整消息
}
```
### 分片帧的处理细节
Mongoose 支持分片帧的自动组装:
```
客户端发送三条分片帧:
Frame 1: FIN=0, op=TEXT, data="Hello "
Frame 2: FIN=0, op=CONTINUE, data="World"
Frame 3: FIN=1, op=CONTINUE, data="!"
Mongoose 内部处理:
1. 收到 Frame 1:
→ 保留 flags 在 c->recv 中 (buf[0]=0x01 TEXT)
→ 剥离帧头,数据变为 "\x01Hello "
→ ofs 追踪到数据末尾
2. 收到 Frame 2:
→ 剥离帧头,数据追加 → "\x01Hello World"
→ ofs 更新
3. 收到 Frame 3:
→ 剥离帧头,数据追加 → "\x01Hello World!"
→ FIN=1, op=CONTINUE: 触发 MG_EV_WS_MSG
→ m.flags = c->recv.buf[0] (TEXT)
→ m.data = "Hello World!" (跳过第1字节的标记)
→ 删除已处理数据
```
## 七、发送函数详解
### 7.1 `mg_ws_send()` — 发送一条完整消息(第 132 行)
```c
size_t mg_ws_send(struct mg_connection *c, const void *buf, size_t len, int op);
```
流程:
1. 调用 `mkhdr()` 构建帧头
2. 发送帧头(通过 `mg_send` 写入 `c->send` 缓冲区)
3. 发送数据
4. 如果是客户端模式 → 对数据执行掩码
```c
// 使用示例
mg_ws_send(c, "hello", 5, WEBSOCKET_OP_TEXT); // 发送文本
mg_ws_send(c, data, len, WEBSOCKET_OP_BINARY); // 发送二进制
mg_ws_send(c, NULL, 0, WEBSOCKET_OP_PING); // 发送 Ping
```
### 7.2 `mg_ws_printf()` — 格式化发送(第 26 行)
```c
size_t mg_ws_printf(struct mg_connection *c, int op, const char *fmt, ...);
// 使用示例
mg_ws_printf(c, WEBSOCKET_OP_TEXT, "{\"temp\":%.2f,\"unit\":\"%s\"}", 23.5, "C");
```
内部调用 `mg_vxprintf()` 格式化到 `c->send`,然后调用 `mg_ws_wrap()` 添加帧头。
### 7.3 `mg_ws_wrap()` — 为已有数据添加帧头(第 289 行)
```c
size_t mg_ws_wrap(struct mg_connection *c, size_t len, int op);
```
这个是内部函数,用于为已经在 `c->send` 中的数据添加 WebSocket 帧头。适用场景:先写入 JSON 数据到 send 缓冲区,再包装成 WS 帧。
```c
// mg_ws_printf 的内部流程:
c->send.len mg_vxprintf 写入 JSON 数据
mg_ws_wrap 在前面插入帧头
mg_ws_mask 如果是客户端则掩码
```
## 八、服务端完整事件处理最佳实践
### 8.1 标准事件处理模板
```c
static void fn(struct mg_connection *c, int ev, void *ev_data) {
// 1. TLS 初始化(如果需要 WSS
if (ev == MG_EV_ACCEPT) {
struct mg_tls_opts opts = {
.cert = mg_str(s_cert_pem),
.key = mg_str(s_key_pem),
};
mg_tls_init(c, &opts);
}
// 2. WebSocket 握手
if (ev == MG_EV_HTTP_MSG) {
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
struct mg_str *ws_key = mg_http_get_header(hm, "Sec-WebSocket-Key");
if (ws_key != NULL) {
// 可以在此做认证
// struct mg_str *token = mg_http_get_header(hm, "Authorization");
mg_ws_upgrade(c, hm, NULL); // 升级到 WebSocket
} else {
mg_http_reply(c, 200, "", "Use WebSocket\n");
}
}
// 3. WebSocket 连接建立
if (ev == MG_EV_WS_OPEN) {
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
// hm 是发起升级的原始 HTTP 请求,可以获取 URI、Cookie 等
MG_INFO(("WebSocket connected, URI: %.*s", hm->uri.len, hm->uri.buf));
}
// 4. 接收 WebSocket 消息
if (ev == MG_EV_WS_MSG) {
struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
uint8_t op = wm->flags & 15;
if (op == WEBSOCKET_OP_TEXT) {
// 文本消息
MG_INFO(("TEXT: %.*s", wm->data.len, wm->data.buf));
// 回显
mg_ws_send(c, wm->data.buf, wm->data.len, WEBSOCKET_OP_TEXT);
} else if (op == WEBSOCKET_OP_BINARY) {
// 二进制消息
MG_INFO(("BINARY: %zu bytes", wm->data.len));
// 处理二进制数据
}
}
// 5. 控制帧通知
if (ev == MG_EV_WS_CTL) {
struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
uint8_t op = wm->flags & 15;
if (op == WEBSOCKET_OP_PING) {
MG_DEBUG(("Ping received"));
// Ping 已被 Mongoose 自动回复 Pong
} else if (op == WEBSOCKET_OP_PONG) {
MG_DEBUG(("Pong received"));
// 可用于检测客户端存活
} else if (op == WEBSOCKET_OP_CLOSE) {
MG_INFO(("Client sent close"));
}
}
// 6. 连接关闭
if (ev == MG_EV_CLOSE) {
MG_INFO(("WebSocket disconnected"));
}
}
```
### 8.2 广播消息给多个客户端
```c
static void broadcast(struct mg_mgr *mgr, const char *msg, size_t len) {
struct mg_connection *c;
for (c = mgr->conns; c != NULL; c = c->next) {
if (c->is_websocket && !c->is_listening) { // 只发给 WS 客户端
mg_ws_send(c, msg, len, WEBSOCKET_OP_TEXT);
}
}
}
```
### 8.3 心跳检测
```c
// 定时发送 Ping 帧检测客户端是否存活
static void heartbeat_timer(void *arg) {
struct mg_mgr *mgr = (struct mg_mgr *) arg;
struct mg_connection *c;
for (c = mgr->conns; c != NULL; c = c->next) {
if (c->is_websocket && !c->is_listening) {
mg_ws_send(c, NULL, 0, WEBSOCKET_OP_PING);
}
}
}
// 在 main 中添加定时器
mg_timer_add(&mgr, 30000, MG_TIMER_REPEAT, heartbeat_timer, &mgr);
// 在事件处理器中检测 Pong
if (ev == MG_EV_WS_CTL) {
struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
if ((wm->flags & 15) == WEBSOCKET_OP_PONG) {
// 客户端存活确认
}
}
```
## 九、服务端 vs 客户端差异总结
| 特性 | 服务端 | 客户端 |
|------|--------|--------|
| 帧掩码 | **不掩码** | **必须掩码**4 字节随机 key + XOR |
| 握手方式 | `mg_ws_upgrade()` | `mg_ws_connect()` |
| 握手角色 | 接收 `Sec-WebSocket-Key`,计算 `Accept` | 生成随机 `Key`,验证 `Accept` |
| Ping/Pong | 可主动发送 | 可主动发送 |
| 关闭帧 | 收到后回显 | 收到后回显 |
| `is_client` | `false` | `true`(由 `mg_connect` 设置) |
## 十、调试与诊断
### 启用 hex dump
```c
// 在 MG_EV_ACCEPT 或 MG_EV_WS_OPEN 中设置
c->is_hexdumping = 1;
```
这会将 WebSocket 帧的原始字节打印到日志中(通过 `mg_hexdump()`)。
### 常见问题排查
1. **客户端收到 "not http" 错误**:握手 URL 没有使用 `http://``https://` 前缀
2. **握手被拒绝**:检查 `Sec-WebSocket-Key` 是否存在URL 路径是否正确
3. **消息收到乱码**检查客户端是否正确掩码操作码是否正确TEXT=1, BINARY=2
4. **内存泄漏**:确保 `mg_mgr_poll()` 被循环调用,否则连接不会释放
## 十一、MQTT over WebSocket
Mongoose 支持 MQTT over WebSocket。当 MQTT 连接 URL 使用 `mqtt://` 方案且底层升级为 WebSocket 时MQTT 模块会自动使用 WebSocket 帧封装 MQTT 包。这是服务端常用的场景——通过 WebSocket 传输 MQTT 协议。