1. Flink cluster setup

No fluff, straight to the code. Everything here is based on the official documentation (Kubernetes | Apache Flink); I'm recording it here for reference.

flink-configuration-configmap.yaml

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.pekko.name = org.apache.pekko
    logger.pekko.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```

jobmanager-service.yaml Optional service, which is only necessary for non-HA mode.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```

Session cluster resource definitions

jobmanager-session-deployment-non-ha.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:latest
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```

taskmanager-session-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:latest
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```

Deploy with kubectl apply -f xxx.yaml for each file, or kubectl apply -f ./flink, where flink is a directory holding the .yaml files above.
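For example, using the file names from this post (run from the directory that contains them):

```bash
# Apply each manifest individually...
kubectl apply -f flink-configuration-configmap.yaml
kubectl apply -f jobmanager-service.yaml
kubectl apply -f jobmanager-session-deployment-non-ha.yaml
kubectl apply -f taskmanager-session-deployment.yaml

# ...or apply the whole directory at once
kubectl apply -f ./flink
```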

To reach the Flink web UI from outside the cluster, add a NodePort service for it.
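A minimal sketch of such a service, along the lines of the optional jobmanager-rest-service.yaml from the official docs (the nodePort value 30081 is an arbitrary choice within the default 30000-32767 range, adjust for your cluster):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: webui
    port: 8081
    targetPort: 8081
    nodePort: 30081   # assumed external port; pick any free port in the NodePort range
  selector:
    app: flink
    component: jobmanager
```

The UI is then reachable at http://&lt;any-node-ip&gt;:30081.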

2. Demo code test

Create a Maven project and add the dependencies to pom.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test-platform</artifactId>
        <groupId>com.test</groupId>
        <version>2.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-demo</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
        <log4j.version>2.20.0</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>
</project>
```

log4j2.xml:
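The original log4j2.xml is not reproduced here; a minimal console-only sketch, assuming INFO-level logging, would look like this:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <!-- Log to stdout so output is visible in the IDE and in container logs -->
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>
```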

The word-count code:

```java
package com.test.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountUnboundStreamDemo {

    public static void main(String[] args) throws Exception {
        // TODO 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        //         3, // number of restart attempts
        //         Time.of(10, TimeUnit.SECONDS) // delay between attempts
        // ));

        // TODO 2. Read the data
        DataStreamSource<String> lineDS = env.socketTextStream("192.168.0.28", 7777);

        // TODO 3. Process the data: split, map, key by, aggregate
        // TODO 3.1 Split and map
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS // <input type, output type>
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Split on spaces
                        String[] words = value.split(" ");
                        for (String word : words) {
                            // Map each word to a tuple (word, 1)
                            Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
                            // Emit downstream through the collector
                            out.collect(wordsAndOne);
                        }
                    }
                });

        // TODO 3.2 Key by word
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }
        );

        // TODO 3.3 Aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

        // TODO 4. Output the result
        sumDS.print("received data=======").setParallelism(1);

        // TODO 5. Execute, similar to ssc.start() in Spark Streaming
        env.execute(sumDS.getClass().getSimpleName());
    }
}
```

Package it into a jar and upload it through the Flink dashboard:
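A sketch of the build step, assuming the default Maven layout (the exact jar name follows your artifactId and version):

```bash
# Build the job jar; it ends up under target/, e.g. target/flink-demo-2.0.0-SNAPSHOT.jar
mvn clean package
```

The resulting jar can then be uploaded on the Submit New Job page of the web UI.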

On another machine, run nc -lk -p 7777. If the connection is refused, check whether the port is open.
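For example, on a host running firewalld you might open the port like this (a sketch; adjust to whatever firewall your machine actually uses):

```bash
# Open TCP port 7777 and reload the firewall rules (firewalld example)
firewall-cmd --permanent --add-port=7777/tcp
firewall-cmd --reload
```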

Check the received data in Kubernetes.
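Since the job prints to stdout, the counts show up in the TaskManager pod logs; something along these lines (the pod name is whatever kubectl get pods reports):

```bash
# Find the taskmanager pods and follow the logs of one of them
kubectl get pods -l app=flink,component=taskmanager
kubectl logs -f <taskmanager-pod-name>
```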

