切换主题
上行消息处理(客户端 -> 服务端 -> MQ)
本文介绍客户端通过 WebSocket 发送的上行消息如何被服务端接收并写入 MQ,以及在业务侧如何消费处理。
功能开关与基础行为
- 开启接收:在服务端配置中启用
websocket.accept-client-message=true
- 心跳过滤:当收到文本/二进制消息等于
websocket.heartbeat-content
时,仅刷新读超时,不写入 MQ - 消息模型:服务端写入 MQ 的统一结构如下
json
{
"targetType": "user", // 目标类型: all | user | temp_user | channel
"target": "u1001", // 目标ID(userId 或 channel)
"data": "hello" // 原始消息文本
}
提示:上行消息通常来自真实用户会话,因此
targetType
默认为user
,target
为登录用户ID。
MQ 写入策略
- Kafka:写入
clientTopic
,消息键(key)=userId
- RabbitMQ:发布到
exchange
,路由键(routingKey)=clientRoutingKey
两者的消息体均为上述 WebSocketMessage
JSON。
关键配置
在 config.yaml
或 Nacos 配置中,结合 mq.md
文档进行设置:
yaml
websocket:
accept-client-message: true # 启用接收客户端上行消息
heartbeat-content: "PING" # 心跳内容(匹配则不写MQ,仅刷新超时)
mq:
type: "kafka" # MQ类型: kafka, rabbitmq
kafka:
brokers: ["127.0.0.1:9092"]
client-topic: websocket-client # 客户端上行主题
rabbitmq:
exchange: websocket
client-routing-key: ws.client # 客户端上行路由键
更多 MQ 生产者/消费者参数说明参见《MQ 使用》。
时序图
示例:业务侧消费者
项目 web-example
已提供示例消费者:
- Kafka:
com.wueasy.example.mq.KafkaClientConsumer
- RabbitMQ:
com.wueasy.example.mq.RabbitClientConsumer
示例解析消息体为 WebSocketMessage{ targetType, target, data }
并输出日志,便于快速联调。
配置文件
示例配置(web-example/src/main/resources/application-dev.yml
):
yaml
example:
kafka:
clientTopic: websocket-client
rabbitmq:
exchange: websocket
clientRoutingKey: ws.client
clientQueue: client.service.queue
spring:
rabbitmq:
host: 192.168.3.181
port: 5672
username: wueasy
password: 1QAZ2wsx
virtual-host: /
listener:
simple:
acknowledge-mode: auto
kafka:
bootstrap-servers: 192.168.3.181:9092
consumer:
group-id: web-example-client
auto-offset-reset: latest
enable-auto-commit: true
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Kafka 消费者示例
java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* 示例:消费 wueasy-websocket 客户端上行消息(Kafka)
* 仅打印日志。
*/
@Component
public class KafkaClientConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaClientConsumer.class);
@KafkaListener(topics = "${example.kafka.clientTopic}", groupId = "${spring.kafka.consumer.group-id}")
public void onClientMessage(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
log.info("[Kafka] 接收到客户端上行消息: key={}, value={}", key, value);
}
}
RabbitMQ 消费者示例
java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
/**
* 示例:消费 wueasy-websocket 客户端上行消息(RabbitMQ)
* 仅打印日志。
*/
@Configuration
@EnableRabbit
public class RabbitClientConsumer {
private static final Logger log = LoggerFactory.getLogger(RabbitClientConsumer.class);
@Value("${example.rabbitmq.exchange}")
private String exchangeName;
@Value("${example.rabbitmq.clientRoutingKey}")
private String clientRoutingKey;
@Value("${example.rabbitmq.clientQueue}")
private String clientQueue;
@Bean
public DirectExchange clientExchange() {
return new DirectExchange(exchangeName, true, false);
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public Queue clientQueue() {
return new Queue(clientQueue, true, false, false);
}
@Bean
public Binding clientBinding(DirectExchange clientExchange, Queue clientQueue, RabbitAdmin rabbitAdmin) {
Binding binding = BindingBuilder.bind(clientQueue).to(clientExchange).with(clientRoutingKey);
// 确保声明交换机、队列与绑定
rabbitAdmin.declareExchange(clientExchange);
rabbitAdmin.declareQueue(clientQueue);
rabbitAdmin.declareBinding(binding);
return binding;
}
@RabbitListener(queues = "${example.rabbitmq.clientQueue}")
public void onClientMessage(String message) {
log.info("[RabbitMQ] 接收到客户端上行消息: {}", message);
}
}
依赖配置
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
示例:客户端发送上行消息
前端或脚本连接 WebSocket 后直接发送文本消息即可。例如:
javascript
const ws = new WebSocket("ws://localhost:9888/ws?code=YOUR_TOKEN");
ws.onopen = () => {
ws.send("hello"); // 发送上行消息
};
ws.onmessage = (evt) => console.log("下行消息:", evt.data);
如果需要临时会话,请先通过
temp-verify-uri
获取临时 code,再连接ws
。
常见问题
为什么我收不到 MQ 消息?
- 未开启
accept-client-message
- 客户端发送的内容与
heartbeat-content
相同,被判定为心跳 - MQ 基础配置不正确(Kafka
clientTopic
或 RabbitMQclientRoutingKey
) - 消费者组或队列绑定未正确声明
- 未开启
如何区分不同业务的上行消息?
- 可在
data
中使用 JSON 承载业务字段,或依据target
(用户ID/频道)进行分类处理。
- 可在