【Flink-Kafka-To-Mongo】使用 Flink 实现 Kafka 数据写入 Mongo(根据对应操作类型进行增、删、改操作)

1)导入依赖 2)resources:2.1.appconfig.yml、2.2.application.properties、2.3.log4j.properties、2.4.log4j2.xml

3)util:3.1.KafkaMongoUtils、3.2.CustomDeSerializationSchema

4)kafkacdc2mongo:4.1.Kafka2MongoApp

需求描述:

1、数据从 Kafka 写入 Mongo。

2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。

3、此案例中的 Kafka 开启了 Kerberos 安全认证;如果你的环境不需要认证,请自行去掉相关的认证配置。

4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。

5、读取时使用自定义 Source,写入时使用自定义 Sink。

6、消费 Kafka 数据时自定义反序列化。

7、Mongo 使用 Document 进行封装操作。

8、此示例通过启动参数传入目标库表:第一个参数是以逗号分隔的 db.collection 列表(例如 db1.col1,db2.col2),每一项对应的 Kafka topic 为 mongo_<db>_<collection>。

1)导入依赖

这里的依赖比较冗余,大家可以根据各自需求做删除或保留。

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

gaei.cn.x5l

x8vbusiness

1.0.0

UTF-8

1.8

${target.java.version}

${target.java.version}

2.12

2.12.10

1.14.0

2.17.2

3.1.2

3.1.2

3.12.6

4.3.1

com.ververica

flink-connector-mysql-cdc

2.3.0

redis.clients

jedis

2.9.0

org.apache.flink

flink-java

${flink.version}

provided

org.apache.flink

flink-streaming-java_${scala.binary.version}

${flink.version}

provided

org.apache.flink

flink-clients_${scala.binary.version}

${flink.version}

provided

org.apache.flink

flink-table-api-java-bridge_${scala.binary.version}

1.14.0

provided

org.apache.flink

flink-table-planner_${scala.binary.version}

${flink.version}

provided

org.apache.flink

flink-streaming-scala_${scala.binary.version}

${flink.version}

provided

org.apache.flink

flink-table-common

${flink.version}

provided

org.apache.flink

flink-cep_${scala.binary.version}

${flink.version}

org.apache.flink

flink-json

${flink.version}

provided

org.apache.flink

flink-csv

${flink.version}

provided

org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}

${flink.version}

org.apache.flink

flink-state-processor-api_${scala.binary.version}

${flink.version}

provided

commons-lang

commons-lang

2.5

compile

org.apache.flink

flink-runtime-web_${scala.binary.version}

${flink.version}

provided

org.apache.logging.log4j

log4j-slf4j-impl

${log4j.version}

runtime

org.apache.logging.log4j

log4j-api

${log4j.version}

runtime

org.apache.logging.log4j

log4j-core

${log4j.version}

runtime

org.apache.hadoop

hadoop-client

3.3.1

org.apache.avro

avro

org.apache.hadoop

hadoop-auth

${hadoop.version}

org.apache.flink

flink-statebackend-rocksdb_${scala.binary.version}

${flink.version}

provided

com.alibaba

fastjson

1.1.23

org.projectlombok

lombok

1.16.18

provided

org.jyaml

jyaml

1.3

org.apache.flink

flink-table-planner-blink_${scala.binary.version}

1.13.5

provided

com.google.code.gson

gson

2.8.3

com.ververica

flink-connector-mongodb-cdc

2.3.0

mysql

mysql-connector-java

8.0.27

runtime

com.alibaba

druid

1.2.8

org.mongodb

bson

${mongo.driver.core.version}

org.mongodb

mongodb-driver-core

${mongo.driver.core.version}

org.mongodb

mongodb-driver

3.12.6

org.apache.maven.plugins

maven-compiler-plugin

3.1

${target.java.version}

${target.java.version}

org.apache.maven.plugins

maven-shade-plugin

3.0.0

package

shade

org.apache.flink:force-shading

com.google.code.findbugs:jsr305

org.slf4j:*

org.apache.logging.log4j:*

org.apache.flink:flink-runtime-web_2.11

*:*

META-INF/*.SF

META-INF/*.DSA

META-INF/*.RSA

implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

com.owp.flink.kafka.KafkaSourceDemo

implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

org.eclipse.m2e

lifecycle-mapping

1.0.0

org.apache.maven.plugins

maven-shade-plugin

[3.0.0,)

shade

org.apache.maven.plugins

maven-compiler-plugin

[3.1,)

testCompile

compile

2)resources

2.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"

mysql.username: "test"

mysql.password: "123456"

mysql.driver: "com.mysql.jdbc.Driver"

2.2.application.properties

url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin

#database=diagnosis

#collection=diagnosisEntiry

maxConnectionIdleTime=1000000

batchSize=1

# flink

checkpoint.interval=300000

checkpoint.minPauseBetweenCheckpoints=10000

checkpoint.checkpointTimeout=400000

maxConcurrentCheckpoints=1

restartInterval=120

restartStrategy=3

checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongo

mysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false

mysql.username=test

mysql.password=123456

#envType=PRE

envType=PRD

# mysql druid 连接池生产环境连接池配置

druid.driverClassName=com.mysql.jdbc.Driver

#生产

druid.url=jdbc:mysql://1.1.1.1:3306/test

druid.username=test

druid.password=123456

# 初始化连接数

druid.initialSize=1

# 最大连接数

druid.maxActive=5

# 最大等待时间

druid.maxWait=3000

2.3.log4j.properties

log4j.rootLogger=info, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4.log4j2.xml

3)util

3.1.KafkaMongoUtils

public class KafkaUtils {

public static FlinkKafkaConsumer> getKafkaConsumerForMongo(List topic) throws IOException {

Properties prop1 = confFromYaml();

//认证环境

String envType = prop1.getProperty("envType");

Properties prop = new Properties();

System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");

prop.put("security.protocol", "SASL_PLAINTEXT");

prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "

+ "useTicketCache=false "

+ "serviceName=\"" + "kafka" + "\" "

+ "useKeyTab=true "

+ "keyTab=\"" + "/opt/conf/test.keytab" + "\" "

+ "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");

// prop.put("bootstrap.servers", "kfk01.pre.x8v.com:9092");

prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));

prop.put("group.id", "Kafka2Mongo_all");

prop.put("auto.offset.reset", "earliest");

prop.put("enable.auto.commit", "false");

prop.put("max.poll.interval.ms", "60000");

prop.put("max.poll.records", "3000");

prop.put("session.timeout.ms", "600000");

// List topics = Stream.of(prop.getProperty("topics").split(",", -1))

// .collect(Collectors.toList());

prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");

prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");

FlinkKafkaConsumer> consumer = new FlinkKafkaConsumer>(topic, new CustomDeSerializationSchema(), prop);

consumer.setStartFromGroupOffsets();

consumer.setCommitOffsetsOnCheckpoints(true);

return consumer;

}

public static void main(String[] args) throws Exception {

Properties druidConf = KafkaUtils.getDruidConf();

if (druidConf == null) {

throw new RuntimeException("缺少druid相关配置信息,请检查");

}

DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);

Connection connection = dataSource.getConnection();

PreparedStatement showDatabases = connection.prepareStatement("\n" +

"select count(*) from tab_factory");

ResultSet resultSet = showDatabases.executeQuery();

while (resultSet.next()) {

String string = resultSet.getString(1);

System.out.println(string);

}

resultSet.close();

showDatabases.close();

connection.close();

}

public static Properties getDruidConf() {

try {

Properties prop = confFromYaml();

String driverClassName = prop.get("druid.driverClassName").toString();

String url = prop.get("druid.url").toString();

String username = prop.get("druid.username").toString();

String password = prop.get("druid.password").toString();

String initialSize = prop.get("druid.initialSize").toString();

String maxActive = prop.get("druid.maxActive").toString();

String maxWait = prop.get("druid.maxWait").toString();

Properties p = new Properties();

p.put("driverClassName", driverClassName);

p.put("url", url);

p.put("username", username);

p.put("password", password);

p.put("initialSize", initialSize);

p.put("maxActive", maxActive);

p.put("maxWait", maxWait);

// p.forEach((k,v)-> System.out.println("连接池属性 "+k+"="+v));

return p;

} catch (Exception e) {

e.printStackTrace();

}

return null;

}

// envType PRE PRD

public static Map getKafkaKerberos(String envType) {

Map map = new HashMap<>();

if ("PRD".equalsIgnoreCase(envType)) {

map.put("principal", "prd@PRD.PRD.COM");

map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");

} else if ("PRE".equalsIgnoreCase(envType)) {

map.put("principal", "pre@PRE.PRE.COM");

map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");

} /*else if ("TEST".equalsIgnoreCase(envType)) {

map.put("principal","test@TEST.TEST.COM");

map.put("bootstrap.servers","test@TEST.TEST.COM");

} */ else {

System.out.println("没有该" + envType + "环境");

throw new RuntimeException("没有该" + envType + "环境");

}

return map;

}

public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {

Properties prop = confFromYaml();

env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE);//这里会造成offset提交的延迟

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));

env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));

env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(

Integer.valueOf(prop.getProperty("restartStrategy")), // 尝试重启的次数,不宜过小,分布式任务很容易出问题(正常情况),建议3-5次

Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS) // 延时

));

// 设置状态后端存储方式

// env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));

// env.setStateBackend(new MemoryStateBackend());

env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));

return env;

}

public static Properties confFromYaml() {

Properties prop = new Properties();

InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");

try {

prop.load(resourceStream);

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

if (resourceStream != null) {

resourceStream.close();

}

} catch (Exception ex) {

ex.printStackTrace();

}

}

return prop;

}

}

3.2.CustomDeSerializationSchema

public class CustomDeSerializationSchema implements KafkaDeserializationSchema> {

private static String encoding = "UTF8";

//是否表示l流的最后一条元素,设置为false,表示数据会源源不断的到来

@Override

public boolean isEndOfStream(ConsumerRecord nextElement) {

return false;

}

//这里返回一个ConsumerRecord类型的数据,除了原数据还包括topic,offset,partition等信息

@Override

public ConsumerRecord deserialize(ConsumerRecord record) throws Exception {

byte[] key = (record.key() == null ? "".getBytes() : record.key());

return new ConsumerRecord(

record.topic(),

record.partition(),

record.offset(),

record.timestamp(),

record.timestampType(),

record.checksum(),

record.serializedKeySize(),

record.serializedValueSize(),

/*这里我没有进行空值判断,生产一定记得处理*/

new String(key, encoding),

new String(record.value(), encoding));

}

//指定数据的输入类型

@Override

public TypeInformation> getProducedType() {

return TypeInformation.of(new TypeHint>() {

});

}

}

4)kafkacdc2mongo

4.1.Kafka2MongoApp

public class Kafka2MongoApp {

public static void main(String[] args) throws Exception {

System.setProperty("HADOOP_USER_NAME", "hadoop");

String[] split = args[0].split(",");

// String topic = "mongo_" + database + "_" + collection;

List topicList = new ArrayList<>();

Map dbAndCol = new HashMap<>();

for (String s : split) {

String[] t = s.split("\\.");

String e = "mongo_" + t[0] + "_" + t[1];

topicList.add(e);

dbAndCol.put(e, s);

}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();

KafkaUtils.setupFlinkEnv(env);

RichSinkFunction> sinkFunction = new RichSinkFunction>() {

private Stack connectionPool = new Stack<>();

private String url = null;

@Override

public void open(Configuration parameters) throws Exception {

initPool();

}

/**

* 初始化连接池,设置参数。

*/

private void initPool() {

Properties prop = KafkaUtils.confFromYaml();

url = prop.getProperty("url");

try {

for (int i = 0; i < 5; i++) {

MongoClient client = new MongoClient(new MongoClientURI(url));

connectionPool.push(client);

}

} catch (MongoException e) {

e.printStackTrace();

}

}

@Override

public void invoke(ConsumerRecord record, Context context) throws Exception {

MongoClient mongoClient = null;

try {

String topic = record.topic();

String dbAndColstr = dbAndCol.get(topic);

String[] t = dbAndColstr.split("\\.");

String databaseStr = t[0];

String collectionStr = t[1];

Document doc = Document.parse(record.value());

String operationType = doc.getString("operationType");

String documentKey = doc.getString("documentKey");

Object id = documentKey;

id = doc.get("documentKey");

// 从连接池获取连接

mongoClient = connectionPool.pop();

MongoCollection collection = null;

try {

collection = mongoClient.getDatabase(databaseStr).getCollection(collectionStr);

} catch (Exception e) {

try {

mongoClient.close();

} catch (Exception ignore) {

}

// 链接过期

mongoClient = new MongoClient(new MongoClientURI(url));

collection = mongoClient.getDatabase(databaseStr).getCollection(collectionStr);

}

if ("delete".equalsIgnoreCase(operationType)) {

collection.deleteOne(new Document("id", id));

}

if (documentKey != null && !documentKey.isEmpty() && !"delete".equals(operationType)) {

Document outputDoc = (Document) doc.get("fullDocument");

outputDoc.put("id", id);

try {

collection.deleteOne(new Document("id", id));

} catch (Exception e) {

System.out.println("添加更新前先删除:异常信息====>>>" + e.getMessage() + "插入的数据是\n" + outputDoc);

}

if ("insert".equalsIgnoreCase(operationType) || "update".equalsIgnoreCase(operationType) || "replace".equalsIgnoreCase(operationType)) {

insertOne(collection, outputDoc);

}

}

} catch (Exception e) {

System.out.printf("mongodb 同步异常,原因是%s,topic是%s,value值是\n%s%n", e.getMessage(), record.topic(), record.value());

} finally {

if (mongoClient != null) {

// 把连接放回连接池

connectionPool.push(mongoClient);

}

}

}

@Override

public void close() throws Exception {

for (MongoClient mongoClient : connectionPool) {

try {

mongoClient.close();

} catch (Exception ignore) {

}

}

}

private void insertOne(MongoCollection collection, Document doc) {

String collectionName = collection.getNamespace().getCollectionName();

//处理特殊字段

handle(collectionName, doc);

collection.insertOne(doc);

}

//如果有时间字段需要处理示例如下

private void handle(String collectionName, Document doc) {

if (collectionName.equals("test1")) {

//systemTime 是 Date类型,不是String 2023-10-13 11:37:43.238

formatStringTime(doc, "systemTime");

return;

}

if (collectionName.equals("test2")) {

formatStringTime(doc, "time");

return;

}

if (collectionName.equals("test3") || collectionName.equals("timer_record")) {

formatStringTime(doc, "createTime");

formatStringTime(doc, "updateTime");

return;

}

}

//将String 转 date

private void formatStringTime(Document doc, String key) {

try {

String time = doc.getString(key);

if (time == null) {

return;

}

Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(time);

doc.put(key, parse);

} catch (Exception e) {

e.printStackTrace();

}

}

};

env.addSource(KafkaUtils.getKafkaConsumerForMongo(topicList))

.keyBy(e -> {

Document doc = Document.parse(e.value());

return doc.getString("documentKey");

})

.addSink(sinkFunction);

env.execute("kafka2mongo synchronization " + topicList);

}

}

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: