Thrfit 服务端请求处理流程 使用同步的非阻塞的服务端的请求处理流程
实现 IDL
1 2 3 4 5 6 7 8 9 10 11 12 13 namespace java io.github.helloworlde.thriftstruct HelloMessage { 1 : required string message, } struct HelloResponse { 1 : required string message, } service HelloService { HelloResponse sayHello(1 : HelloMessage request); }
服务端实现 使用 TThreadedSelectorServer 作为服务端,支持接收连接,处理 IO 事件,执行请求由不同的线程实现;底层连接使用 ServerSocket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class NonblockingServer { @SneakyThrows public static void main (String[] args) { HelloServiceImpl helloService = new HelloServiceImpl(); HelloService.Processor<HelloService.Iface> helloServiceProcessor = new HelloService.Processor<>(helloService); TNonblockingServerTransport transport = new TNonblockingServerSocket(9090 ); TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(transport) .selectorThreads(4 ) .workerThreads(10 ) .acceptQueueSizePerThread(20 ) .processor(helloServiceProcessor); TServer server = new TThreadedSelectorServer(serverArgs); server.serve(); } }
请求处理流程 1. 启动 Server 1 2 TServer server = new TThreadedSelectorServer(serverArgs); server.serve();
org.apache.thrift.server.AbstractNonblockingServer#serve
启动 Server,启动用于连接的线程 AcceptThread 和用于处理 IO 事件的多个线程 SelectorThread;然后开始监听 IO 事件,由线程池处理请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void serve () { if (!startThreads()) { return ; } if (!startListening()) { return ; } setServing(true ); waitForShutdown(); setServing(false ); stopListening(); }
org.apache.thrift.server.TThreadedSelectorServer#startThreads
启动用于连接的线程 AcceptThread 和用于处理 IO 事件的多个线程 SelectorThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 protected boolean startThreads () { try { for (int i = 0 ; i < args.selectorThreads; ++i) { selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread)); } acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, createSelectorThreadLoadBalancer(selectorThreads)); for (SelectorThread thread : selectorThreads) { thread.start(); } acceptThread.start(); return true ; } catch (IOException e) { LOGGER.error("Failed to start threads!" , e); return false ; } }
org.apache.thrift.transport.TNonblockingServerSocket#listen
开始监听
1 2 3 4 5 6 7 8 9 10 public void listen () throws TTransportException { if (serverSocket_ != null ) { try { serverSocket_.setSoTimeout(0 ); } catch (SocketException sx) { LOGGER.error("Socket exception while setting socket timeout" , sx); } } }
2. 处理连接事件
org.apache.thrift.server.TThreadedSelectorServer.AcceptThread#run
连接事件由 AcceptThread 线程独立处理;会循环监听 Selector事件,当有新的连接事件时,会建立连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void run () { try { if (eventHandler_ != null ) { eventHandler_.preServe(); } while (!stopped_) { select(); } } finally { acceptSelector.close(); TThreadedSelectorServer.this .stop(); } }
org.apache.thrift.server.TThreadedSelectorServer.AcceptThread#select
会不断从 Selector获取事件,判断如果是 accept 事件,则处理,并建立连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void select () { try { acceptSelector.select(); Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { continue ; } if (key.isAcceptable()) { handleAccept(); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!" , e); } }
org.apache.thrift.server.TThreadedSelectorServer.AcceptThread#handleAccept
会通过底层的 ServerSocketChannel 建立连接,然后将这个连接添加到 SelectorThread 的队列中,由 SelectorThread处理 IO 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void handleAccept () { final TNonblockingTransport client = doAccept(); if (client != null ) { final SelectorThread targetThread = threadChooser.nextThread(); if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null ) { doAddAccept(targetThread, client); } else { try { invoker.submit(new Runnable() { public void run () { doAddAccept(targetThread, client); } }); } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected accept registration!" , rx); client.close(); } } } }
org.apache.thrift.transport.TNonblockingServerSocket#acceptImpl
建立连接,返回新的 TNonblockingSocket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected TNonblockingSocket acceptImpl () throws TTransportException { if (serverSocket_ == null ) { throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket." ); } try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel == null ) { return null ; } TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel); tsocket.setTimeout(clientTimeout_); return tsocket; } catch (IOException iox) { throw new TTransportException(iox); } }
org.apache.thrift.server.TThreadedSelectorServer.SelectorThread#addAcceptedConnection
将连接添加到 SelectorThread的队列中,由 SelectorThread处理 IO 事件
1 2 3 4 5 6 7 8 9 10 11 public boolean addAcceptedConnection (TNonblockingTransport accepted) { try { acceptedQueue.put(accepted); } catch (InterruptedException e) { LOGGER.warn("Interrupted while adding accepted connection!" , e); return false ; } selector.wakeup(); return true ; }
3. 处理 IO 事件 IO 事件由 SelectorThread 处理
org.apache.thrift.server.TThreadedSelectorServer.SelectorThread#run
轮询读取事件,如果是 IO 事件,则分别处理;如果是新的连接,则注册 Selector
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void run () { try { while (!stopped_) { select(); processAcceptedConnections(); processInterestChanges(); } for (SelectionKey selectionKey : selector.keys()) { cleanupSelectionKey(selectionKey); } } catch (Throwable t) { LOGGER.error("run() on SelectorThread exiting due to uncaught error" , t); } finally { selector.close(); TThreadedSelectorServer.this .stop(); } }
org.apache.thrift.server.TThreadedSelectorServer.SelectorThread#select
处理 IO 事件,根据事件类型分别处理读取或者写入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void select () { try { doSelect(); Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); while (!stopped_ && selectedKeys.hasNext()) { SelectionKey key = selectedKeys.next(); selectedKeys.remove(); if (!key.isValid()) { cleanupSelectionKey(key); continue ; } if (key.isReadable()) { handleRead(key); } else if (key.isWritable()) { handleWrite(key); } else { LOGGER.warn("Unexpected state in select! " + key.interestOps()); } } } catch (IOException e) { LOGGER.warn("Got an IOException while selecting!" , e); } }
处理读取事件
org.apache.thrift.server.AbstractNonblockingServer.AbstractSelectThread#handleRead
在处理读取事件时,会读取整个帧,当完全读取时,会调用 requestInvoke 方法,通过线程池处理请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 protected void handleRead (SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); if (!buffer.read()) { cleanupSelectionKey(key); return ; } if (buffer.isFrameFullyRead()) { if (!requestInvoke(buffer)) { cleanupSelectionKey(key); } } }
org.apache.thrift.server.TThreadedSelectorServer#requestInvoke
处理调用,会将帧封装为 Runnable 任务,提交给线程池执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected boolean requestInvoke (FrameBuffer frameBuffer) { Runnable invocation = getRunnable(frameBuffer); if (invoker != null ) { try { invoker.execute(invocation); return true ; } catch (RejectedExecutionException rx) { LOGGER.warn("ExecutorService rejected execution!" , rx); return false ; } } else { invocation.run(); return true ; } }
处理写入事件
org.apache.thrift.server.AbstractNonblockingServer.AbstractSelectThread#handleWrite
处理写入事件,调用 FrameBuffer 的写入方法进行处理
1 2 3 4 5 6 protected void handleWrite (SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); if (!buffer.write()) { cleanupSelectionKey(key); } }
org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer#write
由 Transport 执行写入,最终由 SocketChannel 执行,将响应内容发送给客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public boolean write () { if (state_ == FrameBufferState.WRITING) { if (trans_.write(buffer_) < 0 ) { return false ; } if (buffer_.remaining() == 0 ) { prepareRead(); } return true ; } return false ; }
4. 执行请求
org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer#invoke
在处理读取事件时,会将 FrameBuffer 包装为 Runnable,提交给线程池执行;最终由 FrameBuffer处理 会获取 Processor,然后调用 process 方法进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public void invoke () { frameTrans_.reset(buffer_.array()); response_.reset(); try { if (eventHandler_ != null ) { eventHandler_.processContext(context_, inTrans_, outTrans_); } processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_); responseReady(); return ; } catch (TException te) { } state_ = FrameBufferState.AWAITING_CLOSE; requestSelectInterestChange(); }
org.apache.thrift.TBaseProcessor#process
在处理时,根据方法名获取具体的处理函数,然后调用响应的处理方法进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void process (TProtocol in, TProtocol out) throws TException { TMessage msg = in.readMessageBegin(); ProcessFunction fn = processMap.get(msg.name); if (fn == null ) { TProtocolUtil.skip(in, TType.STRUCT); in.readMessageEnd(); TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '" +msg.name+"'" ); out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); x.write(out); out.writeMessageEnd(); out.getTransport().flush(); } else { fn.process(msg.seqid, in, out, iface); } }
org.apache.thrift.ProcessFunction#process
读取请求信息,反序列化为对象,然后调用 getResult 方法执行实现逻辑,获取响应;如果不是 oneway 的请求,则将相应结果写入流中,发送给客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public final void process (int seqid, TProtocol iprot, TProtocol oprot, I iface) throws TException { T args = getEmptyArgsInstance(); args.read(iprot); iprot.readMessageEnd(); TSerializable result = null ; byte msgType = TMessageType.REPLY; result = getResult(iface, args); if (!isOneway()) { oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid)); result.write(oprot); oprot.writeMessageEnd(); oprot.getTransport().flush(); } }
io.github.helloworlde.thrift.HelloService.Processor.sayHello#getResult
由生成的代码处理,会先构建一个响应结构体,然后调用相应的方法进行处理,返回结果
1 2 3 4 5 public sayHello_result getResult (I iface, sayHello_args args) throws org.apache.thrift.TException { sayHello_result result = new sayHello_result(); result.success = iface.sayHello(args.request); return result; }
io.github.helloworlde.thrift.HelloServiceImpl#sayHello
具体的逻辑处理,返回响应
1 2 3 4 5 6 public HelloResponse sayHello (HelloMessage request) throws TException { String message = request.getMessage(); HelloResponse response = new HelloResponse(); response.setMessage("Hello " + message); return response; }
4. 写入响应
org.apache.thrift.ProcessFunction#process
在处理完请求之后,会判断是否是 oneway 请求,如果不是,则会执行写入响应
1 2 3 4 5 6 if (!isOneway()) { oprot.writeMessageBegin(new TMessage(getMethodName(), msgType, seqid)); result.write(oprot); oprot.writeMessageEnd(); oprot.getTransport().flush(); }
org.apache.thrift.protocol.TBinaryProtocol#writeMessageBegin
写入响应时,会先写入响应头;会将版本信息,消息类型,方法的名称和请求 ID 一起写入
1 2 3 4 5 6 7 8 9 10 11 12 public void writeMessageBegin (TMessage message) throws TException { if (strictWrite_) { int version = VERSION_1 | message.type; writeI32(version); writeString(message.name); writeI32(message.seqid); } else { writeString(message.name); writeByte(message.type); writeI32(message.seqid); } }
io.github.helloworlde.thrift.HelloResponse.HelloResponseStandardScheme#write
随后写入响应内容,将对象序列化为字节
1 2 3 4 5 6 7 8 9 10 11 12 public void write (org.apache.thrift.protocol.TProtocol oprot, HelloResponse struct) throws org.apache.thrift.TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); if (struct.message != null ) { oprot.writeFieldBegin(MESSAGE_FIELD_DESC); oprot.writeString(struct.message); oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); }
然后会写入响应结尾符,由 SelectorThread 处理写入事件,最终将请求发送给客户端
参考文档