HBase(0.94.5)的Compact和Split源码分析
经过对比,0.94。5以后版本主要过程基本类似(有些新功能和细节增加)
一、 Compact
2.1. Compact主要来源
来自四个方面:1、Memstoreflush时;2、HRegionServer定期做Compaction Checker时;3、HBaseAdmin客户端发起的请求;4、CompactTool发起。
1) MemstoreFlush在flushRegion方法中有相关处理,整个方法主要逻辑:
A 对一个flush请求,判断不是META表且文件很多,则
1) 如果该请求等待了最长时间,则打印日志(Waited — ms on a compaction to clean up \’too many store files\’;waited long enough… proceeding with flush of),然后进行flush。
2) 如果是第一次从队列出列(getRequeueCount<=0),则打印日志(RegionXXX has too many store files; delaying flush up to XXX ms)。然后先看split请求是否成功,如果不能,就发送compact请求(compactSplitThread.requestCompaction)。并把flush的请求放入队列(等待时间为原1/100)
B 最终通过flushCache方法flush,然后checkSplit,若要split则requestSplit,否则如果flushCache的返回值是true,则requestCompact。
2) HRegionServer做CompactionChecker时,逻辑比较简单,首先检查文件数,看是否需要compact,否则检查store的isMajorCompaction,然后根据配置的优先级,进行判断来决定按最高的优先级进行major。(只要大于0,值越低,优先级越高,1是user-最高。如果小于等于0,则是storefile太多,大于region的文件数阻塞阀值,所以会阻塞region的写)
细节:store的isMajorCompaction。该方法首先检查major的周期是否到了,未到,就返回false;若到了,一种情况是不只一个文件,则打印日志(Major compaction triggered on store SSS; time since last majorcompaction MMM ms)返回true。另外一种情况只有一个文件,如果文件是major生成且ttl设置为有限,而上次紧缩的时间还没有到ttl这么长,就打印日志(Skipping major compaction of SSSS because one (major) compacted fileonly and oldestTime xx ms is < ttl=),返回false;否则只要ttl设置且超时了,打印日志(Majorcompaction triggered on store SSS, because keyvalues outdated; time since lastmajor compaction MMM ms)返回true。
3) HBaseAdmin通过接口compact发送请求,到具体的RegionServer,RegionServer直接向compactSplitThread发送requestCompaction。
4) CompactTool直接通过hdfs路径生成Region实例,创建Mock的Store,然后通过Store的compact方法进行。该方法类应在Region不提供服务的情况进行。
5) Major Compact的时机:
(1) 需紧缩的文件数等于Store的所有文件数,自动升级为主紧缩
filesToCompact.getFilesToCompact().size() ==this.storefiles.size()
(2) TTL超时(ttl设置情况下)
需紧缩的StoreFile中,有StoreFile的内容已经过期(检查修改时间最早的StoreFile即可)
Ø 系统的主紧缩时间
Ø Family设定的主紧缩时间
Ø ttl不是永久
Ø 对于单个StoreFile,有标记:isMajorCompaction表明是否由主紧缩生成。有最小和最大时间戳记录
(3) 手动,强制:forcemajor && priority == PRIORITY_USER
2.2. Compact过程:
1) 首先,调用Store.requestCompaction,该方法返回CompactRequest。具体处理如下:
2) 第一步,将所有StoreFile作为compact候选者,去除正在compact的文件(策略:所有比正在compact队列里面最新文件要老的都排除)。
3) 第二步,候选者由coprocessor处理,如果coprocessor覆盖了策略,以coprocessor的结果为准,返回结果。
否则第三步,采用系统算法选取(compactSelection方法,其中两个关键点,就是会直接删除过期的storefile,还有根据配置的最多最少文件进行选取,另开专题详述)。
第四步,将候选者放入正在compact的列表。最后判断,如果此次要compact该region的所有文件,则升级为major。设置priority后返回。
该步全程对fileCompacting对象加锁。
1. 接着,设置compactRequest的server为当前的regionserver,如果传入了priority参数,则设置为传入的值,然后调用Store的throttleCompact找到是large还是small的线程池。
2. 通过线程池,异步执行compact。Compact的过程如下:
执行CompactRequest的run方法:
第一步,调用Region的compact(不加锁,因为只和split冲突,而split在regionserver上是串行的)。首先检查个参数是否有效,检查和设置writestate(writesEnabled才可以继续),打印开始日志(Starting compaction on XX_Store in region YYYY [as an off-peakcompaction]),在MonitoredTask注册状态。
第二步,调用Store的compact,检查参数,检查fileCompacting是否包含这些文件(加锁检查),获取MaxSequenceId和Log相关对象,打印文件个数和大小(Starting compaction of NN file(s) in SSSS of RRR into tmpdir=…)
第三步,Store使用自己的compactor对象compact,先对每个file,取得reader后获取信息,包括bloom filter的keycount(kv对象);如果是major,计算所有文件最早的putTS,(如果debug,打印Compacting 文件名,keycount=,bloomtype=,size=,encoding=,earliestPutTs=<major才有>),获取compactKVMax和压缩算法。对每个文件,得到一个StoreFileScanner,找到所有scanner的getSmallestReadPoint,用coprocessor的preCompactScannerOpen进行处理,得到或者新建一个StoreScanner(首先通过ttl、bloom filter、timerange过滤一些scanner,其次定位每个scanner到startKey,通过KeyValueHeap将scanners连在一起,通过key比较和heap机制内部实现了对外scan时key的顺序访问),对临时文件建一个writer,通过StoreScanner获取数据writer写入。
第四步,Store的completeCompact方法将新文件放入family的目录,正常打开,通知监听storefile更改的对象,打印debug信息(Removing store files after compaction…),通过HFileArchiver.archiveStoreFiles对原hfile归档,如果fs为null,直接删除这些文件。
第五步,计算新store的大小,打印完成信息(Completed [major] compaction of NN file(s) in SSS of RRR intoSF_PATH,…total size is )。
CompactRequest会打印Completed/compaction: CRCRCR ; duration=,然后如果成功要判断是不是要split。
二、 SPLIT
2.1. split来源:
1、Memstore flush时直接CompactSplitThread.requestSplit;2、HBaseAdmin客户端发起的请求,HRegionServer收到后,转CompactSplitThread.requestSplit处理;
2.2. split过程
1) 如果是HBaseAdmin发起,在HRegionServer的splitRegion方法中,首先检查region.flushcache()和region.forceSplit(设置force为true,并设置splitPoint),然后region.checkSplit(),在checkSplit()中主要处理包括:
(1) 检查是否元数据表
(2) 根据split策略判断是否应当split,默认IncreasingToUpperBoundRegionSplitPolicy下的机制(检查region的shouldForceSplit,如果是hbaseadmin发起就直接返回可以split的true。否则继续:获得这个RegionServer上这个表的region个数,如果为0,则“Store阀值”设为table的maxFileSize,否则根据公式min(r^2*flushSize,maxFileSize)计算,接着检查每个store的canSplit,如果存在文件引用,返回false,否则有store的大小大于“Store阀值”的,debug<ShouldSplitbecause familyName size=SSS,sizeToCheck=CCC, regionsWithCommonTable=RRRR>),
(3) 或者在ConstantSizeRegionSplitPolicy下(检查region的shouldForceSplit,如果是hbaseadmin发起就会得到true;然后通过每个store的canSplit,检查是否存在文件引用,如果存在就不能split,否则如果shouldForceSplit为true,或者store的size大于maxFileSize,都返回true)
(4) 获得splitPoint,通过RegionSplitPolicy的getSplitPoint方法获取,主要实现都在RegionSplitPolicy中:首先如果this.region.getExplicitSplitPoint(一般是hbaseAdmin发起设置的才有)有设置,直接返回结果。否则检查每个store的getSplitPoint(进行一些完整性验证,找到最大的storeFile,通过storeFile的Reader的midKey()<会通过HFileReaderV2的midKey()调用HFileBlockIndex的midKey获取>,得到hfile的中间块的第一个KeyValue,如果这个KV不是storeFile.Reader的第一个和最后一个KV<debug: cannot split because midkey is the same as first or lastrow>,就返回其rowKey为结果),将最大的store的splitPoint返回。另外还有IncreasingToUpperBoundRegionSplitPolicy的两个子类,实现了这个方法,DelimitedKeyPrefixRegionSplitPolicy保证以分隔符前面的前缀为splitPoint,保证相同RowKey前缀的数据在一个Region中;KeyPrefixRegionSplitPolicy保证具有相同前缀的row在一个region中(要求设计中前缀具有同样长度)
(5) 检查得到的splitKey是否在region中,如果不在,会抛异常:Requestedrow out of range for calculated split on HRegion …startKey=,endKey=,row=….
2) CompactSplitThread在requestSplit会使用线程池运行split,并最终由SplitRequest的run方法执行:如果最终传入的midKey为null,debug:Region RRRRnot splittable because midkey=null
(1) 首先新建一个SplitTransaction,其中splitdir=父region的path/.splits
(2) SplitTransaction.prepare,第一步检查是否可split(reference、closing、closed),第二步检查splitrow(原midKey)是否null,检查splitrow是否与startKey相同或者不在region中:info:Split row isnot inside region key range or is equal to startkey:。。,然后返回false。第三步计算子region的ts,如果ts比父region的还小,warn:Clock skew; parent regions id is。。。,but current timehere is,但将父region的ts+1,最后一步建立regionInfo对象:hri_a和hri_b
(3) SplitTransaction.execute,真正进行split的地方,第一步:createDaughters,第二步openDaughters,第三步transitionZKNode。
a) createDaughters,info:Starting splitof region RRR。这里面test部分的分析略去。在zk上createNodeSplitting以标识父region在splitting,journal中加入开始split的日志,将zknode的状态从splitting改到splitting,创建split的目录splitdir。然后,关闭父region(等待flush和compact完成、如果需要进行preflush,获得writelock后,flush,使用一个名为StoreCloserThread-R_NAME的线程池并发地关闭store,设置closed标志,设置status),其中Store的关闭:(先将storefiles作为result,并将原storefiles置为空<避免Metrics中出现>,同样用线程池关闭storefile)。
疑问:// Disable compacting and flushing by background threads for this region.谁是back thread
a.1 如果关闭父region没有发生异常,且hstoreFilesToSplit为null,则说明有其他线程在关闭region,抛出closedByOtherException,否则打印:No store files to split for the region…。否则:从service中remove:services.removeFromOnlineRegions。
a.2 在父region的splits目录下splitStoreFiles:有多少file就创建多大的线程池,对每一个文件,用StoreFileSplitter进行处理。如果超过fileSplitTimeout的时限,则抛出异常:Took too long to split the files and create the references, abortingsplit。StoreFileSplitter中根据familyName等信息,生成Reference,并保持在splitdir/[daughter]_EncodedName/familyName/storefileName.RegionEncodedName,内容包括是文件的高半部分内容(bottom),还是底半部分,另外还有splitRow的最小可能rowkey。
a.3 创建Region A,创建HRegion对象,将请求数/2分配给A,将hdfs上splitdir上的ref文件move到A的正常目录下。
a.4 同样方式创建B。
a.5 MetaEditor.offlineParentInMeta。建立一个Put:将offline/split设为hregionInfo,将A和B的信息放入该行,put到META表中。
b) openDaughters,(Performtime consuming opening of the daughter regions)
如果RS正在停止,INFO:Not opening daughtersXXX and YYY because stopping=。。。stop=。。。开启两个DaughterOpener hasThread对象(thread名字为:[SERVERNAME]-daughterOpener=[encodedName]),并启动线程。线程执行内容为:openDaughterRegionà daughter.openHRegionà initialize àinitializeRegionInternals,大多流程都在其中,见下面细述。
b.1 首先checkRegioninfoOnFilesystem检查hdfs上是否有HRegionInfo记录文件;cleanupTmpDir从hdfs上删除父region的.tmp目录;getStoreOpenAndCloseThreadPool(hbase.hstore.open.and.close.threads.max);
b.2 然后并行打开storefileàinstantiateHStore(new Store),并对每个store的maxSeqId和MaxMemstoreTS进行处理,找到最小的maxSeqId和最大的MaxMemstoreTS(seqId会作为edits的名字),接着mvcc.initialize。
b.3 接着replayRecoveredEditsIfAny:{HLog.getSplitEditFilesSorted;isZeroLengthThenDelete,WARN:< File [LOGFILE] is zero-length, deleting. >;replayRecoveredEdits——如果出错,检查“hbase.skip.errors”,确定是否HLog.moveAsideBadEditsFile;this.rsAccounting.clearRegionReplayEditsSize ;所有的edits处理完后,如果replay了数据(seqId判断),则internalFlushcache;最后删除edits文件,返回其中最大seqId};
b.4 之后cleanupAnySplitDetritus:cleanupDaughterRegion;cleanupSplitDir;对于daughter A已经创建完成,正在创建B的过程中rs死掉,没有处理;
b.5 再删除.merges这个目录(前文没有这个)
b.6 最后设置一些Region的读写权限、split策略
最后openDaughters调用 services .postOpenDeployTasks(s.hasReferences()或s.needsCompaction()会请求一次compact,INFO: Donewith post open deploy task for region=)à services.addToOnlineRegions
c) transitionZKNode,首先在zk上将该region的SPLITTING状态的node转换为SPLIT状态(转换可能失败有三个原因:splitting node不存在、不是splitting状态、版本错误),然后检查该node是否存在(master会处理该情况,并删除该节点,当tickleNodeSplit返回-1),每100ms检查一次,并且每10次打印(debug:Still waiting on the master to process the split for…)
(4) 如果异常,rollback
(5) 如果其他异常或者rollback过程异常,checkFileSystem
(6) Master的AssignmentManager.handleRegion会监控zk的split相关node 的状态变化:caseRS_ZK_REGION_SPLIT:会做一些状态检查的处理,细节和整体状态变化有关系;如果收到split消息,但regionstate为null,则可能打印(强制master这边信息offline都不可行,则warn:Received SPLIT for region… from server …, but it doesn\’t existanymore…。如果强制offlineok,INFO:Received SPLIT for region… from server …, but region was not firstin SPLITTING state; continuing。然后使用SplitRegionHandler监控split过程(循环检查split node状态,来确定是否成功,成功会删除parent的split node)。如果出NoNodeEx异常会DEBUG:The znode XXX does not exist. May be deleted already.如果是其他异常mater会退出:master.abort(Error deleting SPLIT node in ZK for transition ZK node( …”
三、 影响运行的相关参数
2.1. 紧缩参数
• hbase.hstore.compaction.ratio(1.2)
– 通常发生当file size <= sum(smaller_files) * ratio 时将会minor
– 默认1.2F
• hbase.hstore.compaction.ratio.offpeak(5.0)
– 默认5.0F
– 非高峰时期的ratio,需要定非高峰
• hbase.offpeak.start.hour(-1)
• hbase.offpeak.end.hour(-1)
• hbase.hstore.compaction.min.size(memstoresize)
– 无条件紧缩比这个size小的文件
• hbase.hstore.compaction.max.size(Long.Max)
– 从不紧缩比这个size大的文件(除非正在spliting?)
• hbase.hstore.compaction.min(3)
– minor compact时,最少需要的文件个数,>2
– hbase.hstore.compactionThreshold(老版本的配置)
• hbase.hstore.compaction.max(10)
– 紧缩时,最多的文件数
• hbase.store.delete.expired.storefile(true)
– 默认true,删除ttl到期的storefile
– 当不是强制主紧缩的时候生效
• hbase.hstore.compaction.kv.max(10)
– Compact每次scan最大kv个数
• hbase.hstore.compaction.complete(true)
– 完成后是否move文件到适当位置等并执行coprocessor
• hbase.hregion.majorcompaction(24h)
– 主紧缩检查的周期,到期时会检查文件的ttl是否超时
• hbase.hregion.majorcompaction.jitter(20%-4.8h)
– 检查主紧缩时,避免主紧缩风暴的随机百分比
• hbase.regionserver.thread.compaction.throttle
– 判断是进行large和small compaction的regijon大小阀值
– 默认2 * this.minFilesToCompact * this.region.memstoreFlushSize
– hbase.server.thread.wakefrequency(10,000ms)
– 紧缩检查周期*hbase.server.thread.wakefrequency.multiplier
• hbase.regionserver.compactionChecker.majorCompactPriority(Integer.MAX)
– 紧缩检查时,主紧缩的优先级
• hbase.regionserver.thread.compaction.large(1)
– 大紧缩的线程数,>0
• hbase.regionserver.thread.compaction.small(1)
– 小紧缩的线程数,>0
2.2. 破裂参数:
• hbase.server.thread.wakefrequency(10*1000ms)
– 这里是从flushQueue中获取可以flush的entry超时时间
• hbase.regionserver.regionSplitLimit(Integer.MAX)
– 一个server上最大的region个数,大于此,可以不破裂
• hbase.regionserver.thread.split(1)
– split的最大线程数
• hbase.regionserver.fileSplitTimeout(30000)
– split超时时间
– IOE:Took toolong to split the files and create the references, aborting split
• hbase.hstore.report.interval.edits(2000)
– replay edit时检查是否报告的间隔
• hbase.master.assignment.timeoutmonitor.timeout(18000)
– Master的timeout,实际上,master会循环等待这么长时间。
• hbase.hstore.report.period(9000)
– 报告的时间间隔Master的timeout的一半
2.3. 破裂策略:
可以在创建表的时候进行定义
• IncreasingToUpperBoundRegionSplitPolicy
– 根据公式min(r^2*flushSize,maxFileSize)确定split的maxFileSize
• ConstantSizeRegionSplitPolicy
– maxFileSize决定split大小
• DelimitedKeyPrefixRegionSplitPolicy
– 保证以分隔符前面的前缀为splitPoint,保证相同RowKey前缀的数据在一个Region中
• KeyPrefixRegionSplitPolicy
– KeyPrefixRegionSplitPolicy.prefix_length
– 以长度作为标准,保证相同RowKey前缀的数据在一个Region中
– 要求设计中前缀具有同样长度