SpringBoot+netty-socketio如何实现服务器端消息推送

这篇文章主要介绍SpringBoot+netty-socketio如何实现服务器端消息推送,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

首先:因为工作需要,需要对接socket.io框架对接,所以目前只能使用netty-socketio。websocket是不支持对接socket.io框架的。

SpringBoot+netty-socketio如何实现服务器端消息推送

netty-socketio顾名思义他是一个底层基于netty'实现的socket。

在springboot项目中的集成,请看下面的代码

maven依赖

<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.11</version>
</dependency>

下面就是代码了

首先是配置参数

#socketio配置
socketio:
host:localhost
port:9099
#设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
maxFramePayloadLength:1048576
#设置http交互最大内容长度
maxHttpContentLength:1048576
#socket连接数大小(如只监听一个端口boss线程组为1即可)
bossCount:1
workCount:100
allowCustomRequests:true
#协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
upgradeTimeout:1000000
#Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
pingTimeout:6000000
#Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
pingInterval:25000

上面的注释写的很清楚。下面是config代码

importcom.corundumstudio.socketio.Configuration;
importcom.corundumstudio.socketio.SocketConfig;
importcom.corundumstudio.socketio.SocketIOServer;
importorg.springframework.beans.factory.InitializingBean;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Value;
importorg.springframework.stereotype.Component;

importjavax.annotation.Resource;

/**
*kcm
*/
@Component
publicclassPushServerimplementsInitializingBean{

@Autowired
privateEventListennereventListenner;

@Value("${socketio.port}")
privateintserverPort;

@Value("${socketio.host}")
privateStringserverHost;

@Value("${socketio.bossCount}")
privateintbossCount;

@Value("${socketio.workCount}")
privateintworkCount;

@Value("${socketio.allowCustomRequests}")
privatebooleanallowCustomRequests;

@Value("${socketio.upgradeTimeout}")
privateintupgradeTimeout;

@Value("${socketio.pingTimeout}")
privateintpingTimeout;

@Value("${socketio.pingInterval}")
privateintpingInterval;

@Override
publicvoidafterPropertiesSet()throwsException{
Configurationconfig=newConfiguration();
config.setPort(serverPort);
config.setHostname(serverHost);
config.setBossThreads(bossCount);
config.setWorkerThreads(workCount);
config.setAllowCustomRequests(allowCustomRequests);
config.setUpgradeTimeout(upgradeTimeout);
config.setPingTimeout(pingTimeout);
config.setPingInterval(pingInterval);

SocketConfigsocketConfig=newSocketConfig();
socketConfig.setReuseAddress(true);
socketConfig.setTcpNoDelay(true);
socketConfig.setSoLinger(0);
config.setSocketConfig(socketConfig);

SocketIOServerserver=newSocketIOServer(config);
server.addListeners(eventListenner);
server.start();
System.out.println("启动正常");
}
}

在就是监听代码

importcom.corundumstudio.socketio.AckRequest;
importcom.corundumstudio.socketio.SocketIOClient;
importcom.corundumstudio.socketio.annotation.OnConnect;
importcom.corundumstudio.socketio.annotation.OnDisconnect;
importcom.corundumstudio.socketio.annotation.OnEvent;
importorg.apache.commons.lang3.StringUtils;
importorg.bangying.auth.JwtSupport;
importorg.springframework.stereotype.Component;

importjavax.annotation.Resource;
importjava.util.UUID;

@Component
publicclassEventListenner{
@Resource
privateClientCacheclientCache;

@Resource
privateJwtSupportjwtSupport;

/**
*客户端连接
*
*@paramclient
*/
@OnConnect
publicvoidonConnect(SocketIOClientclient){
StringuserId=client.getHandshakeData().getSingleUrlParam("userId");
//userId=jwtSupport.getApplicationUser().getId().toString();
//userId="8";
UUIDsessionId=client.getSessionId();
clientCache.saveClient(userId,sessionId,client);
System.out.println("建立连接");
}

/**
*客户端断开
*
*@paramclient
*/
@OnDisconnect
publicvoidonDisconnect(SocketIOClientclient){
StringuserId=client.getHandshakeData().getSingleUrlParam("userId");
if(StringUtils.isNotBlank(userId)){
clientCache.deleteSessionClient(userId,client.getSessionId());
System.out.println("关闭连接");
}
}

//消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息
//暂未使用
@OnEvent("messageevent")
publicvoidonEvent(SocketIOClientclient,AckRequestrequest){
}
}

本地缓存信息

importcom.corundumstudio.socketio.SocketIOClient;
importorg.apache.commons.lang3.StringUtils;
importorg.springframework.stereotype.Component;

importjava.util.HashMap;
importjava.util.Map;
importjava.util.UUID;
importjava.util.concurrent.ConcurrentHashMap;

/**
*kcm
*/
@Component
publicclassClientCache{

//本地缓存
privatestaticMap<String,HashMap<UUID,SocketIOClient>>concurrentHashMap=newConcurrentHashMap<>();
/**
*存入本地缓存
*@paramuserId用户ID
*@paramsessionId页面sessionID
*@paramsocketIOClient页面对应的通道连接信息
*/
publicvoidsaveClient(StringuserId,UUIDsessionId,SocketIOClientsocketIOClient){
if(StringUtils.isNotBlank(userId)){
HashMap<UUID,SocketIOClient>sessionIdClientCache=concurrentHashMap.get(userId);
if(sessionIdClientCache==null){
sessionIdClientCache=newHashMap<>();
}
sessionIdClientCache.put(sessionId,socketIOClient);
concurrentHashMap.put(userId,sessionIdClientCache);
}
}
/**
*根据用户ID获取所有通道信息
*@paramuserId
*@return
*/
publicHashMap<UUID,SocketIOClient>getUserClient(StringuserId){
returnconcurrentHashMap.get(userId);
}
/**
*根据用户ID及页面sessionID删除页面链接信息
*@paramuserId
*@paramsessionId
*/
publicvoiddeleteSessionClient(StringuserId,UUIDsessionId){
concurrentHashMap.get(userId).remove(sessionId);
}
}

下面是存储客户端连接信息

importcom.corundumstudio.socketio.SocketIOClient;
importorg.apache.commons.lang3.StringUtils;
importorg.springframework.stereotype.Component;

importjava.util.HashMap;
importjava.util.Map;
importjava.util.UUID;
importjava.util.concurrent.ConcurrentHashMap;

/**
*kcm
*/
@Component
publicclassClientCache{

//本地缓存
privatestaticMap<String,HashMap<UUID,SocketIOClient>>concurrentHashMap=newConcurrentHashMap<>();
/**
*存入本地缓存
*@paramuserId用户ID
*@paramsessionId页面sessionID
*@paramsocketIOClient页面对应的通道连接信息
*/
publicvoidsaveClient(StringuserId,UUIDsessionId,SocketIOClientsocketIOClient){
if(StringUtils.isNotBlank(userId)){
HashMap<UUID,SocketIOClient>sessionIdClientCache=concurrentHashMap.get(userId);
if(sessionIdClientCache==null){
sessionIdClientCache=newHashMap<>();
}
sessionIdClientCache.put(sessionId,socketIOClient);
concurrentHashMap.put(userId,sessionIdClientCache);
}
}
/**
*根据用户ID获取所有通道信息
*@paramuserId
*@return
*/
publicHashMap<UUID,SocketIOClient>getUserClient(StringuserId){
returnconcurrentHashMap.get(userId);
}
/**
*根据用户ID及页面sessionID删除页面链接信息
*@paramuserId
*@paramsessionId
*/
publicvoiddeleteSessionClient(StringuserId,UUIDsessionId){
concurrentHashMap.get(userId).remove(sessionId);
}
}

控制层推送方法

@RestController
@RequestMapping("/push")
publicclassPushController{
@Resource
privateClientCacheclientCache;

@Autowired
privateJwtSupportjwtSupport;

@GetMapping("/message")
publicStringpushTuUser(@Param("id")Stringid){
IntegeruserId=jwtSupport.getApplicationUser().getId();
HashMap<UUID,SocketIOClient>userClient=clientCache.getUserClient(String.valueOf(userId));
userClient.forEach((uuid,socketIOClient)->{
//向客户端推送消息
socketIOClient.sendEvent("chatevent","服务端推送消息");
});
return"success";
}
}

以上是“SpringBoot+netty-socketio如何实现服务器端消息推送”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注恰卡编程网行业资讯频道!

发布于 2021-03-17 20:53:44
收藏
分享
海报
0 条评论
165
上一篇:Pyqt5怎么实现多线程文件搜索 下一篇:Oracle手动建库安装部署的方法
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码