主要记录下Hudi的概述和打包编译等内容,方便参考

文章目录

简介官网发展历史Hudi特性使用场景

安装部署编译环境准备

编译hudi1.源码包上传到服务器2.修改pom文件3.修改源码兼容hadoop34.手动安装kafka依赖(非必须)5.解决spark模块依赖冲突6.执行编译7.测试hudi-client

简单测试编译后spark包可用性

简介

Apache Hudi(Hadoop Upserts Delete and Incremental)是下一代流数据湖平台。Apache Hudi将核心仓库和数据库功能直接引入数据湖。Hudi提供了表、事务、高效的upserts/delete、高级索引、流摄取服务、数据集群/压缩优化和并发,同时保持数据的开源文件格式。

Apache Hudi不仅非常适合于流工作负载,而且还允许创建高效的增量批处理管道。

Apache Hudi可以轻松地在任何云存储平台上使用。Hudi的高级性能优化,使分析工作负载更快的任何流行的查询引擎,包括Apache Spark、Flink、Presto、Trino、Hive等。

官网

https://hudi.apache.org/

Apache Hudi(Hadoop Upserts Delete and Incremental)是下一代流数据湖平台。Apache Hudi将核心仓库和数据库功能直接引入数据湖。Hudi提供了表、事务、高效的upserts/delete、高级索引、流摄取服务、数据集群/压缩优化和并发,同时保持数据的开源文件格式。

Apache Hudi不仅非常适合于流工作负载,而且还允许创建高效的增量批处理管道。

Apache Hudi可以轻松地在任何云存储平台上使用。Hudi的高级性能优化,使分析工作负载更快的任何流行的查询引擎,包括Apache Spark、Flink、Presto、Trino、Hive等。

发展历史

2015 年:发表了增量处理的核心思想/原则(O’reilly 文章)。

2016 年:由 Uber 创建并为所有数据库/关键业务提供支持。

2017 年:由 Uber 开源,并支撑 100PB 数据湖。

2018 年:吸引大量使用者,并因云计算普及。

2019 年:成为 ASF 孵化项目,并增加更多平台组件。

2020 年:毕业成为 Apache 顶级项目,社区、下载量、采用率增长超过 10 倍。

2021 年:支持 Uber 500PB 数据湖,SQL DML、Flink 集成、索引、元服务器、缓存。

Hudi特性

可插拔索引机制支持快速Upsert/Delete。 支持增量拉取表变更以进行处理。 支持事务提交及回滚,并发控制。 支持Spark、Presto、Trino、Hive、Flink等引擎的SQL读写。 自动管理小文件,数据聚簇,压缩,清理。 流式摄入,内置CDC源和工具。 内置可扩展存储访问的元数据跟踪。 向后兼容的方式实现表结构变更的支持。

使用场景

近实时写入

减少碎片化工具的使用。 CDC 增量导入 RDBMS 数据。 限制小文件的大小和数量。

近实时分析

相对于秒级存储(Druid, OpenTSDB),节省资源。 提供分钟级别时效性,支撑更高效的查询。 Hudi作为lib,非常轻量。

增量 pipeline

区分arrivetime和event time处理延迟数据。更短的调度interval减少端到端延迟(小时 -> 分钟) => Incremental Processing。

增量导出

替代部分Kafka的场景,数据导出到在线服务存储 e.g. ES。

安装部署

hudi是以lib包的形式提供功能,不同版本对spark、flink支持的依赖包不一样,具体要看官网对应版本的版本支持说明

本文会做的测试的环境如下

Linux Centos7

组件版本Hudi0.12.1Hadoop3.2.4Hive3.1.3Flink1.14 scala-2.12Spark3.2.2 scala-2.12

hudi官网只提供了源码,需要自己编译

编译环境准备

linux环境编译

部署maven并配置环境变量

这个简单就不贴了

maven版本最好3.6以上别太低

这里贴下指定阿里仓库

修改setting.xml,指定为阿里仓库地址

vim $MAVEN_HOME/conf/settings.xml

nexus-aliyun

central

Nexus aliyun

http://maven.aliyun.com/nexus/content/groups/public

编译hudi

1.源码包上传到服务器

源码下载:https://dlcdn.apache.org/hudi/0.12.1/hudi-0.12.1.src.tgz

将hudi-0.12.1.src.tgz上传到/opt/software,并解压

tar -zxvf /opt/software/hudi-0.12.1.src.tgz -C /opt/software

2.修改pom文件

vim /opt/software/hudi-0.12.1/pom.xml

新增repository加速依赖下载

nexus-aliyun

nexus-aliyun

http://maven.aliyun.com/nexus/content/groups/public/

true

false

修改Hive/Hadoop依赖的组件版本

3.2.4

3.1.3

3.修改源码兼容hadoop3

要兼容hadoop3,除了修改版本,还需要修改如下代码:

vim /opt/software/hudi-0.12.1/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java

修改第110行,增加一个入参 null

vim 进入末行模式,然后输入set nu回车就可以看到行号

110行原先代码try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {

修改后:try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {

110 try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {

111 try (HoodieParquetStreamWriter parquetWriter = new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {

112 for (IndexedRecord record : records) {

113 String recordKey = getRecordKey(record).orElse(null);

114 parquetWriter.writeAvro(recordKey, record);

115 }

116 outputStream.flush();

117 }

118 }

4.手动安装kafka依赖(非必须)

0.12.0似乎是有这个问题,我这0.12.1编译没有这个问题

如果编译报错:common-utils-5.3.4.jar、common-config-5.3.4.jar、kafka-avro-serializer-5.3.4.jar、kafka-schema-registry-client-5.3.4.jar这几个jar找不到,那就要单独下载并install到你的maven仓库。

下载地址:http://packages.confluent.io/archive/5.3/confluent-5.3.4-2.12.zip

解压后找到以上报错找不到的jar包,上传服务器,并install

mvn install:install-file -DgroupId=io.confluent -DartifactId=common-config -Dversion=5.3.4 -Dpackaging=jar -Dfile=./common-config-5.3.4.jar

mvn install:install-file -DgroupId=io.confluent -DartifactId=common-utils -Dversion=5.3.4 -Dpackaging=jar -Dfile=./common-utils-5.3.4.jar

mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=5.3.4 -Dpackaging=jar -Dfile=./kafka-avro-serializer-5.3.4.jar

mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=5.3.4 -Dpackaging=jar -Dfile=./kafka-schema-registry-client-5.3.4.jar

5.解决spark模块依赖冲突

修改了Hive版本为3.1.3,其携带的jetty是0.9.3,hudi本身用的jetty是0.9.4,存在依赖冲突。

不改可以编译通过,但是运行spark向hudi里插入数据会报错

个人测试是编译OK,但是执行插入数据就报如下错误

java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V

修改hudi-spark-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty vim /opt/software/hudi-0.12.1/packaging/hudi-spark-bundle/pom.xml 大概369行位置开始的hive-service、hive-jdbc、hive-metastore、hive-common 增加下方的...部分

${hive.groupid}

hive-service

${hive.version}

${spark.bundle.hive.scope}

guava

com.google.guava

org.eclipse.jetty

*

org.pentaho

*

${hive.groupid}

hive-jdbc

${hive.version}

${spark.bundle.hive.scope}

javax.servlet

*

javax.servlet.jsp

*

org.eclipse.jetty

*

${hive.groupid}

hive-metastore

${hive.version}

${spark.bundle.hive.scope}

javax.servlet

*

org.datanucleus

datanucleus-core

javax.servlet.jsp

*

guava

com.google.guava

${hive.groupid}

hive-common

${hive.version}

${spark.bundle.hive.scope}

org.eclipse.jetty.orbit

javax.servlet

org.eclipse.jetty

*

在此文件增加依赖

org.eclipse.jetty

jetty-server

${jetty.version}

org.eclipse.jetty

jetty-util

${jetty.version}

org.eclipse.jetty

jetty-webapp

${jetty.version}

org.eclipse.jetty

jetty-http

${jetty.version}

修改hudi-utilities-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty

解决的是:使用DeltaStreamer工具向hudi表插入数据时,也会报Jetty的错误使用DeltaStreamer工具向hudi表插入数据时,也会报Jetty的错误

vim /opt/software/hudi-0.12.1/packaging/hudi-utilities-bundle/pom.xml

hudi依赖相关:搜索找到hudi-common位置

hudi-0.12.1中,此包使用的是maven-shade-plugin插件进行include hudi相关依赖,故我们也是用相同方式进行exclude

在的下一级(与同级)增加

org.eclipse.jetty:*

另外:如果是hudi-0.12.0版本,可能不是使用maven-shade-plugin插件进行include hudi相关依赖,而使用的是正常depency的依赖引入,那么需要做如下几个依赖的exclude

hudi-common和hudi-client-common增加exclude项【hudi-0.12.1不用此操作】

org.eclipse.jetty

*

Hive依赖相关:搜索hive-service的依赖位置,对如下几个依赖进行处理

hive-service增加exclude项

servlet-api

javax.servlet

guava

com.google.guava

org.eclipse.jetty

*

org.pentaho

*

hive-jdbc增加exclude

javax.servlet

*

javax.servlet.jsp

*

org.eclipse.jetty

*

hive-metastore增加exclude项

javax.servlet

*

org.datanucleus

datanucleus-core

javax.servlet.jsp

*

guava

com.google.guava

hive-common

org.eclipse.jetty.orbit

javax.servlet

org.eclipse.jetty

*

增加jetty单独依赖

org.eclipse.jetty

jetty-server

${jetty.version}

org.eclipse.jetty

jetty-util

${jetty.version}

org.eclipse.jetty

jetty-webapp

${jetty.version}

org.eclipse.jetty

jetty-http

${jetty.version}

6.执行编译

mvn clean package -DskipTests -Dspark3.2 -Dflink1.14 -Dscala-2.12 -Dhadoop.version=3.2.4 -Pflink-bundle-shade-hive3

编译完成后,相关的包在packaging目录的各个模块中的target里:

[root@m1 packaging]# pwd

/opt/software/hudi-0.12.1/packaging

[root@m1 packaging]# ls

hudi-aws-bundle hudi-hadoop-mr-bundle hudi-presto-bundle hudi-utilities-bundle

hudi-datahub-sync-bundle hudi-hive-sync-bundle hudi-spark-bundle hudi-utilities-slim-bundle

hudi-flink-bundle hudi-integ-test-bundle hudi-timeline-server-bundle README.md

hudi-gcp-bundle hudi-kafka-connect-bundle hudi-trino-bundle

7.测试hudi-client

/opt/software/hudi-0.12.1/hudi-cli/hudi-cli.sh

出现如下即OK

Main called

===================================================================

* ___ ___ *

* /\__\ ___ /\ \ ___ *

* / / / /\__\ / \ \ /\ \ *

* / /__/ / / / / /\ \ \ \ \ \ *

* / \ \ ___ / / / / / \ \__\ / \__\ *

* / /\ \ /\__\ / /__/ ___ / /__/ \ |__| / /\/__/ *

* \/ \ \/ / / \ \ \ /\__\ \ \ \ / / / /\/ / / *

* \ / / \ \ / / / \ \ / / / \ /__/ *

* / / / \ \/ / / \ \/ / / \ \__\ *

* / / / \ / / \ / / \/__/ *

* \/__/ \/__/ \/__/ Apache Hudi CLI *

* *

===================================================================

10137 [main] INFO org.apache.hudi.cli.Main [] - Starting Main v0.12.1 using Java 1.8.0_181 on m1 with PID 6681 (/opt

/software/hudi-0.12.1/hudi-cli/target/hudi-cli-0.12.1.jar started by root in /opt/software/hudi-0.12.1/packaging)

10145 [main] INFO org.apache.hudi.cli.Main [] - No active profile set, falling back to 1 default profile: "default"

Table command getting loaded

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/opt/software/hudi-0.12.1/hudi-cli/target/lib/log4j-slf4j-impl-2.17.2.jar!/org/slf4

j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/opt/software/hudi-0.12.1/hudi-cli/target/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/

impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

11689 [main] WARN org.jline [] - The Parser of class org.springframework.shell.jline.ExtendedDefaultParser does not

support the CompletingParsedLine interface. Completion with escaped or quoted words won't work correctly.

11768 [main] INFO org.apache.hudi.cli.Main [] - Started Main in 2.031 seconds (JVM running for 11.806)

hudi->

简单测试编译后spark包可用性

需要有hadoop环境和spark

1.部署好hadoop集群、spark组件

这里不过多赘述如何安装这俩,spark只需要解压就行

spark下载:https://dlcdn.apache.org/spark/spark-3.2.2/spark-3.2.2-bin-hadoop3.2.tgz

2.拷贝编译好的包到spark的jars目录

cp /opt/software/hudi-0.12.1/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.1.jar /opt/module/spark-3.2.2/jars

3.启动hadoop

4.spark-shell方式测试

启动spark-shell spark-shell \

--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \

--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \

--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

执行如下scala代码 // 设置表名,基本路径和数据生成器

import org.apache.hudi.QuickstartUtils._

import scala.collection.JavaConversions._

import org.apache.spark.sql.SaveMode._

import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"

val basePath = "file:///tmp/hudi_trips_cow"

val dataGen = new DataGenerator

// 插入数据

val inserts = convertToStringList(dataGen.generateInserts(10))

val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

df.write.format("hudi").

options(getQuickstartWriteConfigs).

option(PRECOMBINE_FIELD_OPT_KEY, "ts").

option(RECORDKEY_FIELD_OPT_KEY, "uuid").

option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

option(TABLE_NAME, tableName).

mode(Overwrite).

save(basePath)

// 查询数据

val tripsSnapshotDF = spark.

read.

format("hudi").

load(basePath)

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()

也可以去/tmp/hudi_trips_cow/目录下查看是否有数据文件

执行示例 [root@m3 spark3]# bin/spark-shell --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

2023-01-16 01:40:40,221 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Spark context Web UI available at http://m3:4040

Spark context available as 'sc' (master = local[*], app id = local-1673851241535).

Spark session available as 'spark'.

Welcome to

____ __

/ __/__ ___ _____/ /__

_\ \/ _ \/ _ `/ __/ '_/

/___/ .__/\_,_/_/ /_/\_\ version 3.2.2

/_/

Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)

Type in expressions to have them evaluated.

Type :help for more information.

scala> import org.apache.hudi.QuickstartUtils._

import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._

import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._

import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._

import org.apache.hudi.config.HoodieWriteConfig._

scala> val tableName = "hudi_trips_cow"

tableName: String = hudi_trips_cow

scala> val basePath = "file:///tmp/hudi_trips_cow"

basePath: String = file:///tmp/hudi_trips_cow

scala> val dataGen = new DataGenerator

dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@2b2bcb4a

scala> val inserts = convertToStringList(dataGen.generateInserts(10))

inserts: java.util.List[String] = [{"ts": 1673839191417, "uuid": "0b652f6a-1349-444e-8442-976fc149b589", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1673565741975, "uuid": "20e0932b-5baa-4c74-a423-8e72a3c1dcef", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1673405567377, "uuid": "2417e9e6-c5a7-4399-b7a1-4b6e2fb90372", "rider": "rider-213", "driver"...

scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

warning: one deprecation (since 2.12.0)

warning: one deprecation (since 2.2.0)

warning: two deprecations in total; for details, enable `:setting -deprecation' or `:replay -deprecation'

df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> df.write.format("hudi").

| options(getQuickstartWriteConfigs).

| option(PRECOMBINE_FIELD_OPT_KEY, "ts").

| option(RECORDKEY_FIELD_OPT_KEY, "uuid").

| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").

| option(TABLE_NAME, tableName).

| mode(Overwrite).

| save(basePath)

warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'

2023-01-16 01:41:57,422 WARN config.DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf

2023-01-16 01:41:57,443 WARN config.DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file

2023-01-16 01:41:57,471 WARN hudi.HoodieSparkSqlWriter$: hoodie table at file:/tmp/hudi_trips_cow already exists. Deleting existing data & overwriting with new data.

2023-01-16 01:41:58,404 WARN metadata.HoodieBackedTableMetadata: Metadata table was not found at path file:/tmp/hudi_trips_cow/.hoodie/metadata

scala> val tripsSnapshotDF = spark.

| read.

| format("hudi").

| load(basePath)

tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]

scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()

+------------------+-------------------+-------------------+-------------+

| fare| begin_lon| begin_lat| ts|

+------------------+-------------------+-------------------+-------------+

| 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1673405567377|

| 93.56018115236618|0.14285051259466197|0.21624150367601136|1673491795155|

| 27.79478688582596| 0.6273212202489661|0.11488393157088261|1673772916404|

| 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1673347004963|

|34.158284716382845|0.46157858450465483| 0.4726905879569653|1673839191417|

| 66.62084366450246|0.03844104444445928| 0.0750588760043035|1673613988097|

| 43.4923811219014| 0.8779402295427752| 0.6100070562136587|1673565741975|

| 41.06290929046368| 0.8192868687714224| 0.651058505660742|1673298206656|

+------------------+-------------------+-------------------+-------------+

如果插入时报错:java.lang.NoSuchMethodError: org.apache.hudi.org.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V

去看下上文:解决spark依赖冲突小节解决

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: