flink1.12.0学习笔记第 2 篇-流批一体API

flink1.12.0学习笔记第1篇-部署与入门 flink1.12.0学习笔记第2篇-流批一体API flink1.12.0学习笔记第3篇-高级API flink1.12.0学习笔记第4篇-Table与SQL flink1.12.0学习笔记第5篇-业务案例实践 flink1.12.0学习笔记第6篇-高级特性与新特性 flink1.12.0学习笔记第7篇-监控与优化

2-1.流处理相关概念

数据时效性

​ 日常工作中,我们一般会先把数据存储在表,然后对表的数据进行加工、分析。既然先存储在表中,那就会涉及到时效性概念。

​ 如果我们处理以年,月为单位的级别的数据处理,进行统计分析,个性化推荐,那么数据的的最新日期离当前有几个甚至上月都没有问题。但是如果我们处理的是以天为级别,或者一小时甚至更小粒度的数据处理,那么就要求数据的时效性更高了。比如:对网站的实时监控、对异常日志的监控,这些场景需要工作人员立即响应,这样的场景下,传统的统一收集数据,再存到数据库中,再取出来进行分析就无法满足高时效性的需求了。

流处理和批处理

Batch Analytics:批量计算,统一收集数据->存储到DB->对数据进行批量处理,就是传统意义上使用类似于 Map Reduce、Hive、Spark Batch 等,对作业进行分析、处理、生成离线报表。Streaming Analytics:流式计算,顾名思义,就是对数据流进行处理,如使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如实时大屏、实时报表。

流计算与批计算对比

数据时效性不同

流式计算实时、低延迟,批量计算非实时、高延迟 数据特征不同

流式计算的数据一般是动态的、没有边界的,而批处理的数据一般则是静态数据 应用场景不同

流式计算应用再实时场景,时效性要求比较高的场景,如实时推荐、业务监控,而批处理应用在实时性要求不高、离线计算的场景下,数据分析、离线报表等。 运行方式不同

流式计算的任务持续进行的,批量计算的任务则一次性完成

流批一体API

DataStream API 支持批执行模式 Flink 的核心 API 最初是针对特定的场景设计的,尽管 Table API / SQL 针对流处理和批处理已经实现了统一的 API,但当用户使用较底层的 API 时,仍然需要在批处理(DataSet API)和流处理(DataStream API)这两种不同的 API 之间进行选择。鉴于批处理是流处理的一种特例,将这两种 API 合并成统一的 API,有一些非常明显的好处

可复用性:作业可以在流和批这两种执行模式之间自由地切换,而无需重写任何代码。因此,用户可以复用同一个作业,来处理实时数据和历史数据。维护简单:统一的 API 意味着流和批可以共用同一组 connector,维护同一套代码,并能够轻松地实现流批混合执行,例如 backfilling 之类的场景。

考虑到这些优点,社区已朝着流批统一的 DataStream API 迈出了第一步:支持高效的批处理(FLIP-134)。从长远来看,这意味着 DataSet API 将被弃用(FLIP-131),其功能将被包含在 DataStream API 和 Table API / SQL 中。

FLIP-134: DataStream API 的批处理执行

容许在 KeyedStream.intervalJoin() 的配置时间属性,在 Flink 1.12 以前 KeyedStream.intervalJoin() 算子的时间属性依赖于全局设置的时间属性。在 Flink 1.12 中咱们能够在 IntervalJoin 方法后加上 inProcessingTime() 或 inEventTime() ,这样 Join 就再也不依赖于全局的时间属性。在 Flink 1.12 中将 DataStream API 的 timeWindow() 方法标记为过时,请使用 window(WindowAssigner)、TumblingEventTimeWindows、 SlidingEventTimeWindows、TumblingProcessingTimeWindows 或者 SlidingProcessingTimeWindows。将 StreamExecutionEnvironment.setStreamTimeCharacteristic() 和 TimeCharacteristic 方法标记为过时。在 Flink 1.12 中,默认的时间属性改变成 EventTime 了,因而你再也不须要该方法去开启 EventTime 了。在 EventTime 时间属性下,你使用 processing-time 的 windows 和 timers 也都依旧会生效。若是你想禁用水印,请使用 ExecutionConfig.setAutoWatermarkInterval(long) 方法。若是你想使用 IngestionTime,请手动设置适当的 WatermarkStrategy。若是你使用的是基于时间属性更改行为的通用 ‘time window’ 算子(eg: KeyedStream.timeWindow()),请使用等效操做明确的指定处理时间和事件时间。容许在 CEP PatternStream 上显式配置时间属性在 Flink 1.12 以前,CEP 算子里面的时间依赖于全局配置的时间属性,在 1.12 以后能够在 PatternStream 上使用 inProcessingTime() 或 inEventTime() 方法。

API Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起来难度也越大

编程模型 Flink 应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示:

2-2.Source

基于集合的Source(Collection-based)

env.fromElements(可变参数);env.fromColletion(各种集合);env.generateSequence(开始,结束);env.fromSequence(开始,结束);

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class SourceDemo01 {

public static void main(String[] args) throws Exception {

// env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// source

DataStreamSource ds1 = env.fromElements("wang", "ting", "flink", "spark");

DataStreamSource ds2 = env.fromCollection(Arrays.asList("wang", "ting", "flink", "spark"));

DataStreamSource ds3 = env.generateSequence(1, 10);

DataStreamSource ds4 = env.fromSequence(1, 10);

ds1.print();

ds2.print();

ds3.print();

ds4.print();

// transformation

// sink

// execute

env.execute();

}

}

基于文件的Source(File-based)

env.readTextFile (本地/HDFS文件/文件夹);//压缩文件也可以

在项目根目录创建data目录,data目录下创建input和output目录

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceDemo02 {

public static void main(String[] args) throws Exception {

//env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//source

DataStreamSource ds1 = env.readTextFile("data/input/wlk.txt");

DataStreamSource ds2 = env.readTextFile("data/input/20220924");

DataStreamSource ds3 = env.readTextFile("data/input/words.tar.gz");

//transformation

//sink

ds1.print();

ds2.print();

ds3.print();

//execute

env.execute();

}

}

基于Socket的Source(Socket-based)

[root@ops01 ~]# yum install -y nc

[root@ops01 ~]# nc -lk 6666

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

public class SourceDemo03 {

public static void main(String[] args) throws Exception {

// env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// source

DataStream linesDS = env.socketTextStream("ops01", 6666);

// transformation

DataStream wordsDS = linesDS.flatMap(new FlatMapFunction() {

@Override

public void flatMap(String value, Collector out) throws Exception {

String[] words = value.split(" ");

for (String word : words) {

out.collect(word);

}

}

});

DataStream> wordAndOnesDS = wordsDS.map(new MapFunction>() {

@Override

public Tuple2 map(String value) throws Exception {

return Tuple2.of(value, 1);

}

});

KeyedStream, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);

DataStream> result = groupedDS.sum(1);

// sink

result.print();

// execute

env.execute();

}

}

验证:

# 服务器命令行输出:

[root@ops01 lib]# nc -lk 6666

wangting 666

today 20220924

wlk is comming

# ide控制台输出:

2> (wangting,1)

3> (666,1)

4> (today,1)

4> (20220924,1)

1> (comming,1)

4> (wlk,1)

4> (is,1)

Process finished with exit code -1

自定义Source(Custom)

实际开发中,经常会实时接收一些数据,要和MySQL中存储的一些规则进行匹配,那么这时候就可以使用Flink自定义数据源从MySQL中读取数据

从MySQL中实时加载数据并且MySQL中的数据有变化,也能被实时加载出来

MariaDB [(none)]> CREATE DATABASE `wow`;

Query OK, 1 row affected (0.000 sec)

MariaDB [(none)]> use wow;

Database changed

MariaDB [wow]> CREATE TABLE `wow_info` (

-> `id` int(11) NOT NULL AUTO_INCREMENT,

-> `role` varchar(255) DEFAULT NULL,

-> `pinyin` varchar(255) DEFAULT NULL,

-> PRIMARY KEY (`id`)

-> ) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

Query OK, 0 rows affected (0.014 sec)

MariaDB [wow]> INSERT INTO `wow_info` VALUES ('1', 'fs', 'fashi');

Query OK, 1 row affected (0.002 sec)

MariaDB [wow]> INSERT INTO `wow_info` VALUES ('2', 'ms', 'mushi');

Query OK, 1 row affected (0.002 sec)

MariaDB [wow]> INSERT INTO `wow_info` VALUES ('3', 'ss', 'shushi');

Query OK, 1 row affected (0.005 sec)

MariaDB [wow]> INSERT INTO `wow_info` VALUES ('4', 'dz', 'daozei');

Query OK, 1 row affected (0.004 sec)

MariaDB [wow]> INSERT INTO `wow_info` VALUES ('5', 'ws', 'wuseng');

Query OK, 1 row affected (0.002 sec)

MariaDB [wow]> INSERT INTO `wow_info` VALUES ('6', 'xd', 'xiaode');

Query OK, 1 row affected (0.002 sec)

MariaDB [wow]>

package cn.wangting;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

public class SourceDemo04 {

public static void main(String[] args) throws Exception {

//TODO 0.env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//TODO 1.source

DataStream wow_infoDS = env.addSource(new MySQLSource()).setParallelism(1);

//TODO 2.transformation

//TODO 3.sink

wow_infoDS.print();

//TODO 4.execute

env.execute();

}

@Data

@NoArgsConstructor

@AllArgsConstructor

public static class wow_info {

private Integer id;

private String role;

private String pinyin;

}

public static class MySQLSource extends RichParallelSourceFunction {

private boolean flag = true;

private Connection conn = null;

private PreparedStatement ps =null;

private ResultSet rs = null;

//open只执行一次,适合开启资源

@Override

public void open(Configuration parameters) throws Exception {

conn = DriverManager.getConnection("jdbc:mysql://ops01:3306/wow", "root", "123456");

String sql = "select id,role,pinyin from wow_info";

ps = conn.prepareStatement(sql);

}

@Override

public void run(SourceContext ctx) throws Exception {

while (flag) {

rs = ps.executeQuery();

while (rs.next()) {

int id = rs.getInt("id");

String role = rs.getString("role");

String pinyin = rs.getString("pinyin");

ctx.collect(new wow_info(id,role,pinyin));

}

Thread.sleep(5000);

}

}

@Override

public void cancel() {

flag = false;

}

@Override

public void close() throws Exception {

if(conn != null) conn.close();

if(ps != null) ps.close();

if(rs != null) rs.close();

}

}

}

执行后控制台输出:

1> SourceDemo04.wow_info(id=3, role=ss, pinyin=shushi)

2> SourceDemo04.wow_info(id=4, role=dz, pinyin=daozei)

3> SourceDemo04.wow_info(id=1, role=fs, pinyin=fashi)

3> SourceDemo04.wow_info(id=5, role=ws, pinyin=wuseng)

4> SourceDemo04.wow_info(id=2, role=ms, pinyin=mushi)

4> SourceDemo04.wow_info(id=6, role=xd, pinyin=xiaode)

2> SourceDemo04.wow_info(id=2, role=ms, pinyin=mushi)

2> SourceDemo04.wow_info(id=6, role=xd, pinyin=xiaode)

4> SourceDemo04.wow_info(id=4, role=dz, pinyin=daozei)

3> SourceDemo04.wow_info(id=3, role=ss, pinyin=shushi)

1> SourceDemo04.wow_info(id=1, role=fs, pinyin=fashi)

