1. Environment

Flink 1.17.0

2. Class file

package com.flink.tablesql;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.io.File;
import java.util.List;

public class Main2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        String path = "E:\\test\\flinktestsql\\orders.sql";
        // String path = "/data/flink/flinksql/orders.sql";

        // Read the script line by line; a line containing "$" ends a statement.
        List<String> lines = FileUtils.readLines(new File(path), "UTF-8");
        StringBuilder stringBuilder = new StringBuilder();

        for (String line : lines) {
            if (StringUtils.isNotBlank(line)) {
                stringBuilder.append(line);
                if (line.contains("$")) {
                    // Strip the "$" terminator and submit the statement.
                    String sql = stringBuilder.toString().replace("$", "");
                    System.out.println(sql);
                    System.out.println("end-----");
                    tabEnv.executeSql(sql);
                    stringBuilder = new StringBuilder();
                } else {
                    // Statement continues on the next line.
                    stringBuilder.append("\n");
                }
            }
        }
    }
}
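The statement-splitting logic in Main2 can be tried without a Flink cluster. The following is a minimal sketch that pulls the same loop into a standalone method; the class name SqlScriptSplitter and the sample script lines are hypothetical and used only for illustration, and the blank-line check uses trim() as a rough stand-in for StringUtils.isNotBlank.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original project.
final class SqlScriptSplitter {

    // Groups script lines into statements; a line containing "$" closes the current statement.
    static List<String> split(List<String> lines) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : lines) {
            if (line == null || line.trim().isEmpty()) {
                continue; // skip blank lines, roughly as Main2 does
            }
            current.append(line);
            if (line.contains("$")) {
                // Remove the "$" terminator and start a new statement.
                statements.add(current.toString().replace("$", ""));
                current = new StringBuilder();
            } else {
                current.append("\n");
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        // Sample script content, made up for this sketch.
        List<String> lines = Arrays.asList(
                "CREATE TABLE src (id INT)",
                "WITH ('connector' = 'datagen')$",
                "",
                "SELECT id FROM src$");
        for (String statement : split(lines)) {
            System.out.println(statement);
            System.out.println("end-----");
        }
    }
}
```

One consequence of this approach is that "$" is removed everywhere in a statement, so a script whose SQL legitimately contains "$" would need a different terminator.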

3. pom file

Project (modelVersion 4.0.0): org.example:flinktestsql:1.0-SNAPSHOT

Repositories:
    aliyun    https://maven.aliyun.com/repository/public
    apache    https://maven.aliyun.com/repository/apache-snapshots
    cloudera  https://maven.aliyun.com/repository/gradle-plugin
    central   https://maven.aliyun.com/repository/central (followed by two flags, both true)

Properties (values in order): UTF-8, UTF-8, 8, 8, 1.17.0, 2.0.5, 2.11, 2.11.12. The dependencies below reference two of them as ${flink.version} and ${slf4j.version}.

Dependencies (groupId:artifactId:version):
    org.apache.flink:flink-java:${flink.version}
    org.apache.flink:flink-streaming-java:${flink.version}
    org.apache.flink:flink-clients:${flink.version}
    org.apache.flink:flink-connector-files:${flink.version}
    org.apache.flink:flink-table-api-java:1.17.0
    org.apache.flink:flink-table-api-java-bridge:${flink.version}
    org.apache.flink:flink-table-planner-loader:${flink.version}
    org.apache.flink:flink-table-runtime:${flink.version}
    com.microsoft.sqlserver:mssql-jdbc:11.2.1.jre8
    org.apache.flink:flink-json:1.11.0
    com.ververica:flink-connector-sqlserver-cdc:2.4.2
    org.apache.flink:flink-connector-jdbc:3.1.1-1.17
    org.apache.flink:flink-table-test-utils:1.17.1 (scope: test)
    org.slf4j:slf4j-api:${slf4j.version}
    org.slf4j:slf4j-log4j12:${slf4j.version}
    org.apache.logging.log4j:log4j-to-slf4j:2.19.0

4. From here on, only the .sql file needs to be modified or added; point path at it:

String path = "E:\\test\\flinktestsql\\orders.sql";
// String path = "/data/flink/flinksql/orders.sql";
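Because the path is hard-coded, switching to a different .sql file still means editing and rebuilding the class. One possible variation, sketched here as an assumption rather than as what the original project does, is to take the script path from the first program argument and fall back to a default. The class name ScriptPathSketch and the method resolve are hypothetical.

```java
// Hypothetical helper, not part of the original project.
final class ScriptPathSketch {

    // Default taken from the commented-out Linux path in Main2.
    static final String DEFAULT_PATH = "/data/flink/flinksql/orders.sql";

    // Returns the first program argument if it is non-blank, otherwise the default path.
    static String resolve(String[] args) {
        if (args != null && args.length > 0 && args[0] != null && !args[0].trim().isEmpty()) {
            return args[0];
        }
        return DEFAULT_PATH;
    }

    public static void main(String[] args) {
        System.out.println(resolve(args));
    }
}
```

In Main2 this would replace the hard-coded assignment with String path = ScriptPathSketch.resolve(args); the path can then be supplied as a program argument when the job is submitted.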

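5. The setup above failed with an error when run locally, but it works when configured and run through the Flink web UI. From what I found online, the local JDK configuration may need to be changed; I did not change it.

Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: The driver could not establish a secure connection to SQL Server by using Secure Sockets Layer (SSL) encryption. Error: "sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target". ClientConnectionId:d5fb789f-e4e9-416f-b47c-57dc09429ebf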

at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3806)

at com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1906)
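The PKIX error means the JVM does not trust the certificate that SQL Server presents. Besides importing that certificate into the JDK trust store, the Microsoft JDBC driver has the connection properties encrypt and trustServerCertificate, which can be set in the JDBC URL. The sketch below only shows how such a URL is assembled; the host, port, and database name are made up, the class name SqlServerUrlSketch is hypothetical, and whether this resolves the local failure described above has not been verified. How to pass these properties through the CDC connector is not covered here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project.
final class SqlServerUrlSketch {

    // Builds a URL of the form jdbc:sqlserver://host:port;key=value;key=value
    static String buildUrl(String host, int port, Map<String, String> properties) {
        StringBuilder url = new StringBuilder("jdbc:sqlserver://")
                .append(host).append(':').append(port);
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            url.append(';').append(entry.getKey()).append('=').append(entry.getValue());
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("databaseName", "testdb");          // made-up database name
        props.put("encrypt", "true");
        // Skips certificate validation; only reasonable for local testing.
        props.put("trustServerCertificate", "true");
        System.out.println(buildUrl("localhost", 1433, props));
    }
}
```

Setting trustServerCertificate=true keeps the connection encrypted but disables the check that failed, so it trades safety for convenience and is best limited to a development machine.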

6. Flink web UI configuration: uploading the jar alone is not enough; the entry class also has to be configured before the job will run.

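7. I ran into many problems along the way, but they are finally solved.

For example, adding a mirror entry to Maven's settings.xml:

id aliyunmaven, mirrorOf *, name 阿里云公共仓库 (Aliyun public repository), url https://maven.aliyun.com/repository/public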
