SpringBoot+netty-socketio如何实现服务器端消息推送
这篇文章主要介绍SpringBoot+netty-socketio如何实现服务器端消息推送,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
首先:因为工作需要,需要对接socket.io框架对接,所以目前只能使用netty-socketio。websocket是不支持对接socket.io框架的。
netty-socketio顾名思义他是一个底层基于netty'实现的socket。
在springboot项目中的集成,请看下面的代码
maven依赖
<dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.11</version> </dependency>
下面就是代码了
首先是配置参数
#socketio配置 socketio: host:localhost port:9099 #设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器 maxFramePayloadLength:1048576 #设置http交互最大内容长度 maxHttpContentLength:1048576 #socket连接数大小(如只监听一个端口boss线程组为1即可) bossCount:1 workCount:100 allowCustomRequests:true #协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间 upgradeTimeout:1000000 #Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件 pingTimeout:6000000 #Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔 pingInterval:25000
上面的注释写的很清楚。下面是config代码
importcom.corundumstudio.socketio.Configuration; importcom.corundumstudio.socketio.SocketConfig; importcom.corundumstudio.socketio.SocketIOServer; importorg.springframework.beans.factory.InitializingBean; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.beans.factory.annotation.Value; importorg.springframework.stereotype.Component; importjavax.annotation.Resource; /** *kcm */ @Component publicclassPushServerimplementsInitializingBean{ @Autowired privateEventListennereventListenner; @Value("${socketio.port}") privateintserverPort; @Value("${socketio.host}") privateStringserverHost; @Value("${socketio.bossCount}") privateintbossCount; @Value("${socketio.workCount}") privateintworkCount; @Value("${socketio.allowCustomRequests}") privatebooleanallowCustomRequests; @Value("${socketio.upgradeTimeout}") privateintupgradeTimeout; @Value("${socketio.pingTimeout}") privateintpingTimeout; @Value("${socketio.pingInterval}") privateintpingInterval; @Override publicvoidafterPropertiesSet()throwsException{ Configurationconfig=newConfiguration(); config.setPort(serverPort); config.setHostname(serverHost); config.setBossThreads(bossCount); config.setWorkerThreads(workCount); config.setAllowCustomRequests(allowCustomRequests); config.setUpgradeTimeout(upgradeTimeout); config.setPingTimeout(pingTimeout); config.setPingInterval(pingInterval); SocketConfigsocketConfig=newSocketConfig(); socketConfig.setReuseAddress(true); socketConfig.setTcpNoDelay(true); socketConfig.setSoLinger(0); config.setSocketConfig(socketConfig); SocketIOServerserver=newSocketIOServer(config); server.addListeners(eventListenner); server.start(); System.out.println("启动正常"); } }
在就是监听代码
importcom.corundumstudio.socketio.AckRequest; importcom.corundumstudio.socketio.SocketIOClient; importcom.corundumstudio.socketio.annotation.OnConnect; importcom.corundumstudio.socketio.annotation.OnDisconnect; importcom.corundumstudio.socketio.annotation.OnEvent; importorg.apache.commons.lang3.StringUtils; importorg.bangying.auth.JwtSupport; importorg.springframework.stereotype.Component; importjavax.annotation.Resource; importjava.util.UUID; @Component publicclassEventListenner{ @Resource privateClientCacheclientCache; @Resource privateJwtSupportjwtSupport; /** *客户端连接 * *@paramclient */ @OnConnect publicvoidonConnect(SocketIOClientclient){ StringuserId=client.getHandshakeData().getSingleUrlParam("userId"); //userId=jwtSupport.getApplicationUser().getId().toString(); //userId="8"; UUIDsessionId=client.getSessionId(); clientCache.saveClient(userId,sessionId,client); System.out.println("建立连接"); } /** *客户端断开 * *@paramclient */ @OnDisconnect publicvoidonDisconnect(SocketIOClientclient){ StringuserId=client.getHandshakeData().getSingleUrlParam("userId"); if(StringUtils.isNotBlank(userId)){ clientCache.deleteSessionClient(userId,client.getSessionId()); System.out.println("关闭连接"); } } //消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息 //暂未使用 @OnEvent("messageevent") publicvoidonEvent(SocketIOClientclient,AckRequestrequest){ } }
本地缓存信息
importcom.corundumstudio.socketio.SocketIOClient; importorg.apache.commons.lang3.StringUtils; importorg.springframework.stereotype.Component; importjava.util.HashMap; importjava.util.Map; importjava.util.UUID; importjava.util.concurrent.ConcurrentHashMap; /** *kcm */ @Component publicclassClientCache{ //本地缓存 privatestaticMap<String,HashMap<UUID,SocketIOClient>>concurrentHashMap=newConcurrentHashMap<>(); /** *存入本地缓存 *@paramuserId用户ID *@paramsessionId页面sessionID *@paramsocketIOClient页面对应的通道连接信息 */ publicvoidsaveClient(StringuserId,UUIDsessionId,SocketIOClientsocketIOClient){ if(StringUtils.isNotBlank(userId)){ HashMap<UUID,SocketIOClient>sessionIdClientCache=concurrentHashMap.get(userId); if(sessionIdClientCache==null){ sessionIdClientCache=newHashMap<>(); } sessionIdClientCache.put(sessionId,socketIOClient); concurrentHashMap.put(userId,sessionIdClientCache); } } /** *根据用户ID获取所有通道信息 *@paramuserId *@return */ publicHashMap<UUID,SocketIOClient>getUserClient(StringuserId){ returnconcurrentHashMap.get(userId); } /** *根据用户ID及页面sessionID删除页面链接信息 *@paramuserId *@paramsessionId */ publicvoiddeleteSessionClient(StringuserId,UUIDsessionId){ concurrentHashMap.get(userId).remove(sessionId); } }
下面是存储客户端连接信息
importcom.corundumstudio.socketio.SocketIOClient; importorg.apache.commons.lang3.StringUtils; importorg.springframework.stereotype.Component; importjava.util.HashMap; importjava.util.Map; importjava.util.UUID; importjava.util.concurrent.ConcurrentHashMap; /** *kcm */ @Component publicclassClientCache{ //本地缓存 privatestaticMap<String,HashMap<UUID,SocketIOClient>>concurrentHashMap=newConcurrentHashMap<>(); /** *存入本地缓存 *@paramuserId用户ID *@paramsessionId页面sessionID *@paramsocketIOClient页面对应的通道连接信息 */ publicvoidsaveClient(StringuserId,UUIDsessionId,SocketIOClientsocketIOClient){ if(StringUtils.isNotBlank(userId)){ HashMap<UUID,SocketIOClient>sessionIdClientCache=concurrentHashMap.get(userId); if(sessionIdClientCache==null){ sessionIdClientCache=newHashMap<>(); } sessionIdClientCache.put(sessionId,socketIOClient); concurrentHashMap.put(userId,sessionIdClientCache); } } /** *根据用户ID获取所有通道信息 *@paramuserId *@return */ publicHashMap<UUID,SocketIOClient>getUserClient(StringuserId){ returnconcurrentHashMap.get(userId); } /** *根据用户ID及页面sessionID删除页面链接信息 *@paramuserId *@paramsessionId */ publicvoiddeleteSessionClient(StringuserId,UUIDsessionId){ concurrentHashMap.get(userId).remove(sessionId); } }
控制层推送方法
@RestController @RequestMapping("/push") publicclassPushController{ @Resource privateClientCacheclientCache; @Autowired privateJwtSupportjwtSupport; @GetMapping("/message") publicStringpushTuUser(@Param("id")Stringid){ IntegeruserId=jwtSupport.getApplicationUser().getId(); HashMap<UUID,SocketIOClient>userClient=clientCache.getUserClient(String.valueOf(userId)); userClient.forEach((uuid,socketIOClient)->{ //向客户端推送消息 socketIOClient.sendEvent("chatevent","服务端推送消息"); }); return"success"; } }
以上是“SpringBoot+netty-socketio如何实现服务器端消息推送”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注恰卡编程网行业资讯频道!
推荐阅读
-
springboot实现基于aop的切面日志
本文实例为大家分享了springboot实现基于aop的切面日志的具体代码,供大家参考,具体内容如下通过aop的切面方式实现日志...
-
SpringBoot定时任务功能怎么实现
-
SpringBoot中的@Import注解怎么使用
-
SpringBoot整合Lombok及常见问题怎么解决
-
springboot图片验证码功能模块怎么实现
-
Springboot+SpringSecurity怎么实现图片验证码登录
-
SpringBoot注解的知识点有哪些
SpringBoot注解的知识点有哪些这篇“SpringBoot注...
-
SpringBoot2.x中management.security.enabled=false无效怎么解决
-
springboot怎么禁用某项健康检查
springboot怎么禁用某项健康检查今天小编给大家分享一下sp...
-
SpringBoot2怎么自定义端点