环境:Spring Cloud 2021.0.7 + Spring Boot 2.7.12

配置依赖

maven依赖

org.springframework.cloud

spring-cloud-starter-openfeign

org.springframework.cloud

spring-cloud-loadbalancer

org.springframework.cloud

spring-cloud-dependencies

${spring-cloud.version}

pom

import

开启注解功能

@SpringBootApplication

// 开启Feign功能,在该注解中你还可以配置,如下3个重要的信息:

// 1. 为所有的FeignClient提供统一默认的配置

// 2. 指定扫描那些包写的类

// 3. 指定有哪些@FeignClient类

@EnableFeignClients

public class AppApplication {

public static void main(String[] args) {

SpringApplication.run(AppApplication.class, args);

}

}

FeignClient生成Bean原理

容器在启动过程中会找到所有@FeignClient的接口类,然后将这些类注册为容器Bean,而每一个Feign客户端对应的是FactoryBean对象FeignClientFactoryBean。

具体如何找这些带有@FeignClient注解的接口类可以查看FeignClientsRegistrar该类就在@EnableFeignClients中被导入。

FeignClientFactoryBean

public class FeignClientFactoryBean implements FactoryBean {

public Object getObject() {

return getTarget();

}

T getTarget() {

FeignContext context = beanFactory != null ? beanFactory.getBean(FeignContext.class) : applicationContext.getBean(FeignContext.class);

Feign.Builder builder = feign(context);

if (!StringUtils.hasText(url)) {

if (!name.startsWith("http")) {

url = "http://" + name;

}

else {

url = name;

}

url += cleanPath();

// 负载均衡处理

return (T) loadBalance(builder, context, new HardCodedTarget<>(type, name, url));

}

// ...

}

protected T loadBalance(Feign.Builder builder, FeignContext context, HardCodedTarget target) {

// 在OpenFeign中核心实现负载均衡的类就是具体的Client类

// Feign负载均衡能力实现通过具体Client实现,每一个FeignClient客户端都会对应一个子容器AnnotationConfigApplicationContext

// 根据@FeignClient配置的服务名name或value为key,从一个LoadBalancerClientFactory(父类)中的Map中查找该name对应的容器

// 如果不存在则创建一个AnnotationConfigApplicationContext。每个子容器都设置了父容器,如果通过子容器查找不到Client的实现,那么会从父容器中查找

Client client = getOptional(context, Client.class);

}

}

Client实现

Client的具体实现可以有如下:

apache httpclient okhttp default(jdk)

具体使用哪个是根据你环境引入了哪个依赖(httpclient,okhttp)

io.github.openfeign

feign-httpclient

${version}

io.github.openfeign

feign-okhttp

${version}

具体选择通过如下配置

@Import({

HttpClientFeignLoadBalancerConfiguration.class,

OkHttpFeignLoadBalancerConfiguration.class,

HttpClient5FeignLoadBalancerConfiguration.class,

DefaultFeignLoadBalancerConfiguration.class })

public class FeignLoadBalancerAutoConfiguration {

}

如果你的环境有多个实现,那么这里会根据这里的导入顺序加载。这里以最后一个 DefaultFeignLoadBalancerConfiguration为例。

class DefaultFeignLoadBalancerConfiguration {

@Bean

@ConditionalOnMissingBean

// 没有启用spring-retry重试功能

@Conditional(OnRetryNotEnabledCondition.class)

public Client feignClient(LoadBalancerClient loadBalancerClient, LoadBalancerClientFactory loadBalancerClientFactory) {

// 这里构造函数第一个参数将会成为最终执行远程接口调用的实现

return new FeignBlockingLoadBalancerClient(new Client.Default(null, null), loadBalancerClient, loadBalancerClientFactory);

}

}

在没有导入httpclient或者okhttp情况下,使用的Client实现是 FeignBlockingLoadBalancerClient。

负载均衡实现

构造 FeignBlockingLoadBalancerClient传入了负载均衡客户端LoadBalancerClient及负载均衡客户端工厂LoadBalancerClientFactory该工厂是用来创建每一个Feign客户端对应的子容器 AnnotationConfigApplicationContext及从对应子容器获取相应的Bean实例对象,如:Client,Request.Options,Logger.Level等。

public class FeignBlockingLoadBalancerClient implements Client {

// 此Client代理对象是上面的new Client.Default(null, null)

private final Client delegate;

private final LoadBalancerClient loadBalancerClient;

private final LoadBalancerClientFactory loadBalancerClientFactory;

public FeignBlockingLoadBalancerClient(Client delegate, LoadBalancerClient loadBalancerClient, LoadBalancerClientFactory loadBalancerClientFactory) {

this.delegate = delegate;

this.loadBalancerClient = loadBalancerClient;

this.loadBalancerClientFactory = loadBalancerClientFactory;

}

@Override

public Response execute(Request request, Request.Options options) throws IOException {

final URI originalUri = URI.create(request.url());

// 获取服务名serviceId

String serviceId = originalUri.getHost();

String hint = getHint(serviceId);

DefaultRequest lbRequest = new DefaultRequest<>(new RequestDataContext(buildRequestData(request), hint));

// ...

// 通过负载均衡客户端获取指定serviceId的服务实例

ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);

// ...

// 通过获取到的ServiceInstance实例,重新构造请求地址

String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();

// 重新构建一个新的请求

Request newRequest = buildRequest(request, reconstructedUrl);

LoadBalancerProperties loadBalancerProperties = loadBalancerClientFactory.getProperties(serviceId);

return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse, supportedLifecycleProcessors, loadBalancerProperties.isUseRawStatusCodeInResponseData());

}

protected Request buildRequest(Request request, String reconstructedUrl) {

return Request.create(request.httpMethod(), reconstructedUrl, request.headers(), request.body(),

request.charset(), request.requestTemplate());

}

}

LoadBalancerClient具体实现:​​​​​​​BlockingLoadBalancerClient

public class BlockingLoadBalancerClient implements LoadBalancerClient {

private final ReactiveLoadBalancer.Factory loadBalancerClientFactory;

public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory loadBalancerClientFactory) {

this.loadBalancerClientFactory = loadBalancerClientFactory;

}

public ServiceInstance choose(String serviceId, Request request) {

// 获取一个负载均衡器,默认是轮询策略

ReactiveLoadBalancer loadBalancer = loadBalancerClientFactory.getInstance(serviceId);

if (loadBalancer == null) {

return null;

}

Response loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();

if (loadBalancerResponse == null) {

return null;

}

return loadBalancerResponse.getServer();

}

// 重新构造请求的uri

public URI reconstructURI(ServiceInstance serviceInstance, URI original) {

return LoadBalancerUriTools.reconstructURI(serviceInstance, original);

}

}

public final class LoadBalancerUriTools {

public static URI reconstructURI(ServiceInstance serviceInstance, URI original) {

// ...

return doReconstructURI(serviceInstance, original);

}

private static URI doReconstructURI(ServiceInstance serviceInstance, URI original) {

String host = serviceInstance.getHost();

String scheme = Optional.ofNullable(serviceInstance.getScheme()).orElse(computeScheme(original, serviceInstance));

int port = computePort(serviceInstance.getPort(), scheme);

if (Objects.equals(host, original.getHost()) && port == original.getPort() && Objects.equals(scheme, original.getScheme())) {

return original;

}

boolean encoded = containsEncodedParts(original);

return UriComponentsBuilder.fromUri(original).scheme(scheme).host(host).port(port).build(encoded).toUri();

}

}

轮询算法​​​​​​​

public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

public Mono> choose(Request request) {

ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider

.getIfAvailable(NoopServiceInstanceListSupplier::new);

return supplier.get(request).next()

.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));

}

private Response processInstanceResponse(ServiceInstanceListSupplier supplier, List serviceInstances) {

Response serviceInstanceResponse = getInstanceResponse(serviceInstances);

if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {

((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());

}

return serviceInstanceResponse;

}

private Response getInstanceResponse(List instances) {

// ...

// 如果只有一个实例

if (instances.size() == 1) {

return new DefaultResponse(instances.get(0));

}

// Ignore the sign bit, this allows pos to loop sequentially from 0 to

// Integer.MAX_VALUE

int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;

ServiceInstance instance = instances.get(pos % instances.size());

return new DefaultResponse(instance);

}

}

执行远程调用

接着上面 FeignBlockingLoadBalancerClient#execute方法最终的返回方法执行​​​​​​​

final class LoadBalancerUtils {

static Response executeWithLoadBalancerLifecycleProcessing(Client feignClient, Request.Options options,

Request feignRequest, org.springframework.cloud.client.loadbalancer.Request lbRequest,

org.springframework.cloud.client.loadbalancer.Response lbResponse,

Set supportedLifecycleProcessors, boolean useRawStatusCodes) throws IOException {

return executeWithLoadBalancerLifecycleProcessing(feignClient, options, feignRequest, lbRequest, lbResponse, supportedLifecycleProcessors, true, useRawStatusCodes);

}

static Response executeWithLoadBalancerLifecycleProcessing(Client feignClient, Request.Options options,

Request feignRequest, org.springframework.cloud.client.loadbalancer.Request lbRequest,

org.springframework.cloud.client.loadbalancer.Response lbResponse,

Set supportedLifecycleProcessors, boolean loadBalanced, boolean useRawStatusCodes) throws IOException {

// 这里执行生命周期实际调用前动作

try {

// 执行时间的调用,而这里的feignClient就是在FeignBlockingLoadBalancerClient传递过来的,new Client.Default(null, null)

Response response = feignClient.execute(feignRequest, options);

// 这里执行生命周期回调,省略

return response;

}

// ...

}

}

Client.Default

public interface Client {

public Response execute(Request request, Options options) throws IOException {

// 通过JDK自带的网络连接进行处理

HttpURLConnection connection = convertAndSend(request, options);

return convertResponse(connection, request);

}

}

以上就是一个Feign客户端请求的负载均衡实现原理

完毕~!~!

求助三连~~~

 

 

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: