目录

0. 相关文章链接

1. 为什么要实现将业务数据实时写入到数据仓库中

2. 架构设计

3. FlinkSQL将binlog写入到HDFS中

4. 创建增量外部表(binlog表)

5. 主作业

5.1. 概述

5.2. fmys_goods_ext_start节点说明

5.3. ods_fmys_goods_ext_dt_CDM任务节点说明

5.4. ods_fmys_goods_ext_dt_DLI任务节点说明

5.5. dwd_fmys_goods_ext_视图创建节点说明

5.6. dwd_fmys_goods_ext_dt_dwd层分区表节点说明

5.7. dwd_fmys_goods_ext_record_dwd层历史记录表节点说明

6. 总结

6.1. 历史数据和增量数据合并的注意事项

6.2. Java的nanoTime()

6.3. 各表保存时间以及资源消耗

6.4. 优化前后各方面对比

6.5. 注意事项

注意:博主原有相关博文 FlinkSQL+HDFS+Hive+SparkSQL实现业务数据增量进入数据仓库 ,此方案是在原方案上优化而来,故其中术语名词等说明会跟原博文一致

0. 相关文章链接

   开发随笔文章汇总  

1. 为什么要实现将业务数据实时写入到数据仓库中

        在我们的数据仓库中,一般情况下都是通过Spoop和Datax等数据传输框架,将数据按天同步到数据仓库中。而且根据业务库的表的类型,可以选择全量同步(每天全量或者一次拉取,比如省份表等)、增量同步(按照更新时间字段拉取数据,并将数据和原表合并,比如订单表)、拉链表(按照更新时间拉取数据,并将数据和原表进行计算,比如用户表)等类型进行数据同步。

        但是有时当我们需要实时数据的时候(一般是指小于1小时,但是又没有达到秒级),这按天同步就不能满足需求了,而如果每个任务都使用Flink的话,又会照成资源浪费。

        这个时候就可以考虑将业务数据实时同步到数据仓库中,然后后端使用PerstoAPI等框架,每5分钟等计算一次,并将数据进行缓存,这样就可以进行准实时数据协助,并能有效节省资源。

2. 架构设计

在Hive表中会存在一张外部表,该表为分区表,存储每天对应业务表的历史数据(有2种来源,1种是手动跑数时通过Spoop从MySQL业务库中拉取全量数据,第2种是昨天的离线表和昨天的增量表合并生成的)需要使用FlinkCDC将MySQL中的binlog数据获取,并发送到Kafka对应的topic中通过FlinkSQL将Kafka的数据获取出来,并直接将数据通过filesystem写入到HDFS中,通过外部表指定路径(使用华为云的话可以通过FlinkSQL写入到OBS中)在每天执行的作业中添加该作业对应的日期和表的分区将ods昨天分区+binlog昨天以及以后分区数据合并写入到ods的新分区中(历史分区数据可删除)将ods今天分区+binlog今天以及以后分区创建视图将dwd今天数据保存到分区中(历史分区数据可删除)将dwd历史操作记录表昨天分区+binlog昨天数据合并写入该表今天分区中

3. FlinkSQL将binlog写入到HDFS中

注意:博主使用的是华为云产品,所以使用的是华为云DLI中的FlinkSQL和OBS,可以类比开源的FlinkSQL和HDFS

-- kafka的 bigdata_mysql_binlog 主题中示例数据如下

-- 注意:其中的nanoTime是使用Java中的System.nanoTime()函数生成的,也就是说,如果程序失败,需要将分区数据删除,然后在flink作业中设置拉取时间重跑

-- 具体字段描述如下:

-- db: 在MySQL中对应的数据库

-- tb: 在MySQL中对应的表

-- columns: 一个JSONArray,表示所有的字段,如下示例,一个JSONArray中存在该表中所有字段,存储的json的key为字段名,在该json中又有value这个属性,表示该字段对应的值

-- event: event对应操作类型和数值: insert = 0 ; delete = 1 ; update = 2 ; alter = 3 ;

-- sql: 触发该更改或删除等操作的sql语句

-- primary_key: 该更改的主键

-- create_time: 该数据从binlog中获取出来的时间(分区就用此时间)

-- sendTime: 该数据进入到kafka的时间(该时间不用)

-- nano_time: 纳秒级时间戳,用了区分数据的前后,使用此时间可以保证数据唯一

-- {

-- "columns": {

-- "admin_note": {

-- "key": false,

-- "mysqlType": "varchar(255)",

-- "name": "admin_note",

-- "null_val": false,

-- "update": false,

-- "value": ""

-- },

-- "allot_num": {

-- "key": false,

-- "mysqlType": "smallint(5) unsigned",

-- "name": "allot_num",

-- "null_val": false,

-- "update": false,

-- "value": "0"

-- }

-- },

-- "createTime": 1649213973044,

-- "db": "xxx",

-- "event": 2,

-- "nanoTime": 23494049498709146,

-- "primaryKey": "157128418",

-- "sendTime": 1649213973045,

-- "sql": "",

-- "tb": "fmys_order_infos"

-- }

-- source端,对接kafka,从kafka中获取数据

CREATE SOURCE STREAM bigdata_binlog_ingest_source_bigdata_mysql_binlog (

db STRING,

tb STRING,

columns STRING,

event INT,

`sql` STRING,

`primary_key` STRING,

create_time bigint,

send_time bigint,

nano_time bigint

) WITH (

type = "kafka",

kafka_bootstrap_servers = "bigdata1:9092,bigdata2:9092,bigdata3:9092",

kafka_group_id = "bigdata_mysql_binlog_dim_to_obs_test",

kafka_topic = "bigdata_mysql_binlog_dim",

encode = "json",

json_config = "db=db;tb=tb;columns=columns;event=event;sql=sql;primary_key=primaryKey;create_time=createTime;send_time=sendTime;nano_time=nanoTime;"

);

-- sink端,对接obs,将数据写入到obs中

CREATE SINK STREAM bigdata_binlog_ingest_sink_bigdata_mysql_binlog (

db STRING,

tb STRING,

columns STRING,

event int,

`sql` STRING,

`primary_key` STRING,

create_time bigint,

nano_time bigint,

dt STRING

) PARTITIONED BY(dt,tb) WITH (

type = "filesystem",

file.path = "obs://xxx-bigdata/xxx/xxx/ods_binlog_data",

encode = "parquet",

ak = "akxxx",

sk = "skxxx"

);

-- 从source表获取数据,写出到sink表中

CREATE FUNCTION date_formatted AS 'com.xxx.cdc.udf.DateFormattedUDF';

INSERT INTO

bigdata_binlog_ingest_sink_bigdata_mysql_binlog

SELECT

db,

tb,

columns,

event,

`sql`,

`primary_key`,

create_time,

nano_time,

date_formatted(

cast(coalesce(create_time, UNIX_TIMESTAMP_MS()) / 1000 as VARCHAR(10)),

'yyyyMMdd'

) as dt

FROM

bigdata_binlog_ingest_source_bigdata_mysql_binlog;

上述FlinkSQL主要通过source获取Kafka主题中的数据,然后什么都不要改变,将数据写入到HDFS中,写入的时候注意,以dt为一级分区,tb为二级分区,主要考虑如下几点:

binlog增量数据总体来说数据量并不大,并是使用了分区,所以所有的数据写入到一张表中即可dt日期分区为一级分区,tb表名为二级分区,主要考虑的是删除历史数据容易删除

4. 创建增量外部表(binlog表)

外部表建表语句如下:

CREATE TABLE yishou_data.ods_binlog_data (

`db` STRING COMMENT '数据库名',

`columns` STRING COMMENT '列数据',

`event` BIGINT COMMENT '操作类型',

`sql` STRING COMMENT '执行的SQL',

`primary_key` STRING COMMENT '主键的值',

`create_time` BIGINT COMMENT '从MySQL中获取binlog的13位时间戳',

`send_time` BIGINT COMMENT '发送binlog到kafka的13位时间戳',

`nano_time` BIGINT COMMENT '纳秒级更新时间戳',

`dt` BIGINT COMMENT '日期分区',

`tb` STRING COMMENT '表名'

) USING parquet

PARTITIONED BY (dt, tb) COMMENT 'ods层binlog表(所有通过binlog进入数仓的数据均在此表中)'

LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_binlog_data'

;

建表完后查询结果如下(注意该表是直接插入数据到文件系统中,所以需要根据实际情况来msck该表):

select * from yishou_data.ods_binlog_data_table where dt = 20220816 and tb = 'fmys_goods_ext';

5. 主作业

5.1. 概述

架构图如下:

该作业目的:

定时创建视图(在dwd的视图中不用指定时间,永远都是最新的数据,并只有1天的小文件)定时将昨天的old表数据和昨天的binlog表数据合并,并导入今天的old表中当数据不对或者失败时,可以通过重跑从业务库中拉取全量数据(提高抗风险性)将历史所有的binlog进行保存,可以获取所有的历史修改记录

作业大致说明:

该调度任务时间为凌晨 0点 30分在start节点中会添加binlog的分区,并针对其他表执行msck操作(下述会详细说明)上面部分CDM任务的IF条件为 #{DateUtil.format(Job.planTime,"HHmmss") != '003000' ? 'true' : 'false'} ,即当调度时间不为 00点30分00秒 时,会执行此任务,从业务库中拉取全量数据到数仓中(即不是自动调度就会执行此选项)下面部分DLI任务的IF条件为 #{DateUtil.format(Job.planTime,"HHmmss") == '003000' ? 'true' : 'false'} ,即当调度时间为 00点30分00秒 时,会执行此任务,将ods昨天的数据和binlog昨天的数据合并,并导入到ods今天的数据中(当自动调度就会执行此选项)

5.2. fmys_goods_ext_start节点说明

该节点为SQL节点,具体代码如下:

-- 在ods_binlog_data表中增加对应日期和表名的分区

ALTER TABLE yishou_data.ods_binlog_data ADD if not exists PARTITION (dt=${gmtdate}, tb = 'fmys_goods_ext');

-- msck此作业涉及的会删除历史数据的3张表

MSCK REPAIR TABLE yishou_data.ods_fmys_goods_ext_dt;

MSCK REPAIR TABLE yishou_data.dwd_fmys_goods_ext_dt;

MSCK REPAIR TABLE yishou_data.dwd_fmys_goods_ext_record_dt;

主要目的是新增ods_binlog_data表对应日期和业务表的分区,并对其它表进行msck操作(因为其它表会删除历史数据)。

5.3. ods_fmys_goods_ext_dt_CDM任务节点说明

此节点为CMD(即Sqoop)作业,会将业务库中的全量数据拉取到数仓的ods表中(通过overwrite的方式,将数据写入到以今天的分区中),当手动调度时才执行此节点,主要目的是当感觉数据有误时,可以使用此作业来更新替换最新最全最正确的数据。

ods建表语句如下:

CREATE EXTERNAL TABLE yishou_data.ods_fmys_goods_ext_dt(

`goods_id` BIGINT COMMENT '*',

`limit_day` BIGINT COMMENT '*',

`auto_time` STRING COMMENT '*',

`is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',

`grade` BIGINT COMMENT '商品档次',

`season` BIGINT COMMENT '季节',

`goods_type` BIGINT COMMENT '商品类型{1:特价,}'

) COMMENT '商品拓展信息表(ods层分区表,保留7天的数据)'

PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数时的日期,比如今天2022年8月15日0点30分跑数,那这分区就是20220815')

STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_fmys_goods_ext_dt'

;

CDM任务配置如下:

5.4. ods_fmys_goods_ext_dt_DLI任务节点说明

当正常调度时,会执行此节点,会使用ods昨天的分区然后和昨天以及之后的binlog数据合并,再将数据插入到ods的今天的分区中;因为是跑的SQL作业,不需要拉取数据,所以会执行的较快,正常调度时就使用此种方法运行。

-- DLI sql

-- ******************************************************************** --

-- author: yangshibiao

-- create time: 2022/08/15 10:15:46 GMT+08:00

-- ******************************************************************** --

-- fmys_goods_ext 表在数仓中ods的建表语句(ods层历史数据表,在对应分区内保存着当天0点之前的所有数据,如果0点30分跑数,也会将这30分的数据放入)

-- CREATE EXTERNAL TABLE yishou_data.ods_fmys_goods_ext_dt(

-- `goods_id` BIGINT COMMENT '*',

-- `limit_day` BIGINT COMMENT '*',

-- `auto_time` STRING COMMENT '*',

-- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',

-- `grade` BIGINT COMMENT '商品档次',

-- `season` BIGINT COMMENT '季节',

-- `goods_type` BIGINT COMMENT '商品类型{1:特价,}'

-- ) COMMENT '商品拓展信息表(ods层分区表,保留7天的数据)'

-- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数时的日期,比如今天2022年8月15日0点30分跑数,那这分区就是20220815')

-- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/ods_fmys_goods_ext_dt'

-- ;

-- 从ods_fmys_goods_ext_dt表中获取出昨天分区的数据(即相当于昨天0点之前的所有数据)

-- 从ods_binlog_data表中获取出昨天分区以及之后分区的数据(即昨天的增量更新的binlog数据)

-- 然后2份数据对主键开窗,获取最新的,并且不是被删除的数据,然后将数据插入到ods_fmys_goods_ext今天的分区中(即今天0点之前的所有历史数据)

insert overwrite table yishou_data.ods_fmys_goods_ext_dt partition(dt)

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, substr(${bdp.system.cyctime}, 1, 8) as dt

from (

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, nano_time

, event

, row_number() over(partition by goods_id order by nano_time desc) as row_number

from (

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, -987654321012345 as nano_time

, 0 as event

from yishou_data.ods_fmys_goods_ext_dt

where dt = ${bdp.system.bizdate}

union all

select

get_json_object(columns ,'$.goods_id.value') as goods_id

, get_json_object(columns ,'$.limit_day.value') as limit_day

, get_json_object(columns ,'$.auto_time.value') as auto_time

, get_json_object(columns ,'$.is_new.value') as is_new

, get_json_object(columns ,'$.grade.value') as grade

, get_json_object(columns ,'$.season.value') as season

, get_json_object(columns ,'$.goods_type.value') as goods_type

, nano_time as nano_time

, event as event

from yishou_data.ods_binlog_data

where

dt >= ${bdp.system.bizdate}

and tb = 'fmys_goods_ext'

)

)

-- event为1就是删除的数据,当最后一次记录为删除数据的时候,就不用这个主键了,因为已经删除了

where row_number = 1 and event != 1

;

5.5. dwd_fmys_goods_ext_视图创建节点说明

此节点的目的是使用ods今天的分区和binlog今天以及之后的分区生成一张dwd的视图,因为binlog的数据是实时更新的,所以这个dwd的视图每次查询的都是最新的,跟业务库可以做到准实时。

-- DLI sql

-- ******************************************************************** --

-- author: yangshibiao

-- create time: 2022/08/15 10:25:56 GMT+08:00

-- ******************************************************************** --

drop view if exists yishou_data.dwd_fmys_goods_ext;

create view if not exists yishou_data.dwd_fmys_goods_ext(

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

) comment 'fmys_goods_ext在dwd层视图(根据业务库数据,分钟级别更新)'

as

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

from (

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, nano_time

, event

, row_number() over(partition by goods_id order by nano_time desc) as row_number

from (

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, -987654321012345 as nano_time

, 0 as event

from yishou_data.ods_fmys_goods_ext_dt

where dt = substr(${bdp.system.cyctime}, 1, 8)

union all

select

get_json_object(columns ,'$.goods_id.value') as goods_id

, get_json_object(columns ,'$.limit_day.value') as limit_day

, get_json_object(columns ,'$.auto_time.value') as auto_time

, get_json_object(columns ,'$.is_new.value') as is_new

, get_json_object(columns ,'$.grade.value') as grade

, get_json_object(columns ,'$.season.value') as season

, get_json_object(columns ,'$.goods_type.value') as goods_type

, nano_time as nano_time

, event as event

from yishou_data.ods_binlog_data

where

dt >= substr(${bdp.system.cyctime}, 1, 8)

and tb = 'fmys_goods_ext'

)

)

where row_number = 1 and event != 1

;

5.6. dwd_fmys_goods_ext_dt_dwd层分区表节点说明

此节点的主要目的是将业务库的数据完整保存到一个分区中,主要解决的问题是当业务库的数据变化时(比如订单的状态、快递配送的状态),下一层重跑时使用新状态的问题,当dwd层有分区之后,每次重跑都只会使用那一天的状态,但是要注意,下一层的脚本需要设置分区,并且该分区可以只保留少数分区即可。

-- DLI sql

-- ******************************************************************** --

-- author: yangshibiao

-- create time: 2022/08/15 10:43:41 GMT+08:00

-- ******************************************************************** --

-- DWD层分区表建表语句

-- CREATE EXTERNAL TABLE `yishou_data`.`dwd_fmys_goods_ext_dt`(

-- `goods_id` BIGINT COMMENT '*',

-- `limit_day` BIGINT COMMENT '*',

-- `auto_time` STRING COMMENT '*',

-- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',

-- `grade` BIGINT COMMENT '商品档次',

-- `season` BIGINT COMMENT '季节',

-- `goods_type` BIGINT COMMENT '商品类型{1:特价,}'

-- ) COMMENT '商品拓展信息表(此表为分区表,保存历史90天的数据)'

-- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数分区,即当脚本调度时间为2022年8月15日0点30分,那分区为20220815')

-- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/dwd_fmys_goods_ext_dt'

-- ;

-- 从dwd视图中查出数据,并插入到dwd的分区表中(注意:可以根据具体情况设置文件个数,一般每个文件大小为30M到120M即可)

INSERT OVERWRITE TABLE `yishou_data`.`dwd_fmys_goods_ext_dt` partition(dt)

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, substr(${bdp.system.cyctime}, 1, 8) as dt

from yishou_data.dwd_fmys_goods_ext

DISTRIBUTE BY floor(rand()*20)

;

5.7. dwd_fmys_goods_ext_record_dwd层历史记录表节点说明

此节点主要目的是将业务库历史记录进行保存,该表因为保存的历史记录,所以使用昨天的分区和昨天的binlog数据聚合即可,注意该表需要初始化,详细说明在如下代码中有说明。

-- DLI sql

-- ******************************************************************** --

-- author: yangshibiao

-- create time: 2022/08/15 11:45:36 GMT+08:00

-- ******************************************************************** --

-- DWD层记录表建表语句

-- CREATE EXTERNAL TABLE `yishou_data`.`dwd_fmys_goods_ext_record_dt`(

-- `goods_id` BIGINT COMMENT '*',

-- `limit_day` BIGINT COMMENT '*',

-- `auto_time` STRING COMMENT '*',

-- `is_new` BIGINT COMMENT '-1 无效(下架) 0 旧款(有效/上架) 1 新款',

-- `grade` BIGINT COMMENT '商品档次',

-- `season` BIGINT COMMENT '季节',

-- `goods_type` BIGINT COMMENT '商品类型{1:特价,}',

-- `event` BIGINT comment '数据操作类型(0:插入;1:删除;2:修改)',

-- `record_time` BIGINT comment '从MySQL中获取binlog的13位时间戳',

-- `nano_time` BIGINT comment '纳秒级更新时间戳(当record_time相同时,使用此时间戳来判断先后)'

-- ) COMMENT '商品拓展信息表历史更改记录表(此表为分区表,保存历史7天的数据)'

-- PARTITIONED BY (`dt` BIGINT COMMENT '日期分区(yyyymmdd),跑数分区,即当脚本调度时间为2022年8月15日0点30分,那分区为20220815')

-- STORED AS orc LOCATION 'obs://yishou-bigdata/yishou_data.db/dwd_fmys_goods_ext_record_dt'

-- ;

-- 执行脚本(记录表昨天的数据和binlog昨天的数据 union all 即可)

insert overwrite table `yishou_data`.`dwd_fmys_goods_ext_record_dt` partition(dt)

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, event

, record_time

, nano_time

, substr(${bdp.system.cyctime}, 1, 8) as dt

from (

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, event

, record_time

, nano_time

from yishou_data.dwd_fmys_goods_ext_record_dt

where dt = ${bdp.system.bizdate}

union all

select

get_json_object(columns ,'$.goods_id.value') as goods_id

, get_json_object(columns ,'$.limit_day.value') as limit_day

, get_json_object(columns ,'$.auto_time.value') as auto_time

, get_json_object(columns ,'$.is_new.value') as is_new

, get_json_object(columns ,'$.grade.value') as grade

, get_json_object(columns ,'$.season.value') as season

, get_json_object(columns ,'$.goods_type.value') as goods_type

, event

, create_time as record_time

, nano_time as nano_time

from yishou_data.ods_binlog_data

where

dt = ${bdp.system.bizdate}

and tb = 'fmys_goods_ext'

)

DISTRIBUTE BY floor(rand()*30)

;

-- 初始化脚本(任务调度之前手动执行初始化脚本)(将昨天的历史所有数据和昨天的binlog汇总,根据主键去重,并设置event为0)

-- 注意:执行初始化脚本时,需要注意昨天的历史数据是否有(即 yishou_data.ods_fmys_goods_ext_dt 表中昨天分区是否有数据 以及binlog昨天对应分区的数据是否有 )

-- 如果没有,可以在功能上线正常运行的第二天,执行如下初始化脚本,再手动重跑该作业即可,这样会通过CDM重新拉取业务库中的数据(并且其中的分区表只会保存一定日期,所以可以忽略)

-- insert overwrite table `yishou_data`.`dwd_fmys_goods_ext_record_dt` partition(dt)

-- select

-- goods_id

-- , limit_day

-- , auto_time

-- , is_new

-- , grade

-- , season

-- , goods_type

-- , 0 as event

-- , record_time

-- , nano_time

-- , substr(${bdp.system.cyctime}, 1, 8) as dt

-- from (

-- select

-- goods_id

-- , limit_day

-- , auto_time

-- , is_new

-- , grade

-- , season

-- , goods_type

-- , record_time

-- , nano_time

-- , event

-- , row_number() over(partition by goods_id order by nano_time desc) as row_number

-- from (

-- select

-- goods_id

-- , limit_day

-- , auto_time

-- , is_new

-- , grade

-- , season

-- , goods_type

-- , 0 as record_time

-- , -987654321012345 as nano_time

-- , 0 as event

-- from yishou_data.ods_fmys_goods_ext_dt

-- where dt = ${bdp.system.bizdate}

-- union all

-- select

-- get_json_object(columns ,'$.goods_id.value') as goods_id

-- , get_json_object(columns ,'$.limit_day.value') as limit_day

-- , get_json_object(columns ,'$.auto_time.value') as auto_time

-- , get_json_object(columns ,'$.is_new.value') as is_new

-- , get_json_object(columns ,'$.grade.value') as grade

-- , get_json_object(columns ,'$.season.value') as season

-- , get_json_object(columns ,'$.goods_type.value') as goods_type

-- , create_time as record_time

-- , nano_time as nano_time

-- , event

-- from yishou_data.ods_binlog_data

-- where

-- dt = ${bdp.system.bizdate}

-- and tb = 'fmys_goods_ext'

-- )

-- )

-- where row_number = 1 and event != 1

-- ;

-- 创建记录表视图(该记录表今天的分区 union all 今天以及之后的binlog的数据,准实时更新)

drop view if exists yishou_data.dwd_fmys_goods_ext_record;

create view if not exists yishou_data.dwd_fmys_goods_ext_record(

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, event

, record_time

, nano_time

) comment 'dwd_fmys_goods_ext_record在dwd层视图(根据业务库数据,分钟级别更新)'

as

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, event

, record_time

, nano_time

from (

select

goods_id

, limit_day

, auto_time

, is_new

, grade

, season

, goods_type

, event

, record_time

, nano_time

from yishou_data.dwd_fmys_goods_ext_record_dt

where dt = substr(${bdp.system.cyctime}, 1, 8)

union all

select

get_json_object(columns ,'$.goods_id.value') as goods_id

, get_json_object(columns ,'$.limit_day.value') as limit_day

, get_json_object(columns ,'$.auto_time.value') as auto_time

, get_json_object(columns ,'$.is_new.value') as is_new

, get_json_object(columns ,'$.grade.value') as grade

, get_json_object(columns ,'$.season.value') as season

, get_json_object(columns ,'$.goods_type.value') as goods_type

, event

, create_time as record_time

, nano_time as nano_time

from yishou_data.ods_binlog_data

where

dt >= substr(${bdp.system.cyctime}, 1, 8)

and tb = 'fmys_goods_ext'

)

;

6. 总结

6.1. 历史数据和增量数据合并的注意事项

        该脚本为每天凌晨运行,直接从ods_fmys_goods_ext_dt表中获取昨天的数据,然后从ods_binlog_data表中也获取昨天并对应的业务表的数据。然后根据主键开窗,按照nano_time降序排序,并取第一条,并且要过滤event为1的(event为1的数据不要,这是删除的数据)数据,取出数据后将数据插入到ods_fmys_goods_ext_dt中今天的分区中(保留历史数据,降低误差,提高健壮性)。

        在ods_fmys_goods_ext_dt表中没有nano_time和event字段,需要设置nano_time为-987654321012345(这个值可以随意设置,只要够小就行,关于这样设置可以参考如下的nanoTime的含义),也需要设置event为0,表示这条数据为插入。

6.2. Java的nanoTime()

        java有两个获取和时间相关的秒数方法,一个是广泛使用的 System.currentTimeMillis() , 返回的是从一个长整型结果,表示毫秒。另一个是 System.nanoTime(), 返回的是纳秒。

        “纳”这个单位 一般不是第一次见。前几年相当火爆的“纳米”和他是同一级别。纳表示的是10的-9次方。在真空中,光一纳秒也只能传播30厘米。比纳秒大一级别的是微秒,10的-6次方;然后是就是毫秒,10的-3次方。纳秒下面还有皮秒、飞秒等。既然纳秒比毫秒高10的6次方精度,那么他们的比值就应该是10的6次方。然而并非如此。

        大家可能都知道毫秒方法返回的是自1970年到现在的毫秒数。而Java的日期也是如此,所以他俩是等值的。但是因为纳秒数值精度太高,所以不能从指定1970年到现在纳秒数,这个输出在不同的机器上可能不一样。

具体参考如下纳秒方法的注释:

Returns the current value of the running Java Virtual Machine's high-resolution time source, in nanoseconds.

This method can only be used to measure elapsed time and is not related to any other notion of system or wall-clock time. The value returned represents nanoseconds since some fixed but arbitrary origin time (perhaps in the future, so values may be negative). The same origin is used by all invocations of this method in an instance of a Java virtual machine; other virtual machine instances are likely to use a different origin.

返回当前JVM的高精度时间。该方法只能用来测量时段而和系统时间无关。

它的返回值是从某个固定但随意的时间点开始的(可能是未来的某个时间)。

不同的JVM使用的起点可能不同。

        这样有点恐怖的是我们相同的代码在不同机器运行导致结果可能不同。所以它很少用来计算。通常都是测量,并且还有可能是负数。所以如上脚本中,设置ods_fmys_goods_ext_dt表中nano_time的数值,才会设置一个负数,并且是一个够小的负数。

6.3. 各表保存时间以及资源消耗

yishou_data.ods_binlog_data :binlog数据表,以日期和业务库表名为分区,只需要保存7天数据即可yishou_data.ods_fmys_goods_ext_dt :ods层分区表,每个分区都是前一天的全量数据,只需要保存7天数据即可yishou_data.dwd_fmys_goods_ext :dwd层视图,准实时更新,每天会新建视图yishou_data.dwd_fmys_goods_ext_dt :dwd层历史快照,每天都是保存的业务表全量数据,跟ods层类似,但因为ods层正常来说不能提供dws层和分析师使用,所以新建dwd分区快照,可以选择保存90天数据即可yishou_data.dwd_fmys_goods_ext_record :dwd层历史变更记录视图,按分钟级更新,每次查询都是最新的历史变更记录yishou_data.dwd_fmys_goods_ext_record_dt :dwd层历史变更记录表,会将上线之后业务表的所有变更全部保存,因为每天是保存的全量数据,所以保存7天数据即可,而且之所以用分区表保存7天,是为了防止作业运行异常会导致数据异常提高容错率所设,这样前一个分区+binlog数据写入到新分区中

之前每天通过CDM全量拉取数据,需要一台专有的CDM服务器(因为同时运行多作业需要资源较多)和业务库的大IO。而现在增量只需要2个Flink任务,并且能准实时获取数据,同时还能保存历史快照和历史变更记录。

CDM集群 + 业务库映射服务器 + OBS数据存储 = 4000 + 2000 + 100 = 6100

Flink作业 + OBS数据存储 = 600 + 9100 = 9700

一个快照全量数据为1T数据左右,按上述数据保存,大概会保存100T左右数据(90分区 + 7分区 + 7分区),按照华为云OBS收费,一个月花费9100左右。

6.4. 优化前后各方面对比

指标优化前优化后备注资源1、需要一台CDM集群 2、需要业务库高IO(添加映射服务器)1、需要2个Flink任务 2、需要跑SQL的资源优化前为每天全量从业务库中拉取数据功能1、数仓中每张业务表只有1张全量表 2、只能离线(1小时以上)更新,不能实时1、数仓中有准实时表 2、数仓中有按天保存的快照数据 3、数仓中有保存历史所有的变更记录数据既有准实时,又有按天保存快照,还有历史变更记录,数据存储会增多花费1、专用CDM集群(4000) 2、业务库映射服务器(2000) 3、OBS数据存储(100)1、Flink作业(2个作业一共600) 2、OBS数据存储(9100) 3、DLI中跑SQL可以忽略较原先,花费会提高百分之50,但效率、容错等会有本质提高

6.5. 注意事项

进行如上操作后,数仓中的业务库的表就会按照分钟级别更新,同理,对流量数据,也可以参考如上操作,做到分钟级别更新分钟级别更新会比较消耗资源,但白天没作业运行时,集群中的资源也是空闲状态,可以利用上述方法提供一些分钟级别的数据协助上述操作主要是提供分钟级别数据协助,这样消耗的资源比Flink的秒级更新会少很多,具体使用可以考虑资源和效率的选择上述操作均在华为云中进行,如使用其他云平台或者使用开源大数据平台,可以参照上述思想来具体实现

注:其他相关文章链接由此进 ->    开发随笔文章汇总 

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: