Thrfit 中的 Server
Thrift 中有多种 Server 的实现,支持单线程、多线程、异步等多种方式
Server 定义
属性
processorFactory_ : 处理器工厂
serverTransport_: 服务端 Transport
eventHandler_ : 事件监听器,可以监听 Server 所有启动、关闭、处理请求相关的事件
inputTransportFactory_ : 输入流工厂
outputTransportFactory_ : 输出流工厂
inputProtocolFactory_ : 输入流协议工厂
outputProtocolFactory_ : 输出流协议工厂
方法
启动 Server,监听端口,对外提供服务
1
| public abstract void serve();
|
关闭 Server,断开连接,释放并清除资源
实现类

阻塞
Server 的简单实现,是单线程阻塞的 Server,连接实现取决于 TServerTransport具体类型;用于测试场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public void serve() { serverTransport_.listen();
eventHandler_.preServe();
setServing(true);
while (!stopped_) { client = serverTransport_.accept(); connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol); while (true) { eventHandler_.processContext(connectionContext, inputTransport, outputTransport); processor.process(inputProtocol, outputProtocol); } }
eventHandler_.deleteContext(connectionContext, inputProtocol, outputProtocol);
inputTransport.close(); outputTransport.close(); setServing(false); }
|
在 TSimpleServer 的基础上优化,使用了线程池处理请求;构建参数中可以指定创建线程池的参数,支持线程池饱和后超时;连接实现取决于 TServerTransport具体类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public void serve() { if (!preServe()) { return; }
execute(); waitForShutdown();
setServing(false); }
protected boolean preServe() { serverTransport_.listen(); eventHandler_.preServe(); stopped_ = false; setServing(true);
return true; }
protected void execute() { while (!stopped_) { TTransport client = serverTransport_.accept(); WorkerProcess wp = new WorkerProcess(client); while (true) { executorService_.execute(wp); break; } } }
|
非阻塞
- AbstractNonblockingServer
AbstractNonblockingServer 是非阻塞的 Server 的抽象类;非阻塞 Server 有独立的线程分别处理连接和处理请求;底层实现变为 NIO,读取和写入由 FrameBuffer 处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public void serve() { if (!startThreads()) { return; } if (!startListening()) { return; } setServing(true); waitForShutdown(); setServing(false); stopListening(); }
protected void handleRead(SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); if (!buffer.read()) { cleanupSelectionKey(key); return; }
if (buffer.isFrameFullyRead()) { if (!requestInvoke(buffer)) { cleanupSelectionKey(key); } } }
protected void handleWrite(SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); if (!buffer.write()) { cleanupSelectionKey(key); } }
|
THsHaServer 是半同步半异步的 Server,继承自TNonblockingServer,是指处理连接和 IO 事件是同步的,处理请求使用线程池,是异步的;与 TThreadPoolServer类似,不过连接使用的是 NIO;处理连接和 IO 事件的逻辑使用 AbstractNonblockingServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public void run() { eventHandler_.preServe(); while (!stopped_) { select(); processInterestChanges(); } for (SelectionKey selectionKey : selector.keys()) { cleanupSelectionKey(selectionKey); } }
protected boolean requestInvoke(FrameBuffer frameBuffer) { Runnable invocation = getRunnable(frameBuffer); invoker.execute(invocation); return true; }
|
TThreadedSelectorServer 的性能优于 TNonblockingServer 和 THsHaServer,可以配置多个处理 IO 事件的线程,有独立的处理连接的线程,以及单独执行请求的线程池
会由 AcceptThread 建立连接,将连接信息添加到队列中;由 SelectorThread 处理 IO 事件,由线程池执行请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public void run() { eventHandler_.preServe(); while (!stopped_) { select(); } }
public void run() { while (!stopped_) { select(); processAcceptedConnections(); processInterestChanges(); }
for (SelectionKey selectionKey : selector.keys()) { cleanupSelectionKey(selectionKey); } }
protected boolean requestInvoke(FrameBuffer frameBuffer) { Runnable invocation = getRunnable(frameBuffer); if (invoker != null) { invoker.execute(invocation); return true; } else { invocation.run(); return true; } }
|