你的位置:首页 > Java教程

[Java教程]实现Hbase的分页


做一个项目中由于数据量比较大,并且需要定时增量分析,做了hbase的分页。项目中用到的版本是hbase1.1 。需要启用协处理器 Aggregation

1.启动全局aggregation,能过操纵所有的表上的数据。通过修改hbase-site.

<property>  <name>hbase.coprocessor.user.region.classes</name>  <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value> </property>

2.启用表aggregation,只对特定的表生效。通过HBase Shell 来实现。

(1)disable指定表。hbase> disable 'mytable'

(2)添加aggregation hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'

(3)重启指定表 hbase> enable 'mytable'

 

Hbase客户端调用代码示例

    1、 得到hbase的表结构总数

 public int getTotalRecord(Table keyIndexTable , String nowTime){
     int count=0;
       AggregationClient aggregationClient = new AggregationClient(config);
    Scan scan=new Scan();
    scan.setStopRow(nowTime.getBytes());//小于当前时间
    try {
     Long rowCount = aggregationClient.rowCount(keyIndexTable, new LongColumnInterpreter(), scan);
     aggregationClient.close();
     count=rowCount.intValue();
    } catch (Throwable e) {
     e.printStackTrace();
    }
    return count;
    }

2 ,实现分页

 

 public Map<String,String> getIndexTableInfo(Table table,String tableName, String nowTime,String startRow, Integer currentPage, Integer pageSize){      Map<String,String> communtiyKeysMap=new TreeMap<String,String>();      ResultScanner scanner = null;      // 为分页创建的封装类对象,下面有给出具体属性      try {        // 获取最大返回结果数量        if (pageSize == null || pageSize == 0L)          pageSize = 100;        if (currentPage == null || currentPage == 0)          currentPage = 1;        // 计算起始页和结束页        Integer nowPageSize=pageSize+1;        // MUST_PASS_ALL(条件 AND) MUST_PASS_ONE(条件OR)        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);        Filter filter1=new PageFilter(nowPageSize);        filterList.addFilter(filter1);//        if(tableName.equals("COMMUNITY_KEYS_INDEX")){//        Filter filter2 = new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("communitykey")));//        filterList.addFilter(filter2);//        }        Scan scan = new Scan();        scan.setFilter(filterList);        scan.setMaxResultSize(nowPageSize);        scan.setStartRow(Bytes.toBytes(startRow));        if(!nowTime.equals("")){        scan.setStopRow(nowTime.getBytes());        }        scanner = table.getScanner(scan);        int i = 1;        // 遍历扫描器对象, 并将需要查询出来的数据row key取出        for (Result result : scanner) {           String row=new String(result.getRow());            for (Cell cell : result.rawCells()) {//              System.out.println("列族:"+new String(CellUtil.cloneQualifier(cell))+">>>"+new String(CellUtil.cloneValue(cell)));              if(i==nowPageSize){              communtiyKeysMap.put("nextStart", row.substring(0,row.lastIndexOf(":")));              break;              }              communtiyKeysMap.put(row, new String(CellUtil.cloneValue(cell)));            }        i++;        }      } catch (IOException e) {        e.printStackTrace();      } finally {        if (scanner != null)          scanner.close();      }      return communtiyKeysMap;      }


3,该分页中处理和跳转下一页

for(int page=1;page<=pageNum;page++){          //分页         List<String> pageList = new ArrayList<String>();  //子类调用具体分析      //1.查出要分析的数据      Map<String,String> communtiyKeysMap=getIndexTableInfo(hTable,hbaseIndexTabel,nowTime,startRow,page,pageSize);       for(String communitykey:communtiyKeysMap.keySet()){        String rowKeyIndex=communitykey;               String cellValue=communtiyKeysMap.get(rowKeyIndex);        if(communitykey.equals("nextStart")){          startRow=cellValue;          continue;            //下一页进行跳转        }       }             //实现调用具体的分析       //实现该分页处理} 

该过程总共为三步,1.设置表的协处理器 Aggregation,使表能够实现统计功能。2.分页,每次取出1001条数据,每页数据为1000条,第1001条的rowkey为下一页的startrowkey,做为标志“nextStart” 。3分页之后进行查找关联数据和进行逻辑分析处理。