如何实现基于Flink实时数据处理
如何实现基于Flink实时数据处理
小编给大家分享一下如何实现基于Flink实时数据处理,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
基于Flink 1.11的网络流量实时解析,主要针对基于Pcap的原始网络流量数据进行TCP/UDP/ICMP的协议数据实时解析,并将解析数据装成数据帧Frame,以便进行实时网络流量分析。
为完成以上功能,需要了解Pcap数据解析、TCP/UDP层协议解析、Flink的序列化和反序列化、Flink自定义函数以及基于Stream sql的Flink实时数据分析。
1、Pcap数据解析
要进行基于Pcap格式的网络流量数据解析,就必须了解Pcap文件格式定义:
如上所示,标准Pcap数据由Pcap文件头、数据桢Frame头、数据桢Frame组成。
在Pcap文件头中:Magic :0x1A2B3C 4D,用于表示Pcap数据的开始;Major:用于标示Pcap数据主版本号;Minor:用于标示Pcap数据次版本号;ThisZone:本地标准时间;SigFigs: 时间戳精度;SnapLen:最大的存储长度;LinkType:链路类型。
在数据桢头中:Timestamp1:时间戳高位,精确到S;Timestamp2:时间戳低位,精确到ms;CapLen:当前数据桢长度;
Len:网络中实际数据桢的长度。
注意:目前LinkType链路类型,支持EN10MB、RAW、LOOP、LINUX_SLI;通过以上基本结构,在Pcap文件头中,我们获取最有用的信息即时LinkType,后面我们需要根据不同的LinkType类型,进行数据桢Frame的解析。
除此之外,根据数据桢头,可以获得数据桢的封装时间;
这里根据以太网数据桢类型为例:也就是Ipv4、Ipv6、ARP数据桢,如上图所示,该类型的数据桢数据部分的偏移是14。如果是Ipv4或者Ipv6的协议类型,可以解析获取Mac地址。接下来,其实就是解析TCP/IP层的协议。
2、TCP/UDP协议解析
(1)、TCP协议
//获取TCP头大小tcpOrUdpHeaderSize=getTcpHeaderLength(packetData,ipStart+ipHeaderLen);packet.put(Packet.TCP_HEADER_LENGTH,tcpOrUdpHeaderSize);//Storethesequenceandacknowledgementnumbers--M//获取TCP请求序列号packet.put(Packet.TCP_SEQ,PcapReaderUtil.convertUnsignedInt(packetData,ipStart+ipHeaderLen+PROTOCOL_HEADER_TCP_SEQ_OFFSET));//获取TCP确认序列号packet.put(Packet.TCP_ACK,PcapReaderUtil.convertUnsignedInt(packetData,ipStart+ipHeaderLen+PROTOCOL_HEADER_TCP_ACK_OFFSET));//FlagsstretchtwobytesstartingattheTCPheaderoffsetintflags=PcapReaderUtil.convertShort(newbyte[]{packetData[ipStart+ipHeaderLen+TCP_HEADER_DATA_OFFSET],packetData[ipStart+ipHeaderLen+TCP_HEADER_DATA_OFFSET+1]})&0x1FF;//Filterfirst7bits.First4arethedataoffsetandtheother3reservedforfutureuse.packet.put(Packet.TCP_FLAG_NS,(flags&0x100)==0?false:true);packet.put(Packet.TCP_FLAG_CWR,(flags&0x80)==0?false:true);packet.put(Packet.TCP_FLAG_ECE,(flags&0x40)==0?false:true);packet.put(Packet.TCP_FLAG_URG,(flags&0x20)==0?false:true);packet.put(Packet.TCP_FLAG_ACK,(flags&0x10)==0?false:true);packet.put(Packet.TCP_FLAG_PSH,(flags&0x8)==0?false:true);packet.put(Packet.TCP_FLAG_RST,(flags&0x4)==0?false:true);packet.put(Packet.TCP_FLAG_SYN,(flags&0x2)==0?false:true);packet.put(Packet.TCP_FLAG_FIN,(flags&0x1)==0?false:true);
2、UDP协议
tcpOrUdpHeaderSize=UDP_HEADER_SIZE;if(ipProtocolHeaderVersion==4){intcksum=getUdpChecksum(packetData,ipStart,ipHeaderLen);if(cksum>=0)packet.put(Packet.UDP_SUM,cksum);}intudpLen=getUdpLength(packetData,ipStart,ipHeaderLen);packet.put(Packet.UDP_LENGTH,udpLen);
3、Kafka的序列化和反序列化
基于分布式消息队列Kafka作为网络流量数据的中间临时缓存,通过FlinkKafkaConsumer进行网络流数据的解析,这里我们自定义了PcapResover的解析器,使用自定义的解序列化函数PcapDataDeSerializer。
Kafka Producer,负责转发已采集的网络流量,这里配置使用了Kafka内部的序列化类
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,ByteArraySerializer.class.getName());this.consumer=newFlinkKafkaConsumer<>(this.topic,(KafkaDeserializationSchema)newPcapDataDeSerializer(Object.class),props);publicclassPcapDataDeSerializerimplementsKafkaDeserializationSchema
PcapDataDeSerializer主要实现KafkaDeserializationSchema
KafkaConsumer的创建使用自定义解序列化函数,主要是为了根据1、2 部分对于Pcap网络流量格式的分析,解析网络流量,并封装成数据桢。
4、Flink自定义函数
基于以上创建的FlinkKafkaConsumer,可以配置Flink Stream DAG,DataStreamSouce ->flatMap->Map->Stream
DataStreamSource
stream=executionEnvironment.addSource(this.consumer);log.info("starttobuildpcapdataStreamDAGgraph,transformpacketintoframestream,"+"anddefaultparallelismis4!");returnstream.flatMap(newFrameFlatMap()).map(newFrameMapFunction()).setParallelism(4);
这里其实返回的是DataStream,也就是说,我们将原始网络流量解析,最后按照数据桢的方式输出数据流,以便与进行数据分析。接下来,为了基于Stream sql做一些数据分析,其实就可以将DataStream注册成临时表视图,然后使用类sql的语法进行实时分析了。
5、Flink实时分析示例
聚合统计10s的窗口内,目的mac地址的计数。当然这里sql的表达方式很多,而且表达能力足够强大。可以根据不同的业务诉求,进行不同的分析。
aggregationSql="selectdstMac,count(1)ascfrom"+KafkaProperties.FRAME_VIEW_NAME+"groupbytumble(PROCTIME(),interval'10'SECOND)"+",dstMac";
之后就是进行sink了,完成DAG 构建完成,Excute提交任务到集群。
Tableresult=streamTableEnvironment.sqlQuery(sql);DataStream
resultData=streamTableEnvironment.toAppendStream(result,Row.class);resultData.print();
总结一下,基本流程如下图所示:
主要通过配置FlinkKafkaConsumer,实现PcapDataDesrializer负责对Pcap数据包中的Frame进行反序列化处理和解析,形式基于Frame的流数据,之后通过自定义FlatMapFunction、MapFunction函数对流数据进行处理和封装成为原始派生流DataStream。
以上是“如何实现基于Flink实时数据处理”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注恰卡编程网行业资讯频道!
推荐阅读
-
park和Flink有什么不同
park和Flink有什么不同这篇文章主要介绍“park和Flin...
-
Flink的函数有哪些
Flink的函数有哪些这篇文章主要介绍了Flink的函数有哪些,具...
-
怎么在Apache Flink中使用Python API
怎么在ApacheFlink中使用PythonAPI这篇文章主...
-
Apache Flink漏洞复现的示例分析
ApacheFlink漏洞复现的示例分析这篇文章主要介绍了Apa...
-
在flink中如何进行keyby窗口数据倾斜的优化
在flink中如何进行keyby窗口数据倾斜的优化今天就跟大家聊聊...
-
flink状态管理keyed的示例分析
flink状态管理keyed的示例分析这篇文章将为大家详细讲解有关...