flink1.16.0适配elasticsearch-8 connector 心得
来源:github flink暂时未合并es8源码 https://github.com/apache/flink-connector-elasticsearch/pull/53/files
环境:flink 1.16.0 + jdk 1.8
要点一:OperationSerializer.java
使用的是kryo格式的序列化和反序列化,如果数据源是json,需要调整序列化方法
要点二:NetworkConfigFactory.java
需要在这儿自定义esClient,根据自身环境设置es的header、认证、ssl等。注:这里不要默认header ("Content-Type", "application/json")
要点三:需要Operation和Elasticsearch8SinkBuilder做微调,见下图
总结 好了,整体的调整部分就是这样了,接下来就是测试了,可以参考test/下的Elasticsearch8SinkTest。我这里也有一个kafka2es的例子给大家参考
package org.apache.flink.connector.elasticsearch.sink;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.http.HttpHost;
import java.util.Arrays;
public class CreateEs8demo {
public static void main(String[] args) {
try {
KafkaSource
.setBootstrapServers("1.1.1.1:9092")
.setTopics("topic")
.setGroupId("test")
.setStartingOffsets(OffsetsInitializer.earliest())
//.setProperties(properties)
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(
true)))
.build();
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource
source,
WatermarkStrategy.noWatermarks(),
"source");
HttpHost[] httpHosts = new HttpHost[]{new HttpHost("*.*.*.33", 9200, "https")};
Elasticsearch8Sink
.setMaxBatchSize(1)
.setMaxBufferedRequests(2)
.setMaxTimeInBufferMS(1000)
.setHosts(httpHosts)
.setUsername("xxx")
.setPassword("xxx")
.setConverter(
(element, ctx) ->
new BulkOperation.Builder()
// .index(op -> op
// .index("es8_index")
// .document(element)
// )
.update(op -> op
.index("es8_index")
.id(element.getId())
.action(e -> e.doc(element).docAsUpsert(true))
)
.build())
.build();
sourceStream.map(new MapFunction
@Override
public User map(ObjectNode jsonNodes) throws Exception {
JsonNode value = jsonNodes.get("value");
return User.builder()
.id(value.get("id").asText())
.name(value.get("name").asText())
.age(value.get("age").asText())
.build();
}
})
.sinkTo(elastic);
environment.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public static class User {
private String id;
private String name;
private String age;
}
}
如有不对,欢迎大家指正,不胜感激,欢迎评论。
相关文章
发表评论