SQL vs. NoSQL execution efficiency (Lesson 3: reading data from Kafka into Iceberg via the Flink SQL client)
[root@hadoop101 software]# bin/sql-client.sh embedded -j /opt/software/iceberg-flink-runtime-0.11.1.jar -j /opt/software/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar -j /opt/software/flink-sql-connector-kafka_2.11-1.11.6.jar shell
Flink SQL> CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop101:9083',
  'clients'='5',
  'property-version'='1',
  'hive-conf-dir'='/opt/module/hive/conf'
);
Flink SQL> show catalogs;
default_catalog
hive_catalog
Create the Hadoop catalog:
Flink SQL> CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://ns/user/hive/warehouse/iceberg_hadoop_catalog',
  'property-version'='1'
);
Create the database (example):
Flink SQL> use catalog hive_catalog;
Flink SQL> create database iceberg_db;
Create the table:
create table `hive_catalog`.`iceberg_db`.`ib_test_log`( data String);
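The queries further down read from default_catalog.source.kafka_test_log, but its DDL is not shown in this post. A minimal sketch of what that Kafka source table could look like on Flink 1.11, assuming the topic and broker list from the console commands below and a single-column CSV payload (the database name `source`, the group id, and the format are assumptions):

```sql
-- Hypothetical DDL for the Kafka source table referenced in this post.
-- Topic and bootstrap servers are taken from the console commands below;
-- format and startup mode are assumptions.
CREATE TABLE default_catalog.source.kafka_test_log (
  data STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_log',
  'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
  'properties.group.id' = 'iceberg_test',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'
);
```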
Start a console consumer to watch the topic:
[root@hadoop102 kafka]# kafka-console-consumer.sh --topic test_log --bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092
Have a console producer write some arbitrary data:
[root@hadoop101 ~]# kafka-console-producer.sh --topic test_log --broker-list hadoop101:9092,hadoop102:9092
#5. Check whether the producer's data can be read
Flink SQL> select data from default_catalog.source.kafka_test_log;
[INFO] Result retrieval cancelled.
The data can definitely be read.
5. Write the data into the Iceberg table:
Flink SQL> insert into hive_catalog.iceberg_db.ib_test_log select data from default_catalog.source.kafka_test_log;
On HDFS, the table's data directory stays at size 0 even though the producer keeps writing data; the topic name was double-checked as well.
6. Analyzing the failed write. At first I suspected the catalog was wrong and redefined it (the Kafka table had no catalog or database prefix, as shown in the screenshot below).
After redefining the Kafka table's catalog, the data still could not be read.
Even after giving the Kafka table an explicit catalog and database, no data was read; "bytes received" stayed at 0.
TaskManager logs
The TaskManager's stack trace shows that a select is indeed running. So why does it select nothing?
"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, source, kafka_test_log]], fields=[data]) -> IcebergStreamWriter (1/1)" Id=2405 RUNNABLEat sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)- locked sun.nio.ch.Util$3@19f5e8f7- locked java.util.Collections$UnmodifiableSet@744f2387- locked sun.nio.ch.EPollSelectorImpl@2180e63eat sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.select(Selector.java:794)at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:467)at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)...Number of locked synchronizers = 1- java.util.concurrent.locks.ReentrantLock$FairSync@2402c423"Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, source, kafka_test_log]], fields=[data]) -> IcebergStreamWriter (1/1)" Id=2401 WAITING on java.lang.Object@12863204at java.lang.Object.wait(Native Method)- waiting on java.lang.Object@12863204at java.lang.Object.wait(Object.java:502)at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:73)at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:823)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)"Thread-2084" Id=2404 TIMED_WAITING on java.util.LinkedList@1fd868c8at java.lang.Object.wait(Native Method)- waiting on 
java.util.LinkedList@1fd868c8at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:418)"OutputFlusher for Source: TableSourceScan(table=[[default_catalog, source, kafka_test_log]], fields=[data]) -> IcebergStreamWriter" Id=2400 TIMED_WAITINGat java.lang.Thread.sleep(Native Method)at org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.run(RecordWriter.java:328)"IcebergFilesCommitter -> Sink: IcebergSink hive_catalog.iceberg_db.ib_test_log (1/1)" Id=2399 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@394fdc1bat sun.misc.Unsafe.park(Native Method)- waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@394fdc1bat java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:136)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:313)at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:188)at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)...
Summary (troubleshooting approach)
So what is causing this? The official docs do it exactly this way, and other blog posts all manage to read the data (why does this write demo go so smoothly for them?). I first suspected a Scala version mismatch: my Kafka build was Scala 2.11 while Flink and Iceberg were 2.12. There was no error in the logs, but I unified everything on Scala 2.12 anyway, and it still did not work (this rules out a version mismatch). So what does explain it?
Switching from the Hive catalog to the Hadoop catalog did not help either (this rules out a Hive-side problem).
Could the Kafka version be incompatible with the Flink connector? Next step: swap in a different Kafka version and verify.
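One more avenue worth checking, and this is an assumption on my part rather than something verified in this post: Iceberg's Flink sink only commits data files when a checkpoint completes, and the stack trace above shows the IcebergFilesCommitter sitting idle in its mailbox loop. If checkpointing is disabled in the SQL client session (it is off by default), written files are never committed, which would match a data directory that stays at size 0. Enabling a periodic checkpoint in flink-conf.yaml would test this hypothesis (the interval is an example value):

```yaml
# flink-conf.yaml — hypothetical fix: enable periodic checkpoints so the
# IcebergFilesCommitter actually commits data files to the table
execution.checkpointing.interval: 10s
```

After restarting the cluster and the SQL client with this setting, re-run the insert and watch whether files appear in the table directory after each checkpoint interval.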