你的位置:首页 > Java教程

[Java教程]第十一章 自己实现一致性hash算法


关于一致性hash算法的意义以及其相对于简单求余法(除数求余法)的好处,查看第六章 memcached剖析

 

注意:真实的hash环的数据结构是二叉树,这里为了简便使用了列表List

1、一致性hash算法的使用地方

  • memcached服务器
  • Jedis分片机制

 

2、真实服务器节点没有虚拟化的一致性hash算法实现

ServerNode:真实服务器节点

 1 package hash; 2  3 /** 4  * server节点 5 */ 6 public class ServerNode { 7   private String serverName; 8   private long serverHash; 9 10   public String getServerName() {11     return serverName;12   }13 14   public void setServerName(String serverName) {15     this.serverName = serverName;16   }17 18   public long getServerHash() {19     return serverHash;20   }21 22   public void setServerHash(long serverHash) {23     this.serverHash = serverHash;24   }25 26   /**27    * 下边重写hashcode()和equals()是为了在删除节点的时候只根据传入的serverName删除即可28   */29   @Override30   public int hashCode() {31     final int prime = 31;32     int result = 1;33     result = prime * result34         + ((serverName == null) ? 0 : serverName.hashCode());35     return result;36   }37 38   @Override39   public boolean equals(Object obj) {40     if (this == obj)41       return true;42     if (obj == null)43       return false;44     if (getClass() != obj.getClass())45       return false;46     ServerNode other = (ServerNode) obj;47     if (serverName == null) {48       if (other.serverName != null)49         return false;50     } else if (!serverName.equals(other.serverName))51       return false;52     return true;53   }54   55   56 }

View Code

注意:

  • serverName可以自己取名,这里取名为"ip:port"
  • 对于hashCode()和equals()方法的重写仅仅是为了删除服务器节点的时候,只根据serverName就可以删除,而不需要再计算服务器节点的hash值

 

ServerComparator:真实服务器比较器

 1 package hash; 2  3 import java.util.Comparator; 4  5 /** 6  * 服务器排序比较器 7 */ 8 public class ServerComparator implements Comparator<ServerNode> { 9 10   public int compare(ServerNode node1, ServerNode node2) {11     if(node1.getServerHash() <= node2.getServerHash()) {12       return -1;//node1<node213     }14     return 1;//node1>node215   }16 17 }

View Code

注意:

  • 关于java的比较器,有两种:(后者用的多一些)
    • javabean实现comparable接口,实现compareTo()方法
    • 另外建一个类实现comparator接口,实现其中的compare()方法

 

ConsistentHash:一致性hash实现类

 1 package hash; 2  3 import java.util.ArrayList; 4 import java.util.Collections; 5 import java.util.List; 6 import java.util.zip.CRC32; 7  8 /** 9  * 一致性hash实现(数据结构:list)(服务器没有虚拟化) 10  * 一致性hash的真正数据结构是二叉树 11 */ 12 public class ConsistentHash { 13   private List<ServerNode> servers = new ArrayList<ServerNode>();//存放服务器 14    15   /** 计算服务器和存储的键的hash值 */ 16   public long hash(String str){ 17     CRC32 crc32 = new CRC32(); 18     crc32.update(str.getBytes()); 19     return crc32.getValue(); 20   } 21    22   /** 23    * 添加server到环上 24    * @param serverName ip:port 25   */ 26   public void addServer(String serverName){ 27  28     ServerNode node = new ServerNode(); 29     node.setServerName(serverName); 30     node.setServerHash(hash(serverName)); 31      32     servers.add(node); 33     Collections.sort(servers, new ServerComparator()); 34   } 35    36   /** 37    * 从环上删除server节点 38   */ 39   public void deleteServer(String serverName){ 40  41     ServerNode node = new ServerNode(); 42     node.setServerName(serverName); 43      44     servers.remove(node); 45   } 46    47   /** 48    * 获取一个缓存key应该存放的位置 49    * @param cachekey 缓存的key 50    * @return 缓存的服务器节点 51   */ 52   public ServerNode getServer(String cachekey){ 53     long keyHash = hash(cachekey); 54      55     for(ServerNode node : servers){ 56       if(keyHash<=node.getServerHash()){ 57         return node; 58       } 59     } 60      61     return servers.get(0);//如果node没有合适放置位置,放在第一台服务器上去 62   } 63    64   /****************测试*******************/ 65   public void printServers(){ 66     for(ServerNode server : servers){ 67       System.out.println(server.getServerName()+"-->"+server.getServerHash()); 68     } 69   } 70    71   public static void main(String[] args) { 72     ConsistentHash ch = new ConsistentHash(); 73     ch.addServer("127.0.0.1:11211"); 74     ch.addServer("127.0.0.1:11212"); 75     ch.addServer("127.0.0.2:11211"); 76     ch.addServer("127.0.0.2:11212"); 77      78     ch.printServers(); 79      80     ServerNode node = ch.getServer("hello"); 81     System.out.println(ch.hash("hello")+"-->"+node.getServerName()+"-->"+node.getServerHash()); 82      83     ServerNode node2 = ch.getServer("name"); 84     System.out.println(ch.hash("name")+"-->"+node2.getServerName()+"-->"+node2.getServerHash()); 85      86     ServerNode node3 = ch.getServer("a"); 87     System.out.println(ch.hash("a")+"-->"+node3.getServerName()+"-->"+node3.getServerHash()); 88      89     /********************删除节点*********************/ 90     ch.deleteServer("127.0.0.1:11212"); 91     ch.printServers(); 92      93     ServerNode node0 = ch.getServer("hello"); 94     System.out.println(ch.hash("hello")+"-->"+node0.getServerName()+"-->"+node0.getServerHash()); 95      96     ServerNode node02 = ch.getServer("name"); 97     System.out.println(ch.hash("name")+"-->"+node02.getServerName()+"-->"+node02.getServerHash()); 98      99     ServerNode node03 = ch.getServer("a");100     System.out.println(ch.hash("a")+"-->"+node03.getServerName()+"-->"+node03.getServerHash());101     102   }103 }

View Code

注意:

  • 在计算服务器节点和存储的key的hash值的时候,不仅仅可以使用crc32算法,还可以使用MD5算法等等,只要是最后得出的结果是一个>=0&&<=232的数就好
  • 在这个实现中,并没有将真实服务器节点进行虚拟化

 

3、真实服务器节点虚拟化后的一致性hash算法实现

为什么要虚拟化,查看第六章 memcached剖析 ,这里只列出几条原因:

  • 在memcached服务器较少的情况下,很难平均的分布到hash环上,这样就会造成负载不均衡--引入虚拟化节点,可以解决这个问题
  • 当一台memcached宕机时,其原先所承受的压力全部给了其下一个节点,为了将其原先所承受的压力尽可能的分布给所有剩余的memcached节点,引入虚拟化节点可以达到这个目的
  • 当新添加了一台memcached服务器server1时,server1只会缓解其中的一台服务器(即server1插入环后,server1的下一个节点)的压力,为了可以让server1尽可能的缓解所有的memcached服务器的压力,引入虚拟节点可以达到这个目的

 

VirtualServerNode:虚拟节点

 1 package hash2; 2  3 /** 4  * 虚拟节点 5 */ 6 public class VirtualServerNode { 7   private String serverName;//真实节点名称 8   private long virtualServerHash;//虚拟节点hash 9 10   public String getServerName() {11     return serverName;12   }13 14   public void setServerName(String serverName) {15     this.serverName = serverName;16   }17 18   public long getVirtualServerHash() {19     return virtualServerHash;20   }21 22   public void setVirtualServerHash(long virtualServerHash) {23     this.virtualServerHash = virtualServerHash;24   }25 26 }

View Code

注意:

  • 该类中的serverName是该虚拟节点对应的真实节点的名称,这里就是"ip:port"
  • 真正的虚拟节点的名称是serverName-i(其中,i是0~virtualCount的整数值),这一块儿请查看ConsistentHashWithVirtualNode的addServer(String serverName)

 

VirtualServerComparator:虚拟节点比较器

 1 package hash2; 2  3 import java.util.Comparator; 4  5 /** 6  * 虚拟节点比较器 7 */ 8 public class VirtualServerComparator implements Comparator<VirtualServerNode> { 9 10   public int compare(VirtualServerNode node1, VirtualServerNode node2) {11     if(node1.getVirtualServerHash() <= node2.getVirtualServerHash()) {12       return -1;13     }14     return 1;15   }16 17 }

View Code

 

ConsistentHashWithVirtualNode:真实节点虚拟化后的一致性hash算法

 1 package hash2; 2  3 import java.util.ArrayList; 4 import java.util.Collections; 5 import java.util.List; 6 import java.util.zip.CRC32; 7  8 /** 9  * 具有虚拟节点的一致性hash实现(数据结构:list) 10  * 一致性hash的真正数据结构是二叉树 11 */ 12 public class ConsistentHashWithVirtualNode { 13   private List<VirtualServerNode> virtualServers = new ArrayList<VirtualServerNode>();//存放虚拟节点 14   private static final int virtualCount = 8;//每个真实节点虚拟成8个虚拟节点 15    16   /** 计算服务器和存储的键的hash值 */ 17   public long hash(String str){ 18     CRC32 crc32 = new CRC32(); 19     crc32.update(str.getBytes()); 20     return crc32.getValue(); 21   } 22    23   /** 24    * 添加server的虚拟节点到环上 25    * @param serverName ip:port 26   */ 27   public void addServer(String serverName){ 28  29     for(int count=0;count<virtualCount;count++){ 30       VirtualServerNode node = new VirtualServerNode(); 31       node.setServerName(serverName); 32       node.setVirtualServerHash(hash(serverName+"-"+count));//虚拟节点的名字:serverName+"-"+count 33       virtualServers.add(node); 34     } 35      36     Collections.sort(virtualServers, new VirtualServerComparator()); 37   } 38    39   /** 40    * 从环上删除server节点(需要删除所有的该server节点对应的虚拟节点) 41   */ 42   public void deleteServer(String serverName){ 43      44     /* 45      * 在这种删除的时候,会出现java.util.ConcurrentModificationException 46      * 这是因为此处的遍历方式为使用ArrayList内部类Itr进行遍历, 47      * 在遍历的过程中发生了remove、add等操作,导致modCount发生了变化, 48      * 产生并发修改异常, 49      * 可以使用下边的那一种方式来进行遍历(遍历方式不是Itr), 50      * 再这样的遍历过程中,add和remove都是没有问题的 51     */ 52     /*for(VirtualServerNode node : virtualServers){ 53       if(node.getServerName().equals(serverName)){ 54         virtualServers.remove(node); 55       } 56     }*/ 57     for(int i=0;i<virtualServers.size();i++) { 58       VirtualServerNode node = virtualServers.get(i); 59       if(node.getServerName().equals(serverName)) { 60         virtualServers.remove(node); 61       } 62     } 63      64   } 65    66   /** 67    * 获取一个缓存key应该存放的位置 68    * @param cachekey 缓存的key 69    * @return 缓存的服务器节点 70   */ 71   public VirtualServerNode getServer(String cachekey){ 72     long keyHash = hash(cachekey); 73      74     for(VirtualServerNode node : virtualServers){ 75       if(keyHash<=node.getVirtualServerHash()){ 76         return node; 77       } 78     } 79      80     return virtualServers.get(0);//如果node没有合适放置位置,放在第一台服务器上去 81   } 82    83   /****************测试*******************/ 84   public void printServers(){ 85     for(VirtualServerNode server : virtualServers){ 86       System.out.println(server.getServerName()+"-->"+server.getVirtualServerHash()); 87     } 88   } 89    90   public static void main(String[] args) { 91     ConsistentHashWithVirtualNode ch = new ConsistentHashWithVirtualNode(); 92     ch.addServer("127.0.0.1:11211"); 93     ch.addServer("127.0.0.1:11212"); 94     ch.addServer("127.0.0.2:11211"); 95     ch.addServer("127.0.0.2:11212"); 96      97     ch.printServers(); 98      99     VirtualServerNode node = ch.getServer("hello");100     System.out.println(ch.hash("hello")+"-->"+node.getServerName()+"-->"+node.getVirtualServerHash());101     102     VirtualServerNode node2 = ch.getServer("name");103     System.out.println(ch.hash("name")+"-->"+node2.getServerName()+"-->"+node2.getVirtualServerHash());104     105     VirtualServerNode node3 = ch.getServer("a");106     System.out.println(ch.hash("a")+"-->"+node3.getServerName()+"-->"+node3.getVirtualServerHash());107     108     /*********************删除节点之后**********************/109     ch.deleteServer("127.0.0.1:11212");110     ch.printServers();111     112     VirtualServerNode node0 = ch.getServer("hello");113     System.out.println(ch.hash("hello")+"-->"+node0.getServerName()+"-->"+node0.getVirtualServerHash());114     115     VirtualServerNode node02 = ch.getServer("name");116     System.out.println(ch.hash("name")+"-->"+node02.getServerName()+"-->"+node02.getVirtualServerHash());117     118     VirtualServerNode node03 = ch.getServer("a");119     System.out.println(ch.hash("a")+"-->"+node03.getServerName()+"-->"+node03.getVirtualServerHash());120     121   }122 }

View Code

注意:

  • 在实际操作中,一台memcached服务器虚拟成150台比较合适(100~200)
  • 从环上删除节点的算法写的较差,但是考虑到删除节点的操作在实际使用中用的比较少(宕机比较少,人为的删除节点也较少),也无所谓
  • 删除节点的时候,注意使用foreach语法糖去遍历的时候,在遍历的过程中不可以做删除、增加操作,否则会抛出并发修改异常,具体的原因见注释和第二章 ArrayList源码解析;想要实现在遍历的过程中进行删除、增加操作,使用简单for循环,见如上代码