PHP|Rabbitmq 安装与使用

2022-10-11 21:44:07 148 0
魁首哥

本文工具: ThinkPHP 5.1, RabbitMQ

1.安装 Erlang 环境

PHP|Rabbitmq 安装与使用

wget
yum install erlang-22.2-1.el7.x86_64.rpm

2.下载rabbitMQ的rpm包

wget

3.安装RabbitMQ

yum install rabbitmq-server-3.8.1-1.el7.noarch.rpm

4.启动RabbitMQ
chkconfig rabbitmq-server on #设置rabbitmq 服务为开机启动

/sbin/service rabbitmq-server start #启动

/sbin/service rabbitmq -server stop #关闭

或者

/bin/systemctl start rabbitmq-server.service #启动

/bin/systemctl start rabbitmq-server.service #关闭

5.启动UI插件

创建登录用户 rabbit mq -plugins enable rabbitmq_management #启动管理插件,下次无需再手动启动该插件

PHP

composer “yarayzw/rabbitmq” : “dev-master”,

创建worker配置文件

 
// +----------------------------------------------------------------------
use core\base\BaseMqService;
use core\helper\SysHelper;


// +----------------------------------------------------------------------
// | Workerman设置 仅对 php think worker 指令有效
// +----------------------------------------------------------------------
return [
    // 扩展自身需要的配置
    //自动化 处理redis 3677
    //sendplan 11001 - 11010
    //sendmsg 11020 - 11030
    'host'                  => '0.0.0.0', // 监听地址
    'port'                  => 11020, // 监听端口
    ' root '                  => '', // WEB 根目录 默认会定位public目录
    'app_path'              => '', // 应用目录 守护进程模式必须设置(绝对路径)
    ' File _monitor'          => false, // 是否开启PHP文件更改监控(调试模式下自动开启)
    'file_monitor_interval' => 2, // 文件监控检测时间间隔(秒)
    'file_monitor_path'     => [], // 文件监控目录 默认监控application和config目录

    // 支持workerman的所有配置参数
    'name'                  => 'xx-tj',
    'count'                 => 2,
    'daemonize'             => false,
//    'pidFile'               => App::getRuntimePath().'worker/' . 'worker.pid',
    'onWorkerStart' =>  static  function ($worker) {
        switch ($worker->id) {
            case 0:
                //mq消费监听
                BaseMqService::setConfig(SysHelper::getConf('rabbitmq.'));
                BaseMqService::receive();
                break;
            case 1:

                break;
            case 2:
                break;
            case 3:
                break;
            default:
                throw new  Exception ('无效进程');
        }

    },
];
  

启动worker

php think worker

创建mq基础类

 extends  MqService
{
    protected static $appName = 'xx-tj';    

    protected static $config = [
        'host' => ' 127.0.0.1 ',
        'port' => '5672',
        'user' => 'yara',
        'password' => 'qwer1234',
        'vhost' => 'xx',

        'is_delay' => true,        //是否需要开启延迟队列
        'pre_exchange' => 'xx',       //交换机前缀

        'exchange_type' => 'topic',     //默认topic类型
        'exchange_key' => '',

        'passive' => false,     //查询某一个队列是否已存在,如果不存在,不想建立该队列
        'durable' => true,      //是否持久化
        'auto_delete' => false, //是否自动删除

        'exclusive' => false,   //队列的排他性
        'no_local' => false,
        'no_ack' => false,       //是否需不需要应答
        'nowait' => false,      //该方法需要应答确认
        'consumer_tag' => ''
    ];

    protected static $consumer = [
        'yara' => [
            'name' => 'yara',
            'exchange' => 'xx.yara',
            'route' => 'xx.yara.*',
            'queue' => 'xx-tj.yara',
            'operations' => [
                ['name' => 'yara', 'title' => '测试', 'queue' => 'xx-tj.yara.yaratest', 'route' => 'xx.yara.yaratest', 'class' => UserPromulgatingService::class, 'method' => 'yarayzwtest'],
            ]
        ],

    ];

    //延迟队列
    protected static $delays = [
//        'order_timeout_cancel' => ['name' => 'order_timeout_cancel', 'title' => '订单超时取消订单', 'expiry' => 30 * 60, 'class' => MqDelayOrderService::class, 'method' => 'cancel'],
//        'order_assemble_timeout_cancel' => ['name' => 'order_assemble_timeout_cancel', 'title' => '拼团订单超时退订单', 'expiry' => 24 * 60 * 60, 'class' => MqDelayOrderService::class, 'method' => 'refundAssemble'],
    ];

    /**
     * @param MqSendDataStruct $data
     * @param $routeKey
     * @throws Exception
     */
    public static function sendEventMq(Mq send DataStruct $data, string $routeKey):  void 
    {
        if (empty($routeKey)) {
            throw new BaseException('mq路由key必须填写!');
        }
        if (empty($data->getEvent())) {
            throw new BaseException('请在事件类中定义event属性,并设置默认event_code值,参照Event类中事件对应的code!');
        }
        if (empty($data->getUserId())) {
            throw new BaseException('请在事件类中定义user_id属性');
        }
        $ EventLog  = [
            'mq_router_key' => $routeKey,
            # 'user_type' => 0,
            'user_id' => $data->getUserId(),
            'user_name' => $data->getUserName(),
            'event_code' => $data->getEvent(),
            'send_data' => json_encode($data-> toArray ()),
            'create_at' => date('Y-m-d H:i:s')
        ];
//        EventLogService::insert($eventLog);
        self::send($data, $routeKey);
    }


}  

创建字段约定类

 thirdparty;
    }

    /**
     * @param mixed $thirdparty
     */
    public function setThirdparty($thirdparty): void
    {
        $this->thirdparty = $thirdparty;
    }

    /**
     * @return mixed
     */
    public function getChannelId()
    {
        return $this->channel_id;
    }

    /**
     * @param mixed $channel_id
     */
    public function setChannelId($channel_id): void
    {
        $this->channel_id = $channel_id;
    }

    /**
     * @return mixed
     */
    public function getStartTime()
    {
        return $this->start_time;
    }

    /**
     * @param mixed $start_time
     */
    public function setStartTime($start_time): void
    {
        $this->start_time = $start_time;
    }

    /**
     * @return mixed
     */
    public function getEndTime()
    {
        return $this->end_time;
    }

    /**
     * @param mixed $end_time
     */
    public function setEndTime($end_time): void
    {
        $this->end_time = $end_time;
    }

}  

发送mq消息

 public function setDataBy Mysql ()
{
    $sendData = new GetOrder([
        'channel_id' => 0,
        'start_time' => 'T00:00:00+08:00',
        'end_time' => 'T23:59:59+08:00',
        'thirdparty' => 1,
    ]);
    try {
        BaseMqService::send($sendData, 'xx.yara.yaratest');
    } catch (\Exception $e) {
        FilterHelper::writeLog('dsrw_error','推广链接数据采集-error',$e->getMessage());
    }
}  

收藏
分享
海报
0 条评论
148
上一篇:3分钟短文 | PHP 根据值移除数组元素,哪个方法最简单? 下一篇:利用php的cURL实现模拟登录,抓取页面数据等功能

本站已关闭游客评论,请登录或者注册后再评论吧~

忘记密码?

图形验证码