gRPC 中泛化调用服务接口
gRPC 没有直接支持泛化调用,protobuf 可以不依赖于生成的代码实现调用,所以可以通过反射接口间接实现泛化调用
要求 Server 端提供 grpc.reflection.v1alpha.ServerReflection 服务,用于获取服务的描述文件
大致的流程是:
- 根据方法名称,调用服务端反射服务的方法,获取方法所在 proto 文件的描述
- 根据 proto 描述文件,获取文件描述、服务描述,用于重新构建要被调用方法的方法描述
MethodDescriptor
- 根据方法描述,将请求内容序列化为对应的类型
- 使用重新构建的
MethodDescriptor和其他参数对 Server 端相应的方法发起调用
- 解析响应并返回
实现
使用 JSON 格式请求被调用的服务方法,并返回 JSON 格式的响应
proto 定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| syntax = "proto3";
package io.github.helloworlde.grpc;
option go_package = "api;grpc_gateway"; option java_package = "io.github.helloworlde.grpc"; option java_multiple_files = true; option java_outer_classname = "HelloWorldGrpc";
service HelloService{ rpc SayHello(HelloMessage) returns (HelloResponse){ } }
message HelloMessage { string message = 2; }
message HelloResponse { string message = 1; }
|
调用
1. 构建反射服务 Stub
需要调用反射服务的方法,该方法是双向流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| ManagedChannel channel=ManagedChannelBuilder.forAddress("127.0.0.1",9090) .usePlaintext() .build();
ServerReflectionGrpc.ServerReflectionStub reflectionStub=ServerReflectionGrpc.newStub(channel);
StreamObserver<ServerReflectionResponse> streamObserver=new StreamObserver<ServerReflectionResponse>(){ @Override public void onNext(ServerReflectionResponse response){ }
@Override public void onError(Throwable t){
}
@Override public void onCompleted(){ log.info("Complete"); } };
StreamObserver<ServerReflectionRequest> requestStreamObserver=reflectionStub.serverReflectionInfo(streamObserver);
|
2. 根据方法名称获取文件描述
这里的 methodSymbol 即服务或方法的限定名,可以是 package.service 或者 package.service.method
,如 io.github.helloworlde.grpc.HelloService.SayHello,需要注意方法前是 .不是/
1 2 3 4 5
| ServerReflectionRequest getFileContainingSymbolRequest=ServerReflectionRequest.newBuilder() .setFileContainingSymbol(methodSymbol) .build(); requestStreamObserver.onNext(getFileContainingSymbolRequest);
|
3. 处理响应,解析 FileDescriptor
返回的响应后会触发 onNext 方法,如果响应类型是文件描述类型,即 FILE_DESCRIPTOR_RESPONSE,则进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void onNext(ServerReflectionResponse response) { try { if (response.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) { List<ByteString> fileDescriptorProtoList = response.getFileDescriptorResponse().getFileDescriptorProtoList(); handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContent); } else { log.warn("未知响应类型: " + response.getMessageResponseCase()); } } catch (Exception e) { log.error("处理响应失败: {}", e.getMessage(), e); } }
|
在处理请求时,先解析了包名、服务名和方法名,然后根据包名和服务名,从返回的文件描述中获取到了响应方法所在文件的描述;然后从文件描述中获取服务描述,最终获取到方法描述,根据方法描述执行调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private static void handleResponse(List<ByteString> fileDescriptorProtoList, ManagedChannel channel, String methodFullName, String requestContent) { try { String fullServiceName = extraPrefix(methodFullName); String methodName = extraSuffix(methodFullName); String packageName = extraPrefix(fullServiceName); String serviceName = extraSuffix(fullServiceName);
Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);
Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName); Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);
executeCall(channel, fileDescriptor, methodDescriptor, requestContent); } catch (Exception e) { log.error(e.getMessage(), e); } }
|
根据响应找到方法对应的文件的 FileDescriptorProto,然后构建出对应的 FileDescriptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList, String packageName, String serviceName) throws Exception {
Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap = fileDescriptorProtoList.stream() .map(bs -> { try { return DescriptorProtos.FileDescriptorProto.parseFrom(bs); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } return null; }) .filter(Objects::nonNull) .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));
if (fileDescriptorProtoMap.isEmpty()) { log.error("服务不存在"); throw new IllegalArgumentException("方法的文件描述不存在"); }
DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);
Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);
return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies); }
|
4. 执行调用
在执行调用时,需要重新生成 MethodDescriptor;因为获取到的 MethodDescriptor 中的方法全名是package.service.method
格式,而需要的是package.service/method格式,同时请求和响应类型也需要重新设置为 DynamicMessage,所以需要重新生成 MethodDescriptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) { String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName()); MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType()) .buildPartial()); MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType()) .buildPartial());
return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder() .setFullMethodName(fullMethodName) .setRequestMarshaller(inputTypeMarshaller) .setResponseMarshaller(outputTypeMarshaller) .setType(MethodDescriptor.MethodType.UNKNOWN) .build(); }
|
同时需要根据文件描述,将请求的类型转为对应的请求类型,生成 DynamicMessage 对象;然后根据方法类型,使用MethodDescriptor 和 CallOptions
发起请求;当接收到响应后将 DynamicMessage 解析为对应的格式的字符串;完成调用
参考文档