Springboot整合Redis如何实现超卖问题
Springboot整合Redis如何实现超卖问题
这篇文章将为大家详细讲解有关Springboot整合Redis如何实现超卖问题,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
超卖简单代码
写一段简单正常的超卖逻辑代码,多个用户同时操作同一段数据,探究出现的问题。
Redis中存储一项数据信息,请求对应接口,获取商品数量信息;
商品数量信息如果大于0,则扣减1,重新存储Redis中;
运行代码测试问题。
/***Redis数据库操作,超卖问题模拟*@author**/@RestControllerpublicclassRedisController{//引入String类型redis操作模板@AutowiredprivateStringRedisTemplatestringRedisTemplate;//测试数据设置接口@RequestMapping("/setStock")publicStringsetStock(){stringRedisTemplate.opsForValue().set("stock","100");return"ok";}//模拟商品超卖代码@RequestMapping("/deductStock")publicStringdeductStock(){//获取Redis数据库中的商品数量Integerstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//减库存if(stock>0){intrealStock=stock-1;stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));System.out.println("商品扣减成功,剩余商品:"+realStock);}else{System.out.println("库存不足.....");}return"end";}}
超卖问题
单服务器单应用情况下
在单应用模式下,使用jmeter
压测。
测试结果:
每个请求相当于一个线程,当几个线程同时拿到数据时,线程A拿到库存为84,这个时候线程B也进入程序,并且抢占了CPU,访问库存为84,最后两个线程都对库存减一,导致最后修改为83,实际上多卖出去了一件
既然线程和线程之间,数据处理不一致,能否使用synchronized
加锁测试?
设置synchronized
依旧还是先测试单服务器
//模拟商品超卖代码,//设置synchronized同步锁@RequestMapping("/deductStock1")publicStringdeductStock1(){synchronized(this){//获取Redis数据库中的商品数量Integerstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//减库存if(stock>0){intrealStock=stock-1;stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));System.out.println("商品扣减成功,剩余商品:"+realStock);}else{System.out.println("库存不足.....");}}return"end";}
数量100
重新压测,得到的日志信息如下所示:
在单机模式下,添加synchronized关键字,的确能够避免商品的超卖现象!
但是在分布式微服务中,针对该服务设置了集群,synchronized依旧还能保证数据的正确性吗?
假设多个请求,被注册中心负载均衡,每个微服务中的该处理接口,都添加有synchronized,
依然会出现类似的超卖
问题:
synchronized
只是针对单一服务器
的JVM
进行加锁
,但是分布式是很多个不同的服务器,导致两个线程或多个在不同服务器上共同对商品数量信息做了操作!
Redis实现分布式锁
在Redis中存在一条命令setnx (set if not exists)
setnx key value
如果不存在key,则可以设置成功;否则设置失败。
修改处理接口,增加key
//模拟商品超卖代码@RequestMapping("/deductStock2")publicStringdeductStock2(){//创建一个key,保存至redisStringkey="lock";//setnx//由于redis是一个单线程,执行命令采取“队列”形式排队!//优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,"thisislock");//当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回falseif(!result){//前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!return"err";}//获取Redis数据库中的商品数量Integerstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//减库存if(stock>0){intrealStock=stock-1;stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));System.out.println("商品扣减成功,剩余商品:"+realStock);}else{System.out.println("库存不足.....");}//程序执行完成,则删除这个keystringRedisTemplate.delete(key);return"end";}
1、请求进入接口中,如果redis中不存在key,则会新建一个setnx;如果存在,则不会新建,同时返回错误编码,不会继续执行抢购逻辑。
2、当创建成功后,执行抢购逻辑。
3、抢购逻辑执行完成后,删除数据库中对应的setnx
的key
。让其他请求能够设置并操作。
这种逻辑来说比之前单一使用syn
合理的多,但是如果执行抢购操作中出现了异常,导致这个key
无法被删除
。以至于其他处理请求,一直无法拿到key
,程序逻辑死锁!
可以采取try … finally进行操作
/***模拟商品超卖代码设置**@return*/@RequestMapping("/deductStock3")publicStringdeductStock3(){//创建一个key,保存至redisStringkey="lock";//setnx//由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,"thisislock");//当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回falseif(!result){//前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!return"err";}try{//获取Redis数据库中的商品数量Integerstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//减库存if(stock>0){intrealStock=stock-1;stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));System.out.println("商品扣减成功,剩余商品:"+realStock);}else{System.out.println("库存不足.....");}}finally{//程序执行完成,则删除这个key//放置于finally中,保证即使上述逻辑出问题,也能del掉stringRedisTemplate.delete(key);}return"end";}
这个逻辑相比上面其他的逻辑来说,显得更加的严谨。
但是,如果一套服务器,因为断电、系统崩溃等原因出现宕机
,导致本该执行finally
中的语句未成功执行完成!!同样出现key一直存在
,导致死锁
!
通过超时间解决上述问题
在设置成功setnx
后,以及抢购代码逻辑执行前,增加key的限时。
/***模拟商品超卖代码设置setnx保证分布式环境下,数据处理安全行问题;<br>*但如果某个代码段执行异常,导致key无法清理,出现死锁,添加try...finally;<br>*如果某个服务因某些问题导致释放key不能执行,导致死锁,此时解决思路为:增加key的有效时间;<br>*为了保证设置key的值和设置key的有效时间,两条命令构成同一条原子命令,将下列逻辑换成其他代码。**@return*/@RequestMapping("/deductStock4")publicStringdeductStock4(){//创建一个key,保存至redisStringkey="lock";//setnx//由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败//booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,"thisislock");//让设置key和设置key的有效时间都可以同时执行booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,"thisislock",10,TimeUnit.SECONDS);//当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回falseif(!result){//前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!return"err";}//设置key有效时间//stringRedisTemplate.expire(key,10,TimeUnit.SECONDS);try{//获取Redis数据库中的商品数量Integerstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//减库存if(stock>0){intrealStock=stock-1;stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));System.out.println("商品扣减成功,剩余商品:"+realStock);}else{System.out.println("库存不足.....");}}finally{//程序执行完成,则删除这个key//放置于finally中,保证即使上述逻辑出问题,也能del掉stringRedisTemplate.delete(key);}return"end";}
但是上述代码的逻辑中依旧会有问题:
如果处理逻辑中,出现
超时
问题。
当逻辑执行时,时间超过设定key有效时间,此时会出现什么问题?
从上图可以清楚的发现问题:
如果一个请求执行时间超过了key的有效时间。
新的请求执行过来时,必然可以拿到key并设置时间;
此时的redis中保存的key并不是请求1的key,而是别的请求设置的。
当请求1执行完成后,此处删除key,删除的是别的请求设置的key!
依然出现了key形同虚设
的问题!如果失效一直存在,超卖问题依旧不会解决。
通过key设置值匹配的方式解决形同虚设问题
既然出现key形同虚设的现象,是否可以增加条件,当finally中需要执行删除操作时,获取数据判断值是否是该请求中对应的,如果是则删除,不是则不管!
修改上述代码如下所示:
/***模拟商品超卖代码<br>*解决`deductStock6`中,key形同虚设的问题。**@return*/@RequestMapping("/deductStock5")publicStringdeductStock5(){//创建一个key,保存至redisStringkey="lock";Stringlock_value=UUID.randomUUID().toString();//setnx//让设置key和设置key的有效时间都可以同时执行booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(key,lock_value,10,TimeUnit.SECONDS);//当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回falseif(!result){//前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!return"err";}try{//获取Redis数据库中的商品数量Integerstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//减库存if(stock>0){intrealStock=stock-1;stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));System.out.println("商品扣减成功,剩余商品:"+realStock);}else{System.out.println("库存不足.....");}}finally{//程序执行完成,则删除这个key//放置于finally中,保证即使上述逻辑出问题,也能del掉//判断redis中该数据是否是这个接口处理时的设置的,如果是则删除if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))){stringRedisTemplate.delete(key);}}return"end";}
由于获得锁的线程必须执行完减库存逻辑才能释放锁,所以在此期间所有其他的线程都会由于没获得锁,而直接结束程序,导致有很多库存根本没有卖出去,所以这里应该可以优化,让没获得锁的线程等待,或者循环检查锁
最终版
我们将锁封装到一个实体类中,然后加入两个方法,加锁和解锁
@ComponentpublicclassRedisLock{privatefinalLoggerlog=LoggerFactory.getLogger(this.getClass());privatefinallongacquireTimeout=10*1000;//获取锁之前的超时时间(获取锁的等待重试时间)privatefinalinttimeOut=20;//获取锁之后的超时时间(防止死锁)@AutowiredprivateStringRedisTemplatestringRedisTemplate;//引入String类型redis操作模板/***获取分布式锁*@return锁标识*/publicbooleangetRedisLock(StringlockName,StringlockValue){//1.计算获取锁的时间LongendTime=System.currentTimeMillis()+acquireTimeout;//2.尝试获取锁while(System.currentTimeMillis()<endTime){//3.获取锁成功就设置过期时间让设置key和设置key的有效时间都可以同时执行booleanresult=stringRedisTemplate.opsForValue().setIfAbsent(lockName,lockValue,timeOut,TimeUnit.SECONDS);if(result){returntrue;}}returnfalse;}/***释放分布式锁*@paramlockName锁名称*@paramlockValue锁值*/publicvoidunRedisLock(StringlockName,StringlockValue){if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))){stringRedisTemplate.delete(lockName);}}}
@RestControllerpublicclassRedisController{//引入String类型redis操作模板@AutowiredprivateStringRedisTemplatestringRedisTemplate;@AutowiredprivateRedisLockredisLock;@RequestMapping("/setStock")publicStringsetStock(){stringRedisTemplate.opsForValue().set("stock","100");return"ok";}@RequestMapping("/deductStock")publicStringdeductStock(){//创建一个key,保存至redisStringkey="lock";Stringlock_value=UUID.randomUUID().toString();try{booleanredisLock=this.redisLock.getRedisLock(key,lock_value);//获取锁if(redisLock){//获取Redis数据库中的商品数量Integerstock=Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//减库存if(stock>0){intrealStock=stock-1;stringRedisTemplate.opsForValue().set("stock",String.valueOf(realStock));System.out.println("商品扣减成功,剩余商品:"+realStock);}else{System.out.println("库存不足.....");}}}finally{redisLock.unRedisLock(key,lock_value);//释放锁}return"end";}}
可以看到失败的线程不会直接结束,而是会尝试重试,一直到重试结束时间,才会结束
实际上这个最终版依然存在3个问题
1、在finally流程中,由于是先判断在处理。如果判断条件结束后,获取到的结果为true。但是在执行del操作前,此时jvm在执行GC操作(为了保证GC操作获取GC roots根完全,会暂停java程序),导致程序暂停。GC操作执行完成后(暂停恢复后),执行del操作,但是此时的key还在当前加锁的key么?
2、问题如图所示
关于“Springboot整合Redis如何实现超卖问题”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
推荐阅读
-
php如何让Swoole/Pool进程池实现Redis持久连接
php如何让Swoole/Pool进程池实现Redis持久连接本篇...
-
php操作redis大全记录
php连接redis测试˂?php$redis=newRedis();$redis-˃conne...
-
PHP经典高级工程师面试题
1.PHP如何实现不用自带的cookie函数为客户端下发cookie。对于分布式系统,如何来保存session值...
-
PHP操作Redis数据库
-
php利用redis防止商品超发来限制抢购,简单又实用
-
php如何实现秒杀功能?php+redis模拟简单抢购场景,快来看看吧
-
PHP高级工程师面试题
-
Laravel结合Redis发送邮箱验证码
-
使用redis缓存实现多服务器PHP sessions共享
-
PHP用redis的有序集合zset实现延迟队列