HelloWood

gRPC Stream

2020-11-08

gRPC Stream

Stream 在 gRPC 中代表一个真正的请求,包含要发送的 消息;Stream 分为 ClientStreamServerStream

grpc-source-code-stream-diagram.png

ClientStream

ClientStream 接口继承 Stream 接口,有多个实现类或抽象实现类:

  • ForwardingClientStream: 用于转发的 ClientStream,支持代理真正的流,可以用于触发一些动作,如统计等
  • NoopClientStream: 不做任何操作的 ClientStream,用于空实现
    • FailingClientStream: 用于失败的 ClientStream,处理请求失败的场景
  • InProcessClientStream: 进程内的 ClientStream,用于测试,不会发出实际请求
  • AbstractClientStream: ClientStream 的抽象实现类,实现了部分基础操作,如发送header,写入消息,半关闭等
    • NettyClientStream: 基于 Netty 实现的 ClientStream,实现了基于 Netty 的帧操作等
    • OkHttpClientStream: 基于 OkHttp 实现的 ClientStream,实现了基于 OkHttp 的帧操作等

方法

  • start

开始一个新的流,对于每一个流,只能调用一次

1
void start(ClientStreamListener listener);
  • halfClose

从客户端关闭流,当关闭后,不能发送更多的消息,但是可以接收消息,只能调用一次

1
void halfClose();
  • cancel

异常终止流,当调用后不会再发送和接收消息,只有在 start 之后才可以被调用

1
void cancel(Status reason);
  • setDeadline

设置请求有效截止时间,过了这个时间之后就会终止请求执行

1
void setDeadline(@Nonnull Deadline deadline);
  • getAttributes

获取流的属性

1
Attributes getAttributes();

监听器

ClientStreamListener 用于监听客户端流的事件

  • onReady

当接收得此事件说明 Transport 已经准备好发送附加消息了

1
void onReady();
  • messagesAvailable

当远程端点接收到消息时调用

1
void messagesAvailable(MessageProducer producer);
  • headersRead

当收到服务端返回的 header 时调用,如果没有 header 返回,则这个方法不会被调用

1
void headersRead(Metadata headers);
  • closed

当流被关闭时调用

1
2
3
void closed(Status status, Metadata trailers);

void closed(Status status, RpcProgress rpcProgress, Metadata trailers);

ClientStream 流程

发起请求

生成的代码中通过 blockingUnaryCall 开始发起请求

  • io.grpc.stub.ClientCalls#blockingUnaryCall
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static <ReqT, RespT> RespT blockingUnaryCall(Channel channel,
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
ReqT req) {
// 构建任务队列和单线程的线程池
ThreadlessExecutor executor = new ThreadlessExecutor();
boolean interrupt = false;
// 创建新的调用的 ClientCall,指定了调用类型和执行器
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
try {
// 执行调用,发出请求
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
interrupt = true;
call.cancel("Thread interrupted", e);
}
}
return getUnchecked(responseFuture);
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
throw cancelThrow(call, e);
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
}

创建 ClientCall 后调用 futureUnaryCall 开始发起请求,并返回用于获取结果的 ListenableFuture

1
2
3
4
5
6
7
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
// 初始化 GrpcFuture
GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
// 将 GrpcFuture 包装为继承了 Listener 的 UnaryStreamToFuture,提交任务
asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
return responseFuture;
}
  • io.grpc.stub.ClientCalls#asyncUnaryRequestCall
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static <ReqT, RespT> void asyncUnaryRequestCall(ClientCall<ReqT, RespT> call,
ReqT req,
StartableListener<RespT> responseListener) {
// 开始调用
startCall(call, responseListener);
try {
// 发送消息,提交 BufferEntry 任务
call.sendMessage(req);
// 从客户端关闭流
call.halfClose();
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
throw cancelThrow(call, e);
}
}

创建 ClientStream

当调用了 io.grpc.internal.ClientCallImpl#start 方法后,会创建客户端流;
如果已经过了超时时间,则会使用 DEADLINE_EXECEEDED 状态创建 FailingClientStream;如果还为超时,则根据是否开启了重试创建相应的流

  • io.grpc.internal.ClientCallImpl#startInternal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 最后期限
Deadline effectiveDeadline = effectiveDeadline();
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
// 如果没有过期
if (!deadlineExceeded) {
// 如果打开了重试,则创建重试流
if (retryEnabled) {
stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context);
} else {
// 根据获取 ClientTransport
ClientTransport transport = clientTransportProvider.get(new PickSubchannelArgsImpl(method, headers, callOptions));
// 创建流
stream = transport.newStream(method, headers, callOptions);
}
} else {
// 初始化超时失败的流
stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription("ClientCall started after deadline exceeded: " + effectiveDeadline));
}

然后根据不同的实现,在 Transport 内创建流

  • io.grpc.netty.NettyClientTransport#newStream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ClientStream newStream(MethodDescriptor<?, ?> method,
Metadata headers,
CallOptions callOptions) {

// 如果 channel 是空的,则返回失败的 ClientStream
if (channel == null) {
return new FailingClientStream(statusExplainingWhyTheChannelIsNull);
}

StatsTraceContext statsTraceCtx = StatsTraceContext.newClientContext(callOptions, getAttributes(), headers);

// 创建 NettyClientStream
return new NettyClientStream(/**/);
}

开始流

开始流,并指定监听器

  • io.grpc.internal.AbstractClientStream#start
1
2
3
4
5
6
7
8
public final void start(ClientStreamListener listener) {
transportState().setListener(listener);
// 如果不是 GET 请求,则发送 Header
if (!useGet) {
abstractClientStreamSink().writeHeaders(headers, null);
headers = null;
}
}

如果不是 GET 方法的请求,要先写入 Header

  • io.grpc.netty.shaded.io.grpc.netty.NettyClientStream.Sink#writeHeaders

最终是通过创建 Netty 的指令,将 header 发送给服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private void writeHeadersInternal(Metadata headers, byte[] requestPayload) {
// 将 Header 转为 Netty 的 HTTP2 的 header
// 根据方法名获取路径
AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
// 如果路径为 null,则设置路径为方法全名
if (defaultPath == null) {
defaultPath = new AsciiString("/" + method.getFullMethodName());
methodDescriptorAccessor.setRawMethodName(method, defaultPath);
}
// 如果有 payload,则使用 GET 方法
boolean get = (requestPayload != null);
AsciiString httpMethod;
// 如果是 GET 方法,则将负载加到请求路径中,并设置请求方法
if (get) {
// 将负载通过 base64 编码后添加到请求路径中
defaultPath = new AsciiString(defaultPath + "?" + BaseEncoding.base64().encode(requestPayload));
httpMethod = Utils.HTTP_GET_METHOD;
} else {
httpMethod = Utils.HTTP_METHOD;
}
// 将 Header 转为 Netty 的 HTTP2 header
Http2Headers http2Headers = Utils.convertClientHeaders(headers, scheme, defaultPath, authority, httpMethod, userAgent);

// 创建 ChannelFuture 的监听器
ChannelFutureListener failureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 如果 Future 状态不是成功的
if (!future.isSuccess()) {
// 流创建失败时,如果流没有关闭,则关闭流;当 Channel 关闭时, Lifecycle Manager 更了解失败,
// 尤其是在判断完成之前
// 获取关闭状态
Status s = transportState().handler.getLifecycleManager().getShutdownStatus();
// 如果关闭状态是 null,则从失败的 Future 中获取失败状态
if (s == null) {
s = transportState().statusFromFailedFuture(future);
}
// 上报 Transport 的状态
transportState().transportReportStatus(s, true, new Metadata());
}
}
};
// Write the command requesting the creation of the stream.
// 写入创建流的请求的指令,并添加失败的 Future 监听器
writeQueue.enqueue(new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get),
!method.getType().clientSendsOneMessage() || get)
.addListener(failureListener);
}

发起请求

当在 io.grpc.stub.ClientCalls#startCall中调用了responseListener.onStart()后,会开始发送请求

  • io.grpc.stub.ClientCalls.UnaryStreamToFuture#onStart
1
2
3
4
5
6
7
8
9
10
11
void onStart() {
responseFuture.call.request(2);
}
```

- io.grpc.internal.ClientCallImpl#request

```java
public void request(int numMessages) {
stream.request(numMessages);
}
  • io.grpc.internal.AbstractStream#request
1
2
3
public final void request(int numMessages) {
transportState().requestMessagesFromDeframer(numMessages);
}

然后通过 Deframer 发送

  • io.grpc.internal.AbstractStream.TransportState#requestMessagesFromDeframer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void requestMessagesFromDeframer(final int numMessages) {
// 如果是线程安全的解帧器,则直接执行
if (deframer instanceof ThreadOptimizedDeframer) {
// 发送指定数量的消息
deframer.request(numMessages);
return;
}
// 如果不是线程安全的解帧器,则由 Transport 的线程执行
class RequestRunnable implements Runnable {
@Override
public void run() {
try {
deframer.request(numMessages);
} catch (Throwable t) {
deframeFailed(t);
}
}
}

runOnTransportThread(new RequestRunnable());
}

发送消息

  • io.grpc.internal.ClientCallImpl#sendMessageInternal

判断是否是可重试的流,如果是,则使用可重试的流发送消息,如果不是,则使用普通的流发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void sendMessageInternal(ReqT message) {
try {
// 如果是重试流,则通过重试流的方法发送消息
if (stream instanceof RetriableStream) {
RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
retriableStream.sendMessage(message);
} else {
// 不是重试流,将消息转为流,发送
stream.writeMessage(method.streamRequest(message));
}
} catch (RuntimeException e) {
// 如果出错则取消请求
stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
return;
} catch (Error e) {
stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
throw e;
}
// 对于 unary 请求,不用flush,因为接下来就是 halfClose, 这样就可以在消息最后搭载 END_STREAM=true,
// 而无需打开损坏的流
if (!unaryRequest) {
stream.flush();
}
}
  • io.grpc.internal.AbstractStream#writeMessage

将消息内容转为流后,最终通过将消息传递给 Framer

1
2
3
4
5
6
7
8
9
10
public final void writeMessage(InputStream message) {
try {
if (!framer().isClosed()) {
// 写入消息体
framer().writePayload(message);
}
} finally {
GrpcUtil.closeQuietly(message);
}
}
  • io.grpc.internal.AbstractClientStream#deliverFrame

将 Framer 的内容传递给 Transport

1
2
3
4
5
6
7
8
public final void deliverFrame(WritableBuffer frame,
boolean endOfStream,
boolean flush,
int numMessages) {
Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
// 通过 netty 写入
abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
}
  • io.grpc.netty.NettyClientStream.Sink#writeFrameInternal

最终通过 Netty 的指令,将消息内容发送给服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void writeFrameInternal(WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
// 将 frame 转换为 ByteBuf
ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf().touch();
// 统计 ByteBuf 的可读字节数
final int numBytes = bytebuf.readableBytes();
// 如果字节数大于 0
if (numBytes > 0) {
// 将要出站的字节数添加到流控中
onSendingBytes(numBytes);
// 将发送 gRPC 帧命令添加到写队列中
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 如果 future 成功,且 Transport 中的流不为 null
if (future.isSuccess() && transportState().http2Stream() != null) {
// 添加发送的字节数及统计
transportState().onSentBytes(numBytes);
NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages);
}
}
});
} else {
// 如果发送的字节为空,则不会影响流控,仅仅发送
writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
}
}

半关闭

  • io.grpc.internal.AbstractClientStream#halfClose

从客户端关闭流,关闭后客户端不能再发送消息,但是可以接收

1
2
3
4
5
6
7
public final void halfClose() {
if (!transportState().isOutboundClosed()) {
transportState().setOutboundClosed();
// 输出已经到达消息结尾
endOfMessages();
}
}
  • io.grpc.internal.AbstractStream#endOfMessages
1
2
3
protected final void endOfMessages() {
framer().close();
}
  • io.grpc.internal.MessageFramer#close

调用 Framer,释放缓冲区, 提交流;最终还是通过 Netty,将关闭流的帧写入,发送给服务端

1
2
3
4
5
6
7
8
9
10
11
public void close() {
if (!isClosed()) {
closed = true;
// With the current code we don't expect readableBytes > 0 to be possible here, added
// defensively to prevent buffer leak issues if the framer code changes later.
if (buffer != null && buffer.readableBytes() == 0) {
releaseBuffer();
}
commitToSink(true, true);
}
}

获取返回结果

io.grpc.stub.ClientCalls#blockingUnaryCall 方法中,调用完 futureUnaryCall 方法后,会返回 ListenableFuture用于监听返回结果

  • io.grpc.stub.ClientCalls#blockingUnaryCall

会不断的循环,监听线程池返回的结果

1
2
3
4
5
6
7
8
9
10
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
interrupt = true;
call.cancel("Thread interrupted", e);
}
}
return getUnchecked(responseFuture);

当 Server 端返回响应内容时,会调用监听器的 messagesAvailable 方法,从响应的流中解析响应内容

  • io.grpc.internal.ClientCallImpl.ClientStreamListenerImpl#messagesAvailable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
try {
InputStream message;
while ((message = producer.next()) != null) {
try {
// 将消息流解析为响应对象,并传递给 Future
observer.onMessage(method.parseResponse(message));
} catch (Throwable t) {
GrpcUtil.closeQuietly(message);
throw t;
}
message.close();
}
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
Status status = Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
stream.cancel(status);
close(status, new Metadata());
}
  • io.grpc.stub.ClientCalls.UnaryStreamToFuture#onMessage

为 Future 对象设置值

1
2
3
4
5
6
7
public void onMessage(RespT value) {
if (this.value != null) {
throw Status.INTERNAL.withDescription("More than one value received for unary call")
.asRuntimeException();
}
this.value = value;
}
  • io.grpc.stub.ClientCalls#getUnchecked

返回 Future 的值

1
2
3
4
5
6
7
8
9
10
11
12
13
private static <V> V getUnchecked(Future<V> future) {
try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED
.withDescription("Thread interrupted")
.withCause(e)
.asRuntimeException();
} catch (ExecutionException e) {
throw toStatusRuntimeException(e.getCause());
}
}
Tags: gRPC