文章目录

Retryer接口OpenFeign中提供的Retryer接口的两个实现Retryer.Default

重试的源码分析asyncResponseHandler.handleResponse源码解析

ErrorDecoder源码分析OpenFeign默认使用的ErrorDecoder是ErrorDecoder.Default

总结

Retryer接口

Retryer接口的实现主要是continueOrPropagate方法,该方法主要用于判断是否可以重试,具体的判断逻辑,我们在下面介绍

void continueOrPropagate(RetryableException e);

Retryer clone(); //Retryer继承了Cloneable接口

OpenFeign中提供的Retryer接口的两个实现

Retryer.Default:OpenFeign中提供的默认的重试实现Retryer.NEVER_RETRY:在无任何自定义配置的情况下,OpenFeign使用该重试配置,表示不重试。其配置在FeignClientsConfiguration中:

@Bean

@ConditionalOnMissingBean

public Retryer feignRetryer() {

return Retryer.NEVER_RETRY;

}

我们可以自定义覆盖该配置,来设置重试接口。

Retryer.Default

源码如下,我们做了一些基本的说明

class Default implements Retryer {

private final int maxAttempts; // 最大尝试次数

private final long period; // 间隔时间

private final long maxPeriod; // 最大间隔时间

int attempt; // 当前尝试了多少次

long sleptForMillis; // 已休眠了多久

public Default() {

this(100, SECONDS.toMillis(1), 5);

}

public Default(long period, long maxPeriod, int maxAttempts) {

this.period = period;

this.maxPeriod = maxPeriod;

this.maxAttempts = maxAttempts;

this.attempt = 1;

}

protected long currentTimeMillis() {

return System.currentTimeMillis();

}

public void continueOrPropagate(RetryableException e) {

if (attempt++ >= maxAttempts) { // 超过最大尝试次数,则直接抛出异常e

throw e;

}

long interval;// 实际的间隔时间

if (e.retryAfter() != null) {

// retryAfter是RetryableException抛出时设置的,这个我们在将ErrorEncoder的时候再说

interval = e.retryAfter().getTime() - currentTimeMillis();

if (interval > maxPeriod) {

interval = maxPeriod;

}

if (interval < 0) {

return;

}

} else {

// retryAfter 为null将使用period 来计算间隔时间

interval = nextMaxInterval();

}

try {

Thread.sleep(interval);// 休眠interval毫秒

} catch (InterruptedException ignored) {

Thread.currentThread().interrupt();

throw e;

}

sleptForMillis += interval;

}

long nextMaxInterval() {

long interval = (long) (period * Math.pow(1.5, attempt - 1));

return interval > maxPeriod ? maxPeriod : interval;

}

@Override

public Retryer clone() {

return new Default(period, maxPeriod, maxAttempts);

}

}

Retryer.Default的continueOrPropagate方法,主要就是计算实际的间隔时间,然后线程休眠对应的时间,如果达到重试次数,则把RetryableException抛出

重试的源码分析

我们在前文中已经提到,每次的FeignClient调用,主要就是SynchronousMethodHandler的invoke和executeAndDecode方法,我们再次来看看这两个方法

public Object invoke(Object[] argv) throws Throwable {

RequestTemplate template = buildTemplateFromArgs.create(argv);

Options options = findOptions(argv);

Retryer retryer = this.retryer.clone(); // 克隆Retryer

while (true) {// 重试循环

try {

// 执行并decode

return executeAndDecode(template, options);

} catch (RetryableException e) {// catch RetryableException异常

try {

// 执行retryery接口的continueOrPropagate方法

retryer.continueOrPropagate(e);

} catch (RetryableException th) {

// 如果continueOrPropagate接着抛出RetryableException 异常,则不再重试

Throwable cause = th.getCause();

if (propagationPolicy == UNWRAP && cause != null) {

throw cause;

} else {

throw th;

}

}

if (logLevel != Logger.Level.NONE) {

logger.logRetry(metadata.configKey(), logLevel);

}

// 休眠之后,重试

continue;

}

}

}

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {

Request request = targetRequest(template);

if (logLevel != Logger.Level.NONE) {

logger.logRequest(metadata.configKey(), logLevel, request);

}

Response response;

long start = System.nanoTime();

try {

response = client.execute(request, options);

// ensure the request is set. TODO: remove in Feign 12

response = response.toBuilder()

.request(request)

.requestTemplate(template)

.build();

} catch (IOException e) {

if (logLevel != Logger.Level.NONE) {

logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));

}

// 执行请求时,如果出现IOException,则认为是请求时网络波动等问题影响了请求,尝试重试

// errorExecuting方法是FeignException类下静态方法

// import static feign.FeignException.errorExecuting;

// 该方法直接抛出RetryableException

throw errorExecuting(request, e);

}

long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

if (decoder != null)

return decoder.decode(response, metadata.returnType());

CompletableFuture resultFuture = new CompletableFuture<>();

// 处理请求结果(这里涉及到结果的Decoder,可查看我们的上一篇文章 OpenFeign的基本配置)

// 这里面除了我们上一篇文档的Decoder,还有一个特殊的Decoder,ErrorDecoder

asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,

metadata.returnType(),

elapsedTime);

try {

if (!resultFuture.isDone())

throw new IllegalStateException("Response handling not done");

return resultFuture.join();

} catch (CompletionException e) {

Throwable cause = e.getCause();

if (cause != null)

throw cause;

throw e;

}

}

asyncResponseHandler.handleResponse源码解析

void handleResponse(CompletableFuture resultFuture,

String configKey,

Response response,

Type returnType,

long elapsedTime) {

// copied fairly liberally from SynchronousMethodHandler

boolean shouldClose = true;

try {

if (logLevel != Level.NONE) {

response = logger.logAndRebufferResponse(configKey, logLevel, response,

elapsedTime);

}

if (Response.class == returnType) {// 如果请求的结果类型就是Response则直接返回即可

if (response.body() == null) {

resultFuture.complete(response);

} else if (response.body().length() == null

|| response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {

shouldClose = false;

resultFuture.complete(response);

} else {

// Ensure the response body is disconnected

final byte[] bodyData = Util.toByteArray(response.body().asInputStream());

resultFuture.complete(response.toBuilder().body(bodyData).build());

}

} else if (response.status() >= 200 && response.status() < 300) {

// 响应状态为200到300,其实就是正确返回

if (isVoidType(returnType)) {// 如果返回结果为void

resultFuture.complete(null);

} else {// 执行我们的Decoder

final Object result = decode(response, returnType);

shouldClose = closeAfterDecode;

resultFuture.complete(result);

}

} else if (decode404 && response.status() == 404 && !isVoidType(returnType)) {

// 如果是404且不是void,并且decode404的配置为true,则执行Decoder

final Object result = decode(response, returnType);

shouldClose = closeAfterDecode;

resultFuture.complete(result);

} else {

如果decode404为false或者不属于以上的状态码(如5xx异常),则执行errorDecoder

resultFuture.completeExceptionally(errorDecoder.decode(configKey, response));

}

} catch (final IOException e) {

if (logLevel != Level.NONE) {

logger.logIOException(configKey, logLevel, e, elapsedTime);

}

// errorReading 抛出FeignException异常

resultFuture.completeExceptionally(errorReading(response.request(), response, e));

} catch (final Exception e) {

resultFuture.completeExceptionally(e);

} finally {

if (shouldClose) {

ensureClosed(response.body());

}

}

}

ErrorDecoder源码分析

ErrorDecoder只有一个方法

public Exception decode(String methodKey, Response response);

OpenFeign默认使用的ErrorDecoder是ErrorDecoder.Default

ErrorDecoder.Default 是ErrorDecoder的一个内部类

public static class Default implements ErrorDecoder {

// RetryAfterDecoder 也是ErrorDecoder的一个内部类其主要方法apply

private final RetryAfterDecoder retryAfterDecoder = new RetryAfterDecoder();

@Override

public Exception decode(String methodKey, Response response) {

// 获取当前的异常类(根据不同的status返回不同的异常类)

FeignException exception = errorStatus(methodKey, response);

// 使用retryAfterDecoder获取响应头信息Retry-After的值

// Retry-After可以是数字,比如100,表示100秒之后重试

// Retry-After也可以是Date类型的字符串,默认格式为new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss 'GMT'", US);

// 我们可以自定义ErrorDecoder然后自定义DateFormat来满足我们的日期格式

Date retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));

if (retryAfter != null) {

// 如果retryAfter不为空,则抛出RetryableException异常,这样就会进入重试

return new RetryableException(

response.status(),

exception.getMessage(),

response.request().httpMethod(),

exception,

retryAfter,

response.request());

}

return exception;

// 如果retryAfter 为空,则直接将请求结果FeignException 异常抛出,而且不会进入重试

}

private T firstOrNull(Map> map, String key) {

if (map.containsKey(key) && !map.get(key).isEmpty()) {

return map.get(key).iterator().next();

}

return null;

}

}

总结

OpenFeign的重试机制基于Retryer接口,其continueOrPropagate方法主要计算重试间隔的时间,以及可以重试的次数。

OpenFeign默认使用Retryer.NEVER_RETRY,不重试。我们可以设置Retryer.Default来开启重试

@Bean

@ConditionalOnMissingBean

public Retryer feignRetryer() {

return new Retryer.Default();// 默认period:100ms,maxPeriod:1000ms,maxAttempts:5

}

有两个地方会进入重试机制

进行Http调用时,client.execute(request, options);如果出现IOException异常,会自动进入重试。出现服务端错误,进入ErrorDecoder后,如果响应设置了响应头Retry-After时才会进行重试。

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式