HelloWood

gRPC Transport

2020-10-22

gRPC Transport

Transport 分为 ClientTransportServerTransport,分别用于客户端和服务端

grpc-source-code-transport-classes-diagram.png

ClientTransport

ClientTransport 与真正的 IP 地址是一一对应的,用于建立连接,创建流

实现

ClientTransport 有多个继承接口和实现类:

  • 支持失败的实现类 FailingClientTransport,客户端启动时创建的流会快速失败
  • 支持生命周期管理的接口 ManagedClientTransport
    • 支持延迟处理的实现类 DelayedClientTransport
    • 基于连接的接口 ConnectionClientTransport
      • 基于 Netty 实现的 NettyClientTransport
      • 基于 OkHttp 实现的 OkHttpClientTransport
      • 用于进程内处理请求 InProcessTransport
      • 支持代理的抽象实现类 ForwardingConnectionClientTransport
        • 支持授权检查的实现类 CallCredentialsApplyingTransportFactory
        • 用于统计的 CallTracingTransport

方法

  • ClientTransport#newStream

创建新的流,用于给远程服务端发送消息

1
ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions);
  • ClientTransport#ping

ping 远程的端点,当收到 ack 之后,会使用所给的 Executor 执行回调

1
void ping(PingCallback callback, Executor executor);
  • ManagedClientTransport#start

启动 Transport,尝试建立连接,并启动监听器

1
Runnable start(Listener listener);

监听器

Transport 的 Listener 用于监听 Transport 事件,进行相应的处理

  • 监听就绪事件
1
void transportReady();
  • 监听使用中事件
1
void transportInUse(boolean inUse);
  • 监听关闭事件
1
void transportShutdown(Status s);
  • 监听终止事件
1
void transportTerminated();

ClientTransport 流程

Transport 在 Channel 退出空闲模式时会被创建,然后通过 start 方法启动,建立连接,当连接成功后触发 ready 回调,然后更新 LB 状态为 READY,然后可以执行发送请求到服务端

1. 创建 Transport

有两个地方可以创建 Transport:

  • LB 通过 handleResolvedAddresses处理地址时通过ManagedChannelImpl.SubchannelImpl#requestConnection建立连接,会创建 Transport
  • ClientCallImpl#start 时,通过 ClientTransportProvider#get 方法获取 Transport

这两个方法最终在 InternalSubchannel#obtainActiveTransport 中调用 startNewTransport 执行创建

启动 Transport

io.grpc.internal.InternalSubchannel#startNewTransport

1
2
3
4
5
6
7
8
9
10
11
12
private void startNewTransport() {
// 获取当前地址
SocketAddress address = addressIndex.getCurrentAddress();
// 创建 Transport,并使用 CallTracingTransport 封装
ConnectionClientTransport transport = new CallTracingTransport(transportFactory.newClientTransport(address, options, transportLogger), callsTracer);

// 创建 Transport 监听器,建立连接
Runnable runnable = transport.start(new TransportListener(transport, address));
if (runnable != null) {
syncContext.executeLater(runnable);
}
}
执行创建 Transport

io.grpc.netty.NettyChannelBuilder.NettyTransportFactory#newClientTransport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public ConnectionClientTransport newClientTransport(SocketAddress serverAddress,
ClientTransportOptions options,
ChannelLogger channelLogger) {
// 存活时间
final AtomicBackoff.State keepAliveTimeNanosState = keepAliveTimeNanos.getState();
Runnable tooManyPingsRunnable = new Runnable() {
@Override
public void run() {
keepAliveTimeNanosState.backoff();
}
};

// 构建 Netty 的 Transport
NettyClientTransport transport = new NettyClientTransport(/*..*/);
return transport;
}

2. 启动监听器,建立连接

io.grpc.netty.NettyClientTransport#start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Runnable start(Listener transportListener) {
// 创建 ClientTransportLifecycleManager
lifecycleManager = new ClientTransportLifecycleManager(Preconditions.checkNotNull(transportListener, "listener"));
// 建立连接
SocketAddress localAddress = localSocketPicker.createSocketAddress(remoteAddress, eagAttributes);
if (localAddress != null) {
channel.connect(remoteAddress, localAddress);
} else {
channel.connect(remoteAddress);
}

// 开始 keepAlive 监听
if (keepAliveManager != null) {
keepAliveManager.onTransportStarted();
}

return null;
}
触发 ready 回调

当 Netty 的 Handler 第一次接收到返回的消息时触发回调

  • io.grpc.netty.NettyClientHandler.FrameListener#onSettingsRead
1
2
3
4
5
6
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
if (firstSettings) {
firstSettings = false;
lifecycleManager.notifyReady();
}
}
  • io.grpc.netty.ClientTransportLifecycleManager#notifyReady
1
2
3
4
5
6
7
8
public void notifyReady() {
// 如果已经是 Ready 或者处于 SHUTDOWN 状态,则返回
if (transportReady || transportShutdown) {
return;
}
transportReady = true;
listener.transportReady();
}
  • io.grpc.internal.InternalSubchannel.TransportListener#transportReady
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void transportReady() {
syncContext.execute(new Runnable() {
@Override
public void run() {
reconnectPolicy = null;
// 如果是 SHUTDOWN 则关闭 Transport
if (shutdownReason != null) {
transport.shutdown(shutdownReason);
} else if (pendingTransport == transport) {
activeTransport = transport;
pendingTransport = null;
// 如果是等待 READY,则更新状态为 READY
gotoNonErrorState(READY);
}
}
});
}

最终由 LoadBalancer 处理,将 Subchannel 的连接状态改为 READY,返回非空的 PickResult,这样就可以执行向 Server 端发送的请求

Tags: gRPC