Redis消息队列实现异步秒杀功能

2025-04-22 22:11:30 177
魁首哥

1 redis消息队列

在高并发场景下,为了提高秒杀业务的性能,可将部分工作交给 redis 处理,并通过异步方式执行。redis 提供了多种数据结构来实现消息队列,总结三种。

1.1list 结构

  • 原理:基于 list 结构模拟消息队列,使用 brpush 生产消息,brpop 消费消息。
  • 命令示例
    • 生产消息brpush key value [value ...],将一个或多个元素推入到指定列表的头部。如果列表不存在,会自动创建一个新的列表。
    • 消费消息brpop key [key ...] timeout,从指定的一个或多个列表中弹出最后一个元素。如果列表为空,该命令会导致客户端阻塞,直到有数据可用或超过指定的超时时间。
  • 优缺点
  • 优点:不会内存超限、可以持久化、消息有序性。
  • 缺点:无法避免数据丢失、只支持单消费者。

1.2pub/sub 模式

  • 原理:发布订阅模式,基本的点对点消息模型,支持多生产、多消费者。
  • 命令示例
    • 生产消息publish channel message,用于向指定频道发布一条消息。
    • 消费消息 subscribe channel [channel]:订阅一个或多个频道。
      • unsubscribe [channel [channel ...]]:取消订阅一个或多个频道。
      • psubscribe pattern [pattern ...]:订阅一个或多个符合给定模式的频道,接收消息。
      • punsubscribe [pattern [pattern ...]]:取消订阅一个或多个符合给定模式的频道。
  • 优缺点
    • 优点:支持多生产、多消费者。
    • 缺点:不支持持久化、无法避免数据丢失,消息堆积有上限(消费者会缓存消息),超出会丢失消息。

1.3stream 结构

  • 原理:redis 5.0 引入的专门为消息队列设计的数据类型,支持消息可回溯、一个消息可以被多个消费者消费、可以阻塞读取。
  • 命令示例
    • 生产消息xadd key *|id value [value ...],向指定的 stream 流中添加一个消息。例如:xadd users * name jack age 21,创建名为 users 的队列,并向其中发送一个消息,内容是 {name=jack,age=21},使用 redis 自动生成 id。
    • 消费消息xread [count count] [block milliseconds] streams key [key ...] id id。例如:
      • xread count 1 streams users 0:读取 users 队列中的第一条消息。
      • xread count 1 block 1000 streams users $:阻塞 1 秒钟后从 users 队列中读取最新消息。
  • 消费者组模式
    • 特点:消息分流、消息标识、消息确认。
    • 命令示例
      • xgroup create key groupname id:创建消费者组。
      • xgroup destory key groupname:删除指定的消费者组。
      • xgroup createconsumer key groupname consumername:给指定的消费者组添加消费者。
      • xgroup delconsumer key groupname consumername:删除消费者组中指定消费者。
      • xreadgroup group:从消费者组中读取消息。
  • 优缺点
    • 优点:消息可回溯、可以多消费者争抢消息,加快消费速度、可以阻塞读取、没有消息漏读的风险、有消息确认机制,保证消息至少被消费一次。
    • 缺点:有消息漏读的风险(单消费方式下)。

1.4redis stream消息队列的特点

redis 5.0引入的stream类型是专门为消息队列设计的,支持以下特性:

  • 消息持久化:消息存储在内存中,支持持久化到磁盘,避免消息丢失。
  • 消费者组(consumer group)
    • 消息分流:一个队列可以被多个消费者组订阅,组内多个消费者分摊消息处理。
    • 消息回溯:支持按消息id回溯历史消息。
    • 消息确认(ack):消费者处理完消息后需确认,否则消息会进入pending-list等待重试。
  • 阻塞读取:消费者可以阻塞等待新消息,减少cpu空转。
  • 避免消息丢失:通过pending-list机制,确保消息至少被消费一次。

2 秒杀业务处理

2.1使用lua脚本处理库存和订单

目标:在redis中完成库存判断和订单校验,确保原子性。

-- 参数:优惠券id、用户id、订单id
local voucherid = argv[1]
local userid = argv[2]
local orderid = argv[3]
-- 库存key和订单key
local stockkey = 'seckill:stock:' .. voucherid
local orderkey = 'seckill:order:' .. voucherid
-- 判断库存是否充足
if (tonumber(redis.call('get', stockkey)) <= 0 then
    return 1 -- 库存不足
end
-- 判断用户是否已下单
if (redis.call('sismember', orderkey, userid) == 1 then
    return 2 -- 用户已下单
end
-- 扣减库存并记录订单
redis.call('decr', stockkey)
redis.call('sadd', orderkey, userid)
-- 将订单信息发送到消息队列
redis.call('xadd', 'stream.orders', '*', 'userid', userid, 'voucherid', voucherid, 'id', orderid)
return 0 -- 成功

脚本说明

  • 原子性操作:库存检查、订单校验、消息发送在一个脚本中完成。
  • 消息发送:使用xadd将订单信息写入stream.orders队列。

2.2创建消费者组

  • xgroup create stream.orders g1 0 mkstream

g1:消费者组名称。

mkstream:如果队列不存在则自动创建。

2.3java代码实现

  • init 方法:在类初始化时创建消息队列,并启动一个线程任务从消息队列中获取订单信息。
  • voucherorderhandler 类:实现 runnable 接口,作为线程任务,不断从消息队列中获取订单信息。如果获取成功,将消息转换为 voucherorder 对象,调用 handlevoucherorder 方法处理订单,并进行 ack 确认;如果出现异常,调用 handlependinglist 方法处理异常消息。
  • handlependinglist 方法:从 pendinglist 中获取订单信息,处理订单并进行 ack 确认,直到 pendinglist 中没有消息。
  • handlevoucherorder 方法:使用 redisson 分布式锁确保一人一单,调用代理对象的 createvoucherorder 方法创建订单。
  • seckillvoucher 方法:执行 lua 脚本判断用户是否具有秒杀资格,如果具有资格,将订单信息发送到消息队列,并返回下单成功信息。
  • createvoucherorder 方法:判断当前用户是否是第一单,如果是则扣减库存并将订单保存到数据库。

系统启动与初始化

系统启动时,voucherorderserviceimpl 类的 @postconstruct 注解会触发 init 方法执行。该方法先加载创建消息队列的 lua 脚本,通过 stringredistemplate.execute 方法执行脚本创建 redis stream 消息队列和消费者组。若创建成功或队列已存在,会记录相应日志。之后,使用线程池 seckill_order_executor 启动 voucherorderhandler 线程,该线程负责后续从消息队列获取订单信息并处理。

用户发起秒杀请求

用户发起秒杀请求后,系统调用 voucherorderserviceimplseckillvoucher 方法。此方法先从 threadlocalutls 中获取用户 id,用 redisidworker 生成订单 id。接着执行判断用户秒杀资格的 lua 脚本,该脚本接收优惠券 id、用户 id 和订单 id 作为参数。若脚本返回值表明库存不足或用户已下单,方法返回相应的失败提示;若返回值为 0,说明用户有秒杀资格,创建代理对象并返回下单成功结果。

lua 脚本执行逻辑

lua 脚本接收到参数后,根据优惠券 id 拼接库存和订单的 redis key。先通过 get 命令获取库存,若库存小于等于 0 则返回 1 表示库存不足。若库存充足,使用 sismember 命令检查用户是否已下单,若已下单则返回 2。若库存充足且用户未下单,使用 incrby 命令扣减库存,sadd 命令记录订单信息,最后返回 0 表示下单成功。

订单处理线程工作

voucherorderhandler 线程启动后进入无限循环,不断从 redis stream 消息队列获取订单信息。若未获取到消息,继续下一次循环;若获取到消息,将消息转换为 voucherorder 对象,调用 handlevoucherorder 方法处理订单,处理完成后向消息队列发送 ack 确认消息。若处理过程中出现异常,调用 handlependinglist 方法处理异常消息。

订单处理方法 handlevoucherorder

handlevoucherorder 方法接收 voucherorder 对象,根据用户 id 获取 redisson 分布式锁。尝试获取锁,若失败记录错误日志并返回;若成功,调用代理对象的 createvoucherorder 方法创建订单,最后释放锁。

订单创建方法 createvoucherorder

该方法先判断当前用户是否是第一单,通过查询数据库中该用户的订单数量来判断。若不是第一单,记录错误日志并返回;若是第一单,尝试扣减秒杀券库存,若扣减失败抛出异常。若库存扣减成功,将订单信息保存到数据库,若保存失败也抛出异常。

@service
public class voucherorderserviceimpl extends serviceimpl implements ivoucherorderservice {
    @resource
    private iseckillvoucherservice seckillvoucherservice;
    @resource
    private redisidworker redisidworker;
    @resource
    private stringredistemplate stringredistemplate;
    @resource
    private redissonclient redissonclient;
    /**
     * 当前类初始化完毕就立马执行该方法
     */
    @postconstruct
    private void init() {
        // 创建消息队列
        defaultredisscript mqscript = new defaultredisscript<>();
        mqscript.setlocation(new classpathresource("lua/stream-mq.lua"));
        mqscript.setresulttype(long.class);
        long result = null;
        try {
            result = stringredistemplate.execute(mqscript,
                    collections.emptylist(),
                    queue_name,
                    group_name);
        } catch (exception e) {
            log.error("队列创建失败", e);
            return;
        }
        int r = result.intvalue();
        string info = r == 1 ? "队列创建成功" : "队列已存在";
        log.debug(info);
        // 执行线程任务
        seckill_order_executor.submit(new voucherorderhandler());
    }
    /**
     * 线程池
     */
    private static final executorservice seckill_order_executor = executors.newsinglethreadexecutor();
    /**
     * 队列名
     */
    private static final string queue_name = "stream.orders";
    /**
     * 组名
     */
    private static final string group_name = "g1";
    /**
     * 线程任务: 不断从消息队列中获取订单
     */
    private class voucherorderhandler implements runnable {
        @override
        public void run() {
            while (true) {
                try {
                    // 1、从消息队列中获取订单信息 xreadgroup group g1 c1 count 1 block 1000 streams streams.order >
                    list> messagelist = stringredistemplate.opsforstream().read(
                            consumer.from(group_name, "c1"),
                            streamreadoptions.empty().count(1).block(duration.ofseconds(1)),
                            streamoffset.create(queue_name, readoffset.lastconsumed())
                    );
                    // 2、判断消息获取是否成功
                    if (messagelist == null || messagelist.isempty()) {
                        // 2.1 消息获取失败,说明没有消息,进入下一次循环获取消息
                        continue;
                    }
                    // 3、消息获取成功,可以下单
                    // 将消息转成voucherorder对象
                    maprecord record = messagelist.get(0);
                    map messagemap = record.getvalue();
                    voucherorder voucherorder = beanutil.fillbeanwithmap(messagemap, new voucherorder(), true);
                    handlevoucherorder(voucherorder);
                    // 4、ack确认 sack stream.orders g1 id
                    stringredistemplate.opsforstream().acknowledge(queue_name, group_name, record.getid());
                } catch (exception e) {
                    log.error("处理订单异常", e);
                    // 处理异常消息
                    handlependinglist();
                }
            }
        }
    }
    private void handlependinglist() {
        while (true) {
            try {
                // 1、从pendinglist中获取订单信息 xreadgroup group g1 c1 count 1 block 1000 streams streams.order 0
                list> messagelist = stringredistemplate.opsforstream().read(
                        consumer.from(group_name, "c1"),
                        streamreadoptions.empty().count(1).block(duration.ofseconds(1)),
                        streamoffset.create(queue_name, readoffset.from("0"))
                );
                // 2、判断pendinglist中是否有效性
                if (messagelist == null || messagelist.isempty()) {
                    // 2.1 pendinglist中没有消息,直接结束循环
                    break;
                }
                // 3、pendinglist中有消息
                // 将消息转成voucherorder对象
                maprecord record = messagelist.get(0);
                map messagemap = record.getvalue();
                voucherorder voucherorder = beanutil.fillbeanwithmap(messagemap, new voucherorder(), true);
                handlevoucherorder(voucherorder);
                // 4、ack确认 sack stream.orders g1 id
                stringredistemplate.opsforstream().acknowledge(queue_name, group_name, record.getid());
            } catch (exception e) {
                log.error("处理订单异常", e);
                // 这里不用调自己,直接就进入下一次循环,再从pendinglist中取,这里只需要休眠一下,防止获取消息太频繁
                try {
                    thread.sleep(20);
                } catch (interruptedexception ex) {
                    log.error("线程休眠异常", ex);
                }
            }
        }
    }
    /**
     * 创建订单
     *
     * @param voucherorder
     */
    private void handlevoucherorder(voucherorder voucherorder) {
        long userid = voucherorder.getuserid();
        rlock lock = redissonclient.getlock(redisconstants.lock_order_key + userid);
        boolean islock = lock.trylock();
        if (!islock) {
            // 索取锁失败,重试或者直接抛异常(这个业务是一人一单,所以直接返回失败信息)
            log.error("一人只能下一单");
            return;
        }
        try {
            // 创建订单(使用代理对象调用,是为了确保事务生效)
            proxy.createvoucherorder(voucherorder);
        } finally {
            lock.unlock();
        }
    }
    /**
     * 加载 判断秒杀券库存是否充足 并且 判断用户是否已下单 的lua脚本
     */
    private static final defaultredisscript seckill_script;
    static {
        seckill_script = new defaultredisscript<>();
        seckill_script.setlocation(new classpathresource("lua/stream-seckill.lua"));
        seckill_script.setresulttype(long.class);
    }
    /**
     * voucherorderserviceimpl类的代理对象
     * 将代理对象的作用域进行提升,方面子线程取用
     */
    private ivoucherorderservice proxy;
    /**
     * 抢购秒杀券
     *
     * @param voucherid
     * @return
     */
    @transactional
    @override
    public result seckillvoucher(long voucherid) {
        long userid = threadlocalutls.getuser().getid();
        long orderid = redisidworker.nextid(seckill_voucher_order);
        // 1、执行lua脚本,判断用户是否具有秒杀资格
        long result = null;
        try {
            result = stringredistemplate.execute(
                    seckill_script,
                    collections.emptylist(),
                    voucherid.tostring(),
                    userid.tostring(),
                    string.valueof(orderid)
            );
        } catch (exception e) {
            log.error("lua脚本执行失败");
            throw new runtimeexception(e);
        }
        if (result != null && !result.equals(0l)) {
            // result为1表示库存不足,result为2表示用户已下单
            int r = result.intvalue();
            return result.fail(r == 2 ? "不能重复下单" : "库存不足");
        }
        // 2、result为0,下单成功,直接返回ok
        // 索取锁成功,创建代理对象,使用代理对象调用第三方事务方法, 防止事务失效
        ivoucherorderservice proxy = (ivoucherorderservice) aopcontext.currentproxy();
        this.proxy = proxy;
        return result.ok();
    }
    /**
     * 创建订单
     *
     * @param voucherorder
     * @return
     */
    @transactional
    @override
    public void createvoucherorder(voucherorder voucherorder) {
        long userid = voucherorder.getuserid();
        long voucherid = voucherorder.getvoucherid();
        // 1、判断当前用户是否是第一单
        int count = this.count(new lambdaquerywrapper()
                .eq(voucherorder::getuserid, userid));
        if (count >= 1) {
            // 当前用户不是第一单
            log.error("当前用户不是第一单");
            return;
        }
        // 2、用户是第一单,可以下单,秒杀券库存数量减一
        boolean flag = seckillvoucherservice.update(new lambdaupdatewrapper()
                .eq(seckillvoucher::getvoucherid, voucherid)
                .gt(seckillvoucher::getstock, 0)
                .setsql("stock = stock -1"));
        if (!flag) {
            throw new runtimeexception("秒杀券扣减失败");
        }
        // 3、将订单保存到数据库
        flag = this.save(voucherorder);
        if (!flag) {
            throw new runtimeexception("创建秒杀券订单失败");
        }
    }
}

3 秒杀流程剖析

3.1初始化操作

lua 脚本准备:编写 lua 脚本,接收优惠券 id 和用户 id 作为参数,判断库存是否充足以及用户是否已下单。若库存不足返回 1,用户已下单返回 2,下单成功返回 0。

-- 优惠券id
local voucherid = argv[1];
-- 用户id
local userid = argv[2];
local stockkey = 'seckill:stock:' .. voucherid;
local orderkey = 'seckill:order:' .. voucherid;
local stock = redis.call('get', stockkey);
if (tonumber(stock) <= 0) then
    return 1;
end
if (redis.call('sismember', orderkey, userid) == 1) then
    return 2;
end
redis.call('incrby', stockkey, -1);
redis.call('sadd', orderkey, userid);
return 0;

消息队列创建:在 java 代码的 @postconstruct 方法中,通过执行 lua 脚本创建 redis 的 stream 消息队列和消费者组。

@postconstruct
private void init() {
    defaultredisscript mqscript = new defaultredisscript<>();
    mqscript.setlocation(new classpathresource("lua/stream-mq.lua"));
    mqscript.setresulttype(long.class);
    long result = stringredistemplate.execute(mqscript, collections.emptylist(), queue_name, group_name);
    if (result == 1) {
        log.debug("队列创建成功");
    } else {
        log.debug("队列已存在");
    }
    seckill_order_executor.submit(new voucherorderhandler());
}

3.2 秒杀请求处理

资格判断:用户发起秒杀请求,系统执行 lua 脚本,根据返回结果判断用户是否具有秒杀资格。若返回 1 表示库存不足,返回 2 表示用户已下单,均返回失败信息;返回 0 则表示具有秒杀资格。

@override
public result seckillvoucher(long voucherid) {
    long userid = threadlocalutls.getuser().getid();
    long orderid = redisidworker.nextid(seckill_voucher_order);
    long result = stringredistemplate.execute(seckill_script, collections.emptylist(), 
                                            voucherid.tostring(), userid.tostring(), string.valueof(orderid));
    if (result != 0) {
        return result.fail(result == 2 ? "不能重复下单" : "库存不足");
    }
    ivoucherorderservice proxy = (ivoucherorderservice) aopcontext.currentproxy();
    this.proxy = proxy;
    return result.ok();
}

订单入队:具有秒杀资格后,生成订单 id,创建订单对象,将订单信息发送到 redis 的 stream 消息队列。

3.3消息队列消费

订单处理线程:使用线程池启动一个线程任务 voucherorderhandler,不断从消息队列中获取订单信息。

private class voucherorderhandler implements runnable {
    @override
    public void run() {
        while (true) {
            try {
                list> messagelist = stringredistemplate.opsforstream().read(
                    consumer.from(group_name, "c1"),
                    streamreadoptions.empty().count(1).block(duration.ofseconds(1)),
                    streamoffset.create(queue_name, readoffset.lastconsumed())
                );
                if (messagelist == null || messagelist.isempty()) {
                    continue;
                }
                maprecord record = messagelist.get(0);
                map messagemap = record.getvalue();
                voucherorder voucherorder = beanutil.fillbeanwithmap(messagemap, new voucherorder(), true);
                handlevoucherorder(voucherorder);
                stringredistemplate.opsforstream().acknowledge(queue_name, group_name, record.getid());
            } catch (exception e) {
                log.error("处理订单异常", e);
                handlependinglist();
            }
        }
    }
}

异常处理:若处理订单过程中出现异常,调用 handlependinglist 方法从 pendinglist 中获取未处理的订单信息,继续处理。

3.4 订单创建

分布式锁保障:使用 redisson 分布式锁,确保同一用户同一时间只能创建一个订单,避免一人多单问题。

private void handlevoucherorder(voucherorder voucherorder) {
    long userid = voucherorder.getuserid();
    rlock lock = redissonclient.getlock(redisconstants.lock_order_key + userid);
    boolean islock = lock.trylock();
    if (!islock) {
        log.error("一人只能下一单");
        return;
    }
    try {
        proxy.createvoucherorder(voucherorder);
    } finally {
        lock.unlock();
    }
}

数据库操作:判断用户是否是第一单,若是则扣减库存并将订单保存到数据库。

@override
public void createvoucherorder(voucherorder voucherorder) {
    long userid = voucherorder.getuserid();
    long voucherid = voucherorder.getvoucherid();
    int count = this.count(new lambdaquerywrapper().eq(voucherorder::getuserid, userid));
    if (count >= 1) {
        log.error("当前用户不是第一单");
        return;
    }
    boolean flag = seckillvoucherservice.update(new lambdaupdatewrapper()
        .eq(seckillvoucher::getvoucherid, voucherid)
        .gt(seckillvoucher::getstock, 0)
        .setsql("stock = stock -1"));
    if (!flag) {
        throw new runtimeexception("秒杀券扣减失败");
    }
    flag = this.save(voucherorder);
    if (!flag) {
        throw new runtimeexception("创建秒杀券订单失败");
    }
}

4 秒杀流程(文字版)

1. 初始化准备

在系统启动阶段,我们会完成一些必要的初始化工作。一方面,编写好用于判断库存和订单情况的 lua 脚本。这个脚本会接收优惠券 id 和用户 id 作为参数,通过 redis 的相关命令判断库存是否充足以及用户是否已下单,保证这些判断操作的原子性。另一方面,在 java 代码里利用 @postconstruct 注解,通过执行另一个 lua 脚本来创建 redis 的 stream 消息队列和消费者组,为后续处理订单消息做好准备。

2. 用户请求与资格判断

当用户发起秒杀请求后,系统会立即执行之前准备好的 lua 脚本来判断用户是否具有秒杀资格。

  • 如果脚本返回库存不足的标识,系统会迅速返回 “库存不足” 的提示信息,结束本次请求处理。
  • 若返回用户已下单的标识,就会返回 “不能重复下单” 的提示,流程终止。
  • 当判定用户具有秒杀资格时,系统会生成唯一的订单 id,创建订单对象,然后将订单信息发送到 redis 的 stream 消息队列,进入异步处理阶段。

3. 消息队列消费

有一个专门的消息队列消费者线程会持续监听 redis 的 stream 消息队列。

  • 如果没有获取到新的订单信息,线程会继续保持监听状态。
  • 一旦获取到订单信息,线程会马上尝试获取 redisson 分布式锁。这个锁非常关键,它能确保同一用户同一时间只能处理一个订单,有效避免一人多单的问题。

4. 订单创建与处理

获取到锁之后,系统会进一步处理订单。

  • 首先判断当前用户是否是第一单。如果不是,系统会记录错误日志并释放锁,结束流程。
  • 若是第一单,系统会尝试扣减库存。如果库存扣减失败,会抛出异常并释放锁;若扣减成功,就将订单信息保存到数据库。
  • 在保存订单时,若保存失败会抛出异常并释放锁;保存成功后,系统会向 redis 的 stream 消息队列发送 ack 确认消息,最后释放锁,完成整个秒杀流程。

到此这篇关于redis消息队列实现异步秒杀的文章就介绍到这了,更多相关redis异步秒杀内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

分享
海报
177
上一篇:Web应用从零开始,初学者友好型开发教程 下一篇:利用Nginx实现资源代理和接口代理的实现方法

忘记密码?

图形验证码