一、flink介绍
Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。
二、环境搭建
安装flink
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/
安装Netcat
Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。它被广泛用于测试网络中的端口,发送文件等操作。使用 Netcat 可以轻松地进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作。因为其功能强大而又简单易用,所以在计算机安全领域也有着广泛的应用。
安装nc命令
yum install -y nc
启动socket端口
[root@node01 bin]# nc -lk 8888
三、代码工程
实验目的:无界流之读取socket文本流
pom.xml
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"> implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
SoketJob
package com.et.flink.job;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author liuhaihua
* @version 1.0
* @ClassName SocketJob
* @Description todo
* @date 2024年02月29日 17:06
*/
public class SocketJob {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定并行度,默认电脑线程数
env.setParallelism(3);
// 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务
DataStreamSource
// 处理数据: 切换、转换、分组、聚合 得到统计结果
SingleOutputStreamOperator
.flatMap(
(String value, Collector
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
)
.setParallelism(2)
// // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2
.returns(new TypeHint
})
// .returns(Types.TUPLE(Types.STRING,Types.INT))
.keyBy(value -> value.f0)
.sum(1);
// 输出
sum.print();
// 执行
env.execute();
}
}
代码仓库
https://github.com/Harries/springboot-demo
四、测试
启动socket流
[root@cmn-zentao-002 ~]# nc -l 8888
本地执行
本地直接ideal启动main程序,在socket流中输入
abc bcd cde
bcd cde fgh
cde fgh hij
console日志显示
3> (abc,1)
1> (fgh,1)
3> (bcd,1)
3> (cde,1)
3> (bcd,2)
3> (cde,2)
3> (cde,3)
1> (fgh,2)
2> (hij,1)
集群执行
执行maven打包,将打包的jar上传到集群中在socker中输入字符,结果和本地一样
五、引用
https://juejin.cn/post/7283311024979066937http://www.liuhaihua.cn/archives/710270.html
推荐文章
发表评论