切换主题
集群部署
服务支持通过 MQ 实现跨实例的广播与定向推送,适用于水平扩展场景。
开启集群
- 配置
websocket.cluster-enabled: true - 选择 MQ:Kafka 或 RabbitMQ,并完成对应配置。
配置示例
Kafka 示例(application.yaml):
yaml
websocket:
cluster-enabled: true
slow-client-strategy: drop
heartbeat-content: "ping"
mq:
type: kafka
# Kafka配置
kafka:
brokers: ["192.168.3.181:9092"] # Kafka broker地址列表
topic: "websocket" # 服务端广播主题
client-topic: "websocket-client" # 客户端消息主题(上行消息)
group-id: "websocket-consumer" # 消费者组ID(非广播时所有实例共享)
broadcast: true # 是否广播:true 时每个实例使用唯一消费者组ID
# 生产者配置
producer:
acks: "all" # 确认机制: all, 1, 0
retries: 3 # 重试次数
batch-size: 16384 # 批次大小
linger-ms: 5 # 等待时间(毫秒)
buffer-memory: 33554432 # 缓冲区大小(字节)
# 消费者配置
consumer:
auto-commit: true # 自动提交
auto-offset-reset: "earliest" # 偏移量重置策略: earliest, latest
session-timeout-ms: 30000 # 会话超时时间(毫秒)
heartbeat-interval-ms: 3000 # 心跳间隔(毫秒)RabbitMQ 示例(application.yaml):
yaml
websocket:
cluster-enabled: true
slow-client-strategy: block
heartbeat-content: "ping"
mq:
type: rabbitmq
# RabbitMQ配置
rabbitmq:
url: "amqp://wueasy:1QAZ2wsx@192.168.3.181:5672/" # RabbitMQ连接URL
exchange: "websocket" # 交换机名称(建议 direct 分流广播与客户端)
routing-key: "ws.broadcast" # 广播路由键(WebSocket 集群消费绑定此键)
client-routing-key: "ws.client" # 客户端消息路由键(其他服务队列绑定此键)
queue: "" # 队列名称,留空将生成安全自定义队列(避免 amq.* 保留前缀),用于区分实例,确保每实例独立消费
# 交换机配置
exchange-type: "direct" # 交换机类型: direct, fanout, topic(建议 direct:使用不同路由键实现分流)
durable: true # 持久化
auto-delete: false # 自动删除
# 队列配置
queue-durable: true # 队列持久化
queue-auto-delete: false # 队列自动删除
# 消息配置
delivery-mode: 1 # 消息持久化: 1(非持久), 2(持久)
priority: 0 # 消息优先级
expiration: "" # 消息过期时间Kafka 配置要点
brokers:集群地址topic:WebSocket 消息主题(下行)clientTopic:客户端上行消息主题(可选)groupId:消费组;开启broadcast=true时,各实例使用唯一组ID实现广播consumer:autoCommit、offsetReset、sessionTimeout、heartbeatInterval
建议:
- 分区键:根据
targetType使用userId或channel作为 key,以提升同键消息的有序性。 - 广播到所有实例:
groupId需保证每实例唯一(不同groupId),避免同组“仅一次消费”的语义。 - 顺序保证:Kafka 仅对同分区内消息保证顺序;跨分区不保证。
RabbitMQ 配置要点
url:连接地址exchange:交换机(下行消息)routingKey:下行消息路由键(all/user/channel 建议同一键,服务端区分)clientRoutingKey:上行消息路由键(可选)queue:消费队列,声明并绑定到交换机+路由键exchangeType:建议direct- 其他:
durable、autoDelete、deliveryMode、priority、expiration
建议:
- 路由策略:统一使用一个
routingKey(如ws.downlink),在载荷中通过targetType区分,简化拓扑。 - 有序性:RabbitMQ 默认不保证跨队列/路由键的全局顺序;必要时使用单一队列+单消费者并发控制。
运行机制
- 发布:当
cluster-enabled=true时,/publish将消息写入 MQ。 - 消费:各实例
MQConsumer订阅并根据targetType分发到 Hub:all->Hub.BroadcastAlluser->Hub.BroadcastToUserchannel->Hub.BroadcastToChanneltemp_user->Hub.BroadcastToTempUser(仅临时链接接收)
消息载荷格式(下行建议)
{
"targetType": "all" | "user" | "channel" | "temp_user",
"target": "userId 或 channelId(all 时可为空)",
"data": "消息内容或对象",
"headers": {"contentType":"text/plain","traceId":"..."},
"messageId": "唯一ID(用于幂等去重)",
"timestamp": 1730800000000
}有序性与幂等性
- 有序性:仅在“同分区/同队列、同键”的维度可期;跨键/跨分区不保证。
- 幂等性:建议提供
messageId,在消费者/Hub 层可选去重,避免重复投递导致副作用。 - 重复可能:网络重试、消费再平衡等场景会产生重复消息,业务需容忍或做幂等处理。
心跳与慢客户端
heartbeat-content:服务端定期发送心跳。slow-client-strategy:drop或block;决定处理发送队列满的策略。
扩展:
- 背压:当 Hub 发现大量慢客户端,建议降低消费速率(Kafka 降并发/RabbitMQ 降 prefetch),避免堆积。
- 队列大小:根据峰值流量与客户端能力调整
send缓冲大小与丢弃策略(drop更适用于大规模广播)。 - 临时链接:
temp_user消息仅投递至临时链接,不影响正常链接的吞吐与背压判断。