SpringBoot整合RocketMQ遇到的坑怎么解决

SpringBoot整合RocketMQ遇到的坑怎么解决

本篇内容主要讲解“SpringBoot整合RocketMQ遇到的坑怎么解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“SpringBoot整合RocketMQ遇到的坑怎么解决”吧!

应用场景

在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group、Topic以及selectorExpression(数据过滤、选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换。

SpringBoot整合RocketMQ遇到的坑怎么解决

引入依赖

<!--RocketMqSpringBootStarter--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency>

消费者代码

@RocketMQMessageListener(consumerGroup="${rocketmq.group}",topic="${rocketmq.topic}",selectorExpression="${rocketmq.selectorExpression}")publicclassConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Strings){System.out.println("消费到的数据为:"+s);}}

问题排查

RocketMQMessageListener整个注解默认selectorExpression为*,表示接收当前Topic下的所有数据,如果我们想对tags进行动态配置,在使用${rocketmq.selectorExpression}表达式时会发现所有数据全被过滤了,跟踪源码(ListenerContainerConfiguration.java)发现在创建listener时selectorExpression的数据在通environment环境变量中获取对应的数据后又被覆盖了,导致整个过滤条件被变更为表达式。

@OverridepublicvoidafterSingletonsInstantiated(){//获取所有所有使用了RocketMQMessageListener注解的beanMap<String,Object>beans=this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);if(Objects.nonNull(beans)){//循环注册容器beans.forEach(this::registerContainer);}}privatevoidregisterContainer(StringbeanName,Objectbean){Class<?>clazz=AopProxyUtils.ultimateTargetClass(bean);//校验当前bean是否实现了RocketMQListener接口if(!RocketMQListener.class.isAssignableFrom(bean.getClass())){thrownewIllegalStateException(clazz+"isnotinstanceof"+RocketMQListener.class.getName());}//获取bean上的annotationRocketMQMessageListenerannotation=clazz.getAnnotation(RocketMQMessageListener.class);//解析group及topic,可支持表达式StringconsumerGroup=this.environment.resolvePlaceholders(annotation.consumerGroup());Stringtopic=this.environment.resolvePlaceholders(annotation.topic());booleanlistenerEnabled=(boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup,Collections.EMPTY_MAP).getOrDefault(topic,true);if(!listenerEnabled){log.debug("ConsumerListener(group:{},topic:{})isnotenabledbyconfiguration,willignoreinitialization.",consumerGroup,topic);return;}validate(annotation);StringcontainerBeanName=String.format("%s_%s",DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());GenericApplicationContextgenericApplicationContext=(GenericApplicationContext)applicationContext;//注册bean的,调用createRocketMQListenerContainergenericApplicationContext.registerBean(containerBeanName,DefaultRocketMQListenerContainer.class,()->createRocketMQListenerContainer(containerBeanName,bean,annotation));DefaultRocketMQListenerContainercontainer=genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);if(!container.isRunning()){try{container.start();}catch(Exceptione){log.error("Startedcontainerfailed.{}",container,e);thrownewRuntimeException(e);}}log.info("Registerthelistenertocontainer,listenerBeanName:{},containerBeanName:{}",beanName,containerBeanName);}privateDefaultRocketMQListenerContainercreateRocketMQListenerContainer(Stringname,Objectbean,RocketMQMessageListenerannotation){DefaultRocketMQListenerContainercontainer=newDefaultRocketMQListenerContainer();container.setRocketMQMessageListener(annotation);StringnameServer=environment.resolvePlaceholders(annotation.nameServer());nameServer=StringUtils.isEmpty(nameServer)?rocketMQProperties.getNameServer():nameServer;StringaccessChannel=environment.resolvePlaceholders(annotation.accessChannel());container.setNameServer(nameServer);if(!StringUtils.isEmpty(accessChannel)){container.setAccessChannel(AccessChannel.valueOf(accessChannel));}container.setTopic(environment.resolvePlaceholders(annotation.topic()));//此处已经根据表达式将数据取出Stringtags=environment.resolvePlaceholders(annotation.selectorExpression());if(!StringUtils.isEmpty(tags)){container.setSelectorExpression(tags);}container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));//此处将SelectorExpression的数据覆盖成了表达式container.setRocketMQMessageListener(annotation);container.setRocketMQListener((RocketMQListener)bean);container.setObjectMapper(objectMapper);container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());container.setName(name);//REVIEWME,usethesameclientIdormultiple?returncontainer;}

问题解决

因为ListenerContainerConfiguration类是实现了SmartInitializingSingleton接口的afterSingletonsInstantiated方法,我们可以通过反射对selectorExpression的数据在ListenerContainerConfiguration进行初始化前进行解析并赋值回去。

/***在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据**/@ConfigurationpublicclassChangeSelectorExpressionBeforeMQInitimplementsInitializingBean{@AutowiredprivateApplicationContextapplicationContext;@AutowiredprivateStandardEnvironmentenvironment;@OverridepublicvoidafterPropertiesSet()throwsException{Map<String,Object>beans=applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);for(Objectbean:beans.values()){Class<?>clazz=AopProxyUtils.ultimateTargetClass(bean);if(!RocketMQListener.class.isAssignableFrom(bean.getClass())){continue;}RocketMQMessageListenerannotation=clazz.getAnnotation(RocketMQMessageListener.class);InvocationHandlerinvocationHandler=Proxy.getInvocationHandler(annotation);Fieldfield=invocationHandler.getClass().getDeclaredField("memberValues");field.setAccessible(true);Map<String,Object>memberValues=(Map<String,Object>)field.get(invocationHandler);for(Map.Entry<String,Object>entry:memberValues.entrySet()){if(Objects.nonNull(entry)){memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));}}}}}

初次之外,在2.1.0版本的依赖包中已经修复了此Bug,在不造成依赖冲突的前提下,建议使用2.1.0以上的版本包。

到此,相信大家对“SpringBoot整合RocketMQ遇到的坑怎么解决”有了更深的了解,不妨来实际操作一番吧!这里是恰卡编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

发布于 2022-04-03 22:34:44
收藏
分享
海报
0 条评论
22
上一篇:springBoot下怎么实现java自动创建数据库表 下一篇:jquery中的index()方法如何使用
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码