1> SourceDemo04.wow_info(id=5, role=ws, pinyin=wuseng)

在MySQL中插入新数据验证ide控制台是否实时查到新数据输出:

INSERT INTO `wow_info` VALUES ('7', 'sq', 'shengqi');

INSERT INTO `wow_info` VALUES ('8', 'zs', 'zhanshi');

INSERT INTO `wow_info` VALUES ('9', 'dk', 'siwangqishi');

插入新数据后控制台输出:

1> SourceDemo04.wow_info(id=5, role=ws, pinyin=wuseng)

1> SourceDemo04.wow_info(id=9, role=dk, pinyin=siwangqishi)

3> SourceDemo04.wow_info(id=3, role=ss, pinyin=shushi)

3> SourceDemo04.wow_info(id=7, role=sq, pinyin=shengqi)

2> SourceDemo04.wow_info(id=2, role=ms, pinyin=mushi)

2> SourceDemo04.wow_info(id=6, role=xd, pinyin=xiaode)

1> SourceDemo04.wow_info(id=4, role=dz, pinyin=daozei)

1> SourceDemo04.wow_info(id=8, role=zs, pinyin=zhanshi)

2> SourceDemo04.wow_info(id=1, role=fs, pinyin=fashi)

2> SourceDemo04.wow_info(id=5, role=ws, pinyin=wuseng)

2> SourceDemo04.wow_info(id=9, role=dk, pinyin=siwangqishi)

4> SourceDemo04.wow_info(id=3, role=ss, pinyin=shushi)

4> SourceDemo04.wow_info(id=7, role=sq, pinyin=shengqi)

3> SourceDemo04.wow_info(id=2, role=ms, pinyin=mushi)

3> SourceDemo04.wow_info(id=6, role=xd, pinyin=xiaode)

2-3.Transformation

map

将函数作用在集合中的每一个元素上,并返回作用后的结果

flatMap

将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果

keyby

按照指定的key来对流中的数据进行分组

注意:

流处理中没有groupBy,而是keyBy

filter

按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素

sum

按照指定的字段对集合中的元素进行求和

reduce

对集合中的元素进行聚合

示例:统计信息传输过来的单词统计,并过滤掉"TMD"敏感词

[root@ops01 lib]# nc -lk 6666

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.FilterFunction;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.functions.ReduceFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

public class TransformationDemo01 {

public static void main(String[] args) throws Exception {

// env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// source

DataStream lines = env.socketTextStream("ops01", 6666);

// transformation

DataStream words = lines.flatMap(new FlatMapFunction() {

@Override

public void flatMap(String value, Collector out) throws Exception {

String[] arr = value.split(" ");

for (String word : arr) {

out.collect(word);

}

}

});

DataStream filted = words.filter(new FilterFunction() {

@Override

public boolean filter(String value) throws Exception {

return !value.equals("TMD");

}

});

SingleOutputStreamOperator> wordAndOne = filted.map(new MapFunction>() {

@Override

public Tuple2 map(String value) throws Exception {

return Tuple2.of(value, 1);

}

});

KeyedStream, String> grouped = wordAndOne.keyBy(t -> t.f0);

SingleOutputStreamOperator> result = grouped.reduce(new ReduceFunction>() {

@Override

public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {

return Tuple2.of(value1.f0, value1.f1 + value2.f1); //_+_

}

});

// sink

result.print();

// execute

env.execute();

}

}

服务器6666端口发送信息:

ni hao a laotie

zou yiqi dafuben!

TMD buqu

TMD ni bie lao TMD

查看代码控制台输出信息:

4> (ni,1)

3> (hao,1)

3> (a,1)

3> (laotie,1)

3> (dafuben!,1)

4> (zou,1)

4> (yiqi,1)

1> (buqu,1)

3> (bie,1)

4> (ni,2)

4> (lao,1)

Process finished with exit code -1

合并与拆分

union

union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。

connect

connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

