SpringBoot整合RocketMQ遇到的坑怎么解决
SpringBoot整合RocketMQ遇到的坑怎么解决
本篇内容主要讲解“SpringBoot整合RocketMQ遇到的坑怎么解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SpringBoot整合RocketMQ遇到的坑怎么解决”吧!
应用场景
在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换。
引入依赖
<!--RocketMqSpringBootStarter--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency>
消费者代码
@RocketMQMessageListener(consumerGroup="${rocketmq.group}",topic="${rocketmq.topic}",selectorExpression="${rocketmq.selectorExpression}")publicclassConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Strings){System.out.println("消费到的数据为:"+s);}}
问题排查
RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式。
@OverridepublicvoidafterSingletonsInstantiated(){//获取所有所有使用了RocketMQMessageListener注解的beanMap<String,Object>beans=this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);if(Objects.nonNull(beans)){//循环注册容器beans.forEach(this::registerContainer);}}privatevoidregisterContainer(StringbeanName,Objectbean){Class<?>clazz=AopProxyUtils.ultimateTargetClass(bean);//校验当前bean是否实现了RocketMQListener接口if(!RocketMQListener.class.isAssignableFrom(bean.getClass())){thrownewIllegalStateException(clazz+"isnotinstanceof"+RocketMQListener.class.getName());}//获取bean上的annotationRocketMQMessageListenerannotation=clazz.getAnnotation(RocketMQMessageListener.class);//解析group及topic,可支持表达式StringconsumerGroup=this.environment.resolvePlaceholders(annotation.consumerGroup());Stringtopic=this.environment.resolvePlaceholders(annotation.topic());booleanlistenerEnabled=(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup,Collections.EMPTY_MAP).getOrDefault(topic,true);if(!listenerEnabled){log.debug("ConsumerListener(group:{},topic:{})isnotenabledbyconfiguration,willignoreinitialization.",consumerGroup,topic);return;}validate(annotation);StringcontainerBeanName=String.format("%s_%s",DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());GenericApplicationContextgenericApplicationContext=(GenericApplicationContext)applicationContext;//注册bean的,调用createRocketMQListenerContainergenericApplicationContext.registerBean(containerBeanName,DefaultRocketMQListenerContainer.class,()->createRocketMQListenerContainer(containerBeanName,bean,annotation));DefaultRocketMQListenerContainercontainer=genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);if(!container.isRunning()){try{container.start();}catch(Exceptione){log.error("Startedcontainerfailed.{}",container,e);thrownewRuntimeException(e);}}log.info("Registerthelistenertocontainer,listenerBeanName:{},containerBeanName:{}",beanName,containerBeanName);}privateDefaultRocketMQListenerContainercreateRocketMQListenerContainer(Stringname,Objectbean,RocketMQMessageListenerannotation){DefaultRocketMQListenerContainercontainer=newDefaultRocketMQListenerContainer();container.setRocketMQMessageListener(annotation);StringnameServer=environment.resolvePlaceholders(annotation.nameServer());nameServer=StringUtils.isEmpty(nameServer)?rocketMQProperties.getNameServer():nameServer;StringaccessChannel=environment.resolvePlaceholders(annotation.accessChannel());container.setNameServer(nameServer);if(!StringUtils.isEmpty(accessChannel)){container.setAccessChannel(AccessChannel.valueOf(accessChannel));}container.setTopic(environment.resolvePlaceholders(annotation.topic()));//此处已经根据表达式将数据取出Stringtags=environment.resolvePlaceholders(annotation.selectorExpression());if(!StringUtils.isEmpty(tags)){container.setSelectorExpression(tags);}container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));//此处将SelectorExpression的数据覆盖成了表达式container.setRocketMQMessageListener(annotation);container.setRocketMQListener((RocketMQListener)bean);container.setObjectMapper(objectMapper);container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());container.setName(name);//REVIEWME,usethesameclientIdormultiple?returncontainer;}
问题解决
因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去。
/***在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据**/@ConfigurationpublicclassChangeSelectorExpressionBeforeMQInitimplementsInitializingBean{@AutowiredprivateApplicationContextapplicationContext;@AutowiredprivateStandardEnvironmentenvironment;@OverridepublicvoidafterPropertiesSet()throwsException{Map<String,Object>beans=applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);for(Objectbean:beans.values()){Class<?>clazz=AopProxyUtils.ultimateTargetClass(bean);if(!RocketMQListener.class.isAssignableFrom(bean.getClass())){continue;}RocketMQMessageListenerannotation=clazz.getAnnotation(RocketMQMessageListener.class);InvocationHandlerinvocationHandler=Proxy.getInvocationHandler(annotation);Fieldfield=invocationHandler.getClass().getDeclaredField("memberValues");field.setAccessible(true);Map<String,Object>memberValues=(Map<String,Object>)field.get(invocationHandler);for(Map.Entry<String,Object>entry:memberValues.entrySet()){if(Objects.nonNull(entry)){memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));}}}}}
初次之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包。
到此,相信大家对“SpringBoot整合RocketMQ遇到的坑怎么解决”有了更深的了解,不妨来实际操作一番吧!这里是恰卡编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
推荐阅读
-
springboot实现基于aop的切面日志
本文实例为大家分享了springboot实现基于aop的切面日志的具体代码,供大家参考,具体内容如下通过aop的切面方式实现日志...
-
SpringBoot定时任务功能怎么实现
-
SpringBoot中的@Import注解怎么使用
-
SpringBoot整合Lombok及常见问题怎么解决
-
springboot图片验证码功能模块怎么实现
-
Springboot+SpringSecurity怎么实现图片验证码登录
-
SpringBoot注解的知识点有哪些
SpringBoot注解的知识点有哪些这篇“SpringBoot注...
-
SpringBoot2.x中management.security.enabled=false无效怎么解决
-
springboot怎么禁用某项健康检查
springboot怎么禁用某项健康检查今天小编给大家分享一下sp...
-
SpringBoot2怎么自定义端点