
Hbase是一种NoSQL数据库,这意味着它不像传统的RDBMS数据库那样支持SQL作为查询语言。Hbase是一种分布式存储的数据库,技术上来讲,它更像是分布式存储而不是分布式数据库,它缺少很多RDBMS系统的特性,比如列类型,辅助索引,触发器,和高级查询语言等待
那Hbase有什么特性呢?如下:
强读写一致,但是不是“最终一致性”的数据存储,这使得它非常适合高速的计算聚合
自动分片,通过Region分散在集群中,当行数增长的时候,Region也会自动的切分和再分配
自动的故障转移
Hadoop/HDFS集成,和HDFS开箱即用,不用太麻烦的衔接
丰富的“简洁,高效”API,Thrift/REST API,Java API
块缓存,布隆过滤器,可以高效的列查询优化
*** 作管理,Hbase提供了内置的web界面来 *** 作,还可以监控JMX指标
什么时候用Hbase?
Hbase不适合解决所有的问题:
首先数据库量要足够多,如果有十亿及百亿行数据,那么Hbase是一个很好的选项,如果只有几百万行甚至不到的数据量,RDBMS是一个很好的选择。因为数据量小的话,真正能工作的机器量少,剩余的机器都处于空闲的状态
其次,如果你不需要辅助索引,静态类型的列,事务等特性,一个已经用RDBMS的系统想要切换到Hbase,则需要重新设计系统。
最后,保证硬件资源足够,每个HDFS集群在少于5个节点的时候,都不能表现的很好。因为HDFS默认的复制数量是3,再加上一个NameNode。
Hbase在单机环境也能运行,但是请在开发环境的时候使用。
内部应用
存储业务数据:车辆GPS信息,司机点位信息,用户 *** 作信息,设备访问信息。。。
存储日志数据:架构监控数据(登录日志,中间件访问日志,推送日志,短信邮件发送记录。。。),业务 *** 作日志信息
存储业务附件:UDFS系统存储图像,视频,文档等附件信息
不过在公司使用的时候,一般不使用原生的Hbase API,使用原生的API会导致访问不可监控,影响系统稳定性,以致于版本升级的不可控。
HFile
HFile是Hbase在HDFS中存储数据的格式,它包含多层的索引,这样在Hbase检索数据的时候就不用完全的加载整个文件。索引的大小(keys的大小,数据量的大小)影响block的大小,在大数据集的情况下,block的大小设置为每个RegionServer 1GB也是常见的。
探讨数据库的数据存储方式,其实就是探讨数据如何在磁盘上进行有效的组织。因为我们通常以如何高效读取和消费数据为目的,而不是数据存储本身。
Hfile生成方式
起初,HFile中并没有任何Block,数据还存在于MemStore中。
Flush发生时,创建HFile Writer,第一个空的Data Block出现,初始化后的Data Block中为Header部分预留了空间,Header部分用来存放一个Data Block的元数据信息。
而后,位于MemStore中的KeyValues被一个个append到位于内存中的第一个Data Block中:
注:如果配置了Data Block Encoding,则会在Append KeyValue的时候进行同步编码,编码后的数据不再是单纯的KeyValue模式。Data Block Encoding是HBase为了降低KeyValue结构性膨胀而提供的内部编码机制。
Region内每个ColumnFamily的数据组成一个Store。每个Store内包括一个MemStore和若干个StoreFile(HFile)组成。
HBase为了方便按照RowKey进行检索,要求HFile中数据都按照RowKey进行排序,Memstore数据在flush为HFile之前会进行一次排序
为了减少flush过程对读写的影响,HBase采用了类似于两阶段提交的方式,将整个flush过程分为三个阶段:
要避免“写阻塞”,貌似让Flush *** 作尽量的早于达到触发“写 *** 作”的阈值为宜。但是,这将导致频繁的Flush *** 作,而由此带来的后果便是读性能下降以及额外的负载。
每次的Memstore Flush都会为每个CF创建一个HFile。频繁的Flush就会创建大量的HFile。这样HBase在检索的时候,就不得不读取大量的HFile,读性能会受很大影响。
为预防打开过多HFile及避免读性能恶化,HBase有专门的HFile合并处理(HFile Compaction Process)。HBase会周期性的合并数个小HFile为一个大的HFile。明显的,有Memstore Flush产生的HFile越多,集群系统就要做更多的合并 *** 作(额外负载)。更糟糕的是:Compaction处理是跟集群上的其他请求并行进行的。当HBase不能够跟上Compaction的时候(同样有阈值设置项),会在RS上出现“写阻塞”。像上面说到的,这是最最不希望的。
提示 :严重关切RS上Compaction Queue 的size。要在其引起问题前,阻止其持续增大。
想了解更多HFile 创建和合并,可参看 Visualizing HBase Flushes And Compactions 。
理想情况下,在不超过hbaseregionserverglobalmemstoreupperLimit的情况下,Memstore应该尽可能多的使用内存(配置给Memstore部分的,而不是真个Heap的)。下图展示了一张“较好”的情况:
hbase使用的是jdk提供的ConcurrentSkipListMap,并对其进行了的封装,Map结构是<KeyValue,KeyValue>的形式。Concurrent表示线程安全。
SkipList是一种高效的数据结构,之前专门写过文章,这里就不表了
写入MemStore中的KV,被记录在kvset中。根据JVM内存的垃圾回收策略,在如下条件会触发Full GC。 1、内存满或者触发阈值。 2、内存碎片过多,造成新的分配找不到合适的内存空间。 RS上服务多个Region,如果不对KV的分配空间进行控制的话,由于访问的无序性以及KV长度的不同,每个Region上的KV会无规律地分散在内存上。Region执行了MemStore的Flush *** 作,再经过JVM GC之后就会出现零散的内存碎片现象,而进一步数据大量写入,就会触发Full-GC。
为了解决因为内存碎片造成的Full-GC的现象,RegionServer引入了MSLAB(HBASE-3455)。MSLAB全称是MemStore-Local Allocation Buffers。它通过预先分配连续的内存块,把零散的内存申请合并,有效改善了过多内存碎片导致的Full GC问题。 MSLAB的工作原理如下: 1、在MemStore初始化时,创建MemStoreLAB对象allocator。 2、创建一个2M大小的Chunk数组,偏移量起始设置为0。Chunk的大小可以通过参数hbasehregionmemstoremslabchunksize调整。 3、 当MemStore有KeyValue加入时,maybeCloneWithAllocator(KeyValue)函数调用allocator为其查找KeyValuegetBuffer()大小的空间,若KeyValue的大小低于默认的256K,会尝试在当前Chunk下查找空间,如果空间不够,MemStoreLAB重新申请新的Chunk。选中Chunk之后,会修改offset=原偏移量+KeyValuegetBuffer()length。chunk内控制每个KeyValue大小由hbasehregionmemstoremslabmaxallocation配置。 4、 空间检查通过的KeyValue,会拷贝到Chunk的数据块中。此时,原KeyValue由于不再被MemStore引用,会在接下来的JVM的Minor GC被清理。
MSLAB解决了因为碎片造成Full GC的问题,然而在MemStore被Flush到文件系统时,没有reference的chunk,需要GC来进行回收,因此,在更新 *** 作频繁发生时,会造成较多的Young GC。 针对该问题,HBASE-8163提出了MemStoreChunkPool的解决方案,方案已经被HBase-095版本接收。它的实现思路: 1、 创建chunk池来管理没有被引用的chunk,不再依靠JVM的GC回收。 2、 当一个chunk没有引用时,会被放入chunk池。 3、chunk池设置阈值,如果超过了,则会放弃放入新的chunk到chunk池。 4、 如果当需要新的chunk时,首先从chunk池中获取。 根据patch的测试显示,配置MemStoreChunkPool之后,YGC降低了40%,写性能有5%的提升。如果是095以下版本的用户,可以参考HBASE-8163给出patch。
先看一个标准的hbase作为数据读取源和输出目标的样例:
Configuration conf = HBaseConfigurationcreate();
Job job = new Job(conf, "job name ");
jobsetJarByClass(testclass);
Scan scan = new Scan();
TableMapReduceUtilinitTableMapperJob(inputTable, scan, mapperclass, Writableclass, Writableclass, job);
TableMapReduceUtilinitTableReducerJob(outputTable, reducerclass, job);
jobwaitForCompletion(true);
和普通的mr程序不同的是,不再用jobsetMapperClass()和jobsetReducerClass()来设置mapper和reducer,而用TableMapReduceUtil的initTableMapperJob和initTableReducerJob方法来实现。此处的TableMapReduceUtil是hadoophbasemapreduce包中的,而不是hadoophbasemapred包中的。
数据输入源是hbase的inputTable表,执行mapperclass进行map过程,输出的key/value类型是 ImmutableBytesWritable和Put类型,最后一个参数是作业对象。需要指出的是需要声明一个扫描读入对象scan,进行表扫描读取数据用,其中scan可以配置参数。
数据输出目标是hbase的outputTable表,输出执行的reduce过程是reducerclass类, *** 作的作业目标是job。与map比缺少输出类型的标注,因为他们不是必要的,看过源代码就知道mapreduce的TableRecordWriter中write(key,value) 方法中,key值是没有用到的,value只能是Put或者Delete两种类型,write方法会自行判断并不用用户指明。
mapper类从hbase读取数据,所以输入的
public class mapper extends TableMapper<KEYOUT, VALUEOUT> {
public void map(Writable key, Writable value, Context context)
throws IOException, InterruptedException {
//mapper逻辑
contextwrite(key, value);
}
}
mapper继承的是TableMapper类,后边跟的两个泛型参数指定mapper输出的数据类型,该类型必须继承自Writable类,例如可能用到的put和delete就可以。需要注意的是要和initTableMapperJob 方法指定的数据类型一致。该过程会自动从指定hbase表内一行一行读取数据进行处理。
reducer类将数据写入hbase,所以输出的
public class reducer extends TableReducer<KEYIN, VALUEIN, KEYOUT> {
public void reduce(Text key, Iterable<VALUEIN> values, Context context)
throws IOException, InterruptedException {
//reducer逻辑
contextwrite(null, put or delete);
}
}
reducer继承的是TableReducer类,后边指定三个泛型参数,前两个必须对应map过程的输出key/value类型,第三个是 The type of the output key,write的时候可以把key写成IntWritable什么的都行,它是不必要的。这样reducer输出的数据会自动插入outputTable指定的表内。
TableMapper和TableReducer的本质就是为了简化一下书写代码,因为传入的4个泛型参数里都会有固定的参数类型,所以是Mapper和Reducer的简化版本,本质他们没有任何区别。源码如下:
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
}
封装了一层确实方便多了,但也多了很多局限性,就不能在map里写hbase吗?
我他么试了一下午,约5个小时,就想在map里读hdfs写hbase,莫名其妙的各种问题,逻辑上应该没有错,跟着别人的文章做的。最后还是通过IdentityTableReducer这个类实现了,what's a fucking afternoon!
官方对IdentityTableReducer的说明是:Convenience class that simply writes all values (which must be Put or Delete instances) passed to it out to the configured HBase table
这是一个工具类,将map输出的value(只能是Put或Delete)pass给HBase。看例子:
import javaioIOException;
import javautilStringTokenizer;
import orgapachehadoopconfConfiguration;
import orgapachehadoopfsPath;
import orgapachehadoopioIntWritable;
import orgapachehadoopioText;
import orgapachehadoopmapreduceJob;
import orgapachehadoopmapreduceMapper;
import orgapachehadoopmapreducelibinputFileInputFormat;
import orgapachehadooputilGenericOptionsParser;
import orgapachehadoophbaseHBaseConfiguration;
import orgapachehadoophbaseclientPut;
import orgapachehadoophbasemapreduceTableMapReduceUtil;
import orgapachehadoophbasemapreduceIdentityTableReducer;
public class WordCount
{
public static class TokenizerMapper
extends Mapper<Object, Text, Text, Put>
{
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(valuetoString());
while (itrhasMoreTokens())
{
wordset(itrnextToken());
Put putrow = new Put(wordtoString()getBytes());
putrowadd("info"getBytes(), "name"getBytes(),"iamvalue"getBytes());
contextwrite(word, putrow);
}
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = HBaseConfigurationcreate();
String[] otherArgs = new GenericOptionsParser(conf, args)getRemainingArgs();
Job job = new Job(conf, "hdfs to hbase");
jobsetJarByClass(WordCountclass);
jobsetMapperClass(TokenizerMapperclass);
jobsetMapOutputKeyClass(Textclass);
jobsetMapOutputValueClass(Putclass);//important
FileInputFormataddInputPath(job, new Path(otherArgs[0]));
TableMapReduceUtilinitTableReducerJob("test", IdentityTableReducerclass, job);
jobsetNumReduceTasks(0);
Systemexit(jobwaitForCompletion(true) 0 : 1);
}
}
无论是什么方法吧,总算可以运行了!
MapReduce和HBase结合,似乎是这样一种框架:map读HBase,reduce写HBase。使用IdentityTableReducer就是处于这样一种框架之内。
运行 *** 作HBase的MapReduce程序的第2种方式:HADOOP_CLASSPATH的设置
在我《HBase *** 作》一文中提到了运行 *** 作HBase的MapReduce程序的两种方式,现在说明下另一种方式。
打开hadoop/etc/hadoop/hadoop-envsh,在设置HADOOP_CLASSPATH的后面添加下面的语句,即将hbase的jar包导入:
for f in /home/laxe/apple/hbase/lib/jar; do
if [ "$HADOOP_CLASSPATH" ]; then
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f
else
export HADOOP_CLASSPATH=$f
fi
done
然后就可以用最初的运行MapReduce的方式来运行了。
1、通过Configuration初始化集群Connection
11、Connction维持了客户端到整个HBase集群的链接,如一个HBase集群有2个Master、5个RegionServer,那么一般来说整个Connection会维持一个到Active Master的TCP连接和5个到ReginonServer的TCP链接。
12、通常一个进程只需要为一个独立的集群建立一个Connection即可,并不需要建立连接池。
13、Connection还缓存了访问的Meta信息,后续的大部分请求都可以通过缓存的Meta信息定位到对应的Region Server。
2、通过Connection初始化Table
21、Table是一个非常轻量级的对象,它所使用的连接资源、配置信息、线程池、Meta缓存等都来自于Connection。
22、由同一个Connection创建的多个Table,都会共享连接、配置信息、线程池、Meta缓存这些资源
23、在branch-1以及之前的版本中,Table并不是线程安全的类,所以不建议在多个线程中使用同一个Table实例。在HBase 200及之后,Table已经实现了线程安全。
24、由于Table是一个非常轻量级的对象,所以可以通过Connection为每个请求创建一个Table,但是记住,在该请求执行完毕之后需要关闭Table资源。
3、hbase:meta
31、hbase:meta用来保存整个集群的region信息
32、hbase:meta在HBase中保证始终只有一个Region,这是为了确保meta表多次 *** 作的原子性,因为HBase本质上只支持Region级别的事务<所谓Region级别的事务是指:当多个 *** 作落在同一个Region内时,HBase能保证这一批 *** 作执行的原子性。如果多个 *** 作分散在不同的Region,则无法保证这批 *** 作的原子性>
33、hbase:meta的一行就对应一个Region 它的rowkey主要由TableName、StartRow、TimeStamp、EncodeName、标识这个Region是属于哪个表,表Rowkey的起始行以及Region的创建时间戳
34、hbase:meta只有一个列簇<info>他有4列,info:regioninfo、info:seqnumDuringOpen、info:server、info:serverstartcode,表示这个表rowkey的起始位置,region落在哪个RegionServer上以及所在RegionServer的启动时间戳
4、HBase超时参数设置
hbaserpctimeout:表示单次RPC请求的超时时间,默认是60 000ms。
hbaseclinetretriesnumber:最多允许发生多少次RPC重试 *** 作默认是35次。
hbaseclinetpause:表示连续两次RPC重试之间的休眠时间,默认是100ms。重试休眠时间是按照随机退避算法设计的。也就是重试次数越多,休眠间隔时间就会越来越长。按照默认的重试次数35,则可能长期卡在休眠和重试两个步骤中
hbaseclinetoperationtimeout:表示单次API的超时时间,默认值为1 200 000ms一次API可能会有多次RPC重试,这个参数是API *** 作的总超时。
5、CAS<checkAndPut、inCrementColumnValue> *** 作是Region级别串行执行的,吞吐受限,在HBase 2x版本已调整设计,对于同一个Region内部的不同行可以并行执行CAS,这样大大提交了Region内部的CAS吞吐
6、Filter使用避坑指南
61、PrefixFilter 前缀过滤
低效使用方式:
Scan scan = new Scan();
scansetFilter(new PrefixFillter(BytestoBytes("def")));
这个Scan虽然能得到预期的效果,但是并不高效,因为对于rowKey在区间(-∞,def)的数据,会一条条扫描,发现前缀不为def,就读下一行,直到找到第一个rowkey为def的行为止
高效使用方式:
Scan scan = new Scan();
scansetStartRow(BytestoBytes("def"));
scansetFilter(new PrefixFillter(BytestoBytes("def")));
增加了一个startRow。RegionServer发现Scan设置了startRow,首先会寻址定位到startRow。这样就跳过了大量的(-∞,def)的数据。
最高效的使用方式:
Scan scan = new Scan();
scansetStartRow(BytestoBytes("def"));
scansetStopRow(BytestoBytes("deg"));
将PrefixFilter直接展开,扫描[def,deg)区间的数据,这样效率是最高的。
62、PageFilter:表有5个Region起始key为(-∞,1)、[1,2)、[2,3)、[3,4)、[4,+∞)每个Region 都有超过100条数据
错误的使用方式:
Scan scan = new Scan();
scansetStartRow(BytestoBytes("1"));
scansetStopRow(BytestoBytes("3"));
scansetFilter(new PageFilter(100))
这样写得出来的分页每页数据就会有200 条。但是明明设置了分页每页条数是100。原因是,它需要scan 2个Regionscan从一个region切换到另一个region之前的那个Filter的内部状态就无效了,新的region内部用的是一个全新的FilterFilter计数器被清零。Filter不是全局的, 所以它分别从2个region各查了100 条,总共200 条返回。
正确的使用方式:
如果想实现分页功能,可以不通过Filter而直接通过limit来实现。
Scan scan = new Scan();
scansetStartRow(BytestoBytes("1"));
scansetStopRow(BytestoBytes("3"));
scansetLimit(100);
所以对于用户来说,正常情况下PageFilter并没有太多的存在价值
63、SingleColumnValueFilter
使用方式:
Scan scan = new Scan();
SingleColumnValueFilter scvf = new SingleColumnValueFilter(BytestoBytes("family"),BytestoBytes("qualifier"),
CompareOpEQUAL,BytestoBytes("value"));
scansetFilter(scvf);
表面上是将列簇为family,列为qualifier,值为value的cell返回给用户,但事实上那些不包含family:qualifier的行也会默认返回给用户,如果用户不希望读取那些不包含family:qualifier的数据,需要设计如下scan
Scan scan = new Scan();
SingleColumnValueFilter scvf = new SingleColumnValueFilter(BytestoBytes("family"),BytestoBytes("qualifier"),
CompareOpEQUAL,BytestoBytes("value"));
scvfsetFiterIfMisssing(true);
scansetFilter(scvf);
另外当SingleColumnValueFilter设置为filterIfMisssing为true时,和其他Filter组合成FilterList时可能导致返回的结果不正确。建议是不要使用SingleColumnValueFilter与其他Filter组合成FilterList。 直接指定列,通过ValueFilter替换掉SingleColumnValueFilter
Scan scan = new Scan();
ValueFilter vf = new ValueFilter(CompareOfEQUAL,new BinaryComparatoe(BytestoBytes("value")));
scanaddColum(BytestoBytes("family"),BytestoBytes("qualifier"));
scansetFilter(vf);
7、HBase写入方式对比
71、tableput(Put):
每次执行都会执行一次RPC和磁盘持久化,写入吞吐受限于磁盘带宽、网络带宽,不会有数据丢失能保证put *** 作的原子性。
72、tableput(List<Put>):
客户端打包一批put提交,执行一次RPC,一次WAL。相比第一种省略了多次往返的RPC和磁盘持久化。但是时间会变长。如果打包的put分布在多个Region。则不能保证这一批put的原子性,应为HBase不支持跨Region的多行事务,失败的put会经历若干次重试。
73、bulk load:
将待写入的数据生成HFile,然后采用bulk load方式将HFile直接加载到对于的Region的CF内。这是一种完全离线的快速写入方式。它应该是最快的批量写入手段,同时不会对线上的集群产生巨大压力,在load完HFile之后,CF内部会进行Compaction,但是Compaction是异步的且可以限速,所以bulk load对线上集群非常友好。
使用场景举例:
731、两个集群互为主备,其中一个集群由存在数据丢失,想通过另一备份集群的数据来修复异常集群。最快的方式是:把备份集群的数据导一个快照拷贝到异常集群,然后通过copyTable工具扫快照生成HFile,然后bulk load 到异常集群,完成数据的修复。
732、当用户写入大量数据后,发现选择的split keys不合适,想重新选择split keys见表,这时也可以通过 snapshort生成HFile再bulk load的方式生成新表。
hbase的核心数据结构为LSM树。
LSM树分为内存部分和磁盘部分。
内存部分是一个维护有序数据集合的数据结构。一般来讲,内存数据结构可以选择平衡二叉树、红黑树、跳跃表(SkipList)等维护有序集的数据结构,由于考虑并发性能,HBase选择了表现更优秀的跳跃表。
磁盘部分是由一个个独立的文件组成,每一个文件又是由一个个数据块组成。对于数据存储在磁盘上的数据库系统来说,磁盘寻道以及数据读取都是非常耗时的 *** 作(简称IO耗时)。为了避免不必要的IO耗时,可以在磁盘中存储一些额外的二进制数据,这些数据用来判断对于给定的key是否有可能存储在这个数据块中,这个数据结构称为布隆过滤器(BloomFilter)。
LSM树介绍:
LSM树是一种磁盘数据的索引结构。LSM树的索引对写入请求更友好。因为无论是何种写入请求,LSM树都会将写入 *** 作处理为一次顺序写,而HDFS擅长的正是顺序写(且HDFS不支持随机写)。
一个LSM树的索引内存部分是一个ConcurrentSkipListMap,Key是rowkey、column family、qualifier、type以及timestamp, Value是字节数组。随着数据不断写入MemStore,一旦内存超过阈值会将数据flush到磁盘,生产HFile;多个小HFile文件会compact成一个大HFile。
以上就是关于HBase是什么呢,都有哪些特点呢全部的内容,包括:HBase是什么呢,都有哪些特点呢、深入理解HBASE(3.4)RegionServer-Memstore、如何用MapReduce程序 *** 作hbase等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)