目录

0. 相关文章链接

1. Flink同步Hive

1.1. 使用方式

1.2. 案例实操

2. Spark同步Hive

2.1. 使用方式

2.2. 案例实操

0. 相关文章链接

 Hudi文章汇总 

1. Flink同步Hive

1.1. 使用方式

        Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置模版如下:

-- hms mode 配置

CREATE TABLE t1(

uuid VARCHAR(20),

name VARCHAR(10),

age INT,

ts TIMESTAMP(3),

`partition` VARCHAR(20)

)

PARTITIONED BY (`partition`)

with(

'connector'='hudi',

'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',

'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出

'hive_sync.enable'='true', -- required,开启hive同步功能

'hive_sync.table'='${hive_table}', -- required, hive 新建的表名

'hive_sync.db'='${hive_db}', -- required, hive 新建的数据库名

'hive_sync.mode' = 'hms', -- required, 将hive sync mode设置为hms, 默认jdbc

'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口

);

注意:核心点为上述hive_sync系列的配置。

1.2. 案例实操

CREATE TABLE t10(

id int,

num int,

ts int,

primary key (id) not enforced

)

PARTITIONED BY (num)

with(

'connector'='hudi',

'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t10',

'table.type'='COPY_ON_WRITE',

'hive_sync.enable'='true',

'hive_sync.table'='h10',

'hive_sync.db'='default',

'hive_sync.mode' = 'hms',

'hive_sync.metastore.uris' = 'thrift://hadoop1:9083'

);

insert into t10 values(1,1,1);

2. Spark同步Hive

官网参数地址:Basic Configurations | Apache Hudi

2.1. 使用方式

//设置数据集注册并同步到hive

option("hoodie.datasource.hive_sync.enable","true").

//使用hms

option("hoodie.datasource.hive_sync.mode","hms").

//hivemetastore地址

option("hoodie.datasource.hive_sync.metastore.uris", "thrift://ip:9083").

//登入hiveserver2的用户

option("hoodie.datasource.hive_sync.username","").

//登入hiveserver2的密码

option("hoodie.datasource.hive_sync.password","").

//设置hudi与hive同步的数据库

option("hoodie.datasource.hive_sync.database", "").

//设置hudi与hive同步的表名

option("hoodie.datasource.hive_sync.table", "").

//hive表同步的分区列

option("hoodie.datasource.hive_sync.partition_fields", "").

// 分区提取器 按/ 提取分区

option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").

2.2. 案例实操

import org.apache.hudi.QuickstartUtils._

import scala.collection.JavaConversions._

import org.apache.spark.sql.SaveMode._

import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"

val basePath = "file:///tmp/hudi_trips_cow"

val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))

val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

.withColumn("a",split(col("partitionpath"),"\\/")(0))

.withColumn("b",split(col("partitionpath"),"\\/")(1))

.withColumn("c",split(col("partitionpath"),"\\/")(2))

df.write.format("hudi").

options(getQuickstartWriteConfigs).

option(PRECOMBINE_FIELD_OPT_KEY, "ts").

option(RECORDKEY_FIELD_OPT_KEY, "uuid").

option("hoodie.table.name", tableName).

option("hoodie.datasource.hive_sync.enable","true").

option("hoodie.datasource.hive_sync.mode","hms").

option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hadoop1:9083").

option("hoodie.datasource.hive_sync.database", "default").

option("hoodie.datasource.hive_sync.table", "spark_hudi").

option("hoodie.datasource.hive_sync.partition_fields", "a,b,c").

option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").

mode(Overwrite).

save(basePath)

注:其他Hudi相关文章链接由此进 ->  Hudi文章汇总 

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: