Thrfit 客户端请求处理流程 使用同步的非阻塞的服务端和客户端的请求处理流程
实现 IDL
1 2 3 4 5 6 7 8 9 10 11 12 13 namespace java io.github.helloworlde.thriftstruct HelloMessage { 1 : required string message, } struct HelloResponse { 1 : required string message, } service HelloService { HelloResponse sayHello(1 : HelloMessage request); }
客户端实现 使用 TSocket 作为底层连接,协议使用 TBinaryProtocol
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 try { TTransport transport = new TSocket("localhost" , 9090 ); transport.open(); TProtocol protocol = new TBinaryProtocol(transport); HelloService.Client client = new HelloService.Client(protocol); HelloMessage request = new HelloMessage(); request.setMessage("Thrift" ); HelloResponse response = client.sayHello(request); log.info("返回响应: {}" , response.getMessage()); } catch (TException e) { e.printStackTrace(); }
请求处理流程 1. 建立连接
org.apache.thrift.transport.TSocket#open
初始化 Socket,建立连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void open () throws TTransportException { if (socket_ == null ) { initSocket(); } try { socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_); inputStream_ = new BufferedInputStream(socket_.getInputStream()); outputStream_ = new BufferedOutputStream(socket_.getOutputStream()); } catch (IOException iox) { close(); throw new TTransportException(TTransportException.NOT_OPEN, iox); } }
2. 执行请求 使用 TProtocol 构建 TServiceClient,用于发送同步请求
io.github.helloworlde.thrift.HelloService.Client#sayHello
1 2 3 4 public HelloResponse sayHello (HelloMessage request) throws org.apache.thrift.TException { send_sayHello(request); return recv_sayHello(); }
发送请求
io.github.helloworlde.thrift.HelloService.Client#send_sayHello
其中的 sayHello_args 用于读写结构体,将消息内容转换为相应格式的字节
1 2 3 4 5 public void send_sayHello (HelloMessage request) throws org.apache.thrift.TException { sayHello_args args = new sayHello_args(); args.setRequest(request); sendBase("sayHello" , args); }
org.apache.thrift.TServiceClient#sendBase(java.lang.String, org.apache.thrift.TBase,?>)
这里设置了消息类型是调用
1 2 3 protected void sendBase (String methodName, TBase<?, ?> args) throws TException { sendBase(methodName, args, TMessageType.CALL); }
org.apache.thrift.TServiceClient#sendBase(java.lang.String, org.apache.thrift.TBase,?>, byte)
在写入请求,写入请求头,写入消息体,然后写入结尾符,将请求发送出去
1 2 3 4 5 6 7 8 9 10 private void sendBase (String methodName, TBase<?, ?> args, byte type) throws TException { oprot_.writeMessageBegin(new TMessage(methodName, type, ++seqid_)); args.write(oprot_); oprot_.writeMessageEnd(); oprot_.getTransport().flush(); }
org.apache.thrift.protocol.TBinaryProtocol#writeMessageBegin
会将版本、调用类型、方法名、请求 ID 写入到请求头
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void writeMessageBegin (TMessage message) throws TException { if (strictWrite_) { int version = VERSION_1 | message.type; writeI32(version); writeString(message.name); writeI32(message.seqid); } else { writeString(message.name); writeByte(message.type); writeI32(message.seqid); } }
io.github.helloworlde.thrift.HelloService.sayHello_args.sayHello_argsStandardScheme#write
写入请求内容,会将结构体的相关描述信息写入到请求中
1 2 3 4 5 6 7 8 9 10 11 12 public void write (org.apache.thrift.protocol.TProtocol oprot, sayHello_args struct) throws org.apache.thrift.TException { struct.validate(); oprot.writeStructBegin(STRUCT_DESC); if (struct.request != null ) { oprot.writeFieldBegin(REQUEST_FIELD_DESC); struct.request.write(oprot); oprot.writeFieldEnd(); } oprot.writeFieldStop(); oprot.writeStructEnd(); }
接收响应
io.github.helloworlde.thrift.HelloService.Client#recv_sayHello
在处理请求时先构建了 sayHello_result对象,用于解析响应的描述
1 2 3 4 5 6 7 8 public HelloResponse recv_sayHello () throws org.apache.thrift.TException { sayHello_result result = new sayHello_result(); receiveBase(result, "sayHello" ); if (result.isSetSuccess()) { return result.success; } throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result" ); }
org.apache.thrift.TServiceClient#receiveBase
读取响应内容,解析为对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 protected void receiveBase (TBase<?, ?> result, String methodName) throws TException { TMessage msg = iprot_.readMessageBegin(); if (msg.type == TMessageType.EXCEPTION) { TApplicationException x = new TApplicationException(); x.read(iprot_); iprot_.readMessageEnd(); throw x; } if (msg.seqid != seqid_) { throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, String.format("%s failed: out of sequence response: expected %d but got %d" , methodName, seqid_, msg.seqid)); } result.read(iprot_); iprot_.readMessageEnd(); }
org.apache.thrift.protocol.TBinaryProtocol#readMessageBegin
读取并校验版本,获取方法名称、消息类型、请求 ID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public TMessage readMessageBegin () throws TException { int size = readI32(); if (size < 0 ) { int version = size & VERSION_MASK; if (version != VERSION_1) { throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin" ); } return new TMessage(readString(), (byte ) (size & 0x000000ff ), readI32()); } else { if (strictRead_) { throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?" ); } return new TMessage(readStringBody(size), readByte(), readI32()); } }
io.github.helloworlde.thrift.HelloService.sayHello_result.sayHello_resultStandardScheme#read
读取响应内容,解析为相应的对象,然后赋值给 result 对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void read (org.apache.thrift.protocol.TProtocol iprot, sayHello_result struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TField schemeField; iprot.readStructBegin(); while (true ) { schemeField = iprot.readFieldBegin(); if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break ; } switch (schemeField.id) { case 0 : if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { struct.success = new HelloResponse(); struct.success.read(iprot); struct.setSuccessIsSet(true ); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break ; default : org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } iprot.readFieldEnd(); } iprot.readStructEnd(); struct.validate(); }
参考文档