HelloWood

gRPC 反射服务

2021-01-17

gRPC 反射服务

gRPC 提供了 grpc.reflection.v1alpha.ServerReflection 服务,在 Server 端添加后可以通过该服务获取所有服务的信息,包括服务定义,方法,属性等;

可以根据获取到的服务信息调用其他的方法,实现泛化调用;gRPC 调试工具 grpcurlgRPC Swagger 等工具都是通过这种方式实现的

定义

参考 GRPC Server Reflection Protocolreflection.proto

该服务只有一个双向流的方法 ServerReflectionInfo,调用时根据请求参数不同,调用不同的方法进行处理,并返回响应;该方法的流控是非自动的,只有当一个请求完成之后才会获取下一个请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
service ServerReflection {
rpc ServerReflectionInfo(stream ServerReflectionRequest) returns (stream ServerReflectionResponse);
}

message ServerReflectionRequest {
string host = 1;
oneof message_request {
// 根据服务名查询 proto 文件
string file_by_filename = 3;

// 根据名称获取 proto 文件,如 <package>.<service>[.<method>] 或 <package>.<type>
string file_containing_symbol = 4;

// 根据 message 类型和序号获取 proto 文件
ExtensionRequest file_containing_extension = 5;

// 查找给定消息类型的所有已知扩展使用的标记号,并将它们以未定义的顺序附加到ExtensionNumberResponse
string all_extension_numbers_of_type = 6;

// 查询所有的服务
string list_services = 7;
}
}

Server 端

  • 服务实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
public class ReflectionServer {

@SneakyThrows
public static void main(String[] args) {
// 构建 Server
Server server = NettyServerBuilder.forAddress(new InetSocketAddress(9090))
// 添加服务
.addService(new HelloServiceImpl())
// 添加反射服务
+ .addService(ProtoReflectionService.newInstance())
.build();

// 启动 Server
server.start();
log.info("服务端启动成功");

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
server.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));

// 保持运行
server.awaitTermination();
}
}

Client 端

发起双向流请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Slf4j
public class ReflectionClient {

public static void main(String[] args) throws InterruptedException {
// 构建 Channel
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9090)
.usePlaintext()
.build();

// 使用 Channel 构建 BlockingStub
ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);

StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
@Override
public void onNext(ServerReflectionResponse response) {
log.info("{}", response);
}

@Override
public void onError(Throwable t) {

}

@Override
public void onCompleted() {
log.info("Complete");
}
};

StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);

// 列举所有的服务
ServerReflectionRequest listServiceRequest = ServerReflectionRequest.newBuilder()
.setListServices("")
.build();
requestStreamObserver.onNext(listServiceRequest);
}
}

其他的方法使用请参考 ReflectionClient

实现原理

在 Server 端启动时,将反射服务添加到服务中,当客户端触发调用后,会执行 io.grpc.protobuf.services.ProtoReflectionService.getRefreshedIndex 方法,会从 Server 中获取所有的可变和不可变的服务,遍历获取所有的服务、方法、属性,添加到 ServerReflectionIndex 对象中

  • io.grpc.protobuf.services.ProtoReflectionService.getRefreshedIndex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private ServerReflectionIndex getRefreshedIndex() {
synchronized (lock) {
Server server = InternalServer.SERVER_CONTEXT_KEY.get();
ServerReflectionIndex index = serverReflectionIndexes.get(server);

if (index == null) {
index = new ServerReflectionIndex(server.getImmutableServices(), server.getMutableServices());
serverReflectionIndexes.put(server, index);
return index;
}

// 更新可变服务信息 ...
return index;
}
}

然后处理请求,会调用 io.grpc.protobuf.services.ProtoReflectionService.ProtoReflectionStreamObserver.handleReflectionRequest 方法,根据请求参数进行判断,使用不同的方法处理,并返回响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void handleReflectionRequest() {
if (serverCallStreamObserver.isReady()) {
switch (request.getMessageRequestCase()) {
case FILE_BY_FILENAME:
getFileByName(request);
break;
case FILE_CONTAINING_SYMBOL:
getFileContainingSymbol(request);
break;
case FILE_CONTAINING_EXTENSION:
getFileByExtension(request);
break;
case ALL_EXTENSION_NUMBERS_OF_TYPE:
getAllExtensions(request);
break;
case LIST_SERVICES:
listServices(request);
break;
default:
sendErrorResponse(request, Status.Code.UNIMPLEMENTED, "not implemented " + request.getMessageRequestCase());
}
request = null;
// 如果在发送完成后关闭,则关闭流,否则要求下一个请求
if (closeAfterSend) {
serverCallStreamObserver.onCompleted();
} else {
serverCallStreamObserver.request(1);
}
}
}

参考文档

Tags: gRPC