切换主题
集群部署
服务支持通过 MQ 实现跨实例的广播与定向推送,适用于水平扩展场景。
开启集群
- 配置
websocket.cluster-enabled: true
- 选择 MQ:Kafka 或 RabbitMQ,并完成对应配置。
配置示例
Kafka 示例(application.yaml):
yaml
websocket:
cluster-enabled: true
slow-client-strategy: drop
heartbeat-content: "ping"
mq:
type: kafka
# Kafka配置
kafka:
brokers: ["192.168.3.181:9092"] # Kafka broker地址列表
topic: "websocket" # 服务端广播主题
client-topic: "websocket-client" # 客户端消息主题(上行消息)
group-id: "websocket-consumer" # 消费者组ID(非广播时所有实例共享)
broadcast: true # 是否广播:true 时每个实例使用唯一消费者组ID
# 生产者配置
producer:
acks: "all" # 确认机制: all, 1, 0
retries: 3 # 重试次数
batch-size: 16384 # 批次大小
linger-ms: 5 # 等待时间(毫秒)
buffer-memory: 33554432 # 缓冲区大小(字节)
# 消费者配置
consumer:
auto-commit: true # 自动提交
auto-offset-reset: "earliest" # 偏移量重置策略: earliest, latest
session-timeout-ms: 30000 # 会话超时时间(毫秒)
heartbeat-interval-ms: 3000 # 心跳间隔(毫秒)
RabbitMQ 示例(application.yaml):
yaml
websocket:
cluster-enabled: true
slow-client-strategy: block
heartbeat-content: "ping"
mq:
type: rabbitmq
# RabbitMQ配置
rabbitmq:
url: "amqp://wueasy:1QAZ2wsx@192.168.3.181:5672/" # RabbitMQ连接URL
exchange: "websocket" # 交换机名称(建议 direct 分流广播与客户端)
routing-key: "ws.broadcast" # 广播路由键(WebSocket 集群消费绑定此键)
client-routing-key: "ws.client" # 客户端消息路由键(其他服务队列绑定此键)
queue: "" # 队列名称,留空将生成安全自定义队列(避免 amq.* 保留前缀),用于区分实例,确保每实例独立消费
# 交换机配置
exchange-type: "direct" # 交换机类型: direct, fanout, topic(建议 direct:使用不同路由键实现分流)
durable: true # 持久化
auto-delete: false # 自动删除
# 队列配置
queue-durable: true # 队列持久化
queue-auto-delete: false # 队列自动删除
# 消息配置
delivery-mode: 1 # 消息持久化: 1(非持久), 2(持久)
priority: 0 # 消息优先级
expiration: "" # 消息过期时间
Kafka 配置要点
brokers
:集群地址topic
:WebSocket 消息主题(下行)clientTopic
:客户端上行消息主题(可选)groupId
:消费组;开启broadcast=true
时,各实例使用唯一组ID实现广播consumer
:autoCommit
、offsetReset
、sessionTimeout
、heartbeatInterval
建议:
- 分区键:根据
targetType
使用userId
或channel
作为 key,以提升同键消息的有序性。 - 广播到所有实例:
groupId
需保证每实例唯一(不同groupId
),避免同组“仅一次消费”的语义。 - 顺序保证:Kafka 仅对同分区内消息保证顺序;跨分区不保证。
RabbitMQ 配置要点
url
:连接地址exchange
:交换机(下行消息)routingKey
:下行消息路由键(all/user/channel 建议同一键,服务端区分)clientRoutingKey
:上行消息路由键(可选)queue
:消费队列,声明并绑定到交换机+路由键exchangeType
:建议direct
- 其他:
durable
、autoDelete
、deliveryMode
、priority
、expiration
建议:
- 路由策略:统一使用一个
routingKey
(如ws.downlink
),在载荷中通过targetType
区分,简化拓扑。 - 有序性:RabbitMQ 默认不保证跨队列/路由键的全局顺序;必要时使用单一队列+单消费者并发控制。
运行机制
- 发布:当
cluster-enabled=true
时,/publish
将消息写入 MQ。 - 消费:各实例
MQConsumer
订阅并根据targetType
分发到 Hub:all
->Hub.BroadcastAll
user
->Hub.BroadcastToUser
channel
->Hub.BroadcastToChannel
temp_user
->Hub.BroadcastToTempUser
(仅临时链接接收)
消息载荷格式(下行建议)
{
"targetType": "all" | "user" | "channel" | "temp_user",
"target": "userId 或 channelId(all 时可为空)",
"data": "消息内容或对象",
"headers": {"contentType":"text/plain","traceId":"..."},
"messageId": "唯一ID(用于幂等去重)",
"timestamp": 1730800000000
}
有序性与幂等性
- 有序性:仅在“同分区/同队列、同键”的维度可期;跨键/跨分区不保证。
- 幂等性:建议提供
messageId
,在消费者/Hub 层可选去重,避免重复投递导致副作用。 - 重复可能:网络重试、消费再平衡等场景会产生重复消息,业务需容忍或做幂等处理。
心跳与慢客户端
heartbeat-content
:服务端定期发送心跳。slow-client-strategy
:drop
或block
;决定处理发送队列满的策略。
扩展:
- 背压:当 Hub 发现大量慢客户端,建议降低消费速率(Kafka 降并发/RabbitMQ 降 prefetch),避免堆积。
- 队列大小:根据峰值流量与客户端能力调整
send
缓冲大小与丢弃策略(drop
更适用于大规模广播)。 - 临时链接:
temp_user
消息仅投递至临时链接,不影响正常链接的吞吐与背压判断。