本文工具: ThinkPHP 5.1, RabbitMQ
1.安装 Erlang 环境
wget
yum install erlang-22.2-1.el7.x86_64.rpm
2.下载rabbitMQ的rpm包
wget
3.安装RabbitMQ
yum install rabbitmq-server-3.8.1-1.el7.noarch.rpm
4.启动RabbitMQ
chkconfig rabbitmq-server on #设置rabbitmq 服务为开机启动
/sbin/service rabbitmq-server start #启动
/sbin/service rabbitmq -server stop #关闭
或者
/bin/systemctl start rabbitmq-server.service #启动
/bin/systemctl start rabbitmq-server.service #关闭
5.启动UI插件
创建登录用户 rabbit mq -plugins enable rabbitmq_management #启动管理插件,下次无需再手动启动该插件
PHP
composer “yarayzw/rabbitmq” : “dev-master”,
创建worker配置文件
// +----------------------------------------------------------------------
use core\base\BaseMqService;
use core\helper\SysHelper;
// +----------------------------------------------------------------------
// | Workerman设置 仅对 php think worker 指令有效
// +----------------------------------------------------------------------
return [
// 扩展自身需要的配置
//自动化 处理redis 3677
//sendplan 11001 - 11010
//sendmsg 11020 - 11030
'host' => '0.0.0.0', // 监听地址
'port' => 11020, // 监听端口
' root ' => '', // WEB 根目录 默认会定位public目录
'app_path' => '', // 应用目录 守护进程模式必须设置(绝对路径)
' File _monitor' => false, // 是否开启PHP文件更改监控(调试模式下自动开启)
'file_monitor_interval' => 2, // 文件监控检测时间间隔(秒)
'file_monitor_path' => [], // 文件监控目录 默认监控application和config目录
// 支持workerman的所有配置参数
'name' => 'xx-tj',
'count' => 2,
'daemonize' => false,
// 'pidFile' => App::getRuntimePath().'worker/' . 'worker.pid',
'onWorkerStart' => static function ($worker) {
switch ($worker->id) {
case 0:
//mq消费监听
BaseMqService::setConfig(SysHelper::getConf('rabbitmq.'));
BaseMqService::receive();
break;
case 1:
break;
case 2:
break;
case 3:
break;
default:
throw new Exception ('无效进程');
}
},
];
启动worker
php think worker
创建mq基础类
extends MqService
{
protected static $appName = 'xx-tj';
protected static $config = [
'host' => ' 127.0.0.1 ',
'port' => '5672',
'user' => 'yara',
'password' => 'qwer1234',
'vhost' => 'xx',
'is_delay' => true, //是否需要开启延迟队列
'pre_exchange' => 'xx', //交换机前缀
'exchange_type' => 'topic', //默认topic类型
'exchange_key' => '',
'passive' => false, //查询某一个队列是否已存在,如果不存在,不想建立该队列
'durable' => true, //是否持久化
'auto_delete' => false, //是否自动删除
'exclusive' => false, //队列的排他性
'no_local' => false,
'no_ack' => false, //是否需不需要应答
'nowait' => false, //该方法需要应答确认
'consumer_tag' => ''
];
protected static $consumer = [
'yara' => [
'name' => 'yara',
'exchange' => 'xx.yara',
'route' => 'xx.yara.*',
'queue' => 'xx-tj.yara',
'operations' => [
['name' => 'yara', 'title' => '测试', 'queue' => 'xx-tj.yara.yaratest', 'route' => 'xx.yara.yaratest', 'class' => UserPromulgatingService::class, 'method' => 'yarayzwtest'],
]
],
];
//延迟队列
protected static $delays = [
// 'order_timeout_cancel' => ['name' => 'order_timeout_cancel', 'title' => '订单超时取消订单', 'expiry' => 30 * 60, 'class' => MqDelayOrderService::class, 'method' => 'cancel'],
// 'order_assemble_timeout_cancel' => ['name' => 'order_assemble_timeout_cancel', 'title' => '拼团订单超时退订单', 'expiry' => 24 * 60 * 60, 'class' => MqDelayOrderService::class, 'method' => 'refundAssemble'],
];
/**
* @param MqSendDataStruct $data
* @param $routeKey
* @throws Exception
*/
public static function sendEventMq(Mq send DataStruct $data, string $routeKey): void
{
if (empty($routeKey)) {
throw new BaseException('mq路由key必须填写!');
}
if (empty($data->getEvent())) {
throw new BaseException('请在事件类中定义event属性,并设置默认event_code值,参照Event类中事件对应的code!');
}
if (empty($data->getUserId())) {
throw new BaseException('请在事件类中定义user_id属性');
}
$ EventLog = [
'mq_router_key' => $routeKey,
# 'user_type' => 0,
'user_id' => $data->getUserId(),
'user_name' => $data->getUserName(),
'event_code' => $data->getEvent(),
'send_data' => json_encode($data-> toArray ()),
'create_at' => date('Y-m-d H:i:s')
];
// EventLogService::insert($eventLog);
self::send($data, $routeKey);
}
}
创建字段约定类
thirdparty;
}
/**
* @param mixed $thirdparty
*/
public function setThirdparty($thirdparty): void
{
$this->thirdparty = $thirdparty;
}
/**
* @return mixed
*/
public function getChannelId()
{
return $this->channel_id;
}
/**
* @param mixed $channel_id
*/
public function setChannelId($channel_id): void
{
$this->channel_id = $channel_id;
}
/**
* @return mixed
*/
public function getStartTime()
{
return $this->start_time;
}
/**
* @param mixed $start_time
*/
public function setStartTime($start_time): void
{
$this->start_time = $start_time;
}
/**
* @return mixed
*/
public function getEndTime()
{
return $this->end_time;
}
/**
* @param mixed $end_time
*/
public function setEndTime($end_time): void
{
$this->end_time = $end_time;
}
}
发送mq消息
public function setDataBy Mysql ()
{
$sendData = new GetOrder([
'channel_id' => 0,
'start_time' => 'T00:00:00+08:00',
'end_time' => 'T23:59:59+08:00',
'thirdparty' => 1,
]);
try {
BaseMqService::send($sendData, 'xx.yara.yaratest');
} catch (\Exception $e) {
FilterHelper::writeLog('dsrw_error','推广链接数据采集-error',$e->getMessage());
}
}
海报
0 条评论
148
相关文章
本站已关闭游客评论,请登录或者注册后再评论吧~