你的位置:首页 > Java教程

[Java教程]自己动手写RPC框架到dubbo的服务动态注册,服务路由,负载均衡功能实现


  RPC即远程过程调用,它的实现方式有很多,比如webservice等。框架调多了,烦了,没激情了,我们就该问自己,这些框架的作用到底是什么,来找回当初的激情。
  一般来说,我们写的系统就是一个单机系统,一个web服务器一个数据库服务,但是当这单台服务器的处理能力受硬件成本的限制,是不能无限的提升处理性能的。这个时候我们使用RPC将原来的本地调用转变为调用远端的服务器上的方法,给系统的处理能力和吞吐量带来了提升。
  RPC的实现包括客户端和服务端,即服务的调用方和服务的提供方。服务调用方发送rpc请求到服务提供方,服务提供方根据调用方提供的参数执行请求方法,将执行的结果返回给调用方,一次rpc调用完成。

   先让我们利用socket简单的实现RPC,来看看他是什么鬼样子。

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6056763.html

可接网站开发,java开发。

新浪微博:intsmaze刘洋洋哥

微信:intsmaze

     服务端代码如下 

服务端的提供服务的方法

package cn.intsmaze.tcp.two.service;public class SayHelloServiceImpl {  public String sayHello(String helloArg) {    if(helloArg.equals("intsmaze"))    {      return "intsmaze";    }    else    {      return "bye bye";    }  }}

  服务端启动接收外部方法请求的端口类,它接收到来自客户端的请求数据后,利用反射知识,创建指定类的对象,并调用对应方法,然后把执行的结果返回给客户端即可。

package cn.intsmaze.tcp.two.service;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.ServerSocket;import java.net.Socket;public class Provider {  public static void main(String[] args) throws Exception {    ServerSocket server=new ServerSocket(1234);    while(true)    {      Socket socket=server.accept();      ObjectInputStream input=new ObjectInputStream(socket.getInputStream());            String classname=input.readUTF();//获得服务端要调用的类名      String methodName=input.readUTF();//获得服务端要调用的方法名称          Class<?>[] parameterTypes=(Class<?>[]) input.readObject();//获得服务端要调用方法的参数类型      Object[] arguments=(Object[]) input.readObject();//获得服务端要调用方法的每一个参数的值                Class serviceclass=Class.forName(classname);//创建类      Object object = serviceclass.newInstance();//创建对象      Method method=serviceclass.getMethod(methodName, parameterTypes);//获得该类的对应的方法            Object result=method.invoke(object, arguments);//该对象调用指定方法            ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());      output.writeObject(result);      socket.close();    }  }} 

  

服务调用者代码

  调用服务的方法,主要就是客户端启动一个socket,然后向提供服务的服务端发送数据,其中的数据就是告诉服务端去调用哪一个类的哪一个方法,已经调用该方法的参数是多少,然后结束服务端返回的数据即可。

调用服务

package cn.intsmaze.tcp.two.client;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.net.Socket;public class consumer {    @SuppressWarnings({ "unused", "rawtypes" })  public static void main(String[] arg) throws Exception  {    //我们要想调用远程提供的服务,必须告诉远程我们要调用你的哪一个类,这里我们可以在本地创建一个interface来获取类的名称,但是这样我们必须    //保证该interface和远程的interface的所在包名一致。这种方式不好。所以我们还是通过硬编码的方式吧。
     //虽然webservice就是这样的,我个人觉得不是多好。     // String interfacename=SayHelloService.class.getName(); String classname="cn.intsmaze.tcp.two.service.SayHelloServiceImpl"; String method="sayHello"; Class[] argumentsType={String.class}; Object[] arguments={"intsmaze"}; Socket socket=new Socket("127.0.0.1",1234); ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(classname); output.writeUTF(method); output.writeObject(argumentsType); output.writeObject(arguments); ObjectInputStream input=new ObjectInputStream(socket.getInputStream()); Object result=input.readObject(); System.out.println(result); socket.close(); }}

   当然实际中出于性能考虑,往往采用非阻塞式I/O,避免无限的等待,带来系统性能的消耗。

  上面的只是一个简单的过程,当系统之间的调用变的复杂之后,该方式有如下不足:服务调用者代码以硬编码的方式指明所调用服务的信息(类名,方法名),当服务提供方改动所提供的服务的代码后,服务调用者必须修改代码进行调整,不然会导致服务调用者无法成功进行远程方法调用导致系统异常,并且当服务提供者宕机下线了,服务调用者并不知道服务端是否存活,仍然会进行访问,导致异常。

  一个系统中,服务提供者往往不是一个,而是多个,那么服务消费者如何从众多的服务者找到对应的服务进行RPC就是一个问题了,因为这个时候我们不能在在服务调用者代码中硬编码指出调用哪一个服务的地址等信息,因为我们可以想象,没有一个统一的地方管理所有服务,那么我们在错综复杂的系统之间无法理清有哪些服务,已经服务的调用关系,这简直就是灾难。

    这个时候就要进行服务的注册,通过一个第三方的存储介质,当服务的提供者上线时,通过代码将所提供的服务的相关信息写入到存储介质中,写入的主要信息以key-value方式:服务的名称:(类名,方法名,参数类型,参数,IP地址,端口)。服务的调用者向远程调用服务时,会先到第三方存储介质中根据所要调用的服务名得到(类名,方法名,参数类型,参数,IP地址,端口)等参数,然后再向服务端发出调用请求。通过这种方式,代码就变得灵活多变,不会再因为一个局部的变得引发全局架构的变动。因为一般的改动是不会变得服务的名称的。这种方式其实就是soa架构,服务消费者通过服务名称,从众多服务中找到要调用的服务的相关信息,称为服务的路由。

  下面通过一个静态MAP对象来模拟第三方存储的介质。

package cn.intsmaze.tcp.three;import net.sf.json.JSONObject;public class ClassWays {    String classname;//类名    String method;//方法    Class[] argumentsType;//参数类型     String ip;//服务的ip地址    int port;//服务的端口    get,set...... }

  第三方存储介质,这里固定了服务提供者的相关信息,理想的模拟是,当服务启动后,自动向该类的map集合添加信息。但是因为服务端和客户端启动时,是两个不同的jvm进程,客户端时无法访问到服务端写到静态map集合的数据的。

package cn.intsmaze.tcp.three;import java.util.HashMap;import java.util.Map;import net.sf.json.JSONObject;public class ServiceRoute {    public static Map<String,String> NAME=new HashMap<String, String>();    public ServiceRoute()  {    ClassWays classWays=new ClassWays();    Class[] argumentsType={String.class};    classWays.setArgumentsType(argumentsType);    classWays.setClassname("cn.intsmaze.tcp.three.service.SayHelloServiceImpl");    classWays.setMethod("sayHello");    classWays.setIp("127.0.0.1");    classWays.setPort(1234);    JSONObject js=JSONObject.fromObject(classWays);    NAME.put("SayHello", js.toString());  } }

  接下来看服务端代码的美丽面孔吧。

package cn.intsmaze.tcp.three.service;public class Provider {  //服务启动的时候,组装相关信息,然后写入第三方存储机制,供服务的调用者去获取  public void reallyUse() {        ClassWays classWays = new ClassWays();    Class[] argumentsType = { String.class };    classWays.setArgumentsType(argumentsType);    classWays.setClassname("cn.intsmaze.tcp.three.service.SayHelloServiceImpl");    classWays.setMethod("sayHello");    classWays.setIp("127.0.0.1");    classWays.setPort(1234);        JSONObject js=JSONObject.fromObject(classWays);        //模拟第三方存储介质,实际中应该是redis,mysql,zookeeper等。    ServiceRoute.NAME.put("SayHello", js.toString());  }  public static void main(String[] args) throws Exception {    ServerSocket server = new ServerSocket(1234);    //实际中,这个地方应该调用如下方法,但是因为简单的模拟服务的注册,将注册的信息硬编码在ServiceRoute类中,这个类的构造方法里面会自动注册服务的相关信息。    //server.reallyUse();    while (true) {      Socket socket = server.accept();      ObjectInputStream input = new ObjectInputStream(socket.getInputStream());      String classname = input.readUTF();      String methodName = input.readUTF();      Class<?>[] parameterTypes = (Class<?>[]) input.readObject();      Object[] arguments = (Object[]) input.readObject();      Class serviceclass = Class.forName(classname);      Object object = serviceclass.newInstance();      Method method = serviceclass.getMethod(methodName, parameterTypes);      Object result = method.invoke(object, arguments);      ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());      output.writeObject(result);      socket.close();    }  }}

  服务的调用者代码:

package cn.intsmaze.tcp.three.client;public class Consumer {    public Object reallyUse(String provideName,Object[] arguments) throws Exception  {    //模拟从第三方存储介质拿去数据    ServiceRoute serviceRoute=new ServiceRoute();    String js=serviceRoute.NAME.get(provideName);    JSONObject obj = new JSONObject().fromObject(js);    ClassWays classWays = (ClassWays)JSONObject.toBean(obj,ClassWays.class);        String classname=classWays.getClassname();    String method=classWays.getMethod();    Class[] argumentsType=classWays.getArgumentsType();    Socket socket=new Socket(classWays.getIp(),classWays.getPort());        ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());        output.writeUTF(classname);    output.writeUTF(method);    output.writeObject(argumentsType);    output.writeObject(arguments);        ObjectInputStream input=new ObjectInputStream(socket.getInputStream());    Object result=input.readObject();    socket.close();    return result;  }  @SuppressWarnings({ "unused", "rawtypes" })  public static void main(String[] arg) throws Exception  {    Consumer consumer=new Consumer();    Object[] arguments={"intsmaze"};    Object result=consumer.reallyUse("SayHello",arguments);    System.out.println(result);  }}

  回到开始的问题现在我们保证了服务调用者对服务的调用的相关参数以动态的方式进行控制,通过封装,服务调用者只需要指定每一次调用时的参数的值即可。但是当服务提供者宕机下线了,服务调用者并不知道服务端是否存活,仍然会进行访问,导致异常。这个时候我们该如何考虑解决了?

  剩下的我就不写代码示例了,代码只是思想的表现形式,就像开发语言一直变化,但是思想是不变的。

  服务下线我们应该把该服务从第三方存储删除,在服务提供方写代码进行删除控制,也就是服务下线前访问第三方删除自己提供的服务。这样当然行不通的,因为服务宕机时,才不会说,我要宕机了,服务提供者你快去第三方存储介质删掉该服务信息。所以这个时候我们就要在第三方存储介质上做手脚,比如服务提供方并不是直接把服务信息写入第三方存储介质,而是与一个第三方系统进行交互,第三方系统把接收到来自服务提供者的服务信息写入第三方存储介质中,然后在服务提供者和第三方系统间建立一个心跳检测,当第三方系统检测到服务提供者宕机后,就会自动到第三方介质中删除对应服务信息。

  这个时候我们就可以选择zookeeper作为第三方存储介质,服务启动会到zookeeper上面创建一个临时目录,该目录存储该服务的相关信息,当服务端宕机了,zookeeper会自动删除该文件夹,这个时候就实现了服务的动态上下线了。

  这个地方其实就是dubbo的一大特色功能:服务配置中心——动态注册和获取服务信息,来统一管理服务名称和其对于的服务器的信息。服务提供者在启动时,将其提供的服务名称,服务器地址注册到服务配置中心,服务消费者通过配置中心来获得需要调用服务的机器。当服务器宕机或下线,相应的机器需要动态地从服务配置中心移除,并通知相应的服务消费者。这个过程中,服务消费者只在第一次调用服务时需要查询服务配置中心,然后将查询到的信息缓存到本地,后面的调用直接使用本地缓存的服务地址信息,而不需要重新发起请求到服务配置中心去获取相应的服务地址,直到服务的地址列表有变更(机器上线或者下线)。

  zookeeper如何知道的?zookeeper其实就是会和客户端直接有一个心跳检测来判断的,zookeeper功能很简单的,可以自己去看对应的书籍即可。

   随着业务的发展,服务调用者的规模发展到一定的阶段,对服务提供方也带来了巨大的压力,这个时候服务提供方就不在是一台机器了,而是一个服务集群了。这个时候服务调用者如何知道调用服务集群的哪一台机器?

  多台服务器组成的集群,在请求到来时,需要有一个负载均衡程序从服务的地址列表中选取一台服务器进行访问(服务的负载均衡)。这个时候我们可能会这样做,在第三方介质中存储每一个服务集群的代理地址,这样服务消费者获取对应服务集群的代理地址,向代理发送请求,然后再由代理负责转发请求到对应的服务机器。比如nginx。(实现原理,每一个服务启动,向第三方存储介质存储该服务的代理地址即可,具体多个相同服务产生代理地址重复可以通过代码进行控制,这里就不扯了,都是有经验的开发人员,服务下线的检测,第三方存储介质进行控制。)  
  我们来看看这种有什么问题,使用nginx进行负载均衡,一旦nginx宕机,依赖他的服务均将失效。所以我们会对动态注册和获取服务信息这个功能进行改进,同一个服务每上线一台新的机器,在zookeeper中的每一个服务名下面创建一个临时目录,一个临时目录对应该服务的一个集群,进而实现了服务的集群功能。然后考虑服务的调用者,因为内部系统的调用不像对外的服务,他的访问数量是可控的且有限的,所以就没有必要将负载均衡算法在zookeeper那一端进行实现,就是服务调用者每一次调用都到zookeeper上通过负载均衡来选择对应服务下的某一个机器目录,只需要在服务的调用端编写负载均衡算法,具体就不需要讲太多,条条大路通罗马。

  如果有没有讲明白的可以留言,我进行更正。基本上一个RPC就是这样,剩下的一些基于RPC的框架无非就是实现了多些协议,以及一些多种语言环境的考虑和效率的提升。

   觉得不错点个推荐吧,看在我花了一天时间把自己的知识整理分析,谢谢喽。