
[Database] HBase Study Notes

When interacting with an HBase cluster through the Java API, you build an HTable object and use its methods to insert, delete, and query data. To create an HTable you first need a Configuration object conf carrying the cluster information, typically created as follows:

Configuration conf = HBaseConfiguration.create();
// Set the ZooKeeper quorum address and client port of the HBase cluster
conf.set("hbase.zookeeper.quorum", "10.172.1.61");
conf.set("hbase.zookeeper.property.clientPort", "2181");
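For orientation, here is a minimal end-to-end sketch of the workflow just described, written against the same era of client API used throughout this note. The table name test, the column family col1, and the row key and values are hypothetical placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HTableQuickStart {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.172.1.61");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    HTable table = new HTable(conf, TableName.valueOf("test")); // hypothetical table
    try {
      // Insert one cell into column family "col1"
      Put put = new Put(Bytes.toBytes("row1"));
      put.add(Bytes.toBytes("col1"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
      table.put(put);

      // Read the row back
      Result result = table.get(new Get(Bytes.toBytes("row1")));
      System.out.println(Bytes.toString(
          result.getValue(Bytes.toBytes("col1"), Bytes.toBytes("q1"))));
    } finally {
      table.close();
    }
  }
}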

Once conf is available, an HTable object can be created through either of the two constructors HTable provides:

(1) Create the HTable directly from conf; the corresponding constructor is:

public HTable(Configuration conf, final TableName tableName) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
  if (conf == null) {
    this.connection = null;
    return;
  }
  this.connection = HConnectionManager.getConnection(conf);
  this.configuration = conf;
  this.pool = getDefaultExecutor(conf);
  this.finishSetup();
}

Note the line this.connection = HConnectionManager.getConnection(conf): this constructor obtains an HConnection object via HConnectionManager's getConnection method. When operating a database from Java, one usually creates a similar connection object to maintain connection-related state (anyone familiar with ODBC or JDBC will find nothing surprising here). The implementation of getConnection is:

public static HConnection getConnection(final Configuration conf) throws IOException {
  HConnectionKey connectionKey = new HConnectionKey(conf);
  synchronized (CONNECTION_INSTANCES) {
    HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
    if (connection == null) {
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    } else if (connection.isClosed()) {
      HConnectionManager.deleteConnection(connectionKey, true);
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    }
    connection.incCount();
    return connection;
  }
}

Here, CONNECTION_INSTANCES has type LinkedHashMap<HConnectionKey, HConnectionImplementation>. Again, three lines are key. First, an HConnectionKey object is built from conf; second, CONNECTION_INSTANCES is checked for that key; third, if no live connection exists under it, createConnection is called to create one; otherwise the HConnection just found in the map is returned directly.

At the risk of belaboring the point, let's also look at HConnectionKey's constructor and its overridden hashCode method:

HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<String, String>();
  if (conf != null) {
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);
  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    HConnectionManager.LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}

public int hashCode() {
  final int prime = 31;
  int result = 1;
  if (username != null) {
    result = username.hashCode();
  }
  for (String property : CONNECTION_PROPERTIES) {
    String value = properties.get(property);
    if (value != null) {
      result = prime * result + value.hashCode();
    }
  }
  return result;
}

As the code shows, the overridden hashCode seeds its result with the hash of username and then mixes in the values of the connection-related properties (CONNECTION_PROPERTIES) read from conf. The username comes from currentUser, which comes from provider, which is in turn instantiated from conf. So identical conf contents yield the same username and the same property values, and therefore HConnectionKey objects whose overridden hashCode returns the same value. Since CONNECTION_INSTANCES is a LinkedHashMap, its get method uses HConnectionKey's hashCode (and equals) to decide whether the key is already present. In essence, getConnection maps conf contents to connection objects: for every conf with identical contents, only one connection is ever handed out.
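To make the practical consequence concrete, here is a small sketch (the table names are hypothetical). Both HTable constructors below go through HConnectionManager.getConnection(conf), so the two tables end up sharing a single cached HConnection because their conf contents are identical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;

public class SharedConnectionDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.172.1.61");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    // Both constructors call HConnectionManager.getConnection(conf) internally.
    // The second call finds the cached HConnectionImplementation under an equal
    // HConnectionKey and returns it, so no second connection is created.
    HTable t1 = new HTable(conf, TableName.valueOf("table1"));
    HTable t2 = new HTable(conf, TableName.valueOf("table2"));

    t1.close();
    t2.close();
  }
}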

(2) Call createConnection to create the connection explicitly, then use that connection to build the HTable object. The createConnection method and the corresponding HTable constructor are shown below:

public static HConnection createConnection(Configuration conf) throws IOException {
  UserProvider provider = UserProvider.instantiate(conf);
  return createConnection(conf, false, null, provider.getCurrent());
}

static HConnection createConnection(final Configuration conf, final boolean managed,
    final ExecutorService pool, final User user) throws IOException {
  String className = conf.get("hbase.client.connection.impl",
      HConnectionManager.HConnectionImplementation.class.getName());
  Class<?> clazz = null;
  try {
    clazz = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  try {
    // Default HCM#HCI is not accessible; make it so before invoking.
    Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
        boolean.class, ExecutorService.class, User.class);
    constructor.setAccessible(true);
    return (HConnection) constructor.newInstance(conf, managed, pool, user);
  } catch (Exception e) {
    throw new IOException(e);
  }
}

public HTable(TableName tableName, HConnection connection) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = true;
  this.cleanupConnectionOnClose = false;
  this.connection = connection;
  this.configuration = connection.getConfiguration();
  this.pool = getDefaultExecutor(this.configuration);
  this.finishSetup();
}

As you can see, with this approach every createConnection call produces a brand-new HConnection object, instead of sharing a single HConnection the way approach (1) does.
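A sketch of this second pattern (the table name is hypothetical): the caller owns the connection, may share it across the HTables it creates, and is responsible for closing it, since the constructor above sets cleanupConnectionOnClose = false:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;

public class ExplicitConnectionDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.172.1.61");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    // Each createConnection call builds a brand-new HConnection;
    // it is NOT placed in the CONNECTION_INSTANCES cache.
    HConnection conn = HConnectionManager.createConnection(conf);
    try {
      HTable table = new HTable(TableName.valueOf("table1"), conn);
      // ... puts/gets/deletes ...
      table.close(); // closes the table, but not the caller-owned connection
    } finally {
      conn.close(); // the caller must close the connection explicitly
    }
  }
}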

 

So how do the two approaches perform when executing inserts, deletes, and reads? Let's start with a code-level analysis. For simplicity, consider what HTable actually does during a put (insert) operation.

HTable's put method looks like this:

public void put(final Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  doPut(put);
  if (autoFlush) {
    flushCommits();
  }
}

private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  if (ap.hasError()) {
    writeAsyncBuffer.add(put);
    backgroundFlushCommits(true);
  }
  validatePut(put);
  currentWriteBufferSize += put.heapSize();
  writeAsyncBuffer.add(put);
  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}

private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  try {
    do {
      ap.submit(writeAsyncBuffer, true);
    } while (synchronous && !writeAsyncBuffer.isEmpty());
    if (synchronous) {
      ap.waitUntilDone();
    }
    if (ap.hasError()) {
      LOG.debug(tableName + ": One or more of the operations have failed -" +
          " waiting for all operation in progress to finish (successfully or not)");
      while (!writeAsyncBuffer.isEmpty()) {
        ap.submit(writeAsyncBuffer, true);
      }
      ap.waitUntilDone();
      if (!clearBufferOnFail) {
        // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
        // write buffer. This is a questionable feature kept here for backward compatibility
        writeAsyncBuffer.addAll(ap.getFailedOperations());
      }
      RetriesExhaustedWithDetailsException e = ap.getErrors();
      ap.clearErrors();
      throw e;
    }
  } finally {
    currentWriteBufferSize = 0;
    for (Row mut : writeAsyncBuffer) {
      if (mut instanceof Mutation) {
        currentWriteBufferSize += ((Mutation) mut).heapSize();
      }
    }
  }
}

The call chain is put -> doPut -> backgroundFlushCommits -> ap.submit, where ap is an instance of AsyncProcess. Following the trail into the AsyncProcess class:

public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
  submitLowPriority(rows, atLeastOne, false);
}

public void submitLowPriority(List<? extends Row> rows, boolean atLeastOne,
    boolean isLowPripority) throws InterruptedIOException {
  if (rows.isEmpty()) {
    return;
  }
  // This looks like we are keying by region but HRegionLocation has a comparator that compares
  // on the server portion only (hostname + port) so this Map collects regions by server.
  Map<HRegionLocation, MultiAction<Row>> actionsByServer =
      new HashMap<HRegionLocation, MultiAction<Row>>();
  List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
  long currentTaskCnt = tasksDone.get();
  boolean alreadyLooped = false;
  NonceGenerator ng = this.hConnection.getNonceGenerator();
  do {
    if (alreadyLooped) {
      // if, for whatever reason, we looped, we want to be sure that something has changed.
      waitForNextTaskDone(currentTaskCnt);
      currentTaskCnt = tasksDone.get();
    } else {
      alreadyLooped = true;
    }
    // Wait until there is at least one slot for a new task.
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
    // Remember the previous decisions about regions or region servers we put in the
    // final multi.
    Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
    Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
    int posInList = -1;
    Iterator<? extends Row> it = rows.iterator();
    while (it.hasNext()) {
      Row r = it.next();
      HRegionLocation loc = findDestLocation(r, posInList);
      if (loc == null) { // loc is null if there is an error such as meta not available.
        it.remove();
      } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
        Action<Row> action = new Action<Row>(r, ++posInList);
        setNonce(ng, r, action);
        retainedActions.add(action);
        addAction(loc, action, actionsByServer, ng);
        it.remove();
      }
    }
  } while (retainedActions.isEmpty() && atLeastOne && !hasError());
  HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
  sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, isLowPripority);
}

private HRegionLocation findDestLocation(Row row, int posInList) {
  if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
  HRegionLocation loc = null;
  IOException locationException = null;
  try {
    loc = hConnection.locateRegion(this.tableName, row.getRow());
    if (loc == null) {
      locationException = new IOException("#" + id + ", no location found, aborting submit for" +
          " tableName=" + tableName +
          " rowkey=" + Arrays.toString(row.getRow()));
    }
  } catch (IOException e) {
    locationException = e;
  }
  if (locationException != null) {
    // There are multiple retries in locateRegion already. No need to add new.
    // We can't continue with this row, hence it's the last retry.
    manageError(posInList, row, false, locationException, null);
    return null;
  }
  return loc;
}

The key mechanism here is asynchronous submission: not every put writes to HBase immediately. Instead, puts accumulate in a client-side buffer until it grows past a threshold (2 MB by default), at which point a batch write is issued. That batch presumably still has to queue on the server side; the server-side mechanics are beyond the scope of this note. What we can establish is that during HTable's put, the HConnection serves only to locate the target RegionServer; once the location is known, the writes are carried out by the client through RPC calls and no longer involve the connection. The same conclusion holds for inserts, reads, and deletes. Also note that locateRegion only issues an RPC on the first lookup or when its cache is full; otherwise it returns the RegionServer information straight from the cache. See the locateRegion source for details, which I won't expand on here.
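The buffering behavior can be steered from the client side. A hedged sketch (the table name and parameter values are illustrative, not prescriptive): turning off autoFlush keeps puts in the client-side writeAsyncBuffer until it exceeds writeBufferSize, after which backgroundFlushCommits kicks in, and flushCommits() forces out whatever remains:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.172.1.61");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    HTable table = new HTable(conf, TableName.valueOf("test"));
    table.setAutoFlush(false);                  // buffer puts client-side
    table.setWriteBufferSize(2 * 1024 * 1024);  // flush threshold (2 MB is the default)

    for (int i = 0; i < 10000; i++) {
      Put put = new Put(Bytes.toBytes("row" + i));
      put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
      table.put(put); // only triggers an RPC when the buffer threshold is crossed
    }
    table.flushCommits(); // push out whatever is still buffered
    table.close();
  }
}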

That concludes the code analysis. Here is a simple experiment to verify the conclusions above:

Environment: an HBase cluster of four Linux servers with 64 GB of RAM each; average connection latency about 5 ms.

The experiment code is as follows:

public class TestHyperbaseConection {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.172.1.16");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    ThreadInfo info = new ThreadInfo();
    info.setTableNamePrefix("test");
    info.setColNames("col1,col2");
    info.setTableCount(1);
    info.setConnStrategy("CREATEWITHCONF"); // CREATEWITHCONF, CREATEWITHCONN
    info.setWriteStrategy("SEPERATE");      // OVERLAP, SEPERATE
    info.setLifeCycle(60000L);

    int threadCount = 100;
    for (int i = 0; i < threadCount; i++) {
      //createTable(tableNamePrefix+i,colNames,conf);
    }
    for (int i = 0; i < threadCount; i++) {
      new Thread(new WriteThread(conf, info, i)).start();
    }
    //HBaseAdmin admin = new HBaseAdmin(conf);
    //System.out.println(admin.tableExists("test"));
  }

  public static void createTable(String tableName, String[] colNames, Configuration conf) {
    System.out.println("start create table " + tableName);
    try {
      HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
      if (hBaseAdmin.tableExists(tableName)) {
        System.out.println(tableName + " is exist");
        //hBaseAdmin.disableTable(tableName);
        //hBaseAdmin.deleteTable(tableName);
        return;
      }
      HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
      for (int i = 0; i < colNames.length; i++) {
        tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));
      }
      hBaseAdmin.createTable(tableDescriptor);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
    System.out.println("end create table " + tableName);
  }
}

// Configuration describing what each worker thread should do
class ThreadInfo {
  private int tableCount;
  String tableNamePrefix;
  String[] colNames;
  // CREATEWITHCONF or CREATEWITHCONN
  String connStrategy;
  // OVERLAP or SEPERATE
  String writeStrategy;
  long lifeCycle;

  public ThreadInfo() {
  }

  public int getTableCount() {
    return tableCount;
  }

  public void setTableCount(int tableCount) {
    this.tableCount = tableCount;
  }

  public String getTableNamePrefix() {
    return tableNamePrefix;
  }

  public void setTableNamePrefix(String tableNamePrefix) {
    this.tableNamePrefix = tableNamePrefix;
  }

  public String[] getColNames() {
    return colNames;
  }

  public void setColNames(String[] colNames) {
    this.colNames = colNames;
  }

  public void setColNames(String colNames) {
    if (colNames == null) {
      this.colNames = null;
    } else {
      this.colNames = colNames.split(",");
    }
  }

  public String getWriteStrategy() {
    return writeStrategy;
  }

  public void setWriteStrategy(String writeStrategy) {
    this.writeStrategy = writeStrategy;
  }

  public String getConnStrategy() {
    return connStrategy;
  }

  public void setConnStrategy(String connStrategy) {
    this.connStrategy = connStrategy;
  }

  public long getLifeCycle() {
    return lifeCycle;
  }

  public void setLifeCycle(long lifeCycle) {
    this.lifeCycle = lifeCycle;
  }
}

class WriteThread implements Runnable {
  private Configuration conf;
  private ThreadInfo info;
  private int index;

  public WriteThread(Configuration conf, ThreadInfo info, int index) {
    this.conf = conf;
    this.info = info;
    this.index = index;
  }

  @Override
  public void run() {
    String threadName = Thread.currentThread().getName();
    int operationCount = 0;
    HTable[] htables = null;
    HConnection conn = null;
    int tableCount = info.getTableCount();
    String tableNamePrefix = info.getTableNamePrefix();
    String[] colNames = info.getColNames();
    String connStrategy = info.getConnStrategy();
    String writeStrategy = info.getWriteStrategy();
    long lifeCycle = info.getLifeCycle();
    System.out.println(threadName + ": started with index " + index);
    try {
      if (connStrategy.equals("CREATEWITHCONN")) {
        conn = HConnectionManager.createConnection(conf);
        if (writeStrategy.equals("SEPERATE")) {
          htables = new HTable[1];
          htables[0] = new HTable(TableName.valueOf(tableNamePrefix + (index % tableCount)), conn);
        } else if (writeStrategy.equals("OVERLAP")) {
          htables = new HTable[tableCount];
          for (int i = 0; i < tableCount; i++) {
            htables[i] = new HTable(TableName.valueOf(tableNamePrefix + i), conn);
          }
        } else {
          return;
        }
      } else if (connStrategy.equals("CREATEWITHCONF")) {
        conn = null;
        if (writeStrategy.equals("SEPERATE")) {
          htables = new HTable[1];
          htables[0] = new HTable(conf, TableName.valueOf(tableNamePrefix + (index % tableCount)));
        } else if (writeStrategy.equals("OVERLAP")) {
          htables = new HTable[tableCount];
          for (int i = 0; i < tableCount; i++) {
            htables[i] = new HTable(conf, TableName.valueOf(tableNamePrefix + i));
          }
        } else {
          return;
        }
      } else {
        return;
      }

      long start = System.currentTimeMillis();
      long end = System.currentTimeMillis();
      // Pre-fetch each table's column families once; getTableDescriptor() is a remote call.
      Map<HTable, HColumnDescriptor[]> table_columnFamilies = new HashMap<HTable, HColumnDescriptor[]>();
      for (int i = 0; i < htables.length; i++) {
        table_columnFamilies.put(htables[i], htables[i].getTableDescriptor().getColumnFamilies());
      }
      while (end - start <= lifeCycle) {
        // Pick a table at random (parentheses fixed here: the cast must apply to the whole
        // product, otherwise (int)Math.random() is always 0 and only the first table is written).
        HTable table = htables.length == 1 ? htables[0] : htables[(int) (Math.random() * htables.length)];
        long s1 = System.currentTimeMillis();
        double r = Math.random();
        HColumnDescriptor[] columnFamilies = table_columnFamilies.get(table);
        Put put = generatePut(threadName, columnFamilies, colNames, operationCount);
        table.put(put);
        if (r > 0.999) {
          System.out.println(System.currentTimeMillis() - s1);
        }
        operationCount++;
        end = System.currentTimeMillis();
      }
      if (conn != null) {
        conn.close();
      }
    } catch (Exception ex) {
      ex.printStackTrace();
    }
    System.out.println(threadName + ": ended with operation count:" + operationCount);
  }

  private Put generatePut(String threadName, HColumnDescriptor[] columnFamilies, String[] colNames, int operationCount) {
    Put put = new Put(Bytes.toBytes(threadName + "_" + operationCount));
    for (int i = 0; i < columnFamilies.length; i++) {
      String familyName = columnFamilies[i].getNameAsString();
      //System.out.println("familyName:"+familyName);
      for (int j = 0; j < colNames.length; j++) {
        if (familyName.equals(colNames[j])) {
          String columnName = familyName + (int) (Math.floor(Math.random() * 5 + 10 * j));
          String val = "" + columnName.hashCode() % 100;
          put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(val));
        }
      }
    }
    //System.out.println(put.toString());
    return put;
  }
}

In short: first create some HBase tables with two column families, then spawn threads that write data for one minute using either the getConnection strategy or the createConnection strategy. How many tables to write, for how long, what to write, and how are all adjustable; for example, I implemented both a fixed-table mode and a random-table mode here. Note the lines that build the table_columnFamilies map, pre-fetching the column-family information of the target tables (isolated in the sketch below). This step matters far more than I initially expected, and I'll expand on it in the next note.
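To isolate that point: HTable.getTableDescriptor() is a remote call, so a sketch like the following (the table name is hypothetical) fetches the column families once, rather than once per put:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class DescriptorCacheDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "10.172.1.16");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    HTable table = new HTable(conf, TableName.valueOf("test0"));
    // One remote call, outside the hot loop -- not one per put.
    HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();

    for (int i = 0; i < 1000; i++) {
      Put put = new Put(Bytes.toBytes("row" + i));
      for (HColumnDescriptor family : families) {
        put.add(family.getName(), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
      }
      table.put(put);
    }
    table.close();
  }
}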

The experiment results are tabulated below (judging from the code above, the result column is the per-thread operation count, operationCount, accumulated over the 60 s run); exact values vary with network jitter and similar factors. Overall, with many threads (more than 30), the getConnection approach is clearly faster than createConnection; with few threads (10 or fewer), createConnection has a slight edge. My guess is that too many createConnection threads overload ZooKeeper, so even with multiple RegionServers handling the actual writes, performance still degrades. Another point worth noting: with the createConnection strategy you must close each connection explicitly, or it will keep holding resources and can even leak memory. My recommendation, then, is to prefer the getConnection approach when building HTable objects through the Java API, and avoid keeping unnecessary connections alive and wasting resources.

thread_count | table_count | conn_strategy | write_strategy | interval | result
1            | 1           | CONF          | /              | 60s      | 10000
5            | 1           | CONF          | /              | 60s      | 11000
10           | 1           | CONF          | /              | 60s      | 12000
30           | 1           | CONF          | /              | 60s      | 8300
60           | 1           | CONF          | /              | 60s      | 6000
100          | 1           | CONF          | /              | 60s      | 4700
1            | 1           | CONN          | /              | 60s      | 12000
5            | 1           | CONN          | /              | 60s      | 16000
10           | 1           | CONN          | /              | 60s      | 10000
30           | 1           | CONN          | /              | 60s      | 2500
60           | 1           | CONN          | /              | 60s      | 1200
100          | 1           | CONN          | /              | 60s      | 1000
5            | 5           | CONF          | SEPERATE       | 60s      | 9600~16000
10           | 10          | CONF          | SEPERATE       | 60s      | 8800~15000
30           | 30          | CONF          | SEPERATE       | 60s      | 6800~7000
60           | 60          | CONF          | SEPERATE       | 60s      | 3500~3800
100          | 100         | CONF          | SEPERATE       | 60s      | 2300~2700
5            | 5           | CONN          | SEPERATE       | 60s      | 14000
10           | 10          | CONN          | SEPERATE       | 60s      | 10000~11000
30           | 30          | CONN          | SEPERATE       | 60s      | 3200~3300
60           | 60          | CONN          | SEPERATE       | 60s      | 1400~1500
100          | 100         | CONN          | SEPERATE       | 60s      | 880~980