Taking the MemSourceBatchOp component as an example: first create a Maven project, then add the dependency information to pom.xml.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhang</groupId>
    <artifactId>AlinkDemo1</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.11.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <!-- Alink core: the artifact suffix must match the Flink and Scala versions -->
        <dependency>
            <groupId>com.alibaba.alink</groupId>
            <artifactId>alink_core_flink-1.11_2.11</artifactId>
            <version>1.5.5</version>
        </dependency>

        <!-- Flink runtime and APIs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Connectors and formats -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Other libraries -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

Pay attention to version alignment here. First check the version of your Flink cluster:

```shell
[root@master data]# flink --version
2022-06-30 16:01:16,350 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Version: 1.11.2
```

My Flink version is 1.11.2, so the Flink version in pom.xml is also set to 1.11.2 to keep the two consistent.

The code is as follows:

```java
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import org.apache.flink.types.Row;

import java.util.Arrays;
import java.util.List;

public class demo1 {

    public static void main(String[] args) throws Exception {
        // Three rows of in-memory data: a sparse-feature string and a double label
        List<Row> df = Arrays.asList(
                Row.of("1:2.0 2:1.0 4:0.5", 1.5),
                Row.of("1:2.0 2:1.0 4:0.5", 1.7),
                Row.of("1:2.0 2:1.0 4:0.5", 3.6)
        );

        // Build a batch source from the in-memory rows;
        // the second argument is the schema string (column name and type pairs)
        BatchOperator<?> data = new MemSourceBatchOp(df, "f1 string, f2 double");
        data.print();
    }
}
```
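The `f1` column above uses a LibSVM-style sparse format: space-separated `index:value` pairs. As a side note, the format can be parsed with plain Java; the following is a standalone sketch (class and method names are illustrative, and it is not part of the Alink pipeline):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SparseFeatureParser {

    // Parse a string like "1:2.0 2:1.0 4:0.5" into an index -> value map.
    static Map<Integer, Double> parse(String s) {
        Map<Integer, Double> out = new LinkedHashMap<>();
        for (String pair : s.trim().split("\\s+")) {
            // Each pair is "index:value"
            String[] kv = pair.split(":");
            out.put(Integer.parseInt(kv[0]), Double.parseDouble(kv[1]));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, Double> v = parse("1:2.0 2:1.0 4:0.5");
        System.out.println(v); // the three (index, value) entries
    }
}
```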

Then build the project with Maven's package goal (e.g. run `mvn clean package`); this produces AlinkDemo1-1.0-jar-with-dependencies.jar.

Next, find the Alink page on GitHub. My Alink version is 1.5.5, so locate the matching release and download the corresponding file; I downloaded pyalink_flink_1.11-1.5.5-py3-none-any.whl. Then:

1. Rename the .whl suffix to .zip and extract the archive.
2. In the extracted directory, two jar files can be found under /pyalink/lib.
3. Copy both jars into the lib directory of the Flink cluster.
4. Edit flink-conf.yaml and add `classloader.resolve-order: parent-first`.
5. Start the Flink cluster.
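The change to flink-conf.yaml is a one-line addition (a minimal fragment; the rest of the file stays unchanged):

```yaml
# Resolve classes from Flink's own (parent) classpath first, so the Alink jars
# placed under lib/ take precedence over classes bundled in the job jar.
classloader.resolve-order: parent-first
```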

Upload the packaged jar to the server, then run the following command in the directory containing the jar:

```shell
flink run -c demo1 AlinkDemo1-1.0-jar-with-dependencies.jar
```

Running the job prints the three rows of the in-memory table.

