LevelDB的批处理(Batch)
LevelDB的批处理(Batch)
主要是两个类,db/write_batch_internal.h
,include/leveldb/write_batch.h
,db/write_batch.cc
namespace leveldb {
class MemTable;
// WriteBatchInternal provides static methods for manipulating a
// WriteBatch that we don\'t want in the public WriteBatch interface.
// 一个batch的存储结构如下:
// seq(8字节)+count(后面kv的个数,占4个字节)+kvs
// 一个kv的存储结构
// type(1个字节)+key_len(4字节)+key+value_len(4字节)+value
class WriteBatchInternal {
public:
// Return the number of entries in the batch.
static int Count(const WriteBatch* batch);
// Set the count for the number of entries in the batch.
static void SetCount(WriteBatch* batch, int n);
// Return the sequence number for the start of this batch.
static SequenceNumber Sequence(const WriteBatch* batch);
// Store the specified number as the sequence number for the start of
// this batch.
static void SetSequence(WriteBatch* batch, SequenceNumber seq);
static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); }
static size_t ByteSize(const WriteBatch* batch) { return batch->rep_.size(); }
// 用content中的内容替换batch中的内容
static void SetContents(WriteBatch* batch, const Slice& contents);
// 将batch中的所有条目插入memtable中,他们此时用的是同一个序号
static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
// 向batch中增加条目
static void Append(WriteBatch* dst, const WriteBatch* src);
};
} // namespace leveldb
namespace leveldb {
class Slice;
class LEVELDB_EXPORT WriteBatch {
public:
class LEVELDB_EXPORT Handler {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Delete(const Slice& key) = 0;
};
WriteBatch();
// Intentionally copyable.
WriteBatch(const WriteBatch&) = default;
WriteBatch& operator=(const WriteBatch&) = default;
~WriteBatch();
// Store the mapping "key->value" in the database.
// 向batch中添加key-value
void Put(const Slice& key, const Slice& value);
// If the database contains a mapping for "key", erase it. Else do nothing.
// 向batch中添加一个删除的key,意思是类型为删除,只有key,没有value
void Delete(const Slice& key);
// Clear all updates buffered in this batch.
void Clear();
// The size of the database changes caused by this batch.
//
// This number is tied to implementation details, and may change across
// releases. It is intended for LevelDB usage metrics.
size_t ApproximateSize() const;
// Copies the operations in "source" to this batch.
//
// This runs in O(source size) time. However, the constant factor is better
// than calling Iterate() over the source batch with a Handler that replicates
// the operations into this batch.
// 将一个batch追加到末尾
void Append(const WriteBatch& source);
// Support for iterating over the contents of a batch.
// 遍历batch中的所有的key-value,然后挨个处理他们
Status Iterate(Handler* handler) const;
private:
friend class WriteBatchInternal;
// 头部是8字节的序号seq+类型和4字节的batch里面条目的数量,所以初始头部长度为12字节
std::string rep_; // See comment in write_batch.cc for the format of rep_
};
} // namespace leveldb
namespace leveldb {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;
WriteBatch::WriteBatch() { Clear(); }
WriteBatch::~WriteBatch() = default;
WriteBatch::Handler::~Handler() = default;
void WriteBatch::Clear() {
rep_.clear();
rep_.resize(kHeader);
}
size_t WriteBatch::ApproximateSize() const { return rep_.size(); }
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
// 获取一个字节的kv的类型
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
// 内部是先获取前4个字节的长度,然后读取后面的数据,将数据写入key和value
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
// 如果是删除操作,那么就只有key,没有value
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
EncodeFixed32(&b->rep_[8], n);
}
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
return SequenceNumber(DecodeFixed64(b->rep_.data()));
}
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}
void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
void WriteBatch::Delete(const Slice& key) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion));
PutLengthPrefixedSlice(&rep_, key);
}
void WriteBatch::Append(const WriteBatch& source) {
WriteBatchInternal::Append(this, &source);
}
namespace {
class MemTableInserter : public WriteBatch::Handler {
public:
SequenceNumber sequence_;
MemTable* mem_;
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice& key) override {
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
};
} // namespace
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter;
// 设置序号
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
// 遍历batch中所有的kv,然后将他们插入memtable
return b->Iterate(&inserter);
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size());
}
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
SetCount(dst, Count(dst) + Count(src));
assert(src->rep_.size() >= kHeader);
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}
} // namespace leveldb
版权声明:本文为gyyyl原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。