《Flink SQL 基础概念》系列,共包含以下 5 篇文章:

Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 APIFlink SQL 基础概念(二):数据类型Flink SQL 基础概念(三):SQL 动态表 & 连续查询Flink SQL 基础概念(四):SQL 的时间属性Flink SQL 基础概念(五):SQL 时区问题

 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连  吧 (点赞 李、关注 、收藏 )!!!您的支持  将激励  博主输出更多优质内容!!!

Flink SQL 基础概念(一):SQL & Table 运行环境、基本概念及常用 API

1.SQL & Table 简介及运行环境1.1 简介1.2 SQL 和 Table API 运行环境依赖

2.SQL & Table 的基本概念及常用 API2.1 一个 SQL / Table API 任务的代码结构2.2 SQL 上下文:TableEnvironment2.3 SQL 中表的概念(外部表 TABLE、视图 VIEW)2.4 SQL 临时表、永久表2.5 SQL 外部数据表2.5.1 Table API 创建外部数据表2.5.2 SQL API 创建外部数据表

2.6 SQL 视图 VIEW2.6.1 Table API 创建 VIEW2.6.2 SQL API 创建 VIEW

2.7 一个 SQL 查询案例2.8 SQL 与 DataStream API 的转换

1.SQL & Table 简介及运行环境

1.1 简介

Apache Flink 提供了两种关系型 API 用于统一流和批处理,Table 和 SQL API。

Table API 是一种集成在 Java、Scala 和 Python 语言中的查询 API,简单理解就是用 Java、Scala、Python 按照 SQL 的查询接口封装了一层 lambda 表达式的查询 API,它允许以强类型接口的方式组合各种关系运算符(如选择、筛选和联接)的查询操作,然后生成一个 Flink 任务运行。如下案例所示:

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.*;

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.inStreamingMode()

.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// 下面就是 Table API 的案例,其语义等同于

// select a, count(b) as cnt

// from Orders

// group by a

DataSet result = tEnv

.from("Orders")

.groupBy($("a"))

.select($("a"), $("b").count().as("cnt"))

.toDataSet(counts, Row.class);

result.print();

SQL API 是基于 SQL 标准的 Apache Calcite 框架实现的,我们可以使用纯 SQL 来开发和运行一个 Flink 任务。如下案例所示:

insert into target

select a, count(b) as cnt

from Orders

group by a

注意:无论输入是连续(流处理)还是有界(批处理),在 Table 和 SQL 任一 API 中同一条查询语句是具有相同的语义并且会产出相同的结果的。这就是说为什么 Flink SQL 和 Table API 可以做到在用户接口层面的流批统一。xdm,用一套 SQL 既能跑流任务,也能跑批任务,它不香嘛?

Table API 和 SQL API 也与 DataStream API 做到了无缝集成。可以轻松地在三种 API 之间灵活切换。例如,可以使用 SQL 的 MATCH_RECOGNIZE 子句匹配出异常的数据,然后使用再转为 DataStream API 去灵活的构建针对于异常数据的自定义报警机制。

在大致了解了这两个 API 是干啥的之后,我们就可以直接来看看,怎么使用这两个 API 了。

1.2 SQL 和 Table API 运行环境依赖

根据小伙伴们使用的编程语言的不同(Java 或 Scala),需要将对应的依赖包添加到项目中。

Java 依赖如下:

org.apache.flink

flink-table-api-java-bridge_2.11

1.13.5

org.apache.flink

flink-table-planner-blink_2.11

1.13.5

org.apache.flink

flink-streaming-java_2.11

1.13.5

org.apache.flink

flink-table-common

1.13.5

Scala 依赖如下:

org.apache.flink

flink-table-api-scala-bridge_2.11

1.13.5

org.apache.flink

flink-table-planner-blink_2.11

1.13.5

provided

org.apache.flink

flink-streaming-scala_2.11

1.13.5

provided

org.apache.flink

flink-table-common

1.13.5

引入上述依赖之后,小伙伴萌就可以开始使用 Table / SQL API 了。具体案例如下文所示。

2.SQL & Table 的基本概念及常用 API

在小伙伴萌看下文之前,先看一下整体的思路,跟着博主思路走,会更清晰:

先通过一个 SQL / Table API 任务看一下我们在实际开发时的代码结构应该长啥样,让大家能有直观的感受。重点介绍 SQL / Table API 中核心 API - TableEnvironment。SQL / Table 所有能用的接口都在 TableEnvironment 中。通过两个角度(外部表 / 视图、临时 / 非临时)认识 Flink SQL 体系中的表的概念。举几个创建外部表、视图的实际应用案例。

2.1 一个 SQL / Table API 任务的代码结构

// 创建一个 TableEnvironment,为后续使用 SQL 或者 Table API 提供上线

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.inStreamingMode() // 声明为流任务

//.inBatchMode() // 声明为批任务

.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// 创建一个输入表

tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");

// 创建一个输出表

tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");

// 1. 使用 Table API 做一个查询并返回 Table

Table table2 = tableEnv.from("table1").select(...);

// 2. 使用 SQl API 做一个查询并返回 Table

Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");

// 将 table2 的结果使用 Table API 写入 outputTable 中,并返回结果

TableResult tableResult = table2.executeInsert("outputTable");

tableResult...

总结一下上面案例使用到的一些 API,让大家先对 Table / SQL API 的能力有一个大概了解:

TableEnvironment:Table API 和 SQL API 的都集成在一个 统一上下文(即 TableEnvironment)中,其地位等同于 DataStream API 中的 StreamExecutionEnvironment 的地位TableEnvironment::executeSql:用于 SQL API 中,可以执行一段完整 DDL、DML SQL。举例,方法入参可以是 CREATE TABLE xxx,INSERT INTO xxx SELECT xxx FROM xxx。TableEnvironment::from(xxx):用于 Table API 中,可以以强类型接口的方式运行。方法入参是一个表名称。TableEnvironment::sqlQuery:用于 SQL API 中,可以执行一段查询 SQL,并把结果以 Table 的形式返回。举例,方法的入参是 SELECT xxx FROM xxx。Table::executeInsert:用于将 Table 的结果插入到结果表中。方法入参是写入的目标表。

无论是对于 SQL API 来说还是对于 Table API 来说,都是使用 TableEnvironment 接口承载我们的业务查询逻辑的。只是在用户的使用接口的方式上有区别,以上述的 Java 代码为例,Table API 其实就是模拟 SQL 的查询方式封装了 Java 语言的 lambda 强类型 API,SQL 就是纯 SQL 查询。Table 和 SQL 很多时候都是掺杂在一起的,大家理解的时候就可以直接将 Table 和 SQL API 直接按照 SQL 进行理解,不用强行做特殊的区分。

而且博主推荐的话,直接上 SQL API 就行,其实 Table API 在企业实战中用的不是特别多。你说 Table API 方便吧,它确实比 DataStream API 方便,但是又比 SQL 复杂。一般生产使用不多。

注意:由于 Table 和 SQL API 基本上属于一回事,后续如果没有特别介绍的话,博主就直接按照 SQL API 进行介绍了。

2.2 SQL 上下文:TableEnvironment

TableEnvironment 是使用 SQL API 永远都离不开的一个接口。其是 SQL API 使用的入口(上下文),就像是你要使用 Java DataStream API 去写一个 Flink 任务需要使用到 StreamExecutionEnvironment 一样。

可以认为 TableEnvironment 在 SQL API 中的地位和 StreamExecutionEnvironment 在 DataStream 中的地位是一样的,都是包含了一个 Flink 任务运行时的所有上下文环境信息。大家这样对比学习会比较好理解。

TableEnvironment 包含的功能如下:

Catalog 管理:Catalog 可以理解为 Flink 的 MetaStore,类似 Hive MetaStore 对在 Hive 中的地位,关于 Flink Catalog 的详细内容后续进行介绍。表管理:在 Catalog 中注册表。SQL 查询:(这 TMD 还用说,最基本的功能啊),就像 DataStream 中提供了 addSource、map、flatmap 等接口。UDF 管理:注册用户定义(标量函数:一进一出、表函数:一进多出、聚合函数:多进一出)函数。UDF 扩展:加载可插拔 Module(Module 可以理解为 Flink 管理 UDF 的模块,是可插拔的,可以让小伙伴萌自定义 Module,去支持奇奇怪怪的 UDF 功能)。

DataStream 和 Table(Table / SQL API 的查询结果)之间进行转换:目前

1.13

1.13

1.13 版本的只有流任务支持,批任务不支持。

1.14

1.14

1.14 支持批任务。

接下来介绍如何创建一个 TableEnvironment。案例为 Java。

方法 1:通过 EnvironmentSettings 创建 TableEnvironment

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;

// 1. 就是设置一些环境信息

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.inStreamingMode() // 声明为流任务

//.inBatchMode() // 声明为批任务

.build();

// 2. 创建 TableEnvironment

TableEnvironment tEnv = TableEnvironment.create(settings);

1.13

1.13

1.13 版本中:

如果你是 inStreamingMode,则最终创建出来的 TableEnvironment 实例为 StreamTableEnvironmentImpl。如果你是 inBatchMode,则最终创建出来的 TableEnvironment 实例为 TableEnvironmentImpl。

它两虽然都继承了 TableEnvironment 接口,但是 StreamTableEnvironmentImpl 支持的功能更多一些。大家可以直接去看看接口实验一下,这里就不进行详细介绍。

方法 2:通过已有的 StreamExecutionEnvironment 创建 TableEnvironment

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

2.3 SQL 中表的概念(外部表 TABLE、视图 VIEW)

一个表的全名(标识)会由三个部分组成:Catalog 名称.数据库名称.表名称。如果 Catalog 名称或者数据库名称没有指明,就会使用当前默认值 default。

举个例子,下面这个 SQL 创建的 Table 的全名为 default.default.table1。

tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");

下面这个 SQL 创建的 Table 的全名为 default.mydatabase.table1。

tableEnv.executeSql("CREATE TEMPORARY TABLE mydatabase.table1 ... WITH ( 'connector' = ... )");

表 可以是 常规的(外部表 TABLE),也可以是 虚拟的(视图 VIEW)。

外部表 TABLE:描述的是外部数据,例如文件(HDFS)、消息队列(Kafka)等。依然拿离线 Hive SQL 举个例子,离线中一个表指的是 Hive 表,也就是所说的外部数据。视图 VIEW:从已经存在的表中创建,视图一般是一个 SQL 逻辑的查询结果。对比到离线的 Hive SQL 中,在离线的场景(Hive 表)中 VIEW 也都是从已有的表中去创建的。其实 Flink 也是一样的。

注意:这里有不同的地方就是,离线 Hive MetaStore 中不会有 Catalog 这个概念,其标识都是 数据库.数据表。

2.4 SQL 临时表、永久表

表(视图、外部表)可以是 临时的,并与单个 Flink Session(可以理解为 Flink 任务运行一次就是一个 Session)的生命周期绑定。表(视图、外部表)也可以是 永久的,并且对多个 Flink Session 都生效。

临时表:通常保存于内存中并且仅在创建它们的 Flink Session(可以理解为一次 Flink 任务的运行)持续期间存在。这些表对于其它 Session(即其他 Flink 任务或非此次运行的 Flink 任务)是不可见的。因为这个表的元数据没有被持久化。如下案例:

-- 临时外部表

CREATE TEMPORARY TABLE source_table (

user_id BIGINT,

`name` STRING

) WITH (

'connector' = 'user_defined',

'format' = 'json',

'class.name' = 'flink.examples.sql._03.source_sink.table.user_defined.UserDefinedSource'

);

-- 临时视图

CREATE TEMPORARY VIEW query_view as

SELECT *

FROM source_table;

永久表:需要外部 Catalog(例如 Hive Metastore)来持久化表的元数据。一旦永久表被创建,它将对任何连接到这个 Catalog 的 Flink Session 可见且持续存在,直至从 Catalog 中被明确删除。如下案例:

-- 永久外部表。需要外部 Catalog 持久化!!!

CREATE TABLE source_table (

user_id BIGINT,

`name` STRING

) WITH (

'connector' = 'user_defined',

'format' = 'json',

'class.name' = 'flink.examples.sql._03.source_sink.table.user_defined.UserDefinedSource'

);

-- 永久视图。需要外部 Catalog 持久化!!!

CREATE VIEW query_view as

SELECT *

FROM source_table;

 注意:如果临时表和永久表使用了相同的名称(Catalog名.数据库名.表名)。那么在这个 Flink Session 中,你的任务访问到这个表时,访问到的永远是临时表(即 相同名称的表,临时表会屏蔽永久表)。

2.5 SQL 外部数据表

由于目前在实时数据的场景中多以消息队列作为数据表。此处就以 Kafka 为例创建一个外部数据表。

2.5.1 Table API 创建外部数据表

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env =

StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.useBlinkPlanner()

.inStreamingMode()

.build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// kafka 数据源

DataStream r = env.addSource(new FlinkKafkaConsumer(xxx));

// 将 DataStream 转为一个 Table API 中的 Table 对象进行使用

Table sourceTable = tEnv.fromDataStream(r

, Schema

.newBuilder()

.column("f0", "string")

.column("f1", "string")

.column("f2", "bigint")

.columnByExpression("proctime", "PROCTIME()")

.build());

tEnv.createTemporaryView("source_table", sourceTable);

String selectWhereSql = "select f0 from source_table where f1 = 'b'";

Table resultTable = tEnv.sqlQuery(selectWhereSql);

tEnv.toRetractStream(resultTable, Row.class).print();

env.execute();

}

上述案例中,Table API 将一个 DataStream 的结果集通过 StreamTableEnvironment::fromDataStream 转为一个 Table 对象来使用。

2.5.2 SQL API 创建外部数据表

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.useBlinkPlanner()

.inStreamingMode()

.build();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

// SQL API 执行 create table 创建表

tEnv.executeSql(

"CREATE TABLE KafkaSourceTable (\n"

+ " `f0` STRING,\n"

+ " `f1` STRING\n"

+ ") WITH (\n"

+ " 'connector' = 'kafka',\n"

+ " 'topic' = 'topic',\n"

+ " 'properties.bootstrap.servers' = 'localhost:9092',\n"

+ " 'properties.group.id' = 'testGroup',\n"

+ " 'format' = 'json'\n"

+ ")"

);

Table t = tEnv.sqlQuery("SELECT * FROM KafkaSourceTable");

具体的创建方式就是使用 Create Table xxx DDL 定义一个 Kafka 数据源(输入)表(也可以是 Kafka 数据汇(输出)表)。

xdm,是不是又和 Hive 一样?惊不惊喜意不意外。对比学习 +1。

2.6 SQL 视图 VIEW

上文已经说了,一个 VIEW 其实就是一段 SQL 逻辑的查询结果。

视图 VIEW 在 Table API 中的体现就是:一个 Table 的 Java 对象,其封装了一段查询逻辑。如下案例所示。

2.6.1 Table API 创建 VIEW

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.inStreamingMode() // 声明为流任务

.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// Table API 中的一个 Table 对象

Table projTable = tEnv.from("X").select(...);

// 将 projTable 创建为一个叫做 projectedTable 的 VIEW

tEnv.createTemporaryView("projectedTable", projTable);

Table API 是使用了 TableEnvironment::createTemporaryView 接口将一个 Table 对象创建为一个 VIEW。

2.6.2 SQL API 创建 VIEW

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.inStreamingMode() // 声明为流任务

.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

String sql = "CREATE TABLE source_table (\n"

+ " user_id BIGINT,\n"

+ " `name` STRING\n"

+ ") WITH (\n"

+ " 'connector' = 'user_defined',\n"

+ " 'format' = 'json',\n"

+ " 'class.name' = 'flink.examples.sql._03.source_sink.table.user_defined.UserDefinedSource'\n"

+ ");\n"

+ "\n"

+ "CREATE TABLE sink_table (\n"

+ " user_id BIGINT,\n"

+ " name STRING\n"

+ ") WITH (\n"

+ " 'connector' = 'print'\n"

+ ");\n"

+ "CREATE VIEW query_view as\n" // 创建 VIEW

+ "SELECT\n"

+ " *\n"

+ "FROM source_table\n"

+ ";\n"

+ "INSERT INTO sink_table\n"

+ "SELECT\n"

+ " *\n"

+ "FROM query_view;";

Arrays.stream(sql.split(";"))

.forEach(tEnv::executeSql);

SQL API 是直接通过一段 CREATE VIEW query_view as select * from source_table 来创建的 VIEW,是纯 SQL 写法。

这种创建方式是不是贼熟悉,和离线 Hive 一样。对比学习 +1。

 注意:在 Table API 中的一个 Table 对象被后续的多个查询使用的场景下,Table 对象不会真的产生一个中间表供下游多个查询去引用,即多个查询不共享这个 Table 的结果,小伙伴萌可以理解为是一种中间表的简化写法,不会先产出一个中间表结果,然后将这个结果在下游多个查询中复用,后续的多个查询会将这个 Table 的逻辑执行多次。类似于 with tmp as (DML) 的语法

2.7 一个 SQL 查询案例

来看看一个 SQL 查询案例。

案例场景:计算每一种商品(sku_id 唯一标识)的售出个数、总销售额、平均销售额、最低价、最高价。数据准备:数据源为商品的销售流水(sku_id:商品,price:销售价格),然后写入到 Kafka 的指定 topic 当中(sku_id:商品,count_result:售出个数、sum_result:总销售额、avg_result:平均销售额、min_result:最低价、max_result:最高价)。

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.inStreamingMode() // 声明为流任务

//.inBatchMode() // 声明为批任务

.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// 1. 创建一个数据源(输入)表,这里的数据源是 flink 自带的一个随机 mock 数据的数据源。

String sourceSql = "CREATE TABLE source_table (\n"

+ " sku_id STRING,\n"

+ " price BIGINT\n"

+ ") WITH (\n"

+ " 'connector' = 'datagen',\n"

+ " 'rows-per-second' = '1',\n"

+ " 'fields.sku_id.length' = '1',\n"

+ " 'fields.price.min' = '1',\n"

+ " 'fields.price.max' = '1000000'\n"

+ ")";

// 2. 创建一个数据汇(输出)表,输出到 kafka 中

String sinkSql = "CREATE TABLE sink_table (\n"

+ " sku_id STRING,\n"

+ " count_result BIGINT,\n"

+ " sum_result BIGINT,\n"

+ " avg_result DOUBLE,\n"

+ " min_result BIGINT,\n"

+ " max_result BIGINT,\n"

+ " PRIMARY KEY (`sku_id`) NOT ENFORCED\n"

+ ") WITH (\n"

+ " 'connector' = 'upsert-kafka',\n"

+ " 'topic' = 'tuzisir',\n"

+ " 'properties.bootstrap.servers' = 'localhost:9092',\n"

+ " 'key.format' = 'json',\n"

+ " 'value.format' = 'json'\n"

+ ")";

// 3. 执行一段 group by 的聚合 SQL 查询

String selectWhereSql = "insert into sink_table\n"

+ "select sku_id,\n"

+ " count(*) as count_result,\n"

+ " sum(price) as sum_result,\n"

+ " avg(price) as avg_result,\n"

+ " min(price) as min_result,\n"

+ " max(price) as max_result\n"

+ "from source_table\n"

+ "group by sku_id";

tEnv.executeSql(sourceSql);

tEnv.executeSql(sinkSql);

tEnv.executeSql(selectWhereSql);

2.8 SQL 与 DataStream API 的转换

大家会比较好奇,要写 SQL 就纯 SQL 呗,要写 DataStream 就纯 DataStream 呗,为啥还要把这两类接口做集成呢?

博主举一个案例:在 PDD 这种发补贴券的场景下,希望可以在发的补贴券总金额超过

10000

10000

10000 元时,及时报警出来,来帮助控制预算,防止发的太多。

对应的解决方案,我们可以想到使用 SQL 计算补贴券发放的结果,但是 SQL 的问题在于无法做到报警。所以我们可以将 SQL 的查询的结果(即 Table 对象)转为 DataStream,然后就可以在 DataStream 后自定义报警逻辑的算子。

我们直接上 SQL 和 DataStream API 互相转化的案例:

public static void main(String[] args) throws Exception {

FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

// 1. pdd 发补贴券流水数据

String createTableSql = "CREATE TABLE source_table (\n"

+ " id BIGINT,\n" -- 补贴券的流水 id

+ " money BIGINT,\n" -- 补贴券的金额

+ " row_time AS cast(CURRENT_TIMESTAMP as timestamp_LTZ(3)),\n"

+ " WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"

+ ") WITH (\n"

+ " 'connector' = 'datagen',\n"

+ " 'rows-per-second' = '1',\n"

+ " 'fields.id.min' = '1',\n"

+ " 'fields.id.max' = '100000',\n"

+ " 'fields.money.min' = '1',\n"

+ " 'fields.money.max' = '100000'\n"

+ ")\n";

// 2. 计算总计发放补贴券的金额

String querySql = "SELECT UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end, \n"

+ " window_start, \n"

+ " sum(money) as sum_money,\n" -- 补贴券的发放总金额

+ " count(distinct id) as count_distinct_id\n"

+ "FROM TABLE(CUMULATE(\n"

+ " TABLE source_table\n"

+ " , DESCRIPTOR(row_time)\n"

+ " , INTERVAL '5' SECOND\n"

+ " , INTERVAL '1' DAY))\n"

+ "GROUP BY window_start, \n"

+ " window_end";

flinkEnv.streamTEnv().executeSql(createTableSql);

Table resultTable = flinkEnv.streamTEnv().sqlQuery(querySql);

// 3. 将金额结果转为 DataStream,然后自定义超过 1w 的报警逻辑

flinkEnv.streamTEnv()

.toDataStream(resultTable, Row.class)

.flatMap(new FlatMapFunction() {

@Override

public void flatMap(Row value, Collector out) throws Exception {

long l = Long.parseLong(String.valueOf(value.getField("sum_money")));

if (l > 10000L) {

log.info("报警,超过 1w");

}

}

});

flinkEnv.env().execute();

}

目前在

1.13

1.13

1.13 版本中,Flink 对于 Table 和 DataStream 的转化是有一些限制的:上面的案例可以看到,Table 和 DataStream 之间的转换目前只有 StreamTableEnvironment::toDataStream、StreamTableEnvironment::fromDataStream 接口支持。

所以其实小伙伴萌可以理解为只有流任务才支持 Table 和 DataStream 之间的转换,批任务是不支持的(虽然可以使用流执行模式处理有界流 - 批数据,也就是模拟按照批执行,但效率较低,这种骚操作不建议大家搞)。

那什么时候才能支持批任务的 Table 和 DataStream 之间的转换呢?

1.14

1.14

1.14 版本支持。

1.14

1.14

1.14 版本中,流和批的都统一到了 StreamTableEnvironment 中,因此就可以做 Table 和 DataStream 的互相转换了。

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式