gRPC Stream

In gRPC, a Stream represents a single actual request and carries the messages to be sent. Streams come in two kinds: ClientStream and ServerStream.
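ClientStream

The ClientStream interface extends the Stream interface and has several concrete or abstract implementations:

- ForwardingClientStream: a forwarding ClientStream that delegates to the real stream; it can be used to hook in side actions such as collecting statistics
- NoopClientStream: a ClientStream that performs no operations; used as an empty implementation
- FailingClientStream: a ClientStream for failures; handles the case where the request fails
- InProcessClientStream: an in-process ClientStream used for testing; it does not send a real request
- AbstractClientStream: the abstract base implementation of ClientStream; implements basic operations such as sending headers, writing messages, and half-closing
- NettyClientStream: the Netty-based ClientStream; implements the Netty frame operations
- OkHttpClientStream: the OkHttp-based ClientStream; implements the OkHttp frame operations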
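The forwarding variant is essentially a decorator around the real stream. The sketch below illustrates that idea only; `SimpleStream`, `RecordingStream`, and `CountingForwardingStream` are hypothetical names invented for this example, and the real `ClientStream` interface has many more methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for ClientStream (not the gRPC interface).
interface SimpleStream {
    void writeMessage(String message);
    void halfClose();
}

// Plays the role of the "real" stream: it just records what it receives.
final class RecordingStream implements SimpleStream {
    final List<String> written = new ArrayList<>();
    boolean halfClosed;

    @Override public void writeMessage(String message) { written.add(message); }
    @Override public void halfClose() { halfClosed = true; }
}

// Forwards every call to a delegate and counts messages on the way through.
final class CountingForwardingStream implements SimpleStream {
    private final SimpleStream delegate;
    private int messageCount;

    CountingForwardingStream(SimpleStream delegate) { this.delegate = delegate; }

    @Override public void writeMessage(String message) {
        messageCount++;                 // side action, e.g. statistics
        delegate.writeMessage(message); // the real work stays in the delegate
    }

    @Override public void halfClose() { delegate.halfClose(); }

    int messageCount() { return messageCount; }
}
```

Because the wrapper implements the same interface as the stream it wraps, callers do not need to know whether they hold the real stream or a forwarding one.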
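Methods

Starts a new stream. It may be called only once per stream.

```java
void start(ClientStreamListener listener);
```

Closes the stream from the client side (`halfClose()`). After closing, no more messages can be sent, but messages can still be received. It may be called only once.

Terminates the stream abnormally. After this call no more messages are sent or received. It may only be called after start.

```java
void cancel(Status reason);
```

Sets the deadline up to which the request is valid. Once the deadline has passed, execution of the request is terminated.

```java
void setDeadline(@Nonnull Deadline deadline);
```

Returns the attributes of the stream.

```java
Attributes getAttributes();
```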
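The call-order rules above can be summarized as a small state guard. This is a sketch under assumptions: `StreamLifecycle` is a hypothetical class, not part of gRPC, and it encodes only the rules stated in this section (plus the assumption that sending requires a started stream).

```java
// Hypothetical guard that encodes the call-order rules described above.
final class StreamLifecycle {
    private boolean started;
    private boolean halfClosed;
    private boolean cancelled;

    void start() {
        if (started) {
            throw new IllegalStateException("start() may be called only once");
        }
        started = true;
    }

    void halfClose() {
        if (halfClosed) {
            throw new IllegalStateException("halfClose() may be called only once");
        }
        halfClosed = true;
    }

    void cancel() {
        if (!started) {
            throw new IllegalStateException("cancel() is only valid after start()");
        }
        cancelled = true;
    }

    // Sending stops after a half-close or a cancel (assumes the stream was started).
    boolean canSend() { return started && !halfClosed && !cancelled; }

    // Receiving is still possible after a half-close, but not after a cancel.
    boolean canReceive() { return started && !cancelled; }
}
```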
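Listener

ClientStreamListener listens for events on the client stream.

The ready event (`onReady()`) signals that the Transport is ready to send additional messages.

Called when messages from the remote endpoint are available.

```java
void messagesAvailable(MessageProducer producer);
```

Called when the headers returned by the server are received. If no headers are returned, this method is not called.

```java
void headersRead(Metadata headers);
```

Called when the stream is closed.

```java
void closed(Status status, Metadata trailers);

void closed(Status status, RpcProgress rpcProgress, Metadata trailers);
```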
ClientStream flow

Initiating the request

The generated code initiates the request through blockingUnaryCall.

- io.grpc.stub.ClientCalls#blockingUnaryCall

```java
public static <ReqT, RespT> RespT blockingUnaryCall(
    Channel channel,
    MethodDescriptor<ReqT, RespT> method,
    CallOptions callOptions,
    ReqT req) {
  ThreadlessExecutor executor = new ThreadlessExecutor();
  boolean interrupt = false;
  ClientCall<ReqT, RespT> call = channel.newCall(method,
      callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
          .withExecutor(executor));
  try {
    ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
    // Run the call's callbacks on the calling thread until the response arrives.
    while (!responseFuture.isDone()) {
      try {
        executor.waitAndDrain();
      } catch (InterruptedException e) {
        interrupt = true;
        call.cancel("Thread interrupted", e);
      }
    }
    return getUnchecked(responseFuture);
  } catch (RuntimeException e) {
    throw cancelThrow(call, e);
  } catch (Error e) {
    throw cancelThrow(call, e);
  } finally {
    // Restore the interrupt flag for the caller.
    if (interrupt) {
      Thread.currentThread().interrupt();
    }
  }
}
```
After the ClientCall is created, futureUnaryCall is called to start the request; it returns the ListenableFuture used to obtain the result.

```java
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
    ClientCall<ReqT, RespT> call, ReqT req) {
  GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
  asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
  return responseFuture;
}
```
- io.grpc.stub.ClientCalls#asyncUnaryRequestCall

```java
private static <ReqT, RespT> void asyncUnaryRequestCall(
    ClientCall<ReqT, RespT> call,
    ReqT req,
    StartableListener<RespT> responseListener) {
  startCall(call, responseListener);
  try {
    // A unary call sends exactly one message and then half-closes.
    call.sendMessage(req);
    call.halfClose();
  } catch (RuntimeException e) {
    throw cancelThrow(call, e);
  } catch (Error e) {
    throw cancelThrow(call, e);
  }
}
```
Creating the ClientStream

Calling io.grpc.internal.ClientCallImpl#start creates the client stream. If the deadline has already passed, a FailingClientStream is created with the DEADLINE_EXCEEDED status; if it has not yet passed, the stream is created according to whether retry is enabled.

- io.grpc.internal.ClientCallImpl#startInternal

```java
Deadline effectiveDeadline = effectiveDeadline();
boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
if (!deadlineExceeded) {
  if (retryEnabled) {
    stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context);
  } else {
    ClientTransport transport =
        clientTransportProvider.get(new PickSubchannelArgsImpl(method, headers, callOptions));
    stream = transport.newStream(method, headers, callOptions);
  }
} else {
  stream = new FailingClientStream(
      DEADLINE_EXCEEDED.withDescription(
          "ClientCall started after deadline exceeded: " + effectiveDeadline));
}
```
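The three-way decision can be modeled without any gRPC types. In this sketch, `StreamSelection`, its `Kind` enum, and the nanosecond arguments are hypothetical stand-ins for `Deadline`, `retryEnabled`, and the stream classes; treating a remaining time of zero or less as expired is an assumption of the example.

```java
// Hypothetical model of the stream choice made when a call starts.
final class StreamSelection {
    enum Kind { FAILING, RETRIABLE, TRANSPORT }

    // deadlineNanos == null means "no deadline was set".
    static Kind choose(Long deadlineNanos, long nowNanos, boolean retryEnabled) {
        // Assumption: a deadline counts as expired once no time remains.
        boolean deadlineExceeded = deadlineNanos != null && deadlineNanos - nowNanos <= 0;
        if (deadlineExceeded) {
            return Kind.FAILING;      // corresponds to FailingClientStream
        }
        return retryEnabled
            ? Kind.RETRIABLE          // corresponds to newRetriableStream(...)
            : Kind.TRANSPORT;         // corresponds to transport.newStream(...)
    }
}
```

The deadline check comes first, so an expired call never reaches a transport regardless of the retry setting.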
Then, depending on the transport implementation, the stream is created inside the Transport.

- io.grpc.netty.NettyClientTransport#newStream

```java
public ClientStream newStream(
    MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
  if (channel == null) {
    return new FailingClientStream(statusExplainingWhyTheChannelIsNull);
  }
  StatsTraceContext statsTraceCtx =
      StatsTraceContext.newClientContext(callOptions, getAttributes(), headers);
  return new NettyClientStream(
```
Starting the stream

Start the stream and register its listener.

- io.grpc.internal.AbstractClientStream#start

```java
public final void start(ClientStreamListener listener) {
  transportState().setListener(listener);
  if (!useGet) {
    abstractClientStreamSink().writeHeaders(headers, null);
    headers = null;
  }
}
```

If the request does not use the GET method, the headers are written first.
- io.grpc.netty.shaded.io.grpc.netty.NettyClientStream.Sink#writeHeaders

Ultimately, a Netty command is created and enqueued to send the headers to the server.

```java
private void writeHeadersInternal(Metadata headers, byte[] requestPayload) {
  AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
  if (defaultPath == null) {
    defaultPath = new AsciiString("/" + method.getFullMethodName());
    methodDescriptorAccessor.setRawMethodName(method, defaultPath);
  }
  // A non-null payload means the request is sent as a GET with the payload in the path.
  boolean get = (requestPayload != null);
  AsciiString httpMethod;
  if (get) {
    defaultPath =
        new AsciiString(defaultPath + "?" + BaseEncoding.base64().encode(requestPayload));
    httpMethod = Utils.HTTP_GET_METHOD;
  } else {
    httpMethod = Utils.HTTP_METHOD;
  }

  Http2Headers http2Headers =
      Utils.convertClientHeaders(headers, scheme, defaultPath, authority, httpMethod, userAgent);

  ChannelFutureListener failureListener = new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (!future.isSuccess()) {
        Status s = transportState().handler.getLifecycleManager().getShutdownStatus();
        if (s == null) {
          s = transportState().statusFromFailedFuture(future);
        }
        transportState().transportReportStatus(s, true, new Metadata());
      }
    }
  };

  // Enqueue the stream-creation command; report a status if the write fails.
  writeQueue.enqueue(
          new CreateStreamCommand(
              http2Headers, transportState(), shouldBeCountedForInUse(), get),
          !method.getType().clientSendsOneMessage() || get)
      .addListener(failureListener);
}
```
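The path and HTTP method selection in writeHeadersInternal can be reproduced in isolation. This is only a sketch: `RequestPath` is a hypothetical helper, it uses `java.util.Base64` in place of Guava's `BaseEncoding.base64()`, and the "GET" and "POST" literals are assumed values for `Utils.HTTP_GET_METHOD` and `Utils.HTTP_METHOD`.

```java
import java.util.Base64;

// Hypothetical helper mirroring how the request path and HTTP method are chosen.
final class RequestPath {
    // Returns the path for a call; a non-null payload is appended base64-encoded.
    static String build(String fullMethodName, byte[] requestPayload) {
        String path = "/" + fullMethodName;
        if (requestPayload != null) {
            path = path + "?" + Base64.getEncoder().encodeToString(requestPayload);
        }
        return path;
    }

    // Assumed method names: GET when a payload travels in the path, POST otherwise.
    static String httpMethod(byte[] requestPayload) {
        return requestPayload != null ? "GET" : "POST";
    }
}
```

For example, `RequestPath.build("helloworld.Greeter/SayHello", null)` yields `/helloworld.Greeter/SayHello`, where the service and method names are made up for illustration.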
Issuing the request

Once io.grpc.stub.ClientCalls#startCall has called responseListener.onStart(), the request starts being issued: onStart asks the call for up to two response messages.

- io.grpc.stub.ClientCalls.UnaryStreamToFuture#onStart

```java
void onStart() {
  responseFuture.call.request(2);
}
```

- io.grpc.internal.ClientCallImpl#request

```java
public void request(int numMessages) {
  stream.request(numMessages);
}
```
- io.grpc.internal.AbstractStream#request

```java
public final void request(int numMessages) {
  transportState().requestMessagesFromDeframer(numMessages);
}
```

The demand is then handed to the Deframer, either directly or on the transport thread.

- io.grpc.internal.AbstractStream.TransportState#requestMessagesFromDeframer

```java
private void requestMessagesFromDeframer(final int numMessages) {
  if (deframer instanceof ThreadOptimizedDeframer) {
    deframer.request(numMessages);
    return;
  }

  class RequestRunnable implements Runnable {
    @Override
    public void run() {
      try {
        deframer.request(numMessages);
      } catch (Throwable t) {
        deframeFailed(t);
      }
    }
  }

  runOnTransportThread(new RequestRunnable());
}
```
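The effect of request(numMessages) is easier to see in a toy model in which messages are delivered only while there is outstanding demand. `DemandDrivenDeliverer` is a hypothetical class written for this illustration; it is not how the real Deframer is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model: buffered messages are released only when they were requested.
final class DemandDrivenDeliverer {
    private final Queue<String> buffered = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private long pendingDeliveries;

    // The consumer asks for up to numMessages more messages.
    void request(int numMessages) {
        if (numMessages <= 0) {
            throw new IllegalArgumentException("numMessages must be positive");
        }
        pendingDeliveries += numMessages;
        deliver();
    }

    // A message arrived from the remote side.
    void messageArrived(String message) {
        buffered.add(message);
        deliver();
    }

    private void deliver() {
        while (pendingDeliveries > 0 && !buffered.isEmpty()) {
            delivered.add(buffered.poll());
            pendingDeliveries--;
        }
    }

    List<String> delivered() { return delivered; }
}
```

With a demand of two, as in the unary case above, a third message would stay buffered until more is requested.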
Sending the message

- io.grpc.internal.ClientCallImpl#sendMessageInternal

Checks whether the stream is retriable. If it is, the message is sent through the RetriableStream; otherwise it is written to the ordinary stream.

```java
private void sendMessageInternal(ReqT message) {
  try {
    if (stream instanceof RetriableStream) {
      RetriableStream<ReqT> retriableStream = (RetriableStream<ReqT>) stream;
      retriableStream.sendMessage(message);
    } else {
      stream.writeMessage(method.streamRequest(message));
    }
  } catch (RuntimeException e) {
    stream.cancel(Status.CANCELLED.withCause(e).withDescription("Failed to stream message"));
    return;
  } catch (Error e) {
    stream.cancel(Status.CANCELLED.withDescription("Client sendMessage() failed with Error"));
    throw e;
  }

  // Unary requests are flushed by the half-close that follows.
  if (!unaryRequest) {
    stream.flush();
  }
}
```
- io.grpc.internal.AbstractStream#writeMessage

After the message has been converted to an InputStream, it is handed to the Framer.

```java
public final void writeMessage(InputStream message) {
  try {
    if (!framer().isClosed()) {
      framer().writePayload(message);
    }
  } finally {
    GrpcUtil.closeQuietly(message);
  }
}
```
- io.grpc.internal.AbstractClientStream#deliverFrame

Passes the Framer's output to the Transport.

```java
public final void deliverFrame(
    WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
  Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
  abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
}
```
- io.grpc.netty.NettyClientStream.Sink#writeFrameInternal

Ultimately, the message content is sent to the server through a Netty command.

```java
private void writeFrameInternal(
    WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
  ByteBuf bytebuf =
      frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf().touch();
  final int numBytes = bytebuf.readableBytes();
  if (numBytes > 0) {
    onSendingBytes(numBytes);
    writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush)
        .addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess() && transportState().http2Stream() != null) {
              transportState().onSentBytes(numBytes);
              NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages);
            }
          }
        });
  } else {
    // Nothing to account for: just enqueue the (empty) frame.
    writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
  }
}
```
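For context on what the Framer produces before the frame reaches Netty: the gRPC wire format prefixes each message with a 1-byte compression flag and a 4-byte big-endian length. The sketch below encodes and decodes that prefix; `LengthPrefixedFrame` is a hypothetical class, and the real MessageFramer additionally handles compression, buffering, and size limits.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the gRPC length-prefixed message layout.
final class LengthPrefixedFrame {
    static final int HEADER_LENGTH = 5; // 1 flag byte + 4 length bytes

    static byte[] encode(byte[] payload, boolean compressed) {
        // ByteBuffer is big-endian by default, which matches the wire format.
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + payload.length);
        buf.put((byte) (compressed ? 1 : 0));
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Returns the payload bytes; the compression flag is skipped, not interpreted.
    static byte[] decodePayload(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        buf.get();
        int length = buf.getInt();
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }
}
```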
Half-close

- io.grpc.internal.AbstractClientStream#halfClose

Closes the stream from the client side. After closing, the client can no longer send messages but can still receive them.

```java
public final void halfClose() {
  if (!transportState().isOutboundClosed()) {
    transportState().setOutboundClosed();
    endOfMessages();
  }
}
```
- io.grpc.internal.AbstractStream#endOfMessages

```java
protected final void endOfMessages() {
  framer().close();
}
```

- io.grpc.internal.MessageFramer#close

Closes the Framer: the buffer is released if it is empty, and the stream is committed to the sink. Ultimately the frame that closes the stream is written through Netty and sent to the server.

```java
public void close() {
  if (!isClosed()) {
    closed = true;
    // Release the buffer if it holds no data.
    if (buffer != null && buffer.readableBytes() == 0) {
      releaseBuffer();
    }
    commitToSink(true, true);
  }
}
```
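Two properties of this path are worth isolating: closing is idempotent, and writes that arrive after the close are skipped rather than sent. A minimal sketch, in which `ClosableFramer` and its string-based deliveries are hypothetical simplifications:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical framer: buffers text and emits one final delivery on close.
final class ClosableFramer {
    // Each delivery is recorded as "<buffered length>:<endOfStream>".
    final List<String> deliveries = new ArrayList<>();
    private final StringBuilder buffer = new StringBuilder();
    private boolean closed;

    // Returns false and drops the data once the framer is closed.
    boolean write(String data) {
        if (closed) {
            return false;
        }
        buffer.append(data);
        return true;
    }

    // Only the first call has an effect; it commits the buffer with endOfStream = true.
    void close() {
        if (closed) {
            return;
        }
        closed = true;
        deliveries.add(buffer.length() + ":true");
        buffer.setLength(0);
    }

    boolean isClosed() { return closed; }
}
```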
Getting the result

In io.grpc.stub.ClientCalls#blockingUnaryCall, the call to futureUnaryCall returns a ListenableFuture that is used to observe the result.

- io.grpc.stub.ClientCalls#blockingUnaryCall

The method keeps looping, draining the tasks queued on the executor until the result is available.

```java
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
  try {
    executor.waitAndDrain();
  } catch (InterruptedException e) {
    interrupt = true;
    call.cancel("Thread interrupted", e);
  }
}
return getUnchecked(responseFuture);
```
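The wait-and-drain loop works because the executor runs its queued tasks on the thread that is blocked waiting for the response. The sketch below approximates that with standard-library types; `DrainingExecutor` and `BlockingCallDemo` are hypothetical names, and `CompletableFuture` stands in for the `ListenableFuture` used by gRPC.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical executor: tasks are queued and run by whoever calls waitAndDrain().
final class DrainingExecutor implements Executor {
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    @Override public void execute(Runnable task) { queue.add(task); }

    // Blocks until a task is available, then runs everything queued so far.
    void waitAndDrain() throws InterruptedException {
        Runnable task = queue.take();
        while (task != null) {
            task.run();
            task = queue.poll();
        }
    }
}

final class BlockingCallDemo {
    // Blocks until the future completes, running callbacks on the calling thread.
    static <T> T blockingGet(CompletableFuture<T> future, DrainingExecutor executor) {
        boolean interrupted = false;
        try {
            while (!future.isDone()) {
                try {
                    executor.waitAndDrain();
                } catch (InterruptedException e) {
                    interrupted = true;
                    future.cancel(true); // stands in for call.cancel(...)
                }
            }
            return future.join();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }
    }

    public static void main(String[] args) {
        DrainingExecutor executor = new DrainingExecutor();
        CompletableFuture<String> response = new CompletableFuture<>();
        // Simulates a transport thread handing a callback to the call's executor.
        new Thread(() -> executor.execute(() -> response.complete("pong"))).start();
        System.out.println(blockingGet(response, executor));
    }
}
```

A design consequence of this approach is that no extra thread is needed to run the callbacks of a blocking call.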
When the server returns a response, the listener's messagesAvailable method is called, and the response is parsed from the response stream.

- io.grpc.internal.ClientCallImpl.ClientStreamListenerImpl#messagesAvailable

```java
try {
  InputStream message;
  // Drain every message the producer currently holds.
  while ((message = producer.next()) != null) {
    try {
      observer.onMessage(method.parseResponse(message));
    } catch (Throwable t) {
      GrpcUtil.closeQuietly(message);
      throw t;
    }
    message.close();
  }
} catch (Throwable t) {
  GrpcUtil.closeQuietly(producer);
  Status status = Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
  stream.cancel(status);
  close(status, new Metadata());
}
```
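The drain-parse-close loop can be tried out with plain streams. In this sketch, `MessageDrain` is a hypothetical class, a `Queue<InputStream>` whose `poll()` returns null when empty stands in for `MessageProducer.next()`, and decoding UTF-8 text stands in for `method.parseResponse`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the messagesAvailable loop.
final class MessageDrain {
    // Reads every queued stream to the end, closes it, and returns the decoded text.
    static List<String> drain(Queue<InputStream> producer) {
        List<String> parsed = new ArrayList<>();
        InputStream message;
        while ((message = producer.poll()) != null) {
            // try-with-resources closes the stream whether or not parsing succeeds.
            try (InputStream in = message) {
                parsed.add(readUtf8(in));
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to read message.", e);
            }
        }
        return parsed;
    }

    private static String readUtf8(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}
```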
- io.grpc.stub.ClientCalls.UnaryStreamToFuture#onMessage

Stores the response value destined for the Future; a second value for a unary call is rejected.

```java
public void onMessage(RespT value) {
  if (this.value != null) {
    throw Status.INTERNAL.withDescription("More than one value received for unary call")
        .asRuntimeException();
  }
  this.value = value;
}
```
- io.grpc.stub.ClientCalls#getUnchecked

Returns the value of the Future.

```java
private static <V> V getUnchecked(Future<V> future) {
  try {
    return future.get();
  } catch (InterruptedException e) {
    // Restore the interrupt flag and surface the interruption as CANCELLED.
    Thread.currentThread().interrupt();
    throw Status.CANCELLED
        .withDescription("Thread interrupted")
        .withCause(e)
        .asRuntimeException();
  } catch (ExecutionException e) {
    throw toStatusRuntimeException(e.getCause());
  }
}
```