你的位置:首页 > Java教程

[Java教程]dubbo/dubbox 增加原生thrift及avro支持


(facebook) thrift / (hadoop) avro / (google) protobuf(grpc)是近几年来比较抢眼的高效序列化/rpc框架。dubbo框架虽然有thrift的支持,但是依赖的版本较早,只支持0.8.0,而且还对协议做了一些扩展,并非原生的thrift协议。

github上虽然也有朋友对dubbo做了扩展以支持原生thrift,但是代码实在太多了,其实只需要一个类即可:

Thrift2Protocol.java:

package com.alibaba.dubbo.rpc.protocol.thrift2;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.lang.reflect.Constructor;

/**
 * Adds "native Thrift" support to dubbo-rpc.
 *
 * <p>The service type handed to this protocol must be the {@code $Iface} interface nested in a
 * Thrift-generated service class. The sibling {@code $Processor} (server side) and
 * {@code $Client} (consumer side) classes are located by name and instantiated reflectively.
 * Both sides use a framed transport with the compact protocol.
 *
 * <p>by 杨俊明 (http://yjmyzz.cnblogs.com/)
 */
public class Thrift2Protocol extends AbstractProxyProtocol {

  public static final int DEFAULT_PORT = 33208;

  private static final Logger logger = LoggerFactory.getLogger(Thrift2Protocol.class);

  /** Binary-name suffix of the service interface nested in a generated service class. */
  private static final String IFACE_SUFFIX = "$Iface";

  /** Binary-name suffix of the server-side processor nested in a generated service class. */
  private static final String PROCESSOR_SUFFIX = "$Processor";

  /** Binary-name suffix of the client stub nested in a generated service class. */
  private static final String CLIENT_SUFFIX = "$Client";

  /**
   * Returns the port used when the URL does not specify one.
   *
   * @return {@link #DEFAULT_PORT}
   */
  public int getDefaultPort() {
    return DEFAULT_PORT;
  }

  /**
   * Exposes {@code impl} through a non-blocking Thrift server bound to the URL's port.
   *
   * <p>The server is served on a dedicated thread because {@code serve()} blocks until the
   * server is stopped.
   *
   * @param impl the service implementation
   * @param type the Thrift-generated {@code $Iface} interface implemented by {@code impl}
   * @param url  the provider URL; only its port is used for binding
   * @return a callback that stops the server when run
   * @throws RpcException if {@code type} is not a {@code $Iface} interface or the server
   *                      cannot be created
   */
  @Override
  protected <T> Runnable doExport(T impl, Class<T> type, URL url)
      throws RpcException {
    logger.info("impl => " + impl.getClass());
    logger.info("type => " + type.getName());
    logger.info("url => " + url);

    final TServer thriftServer = createServer(impl, type, url);

    new Thread(new Runnable() {
      public void run() {
        logger.info("Start Thrift Server");
        // serve() blocks for the lifetime of the server, so reaching the next line means
        // the server has been stopped (the previous message wrongly said "started").
        thriftServer.serve();
        logger.info("Thrift server stopped.");
      }
    }, "Thrift2Protocol-server-" + url.getPort()).start();

    return new Runnable() {
      public void run() {
        try {
          logger.info("Close Thrift Server");
          thriftServer.stop();
        } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
        }
      }
    };
  }

  /**
   * Creates a Thrift client stub connected to the provider described by {@code url}.
   *
   * <p>NOTE(review): a single client over a single socket is returned per reference. Generated
   * Thrift clients are generally not safe for concurrent use; confirm that callers serialize
   * access or introduce pooling.
   *
   * @param type the Thrift-generated {@code $Iface} interface to obtain a client for
   * @param url  the provider URL (host and port are used)
   * @return a connected client implementing {@code type}; never {@code null}
   * @throws RpcException if {@code type} is not a {@code $Iface} interface, or the client
   *                      cannot be created or connected
   */
  @Override
  protected <T> T doRefer(Class<T> type, URL url) throws RpcException {
    logger.info("type => " + type.getName());
    logger.info("url => " + url);

    String serviceClassName = serviceClassNameOf(type);
    if (serviceClassName == null) {
      // Previously null was returned here, which only failed later and far from the cause.
      String message = "Fail to create remoting client for service(" + url + "): "
          + type.getName() + " is not a thrift " + IFACE_SUFFIX + " interface";
      logger.error(message);
      throw new RpcException(message);
    }

    TTransport transport = null;
    try {
      Class<?> clientClass =
          Class.forName(serviceClassName + CLIENT_SUFFIX, true, type.getClassLoader());
      Constructor<?> constructor = clientClass.getConstructor(TProtocol.class);
      transport = new TFramedTransport(new TSocket(url.getHost(), url.getPort()));
      TProtocol protocol = new TCompactProtocol(transport);
      // type.cast gives a checked conversion instead of an unchecked (T) cast.
      T thriftClient = type.cast(constructor.newInstance(protocol));
      transport.open();
      logger.info("thrift client opened for service(" + url + ")");
      return thriftClient;
    } catch (Exception e) {
      // Release the socket if anything failed after the transport was created.
      if (transport != null) {
        transport.close();
      }
      logger.error(e.getMessage(), e);
      throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
  }

  /**
   * Builds (but does not start) the non-blocking server that dispatches to {@code impl}.
   */
  private <T> TServer createServer(T impl, Class<T> type, URL url) {
    String serviceClassName = serviceClassNameOf(type);
    if (serviceClassName == null) {
      String message = "Fail to create thrift server(" + url + ") : "
          + type.getName() + " is not a thrift " + IFACE_SUFFIX + " interface";
      logger.error(message);
      throw new RpcException(message);
    }

    TNonblockingServerSocket serverSocket = null;
    try {
      Class<?> processorClass =
          Class.forName(serviceClassName + PROCESSOR_SUFFIX, true, type.getClassLoader());
      Constructor<?> constructor = processorClass.getConstructor(type);
      TProcessor processor = (TProcessor) constructor.newInstance(impl);

      serverSocket = new TNonblockingServerSocket(url.getPort());
      TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverSocket);
      serverArgs.processor(processor);
      serverArgs.transportFactory(new TFramedTransport.Factory());
      serverArgs.protocolFactory(new TCompactProtocol.Factory());
      return new TNonblockingServer(serverArgs);
    } catch (Exception e) {
      // Do not leave the port bound when server construction fails half-way.
      if (serverSocket != null) {
        serverSocket.close();
      }
      logger.error(e.getMessage(), e);
      throw new RpcException("Fail to create thrift server(" + url + ") : " + e.getMessage(), e);
    }
  }

  /**
   * Returns the binary name of the generated service class enclosing {@code type}, or
   * {@code null} when {@code type} is not named {@code ...$Iface}.
   */
  private static String serviceClassNameOf(Class<?> type) {
    String typeName = type.getName();
    if (!typeName.endsWith(IFACE_SUFFIX)) {
      return null;
    }
    // Strip exactly the verified suffix rather than cutting at the first "$Iface" occurrence.
    return typeName.substring(0, typeName.length() - IFACE_SUFFIX.length());
  }
}

重写父类AbstractProxyProtocol的两个抽象方法doExport及doRefer即可。doExport用于对外暴露RPC服务,在这个方法里启动thrift server,dubbo service provider在启动时会调用该方法;而doRefer用于dubbo service consumer发现服务后,获取对应的rpc-client。

 

参考这个思路,avro也很容易集成进来:

AvroProtocol.java

package com.alibaba.dubbo.rpc.protocol.avro;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.protocol.AbstractProxyProtocol;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.reflect.ReflectRequestor;
import org.apache.avro.ipc.reflect.ReflectResponder;

import java.net.InetSocketAddress;

/**
 * Adds Avro support to dubbo-rpc.
 *
 * <p>Services are exposed through a Netty-based Avro server using a reflect responder, and
 * consumed through a Netty transceiver with a reflect requestor proxy.
 *
 * <p>by 杨俊明 (http://yjmyzz.cnblogs.com/)
 */
public class AvroProtocol extends AbstractProxyProtocol {

  public static final int DEFAULT_PORT = 40881;

  private static final Logger logger = LoggerFactory.getLogger(AvroProtocol.class);

  /**
   * Returns the port used when the URL does not specify one.
   *
   * @return {@link #DEFAULT_PORT}
   */
  public int getDefaultPort() {
    return DEFAULT_PORT;
  }