connect只能连接两个数据流,union可以连接多个数据流。 connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。

两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

代码示例:

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class TransformationDemo02 {

public static void main(String[] args) throws Exception {

// env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// source

DataStream ds1 = env.fromElements("hadoop", "spark", "flink");

DataStream ds2 = env.fromElements("hadoop", "spark", "flink");

DataStream ds3 = env.fromElements(1L, 2L, 3L);

// transformation

DataStream result1 = ds1.union(ds2);

ConnectedStreams result2 = ds1.connect(ds2);

ConnectedStreams result3 = ds1.connect(ds3);

SingleOutputStreamOperator result = result3.map(new CoMapFunction() {

@Override

public String map1(String value) throws Exception {

return "String:" + value;

}

@Override

public String map2(Long value) throws Exception {

return "Long:" + value;

}

});

// sink

result1.print();

result.print();

// execute

env.execute();

}

}

控制台输出:

1> hadoop

4> String:flink

1> Long:2

4> Long:1

3> flink

3> hadoop

3> String:spark

2> spark

2> String:hadoop

2> Long:3

4> spark

1> flink

Process finished with exit code 0

split、select、Side Outputs

Split就是将一个流分成多个流Select就是获取分流后对应的数据Side Outputs:可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的OutputTag中

代码示例:

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.util.Collector;

import org.apache.flink.util.OutputTag;

public class TransformationDemo03 {

public static void main(String[] args) throws Exception {

// env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// source

DataStreamSource ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// transformation

OutputTag oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));

OutputTag evenTag = new OutputTag<>("偶数",TypeInformation.of(Integer.class));

SingleOutputStreamOperator result = ds.process(new ProcessFunction() {

@Override

public void processElement(Integer value, Context ctx, Collector out) throws Exception {

if (value % 2 == 0) {

ctx.output(evenTag, value);

} else {

ctx.output(oddTag, value);

}

}

});

DataStream oddResult = result.getSideOutput(oddTag);

DataStream evenResult = result.getSideOutput(evenTag);

// sink

System.out.println(oddTag);//OutputTag(Integer, 奇数)

System.out.println(evenTag);//OutputTag(Integer, 偶数)

oddResult.print("奇数:");

evenResult.print("偶数:");

// execute

env.execute();

}

}

控制台输出:

OutputTag(Integer, 奇数)

OutputTag(Integer, 偶数)

偶数::1> 2

偶数::1> 6

偶数::1> 10

奇数::2> 3

奇数::2> 7

偶数::3> 4

偶数::3> 8

奇数::4> 1

奇数::4> 5

奇数::4> 9

Process finished with exit code 0

分区

rebalance重平衡分区

类似于Spark中的repartition,但是功能更强大,可以直接解决数据倾斜

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况,出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成

所以在实际的工作中,出现这种情况比较好的解决方案就是rebalance(内部使用round robin方法将数据均匀打散)

代码示例:

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.FilterFunction;

import org.apache.flink.api.common.functions.RichMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationDemo04 {

public static void main(String[] args) throws Exception {

// env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

// source

DataStream longDS = env.fromSequence(0, 100);

DataStream filterDS = longDS.filter(new FilterFunction() {

@Override

public boolean filter(Long num) throws Exception {

return num > 10;

}

});

// transformation

SingleOutputStreamOperator> result1 = filterDS

.map(new RichMapFunction>() {

@Override

public Tuple2 map(Long value) throws Exception {

int subTaskId = getRuntimeContext().getIndexOfThisSubtask();

return Tuple2.of(subTaskId, 1);

}

}).keyBy(t -> t.f0).sum(1);

SingleOutputStreamOperator> result2 = filterDS.rebalance()

.map(new RichMapFunction>() {

@Override

public Tuple2 map(Long value) throws Exception {

int subTaskId = getRuntimeContext().getIndexOfThisSubtask();

return Tuple2.of(subTaskId, 1);

}

}).keyBy(t -> t.f0).sum(1);

// sink

result1.print("result1");

result2.print("result2");

// execute

env.execute();

}

}

