HelloWood

Thrift 服务端异步请求

2021-02-01

Thrift 服务端异步请求

实现

IDL

  • helloworld.thrift
1
2
3
4
5
6
7
8
9
10
11
12
13
namespace java io.github.helloworlde.thrift

struct HelloMessage {
1: required string message,
}

struct HelloResponse {
1: required string message,
}

service HelloService {
HelloResponse sayHello(1: HelloMessage request);
}

服务端

  • Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class AsyncServer {

@SneakyThrows
public static void main(String[] args) {

HelloServiceAsyncImpl helloService = new HelloServiceAsyncImpl();
HelloService.AsyncProcessor<HelloService.AsyncIface> helloServiceProcessor = new HelloService.AsyncProcessor<>(helloService);

TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);

// 配置参数以及处理器
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(transport)
.selectorThreads(4)
.workerThreads(10)
.acceptQueueSizePerThread(20)
.processor(helloServiceProcessor);

TServer server = new TThreadedSelectorServer(serverArgs);
server.serve();
}
}
  • 实现

实现类需要实现 AsyncIface 接口,方法定义中会有一个 AsyncMethodCallback,用于处理响应回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
public class HelloServiceAsyncImpl implements HelloService.AsyncIface {

@Override
public void sayHello(HelloMessage request, AsyncMethodCallback<HelloResponse> resultHandler) throws TException {
String message = request.getMessage();
log.info("接收到请求: {}", message);

HelloResponse response = new HelloResponse();
response.setMessage("Hello " + message);

resultHandler.onComplete(response);
}
}

请求处理流程

Server 端同步与异步处理的流程区别在于使用的 TProcessor 不同;同步使用 TProcessor,异步使用 TAsyncProcessor;除此之外,其他的流程与使用 NIO 的同步处理没有区别

执行请求

  • org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer#invoke

在处理读取事件时,会将 AsyncFrameBuffer 包装为 Runnable,提交给线程池执行;最终由 AsyncFrameBuffer 处理
会获取 Processor,然后调用 process 方法进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void invoke() {
// 重置 Transport
frameTrans_.reset(buffer_.array());
response_.reset();

try {
// 触发事件
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
}
// 调用处理器处理
((TAsyncProcessor) processorFactory_.getProcessor(inTrans_)).process(this);
return;
} catch (TException te) {
LOGGER.warn("Exception while invoking!", te);
} catch (Throwable t) {
LOGGER.error("Unexpected throwable while invoking!", t);
}
// This will only be reached when there is a throwable.
// 修改状态
state_ = FrameBufferState.AWAITING_CLOSE;
requestSelectInterestChange();
}
  • org.apache.thrift.TBaseAsyncProcessor#process(org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer)

会读取消息,然后根据方法名称获取处理的类,然后获取调用回调,将请求信息和回调作为参数,调用处理函数处理请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void process(final AsyncFrameBuffer fb) throws TException {

final TProtocol in = fb.getInputProtocol();
final TProtocol out = fb.getOutputProtocol();

// 读取消息
final TMessage msg = in.readMessageBegin();
// 获取处理函数
AsyncProcessFunction fn = processMap.get(msg.name);
// 获取空参数
TBase args = fn.getEmptyArgsInstance();

// 读取参数
args.read(in);
in.readMessageEnd();

// 如果是 oneway 调用,则完成
if (fn.isOneway()) {
fb.responseReady();
}

//start off processing function
// 获取处理方法
AsyncMethodCallback resultHandler = fn.getResultHandler(fb, msg.seqid);
try {
// 处理调用
fn.start(iface, args, resultHandler);
} catch (Exception e) {
LOGGER.debug("Exception handling function", e);
resultHandler.onError(e);
}
return;
}
  • io.github.helloworlde.thrift.HelloService.AsyncProcessor.sayHello#start
1
2
3
4
public void start(I iface, sayHello_args args, org.apache.thrift.async.AsyncMethodCallback<HelloResponse> resultHandler) 
throws org.apache.thrift.TException {
iface.sayHello(args.request, resultHandler);
}
  • io.github.helloworlde.thrift.HelloServiceAsyncImpl#sayHello

然后会调用实现类,执行具体的处理逻辑;在处理完成后需要调用回调的相应方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
public class HelloServiceAsyncImpl implements HelloService.AsyncIface {

@Override
public void sayHello(HelloMessage request, AsyncMethodCallback<HelloResponse> resultHandler) throws TException {
String message = request.getMessage();
log.info("接收到请求: {}", message);

HelloResponse response = new HelloResponse();
response.setMessage("Hello " + message);

resultHandler.onComplete(response);
}
}

返回响应

  • org.apache.thrift.async.AsyncMethodCallback#onComplete

请求处理成功的回调,会将响应结果发送出去

1
2
3
4
5
6
7
8
9
10
11
12
13
public void onComplete(HelloResponse o) {
sayHello_result result = new sayHello_result();
result.success = o;
try {
fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY, seqid);
} catch (org.apache.thrift.transport.TTransportException e) {
_LOGGER.error("TTransportException writing to internal frame buffer", e);
fb.close();
} catch (java.lang.Exception e) {
_LOGGER.error("Exception writing to internal frame buffer", e);
onError(e);
}
}
  • org.apache.thrift.AsyncProcessFunction#sendResponse

将方法、消息类型,请求的 ID,响应内容按序写入,然后全部发送给传输层,由传输层发送给客户端;请求处理完成

1
2
3
4
5
6
7
8
9
10
public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TSerializable result, final byte type, final int seqid) throws TException {
TProtocol oprot = fb.getOutputProtocol();

oprot.writeMessageBegin(new TMessage(getMethodName(), type, seqid));
result.write(oprot);
oprot.writeMessageEnd();
oprot.getTransport().flush();

fb.responseReady();
}

参考文档

Tags: Thrift