flume-ng怎么自定义拦截器
flume-ng怎么自定义拦截器
本篇内容主要讲解“flume-ng怎么自定义拦截器”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flume-ng怎么自定义拦截器”吧!
代码如下:
package com.wy.flume.interceptor;

import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;
import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

/**
 * Interceptor that applies a regular expression either to the event body or to
 * the value of one configured event header, and stores each capture group in a
 * header of its own.
 *
 * <p>Configuration keys read by {@link Builder}:
 * <ul>
 *   <li>{@code regex} - the pattern to apply (required);</li>
 *   <li>{@code serializers} - whitespace-separated serializer ids (required); the
 *       n-th id handles capture group n;</li>
 *   <li>{@code serializers.<id>.name} - header that receives the group (required);</li>
 *   <li>{@code serializers.<id>.type} - serializer class name, or {@code DEFAULT}
 *       for the pass-through serializer;</li>
 *   <li>{@code extractorHeader} - when {@code true} the regex is matched against a
 *       header instead of the body (default {@code false});</li>
 *   <li>{@code extractorHeaderKey} - name of that header (required when
 *       {@code extractorHeader} is {@code true}).</li>
 * </ul>
 */
public class RegexExtractorHeaderInterceptor implements Interceptor {

  static final String REGEX = "regex";
  static final String SERIALIZERS = "serializers";
  static final String EXTRACTOR_HEADER = "extractorHeader";
  static final boolean DEFAULT_EXTRACTOR_HEADER = false;
  static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey";

  private static final Logger logger =
      LoggerFactory.getLogger(RegexExtractorHeaderInterceptor.class);

  private final Pattern regex;
  private final List<NameAndSerializer> serializers;
  private final boolean extractorHeader;
  private final String extractorHeaderKey;

  private RegexExtractorHeaderInterceptor(Pattern regex,
      List<NameAndSerializer> serializers, boolean extractorHeader,
      String extractorHeaderKey) {
    this.regex = regex;
    this.serializers = serializers;
    this.extractorHeader = extractorHeader;
    this.extractorHeaderKey = extractorHeaderKey;
  }

  @Override
  public void initialize() {
    // NO-OP...
  }

  @Override
  public void close() {
    // NO-OP...
  }

  /**
   * Matches the configured regex against the selected input (header value or
   * UTF-8 decoded body) and, on the first match, writes capture group n to the
   * header named by the n-th serializer.
   *
   * @param event the event to enrich; its header map is modified in place
   * @return the same event instance, whether or not anything matched
   */
  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();

    String extractorHeaderVal;
    if (extractorHeader) {
      extractorHeaderVal = headers.get(extractorHeaderKey);
      if (extractorHeaderVal == null) {
        // Pattern.matcher(null) throws NullPointerException; an event that lacks
        // the source header has nothing to extract, so pass it through untouched.
        logger.debug("Header {} is absent, nothing to extract", extractorHeaderKey);
        return event;
      }
    } else {
      extractorHeaderVal = new String(event.getBody(), Charsets.UTF_8);
    }

