# gRPC Server-Side Request Processing Flow

[TOC]
## Initialization

### Creating and Starting the ServerTransport

When the server starts, it eventually calls NettyServer's start() method, which adds a ChannelInitializer to the ServerBootstrap. When a new connection is established, Netty invokes that initializer's initChannel method, which creates a NettyServerTransport.
`io.grpc.netty.NettyServer#start`

When the Netty channel is initialized, a NettyServerTransport is created first; the listener's transport-created event fires (which also schedules a handshake-timeout cancellation task), and then the transport's start method is called:

```java
b.childHandler(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(Channel ch) {
        NettyServerTransport transport = new NettyServerTransport();
        ServerTransportListener transportListener;
        synchronized (NettyServer.this) {
            transportListener = listener.transportCreated(transport);
        }
        transport.start(transportListener);

        ChannelFutureListener loopReleaser = new LoopReleaser();
        channelDone.addListener(loopReleaser);
        ch.closeFuture().addListener(loopReleaser);
    }
});
```
`io.grpc.netty.NettyServerTransport#start`

When the transport starts, a handler is created for it and added to the Netty channel pipeline:

```java
public void start(ServerTransportListener listener) {
    this.listener = listener;

    grpcHandler = createHandler(listener, channelUnused);
    ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
    ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);

    ChannelFutureListener terminationNotifier = new TerminationNotifier();
    channelUnused.addListener(terminationNotifier);
    channel.closeFuture().addListener(terminationNotifier);

    channel.pipeline().addLast(bufferingHandler);
}
```
## Handling Requests

Once the connection between the server and the client has been established, the server can begin processing requests.
### Overall flow

1. Read the Settings frame and fire the transport-ready event.
2. Read the Header frame, firing FrameListener#onHeadersRead:
   - NettyServerHandler handles the frame:
     - resolves the target method from the header fields
     - converts the HTTP/2 stream into a NettyServerStream
     - fires the Transport#streamCreated event:
       - checks codec and decompression settings and creates a cancellable context
       - initializes the stream listener
       - submits the StreamCreated task
     - fires the NettyServerStream.TransportState#onStreamAllocated event:
       - submits the OnReady task
3. Execute the StreamCreated task:
   - look up the method definition by method name
   - call startCall to begin processing:
     - walk the interceptors, wrapping the method handler with each one
     - call startWrappedCall:
       - create a ServerCallImpl instance
       - invoke startCall on the method definition's call handler:
         - create the response observer ServerCallStreamObserverImpl
         - call call.request() to ask for a given number of messages:
           - submit a RequestRunnable task to fetch them
         - create the call listener UnaryServerCallListener
       - create the ServerStreamListenerImpl stream listener
4. Execute the OnReady task:
   - call UnaryServerCallListener#onReady to handle the ready event:
     - update the ready flag
     - run the onReadyHandler, if one is set
5. Execute the RequestRunnable task:
   - request the specified number of messages:
     - increase the pending-delivery count
     - call deliver:
       - deliver any pending frames according to their type
       - when the frame is a message body, process it:
         - read the body stream
         - fire the MessageDeframer.Listener#messagesAvailable event to announce the new message:
           - submit the MessagesAvailable task
   - call MessageDeframer#close to close the deframer:
     - fire the stream listener's half-closed event:
       - submit the HalfClosed task
6. Execute the MessagesAvailable task:
   - fetch the message from the MessageProducer and parse it into a request object
   - call ServerCall.Listener#onMessage to handle the message:
     - store the `request` object; it is processed when `halfClose` fires
7. Execute the HalfClosed task:
   - call invoke to run the business logic:
     - dispatch by method ID to the corresponding service implementation
   - call StreamObserver#onNext to send the response:
     - send the response headers:
       - set the encoding and compression headers
       - write the headers
     - send the response body:
       - serialize the response object into a stream
       - write the response
       - flush the buffer
   - call StreamObserver#onCompleted to finish the call:
     - close the call with OK status:
       - update the closed flag
       - fire the stream-close event:
         - close the framer
         - add the response status to the trailers
         - update the TransportState status
         - write the trailers and send them to the client
   - freeze the response
   - if the stream is still ready, fire the onReady event again
8. When the stream closes, fire the TransportState#complete event:
   - close the listener:
     - submit the Closed task
9. Execute the Closed task:
   - fire the stream complete event
   - cancel the context
### 1. Reading the Settings frame

`io.grpc.netty.NettyServerHandler.FrameListener#onSettingsRead`

When the Settings frame is read, onSettingsRead is called, which in turn notifies the transport listener of the ready event:

```java
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
    if (firstSettings) {
        firstSettings = false;
        attributes = transportListener.transportReady(negotiationAttributes);
    }
}
```
`io.grpc.internal.ServerImpl.ServerTransportListenerImpl#transportReady`

Notifies the transport-ready event, iterating over every ServerTransportFilter; by default there are no ServerTransportFilter implementations:

```java
public Attributes transportReady(Attributes attributes) {
    handshakeTimeoutFuture.cancel(false);
    handshakeTimeoutFuture = null;

    for (ServerTransportFilter filter : transportFilters) {
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
                "Filter %s returned null", filter);
    }
    this.attributes = attributes;
    return attributes;
}
```
### 2. Reading the Header frame

When the server receives the headers sent by the client, Netty processes them and ultimately calls onHeadersRead to start handling the stream.

`io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.FrameListener#onHeadersRead`

Receives the header frame:

```java
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
                          int streamDependency, short weight, boolean exclusive,
                          int padding, boolean endStream) throws Http2Exception {
    if (NettyServerHandler.this.keepAliveManager != null) {
        NettyServerHandler.this.keepAliveManager.onDataReceived();
    }
    NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
}
```
`io.grpc.netty.NettyServerHandler.FrameListener#onHeadersRead`

Starts processing the headers:

```java
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
                          int streamDependency, short weight, boolean exclusive,
                          int padding, boolean endStream) throws Http2Exception {
    if (keepAliveManager != null) {
        keepAliveManager.onDataReceived();
    }
    NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
}
```
`io.grpc.netty.NettyServerHandler#onHeadersRead`

Based on the request headers, this method looks up the service and method and validates the request type, HTTP method, and transfer encoding. It then fetches the stream for the given HTTP/2 stream ID and wraps it in a NettyServerStream. Next it fires the transport's streamCreated event, which submits a StreamCreated task to the executor; finally it calls onStreamAllocated, which notifies the StreamListener's onReady event and submits the OnReady task:

```java
private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
        throws Http2Exception {
    try {
        CharSequence path = headers.path();
        String method = path.subSequence(1, path.length()).toString();

        Http2Stream http2Stream = requireHttp2Stream(streamId);
        Metadata metadata = Utils.convertHeaders(headers);
        StatsTraceContext statsTraceCtx =
                StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);

        NettyServerStream.TransportState state = new NettyServerStream.TransportState(
                this, ctx.channel().eventLoop(), http2Stream, maxMessageSize,
                statsTraceCtx, transportTracer, method);

        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
        NettyServerStream stream = new NettyServerStream(ctx.channel(), state, attributes,
                authority, statsTraceCtx, transportTracer);
        transportListener.streamCreated(stream, method, metadata);
        state.onStreamAllocated();
        http2Stream.setProperty(streamKey, state);
    } catch (Exception e) {
        logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
        throw newStreamException(streamId, e);
    }
}
```
### 3. The stream-created event

```java
transportListener.streamCreated(stream, method, metadata);
```
`io.grpc.internal.ServerImpl.ServerTransportListenerImpl#streamCreated`

Checks and initializes the stream's codec and decompression settings, creates a cancellable context, chooses the executor to run on, initializes the stream listener, and finally submits the stream-created task:

```java
private void streamCreatedInternal(final ServerStream stream, final String methodName,
                                   final Metadata headers, final Tag tag) {
    final Executor wrappedExecutor;
    if (executor == directExecutor()) {
        wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
        stream.optimizeForDirectExecutor();
    } else {
        wrappedExecutor = new SerializingExecutor(executor);
    }

    final Context.CancellableContext context = createContext(headers, statsTraceCtx);
    final JumpToApplicationThreadServerStreamListener jumpListener =
            new JumpToApplicationThreadServerStreamListener(
                    wrappedExecutor, executor, stream, context, tag);
    stream.setListener(jumpListener);
    wrappedExecutor.execute(new StreamCreated());
}
```
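The SerializingExecutor used above guarantees that one stream's callbacks run sequentially and in order, even when the underlying pool is multi-threaded. A minimal standalone sketch of that idea (illustrative only, not gRPC's actual implementation; the class name is hypothetical):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

// Illustrative sketch of a serializing executor: tasks submitted here run one
// at a time, in submission order, on the delegate executor, so a stream's
// callbacks never interleave even when the delegate is a thread pool.
class SerialExecutorSketch implements Executor {
    private final Executor delegate;
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean running; // guarded by 'queue'

    SerialExecutorSketch(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        synchronized (queue) {
            queue.add(task);
            if (running) {
                return; // a drain pass is already scheduled
            }
            running = true;
        }
        delegate.execute(this::drain);
    }

    // Runs queued tasks until the queue empties.
    private void drain() {
        while (true) {
            Runnable next;
            synchronized (queue) {
                next = queue.poll();
                if (next == null) {
                    running = false;
                    return;
                }
            }
            next.run();
        }
    }
}
```

This is why the server can hand stream events (StreamCreated, OnReady, MessagesAvailable, HalfClosed, Closed below) to an application executor without them racing each other.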
Next, the stream-ready task runs.

### 4. The stream-ready event

```java
state.onStreamAllocated();
```
`io.grpc.internal.AbstractStream.TransportState#onStreamAllocated`

When the stream is allocated, the stream-ready event fires:

```java
protected void onStreamAllocated() {
    checkState(listener() != null);
    synchronized (onReadyLock) {
        checkState(!allocated, "Already allocated");
        allocated = true;
    }
    notifyIfReady();
}
```
`io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#onReady`

Eventually onReady is called, submitting the stream's OnReady task:

```java
public void onReady() {
    callExecutor.execute(new OnReady());
}
```
### 5. Executing the stream-created task

Execute the StreamCreated task.

`io.grpc.internal.ServerImpl.ServerTransportListenerImpl#streamCreated`

When the StreamCreated task runs, it first looks up the method handler in the registry by method name, then calls startCall to process the request:

```java
final class StreamCreated extends ContextRunnable {

    private void runInternal() {
        ServerStreamListener listener = NOOP_LISTENER;
        try {
            ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
            if (method == null) {
                method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
            }
            if (method == null) {
                Status status = Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName);
                stream.close(status, new Metadata());
                context.cancel(null);
                return;
            }
            listener = startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
        } catch (Throwable t) {
            stream.close(Status.fromThrowable(t), new Metadata());
            context.cancel(null);
            throw t;
        } finally {
            jumpListener.setListener(listener);
        }

        final class ServerStreamCancellationListener implements Context.CancellationListener {
            @Override
            public void cancelled(Context context) {
                Status status = statusFromCancelled(context);
                if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
                    stream.cancel(status);
                }
            }
        }

        context.addListener(new ServerStreamCancellationListener(), directExecutor());
    }
}
```
`io.grpc.internal.ServerImpl.ServerTransportListenerImpl#startCall`

Fetches the method's call handler, wraps it with each interceptor in turn, and then calls startWrappedCall:

```java
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
        ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
        Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
    ServerCallHandler<ReqT, RespT> handler = methodDef.getServerCallHandler();
    for (ServerInterceptor interceptor : interceptors) {
        handler = InternalServerInterceptors.interceptCallHandler(interceptor, handler);
    }
    ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
    return startWrappedCall(fullMethodName, interceptedDef, stream, headers, context, tag);
}
```
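The wrapping loop above is a plain chain-of-responsibility: each interceptor wraps the handler built so far, so the interceptor applied last ends up outermost at call time. A standalone sketch of just this mechanic, with hypothetical Handler/Interceptor types standing in for ServerCallHandler and ServerInterceptor:

```java
import java.util.List;

// Hypothetical stand-ins for ServerCallHandler and ServerInterceptor; this
// shows only the wrapping mechanic used by startCall above, not gRPC's API.
interface Handler {
    String handle(String request);
}

interface Interceptor {
    Handler intercept(Handler next);
}

class InterceptorChain {
    // Mirrors the loop in startCall: each interceptor wraps the handler built
    // so far, so the last interceptor in the list is outermost at call time.
    static Handler wrap(List<Interceptor> interceptors, Handler terminal) {
        Handler handler = terminal;
        for (Interceptor interceptor : interceptors) {
            handler = interceptor.intercept(handler);
        }
        return handler;
    }
}
```

For example, wrapping a terminal handler with interceptors A then B yields B(A(terminal)) at call time.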
`io.grpc.internal.ServerImpl.ServerTransportListenerImpl#startWrappedCall`

Creates the server call instance, invokes the method's call handler to start processing the request, and creates the stream listener:

```java
private <WReqT, WRespT> ServerStreamListener startWrappedCall(String fullMethodName,
        ServerMethodDefinition<WReqT, WRespT> methodDef, ServerStream stream,
        Metadata headers, Context.CancellableContext context, Tag tag) {

    ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(stream,
            methodDef.getMethodDescriptor(), headers, context,
            decompressorRegistry, compressorRegistry, serverCallTracer, tag);

    ServerCall.Listener<WReqT> listener = methodDef.getServerCallHandler().startCall(call, headers);
    if (listener == null) {
        throw new NullPointerException("startCall() returned a null listener for method " + fullMethodName);
    }
    return call.newServerStreamListener(listener);
}
```
`io.grpc.stub.ServerCalls.UnaryServerCallHandler#startCall`

Creates the response observer, requests a fixed number of messages, and creates the call listener. Note that a unary call requests two messages, one more than it needs, so that a client erroneously sending more than one message can be detected:

```java
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
    ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
            new ServerCallStreamObserverImpl<>(call);
    call.request(2);
    return new UnaryServerCallListener(responseObserver, call);
}
```
### 6. Submitting the message-request task

While the StreamCreated task runs, startCall submits a RequestRunnable task asking for a given number of messages.

`io.grpc.internal.AbstractStream#request`

Specifies how many messages to receive:

```java
public final void request(int numMessages) {
    transportState().requestMessagesFromDeframer(numMessages);
}
```
`io.grpc.internal.AbstractStream.TransportState#requestMessagesFromDeframer`

Submits the task that requests the given number of messages from the deframer:

```java
private void requestMessagesFromDeframer(final int numMessages) {
    class RequestRunnable implements Runnable {
        @Override
        public void run() {
            try {
                deframer.request(numMessages);
            } catch (Throwable t) {
                deframeFailed(t);
            }
        }
    }

    runOnTransportThread(new RequestRunnable());
}
```
### 7. Executing the stream-ready task

Execute the OnReady task.

`io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#onReady`

Runs the OnReady task submitted by onReady:

```java
final class OnReady extends ContextRunnable {

    OnReady() {
        super(context);
    }

    @Override
    public void runInContext() {
        try {
            getListener().onReady();
        } catch (Throwable t) {
            internalClose(t);
            throw t;
        }
    }
}
```
`io.grpc.stub.ServerCalls.UnaryServerCallHandler.UnaryServerCallListener#onReady`

Handles the stream-ready event; if an onReadyHandler is set, it runs:

```java
public void onReady() {
    wasReady = true;
    if (responseObserver.onReadyHandler != null) {
        responseObserver.onReadyHandler.run();
    }
}
```
### 8. Executing the message-request task and submitting the messages-available task

Execute the RequestRunnable task:

```java
class RequestRunnable implements Runnable {
    @Override
    public void run() {
        try {
            deframer.request(numMessages);
        } catch (Throwable t) {
            deframeFailed(t);
        }
    }
}
```
`io.grpc.internal.MessageDeframer#request`

Requests the given number of messages:

```java
public void request(int numMessages) {
    if (isClosed()) {
        return;
    }
    pendingDeliveries += numMessages;
    deliver();
}
```
`io.grpc.internal.MessageDeframer#deliver`

Reads messages and delivers them to the listener:

```java
private void deliver() {
    if (inDelivery) {
        return;
    }
    inDelivery = true;
    try {
        while (!stopDelivery && pendingDeliveries > 0 && readRequiredBytes()) {
            switch (state) {
                case HEADER:
                    processHeader();
                    break;
                case BODY:
                    processBody();
                    pendingDeliveries--;
                    break;
                default:
                    throw new AssertionError("Invalid state: " + state);
            }
        }

        if (stopDelivery) {
            close();
            return;
        }
        if (closeWhenComplete && isStalled()) {
            close();
        }
    } finally {
        inDelivery = false;
    }
}
```
`io.grpc.internal.MessageDeframer#processBody`

Reads the request body and notifies the listener that a new message is available:

```java
private void processBody() {
    InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
    nextFrame = null;
    listener.messagesAvailable(new SingleMessageProducer(stream));

    state = State.HEADER;
    requiredLength = HEADER_LENGTH;
}
```
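On the wire, each gRPC message the deframer parses is a 5-byte prefix (a 1-byte compressed flag plus a 4-byte big-endian length; this is the HEADER_LENGTH seen above) followed by the payload. A self-contained sketch of that HEADER → BODY loop (illustrative only, not gRPC's MessageDeframer; compression is not handled):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative parser for gRPC's length-prefixed message framing: each message
// is 1 byte compressed-flag + 4 bytes big-endian length + payload. It mirrors
// the deframer's HEADER -> BODY state loop.
class DeframeSketch {
    static List<byte[]> deframe(byte[] wire) {
        List<byte[]> messages = new ArrayList<>();
        int pos = 0;
        while (pos + 5 <= wire.length) {
            int compressedFlag = wire[pos] & 0xff; // 0 = uncompressed
            int length = ((wire[pos + 1] & 0xff) << 24)
                       | ((wire[pos + 2] & 0xff) << 16)
                       | ((wire[pos + 3] & 0xff) << 8)
                       |  (wire[pos + 4] & 0xff);
            pos += 5; // past the message header
            messages.add(Arrays.copyOfRange(wire, pos, pos + length));
            pos += length;
        }
        return messages;
    }
}
```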
`io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#messagesAvailable`

Announces that new messages are available by submitting a MessagesAvailable task:

```java
public void messagesAvailable(final MessageProducer producer) {
    callExecutor.execute(new MessagesAvailable());
}
```
### 9. Executing the messages-available task

`io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#messagesAvailable`

Runs the MessagesAvailable task submitted earlier:

```java
final class MessagesAvailable extends ContextRunnable {

    MessagesAvailable() {
        super(context);
    }

    @Override
    public void runInContext() {
        try {
            getListener().messagesAvailable(producer);
        } catch (Throwable t) {
            internalClose(t);
            throw t;
        }
    }
}
```
`io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl#messagesAvailableInternal`

Parses each message stream into a request object and calls the listener's onMessage method to process it:

```java
private void messagesAvailableInternal(final MessageProducer producer) {
    if (call.cancelled) {
        GrpcUtil.closeQuietly(producer);
        return;
    }

    InputStream message;
    try {
        while ((message = producer.next()) != null) {
            try {
                listener.onMessage(call.method.parseRequest(message));
            } catch (Throwable t) {
                GrpcUtil.closeQuietly(message);
                throw t;
            }
            message.close();
        }
    } catch (Throwable t) {
        GrpcUtil.closeQuietly(producer);
        Throwables.throwIfUnchecked(t);
        throw new RuntimeException(t);
    }
}
```
`io.grpc.stub.ServerCalls.UnaryServerCallHandler.UnaryServerCallListener#onMessage`

The listener receives the message and stores it in a field; the stored request is processed when the halfClose event fires:

```java
public void onMessage(ReqT request) {
    if (this.request != null) {
        call.close(Status.INTERNAL.withDescription(TOO_MANY_REQUESTS), new Metadata());
        canInvoke = false;
        return;
    }
    this.request = request;
}
```
### 10. Submitting the half-close task

When the RequestRunnable task finishes reading the stream, MessageDeframer#close is called to close the deframer.

`io.grpc.internal.MessageDeframer#close`

```java
public void close() {
    if (isClosed()) {
        return;
    }
    boolean hasPartialMessage = nextFrame != null && nextFrame.readableBytes() > 0;
    try {
        if (fullStreamDecompressor != null) {
            hasPartialMessage = hasPartialMessage || fullStreamDecompressor.hasPartialData();
            fullStreamDecompressor.close();
        }
        if (unprocessed != null) {
            unprocessed.close();
        }
        if (nextFrame != null) {
            nextFrame.close();
        }
    } finally {
        fullStreamDecompressor = null;
        unprocessed = null;
        nextFrame = null;
    }
    listener.deframerClosed(hasPartialMessage);
}
```
`io.grpc.internal.AbstractServerStream.TransportState#deframerClosed`

Finishes closing the deframer, then notifies the stream listener of the half-close:

```java
public void deframerClosed(boolean hasPartialMessage) {
    deframerClosed = true;
    if (endOfStream) {
        if (!immediateCloseRequested && hasPartialMessage) {
            deframeFailed(Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame")
                    .asRuntimeException());
            deframerClosedTask = null;
            return;
        }
        listener.halfClosed();
    }
    if (deframerClosedTask != null) {
        deframerClosedTask.run();
        deframerClosedTask = null;
    }
}
```
`io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#halfClosed`

Submits the half-close task:

```java
public void halfClosed() {
    callExecutor.execute(new HalfClosed());
}
```
### 11. Executing the half-close task

`io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#halfClosed`

Runs the half-close task:

```java
final class HalfClosed extends ContextRunnable {

    HalfClosed() {
        super(context);
    }

    @Override
    public void runInContext() {
        try {
            getListener().halfClosed();
        } catch (Throwable t) {
            internalClose(t);
            throw t;
        }
    }
}
```
`io.grpc.stub.ServerCalls.UnaryServerCallHandler.UnaryServerCallListener#onHalfClose`

The listener finally invokes the method handler to process the request and freezes the response. It also fires the onReady event again; if an onReadyHandler is set, it runs:

```java
public void onHalfClose() {
    if (!canInvoke) {
        return;
    }
    if (request == null) {
        call.close(Status.INTERNAL.withDescription(MISSING_REQUEST), new Metadata());
        return;
    }

    method.invoke(request, responseObserver);
    request = null;
    responseObserver.freeze();
    if (wasReady) {
        onReady();
    }
}
```
`io.github.helloworlde.HelloServiceGrpc.MethodHandlers#invoke(Req, io.grpc.stub.StreamObserver)`

This is generated code: it dispatches to the corresponding service implementation by method ID, and the implementation sends the response through the StreamObserver:

```java
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
    switch (methodId) {
        case METHODID_HOW_ARE_YOU:
            serviceImpl.howAreYou((io.github.helloworlde.HelloMessage) request,
                    (io.grpc.stub.StreamObserver<io.github.helloworlde.HelloResponse>) responseObserver);
            break;
        default:
            throw new AssertionError();
    }
}
```
## Handling the Response

### 1. Executing the business logic

`io.github.helloworlde.service.HelloServiceImpl#howAreYou`

The application implements the generated interface, performs its logic in the method, and sends the response through the StreamObserver:

```java
public void howAreYou(HelloMessage request, StreamObserver<HelloResponse> responseObserver) {
    responseObserver.onNext(HelloResponse.newBuilder()
            .setResult("Hello : " + request.getMessage())
            .build());
    responseObserver.onCompleted();
}
```
### 2. Sending the response headers

`io.grpc.stub.ServerCalls.ServerCallStreamObserverImpl#onNext`

When sending a single response, onNext first checks whether the call has already been cancelled and throws if so; it then checks the call state, also throwing if the stream was aborted or already completed.

It next checks whether the headers have been sent; if not, they are sent first, and then the message is sent:

```java
public void onNext(RespT response) {
    if (cancelled) {
        if (onCancelHandler == null) {
            throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
        }
        return;
    }
    checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
    checkState(!completed, "Stream is already completed, no further calls are allowed");

    if (!sentHeaders) {
        call.sendHeaders(new Metadata());
        sentHeaders = true;
    }
    call.sendMessage(response);
}
```
`io.grpc.internal.ServerCallImpl#sendHeadersInternal`

Populates the header fields and sends the headers:

```java
private void sendHeadersInternal(Metadata headers) {
    headers.discardAll(MESSAGE_ENCODING_KEY);
    headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
    stream.setCompressor(compressor);

    headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
    if (advertisedEncodings.length != 0) {
        headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
    }

    sendHeadersCalled = true;
    stream.writeHeaders(headers);
}
```
`io.grpc.internal.AbstractServerStream#writeHeaders`

Writes the headers into the frame; Netty ultimately sends them:

```java
public final void writeHeaders(Metadata headers) {
    Preconditions.checkNotNull(headers, "headers");
    headersSent = true;
    abstractServerStreamSink().writeHeaders(headers);
}
```
### 3. Sending the response body

`io.grpc.internal.ServerCallImpl#sendMessageInternal`

Before sending the response, verifies that the headers have been sent, the call is not closed, and the response state is valid. If everything checks out, the response is serialized into a stream, written, and the buffer is flushed:

```java
private void sendMessageInternal(RespT message) {
    checkState(sendHeadersCalled, "sendHeaders has not been called");
    checkState(!closeCalled, "call is closed");

    if (method.getType().serverSendsOneMessage() && messageSent) {
        internalClose(Status.INTERNAL.withDescription(TOO_MANY_RESPONSES));
        return;
    }

    messageSent = true;
    try {
        InputStream resp = method.streamResponse(message);
        stream.writeMessage(resp);
        stream.flush();
    } catch (RuntimeException e) {
        close(Status.fromThrowable(e), new Metadata());
    } catch (Error e) {
        close(Status.CANCELLED.withDescription("Server sendMessage() failed with Error"), new Metadata());
        throw e;
    }
}
```
`io.grpc.internal.AbstractStream#writeMessage`

Checks the framer's state; if it is not closed, writes the stream's content into the frame and closes the stream. The message is ultimately sent to the client through Netty:

```java
public final void writeMessage(InputStream message) {
    checkNotNull(message, "message");
    try {
        if (!framer().isClosed()) {
            framer().writePayload(message);
        }
    } finally {
        GrpcUtil.closeQuietly(message);
    }
}
```
`io.grpc.internal.AbstractStream#flush`

Flushes the framer's buffer, sending everything to the client:

```java
public final void flush() {
    if (!framer().isClosed()) {
        framer().flush();
    }
}
```
### 4. Completing the request

After responseObserver.onCompleted is called, the call-completion logic begins.

`io.grpc.stub.ServerCalls.ServerCallStreamObserverImpl#onCompleted`

First checks the call state: if the call was cancelled and no cancellation handler is set, a CANCELLED exception is thrown. If the call completed normally, it is closed with OK status and marked completed:

```java
public void onCompleted() {
    if (cancelled) {
        if (onCancelHandler == null) {
            throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException();
        }
    } else {
        call.close(Status.OK, new Metadata());
        completed = true;
    }
}
```
`io.grpc.internal.ServerCallImpl#closeInternal`

Closes the call with the given status and trailers. If a method that must send a response sent none, the call is closed with an INTERNAL error; otherwise the stream is closed normally, completing the call:

```java
private void closeInternal(Status status, Metadata trailers) {
    checkState(!closeCalled, "call already closed");
    try {
        closeCalled = true;

        if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
            internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE));
            return;
        }

        stream.close(status, trailers);
    } finally {
        serverCallTracer.reportCallEnded(status.isOk());
    }
}
```
`io.grpc.internal.AbstractServerStream#close`

Closes the stream and writes the trailers into the frame; Netty ultimately sends the response to the client:

```java
public final void close(Status status, Metadata trailers) {
    if (!outboundClosed) {
        outboundClosed = true;
        endOfMessages();
        addStatusToTrailers(trailers, status);
        transportState().setClosedStatus(status);
        abstractServerStreamSink().writeTrailers(trailers, headersSent, status);
    }
}
```
`io.grpc.internal.AbstractStream#endOfMessages`

Closes the framer:

```java
protected final void endOfMessages() {
    framer().close();
}
```
When the stream is closed, the status and remaining headers (the trailers) are sent to the client.

`io.grpc.internal.AbstractServerStream#close`

```java
abstractServerStreamSink().writeTrailers(trailers, headersSent, status);
```
`io.grpc.netty.NettyServerStream.Sink#writeTrailers`

```java
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
    Http2Headers http2Trailers = Utils.convertTrailers(trailers, headersSent);
    writeQueue.enqueue(
            SendResponseHeadersCommand.createTrailers(transportState(), http2Trailers, status),
            true);
}
```
### 5. Submitting the close task

Once the response headers and body have been sent and the end of the stream is reached, closeStreamWhenDone is called to close the stream.

`io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler#closeStreamWhenDone`

```java
private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
    final TransportState stream = this.serverStream(this.requireHttp2Stream(streamId));
    promise.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            stream.complete();
        }
    });
}
```
`io.grpc.internal.AbstractServerStream.TransportState#complete`

The stream's complete event then fires, closing the listener:

```java
public void complete() {
    if (deframerClosed) {
        deframerClosedTask = null;
        closeListener(Status.OK);
    } else {
        deframerClosedTask = new Runnable() {
            @Override
            public void run() {
                closeListener(Status.OK);
            }
        };
        immediateCloseRequested = true;
        closeDeframer(true);
    }
}
```
`io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#closedInternal`

Submits the stream-close task, Closed:

```java
private void closedInternal(final Status status) {
    if (!status.isOk()) {
        cancelExecutor.execute(new ContextCloser(context, status.getCause()));
    }

    final class Closed extends ContextRunnable {
        Closed() {
            super(context);
        }

        @Override
        public void runInContext() {
            PerfMark.startTask("ServerCallListener(app).closed", tag);
            PerfMark.linkIn(link);
            try {
                getListener().closed(status);
            } finally {
                PerfMark.stopTask("ServerCallListener(app).closed", tag);
            }
        }
    }

    callExecutor.execute(new Closed());
}
```
### 6. Executing the close task

Execute the Closed task.

`io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener#closed$Closed`

```java
final class Closed extends ContextRunnable {
    @Override
    public void runInContext() {
        getListener().closed(status);
    }
}
```
`io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl#closedInternal`

Depending on the status, notifies the stream listener of completion or cancellation, and finally cancels the context:

```java
private void closedInternal(Status status) {
    try {
        if (status.isOk()) {
            listener.onComplete();
        } else {
            call.cancelled = true;
            listener.onCancel();
        }
    } finally {
        context.cancel(null);
    }
}
```
`io.grpc.Context.CancellableContext#cancel`

Cancels the context, cancelling any pending deadline task:

```java
public boolean cancel(Throwable cause) {
    boolean triggeredCancel = false;
    synchronized (this) {
        if (!cancelled) {
            cancelled = true;
            if (pendingDeadline != null) {
                pendingDeadline.cancel(false);
                pendingDeadline = null;
            }
            this.cancellationCause = cause;
            triggeredCancel = true;
        }
    }
    if (triggeredCancel) {
        notifyAndClearListeners();
    }
    return triggeredCancel;
}
```