
Table of Contents

Preface
1. Writing to Elasticsearch 5
2. Writing to Elasticsearch 7
Summary

Preface

A short example of writing a data stream to Elasticsearch 5 and Elasticsearch 7 with a Flink sink.

1. Writing to Elasticsearch 5

Maven dependency (pom.xml):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

The code is as follows (example):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Es5SinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Row row  = Row.of("张三", "001", getTimestamp("2016-10-24 21:59:06"));
        Row row2 = Row.of("张三", "002", getTimestamp("2016-10-24 21:50:06"));
        Row row3 = Row.of("张三", "002", getTimestamp("2016-10-24 21:51:06"));
        Row row4 = Row.of("李四", "003", getTimestamp("2016-10-24 21:50:56"));
        Row row5 = Row.of("李四", "004", getTimestamp("2016-10-24 00:48:36"));
        Row row6 = Row.of("王五", "005", getTimestamp("2016-10-24 00:48:36"));
        DataStreamSource<Row> source = env.fromElements(row, row2, row3, row4, row5, row6);

        Map<String, String> config = new HashMap<>();
        // config.put("cluster.name", "my-cluster-name");
        // config.put("bulk.flush.max.actions", "1");

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.68.8.60"), 9300));

        // Sink: each Row becomes one index request against Elasticsearch 5
        source.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<Row>() {

            public IndexRequest createIndexRequest(Row element) {
                Map<String, Object> json = new HashMap<>();
                json.put("name22", element.getField(0).toString());
                json.put("no22", element.getField(1));
                json.put("age", 34);
                json.put("create_time", element.getField(2));
                return Requests.indexRequest()
                        .index("cc")
                        .type("mtype") // ES5 still requires a document type
                        .id(element.getField(1).toString())
                        .source(json);
            }

            @Override
            public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                // Hand the request to the indexer, which batches and sends it
                indexer.add(createIndexRequest(element));
            }
        }));

        env.execute("es demo");
    }

    private static java.sql.Timestamp getTimestamp(String str) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        java.util.Date date = sdf.parse(str);
        return new java.sql.Timestamp(date.getTime());
    }
}
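The two commented-out config entries are the usual knobs for the ES5 sink: cluster.name must match the target cluster, and the bulk.flush.* keys control batching. A minimal sketch of a more explicit config, assuming placeholder values (the bulk.flush.* key names come from the connector's ElasticsearchSinkBase; the cluster name and numbers are illustrative):

Map<String, String> config = new HashMap<>();
// Must match the cluster.name setting of the target ES5 cluster (placeholder value)
config.put("cluster.name", "my-cluster-name");
// Flush after every single request: handy for demos, too chatty for production
config.put("bulk.flush.max.actions", "1");
// Alternatively, flush on a timer (milliseconds) or by buffered size (MB)
config.put("bulk.flush.interval.ms", "5000");
config.put("bulk.flush.max.size.mb", "5");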

2. Writing to Elasticsearch 7

Maven dependency (pom.xml):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

The code is as follows (example):

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EsSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Row row  = Row.of("张三", "001", getTimestamp("2016-10-24 21:59:06"));
        Row row2 = Row.of("张三", "002", getTimestamp("2016-10-24 21:50:06"));
        Row row3 = Row.of("张三", "002", getTimestamp("2016-10-24 21:51:06"));
        Row row4 = Row.of("李四", "003", getTimestamp("2016-10-24 21:50:56"));
        Row row5 = Row.of("李四", "004", getTimestamp("2016-10-24 00:48:36"));
        Row row6 = Row.of("王五", "005", getTimestamp("2016-10-24 00:48:36"));
        DataStreamSource<Row> source = env.fromElements(row, row2, row3, row4, row5, row6);

        // ES7 talks HTTP to port 9200 (the ES5 transport client used port 9300)
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost("10.68.8.69", 9200, "http"));

        ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(hosts, new ElasticsearchSinkFunction<Row>() {

            public IndexRequest createIndexRequest(Row element) {
                Map<String, Object> json = new HashMap<>();
                json.put("name22", element.getField(0).toString());
                json.put("no22", element.getField(1));
                json.put("age", 34);
                // json.put("create_time", element.getField(2));
                // No .type(...) here: ES7 has dropped mapping types
                return Requests.indexRequest()
                        .index("cc")
                        .id(element.getField(1).toString())
                        .source(json);
            }

            @Override
            public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                // Hand the request to the indexer, which batches and sends it
                indexer.add(createIndexRequest(element));
            }
        });

        // Flush after 100 buffered requests; set to 1 to emit after every element
        esSinkBuilder.setBulkFlushMaxActions(100);

        // Sink
        source.addSink(esSinkBuilder.build());

        env.execute("es demo");
    }

    private static java.sql.Timestamp getTimestamp(String str) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        java.util.Date date = sdf.parse(str);
        return new java.sql.Timestamp(date.getTime());
    }
}
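The builder exposes more flushing and error-handling knobs than the single setBulkFlushMaxActions call above. A minimal sketch, reusing the esSinkBuilder from the example (values are illustrative; the retry handler ships with the connector and needs one extra import):

import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;

// Flush whenever any one of these thresholds is reached first
esSinkBuilder.setBulkFlushMaxSizeMb(5);     // 5 MB of buffered requests
esSinkBuilder.setBulkFlushInterval(5000L);  // or every 5 seconds
// Re-enqueue requests that ES rejected because its bulk thread pool was saturated
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());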

Summary

The difference between writing to ES5 and ES7 from Flink comes down to importing the matching flink-connector-elasticsearch artifact; ES7 has dropped the concept of mapping types, so there is no longer a need to set a type on the index request.
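Condensed from the two examples above (sinkFunction stands in for the ElasticsearchSinkFunction<Row> defined in each), the construction difference looks like this:

// ES5: direct constructor, user-config map plus transport addresses (port 9300)
source.addSink(new ElasticsearchSink<>(config, transportAddresses, sinkFunction));

// ES7: builder over HTTP hosts (port 9200); the IndexRequest carries no .type(...)
source.addSink(new ElasticsearchSink.Builder<>(hosts, sinkFunction).build());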
