先学会用,再去研究原理,代码如下

import java.io.IOException;

import java.util.List;

import java.util.stream.Collectors;

import javax.annotation.Resource;

import org.elasticsearch.action.DocWriteRequest;

import org.elasticsearch.action.bulk.BackoffPolicy;

import org.elasticsearch.action.bulk.BulkItemResponse;

import org.elasticsearch.action.bulk.BulkProcessor;

import org.elasticsearch.action.bulk.BulkRequest;

import org.elasticsearch.action.bulk.BulkResponse;

import org.elasticsearch.action.delete.DeleteRequest;

import org.elasticsearch.client.RequestOptions;

import org.elasticsearch.client.RestHighLevelClient;

import org.elasticsearch.common.unit.ByteSizeUnit;

import org.elasticsearch.common.unit.ByteSizeValue;

import org.elasticsearch.common.unit.TimeValue;

import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j

@Component

public class EsUtils {

@Resource

private RestHighLevelClient restHighLevelClient;

public BulkProcessor createBulkProcessor() {

BulkProcessor.Listener listener = new BulkProcessor.Listener() {

@Override

public void beforeBulk(long executionId, BulkRequest request) {

log.info("【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());

}

@Override

public void afterBulk(long executionId, BulkRequest request,BulkResponse response) {

if (!response.hasFailures()) {

log.info("【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());

} else {

BulkItemResponse[] items = response.getItems();

for (BulkItemResponse item : items) {

if (item.isFailed()) {

log.info("afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());

break;

}

}

}

}

@Override

public void afterBulk(long executionId, BulkRequest request,Throwable failure) {

List> requests = request.requests();

List esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());

log.error("【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);

}

};

BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {

restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);

}), listener);

//到达10000条时刷新

builder.setBulkActions(10000);

//内存到达8M时刷新

builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));

//设置的刷新间隔10s

builder.setFlushInterval(TimeValue.timeValueSeconds(10));

//设置允许执行的并发请求数。

builder.setConcurrentRequests(8);

//设置重试策略

builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));

return builder.build();

}

}

import java.util.List;

import *************.Actor;

public interface EsService {

/*****

*asIndex 索引

list 入参

*/

void pushData(String asIndex, List list);

}

 业务代码实现

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import javax.annotation.Resource;

import org.elasticsearch.action.index.IndexRequest;

import org.springframework.stereotype.Service;

import *************.Actor;

@Service

public class EsServiceImpl implements EsService {

@Resource

private EsUtils esUtils;

@Override

public void pushData(String asIndex, List list) {

List indexRequests = new ArrayList<>();

list.forEach(data -> {

IndexRequest request = new IndexRequest();

Map map = new HashMap<>();

map.put("id", data.getActorId());

map.put("actorName", data.getActorName());

request.id(data.getActorId()+"");

request.index(asIndex);

request.source(map);

indexRequests.add(request);

});

indexRequests.forEach(esUtils.createBulkProcessor()::add);

}

}

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: