使用Redisson订阅数问题怎么解决
使用Redisson订阅数问题怎么解决
本文小编为大家详细介绍“使用Redisson订阅数问题怎么解决”,内容详细,步骤清晰,细节处理妥当,希望这篇“使用Redisson订阅数问题怎么解决”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。
一、前提
最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize
的大小不够,需要提高配置才能解决。
二、源码分析
下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:
1、RedissonLock#lock() 方法
privatevoidlock(longleaseTime,TimeUnitunit,booleaninterruptibly)throwsInterruptedException{longthreadId=Thread.currentThread().getId();//尝试获取,如果ttl==null,则表示获取锁成功Longttl=tryAcquire(leaseTime,unit,threadId);//lockacquiredif(ttl==null){return;}//订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题RFuture<RedissonLockEntry>future=subscribe(threadId);if(interruptibly){commandExecutor.syncSubscriptionInterrupted(future);}else{commandExecutor.syncSubscription(future);}//后面代码忽略try{//无限循环获取锁,直到获取锁成功//...}finally{//取消订阅锁释放事件unsubscribe(future,threadId);}}
总结下主要逻辑:
获取当前线程的线程id;
tryAquire尝试获取锁,并返回ttl
如果ttl为空,则结束流程;否则进入后续逻辑;
this.subscribe(threadId)订阅当前线程,返回一个RFuture;
如果在指定时间没有监听到,则会产生如上异常。
订阅成功后, 通过while(true)循环,一直尝试获取锁
fially代码块,会解除订阅
所以上述这情况问题应该出现在subscribe()方法中
2、详细看下subscribe()方法
protectedRFuture<RedissonLockEntry>subscribe(longthreadId){//entryName格式:“id:name”;//channelName格式:“redisson_lock__channel:name”;returnpubSub.subscribe(getEntryName(),getChannelName());}
RedissonLock#pubSub 是在RedissonLock构造函数中初始化的:
publicRedissonLock(CommandAsyncExecutorcommandExecutor,Stringname){//....this.pubSub=commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}
而subscribeService在MasterSlaveConnectionManager的实现中又是通过如下方式构造的
publicMasterSlaveConnectionManager(MasterSlaveServersConfigcfg,Configconfig,UUIDid){this(config,id);this.config=cfg;//初始化initTimer(cfg);initSingleEntry();}protectedvoidinitTimer(MasterSlaveServersConfigconfig){int[]timeouts=newint[]{config.getRetryInterval(),config.getTimeout()};Arrays.sort(timeouts);intminTimeout=timeouts[0];if(minTimeout%100!=0){minTimeout=(minTimeout%100)/2;}elseif(minTimeout==100){minTimeout=50;}else{minTimeout=100;}timer=newHashedWheelTimer(newDefaultThreadFactory("redisson-timer"),minTimeout,TimeUnit.MILLISECONDS,1024,false);connectionWatcher=newIdleConnectionWatcher(this,config);//初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:subscribeService=newPublishSubscribeService(this,config);}
PublishSubscribeService构造函数
privatefinalSemaphorePubSubsemaphorePubSub=newSemaphorePubSub(this);publicPublishSubscribeService(ConnectionManagerconnectionManager,MasterSlaveServersConfigconfig){super();this.connectionManager=connectionManager;this.config=config;for(inti=0;i<locks.length;i++){//这里初始化了一组信号量,每个信号量的初始值为1locks[i]=newAsyncSemaphore(1);}}
3、回到subscribe()方法主要逻辑还是交给了 LockPubSub#subscribe()里面
privatefinalConcurrentMap<String,E>entries=newConcurrentHashMap<>();publicRFuture<E>subscribe(StringentryName,StringchannelName){//从PublishSubscribeService获取对应的信号量。相同的channelName获取的是同一个信号量//publicAsyncSemaphoregetSemaphore(ChannelNamechannelName){//returnlocks[Math.abs(channelName.hashCode()%locks.length)];//}AsyncSemaphoresemaphore=service.getSemaphore(newChannelName(channelName));AtomicReference<Runnable>listenerHolder=newAtomicReference<Runnable>();RPromise<E>newPromise=newRedissonPromise<E>(){@Overridepublicbooleancancel(booleanmayInterruptIfRunning){returnsemaphore.remove(listenerHolder.get());}};Runnablelistener=newRunnable(){@Overridepublicvoidrun(){//如果存在RedissonLockEntry,则直接利用已有的监听Eentry=entries.get(entryName);if(entry!=null){entry.acquire();semaphore.release();entry.getPromise().onComplete(newTransferListener<E>(newPromise));return;}Evalue=createEntry(newPromise);value.acquire();EoldValue=entries.putIfAbsent(entryName,value);if(oldValue!=null){oldValue.acquire();semaphore.release();oldValue.getPromise().onComplete(newTransferListener<E>(newPromise));return;}//创建监听,RedisPubSubListener<Object>listener=createListener(channelName,value);//订阅监听service.subscribe(LongCodec.INSTANCE,channelName,semaphore,listener);}};//最终会执行listener.run方法semaphore.acquire(listener);listenerHolder.set(listener);returnnewPromise;}
AsyncSemaphore#acquire()方法
publicvoidacquire(Runnablelistener){acquire(listener,1);}publicvoidacquire(Runnablelistener,intpermits){booleanrun=false;synchronized(this){//counter初始化值为1if(counter<permits){//如果不是第一次执行,则将listener加入到listeners集合中listeners.add(newEntry(listener,permits));return;}else{counter-=permits;run=true;}}//第一次执行acquire,才会执行listener.run()方法if(run){listener.run();}}
梳理上述逻辑:
1、从PublishSubscribeService获取对应的信号量, 相同的channelName获取的是同一个信号量
2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
3、如果已经存在RedissonLockEntry, 则利用已经订阅就行
4、如果不存在RedissonLockEntry, 则会创建新的RedissonLockEntry,然后进行。
从上面代码看,主要逻辑是交给了PublishSubscribeService#subscribe方法
4、PublishSubscribeService#subscribe逻辑如下:
privatefinalConcurrentMap<ChannelName,PubSubConnectionEntry>name2PubSubConnection=newConcurrentHashMap<>();privatefinalQueue<PubSubConnectionEntry>freePubSubConnections=newConcurrentLinkedQueue<>();publicRFuture<PubSubConnectionEntry>subscribe(Codeccodec,StringchannelName,AsyncSemaphoresemaphore,RedisPubSubListener<?>...listeners){RPromise<PubSubConnectionEntry>promise=newRedissonPromise<PubSubConnectionEntry>();//主要逻辑入口,这里要主要channelName每次都是新对象,但内部覆写hashCode+equals。subscribe(codec,newChannelName(channelName),promise,PubSubType.SUBSCRIBE,semaphore,listeners);returnpromise;}privatevoidsubscribe(Codeccodec,ChannelNamechannelName,RPromise<PubSubConnectionEntry>promise,PubSubTypetype,AsyncSemaphorelock,RedisPubSubListener<?>...listeners){PubSubConnectionEntryconnEntry=name2PubSubConnection.get(channelName);if(connEntry!=null){//从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中addListeners(channelName,promise,type,lock,connEntry,listeners);return;}//没有时,才是最重要的逻辑freePubSubLock.acquire(newRunnable(){@Overridepublicvoidrun(){if(promise.isDone()){lock.release();freePubSubLock.release();return;}//从队列中取头部元素PubSubConnectionEntryfreeEntry=freePubSubConnections.peek();if(freeEntry==null){//第一次肯定是没有的需要建立connect(codec,channelName,promise,type,lock,listeners);return;}//如果存在则尝试获取,如果remainFreeAmount小于0则抛出异常终止了。intremainFreeAmount=freeEntry.tryAcquire();if(remainFreeAmount==-1){thrownewIllegalStateException();}PubSubConnectionEntryoldEntry=name2PubSubConnection.putIfAbsent(channelName,freeEntry);if(oldEntry!=null){freeEntry.release();freePubSubLock.release();addListeners(channelName,promise,type,lock,oldEntry,listeners);return;}//如果remainFreeAmount=0,则从队列中移除if(remainFreeAmount==0){freePubSubConnections.poll();}freePubSubLock.release();//增加监听RFuture<Void>subscribeFuture=addListeners(channelName,promise,type,lock,freeEntry,listeners);ChannelFuturefuture;if(PubSubType.PSUBSCRIBE==type){future=freeEntry.psubscribe(codec,channelName);}else{future=freeEntry.subscribe(codec,channelName);}future.addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturefuture)throwsException{if(!future.isSuccess()){if(!promise.isDone()){subscribeFuture.cancel(false);}return;}connectionManager.newTimeout(newTimerTask(){@Overridepublicvoidrun(Timeouttimeout)throwsException{subscribeFuture.cancel(false);}},config.getTimeout(),TimeUnit.MILLISECONDS);}});}});}privatevoidconnect(Codeccodec,ChannelNamechannelName,RPromise<PubSubConnectionEntry>promise,PubSubTypetype,AsyncSemaphorelock,RedisPubSubListener<?>...listeners){//根据channelName计算出slot获取PubSubConnectionintslot=connectionManager.calcSlot(channelName.getName());RFuture<RedisPubSubConnection>connFuture=nextPubSubConnection(slot);promise.onComplete((res,e)->{if(e!=null){((RPromise<RedisPubSubConnection>)connFuture).tryFailure(e);}});connFuture.onComplete((conn,e)->{if(e!=null){freePubSubLock.release();lock.release();promise.tryFailure(e);return;}//这里会从配置中读取subscriptionsPerConnectionPubSubConnectionEntryentry=newPubSubConnectionEntry(conn,config.getSubscriptionsPerConnection());//每获取一次,subscriptionsPerConnection就会减直到为0intremainFreeAmount=entry.tryAcquire();//如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中PubSubConnectionEntryoldEntry=name2PubSubConnection.putIfAbsent(channelName,entry);if(oldEntry!=null){releaseSubscribeConnection(slot,entry);freePubSubLock.release();addListeners(channelName,promise,type,lock,oldEntry,listeners);return;}if(remainFreeAmount>0){//加入到队列中freePubSubConnections.add(entry);}freePubSubLock.release();RFuture<Void>subscribeFuture=addListeners(channelName,promise,type,lock,entry,listeners);//这里真正的进行订阅(底层与redis交互)ChannelFuturefuture;if(PubSubType.PSUBSCRIBE==type){future=entry.psubscribe(codec,channelName);}else{future=entry.subscribe(codec,channelName);}future.addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturefuture)throwsException{if(!future.isSuccess()){if(!promise.isDone()){subscribeFuture.cancel(false);}return;}connectionManager.newTimeout(newTimerTask(){@Overridepublicvoidrun(Timeouttimeout)throwsException{subscribeFuture.cancel(false);}},config.getTimeout(),TimeUnit.MILLISECONDS);}});});}
PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:
publicinttryAcquire(){while(true){intvalue=subscribedChannelsAmount.get();if(value==0){return-1;}if(subscribedChannelsAmount.compareAndSet(value,value-1)){returnvalue-1;}}}
梳理上述逻辑:
1、还是进行重复判断, 根据channelName从name2PubSubConnection中获取,看是否存在已经订阅:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
2、从队列freePubSubConnections中取公用的PubSubConnectionEntry, 如果没有就进入connect()方法
2.1 会根据subscriptionsPerConnection创建PubSubConnectionEntry, 然后调用其tryAcquire()方法 - 每调用一次就会减1
2.2 将新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后续重复使用;
2.3 同时也将PubSubConnectionEntry放入队列freePubSubConnections中。- remainFreeAmount > 0
2.4 后面就是进行底层的subscribe和addListener
3、如果已经存在PubSubConnectionEntry,则利用已有的PubSubConnectionEntry进行tryAcquire;
4、如果remainFreeAmount < 0 会抛出IllegalStateException异常;如果remainFreeAmount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接
5、最后也是进行底层的subscribe和addListener;
读到这里,这篇“使用Redisson订阅数问题怎么解决”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注恰卡编程网行业资讯频道。