目录

举个例子

写入Sink的各种情况

1. 将结果数据收集到客户端

2. 将结果数据转换为Pandas DataFrame,并收集到客户端

3. 将结果写入到一张 Sink 表中

4. 将结果写入多张 Sink 表中

举个例子

将计算结果写入给 sink 表

#将Table API结果表数据写入sink表:

result_table.execute_insert("print").wait()

# 或者通过SQL查询语句来写入sink表:

table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()

写入Sink的各种情况

1. 将结果数据收集到客户端

你可以使用 TableResult.collect 将 Table 的结果收集到客户端,结果的类型为迭代器类型。

以下代码展示了如何使用 TableResult.collect() 方法:

#准备source表

source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])

#得到TableResult

res = table_env.execute_sql("select a + 1, b, c from %s" % source)

#遍历结果

with res.collect() as results:

   for result in results:

       print(result)

2. 将结果数据转换为Pandas DataFrame,并收集到客户端

3. 将结果写入到一张 Sink 表中

你可以调用 execute_insert 方法来将 Table 对象中的数据写入到一张 sink 表中:

table_env.execute_sql("""

    CREATE TABLE sink_table (

        id BIGINT,

        data VARCHAR

    ) WITH (

        'connector' = 'print'

    )

""")

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

table.execute_insert("sink_table").wait()

也可以通过 SQL 来完成

table_env.create_temporary_view("table_source", table)

table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()

4. 将结果写入多张 Sink 表中

你也可以使用 Statementset 在一个作业中将 Table 中的数据写入到多张 sink 表中:

create_statement_set() 创建一个可接受 DML 语句或表的 Statementset 实例。 它可用于执行包含多个 sink 的作业。

# 准备 source 表和 sink 表

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

table_env.create_temporary_view("simple_source", table)

table_env.execute_sql("""

    CREATE TABLE first_sink_table (

        id BIGINT,

        data VARCHAR

    ) WITH (

        'connector' = 'print'

    )

""")

table_env.execute_sql("""

    CREATE TABLE second_sink_table (

        id BIGINT,

        data VARCHAR

    ) WITH (

        'connector' = 'print'

    )

""")

# 创建 statement set

statement_set = table_env.create_statement_set()

# 将 "table" 的数据写入 "first_sink_table"

statement_set.add_insert("first_sink_table", table)

# 通过一条 sql 插入语句将数据从 "simple_source" 写入到 "second_sink_table"

statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")

# 执行 statement set

statement_set.execute().wait()

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: