一、flink介绍

Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。Flink最适合的应用场景是低时延的数据处理(Data Processing)场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

二、环境搭建

安装flink

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

安装Netcat

Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。它被广泛用于测试网络中的端口,发送文件等操作。使用 Netcat 可以轻松地进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作。因为其功能强大而又简单易用,所以在计算机安全领域也有着广泛的应用。

安装nc命令

yum install -y nc

启动socket端口

[root@node01 bin]# nc -lk 8888

三、代码工程

实验目的:无界流之读取socket文本流 

pom.xml

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

springboot-demo

com.et

1.0-SNAPSHOT

4.0.0

flink

8

8

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-autoconfigure

org.springframework.boot

spring-boot-starter-test

test

org.apache.flink

flink-streaming-java

1.17.0

org.apache.flink

flink-java

1.17.0

org.apache.flink

flink-clients

1.17.0

org.apache.flink

flink-connector-base

1.17.0

org.apache.flink

flink-connector-files

1.17.0

org.apache.flink

flink-connector-kafka

1.17.0

org.apache.flink

flink-runtime-web

1.17.0

org.apache.maven.plugins

maven-shade-plugin

package

shade

implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

META-INF/spring.handlers

implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">

META-INF/spring.factories

implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

META-INF/spring.schemas

implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />

implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

com.et.flink.job.SocketJob

SoketJob

package com.et.flink.job;

import org.apache.flink.api.common.typeinfo.TypeHint;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

/**

* @author liuhaihua

* @version 1.0

* @ClassName SocketJob

* @Description todo

* @date 2024年02月29日 17:06

*/

public class SocketJob {

public static void main(String[] args) throws Exception {

// 创建执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 指定并行度,默认电脑线程数

env.setParallelism(3);

// 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务

DataStreamSource socketDS = env.socketTextStream("172.24.4.193", 8888);

// 处理数据: 切换、转换、分组、聚合 得到统计结果

SingleOutputStreamOperator> sum = socketDS

.flatMap(

(String value, Collector> out) -> {

String[] words = value.split(" ");

for (String word : words) {

out.collect(Tuple2.of(word, 1));

}

}

)

.setParallelism(2)

// // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2。只有显式设置系统当前返回类型,才能正确解析出完整数据

.returns(new TypeHint>() {

})

// .returns(Types.TUPLE(Types.STRING,Types.INT))

.keyBy(value -> value.f0)

.sum(1);

// 输出

sum.print();

// 执行

env.execute();

}

}

代码仓库

https://github.com/Harries/springboot-demo

四、测试

启动socket流

[root@cmn-zentao-002 ~]# nc -l 8888

本地执行

本地直接ideal启动main程序,在socket流中输入

abc bcd cde

bcd cde fgh

cde fgh hij

console日志显示

3> (abc,1)

1> (fgh,1)

3> (bcd,1)

3> (cde,1)

3> (bcd,2)

3> (cde,2)

3> (cde,3)

1> (fgh,2)

2> (hij,1)

集群执行

执行maven打包,将打包的jar上传到集群中在socker中输入字符,结果和本地一样

五、引用

https://juejin.cn/post/7283311024979066937http://www.liuhaihua.cn/archives/710270.html

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: