阿里云开发者社区

电脑版
提示:原网页已由神马搜索转码, 内容由developer.aliyun.com提供.

DataX 概述、部署、数据同步运用示例

2024-05-261736
版权
版权声明:
本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《 阿里云开发者社区用户服务协议》和 《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写 侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时计算 Flink 版,5000CU*H 3个月
简介:DataX是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的高效传输。其特点是多数据源支持、可扩展性、灵活配置、高效传输、任务调度监控和活跃的开源社区支持。DataX通过Reader和Writer插件实现数据源的读取和写入,采用Framework+plugin架构。部署简单,解压即可用。示例展示了如何配置DataX同步MySQL到HDFS,并提供了速度和内存优化建议。此外,还解决了NULL值同步问题及配置文件变量传参的方法。

@[toc]

什么是 DataX?

DataX是阿里巴巴集团开源的、通用的数据抽取工具,广泛使用的离线数据同步工具/平台。它设计用于支持多种数据源之间的高效数据传输,可以实现不同数据源之间的数据同步、迁移、ETL(抽取、转换、加载)等数据操作。



主要特点和功能包括:

  1. 多数据源支持:DataX支持多种数据源,包括关系型数据库(如 MySQL、Oracle、SQL Server)、NoSQL 数据库(如 MongoDB、HBase、Redis)、HDFS、FTP、Hive 等。

  2. 扩展性:DataX具有良好的扩展性,可以通过编写插件来支持新的数据源或者数据目的地,以满足不同数据存储系统的需求。

  3. 灵活配置:通过配置文件,用户可以灵活地定义数据抽取任务,包括数据源、目的地、字段映射、转换规则等,以适应不同的数据处理需求。

  4. 高效传输:DataX 采用分布式、并行的数据传输策略,可以将数据以高效、快速的方式传输到目标数据源,提高数据处理的效率。

  5. 任务调度和监控:DataX 提供了任务调度和监控的功能,可以通过控制台查看任务运行情况,监控任务的健康状态,以及处理异常情况。

  6. 开源社区支持:DataX 是开源项目,拥有活跃的开源社区支持,可以获取到丰富的文档、示例和开发者社区的帮助。

image.png

DataX 设计框架

DataX 本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,理论上 DataX 框架可以支持任意数据源类型的数据同步工作。同时 DataX 插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。



DataX 本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。

  • Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。

  • Framework:Framework 用于连接 readerwriter,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX 核心架构

  1. DataX 完成单个数据同步的作业,我们称之为 Job,DataX 接受到一个 Job 之后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task(子任务),以便于并发执行。Task 便是 DataX 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。

  3. 切分多个 Task 之后,DataX Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。

  4. 每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为 5

  1. DataX 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非 0


DataX 部署

DataX 3.0下载地址

1. 解压压缩包

tar -zxvf datax.tar.gz -C /opt/module/

2. 配置环境变量

sudo vim /etc/profile.d/my.sh
# 添加如下内容:
#DATAX_HOME
export DATAX_HOME=/opt/module/datax

刷新环境变量:source /etc/profile.d/my.sh

3. 运行测试程序

运行 DataX 自带的测试程序。

cd $DATAX_HOME
python bin/datax.py job/job.json

其中 datax.py是执行的主程序,job.json中存储的是数据源插件配置(其中有数据源的连接信息、存储信息、配置信息等)。

正常运行完成后,会看到输出相关日志内容:

image.png

DataX 无需进行其它配置,解压后即可快速使用。

DataX 数据同步 MySQL —> HDFS

1.创建 MySQL 测试库表

CREATE DATABASE `test_datax` CHARACTER SET 'utf8mb4';
USE `test_datax`;
-- 商品属性表
DROP TABLE IF EXISTS sku_info;
CREATE TABLE sku_info(
    `sku_id`      varchar(100) COMMENT "商品id",
    `name`        varchar(200) COMMENT "商品名称",
    `category_id` varchar(100) COMMENT "所属分类id",
    `from_date`   varchar(100) COMMENT "上架日期",
    `price`       decimal(10,2) COMMENT "商品单价"
) COMMENT "商品属性表";
insert into sku_infovalues ('1', 'xiaomi 10', '1', '2020-01-01', 2000),
       ('2', '手机壳', '1', '2020-02-01', 10),
       ('3', 'apple 12', '1', '2020-03-01', 5000),
       ('4', 'xiaomi 13', '1', '2020-04-01', 6000),
       ('5', '破壁机', '2', '2020-01-01', 500),
       ('6', '洗碗机', '2', '2020-02-01', 2000),
       ('7', '热水壶', '2', '2020-03-01', 100),
       ('8', '微波炉', '2', '2020-04-01', 600),
       ('9', '自行车', '3', '2020-01-01', 1000),
       ('10', '帐篷', '3', '2020-02-01', 100),
       ('11', '烧烤架', '3', '2020-02-01', 50),
       ('12', '遮阳伞', '3', '2020-03-01', 20);

2.配置 DataX 插件

我们进入 DataX 官网,下滑找到对应组件插件的文档。

我们这里是从 MySQL 中读数据,然后存储到 HDFS 上,所以我们这里就先去查找 MySQL Reader 插件文档。



然后就会找到一个 MySQL Reader 的插件配置样例,如下所示:

{

"job": {

"setting": {

// 指定通道数量(并发) "speed": {

"channel": 3 }, // 允许的误差范围 "errorLimit": {


"record": 0, "percentage": 0.02 } }, "content": [ {

// 读取配置 "reader": {

// 固定写法——mysqlreader "name": "mysqlreader", "parameter": {

// 账号密码 "username": "root", "password": "root", // 选取需要进行同步的字段 "column": [ "id", "name" ], // 通过数据切片进行数据同步 "splitPk": "db_id", // 指定库表连接信息 "connection": [ {

"table": [ "table" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/database" ] } ] } }, // 写入配置 "writer": {

"name": "streamwriter", "parameter": {

"print":true } } } ] } }

了解 MySQL Reader 插件配置后,我们现在来学习 HDFS Writer 插件配置如何编写。



HDFS Writer 插件配置样例如下所示:

{

"setting": {

}, "job": {

// 指定通道数量(并发) "setting": {

"speed": {

"channel": 2 } }, "content": [ {

// 读取配置 "reader": {

// 固定写法——txtfilereader "name": "txtfilereader", // 指定读取路径、编码、字段 "parameter": {

"path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"], "encoding": "UTF-8", "column": [ {

"index": 0, "type": "long" }, {

"index": 1, "type": "long" }, {

"index": 2, "type": "long" }, {

"index": 3, "type": "long" }, {

"index": 4, "type": "DOUBLE" }, {

"index": 5, "type": "DOUBLE" }, {

"index": 6, "type": "STRING" }, {

"index": 7, "type": "STRING" }, {

"index": 8, "type": "STRING" }, {

"index": 9, "type": "BOOLEAN" }, {

"index": 10, "type": "date" }, {

"index": 11, "type": "date" } ], "fieldDelimiter": "\t" } }, // 写入配置 "writer": {

// 固定写法——hdfswriter "name": "hdfswriter", // 指定存储路径、格式、文件前缀名、字段 "parameter": {

"defaultFS": "hdfs://xxx:port", "fileType": "orc", "path": "/user/hive/warehouse/writerorc.db/orcfull", "fileName": "xxxx", "column": [ {

"name": "col1", "type": "TINYINT" }, {

"name": "col2", "type": "SMALLINT" }, {

"name": "col3", "type": "INT" }, {

"name": "col4", "type": "BIGINT" }, {

"name": "col5", "type": "FLOAT" }, {

"name": "col6", "type": "DOUBLE" }, {

"name": "col7", "type": "STRING" }, {

"name": "col8", "type": "VARCHAR" }, {

"name": "col9", "type": "CHAR" }, {

"name": "col10", "type": "BOOLEAN" }, {

"name": "col11", "type": "date" }, {

"name": "col12", "type": "TIMESTAMP" } ], // 指定存储模式 "writeMode": "append", // 指定存储间隔符 "fieldDelimiter": "\t", // 指定数据压缩模式 "compress":"NONE" } } } ] } }

3. 编写 DataX 同步 MySQL 数据到 HDFS 插件配置

cd $DATAX_HOME/job
vim test.json

添加下列内容:

{

"job": {

"setting": {

"speed": {

"channel": 1 } }, "content": [ {

"reader": {

"name": "mysqlreader", "parameter": {

"username": "root", "password": "000000", "column": [ "sku_id", "name", "category_id", "from_date", "price" ], "splitPk": "", "connection": [ {

"table": [ "sku_info" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true" ] } ] } }, "writer": {

"name": "hdfswriter", "parameter": {

"defaultFS": "hdfs://hadoop120:8020", "fileType": "text", "path": "/testInputPath", "fileName": "sku_info", "column": [ {

"name": "sku_id", "type": "STRING" }, {

"name": "name", "type": "STRING" }, {

"name": "category_id", "type": "STRING" }, {

"name": "from_date", "type": "STRING" }, {

"name": "price", "type": "DOUBLE" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress":"gzip" } } } ] } }

上面的插件配置信息表明,将 MySQL 中 test_datax.sku_info表的全量数据同步到 HDFS 中的 /testInputPath路径下,如果指定输出文件类型为 text,那么必须指定压缩模式为 gzipbzip2

启动 Hadoop 集群,提前创建 HDFS 中的存储路径

hadoop fs -mkdir /testInputPath

运行 DataX,进行数据同步

cd $DATAX_HOME
python bin/datax.py job/test.json

执行完成后如下所示:

image.png

在 HDFS 上查看存储路径下的文件:

image.png

如果想要直接查看 HDFS 文件中存储的内容,可以运行下列命令:

hadoop fs -cat /testInputPath/* | zcat

image.png

DataX 数据同步 HDFS —> MySQL

将 HDFS 中 /testInputPath路径下的数据全量导入到 MySQL 中 test_datax库中的 test_sku_info表(该表需要提前创建)。

1. 编写 HDFS 读取插件

{

"job": {

"setting": {

"speed": {

"channel": 1 } }, "content": [ {

"reader": {

"name": "hdfsreader", "parameter": {

"path": "/testInputPath/*", "defaultFS": "hdfs://hadoop120:8020", "column": [ "*" ], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "\t", "compress": "gzip" } }, "writer": {

// 待编写 } } ] } }

这里需要注意 HDFS 的数据存放格式、间隔符、压缩方式等。

2. 编写 MySQL 写入插件

{

"job": {

"setting": {

"speed": {

"channel": 1 } }, "content": [ {

"reader": {

// 待编写 } }, "writer": {

"name": "mysqlwriter", "parameter": {

"writeMode": "insert", "username": "root", "password": "000000", "column": [ "*" ], "connection": [ {

"jdbcUrl": "jdbc:mysql://hadoop120:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicRetrieval=true", "table": [ "test_sku_info" ] } ] } } } ] } }

导入数据到 MySQL 时,需要注意写入模式 writeMode,因为 HDFS 中的存储数据内容并没有主键的概念,而 MySQL 中有主键,如果模式选择不正确,会带来不必要的麻烦,模式说明如下:

image.png

分别表示插入写 insert、覆盖写 replace、更新写 update

3. 合并读写插件

{

"job": {

"setting": {

"speed": {

"channel": 1 } }, "content": [ {

"reader": {

"name": "hdfsreader", "parameter": {

"path": "/testInputPath/*", "defaultFS": "hdfs://hadoop120:8020", "column": [ "*" ], "fileType": "text", "encoding": "UTF-8", "fieldDelimiter": "\t", "compress": "gzip" } }, "writer": {

"name": "mysqlwriter", "parameter": {

"writeMode": "insert", "username": "root", "password": "000000", "column": [ "*" ], "connection": [ {

"jdbcUrl": "jdbc:mysql://hadoop120:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicRetrieval=true", "table": [ "test_sku_info" ] } ] } } } ] } }

合并完成后,将其存储到 $DATAX_HOME/job/hdfs_to_mysql.json中。

4. 创建 MySQL 中的存储库

USE `test_datax`;
DROP TABLE IF EXISTS `test_sku_info`;
CREATE TABLE `test_sku_info`  (
  `sku_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '商品id',
  `name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '商品名称',
  `category_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '所属分类id',
  `from_date` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '上架日期',
  `price` decimal(10, 2) NULL DEFAULT NULL COMMENT '商品单价'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '商品属性表' ROW_FORMAT = Dynamic;

5. 执行数据同步

将 HDFS 中 /testInputPath路径下的数据全量导入到 MySQL 中 test_datax库中的 test_sku_info表。

cd $DATAX_HOME
python bin/datax.py job/hdfs_to_mysql.json

image.png

运行完成后,进入 MySQL 中查看同步过来的内容:

select * from test_datax.test_sku_info limit 10;

image.png

数据同步完成,其它数据源数据同步方式也基本上差不多,多看看官方插件配置文档就会了。

更多内容请查看:DataX 官网

DataX 优化

DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以根据随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

速度优化

"speed": {

"channel": 5, "byte": 1048576, "record": 10000 }
  • "channel": 5这个参数指定了通道的数量。通道可以理解为并行处理数据的通道数。在数据传输过程中,可以同时处理多个通道,提高数据传输效率。

  • "byte": 1048576这个参数指定了每个通道传输的字节数,这里的数值是 1048576,即 1 MB

  • "record": 10000这个参数指定了每个通道传输的记录数,这里的数值是 10000

配置时必须遵守以下规则:

  • 若配置了总 record记录限速,则必须配置单个 channelrecord记录限速。
  • 若配置了总 byte字节限速,则必须配置单个 channelbyte字节限速。

  • 若配置了总 record记录限速和总 byte字节限速,channel并发数参数就会失效。

配置格式:

{

"core": {

"transport": {

"channel": {

"speed": {

"byte": 1048576 //单个channel的byte字节限速1M/s
} } } }, "job": {

"setting": {

"speed": {

"byte" : 5242880 //总byte字节限速5M/s
} }, ...... } }

内存优化

在运行数据同步的时候直接指定堆内存大小。

cd $DATAX_HOME
python bin/datax.py --jvm="-Xms4G -Xmx4G" job/job.json

在这里,将 JVM 最大和最小堆内存都设置为 4GB

同步 MySQL 中 NULL 值数据到 HDFS 出现错误

解决datax抽mysql数据到hdfs之null值变成‘‘(引号)的问题

配置文件变量传参

有时候我们不想将 JSON 插件配置文件直接固定写死,那么我们就可以通过传参的方式来进行动态设置。

例如,我想要将 MySQL 数据同步到 HDFS 上,每天的同步日期都要进行修改,这时候就可以通过变量传参的方式来解决。

{

"job": {

"setting": {

"speed": {

"channel": 1, }, }, "content": [ {

"reader": {

"name": "mysqlreader", "parameter": {

"username": "root", "password": "000000", "connection": [ {

"querySql": [ "select sku_id,name,category_id,from_date,price from sku_info;" ], "jdbcUrl": [ "jdbc:mysql://127.0.0.1:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true" ] } ] } }, "writer": {

"name": "hdfswriter", "parameter": {

"defaultFS": "hdfs://hadoop120:8020", "fileType": "text", "path": "/testInputPath/${dt}", "fileName": "sku_info", "column": [ "*" ], "writeMode": "append", "fieldDelimiter": "\t", "compress":"gzip" } } } ] } }

这里使用了 MySQL 数据读取另一种写法(querySql

image.png

以 SQL 作为查询语句,同时也支持使用 Where语句进行条件过滤。

在写入配置 "writer"中设置了需要传参的日期变量 dt

    "path": "/testInputPath/${dt}",

模拟将其存储路径按照日期进行动态设置。

注意:执行数据同步前需要在 HDFS 上先创建对应存储路径。

hadoop fs -mkdir /testInputPath/2023-09-16

现在来进行数据同步测试

cd $DATAX_HOME
python bin/datax.py job/var.json -p "-Ddt=2023-09-16"

正常执行完成后,结果如下所示:

image.png

数据同步完成

image.png

基于 DataX 开发的快速同步 MySQL 数据至 HDFS 上的工具

项目地址:基于 DataX 开发的快速同步 MySQL 数据至 HDFS 上的工具

image.png

相关文章
|
3月前
|
SQL存储关系型数据库
DataX - 全量数据同步工具(2)
DataX - 全量数据同步工具
|
1月前
|
关系型数据库MySQL大数据
DataX:数据同步的超音速英雄!阿里开源工具带你飞越数据传输的银河系,告别等待和故障的恐惧!快来见证这一数据工程的奇迹!
【8月更文挑战第13天】DataX是由阿里巴巴开源的一款专为大规模数据同步设计的工具,在数据工程领域展现强大竞争力。它采用插件化架构,支持多种数据源间的高效迁移。相较于Apache Sqoop和Flume,DataX通过并发写入和流处理实现了高性能同步,并简化了配置流程。DataX还支持故障恢复,能够在同步中断后继续执行,节省时间和资源。这些特性使其成为构建高效可靠数据同步方案的理想选择。
|
1月前
|
Java关系型数据库DataX
DATAX数据同步
DATAX数据同步
|
2月前
|
监控数据挖掘大数据
阿里云开源利器:DataX3.0——高效稳定的离线数据同步解决方案
对于需要集成多个数据源进行大数据分析的场景,DataX3.0同样提供了有力的支持。企业可以使用DataX将多个数据源的数据集成到一个统一的数据存储系统中,以便进行后续的数据分析和挖掘工作。这种集成能力有助于提升数据分析的效率和准确性,为企业决策提供有力支持。
|
2月前
|
分布式计算关系型数据库MySQL
MySQL超时参数优化与DataX高效数据同步实践
通过合理设置MySQL的超时参数,可以有效地提升数据库的稳定性和性能。而DataX作为一种高效的数据同步工具,可以帮助企业轻松实现不同数据源之间的数据迁移。无论是优化MySQL参数还是使用DataX进行数据同步,都需要根据具体的应用场景来进行细致的配置和测试,以达到最佳效果。
|
4月前
|
消息中间件关系型数据库MySQL
Maxwell 概述、安装、数据同步【一篇搞定】!
Maxwell 是一个由 Zendesk 开源的用于 MySQL 数据库实时数据捕获和同步的工具,支持多种数据库系统,以 JSON 格式输出变更数据。它实时监控数据库中的更新,将变化传递给其他系统,常用于实时数据管道、数据仓库和事件驱动架构。Maxwell 具有实时性、可配置性和高性能等特点。其工作流程包括 Binlog 解析、数据解析、重构、发布到消息队列(如 Kafka)以及事件处理。安装时需注意 JDK 版本,并配置 MySQL、Zookeeper 和 Kafka。此外,Maxwell 支持定向监听特定库表,并能进行历史和增量数据同步。
|
3月前
|
SQL关系型数据库MySQL
DataX - 全量数据同步工具(1)
DataX - 全量数据同步工具
|
1月前
|
SQLDataWorks关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
25天前
|
canal消息中间件关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
15944
|
1月前
|
关系型数据库MySQL数据库
【MySQL】手把手教你MySQL数据同步
【MySQL】手把手教你MySQL数据同步

热门文章

最新文章