史上最全的kafka知识解析(sparkkafka)
针对不同的Spark、Kafka版本,集成处理数据的方式分为两种:Receiver based Approach和Direct Approach,不同集成版本处理方式的支持,可参考下图:
Receiver based Approach
基于receiver的方式是使用kafka消费者高阶API实现的。
对于所有的receiver,它通过kafka接收的数据会被存储于spark的executors上,底层是写入BlockManager中,默认200ms生成一个block(通过配置参数spark.streaming.blockInterval决定)。然后由spark streaming提交的job构建BlockRdd,最终以spark core任务的形式运行。
关于receiver方式,有以下几点需要注意:
receiver作为一个常驻线程调度到executor上运行,占用一个cpureceiver个数由KafkaUtils.createStream调用次数决定,一次一个receiverkafka中的topic分区并不能关联产生在spark streaming中的rdd分区增加在KafkaUtils.createStream()中的指定的topic分区数,仅仅增加了单个receiver消费的topic的线程数,它不会增加处理数据中的并行的spark的数量【topicMap[topic,num_threads]map的value对应的数值是每个topic对应的消费线程数】receiver默认200ms生成一个block,建议根据数据量大小调整block生成周期。receiver接收的数据会放入到BlockManager,每个executor都会有一个BlockManager实例,由于数据本地性,那些存在receiver的executor会被调度执行更多的task,就会导致某些executor比较空闲
建议通过参数spark.locality.wait调整数据本地性。该参数设置的不合理,比如设置为10而任务2s就处理结束,就会导致越来越多的任务调度到数据存在的executor上执行,导致任务执行缓慢甚至失败(要和数据倾斜区分开)
多个kafka输入的DStreams可以使用不同的groups、topics创建,使用多个receivers接收处理数据
两种receiver可靠的receiver:
可靠的receiver在接收到数据并通过复制机制存储在spark中时准确的向可靠的数据源发送ack确认不可靠的receiver:不可靠的receiver不会向数据源发送数据已接收确认。 这适用于用于不支持ack的数据源当然,我们也可以自定义receiver。receiver处理数据可靠性默认情况下,receiver是可能丢失数据的。可以通过设置spark.streaming.receiver.writeAheadLog.enable为true开启预写日志机制,将数据先写入一个可靠地分布式文件系统如hdfs,确保数据不丢失,但会失去一定性能
限制消费者消费的最大速率涉及三个参数:
spark.streaming.backpressure.enabled:默认是false,设置为true,就开启了背压机制;spark.streaming.backpressure.initialRate:默认没设置初始消费速率,第一次启动时每个receiver接收数据的最大值;spark.streaming.receiver.maxRate:默认值没设置,每个receiver接收数据的最大速率(每秒记录数)。每个流每秒最多将消费此数量的记录,将此配置设置为0或负数将不会对最大速率进行限制
在产生job时,会将当前job有效范围内的所有block组成一个BlockRDD,一个block对应一个分区
kafka082版本消费者高阶API中,有分组的概念,建议使消费者组内的线程数(消费者个数)和kafka分区数保持一致。如果多于分区数,会有部分消费者处于空闲状态
Direct Approach
direct approach是spark streaming不使用receiver集成kafka的方式,一般在企业生产环境中使用较多。相较于receiver,有以下特点:
1.不使用receiver
不需要创建多个kafka streams并聚合它们
减少不必要的CPU占用
减少了receiver接收数据写入BlockManager,然后运行时再通过blockId、网络传输、磁盘读取等来获取数据的整个过程,提升了效率
无需wal,进一步减少磁盘IO操作
2.direct方式生的rdd是KafkaRDD,它的分区数与kafka分区数保持一致一样多的rdd分区来消费,更方便我们对并行度进行控制
注意:在shuffle或者repartition操作后生成的rdd,这种对应关系会失效
3.可以手动维护offset,实现exactly once语义
4.数据本地性问题。在KafkaRDD在compute函数中,使用SimpleConsumer根据指定的topic、分区、offset去读取kafka数据。
但在010版本后,又存在假如kafka和spark处于同一集群存在数据本地性的问题
5.限制消费者消费的最大速率
spark.streaming.kafka.maxRatePerPartition:从每个kafka分区读取数据的最大速率(每秒记录数)。这是针对每个分区进行限速,需要事先知道kafka分区数,来评估系统的吞吐量。
推荐阅读
-
微软在游戏中断后在Windows 11上为谷歌Chrome撤下必应广告
-
一百年!WordPress 以 38,000 美元出售域名和托管计划
-
谷歌湾景园新总部即将上线
-
GPT-3.5 Turbo 现在可支持微调以提高质量和性能
OpenAI已向开发者提供了GPT-3.5Turbo(https://openai.com/blog/gpt-3-5-tur...
-
提高 Snowflake 工作效率的六大工具
-
值得收藏!如何快速画出一幅漂亮的架构图
阿里妹导读这篇文章总结了常用的架构图类型,可以借鉴笔者提供的模板,快速地产出符合业务需要的架构图。为什么要画好一幅架构图?一幅漂亮...
-
祝 Linux 32 岁生日快乐!🎂
-
Google 对十年不变的 Keep 笔记本应用添加功能
-
日本福岛含氚废水排放与争议
日本东京电力公司在24日午间将福岛第一核电站的核处理水(也称ALPS处理水)排放入海,中国全面暂停日本水产品进口。那么什么...
-
Code Llama,一个开源人工智能编码工具