Contents

Preface
I. What is Elasticsearch?
II. Implementation steps
    1. Create the BulkProcessor
    2. Write data in bulk
Summary

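Preface

Writing documents to Elasticsearch in batches, instead of sending one request per document, improves write throughput and reduces the I/O load on Elasticsearch, because many documents travel in a single network round trip.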

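I. What is Elasticsearch?

Elasticsearch is a real-time, distributed, open-source full-text search and analytics engine. It is accessed through a RESTful web interface and stores data as schema-free JSON (JavaScript Object Notation) documents. Because it is built in Java, Elasticsearch runs on many different platforms. It lets users search and explore large volumes of data very quickly.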
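Because Elasticsearch is accessed over REST with JSON documents, a bulk write is one HTTP request to the `_bulk` endpoint whose body is newline-delimited JSON: an action/metadata line followed by the document source, repeated for each document, with a final newline. Replacing many single-document requests with that one round trip is where the savings come from. The stdlib-only sketch below assembles such a body by hand for two of the sample users used later in this article. The names `BulkBodyDemo`, `buildBulkBody` and `user` are hypothetical, JSON escaping is deliberately omitted, and the `_type` field assumes the 5.x-era `batch`/`my_type` layout used here; in real code the client library builds this body for you.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BulkBodyDemo {
    // Hypothetical helper: builds an NDJSON body for POST /_bulk.
    // Each document contributes one action line and one source line; the body ends with '\n'.
    static String buildBulkBody(String index, String type, List<Map<String, String>> docs) {
        StringBuilder sb = new StringBuilder();
        for (Map<String, String> doc : docs) {
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_type\":\"").append(type)
              .append("\",\"_id\":\"").append(doc.get("user_id")).append("\"}}\n");
            sb.append(toJson(doc)).append('\n');
        }
        return sb.toString();
    }

    // Minimal serializer for flat string maps (no escaping; demo only)
    static String toJson(Map<String, String> doc) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : doc.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }

    // LinkedHashMap keeps the field order stable in the output
    static Map<String, String> user(String name, String id) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("user_name", name);
        m.put("user_id", id);
        return m;
    }

    public static void main(String[] args) {
        String body = buildBulkBody("batch", "my_type",
                Arrays.asList(user("name7", "7"), user("name8", "8")));
        System.out.print(body);
    }
}
```

Running it prints four lines: an action line and a source line for each of the two documents.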

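II. Implementation steps

1. Create the BulkProcessor

BulkProcessor is a thread-safe helper that batches requests for you. You can set the maximum number of actions sent to ES in one bulk request, as well as a flush interval. The flush interval means that every time the interval elapses, any requests accumulated so far are written to ES, even if their number has not yet reached the configured maximum. (By default, BulkProcessor flushes every 1000 actions or 5 MB of data, whichever comes first, and no flush interval is set.) Its progress is reported through a BulkProcessor.Listener, which is called before each bulk request, after a bulk request returns (individual items may still have failed), and when a bulk request fails as a whole. The listener looks like this (example):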

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.rest.RestStatus;

public class BulkProListener implements BulkProcessor.Listener {

    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // Called just before each bulk request is sent
        System.out.println("Before bulk #" + executionId + ": " + request.numberOfActions() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        // Called after a bulk request returned; individual items may still have failed
        System.out.println("After bulk #" + executionId + ", took " + response.getTook());
        if (!response.hasFailures()) {
            return;
        }
        for (BulkItemResponse itemResponse : response.getItems()) {
            if (!itemResponse.isFailed()) {
                continue;
            }
            BulkItemResponse.Failure failure = itemResponse.getFailure();
            RestStatus restStatus = failure.getStatus();
            // getItemId() is the position of the failed action in request.requests().
            // Use the failure's own message: calling getCause() on the cause may return null.
            System.out.println("Failed Elasticsearch item request #" + itemResponse.getItemId()
                    + " (id=" + itemResponse.getId()
                    + ", status=" + (restStatus == null ? "unknown" : restStatus.getStatus())
                    + "): " + failure.getMessage());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // Called when the whole bulk request failed (e.g. the cluster was unreachable);
        // report it once instead of once per contained request
        System.out.println("Bulk #" + executionId + " failed (" + request.numberOfActions()
                + " actions): " + failure.getMessage());
        failure.printStackTrace();
    }
}
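The two flush triggers described at the start of this step can be illustrated without a cluster. The stdlib-only sketch below is a hypothetical `MiniBulkBuffer` class, not Elasticsearch code: it flushes when the number of buffered items reaches a `bulkActions` threshold, flushes whatever is pending each time a scheduled interval elapses, and flushes the remainder on `close()`, roughly mirroring `setBulkActions`, `setFlushInterval` and closing the processor. The real BulkProcessor additionally enforces a byte-size limit, retries with backoff, and calls a listener such as the one above around each bulk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified stand-in for BulkProcessor's flush triggers (NOT the Elasticsearch implementation):
// no byte-size limit, no retries/backoff, no concurrent requests.
class MiniBulkBuffer<T> implements AutoCloseable {
    private final int bulkActions;                 // like setBulkActions: flush at this many items
    private final Consumer<List<T>> sink;          // stands in for "execute one bulk request"
    private final ScheduledExecutorService timer;  // like setFlushInterval: periodic flush
    private List<T> pending = new ArrayList<>();

    MiniBulkBuffer(int bulkActions, long flushIntervalMillis, Consumer<List<T>> sink) {
        this.bulkActions = bulkActions;
        this.sink = sink;
        this.timer = Executors.newSingleThreadScheduledExecutor();
        // Each time the interval elapses, send whatever is pending, even below bulkActions
        timer.scheduleWithFixedDelay(this::flush, flushIntervalMillis, flushIntervalMillis,
                TimeUnit.MILLISECONDS);
    }

    // synchronized so several producer threads can share one buffer
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= bulkActions) {
            flush();
        }
    }

    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = pending;
        pending = new ArrayList<>();   // start a fresh batch so items are never sent twice
        sink.accept(batch);
    }

    @Override
    public synchronized void close() {
        timer.shutdownNow();           // stop the periodic flush
        flush();                       // send the last, possibly partial, batch
    }

    public static void main(String[] args) {
        // Long interval so only the count trigger and close() fire in this demo
        try (MiniBulkBuffer<String> buf = new MiniBulkBuffer<>(3, 60_000,
                batch -> System.out.println("flush " + batch))) {
            for (int i = 1; i <= 7; i++) {
                buf.add("doc" + i);
            }
        }
    }
}
```

With a threshold of 3 and seven items, the demo prints three flushes of sizes 3, 3 and 1, the last one coming from `close()`. Swapping in a fresh list inside `flush()` is the detail that keeps an item from being sent twice.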

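2. Write data in bulk

Create an index named batch in ES with the type my_type and the fields "user_name", "user_id", "age" and "user_note". The BulkProcessor below submits a bulk request once 5000 actions have accumulated, or every 10 seconds, whichever comes first:

bulkProcessor.setBulkActions(5000).setFlushInterval(TimeValue.timeValueSeconds(10)).build();

At the end, awaitClose flushes any remaining requests and waits up to the given timeout for all bulk requests to finish. Example code: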

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.Netty3Plugin;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class EsBatchWriterTest {

    public static void main(String[] args) throws Exception {
        // Elasticsearch 5.x transport client using the Netty3 transport
        Settings settings = Settings.builder()
                .put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
                .put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
                .build();
        // Settings settings = Settings.EMPTY;
        // If the cluster is not named "elasticsearch", also set "cluster.name" here

        // Create the client (9300 is the transport port, not the HTTP port)
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.68.8.60"), 9300));
        try {
            batch2(client);
            // update(client);
        } finally {
            client.close();
        }
    }

    public static void batch2(TransportClient client) throws InterruptedException {
        // Flush after 5000 actions or every 10 seconds, whichever comes first
        BulkProcessor processor = BulkProcessor.builder(client, new BulkProListener())
                .setBulkActions(5000)
                .setFlushInterval(TimeValue.timeValueSeconds(10))
                .build();

        List<JSONObject> list = getData();
        System.out.println(list.size());
        for (JSONObject obj : list) {
            System.out.println(obj.toJSONString());
            IndexRequestBuilder builder = client.prepareIndex("batch", "my_type")
                    .setId(obj.getString("user_id"))
                    .setSource(obj);
            processor.add(builder.request());
        }

        // Flush the remaining requests and wait up to 2 minutes for all bulks to finish
        boolean finished = processor.awaitClose(2, TimeUnit.MINUTES);
        System.out.println("All bulk requests finished: " + finished);
        // processor.close();
    }

    private static List<JSONObject> getData() {
        List<JSONObject> list = new ArrayList<>();
        list.add(user("name7", "7", "34", "note"));
        list.add(user("name8", "8", "24", "note"));
        list.add(user("name9", "9", "24", "note"));
        list.add(user("name10", "10", "14", "note"));
        list.add(user("name11", "11", "54b", "note"));
        list.add(user("name20", "20", "34a", "note"));
        list.add(user("name30", "30", "30", "note"));
        return list;
    }

    // Builds one document with the four fields of my_type
    private static JSONObject user(String name, String id, String age, String note) {
        JSONObject j = new JSONObject();
        j.put("user_name", name);
        j.put("user_id", id);
        j.put("age", age);
        j.put("user_note", note);
        return j;
    }

    public static void batch(TransportClient client) {
        int count = 1;

        // Start a manual bulk request
        BulkRequestBuilder bulkRequest = client.prepareBulk();

        List<JSONObject> list = new ArrayList<>();
        JSONObject j = new JSONObject();
        j.put("user_name", "name1");
        j.put("user_id", "1");
        list.add(j);
        j = new JSONObject();
        j.put("user_name", "name3");
        j.put("user_id", "3");
        list.add(j);
        j = new JSONObject();
        j.put("user_name", "name2");
        j.put("user_id", "2");
        list.add(j);

        for (JSONObject obj : list) {
            IndexRequestBuilder builder = client.prepareIndex("batch", "my_type")
                    .setId(obj.getString("user_id"))
                    .setSource(obj);
            bulkRequest.add(builder);
            // Submit every 1000 documents, then start a new bulk so they are not sent again
            if (count % 1000 == 0) {
                bulkRequest.execute().actionGet();
                System.out.println("Submitted: " + count);
                bulkRequest = client.prepareBulk();
            }
            count++;
        }

        // Submit the remainder; an empty bulk request fails validation, so skip it
        if (bulkRequest.numberOfActions() > 0) {
            bulkRequest.execute().actionGet();
        }
    }
}
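`getData()` includes two records whose `age` is not a number (`"54b"` for id 11 and `"34a"` for id 20). If the `batch` index maps `age` as an integer (an assumption: with dynamic mapping, the sample's string values would typically be mapped as text and every document would be accepted), Elasticsearch rejects only those items while the rest of the bulk succeeds. They therefore surface in `afterBulk(long, BulkRequest, BulkResponse)` through `hasFailures()`, not in the `Throwable` callback. The stdlib-only sketch below predicts which ids would be rejected, pairing results with requests by position just as bulk responses do. `AgeMappingCheck` is a hypothetical name, and `Integer.parseInt` only approximates how Elasticsearch coerces strings into an integer field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical check: which sample documents would an integer-mapped "age" field reject?
class AgeMappingCheck {
    // Approximation of integer coercion; Elasticsearch's own rules differ in edge cases
    static boolean parsesAsInteger(String s) {
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // ids[i] and ages[i] describe the i-th request in the bulk, matched by position
    static List<String> failedIds(String[] ids, String[] ages) {
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            if (!parsesAsInteger(ages[i])) {
                failed.add(ids[i]);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Same ids and ages as getData()
        String[] ids = {"7", "8", "9", "10", "11", "20", "30"};
        String[] ages = {"34", "24", "24", "14", "54b", "34a", "30"};
        System.out.println("rejected ids: " + failedIds(ids, ages));
    }
}
```

For the sample data it prints `rejected ids: [11, 20]`.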

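Summary

To execute bulk document requests, first initialize the Elasticsearch client, then create a BulkProcessor and set its parameters, and finally close the processor so that any pending requests are flushed. Note that the client setup in this article (PreBuiltTransportClient with Netty3Plugin and InetSocketTransportAddress) uses the Elasticsearch 5.x API; Netty3Plugin and InetSocketTransportAddress no longer exist in 6.x and later. For more ES5 API examples, download "Elasticsearch5 Study Notes" and "Java Examples of CRUD Operations on ES".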
