系列文章目录

一、DataX详解和架构介绍 二、DataX源码分析 JobContainer 三、DataX源码分析 TaskGroupContainer 四、DataX源码分析 TaskExecutor 五、DataX源码分析 reader 六、DataX源码分析 writer 七、DataX源码分析 Channel 八、DataX源码分析-插件机制

文章目录

系列文章目录DataX是什么?DataX支持的数据源DataX的框架设计DataX核心架构核心模块介绍:DataX调度流程:

DataX部署和配置

DataX是什么?

DataX是阿里开源的异构数据源离线同步工具。它致力于实现包括关系型数据库(如MySQL、Oracle等)、HDFS、Hive、MaxCompute(原ODPS)、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

DataX的设计理念是将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源时,只需要将此数据源对接到DataX,便能与已有的数据源实现无缝数据同步。

DataX的架构主要基于Framework + Plugin的设计模式。它将数据读取和写入抽象成为Reader和Writer插件,这些插件可以接入不同的数据源,实现数据的读取和写入操作。同时,DataX提供了丰富的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。

DataX的核心优势包括稳定性、高效性、易用性和扩展性。它经过长时间大规模生产环境的验证,能够保证数据同步的稳定性和可靠性;通过多线程、多进程、流式处理等技术手段,实现高效的数据同步;提供简单易用的配置方式,用户可以通过配置文件来定义数据源、目标端、同步策略等;支持丰富的插件体系,可以方便地扩展新的数据源和目标端。

此外,DataX还提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让作业在库可以承受的范围内达到最佳的同步速度。同时,它还具有强劲的同步性能、健壮的容错机制以及极简的使用体验等特点。

总之,DataX是一个强大而灵活的数据同步工具,能够有效地解决异构数据源之间的数据同步问题。通过合理的配置和优化,它可以帮助用户实现高效、稳定、可靠的数据同步操作。

DataX支持的数据源

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入 。

类型数据源Reader(读)Writer(写)文档RDBMS 关系型数据库MySQL√√读 、写Oracle√√读 、写OceanBase√√读 、写SQLServer√√读 、写PostgreSQL√√读 、写DRDS√√读 、写Kingbase√√读 、写通用RDBMS(支持所有关系型数据库)√√读 、写阿里云数仓数据存储ODPS√√读 、写ADB√写ADS√写OSS√√读 、写OCS√写Hologres√写AnalyticDB For PostgreSQL√写阿里云中间件datahub√√读 、写SLS√√读 、写图数据库阿里云 GDB√√读 、写Neo4j√写NoSQL数据存储OTS√√读 、写Hbase0.94√√读 、写Hbase1.1√√读 、写Phoenix4.x√√读 、写Phoenix5.x√√读 、写MongoDB√√读 、写Cassandra√√读 、写数仓数据存储StarRocks√√读 、写ApacheDoris√写ClickHouse√√读 、写Databend√写Hive√√读 、写kudu√写selectdb√写无结构化数据存储TxtFile√√读 、写FTP√√读 、写HDFS√√读 、写Elasticsearch√写时间序列数据库OpenTSDB√读TSDB√√读 、写TDengine√√读 、写

DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。

DataX的框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX核心架构

DataX 3.0采用微内核架构模式, 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

核心模块介绍:

DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

DataX调度流程:

DataX的调度流程可以分为以下几个步骤:

Job切分:首先,DataX的Job模块会根据分库分表策略将Job切分成若干个小的Task。这是为了确保每个Task可以独立执行,并且可以并发执行以提高效率。并发数与TaskGroup计算:然后,根据用户配置的并发数,DataX会计算需要分配多少个TaskGroup。计算的方式是将总的Task数量除以每个TaskGroup中的Task数量(通常为5),从而得到TaskGroup的数量。TaskGroup分配与启动:接下来,DataX会根据计算出的TaskGroup数量,将Task分配到各个TaskGroup中。每个TaskGroup会启动多个TaskExecutor来执行具体的Task。TaskExecutor启动:当TaskGroup启动后,其中的TaskExecutor会启动ReaderThread和WriterThread。ReaderThread负责从数据源读取数据,WriterThread负责将数据写入目标端。这两个线程协同工作,实现了数据的读取、转换和写入过程。数据同步:在每个TaskExecutor中,ReaderThread和WriterThread会不断地从数据源读取数据,并将数据写入目标端,直到所有的数据都同步完成。 整个调度流程依赖于Java底层线程池进行并发控制,DataX通过合理的调度策略和线程管理机制,实现了高效、稳定、可靠的数据同步。

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

DataXJob根据分库分表切分成了100个Task。根据20个并发,DataX计算共需要分配4个TaskGroup。4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

DataX部署和配置

工具部署

方法一、直接下载DataX工具包:DataX下载地址 下载后解压至本地某个目录,进入bin目录,即可运行同步作业: $ cd {YOUR_DATAX_HOME}/bin

$ python datax.py {YOUR_JOB.json}

自检脚本: python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json 方法二、下载DataX源码,自己编译:DataX源码 (1)、下载DataX源码: $ git clone git@github.com:alibaba/DataX.git

(2)、通过maven打包: $ cd {DataX_source_code_home}

$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true

打包成功,日志显示如下: [INFO] BUILD SUCCESS

[INFO] -----------------------------------------------------------------

[INFO] Total time: 08:12 min

[INFO] Finished at: 2015-12-13T16:26:48+08:00

[INFO] Final Memory: 133M/960M

[INFO] -----------------------------------------------------------------

打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下: $ cd {DataX_source_code_home}

$ ls ./target/datax/datax/

bin conf job lib log log_perf plugin script tmp

配置示例:从stream读取数据并打印到控制台

第一步、创建作业的配置文件(json格式) 可以通过命令查看配置模板: python datax.py -r {YOUR_READER} -w {YOUR_WRITER} $ cd {YOUR_DATAX_HOME}/bin

$ python datax.py -r streamreader -w streamwriter

DataX (UNKNOWN_DATAX_VERSION), From Alibaba !

Copyright (C) 2010-2015, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:

https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:

https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and use

python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json

to run the job.

{

"job": {

"content": [

{

"reader": {

"name": "streamreader",

"parameter": {

"column": [],

"sliceRecordCount": ""

}

},

"writer": {

"name": "streamwriter",

"parameter": {

"encoding": "",

"print": true

}

}

}

],

"setting": {

"speed": {

"channel": ""

}

}

}

}

根据模板配置json如下: #stream2stream.json

{

"job": {

"content": [

{

"reader": {

"name": "streamreader",

"parameter": {

"sliceRecordCount": 10,

"column": [

{

"type": "long",

"value": "10"

},

{

"type": "string",

"value": "hello,你好,世界-DataX"

}

]

}

},

"writer": {

"name": "streamwriter",

"parameter": {

"encoding": "UTF-8",

"print": true

}

}

}

],

"setting": {

"speed": {

"channel": 5

}

}

}

}

第二步:启动DataX $ cd {YOUR_DATAX_DIR_BIN}

$ python datax.py ./stream2stream.json

同步结束,显示日志如下: ...

2023-12-17 11:20:25.263 [job-0] INFO JobContainer -

任务启动时刻 : 2023-12-17 11:20:15

任务结束时刻 : 2023-12-17 11:20:25

任务总计耗时 : 10s

任务平均流量 : 205B/s

记录写入速度 : 5rec/s

读出记录总数 : 50

读写失败总数 : 0

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: