RTU/claude/工程/websocket-server.md

19 KiB
Raw Blame History

WebSocket 服务端深度分析

一、概述

Mongoose 的 WebSocket 实现位于 src/ws.c~302 行),基于 RFC 6455 规范,同时支持服务端和客户端。本文档聚焦服务端的使用和内部实现。

核心流程

HTTP 请求到达 (Upgrade: websocket)
  → 用户处理器检测到 WebSocket 升级请求
  → 调用 mg_ws_upgrade() 完成握手
  → pfn 切换为 mg_ws_cbWebSocket 协议处理器)
  → 后续消息通过 MG_EV_WS_MSG 事件传递
  → mg_ws_send() 发送帧

二、数据结构

帧操作码Opcode

// 定义在 ws.h
#define WEBSOCKET_OP_CONTINUE 0   // 分片帧的后续帧
#define WEBSOCKET_OP_TEXT     1   // 文本帧UTF-8
#define WEBSOCKET_OP_BINARY   2   // 二进制帧
#define WEBSOCKET_OP_CLOSE    8   // 关闭连接
#define WEBSOCKET_OP_PING     9   // 心跳请求
#define WEBSOCKET_OP_PONG     10  // 心跳响应

WebSocket 消息结构

// 定义在 ws.h 第 12-15 行
struct mg_ws_message {
  struct mg_str data;   // 消息数据(引用 c->recv 缓冲区,零拷贝)
  uint8_t flags;        // 帧标志字节FIN + Opcode
};

flags 字节的高位是 FIN 标志bit 7低 4 位是操作码:

flags = 0b1xxx_xxxx  → FIN=1最后一帧
flags = 0b0xxx_xxxx  → FIN=0还有后续帧
flags & 15            → 操作码0-15

内部帧解析结构ws.c 第 12-16 行)

struct ws_msg {
  uint8_t flags;       // 帧标志
  size_t header_len;   // 帧头长度(含掩码 key
  size_t data_len;     // 数据长度
};

三、WebSocket 服务端完整使用流程

3.1 最小示例

#include "mongoose.h"

// 统一的 HTTP + WebSocket 事件处理函数
static void fn(struct mg_connection *c, int ev, void *ev_data) {
  if (ev == MG_EV_HTTP_MSG) {
    struct mg_http_message *hm = (struct mg_http_message *) ev_data;
    // 判断是否是 WebSocket 升级请求
    if (mg_http_get_header(hm, "Sec-WebSocket-Key")) {
      mg_ws_upgrade(c, hm, NULL);  // 执行握手,切换到 WS 模式
    } else {
      // 普通 HTTP 请求处理
      mg_http_reply(c, 200, "", "hello\n");
    }
  } else if (ev == MG_EV_WS_MSG) {
    // WebSocket 消息到达(握手完成后)
    struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
    // 判断消息类型
    if (wm->flags & WEBSOCKET_OP_TEXT) {
      // 文本消息:回显
      mg_ws_send(c, wm->data.buf, wm->data.len, WEBSOCKET_OP_TEXT);
    } else if (wm->flags & WEBSOCKET_OP_BINARY) {
      // 二进制消息处理
    }
  } else if (ev == MG_EV_CLOSE) {
    // 连接关闭(包括 WebSocket 关闭)
  }
}

int main() {
  struct mg_mgr mgr;
  mg_mgr_init(&mgr);
  mg_http_listen(&mgr, "http://0.0.0.0:8000", fn, NULL);
  for (;;) mg_mgr_poll(&mgr, 1000);
}

3.2 完整事件流

连接建立:
  MG_EV_OPEN     → 连接创建

HTTP 阶段:
  MG_EV_ACCEPT   → 连接被接受(可在此时初始化 TLS
  MG_EV_READ     → 数据到达
  MG_EV_HTTP_MSG → HTTP 请求完整接收
    → 用户检测 Sec-WebSocket-Key 头部
    → 调用 mg_ws_upgrade(c, hm, NULL)

WebSocket 握手:
  mg_ws_upgrade() 内部:
    → 计算 Sec-WebSocket-Accept
    → 发送 HTTP 101 响应
    → c->pfn = mg_ws_cb
    → c->is_websocket = 1
    → 触发 MG_EV_WS_OPEN

WebSocket 通信阶段:
  MG_EV_WS_MSG   → 文本/二进制消息到达
  MG_EV_WS_CTL   → 控制帧到达Ping/Pong/Close
  MG_EV_READ     → 原始数据到达ws_cb 内部处理)
  MG_EV_WRITE    → 数据写入完成

