基于Java NIO的即时聊天服务器模型怎么实现
基于Java NIO的即时聊天服务器模型怎么实现
这篇文章主要讲解了“基于Java NIO的即时聊天服务器模型怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“基于Java NIO的即时聊天服务器模型怎么实现”吧!
废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,直接上代码:
JsonParser
Json的解析类,随便封装了下,使用的最近比较火的fastjson
publicclassJsonParser{privatestaticJSONObjectmJson;publicsynchronizedstaticStringget(Stringjson,Stringkey){mJson=JSON.parseObject(json);returnmJson.getString(key);}}
Main
入口,不解释
publicclassMain{publicstaticvoidmain(String...args){newSeekServer().start();}}
Log
publicclassLog{publicstaticvoidi(Objectobj){System.out.println(obj);}publicstaticvoide(Objecte){System.err.println(e);}}
SeekServer:
服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0 目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的。
publicclassSeekServerextendsThread{privatefinalintACCPET_PORT=55555;privatefinalintTIME_OUT=1000;privateSelectormSelector=null;privateServerSocketChannelmSocketChannel=null;privateServerSocketmServerSocket=null;privateInetSocketAddressmAddress=null;publicSeekServer(){longsign=System.currentTimeMillis();try{mSocketChannel=ServerSocketChannel.open();if(mSocketChannel==null){System.out.println("can'topenserversocketchannel");}mServerSocket=mSocketChannel.socket();mAddress=newInetSocketAddress(ACCPET_PORT);mServerSocket.bind(mAddress);Log.i("serverbindportis"+ACCPET_PORT);mSelector=Selector.open();mSocketChannel.configureBlocking(false);SelectionKeykey=mSocketChannel.register(mSelector,SelectionKey.OP_ACCEPT);key.attach(newAcceptor());//检测Session状态Looper.getInstance().loop();//开始处理SessionSessionProcessor.start();Log.i("Seekserverstartupin"+(System.currentTimeMillis()-sign)+"ms!");}catch(ClosedChannelExceptione){Log.e(e.getMessage());}catch(IOExceptione){Log.e(e.getMessage());}}publicvoidrun(){Log.i("serverislistening...");while(!Thread.interrupted()){try{if(mSelector.select(TIME_OUT)>0){Set<SelectionKey>keys=mSelector.selectedKeys();Iterator<SelectionKey>iterator=keys.iterator();SelectionKeykey=null;while(iterator.hasNext()){key=iterator.next();Handlerat=(Handler)key.attachment();if(at!=null){at.exec();}iterator.remove();}}}catch(IOExceptione){Log.e(e.getMessage());}}}classAcceptorextendsHandler{publicvoidexec(){try{SocketChannelsc=mSocketChannel.accept();newSession(sc,mSelector);}catch(ClosedChannelExceptione){Log.e(e);}catch(IOExceptione){Log.e(e);}}}}
Handler:
只有一个抽象方法exec,Session将会继承它。
publicabstractclassHandler{publicabstractvoidexec();}
Session:
封装了用户的请求和SelectionKey和SocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。
publicclassSessionextendsHandler{privateSocketChannelmChannel;privateSelectionKeymKey;privateByteBuffermRreceiveBuffer=ByteBuffer.allocate(10240);privateCharsetcharset=Charset.forName("UTF-8");privateCharsetDecodermDecoder=charset.newDecoder();privateCharsetEncodermEncoder=charset.newEncoder();privatelonglastPant;//最后活动时间privatefinalintTIME_OUT=1000*60*5;//Session超时时间privateStringkey;privateStringsendData="";privateStringreceiveData=null;publicstaticfinalintREADING=0,SENDING=1;intmState=READING;publicSession(SocketChannelsocket,Selectorselector)throwsIOException{this.mChannel=socket;mChannel=socket;mChannel.configureBlocking(false);mKey=mChannel.register(selector,0);mKey.attach(this);mKey.interestOps(SelectionKey.OP_READ);selector.wakeup();lastPant=Calendar.getInstance().getTimeInMillis();}publicStringgetReceiveData(){returnreceiveData;}publicvoidclear(){receiveData=null;}publicvoidsetSendData(StringsendData){mState=SENDING;mKey.interestOps(SelectionKey.OP_WRITE);this.sendData=sendData+"\n";}publicbooleanisKeekAlive(){returnlastPant+TIME_OUT>Calendar.getInstance().getTimeInMillis();}publicvoidsetAlive(){lastPant=Calendar.getInstance().getTimeInMillis();}/***注销当前Session*/publicvoiddistroy(){try{mChannel.close();mKey.cancel();}catch(IOExceptione){}}@Overridepublicsynchronizedvoidexec(){try{if(mState==READING){read();}elseif(mState==SENDING){write();}}catch(IOExceptione){SessionManager.remove(key);try{mChannel.close();}catch(IOExceptione1){Log.e(e1);}mKey.cancel();}}publicvoidread()throwsIOException{mRreceiveBuffer.clear();intsign=mChannel.read(mRreceiveBuffer);if(sign==-1){//客户端连接关闭mChannel.close();mKey.cancel();}if(sign>0){mRreceiveBuffer.flip();receiveData=mDecoder.decode(mRreceiveBuffer).toString();setAlive();setSign();SessionManager.addSession(key,this);}}privatevoidsetSign(){//设置当前Session的Keykey=JsonParser.get(receiveData,"imei");//检测消息类型是否为心跳包//Stringtype=jo.getString("type");//if(type.equals("HEART_BEAT")){//setAlive();//}}/***写消息*/publicvoidwrite(){try{mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));sendData=null;mState=READING;mKey.interestOps(SelectionKey.OP_READ);}catch(CharacterCodingExceptione){e.printStackTrace();}catch(IOExceptione){try{mChannel.close();}catch(IOExceptione1){Log.e(e1);}}}}
SessionManager:
将所有Session存放到ConcurrentHashMap,这里使用手机用户的imei做key,ConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程,
封装了一些操作Session的方法例如get,remove等。
publicclassSessionManager{privatestaticConcurrentHashMap<String,Session>sessions=newConcurrentHashMap<String,Session>();publicstaticvoidaddSession(Stringkey,Sessionsession){sessions.put(key,session);}publicstaticSessiongetSession(Stringkey){returnsessions.get(key);}publicstaticSet<String>getSessionKeys(){returnsessions.keySet();}publicstaticintgetSessionCount(){returnsessions.size();}publicstaticvoidremove(String[]keys){for(Stringkey:keys){if(sessions.containsKey(key)){sessions.get(key).distroy();sessions.remove(key);}}}publicstaticvoidremove(Stringkey){if(sessions.containsKey(key)){sessions.get(key).distroy();sessions.remove(key);}}}
SessionProcessor
里面使用了JDK自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequest和SocketResponse(看到这里是不是有点熟悉的感觉,对没错,JavaWeb里到处都是request和response)。
publicclassSessionProcessorimplementsRunnable{privatestaticRunnableprocessor=newSessionProcessor();privatestaticThreadPoolExecutorpool=newThreadPoolExecutor(10,200,500,TimeUnit.MILLISECONDS,newArrayBlockingQueue<Runnable>(10),newThreadPoolExecutor.CallerRunsPolicy());publicstaticvoidstart(){newThread(processor).start();}@Overridepublicvoidrun(){while(true){Sessiontmp=null;for(Stringkey:SessionManager.getSessionKeys()){tmp=SessionManager.getSession(key);//处理Session未处理的请求if(tmp.getReceiveData()!=null){pool.execute(newProcess(tmp));}}try{Thread.sleep(10);}catch(InterruptedExceptione){Log.e(e);}}}classProcessimplementsRunnable{privateSocketRequestrequest;privateSocketResponseresponse;publicProcess(Sessionsession){//将Session封装成Request和Responserequest=newSocketRequest(session);response=newSocketResponse(session);}@Overridepublicvoidrun(){newRequestTransform().transfer(request,response);}}}
RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandler和MessageHandler)
publicclassRequestTransform{publicvoidtransfer(SocketRequestrequest,SocketResponseresponse){Stringaction=request.getValue("action");StringhandlerName=request.getValue("handler");//根据Session的请求类型,让不同的类方法去处理try{Class<?>c=Class.forName("com.seek.server.handler."+handlerName);Class<?>[]arg=newClass[]{SocketRequest.class,SocketResponse.class};Methodmethod=c.getMethod(action,arg);method.invoke(c.newInstance(),newObject[]{request,response});}catch(Exceptione){e.printStackTrace();}}}
SocketRequest和SocketResponse
publicclassSocketRequest{privateSessionmSession;privateStringmReceive;publicSocketRequest(Sessionsession){mSession=session;mReceive=session.getReceiveData();mSession.clear();}publicStringgetValue(Stringkey){returnJsonParser.get(mReceive,key);}publicStringgetQueryString(){returnmReceive;}}
publicclassSocketResponse{privateSessionmSession;publicSocketResponse(Sessionsession){mSession=session;}publicvoidwrite(Stringmsg){mSession.setSendData(msg);}}
最后则是两个处理请求的Handler
publicclassUserHandler{publicvoidlogin(SocketRequestrequest,SocketResponseresponse){System.out.println(request.getQueryString());//TODO:处理用户登录response.write("你肯定收到消息了");}}
publicclassMessageHandler{publicvoidsend(SocketRequestrequest,SocketResponseresponse){System.out.println(request.getQueryString());//消息发送Stringkey=request.getValue("imei");Sessionsession=SessionManager.getSession(key);newSocketResponse(session).write(request.getValue("sms"));}}
还有个监测是否超时的类Looper,定期去删除Session
publicclassLooperextendsThread{privatestaticLooperlooper=newLooper();privatestaticbooleanisStart=false;privatefinalintINTERVAL=1000*60*5;privateLooper(){}publicstaticLoopergetInstance(){returnlooper;}publicvoidloop(){if(!isStart){isStart=true;this.start();}}publicvoidrun(){Tasktask=newTask();while(true){//Session过期检测task.checkState();//心跳包检测//task.sendAck();try{Thread.sleep(INTERVAL);}catch(InterruptedExceptione){Log.e(e);}}}}
publicclassTask{publicvoidcheckState(){Set<String>keys=SessionManager.getSessionKeys();if(keys.size()==0){return;}List<String>removes=newArrayList<String>();Iterator<String>iterator=keys.iterator();Stringkey=null;while(iterator.hasNext()){key=iterator.next();if(!SessionManager.getSession(key).isKeekAlive()){removes.add(key);}}if(removes.size()>0){Log.i("sessionsistimeout,remove"+removes.size()+"session");}SessionManager.remove(removes.toArray(newString[removes.size()]));}publicvoidsendAck(){Set<String>keys=SessionManager.getSessionKeys();if(keys.size()==0){return;}Iterator<String>iterator=keys.iterator();while(iterator.hasNext()){iterator.next();//TODO发送心跳包}}}
注意,在Task和SessionProcessor类里都有对SessionManager的sessions做遍历,文中使用的方法并不是很好,主要是效率问题,推荐使用遍历Entry的方式来获取Key和Value,因为一直在JavaWeb上折腾,所以会的童鞋看到Request和Response会挺亲切,这个例子没有经过任何安全和性能测试,如果需要放到生产环境上得话请先自行做测试- -!
客户端请求时的数据内容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},这些约定就自己来定了。
感谢各位的阅读,以上就是“基于Java NIO的即时聊天服务器模型怎么实现”的内容了,经过本文的学习后,相信大家对基于Java NIO的即时聊天服务器模型怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!