控制台输出:

result1:3> (0,25)

result1:3> (1,15)

result1:4> (2,25)

result1:4> (3,25)

result2:4> (2,23)

result2:4> (3,23)

result2:3> (0,21)

result2:3> (1,23)

Process finished with exit code 0

其他分区

说明:

recale分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。

举例:

上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

2-4.Sink

预定义-基于控制台和文件

ds.print 直接输出到控制台ds.printToErr() 直接输出到控制台,用红色ds.writeAsText(“本地/HDFS的path”,WriteMode.OVERWRITE).setParallelism(1)

在输出到path的时候,可以在前面设置并行度,如果

并行度>1,则path为目录

并行度=1,则path为文件名

代码示例:

wlk.txt

wlk is comming!

date is 20220927

tbc is goodbye!

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkDemo01 {

public static void main(String[] args) throws Exception {

//TODO env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//TODO source

DataStreamSource ds = env.readTextFile("data/input/wlk.txt");

//TODO transformation

//TODO sink

ds.print();

ds.print("输出标识");

ds.printToErr();// 红色输出

ds.printToErr("输出标识");

ds.writeAsText("data/output/wlkoutput1").setParallelism(1);

ds.writeAsText("data/output/wlkoutput2").setParallelism(2);

//TODO execute

env.execute();

}

}

wlkoutput1

date is 20220927

wlk is comming!

tbc is goodbye!

1

tbc is goodbye!

wlk is comming!

2

date is 20220927

自定义-MySQL

将Flink集合中的数据通过自定义Sink保存到MySQL

MariaDB [(none)]> use wow;

Database changed

MariaDB [wow]> show tables;

+---------------+

| Tables_in_wow |

+---------------+

| wow_info |

+---------------+

1 row in set (0.000 sec)

MariaDB [wow]> select * from wow_info;

+----+------+-------------+

| id | role | pinyin |

+----+------+-------------+

| 1 | fs | fashi |

| 2 | ms | mushi |

| 3 | ss | shushi |

| 4 | dz | daozei |

| 5 | ws | wuseng |

| 6 | xd | xiaode |

| 7 | sq | shengqi |

| 8 | zs | zhanshi |

| 9 | dk | siwangqishi |

+----+------+-------------+

9 rows in set (0.000 sec)

package cn.wangting;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

public class SinkDemo02 {

public static void main(String[] args) throws Exception {

//TODO 0.env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//TODO 1.source

DataStream wow_infoDS = env.fromElements(new wow_info(null, "dh", "emolieshou"));

//TODO 2.transformation

//TODO 3.sink

wow_infoDS.addSink(new MySQLSink());

//TODO 4.execute

env.execute();

}

@Data

@NoArgsConstructor

@AllArgsConstructor

public static class wow_info {

private Integer id;

private String role;

private String pinyin;

}

public static class MySQLSink extends RichSinkFunction {

private Connection conn = null;

private PreparedStatement ps =null;

@Override

public void open(Configuration parameters) throws Exception {

conn = DriverManager.getConnection("jdbc:mysql://ops01:3306/wow", "root", "123456");

String sql = "INSERT INTO `wow_info` (`id`, `role`, `pinyin`) VALUES (null, ?, ?);";

ps = conn.prepareStatement(sql);

}

@Override

public void invoke(wow_info value, Context context) throws Exception {

//设置?占位符参数值

ps.setString(1,value.role);

ps.setString(2,value.pinyin);

//执行sql

ps.executeUpdate();

}

@Override

public void close() throws Exception {

if(conn != null) conn.close();

if(ps != null) ps.close();

}

}

}

执行后效果:

MariaDB [wow]> select * from wow_info;

+----+------+-------------+

| id | role | pinyin |

+----+------+-------------+

| 1 | fs | fashi |

| 2 | ms | mushi |

| 3 | ss | shushi |

| 4 | dz | daozei |

| 5 | ws | wuseng |

| 6 | xd | xiaode |

| 7 | sq | shengqi |

| 8 | zs | zhanshi |

| 9 | dk | siwangqishi |

| 10 | dh | emolieshou |

+----+------+-------------+

10 rows in set (0.000 sec)

2-5.Connectors

JDBC-connectors

MariaDB [wow]> create database bigdata;

MariaDB [wow]>

CREATE TABLE `t_student` (

`id` int(11) NOT NULL AUTO_INCREMENT,

`name` varchar(255) DEFAULT NULL,

`age` int(11) DEFAULT NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

INSERT INTO `t_student` VALUES ('1', 'jack', '18');

INSERT INTO `t_student` VALUES ('2', 'tom', '19');

INSERT INTO `t_student` VALUES ('3', 'rose', '20');

INSERT INTO `t_student` VALUES ('4', 'tom', '19');

INSERT INTO `t_student` VALUES ('5', 'jack', '18');

INSERT INTO `t_student` VALUES ('6', 'rose', '20');

MariaDB [bigdata]> select * from t_student;

+----+----------+------+

| id | name | age |

+----+----------+------+

| 1 | jack | 18 |

| 2 | tom | 19 |

| 3 | rose | 20 |

| 4 | tom | 19 |

| 5 | jack | 18 |

| 6 | rose | 20 |

+----+----------+------+

6 rows in set (0.000 sec)

代码示例:

package cn.wangting;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

import org.apache.flink.connector.jdbc.JdbcSink;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JDBCDemo {

public static void main(String[] args) throws Exception {

//TODO 0.env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//TODO 1.source

DataStream studentDS = env.fromElements(new Student(null, "wangting", 666));

//TODO 2.transformation

//TODO 3.sink

studentDS.addSink(JdbcSink.sink(

"INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",

(ps, value) -> {

ps.setString(1, value.getName());

ps.setInt(2, value.getAge());

}, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://ops01:3306/bigdata")

.withUsername("root")

.withPassword("123456")

.withDriverName("com.mysql.jdbc.Driver")

.build()));

//TODO 4.execute

env.execute();

}

@Data

@NoArgsConstructor

@AllArgsConstructor

public static class Student {

private Integer id;

private String name;

private Integer age;

}

}

MariaDB [bigdata]> select * from t_student;

+----+----------+------+

| id | name | age |

+----+----------+------+

| 1 | jack | 18 |

| 2 | tom | 19 |

| 3 | rose | 20 |

| 4 | tom | 19 |

| 5 | jack | 18 |

| 6 | rose | 20 |

| 7 | wangting | 666 |

+----+----------+------+

7 rows in set (0.000 sec)

Kafka

​ Flink 里已经提供了一些绑定的 Connector,例如 kafka source 和 sink,Es sink 等。读写 kafka、es、rabbitMQ 时可以直接使用相应 connector 的 api 即可,虽然该部分是 Flink 项目源代码里的一部分,但是真正意义上不算作 Flink 引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交 Job 时候需要注意, job 代码 jar 包中一定要将相应的 connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。

以下参数都必须/建议设置上

1.订阅的主题

2.反序列化规则

3.消费者属性-集群地址

4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)

5.消费者属性-offset重置规则,如earliest/latest…

6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)

7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中

kafka consumer代码示例

package cn.wangting;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.api.java.tuple.Tuple;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.util.Collector;

import java.util.Properties;