连接关闭:
  MG_EV_CLOSE    → 连接关闭

四、握手过程详解 (mg_ws_upgrade)

4.1 函数签名ws.c 第 269 行)

void mg_ws_upgrade(struct mg_connection *c, struct mg_http_message *hm,
                   const char *fmt, ...);

4.2 内部实现

mg_ws_upgrade():
  1. 从 HTTP 头部提取 "Sec-WebSocket-Key"
     → 如果不存在,返回 426 Upgrade Required 错误
  2. 可选:提取 "Sec-WebSocket-Protocol"(子协议协商)
  3. 调用 ws_handshake() 生成握手响应
  4. 设置 c->pfn = mg_ws_cb切换协议处理器
  5. 设置 c->is_websocket = 1
  6. 设置 c->is_resp = 0标记响应完成
  7. 触发 MG_EV_WS_OPEN 事件

4.3 握手计算 (ws_handshake, 第 35 行)

static void ws_handshake(struct mg_connection *c, const struct mg_str *wskey,
                         const struct mg_str *wsproto, const char *fmt,
                         va_list *ap) {
  const char *magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";  // RFC 6455 魔数
  unsigned char sha[20], b64_sha[30];

  // 1. SHA1(client_key + magic)
  mg_sha1_ctx sha_ctx;
  mg_sha1_init(&sha_ctx);
  mg_sha1_update(&sha_ctx, (unsigned char *) wskey->buf, wskey->len);
  mg_sha1_update(&sha_ctx, (unsigned char *) magic, 36);
  mg_sha1_final(sha, &sha_ctx);

  // 2. Base64 编码 SHA1 结果
  mg_base64_encode(sha, sizeof(sha), (char *) b64_sha, sizeof(b64_sha));

  // 3. 构建 HTTP 101 响应
  mg_xprintf(mg_pfn_iobuf, &c->send,
             "HTTP/1.1 101 Switching Protocols\r\n"
             "Upgrade: websocket\r\n"
             "Connection: Upgrade\r\n"
             "Sec-WebSocket-Accept: %s\r\n",
             b64_sha);

  // 4. 添加用户自定义响应头(通过 fmt 参数)
  if (fmt != NULL) mg_vxprintf(mg_pfn_iobuf, &c->send, fmt, &ap);

  // 5. 可选的子协议响应头
  if (wsproto != NULL) {
    mg_printf(c, "Sec-WebSocket-Protocol: %.*s\r\n", ...);
  }

  // 6. 结束响应头
  mg_send(c, "\r\n", 2);
}

关键常量:魔数 258EAFA5-E914-47DA-95CA-C5AB0DC85B11 是 RFC 6455 第 4.2.2 节规定的固定值,用于防止跨协议攻击。

4.4 握手时的进阶用法

添加自定义响应头(如 Cookie、Token 等):

mg_ws_upgrade(c, hm, "Set-Cookie: token=%s\r\nX-User: %s\r\n", token, username);

子协议协商

struct mg_str *proto = mg_http_get_header(hm, "Sec-WebSocket-Protocol");
// Mongoose 会自动回显匹配的协议,也可手动处理
mg_ws_upgrade(c, hm, NULL);

五、帧格式详解

5.1 RFC 6455 帧结构

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+

5.2 Mongoose 的帧头构建 (mkhdr, 第 96 行)

static size_t mkhdr(size_t len, int op, bool is_client, uint8_t *buf) {
  size_t n = 0;
  buf[0] = (uint8_t) (op | 128);      // FIN=1, Opcode=op
  if (len < 126) {                     // 7位长度足够
    buf[1] = (unsigned char) len;
    n = 2;
  } else if (len < 65536) {            // 16位扩展长度
    uint16_t tmp = mg_htons((uint16_t) len);
    buf[1] = 126;
    memcpy(&buf[2], &tmp, sizeof(tmp));
    n = 4;
  } else {                             // 64位扩展长度
    buf[1] = 127;
    // 先写高32位再写低32位
    tmp = mg_htonl((uint32_t) (((uint64_t) len) >> 32));
    memcpy(&buf[2], &tmp, sizeof(tmp));
    tmp = mg_htonl((uint32_t) (len & 0xffffffffU));
    memcpy(&buf[6], &tmp, sizeof(tmp));
    n = 10;
  }
  // 客户端帧需要掩码
  if (is_client) {
    buf[1] |= 1 << 7;               // 设置 MASK 位
    mg_random(&buf[n], 4);           // 生成随机掩码 key
    n += 4;
  }
  return n;
}

注意服务端发送的帧不需要掩码RFC 6455 第 5.1 节。只有客户端发送到服务端的帧才需要掩码。Mongoose 通过 is_client 标志来控制。

5.3 帧解析 (ws_process, 第 66 行)

static size_t ws_process(uint8_t *buf, size_t len, struct ws_msg *msg) {
  memset(msg, 0, sizeof(*msg));
  if (len >= 2) {
    n = buf[1] & 0x7f;                // 7位载荷长度
    mask_len = buf[1] & 128 ? 4 : 0;  // MASK 位 → 掩码 key 长度
    msg->flags = buf[0];

    if (n < 126 && len >= mask_len) {
      msg->data_len = n;
      msg->header_len = 2 + mask_len;
    } else if (n == 126 && len >= 4 + mask_len) {
      msg->header_len = 4 + mask_len;
      msg->data_len = (((size_t) buf[2]) << 8) | buf[3];  // 16位长度
    } else if (len >= 10 + mask_len) {
      msg->header_len = 10 + mask_len;
      msg->data_len = be32(buf+2)<<32 | be32(buf+6);       // 64位长度
    }
  }
  // 安全检查:数据长度不能超过 1GB
  if (msg->data_len > 1024 * 1024 * 1024) return 0;
  if (msg->header_len + msg->data_len > len) return 0;  // 数据不完整

  // 如果有掩码,解码
  if (mask_len > 0) {
    uint8_t *p = buf + msg->header_len, *m = p - mask_len;
    for (i = 0; i < msg->data_len; i++) p[i] ^= m[i & 3];  // XOR 解码
  }
  return msg->header_len + msg->data_len;
}

5.4 掩码处理

为什么需要掩码RFC 6455 要求客户端发往服务端的所有帧必须掩码。这是为了防止"缓存投毒攻击"——恶意脚本通过浏览器发送精心构造的 WebSocket 帧,可能被中间代理缓存误解为 HTTP 请求。

解码算法(异或):

for (i = 0; i < data_len; i++)
  payload[i] = payload[i] ^ masking_key[i % 4]

发送时的掩码mg_ws_mask, ws.c 第 124 行):

static void mg_ws_mask(struct mg_connection *c, size_t len) {
  if (c->is_client && c->send.buf != NULL) {
    uint8_t *p = c->send.buf + c->send.len - len, *mask = p - 4;
    for (i = 0; i < len; i++) p[i] ^= mask[i & 3];
  }
}

只在客户端模式(c->is_client == true)时对数据执行掩码。服务端不需要。

六、协议处理器 mg_ws_cb 详解ws.c 第 166 行)

这是 WebSocket 连接的核心处理器,注册为 c->pfn,处理所有接收到的帧:

mg_ws_cb 处理流程MG_EV_READ 事件时):

  1. 客户端模式:检查握手是否完成
     → 未完成:调用 mg_ws_client_handshake() 验证 HTTP 101 响应
     → 完成:继续解析帧

  2. 循环解析帧:
     while (ws_process() 成功解析一帧) {

       3. 根据操作码分类处理:

          ┌─ WEBSOCKET_OP_CONTINUE (0):
          │    分片帧 → 触发 MG_EV_WS_CTL
          │
          ├─ WEBSOCKET_OP_TEXT (1) / WEBSOCKET_OP_BINARY (2):
          │    如果 FIN=1 (完整帧):
          │      → 触发 MG_EV_WS_MSG用户在此接收消息
          │    如果 FIN=0 (分片开始):
          │      → 不触发事件(等待后续帧组装)
          │
          ├─ WEBSOCKET_OP_CLOSE (8):
          │    → 触发 MG_EV_WS_CTL
          │    → 回显 CLOSE 帧给对端
          │    → 设置 c->is_draining = 1优雅关闭
          │
          ├─ WEBSOCKET_OP_PING (9):
          │    → 自动回复 PONG
          │    → 触发 MG_EV_WS_CTL通知用户
          │
          ├─ WEBSOCKET_OP_PONG (10):
          │    → 触发 MG_EV_WS_CTL用户可检测心跳响应
          │
          └─ 未知操作码:
               → mg_error() 关闭连接

       4. 分片帧处理ws.c 第 215-233 行):
          如果 FIN=0 或 op=CONTINUE:
            → 第一条分片帧:保留 1 字节 op 标记
            → 后续帧:剥离帧头,保留数据
            → 所有分片数据累积在 c->recv 中
          如果 FIN=1 且 op=CONTINUE:
            → 分片结束,触发 MG_EV_WS_MSG
            → 从 c->recv 中删除完整消息
     }

分片帧的处理细节

Mongoose 支持分片帧的自动组装:

客户端发送三条分片帧:
  Frame 1: FIN=0, op=TEXT,    data="Hello "
  Frame 2: FIN=0, op=CONTINUE, data="World"
  Frame 3: FIN=1, op=CONTINUE, data="!"

Mongoose 内部处理:
  1. 收到 Frame 1: 
     → 保留 flags 在 c->recv 中 (buf[0]=0x01 TEXT)
     → 剥离帧头,数据变为 "\x01Hello "
     → ofs 追踪到数据末尾
  2. 收到 Frame 2:
     → 剥离帧头,数据追加 → "\x01Hello World"
     → ofs 更新
  3. 收到 Frame 3:
     → 剥离帧头,数据追加 → "\x01Hello World!"
     → FIN=1, op=CONTINUE: 触发 MG_EV_WS_MSG
     → m.flags = c->recv.buf[0] (TEXT)
     → m.data = "Hello World!" (跳过第1字节的标记)
     → 删除已处理数据

七、发送函数详解

7.1 mg_ws_send() — 发送一条完整消息(第 132 行)

size_t mg_ws_send(struct mg_connection *c, const void *buf, size_t len, int op);

流程:

  1. 调用 mkhdr() 构建帧头
  2. 发送帧头(通过 mg_send 写入 c->send 缓冲区)
  3. 发送数据
  4. 如果是客户端模式 → 对数据执行掩码
// 使用示例
mg_ws_send(c, "hello", 5, WEBSOCKET_OP_TEXT);       // 发送文本
mg_ws_send(c, data, len, WEBSOCKET_OP_BINARY);       // 发送二进制
mg_ws_send(c, NULL, 0, WEBSOCKET_OP_PING);           // 发送 Ping

7.2 mg_ws_printf() — 格式化发送(第 26 行)

size_t mg_ws_printf(struct mg_connection *c, int op, const char *fmt, ...);

// 使用示例
mg_ws_printf(c, WEBSOCKET_OP_TEXT, "{\"temp\":%.2f,\"unit\":\"%s\"}", 23.5, "C");

内部调用 mg_vxprintf() 格式化到 c->send,然后调用 mg_ws_wrap() 添加帧头。

7.3 mg_ws_wrap() — 为已有数据添加帧头(第 289 行)

size_t mg_ws_wrap(struct mg_connection *c, size_t len, int op);

这个是内部函数,用于为已经在 c->send 中的数据添加 WebSocket 帧头。适用场景:先写入 JSON 数据到 send 缓冲区,再包装成 WS 帧。

// mg_ws_printf 的内部流程:
c->send.len   mg_vxprintf 写入 JSON 数据
               mg_ws_wrap 在前面插入帧头
               mg_ws_mask 如果是客户端则掩码

八、服务端完整事件处理最佳实践

8.1 标准事件处理模板

static void fn(struct mg_connection *c, int ev, void *ev_data) {
  // 1. TLS 初始化(如果需要 WSS
  if (ev == MG_EV_ACCEPT) {
    struct mg_tls_opts opts = {
      .cert = mg_str(s_cert_pem),
      .key  = mg_str(s_key_pem),
    };
    mg_tls_init(c, &opts);
  }

  // 2. WebSocket 握手
  if (ev == MG_EV_HTTP_MSG) {
    struct mg_http_message *hm = (struct mg_http_message *) ev_data;
    struct mg_str *ws_key = mg_http_get_header(hm, "Sec-WebSocket-Key");
    if (ws_key != NULL) {
      // 可以在此做认证
      // struct mg_str *token = mg_http_get_header(hm, "Authorization");
      mg_ws_upgrade(c, hm, NULL);  // 升级到 WebSocket
    } else {
      mg_http_reply(c, 200, "", "Use WebSocket\n");
    }
  }

  // 3. WebSocket 连接建立
  if (ev == MG_EV_WS_OPEN) {
    struct mg_http_message *hm = (struct mg_http_message *) ev_data;
    // hm 是发起升级的原始 HTTP 请求,可以获取 URI、Cookie 等
    MG_INFO(("WebSocket connected, URI: %.*s", hm->uri.len, hm->uri.buf));
  }

  // 4. 接收 WebSocket 消息
  if (ev == MG_EV_WS_MSG) {
    struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
    uint8_t op = wm->flags & 15;
    if (op == WEBSOCKET_OP_TEXT) {
      // 文本消息
      MG_INFO(("TEXT: %.*s", wm->data.len, wm->data.buf));
      // 回显
      mg_ws_send(c, wm->data.buf, wm->data.len, WEBSOCKET_OP_TEXT);
    } else if (op == WEBSOCKET_OP_BINARY) {
      // 二进制消息
      MG_INFO(("BINARY: %zu bytes", wm->data.len));
      // 处理二进制数据
    }
  }

  // 5. 控制帧通知
  if (ev == MG_EV_WS_CTL) {
    struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
    uint8_t op = wm->flags & 15;
    if (op == WEBSOCKET_OP_PING) {
      MG_DEBUG(("Ping received"));
      // Ping 已被 Mongoose 自动回复 Pong
    } else if (op == WEBSOCKET_OP_PONG) {
      MG_DEBUG(("Pong received"));
      // 可用于检测客户端存活
    } else if (op == WEBSOCKET_OP_CLOSE) {
      MG_INFO(("Client sent close"));
    }
  }

  // 6. 连接关闭
  if (ev == MG_EV_CLOSE) {
    MG_INFO(("WebSocket disconnected"));
  }
}

8.2 广播消息给多个客户端

static void broadcast(struct mg_mgr *mgr, const char *msg, size_t len) {
  struct mg_connection *c;
  for (c = mgr->conns; c != NULL; c = c->next) {
    if (c->is_websocket && !c->is_listening) {  // 只发给 WS 客户端
      mg_ws_send(c, msg, len, WEBSOCKET_OP_TEXT);
    }
  }
}

8.3 心跳检测

// 定时发送 Ping 帧检测客户端是否存活
static void heartbeat_timer(void *arg) {
  struct mg_mgr *mgr = (struct mg_mgr *) arg;
  struct mg_connection *c;
  for (c = mgr->conns; c != NULL; c = c->next) {
    if (c->is_websocket && !c->is_listening) {
      mg_ws_send(c, NULL, 0, WEBSOCKET_OP_PING);
    }
  }
}

// 在 main 中添加定时器
mg_timer_add(&mgr, 30000, MG_TIMER_REPEAT, heartbeat_timer, &mgr);

// 在事件处理器中检测 Pong
if (ev == MG_EV_WS_CTL) {
  struct mg_ws_message *wm = (struct mg_ws_message *) ev_data;
  if ((wm->flags & 15) == WEBSOCKET_OP_PONG) {
    // 客户端存活确认
  }
}

九、服务端 vs 客户端差异总结

特性 服务端 客户端
帧掩码 不掩码 必须掩码4 字节随机 key + XOR
握手方式 mg_ws_upgrade() mg_ws_connect()
握手角色 接收 Sec-WebSocket-Key,计算 Accept 生成随机 Key,验证 Accept
Ping/Pong 可主动发送 可主动发送
关闭帧 收到后回显 收到后回显
is_client false true(由 mg_connect 设置)

十、调试与诊断

启用 hex dump

// 在 MG_EV_ACCEPT 或 MG_EV_WS_OPEN 中设置
c->is_hexdumping = 1;

这会将 WebSocket 帧的原始字节打印到日志中(通过 mg_hexdump())。

常见问题排查

  1. 客户端收到 "not http" 错误:握手 URL 没有使用 http://https:// 前缀
  2. 握手被拒绝:检查 Sec-WebSocket-Key 是否存在URL 路径是否正确
  3. 消息收到乱码检查客户端是否正确掩码操作码是否正确TEXT=1, BINARY=2
  4. 内存泄漏:确保 mg_mgr_poll() 被循环调用,否则连接不会释放

十一、MQTT over WebSocket

Mongoose 支持 MQTT over WebSocket。当 MQTT 连接 URL 使用 mqtt:// 方案且底层升级为 WebSocket 时MQTT 模块会自动使用 WebSocket 帧封装 MQTT 包。这是服务端常用的场景——通过 WebSocket 传输 MQTT 协议。