HelloWood

gRPC 负载均衡

2020-09-20

gRPC 负载均衡

gRPC 内定义了 LoadBalancer 接口,用于负载均衡

grpc-source-code-loadbalancer-methods.png

LoadBalancer 中的主要方法

  • handleResolvedAddress:处理 NameResolver 解析的地址,用于创建 Subchannel
  • handleNameResolutionError: 处理命名解析失败,会销毁已经存在的 Suchannel
  • requestConnection: 创建连接,会为 Subchannel 初始化 Transport,并建立连接

grpc-source-code-loadbalancer-sub-class.png

LoadBalancer 接口有多个实现类,如用于代理的 ForwardingLoadBalancer;基于策略的 RoundRobinLoadBalancer,PickFirstLoadBalancer, GrpclbLoadBalancer等;支持扩展功能的HealthCheckingLoadBalancer, GracefulSwitchLoadBalancer

grpc-source-code-loadbalancer-class-diagram.png

LoadBalancer 有多个内部类,用于实现负载均衡

  • Factory: 用于创建 LoadBalancer,通过 LoadBalancerProvider 实现
  • Subchannel: 逻辑连接,一个 Subchannel 内可能包含多个 IP:PORT
  • Helper: 用于创建 LoadBalancerSubchannel
  • SubchannelPicker: Subchannel 选择器,根据不同的策略使用不同的选择方式
  • SubchannelStateListener: Subchannel 状态监听器,当 Subchannel 状态发生变化时及时更新

LoadBalancer 的工作流程是:

  1. 使用 LoadBalancerRegistry 或者 SPI 的方式注册 LoadBalancerProvider
  2. 调用 Channel Builder 的 defaultLoadBalancingPolicy 设置负载均衡策略
  3. ManagedChannelImpl 的构造方法中,创建 Factory
  4. ManagedChannelImpl#exitIdleMode 中创建 LoadBalancer 实例
  5. 将创建的实例作为参数传递给 NameResolverListener
  6. NameResolver 解析服务名称后,最终调用 handleResolvedAddresses 方法,根据不同的策略进行处理
  7. LoadBalancer 根据解析的地址创建 Subchannel
  8. Subchannel调用 requestConnection 方法建立连接

创建 LoadBalancer

  1. 创建 Channel 前注册 Provider
1
LoadBalancerRegistry.getDefaultRegistry().register(new HealthCheckingRoundRobinLoadBalancerProvider());
  1. 创建 Channel 时设置负载均衡策略
1
2
3
ManagedChannelBuilder.forTarget("server")
.defaultLoadBalancingPolicy("round_robin")
.build();
  1. io.grpc.internal.ManagedChannelImpl#ManagedChannelImpl 构造方法中初始化 Factory

Factory 的实现类是 AutoConfiguredLoadBalancerFactory

1
this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
  1. io.grpc.internal.ManagedChannelImpl#exitIdleMode时创建 LoadBalancer 实例
1
2
3
4
5
6
7
// 构建新的 lbHelper
LbHelperImpl lbHelper = new LbHelperImpl();

// 自动配置负载均衡
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);

this.lbHelper = lbHelper;
  1. 创建 LoadBalancer 实例
  • AutoConfiguredLoadBalancer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public AutoConfiguredLoadBalancer newLoadBalancer(Helper helper) {
return new AutoConfiguredLoadBalancer(helper);
}

AutoConfiguredLoadBalancer(Helper helper) {
this.helper = helper;
// 从注册器中获取默认的负载均衡策略提供器
delegateProvider = registry.getProvider(defaultPolicy);
if (delegateProvider == null) {
throw new IllegalStateException("Could not find policy '" + defaultPolicy
+ "'. Make sure its implementation is either registered to LoadBalancerRegistry or"
+ " included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files.");
}
// 创建新的
delegate = delegateProvider.newLoadBalancer(helper);
}
  • 实现类 io.grpc.services.internal.HealthCheckingRoundRobinLoadBalancerProvider#newLoadBalancer
1
2
3
public LoadBalancer newLoadBalancer(Helper helper) {
return HealthCheckingLoadBalancerUtil.newHealthCheckingLoadBalancer(rrProvider, helper);
}
  • io.grpc.services.HealthCheckingLoadBalancerUtil#newHealthCheckingLoadBalancer
1
2
3
4
5
6
7
8
9
public static LoadBalancer newHealthCheckingLoadBalancer(Factory factory, Helper helper) {
// 创建工厂
HealthCheckingLoadBalancerFactory hcFactory = new HealthCheckingLoadBalancerFactory(factory,
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);

// 使用工厂创建 LoadBalancer
return hcFactory.newLoadBalancer(helper);
}
  • io.grpc.services.HealthCheckingLoadBalancerFactory#newLoadBalancer
1
2
3
4
5
6
7
public LoadBalancer newLoadBalancer(Helper helper) {
// 代理 Helper
HelperImpl wrappedHelper = new HelperImpl(helper);
// 创建 LoadBalancer
LoadBalancer delegateBalancer = delegateFactory.newLoadBalancer(wrappedHelper);
return new HealthCheckingLoadBalancer(wrappedHelper, delegateBalancer);
}
  1. LoadBalancer 实例作为参数传递给 NameResolverListener
  • io.grpc.internal.ManagedChannelImpl#exitIdleMode
1
2
3
// 服务发现监听器
NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
nameResolver.start(listener);

负载均衡

根据 NameResolver 解析的地址,创建相应的 Subchannel,在 RPC 请求时根据策略和状态选择其中的一个发起请求

处理解析的地址

  • io.grpc.internal.ManagedChannelImpl.NameResolverListener#onResult

根据解析的结果,获取配置,如果有配置健康检查,则添加健康检查的属性,用于 LB 在连接前进行检查

然后构建参数,调用 io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer#tryHandleResolvedAddresses 方法处理地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void run() {

List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();

nameResolverBackoffPolicy = null;
// ...
ManagedChannelServiceConfig effectiveServiceConfig;
effectiveServiceConfig = defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;

// 获取属性
Attributes effectiveAttrs = resolutionResult.getAttributes();
// 如果服务发现没有关闭
if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
// 获取健康检查
Map<String, ?> healthCheckingConfig = effectiveServiceConfig.getHealthCheckingConfig();
// 构建健康检查配置
if (healthCheckingConfig != null) {
effectiveAttrs = effectiveAttrs.toBuilder()
.set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
.build();
}

// 更新负载均衡算法,处理未处理的请求
Status handleResult = helper.lb.tryHandleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(effectiveAttrs)
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
.build());

if (!handleResult.isOk()) {
handleErrorInSyncContext(handleResult.augmentDescription(resolver + " was used"));
}
}
}

由 LB 处理地址

  • io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer#tryHandleResolvedAddresses

先从解析的结果中获取 LoadBalancerProvider,如果不存在,则使用默认的的;
然后获取被代理的 LoadBalancer,调用 handleResolvedAddresses 方法,由具体的 LB 进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Status tryHandleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
Attributes attributes = resolvedAddresses.getAttributes();

// 负载均衡选择
PolicySelection policySelection = (PolicySelection) resolvedAddresses.getLoadBalancingPolicyConfig();

if (policySelection == null) {
LoadBalancerProvider defaultProvider;
// 更新负载均衡提供器
defaultProvider = getProviderOrThrow(defaultPolicy, "using default policy");
policySelection = new PolicySelection(defaultProvider, null, null);
}

Object lbConfig = policySelection.config;

// 设置负载均衡算法参数
if (lbConfig != null) {
attributes = attributes.toBuilder()
.set(ATTR_LOAD_BALANCING_CONFIG, policySelection.rawConfig)
.build();
}

// 负载均衡器
LoadBalancer delegate = getDelegate();
// 如果地址是空的,或者处理失败,则返回错误
if (resolvedAddresses.getAddresses().isEmpty() && !delegate.canHandleEmptyAddressListFromNameResolution()) {
return Status.UNAVAILABLE.withDescription("NameResolver returned no usable address. addrs=" + servers + ", attrs=" + attributes);
} else {
// 返回处理成功
delegate.handleResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(resolvedAddresses.getAddresses())
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(lbConfig)
.build());
return Status.OK;
}
}
  • io.grpc.services.HealthCheckingLoadBalancerFactory.HealthCheckingLoadBalancer#handleResolvedAddresses

根据配置,获取健康检查的服务名称,然后遍历进行检查
然后调用 io.grpc.util.RoundRobinLoadBalancer#handleResolvedAddresses 进行处理

1
2
3
4
5
6
7
8
9
10
11
12
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// 获取健康检查配置
Map<String, ?> healthCheckingConfig = resolvedAddresses.getAttributes()
.get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG);
// 获取服务的健康检查配置
String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(healthCheckingConfig);

// 配置服务健康检查
helper.setHealthCheckedService(serviceName);
// 调用被代理的类处理地址
super.handleResolvedAddresses(resolvedAddresses);
}
  • io.grpc.util.RoundRobinLoadBalancer#handleResolvedAddresses

在处理地址时,根据现有的地址和新的地址,筛选出需要移除的地址;
然后遍历有效的地址,判断是否已经存在,如果存在,则更新地址集合;如果不存,则调用 io.grpc.services.HealthCheckingLoadBalancerFactory.HelperImpl#createSubchannel 创建 Subchannel,启动 SubchannelStateListener,监听 Subchannel 状态变化;并调用 io.grpc.internal.ManagedChannelImpl.SubchannelImpl#requestConnection要求建立连接

将需要移除的 Subchannel 从集合中移除,更新 LB 状态,并关闭要移除的 Subchannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {

// 获取地址列表
List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();

// 当前的地址
Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();

// 将地址 List 转为 Map
Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(servers);

// 根据当前的地址,获取需要移除的地址,返回的地址是现有地址中有,新的地址中没有的
Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet());


for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry : latestAddrs.entrySet()) {

// 不含 Attributes 的 EquivalentAddressGroup
EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey();
// 包含 Attributes 的 EquivalentAddressGroup
EquivalentAddressGroup originalAddressGroup = latestEntry.getValue();

// 根据地址获取对应的已经存在的 Subchannel
Subchannel existingSubchannel = subchannels.get(strippedAddressGroup);

// 如果存在已有的 Subchannel,则更新地址并跳出
if (existingSubchannel != null) {
// EAG's Attributes may have changed.
// 更新地址
existingSubchannel.updateAddresses(Collections.singletonList(originalAddressGroup));
continue;
}
// 根据地址创建新的 Subchannel
// Create new subchannels for new addresses.

// 设置新的连接状态是 IDLE
Attributes.Builder subchannelAttrs = Attributes.newBuilder()
.set(STATE_INFO, new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));

// 创建新 Subchannel
final Subchannel subchannel = checkNotNull(helper.createSubchannel(CreateSubchannelArgs.newBuilder()
.setAddresses(originalAddressGroup)
.setAttributes(subchannelAttrs.build())
.build()),
"subchannel");

// 启动 Subchannel 状态监听器
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo state) {
// 处理状态变化
processSubchannelState(subchannel, state);
}
});

// 将新创建的 Subchannel 放在 Subchannel 的 Map 中
subchannels.put(strippedAddressGroup, subchannel);
// 要求建立连接
subchannel.requestConnection();
}

// 移除不包含的地址
ArrayList<Subchannel> removedSubchannels = new ArrayList<>();
for (EquivalentAddressGroup addressGroup : removedAddrs) {
removedSubchannels.add(subchannels.remove(addressGroup));
}

// 在关闭 Subchannel 之前更新 picker,减少关闭期间的风险
updateBalancingState();

// 关闭被移除的 Subchannel
for (Subchannel removedSubchannel : removedSubchannels) {
shutdownSubchannel(removedSubchannel);
}
}

在请求时做负载均衡

  • io.grpc.internal.ClientCallImpl#startInternal

在执行 RPC 请求时,调用 io.grpc.internal.ClientCallImpl#start,在获取 ClientTransport 时,创建 PickSubchannelArgsImpl,通过选择 Subchannel 获取 Transport

1
ClientTransport transport = clientTransportProvider.get(new PickSubchannelArgsImpl(method, headers, callOptions));
  • io.grpc.internal.ManagedChannelImpl.ChannelTransportProvider#get

这个方法里,根据状态获取 Transport,如果当前的状态是关闭,则直接返回延迟执行的 Transport
如果 Picker 是空的,则说明还没有执行过,则调用 exitIdleMode 退出空闲模式,并返回延迟执行的Transport;
如果 Picker 已经初始化了,则调用 io.grpc.util.RoundRobinLoadBalancer.ReadyPicker#pickSubchannel选择 Subchannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public ClientTransport get(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
// 如果是关闭状态,则停止调用
if (shutdown.get()) {
return delayedTransport;
}

// 如果是 SubchannelPicker 是空的,则退出 idle 模模式,返回 delayedTransport
if (pickerCopy == null) {
final class ExitIdleModeForTransport implements Runnable {
@Override
public void run() {
// 退出 idle 模式,将会创建 LoadBalancer,NameResovler
exitIdleMode();
}
}

syncContext.execute(new ExitIdleModeForTransport());
return delayedTransport;
}
// 选择某个 SubChannel 发起调用,即选择某个服务端
PickResult pickResult = pickerCopy.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, args.getCallOptions().isWaitForReady());
// 如果有 Transport,则返回
if (transport != null) {
return transport;
}
return delayedTransport;
}
  • io.grpc.util.RoundRobinLoadBalancer.ReadyPicker#pickSubchannel

获取下一个 Subchannel 并返回

1
2
3
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(nextSubchannel());
}
  • io.grpc.util.RoundRobinLoadBalancer.ReadyPicker#nextSubchannel

通过轮询的算法获取下一个 Subchannel

1
2
3
4
5
6
7
8
9
10
private Subchannel nextSubchannel() {
int size = list.size();
int i = indexUpdater.incrementAndGet(this);
if (i >= size) {
int oldi = i;
i %= size;
indexUpdater.compareAndSet(this, oldi, i);
}
return list.get(i);
}
Tags: gRPC