Apache Spark 什么时候开始支持集成 Hive 功能?

笔者相信只要使用过 Spark 的读者,应该都会说这是很久以前的事情了。

那 Apache Flink 什么时候支持与 Hive 的集成呢?

读者可能有些疑惑,还没有支持吧,没用过?或者说最近版本才支持,但是功能还比较弱。

其实比较也没啥意义,不同社区发展的目标总是会有差异,而且 Flink 在真正的实时流计算方面投入的精力很多。不过笔者想表达的是,Apache Hive 已经成为数据仓库生态系统的焦点,它不仅是一个用于大数据分析和 ETL 的 SQL 引擎,也是一个数据管理平台,所以无论是 Spark,还是 Flink,或是 Impala、Presto 等,都会积极地支持集成 Hive 的功能。

的确,对真正需要使用 Flink 访问 Hive 进行数据读写的读者会发现,Apache Flink 1.9.0 版本才开始提供与 Hive 集成的功能。不过,值得欣慰的是,Flink 社区在集成 Hive 功能方面付出很多,目前进展也比较顺利,最近 Flink 1.10.0 RC1 版本已经发布,感兴趣的读者可以进行调研和验证功能。

架构设计

首先,笔者基于社区公开的资料以及博客,概括性地讲解 Flink 集成 Hive 的架构设计。

Apache Flink 与 Hive 集成的目的,主要包含了元数据和实际表数据的访问。

1. 元数据

为了访问外部系统的元数据,Flink 刚开始提供了 ExternalCatalog 的概念。但是 ExternalCatalog 的定义非常不完整,基本处于不可用的状态。Flink 1.10 版本正式删除了 ExternalCatalog API (FLINK-13697),这包括:

ExternalCatalog(以及所有依赖的类,比如 ExternalTable)SchematicDescriptor、MetadataDescriptor 和 StatisticsDescriptor

针对 ExternalCatalog 的问题,Flink 社区提出了一套全新的 Catalog 接口(new Catalog API)来取代现有的 ExternalCatalog。新的 Catalog 实现的功能包括:

能够支持数据库、表、分区等多种元数据对象允许在一个用户 Session 中维护多个 Catalog 实例,从而支持同时访问多个外部系统Catalog 以可插拔的方式接入 Flink,允许用户提供自定义的实现

下图展示了新的 Catalog API 的总体架构:

创建 TableEnvironment 的时候会同时创建一个 CatalogManager,负责管理不同的 Catalog 实例。TableEnvironment 通过 Catalog 来为 Table API 和 SQL Client 用户提供元数据服务。

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

val tableEnv = TableEnvironment.create(settings)val name = "myhive"

val defaultDatabase = "mydatabase"

val hiveConfDir = "/opt/hive-conf" // a local pathval version = "2.3.4"

val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version)tableEnv.registerCatalog("myhive", hive)// set the HiveCatalog as the current catalog of the sessiontableEnv.useCatalog("myhive")

目前 Catalog 有两个实现,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元数据管理机制,将所有元数据保存在内存中。而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。另一方面,HiveCatalog 也可以用来处理 Flink 自身的元数据,在这种场景下,HiveCatalog 仅将 Hive Metastore 作为持久化存储使用,写入 Hive Metastore 中的元数据并不一定是 Hive 所支持的格式。一个 HiveCatalog 实例可以同时支持这两种模式,用户无需为管理 Hive 和 Flink 的元数据创建不同的实例。

另外,通过设计 HiveShim 来支持不同版本的 Hive Metastore,具体支持的 Hive 版本列表,请参考官方文档。

2. 表数据

Flink 提供了 Hive Data Connector 来读写 Hive 的表数据。Hive Data Connector 尽可能的复用了 Hive 本身的 Input/Output Format 和 SerDe 等类,这样做的好处一方面是减少了代码重复,更重要的是可以最大程度的保持与 Hive 的兼容,即 Flink 写入的数据 Hive 可以正常读取,并且反之亦然。

集成 Hive 功能

Flink 与 Hive 集成的功能在 1.9.0 版本中作为试用功能发布,存在不少使用的局限性,但是不久将发布的 Flink 1.10 稳定版本会更加完善集成 Hive 的功能并应用到企业场景中。

为了让读者提前体验 Flink 1.10 集成 Hive 的功能,笔者会基于 Cloudera CDH 编译 Flink 1.10.0 RC1 版本并进行较为完整的测试。

1. 环境信息

CDH 版本:cdh5.16.2

Flink 版本:release-1.10.0-rc1

Flink 使用了 RC 版本,仅供测试,不建议用于生产环境。

目前 Cloudera Data Platform 正式集成了 Flink 作为其流计算产品,非常方便用户使用。

CDH 环境开启了 Sentry 和 Kerberos。

2. 下载并编译 Flink

$ wget https://github.com/apache/flink/archive/release-1.10.0-rc1.tar.gz

$ tar zxvf release-1.10.0-rc1.tar.gz

$ cd flink-release-1.10.0-rc1/

$ mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

不出意外的话,编译到 flink-hadoop-fs 模块时,会报如下错误:

[ERROR] Failed to execute goal on project flink-hadoop-fs: Could not resolve dependencies for project org.apache.flink:flink-hadoop-fs:jar:1.10.0: Failed to collect dependencies at org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Failed to read artifact descriptor for org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Could not transfer artifact org.apache.flink:flink-shaded-hadoop-2:pom:2.6.0-cdh5.16.2-9.0 from/to HDPReleases (https://repo.hortonworks.com/content/repositories/releases/): Remote host closed connection during handshake: SSL peer shut down incorrectly

编译中遇到 flink-shaded-hadoop-2 找不到的问题,其实查看 Maven 仓库会发现,根本原因是 CDH 的 flink-shaded-hadoop-2 的 jar 包在 Maven 中央仓库是没有对应的编译版本,所以需要先对 Flink 依赖的 flink-shaded-hadoop-2 进行打包,再进行编译。

解决 flink-shaded-hadoop-2 问题

(1). 获取 flink-shaded 源码

git clone https://github.com/apache/flink-shaded.git

(2). 切换依赖的版本分支

根据上面报错时提示缺少的版本切换对应的代码分支,即缺少的是 9.0 版本的 flink-shaded-hadoop-2:

git checkout release-9.0

(3). 配置 CDH Repo 仓库

修改 flink-shaded 项目中的 pom.xml,添加 CDH maven 仓库,否则编译时找不到 CDH 相关的包。

... 中添加如下内容:

vendor-repos

vendor-repos

cloudera-releases

https://repository.cloudera.com/artifactory/cloudera-repos

true

false

(4). 编译 flink-shaded

开始执行编译:

mvn clean install -DskipTests -Drat.skip=true -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

建议通过科学上网方式编译,如果读者遇到一些网络连接的问题,可以试着重试或者更换依赖组件的仓库地址。

编译成功后,就会把 flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar 安装在本地 maven 仓库,如下为编译的最后日志:

Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar

Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/dependency-reduced-pom.xml to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.pom

3. 重新编译 Flink

mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

漫长的等待过程,读者可以并行做其他事情。

编译过程中,如果不出意外的话,会看到类似下面的错误信息:

[INFO] Running 'npm ci --cache-max=0 --no-save' in /Users/xxx/Downloads/Flink/flink-release-1.10.0-rc1/flink-release-1.10.0-rc1/flink-runtime-web/web-dashboard [WARNING] npm WARN prepare removing existing node_modules/ before installation [ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/mime/-/mime-2.4.0.tgz failed, reason: read ECONNRESET [ERROR] WARN registry Using stale package data from https://registry.npmjs.org/ due to a request error during revalidation. [ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/optimist/-/optimist-0.6.1.tgz failed, reason: read ECONNRESET

可以看到, flink-runtime-web 模块引入了对 frontend-maven-plugin 的依赖,需要安装 node、npm 和依赖组件。

如果没有通过科学上网,可以修改 flink-runtime-web/pom.xml 文件,添加 nodeDownloadRoot 和 npmDownloadRoot 的信息:

com.github.eirslett

frontend-maven-plugin

1.6

install node and npm

install-node-and-npm

https://registry.npm.taobao.org/dist/

https://registry.npmjs.org/npm/-/

v10.9.0

npm install

npm

ci --cache-max=0 --no-save

true

npm run build

npm

run build

web-dashboard

编译成功后,Flink 安装文件位于 flink-release-1.10.0-rc1/flink-dist/target/flink-1.10.0-bin 目录下,打包并上传到部署到节点:

$ cd flink-dist/target/flink-1.10.0-bin$ tar zcvf flink-1.10.0.tar.gz flink-1.10.0

4. 部署和配置

Flink 部署比较简单,解压缩包即可。另外可以设置软链接、环境变量等,笔者不再介绍。

Flink 的核心配置文件是 flink-conf.yaml,一个典型的配置如下:

jobmanager.rpc.address: localhost

jobmanager.rpc.port: 6123

jobmanager.heap.size: 2048m

taskmanager.heap.size: 1024m

taskmanager.numberOfTaskSlots: 4

parallelism.default: 1

high-availability: zookeeper

high-availability.storageDir: hdfs:///user/flink110/recovery

high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181

state.backend: filesystem

state.checkpoints.dir: hdfs:///user/flink110/checkpoints

state.savepoints.dir: hdfs:///user/flink110/savepoints

jobmanager.execution.failover-strategy: region

rest.port: 8081

taskmanager.memory.preallocate: false

classloader.resolve-order: parent-first

security.kerberos.login.use-ticket-cache: true

security.kerberos.login.keytab: /home/flink_user/flink_user.keytab

security.kerberos.login.principal: flink_user

jobmanager.archive.fs.dir: hdfs:///user/flink110/completed-jobs

historyserver.web.address: 0.0.0.0historyserver.web.port: 8082

historyserver.archive.fs.dir: hdfs:///user/flink110/completed-jobs

historyserver.archive.fs.refresh-interval: 10000

笔者只罗列了一些常见的配置参数,读者根据实际情况修改。配置参数其实还是比较容易理解的,以后结合实战的文章再进行详细讲解。

4.1 集成 Hive 配置的依赖

如果要使用 Flink 与 Hive 集成的功能,除了上面的配置外,用户还需要添加相应的依赖:

如果需要使用 SQL Client,则需要将依赖的 jar 拷贝到 Flink 的 lib 目录中如果需要使用 Table API,则需要将相应的依赖添加到项目中(如 pom.xml)

org.apache.flink

flink-connector-hive_2.11

1.11-SNAPSHOT

provided

org.apache.flink

flink-table-api-java-bridge_2.11

1.11-SNAPSHOT

provided

org.apache.hive

hive-exec

${hive.version}

provided

笔者主要介绍使用 SQL Client 的方式,由于使用的 CDH 版本为 5.16.2,其中 Hadoop 版本为 2.6.0,Hive 版本为 1.1.0,所以需要将如下 jar 包拷贝到 flink 部署家目录中的 lib 目录下:

Flink 的 Hive connectorflink-connector-hive2.11-1.10.0.jarflink-hadoop-compatibility2.11-1.10.0.jarflink-orc_2.11-1.10.0.jar

flink-release-1.10.0-rc1/flink-connectors/flink-hadoop-compatibility/target/flink-hadoop-compatibility_2.11-1.10.0.jar

flink-release-1.10.0-rc1/flink-connectors/flink-connector-hive/target/flink-connector-hive_2.11-1.10.0.jar

flink-release-1.10.0-rc1/flink-formats/flink-orc/target/flink-orc_2.11-1.10.0.jar

Hadoop 依赖flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar

flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar

Hive 依赖hive-exec-1.1.0-cdh5.16.2.jarhive-metastore-1.1.0-cdh5.16.2.jarlibfb303-0.9.3.jar

/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec-1.1.0-cdh5.16.2.jar

/opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore-1.1.0-cdh5.16.2.jar

/opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.3.jar

其中 flink-shaded-hadoop-2-uber 包含了 Hive 对于 Hadoop 的依赖。如果不用 Flink 提供的包,用户也可以将集群中使用的 Hadoop 包添加进来,不过需要保证添加的 Hadoop 版本与 Hive 所依赖的版本是兼容的。

依赖的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用户集群中 Hive 所提供的 jar 包,详情请见支持不同的 Hive 版本。

Flink 部署的节点要添加 Hadoop、Yarn 以及 Hive 的客户端。

4.2 配置 HiveCatalog

多年来,Hive Metastore 在 Hadoop 生态系统中已发展成为事实上的元数据中心。许多公司在其生产中有一个单独的 Hive Metastore 服务实例,以管理其所有元数据(Hive 元数据或非 Hive 元数据)。

如果同时部署了 Hive 和 Flink,那么通过 HiveCatalog 能够使用 Hive Metastore 来管理 Flink 的元数据。

如果仅部署 Flink,HiveCatalog 就是 Flink 开箱即用提供的唯一持久化的 Catalog。如果没有持久化的 Catalog,那么使用 Flink SQL CREATE DDL 时必须在每个会话中重复创建像 Kafka 表这样的元对象,这会浪费大量时间。HiveCatalog 通过授权用户只需要创建一次表和其他元对象,并在以后的跨会话中非常方便地进行引用和管理。

如果要使用 SQL Client 时,用户需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的 catalogs 列表中可以指定一个或多个 Catalog 实例。

以下的示例展示了如何指定一个 HiveCatalog:

execution:

planner: blink

type: streaming ...

current-catalog: myhive # set the HiveCatalog as the current catalog of the session

current-database: mydatabasecatalogs:

- name: myhive

type: hive

hive-conf-dir: /opt/hive-conf # contains hive-site.xml

hive-version: 2.3.4

其中:

name 是用户给每个 Catalog 实例指定的名字,Catalog 名字和 DB 名字构成了 FlinkSQL 中元数据的命名空间,因此需要保证每个 Catalog 的名字是唯一的。type 表示 Catalog 的类型,对于 HiveCatalog 而言,type 应该指定为 hive。hive-conf-dir 用于读取 Hive 的配置文件,用户可以将其设定为集群中 Hive 的配置文件目录。hive-version 用于指定所使用的 Hive 版本。

指定了 HiveCatalog 以后,用户就可以启动 sql-client,并通过以下命令验证 HiveCatalog 已经正确加载。

Flink SQL> show catalogs;

default_catalogmyhiveFlink SQL> use catalog myhive;

其中 show catalogs 会列出加载的所有 Catalog 实例。需要注意的是,除了用户在 sql-client-defaults.yaml 文件中配置的 Catalog 以外,FlinkSQL 还会自动加载一个 GenericInMemoryCatalog 实例作为内置的 Catalog,该内置 Catalog 默认名字为 default_catalog。

5. 读写 Hive 表

设置好 HiveCatalog 以后就可以通过 SQL Client 或者 Table API 来读写 Hive 中的表了。

假设 Hive 中已经有一张名为 mytable 的表,我们可以用以下的 SQL 语句来读写这张表。

5.1 读数据

Flink SQL> show catalogs;

myhive

default_catalog

Flink SQL> use catalog myhive;

Flink SQL> show databases;

defaultFlink SQL> show tables;

mytableFlink SQL> describe mytable;

root |-- name: name |-- type: STRING |-- name: value |-- type: DOUBLEFlink SQL> SELECT * FROM mytable;

name value__________ __________ Tom 4.72 John 8.0 Tom 24.2 Bob 3.14 Bob 4.72 Tom 34.9 Mary 4.79 Tiff 2.72 Bill 4.33 Mary 77.7

5.2 写数据

Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;

Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;

# 静态分区

Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;

# 动态分区

Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25,

'type_1', '2019-08-08';# 静态分区和动态分区Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';

总结

在本文中,笔者首先介绍了 Flink 与 Hive 集成功能的架构设计,然后从源码开始编译,解决遇到的一些问题,接着部署和配置 Flink 环境以及集成 Hive 的具体操作过程,最后参考官方的案例,对 Hive 表进行读写操作。

后续,笔者会结合生产环境的实际使用情况,讲解通过 Flink SQL 来操作 Hive。

在上篇文章中,笔者使用的 CDH 版本为 5.16.2,其中 Hive 版本为 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可理解),Flink 源代码本身对 Hive 1.1.0 版本兼容性不好,存在不少问题。为了兼容目前版本,笔者基于 CDH 5.16.2 环境,对 Flink 代码进行了修改,重新打包并部署。

其实经过很多开源项目的实战,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情况下,替换一些 Jar 包,是可以解决兼容性的问题。对于笔者的环境来说,可以使用 Hive 1.2.1 版本的一些 Jar 包来代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的开始部分,笔者会解决这个问题,然后再补充上篇文章缺少的实战内容。

剪不断理还乱的问题

根据读者的反馈,笔者将所有的问题总结为三类:

Flink 如何连接 Hive 除了 API 外,有没有类似 spark-sql 命令识别不到 Hadoop 环境或配置文件找不到依赖包、类或方法找不到

1. Flink 如何连接 Hive

有的读者不太清楚,如何配置 Flink 连接 Hive 的 Catalog,这里补充一个完整的 conf/sql-client-hive.yaml 示例:

sql-client-hive.yaml 配置文件里面包含:

Hive 配置文件 catalogs 中配置了 Hive 的配置文件路径。Yarn 配置信息 deployment 中配置了 Yarn 的配置信息。执行引擎信息 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比较稳定,适合传统的批处理作业,而且可以容错,另外中间数据落盘,建议开启压缩功能。除了 batch,Flink 也支持 streaming 模式。

■ Flink SQL CLI 工具

类似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 脚本。在 Flink 1.10 版本中,Flink SQL CLI 改进了很多功能,笔者后面讲解。

sql-client.sh 使用方式如下:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

2. 识别不到 Hadoop 环境或配置文件找不到

笔者在上篇文章中提到过,在部署 Flink 的环境上部署 CDH gateway,包括 Hadoop、Hive 客户端,另外还需要配置一些环境变量,如下:

export HADOOP_CONF_DIR=/etc/hadoop/conf

export YARN_CONF_DIR=/etc/hadoop/conf

export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive

export HIVE_CONF_DIR=/etc/hive/conf

3. 依赖包、类或方法找不到

先查看一下 Flink 家目录下的 lib 目录:

如果上面前两个问题都解决后,执行如下命令:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

报错,报错,还是报错:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory

其实在运行 sql-client.sh 脚本前,需要指定 Hadoop 环境的依赖包的路径,建议不要报错一个添加一个,除非有的读者喜欢。这里笔者提示一个方便的方式,即设置 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)环境变量:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

再次执行:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

很抱歉,继续报错:

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client

这里就是 Hive 1.1.0 版本的 Jar 包与 Flink 出现版本不兼容性的问题了,解决方法是:

下载 apache-hive-1.2.1 版本替换 Flink lib 目录下的 Hive Jar 包 删除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然后添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次查看 lib 目录:

最后再执行:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

这时,读者就可以看到手握栗子的可爱小松鼠了。

Flink SQL CLI 实践

在 Flink 1.10 版本(目前为 RC1 阶段) 中,Flink 社区对 SQL CLI 做了大量的改动,比如支持 View、支持更多的数据类型和 DDL 语句、支持分区读写、支持 INSERT OVERWRITE 等,实现了更多的 TableEnvironment API 的功能,更加方便用户使用。

接下来,笔者详细讲解 Flink SQL CLI。

0. Help

执行下面命令,登录 Flink SQL 客户端:

$ bin/sql-client.sh embedded -d conf/sql-client-hive.yamlFlink SQL>

执行 HELP,查看 Flink SQL 支持的命令,如下为大部分常用的:

CREATE TABLEDROP TABLECREATE VIEWDESCRIBEDROP VIEWEXPLAININSERT INTOINSERT OVERWRITESELECTSHOW FUNCTIONSUSE CATALOGSHOW TABLESSHOW DATABASESSOURCEUSESHOW CATALOGS

1. Hive 操作

■ 1.1 创建表和导入数据

为了方便读者进行实验,笔者使用 ssb-dbgen 生成测试数据,读者也可以使用测试环境已有的数据来进行实验。

具体如何在 Hive 中一键式创建表并插入数据,可以参考笔者早期的项目 https://github.com/MLikeWater/ssb-kylin。

■ 1.2 Hive 表

查看上个步骤中创建的 Hive 表:

读者可以对 Hive 进行各种查询,对比后面 Flink SQL 查询的结果。

2. Flink 操作

■ 2.1 通过 HiveCatalog 访问 Hive 数据库

登录 Flink SQL CLI,并查询 catalogs:

通过 show catalogs 获取配置的所有 catalog。由于笔者在 sql-client-hive.yaml 文件中设置了默认的 catalog,即为 staginghive。如果需要切换到其他 catalog,可以使用 usecatalog xxx。

■ 2.2 查询 Hive 元数据

通过 Flink SQL 查询 Hive 数据库和表:

■ 2.3 查询

接下来,在 Flink SQL CLI 中查询一些 SQL 语句,完整 SQL 参考 https://github.com/MLikeWater/ssb-kylin 的 README。

目前 Flink SQL 解析 Hive 视图元数据时,会遇到一些 Bug,比如执行 Q1.1 SQL:

Flink SQL 找不到视图中的实体表。

p_lineorder 表是 Hive 中的一张视图,创建表的语句如下:

但是对于 Hive 中视图的定义,Flink SQL 并没有很好地处理元数据。为了后面 SQL 的顺利执行,这里笔者在 Hive 中删除并重建该视图

然后继续在 Flink SQL CLI 中查询 Q1.1 SQL:

继续查询 Q2.1 SQL:

最后再查询一个 Q4.3 SQL:

Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit

> from p_lineorder> left join dates on lo_orderdate = d_datekey

> left join customer on lo_custkey = c_custkey

> left join supplier on lo_suppkey = s_suppkey> left join part on lo_partkey = p_partkey

> where c_region = 'AMERICA'and s_nation = 'UNITED STATES'> and (d_year = 1997 or d_year = 1998)

> and p_category = 'MFGR#14'> group by d_year, s_city, p_brand

> order by d_year, s_city, p_brand;

d_year s_city p_brand profit

1998 UNITED ST9 MFGR#1440 6665681

如果读者感兴趣的话,可以查询剩余的 SQL,当然也可以和 Spark SQL 进行比较。另外 Flink SQL 也支持 EXPLAIN,查询 SQL 的执行计划。

■ 2.4 创建视图

同样,可以在 Flink SQL CLI 中创建和删除视图,如下:

这里笔者需要特别强调的是,目前 Flink 无法删除 Hive 中的视图:

Flink SQL> drop view p_lineorder;

[ERROR] Could not execute SQL statement. Reason:

The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.

■ 2.5 分区操作

Hive 数据库中创建一张分区表:

CREATE TABLE IF NOT EXISTS flink_partition_test (

id int,

name string

) PARTITIONED BY (day string, type string)

stored as textfile;

接着,通过 Flink SQL 插入和查询数据:

# 插入静态分区的数据Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001';

# 查询Flink SQL> select * from flink_partition_test;id name day type100001 Flink001 2020-02-01 Flink

# 插入动态分区Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';

# 查询Flink SQL> select * from flink_partition_test;id name day type100002 Spark 2020-02-02 SparkSQL100001 FlinkSQL 2020-02-01 Flink

# 动态和静态分区结合使用类似,不再演示

# 覆盖插入数据

Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';

id name day type

100002 Spark 2020-02-02 SparkSQL

100001 FlinkSQL 2020-02-01 Flink

字段 day 在 Flink 属于关键字,要特殊处理。

■ 2.6 其他功能

2.6.1 函数

Flink SQL 支持内置的函数和自定义函数。对于内置的函数,可以执行 show functions 进行查看,这一块笔者以后会单独介绍如何创建自定义函数。

2.6.2 设置参数

Flink SQL 支持设置环境参数,可以使用 set 命令查看和设置参数:

总结

在本文中,笔者通过 Flink SQL 比较详细地去操作 Hive 数据库,以及 Flink SQL 提供的一些功能。

当然,目前 Flink SQL 操作 Hive 数据库还是存在一些问题:

目前只支持 TextFile 存储格式,还无法指定其他存储格式 只支持 Hive 数据库中 TextFile 存储格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。虽然实现了 RCFile、ORC、Parquet、Sequence 等存储格式,但是无法自动识别 Hive 表的存储格式。如果要使用其他存储格式,需要修改源码,重新编译。不过社区已经对这些存储格式进行了测试,相信不久以后就可以在 Flink SQL 中使用。OpenCSVSerde 支持不完善 如果读者使用 TextFile 的 row format serde 为 org.apache.hadoop.hive.serde2.OpenCSVSerde 时,无法正确识别字段类型,会把 Hive 表的字段全部映射为 String 类型。暂时不支持 Bucket 表暂时不支持 ACID 表Flink SQL 优化方面功能较少权限控制方面 这方面和 Spark SQL 类似,目前基于 HDFS ACL 控制,暂时还没有实现 Sentry 或 Ranger 控制权限,不过目前 Cloudera 正在开发基于 Ranger 设置 Spark SQL 和 Hive 共享访问权限的策略,实现行/列级控制以及审计信息。

Flink 社区发展很快,所有这些问题只是暂时的,随着新版本的发布会被逐个解决。

如果 Flink SQL 目前不满足的需求,建议使用 API 方式来解决问题。

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: