你的位置:首页 > Java教程

[Java教程]RPC原来就是Socket——RPC框架到dubbo的服务动态注册,服务路由,负载均衡演化


  序:RPC就是使用socket告诉服务端我要调你的哪一个类的哪一个方法然后获得处理的结果。服务注册和路由就是借助第三方存储介质存储服务信息让服务消费者调用。然我们自己动手从0开始写一个rpc功能以及实现服务注册,动态上下线,服务路由,负载均衡。

  RPC即远程过程调用,它的实现方式有很多,比如webservice等。框架调多了,烦了,没激情了,我们就该问自己,这些框架的作用到底是什么,来找回当初的激情。
  一般来说,我们写的系统就是一个单机系统,一个web服务器一个数据库服务,但是当这单台服务器的处理能力受硬件成本的限制,是不能无限的提升处理性能的。这个时候我们使用RPC将原来的本地调用转变为调用远端的服务器上的方法,给系统的处理能力和吞吐量带来了提升。
  RPC的实现包括客户端和服务端,即服务的调用方和服务的提供方。服务调用方发送rpc请求到服务提供方,服务提供方根据调用方提供的参数执行请求方法,将执行的结果返回给调用方,一次rpc调用完成。

  先让我们利用socket简单的实现RPC,来看看他是什么鬼样子。

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6058765.html

可接网站开发,java开发。

新浪微博:intsmaze刘洋洋哥

微信:intsmaze

服务端代码如下 

服务端的提供服务的方法

package cn.intsmaze.tcp.two.service;public class SayHelloServiceImpl {  public String sayHello(String helloArg) {    if(helloArg.equals("intsmaze"))    {      return "intsmaze";    }    else    {      return "bye bye";    }  }}

  服务端启动接收外部方法请求的端口类,它接收到来自客户端的请求数据后,利用反射知识,创建指定类的对象,并调用对应方法,然后把执行的结果返回给客户端即可。

package cn.intsmaze.tcp.two.service;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.ServerSocket;import java.net.Socket;public class Provider {  public static void main(String[] args) throws Exception {    ServerSocket server=new ServerSocket(1234);    while(true)    {      Socket socket=server.accept();      ObjectInputStream input=new ObjectInputStream(socket.getInputStream());            String classname=input.readUTF();//获得服务端要调用的类名      String methodName=input.readUTF();//获得服务端要调用的方法名称          Class<?>[] parameterTypes=(Class<?>[]) input.readObject();//获得服务端要调用方法的参数类型      Object[] arguments=(Object[]) input.readObject();//获得服务端要调用方法的每一个参数的值                Class serviceclass=Class.forName(classname);//创建类      Object object = serviceclass.newInstance();//创建对象      Method method=serviceclass.getMethod(methodName, parameterTypes);//获得该类的对应的方法            Object result=method.invoke(object, arguments);//该对象调用指定方法            ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());      output.writeObject(result);      socket.close();    }  }}  

服务调用者代码

  调用服务的方法,主要就是客户端启动一个socket,然后向提供服务的服务端发送数据,其中的数据就是告诉服务端去调用哪一个类的哪一个方法,已经调用该方法的参数是多少,然后结束服务端返回的数据即可。

package cn.intsmaze.tcp.two.client;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.net.Socket;public class consumer {    @SuppressWarnings({ "unused", "rawtypes" })  public static void main(String[] arg) throws Exception  {    //我们要想调用远程提供的服务,必须告诉远程我们要调用你的哪一个类,这里我们可以在本地创建一个interface来获取类的名称,但是这样我们必须    //保证该interface和远程的interface的所在包名一致。这种方式不好。所以我们还是通过硬编码的方式吧。
     //虽然webservice就是这样的,我个人觉得不是多好。     // String interfacename=SayHelloService.class.getName(); String classname="cn.intsmaze.tcp.two.service.SayHelloServiceImpl"; String method="sayHello"; Class[] argumentsType={String.class}; Object[] arguments={"intsmaze"}; Socket socket=new Socket("127.0.0.1",1234); ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(classname); output.writeUTF(method); output.writeObject(argumentsType); output.writeObject(arguments); ObjectInputStream input=new ObjectInputStream(socket.getInputStream()); Object result=input.readObject(); System.out.println(result); socket.close(); }}

   当然实际中出于性能考虑,往往采用非阻塞式I/O,避免无限的等待,带来系统性能的消耗。

  上面的只是一个简单的过程,当系统之间的调用变的复杂之后,该方式有如下不足:服务调用者代码以硬编码的方式指明所调用服务的信息(类名,方法名),当服务提供方改动所提供的服务的代码后,服务调用者必须修改代码进行调整,不然会导致服务调用者无法成功进行远程方法调用导致系统异常,并且当服务提供者宕机下线了,服务调用者并不知道服务端是否存活,仍然会进行访问,导致异常。

  一个系统中,服务提供者往往不是一个,而是多个,那么服务消费者如何从众多的服务者找到对应的服务进行RPC就是一个问题了,因为这个时候我们不能在在服务调用者代码中硬编码指出调用哪一个服务的地址等信息,因为我们可以想象,没有一个统一的地方管理所有服务,那么我们在错综复杂的系统之间无法理清有哪些服务,已经服务的调用关系,这简直就是灾难。

    这个时候就要进行服务的注册,通过一个第三方的存储介质,当服务的提供者上线时,通过代码将所提供的服务的相关信息写入到存储介质中,写入的主要信息以key-value方式:服务的名称:(类名,方法名,参数类型,参数,IP地址,端口)。服务的调用者向远程调用服务时,会先到第三方存储介质中根据所要调用的服务名得到(类名,方法名,参数类型,参数,IP地址,端口)等参数,然后再向服务端发出调用请求。通过这种方式,代码就变得灵活多变,不会再因为一个局部的变得引发全局架构的变动。因为一般的改动是不会变得服务的名称的。这种方式其实就是soa架构,服务消费者通过服务名称,从众多服务中找到要调用的服务的相关信息,称为服务的路由。

  下面通过一个静态MAP对象来模拟第三方存储的介质。

package cn.intsmaze.tcp.three;import net.sf.json.JSONObject;public class ClassWays {    String classname;//类名    String method;//方法    Class[] argumentsType;//参数类型     String ip;//服务的ip地址    int port;//服务的端口    get,set...... }

  第三方存储介质,这里固定了服务提供者的相关信息,理想的模拟是,当服务启动后,自动向该类的map集合添加信息。但是因为服务端和客户端启动时,是两个不同的jvm进程,客户端时无法访问到服务端写到静态map集合的数据的。

package cn.intsmaze.tcp.three;import java.util.HashMap;import java.util.Map;import net.sf.json.JSONObject;public class ServiceRoute {    public static Map<String,String> NAME=new HashMap<String, String>();    public ServiceRoute()  {    ClassWays classWays=new ClassWays();    Class[] argumentsType={String.class};    classWays.setArgumentsType(argumentsType);    classWays.setClassname("cn.intsmaze.tcp.three.service.SayHelloServiceImpl");    classWays.setMethod("sayHello");    classWays.setIp("127.0.0.1");    classWays.setPort(1234);    JSONObject js=JSONObject.fromObject(classWays);    NAME.put("SayHello", js.toString());  } }

  接下来看服务端代码的美丽面孔吧。

package cn.intsmaze.tcp.three.service;public class Provider {  //服务启动的时候,组装相关信息,然后写入第三方存储机制,供服务的调用者去获取  public void reallyUse() {        ClassWays classWays = new ClassWays();    Class[] argumentsType = { String.class };    classWays.setArgumentsType(argumentsType);    classWays.setClassname("cn.intsmaze.tcp.three.service.SayHelloServiceImpl");    classWays.setMethod("sayHello");    classWays.setIp("127.0.0.1");    classWays.setPort(1234);        JSONObject js=JSONObject.fromObject(classWays);        //模拟第三方存储介质,实际中应该是redis,mysql,zookeeper等。    ServiceRoute.NAME.put("SayHello", js.toString());  }  public static void main(String[] args) throws Exception {    ServerSocket server = new ServerSocket(1234);    //实际中,这个地方应该调用如下方法,但是因为简单的模拟服务的注册,将注册的信息硬编码在ServiceRoute类中,这个类的构造方法里面会自动注册服务的相关信息。    //server.reallyUse();    while (true) {      Socket socket = server.accept();      ObjectInputStream input = new ObjectInputStream(socket.getInputStream());      String classname = input.readUTF();      String methodName = input.readUTF();      Class<?>[] parameterTypes = (Class<?>[]) input.readObject();      Object[] arguments = (Object[]) input.readObject();      Class serviceclass = Class.forName(classname);      Object object = serviceclass.newInstance();      Method method = serviceclass.getMethod(methodName, parameterTypes);      Object result = method.invoke(object, arguments);      ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());      output.writeObject(result);      socket.close();    }  }}

  服务的调用者代码:

package cn.intsmaze.tcp.three.client;public class Consumer {    public Object reallyUse(String provideName,Object[] arguments) throws Exception  {    //模拟从第三方存储介质拿去数据    ServiceRoute serviceRoute=new ServiceRoute();    String js=serviceRoute.NAME.get(provideName);    JSONObject obj = new JSONObject().fromObject(js);    ClassWays classWays = (ClassWays)JSONObject.toBean(obj,ClassWays.class);        String classname=classWays.getClassname();    String method=classWays.getMethod();    Class[] argumentsType=classWays.getArgumentsType();    Socket socket=new Socket(classWays.getIp(),classWays.getPort());        ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());        output.writeUTF(classname);    output.writeUTF(method);    output.writeObject(argumentsType);    output.writeObject(arguments);        ObjectInputStream input=new ObjectInputStream(socket.getInputStream());    Object result=input.readObject();    socket.close();    return result;  }  @SuppressWarnings({ "unused", "rawtypes" })  public static void main(String[] arg) throws Exception  {    Consumer consumer=new Consumer();    Object[] arguments={"intsmaze"};    Object result=consumer.reallyUse("SayHello",arguments);    System.out.println(result);  }}

  回到开始的问题现在我们保证了服务调用者对服务的调用的相关参数以动态的方式进行控制,通过封装,服务调用者只需要指定每一次调用时的参数的值即可。但是当服务提供者宕机下线了,服务调用者并不知道服务端是否存活,仍然会进行访问,导致异常。这个时候我们该如何考虑解决了?

  剩下的我就不写代码示例了,代码只是思想的表现形式,就像开发语言一直变化,但是思想是不变的。

  服务下线我们应该把该服务从第三方存储删除,在服务提供方写代码进行删除控制,也就是服务下线前访问第三方删除自己提供的服务。这样当然行不通的,因为服务宕机时,才不会说,我要宕机了,服务提供者你快去第三方存储介质删掉该服务信息。所以这个时候我们就要在第三方存储介质上做手脚,比如服务提供方并不是直接把服务信息写入第三方存储介质,而是与一个第三方系统进行交互,第三方系统把接收到来自服务提供者的服务信息写入第三方存储介质中,然后在服务提供者和第三方系统间建立一个心跳检测,当第三方系统检测到服务提供者宕机后,就会自动到第三方介质中删除对应服务信息。

  这个时候我们就可以选择zookeeper作为第三方存储介质,服务启动会到zookeeper上面创建一个临时目录,该目录存储该服务的相关信息,当服务端宕机了,zookeeper会自动删除该文件夹,这个时候就实现了服务的动态上下线了。

  这个地方其实就是dubbo的一大特色功能:服务配置中心——动态注册和获取服务信息,来统一管理服务名称和其对于的服务器的信息。服务提供者在启动时,将其提供的服务名称,服务器地址注册到服务配置中心,服务消费者通过配置中心来获得需要调用服务的机器。当服务器宕机或下线,相应的机器需要动态地从服务配置中心移除,并通知相应的服务消费者。这个过程中,服务消费者只在第一次调用服务时需要查询服务配置中心,然后将查询到的信息缓存到本地,后面的调用直接使用本地缓存的服务地址信息,而不需要重新发起请求到服务配置中心去获取相应的服务地址,直到服务的地址列表有变更(机器上线或者下线)。

  zookeeper如何知道的?zookeeper其实就是会和客户端直接有一个心跳检测来判断的,zookeeper功能很简单的,可以自己去看对应的书籍即可。

  随着业务的发展,服务调用者的规模发展到一定的阶段,对服务提供方也带来了巨大的压力,这个时候服务提供方就不在是一台机器了,而是一个服务集群了。

  服务调用者面对服务提供者集群如何高效选择服务提供者集群中某一台机器?

  一说到集群,我们都会想到反向代理nginx,所以我们就会采用nginx的配置文件中存储集群中的所有IP和端口信息。然后把第三方存储介质中存储的服务信息——key-value:服务的名称:(类名,方法名,参数类型,参数,IP地址,端口)IP地址改为集群的代理地址,然后服务消费者根据服务名称获得服务信息后组装请求把数据发送到nginx,再由nginx负责转发请求到对应的服务提供者集群中的一台。

  这确实是可以满足的,但是如果吹毛求疵就会发现他所暴露的问题!

  一:使用nginx进行负载均衡,一旦nginx宕机,那么依赖他的服务均将失效,这个时候服务的提供者并没有宕机。

  二:这是一个内部系统的调用,服务调用者集群数量远远小于外部系统的请求数量,那么我们将所有的服务消费者到服务提供者的请求都经过nginx,带来不必要的效率开销。

  改进方案:将服务提供者集群的所有信息都存储到第三方系统(如zookeeper)中对应服务名称下,表现形式为——服务名:[{机器IP:(类名,方法名,参数类型,参数,IP地址,端口)}...]。这样服务消费者向第三方存储系统(如zookeeper)获得服务的所有信息(服务集群的地址列表),然后服务调用者就从这个列表中根据负载均衡算法选择一个进行访问。

  这个时候我们可能会思考,负载均衡算法我们是参考nginx把IP地址的分配选择在第三方系统(如zookeeper)上进行实现还是在服务调用者端进行实现?负载均衡算法部署在第三方系统(如zookeeper),服务消费者把服务名称发给第三方系统,第三方系统根据服务名然后根据负载均衡算法从该服务的地址信息列表中选择一个返回给服务消费者,服务消费者获得所调用服务的具体信息后,直接向服务的提供者发送请求。但是正如我所说,这只是一个内部系统,请求的数量往往没有多大的变化,而且实现起来要在服务消费者直接调用zookeeper系统前面编写一个中间件作为一个中间,不免过于麻烦。我们完全可以在服务的消费者处嵌入负载均衡算法,服务消费者获取服务的地址信息列表后,运算负载均衡算法从所得的地址信息列表中选择一个地址信息发送请求的数据。更进一步,服务消费者第一次执行负载均衡算法后就把选择的地址信息存储到本地缓存,以后再次访问就直接从本地拿去,不再到第三方系统中获取了。

  基于第三方系统实现服务的负载均衡的方案已经实现,那么我们来解决下一个问题,服务的上线和下线如何告知服务的消费者,避免服务消费者访问异常?

  前面我们说了,服务提供者利用zookeeper系统的特性,可以实现服务的注册和删除,那么同样,我们也可以让服务的消费者监听zookeeper上对应的服务目录,当服务目录变动后,服务消费者则重新到zookeeper上获取新的服务地址信息,然后运算负载均衡算法选择一个新的服务进行请求。

  如果有没有讲明白的可以留言,我进行更正。基本上一个RPC就是这样,剩下的一些基于RPC的框架无非就是实现了多些协议,以及一些多种语言环境的考虑和效率的提升。

   觉得不错点个推荐吧,看在我花了一天时间把自己的知识整理分析,谢谢喽。当然这还是没有写好,等我下周有时间再添加图片进行完善,关于这个架构的设计欢迎大家讨论,共同成长。