你的位置:首页 > Java教程

[Java教程]Apache thrift RPC 双向通信


      在上一篇介绍Apache thrift 安装和使用,写了一个简单的demo,讲解thrift服务的发布和客户端调用,但只是单向的客户端发送消息,服务端接收消息。而客户端却得不到服务器的响应。

在不涉及语言平台的制约,WebService可胜任做这些服务端的处理。

     基于大部分业务需求,更需要服务端能够响应处理数据。下面我通过一个demo案例,介绍下Apache thrift 双向通信的使用。

一.首先我们还是需要安装好Apache thrift。这里不再赘述,戳这里查看我上篇文章的介绍:http://www.cnblogs.com/sumingk/articles/6073105.html

二.其次准备好thrift 所需的jar包:

    

三.新建一个Java web项目,编写thrift脚本,命名为student.thrift  如下:

namespace java com.zhj.studenttypedef i32 int typedef i16 shorttypedef i64 long//Student Entitystruct Student {  1: string name} service Zthrift {  oneway void send(1:Student msg)}

四.执行student.thrift 文件,thrift  --gen java  student.thrift (该文件我还是放在c盘根目录下执行),随后生产gen-java文件,如下:

五.将新生成的两文件拷入项目中,其中Student.java 是实体类,Zthrift.java是生成的类。

六.编写thrift服务端类。

package com.zhj.server;import org.apache.thrift.TException;import org.apache.thrift.TProcessor;import org.apache.thrift.TProcessorFactory;import org.apache.thrift.protocol.TBinaryProtocol;import org.apache.thrift.server.TThreadPoolServer;import org.apache.thrift.transport.TServerSocket;import org.apache.thrift.transport.TTransport;import org.apache.thrift.transport.TTransportException;import com.zhj.student.Student;import com.zhj.student.Zthrift;import com.zhj.student.Zthrift.Iface;public class ZServer {    public static void main(String[] args){      try {        TServerSocket tServerSocket=new TServerSocket(9999);        TThreadPoolServer.Args targs=new TThreadPoolServer.Args(tServerSocket);        TBinaryProtocol.Factory factory=new TBinaryProtocol.Factory();        //获取processFactory        TProcessorFactory tProcessorFactory= getProcessorFactory();        targs.protocolFactory(factory);        targs.processorFactory(tProcessorFactory);        TThreadPoolServer tThreadPoolServer=new TThreadPoolServer(targs);        System.out.println("start server...");        tThreadPoolServer.serve();              } catch (TTransportException e) {        // TODO Auto-generated catch block        e.printStackTrace();      }    }        /**     * 内部类获取 getProcessorFactory     * @return     */    public static int tt= 0;    public static TProcessorFactory getProcessorFactory(){            TProcessorFactory tProcessorFactory=new TProcessorFactory(null){        public TProcessor getProcessor(final TTransport tTransport){          Thread thread = new Thread(new Runnable() {                        @Override            public void run() {              try {                                System.out.println("服务端休眠5秒后,执行响应......");                //延时五秒回复(延迟执行给客户端发送消息)                Thread.sleep(5000);                tt +=100;                System.out.println("延时五秒回复时,tt = " +tt);                 //这里可以把client提取作为成员变量来多次使用                Zthrift.Client client = new Zthrift.Client(new TBinaryProtocol(tTransport));                //给客户端响应消息                client.send(new Student("....test"));                              } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();              } catch (TException e) {                // TODO Auto-generated catch block                e.printStackTrace();              }            }          });          thread.start();                    return new Zthrift.Processor<Iface>(new Iface() {                        @Override            public void send(Student msg) throws TException {              // TODO Auto-generated method stub              tt+=10;              System.out.println("接收客户端消息时,tt = " +tt);              //接受客户端消息               System.out.println("....."+msg.toString());            }          });                  }      };      return tProcessorFactory;    }}

此处,内部类使用比较频繁,阅读会有些困难。Zthrift,Processor构造方法需要传入一个Iface 接口,该接口有一个接收客户端的方法send(), msg 是一个Student对象。

 七.实现的客户端调用。如下:

package com.zhj.client;import org.apache.thrift.TException;import org.apache.thrift.protocol.TBinaryProtocol;import org.apache.thrift.transport.TSocket;import org.apache.thrift.transport.TTransportException;import com.zhj.student.Student;import com.zhj.student.Zthrift.Iface;import com.zhj.student.Zthrift;public class ZClient {  public static void main(String[]args){    final TSocket tSocket=new TSocket("127.0.0.1",9999);    Zthrift.Client client=new Zthrift.Client(new TBinaryProtocol(tSocket));    try {      tSocket.open();      runMethod(tSocket);      //向服务端发送消息      client.send(new Student("小明1"));          } catch (TTransportException e) {      // TODO Auto-generated catch block      e.printStackTrace();    } catch (TException e) {      // TODO Auto-generated catch block      e.printStackTrace();    }  }    public static void runMethod(final TSocket tSocket){    Thread thread = new Thread(new Runnable() {            @Override      public void run() {        Zthrift.Processor<Iface> mp = new Zthrift.Processor<Zthrift.Iface>(new Iface() {                    @Override          public void send(Student msg) throws TException {            // TODO Auto-generated method stub            Long start = System.currentTimeMillis();            try {              while(true){                //具体接收时间待定                if((System.currentTimeMillis()-start)>0.1*60*1000){                  System.out.println("响应消息超时...");                  break;                }                else {                  System.out.println("收到服务端响应消息: "+msg);                }                //休眠两秒                Thread.sleep(2000L);              }            } catch (InterruptedException e) {              // TODO Auto-generated catch block              e.printStackTrace();            }          }                  });                try {          while(mp.process(new TBinaryProtocol(tSocket), new TBinaryProtocol(tSocket))){            //阻塞式方法,不需要内容            System.out.println("走阻塞式方法");            //关闭tScoket            // tSocket.close();          }        } catch (TException e) {          System.out.println("连接已断开...");          e.printStackTrace();        }      }    });    thread.start();  }}

在这里,我加入了一个超时响应的死循环,用于接收服务端返回的消息,控制台可以查看服务端给的响应消息。

八.运行服务端和客户端main方法,控制台打印如下:

  

代码阅读有些困难,有困难或不合理之处,请小伙伴指出。Thank you!