使用Datax web 创建同步任务

准备工作

创建数据源 配置数据库相关信息 创建执行器 配置执行器执行地址相关信息

1.构建reade

1.1 SQL语句 (querySql)

在json文件中此部分配置就是 querySql 在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id 当用户配置querySql时,xxxReader直接忽略table、column、where条件的配置。如果配置了querySql又配置了table,column、where等,在log中会有警告日志

1.2 表所有字段 (column)

在json文件中此部分配置就是 column

2.构建reade

2.1 前置sql语句 (preSql)

在json文件中此部分配置就是 preSql 写入数据到目的表前,会先执行这里的标准语句。如果 Sql 中有你需要操作到的表名称,请使用 @table 表示,这样在实际执行 Sql 语句时,会对变量按照实际表名称进行替换。比如你的任务是要写入到目的端的100个同构分表(表名称为:datax_00,datax01, … datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作,那么你可以这样配置:“preSql”:[“delete from 表名”],效果是:在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称。

// 也可以通过配置参数进行部分数据删除 lastTime为传入的辅助参数

"preSql": [

"delete from biz_ad_facebook where date =${lastTime}"

]

2.2 postSql (postSql)

在json文件中此部分配置就是 postSql == 写入数据到目的表后,会执行这里的标准语句。(原理同 preSql ) ==

3.字段映射

4.构建json ----选择模板

5.增量更新的配置

通过上述步骤创建出来的任务,在任务列表找到并编辑。 其中用于增量更新的辅助参数。 1.任务类型选DataX任务 2.辅助参数选择时间自增 3.增量开始时间选择,即sql中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第一次同步完成后,该时间被更新为上一次的任务触发时间,任务失败不更新。 4.增量时间字段,-DlastTime=‘%s’ -DcurrentTime=‘%s’ 先来解析下这段字符串

1.-D是DataX参数的标识符,必配

2.-D后面的lastTime和currentTime是DataX json中where条件的时间字段标识符,必须和json中的变量名称保持一致

3.='%s'是项目用来去替换时间的占位符,比配并且格式要完全一致

4.注意-DlastTime='%s'和-DcurrentTime='%s'中间有一个空格,空格必须保留并且是一个空格

6.遇到的问题—增量数据导入 可能重复的主键

解决办法:

datax的writeMode参数:

"writeMode": "update",

1.insert 这个参数可以设置为insert,这样子就是对于同步的主键进行设置。主要主键存在,那么在更新的时候,就不会将结果表中的数据进行修改。只会增加新的数据。 2.update 在同步到时候,设置主键,那么会查看表中主键内容的数据,如果有变动,就会直接进行替换。

// 示例json

{

"job": {

"setting": {

"speed": {

"channel": 3,

"byte": 1048576

},

"errorLimit": {

"record": 0,

"percentage": 0.02

}

},

"content": [

{

"reader": {

"name": "****",

"parameter": {

"username": "****",

"password": "****",

"splitPk": "",

"connection": [

{

"querySql": [

"select * from tablename where date >= ${lastTime} and date< ${currentTime}"

],

"jdbcUrl": [

"****"

]

}

]

}

},

"writer": {

"name": "mysqlwriter",

"parameter": {

"writeMode": "update",

"username": "******",

"password": "*******",

"column": [

"`id`",

"`account_id`",

"`account_name`",

"`campaign_id`",

"`campaign_name`",

"`ad_set_id`",

"`ad_set_name`",

"`ad_id`",

"`ad_name`",

"`date`",

"`date_start`",

"`date_stop`",

"`spend`",

"`cpm`",

"`ctr`",

"`cpc`",

"`purchase`",

"`order_num`",

"`roas`",

"`clicks`",

"`video_avg_time_watched_actions`",

"`video_creator`",

"`level`",

"`create_by`",

"`create_time`",

"`update_by`",

"`update_time`",

"`sys_org_code`",

"`product_name`",

"`site_short_name`",

"`operator`",

"`date_timestamp`",

"`impressions`",

"`add_to_card`",

"`video_thirty_sec_watched`",

"`unique_clicks`",

"`unique_checkout`",

"`unique_add_to_card`",

"`conversion_rate_ranking`",

"`engagement_rate_ranking`",

"`quality_ranking`",

"`video_p100_watched`",

"`video_p25_watched`",

"`video_p50_watched`",

"`video_p75_watched`",

"`video_p95_watched`",

"`view_content`",

"`landing_page_view`",

"`unique_link_clicks`",

"`checkout`",

"`designer`",

"`campaign_objective`",

"`inline_clicks`",

"`unique_inline_clicks`",

"`outbound_clicks`",

"`unique_outbound_clicks`",

"`frequency`",

"`inline_post_engagement`",

"`cost_per_inline_click`",

"`cost_per_inline_post`",

"`cost_per_outbound_click`",

"`cost_per_unique_click`",

"`cost_per_unique_inline_click`",

"`cost_per_unique_outbound_click`",

"`watch_avg_time`",

"`video_play_curve`",

"`asc_flag`"

],

"preSql": [

"delete from tablename where date =${lastTime}"

],

"connection": [

{

"table": [

"biz_ad_facebook"

],

"jdbcUrl": "jdbc:mysql://localhost:3322/powerbi"

}

]

}

}

}

]

}

}

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: