一、背景

公司需要将Doris数据库中的部分表数据同步至SFTP服务器,以供其他合作企业安全读取和使用。目前,平台数据同步功能统一使用Flink引擎进行实时同步、离线同步的工作。因此,希望能够充分利用现有的Flink引擎,并将其复用于这一需求。下图是我们的解决方案的结构示意图:

二、技术调研

由于我们选择使用Flink引擎来实现需求,我们需要进行调研以确定Flink是否支持SFTP和Doris的连接器。经过查阅当前的Flink版本(v1.20-SNAPSHOT)的文档,我们发现Flink Table API仅提供FileSystem SQL Connector用于操作文件。然而,在文档中并未提及该连接器是否支持SFTP协议,同时也没有提供指定SFTP的主机名、端口和秘钥文件的参数。因此,我们需要进一步验证是否可以使用该连接器来访问SFTP,并配置相关参数。以下是官方示例:

CREATE TABLE MyUserTable (

column_name1 INT,

column_name2 STRING,

...

part_name1 INT,

part_name2 STRING

) PARTITIONED BY (part_name1, part_name2) WITH (

'connector' = 'filesystem', -- required: specify the connector

'path' = 'file:///path/to/whatever', -- required: path to a directory

'format' = '...', -- required: file system connector requires to specify a format,

-- Please refer to Table Formats

-- section for more details

'partition.default-name' = '...', -- optional: default partition name in case the dynamic partition

-- column value is null/empty string

'source.path.regex-pattern' = '...', -- optional: regex pattern to filter files to read under the

-- directory of `path` option. This regex pattern should be

-- matched with the absolute file path. If this option is set,

-- the connector will recursive all files under the directory

-- of `path` option

-- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly

-- reduce the number of file for filesystem sink but may lead data skew, the default value is false.

'sink.shuffle-by-partition.enable' = '...',

...

)

因此,我们需要进一步确认该连接器是否适用于SFTP。首先想到的是Flink自身的CheckPoint功能,它支持HDFS、S3等文件存储系统。底层实现是通过org.apache.flink.core.fs.FileSystem类来进行操作。而这个类同样是FileSystem SQL Connector的底层实现类。既然如此,Flink#FileSystem应该支持访问HDFS、S3等其他文件系统,那其内部必然会使用hadoop#FileSystem的api,而hadoop#FileSystem自身如果支持SFTP,则此路可以走通,为了确认这一点,开始查看源码并查看类注释中的相关信息,发现一段有用信息: /**

* Flink implements and supports some file system types directly (for example the default machine-local file system). Other file * system types are accessed by an implementation that bridges to the suite of file systems supported by Hadoop (such as for

* example HDFS).

*/

// 翻译:Flink直接实现并支持一些文件系统类型(例如默认的机器本地文件系统)。其他文件系统类型由桥接到Hadoop支持的文件系统套件(例如HDFS)的实现访问。

看到此处信心倍增,继续翻阅后发现在Flink#FileSystem有一个getUnguardedFileSystem函数,如下图:

该函数会检测文件url路径,如果路径是file://则会走Flink#FileSystem的内部实现,如果是hdfs://,sftp://这类前缀,则会调用loadHadoopFsFactory函数,如下图:

至此找到Flink#FileSystem 与 Hadoop#FileSystem 的桥接处,在该函数中会先加载hadoop#FileSystem并构建Flink#FileSystem的子类HadoopFileSystem,而在HadoopFileSystem类中使用hadoop#FileSystem提供的能力。那么hadoop#FileSystem是否提供了读写SFTP的能力呢?经过调研发现只有Hadoop-2.8.x版本以上才支持SFTP,JIRA工单

至此总结以下:虽然Flink#FileSystem原生并未支持sftp读写,但Flink#FileSystem中如果遇见不支持的文件前缀如: hdfs:// 或者 sftp:// ,则会桥接到Hadoop#FileSystem类中,而Hadoop#FileSystem底层是支持丰富的文件类型,其中的SFTPFileSystem实现类即可读写SFTP,逻辑图如下:

三、程序验证

有了理论支撑后开始程序验证,先在sftp文件系统上放置一个user.csv文件供FlinkSQL读取,文件内容如下: 1,jack

2,tom

3,lily

再在IDE中导入相关Flink、Hadoop相关依赖:

org.apache.flink

flink-clients

${flink.version}

org.apache.flink

flink-table-planner_2.12

${flink.version}

org.apache.flink

flink-connector-files

${flink.version}

org.apache.flink

flink-csv

${flink.version}

org.apache.hadoop

hadoop-common

${hadoop.version}

org.apache.hadoop

hadoop-client

${hadoop.version}

编写程序:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SftpExample {

public static void main(String[] args) {

// 设置执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 定义文件系统连接器

tableEnv.executeSql(

"CREATE TABLE SftpCsvTable (" +

" age INT," +

" name STRING" +

") WITH (" +

" 'connector'='filesystem'," +

" 'path'='sftp://101.230.65.134:21821/user.csv'," +

" 'format'='csv'" +

")"

);

// 执行 SQL 查询操作

tableEnv.executeSql("SELECT * FROM SftpCsvTable").print();

}

}

启动后报错如下: 没有格式为sftp的FileSystem类。

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'sftp'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.

at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:543)

at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)

at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)

at org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator.enumerateSplits(NonSplittingRecursiveEnumerator.java:82)

at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:141)

... 34 more

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop File System abstraction does not support scheme 'sftp'. Either no file system implementation exists for that scheme, or the relevant classes are missing from the classpath.

at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:100)

at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:526)

... 38 more

Caused by: java.io.IOException: No FileSystem for scheme: sftp

at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)

at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:98)

... 39 more

随后debug定位异常代码发现 :在hadoop程序中默认不开启sftp的实现类,若要使用则需要在core-site.xml配置文件中配置fs.sftp.impl ,如下图:

此外,光配置fs.sftp.impl还不够,生产中访问sftp一般都是指定用户名密码或者秘钥,而这些参数的配置项在SFTPFileSystem中有变量可以参考,如下: 最终我们将开发环境中hadoop#core-site.xml配置文件放置在IDE#Resource目录下,并根据SFTP相关信息配置好对应K,V,内容如下:

fs.sftp.impl

org.apache.hadoop.fs.sftp.SFTPFileSystem

fs.sftp.user.101.230.65.134

username

fs.sftp.keyfile

D:\IdeaProjects\flink-sftp\src\main\resources\uat_sftp_qadmin.rsa

fs.sftp.password.101.230.65.134

password

执行成功,如下图:

至此我们可以使用FlinkSQL#FileSystem已经写好的各种文件格式类型以及分区功能,还可以享受Hadoop#SFTPFileSystem读写sftp的能力,可以说完美的解决此需求。

四、总结

通过这次详细排查和研究,对Flink文件系统的实现有了更加深入的理解。起初,我对FlinkSQL是否支持sftp产生了疑问,然而,通过逐步追踪源代码,逐渐揭示了底层逻辑的实现机制:尽管原生的Flink#FileSystem并没有直接支持SFTP的读写操作,但它通过一个巧妙的桥接机制,将不支持的文件前缀(例如hdfs://或者sftp://)重新定向到Hadoop#FileSystem类来处理。而Hadoop#FileSystem底层提供了对多种文件类型的广泛支持,只要存在SFTP的实现类,就可以顺利进行操作。这个过程中,深刻体会到了理解底层原理和追踪代码的重要性。

这次经历让我明白,在后续的开发过程中,我们应该保持持续的好奇心,提出更多问题,积极思考,并深入探索底层实现原理,时刻保持探索精神,不断拓展我们的知识和技能,以便在开发过程中能够更加游刃有余地应对各种挑战。

五、相关资料

Hadoop#JIRA工单FileSystem SQL Connectororg.apache.flink.core.fs.FileSystemgetUnguardedFileSystemloadHadoopFsFactoryHadoopFileSystem

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: