Thrift 客户端异步请求 实现 IDL
1 2 3 4 5 6 7 8 9 10 11 12 13 namespace java io.github.helloworlde.thriftstruct HelloMessage { 1 : required string message, } struct HelloResponse { 1 : required string message, } service HelloService { HelloResponse sayHello(1 : HelloMessage request); }
客户端 异步客户端调用使用 AsyncClient,传输层使用 TNonblockingSocket;需要实现 AsyncMethodCallback作为回调,用于处理请求结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class AsyncClient { public static void main (String[] args) throws InterruptedException { try { TAsyncClientManager clientManager = new TAsyncClientManager(); TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); HelloService.AsyncClient.Factory factory = new HelloService.AsyncClient.Factory(clientManager, protocolFactory); TNonblockingTransport nonblockingTransport = new TNonblockingSocket("localhost" , 9090 ); HelloService.AsyncClient asyncClient = factory.getAsyncClient(nonblockingTransport); AsyncMethodCallback<HelloResponse> callback = new AsyncMethodCallback<HelloResponse>() { @Override public void onComplete (HelloResponse response) { log.info("响应结果: {}" , response.getMessage()); } @Override public void onError (Exception exception) { log.error("请求失败: {}" , exception.getMessage(), exception); } }; HelloMessage request = new HelloMessage(); request.setMessage("Async Thrift" ); asyncClient.sayHello(request, callback); } catch (TException | IOException e) { e.printStackTrace(); } Thread.sleep(3_000 ); } }
请求处理流程 构建 Client 和回调 1. 构建 Client 异步的客户端由抽象类 TAsyncClient 定义,实现类继承了 TAsyncClient,同时实现了 AsyncIface; 在构建其 Factory 时需要三个参数,分别是 TAsyncClientManager,用于管理调用请求的所有流程;和 TProtocolFactory,用于获取协议;还有 TNonblockingTransport,用于底层的传输,必须是非阻塞的
1 2 3 4 5 6 7 8 TAsyncClientManager clientManager = new TAsyncClientManager(); TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); HelloService.AsyncClient.Factory factory = new HelloService.AsyncClient.Factory(clientManager, protocolFactory); TNonblockingTransport nonblockingTransport = new TNonblockingSocket("localhost" , 9090 ); HelloService.AsyncClient asyncClient = factory.getAsyncClient(nonblockingTransport);
2. 构建回调 回调由 AsyncMethodCallback 接口定义,有两个方法,分别是用于处理正常返回的 onComplete 和处理异常结果的 onError
1 2 3 4 5 6 7 8 9 10 11 AsyncMethodCallback<HelloResponse> callback = new AsyncMethodCallback<HelloResponse>() { @Override public void onComplete (HelloResponse response) { log.info("响应结果: {}" , response.getMessage()); } @Override public void onError (Exception exception) { log.error("请求失败: {}" , exception.getMessage(), exception); } };
执行请求 发起请求 调用接口时需要两个参数,请求内容和响应回调
1 2 3 4 5 HelloMessage request = new HelloMessage(); request.setMessage("Async Thrift" ); asyncClient.sayHello(request, callback);
io.github.helloworlde.thrift.HelloService.AsyncClient#sayHello
会构建 sayHello_call,该类是 TAsyncMethodCall 的实现类,支持写入消息和获取结果,然后将该实例作为参数调用 org.apache.thrift.async.TAsyncClientManager#call 方法
1 2 3 4 5 6 public void sayHello (HelloMessage request, org.apache.thrift.async.AsyncMethodCallback<HelloResponse> resultHandler) throws org.apache.thrift.TException { checkReady(); sayHello_call method_call = new sayHello_call(request, resultHandler, this , ___protocolFactory, ___transport); this .___currentMethod = method_call; ___manager.call(method_call); }
org.apache.thrift.async.TAsyncClientManager#call
初始化请求,将请求加入到队列中,然后唤醒 Selector 执行
1 2 3 4 5 6 7 8 9 10 public void call (TAsyncMethodCall method) throws TException { if (!isRunning()) { throw new TException("SelectThread is not running" ); } method.prepareMethodCall(); pendingCalls.add(method); selectThread.getSelector().wakeup(); }
org.apache.thrift.async.TAsyncMethodCall#prepareMethodCall
执行初始化请求,,将请求消息写入到 Protocol 中,然后封装为 ByteBuffer
1 2 3 4 5 6 7 8 9 10 11 protected void prepareMethodCall () throws TException { TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE); TProtocol protocol = protocolFactory.getProtocol(memoryBuffer); write_args(protocol); int length = memoryBuffer.length(); frameBuffer = ByteBuffer.wrap(memoryBuffer.getArray(), 0 , length); TFramedTransport.encodeFrameSize(length, sizeBufferArray); sizeBuffer = ByteBuffer.wrap(sizeBufferArray); }
io.github.helloworlde.thrift.HelloService.AsyncClient.sayHello_call#write_args
将消息写入到流中
1 2 3 4 5 6 7 public void write_args (org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("sayHello" , org.apache.thrift.protocol.TMessageType.CALL, 0 )); sayHello_args args = new sayHello_args(); args.setRequest(request); args.write(prot); prot.writeMessageEnd(); }
org.apache.thrift.async.TAsyncClientManager.SelectThread#run
由 SelectThread 处理请求,在执行时,会先修改 TAsyncMethodCall 的状态,如果需要连接,则执行建立连接;如果需要写入或者读取则执行写入或者读取;然后会检查请求是否超时,如果没有超时,则开始执行待调用的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public void run () { while (running) { try { try { if (timeoutWatchSet.size() == 0 ) { selector.select(); } else { long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp(); long selectTime = nextTimeout - System.currentTimeMillis(); if (selectTime > 0 ) { selector.select(selectTime); } else { selector.selectNow(); } } } catch (IOException e) { LOGGER.error("Caught IOException in TAsyncClientManager!" , e); } transitionMethods(); timeoutMethods(); startPendingMethods(); } catch (Exception exception) { LOGGER.error("Ignoring uncaught exception in SelectThread" , exception); } } try { selector.close(); } catch (IOException ex) { LOGGER.warn("Could not close selector. This may result in leaked resources!" , ex); } }
org.apache.thrift.async.TAsyncClientManager.SelectThread#startPendingMethods
从队列中获取待执行的请求,然后注册选择器,根据状态进行相应的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void startPendingMethods () { TAsyncMethodCall methodCall; while ((methodCall = pendingCalls.poll()) != null ) { try { methodCall.start(selector); TAsyncClient client = methodCall.getClient(); if (client.hasTimeout() && !client.hasError()) { timeoutWatchSet.add(methodCall); } } catch (Exception exception) { LOGGER.warn("Caught exception in TAsyncClientManager!" , exception); methodCall.onError(exception); } } }
org.apache.thrift.async.TAsyncMethodCall#start
根据状态向 Transport 注册相应的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void start (Selector sel) throws IOException { SelectionKey key; if (transport.isOpen()) { state = State.WRITING_REQUEST_SIZE; key = transport.registerSelector(sel, SelectionKey.OP_WRITE); } else { state = State.CONNECTING; key = transport.registerSelector(sel, SelectionKey.OP_CONNECT); if (transport.startConnect()) { registerForFirstWrite(key); } } key.attach(this ); }
org.apache.thrift.async.TAsyncMethodCall#doWritingRequestBody
在 org.apache.thrift.async.TAsyncMethodCall#transition 执行建立连接,写入请求头后开始写入请求体;当写入完成后,,会根据请求类型做相应操作;如果是 oneway,则执行请求回调流程并完成请求;如果不是,则修改状态,等待读取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private void doWritingRequestBody (SelectionKey key) throws IOException { if (transport.write(frameBuffer) < 0 ) { throw new IOException("Write call frame failed" ); } if (frameBuffer.remaining() == 0 ) { if (isOneway) { cleanUpAndFireCallback(key); } else { state = State.READING_RESPONSE_SIZE; sizeBuffer.rewind(); key.interestOps(SelectionKey.OP_READ); } } }
处理响应 在请求发送完成后,SelectionKey的状态被修改为 READING_RESPONSE_SIZE,当接收到服务端的响应后,会先读取响应大小,然后将状态修改为 READING_RESPONSE_BODY,然后读取响应内容
org.apache.thrift.async.TAsyncMethodCall#doReadingResponseBody
会读取所有的响应内容到缓冲区,然后执行请求完成流程
1 2 3 4 5 6 7 8 private void doReadingResponseBody (SelectionKey key) throws IOException { if (transport.read(frameBuffer) < 0 ) { throw new IOException("Read call frame failed" ); } if (frameBuffer.remaining() == 0 ) { cleanUpAndFireCallback(key); } }
org.apache.thrift.async.TAsyncMethodCall#cleanUpAndFireCallback
修改请求状态,释放 SelectionKey,然后获取响应结果,执行回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private void cleanUpAndFireCallback (SelectionKey key) { state = State.RESPONSE_READ; key.interestOps(0 ); key.attach(null ); try { T result = this .getResult(); client.onComplete(); callback.onComplete(result); } catch (Exception e) { key.cancel(); onError(e); } }
io.github.helloworlde.thrift.HelloService.AsyncClient.sayHello_call#getResult
1 2 3 4 5 6 7 8 public HelloResponse getResult () throws org.apache.thrift.TException { if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { throw new java.lang.IllegalStateException("Method call not finished!" ); } org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); return (new Client(prot)).recv_sayHello(); }
io.github.helloworlde.thrift.HelloService.Client#recv_sayHello
会先构建响应对象,然后获取结果内容,作为结果返回
1 2 3 4 5 6 7 8 public HelloResponse recv_sayHello () throws org.apache.thrift.TException { sayHello_result result = new sayHello_result(); receiveBase(result, "sayHello" ); if (result.isSetSuccess()) { return result.success; } throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result" ); }
org.apache.thrift.TServiceClient#receiveBase
从 TProtocol 中读取响应内容,根据响应的结果,如果是正常则由相应类读取并解析为其实例,如果异常则抛出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected void receiveBase (TBase<?, ?> result, String methodName) throws TException { TMessage msg = iprot_.readMessageBegin(); if (msg.type == TMessageType.EXCEPTION) { TApplicationException x = new TApplicationException(); x.read(iprot_); iprot_.readMessageEnd(); throw x; } if (msg.seqid != seqid_) { throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, String.format("%s failed: out of sequence response: expected %d but got %d" , methodName, seqid_, msg.seqid)); } result.read(iprot_); iprot_.readMessageEnd(); }
org.apache.thrift.async.AsyncMethodCallback#onComplete
最终执行回调
1 2 3 public void onComplete (HelloResponse response) { log.info("响应结果: {}" , response.getMessage()); }
参考文档