  /**
   * Exposes {@code impl} through an Avro Netty server bound to the URL's host and port.
   *
   * @param impl the service implementation
   * @param type the service interface implemented by {@code impl}
   * @param url  the provider URL (host and port are used for binding)
   * @return a callback that closes the server when run
   * @throws RpcException if the server cannot be created or started
   */
  @Override
  protected <T> Runnable doExport(T impl, Class<T> type, URL url)
      throws RpcException {
    logger.info("impl => " + impl.getClass());
    logger.info("type => " + type.getName());
    logger.info("url => " + url);

    Server started = null;
    try {
      started = new NettyServer(new ReflectResponder(type, impl),
          new InetSocketAddress(url.getHost(), url.getPort()));
      started.start();
    } catch (Exception e) {
      // Report start-up failures as RpcException with logging, and do not leave a
      // half-started server holding the port.
      closeQuietly(started);
      logger.error(e.getMessage(), e);
      throw new RpcException("Fail to create avro server(" + url + ") : " + e.getMessage(), e);
    }

    final Server server = started;
    return new Runnable() {
      public void run() {
        try {
          logger.info("Close Avro Server");
          server.close();
        } catch (Throwable e) {
          logger.warn(e.getMessage(), e);
        }
      }
    };
  }

  /**
   * Creates an Avro client proxy for {@code type} connected to the provider in {@code url}.
   *
   * @param type the service interface to obtain a proxy for
   * @param url  the provider URL (host and port are used)
   * @return a proxy implementing {@code type}
   * @throws RpcException if the connection or the proxy cannot be created
   */
  @Override
  protected <T> T doRefer(Class<T> type, URL url) throws RpcException {
    logger.info("type => " + type.getName());
    logger.info("url => " + url);

    NettyTransceiver transceiver = null;
    try {
      transceiver = new NettyTransceiver(new InetSocketAddress(url.getHost(), url.getPort()));
      T ref = ReflectRequestor.getClient(type, transceiver);
      logger.info("Create Avro Client");
      return ref;
    } catch (Exception e) {
      // The connection is already open if proxy creation failed; release it.
      closeQuietly(transceiver);
      logger.error(e.getMessage(), e);
      throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
  }

  /** Closes the server if present, logging instead of propagating any failure. */
  private static void closeQuietly(Server server) {
    if (server == null) {
      return;
    }
    try {
      server.close();
    } catch (Throwable e) {
      logger.warn(e.getMessage(), e);
    }
  }

  /** Closes the transceiver if present, logging instead of propagating any failure. */
  private static void closeQuietly(NettyTransceiver transceiver) {
    if (transceiver == null) {
      return;
    }
    try {
      transceiver.close();
    } catch (Throwable e) {
      logger.warn(e.getMessage(), e);
    }
  }
}

不要忘记在META-INF/dubbo/internal下添加名为com.alibaba.dubbo.rpc.Protocol的文件(thrift2模块同理,内容为thrift2=com.alibaba.dubbo.rpc.protocol.thrift2.Thrift2Protocol),avro模块的内容为:

avro=com.alibaba.dubbo.rpc.protocol.avro.AvroProtocol

 

最后谈谈如何打包到dubbo的jar里:  

dubbo-rpc/pom.xml:

  <!-- Module list with the two new protocol modules; "..." marks elided existing entries. -->
  <modules>
    ...
    <module>dubbo-rpc-avro</module>
    ...
    <module>dubbo-rpc-thrift2</module>
    ...
  </modules>

然后dubbo/pom.xml:

  <!-- Artifact set including the two new protocol modules; "..." marks elided existing entries. -->
  <artifactSet>
    <includes>
      ...
      <include>com.alibaba:dubbo-rpc-api</include>
      <include>com.alibaba:dubbo-rpc-avro</include>
      ...
      <include>com.alibaba:dubbo-rpc-thrift2</include>
      ...
    </includes>
  </artifactSet>

dependencies节也要增加:

<!-- thrift2 protocol module; libthrift is excluded from its transitive dependencies. -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>dubbo-rpc-thrift2</artifactId>
  <version>${project.parent.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- avro protocol module; avro and avro-ipc are excluded from its transitive dependencies. -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>dubbo-rpc-avro</artifactId>
  <version>${project.parent.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-ipc</artifactId>
    </exclusion>
  </exclusions>
</dependency>

这样打包出来的dubbo-xxx.jar里,就包括新增的Protocol。至于google的protobuf,目前处于3.x -beta阶段,等以后出正式版了,再看情况整合起来。

以上代码已经提交到github:https://github.com/yjmyzz/dubbox (版本号:2.8.4a)