以MemSourceBatchOp组件为例,首先创建一个maven项目,然后在pom.xml文件中写入依赖信息
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
这里需要注意版本问题,首先查看自己的 flink 集群版本
[root@master data]# flink --version
2022-06-30 16:01:16,350 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2022-06-30 16:01:16,350 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Version: 1.11.2
我的flink版本是1.11.2,所以pom.xml里面的flink版本也是1.11.2,保持一致
代码如下
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import org.apache.flink.types.Row;
import java.util.Arrays;
import java.util.List;
public class demo1 {
public static void main(String[] args) throws Exception {
List
Row.of("1:2.0 2:1.0 4:0.5", 1.5),
Row.of("1:2.0 2:1.0 4:0.5", 1.7),
Row.of("1:2.0 2:1.0 4:0.5", 3.6)
);
BatchOperator> data = new MemSourceBatchOp(df, "f1 string, f2 double");
data.print();
}
}
然后利用maven的package打包功能,生成一个 AlinkDemo1-1.0-jar-with-dependencies.jar
之后在github上找到Alink的页面 这里我的Alink版本是1.5.5,点进去找到对应版本,然后下载对应的文件。这里我下载的pyalink_flink_1.11-1.5.5-py3-none-any.whl,将后缀名改成.zip,之后解压。在解压后目录的 /pyalink/lib 目录下面可以找到两个Jar包 将这两个Jar包放到flink集群的lib目录下 然后修改flink-conf.yaml文件,添加 classloader.resolve-order: parent-first 然后启动flink集群
将之前打包好的Jar包,上传到服务器里面,然后在Jar包所在目录下执行命令
flink run -c demo1 AlinkDemo1-1.0-jar-with-dependencies.jar
执行结果如下
参考文章
大家都在找:
flink:flink菜鸟教程
大数据:大数据技术的就业前景和就业方向
Alink:阿林卡省
发表评论