文章目录

Table Api的常用操作创建表环境从datastream创建一张表指定主键字段取别名提取时间字段(用于timewindow)Watermark

创建临时视图创建临时表或者表查询操作查询选取其中某些列distinct去重查询过滤filter分组聚合

Table Api的常用操作

创建表环境

//构建环境

val env = StreamExecutionEnvironment.getExecutionEnvironment

//构建table环境

val tableEnvironmentSettings = EnvironmentSettings

.newInstance()

.inStreamingMode()

.build()

val tableEnv = StreamTableEnvironment.create(env, tableEnvironmentSettings)

EnvironmentSettings.newInstance().inStreamingMode() 流模式 Flink表环境默认的处理模式就是流模式EnvironmentSettings.newInstance().inBatchMode() 批模式

从datastream创建一张表

老版本的Flink的实现如下:

val inputTable = tableEnv.fromDataStream("sensor_table",datastream,

$("id"),

$("timestamp")

$("temperature"))

但是在新的版本中这一方式已经被移除掉了,使用下面的方式定义表结构

val inputTable = tableEnv.fromDataStream(

datastream,

Schema

.newBuilder()

.column("id",DataTypes.STRING())

.column("timestamp",DataTypes.BIGINT())

.column("temperature",DataTypes.DOUBLE())

.build()

)

指定主键

单值主键

val inputTable = tableEnv.fromDataStream(

datastream,

Schema

.newBuilder()

.column("id",DataTypes.STRING().notNull())

.column("timestamp",DataTypes.BIGINT())

.column("temperature",DataTypes.DOUBLE())

.primaryKey("id")

.build()

)

复合主键

val inputTable = tableEnv.fromDataStream(

datastream,

Schema

.newBuilder()

.column("id",DataTypes.STRING().notNull())

.column("timestamp",DataTypes.BIGINT().notNull())

.column("temperature",DataTypes.DOUBLE())

.primaryKey("id","timestamp")

.build()

)

字段取别名

//整张表按字段顺序重命名

inputTable.as("id","timestamp","ctime")

//重命名指定字段

inputTable.renameColumns($"id","sensor_id")

提取时间字段(用于timewindow)

事件时间EventTime

val inputTable = tableEnv.fromDataStream(

datastream,

Schema

.newBuilder()

.column("id",DataTypes.STRING().notNull())

.column("timestamp",DataTypes.BIGINT().notNull())

.column("temperature",DataTypes.DOUBLE())

.primaryKey("id","timestamp")

.columnByExpression("rowtime","CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3))")

.build()

)

对应的DDL操作

CREATE TABLE sensor(

id STRING NOT NULL,

timestamp BIGINT NOT NULL,

temperature DOUBLE NOT NULL,

`proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS proctime(),

CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED

)

处理时间ProcessTime

val inputTable = tableEnv.fromDataStream(

datastream,

Schema

.newBuilder()

.column("id",DataTypes.STRING().notNull())

.column("timestamp",DataTypes.BIGINT().notNull())

.column("temperature",DataTypes.DOUBLE())

.primaryKey("id","timestamp")

.columnByExpression("procTime","proctime()")

.build()

)

对应的DDL操作

CREATE TABLE sensor(

id STRING NOT NULL,

timestamp BIGINT NOT NULL,

temperature DOUBLE NOT NULL,

`rowtime` TIMESTAMP(3) AS CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3)),

CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED

)

Watermark

方式一:在DataStream流上定义好时间戳和watermark

datastream.assignTimestampsAndWatermarks(

WatermarkStrategy

.forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))

.withTimestampAssigner(

new SerializableTimestampAssigner[SensorReading] {

override def extractTimestamp(element: SensorReading,

recordTimestamp: Long): Long = {

element.timestamp * 1000L

}

})

)

//然后在表中可以指定相应的时间戳字段

val inputTable2 = tableEnv.fromDataStream(datastream,

$("id"),

$("timestamp").rowtime(),

$("temperature"))

方式二:TableAPI实现

val inputTable = tableEnv.fromDataStream(

datastream,

Schema

.newBuilder()

.column("id",DataTypes.STRING().notNull())

.column("timestamp",DataTypes.BIGINT().notNull())

.column("temperature",DataTypes.DOUBLE())

.primaryKey("id","timestamp")

.columnByExpression("rowtime","CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3))")

.watermark("rowtime","rowtime - interval '5' SECOND ")

.build()

)

方式三:SQL实现

语法:WATERMARK FOR order_time AS order_time - INTERVAL ‘5’ SECOND

其中的5,表示的最大延迟时间

CREATE TABLE sensor(

`id` STRING NOT NULL,

`timestamp` BIGINT NOT NULL,

`temperature` DOUBLE,

`rowtime` TIMESTAMP(3) AS CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3)),

WATERMARK FOR `rowtime`: TIMESTAMP(3) AS rowtime - interval '5' SECOND ,

CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED

)

创建临时视图

方式一:tableAPI

tableEnv.createTemporaryView("sensor_view",datastream,

Schema

.newBuilder()

.column("id",DataTypes.STRING())

.build()

)

val result = tableEnv.sqlQuery("SELECT * FROM sensor_view")

tableEnv.toDataStream(result).print()

方式二:SQL语句实现

|CREATE TEMPORARY VIEW sensor_view

|AS

|SELECT id,temperature,rowtime FROM sensor

创建临时表或者表

方式一:TABLE API

具体设置见TableAPI的Connector操作一文

val targetTable = tableEnv.createTemporaryTable("sensor_temp_table",

TableDescriptor

.forConnector("filesystem")

.schema(Schema.newBuilder()

.column("id",DataTypes.STRING())

.column("temperature",DataTypes.DOUBLE())

.build())

.format("csv")

.option("path","D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\out")

.build()

)

方式二:SQL

CREATE TEMPORARY TABLE sensor_temp_table (

id STRING NOT NULL,

temperature DOUBLE

) WITH (

'connector'='filesystem',

'format'='csv',

'path'='file:///filepath/'

...

)

查询操作

查询选取其中某些列

方式一:Table Api操作

var result = inputTable.select($"id",$"timestamp",'temperature,'rowtime)

tableEnv.toDataStream(result).print()

方式二:SQL实现

SELECT id,timestamp,temperature,rowtime FROM inputtable

distinct去重

方式一:Table Api操作

var result = inputTable.select($"id",$"timestamp",'temperature,'rowtime).dictinct()

tableEnv.toDataStream(result).print()

方式二:SQL实现

SELECT distinct id,timestamp,temperature,rowtime FROM inputtable

查询过滤filter

.filter 可以级联操作,多个filter之间是and的关系

var result = inputTable.select($"id",$"timestamp",'temperature,'rowtime)

.filter($"id" === "sensor_1")

.filter($"temperature" >= 40.8 )

.distinct()

tableEnv.toDataStream(result).print()

.filter 里面也可以写多个条件,多个条件之间可以使用“or”或者“and”连接。

var result = inputTable.select($"id",$"timestamp",'temperature,'rowtime)

.filter($"id" === "sensor_1" or $"id" === "sensor_2")

.distinct()

tableEnv.toDataStream(result).print()

SQL实现

SELECT distinct id,timestamp,temperature,rowtime FROM inputtable

WHERE id = 'sensor_1' or id = 'sensor_2'

然后使用tableEnv.sqlQuery(sqlStatement)执行查询

分组聚合

方式一:Table Api操作

var result = inputTable

.groupBy($"id")

.select($"id",$"id".count() as "cnt")

tableEnv.toDataStream(result).print()

方式二:SQL实现

SELECT id,count(id) as cnt FROM inputtable

group by id

相关链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: