接口 V2 完善:分布式环境下的 WebSocket 实现与 Token 校验
🎯 本文档详细介绍了如何使用WebSocket协议优化客户端与服务端之间的通信,特别是在处理异步订单创建通知的场景中。通过引入WebSocket代替传统的HTTP请求-响应模式,实现了服务器主动向客户端推送数据的功能,极大地提高了实时性和效率。文中首先概述了WebSocket的优势,随后深入探讨了其在分布式系统中的具体实现,包括依赖管理、网关配置、WebSocket服务类的设计以及消息队列的使用等关键环节。特别地,针对分布式架构下WebSocket连接状态同步问题,提出了一种基于消息队列广播机制的解决方案,确保了系统的可扩展性和稳定性。同时,还强调了心跳检测机制的重要性,以维护连接的有效性。
🏠️ HelloDam/场快订(场馆预定 SaaS 平台)
文章目录
- 前言
- WebSocket 介绍
- 流程图
- 具体实现
- 依赖
- 网关配置
- WebSocket配置类
- WebSocket服务类
- MQ消费者
- 启动类
- 配置文件
- 注意事项
- 登录验证
- WebSocket 配置类
- token校验
- 分布式 WebSocket
- 心跳检测
前言
在时间段预定接口 V2 中,用户预定之后,会发送一个消息,让消息队列异步创建订单。此时客户端是无法知道服务端什么时候完成订单创建的,因此需要服务端告知客户端。但是以往都是客户端给服务端发 http 请求,但是服务端如何主动告知客户端呢?
这个时候就需要请出我们今天的主角 WebSocket 了
WebSocket 介绍
WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间的数据交换变得更加简单,允许服务器直接向客户端推送数据而不必由客户端发起请求。这种特性让实时性要求较高的应用,如即时通讯工具、在线游戏以及实时交易系统等,能够更加高效地进行数据交互。通过WebSocket,开发者可以构建响应更快、性能更高的网络应用,同时减少不必要的网络开销和延迟。相比传统的HTTP请求-响应模式,WebSocket提供了更低的延迟和更高的效率,特别是在需要频繁更新数据的应用场景中表现出色。
因此使用了 WebSocket ,一旦客户端和服务端建立了连接,当订单创建成功之后,服务端直接别订单数据推送给客户端即可。
流程图
user1、user2 和 user3 分别发起 WebSocket 连接,首先经过网关,连接请求被分发到不同的服务中。WebSocket 服务接收到连接请求之后,对其进行登录校验,如果校验成功,将其 Session 信息存储在服务器的内存中,如果校验失败,直接关闭 Session 。其中 user1、user2 的Session信息被存储在 WebSocket 服务1 中,user3 的Session信息被存储在 WebSocket 服务2 中。
当用户预定时间段,生成订单之后,场馆服务向消息队列中发生订单数据。接着消息队列将订单数据广播到 WebSocket 服务1 和 WebSocket 服务2中。WebSocket 服务2 发现自己的内存中存有 user3 的Session,因此将订单数据通过该 Session 发送给 user3 。
暂时无法在飞书文档外展示此内容
具体实现
为了解耦 WebSocket 和其他服务,单独创建一个 WebSocket 服务。
依赖
<dependencies><dependency><groupId>com.vrs</groupId><artifactId>vrs-web</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.dam</groupId><artifactId>vrs-rocketmq</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.vrs</groupId><artifactId>vrs-common</artifactId></dependency><dependency><groupId>com.vrs</groupId><artifactId>vrs-idempotent</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- websocket --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
</dependencies>
网关配置
当访问 /websocket/**
路径时,将请求转化到 WebSocket 服务,注意,转发的时候添加了前缀ws:
- id: vrs-websocketuri: lb:ws://vrs-websocketpredicates:- Path=/websocket/**filters:- name: TokenValidateargs:whitePathList:- /websocket/**
【去除默认过滤器】
如果像这样全局配置了默认过滤器,DedupeResponseHeader
过滤器的作用是对指定的响应头(在这个例子中为Vary
、Access-Control-Allow-Origin
和Access-Control-Allow-Credentials
)进行去重。当有多个相同名称的响应头时,它会按照给定的策略保留其中的一个。这里的策略是RETAIN_FIRST
,意味着它将保留这些头部中第一次出现的那个,而删除后续出现的重复头部。
spring:cloud:gateway:default-filters:- DedupeResponseHeader=Vary Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_FIRST
发起 WebSocket 连接的时候,会报如下错误,这是因为修改了只读的请求头
java.lang.UnsupportedOperationException: nullat org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.0.9.jar:6.0.9]Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):*__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]*__checkpoint ⇢ HTTP GET "/websocket/admin?token=eyJhbGciOiJIUzUxMiIsInppcCI6IkdaSVAifQ.H4sIAAAAAAAA_6tWKi5NUrJScgwN8dANDXYNUtJRSq0oULIyNDe2NDMyNrYw0lEqLU4t8kwBilmYmZgZm5sbG5mbGViYGpgYQyX9EnNTgYYkpuRm5ilBhEIqC4BCRrUAvgeVqmEAAAA.e7wanr0gKu4FD-Y_afO2MEIECxZ6oMKGlf8zarZp-GOmzqL5n354gasKr7GKKs4H3Pq0CYJQECO_Rv9ixGsvZQ" [ExceptionHandlingWebHandler]
Original Stack Trace:at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:108) ~[spring-web-6.0.9.jar:6.0.9]
因此需要将上述配置删除,如果还需要这些默认配置,可以到具体的路由下面设置,就像下面一样
spring:cloud:gateway:routes:- id: vrs-adminuri: lb://vrs-adminpredicates:- Path=/admin/**filters:- DedupeResponseHeader=Vary Access-Control-Allow-Origin Access-Control-Allow-Credentials, RETAIN_FIRST- name: TokenValidateargs:whitePathList:- /admin/user/v1/login- /admin/user/v1/wechatLogin- ...
WebSocket配置类
配置类 WebSocketConfig
主要用于配置和初始化 WebSocket 服务器端点,并处理与 WebSocket 连接相关的操作,具体功能如下:
- Spring Bean 注册:通过
@Configuration
注解标明这是一个 Spring 配置类。在该类中定义了一个@Bean
方法serverEndpointExporter()
,它返回一个ServerEndpointExporter
实例。这个实例的作用是自动注册使用了@ServerEndpoint
注解声明的 WebSocket 端点对象到 Spring 容器中。 - 握手请求修改:
modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response)
方法重写了父类中的同名方法,用于在建立 WebSocket 连接前对握手请求进行自定义修改。在这个例子中,方法尝试从握手请求参数中获取名为 “token” 的参数,并将其存储在ServerEndpointConfig
对象的用户属性中(即sec.getUserProperties().put("token", token);
)。这使得后续逻辑可以通过访问端点配置对象来获取令牌信息。 - 端点实例化:
getEndpointInstance(Class<T> clazz)
方法重写了父类的方法,用于提供自定义逻辑来实例化被@ServerEndpoint
标注的 WebSocket 端点类。在这个实现中,它直接调用了父类的实现super.getEndpointInstance(clazz)
来创建端点实例。通常情况下,除非需要特别的实例化逻辑,否则可以直接使用父类的默认实现。
package com.vrs.config;import jakarta.websocket.HandshakeResponse;
import jakarta.websocket.server.HandshakeRequest;
import jakarta.websocket.server.ServerEndpointConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;import java.util.List;
import java.util.Map;/*** @Author dam* @create 2025/1/24 15:25*/
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator {/*** 这个bean会自动注册使用了@ServerEndpoint注解声明的对象** @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}/*** 建立握手时,连接前的操作*/@Overridepublic void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {// 获取请求参数Map<String, List<String>> parameterMap = request.getParameterMap();List<String> tokenList = parameterMap.get("token");if (tokenList != null && !tokenList.isEmpty()) {String token = tokenList.get(0);sec.getUserProperties().put("token", token);}}/*** 初始化端点对象,也就是被@ServerEndpoint所标注的对象*/@Overridepublic <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {return super.getEndpointInstance(clazz);}
}
WebSocket服务类
WebSocketServer
类是为实现实时通信而设计的,能够有效地管理多个客户端之间的双向通信以及保持这些通信的稳定性和可靠性。它通过 Spring 的 @Component
和 Jakarta WebSocket 的 @ServerEndpoint
注解被注册为一个 Spring Bean,并监听路径为 /websocket/{username}
的 WebSocket 请求。该类利用一个静态的 ConcurrentHashMap
来存储每个用户的会话 (Session
) 和最后一次活动时间,以跟踪在线用户和他们的活跃状态。它实现了以下关键功能:
- 连接管理:处理用户的连接建立 (
onOpen
) 和关闭 (onClose
) 事件,包括校验用户提供的 token 是否有效。 - 消息处理:接收来自客户端的消息 (
onMessage
) 并据此更新用户的最后活动时间,支持发送 PING/PONG 心跳消息来维持连接。 - 心跳检测:通过定时任务每30秒检查一次用户的心跳,若某用户超过60秒未活动,则自动断开其连接,确保资源的有效利用。
- 消息发送:提供了一个方法用于向特定用户发送消息。
package com.vrs.controller;import com.vrs.config.WebSocketConfig;
import com.vrs.constant.RedisCacheConstant;
import com.vrs.utils.JwtUtil;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** @Author dam* @create 2024/1/24 14:32*/
// 将WebSocketServer注册为spring的一个bean
@ServerEndpoint(value = "/websocket/{username}", configurator = WebSocketConfig.class)
@Component
@Slf4j(topic = "WebSocketServer")
public class WebSocketServer {/*** 心跳检查间隔时间(单位:秒)*/private static final int HEARTBEAT_INTERVAL = 30;/*** 心跳超时时间(单位:秒)*/private static final int HEARTBEAT_TIMEOUT = 60;/*** 记录当前在线连接的客户端的session*/private static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>();/*** 记录用户最后一次活动时间*/private static final Map<String, Long> lastActivityTimeMap = new ConcurrentHashMap<>();/*** 直接通过 Autowired 注入的话,redisTemplate为null,因此使用这种引入方式*/private static StringRedisTemplate redisTemplate;@Autowiredpublic void setRabbitTemplate(StringRedisTemplate redisTemplate) {WebSocketServer.redisTemplate = redisTemplate;}/*** 定时任务线程池,用于心跳检查*/private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);// 初始化心跳检查任务static {scheduler.scheduleAtFixedRate(WebSocketServer::checkHeartbeat, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);}/*** 浏览器和服务端连接建立成功之后会调用这个方法*/@OnOpenpublic void onOpen(Session session, @PathParam("username") String username, EndpointConfig config) {// 校验 token 是否有效String token = (String) config.getUserProperties().get("token");boolean validToken = validToken(token);if (!validToken) {try {session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "无效的token,请先登录"));} catch (IOException e) {e.printStackTrace();}}// 如果用户已存在,关闭旧连接if (usernameAndSessionMap.containsKey(username)) {Session oldSession = usernameAndSessionMap.get(username);if (oldSession != null && oldSession.isOpen()) {try {oldSession.close();} catch (IOException e) {log.error("关闭旧连接时发生错误", e);}}}// 记录新连接usernameAndSessionMap.put(username, session);// 记录用户活动时间lastActivityTimeMap.put(username, System.currentTimeMillis());log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());}/*** 连接关闭调用的方法*/@OnClosepublic void onClose(Session session, @PathParam("username") String username) throws IOException {try {if (session != null && session.isOpen()) {session.close();}} catch (IOException e) {log.error("关闭连接时发生错误", e);} finally {usernameAndSessionMap.remove(username);lastActivityTimeMap.remove(username);log.info("有一连接关闭,移除username={}的用户session, 当前在线人数为:{}", username, usernameAndSessionMap.size());}}/*** 发生错误的时候会调用这个方法*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("发生错误,原因:" + error.getMessage());error.printStackTrace();}/*** 收到客户端消息时调用*/@OnMessagepublic void onMessage(String message, Session session, @PathParam("username") String username) {// 更新用户最后一次活动时间lastActivityTimeMap.put(username, System.currentTimeMillis());if ("PING".equals(message)) {log.debug("收到来自 {} 的心跳检测请求", username);} else {log.info("收到来自 {} 的消息: {}", username, message);}}/*** 服务端发送消息给客户端*/public void sendMessage(String toUsername, String message) {try {Session toSession = usernameAndSessionMap.get(toUsername);if (toSession != null && toSession.isOpen()) {toSession.getBasicRemote().sendText(message);} else {log.warn("用户 {} 的会话已关闭或不存在", toUsername);}} catch (Exception e) {log.error("服务端发送消息给客户端失败", e);}}/*** 关闭心跳检测超时的 session*/private static void checkHeartbeat() {long currentTime = System.currentTimeMillis();for (Map.Entry<String, Long> entry : lastActivityTimeMap.entrySet()) {String username = entry.getKey();long lastActivityTime = entry.getValue();if (currentTime - lastActivityTime > HEARTBEAT_TIMEOUT * 1000) {log.info("用户 {} 心跳超时,关闭连接", username);Session session = usernameAndSessionMap.get(username);if (session != null) {try {session.close();} catch (IOException e) {log.error("关闭连接时发生错误", e);}}usernameAndSessionMap.remove(username);lastActivityTimeMap.remove(username);}}}/*** 校验 token 有效** @param token* @return*/private boolean validToken(String token) {String userName = "";try {// 如果从 token 中解析用户名错误,说明 token 是捏造的,或者已经失效userName = JwtUtil.getUsername(token);} catch (Exception e) {return false;}if (StringUtils.hasText(userName) && StringUtils.hasText(token) &&(redisTemplate.opsForHash().get(RedisCacheConstant.USER_LOGIN_KEY + userName, token)) != null) {// --if-- 如果可以通过 token 从 Redis 中获取到用户的登录信息,说明通过校验return true;}return false;}}
MQ消费者
package com.vrs.rocketMq.listener;import com.vrs.constant.RocketMqConstant;
import com.vrs.controller.WebSocketServer;
import com.vrs.domain.dto.mq.WebsocketMqDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 执行预订流程 消费者** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG,// 需要使用广播模式messageModel = MessageModel.BROADCASTING,// 监听tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG
)
@RequiredArgsConstructor
public class WebSocketSendMessageListener implements RocketMQListener<MessageWrapper<WebsocketMqDTO>> {private final WebSocketServer webSocketServer;/*** 消费消息的方法* 方法报错就会拒收消息** @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@SneakyThrows@Overridepublic void onMessage(MessageWrapper<WebsocketMqDTO> messageWrapper) {// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)log.info("[消费者] websocket发生消息给{}", messageWrapper.getMessage().getToUsername());webSocketServer.sendMessage(messageWrapper.getMessage().getToUsername(), messageWrapper.getMessage().getMessage());}
}
启动类
package com.vrs;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;/*** @Author dam* @create 2025/01/24 16:34*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class VrsWebSocketApplication {public static void main(String[] args) {SpringApplication.run(VrsWebSocketApplication.class, args);}
}
配置文件
server:port: 7054
spring:profiles:active: damapplication:name: vrs-websocketcloud:nacos:discovery:server-addr: 127.0.0.1:8848data:redis:host: 127.0.0.1port: 6379password: 12345678database: 0timeout: 1800000jedis:pool:max-active: 20 #最大连接数max-wait: -1 #最大阻塞等待时间(负数表示没限制)max-idle: 5 #最大空闲min-idle: 0 #最小空闲
rocketmq:# rocketMq的nameServer地址name-server: 127.0.0.1:9876producer:# 生产者组别group: vrs-websocket-group# 消息发送的超时时间send-message-timeout: 10000# 异步消息发送失败重试次数retry-times-when-send-async-failed: 1# 发送消息的最大大小,单位字节,这里等于4Mmax-message-size: 999999999
注意事项
登录验证
为了防止被人恶意发生大量 WebSocket 连接,占用服务器资源,因此在建立连接的时候,需要进行登录验证,用户登录了才可以建立 WebSocket 连接。
由于建立 WebSocket 连接时,无法像之前的 http 请求一样在请求头携带 token 信息,因此之前网关实现的登录校验机制不生效,需要我们针对 WebSocket 连接额外实现一套登录验证方式。
假设前端发起 WebSocket 连接的代码如下:
new WebSocket("ws://localhost:7049/websocket/admin?token=dahidaho");
WebSocket 配置类
在modifyHandshake
中,将客户端发起连接请求时的 token 设置到属性中,这样后面就可以将 token 获取出来进行校验,如果说校验不通过,就关闭 WebSokcet
连接
token校验
代码位于WebSocketServer
类中,当调用validToken
校验失败之后,通过session.close
来关闭连接
/*** 校验 token 有效** @param token* @return*/
private boolean validToken(String token) {String userName = "";try {// 如果从 token 中解析用户名错误,说明 token 是捏造的,或者已经失效userName = JwtUtil.getUsername(token);} catch (Exception e) {return false;}if (StringUtils.hasText(userName) && StringUtils.hasText(token) &&(redisTemplate.opsForHash().get(RedisCacheConstant.USER_LOGIN_KEY + userName, token)) != null) {// --if-- 如果可以通过 token 从 Redis 中获取到用户的登录信息,说明通过校验return true;}return false;
}/*** 浏览器和服务端连接建立成功之后会调用这个方法*/
@OnOpen
public void onOpen(Session session, @PathParam("username") String username, EndpointConfig config) {// 校验 token 是否有效String token = (String) config.getUserProperties().get("token");boolean validToken = validToken(token);if (!validToken) {try {session.close(new CloseReason(CloseReason.CloseCodes.VIOLATED_POLICY, "无效的token,请先登录"));} catch (IOException e) {e.printStackTrace();}}// 如果用户已存在,关闭旧连接if (usernameAndSessionMap.containsKey(username)) {Session oldSession = usernameAndSessionMap.get(username);if (oldSession != null && oldSession.isOpen()) {try {oldSession.close();} catch (IOException e) {log.error("关闭旧连接时发生错误", e);}}}// 记录新连接usernameAndSessionMap.put(username, session);// 记录用户活动时间lastActivityTimeMap.put(username, System.currentTimeMillis());log.info("有新用户加入,username={}, 当前在线人数为:{}", username, usernameAndSessionMap.size());
}
分布式 WebSocket
由于我们的项目是分布式架构的,如果vrs-websocket
启动多个服务的话,需要处理如下问题:
WebSocketServer
中的用户名及其对应的session信息usernameAndSessionMap
是存储在本地的,假设发起连接的时候,session被存储在机器 1 上面。后续服务端要通知客户端时,怎么知道当前用户的信息是存储在机器1、机器 2 还是机器 3 呢?
由于 Session 无法直接序列化存储到 Redis 中,为了解决这个问题,本文通过借助消息队列来解决。
服务端要发送消息给客户端时,先将消息发送至消息队列中,消息设置为广播模式。后续多台部署了vrs-websocket
的机器去消息队列中获取消息来消费,如果机器检查到了这条消息的接收者 session 就在机器上,则执行发送,否则直接 return 即可。
【消息生产者】
package com.vrs.rocketMq.producer;import cn.hutool.core.util.StrUtil;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.WebsocketMqDTO;
import com.vrs.templateMethod.AbstractCommonSendProduceTemplate;
import com.vrs.templateMethod.BaseSendExtendDTO;
import com.vrs.templateMethod.MessageWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import java.util.UUID;/*** websocket发送消息 生产者** @Author dam* @create 2024/9/20 16:00*/
@Slf4j
@Component
public class WebsocketSendMessageProducer extends AbstractCommonSendProduceTemplate<WebsocketMqDTO> {@Overrideprotected BaseSendExtendDTO buildBaseSendExtendParam(WebsocketMqDTO messageSendEvent) {return BaseSendExtendDTO.builder().eventName("执行时间段预定").topic(RocketMqConstant.VENUE_TOPIC).tag(RocketMqConstant.WEBSOCKET_SEND_MESSAGE_TAG).sentTimeout(2000L).build();}@Overrideprotected Message<?> buildMessage(WebsocketMqDTO messageSendEvent, BaseSendExtendDTO requestParam) {String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();return MessageBuilder.withPayload(new MessageWrapper(keys, messageSendEvent)).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag()).build();}
}
【消息消费者】
消费者的代码就在具体实现中,这里不重复放
【使用】
// 通过 websocket 发送消息,通知前端
websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder().toUsername(orderDO.getUserName()).message(JSON.toJSONString(orderDO)).build());
心跳检测
用户建立 WebSocket 连接之后的 session 数据是存储在服务器本地的,随着连接数量的增加,session会占用大量的内存,心跳检测是为了定期清理那些无效的连接。
在WebSocketServer
中,通过定时任务每30秒检查一次客户端的心跳状态,记录每个用户的最后活动时间。如果当前时间与某用户最后活动时间之差超过60秒,则认为该用户心跳超时,服务端将关闭其WebSocket连接并清理相关记录。客户端需定期向服务端发送"PING"消息以维持连接活跃,确保不会因超时而被服务端断开。
相关文章:
接口 V2 完善:分布式环境下的 WebSocket 实现与 Token 校验
🎯 本文档详细介绍了如何使用WebSocket协议优化客户端与服务端之间的通信,特别是在处理异步订单创建通知的场景中。通过引入WebSocket代替传统的HTTP请求-响应模式,实现了服务器主动向客户端推送数据的功能,极大地提高了实时性和效…...
Android中Service在新进程中的启动流程2
目录 1、Service在客户端的启动入口 2、Service启动在AMS的处理 3、Service在新进程中的启动 4、Service与AMS的关系再续 上一篇文章中我们了解了Service在新进程中启动的大致流程,同时认识了与客户端进程交互的接口IApplicationThread以及与AMS交互的接口IActi…...
C语言初阶力扣刷题——349. 两个数组的交集【难度:简单】
1. 题目描述 力扣在线OJ题目 给定两个数组,编写一个函数来计算它们的交集。 示例: 输入:nums1 [1,2,2,1], nums2 [2,2] 输出:[2] 输入:nums1 [4,9,5], nums2 [9,4,9,8,4] 输出:[9,4] 2. 思路 直接暴力…...
Java 大视界 -- Java 大数据在自动驾驶中的数据处理与决策支持(68)
💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…...
DeepSeek学术写作测评第二弹:数据分析、图表解读,效果怎么样?
我是娜姐 迪娜学姐 ,一个SCI医学期刊编辑,探索用AI工具提效论文写作和发表。 针对最近全球热议的DeepSeek开源大模型,娜姐昨天分析了关于论文润色、中译英的详细效果测评: DeepSeek学术写作测评第一弹:论文润色&#…...
(2)SpringBoot自动装配原理简介
SpringBoot自动装配 这里写目录标题 SpringBoot自动装配启动器主程序自定义扫描包SpringBootApplicationSpringBootConfigurationEnableAutoConfigurationAutoConfigurationPackageImport({AutoConfigurationImportSelector.class})选择器AutoConfigurationEntrygetCandidateCo…...
Rust:Rhai脚本编程示例
当然,以下是一个简单的Rhai脚本编程示例,展示了如何在Rust中使用Rhai执行脚本。 首先,你需要确保你的Rust项目中包含了rhai库。你可以在你的Cargo.toml文件中添加以下依赖项: [dependencies] rhai "0.19" # 请检查最…...
深入理解文件描述符
问题 文件描述符只是一个整数值,那么系统是如何利用这个整数值来完成文件读写的呢? 什么是文件系统? 计算机中用于组织、存储和管理文件的数据结构集合 管理磁盘或其他存储介质上的空间 (将存储介质分块管理)保证文件数据不被破坏…...
使用CSS实现一个加载的进度条
文章目录 使用CSS实现一个加载的进度条一、引言二、步骤一:HTML结构与CSS基础样式1、HTML结构2、CSS基础样式 三、步骤二:添加动画效果1、使用CSS动画2、结合JavaScript控制动画 四、使用示例五、总结 使用CSS实现一个加载的进度条 一、引言 在现代网页…...
SQL 指南
SQL 指南 引言 SQL(Structured Query Language,结构化查询语言)是一种用于管理关系数据库系统的标准计算机语言。自1970年代问世以来,SQL已经成为了数据库管理和数据操作的事实标准。本文旨在为初学者和有经验的数据库用户提供一个全面的SQL指南,涵盖SQL的基础知识、高级…...
sqlzoo答案4:SELECT within SELECT Tutorial
sql练习:SELECT within SELECT Tutorial - SQLZoo world表: namecontinentareapopulationgdpAfghanistanAsia6522302550010020343000000AlbaniaEurope28748283174112960000000AlgeriaAfrica238174137100000188681000000AndorraEurope46878115371200000…...
斐波那契数(信息学奥赛一本通-1071)
【题目描述】 菲波那契数列是指这样的数列: 数列的第一个和第二个数都为1,接下来每个数都等于前面2个数之和。给出一个正整数k,要求菲波那契数列中第k个数是多少。 【输入】 输入一行,包含一个正整数k。(1 ≤ k ≤ 46)…...
数据结构与算法再探(六)动态规划
目录 动态规划 (Dynamic Programming, DP) 动态规划的基本思想 动态规划的核心概念 动态规划的实现步骤 动态规划实例 1、爬楼梯 c 递归(超时)需要使用记忆化递归 循环 2、打家劫舍 3、最小路径和 4、完全平方数 5、最长公共子序列 6、0-1背…...
ECMAScript--promise的使用
一、Promise的简介 Promise是一个代理,它所代表的值在创建时并不一定是已知的。借助Promise,我们能够将处理程序与异步操作最终的成功值或者失败原因关联起来。这一特性使得异步方法可以像同步方法那样返回值,不同之处在于异步方法不会立…...
微服务入门(go)
微服务入门(go) 和单体服务对比:里面的服务仅仅用于某个特定的业务 一、领域驱动设计(DDD) 基本概念 领域和子域 领域:有范围的界限(边界) 子域:划分的小范围 核心域…...
【自学笔记】计算机网络的重点知识点-持续更新
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 计算机网络重点知识点一、计算机网络概述二、网络分类三、网络性能指标四、网络协议与体系结构五、数据交换方式六、物理层与数据链路层七、网络层与运输层八、应用…...
leetcode——二叉树的中序遍历(java)
给定一个二叉树的根节点 root ,返回 它的 中序 遍历 。 示例 1: 输入:root [1,null,2,3] 输出:[1,3,2] 示例 2: 输入:root [] 输出:[] 示例 3: 输入:root [1] 输出…...
neo4j-community-5.26.0 install in window10
在住处电脑重新配置一下neo4j, 1.先至官方下载 Neo4j Desktop Download | Free Graph Database Download Neo4j Deployment Center - Graph Database & Analytics 2.配置java jdk jdk 21 官网下载 Java Downloads | Oracle 中国 path: 4.查看java -version 版本 5.n…...
物联网智能项目之——智能家居项目的实现!
成长路上不孤单😊😊😊😊😊😊 【14后😊///计算机爱好者😊///持续分享所学😊///如有需要欢迎收藏转发///😊】 今日分享关于物联网智能项目之——智能家居项目…...
基于SpringBoot的假期周边游平台的设计与实现(源码+SQL脚本+LW+部署讲解等)
专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…...
JavaScript_03 超简计算器
版本一: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>计算器</title><script type"text/javascript">function add(){let num1 document.getElementById("number1&qu…...
【重生之我在学习C语言指针详解】
目录 编辑 --------------------------------------begin---------------------------------------- 引言 一、指针基础 1.1 内存地址 1.2 指针变量 1.3 指针声明 1.4 取地址运算符 & 1.5 解引用运算符 *** 二、指针运算 2.1 指针加减运算 2.2 指针关系运算 三…...
深度学习每周学习总结R5(LSTM-实现糖尿病探索与预测-模型优化)
🍨 本文为🔗365天深度学习训练营 中的学习记录博客R7中的内容,为了便于自己整理总结起名为R5🍖 原作者:K同学啊 | 接辅导、项目定制 目录 0. 总结优化细节(目前只采用了1、2两种方式)1. L2 正则…...
单元测试在复杂业务逻辑开发中的重要性与实践
背景 以前编写程序时,我并没有养成大量撰写单元测试的习惯,尤其是在写偏向业务代码的情况下,写的单元测试很少,只有在封装一些公共方法的时候才会写一些测试用例。 然而,最近我在开发的一个业务时,深刻地…...
Kubernetes 环境中的自动化运维实战指南
Kubernetes 作为容器编排领域的领导者,已经成为云原生应用的核心基础设施。然而,随着集群规模的扩大和应用的复杂化,手动运维 Kubernetes 集群变得愈发困难。自动化运维成为提升效率、保障系统稳定性的关键。本文将详细介绍如何在 Kubernetes 环境中实施自动化运维,涵盖工具…...
Linux 如何使用fdisk进行磁盘相关的操作
简介 fdisk 命令是 Linux 中用于管理磁盘分区的强大文本实用程序。它可以创建、删除、调整大小和修改硬盘上的分区。 基本语法 fdisk [options] <device> <device>:要管理的磁盘,例如 /dev/sda、/dev/nvme0n1 或 /dev/vda 示例用法 列…...
嵌入式Linux:如何监视子进程
目录 1、wait()函数 2、waitpid()函数 3、SIGCHLD信号 在嵌入式Linux系统中,父进程通常需要创建子进程来执行特定任务,例如处理网络请求、执行计算任务等。监视子进程的状态不仅可以确保资源的合理利用,还能防止僵尸进程的产生,…...
【信息系统项目管理师-选择真题】2010上半年综合知识答案和详解
更多内容请见: 备考信息系统项目管理师-专栏介绍和目录 文章目录 【第1~2题】【第3题】【第4题】【第5题】【第6题】【第7题】【第8题】【第9题】【第10题】【第11题】【第12题】【第13题】【第14题】【第15题】【第16题】【第17题】【第18题】【第19题】【第20题】【第21题】…...
工作总结:压测篇
前言 压测是测试需要会的一项技能,作为开发,有点时候也要会一点压测。也是被逼着现学现卖的。 一、压测是什么,以及压测工具的选择 压测,即压力测试,是一种性能测试手段,通过模拟大量用户同时访问系统&am…...
doris:STRUCT
STRUCT<field_name:field_type [COMMENT comment_string], ... > 表示由多个 Field 组成的结构体,也可被理解为多个列的集合。 不能作为 Key 使用,目前 STRUCT 仅支持在 Duplicate 模型的表中使用。一个 Struct 中的 Field 的名字和数量固定&…...
二叉树介绍
一.树的概念 树的图: 1.结点的度:一个结点含有子树的个数称为该结点的度; 如上图:A的度为6 2.树的度:一棵树中,所有结点度的最大值称为树的度; 如上图:树的度为6 3.叶子结点或终…...
通过Ngrok实现内网穿透助力远程开发
在现代软件开发和网络应用的环境下,开发人员常常需要在本地搭建服务器进行调试、测试或演示。然而,传统的端口映射(如使用 NAT 或 SSH 隧道)配置繁琐,且并非所有环境都允许直接暴露本地服务。ngrok 作为一款强大的隧道…...
DeepSeek-R1:通过强化学习激励大型语言模型(LLMs)的推理能力
摘要 我们推出了第一代推理模型:DeepSeek-R1-Zero和DeepSeek-R1。DeepSeek-R1-Zero是一个未经监督微调(SFT)作为初步步骤,而是通过大规模强化学习(RL)训练的模型,展现出卓越的推理能力。通过强…...
Node.js基础
浏览器知识 浏览器 个浏览器都内置了DOM、BOM等API函数,供浏览器中的Javascript调用。 每个浏览器都有对应的JavaScript解析引擎。 浏览器中的JavaScript环境 V8引擎负责解析和执行JavaScript代码 内置API是由运行环境提供的特殊接口,只能在所属的运…...
DeepSeek R1:中国AI黑马的崛起与挑战
在人工智能(AI)领域,大型语言模型(LLMs)正以迅猛之势重塑世界,其发展速度和影响力令人瞩目。近期,中国DeepSeek公司发布的DeepSeek R1模型,宛如一颗璀璨新星,凭借卓越的推…...
【JavaEE】_MVC架构与三层架构
目录 1. MVC架构 2. 三层架构 3. MVC架构与三层架构的对比 3.1 MVC与三层架构的对比 3.2 MVC与三层架构的共性 1. MVC架构 在前文已介绍关于SpringMAC的设计模式,详见下文: 【JavaEE】_Spring Web MVC简介-CSDN博客文章浏览阅读967次,点…...
对比DeepSeek、ChatGPT和Kimi的学术写作摘要能力
摘要 摘要是文章的精华,通常在200-250词左右。要包括研究的目的、方法、结果和结论。让AI工具作为某领域内资深的研究专家,编写摘要需要言简意赅,直接概括论文的核心,为读者提供快速了解的窗口。 下面我们使用DeepSeek、ChatGPT…...
ts 进阶
吴悠讲编程 : 20分钟TypeScript进阶!无废话快速提升水平 前端速看 https://www.bilibili.com/video/BV1q64y1j7aH...
Kubernetes(一)
Kubernetes(简称K8s)是一个开源的容器编排平台,已经成为现代云原生应用的核心技术,主要应用于对容器化应用程序的自动化部署、扩展以及管理。k8s配备了一组核心组件以及一系列功能,这些组件能够实现容器的调度、负载均…...
Python里的小整数问题挺有意思的
简单来说,Python为了优化性能,会把一些常用的整数(通常是-5到256)提前创建好,放到一个“缓存池”里。这样,当你用到这些小整数时,Python就不用每次都重新创建对象了,直接从缓存池里拿…...
基于 Jenkins 的测试报告获取与处理并写入 Jira Wiki 的技术总结
title: 基于 Jenkins 的测试报告获取与处理并写入 Jira Wiki 的技术总结 tags: - jenkins - python categories: - jenkins在软件开发的持续集成与持续交付(CI/CD)流程里,及时、准确地获取并分析测试报告对保障软件质量至关重要。本文将详细…...
java.util.Random类(详细案例拆解)(已完结)
前言: 小编打算近期更俩三期类的专栏,一些常用的专集类,给大家分好类别总结和详细的代码举例解释。 今天是除夕,小编先祝贺大家除夕快乐啦!! 今天是第六个 java.lang.Math 包中的 java.util.Random类 我…...
CMake常用命令指南(CMakeList.txt)
CMakeList从入门到精通的文章有很多不再赘述( 此处附带一篇优秀的博文链接:一个简单例子,完全入门CMake语法与CMakeList编写 )。 本文主要列举 CMake 中常用命令的详细说明、优缺点分析以及推荐做法,以更好地理解和灵…...
Mybatis是如何进行分页的?
大家好,我是锋哥。今天分享关于【Mybatis是如何进行分页的?】面试题。希望对大家有帮助; Mybatis是如何进行分页的? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 MyBatis 实现分页的方式有很多种,最常见…...
推动知识共享的在线知识库实施与优化指南
内容概要 在当今迅速发展的数字化时代,在线知识库的实施显得尤为重要。它不仅为企业提供了高效的信息存储与共享平台,还能够有效促进团队成员之间的协作与知识传递。通过集中管理企业内的各类知识资源,在线知识库帮助员工快速查找所需信息&a…...
【最后203篇系列】007 使用APS搭建本地定时任务
说明 最大的好处是方便。 其实所有任务的源头,应该都是通过定时的方式,在每个时隙发起轮询。当然在任务的后续传递中,可以通过CallBack或者WebHook的方式,以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。 总结…...
为AI聊天工具添加一个知识系统 之78 详细设计之19 正则表达式 之6
本文要点 要点 本项目设计的正则表达式 是一个 动态正则匹配框架。它是一个谓词系统:谓词 是运动,主语是“维度”,表语是 语言处理。主语的一个 双动结构。 Reg三大功能 语法验证、语义检查和 语用检验,三者 :语义约…...
三天急速通关JavaWeb基础知识:Day 1 后端基础知识
三天急速通关JavaWeb基础知识:Day 1 后端基础知识 0 文章说明1 Http1.1 介绍1.2 通信过程1.3 报文 Message1.3.1 请求报文 Request Message1.3.2 响应报文 Response Message 2 XML2.1 介绍2.2 利用Java解析XML 3 Tomcat3.1 介绍3.2 Tomcat的安装与配置3.3 Tomcat的项…...
代理模式 -- 学习笔记
代理模式学习笔记 什么是代理? 代理是一种设计模式,用户可以通过代理操作,而真正去进行处理的是我们的目标对象,代理可以在方法增强(如:记录日志,添加事务,监控等) 拿一…...
前端-Rollup
Rollup 是一个用于 JavaScript 的模块打包工具,它将小的代码片段编译成更大、更复杂的代码,例如库或应用程序。它使用 JavaScript 的 ES6 版本中包含的新标准化代码模块格式,而不是以前的 CommonJS 和 AMD 等特殊解决方案。ES 模块允许你自由…...