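gRPC hedged request cancellation flow
When the client receives a completed response from one request in a hedged set, it cancels the others. Each cancelled request eventually submits a CancelClientStreamCommand, which sends an RST_STREAM frame. When the server receives this frame and the stream's listener has not yet been closed, it cancels the context, which ultimately cancels the request.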
Client
When the client successfully receives a response, it commits the successful stream in io.grpc.internal.RetriableStream.Sublistener#closed.
io.grpc.internal.RetriableStream#commit$CommitTask#run
On commit, a CommitTask is submitted, and it cancels all the other streams.

    class CommitTask implements Runnable {
        @Override
        public void run() {
            // Cancel every substream except the one that won.
            for (Substream substream : savedDrainedSubstreams) {
                if (substream != winningSubstream) {
                    substream.stream.cancel(CANCELLED_BECAUSE_COMMITTED);
                }
            }
            // Stop any retry or hedging attempt that is still scheduled.
            if (retryFuture != null) {
                retryFuture.cancel(false);
            }
            if (hedgingFuture != null) {
                hedgingFuture.cancel(false);
            }
            postCommit();
        }
    }
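The "first response wins, cancel the rest" step can be modeled without gRPC. The following is a minimal sketch under that assumption, not the gRPC implementation: `HedgeCommitSketch`, `FakeSubstream`, and `commit` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the commit step for hedged attempts; not gRPC code.
final class HedgeCommitSketch {

    // Stands in for one hedged attempt; it only records whether it was cancelled.
    static final class FakeSubstream {
        final String name;
        boolean cancelled;

        FakeSubstream(String name) {
            this.name = name;
        }

        void cancel() {
            cancelled = true;
        }
    }

    // Cancels every attempt except the winner and returns the names of the cancelled ones.
    static List<String> commit(List<FakeSubstream> attempts, FakeSubstream winner) {
        List<String> cancelledNames = new ArrayList<>();
        for (FakeSubstream attempt : attempts) {
            if (attempt != winner) {
                attempt.cancel();
                cancelledNames.add(attempt.name);
            }
        }
        return cancelledNames;
    }

    public static void main(String[] args) {
        FakeSubstream a = new FakeSubstream("attempt-0");
        FakeSubstream b = new FakeSubstream("attempt-1");
        FakeSubstream c = new FakeSubstream("attempt-2");
        // attempt-1 answered first, so the other two are cancelled.
        System.out.println(commit(Arrays.asList(a, b, c), b));
    }
}
```

The comparison uses reference identity (`!=`), mirroring the `substream != winningSubstream` check in CommitTask.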
io.grpc.internal.AbstractClientStream#cancel
Cancels the stream with the given reason.

    public final void cancel(Status reason) {
        Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
        cancelled = true;
        abstractClientStreamSink().cancel(reason);
    }
io.grpc.netty.shaded.io.grpc.netty.NettyClientStream.Sink#cancel
Enqueues the command that cancels the stream.

    public void cancel(Status status) {
        PerfMark.startTask("NettyClientStream$Sink.cancel");
        try {
            NettyClientStream.this.writeQueue.enqueue(
                new CancelClientStreamCommand(NettyClientStream.this.transportState(), status), true);
        } finally {
            PerfMark.stopTask("NettyClientStream$Sink.cancel");
        }
    }
io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler#write
When a message is written, the handler dispatches on the command type; a CancelClientStreamCommand is routed to cancelStream.

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof CreateStreamCommand) {
            this.createStream((CreateStreamCommand) msg, promise);
        } else if (msg instanceof SendGrpcFrameCommand) {
            this.sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
        } else if (msg instanceof CancelClientStreamCommand) {
            this.cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
        } else if (msg instanceof SendPingCommand) {
            this.sendPingFrame(ctx, (SendPingCommand) msg, promise);
        } else if (msg instanceof GracefulCloseCommand) {
            this.gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
        } else if (msg instanceof ForcefulCloseCommand) {
            this.forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
        } else {
            if (msg != NOOP_MESSAGE) {
                throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
            }
            ctx.write(Unpooled.EMPTY_BUFFER, promise);
        }
    }
io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler#cancelStream
Writes the cancel command. transportReportStatus submits the instruction that closes the listener; because it is called with stopDelivery set to true, it also either closes the deframer immediately or defers closing it.
If the stream exists, a new RST_STREAM frame is sent. The frame signals an error on the current stream with error code CANCEL, whose value is 8.

    private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) {
        TransportState stream = cmd.stream();
        try {
            Status reason = cmd.reason();
            if (reason != null) {
                // Report the status locally; true = stop delivering pending messages.
                stream.transportReportStatus(reason, true, new Metadata());
            }
            if (!cmd.stream().isNonExistent()) {
                // The stream exists on the wire, so tell the server to cancel it.
                this.encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
            } else {
                promise.setSuccess();
            }
        } finally {
            PerfMark.stopTask("NettyClientHandler.cancelStream", stream.tag());
        }
    }
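To make the wire format concrete, here is a small sketch of what an RST_STREAM frame with error code CANCEL looks like as bytes. It follows the HTTP/2 frame layout (9-byte header plus a 4-byte error code); in gRPC the Netty encoder produces this frame, so `RstStreamFrameSketch` and `encode` are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

// Illustrative encoder for an HTTP/2 RST_STREAM frame; not the Netty or gRPC implementation.
final class RstStreamFrameSketch {
    static final int FRAME_TYPE_RST_STREAM = 0x3;
    static final int ERROR_CODE_CANCEL = 0x8;

    // Layout: 24-bit payload length, 8-bit type, 8-bit flags,
    // 1 reserved bit + 31-bit stream id, then the 32-bit error code.
    static byte[] encode(int streamId, long errorCode) {
        ByteBuffer buf = ByteBuffer.allocate(9 + 4);    // ByteBuffer is big-endian by default
        buf.put((byte) 0).put((byte) 0).put((byte) 4);  // the payload is always 4 bytes
        buf.put((byte) FRAME_TYPE_RST_STREAM);
        buf.put((byte) 0);                              // RST_STREAM defines no flags
        buf.putInt(streamId & 0x7FFFFFFF);              // keep the reserved high bit at 0
        buf.putInt((int) errorCode);
        return buf.array();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encode(3, ERROR_CODE_CANCEL)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

The last four bytes carry the error code, which is the value the server later reads as errorCode.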
Server
io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.FrameListener#onRstStreamRead
In the received RST_STREAM frame, errorCode is 8 (CANCEL), which means the request was cancelled.

    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
        if (NettyServerHandler.this.keepAliveManager != null) {
            NettyServerHandler.this.keepAliveManager.onDataReceived();
        }
        NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
    }
io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler#onRstStreamRead
NettyServerHandler then looks up the stream by streamId. If the stream exists, the current request is cancelled with status CANCELLED.
Note that if the stream has already completed and been cleaned up by the time this frame arrives, the frame may not be processed, and the request completes with status OK.

    private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
        try {
            TransportState stream = this.serverStream(this.connection().stream(streamId));
            if (stream != null) {
                // The original wraps this call in try/finally; the finally block is omitted from this excerpt.
                stream.transportReportStatus(
                    Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
            }
        } catch (Throwable var9) {
            logger.log(Level.WARNING, "Exception in onRstStreamRead()", var9);
            throw this.newStreamException(streamId, var9);
        }
    }
io.grpc.internal.AbstractServerStream.TransportState#transportReportStatus
If the deframer is already closed, the listener is closed immediately with the cancellation status. Otherwise the close is stored in deframerClosedTask and the deframer is asked to close.

    public final void transportReportStatus(final Status status) {
        Preconditions.checkArgument(!status.isOk(), "status must not be OK");
        if (deframerClosed) {
            deframerClosedTask = null;
            closeListener(status);
        } else {
            // Defer closing the listener until the deframer has closed.
            deframerClosedTask = new Runnable() {
                @Override
                public void run() {
                    closeListener(status);
                }
            };
            immediateCloseRequested = true;
            closeDeframer(true);
        }
    }
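The "close now, or once the deframer has closed" pattern can be sketched in isolation. This is a simplified model, not gRPC code: `DeferredCloseSketch` and `onDeframerClosed` are hypothetical, and the sketch assumes the deframer's closing is signalled by an explicit call rather than by a real deframer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of deferring the listener close until the deframer is closed.
final class DeferredCloseSketch {
    private boolean deframerClosed;
    private boolean listenerClosed;
    private Runnable deframerClosedTask;
    final List<String> events = new ArrayList<>();

    void transportReportStatus(String status) {
        if (deframerClosed) {
            deframerClosedTask = null;
            closeListener(status);
        } else {
            // Remember what to do once the deframer reports that it has closed.
            deframerClosedTask = () -> closeListener(status);
            events.add("deferred");
        }
    }

    // Assumption of this model: something calls this when the deframer finishes closing.
    void onDeframerClosed() {
        deframerClosed = true;
        if (deframerClosedTask != null) {
            Runnable task = deframerClosedTask;
            deframerClosedTask = null;
            task.run();
        }
    }

    private void closeListener(String status) {
        if (!listenerClosed) { // the listener is notified at most once
            listenerClosed = true;
            events.add("closed:" + status);
        }
    }

    public static void main(String[] args) {
        DeferredCloseSketch stream = new DeferredCloseSketch();
        stream.transportReportStatus("CANCELLED"); // deframer still open: deferred
        stream.onDeframerClosed();                 // now the listener is closed
        stream.transportReportStatus("CANCELLED"); // no effect: already closed
        System.out.println(stream.events);
    }
}
```

The `listenerClosed` guard plays the same role as the flag of that name in the real closeListener: a second report cannot notify the listener twice.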
io.grpc.internal.AbstractServerStream.TransportState#closeListener
Closes the listener with the given status. The listenerClosed flag ensures the listener is notified only once.

    private void closeListener(Status newStatus) {
        Preconditions.checkState(!newStatus.isOk() || closedStatus != null);
        if (!listenerClosed) {
            listenerClosed = true;
            onStreamDeallocated();
            listener().closed(newStatus);
        }
    }
io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#closedInternal
With the CANCELLED status, a ContextCloser task is executed on cancelExecutor to cancel the context; a Closed task is then submitted to callExecutor to close the listener.

    private void closedInternal(final Status status) {
        if (!status.isOk()) {
            // Cancel the context right away, without waiting behind work queued on callExecutor.
            cancelExecutor.execute(new ContextCloser(context, status.getCause()));
        }

        final class Closed extends ContextRunnable {
            Closed() {
                super(context);
            }

            @Override
            public void runInContext() {
                // The original wraps this call in try/finally; the finally block is omitted from this excerpt.
                getListener().closed(status);
            }
        }

        callExecutor.execute(new Closed());
    }
io.grpc.internal.ServerImpl.ContextCloser#run
Runs the submitted ContextCloser task, which cancels the context.

    public void run() {
        context.cancel(cause);
    }
io.grpc.Context.CancellableContext#cancel
Performs the Context cancellation: it marks the Context as cancelled, cancels the pending deadline task, and then notifies and clears the listeners. Only the first call triggers the cancellation; later calls return false.

    public boolean cancel(Throwable cause) {
        boolean triggeredCancel = false;
        synchronized (this) {
            if (!cancelled) {
                cancelled = true;
                if (pendingDeadline != null) {
                    pendingDeadline.cancel(false);
                    pendingDeadline = null;
                }
                this.cancellationCause = cause;
                triggeredCancel = true;
            }
        }
        if (triggeredCancel) {
            notifyAndClearListeners();
        }
        return triggeredCancel;
    }
io.grpc.Context.CancellableContext#notifyAndClearListeners
Notifies the current listeners of the cancellation. By default there are two listeners: a CancellationListener, which cancels the current context, and a ServerStreamCancellationListener, which cancels the stream. Any other registered listeners are notified as well, and the listeners are then removed. Listeners registered on this context are delivered before the rest.

    private void notifyAndClearListeners() {
        ArrayList<ExecutableListener> tmpListeners;
        CancellationListener tmpParentListener;
        synchronized (this) {
            if (listeners == null) {
                return;
            }
            tmpParentListener = parentListener;
            parentListener = null;
            tmpListeners = listeners;
            listeners = null;
        }
        // First pass: listeners registered on this context.
        for (ExecutableListener tmpListener : tmpListeners) {
            if (tmpListener.context == this) {
                tmpListener.deliver();
            }
        }
        // Second pass: all remaining listeners.
        for (ExecutableListener tmpListener : tmpListeners) {
            if (!(tmpListener.context == this)) {
                tmpListener.deliver();
            }
        }
        if (cancellableAncestor != null) {
            cancellableAncestor.removeListener(tmpParentListener);
        }
    }
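Two properties of this cancellation path are easy to miss: only the first cancel call has any effect, and listeners registered on the context itself run before the others. The sketch below models both with plain Java. `CancelNotifySketch` and its members are hypothetical, and the boolean `registeredOnThisContext` stands in for the `tmpListener.context == this` check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one-shot cancellation with two-pass listener delivery; not gRPC code.
final class CancelNotifySketch {
    private static final class Listener {
        final String name;
        final boolean registeredOnThisContext;

        Listener(String name, boolean registeredOnThisContext) {
            this.name = name;
            this.registeredOnThisContext = registeredOnThisContext;
        }
    }

    private boolean cancelled;
    private List<Listener> listeners = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();

    synchronized void addListener(String name, boolean registeredOnThisContext) {
        if (cancelled) {
            delivered.add(name); // already cancelled: deliver right away
        } else {
            listeners.add(new Listener(name, registeredOnThisContext));
        }
    }

    // Returns true only for the call that actually triggered the cancellation.
    boolean cancel() {
        List<Listener> snapshot;
        synchronized (this) {
            if (cancelled) {
                return false;
            }
            cancelled = true;
            snapshot = listeners;
            listeners = null; // cleared so that no listener is delivered twice
        }
        for (Listener l : snapshot) {
            if (l.registeredOnThisContext) {
                delivered.add(l.name);
            }
        }
        for (Listener l : snapshot) {
            if (!l.registeredOnThisContext) {
                delivered.add(l.name);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        CancelNotifySketch ctx = new CancelNotifySketch();
        ctx.addListener("from-descendant", false);
        ctx.addListener("own-listener", true);
        System.out.println(ctx.cancel() + " " + ctx.delivered);
        System.out.println(ctx.cancel() + " " + ctx.delivered);
    }
}
```

Taking a snapshot and clearing the list inside the lock, then delivering outside it, mirrors the structure of notifyAndClearListeners and keeps listener callbacks from running while the lock is held.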
io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl#ServerStreamListenerImpl
Finally, the cancelled method of the Context.CancellationListener that was added when ServerStreamListenerImpl was constructed is invoked. It sets the cancelled flag of ServerCallImpl to true.

    this.context.addListener(new Context.CancellationListener() {
        @Override
        public void cancelled(Context context) {
            ServerStreamListenerImpl.this.call.cancelled = true;
        }
    }, MoreExecutors.directExecutor());
io.grpc.internal.ServerImpl.ServerTransportListenerImpl#StreamCreated$ServerStreamCancellationListener#cancelled
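Runs the stream cancellation listener that was added when the stream was created. If the context carries no cancellation cause, the resulting status uses "io.grpc.Context was cancelled without error" as its description. As the code shows, the stream itself is cancelled here only when that status is DEADLINE_EXCEEDED.

    public void cancelled(Context context) {
        Status status = statusFromCancelled(context);
        if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
            stream.cancel(status);
        }
    }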
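The decision made by this listener can be sketched as a small mapping from cancellation cause to status code. This is a simplified model and not the real statusFromCancelled: `CancelStatusSketch` and its members are hypothetical, the mapping of a TimeoutException cause to DEADLINE_EXCEEDED is my assumption about the library's behavior, and every other cause is collapsed into a plain cancellation.

```java
import java.util.concurrent.TimeoutException;

// Simplified, hypothetical model of mapping a cancellation cause to a status; not gRPC code.
final class CancelStatusSketch {
    enum Code { CANCELLED, DEADLINE_EXCEEDED }

    static final class SimpleStatus {
        final Code code;
        final String description;

        SimpleStatus(Code code, String description) {
            this.code = code;
            this.description = description;
        }
    }

    static SimpleStatus statusFor(Throwable cancellationCause) {
        if (cancellationCause == null) {
            return new SimpleStatus(Code.CANCELLED, "io.grpc.Context was cancelled without error");
        }
        if (cancellationCause instanceof TimeoutException) {
            // Assumption: a timeout cause is reported as DEADLINE_EXCEEDED.
            return new SimpleStatus(Code.DEADLINE_EXCEEDED, cancellationCause.getMessage());
        }
        // Simplification: any other cause is treated as a plain cancellation.
        return new SimpleStatus(Code.CANCELLED, "Context cancelled");
    }

    // Mirrors the check in the listener: only DEADLINE_EXCEEDED cancels the stream.
    static boolean listenerCancelsStream(Throwable cancellationCause) {
        return statusFor(cancellationCause).code == Code.DEADLINE_EXCEEDED;
    }

    public static void main(String[] args) {
        System.out.println(listenerCancelsStream(null));
        System.out.println(listenerCancelsStream(new TimeoutException("deadline exceeded")));
    }
}
```

In the hedging path the ContextCloser is created with status.getCause(), and the CANCELLED status built from the RST_STREAM frame is constructed without a cause, so under this model the listener leaves the stream alone.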
io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl#closedInternal
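When the Closed task runs with the CANCELLED status, it triggers the onCancel event of ServerCall.Listener; any cancellation work registered there is executed.
In addition, whether or not the request succeeded, context.cancel(null) is always executed. It calls notifyAndClearListeners to notify the context listener and the stream listener, and then removes them.

    private void closedInternal(Status status) {
        try {
            if (status.isOk()) {
                listener.onComplete();
            } else {
                call.cancelled = true;
                listener.onCancel();
            }
        } finally {
            // Always cancel the context, even if the listener throws.
            context.cancel(null);
        }
    }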
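The final dispatch step can be reproduced in a few lines to show why the context is cancelled in every case, including when the application listener throws. This is a sketch under stated assumptions: `ClosedDispatchSketch`, `CallListener`, and `RecordingListener` are hypothetical, and a boolean flag stands in for `context.cancel(null)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the final listener dispatch; not gRPC code.
final class ClosedDispatchSketch {
    interface CallListener {
        void onComplete();

        void onCancel();
    }

    // Test helper that records which callback was invoked.
    static final class RecordingListener implements CallListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onComplete() {
            events.add("onComplete");
        }

        @Override
        public void onCancel() {
            events.add("onCancel");
        }
    }

    boolean callCancelled;
    boolean contextCancelled;

    void closedInternal(boolean statusOk, CallListener listener) {
        try {
            if (statusOk) {
                listener.onComplete();
            } else {
                callCancelled = true;
                listener.onCancel();
            }
        } finally {
            contextCancelled = true; // stands in for context.cancel(null)
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        ClosedDispatchSketch call = new ClosedDispatchSketch();
        call.closedInternal(false, listener);
        System.out.println(listener.events + " callCancelled=" + call.callCancelled
            + " contextCancelled=" + call.contextCancelled);
    }
}
```

Because the cleanup sits in `finally`, an exception thrown from onCancel or onComplete still propagates to the caller, but the context is marked cancelled first.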