1. POM Dependencies

The pom dependencies used in the test cases are listed below; trim them as needed.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>Examples</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.6.0</hadoop.version>
        <flink.version>1.14.5</flink.version>
        <!-- two further version properties in the original (2.0.0 and 1.2.0) are not referenced by the dependencies below -->
        <hudi.version>0.12.0</hudi.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink1.14-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <classifier>core</classifier>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>provided</scope>
            <version>2.17.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-rest</artifactId>
            <version>2.7.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${pom.artifactId}-${pom.version}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.test.main.Examples</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <excludes>
                    <exclude>environment/dev/*</exclude>
                    <exclude>environment/test/*</exclude>
                    <exclude>environment/smoke/*</exclude>
                    <exclude>environment/pre/*</exclude>
                    <exclude>environment/online/*</exclude>
                    <exclude>application.properties</exclude>
                </excludes>
            </resource>
            <resource>
                <directory>src/main/resources/environment/${environment}</directory>
                <targetPath>.</targetPath>
            </resource>
        </resources>
    </build>

    <profiles>
        <profile>
            <id>dev</id>
            <properties>
                <environment>dev</environment>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <id>test</id>
            <properties>
                <environment>test</environment>
            </properties>
        </profile>
        <profile>
            <id>smoke</id>
            <properties>
                <environment>smoke</environment>
            </properties>
        </profile>
        <profile>
            <id>online</id>
            <properties>
                <environment>online</environment>
            </properties>
        </profile>
    </profiles>
</project>

Official Hudi documentation: Flink Guide | Apache Hudi

2. Reading and Writing Hudi with the DataStream API

2.1 Writing to Hudi

package com.test.hudi;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

import org.apache.flink.runtime.state.StateBackend;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.CheckpointConfig;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.data.GenericRowData;

import org.apache.flink.table.data.RowData;

import org.apache.flink.table.data.StringData;

import org.apache.hudi.common.model.HoodieTableType;

import org.apache.hudi.configuration.FlinkOptions;

import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;

import java.util.Map;

public class FlinkDataStreamWrite2HudiTest {

public static void main(String[] args) throws Exception {

// 1. Create the execution environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. Checkpointing must be enabled. By default data only appears under the Hudi table directory after 5 checkpoints; otherwise there is only a .hoodie directory

String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";

StateBackend backend = new EmbeddedRocksDBStateBackend(true);

env.setStateBackend(backend);

CheckpointConfig conf = env.getCheckpointConfig();

// Retain externalized checkpoints when the job is cancelled or fails

conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

conf.setCheckpointInterval(1000);//milliseconds

conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds

conf.setMinPauseBetweenCheckpoints(2 * 1000);// minimum pause between two consecutive checkpoints

conf.setCheckpointStorage(checkPointPath);

// 3. Prepare the data

DataStreamSource<Student> studentDS = env.fromElements(

new Student(101L, "Johnson", 17L, "swimming"),

new Student(102L, "Lin", 15L, "shopping"),

new Student(103L, "Tom", 5L, "play"));

// 4. Create the Hudi pipeline

// 4.1 Hudi table name and path

String studentHudiTable = "ods_student_table";

String studentHudiTablePath = "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/" + studentHudiTable;

Map<String, String> studentOptions = new HashMap<>();

studentOptions.put(FlinkOptions.PATH.key(), studentHudiTablePath);

studentOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

HoodiePipeline.Builder studentBuilder = HoodiePipeline.builder(studentHudiTable)

.column("id BIGINT")

.column("name STRING")

.column("age BIGINT")

.column("hobby STRING")

.pk("id")

// .pk("id,age")// a composite primary key can also be set, with the fields separated by commas

.options(studentOptions);

// 5. Convert to a RowData stream

DataStream<RowData> studentRowDataDS = studentDS.map(new MapFunction<Student, RowData>() {

@Override

public RowData map(Student value) throws Exception {

try {

Long id = value.id;

String name = value.name;

Long age = value.age;

String hobby = value.hobby;

GenericRowData row = new GenericRowData(4);

row.setField(0, Long.valueOf(id));

row.setField(1, StringData.fromString(name));

row.setField(2, Long.valueOf(age));

row.setField(3, StringData.fromString(hobby));

return row;

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

});

studentBuilder.sink(studentRowDataDS, false);

env.execute("FlinkDataStreamWrite2HudiTest");

}

public static class Student{

public Long id;

public String name;

public Long age;

public String hobby;

public Student() {

}

public Student(Long id, String name, Long age, String hobby) {

this.id = id;

this.name = name;

this.age = age;

this.hobby = hobby;

}

public Long getId() {

return id;

}

public void setId(Long id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public Long getAge() {

return age;

}

public void setAge(Long age) {

this.age = age;

}

public String getHobby() {

return hobby;

}

public void setHobby(String hobby) {

this.hobby = hobby;

}

@Override

public String toString() {

return "Student{" +

"id=" + id +

", name='" + name + '\'' +

", age=" + age +

", hobby='" + hobby + '\'' +

'}';

}

}

}

In this example, three records created with env.fromElements are written to Hudi; querying the table confirms that all three records were written successfully:

In real development you will replace the data source, for example reading from Kafka and writing to Hudi: swap out the fromElements source above and convert the records to RowData, as shown in the sketch below. (Remember: checkpointing must be enabled, otherwise only the .hoodie directory is created. I ran into this pitfall myself and spent a whole afternoon wondering why no data was written and only a .hoodie directory appeared, until I found out that checkpointing has to be configured. In this example the three generated records form a bounded stream, so the program finishes and the data is written to the Hudi table even without checkpointing; but in a real streaming job that reads from Kafka and writes to Hudi, the data will never make it into the Hudi table if checkpointing is not enabled.)
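As a rough illustration of that swap, here is a minimal sketch: the topic name student_topic, the broker address hw-cdh-test02:9092 and the consumer group are hypothetical, and the Kafka payload is assumed to be JSON matching the Student fields. It reuses the flink-connector-kafka and fastjson2 dependencies from the POM and the HoodiePipeline.Builder from the example above.

// Additional imports on top of those already in the example:
// import org.apache.flink.api.common.serialization.SimpleStringSchema;
// import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
// import com.alibaba.fastjson2.JSON;
// import java.util.Properties;

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "hw-cdh-test02:9092"); // hypothetical broker address
kafkaProps.setProperty("group.id", "hudi-writer-demo");            // hypothetical consumer group

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("student_topic", new SimpleStringSchema(), kafkaProps); // hypothetical topic
consumer.setStartFromLatest();

DataStream<RowData> studentRowDataDS = env.addSource(consumer)
        .map(new MapFunction<String, RowData>() {
            @Override
            public RowData map(String json) {
                // assumes each Kafka record is a JSON object with the Student fields
                Student s = JSON.parseObject(json, Student.class);
                GenericRowData row = new GenericRowData(4);
                row.setField(0, s.id);
                row.setField(1, StringData.fromString(s.name));
                row.setField(2, s.age);
                row.setField(3, StringData.fromString(s.hobby));
                return row;
            }
        });

// The rest is unchanged: build the HoodiePipeline as above and sink the RowData stream.
studentBuilder.sink(studentRowDataDS, false);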

2.2 Reading from Hudi

package com.test.hudi;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.data.RowData;

import org.apache.hudi.common.model.HoodieTableType;

import org.apache.hudi.configuration.FlinkOptions;

import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;

import java.util.Map;

public class FlinkDataStreamReadFromHudiTest {

public static void main(String[] args) throws Exception {

// 1. Create the execution environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. Create the Hudi data stream

String studentHudiTable = "ods_student_table";

String studentHudiTablePath = "hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/" + studentHudiTable;

Map<String, String> studentOptions = new HashMap<>();

studentOptions.put(FlinkOptions.PATH.key(), studentHudiTablePath);

studentOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

studentOptions.put(FlinkOptions.READ_AS_STREAMING.key(), "true");// this option enable the streaming read

studentOptions.put(FlinkOptions.READ_START_COMMIT.key(), "16811748000000");// specifies the start commit instant time

studentOptions.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "4");// check interval for new commits, in seconds

studentOptions.put(FlinkOptions.CHANGELOG_ENABLED.key(), "true");// keep intermediate changes (+I/-U/+U/-D) instead of merging them

HoodiePipeline.Builder studentBuilder = HoodiePipeline.builder(studentHudiTable)

.column("id BIGINT")

.column("name STRING")

.column("age BIGINT")

.column("hobby STRING")

.pk("id")

.options(studentOptions);

DataStream<RowData> studentRowDataDS = studentBuilder.source(env);

// 3. Transform and output the data

DataStream<Student> studentDS = studentRowDataDS.map(new MapFunction<RowData, Student>() {

@Override

public Student map(RowData value) throws Exception {

try {

String rowKind = value.getRowKind().name();

Long id = value.getLong(0);

String name = value.getString(1).toString();

Long age = value.getLong(2);

String hobby = value.getString(3).toString();

Student student = new Student(id, name, age, hobby, rowKind);

return student;

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

});

studentDS.print();

env.execute("FlinkDataStreamReadFromHudiTest");

}

public static class Student{

public Long id;

public String name;

public Long age;

public String hobby;

public String rowKind;

public Student() {

}

public Student(Long id, String name, Long age, String hobby, String rowKind) {

this.id = id;

this.name = name;

this.age = age;

this.hobby = hobby;

this.rowKind = rowKind;

}

public Long getId() {

return id;

}

public void setId(Long id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public Long getAge() {

return age;

}

public void setAge(Long age) {

this.age = age;

}

public String getHobby() {

return hobby;

}

public void setHobby(String hobby) {

this.hobby = hobby;

}

public String getRowKind() {

return rowKind;

}

public void setRowKind(String rowKind) {

this.rowKind = rowKind;

}

@Override

public String toString() {

return "Student{" +

"id=" + id +

", name='" + name + '\'' +

", age=" + age +

", hobby='" + hobby + '\'' +

", rowKind='" + rowKind + '\'' +

'}';

}

}

}

Output:

Here rowKind describes the change type of the row. Its values are INSERT, UPDATE_BEFORE, UPDATE_AFTER and DELETE, corresponding to the op flags +I, -U, +U and -D, i.e. insert, pre-update image, post-update image and delete.
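If downstream logic needs to react differently to each change type, the RowKind can also be inspected directly on the RowData stream before the map. A minimal sketch, with a purely illustrative filter condition:

// import org.apache.flink.types.RowKind;

// Keep only inserts and post-update images; drop deletes and pre-update images (illustrative only).
DataStream<RowData> upsertsOnly = studentRowDataDS.filter(row ->
        row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER);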

3. Reading and Writing Hudi with the Table API

3.1 Writing to Hudi

3.1.1 Data from a DataStream

package com.test.hudi;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

import org.apache.flink.runtime.state.StateBackend;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.CheckpointConfig;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkDataStreamSqlWrite2HudiTest {

public static void main(String[] args) throws Exception {

// 1. Create the execution environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

// 2. Checkpointing must be enabled. By default data only appears under the Hudi table directory after 5 checkpoints; otherwise there is only a .hoodie directory

String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";

StateBackend backend = new EmbeddedRocksDBStateBackend(true);

env.setStateBackend(backend);

CheckpointConfig conf = env.getCheckpointConfig();

// Retain externalized checkpoints when the job is cancelled or fails

conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

conf.setCheckpointInterval(1000);//milliseconds

conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds

conf.setMinPauseBetweenCheckpoints(2 * 1000);// minimum pause between two consecutive checkpoints

conf.setCheckpointStorage(checkPointPath);

// 3. Prepare the data; in a real environment this can be replaced by reading from Kafka

DataStreamSource<Student> studentDS = env.fromElements(

new Student(201L, "zhangsan", 117L, "eat"),

new Student(202L, "lisi", 115L, "drink"),

new Student(203L, "wangwu", 105L, "sleep"));

// Because no DataStream execution operator follows, env.execute may throw:

// Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

// This does not affect the data being written to Hudi.

// Of course, a DataStream operator such as print can also be added:

// studentDS.print("DataStream: ");

// 4. Create a table from the DataStream

// 4.1 First argument: table name; second argument: the DataStream; optional third argument: the column list, which can map DataStream element fields to column names, e.g. "userId as user_id, name, age, hobby"

tabEnv.registerDataStream("tmp_student_table", studentDS, "id, name, age, hobby");

// 5. Define the Hudi table and write the data into it

tabEnv.executeSql("" +

"CREATE TABLE out_ods_student_table(\n" +

" id BIGINT COMMENT '学号',\n" +

" name STRING\t COMMENT '姓名',\n" +

" age BIGINT COMMENT '年龄',\n" +

" hobby STRING COMMENT '爱好',\n" +

" PRIMARY KEY (id) NOT ENFORCED\n" +

")\n" +

"WITH(\n" +

" 'connector' = 'hudi',\n" +

" 'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +

" 'table.type' = 'MERGE_ON_READ',\n" +

" 'compaction.async.enabled' = 'true',\n" +

" 'compaction.tasks' = '1',\n" +

" 'compaction.trigger.strategy' = 'num_commits',\n" +

" 'compaction.delta_commits' = '3',\n" +

" 'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',\n" +

" 'hoodie.cleaner.commits.retained'='30',\n" +

" 'hoodie.keep.min.commits'='35' ,\n" +

" 'hoodie.keep.max.commits'='40'\n" +

")");

tabEnv.executeSql("insert into out_ods_student_table select id,name,age,hobby from tmp_student_table");

env.execute("FlinkDataStreamSqlWrite2HudiTest");

}

public static class Student{

public Long id;

public String name;

public Long age;

public String hobby;

public Student() {

}

public Student(Long id, String name, Long age, String hobby) {

this.id = id;

this.name = name;

this.age = age;

this.hobby = hobby;

}

public Long getId() {

return id;

}

public void setId(Long id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public Long getAge() {

return age;

}

public void setAge(Long age) {

this.age = age;

}

public String getHobby() {

return hobby;

}

public void setHobby(String hobby) {

this.hobby = hobby;

}

@Override

public String toString() {

return "Student{" +

"id=" + id +

", name='" + name + '\'' +

", age=" + age +

", hobby='" + hobby + '\'' +

'}';

}

}

}

Querying the Hudi table confirms that the 3 records were written successfully:

3.1.2 Data from SQL VALUES

package com.test.hudi;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;

import org.apache.flink.runtime.state.StateBackend;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.environment.CheckpointConfig;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkValuesSqlWrite2HudiTest {

public static void main(String[] args) throws Exception {

// 1. Create the execution environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

// 2. Checkpointing must be enabled. By default data only appears under the Hudi table directory after 5 checkpoints; otherwise there is only a .hoodie directory

String checkPointPath = "hdfs://hw-cdh-test02:8020/flinkinfo/meta/savepoints/FlinkDataStreamWrite2HudiTest";

StateBackend backend = new EmbeddedRocksDBStateBackend(true);

env.setStateBackend(backend);

CheckpointConfig conf = env.getCheckpointConfig();

// Retain externalized checkpoints when the job is cancelled or fails

conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

conf.setCheckpointInterval(1000);//milliseconds

conf.setCheckpointTimeout(10 * 60 * 1000);//milliseconds

conf.setMinPauseBetweenCheckpoints(2 * 1000);// minimum pause between two consecutive checkpoints

conf.setCheckpointStorage(checkPointPath);

// 3. Define the Hudi table and write the data into it

tabEnv.executeSql("" +

"CREATE TABLE out_ods_student_table(\n" +

" id BIGINT COMMENT '学号',\n" +

" name STRING\t COMMENT '姓名',\n" +

" age BIGINT COMMENT '年龄',\n" +

" hobby STRING COMMENT '爱好',\n" +

" PRIMARY KEY (id) NOT ENFORCED\n" +

")\n" +

"WITH(\n" +

" 'connector' = 'hudi',\n" +

" 'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +

" 'table.type' = 'MERGE_ON_READ',\n" +

" 'compaction.async.enabled' = 'true',\n" +

" 'compaction.tasks' = '1',\n" +

" 'compaction.trigger.strategy' = 'num_commits',\n" +

" 'compaction.delta_commits' = '3',\n" +

" 'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',\n" +

" 'hoodie.cleaner.commits.retained'='30',\n" +

" 'hoodie.keep.min.commits'='35' ,\n" +

" 'hoodie.keep.max.commits'='40'\n" +

")");

tabEnv.executeSql("" +

"insert into out_ods_student_table values\n" +

" (301, 'xiaoming', 201, 'read'),\n" +

" (302, 'xiaohong', 202, 'write'),\n" +

" (303, 'xiaogang', 203, 'sing')");

env.execute("FlinkValuesSqlWrite2HudiTest");

}

}

Querying the Hudi table confirms that the 3 records were written successfully:

3.2 Reading from Hudi

package com.test.hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlReadFromHudiTest {

public static void main(String[] args) throws Exception {

// 1. Create the execution environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

// 2. Define the Hudi table and read data from it

tabEnv.executeSql("" +

"CREATE TABLE out_ods_student_table(\n" +

" id BIGINT COMMENT '学号',\n" +

" name STRING\t COMMENT '姓名',\n" +

" age BIGINT COMMENT '年龄',\n" +

" hobby STRING COMMENT '爱好',\n" +

" PRIMARY KEY (id) NOT ENFORCED\n" +

")\n" +

"WITH(\n" +

" 'connector' = 'hudi',\n" +

" 'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +

" 'table.type' = 'MERGE_ON_READ',\n" +

" 'compaction.async.enabled' = 'true',\n" +

" 'compaction.tasks' = '1',\n" +

" 'compaction.trigger.strategy' = 'num_commits',\n" +

" 'compaction.delta_commits' = '3',\n" +

" 'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',\n" +

" 'hoodie.cleaner.commits.retained'='30',\n" +

" 'hoodie.keep.min.commits'='35' ,\n" +

" 'hoodie.keep.max.commits'='40'\n" +

")");

tabEnv.executeSql("select id,name,age,hobby from out_ods_student_table").print();

env.execute("FlinkSqlReadFromHudiTest");

}

}

Output:

4. Additional Notes

4.1 Composite Primary Keys

When working with Hudi through the Flink Table API, a composite primary key may be required; it can be declared directly in the SQL. For example:

tabEnv.executeSql("" +

"CREATE TABLE out_ods_userinfo_table_test(\n" +

" province_id BIGINT COMMENT '省份编号',\n" +

" user_id BIGINT COMMENT '用户编号',\n" +

" name STRING\t COMMENT '姓名',\n" +

" age BIGINT COMMENT '年龄',\n" +

" hobby STRING COMMENT '爱好',\n" +

" PRIMARY KEY (province_id,user_id) NOT ENFORCED\n" +

")\n" +

"WITH(\n" +

" 'connector' = 'hudi',\n" +

" 'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_userinfo_table_test',\n" +

" 'table.type' = 'MERGE_ON_READ',\n" +

" 'hoodie.datasource.write.keygenerator.class'='org.apache.hudi.keygen.ComplexKeyGenerator',\n" +

" 'hoodie.datasource.write.recordkey.field'= 'province_id,user_id',\n" +

" 'compaction.async.enabled' = 'true',\n" +

" 'compaction.tasks' = '1',\n" +

" 'compaction.trigger.strategy' = 'num_commits',\n" +

" 'compaction.delta_commits' = '3',\n" +

" 'hoodie.cleaner.policy'='KEEP_LATEST_COMMITS',\n" +

" 'hoodie.cleaner.commits.retained'='30',\n" +

" 'hoodie.keep.min.commits'='35' ,\n" +

" 'hoodie.keep.max.commits'='40'\n" +

")");

4.2 Reading Data After a Specified Commit Time

According to the official documentation, you can read only the data committed after a specified instant. For example, with the start commit 20230318130057, only data whose commit time is after 2023-03-18 13:00:57 is read; earlier data is not returned.

4.2.1 DataStream API

options.put(FlinkOptions.READ_START_COMMIT.key(), "20230318130057"); // specifies the start commit instant time

4.2.2 Table API

'read.start-commit' = '20230318130057', -- specifies the start commit instant time
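Putting the two read options together in the Table API, a hedged sketch of a streaming read that starts from the given instant might look as follows; the table name read_ods_student_table is illustrative, while the path and schema follow the earlier examples.

tabEnv.executeSql("" +
        "CREATE TABLE read_ods_student_table(\n" +
        " id BIGINT,\n" +
        " name STRING,\n" +
        " age BIGINT,\n" +
        " hobby STRING,\n" +
        " PRIMARY KEY (id) NOT ENFORCED\n" +
        ")\n" +
        "WITH(\n" +
        " 'connector' = 'hudi',\n" +
        " 'path' = 'hdfs://hw-cdh-test02:8020/user/hive/warehouse/lake/ods_student_table',\n" +
        " 'table.type' = 'MERGE_ON_READ',\n" +
        " 'read.streaming.enabled' = 'true',\n" +      // streaming read
        " 'read.streaming.check-interval' = '4',\n" +  // poll for new commits every 4 seconds
        " 'read.start-commit' = '20230318130057'\n" +  // only commits at or after this instant are read
        ")");
tabEnv.executeSql("select id,name,age,hobby from read_ods_student_table").print();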
