1.基本环境
2. 类文件
package com.flink.tablesql;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.io.File;
import java.util.List;
public class Main2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
String path = "E:\\test\\flinktestsql\\orders.sql";
// String path = "/data/flink/flinksql/orders.sql";
List
StringBuilder stringBuilder = new StringBuilder("");
String sql = "";
for(String var : list){
if(StringUtils.isNotBlank(var)){
stringBuilder.append(var);
if(var.contains("$")){
sql = stringBuilder.toString().replace("$","");
System.out.println(sql);
System.out.println("end-----");
tabEnv.executeSql(sql);
stringBuilder = new StringBuilder("");
}else{
stringBuilder.append("\n");
}
}
}
}
}
3.pom文件
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.后续只需要修改增加 .sql文件即可
String path = "E:\\test\\flinktestsql\\orders.sql";
// String path = "/data/flink/flinksql/orders.sql";
5.以上在本地运行未通过报错,在flink web 界面配置运行可以。我看网上可能是需要修改本地jdk配置,没有修改。
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target”。 ClientConnectionId:d5fb789f-e4e9-416f-b47c-57dc09429ebf
at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3806)
at com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1906)
6.flink web 端配置如下:只上传jar不行,还需要配置配置类,才可以
7.以上遇到很多问题,总算解决了,
如增加:
参考文章
发表评论