Table of Contents
Common Table API Operations
  Creating a table environment
  Creating a table from a DataStream: specifying a primary key, aliasing fields, extracting a time attribute (for time windows), watermarks
  Creating a temporary view
  Creating a table or temporary table
  Query operations: selecting columns, distinct deduplication, filtering, group-by aggregation
Common Table API Operations
Creating a Table Environment
// Build the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Build the table environment
val tableEnvironmentSettings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(env, tableEnvironmentSettings)
EnvironmentSettings.newInstance().inStreamingMode() selects streaming mode, which is the default processing mode for a Flink table environment; EnvironmentSettings.newInstance().inBatchMode() selects batch mode.
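For comparison, a batch-mode environment is created through the unified TableEnvironment rather than StreamTableEnvironment, since it does not attach to a DataStream execution environment. A minimal sketch (not from the original article):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Batch mode uses the unified TableEnvironment entry point
val batchSettings = EnvironmentSettings
  .newInstance()
  .inBatchMode()
  .build()
val batchTableEnv = TableEnvironment.create(batchSettings)
```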
Creating a Table from a DataStream
In older versions of Flink this was written as:
val inputTable = tableEnv.fromDataStream(
  datastream,
  $("id"),
  $("timestamp"),
  $("temperature"))
However, this style has been removed in newer versions; define the table schema as follows instead:
val inputTable = tableEnv.fromDataStream(
datastream,
Schema
.newBuilder()
.column("id",DataTypes.STRING())
.column("timestamp",DataTypes.BIGINT())
.column("temperature",DataTypes.DOUBLE())
.build()
)
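To verify that the schema was derived as expected, the table can be printed and converted back to a DataStream. A minimal sketch (assuming the inputTable, tableEnv, and env defined above):

```scala
// Print the column names and types of the table
inputTable.printSchema()
// Convert back to a DataStream of Rows and print the records
tableEnv.toDataStream(inputTable).print()
env.execute()
```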
Specifying a Primary Key
Single-column primary key
val inputTable = tableEnv.fromDataStream(
datastream,
Schema
.newBuilder()
.column("id",DataTypes.STRING().notNull())
.column("timestamp",DataTypes.BIGINT())
.column("temperature",DataTypes.DOUBLE())
.primaryKey("id")
.build()
)
Composite primary key
val inputTable = tableEnv.fromDataStream(
datastream,
Schema
.newBuilder()
.column("id",DataTypes.STRING().notNull())
.column("timestamp",DataTypes.BIGINT().notNull())
.column("temperature",DataTypes.DOUBLE())
.primaryKey("id","timestamp")
.build()
)
Aliasing Fields
// Rename all fields of the table in positional order
inputTable.as("id", "timestamp", "ctime")
// Rename a specific field
inputTable.renameColumns($"id" as "sensor_id")
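Alongside as and renameColumns, the Table API also offers addColumns and dropColumns. A minimal sketch (the temp_f column name is hypothetical, not from the original article):

```scala
// Add a computed column converting Celsius to Fahrenheit
val withFahrenheit = inputTable.addColumns(($"temperature" * 1.8 + 32) as "temp_f")
// Drop a column
val withoutTemperature = inputTable.dropColumns($"temperature")
```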
Extracting a Time Attribute (for time windows)
Event time (EventTime)
val inputTable = tableEnv.fromDataStream(
datastream,
Schema
.newBuilder()
.column("id",DataTypes.STRING().notNull())
.column("timestamp",DataTypes.BIGINT().notNull())
.column("temperature",DataTypes.DOUBLE())
.primaryKey("id","timestamp")
.columnByExpression("rowtime","CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3))")
.build()
)
The equivalent DDL:
CREATE TABLE sensor(
  `id` STRING NOT NULL,
  `timestamp` BIGINT NOT NULL,
  `temperature` DOUBLE,
  `rowtime` TIMESTAMP(3) AS CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3)),
  CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED
)
Processing time (ProcessingTime)
val inputTable = tableEnv.fromDataStream(
datastream,
Schema
.newBuilder()
.column("id",DataTypes.STRING().notNull())
.column("timestamp",DataTypes.BIGINT().notNull())
.column("temperature",DataTypes.DOUBLE())
.primaryKey("id","timestamp")
.columnByExpression("procTime","proctime()")
.build()
)
The equivalent DDL:
CREATE TABLE sensor(
  `id` STRING NOT NULL,
  `timestamp` BIGINT NOT NULL,
  `temperature` DOUBLE,
  `proctime` AS PROCTIME(),
  CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED
)
Watermark
Approach 1: define the timestamps and watermarks on the DataStream
datastream.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[SensorReading](Duration.ofSeconds(5))
.withTimestampAssigner(
new SerializableTimestampAssigner[SensorReading] {
override def extractTimestamp(element: SensorReading,
recordTimestamp: Long): Long = {
element.timestamp * 1000L
}
})
)
// Then designate the corresponding field as the rowtime attribute when creating the table
val inputTable2 = tableEnv.fromDataStream(datastream,
$("id"),
$("timestamp").rowtime(),
$("temperature"))
Approach 2: Table API
val inputTable = tableEnv.fromDataStream(
datastream,
Schema
.newBuilder()
.column("id",DataTypes.STRING().notNull())
.column("timestamp",DataTypes.BIGINT().notNull())
.column("temperature",DataTypes.DOUBLE())
.primaryKey("id","timestamp")
.columnByExpression("rowtime","CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3))")
.watermark("rowtime", "rowtime - INTERVAL '5' SECOND")
.build()
)
Approach 3: SQL
Syntax: WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,
where 5 is the maximum out-of-orderness (allowed delay) in seconds.
CREATE TABLE sensor(
`id` STRING NOT NULL,
`timestamp` BIGINT NOT NULL,
`temperature` DOUBLE,
`rowtime` TIMESTAMP(3) AS CAST(TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)) AS TIMESTAMP(3)),
WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND,
CONSTRAINT `PK_id_timestamp` PRIMARY KEY (`id`, `timestamp`) NOT ENFORCED
)
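With a rowtime attribute and watermark in place, the table can be used in an event-time window, which is what the section title above refers to. A minimal sketch (assuming the inputTable defined above): count readings per sensor in 10-second tumbling windows.

```scala
import org.apache.flink.table.api._

val windowed = inputTable
  // 10-second tumbling window on the event-time attribute
  .window(Tumble over 10.seconds on $"rowtime" as "w")
  .groupBy($"id", $"w")
  .select($"id", $"w".end as "window_end", $"id".count as "cnt")
```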
Creating a Temporary View
Approach 1: Table API
tableEnv.createTemporaryView("sensor_view",datastream,
Schema
.newBuilder()
.column("id",DataTypes.STRING())
.build()
)
val result = tableEnv.sqlQuery("SELECT * FROM sensor_view")
tableEnv.toDataStream(result).print()
Approach 2: SQL
CREATE TEMPORARY VIEW sensor_view
AS
SELECT id, temperature, rowtime FROM sensor
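From Scala, this statement is typically executed with executeSql, often written as a stripMargin multi-line string (whose leading pipes are why DDL snippets sometimes appear with | prefixes). A minimal sketch, assuming the sensor table is already registered in this environment:

```scala
val createViewSql =
  """|CREATE TEMPORARY VIEW sensor_view
     |AS
     |SELECT id, temperature, rowtime FROM sensor
     |""".stripMargin
tableEnv.executeSql(createViewSql)
```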
Creating a Table or Temporary Table
Approach 1: Table API
The detailed connector settings are covered in the article on Table API connectors.
tableEnv.createTemporaryTable("sensor_temp_table",
TableDescriptor
.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("id",DataTypes.STRING())
.column("temperature",DataTypes.DOUBLE())
.build())
.format("csv")
.option("path","D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\out")
.build()
)
Approach 2: SQL
CREATE TEMPORARY TABLE sensor_temp_table (
id STRING NOT NULL,
temperature DOUBLE
) WITH (
'connector'='filesystem',
'format'='csv',
'path'='file:///filepath/'
...
)
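Once the connector-backed table is registered, rows can be written into it directly from another table. A minimal sketch (assuming the inputTable and sensor_temp_table defined above):

```scala
// Select the columns matching the target schema and write them out
inputTable
  .select($"id", $"temperature")
  .executeInsert("sensor_temp_table")
```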
Query Operations
Selecting Columns
Approach 1: Table API
val result = inputTable.select($"id", $"timestamp", $"temperature", $"rowtime")
tableEnv.toDataStream(result).print()
Approach 2: SQL
SELECT id, `timestamp`, temperature, rowtime FROM inputtable
Deduplication with distinct
Approach 1: Table API
val result = inputTable.select($"id", $"timestamp", $"temperature", $"rowtime").distinct()
tableEnv.toDataStream(result).print()
Approach 2: SQL
SELECT DISTINCT id, `timestamp`, temperature, rowtime FROM inputtable
Filtering with filter
.filter calls can be chained; chained filters are combined with AND.
val result = inputTable.select($"id", $"timestamp", $"temperature", $"rowtime")
  .filter($"id" === "sensor_1")
  .filter($"temperature" >= 40.8)
  .distinct()
tableEnv.toDataStream(result).print()
A single .filter can also contain multiple conditions, joined with "or" or "and".
val result = inputTable.select($"id", $"timestamp", $"temperature", $"rowtime")
  .filter($"id" === "sensor_1" or $"id" === "sensor_2")
  .distinct()
tableEnv.toDataStream(result).print()
SQL:
SELECT DISTINCT id, `timestamp`, temperature, rowtime FROM inputtable
WHERE id = 'sensor_1' OR id = 'sensor_2'
Then execute the query with tableEnv.sqlQuery(sqlStatement).
Group-by Aggregation
Approach 1: Table API
val result = inputTable
  .groupBy($"id")
  .select($"id", $"id".count() as "cnt")
// The aggregated result is an updating table, so convert it with
// toChangelogStream rather than toDataStream
tableEnv.toChangelogStream(result).print()
Approach 2: SQL
SELECT id, COUNT(id) AS cnt FROM inputtable
GROUP BY id
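Executing this from Scala requires the table to be registered under a name usable from SQL (here assumed to be inputtable), and because the aggregation produces an updating table, the result is converted with toChangelogStream. A minimal sketch:

```scala
// Register the table under a name usable from SQL
tableEnv.createTemporaryView("inputtable", inputTable)

val aggResult = tableEnv.sqlQuery(
  "SELECT id, COUNT(id) AS cnt FROM inputtable GROUP BY id")

// Aggregations emit updates (+I/-U/+U rows), so use a changelog stream
tableEnv.toChangelogStream(aggResult).print()
env.execute()
```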