来源:B站尚硅谷

代码中使用FlinkSQL

需要引入的依赖

我们想要在代码中使用Table API,必须引入相关的依赖。

org.apache.flink

flink-table-api-java-bridge

${flink.version}

这里的依赖是一个Java的“桥接器”(bridge),主要就是负责Table API和下层DataStream API的连接支持,按照不同的语言分为Java版和Scala版。 如果我们希望在本地的集成开发环境(IDE)里运行Table API和SQL,还需要引入以下依赖:

org.apache.flink

flink-table-planner-loader

${flink.version}

org.apache.flink

flink-table-runtime

${flink.version}

org.apache.flink

flink-connector-files

${flink.version}

创建表环境

对于Flink这样的流处理框架来说,数据流和表在结构上还是有所区别的。所以使用Table API和SQL需要一个特别的运行时环境,这就是所谓的“表环境”(TableEnvironment)。它主要负责: (1)注册Catalog和表; (2)执行 SQL 查询; (3)注册用户自定义函数(UDF); (4)DataStream 和表之间的转换。 每个表和SQL的执行,都必须绑定在一个表环境(TableEnvironment)中。TableEnvironment是Table API中提供的基本接口类,可以通过调用静态的create()方法来创建一个表环境实例。方法需要传入一个环境的配置参数EnvironmentSettings,它可以指定当前表环境的执行模式和计划器(planner)。执行模式有批处理和流处理两种选择,默认是流处理模式;计划器默认使用blink planner。

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.inStreamingMode() // 使用流处理模式

.build();

TableEnvironment tableEnv = TableEnvironment.create(setting);

对于流处理场景,其实默认配置就完全够用了。所以我们也可以用另一种更加简单的方式来创建表环境:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

这里我们引入了一个“流式表环境”(StreamTableEnvironment),它是继承自TableEnvironment的子接口。调用它的create()方法,只需要直接将当前的流执行环境(StreamExecutionEnvironment)传入,就可以创建出对应的流式表环境了。

创建表

表(Table)是我们非常熟悉的一个概念,它是关系型数据库中数据存储的基本形式,也是SQL执行的基本对象。 具体创建表的方式,有通过连接器(connector)和虚拟表(virtual tables)两种。

1)连接器表(Connector Tables) 最直观的创建表的方式,就是通过连接器(connector)连接到一个外部系统,然后定义出对应的表结构。 在代码中,我们可以调用表环境的executeSql()方法,可以传入一个DDL作为参数执行SQL操作。这里我们传入一个CREATE语句进行表的创建,并通过WITH关键字指定连接到外部系统的连接器:

tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");

这里的TEMPORARY关键字可以省略。

2)虚拟表(Virtual Tables) 在环境中注册之后,我们就可以在SQL中直接使用这张表进行查询转换了。

Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");

这里调用了表环境的sqlQuery()方法,直接传入一条SQL语句作为参数执行查询,得到的结果是一个Table对象。Table是Table API中提供的核心接口类,就代表了一个Java中定义的表实例。 由于newTable是一个Table对象,并没有在表环境中注册;所以如果希望直接在SQL中使用,我们还需要将这个中间结果表注册到环境中:

tableEnv.createTemporaryView("NewTable", newTable);

我们发现,这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与SQL语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图”(createTemporaryView)。

表的查询

创建好了表,接下来自然就是对表进行查询转换了。对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。 Flink为我们提供了两种查询方式:SQL,和Table API。

1)执行SQL进行查询在代码中,我们只要调用表环境的sqlQuery()方法,传入一个字符串形式的SQL查询语句就可以了。执行得到的结果,是一个Table对象。

// 创建表环境

TableEnvironment tableEnv = ...;

// 创建表

tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");

// 查询用户Alice的点击事件,并提取表中前两个字段

Table aliceVisitTable = tableEnv.sqlQuery(

"SELECT user, url " +

"FROM EventTable " +

"WHERE user = 'Alice' "

);

目前Flink支持标准SQL中的绝大部分用法,并提供了丰富的计算函数。这样我们就可以把已有的技术迁移过来,像在MySQL、Hive中那样直接通过编写SQL实现自己的处理需求,从而大大降低了Flink上手的难度。 例如,我们也可以通过GROUP BY关键字定义分组聚合,调用COUNT()、SUM()这样的函数来进行统计计算:

Table urlCountTable = tableEnv.sqlQuery(

"SELECT user, COUNT(url) " +

"FROM EventTable " +

"GROUP BY user "

);

上面的例子得到的是一个新的Table对象,我们可以再次将它注册为虚拟表继续在SQL中调用。另外,我们也可以直接将查询的结果写入到已经注册的表中,这需要调用表环境的executeSql()方法来执行DDL,传入的是一个INSERT语句:

// 注册表

tableEnv.executeSql("CREATE TABLE EventTable ... WITH ( 'connector' = ... )");

tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");

// 将查询结果输出到OutputTable中

tableEnv.executeSql (

"INSERT INTO OutputTable " +

"SELECT user, url " +

"FROM EventTable " +

"WHERE user = 'Alice' "

);

2)调用Table API进行查询 由于Table API是基于Table的Java实例进行调用的,因此我们首先要得到表的Java对象。基于环境中已注册的表,可以通过表环境的from()方法非常容易地得到一个Table对象:

Table eventTable = tableEnv.from("EventTable");

传入的参数就是注册好的表名。注意这里eventTable是一个Table对象,而EventTable是在环境中注册的表名。得到Table对象之后,就可以调用API进行各种转换操作了,得到的是一个新的Table对象:

Table maryClickTable = eventTable

.where($("user").isEqual("Alice"))

.select($("url"), $("user"));

“$”符号用来指定表中的一个字段。Table API是嵌入编程语言中的DSL,SQL中的很多特性和功能必须要有对应的实现才可以使用,因此跟直接写SQL比起来肯定就要麻烦一些。

3)两种API的结合使用 (1)无论是那种方式得到的Table对象,都可以继续调用Table API进行查询转换; (2)如果想要对一个表执行SQL操作(用FROM关键字引用),必须先在环境中对它进行注册。所以我们可以通过创建虚拟表的方式实现两者的转换:

tableEnv.createTemporaryView("MyTable", myTable);

两种API殊途同归,实际应用中可以按照自己的习惯任意选择。不过由于结合使用容易引起混淆,而Table API功能相对较少、通用性较差,所以企业项目中往往会直接选择SQL的方式来实现需求。

输出表

表的创建和查询,就对应着流处理中的读取数据源(Source)和转换(Transform);而最后一个步骤Sink,也就是将结果数据输出到外部系统,就对应着表的输出操作。 在代码上,输出一张表最直接的方法,就是调用Table的方法executeInsert()方法将一个 Table写入到注册过的表中,方法传入的参数就是注册的表名。

// 注册表,用于输出数据到外部系统

tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");

// 经过查询转换,得到结果表

Table result = ...

// 将结果表写入已注册的输出表中

result.executeInsert("OutputTable");

在底层,表的输出是通过将数据写入到TableSink来实现的。TableSink是Table API中提供的一个向外部系统写入数据的通用接口,可以支持不同的文件格式(比如CSV、Parquet)、存储数据库(比如JDBC、Elasticsearch)和消息队列(比如Kafka)。

public class SqlDemo {

public static void main(String[] args) {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// TODO 1.创建表环境

// 1.1 写法一:

// EnvironmentSettings settings = EnvironmentSettings.newInstance()

// .inStreamingMode()

// .build();

// StreamTableEnvironment tableEnv = TableEnvironment.create(settings);

// 1.2 写法二

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// TODO 2.创建表

tableEnv.executeSql("CREATE TABLE source ( \n" +

" id INT, \n" +

" ts BIGINT, \n" +

" vc INT\n" +

") WITH ( \n" +

" 'connector' = 'datagen', \n" +

" 'rows-per-second'='1', \n" +

" 'fields.id.kind'='random', \n" +

" 'fields.id.min'='1', \n" +

" 'fields.id.max'='10', \n" +

" 'fields.ts.kind'='sequence', \n" +

" 'fields.ts.start'='1', \n" +

" 'fields.ts.end'='1000000', \n" +

" 'fields.vc.kind'='random', \n" +

" 'fields.vc.min'='1', \n" +

" 'fields.vc.max'='100'\n" +

");\n");

tableEnv.executeSql("CREATE TABLE sink (\n" +

" id INT, \n" +

" sumVC INT \n" +

") WITH (\n" +

"'connector' = 'print'\n" +

");\n");

// TODO 3.执行查询

// 3.1 使用sql进行查询

// Table table = tableEnv.sqlQuery("select id,sum(vc) as sumVC from source where id>5 group by id ;");

// 把table对象,注册成表名

// tableEnv.createTemporaryView("tmp", table);

// tableEnv.sqlQuery("select * from tmp where id > 7");

// 3.2 用table api来查询

Table source = tableEnv.from("source");

Table result = source

.where($("id").isGreater(5))

.groupBy($("id"))

.aggregate($("vc").sum().as("sumVC"))

.select($("id"), $("sumVC"));

// TODO 4.输出表

// 4.1 sql用法

// tableEnv.executeSql("insert into sink select * from tmp");

// 4.2 tableapi用法

result.executeInsert("sink");

}

}

表和流的转换

1)将流(DataStream)转换成表(Table) (1)调用fromDataStream()方法 想要将一个DataStream转换成表很简单,可以通过调用表环境的fromDataStream()方法来实现,返回的就是一个Table对象。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 获取表环境

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 读取数据源

SingleOutputStreamOperator sensorDS = env.fromSource(...)

// 将数据流转换成表

Table sensorTable = tableEnv.fromDataStream(sensorDS);

由于流中的数据本身就是定义好的POJO类型WaterSensor,所以我们将流转换成表之后,每一行数据就对应着一个WaterSensor,而表中的列名就对应着WaterSensor中的属性。 另外,我们还可以在fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置:

// 提取Event中的timestamp和url作为表中的列

Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id"), $("vc"));

也可以通过表达式的as()方法对字段进行重命名:

// 将timestamp字段重命名为ts

Table sensorTable = tableEnv.fromDataStream(sensorDS, $("id").as("sid"), $("vc"));

(2)调用createTemporaryView()方法 调用fromDataStream()方法简单直观,可以直接实现DataStream到Table的转换;不过如果我们希望直接在SQL中引用这张表,就还需要调用表环境的createTemporaryView()方法来创建虚拟视图了。 对于这种场景,也有一种更简洁的调用方式。我们可以直接调用createTemporaryView()方法创建虚拟表,传入的两个参数,第一个依然是注册的表名,而第二个可以直接就是DataStream。之后仍旧可以传入多个参数,用来指定表中的字段

tableEnv.createTemporaryView("sensorTable",sensorDS, $("id"),$("ts"),$("vc"));

这样,我们接下来就可以直接在SQL中引用表sensorTable了。

2)将表(Table)转换成流(DataStream) (1)调用toDataStream()方法 将一个Table对象转换成DataStream非常简单,只要直接调用表环境的方法toDataStream()就可以了。例如:

tableEnv.toDataStream(table).print();

(2)调用toChangelogStream()方法 urlCountTable这个表中进行了分组聚合统计,所以表中的每一行是会“更新”的。对于这样有更新操作的表,我们不应该直接把它转换成DataStream打印输出,而是记录一下它的“更新日志”(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。 代码中需要调用的是表环境的toChangelogStream()方法:

Table table = tableEnv.sqlQuery(

"SELECT id, sum(vc) " +

"FROM source " +

"GROUP BY id "

);

// 将表转换成更新日志流

tableEnv.toChangelogStream(table).print();

3)支持的数据类型 整体来看,DataStream中支持的数据类型,Table中也是都支持的,只不过在进行转换时需要注意一些细节。 1)原子类型 在Flink中,基础数据类型(Integer、Double、String)和通用数据类型(也就是不可再拆分的数据类型)统一称作“原子类型”。原子类型的DataStream,转换之后就成了只有一列的Table,列字段(field)的数据类型可以由原子类型推断出。另外,还可以在fromDataStream()方法里增加参数,用来重新命名列字段。

StreamTableEnvironment tableEnv = ...;

DataStream stream = ...;

// 将数据流转换成动态表,动态表只有一个字段,重命名为myLong

Table table = tableEnv.fromDataStream(stream, $("myLong"));

(2)Tuple类型 当原子类型不做重命名时,默认的字段名就是“f0”,容易想到,这其实就是将原子类型看作了一元组Tuple1的处理结果。 Table支持Flink中定义的元组类型Tuple,对应在表中字段名默认就是元组中元素的属性名f0、f1、f2…。所有字段都可以被重新排序,也可以提取其中的一部分字段。字段还可以通过调用表达式的as()方法来进行重命名。

StreamTableEnvironment tableEnv = ...;

DataStream> stream = ...;

// 将数据流转换成只包含f1字段的表

Table table = tableEnv.fromDataStream(stream, $("f1"));

// 将数据流转换成包含f0和f1字段的表,在表中f0和f1位置交换

Table table = tableEnv.fromDataStream(stream, $("f1"), $("f0"));

// 将f1字段命名为myInt,f0命名为myLong

Table table = tableEnv.fromDataStream(stream, $("f1").as("myInt"), $("f0").as("myLong"));

(3)POJO 类型 Flink也支持多种数据类型组合成的“复合类型”,最典型的就是简单Java对象(POJO 类型)。由于POJO中已经定义好了可读性强的字段名,这种类型的数据流转换成Table就显得无比顺畅了。 将POJO类型的DataStream转换成Table,如果不指定字段名称,就会直接使用原始 POJO 类型中的字段名称。POJO中的字段同样可以被重新排序、提却和重命名。

StreamTableEnvironment tableEnv = ...;

DataStream stream = ...;

Table table = tableEnv.fromDataStream(stream);

Table table = tableEnv.fromDataStream(stream, $("user"));

Table table = tableEnv.fromDataStream(stream, $("user").as("myUser"), $("url").as("myUrl"));

(4)Row类型 Flink中还定义了一个在关系型表中更加通用的数据类型——行(Row),它是Table中数据的基本组织形式。 Row类型也是一种复合类型,它的长度固定,而且无法直接推断出每个字段的类型,所以在使用时必须指明具体的类型信息;我们在创建Table时调用的CREATE语句就会将所有的字段名称和类型指定,这在Flink中被称为表的“模式结构”(Schema)

4)综合应用示例 现在,我们可以将介绍过的所有API整合起来,写出一段完整的代码。同样还是用户的一组点击事件,我们可以查询出某个用户(例如Alice)点击的url列表,也可以统计出每个用户累计的点击次数,这可以用两句SQL来分别实现。具体代码如下:

public class TableStreamDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource sensorDS = env.fromElements(

new WaterSensor("s1", 1L, 1),

new WaterSensor("s1", 2L, 2),

new WaterSensor("s2", 2L, 2),

new WaterSensor("s3", 3L, 3),

new WaterSensor("s3", 4L, 4)

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// TODO 1. 流转表

Table sensorTable = tableEnv.fromDataStream(sensorDS);

tableEnv.createTemporaryView("sensor", sensorTable);

Table filterTable = tableEnv.sqlQuery("select id,ts,vc from sensor where ts>2");

Table sumTable = tableEnv.sqlQuery("select id,sum(vc) from sensor group by id");

// TODO 2. 表转流

// 2.1 追加流

tableEnv.toDataStream(filterTable, WaterSensor.class).print("filter");

// 2.2 changelog流(结果需要更新)

tableEnv.toChangelogStream(sumTable ).print("sum");

// 只要代码中调用了 DataStreamAPI,就需要 execute,否则不需要

env.execute();

}

}

自定义函数(UDF)

Flink的Table API和SQL提供了多种自定义函数的接口,以抽象类的形式定义。当前UDF主要有以下几类:

标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。

1)整体调用流程 要想在代码中使用自定义的函数,我们需要首先自定义对应UDF抽象类的实现,并在表环境中注册这个函数,然后就可以在Table API和SQL中调用了。

(1)注册函数 注册函数时需要调用表环境的createTemporarySystemFunction()方法,传入注册的函数名以及UDF类的Class对象:

// 注册函数

tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);

我们自定义的UDF类叫作MyFunction,它应该是上面四种UDF抽象类中某一个的具体实现;在环境中将它注册为名叫MyFunction的函数。

(2)使用Table API调用函数 在Table API中,需要使用call()方法来调用自定义函数:

tableEnv.from("MyTable").select(call("MyFunction", $("myField")));

这里call()方法有两个参数,一个是注册好的函数名MyFunction,另一个则是函数调用时本身的参数。这里我们定义MyFunction在调用时,需要传入的参数是myField字段。

(3)在SQL中调用函数 当我们将函数注册为系统函数之后,在SQL中的调用就与内置系统函数完全一样了:

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");

可见,SQL的调用方式更加方便,我们后续依然会以SQL为例介绍UDF的用法。 2)标量函数(Scalar Functions) 自定义标量函数可以把0个、 1个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换。 想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类ScalarFunction,并实现叫作eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是eval。求值方法eval可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。 这里需要特别说明的是,ScalarFunction抽象类中并没有定义eval()方法,所以我们不能直接在代码中重写(override);但Table API的框架底层又要求了求值方法必须名字为eval()。这是Table API和SQL目前还显得不够完善的地方,未来的版本应该会有所改进。 下面我们来看一个具体的例子。我们实现一个自定义的哈希(hash)函数HashFunction,用来求传入对象的哈希值。

public class MyScalarFunctionDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource sensorDS = env.fromElements(

new WaterSensor("s1", 1L, 1),

new WaterSensor("s1", 2L, 2),

new WaterSensor("s2", 2L, 2),

new WaterSensor("s3", 3L, 3),

new WaterSensor("s3", 4L, 4)

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table sensorTable = tableEnv.fromDataStream(sensorDS);

tableEnv.createTemporaryView("sensor", sensorTable);

// TODO 2.注册函数

tableEnv.createTemporaryFunction("HashFunction", HashFunction.class);

// TODO 3.调用 自定义函数

// 3.1 sql用法

// tableEnv.sqlQuery("select HashFunction(id) from sensor")

// .execute() // 调用了 sql的execute,就不需要 env.execute()

// .print();

// 3.2 table api用法

sensorTable

.select(call("HashFunction",$("id")))

.execute()

.print();

}

// TODO 1.定义 自定义函数的实现类

public static class HashFunction extends ScalarFunction{

// 接受任意类型的输入,返回 INT型输出

public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {

return o.hashCode();

}

}

}

这里我们自定义了一个ScalarFunction,实现了eval()求值方法,将任意类型的对象传入,得到一个Int类型的哈希值返回。当然,具体的求哈希操作就省略了,直接调用对象的hashCode()方法即可。 另外注意,由于Table API在对函数进行解析时需要提取求值方法参数的类型引用,所以我们用DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示eval的参数可以是任意类型。 3)表函数(Table Functions) 跟标量函数一样,表函数的输入参数也可以是 0个、1个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口TVF,本质上就是表函数。 类似地,要实现自定义的表函数,需要自定义类来继承抽象类TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction类本身是有一个泛型参数T的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return语句,是通过调用collect()方法来发送想要输出的行数据的。 在SQL中调用表函数,需要使用LATERAL TABLE()来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的Join操作可以是直接做交叉联结(cross join),在FROM后用逗号分隔两个表就可以;也可以是以ON TRUE为条件的左联结(LEFT JOIN)。 下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数SplitFunction,可以将一个字符串转换成(字符串,长度)的二元组。

public class MyTableFunctionDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource strDS = env.fromElements(

"hello flink",

"hello world hi",

"hello java"

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table sensorTable = tableEnv.fromDataStream(strDS, $("words"));

tableEnv.createTemporaryView("str", sensorTable);

// TODO 2.注册函数

tableEnv.createTemporaryFunction("SplitFunction", SplitFunction.class);

// TODO 3.调用 自定义函数

// 3.1 交叉联结

tableEnv

// 3.1 交叉联结

// .sqlQuery("select words,word,length from str,lateral table(SplitFunction(words))")

// 3.2 带 on true 条件的 左联结

// .sqlQuery("select words,word,length from str left join lateral table(SplitFunction(words)) on true")

// 重命名侧向表中的字段

.sqlQuery("select words,newWord,newLength from str left join lateral table(SplitFunction(words)) as T(newWord,newLength) on true")

.execute()

.print();

}

// TODO 1.继承 TableFunction<返回的类型>

// 类型标注: Row包含两个字段:word和length

@FunctionHint(output = @DataTypeHint("ROW"))

public static class SplitFunction extends TableFunction {

// 返回是 void,用 collect方法输出

public void eval(String str) {

for (String word : str.split(" ")) {

collect(Row.of(word, word.length()));

}

}

}

}

这里我们直接将表函数的输出类型定义成了ROW,这就是得到的侧向表中的数据类型;每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在SQL中进行了调用,还可以对侧向表的中字段进行重命名。 4)聚合函数(Aggregate Functions) 用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。 聚合函数的概念我们之前已经接触过多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定义聚合函数来实现功能了。 自定义聚合函数需要继承抽象类AggregateFunction。AggregateFunction有两个泛型参数,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型。 Flink SQL中的聚合函数的工作原理如下: (1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API中的AggregateFunction非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。 (2)对于输入的每一行数据,都会调用accumulate()方法来更新累加器,这是聚合的核心过程。 (3)当所有的数据都处理完之后,通过调用getValue()方法来计算并返回最终的结果。 所以,每个 AggregateFunction 都必须实现以下几个方法:

createAccumulator() 这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。accumulate() 这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法eval()类似,也是底层架构要求的,必须为public,方法名必须为accumulate,且无法直接override、只能手动实现。getValue() 这是得到最终返回结果的方法。输入参数是ACC类型的累加器,输出类型为T。 在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。 AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以override;而其他则都是底层架构约定的方法。 下面举一个具体的示例,我们从学生的分数表ScoreTable中计算每个学生的加权平均分。

public class MyAggregateFunctionDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 姓名,分数,权重

DataStreamSource> scoreWeightDS = env.fromElements(

Tuple3.of("zs",80, 3),

Tuple3.of("zs",90, 4),

Tuple3.of("zs",95, 4),

Tuple3.of("ls",75, 4),

Tuple3.of("ls",65, 4),

Tuple3.of("ls",85, 4)

);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table scoreWeightTable = tableEnv.fromDataStream(scoreWeightDS, $("f0").as("name"),$("f1").as("score"), $("f2").as("weight"));

tableEnv.createTemporaryView("scores", scoreWeightTable);

// TODO 2.注册函数

tableEnv.createTemporaryFunction("WeightedAvg", WeightedAvg.class);

// TODO 3.调用 自定义函数

tableEnv

.sqlQuery("select name,WeightedAvg(score,weight) from scores group by name")

.execute()

.print();

}

// TODO 1.继承 AggregateFunction< 返回类型,累加器类型<加权总和,权重总和> >

public static class WeightedAvg extends AggregateFunction> {

@Override

public Double getValue(Tuple2 integerIntegerTuple2) {

return integerIntegerTuple2.f0 * 1D / integerIntegerTuple2.f1;

}

@Override

public Tuple2 createAccumulator() {

return Tuple2.of(0, 0);

}

/**

* 累加计算的方法,每来一行数据都会调用一次

* @param acc 累加器类型

* @param score 第一个参数:分数

* @param weight 第二个参数:权重

*/

public void accumulate(Tuple2 acc,Integer score,Integer weight){

acc.f0 += score * weight; // 加权总和 = 分数1 * 权重1 + 分数2 * 权重2 +....

acc.f1 += weight; // 权重和 = 权重1 + 权重2 +....

}

}

}

聚合函数的accumulate()方法有三个输入参数。第一个是WeightedAvgAccum类型的累加器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight。这里我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。 5)表聚合函数(Table Aggregate Functions) 用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。 自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似,同样有两个泛型参数,用一个ACC类型的累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction中也必须对应实现:

createAccumulator() 创建累加器的方法,与AggregateFunction中用法相同。accumulate() 聚合计算的核心方法,与AggregateFunction中用法相同。emitValue() 所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着AggregateFunction中的getValue()方法;区别在于emitValue没有输出类型,而输入参数有两个:第一个是ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为Collect。另外,emitValue()在抽象类中也没有定义,无法override,必须手动实现。 表聚合函数相对比较复杂,它的一个典型应用场景就是TOP-N查询。比如我们希望选出一组数据排序后的前两名,这就是最简单的TOP-2查询。没有现成的系统函数,那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在accumulate()方法中进行比较更新,最终在emitValue()中调用两次out.collect()将前两名数据输出。 具体代码如下:

public class MyTableAggregateFunctionDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 姓名,分数,权重

DataStreamSource numDS = env.fromElements(3, 6, 12, 5, 8, 9, 4);

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table numTable = tableEnv.fromDataStream(numDS, $("num"));

// TODO 2.注册函数

tableEnv.createTemporaryFunction("Top2", Top2.class);

// TODO 3.调用 自定义函数: 只能用 Table API

numTable

.flatAggregate(call("Top2", $("num")).as("value", "rank"))

.select( $("value"), $("rank"))

.execute().print();

}

// TODO 1.继承 TableAggregateFunction< 返回类型,累加器类型<加权总和,权重总和> >

// 返回类型 (数值,排名) =》 (12,1) (9,2)

// 累加器类型 (第一大的数,第二大的数) ===》 (12,9)

public static class Top2 extends TableAggregateFunction, Tuple2> {

@Override

public Tuple2 createAccumulator() {

return Tuple2.of(0, 0);

}

/**

* 每来一个数据调用一次,比较大小,更新 最大的前两个数到 acc中

*

* @param acc 累加器

* @param num 过来的数据

*/

public void accumulate(Tuple2 acc, Integer num) {

if (num > acc.f0) {

// 新来的变第一,原来的第一变第二

acc.f1 = acc.f0;

acc.f0 = num;

} else if (num > acc.f1) {

// 新来的变第二,原来的第二不要了

acc.f1 = num;

}

}

/**

* 输出结果: (数值,排名)两条最大的

*

* @param acc 累加器

* @param out 采集器<返回类型>

*/

public void emitValue(Tuple2 acc, Collector> out) {

if (acc.f0 != 0) {

out.collect(Tuple2.of(acc.f0, 1));

}

if (acc.f1 != 0) {

out.collect(Tuple2.of(acc.f1, 2));

}

}

}

}

目前SQL中没有直接使用表聚合函数的方式,所以需要使用Table API的方式来调用。 这里使用了flatAggregate()方法,它就是专门用来调用表聚合函数的接口。统计num值最大的两个;并将聚合结果的两个字段重命名为value和rank,之后就可以使用select()将它们提取出来了。

相关链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: