gRPC 反射服务 gRPC 提供了 grpc.reflection.v1alpha.ServerReflection 服务,在 Server 端添加后可以通过该服务获取所有服务的信息,包括服务定义,方法,属性等;
可以根据获取到的服务信息调用其他的方法,实现泛化调用;gRPC 调试工具 grpcurl 和 gRPC Swagger 等工具都是通过这种方式实现的
定义 参考 GRPC Server Reflection Protocol 和 reflection.proto
该服务只有一个双向流的方法 ServerReflectionInfo,调用时根据请求参数不同,调用不同的方法进行处理,并返回响应;该方法的流控是非自动的,只有当一个请求完成之后才会获取下一个请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 service ServerReflection { rpc ServerReflectionInfo(stream ServerReflectionRequest) returns (stream ServerReflectionResponse) ; } message ServerReflectionRequest { string host = 1 ; oneof message_request { string file_by_filename = 3 ; string file_containing_symbol = 4 ; ExtensionRequest file_containing_extension = 5 ; string all_extension_numbers_of_type = 6 ; string list_services = 7 ; } }
Server 端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf4j public class ReflectionServer { @SneakyThrows public static void main(String[] args) { // 构建 Server Server server = NettyServerBuilder.forAddress(new InetSocketAddress(9090)) // 添加服务 .addService(new HelloServiceImpl()) // 添加反射服务 + .addService(ProtoReflectionService.newInstance()) .build(); // 启动 Server server.start(); log.info("服务端启动成功"); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { server.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } })); // 保持运行 server.awaitTermination(); } }
Client 端 发起双向流请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Slf4j public class ReflectionClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1" , 9090 ) .usePlaintext() .build(); ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel); StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() { @Override public void onNext (ServerReflectionResponse response) { log.info("{}" , response); } @Override public void onError (Throwable t) { } @Override public void onCompleted () { log.info("Complete" ); } }; StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver); ServerReflectionRequest listServiceRequest = ServerReflectionRequest.newBuilder() .setListServices("" ) .build(); requestStreamObserver.onNext(listServiceRequest); } }
其他的方法使用请参考 ReflectionClient
实现原理 在 Server 端启动时,将反射服务添加到服务中,当客户端触发调用后,会执行 io.grpc.protobuf.services.ProtoReflectionService.getRefreshedIndex 方法,会从 Server 中获取所有的可变和不可变的服务,遍历获取所有的服务、方法、属性,添加到 ServerReflectionIndex 对象中
io.grpc.protobuf.services.ProtoReflectionService.getRefreshedIndex
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private ServerReflectionIndex getRefreshedIndex () { synchronized (lock) { Server server = InternalServer.SERVER_CONTEXT_KEY.get(); ServerReflectionIndex index = serverReflectionIndexes.get(server); if (index == null ) { index = new ServerReflectionIndex(server.getImmutableServices(), server.getMutableServices()); serverReflectionIndexes.put(server, index); return index; } return index; } }
然后处理请求,会调用 io.grpc.protobuf.services.ProtoReflectionService.ProtoReflectionStreamObserver.handleReflectionRequest 方法,根据请求参数进行判断,使用不同的方法处理,并返回响应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void handleReflectionRequest () { if (serverCallStreamObserver.isReady()) { switch (request.getMessageRequestCase()) { case FILE_BY_FILENAME: getFileByName(request); break ; case FILE_CONTAINING_SYMBOL: getFileContainingSymbol(request); break ; case FILE_CONTAINING_EXTENSION: getFileByExtension(request); break ; case ALL_EXTENSION_NUMBERS_OF_TYPE: getAllExtensions(request); break ; case LIST_SERVICES: listServices(request); break ; default : sendErrorResponse(request, Status.Code.UNIMPLEMENTED, "not implemented " + request.getMessageRequestCase()); } request = null ; if (closeAfterSend) { serverCallStreamObserver.onCompleted(); } else { serverCallStreamObserver.request(1 ); } } }
参考文档