    Matcher matcher = regex.matcher(extractorHeaderVal);
    if (matcher.find()) {
      for (int group = 0, count = matcher.groupCount(); group < count; group++) {
        // Capture groups are 1-based; serializers are stored 0-based.
        int groupIndex = group + 1;
        if (groupIndex > serializers.size()) {
          logger.debug("Skipping group {} to {} due to missing serializer",
              group, count);
          break;
        }
        NameAndSerializer serializer = serializers.get(group);
        logger.debug("Serializing {} using {}", serializer.headerName,
            serializer.serializer);
        headers.put(serializer.headerName,
            serializer.serializer.serialize(matcher.group(groupIndex)));
      }
    }
    return event;
  }

  /**
   * Applies {@link #intercept(Event)} to every event of the batch.
   *
   * @param events the batch to process
   * @return a new list holding every non-null result, in the original order
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
    for (Event event : events) {
      Event interceptedEvent = intercept(event);
      if (interceptedEvent != null) {
        intercepted.add(interceptedEvent);
      }
    }
    return intercepted;
  }

  /**
   * Builder that reads the interceptor settings from a {@link Context} and
   * creates {@link RegexExtractorHeaderInterceptor} instances.
   */
  public static class Builder implements Interceptor.Builder {

    private Pattern regex;
    private List<NameAndSerializer> serializerList;
    private boolean extractorHeader;
    private String extractorHeaderKey;
    private final RegexExtractorInterceptorPassThroughSerializer defaultSerializer =
        new RegexExtractorInterceptorPassThroughSerializer();

    /**
     * Reads and validates the regex, the serializers and the header-extraction
     * settings.
     *
     * @param context the interceptor's configuration
     * @throws IllegalArgumentException if a required setting is missing or empty
     */
    @Override
    public void configure(Context context) {
      String regexString = context.getString(REGEX);
      Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
          "Must supply a valid regex string");
      // Pattern.compile already rejects an invalid expression.
      regex = Pattern.compile(regexString);

      configureSerializers(context);

      extractorHeader = context.getBoolean(EXTRACTOR_HEADER, DEFAULT_EXTRACTOR_HEADER);
      if (extractorHeader) {
        extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY);
        Preconditions.checkArgument(!StringUtils.isEmpty(extractorHeaderKey),
            "Must supply extractorHeaderKey when extractorHeader is true");
      }
    }

    /** Builds the ordered serializer list from the {@code serializers.*} settings. */
    private void configureSerializers(Context context) {
      String serializerListStr = context.getString(SERIALIZERS);
      Preconditions.checkArgument(!StringUtils.isEmpty(serializerListStr),
          "Must supply at least one name and serializer");

      String[] serializerNames = serializerListStr.split("\\s+");

      Context serializerContexts =
          new Context(context.getSubProperties(SERIALIZERS + "."));

      serializerList = Lists.newArrayListWithCapacity(serializerNames.length);
      for (String serializerName : serializerNames) {
        Context serializerContext =
            new Context(serializerContexts.getSubProperties(serializerName + "."));
        String type = serializerContext.getString("type", "DEFAULT");
        String name = serializerContext.getString("name");
        Preconditions.checkArgument(!StringUtils.isEmpty(name),
            "Supplied name cannot be empty.");

        if ("DEFAULT".equals(type)) {
          serializerList.add(new NameAndSerializer(name, defaultSerializer));
        } else {
          serializerList.add(
              new NameAndSerializer(name, getCustomSerializer(type, serializerContext)));
        }
      }
    }

    /**
     * Instantiates and configures the serializer class named {@code clazzName}.
     * Any failure is logged and rethrown; no fallback serializer is substituted.
     */
    private RegexExtractorInterceptorSerializer getCustomSerializer(
        String clazzName, Context context) {
      try {
        RegexExtractorInterceptorSerializer serializer =
            (RegexExtractorInterceptorSerializer) Class.forName(clazzName).newInstance();
        serializer.configure(context);
        return serializer;
      } catch (Exception e) {
        logger.error("Could not instantiate event serializer.", e);
        // propagate() always throws; "throw" makes that visible to the compiler
        // so no unreachable fallback return is needed.
        throw Throwables.propagate(e);
      }
    }

    /**
     * Creates the interceptor from the previously configured settings.
     *
     * @return a new interceptor instance
     * @throws IllegalArgumentException if the regex or the serializers are missing
     */
    @Override
    public Interceptor build() {
      Preconditions.checkArgument(regex != null,
          "Regex pattern was misconfigured");
      Preconditions.checkArgument(serializerList != null && !serializerList.isEmpty(),
          "Must supply a valid group match id list");
      return new RegexExtractorHeaderInterceptor(regex, serializerList,
          extractorHeader, extractorHeaderKey);
    }
  }

  /** Pairs a target header name with the serializer that produces its value. */
  static class NameAndSerializer {
    private final String headerName;
    private final RegexExtractorInterceptorSerializer serializer;

    public NameAndSerializer(String headerName,
        RegexExtractorInterceptorSerializer serializer) {
      this.headerName = headerName;
      this.serializer = serializer;
    }
  }
}
应用配置:
# Attach one interceptor, with id i2, to source s1 of agent hdp2.
hdp2.sources.s1.interceptors = i2
# Custom interceptors are referenced through their nested Builder class.
hdp2.sources.s1.interceptors.i2.type = com.wy.flume.interceptor.RegexExtractorHeaderInterceptor$Builder
# Two capture groups: the text before the first underscore, then eight digits.
# NOTE(review): the doubled backslash presumably relies on properties-file
# escaping to yield \d - confirm against how the agent loads this file.
hdp2.sources.s1.interceptors.i2.regex = ([^_]+)_(\\d{8}).*
# Match the regex against a header value instead of the event body.
hdp2.sources.s1.interceptors.i2.extractorHeader = true
# Name of the header whose value is matched; the source must set this header.
hdp2.sources.s1.interceptors.i2.extractorHeaderKey = basename
# Serializer ids, in capture-group order (s1 -> group 1, s2 -> group 2).
hdp2.sources.s1.interceptors.i2.serializers = s1 s2
# Header that receives capture group 1.
hdp2.sources.s1.interceptors.i2.serializers.s1.name = log_type
# Header that receives capture group 2.
hdp2.sources.s1.interceptors.i2.serializers.s2.name = log_day
到此,相信大家对“flume-ng怎么自定义拦截器”有了更深的了解,不妨来实际操作一番吧!这里是恰卡编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!