public class ConnectorsDemo_KafkaConsumer {

public static void main(String[] args) throws Exception {

//1.env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//2.Source

Properties props = new Properties();

props.setProperty("bootstrap.servers", "ops01:9092");

props.setProperty("group.id", "flink");

props.setProperty("auto.offset.reset","latest");

props.setProperty("flink.partition-discovery.interval-millis","5000");

props.setProperty("enable.auto.commit", "true");

props.setProperty("auto.commit.interval.ms", "2000");

FlinkKafkaConsumer kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);

kafkaSource.setStartFromGroupOffsets();

DataStreamSource kafkaDS = env.addSource(kafkaSource);

SingleOutputStreamOperator> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction>() {

@Override

public void flatMap(String value, Collector> out) throws Exception {

String[] words = value.split(" ");

for (String word : words) {

out.collect(Tuple2.of(word, 1));

}

}

});

KeyedStream, Tuple> groupedDS = wordAndOneDS.keyBy(0);

SingleOutputStreamOperator> result = groupedDS.sum(1);

//4.Sink

result.print();

//5.execute

env.execute();

}

}

kafka producer代码示例

package cn.wangting;

import com.alibaba.fastjson.JSON;

import lombok.AllArgsConstructor;

import lombok.Data;

import lombok.NoArgsConstructor;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class ConnectorsDemo_KafkaProducer {

public static void main(String[] args) throws Exception {

//1.env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//2.Source

DataStreamSource studentDS = env.fromElements(new Student(1, "tonyma", 18));

SingleOutputStreamOperator jsonDS = studentDS.map(new MapFunction() {

@Override

public String map(Student value) throws Exception {

String jsonStr = JSON.toJSONString(value);

return jsonStr;

}

});

//4.Sink

jsonDS.print();

Properties props = new Properties();

props.setProperty("bootstrap.servers", "ops01:9092");

FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);

jsonDS.addSink(kafkaSink);

//5.execute

env.execute();

}

@Data

@NoArgsConstructor

@AllArgsConstructor

public static class Student {

private Integer id;

private String name;

private Integer age;

}

}

Redis

RedisSink 核心类是RedisMapper 是一个接口,使用时我们要编写自己的redis 操作类实现这个接口中的三个方法,如下所示

getCommandDescription() :

设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型

String getKeyFromData(T data):

设置value 中的键值对key的值

String getValueFromData(T data);

设置value 中的键值对value的值

将Flink集合中的数据通过自定义Sink保存到Redis

package cn.wangting;

import org.apache.flink.api.common.RuntimeExecutionMode;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.redis.RedisSink;

import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

import org.apache.flink.util.Collector;

public class RedisDemo {

public static void main(String[] args) throws Exception {

//TODO 0.env

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

//TODO 1.source

DataStream lines = env.socketTextStream("ops01", 6666);

//TODO 2.transformation

SingleOutputStreamOperator> result = lines.flatMap(new FlatMapFunction>() {

@Override

public void flatMap(String value, Collector> out) throws Exception {

String[] arr = value.split(" ");

for (String word : arr) {

out.collect(Tuple2.of(word, 1));

}

}

}).keyBy(t -> t.f0).sum(1);

//TODO 3.sink

result.print();

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("ops01").build();

RedisSink> redisSink = new RedisSink>(conf,new MyRedisMapper());

result.addSink(redisSink);

//TODO 4.execute

env.execute();

}

public static class MyRedisMapper implements RedisMapper>{

@Override

public RedisCommandDescription getCommandDescription() {

//我们选择的数据结构对应的是 key:String("wcresult"),value:Hash(单词,数量),命令为HSET

return new RedisCommandDescription(RedisCommand.HSET,"wcresult");

}

@Override

public String getKeyFromData(Tuple2 t) {

return t.f0;

}

@Override

public String getValueFromData(Tuple2 t) {

return t.f1.toString();

}

}

}

flink1.12.0学习笔记第1篇-部署与入门 flink1.12.0学习笔记第2篇-流批一体API flink1.12.0学习笔记第3篇-高级API flink1.12.0学习笔记第4篇-Table与SQL flink1.12.0学习笔记第5篇-业务案例实践 flink1.12.0学习笔记第6篇-高级特性与新特性 flink1.12.0学习笔记第7篇-监控与优化

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: