HBase 使用与原理总结
window启动: 点击 bin/start-hbase.cmd 即可;
UI: 16010
创建连接:
public static Connection Conn() {
Configuration config = HBaseConfiguration.create();
// 本地测试
config.set(“hbase.zookeeper.quorum”, “127.0.0.1”);
config.set(“hbase.zookeeper.property.clientPort”, “2181”);
config.set(“hbase.master”, “10.110.135.186:8020”);
Connection conn = null;
try {
conn = ConnectionFactory.createConnection(config);
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
解析每个region:
public static void RegionParse(Connection conn, String tableName) {
try {
Admin admin = conn.getAdmin();
List<HRegionInfo> regionInfo = admin.getTableRegions(TableName.valueOf(tableName));
regionInfo.stream().forEach(r -> {
System.out.println(r.getRegionId());
System.out.print(String.format(“\t %s, %s”, new String(r.getStartKey()), new String(r.getEndKey())));
System.out.println();
});
} catch (Exception e) {
}
}
预创建表:
public static void CreateTable(Connection conn, String tableName, byte[][] preRegion, String… cf) {
try {
Admin admin = conn.getAdmin();
// 创建表名
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
for (String c : cf) {
tableDesc.addFamily(new HColumnDescriptor(c));
}
// 如果不存在该表, 则创建表;
if (!admin.tableExists(TableName.valueOf(tableName))) admin.createTable(tableDesc, preRegion);
} catch (Exception e) {
e.printStackTrace();
}
}
写入数据:
public static void PutData(Connection conn, String table, String rowkey, String cf, String colum, String val) {
try {
Table t = conn.getTable(TableName.valueOf(table));
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(colum), Bytes.toBytes(val));
t.put(put);
} catch (Exception e) {
e.printStackTrace();
}
}
部分查找:
public static void PartGet(Connection conn, String tableName, String partRowKey) {
try {
Table t = conn.getTable(TableName.valueOf(tableName));
Scan s = new Scan();
//s.setFilter(new PrefixFilter(partRowKey.getBytes()));
s.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(partRowKey)));
ResultScanner rse = t.getScanner(s);
for (Result rs : rse) {
KeyValue[] kv = rs.raw();
for (int i = 0; i < kv.length; i++) {
System.out.print(new String(kv[i].getRow()) + ” “);
System.out.print(new String(kv[i].getFamily()) + “:”);
System.out.print(new String(kv[i].getQualifier()) + ” “);
System.out.print(kv[i].getTimestamp() + ” “);
System.out.println(new String(kv[i].getValue()));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
获取rowkey的完整结果:
public static Result GetResult(Connection conn, String tableName, String rowKey) {
Result result = null;
try {
Table t = conn.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
result = t.get(get);
} catch (Exception e) {
e.printStackTrace();
}
for (KeyValue k : result.list()) {
System.out.println(Bytes.toString(k.getFamily()));
System.out.println(Bytes.toString(k.getQualifier()));
System.out.println(Bytes.toString(k.getValue()));
System.out.println(k.getTimestamp());
}
return result;
}
byte[][] splitKeys = {“100-raw”.getBytes(),”200-aws”.getBytes(),”300-zzz”.getBytes()};
#### 预分区规则 ####
hash-keys: byte[][] = {000,001,002}
预分区: 000| 001| 002| 将生成 [(+,000), (000,001), (001,002), (002, +)] 4个region;
HBase的RowKey排序是根据自小到大的排序规则,排序的比较方式是使用ASCII码的大小对比;
1-zzs
8-aaa
8-rwa
-> 实际预分区使用:
vin + nio_encoding
预分区:
首先模拟数据量, 比如10亿条数据, 分成50个region, vin: 10位, nio_encoding: 16位, 取MD5Hash进行处理; -> 10-vin10-nio_encoding16.bytes()
真实数据:
对于各partition形成的分区数据, 使用hash()取到其hash值, 拼接成 10-vin10-nio_encoding16.bytes() rowkey即可;
部分查找:
Scan s = new Scan();
//s.setFilter(new PrefixFilter(partRowKey.getBytes()));
s.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator(partRowKey))); //.*-vin10-nio_encoding16
bug解决:
需要配置hadoop环境等resources文件;
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
java.io.IOException: java.lang.reflect.InvocationTargetException
包冲突、有重复包或者缺少包
找到最下面的casue by, 指向的就是问题的原因。
1. 随机读写系统
分区 share, 在传入数据前必须提前划分好数据的存储范围, 如何超过容量就会 reshare, 重分区相当于数据重做.
2.数据结构:
row-key column+cell
row-1 column=cf1, timestamp
定义列簇时, 一个列簇包含相同数据结构的列, 存储在一个HFile; 列簇不能修改,且数据有限, 列可以很多。
SortedMap<
RowKey,SortedMap<
Column,SortedMap<
Value, Cell
>
>
>
3. 分区
rowkey, 切成多个region, 每台服务器建议10-1000个region, 每个1G;
HFile 由多个64kb的存储块组成, 块索引加载到内存, 进行二分查找. 每次数据更新时, 数据记录在 commit log预写日志中, 然后写入内存, 内存满了, 落到磁盘; HFile归并合并。
4. 执行
hbase shell
create \’t1\’, \’f1\’, \’f2\’, \’f3\’; //创建表以及列簇
list \’t1\’
put \’t1\’, \’r1\’, \’f1:c1\’, \’value\’ //添加rowkey, 为f1列簇 添加新列 c1, 其值v1
scan \’testtable\’
get \’testtable\’, \’row-1\’
delete, disable, drop
5. 行键设计
HBase数据分割主要使用列簇, 高表要比宽表效率更高, 将列添加进rowkey, put \’testtable\’ , \’12312-messageId\’, \’cf1:message\’
在设计rowke时, 每个属性的长度必须一致, 这样才能存储 以及 使用 部分键扫描,
按日期降序排序, \’row-1-(Long.Max-date)\’, 注意, 经常需要更新的字段 不适合作为rowkey.
时间序列数据, 以数据产生的时间戳为特点, 为造成数据热点问题, 使用salt操作, byte prefix = (byte)Long.hashcode(time) % region server rowkey = Bytes.add(Bytes.toByte(prefix),time)
这样使用部分键扫描时, 会在不同服务器上 并发获取;
另一种是完全随机化, 适用于不需要查询的操作; rowkey = MD5(time)
辅助索引, 相当于带条件的过滤SQL, when … >= …
介绍下hbase的过滤器
对读取的hbase中的数据进行过滤
过滤器可以在hbase中表的多个维度(行,列,数据版本)上进行对数据的筛选操作
可以借助hbase的过滤器精确到某一个版本的cell
之前使用最简单的过滤器:
Scan scan = new Scan();
// set start and stop rowkey
scan.setStartRow(Bytes.toBytes(“10003”));
scan.setStopRow(Bytes.toBytes(“10005”));
// set column family or cloumn
scan.addColumn(Bytes.toBytes(“info”), Bytes.toBytes(“name”));
scan.addColumn(Bytes.toBytes(“info”), Bytes.toBytes(“age”));
创建各种条件的过滤器实例:
1、筛选出具有特定前缀的行键的数据
Filter pf = new PrefixFilter(Bytes.toBytes(“392”));
scan.setFilter(pf);
2、只返回每行的rowkey列簇和列,不返回值
Filter kof = new KeyOnlyFilter();
scan.setFilter(kof);
3、本过滤器的作用就是按照一定的几率(<=0会过滤掉所有的行,>=1会包含所有的行)来返回随机的结果集
Filter rrf = new RandomRowFilter((float) 0.1);
scan.setFilter(rrf);
4、只想返回的结果集中只包含第一列的数据
Filter fkof = new FirstKeyOnlyFilter();
scan.setFilter(fkof);
5、ColumnPrefixFilter:顾名思义,它是按照列名的前缀来筛选单元格的
Filter cpf = new ColumnPrefixFilter(Bytes.toBytes(“product”));
scan.setFilter(cpf);
6、组合使用
Filter pf = new PrefixFilter(Bytes.toBytes(“392”));
Filter kof = new KeyOnlyFilter();
Filter cpf = new ColumnPrefixFilter(Bytes.toBytes(“product”));
List<Filter> filters = new ArrayList<Filter>();
filters.add(pf);
filters.add(kof);
filters.add(cpf);
scan.setFilter(new FilterList(Operator.MUST_PASS_ALL,filters));
FilterList是一个可以构造包含多个Filter对象的集合
第一个参数:Operator.MUST_PASS_ALL
Hbase提供了枚举类型的变量来表示这些抽象的操作符
整体表示综合使用多种过滤器
第二个参数:Filter类型的List集合
Hbase的底层架构
Master
通常是Hadoop里面的一台或者两台服务器[HA],保证master高可用
管理并分配region给regionserver,对集群的region数量进行负载均衡分配
Master会借助zookeeper感知regionserver的上下线,某台宕机后会重新分配这台服务器上的region
zookeeper作为观察者模式,regionserver会在zookeeper上有注册缓存信息,缓存消失则判断宕机
Master不参与HBase表数据的读写,负载通常很低
Regionserver
服务器通常是Hadoop集群里面的一部分/全部服务器,在conf/regionservers内指定
管理当前服务器上的所有region
响应客户端的数据读写IO请求
处理Flush、compact、split操作
zookeeper
通常是另外几台独立服务器的集群
基于观察者模式监控master和regionserver的状态,保证hbase的可用性
存储了client访问HBase的寻址入口
存储了meta表的元数据信息,即元数据表meta的位置信息
一般meta表只有一个region,存储在某个regionserver上
meta表存储的信息
如一张表有哪些region
region分配在哪些regionserver上
每个region的startkey和endkey
Hbase表数据的读写流程
HBase读数据流程: –根据rowkey查询
1、client先去访问zookeeper,从zookeeper里面获取meta表所在位置信息
以前的版本除了meta表还有一个root表,root表存储meta表位置信息,现在直接存入zookeeper中
2、client向meta所在regionserver发起访问,读取meta表数据,获取hbase集群上所有user表的元数据
3、根据meta表中的region划分及分配的信息,client找到了当前需要访问的数据对应的region及所在的regionserver服务器
4、client向对应regionserver服务器发起读请求
5、regionserver收到client访问请求,先扫描memstore,在扫描blockcache,最后再读取storefile[HDFS]文件
6、regionserver把数据响应给client
HBase写数据流程: –根据rowkey写入
1、client先去访问zookeeper,从zookeeper里面获取meta表所在位置信息
以前的版本除了meta表还有一个root表,root表存储meta表位置信息,现在直接存入zookeeper中
2、client向meta所在regionserver发起访问,读取meta表数据,获取hbase集群上所有user表的元数据
3、根据meta表中的region划分及分配的信息,client找到了当前需要访问的数据对应的region及所在的regionserver服务器
4、client向对应regionserver服务器发起写请求
5、regionserver收到client请求,并响应,client先把数据写入HLog,防止数据丢失
6、再把数据写入memstore内存缓存区,默认128M
7、当Hlog和memstore都写入成功,则这条数据写入成功
8、当memstore达到阀值[128M],会把memstore里面的数据Flush成storefile
9、当[128M]storefile越来越多,会触发compact合并操作,把多storefile合并成一个大的storefile
10、当单个storefiles[region]越来越大,达到阀值10G时会触发split操作,region被一分为二被管理