基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的

基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的

本篇文章给大家分享的是有关基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

####事情起因,用netty实现了websocket,在链接创建成功后发送一个消息给客户端,我们选择在channelActive中发送消息。 可想而知肯定是不行的了 代码如下

EventLoopGroupbossGroup=newNioEventLoopGroup();EventLoopGroupgroup=newNioEventLoopGroup();try{ServerBootstrapsb=newServerBootstrap();sb.option(ChannelOption.SO_BACKLOG,1024);//绑定线程池sb.group(group,bossGroup)//指定使用的channel.channel(NioServerSocketChannel.class)//绑定监听端口.localAddress(this.port).option(ChannelOption.SO_KEEPALIVE,true)//绑定客户端连接时候触发操作.childHandler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannelch)throwsException{log.info("收到新连接");ch.pipeline().addLast(newLoggingHandler("DEBUG"));ch.pipeline().addLast(newIdleStateHandler(60,0,0));//websocket协议本身是基于http协议的,所以这边也要使用http解编码器ch.pipeline().addLast(newHttpServerCodec());//以块的方式来写的处理器ch.pipeline().addLast(newChunkedWriteHandler());ch.pipeline().addLast(newHttpObjectAggregator(8192));ch.pipeline().addLast(newWebSocketServerProtocolHandler("/socket",null,true,65536*10));ch.pipeline().addLast(newMyWebSocketHandler());}});//服务器异步创建绑定ChannelFuturecf=sb.bind().sync();log.info(NettyServer.class+"启动正在监听:"+cf.channel().localAddress());//关闭服务器通道cf.channel().closeFuture().sync();

#####排查1:因为没有任何异常,客户端没有收到消息,故先采用wireshark抓包,发现网卡上没有对应的想发送的消息。 为什么网卡没有对应的包呢,经过debug发现如果发送的数据类型是WebSocketFrame在最终发送时候异常了具体代码在HeadContext的write方法中,headcontext是netty的channelpipeline的头部,最终写出时都会从pipeline的尾部链接到头部来执行(pipeline为双向链表) 为什么在channelread中能写信息而在channelActive无法写信息呢,经过分析发现,channelActive的触发是在socketchannel第一次注册的时候发生的具体代码如下:abstrcatchannel中

privatevoidregister0(ChannelPromisepromise){try{//checkifthechannelisstillopenasitcouldbeclosedinthemeantimewhentheregister//callwasoutsideoftheeventLoopif(!promise.setUncancellable()||!ensureOpen(promise)){return;}booleanfirstRegistration=neverRegistered;doRegister();neverRegistered=false;registered=true;//EnsurewecallhandlerAdded(...)beforeweactuallynotifythepromise.Thisisneededasthe//usermayalreadyfireeventsthroughthepipelineintheChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();//OnlyfireachannelActiveifthechannelhasneverbeenregistered.Thispreventsfiring//multiplechannelactivesifthechannelisderegisteredandre-registered.if(isActive()){if(firstRegistration){pipeline.fireChannelActive();}elseif(config().isAutoRead()){//ThischannelwasregisteredbeforeandautoRead()isset.Thismeansweneedtobeginread//againsothatweprocessinbounddata.////Seehttps://github.com/netty/netty/issues/4805beginRead();}}}catch(Throwablet){//ClosethechanneldirectlytoavoidFDleak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise,t);}}

而抓包时发现触发此channelactive时,服务端还未返回websocket的协议握手包(websocket协议是在在http协议上衍生的,会先发一个http get请求然后服务端返回一个为websocket协议的包给客户端)至此问题就真相大白了,在我们添加的WebSocketServerProtocolHandler这个handller中有如下代码

publicvoidhandlerAdded(ChannelHandlerContextctx){ChannelPipelinecp=ctx.pipeline();if(cp.get(WebSocketServerProtocolHandshakeHandler.class)==null){//AddtheWebSocketHandshakeHandlerbeforethisone.ctx.pipeline().addBefore(ctx.name(),WebSocketServerProtocolHandshakeHandler.class.getName(),newWebSocketServerProtocolHandshakeHandler(websocketPath,subprotocols,allowExtensions,maxFramePayloadLength,allowMaskMismatch,checkStartsWith));}if(cp.get(Utf8FrameValidator.class)==null){//AddtheUFT8checkingbeforethisone.ctx.pipeline().addBefore(ctx.name(),Utf8FrameValidator.class.getName(),newUtf8FrameValidator());}}

添加的WebSocketServerProtocolHandshakeHandler中有如下代码

publicvoidchannelRead(finalChannelHandlerContextctx,Objectmsg)throwsException{finalFullHttpRequestreq=(FullHttpRequest)msg;if(isNotWebSocketPath(req)){ctx.fireChannelRead(msg);return;}try{if(!GET.equals(req.method())){sendHttpResponse(ctx,req,newDefaultFullHttpResponse(HTTP_1_1,FORBIDDEN));return;}finalWebSocketServerHandshakerFactorywsFactory=newWebSocketServerHandshakerFactory(getWebSocketLocation(ctx.pipeline(),req,websocketPath),subprotocols,allowExtensions,maxFramePayloadSize,allowMaskMismatch);finalWebSocketServerHandshakerhandshaker=wsFactory.newHandshaker(req);if(handshaker==null){WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());}else{finalChannelFuturehandshakeFuture=handshaker.handshake(ctx.channel(),req);handshakeFuture.addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturefuture)throwsException{if(!future.isSuccess()){ctx.fireExceptionCaught(future.cause());}else{//Keptforcompatibilityctx.fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);ctx.fireUserEventTriggered(newWebSocketServerProtocolHandler.HandshakeComplete(req.uri(),req.headers(),handshaker.selectedSubprotocol()));}}});WebSocketServerProtocolHandler.setHandshaker(ctx.channel(),handshaker);ctx.pipeline().replace(this,"WS403Responder",WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());}}finally{req.release();}}

接受握手信息时候会添加两个handler 为websocket协议信息的编码和解码handler

publicfinalChannelFuturehandshake(Channelchannel,FullHttpRequestreq,HttpHeadersresponseHeaders,finalChannelPromisepromise){if(logger.isDebugEnabled()){logger.debug("{}WebSocketversion{}serverhandshake",channel,version());}FullHttpResponseresponse=newHandshakeResponse(req,responseHeaders);ChannelPipelinep=channel.pipeline();if(p.get(HttpObjectAggregator.class)!=null){p.remove(HttpObjectAggregator.class);}if(p.get(HttpContentCompressor.class)!=null){p.remove(HttpContentCompressor.class);}ChannelHandlerContextctx=p.context(HttpRequestDecoder.class);finalStringencoderName;if(ctx==null){//thismeanstheuseruseaHttpServerCodecctx=p.context(HttpServerCodec.class);if(ctx==null){promise.setFailure(newIllegalStateException("NoHttpDecoderandnoHttpServerCodecinthepipeline"));returnpromise;}p.addBefore(ctx.name(),"wsdecoder",newWebsocketDecoder());p.addBefore(ctx.name(),"wsencoder",newWebSocketEncoder());encoderName=ctx.name();}else{p.replace(ctx.name(),"wsdecoder",newWebsocketDecoder());encoderName=p.context(HttpResponseEncoder.class).name();p.addBefore(encoderName,"wsencoder",newWebSocketEncoder());}channel.writeAndFlush(response).addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturefuture)throwsException{if(future.isSuccess()){ChannelPipelinep=future.channel().pipeline();p.remove(encoderName);promise.setSuccess();}else{promise.setFailure(future.cause());}}});returnpromise;}

如果想要实现在websocket协议连接成功后发送一个消息给客户端,我们发现在发送握手成功后触发了fireUserEventTriggered,去实现userEventTriggered然后判断evt类型做处理吧

以上就是基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注恰卡编程网行业资讯频道。

发布于 2021-12-23 21:15:18
收藏
分享
海报
0 条评论
53
上一篇:如何分析KEGG Brite数据库 下一篇:如何分析KEGG Enzyme 数据库
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码