Apache Sqoop数据采集原理解析
sqoop数据采集格式问题
apache sqoop是一款开源的工具,主要用于在hadoop(hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : mysql ,oracle ,postgres等)中的数据导进到hadoop的hdfs中,也可以将hdfs的数据导进到关系型数据库中。
sqoop项目开始于2009年,最早是作为hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,sqoop独立成为一个apache项目。
一、sqoop工作原理
- 数据导入:sqoop通过mapreduce任务来实现数据的并行导入。首先,它会将关系型数据库中的数据表按照一定的规则进行分区,然后为每个分区启动一个map任务,同时从数据库中读取相应分区的数据,并将数据写入到hdfs或其他hadoop存储系统中。这样可以充分利用hadoop集群的分布式计算能力,提高数据导入的效率。
- 导出过程:与导入类似,sqoop也会将数据进行分区处理,然后通过map任务将hadoop中的数据读取出来,并按照目标关系型数据库的格式要求,将数据写入到数据库中。
sqoop通过创建一个数据传输的mr程序,进而实现数据传输。
sqoop安装:
- java环境配置
- hadoop环境配置
- 相关数据库驱动包
只要环境满足以上设置,直接解压sqoop安装包即可安装,修改配置后即可使用。
二、sqoop命令格式
基础使用语法:
sqoop import | export \ --数据库连接参数 --hdfs或者hive的连接参数 --配置参数
数据传输常用参数:
选项 | 参数 |
---|---|
–connect | jdbc:mysql://hostname:3306(数据库连接url) |
–username | 数据库用户名 |
–password | 数据库用户密码 |
–table | 指定数据表 |
–columns | 指定表列值 |
–where | 数据过滤条件 |
–e/–query | 自定义sql语句 |
–driver | 指定数据库驱动 |
–delete-target-dir | 导入数据时,清空目标目录 |
–target-dir | 指定导入数据的目录(通常为hdfs路径) |
–export-dir | 指定导出数据的源目录(通常为hdfs路径) |
sqoop命令的使用方法可以通过sqoop -h命令查看相关使用方法,此处不在赘述了
三、oracle数据采集格式问题
场景:
step1: 查看业务数据库中 ciss_service_workorder 表的数据条数。
select count(1) as cnt from ciss_service_workorder; 178609条
step2: 采集ciss_service_workorder的数据到hdfs上
sqoop import \ --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \ --username ciss \ --password 123456 \ --table ciss4.ciss_service_workorder \ --delete-target-dir \ --target-dir /test/full_imp/ciss4.ciss_service_workorder \ --fields-terminated-by "\001" \ #指定字段分割符 -m 1 #指定并行度
hive默认使用\001作为表字段的分隔符,但也可以在创建表时指定特殊的分隔符。
step3: 使用hive查看导入数据表的行数
create external table test_text( line string # 将导入的数据一行作为表中的一列 ) location '/test/full_imp/ciss4.ciss_service_workorder'; select count(*) from test_text; 195825条
问题:
sqoop采集完数据后,hdfs数据中存储的数据行数跟源数据库的数据量不符合。
原因:
- sqoop以文本格式导入数据时,hdfs的默认文件格式为textfile,默认的换行符是特殊字符\n。
- oracle中的数据列中如果出现了\n、\r、\t等特殊字符,就会被划分为多行
oracle数据:
id | name | age |
---|---|---|
001 | zhang\nsan | 18 |
sqoop转换后的数据:
001 | zhang |
san | 18 |
hive表中的数据:
id | name | age |
---|---|---|
001 | zhang | |
san | 18 |
解决方法:
- 方案一:
- 删除或者替换数据中的换行符
- sqoop参数 --hive-drop-import-delims 删除换行符
- sqoop参数 --hive-delims-replacement char 替换换行符
不建议使用,破坏原始数据结构,ods层数据尽量抱持原结构
- 方案二:
- 采用特殊的存储格式,avro格式
常见的文件格式介绍:
类型 | 介绍 |
---|---|
textfile | hive默认的文件格式,最简单的数据格式,便于查看和编辑,耗费存储空间,i/o性能较低 |
sequencefile | 含有键值对的二进制文件,优化磁盘利用率和i/o,并行操作数据,查询效率高,但存储空间消耗最大 |
avrofile | 特殊的二进制文件,设计的主要目标是为了满足schema evolution,schema和数据保存在一起 |
orcfile | 列式存储,schema存储在footer中,不支持schema evolution,高度压缩比并包含索引,查询速度非常快 |
parquetfile | 列式存储,与orc类似,压缩比不如orc,但是查询性能接近,支持的工具更多,通用性更强 |
avro格式特点
- 优点
- 二进制数据存储,性能好、效率高
- 使用json描述模式,支持场景更丰富
- schema和数据统一存储,消息自描述(将表中的一行数据作为对象存储,并且schema为元数据)
- 模式定义允许定义数据的排序
- 缺点
- 只支持avro自己的序列化格式
- 少量列的读取性能比较差,压缩比较低
- 场景:基于行的大规模结构化数据写入、列的读取非常多或者schema变更操作比较频繁的场景
sqoop使用avro格式:
sqoop import \ -dmapreduce.job.user.classpath.first=true \ --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \ --username ciss \ --password 123456 \ --table ciss4.ciss_service_workorder \ --delete-target-dir \ --target-dir /test/full_imp/ciss4.ciss_service_workorder \ --as-avrodatafile \ # 选择文件存储格式为avro --fields-terminated-by "\001" \ -m 1
hive建表指定文件的存储格式:
create external table test_avro( line string ) stored as avro location '/test/full_imp/ciss4.ciss_service_workorder';
avro 数据以 二进制序列化 存储,字段通过预定义的 模式(schema) 解析,而非依赖分隔符,即使字段内容包含逗号、换行符等特殊字符,也不会影响数据结构的正确性。
schema 定义(json 格式),明确描述了字段名称、类型、顺序等信息。
四、sqoop增量采集方案
sqoop 支持两种增量模式:
- append 模式:
适用于 仅追加数据 的表(如日志表),基于 递增列(如自增主键 id)采集新数据。
- lastmodified 模式:
适用于 数据会更新 的表(如用户表),基于 时间戳列(如 last_update_time)采集新增或修改的数据。
append模式要求源数据表具备自增列,如建表时设置的自增id
lastmodified模式要求源数据表具有时间戳字段。
append模式:
要求:必须有一列自增的值,按照自增的int值进行判断
特点:只能导入增加的数据,无法导入更新的数据
场景:数据只会发生新增,不会发生更新的场景
sqoop import \ # 执行数据导入操作 --connect jdbc:mysql://node3:3306/sqooptest \ # 连接mysql数据库(地址:node3,数据库名:sqooptest) --username root \ # 数据库用户名:root --password 123456 \ # 数据库密码:123456 --table tb_tohdfs \ # 要导入的源表:tb_tohdfs --target-dir /sqoop/import/test02 \ # hdfs目标目录(数据将写入此路径) --fields-terminated-by '\t' \ # 字段分隔符为制表符(\t) --check-column id \ # 指定增量检查列:id(通常是自增主键) --incremental append \ # 增量模式为“append”(仅导入新数据) --last-value 0 \ # 上次导入的id最大值(初始值为0,首次导入id>0的数据) -m 1 # 使用1个map任务(单线程)
appebd模式使用last-value记录上次导入的数据id最大值,初次导入一般为全量导入,即id>0
此处的last_value需要手动填写,因此可以使用sqoop的job管理进行自动记录。
sqoop job --create my_job -- import ... --incremental append --check-column id --last-value 0 sqoop job --exec my_job # 自动更新 last-value
lastmodified模式:
要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断
特点:既导入新增的数据也导入更新的数据
场景:表中的记录会新增或更新,且每次更新都会修改 lastmode 时间戳。一般无法满足要求,所以不用。
sqoop import \ # 执行数据导入操作 --connect jdbc:mysql://node3:3306/sqooptest \ # 连接mysql数据库(地址:node3,数据库名:sqooptest) --username root \ # 数据库用户名:root --password 123456 \ # 数据库密码:123456 --table tb_lastmode \ # 要导入的源表:tb_lastmode --target-dir /sqoop/import/test03 \ # hdfs目标目录(数据将写入此路径) --fields-terminated-by '\t' \ # 字段分隔符为制表符(\t) --incremental lastmodified \ # 增量模式为“lastmodified”(采集新增或修改的数据) --check-column lastmode \ # 指定时间戳列:lastmode(记录数据的更新时间) --last-value '2021-06-06 16:09:32' \ # 上次导入的最大时间值(导入此时间之后的新增/修改数据) -m 1 # 使用1个map任务(单线程)
lastmodified模式使用时间戳记载数据的更新线。
若同一条记录被多次更新,且 lastmode 时间超过 --last-value,sqoop 会多次导入该记录。
解决方案:添加 --merge-key <主键列> 参数,合并新旧数据(基于主键去重):
--merge-key id # 假设 id 是主键列
自定义模式:
要求:每次运行的输出目录不能相同
特点:自己实现增量的数据过滤,可以实现新增和更新数据的采集
场景:一般用于自定义增量采集每天的分区数据到hive
sqoop import \ --connect jdbc:mysql://node3:3306/db_order \ --username root \ --password-file file:///export/data/sqoop.passwd \ --query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \$conditions " \ --delete-target-dir \ --target-dir /nginx/logs/tb_order/daystr=2021-09-14 \ --fields-terminated-by '\t' \ -m 1
自定义模式可以根据设置的sql进行数据导入,因此是最常用的场景。
到此这篇关于apache sqoop数据采集问题的文章就介绍到这了,更多相关apache sqoop数据采集内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
推荐阅读
-
Nginx重启失败排查与解决方案
前言在linux系统中,nginx作为高性能的web服务器和反向代理服务器,广泛应用于各类生产环境中。然而,作为一款强大而灵活的...
-
MySQL中的分组和多表连接详解
mysql中的分组和多表连接一、mysql的分组(groupby)单例函数函数含义lower将列内容变成小写upper将...
-
MySQL主从同步延迟问题的全面解决方案
一、同步延迟原因深度分析1.1主从复制原理回顾mysql主从复制流程:主库binlog→主库dump线程→从库io...
-
浅谈MySQL中drop、truncate和delete的区别
1.前言对于drop、truncate和delete,虽然简单,但是真要使用或者面试时候问到还是需要有一定的总结,今天来简单讲讲...
-
MySQL查询重写如何把复杂查询变简单详解
-
MySQL数据库约束深入详解
-
Linux搭建单机MySQL8.0.26版本的操作方法
-
史上最全nginx详细参数配置
(enginex)是一个轻量级高性能的http和反向代理服务器,同时也是一个通用代理服务器(tcp/udp/imap/po...
-
nginx负载均衡及详细配置方法
一、nginx负载均衡策略nginx作为一种高效的web服务器和反向代理服务器,广泛应用于网站的负载均衡中。负载均衡是指将接收...
-
WITH在MYSQL中的用法示例详解
with子句(也称为公共表表达式,commontableexpression,简称cte)是sql中一种强大的查询构建...