gRPC Client 启动流程
gRPC 启动初始化的流程,使用 Netty 作为底层的实现
初始化 Channel
Channel 的初始化通过 ChannelBuilder 构建
这里通过 forTarget 设置了要解析的服务名称,会通过 NameResolver 解析,转换为具体的地址
1 2 3
| ManagedChannel channel = ManagedChannelBuilder.forTarget("grpc-server") .usePlaintext() .build();
|
io.grpc.internal.AbstractManagedChannelImplBuilder#build
调用 build 时,会根据 builder 中的属性,创建 ManagedChannelImpl 的实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public ManagedChannel build() { return new ManagedChannelOrphanWrapper(new ManagedChannelImpl( this, buildTransportFactory(), new ExponentialBackoffPolicy.Provider(), SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR), GrpcUtil.STOPWATCH_SUPPLIER, getEffectiveInterceptors(), TimeProvider.SYSTEM_TIME_PROVIDER)); }
|
初始化 Channel 属性
- io.grpc.internal.ManagedChannelImpl#ManagedChannelImpl
为 ManagedChannel 设置属性,初始化服务发现,负载均衡,拦截器等并创建真正的 Channel
1 2 3 4 5 6 7
| ManagedChannelImpl(AbstractManagedChannelImplBuilder<?> builder, ClientTransportFactory clientTransportFactory, BackoffPolicy.Provider backoffPolicyProvider, ObjectPool<? extends Executor> balancerRpcExecutorPool, Supplier<Stopwatch> stopwatchSupplier, List<ClientInterceptor> interceptors, final TimeProvider timeProvider)
|
设置服务名称
1
| this.target = checkNotNull(builder.target, "target");
|
设置 TransportFactory
创建了支持鉴权的代理的 TransportFactory,用于支持向服务端发起请求进行鉴权
1
| this.transportFactory = new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
|
构建服务发现工厂
1
| this.nameResolverFactory = builder.getNameResolverFactory();
|
- io.grpc.internal.AbstractManagedChannelImplBuilder#getNameResolverFactory
如果没有覆盖服务名称,则使用这个 nameResolverFactory,否则使用 OverrideAuthorityNameResolverFactory
1 2 3 4 5 6 7
| NameResolver.Factory getNameResolverFactory() { if (authorityOverride == null) { return nameResolverFactory; } else { return new OverrideAuthorityNameResolverFactory(nameResolverFactory, authorityOverride); } }
|
构建服务发现实例
1
| this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
|
- io.grpc.internal.ManagedChannelImpl#getNameResolver
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| static NameResolver getNameResolver(String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) { URI targetUri = null; StringBuilder uriSyntaxErrors = new StringBuilder(); targetUri = new URI(target);
if (targetUri != null) { NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs); if (resolver != null) { return resolver; } }
if (!URI_PATTERN.matcher(target).matches()) { targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs); if (resolver != null) { return resolver; } } throw new IllegalArgumentException(String.format("cannot find a NameResolver for %s%s", target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : "")); }
|
设置是否开启重试
根据是否主动开启了重试和禁止重试的开关决定是否要重试
1
| this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
|
设置负载均衡
根据负载均衡策略构建 LoadBalancerFactory,会从注册器中获取并初始化负载均衡实例
1
| this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
|
配置信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| ScParser serviceConfigParser = new ScParser( retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts, loadBalancerFactory, channelLogger);
serviceConfigInterceptor = new ServiceConfigInterceptor(retryEnabled);
if (builder.defaultServiceConfig != null) { ConfigOrError parsedDefaultServiceConfig = serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig); this.defaultServiceConfig = (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig(); this.lastServiceConfig = this.defaultServiceConfig; } else { this.defaultServiceConfig = null; }
this.lookUpServiceConfig = builder.lookUpServiceConfig;
if (!lookUpServiceConfig) { if (defaultServiceConfig != null) { channelLogger.log(ChannelLogLevel.INFO, "Service config look-up disabled, using default service config"); } handleServiceConfigUpdate(); }
|
创建 RealChannel
RealChannel 用于发起请求
1
| Channel channel = new RealChannel(nameResolver.getServiceAuthority());
|
- io.grpc.internal.ManagedChannelImpl.RealChannel#newCall
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) { return new ClientCallImpl<>(method, getCallExecutor(callOptions), callOptions, transportProvider, terminated ? null : transportFactory.getScheduledExecutorService(), channelCallTracer, retryEnabled) .setFullStreamDecompression(fullStreamDecompression) .setDecompressorRegistry(decompressorRegistry) .setCompressorRegistry(compressorRegistry); }
|
拦截器
1 2
| channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
|
除此之外,还初始化了一些其他的 Channel 的配置和属性,当构建 Channel 完成后,就可以使用 Channel 构建 Stub,用于发起请求