一、Table API & SQL

注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现Flink中所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。

Table API和SQL的由来: Flink针对标准的流处理和批处理提供了两种关系型API,Table API和SQL。Table API允许用户以一种很直观的方式进行select 、filter和join操作。 Flink SQL基于 Apache Calcite实现标准SQL。针对批处理和流处理可以提供相同的处理语义和结果。 Flink Table API、SQL和Flink的DataStream API、DataSet API是紧密联系在一起的。

Table API和SQL是一种关系型 API,用户可以像操作 Mysql 数据库表一样的操作数据,而不需要写代码,更不需要手工的对代码进行调优。 另外,SQL 作为一个非程序员可操作的语言,学习成本很低,如果一个系统提供 SQL 支持,将很容易被用户接受。

如果你想要使用Table API 和SQL的话,需要添加下面的依赖。

org.apache.flink

flink-table-api-java-bridge_2.12

1.11.0

provided

org.apache.flink

flink-table-api-scala-bridge_2.12

1.11.0

provided

如果你想在 本地 IDE中运行程序,还需要添加下面的依赖。

org.apache.flink

flink-table-planner-blink_2.12

1.11.0

provided

如果你用到了老的执行引擎,还需要添加下面这个依赖。

org.apache.flink

flink-table-planner_2.12

1.11.1

provided

注意:由于部分 table 相关的代码是用 Scala 实现的,所以,这个依赖也是必须的。【这个依赖我们在前面开发DataStream程序的时候已经添加过了】

org.apache.flink

flink-streaming-scala_2.12

1.11.0

provided

Table API和SQL通过join API集成在一起,这个join API的核心概念是Table,Table可以作为查询的输入和输出。

针对Table API和SQL我们主要讲解以下内容 1:Table API和SQL的使用 2:DataStream、DataSet和Table之间的互相转换

二、Table API 和SQL的使用

想要使用Table API 和SQL,首先要创建一个TableEnvironment对象。

1、创建一个TableEnvironment对象

下面我们来创建一个TableEnvironment对象 scala代码如下:

package com.imooc.scala.tablesql

import org.apache.flink.api.scala.ExecutionEnvironment

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**

* 创建TableEnvironment对象

*

*/

object CreateTableEnvironmentScala {

def main(args: Array[String]): Unit = {

/**

* 注意:如果Table API和SQL不需要和DataStream或者DataSet互相转换

* 则针对stream和batch都可以使用TableEnvironment

*/

//指定底层使用Blink引擎,以及数据处理模式-stream

//从1.11版本开始,Blink引擎成为Table API和SQL的默认执行引擎,在生产环境下面,推荐使用Blink引擎

val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

//创建TableEnvironment对象

val sTableEnv = TableEnvironment.create(sSettings)

//指定底层使用Blink引擎,以及数据处理模式-batch

val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

//创建TableEnvironment对象

val bTableEnv = TableEnvironment.create(bSettings)

/**

* 注意:如果Table API和SQL需要和DataStream或者DataSet互相转换

* 针对stream需要使用StreamTableEnvironment

* 针对batch需要使用BatchTableEnvironment

*/

//创建StreamTableEnvironment

val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment

val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

//创建BatchTableEnvironment

//注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换

val bbEnv = ExecutionEnvironment.getExecutionEnvironment

val bbTableEnv = BatchTableEnvironment.create(bbEnv)

}

}

java代码如下:

package com.imooc.java.tablesql;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;

import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**

* 创建TableEnvironment对象

*

*/

public class CreateTableEnvironmentJava {

public static void main(String[] args) {

/**

* 注意:如果Table API和SQL不需要和DataStream或者DataSet互相转换

* 则针对stream和batch都可以使用TableEnvironment

*/

//创建TableEnvironment对象-stream

EnvironmentSettings sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

TableEnvironment sTableEnv = TableEnvironment.create(sSettings);

//创建TableEnvironment对象-batch

EnvironmentSettings bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

TableEnvironment bTableEnv = TableEnvironment.create(bSettings);

/**

* 注意:如果Table API和SQL需要和DataStream或者DataSet互相转换

* 针对stream需要使用StreamTableEnvironment

* 针对batch需要使用BatchTableEnvironment

*/

//创建StreamTableEnvironment

StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings);

//创建BatchTableEnvironment

//注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换

ExecutionEnvironment bbEnv = ExecutionEnvironment.getExecutionEnvironment();

BatchTableEnvironment bbTableEnv = BatchTableEnvironment.create(bbEnv);

}

}

2、下Table API和 SQL的使用

下面我们来演示一下Table API和 SQL的使用 目前创建Table的很多方法都过时了,都不推荐使用了,例如:registerTableSource、connect等方法 目前官方推荐使用executeSql的方式,executeSql里面支持DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE等语法

下面我们来演示一下

scala代码如下:

package com.imooc.scala.tablesql

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**

* TableAPI 和 SQL的使用

*

*/

object TableAPIAndSQLOpScala {

def main(args: Array[String]): Unit = {

//获取TableEnvironment

val sSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode().build

val sTableEnv = TableEnvironment.create(sSettings)

//创建输入表

/**

* connector.type:指定connector的类型

* connector.path:指定文件或者目录地址

* format.type:文件数据格式化类型,现在只支持csv格式

* 注意:SQL语句如果出现了换行,行的末尾可以添加空格或者\n都可以,最后一行不用添加

*/

sTableEnv.executeSql("" +

"create table myTable(\n" +

"id int,\n" +

"name string\n" +

") with (\n" +

"'connector.type' = 'filesystem',\n" +

"'connector.path' = 'D:\\data\\source',\n" +

"'format.type' = 'csv'\n" +

")")

//使用Table API实现数据查询和过滤等操作

/*import org.apache.flink.table.api._

val result = sTableEnv.from("myTable")

.select($"id",$"name")

.filter($"id" > 1)*/

//使用SQL实现数据查询和过滤等操作

val result = sTableEnv.sqlQuery("select id,name from myTable where id > 1")

//输出结果到控制台

result.execute.print()

//创建输出表

sTableEnv.executeSql("" +

"create table newTable(\n" +

"id int,\n" +

"name string\n" +

") with (\n" +

"'connector.type' = 'filesystem',\n" +

"'connector.path' = 'D:\\data\\res',\n" +

"'format.type' = 'csv'\n" +

")")

//输出结果到表newTable中

result.executeInsert("newTable")

}

}

注意:针对SQL建表语句的写法还有一种比较清晰的写法

sTableEnv.executeSql(

"""

|create table myTable(

|id int,

|name string

|) with (

|'connector.type' = 'filesystem',

|'connector.path' = 'D:\data\source',

|'format.type' = 'csv'

|)

|""".stripMargin)

java代码如下:

package com.imooc.java.tablesql;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**

* TableAPI 和 SQL的使用

*

*/

public class TableAPIAndSQLOpJava {

public static void main(String[] args) {

//获取TableEnvironment

EnvironmentSettings sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

TableEnvironment sTableEnv = TableEnvironment.create(sSettings);

//创建输入表

sTableEnv.executeSql("" +

"create table myTable(\n" +

"id int,\n" +

"name string\n" +

") with (\n" +

"'connector.type' = 'filesystem',\n" +

"'connector.path' = 'D:\\data\\source',\n" +

"'format.type' = 'csv'\n" +

")");

//使用Table API实现数据查询和过滤等操作

/*Table result = sTableEnv.from("myTable")

.select($("id"), $("name"))

.filter($("id").isGreater(1));*/

//使用SQL实现数据查询和过滤等操作

Table result = sTableEnv.sqlQuery("select id,name from myTable where id > 1");

//输出结果到控制台

result.execute().print();

//创建输出表

sTableEnv.executeSql("" +

"create table newTable(\n" +

"id int,\n" +

"name string\n" +

") with (\n" +

"'connector.type' = 'filesystem',\n" +

"'connector.path' = 'D:\\data\\res',\n" +

"'format.type' = 'csv'\n" +

")");

//输出结果到表newTable中

result.executeInsert("newTable");

}

}

三、DataStream、DataSet和Table之间的互相转换

Table API和SQL可以很容易的和DataStream和DataSet程序集成到一块。 通过TableEnvironment ,可以把DataStream或者DataSet注册为Table,这样就可以使用Table API和SQL查询了。 通过TableEnvironment 也可以把Table对象转换为DataStream或者DataSet,这样就可以使用DataStream或者DataSet中的相关API了。

1:使用DataStream创建表

主要包含下面这两种情况

使用DataStream创建view视图

使用DataStream创建table对象

scala代码如下:

package com.imooc.scala.tablesql

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.table.api.EnvironmentSettings

import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**

* 将DataStream转换成表

*

*/

object DataStreamToTableScala {

def main(args: Array[String]): Unit = {

//获取StreamTableEnvironment

val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment

val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

//获取DataStream

import org.apache.flink.api.scala._

val stream = ssEnv.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mack")))

//第一种:将DataStream转换为view视图

import org.apache.flink.table.api._

ssTableEnv.createTemporaryView("myTable",stream,'id,'name)

ssTableEnv.sqlQuery("select * from myTable where id > 1").execute().print()

//第二种:将DataStream转换为table对象

val table = ssTableEnv.fromDataStream(stream, $"id", $"name")

table.select($"id",$"name")

.filter($"id" > 1)

.execute()

.print()

//注意:'id,'name 和 $"id", $"name" 这两种写法是一样的效果

}

}

java代码如下:

package com.imooc.java.tablesql;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.ArrayList;

import static org.apache.flink.table.api.Expressions.$;

/**

* 将DataStream转换成表

*

*/

public class DataStreamToTableJava {

public static void main(String[] args) {

//获取StreamTableEnvironment

StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings);

//获取DataStream

ArrayList> data = new ArrayList<>();

data.add(new Tuple2(1,"jack"));

data.add(new Tuple2(2,"tom"));

data.add(new Tuple2(3,"mick"));

DataStreamSource> stream = ssEnv.fromCollection(data);

//第一种:将DataStream转换为view视图

ssTableEnv.createTemporaryView("myTable",stream,$("id"),$("name"));

ssTableEnv.sqlQuery("select * from myTable where id > 1").execute().print();

//第二种:将DataStream转换为table对象

Table table = ssTableEnv.fromDataStream(stream, $("id"), $("name"));

table.select($("id"), $("name"))

.filter($("id").isGreater(1))

.execute()

.print();

}

}

2:使用DataSet创建表

注意:此时只能使用旧的执行引擎,新的Blink执行引擎不支持和DataSet转换

scala代码如下:

package com.imooc.scala.tablesql

import org.apache.flink.api.scala.ExecutionEnvironment

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.table.api.EnvironmentSettings

import org.apache.flink.table.api.bridge.scala.{BatchTableEnvironment, StreamTableEnvironment}

/**

* 将DataSet转换成表

*

*/

object DataSetToTableScala {

def main(args: Array[String]): Unit = {

//获取BatchTableEnvironment

val bbEnv = ExecutionEnvironment.getExecutionEnvironment

val bbTableEnv = BatchTableEnvironment.create(bbEnv)

//获取DataSet

import org.apache.flink.api.scala._

val set = bbEnv.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mack")))

//第一种:将DataSet转换为view视图

import org.apache.flink.table.api._

bbTableEnv.createTemporaryView("myTable",set,'id,'name)

bbTableEnv.sqlQuery("select * from myTable where id > 1").execute().print()

//第二种:将DataSet转换为table对象

val table = bbTableEnv.fromDataSet(set, $"id", $"name")

table.select($"id",$"name")

.filter($"id" > 1)

.execute()

.print()

//注意:'id,'name 和 $"id", $"name" 这两种写法是一样的效果

}

}

java代码如下:

package com.imooc.java.tablesql;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.ArrayList;

import static org.apache.flink.table.api.Expressions.$;

/**

* 将DataSet转换成表

*

*/

public class DataSetToTableJava {

public static void main(String[] args) {

//获取BatchTableEnvironment

ExecutionEnvironment bbEnv = ExecutionEnvironment.getExecutionEnvironment();

BatchTableEnvironment bbTableEnv = BatchTableEnvironment.create(bbEnv);

//获取DataSet

ArrayList> data = new ArrayList<>();

data.add(new Tuple2(1,"jack"));

data.add(new Tuple2(2,"tom"));

data.add(new Tuple2(3,"mick"));

DataSource> set = bbEnv.fromCollection(data);

//第一种:将DataSet转换为view视图

bbTableEnv.createTemporaryView("myTable",set,$("id"),$("name"));

bbTableEnv.sqlQuery("select * from myTable where id > 1").execute().print();

//第二种:将DataSet转换为table对象

Table table = bbTableEnv.fromDataSet(set, $("id"), $("name"));

table.select($("id"), $("name"))

.filter($("id").isGreater(1))

.execute()

.print();

}

}

将 Table 转换为 DataStream 或者 DataSet 时,你需要指定生成的 DataStream 或者 DataSet 的数据类型,即,Table 的每行数据要转换成的数据类型。 通常最方便的选择是转换成 Row 。

以下列表概述了不同选项的功能:

Row: 通过角标映射字段,支持任意数量的字段,支持 null 值,无类型安全(type-safe)检查。

POJO: Java中的实体类,这个实体类中的字段名称需要和Table中的字段名称保持一致,支持任意数量的字段,支持null值,有类型安全检查。

Case Class: 通过角标映射字段,不支持null值,有类型安全检查。

Tuple: 通过角标映射字段,Scala中限制22个字段,Java中限制25个字段,不支持null值,有类型安全检查。

Atomic Type: Table 必须有一个字段,不支持 null 值,有类型安全检查。

3:将表转换成 DataStream

流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此动态查询的DataStream需要对表的更新进行编码。 有几种模式可以将Table转换为DataStream。

Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时(仅附加),之前添加的数据不会被更新。

Retract Mode:可以始终使用此模式,它使用一个Boolean标识来编码INSERT和DELETE更改。

Scala代码如下:

package com.imooc.scala.tablesql

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.table.api.EnvironmentSettings

import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import org.apache.flink.types.Row

/**

* 将table转换成 DataStream

*

*/

object TableToDataStreamScala {

def main(args: Array[String]): Unit = {

//获取StreamTableEnvironment

val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment

val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

//创建输入表

ssTableEnv.executeSql("" +

"create table myTable(\n" +

"id int,\n" +

"name string\n" +

") with (\n" +

"'connector.type' = 'filesystem',\n" +

"'connector.path' = 'D:\\data\\source',\n" +

"'format.type' = 'csv'\n" +

")")

//获取table

val table = ssTableEnv.from("myTable")

//将table转换为DataStream

//如果只有新增(追加)操作,可以使用toAppendStream

import org.apache.flink.api.scala._

val appStream = ssTableEnv.toAppendStream[Row](table)

appStream.map(row=>(row.getField(0).toString.toInt,row.getField(1).toString))

.print()

//如果有增加操作,还有删除操作,则使用toRetractStream

val retStream = ssTableEnv.toRetractStream[Row](table)

retStream.map(tup=>{

val flag = tup._1

val row = tup._2

val id = row.getField(0).toString.toInt

val name = row.getField(1).toString

(flag,id,name)

}).print()

//注意:将table对象转换为DataStream之后,就需要调用StreamExecutionEnvironment中的execute方法了

ssEnv.execute("TableToDataStreamScala")

}

}

java代码如下:

package com.imooc.java.tablesql;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

/**

* 将table转换成 DataStream

*

*/

public class TableToDataStreamJava {

public static void main(String[] args) throws Exception{

//获取StreamTableEnvironment

StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings);

//创建输入表

ssTableEnv.executeSql("" +

"create table myTable(\n" +

"id int,\n" +

"name string\n" +

") with (\n" +

"'connector.type' = 'filesystem',\n" +

"'connector.path' = 'D:\\data\\source',\n" +

"'format.type' = 'csv'\n" +

")");

//获取table

Table table = ssTableEnv.from("myTable");

//将table转换为DataStream

//如果只有新增(追加)操作,可以使用toAppendStream

DataStream appStream = ssTableEnv.toAppendStream(table, Row.class);

appStream.map(new MapFunction>() {

@Override

public Tuple2 map(Row row)

throws Exception {

int id = Integer.parseInt(row.getField(0).toString());

String name = row.getField(1).toString();

return new Tuple2(id, name);

}

}).print();

//如果有增加操作,还有删除操作,则使用toRetractStream

DataStream> retStream = ssTableEnv.toRetractStream(table, Row.class);

retStream.map(new MapFunction, Tuple3>() {

@Override

public Tuple3 map(Tuple2 tup)

throws Exception {

Boolean flag = tup.f0;

int id = Integer.parseInt(tup.f1.getField(0).toString());

String name = tup.f1.getField(1).toString();

return new Tuple3(flag, id, name);

}

}).print();

ssEnv.execute("TableToDataStreamJava");

}

}

4:将表转换成 DataSet

Scala代码如下:

package com.imooc.scala.tablesql

import org.apache.flink.api.scala.ExecutionEnvironment

import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment

import org.apache.flink.types.Row

/**

* 将table转换成 DataSet

*

*/

object TableToDataSetScala {

def main(args: Array[String]): Unit = {

//获取BatchTableEnvironment

val bbEnv = ExecutionEnvironment.getExecutionEnvironment

val bbTableEnv = BatchTableEnvironment.create(bbEnv)

//创建输入表

bbTableEnv.executeSql("" +

"create table myTable(\n" +

"id int,\n" +

"name string\n" +

") with (\n" +

"'connector.type' = 'filesystem',\n" +

"'connector.path' = 'D:\\data\\source',\n" +

"'format.type' = 'csv'\n" +

")")

//获取table

val table = bbTableEnv.from("myTable")

//将table转换为DataSet

import org.apache.flink.api.scala._

val set = bbTableEnv.toDataSet[Row](table)

set.map(row=>(row.getField(0).toString.toInt,row.getField(1).toString))

.print()

}

}

Java代码如下:

package com.imooc.java.tablesql;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.tuple.Tuple3;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

/**

* 将table转换成 DataSet

*

*/

public class TableToDataSetJava {

public static void main(String[] args) throws Exception{

//获取BatchTableEnvironment

ExecutionEnvironment bbEnv = ExecutionEnvironment.getExecutionEnvironment();

BatchTableEnvironment bbTableEnv = BatchTableEnvironment.create(bbEnv);

//创建输入表

bbTableEnv.executeSql("" +

"create table myTable(\n" +

"id int,\n" +

"name string\n" +

") with (\n" +

"'connector.type' = 'filesystem',\n" +

"'connector.path' = 'D:\\data\\source',\n" +

"'format.type' = 'csv'\n" +

")");

//获取table

Table table = bbTableEnv.from("myTable");

//将table转换为DataSet

DataSet set = bbTableEnv.toDataSet(table, Row.class);

set.map(new MapFunction>() {

@Override

public Tuple2 map(Row row)

throws Exception {

int id = Integer.parseInt(row.getField(0).toString());

String name = row.getField(1).toString();

return new Tuple2(id, name);

}

}).print();

}

}

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: