怎么用RabbitMQ和Swoole实现一个异步任务系统
这篇文章给大家分享的是有关怎么用RabbitMQ和Swoole实现一个异步任务系统的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
系统介绍
从图中可以看到,我们这个系统是一个基于事件的异步任务系统。就是说当一个事件产生时,生产者将事件抛给调度器,调度器负责查询事件下有哪些任务,然后将这些任务丢到相应的队列中,最后由消费者消费任务队列中的任务。
在整个系统中主要分为三大部分
1.事件生产者,即产生消息事件的一方。
2.任务调度器(Scheduler),负责注册事件并调度任务。
3.消费者(Worker),负责消费任务队列中的任务。
事件生产者
事件生产者很简单,在业务系统中直接调用即可,代码如下。
<?php require_onceDIR.'/../autoload.php'; useAsynclib\Ebats\Event; try{ $event=newEvent('order_paied');//定义事件 $event->setOptions(['order_id'=>'FB138020392193312']);//事件产生的参数 $event->publish(); }catch(Exception$exc){ echo$exc->getMessage(); }
任务调度器
调度器主要做两件事,一是注册事件,另一个是调度任务。
注册事件代码如下:
//注册事件 EventManager::register('order_create','closeOrder','demo',10);//关闭未付款订单(延迟任务) EventManager::register('order_paied','virtualShipping','demo');//虚拟商品自动发货
这样就注册了两个事件,事件下各有一个任务。
具体调度部分代码很简单,就不多赘述,有兴趣的可以去看代码。
消费者
重头戏来了,一个异步任务系统最重要的就是消费端了,现在让我们来看下Worker的流程图。
可以看到,在这里我们采用了两个交换器和两个队列,一个负责处理正常的任务即ntask,另一个负责处理需要延迟执行的任务即dtask。简单描述下一个任务的生命周期。
正常任务
1、task产生,进入正常任务的交换器Exchange[ebats_core_ntask]
2、交换器根据topic将任务分发到对应的队列中
3、子进程ntask阻塞等待成功获取到task,并执行该任务
4、执行失败,需要重试时抛出RetryException,不需要重试时抛出TaskException
5、子进程ntask捕获到重试异常将任务抛给延迟任务的交换器Exchange[ebats_core_dtask]
6、将任务执行信息回调给上层开发者以便保存查看
延迟任务
1、子进程dtask阻塞等待成功获取到task,并执行该任务2、执行失败,需要重试时抛出RetryException,不需要重试时抛出TaskException3、子进程dtask捕获到重试异常将任务抛给延迟任务的交换器Exchange[ebats_core_dtask]4、将任务执行信息回调给上层开发者以便保存查看
消费者代码如下:
require_onceDIR.'/../autoload.php'; require_onceDIR.'/task/TaskDemoModel.php'; useAsynclib\Ebats\Worker; //执行结果回调函数 $callback=function($topic,$taskid,$taskname,$params,$timeuse,$message){ }; $worker=newWorker($callback);//支持多进程消费默认为1 $worker->setQueue('demo');//队列名和事件的topic一一对应 $worker->run();
自定义调度器
一般来说这是一个基于事件的任务系统,那么能不能直接产生任务呢。答案是肯定的。
只需要创建一个自定义调度器,由您自行实现调度逻辑,最终生成一个任务即可。代码如下:
<?php require_onceDIR.'/../autoload.php'; useAsynclib\Ebats\Task; useAsynclib\Core\Consumer; useAsynclib\Amq\ExchangeTypes; useAsynclib\Exception\ExceptionInterface; /** *本示例演示了如何创建一个自定义调度器,开发者可以根据自身需求开发自己的任务调度器 */ try{ $worker=newConsumer(); $worker->setExchange('order_fanout',ExchangeTypes::TOPIC); $worker->setQueue('shzf_order_paied',['*.*.WAIT_SELLER_SEND_GOODS']); $worker->run(function($key,$msg){ $order_data=json_encode($msg); echo"[$key]$order_data\n"; Task::create('demo','orderAsync',$msg);//创建任务,之后消息将作为参数由任务接管处理 }); }catch(ExceptionInterface$exc){ echo$exc->getMessage(); }
这样,当接收到消息时就会产生一个orderAsync的任务,您只需要启动一个用来消费这个Topic的Worker即可。
也许你会觉得这里直接写业务逻辑的代码就可以了,实际上也确实可以。当你可以忍受一个进程慢慢消费的时候是可以这样做的。但大多数情况下我们还是希望它能够尽快的消费掉,所以建议这里只负责创建任务,具体任务的业务逻辑由worker去执行。
感谢各位的阅读!关于“怎么用RabbitMQ和Swoole实现一个异步任务系统”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
推荐阅读
-
php如何让Swoole/Pool进程池实现Redis持久连接
php如何让Swoole/Pool进程池实现Redis持久连接本篇...
-
php进阶到架构之swoole系列教程(一)windows安装swoole
-
Swoole就能实现高性能HTTP服务器
-
在linux下添加PHP扩展通讯swoole的安装
-
小编带你快速入门swoole框架
swoole有两个部分。一个是PHP扩展,用C开发的,这是核心。另一个是框架,像yii、TP、Laravel一样,是PHP代码...
-
php引用计数基本知识,划重点要考的
-
php有哪些运行环境?每个都有不同的特点,适合不同的系统
-
使用 Swoole 来加速你的 Laravel 应用
-
Swoole 5.0 不再使用 PSR-0 下划线风格的类名
Swoole在1.x–4.x版本中同时提供了PSR-0规范的下划线风格类名和PSR-4的命名空间风格。目前PS...
-
thinkphp 6.0 swoole扩展websocket使用教程