flink1.16.0适配elasticsearch-8 connector 心得

来源:github flink暂时未合并es8源码 https://github.com/apache/flink-connector-elasticsearch/pull/53/files

环境:flink 1.16.0 + jdk 1.8

要点一:OperationSerializer.java

使用的是kryo格式的序列化和反序列化,如果数据源是json,需要调整序列化方法

要点二:NetworkConfigFactory.java

需要在这儿自定义esClient,根据自身环境设置es的header、认证、ssl等。注:这里不要默认header ("Content-Type", "application/json")

要点三:需要Operation和Elasticsearch8SinkBuilder做微调,见下图

总结 好了,整体的调整部分就是这样了,接下来就是测试了,可以参考test/下的Elasticsearch8SinkTest。我这里也有一个kafka2es的例子给大家参考

package org.apache.flink.connector.elasticsearch.sink;

import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;

import lombok.AllArgsConstructor;

import lombok.Builder;

import lombok.Data;

import lombok.NoArgsConstructor;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.connector.kafka.source.KafkaSource;

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import org.apache.http.HttpHost;

import java.util.Arrays;

public class CreateEs8demo {

    /**
     * Demo job: reads JSON records from a Kafka topic and upserts them into an
     * Elasticsearch 8 index via the (not yet merged upstream) Elasticsearch8Sink.
     *
     * <p>Fixes vs. the original snippet: all raw generic types are parameterized.
     * In particular the anonymous raw {@code MapFunction} did not compile — with a
     * raw type its abstract method erases to {@code map(Object)}, so an anonymous
     * class declaring {@code map(ObjectNode)} neither implements it nor satisfies
     * {@code @Override}. It is now {@code MapFunction<ObjectNode, User>}.
     */
    public static void main(String[] args) {
        try {
            // Kafka source producing ObjectNode records of the form
            // {"key":..., "value":..., "metadata":...} as emitted by
            // JSONKeyValueDeserializationSchema(includeMetadata = true).
            KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                    .setBootstrapServers("1.1.1.1:9092")
                    .setTopics("topic")
                    .setGroupId("test")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    //.setProperties(properties)
                    .setDeserializer(KafkaRecordDeserializationSchema.of(
                            new JSONKeyValueDeserializationSchema(true)))
                    .build();

            StreamExecutionEnvironment environment =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<ObjectNode> sourceStream = environment.fromSource(
                    source,
                    WatermarkStrategy.noWatermarks(),
                    "source");

            HttpHost[] httpHosts =
                    new HttpHost[]{new HttpHost("*.*.*.33", 9200, "https")};

            // Elasticsearch 8 sink: each User becomes an update-with-upsert
            // BulkOperation, so replayed records overwrite existing documents
            // instead of creating duplicates.
            Elasticsearch8Sink<User> elastic = Elasticsearch8SinkBuilder.<User>builder()
                    .setMaxBatchSize(1)          // demo values: flush after every record
                    .setMaxBufferedRequests(2)
                    .setMaxTimeInBufferMS(1000)
                    .setHosts(httpHosts)
                    .setUsername("xxx")
                    .setPassword("xxx")
                    .setConverter(
                            (element, ctx) ->
                                    new BulkOperation.Builder()
                                            .update(op -> op
                                                    .index("es8_index")
                                                    .id(element.getId())
                                                    // docAsUpsert: insert the document
                                                    // when the id does not exist yet.
                                                    .action(e -> e.doc(element)
                                                            .docAsUpsert(true)))
                                            .build())
                    .build();

            sourceStream
                    // Extract the Kafka record "value" node and map it onto the
                    // POJO the sink converter expects.
                    .map(new MapFunction<ObjectNode, User>() {
                        @Override
                        public User map(ObjectNode jsonNodes) throws Exception {
                            JsonNode value = jsonNodes.get("value");
                            return User.builder()
                                    .id(value.get("id").asText())
                                    .name(value.get("name").asText())
                                    .age(value.get("age").asText())
                                    .build();
                        }
                    })
                    .sinkTo(elastic);

            environment.execute();
        } catch (Exception e) {
            // Demo-only handling; a production job should let the failure
            // propagate (or log it via SLF4J) rather than swallow it here.
            e.printStackTrace();
        }
    }

    /** Document model written to the {@code es8_index} index. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public static class User {
        private String id;
        private String name;
        // Kept as String to mirror the incoming JSON; change to int if the
        // index mapping declares a numeric field.
        private String age;
    }
}

如有不对,欢迎请大家指正,不胜感激,欢迎评论。

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: