gRPC Retry Flow
When the first call fails and the stream listener is closed, gRPC decides whether to retry based on the request's processing status and the method's retry configuration.
The processing status takes one of three values, defined in io.grpc.internal.ClientStreamListener.RpcProgress (a simplified decision sketch follows the list):
PROCESSED: the request was processed normally; the returned status code decides whether to retry
REFUSED: the request never reached the server's application logic; it is retried transparently without counting against the retry attempts
DROPPED: the request was dropped by the load balancer; it is not retried, and if hedging is in use the other hedged requests are cancelled and the call is committed
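As referenced above, here is a simplified, self-contained paraphrase of that decision. This is only a sketch, not the actual RetriableStream logic; the enum mirrors the internal RpcProgress values, and retryableStatusCodes / maxAttempts would come from the method's retryPolicy.

```java
import io.grpc.Status;
import java.util.Set;

// Simplified paraphrase of the retry decision; NOT the actual RetriableStream code.
final class RetryDecisionSketch {
  enum RpcProgress { PROCESSED, REFUSED, DROPPED }

  static boolean shouldRetry(RpcProgress progress, Status status,
      Set<Status.Code> retryableStatusCodes, int attemptsSoFar, int maxAttempts) {
    switch (progress) {
      case REFUSED:
        // Never reached the server application: transparent retry,
        // not counted against maxAttempts.
        return true;
      case DROPPED:
        // Dropped by the load balancer: commit the call, never retry.
        return false;
      case PROCESSED:
      default:
        // Processed normally: retry only for configured status codes
        // and while attempts remain.
        return retryableStatusCodes.contains(status.getCode())
            && attemptsSoFar < maxAttempts;
    }
  }
}
```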
Initiating the request
- io.grpc.stub.ClientCalls#blockingUnaryCall

- io.grpc.internal.ClientCallImpl#startInternal

- io.grpc.internal.ManagedChannelImpl.ChannelTransportProvider#newRetriableStream

- io.grpc.internal.RetriableStream#start

- io.grpc.internal.RetriableStream#createSubstream

- io.grpc.internal.ManagedChannelImpl.ChannelTransportProvider#newRetriableStream#RetryStream#newSubstream

- Called from the generated stub code
io.grpc.stub.ClientCalls#blockingUnaryCall
A ClientCall is first created from the channel, then the request is submitted via futureUnaryCall, which returns a Future. The caller loops waiting on that Future, driving pending work with executor.waitAndDrain(), until the Future completes and the result is returned (a stub-level usage sketch follows the code).
```java
public static <ReqT, RespT> RespT blockingUnaryCall(
    Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
  // local 'executor' (a ThreadlessExecutor) and 'interrupt' declarations elided in this excerpt
  ClientCall<ReqT, RespT> call = channel.newCall(method,
      callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
          .withExecutor(executor));
  try {
    ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
    // Loop until the future completes, draining queued work on the calling thread
    while (!responseFuture.isDone()) {
      try {
        executor.waitAndDrain();
      } catch (InterruptedException e) {
        interrupt = true;
        call.cancel("Thread interrupted", e);
      }
    }
    return getUnchecked(responseFuture);
  } // catch/finally blocks elided in this excerpt
}
```
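For context, here is a minimal sketch of the call that starts this chain from a generated blocking stub. The Greeter service and its message types are hypothetical placeholders taken from the standard hello-world example, not part of the code above.

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
// Generated from the hello-world proto (assumed to be on the classpath):
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;

public final class BlockingStubSketch {
  public static void main(String[] args) {
    ManagedChannel channel =
        ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
    // The generated blocking stub method delegates to
    // ClientCalls.blockingUnaryCall(channel, method, callOptions, request).
    GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
    HelloReply reply = stub.sayHello(HelloRequest.newBuilder().setName("gRPC").build());
    System.out.println(reply.getMessage());
    channel.shutdownNow();
  }
}
```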
- Performing the unary call
futureUnaryCall
The response Future is created here, and the call continues through asyncUnaryRequestCall.
```java
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
    ClientCall<ReqT, RespT> call, ReqT req) {
  GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
  asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
  return responseFuture;
}
```
- Asynchronous call
Starts the call, then sends the request message and half-closes the stream
```java
private static <ReqT, RespT> void asyncUnaryRequestCall(
    ClientCall<ReqT, RespT> call, ReqT req, StartableListener<RespT> responseListener) {
  startCall(call, responseListener);
  try {
    call.sendMessage(req);
    call.halfClose();
  } // catch blocks elided in this excerpt
}
```
- Starting the call, with responseListener handling the response
Starts the call and notifies the response listener that it has started
```java
private static <ReqT, RespT> void startCall(
    ClientCall<ReqT, RespT> call, StartableListener<RespT> responseListener) {
  call.start(responseListener, new Metadata());
  responseListener.onStart();
}
```
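To make the startCall / asyncUnaryRequestCall sequence concrete, below is a hedged sketch that drives a ClientCall by hand with an inline Listener. The Greeter method descriptor and message types are again hypothetical hello-world placeholders.

```java
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.Status;
// Generated from the hello-world proto (assumed to be on the classpath):
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;

public final class ManualClientCallSketch {
  // The same start -> sendMessage -> halfClose sequence, driven by hand.
  static void manualUnaryCall(Channel channel) {
    ClientCall<HelloRequest, HelloReply> call =
        channel.newCall(GreeterGrpc.getSayHelloMethod(), CallOptions.DEFAULT);
    call.start(new ClientCall.Listener<HelloReply>() {
      @Override public void onMessage(HelloReply message) {
        System.out.println("response: " + message.getMessage());
      }
      @Override public void onClose(Status status, Metadata trailers) {
        System.out.println("closed with: " + status);
      }
    }, new Metadata());
    call.request(2); // allow the (single) response message to be delivered
    call.sendMessage(HelloRequest.newBuilder().setName("gRPC").build());
    call.halfClose();
  }
}
```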
- Executing the request call
io.grpc.internal.ClientCallImpl#startInternal
```java
private void startInternal(final Listener<RespT> observer, Metadata headers) {
  checkState(stream == null, "Already started");
  checkState(!cancelCalled, "call was cancelled");
  checkNotNull(observer, "observer");
  checkNotNull(headers, "headers");

  if (context.isCancelled()) {
    stream = NoopClientStream.INSTANCE;
    executeCloseObserverInContext(observer, statusFromCancelled(context));
    return;
  }

  // Resolve the compressor configured on the call options
  final String compressorName = callOptions.getCompressor();
  Compressor compressor;
  if (compressorName != null) {
    compressor = compressorRegistry.lookupCompressor(compressorName);
    if (compressor == null) {
      stream = NoopClientStream.INSTANCE;
      Status status = Status.INTERNAL.withDescription(
          String.format("Unable to find compressor by name %s", compressorName));
      executeCloseObserverInContext(observer, status);
      return;
    }
  } else {
    compressor = Codec.Identity.NONE;
  }

  prepareHeaders(headers, decompressorRegistry, compressor, fullStreamDecompression);

  // If retries are enabled, the stream is a retriable stream; otherwise a plain
  // transport stream is created
  Deadline effectiveDeadline = effectiveDeadline();
  boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline.isExpired();
  if (!deadlineExceeded) {
    if (retryEnabled) {
      stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context);
    } else {
      ClientTransport transport = clientTransportProvider.get(
          new PickSubchannelArgsImpl(method, headers, callOptions));
      Context origContext = context.attach();
      try {
        stream = transport.newStream(method, headers, callOptions);
      } finally {
        context.detach(origContext);
      }
    }
  } else {
    stream = new FailingClientStream(DEADLINE_EXCEEDED.withDescription(
        "ClientCall started after deadline exceeded: " + effectiveDeadline));
  }

  if (callExecutorIsDirect) {
    stream.optimizeForDirectExecutor();
  }
  if (callOptions.getAuthority() != null) {
    stream.setAuthority(callOptions.getAuthority());
  }
  if (callOptions.getMaxInboundMessageSize() != null) {
    stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
  }
  if (callOptions.getMaxOutboundMessageSize() != null) {
    stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
  }
  if (effectiveDeadline != null) {
    stream.setDeadline(effectiveDeadline);
  }
  stream.setCompressor(compressor);
  if (fullStreamDecompression) {
    stream.setFullStreamDecompression(fullStreamDecompression);
  }
  stream.setDecompressorRegistry(decompressorRegistry);
  channelCallsTracer.reportCallStarted();
  cancellationListener = new ContextCancellationListener(observer);
  // ClientStreamListenerImpl receives the RpcProgress when the stream closes
  stream.start(new ClientStreamListenerImpl(observer));

  context.addListener(cancellationListener, directExecutor());

  if (effectiveDeadline != null
      && !effectiveDeadline.equals(context.getDeadline())
      && deadlineCancellationExecutor != null
      && !(stream instanceof FailingClientStream)) {
    deadlineCancellationNotifyApplicationFuture =
        startDeadlineNotifyApplicationTimer(effectiveDeadline, observer);
  }

  if (cancelListenersShouldBeRemoved) {
    removeContextListenerAndCancelDeadlineFuture();
  }
}
```
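The retryEnabled branch above is only taken when retries are enabled on the channel and the called method has a retryPolicy in its service config. Below is a minimal configuration sketch, assuming a hypothetical helloworld.Greeter/SayHello method; the policy keys (maxAttempts, initialBackoff, maxBackoff, backoffMultiplier, retryableStatusCodes) are the standard service-config fields that back the retryPolicy values used in the backoff calculation below.

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.List;
import java.util.Map;

public final class RetryConfigSketch {
  public static void main(String[] args) {
    // retryPolicy for a single method; service/method names are placeholders.
    Map<String, Object> retryPolicy = Map.of(
        "maxAttempts", 4D,
        "initialBackoff", "0.5s",
        "maxBackoff", "30s",
        "backoffMultiplier", 2D,
        "retryableStatusCodes", List.of("UNAVAILABLE"));
    Map<String, Object> methodConfig = Map.of(
        "name", List.of(Map.of("service", "helloworld.Greeter", "method", "SayHello")),
        "retryPolicy", retryPolicy);
    Map<String, Object> serviceConfig = Map.of("methodConfig", List.of(methodConfig));

    ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
        .usePlaintext()
        .defaultServiceConfig(serviceConfig)
        .enableRetry() // without this, startInternal takes the non-retriable branch
        .build();
    // ... use the channel, then shut it down
    channel.shutdownNow();
  }
}
```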
Calculating the next retry interval
The delay before the next retry attempt is not fixed; it is determined by initialBackoffNanos, backoffMultiplier, maxBackoffNanos, and a random factor.
The first delay is the initial backoff initialBackoffNanos multiplied by a random factor:
```java
nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
```
After the first delay, each subsequent delay is derived from the previous one: nextBackoffIntervalNanos is multiplied by backoffMultiplier and capped at maxBackoffNanos (the minimum of the two is taken), and the result is again multiplied by a random factor:
```java
nextBackoffIntervalNanos = Math.min(
    (long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
    retryPolicy.maxBackoffNanos);
backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
```
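To see what this schedule looks like in practice, here is a standalone sketch that applies the two formulas above; it is not gRPC code, and the policy values (initialBackoff = 0.5s, multiplier = 2, maxBackoff = 5s) are hypothetical.

```java
import java.util.Random;
import java.util.concurrent.TimeUnit;

public final class BackoffScheduleSketch {
  public static void main(String[] args) {
    long initialBackoffNanos = TimeUnit.MILLISECONDS.toNanos(500);
    long maxBackoffNanos = TimeUnit.SECONDS.toNanos(5);
    double backoffMultiplier = 2.0;
    Random random = new Random();

    long nextBackoffIntervalNanos = initialBackoffNanos;
    for (int attempt = 1; attempt <= 6; attempt++) {
      // Actual delay = current interval scaled by a random factor in [0, 1).
      long backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
      System.out.printf("retry %d: interval=%dms, randomized delay=%dms%n",
          attempt,
          TimeUnit.NANOSECONDS.toMillis(nextBackoffIntervalNanos),
          TimeUnit.NANOSECONDS.toMillis(backoffNanos));
      // Next interval = min(previous interval * multiplier, maxBackoff).
      nextBackoffIntervalNanos = Math.min(
          (long) (nextBackoffIntervalNanos * backoffMultiplier), maxBackoffNanos);
    }
  }
}
```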