如何实现基于Flink实时数据处理
如何实现基于Flink实时数据处理
小编给大家分享一下如何实现基于Flink实时数据处理,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
基于Flink 1.11的网络流量实时解析,主要针对基于Pcap的原始网络流量数据进行TCP/UDP/ICMP的协议数据实时解析,并将解析数据装成数据帧Frame,以便进行实时网络流量分析。
为完成以上功能,需要了解Pcap数据解析、TCP/UDP层协议解析、Flink的序列化和反序列化、Flink自定义函数以及基于Stream sql的Flink实时数据分析。
1、Pcap数据解析
要进行基于Pcap格式的网络流量数据解析,就必须了解Pcap文件格式定义:
如上所示,标准Pcap数据由Pcap文件头、数据桢Frame头、数据桢Frame组成。
在Pcap文件头中:Magic :0x1A2B3C 4D,用于表示Pcap数据的开始;Major:用于标示Pcap数据主版本号;Minor:用于标示Pcap数据次版本号;ThisZone:本地标准时间;SigFigs: 时间戳精度;SnapLen:最大的存储长度;LinkType:链路类型。
在数据桢头中:Timestamp1:时间戳高位,精确到S;Timestamp2:时间戳低位,精确到ms;CapLen:当前数据桢长度;
Len:网络中实际数据桢的长度。
注意:目前LinkType链路类型,支持EN10MB、RAW、LOOP、LINUX_SLI;通过以上基本结构,在Pcap文件头中,我们获取最有用的信息即时LinkType,后面我们需要根据不同的LinkType类型,进行数据桢Frame的解析。
除此之外,根据数据桢头,可以获得数据桢的封装时间;
这里根据以太网数据桢类型为例:也就是Ipv4、Ipv6、ARP数据桢,如上图所示,该类型的数据桢数据部分的偏移是14。如果是Ipv4或者Ipv6的协议类型,可以解析获取Mac地址。接下来,其实就是解析TCP/IP层的协议。
2、TCP/UDP协议解析
(1)、TCP协议
//获取TCP头大小tcpOrUdpHeaderSize=getTcpHeaderLength(packetData,ipStart+ipHeaderLen);packet.put(Packet.TCP_HEADER_LENGTH,tcpOrUdpHeaderSize);//Storethesequenceandacknowledgementnumbers--M//获取TCP请求序列号packet.put(Packet.TCP_SEQ,PcapReaderUtil.convertUnsignedInt(packetData,ipStart+ipHeaderLen+PROTOCOL_HEADER_TCP_SEQ_OFFSET));//获取TCP确认序列号packet.put(Packet.TCP_ACK,PcapReaderUtil.convertUnsignedInt(packetData,ipStart+ipHeaderLen+PROTOCOL_HEADER_TCP_ACK_OFFSET));//FlagsstretchtwobytesstartingattheTCPheaderoffsetintflags=PcapReaderUtil.convertShort(newbyte[]{packetData[ipStart+ipHeaderLen+TCP_HEADER_DATA_OFFSET],packetData[ipStart+ipHeaderLen+TCP_HEADER_DATA_OFFSET+1]})&0x1FF;//Filterfirst7bits.First4arethedataoffsetandtheother3reservedforfutureuse.packet.put(Packet.TCP_FLAG_NS,(flags&0x100)==0?false:true);packet.put(Packet.TCP_FLAG_CWR,(flags&0x80)==0?false:true);packet.put(Packet.TCP_FLAG_ECE,(flags&0x40)==0?false:true);packet.put(Packet.TCP_FLAG_URG,(flags&0x20)==0?false:true);packet.put(Packet.TCP_FLAG_ACK,(flags&0x10)==0?false:true);packet.put(Packet.TCP_FLAG_PSH,(flags&0x8)==0?false:true);packet.put(Packet.TCP_FLAG_RST,(flags&0x4)==0?false:true);packet.put(Packet.TCP_FLAG_SYN,(flags&0x2)==0?false:true);packet.put(Packet.TCP_FLAG_FIN,(flags&0x1)==0?false:true);
2、UDP协议
tcpOrUdpHeaderSize=UDP_HEADER_SIZE;if(ipProtocolHeaderVersion==4){intcksum=getUdpChecksum(packetData,ipStart,ipHeaderLen);if(cksum>=0)packet.put(Packet.UDP_SUM,cksum);}intudpLen=getUdpLength(packetData,ipStart,ipHeaderLen);packet.put(Packet.UDP_LENGTH,udpLen);
3、Kafka的序列化和反序列化
基于分布式消息队列Kafka作为网络流量数据的中间临时缓存,通过FlinkKafkaConsumer进行网络流数据的解析,这里我们自定义了PcapResover的解析器,使用自定义的解序列化函数PcapDataDeSerializer。
Kafka Producer,负责转发已采集的网络流量,这里配置使用了Kafka内部的序列化类
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ByteArraySerializer.class.getName());this.consumer=newFlinkKafkaConsumer<>(this.topic,(KafkaDeserializationSchema)newPcapDataDeSerializer(Object.class),props);publicclassPcapDataDeSerializerimplementsKafkaDeserializationSchema<Object>{privatestaticfinalLoggerlog=LoggerFactory.getLogger(PcapDataDeSerializer.class);privatestaticfinallongserialVersionUID=1L;privateClass<Object>clazz;publicPcapDataDeSerializer(Class<Object>clazz){this.clazz=clazz;}List<Packet>packetList=newArrayList<>();@OverridepublicbooleanisEndOfStream(ObjectnextElement){returnfalse;}@OverridepublicObjectdeserialize(ConsumerRecord<byte[],byte[]>record)throwsIOException{DataInputViewin=newDataInputViewStreamWrapper(newByteArrayInputStream(record.value()));DataInputStreamdataInputStream=newDataInputStream((InputStream)in);PcapReaderreader=newPcapReader(dataInputStream);for(Packetpacket:reader){packetList.add(packet);}log.info("finishdeserializepcapdata,"+record.key()+",topicis"+record.topic()+","+"partitionis"+record.partition()+","+"offsetis"+record.offset());returnJSON.toJSON(packetList);}@OverridepublicTypeInformation<Object>getProducedType(){returnTypeExtractor.getForClass(this.clazz);}}
PcapDataDeSerializer主要实现KafkaDeserializationSchema<Object>中的deserialze即可,在这个函数中,会解析网络流量,并解析的网络流量封装成Pcaket List对象中,进行返回。
KafkaConsumer的创建使用自定义解序列化函数,主要是为了根据1、2 部分对于Pcap网络流量格式的分析,解析网络流量,并封装成数据桢。
4、Flink自定义函数
基于以上创建的FlinkKafkaConsumer,可以配置Flink Stream DAG,DataStreamSouce ->flatMap->Map->Stream<Frame>
DataStreamSource<Object>stream=executionEnvironment.addSource(this.consumer);log.info("starttobuildpcapdataStreamDAGgraph,transformpacketintoframestream,"+"anddefaultparallelismis4!");returnstream.flatMap(newFrameFlatMap()).map(newFrameMapFunction()).setParallelism(4);
这里其实返回的是DataStream<Frame>,也就是说,我们将原始网络流量解析,最后按照数据桢的方式输出数据流,以便与进行数据分析。接下来,为了基于Stream sql做一些数据分析,其实就可以将DataStream注册成临时表视图,然后使用类sql的语法进行实时分析了。
5、Flink实时分析示例
聚合统计10s的窗口内,目的mac地址的计数。当然这里sql的表达方式很多,而且表达能力足够强大。可以根据不同的业务诉求,进行不同的分析。
aggregationSql="selectdstMac,count(1)ascfrom"+KafkaProperties.FRAME_VIEW_NAME+"groupbytumble(PROCTIME(),interval'10'SECOND)"+",dstMac";
之后就是进行sink了,完成DAG 构建完成,Excute提交任务到集群。
Tableresult=streamTableEnvironment.sqlQuery(sql);DataStream<Row>resultData=streamTableEnvironment.toAppendStream(result,Row.class);resultData.print();
总结一下,基本流程如下图所示:
主要通过配置FlinkKafkaConsumer,实现PcapDataDesrializer负责对Pcap数据包中的Frame进行反序列化处理和解析,形式基于Frame的流数据,之后通过自定义FlatMapFunction、MapFunction函数对流数据进行处理和封装成为原始派生流DataStream<Frame>。
以上是“如何实现基于Flink实时数据处理”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注恰卡编程网行业资讯频道!