Below is some code I wrote for a project at my company, with the business logic removed so that only the generic parts remain.

I will share some other Flink pipelines in later posts.

First, of course, the Maven dependencies have to be imported. Mine are as follows:

<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hadoop.version>3.0.0-cdh6.3.0</hadoop.version>
    <flink.version>1.12.0</flink.version>
    <kafka.version>1.1.1</kafka.version>
    <hbase.version>2.1.1-cdh6.3.0</hbase.version>
    <!-- two further versions, 1.2.0 and 5.1.40, appeared here; their property names did not survive formatting -->
    <kudu.version>1.10.0</kudu.version>
</properties>

<profiles>
    <profile>
        <id>dev</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <properties>
            <maven.dependency.scope>compile</maven.dependency.scope>
        </properties>
    </profile>
    <profile>
        <id>prod</id>
        <properties>
            <maven.dependency.scope>provided</maven.dependency.scope>
        </properties>
    </profile>
</profiles>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.12</artifactId>
        <version>1.11.3</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro-confluent-registry</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>${kudu.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-protocol</artifactId>
        <version>${hbase.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>${maven.dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${maven.dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>${maven.dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>${maven.dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
        <scope>${maven.dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <scope>${maven.dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>${maven.dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>${maven.dependency.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>commons-logging</groupId>
        <artifactId>commons-logging-api</artifactId>
        <version>1.1</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

I will not paste the project-specific settings in the build section.

Next comes the main class. It takes a single argument, the path to a configuration file; the keys referenced in the code just need to be written into that file.
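For reference, a configuration file matching the keys read below could look like the following sketch; every value here is a placeholder I made up, not something from the original project.

kafka.topic.name=my_topic
kafka.group.name=my_consumer_group
kafka.ips=broker1:9092,broker2:9092,broker3:9092
hdfs.outpath=hdfs:///data/flink/output
hdfs.checkpoint.path=hdfs:///data/flink/checkpoints/kafka-to-hdfs
flink.job.name=kafka-to-hdfs
file.header.name=part-my_topic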

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaToHdfs {

    public static void main(String[] args) throws Exception {
        // The only argument is the path to the configuration file.
        String config_path = args[0];

        // Load the configuration file.
        Properties properties = new Properties();
        BufferedReader bufferedReader = new BufferedReader(new FileReader(config_path));
        properties.load(bufferedReader);

        // Read the configuration entries.
        String topic_name = properties.getProperty("kafka.topic.name");
        String group_name = properties.getProperty("kafka.group.name");
        String kafka_ips  = properties.getProperty("kafka.ips");
        String out_path   = properties.getProperty("hdfs.outpath");
        String check_path = properties.getProperty("hdfs.checkpoint.path");
        String job_name   = properties.getProperty("flink.job.name");
        String head_name  = properties.getProperty("file.header.name");

        // Set up the Flink environment (checkpointing, state backend, ...).
        StreamExecutionEnvironment env = FlinkEnvUtils.creatEnv(check_path);

        // Set up the Kafka consumer.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafka_ips);
        props.setProperty("group.id", group_name);
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic_name, new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true);
        consumer.setStartFromGroupOffsets();

        // Create the stream.
        DataStream<String> stream = env.addSource(consumer);

        // Output file naming.
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix(head_name)
                .withPartSuffix(".dat")
                .build();

        // One bucket per hour.
        DateTimeBucketAssigner<String> dateTimeBucketAssigner =
                new DateTimeBucketAssigner<>("yyyyMMddHH");

        // File sink: row format, rolled after 5 minutes, 5 minutes of inactivity, or 1 GiB.
        FileSink<String> sink = FileSink
                .forRowFormat(new Path(out_path), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(dateTimeBucketAssigner)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(5))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .withOutputFileConfig(config)
                .build();

        // Business processing.
        stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) {
                if (value != null) {
                    // TODO: the actual business logic goes here; it runs once per record read from Kafka.
                    out.collect(value);
                }
            }
        }).sinkTo(sink);

        // Run the job.
        env.execute(job_name);
    }
}
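To make the TODO above a little more concrete, here is one hypothetical shape the business logic could take: a FlatMapFunction that parses each record as JSON with fastjson (which is already in the pom) and forwards only records that contain a particular field. The field name and the filtering rule are invented for illustration and are not from the original project.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical business logic: parse JSON, drop malformed records,
// and forward only events that carry an "event_type" field.
public class JsonFilterFlatMap implements FlatMapFunction<String, String> {

    @Override
    public void flatMap(String value, Collector<String> out) {
        if (value == null) {
            return;
        }
        try {
            JSONObject record = JSON.parseObject(value);
            // "event_type" is a made-up field name used only for illustration.
            if (record != null && record.containsKey("event_type")) {
                out.collect(record.toJSONString());
            }
        } catch (Exception e) {
            // Malformed JSON: skip the record (or route it to a side output).
        }
    }
}

It would be plugged in as stream.flatMap(new JsonFilterFlatMap()).sinkTo(sink); in place of the anonymous class above.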

The FileSink rolling policy can be customized:

withRolloverInterval: the part file has been open for at least this long, i.e. it holds at least this much time's worth of data;

withInactivityInterval: no new data has arrived for this long;

withMaxPartSize: the part file has reached this size.

When any one of these three conditions is met, the current in-progress file is rolled; it then becomes a finished (visible) part file once the next checkpoint completes.
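If you would rather roll part files only on checkpoints (which is also what bulk formats such as Parquet require), Flink ships an OnCheckpointRollingPolicy in the same rollingpolicies package. Below is a minimal sketch of swapping it into a sink like the one above; the class and method names in the sketch are my own, not from the original project.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class CheckpointRolledSinkSketch {

    // Builds a row-format FileSink that rolls its part files on every checkpoint
    // instead of on time/size thresholds. "out_path" is assumed to be the same
    // HDFS output path used in the main class.
    public static FileSink<String> build(String out_path) {
        return FileSink
                .forRowFormat(new Path(out_path), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }
}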

The FlinkEnvUtils.creatEnv method used above is shown below; it just bundles a few environment settings (checkpointing, state backend, time characteristic).

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkEnvUtils {

    public static StreamExecutionEnvironment creatEnv(String check_path) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 minutes, exactly-once, with state kept on the filesystem (HDFS).
        env.enableCheckpointing(5 * 60 * 1000L);
        env.setStateBackend(new FsStateBackend(check_path));
        env.getCheckpointConfig().setCheckpointInterval(5 * 60 * 1000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(5 * 60 * 1000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep externalized checkpoints when the job is cancelled, so it can be restarted from them.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        return env;
    }
}
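As a side note, FsStateBackend and setStreamTimeCharacteristic are deprecated in releases after Flink 1.12 (event time is already the default characteristic there). If you move to Flink 1.13 or later, a roughly equivalent setup for the state and checkpoint-storage part would be the following sketch; it is not part of the original utility class.

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkEnvUtilsNewApiSketch {

    // Rough Flink 1.13+ equivalent of the state/checkpoint part of creatEnv:
    // a heap state backend, with checkpoints written to the given path.
    public static StreamExecutionEnvironment createEnv(String check_path) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(check_path);
        return env;
    }
}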
