gRPC Load Balancing
gRPC defines a LoadBalancer interface that is used for load balancing.
Main methods of LoadBalancer
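handleResolvedAddresses: handles the addresses resolved by the NameResolver and is used to create Subchannels
handleNameResolutionError: handles a name resolution failure and destroys the Subchannels that already exist
requestConnection: creates a connection; it initializes a Transport for the Subchannel and establishes the connection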
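The LoadBalancer interface has several implementations: ForwardingLoadBalancer, which acts as a proxy; policy-based ones such as RoundRobinLoadBalancer, PickFirstLoadBalancer and GrpclbLoadBalancer; and ones that add extra features, such as HealthCheckingLoadBalancer and GracefulSwitchLoadBalancer.
LoadBalancer has several nested classes that take part in load balancing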
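Factory: creates the LoadBalancer; implemented through LoadBalancerProvider
Subchannel: a logical connection; one Subchannel may contain several IP:PORT addresses
Helper: handed to the LoadBalancer, which uses it to create Subchannels, update the balancing state, and so on
SubchannelPicker: the Subchannel selector; each policy selects in its own way
SubchannelStateListener: the Subchannel state listener; it is notified as soon as the state of a Subchannel changes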
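How the listener, the Helper and the picker fit together can be shown with a toy model. The sketch below is not gRPC code: ToyState, ToyPicker, ToyHelper and ToyBalancer are made-up names, and plain strings stand in for Subchannels. It only shows the idea that a state change leads the balancer to publish a fresh picker through the helper.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these types are hypothetical and are not the io.grpc classes.
enum ToyState { IDLE, CONNECTING, READY, TRANSIENT_FAILURE }

// Plays the role of SubchannelPicker: chooses one address per call.
interface ToyPicker {
    String pick();
}

// Plays the role of Helper: the balancer publishes its current picker here.
final class ToyHelper {
    ToyPicker currentPicker;

    void updateBalancingState(ToyPicker picker) {
        currentPicker = picker;
    }
}

final class ToyBalancer {
    private final ToyHelper helper;
    private final Map<String, ToyState> subchannels = new LinkedHashMap<>();

    ToyBalancer(ToyHelper helper) {
        this.helper = helper;
    }

    void addSubchannel(String address) {
        subchannels.put(address, ToyState.IDLE);
        refreshPicker();
    }

    // Plays the role of a state listener callback.
    void onSubchannelState(String address, ToyState state) {
        subchannels.put(address, state);
        refreshPicker();
    }

    // Rebuild the picker from the subchannels that are currently READY.
    private void refreshPicker() {
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, ToyState> entry : subchannels.entrySet()) {
            if (entry.getValue() == ToyState.READY) {
                ready.add(entry.getKey());
            }
        }
        // With no READY subchannel the picker returns null, meaning "nothing to pick yet".
        helper.updateBalancingState(() -> ready.isEmpty() ? null : ready.get(0));
    }
}
```

In this model the picker always returns the first READY address; a real policy would plug its own selection rule into that last line.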
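The LoadBalancer workflow is:
Register a LoadBalancerProvider through LoadBalancerRegistry or through SPI
Call defaultLoadBalancingPolicy on the Channel Builder to set the load balancing policy
Create the Factory in the ManagedChannelImpl constructor
Create the LoadBalancer instance in ManagedChannelImpl#exitIdleMode
Pass the created instance as an argument to NameResolverListener
Once the NameResolver has resolved the service name, handleResolvedAddresses is eventually called, and each policy handles the result in its own way
The LoadBalancer creates Subchannels from the resolved addresses
Each Subchannel calls requestConnection to establish its connection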
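The ordering of these steps, in particular that the balancer is created lazily, is easier to see in a compressed simulation. This is a sketch under simplifying assumptions: FlowChannel and FlowBalancer are hypothetical names, a static map plays the registry, and "connecting" just means recording the address.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Simplified stand-in for the flow described above; none of these are gRPC classes.
final class FlowBalancer {
    final String policy;
    final List<String> connected = new ArrayList<>();

    FlowBalancer(String policy) {
        this.policy = policy;
    }

    // One "subchannel" per address; adding it to the list models requestConnection.
    void handleResolvedAddresses(List<String> addresses) {
        for (String address : addresses) {
            if (!connected.contains(address)) {
                connected.add(address);
            }
        }
    }
}

final class FlowChannel {
    // Providers registered by policy name before any channel is built.
    static final Map<String, Supplier<FlowBalancer>> REGISTRY = new HashMap<>();

    private final Supplier<FlowBalancer> factory;
    FlowBalancer balancer; // stays null while the channel is idle

    // The policy name comes from the builder; only the factory is created here.
    FlowChannel(String defaultPolicy) {
        this.factory = () -> {
            Supplier<FlowBalancer> provider = REGISTRY.get(defaultPolicy);
            if (provider == null) {
                throw new IllegalStateException("Could not find policy '" + defaultPolicy + "'");
            }
            return provider.get();
        };
    }

    // The balancer is created when the channel leaves idle mode.
    void exitIdleMode() {
        if (balancer == null) {
            balancer = factory.get();
        }
    }

    // Models the resolver listener handing resolved addresses to the balancer.
    void onResolved(List<String> addresses) {
        exitIdleMode();
        balancer.handleResolvedAddresses(addresses);
    }
}
```

Building a FlowChannel does nothing visible; the balancer appears only after exitIdleMode or the first resolution result.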
Creating the LoadBalancer
Register the Provider before creating the Channel

```java
LoadBalancerRegistry.getDefaultRegistry().register(new HealthCheckingRoundRobinLoadBalancerProvider());
```
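To make the registration step less abstract, here is a miniature registry. It is a sketch, not the LoadBalancerRegistry source: ToyProvider and ToyRegistry are hypothetical, and the rule that the highest priority wins for a given policy name is an assumption of this model.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of a provider registry; not the io.grpc.LoadBalancerRegistry code.
final class ToyProvider {
    final String policyName;
    final int priority;

    ToyProvider(String policyName, int priority) {
        this.policyName = policyName;
        this.priority = priority;
    }
}

final class ToyRegistry {
    private final Map<String, ToyProvider> byPolicy = new HashMap<>();

    // Assumption of this sketch: for one policy name, the higher priority wins.
    void register(ToyProvider provider) {
        ToyProvider existing = byPolicy.get(provider.policyName);
        if (existing == null || existing.priority < provider.priority) {
            byPolicy.put(provider.policyName, provider);
        }
    }

    // Returns null when no provider is registered for the policy.
    ToyProvider getProvider(String policyName) {
        return byPolicy.get(policyName);
    }
}
```

Under that assumption, a second provider registered under an existing policy name replaces the first only if its priority is higher.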
Set the load balancing policy when creating the Channel

```java
ManagedChannelBuilder.forTarget("server")
    .defaultLoadBalancingPolicy("round_robin")
    .build();
```
The Factory is initialized in the io.grpc.internal.ManagedChannelImpl#ManagedChannelImpl constructor
The Factory implementation is AutoConfiguredLoadBalancerFactory

```java
this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
```
The LoadBalancer instance is created in io.grpc.internal.ManagedChannelImpl#exitIdleMode

```java
LbHelperImpl lbHelper = new LbHelperImpl();
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
this.lbHelper = lbHelper;
```
Creating the LoadBalancer instance
AutoConfiguredLoadBalancer
```java
public AutoConfiguredLoadBalancer newLoadBalancer(Helper helper) {
  return new AutoConfiguredLoadBalancer(helper);
}

AutoConfiguredLoadBalancer(Helper helper) {
  this.helper = helper;
  delegateProvider = registry.getProvider(defaultPolicy);
  if (delegateProvider == null) {
    throw new IllegalStateException("Could not find policy '" + defaultPolicy
        + "'. Make sure its implementation is either registered to LoadBalancerRegistry or"
        + " included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files.");
  }
  delegate = delegateProvider.newLoadBalancer(helper);
}
```
The implementation is io.grpc.services.internal.HealthCheckingRoundRobinLoadBalancerProvider#newLoadBalancer

```java
public LoadBalancer newLoadBalancer(Helper helper) {
  return HealthCheckingLoadBalancerUtil.newHealthCheckingLoadBalancer(rrProvider, helper);
}
```
io.grpc.services.HealthCheckingLoadBalancerUtil#newHealthCheckingLoadBalancer
```java
public static LoadBalancer newHealthCheckingLoadBalancer(Factory factory, Helper helper) {
  HealthCheckingLoadBalancerFactory hcFactory =
      new HealthCheckingLoadBalancerFactory(factory,
          new ExponentialBackoffPolicy.Provider(),
          GrpcUtil.STOPWATCH_SUPPLIER);
  return hcFactory.newLoadBalancer(helper);
}
```
io.grpc.services.HealthCheckingLoadBalancerFactory#newLoadBalancer
```java
public LoadBalancer newLoadBalancer(Helper helper) {
  HelperImpl wrappedHelper = new HelperImpl(helper);
  LoadBalancer delegateBalancer = delegateFactory.newLoadBalancer(wrappedHelper);
  return new HealthCheckingLoadBalancer(wrappedHelper, delegateBalancer);
}
```
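The chain above is a decorator: the health-checking factory asks another factory for a balancer and wraps the result. A stripped-down sketch of that shape follows. The Mini* names are illustrative only, and the shared log list is a stand-in for the Helper that the real code wraps.

```java
import java.util.List;

// Toy decorator chain; the names below are illustrative, not gRPC types.
interface MiniBalancer {
    void handleAddresses(List<String> addresses);
}

interface MiniFactory {
    MiniBalancer newBalancer(List<String> log);
}

// The innermost policy: it only records that it was called.
final class MiniRoundRobin implements MiniBalancer {
    private final List<String> log;

    MiniRoundRobin(List<String> log) {
        this.log = log;
    }

    @Override
    public void handleAddresses(List<String> addresses) {
        log.add("round_robin:" + addresses.size());
    }
}

// Wraps whatever balancer the delegate factory produces and adds a step in front of it.
final class MiniHealthCheckingFactory implements MiniFactory {
    private final MiniFactory delegateFactory;

    MiniHealthCheckingFactory(MiniFactory delegateFactory) {
        this.delegateFactory = delegateFactory;
    }

    @Override
    public MiniBalancer newBalancer(List<String> log) {
        MiniBalancer delegate = delegateFactory.newBalancer(log);
        return addresses -> {
            log.add("health_check");
            delegate.handleAddresses(addresses);
        };
    }
}
```

Because the wrapper only depends on MiniFactory, any inner policy can be given the extra step without changing its own code.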
Passing the LoadBalancer instance to NameResolverListener as an argument
io.grpc.internal.ManagedChannelImpl#exitIdleMode
```java
NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
nameResolver.start(listener);
```
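Load balancing
Subchannels are created from the addresses resolved by the NameResolver; for each RPC, one of them is selected according to the policy and the Subchannel states, and the request is sent through it.
Handling the resolved addresses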
io.grpc.internal.ManagedChannelImpl.NameResolverListener#onResult
The service config is taken from the resolution result; if health checking is configured, a health-checking attribute is added so that the LB can run the check before connecting.
The arguments are then built and io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer#tryHandleResolvedAddresses is called to handle the addresses.

```java
public void run() {
  List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
  nameResolverBackoffPolicy = null;

  ManagedChannelServiceConfig effectiveServiceConfig;
  effectiveServiceConfig = defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;

  Attributes effectiveAttrs = resolutionResult.getAttributes();
  if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
    // Attach the health-checking config, if any, to the attributes.
    Map<String, ?> healthCheckingConfig = effectiveServiceConfig.getHealthCheckingConfig();
    if (healthCheckingConfig != null) {
      effectiveAttrs = effectiveAttrs.toBuilder()
          .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
          .build();
    }

    // Hand the addresses, attributes and LB config to the balancer.
    Status handleResult = helper.lb.tryHandleResolvedAddresses(
        ResolvedAddresses.newBuilder()
            .setAddresses(servers)
            .setAttributes(effectiveAttrs)
            .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
            .build());

    if (!handleResult.isOk()) {
      handleErrorInSyncContext(handleResult.augmentDescription(resolver + " was used"));
    }
  }
}
```
The LB handles the addresses
io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer#tryHandleResolvedAddresses
The LoadBalancerProvider is first taken from the resolution result; if there is none, the default one is used. The delegate LoadBalancer is then obtained and its handleResolvedAddresses method is called, so that the concrete LB does the actual work.

```java
Status tryHandleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
  List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
  Attributes attributes = resolvedAddresses.getAttributes();

  PolicySelection policySelection = (PolicySelection) resolvedAddresses.getLoadBalancingPolicyConfig();

  // Fall back to the default policy when the service config selects none.
  if (policySelection == null) {
    LoadBalancerProvider defaultProvider;
    defaultProvider = getProviderOrThrow(defaultPolicy, "using default policy");
    policySelection = new PolicySelection(defaultProvider, null, null);
  }

  Object lbConfig = policySelection.config;
  if (lbConfig != null) {
    attributes = attributes.toBuilder()
        .set(ATTR_LOAD_BALANCING_CONFIG, policySelection.rawConfig)
        .build();
  }

  LoadBalancer delegate = getDelegate();
  // An empty address list is an error unless the delegate can handle it.
  if (resolvedAddresses.getAddresses().isEmpty() && !delegate.canHandleEmptyAddressListFromNameResolution()) {
    return Status.UNAVAILABLE.withDescription("NameResolver returned no usable address. addrs=" + servers + ", attrs=" + attributes);
  } else {
    delegate.handleResolvedAddresses(
        ResolvedAddresses.newBuilder()
            .setAddresses(resolvedAddresses.getAddresses())
            .setAttributes(attributes)
            .setLoadBalancingPolicyConfig(lbConfig)
            .build());
    return Status.OK;
  }
}
```
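The two decisions in this method, falling back to the default policy and rejecting an empty address list, can be isolated in a few lines. The model below is a sketch with invented names (PolicyChoice, AddressGate), and strings take the place of Status values.

```java
import java.util.List;

// Illustrative model of the decision order; PolicyChoice and AddressGate are made-up names.
final class PolicyChoice {
    final String policy;
    final boolean acceptsEmptyList;

    PolicyChoice(String policy, boolean acceptsEmptyList) {
        this.policy = policy;
        this.acceptsEmptyList = acceptsEmptyList;
    }
}

final class AddressGate {
    private final PolicyChoice defaultChoice;

    AddressGate(PolicyChoice defaultChoice) {
        this.defaultChoice = defaultChoice;
    }

    // Returns "OK:<policy>" when the addresses were handed over, "UNAVAILABLE" otherwise.
    String tryHandle(List<String> addresses, PolicyChoice fromServiceConfig) {
        // Use the policy from the service config if present, else the default one.
        PolicyChoice choice = fromServiceConfig != null ? fromServiceConfig : defaultChoice;
        // An empty list is only acceptable to policies that declare they can handle it.
        if (addresses.isEmpty() && !choice.acceptsEmptyList) {
            return "UNAVAILABLE";
        }
        return "OK:" + choice.policy;
    }
}
```

The same empty list is therefore an error for one policy and a normal update for another, depending on what the selected policy declares.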
io.grpc.services.HealthCheckingLoadBalancerFactory.HealthCheckingLoadBalancer#handleResolvedAddresses
The name of the health-checked service is read from the config and passed to the helper, which walks the Subchannels to apply the check; io.grpc.util.RoundRobinLoadBalancer#handleResolvedAddresses is then called to do the rest.

```java
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
  Map<String, ?> healthCheckingConfig = resolvedAddresses.getAttributes()
      .get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG);

  String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(healthCheckingConfig);

  helper.setHealthCheckedService(serviceName);

  super.handleResolvedAddresses(resolvedAddresses);
}
```
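As a rough picture of the lookup step, the helper below pulls a service name out of a config map. It is hypothetical: HealthCheckConfigReader is not a gRPC class, and the assumption that the name sits under a "serviceName" key is made for this sketch only.

```java
import java.util.Map;

// Hypothetical helper; assumes the health-check config is a map with a "serviceName" entry.
final class HealthCheckConfigReader {
    // Returns the configured service name, or null when there is no usable entry.
    static String serviceNameOf(Map<String, ?> healthCheckingConfig) {
        if (healthCheckingConfig == null) {
            return null;
        }
        Object name = healthCheckingConfig.get("serviceName");
        return name instanceof String ? (String) name : null;
    }
}
```

A null result here corresponds to the case where no health checking has been configured.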
io.grpc.util.RoundRobinLoadBalancer#handleResolvedAddresses
When handling the addresses, the existing addresses are compared with the new ones to find those that must be removed. The valid addresses are then iterated: if a Subchannel already exists for an address, its address set is updated; if not, io.grpc.services.HealthCheckingLoadBalancerFactory.HelperImpl#createSubchannel is called to create a Subchannel, a SubchannelStateListener is started to watch its state changes, and io.grpc.internal.ManagedChannelImpl.SubchannelImpl#requestConnection is called to ask for a connection.
Finally the Subchannels to be removed are taken out of the map, the LB state is updated, and the removed Subchannels are shut down.

```java
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
  List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
  Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
  Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
  // Addresses that exist now but are missing from the latest result.
  Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet());

  for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry : latestAddrs.entrySet()) {
    EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey();
    EquivalentAddressGroup originalAddressGroup = latestEntry.getValue();
    Subchannel existingSubchannel = subchannels.get(strippedAddressGroup);

    // Already known: only refresh its addresses.
    if (existingSubchannel != null) {
      existingSubchannel.updateAddresses(Collections.singletonList(originalAddressGroup));
      continue;
    }

    Attributes.Builder subchannelAttrs = Attributes.newBuilder()
        .set(STATE_INFO, new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));

    // New address: create a Subchannel for it.
    final Subchannel subchannel = checkNotNull(helper.createSubchannel(CreateSubchannelArgs.newBuilder()
        .setAddresses(originalAddressGroup)
        .setAttributes(subchannelAttrs.build())
        .build()), "subchannel");

    // Watch its state changes.
    subchannel.start(new SubchannelStateListener() {
      @Override
      public void onSubchannelState(ConnectivityStateInfo state) {
        processSubchannelState(subchannel, state);
      }
    });

    subchannels.put(strippedAddressGroup, subchannel);
    subchannel.requestConnection();
  }

  ArrayList<Subchannel> removedSubchannels = new ArrayList<>();
  for (EquivalentAddressGroup addressGroup : removedAddrs) {
    removedSubchannels.add(subchannels.remove(addressGroup));
  }

  // Publish the new state before shutting down the removed Subchannels.
  updateBalancingState();

  for (Subchannel removedSubchannel : removedSubchannels) {
    shutdownSubchannel(removedSubchannel);
  }
}
```
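Stripped of the gRPC types, this is a reconciliation between two address sets. Here is a minimal sketch of that idea, assuming plain strings in place of EquivalentAddressGroup and Subchannel; SubchannelTable is a name invented for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified address reconciliation; strings stand in for address groups and subchannels.
final class SubchannelTable {
    final Map<String, String> subchannels = new LinkedHashMap<>();
    final List<String> shutDown = new ArrayList<>();

    void update(List<String> latest) {
        // Addresses known now but absent from the new list must be removed.
        Set<String> removed = new HashSet<>(subchannels.keySet());
        removed.removeAll(latest);

        for (String address : latest) {
            if (!subchannels.containsKey(address)) {
                // New address: create a subchannel entry for it.
                subchannels.put(address, "subchannel(" + address + ")");
            }
        }

        // Drop the stale entries and remember what was shut down.
        for (String address : removed) {
            shutDown.add(subchannels.remove(address));
        }
    }
}
```

Entries that appear in both the old and the new list are left alone, so existing connections are not rebuilt on every resolver update.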
Load balancing at request time
io.grpc.internal.ClientCallImpl#startInternal
When an RPC is executed, io.grpc.internal.ClientCallImpl#start is called. While obtaining the ClientTransport it creates a PickSubchannelArgsImpl, and the Transport is obtained by picking a Subchannel.

```java
ClientTransport transport = clientTransportProvider.get(new PickSubchannelArgsImpl(method, headers, callOptions));
```
io.grpc.internal.ManagedChannelImpl.ChannelTransportProvider#get
This method obtains the Transport according to the current state. If the channel is shut down, the delayed Transport is returned directly. If the Picker is null, nothing has run yet, so exitIdleMode is called to leave idle mode and the delayed Transport is returned. If the Picker has been initialized, io.grpc.util.RoundRobinLoadBalancer.ReadyPicker#pickSubchannel is called to select a Subchannel.

```java
public ClientTransport get(PickSubchannelArgs args) {
  SubchannelPicker pickerCopy = subchannelPicker;

  // Channel is shut down: return the delayed transport.
  if (shutdown.get()) {
    return delayedTransport;
  }

  // No picker yet: leave idle mode and return the delayed transport.
  if (pickerCopy == null) {
    final class ExitIdleModeForTransport implements Runnable {
      @Override
      public void run() {
        exitIdleMode();
      }
    }
    syncContext.execute(new ExitIdleModeForTransport());
    return delayedTransport;
  }

  // Pick a Subchannel and take its transport.
  PickResult pickResult = pickerCopy.pickSubchannel(args);
  ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, args.getCallOptions().isWaitForReady());

  if (transport != null) {
    return transport;
  }
  return delayedTransport;
}
```
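The three branches are easy to mix up, so here is the same decision in toy form. TransportChooser is a hypothetical class; strings replace transports, a Supplier replaces the picker, and a counter records how often leaving idle mode was requested.

```java
import java.util.function.Supplier;

// Toy version of the three-way decision; the fields mirror the prose, not gRPC internals.
final class TransportChooser {
    static final String DELAYED = "delayed-transport";

    boolean shutdown;
    Supplier<String> picker; // null until a balancer has published a picker
    int exitIdleRequests;

    String get() {
        Supplier<String> pickerCopy = picker;
        if (shutdown) {
            return DELAYED;
        }
        if (pickerCopy == null) {
            // Nothing has run yet: ask to leave idle mode and buffer the call.
            exitIdleRequests++;
            return DELAYED;
        }
        String transport = pickerCopy.get();
        // A pick that yields no transport also falls back to the delayed transport.
        return transport != null ? transport : DELAYED;
    }
}
```

In every case the caller receives something to send on; the delayed transport is what holds the call until a real one is available.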
io.grpc.util.RoundRobinLoadBalancer.ReadyPicker#pickSubchannel
Gets the next Subchannel and returns it

```java
public PickResult pickSubchannel(PickSubchannelArgs args) {
  return PickResult.withSubchannel(nextSubchannel());
}
```
io.grpc.util.RoundRobinLoadBalancer.ReadyPicker#nextSubchannel
Gets the next Subchannel with a round-robin algorithm

```java
private Subchannel nextSubchannel() {
  int size = list.size();
  int i = indexUpdater.incrementAndGet(this);
  // Wrap around once the index runs past the end of the list.
  if (i >= size) {
    int oldi = i;
    i %= size;
    indexUpdater.compareAndSet(this, oldi, i);
  }
  return list.get(i);
}
```
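The same wrap-around trick works outside gRPC. The class below is a minimal sketch that swaps the field updater for an AtomicInteger; RoundRobinPicker is a name chosen for this example, and starting the counter at -1 so that the first pick is element 0 is a choice of the sketch.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal round-robin picker; a sketch, not the RoundRobinLoadBalancer source.
final class RoundRobinPicker<T> {
    private final List<T> items;
    // Starts at -1 so that the first increment yields index 0.
    private final AtomicInteger index = new AtomicInteger(-1);

    RoundRobinPicker(List<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        this.items = items;
    }

    T next() {
        int size = items.size();
        int i = index.incrementAndGet();
        if (i >= size) {
            int old = i;
            i %= size;
            // Best-effort reset: if another thread already moved the counter, leave it.
            index.compareAndSet(old, i);
        }
        return items.get(i);
    }
}
```

With three items the picker cycles through them in order and then starts over:

```java
RoundRobinPicker<String> picker = new RoundRobinPicker<>(java.util.Arrays.asList("a", "b", "c"));
// Successive calls to picker.next() yield a, b, c, a, b, c, ...
```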