gRPC Server 端关闭流程 关闭 Server 关闭 Server 可以使用 shutdown 或者 shutdownNow 方法
shutdown 1 server.shutdown().awaitTermination(10 , TimeUnit.SECONDS);
io.grpc.internal.ServerImpl#shutdown
开始顺序的关闭 Server,已经存在的请求会继续执行,新的请求会被拒绝
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public ServerImpl shutdown () { boolean shutdownTransportServers; synchronized (lock) { if (shutdown) { return this ; } shutdown = true ; shutdownTransportServers = started; if (!shutdownTransportServers) { transportServersTerminated = true ; checkForTermination(); } } if (shutdownTransportServers) { for (InternalServer ts : transportServers) { ts.shutdown(); } } return this ; }
关闭时,首先会检查 Server 是否已经关闭了,如果已经关闭了,则抛出异常;如果没有关闭,则会修改关闭状态,返huan连接池,通知其他的锁; 然后会遍历所有的 Server,调用其 shutdown 方法进行关闭
io.grpc.netty.NettyServer#shutdown
关闭 NettySerer,添加关闭事件监听器,并等待关闭;在监听器中会释放资源,关闭协议协调器,关闭 Transport 等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public void shutdown () { if (channel == null || !channel.isOpen()) { return ; } channel.close().addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { log.log(Level.WARNING, "Error shutting down server" , future.cause()); } InternalInstrumented<SocketStats> stats = listenSocketStats; listenSocketStats = null ; if (stats != null ) { channelz.removeListenSocket(stats); } sharedResourceReferenceCounter.release(); protocolNegotiator.close(); synchronized (NettyServer.this ) { listener.serverShutdown(); } } }); try { channel.closeFuture().await(); } catch (InterruptedException e) { log.log(Level.FINE, "Interrupted while shutting down" , e); Thread.currentThread().interrupt(); } }
io.grpc.internal.ServerImpl.ServerListenerImpl#serverShutdown
监听 Server 关闭事件,根据关闭的状态,选择调用 Transport 的 shutdown 或者 shutdownNow 关闭 ServerTransport
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void serverShutdown () { ArrayList<ServerTransport> copiedTransports; Status shutdownNowStatusCopy; synchronized (lock) { activeTransportServers--; if (activeTransportServers != 0 ) { return ; } copiedTransports = new ArrayList<>(transports); shutdownNowStatusCopy = shutdownNowStatus; serverShutdownCallbackInvoked = true ; } for (ServerTransport transport : copiedTransports) { if (shutdownNowStatusCopy == null ) { transport.shutdown(); } else { transport.shutdownNow(shutdownNowStatusCopy); } } synchronized (lock) { transportServersTerminated = true ; checkForTermination(); } }
io.grpc.netty.NettyServerTransport#shutdown
最终在 Transport 中调用了 Netty Channel 的关闭方法,进行关闭
1 2 3 4 5 6 @Override public void shutdown () { if (channel.isOpen()) { channel.close(); } }
shutdownNow 立即关闭 Server,已经存在的请求和新的请求都会被拒绝;尽管是强制的,但是 Server 并不会瞬间关闭
io.grpc.internal.ServerImpl#shutdownNow
立即关闭时,会先调用 shutdown 方法执行正常的关闭流程,然后修改关闭状态;遍历所有的 ServerTransport,调用其 shutdownNow 方法进行关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public ServerImpl shutdownNow () { shutdown(); Collection<ServerTransport> transportsCopy; Status nowStatus = Status.UNAVAILABLE.withDescription("Server shutdownNow invoked" ); boolean savedServerShutdownCallbackInvoked; synchronized (lock) { if (shutdownNowStatus != null ) { return this ; } shutdownNowStatus = nowStatus; transportsCopy = new ArrayList<>(transports); savedServerShutdownCallbackInvoked = serverShutdownCallbackInvoked; } if (savedServerShutdownCallbackInvoked) { for (ServerTransport transport : transportsCopy) { transport.shutdownNow(nowStatus); } } return this ; }
io.grpc.netty.NettyServerTransport#shutdownNow
ServerTransport 的 shutdownNow 会提交一个强制关闭的指令,并清空 channel,执行关闭
1 2 3 4 5 public void shutdownNow (Status reason) { if (channel.isOpen()) { channel.writeAndFlush(new ForcefulCloseCommand(reason)); } }