如何利用MQ实现事务补偿
本篇内容介绍了“如何利用MQ实现事务补偿”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
rabbitMQ 在互联网公司有着大规模应用,本篇将实战介绍 springboot 整合 rabbitMQ,同时也将在具体的业务场景中介绍利用 MQ 实现事务补偿操作。
一、介绍
本篇我们一起来实操一下SpringBoot整合rabbitMQ,为后续业务处理做铺垫。
废话不多说,直奔主题!
二、整合实战
2.1、创建一个 maven 工程,引入 amqp 包
<!--amqp支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.2、在全局文件中配置 rabbitMQ 服务信息
spring.rabbitmq.addresses=197.168.24.206:5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/
其中,spring.rabbitmq.addresses参数值为 rabbitmq 服务器地址
2.3、编写 rabbitmq 配置类
@Slf4j@ConfigurationpublicclassRabbitConfig{/***初始化连接工厂*@paramaddresses*@paramuserName*@parampassword*@paramvhost*@return*/@BeanConnectionFactoryconnectionFactory(@Value("${spring.rabbitmq.addresses}")Stringaddresses,@Value("${spring.rabbitmq.username}")StringuserName,@Value("${spring.rabbitmq.password}")Stringpassword,@Value("${spring.rabbitmq.virtual-host}")Stringvhost){CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();connectionFactory.setAddresses(addresses);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(vhost);returnconnectionFactory;}/***重新实例化RabbitAdmin操作类*@paramconnectionFactory*@return*/@BeanpublicRabbitAdminrabbitAdmin(ConnectionFactoryconnectionFactory){returnnewRabbitAdmin(connectionFactory);}/***重新实例化RabbitTemplate操作类*@paramconnectionFactory*@return*/@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplate=newRabbitTemplate(connectionFactory);//数据转换为json存入消息队列rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());returnrabbitTemplate;}/***将RabbitUtil操作工具类加入IOC容器*@return*/@BeanpublicRabbitUtilrabbitUtil(){returnnewRabbitUtil();}}
2.4、编写 RabbitUtil 工具类
publicclassRabbitUtil{privatestaticfinalLoggerlogger=LoggerFactory.getLogger(RabbitUtil.class);@AutowiredprivateRabbitAdminrabbitAdmin;@AutowiredprivateRabbitTemplaterabbitTemplate;/***创建Exchange*@paramexchangeName*/publicvoidaddExchange(StringexchangeType,StringexchangeName){Exchangeexchange=createExchange(exchangeType,exchangeName);rabbitAdmin.declareExchange(exchange);}/***删除一个Exchange*@paramexchangeName*/publicbooleandeleteExchange(StringexchangeName){returnrabbitAdmin.deleteExchange(exchangeName);}/***创建一个指定的Queue*@paramqueueName*@returnqueueName*/publicvoidaddQueue(StringqueueName){Queuequeue=createQueue(queueName);rabbitAdmin.declareQueue(queue);}/***删除一个queue*@returnqueueName*@paramqueueName*/publicbooleandeleteQueue(StringqueueName){returnrabbitAdmin.deleteQueue(queueName);}/***按照筛选条件,删除队列*@paramqueueName*@paramunused是否被使用*@paramempty内容是否为空*/publicvoiddeleteQueue(StringqueueName,booleanunused,booleanempty){rabbitAdmin.deleteQueue(queueName,unused,empty);}/***清空某个队列中的消息,注意,清空的消息并没有被消费*@returnqueueName*@paramqueueName*/publicvoidpurgeQueue(StringqueueName){rabbitAdmin.purgeQueue(queueName,false);}/***判断指定的队列是否存在*@paramqueueName*@return*/publicbooleanexistQueue(StringqueueName){returnrabbitAdmin.getQueueProperties(queueName)==null?false:true;}/***绑定一个队列到一个匹配型交换器使用一个routingKey*@paramexchangeType*@paramexchangeName*@paramqueueName*@paramroutingKey*@paramisWhereAll*@paramheadersEADERS模式类型设置,其他模式类型传空*/publicvoidaddBinding(StringexchangeType,StringexchangeName,StringqueueName,StringroutingKey,booleanisWhereAll,Map<String,Object>headers){Bindingbinding=bindingBuilder(exchangeType,exchangeName,queueName,routingKey,isWhereAll,headers);rabbitAdmin.declareBinding(binding);}/***声明绑定*@parambinding*/publicvoidaddBinding(Bindingbinding){rabbitAdmin.declareBinding(binding);}/***解除交换器与队列的绑定*@paramexchangeType*@paramexchangeName*@paramqueueName*@paramroutingKey*@paramisWhereAll*@paramheaders*/publicvoidremoveBinding(StringexchangeType,StringexchangeName,StringqueueName,StringroutingKey,booleanisWhereAll,Map<String,Object>headers){Bindingbinding=bindingBuilder(exchangeType,exchangeName,queueName,routingKey,isWhereAll,headers);removeBinding(binding);}/***解除交换器与队列的绑定*@parambinding*/publicvoidremoveBinding(Bindingbinding){rabbitAdmin.removeBinding(binding);}/***创建一个交换器、队列,并绑定队列*@paramexchangeType*@paramexchangeName*@paramqueueName*@paramroutingKey*@paramisWhereAll*@paramheaders*/publicvoidandExchangeBindingQueue(StringexchangeType,StringexchangeName,StringqueueName,StringroutingKey,booleanisWhereAll,Map<String,Object>headers){//声明交换器addExchange(exchangeType,exchangeName);//声明队列addQueue(queueName);//声明绑定关系addBinding(exchangeType,exchangeName,queueName,routingKey,isWhereAll,headers);}/***发送消息*@paramexchange*@paramroutingKey*@paramobject*/publicvoidconvertAndSend(Stringexchange,StringroutingKey,finalObjectobject){rabbitTemplate.convertAndSend(exchange,routingKey,object);}/***转换Message对象*@parammessageType*@parammsg*@return*/publicMessagegetMessage(StringmessageType,Objectmsg){MessagePropertiesmessageProperties=newMessageProperties();messageProperties.setContentType(messageType);Messagemessage=newMessage(msg.toString().getBytes(),messageProperties);returnmessage;}/***声明交换机*@paramexchangeType*@paramexchangeName*@return*/privateExchangecreateExchange(StringexchangeType,StringexchangeName){if(ExchangeType.DIRECT.equals(exchangeType)){returnnewDirectExchange(exchangeName);}if(ExchangeType.TOPIC.equals(exchangeType)){returnnewTopicExchange(exchangeName);}if(ExchangeType.HEADERS.equals(exchangeType)){returnnewHeadersExchange(exchangeName);}if(ExchangeType.FANOUT.equals(exchangeType)){returnnewFanoutExchange(exchangeName);}returnnull;}/***声明绑定关系*@paramexchangeType*@paramexchangeName*@paramqueueName*@paramroutingKey*@paramisWhereAll*@paramheaders*@return*/privateBindingbindingBuilder(StringexchangeType,StringexchangeName,StringqueueName,StringroutingKey,booleanisWhereAll,Map<String,Object>headers){if(ExchangeType.DIRECT.equals(exchangeType)){returnBindingBuilder.bind(newQueue(queueName)).to(newDirectExchange(exchangeName)).with(routingKey);}if(ExchangeType.TOPIC.equals(exchangeType)){returnBindingBuilder.bind(newQueue(queueName)).to(newTopicExchange(exchangeName)).with(routingKey);}if(ExchangeType.HEADERS.equals(exchangeType)){if(isWhereAll){returnBindingBuilder.bind(newQueue(queueName)).to(newHeadersExchange(exchangeName)).whereAll(headers).match();}else{returnBindingBuilder.bind(newQueue(queueName)).to(newHeadersExchange(exchangeName)).whereAny(headers).match();}}if(ExchangeType.FANOUT.equals(exchangeType)){returnBindingBuilder.bind(newQueue(queueName)).to(newFanoutExchange(exchangeName));}returnnull;}/***声明队列*@paramqueueName*@return*/privateQueuecreateQueue(StringqueueName){returnnewQueue(queueName);}/***交换器类型*/publicfinalstaticclassExchangeType{/***直连交换机(全文匹配)*/publicfinalstaticStringDIRECT="DIRECT";/***通配符交换机(两种通配符:*只能匹配一个单词,#可以匹配零个或多个)*/publicfinalstaticStringTOPIC="TOPIC";/***头交换机(自定义键值对匹配,根据发送消息内容中的headers属性进行匹配)*/publicfinalstaticStringHEADERS="HEADERS";/***扇形(广播)交换机(将消息转发到所有与该交互机绑定的队列上)*/publicfinalstaticStringFANOUT="FANOUT";}}
此致, rabbitMQ 核心操作功能操作已经开发完毕!
2.5、编写队列监听类(静态)
@Slf4j@ConfigurationpublicclassDirectConsumeListener{/***监听指定队列,名称:mq.direct.1*@parammessage*@paramchannel*@throwsIOException*/@RabbitListener(queues="mq.direct.1")publicvoidconsume(Messagemessage,Channelchannel)throwsIOException{log.info("DirectConsumeListener,收到消息:{}",message.toString());}}
如果你需要监听指定的队列,只需要方法上加上@RabbitListener(queues = "")即可,同时填写对应的队列名称。
但是,如果你想动态监听队列,而不是通过写死在方法上呢?
请看下面介绍!
2.6、编写队列监听类(动态)
重新实例化一个SimpleMessageListenerContainer对象,这个对象就是监听容器。
@Slf4j@ConfigurationpublicclassDynamicConsumeListener{/***使用SimpleMessageListenerContainer实现动态监听*@paramconnectionFactory*@return*/@BeanpublicSimpleMessageListenerContainermessageListenerContainer(ConnectionFactoryconnectionFactory){SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(connectionFactory);container.setMessageListener((MessageListener)message->{log.info("ConsumerMessageListen,收到消息:{}",message.toString());});returncontainer;}}
如果想向SimpleMessageListenerContainer添加监听队列或者移除队列,只需通过如下方式即可操作。
@Slf4j@RestController@RequestMapping("/consumer")publicclassConsumerController{@AutowiredprivateSimpleMessageListenerContainercontainer;@AutowiredprivateRabbitUtilrabbitUtil;/***添加队列到监听器*@paramconsumerInfo*/@PostMapping("addQueue")publicvoidaddQueue(@RequestBodyConsumerInfoconsumerInfo){booleanexistQueue=rabbitUtil.existQueue(consumerInfo.getQueueName());if(!existQueue){thrownewCommonExecption("当前队列不存在");}//消费mq消息的类container.addQueueNames(consumerInfo.getQueueName());//打印监听容器中正在监听到队列log.info("container-queue:{}",JsonUtils.toJson(container.getQueueNames()));}/***移除正在监听的队列*@paramconsumerInfo*/@PostMapping("removeQueue")publicvoidremoveQueue(@RequestBodyConsumerInfoconsumerInfo){//消费mq消息的类container.removeQueueNames(consumerInfo.getQueueName());//打印监听容器中正在监听到队列log.info("container-queue:{}",JsonUtils.toJson(container.getQueueNames()));}/***查询监听容器中正在监听到队列*/@PostMapping("queryListenerQueue")publicvoidqueryListenerQueue(){log.info("container-queue:{}",JsonUtils.toJson(container.getQueueNames()));}}
2.7、发送消息到交换器
发送消息到交换器,非常简单,只需要通过如下方式即可!
先编写一个请求参数实体类
@DatapublicclassProduceInfoimplementsSerializable{privatestaticfinallongserialVersionUID=1l;/***交换器名称*/privateStringexchangeName;/***路由键key*/privateStringroutingKey;/***消息内容*/publicStringmsg;}
编写接口api
@RestController@RequestMapping("/produce")publicclassProduceController{@AutowiredprivateRabbitUtilrabbitUtil;/***发送消息到交换器*@paramproduceInfo*/@PostMapping("sendMessage")publicvoidsendMessage(@RequestBodyProduceInfoproduceInfo){rabbitUtil.convertAndSend(produceInfo.getExchangeName(),produceInfo.getRoutingKey(),produceInfo);}}
当然,你也可以直接使用rabbitTemplate操作类,来实现发送消息。
rabbitTemplate.convertAndSend(exchange,routingKey,message);
参数内容解释:
exchange:表示交换器名称
routingKey:表示路由键key
message:表示消息
2.8、交换器、队列维护操作
如果想通过接口对 rabbitMQ 中的交换器、队列以及绑定关系进行维护,通过如下方式接口操作,即可实现!
先编写一个请求参数实体类
@DatapublicclassQueueConfigimplementsSerializable{privatestaticfinallongserialVersionUID=1l;/***交换器类型*/privateStringexchangeType;/***交换器名称*/privateStringexchangeName;/***队列名称*/privateStringqueueName;/***路由键key*/privateStringroutingKey;}
编写接口api
/***rabbitMQ管理操作控制层*/@RestController@RequestMapping("/config")publicclassRabbitController{@AutowiredprivateRabbitUtilrabbitUtil;/***创建交换器*@paramconfig*/@PostMapping("addExchange")publicvoidaddExchange(@RequestBodyQueueConfigconfig){rabbitUtil.addExchange(config.getExchangeType(),config.getExchangeName());}/***删除交换器*@paramconfig*/@PostMapping("deleteExchange")publicvoiddeleteExchange(@RequestBodyQueueConfigconfig){rabbitUtil.deleteExchange(config.getExchangeName());}/***添加队列*@paramconfig*/@PostMapping("addQueue")publicvoidaddQueue(@RequestBodyQueueConfigconfig){rabbitUtil.addQueue(config.getQueueName());}/***删除队列*@paramconfig*/@PostMapping("deleteQueue")publicvoiddeleteQueue(@RequestBodyQueueConfigconfig){rabbitUtil.deleteQueue(config.getQueueName());}/***清空队列数据*@paramconfig*/@PostMapping("purgeQueue")publicvoidpurgeQueue(@RequestBodyQueueConfigconfig){rabbitUtil.purgeQueue(config.getQueueName());}/***添加绑定*@paramconfig*/@PostMapping("addBinding")publicvoidaddBinding(@RequestBodyQueueConfigconfig){rabbitUtil.addBinding(config.getExchangeType(),config.getExchangeName(),config.getQueueName(),config.getRoutingKey(),false,null);}/***解除绑定*@paramconfig*/@PostMapping("removeBinding")publicvoidremoveBinding(@RequestBodyQueueConfigconfig){rabbitUtil.removeBinding(config.getExchangeType(),config.getExchangeName(),config.getQueueName(),config.getRoutingKey(),false,null);}/***创建头部类型的交换器*判断条件是所有的键值对都匹配成功才发送到队列*@paramconfig*/@PostMapping("andExchangeBindingQueueOfHeaderAll")publicvoidandExchangeBindingQueueOfHeaderAll(@RequestBodyQueueConfigconfig){HashMap<String,Object>header=newHashMap<>();header.put("queue","queue");header.put("bindType","whereAll");rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS,config.getExchangeName(),config.getQueueName(),null,true,header);}/***创建头部类型的交换器*判断条件是只要有一个键值对匹配成功就发送到队列*@paramconfig*/@PostMapping("andExchangeBindingQueueOfHeaderAny")publicvoidandExchangeBindingQueueOfHeaderAny(@RequestBodyQueueConfigconfig){HashMap<String,Object>header=newHashMap<>();header.put("queue","queue");header.put("bindType","whereAny");rabbitUtil.andExchangeBindingQueue(RabbitUtil.ExchangeType.HEADERS,config.getExchangeName(),config.getQueueName(),null,false,header);}}
至此,rabbitMQ 管理器基本的 crud 全部开发完成!
三、利用 MQ 实现事务补偿
当然,我们花了这么大的力气,绝不仅仅是为了将 rabbitMQ 通过 web 项目将其管理起来,最重要的是能投入业务使用中去!
上面的操作只是告诉我们怎么使用 rabbitMQ!
当你仔细回想整个过程的时候,其实还是回到最初那个问题,什么时候使用 MQ ?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:支付订单、扣减库存、生成相应单据、发红包、发短信通知等等。
在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取 MQ 的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
这种是利用 MQ 实现业务解耦,其它的场景包括最终一致性、广播、错峰流控等等。
利用 MQ 实现业务解耦的过程其实也很简单。
当主流程结束之后,将消息推送到发红包、发短信交换器中即可
@ServicepublicclassOrderService{@AutowiredprivateRabbitUtilrabbitUtil;/***创建订单*@paramorder*/@TransactionalpublicvoidcreateOrder(Orderorder){//1、创建订单//2、调用库存接口,减库存//3、向客户发放红包rabbitUtil.convertAndSend("exchange.send.bonus",null,order);//4、发短信通知rabbitUtil.convertAndSend("exchange.sms.message",null,order);}}
监听发红包操作
/***监听发红包*@parammessage*@paramchannel*@throwsIOException*/@RabbitListener(queues="exchange.send.bonus")publicvoidconsume(Messagemessage,Channelchannel)throwsIOException{StringmsgJson=newString(message.getBody(),"UTF-8");log.info("收到消息:{}",message.toString());//调用发红包接口}
监听发短信操作
/***监听发短信*@parammessage*@paramchannel*@throwsIOException*/@RabbitListener(queues="exchange.sms.message")publicvoidconsume(Messagemessage,Channelchannel)throwsIOException{StringmsgJson=newString(message.getBody(),"UTF-8");log.info("收到消息:{}",message.toString());//调用发短信接口}
既然 MQ 这么好用,那是不是完全可以将以前的业务也按照整个模型进行拆分呢?
答案显然不是!
当引入 MQ 之后业务的确是解耦了,但是当 MQ 一旦挂了,所有的服务基本都挂了,是不是很可怕!
但是没关系,俗话说,兵来将挡、水来土掩,这句话同样适用于 IT 开发者,有坑填坑!
“如何利用MQ实现事务补偿”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注恰卡编程网网站,小编将为大家输出更多高质量的实用文章!