更多ruoyi-nbcio功能请看演示系统
gitee源代码地址
前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio
演示地址:RuoYi-Nbcio后台管理系统
1、重写websocketserver,如下,这里主要是用到了redis的订阅机制,具体也可以看相关文章:
package com.ruoyi.common.websocket;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.constant.WebsocketConst;
import com.ruoyi.common.core.domain.BaseProtocol;
import com.ruoyi.common.redis.NbcioRedisClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import com.ruoyi.common.base.BaseMap;
// @ServerEndpoint 声明并创建了webSocket端点, 并且指明了请求路径
// id 为客户端请求时携带的参数, 用于服务端区分客户端使用
/**
* @ServerEndpoint 声明并创建了websocket端点, 并且指明了请求路径
* userId 为客户端请求时携带的用户userId, 用于区分发给哪个用户的消息
* @author nbacheng
* @date 2023-09-20
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") //此注解相当于设置访问URL
public class WebSocketServer {
private Session session;
private String userId;
private static final String REDIS_TOPIC_NAME = "socketHandler";
@Resource
private NbcioRedisClient nbcioRedisClient;
/**
* 缓存 webSocket连接到单机服务class中(整体方案支持集群)
*/
private static CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
private static Map<String, Session> sessionPool = new HashMap<String, Session>();
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
} catch (Exception e) {
}
}
/**
* 服务端推送消息
*
* @param userId
* @param message
*/
public void pushMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
synchronized (session){
log.info("【websocket消息】 单点消息:" + message);
session.getBasicRemote().sendText(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 服务器端推送消息
*/
public void pushMessage(String message) {
try {
webSockets.forEach(ws -> ws.session.getAsyncRemote().sendText(message));
} catch (Exception e) {
e.printStackTrace();
}
}
@OnMessage
public void onMessage(String message) {
//todo 现在有个定时任务刷,应该去掉
log.debug("【websocket消息】收到客户端消息:" + message);
JSONObject obj = new JSONObject();
//业务类型
obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
//消息内容
obj.put(WebsocketConst.MSG_TXT, "心跳响应");
for (WebSocketServer webSocket : webSockets) {
webSocket.pushMessage(message);
}
}
/**
* 后台发送消息到redis
*
* @param message
*/
public void sendMessage(String message) {
log.info("【websocket消息】广播消息:" + message);
BaseMap baseMap = new BaseMap();
baseMap.put("userId", "");
baseMap.put("message", message);
nbcioRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
}
/**
* 此为单点消息
*
* @param userId
* @param message
*/
public void sendMessage(String userId, String message) {
BaseMap baseMap = new BaseMap();
baseMap.put("userId", userId);
baseMap.put("message", message);
nbcioRedisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
}
/**
* 此为单点消息(多人)
*
* @param userIds
* @param message
*/
public void sendMessage(String[] userIds, String message) {
for (String userId : userIds) {
sendMessage(userId, message);
}
}
}
2、RedisConfig增加监听器
/**
* redis 监听配置
*
* @param redisConnectionFactory redis 配置
* @return
*/
@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory redisConnectionFactory, RedisReceiver redisReceiver, MessageListenerAdapter commonListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.addMessageListener(commonListenerAdapter, new ChannelTopic(GlobalConstants.REDIS_TOPIC_NAME));
return container;
}
@Bean
MessageListenerAdapter commonListenerAdapter(RedisReceiver redisReceiver) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(redisReceiver, "onMessage");
messageListenerAdapter.setSerializer(jacksonSerializer());
return messageListenerAdapter;
}
private Jackson2JsonRedisSerializer jacksonSerializer() {
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
return jackson2JsonRedisSerializer;
}
3、RedisReceiver 如下:文章来源:https://www.uudwc.com/A/LakGo/
package com.ruoyi.common.redis;
import org.springframework.stereotype.Component;
import com.ruoyi.common.base.BaseMap;
import com.ruoyi.common.constant.GlobalConstants;
import com.ruoyi.common.redis.listener.NbcioRedisListener;
import com.ruoyi.common.utils.SpringContextHolder;
import cn.hutool.core.util.ObjectUtil;
import lombok.Data;
/**
* 接受消息并调用业务逻辑处理器
* @author nbacheng
* @date 2023-09-20
*/
@Component
@Data
public class RedisReceiver {
public void onMessage(BaseMap params) {
Object handlerName = params.get(GlobalConstants.HANDLER_NAME);
NbcioRedisListener messageListener = SpringContextHolder.getHandler(handlerName.toString(), NbcioRedisListener.class);
if (ObjectUtil.isNotEmpty(messageListener)) {
messageListener.onMessage(params);
}
}
}
文章来源地址https://www.uudwc.com/A/LakGo/