1. LevelDB 简介
LevelDB 是 Google 开源的一款单机 KV 持久存储引擎,可以看作是BigTable 的单机版本。通过 LSM 树(Log Structured Merge Tree),LevelDB 牺牲了一定的读性能,但提高了写性能。对外 LevelDB 提供了常用的 Put
、Get
、Delete
等接口,还支持 Snapshot、Recovery 等操作。
本篇文章主要分析 LevelDB 的 compaction 过程以及分层设计的思想,对于 LevelDB 不熟悉的同学,可以先查看其他资料。
2. LSM 树在 LevelDB 中的应用
我们知道直接在磁盘中进行随机写性能是比较低的,那么为了获得较好的随机写性能,LevelDB 使用 LSM 树来组织数据。我们首先看下写流程:
当有一条记录要写入磁盘时,LevelDB 首先会将本次写操作 Append 到日志中(相当于顺序写),然后把这条记录插入内存中的 Memtable 中(Memtable 底层使用 skiplist 来保证记录有序),可见 LevelDB 将随机写转化为了顺序写。
Memtable 达到一定大小时,就会被 dump 到磁盘,这里有两个原因:1. 内存是有限的,不能将数据都放在内存中;2. 断电后内存中的数据就会全部丢失,虽然有日志可以恢复数据,但每次都恢复全部的数据显然不现实。
Memtable 一般情况下会被 dump 到 level 0,level L 中 tables 总大小超过一定大小时,LevelDB 就会在 level L 中选取一些 table 与 level L+1 中有 key 重叠的 table 进行 compaction,生成新的 table。这里我们思考一下为什么需要设计多层 level?
如果只有 level 0,随着 Memtable 不断被 dump 到磁盘上,level 0 上的 table 数量就会越来越多,并且这些 tables 的 key 存在相互重叠的情况。每次读操作时,如果在 Memtable 中没有找到数据,就需要在所有 tables 中进行查找,显然读性能受到了严重影响。
为了解决这一问题我们可以考虑限制 level 0 上 tables 的数量,同时增加一个 level 1,当 level 0 上 tables 数量超过一定大小时,就选取一些相互重叠的 tables 将其合并到 level 1,这样 level 1 中 tables 的 key 是相互不重叠的。此时我们最多只需要读 N+1 个文件,而 N 是比较小的,这样就达到了较好的读性能。
从上面的分析中似乎只要设计两层 level 就可以了,那为什么 LevelDB 还要多层的设计呢?设想一下随着 compaction 次数的增加,level 1 上的 tables 数量也会变得很多,这样极端情况下,level 0 上的一个 table 的 key range 可能会和 level 1 上所有tables 相重合,这样在进行 compaction 时就需要合并 level 1 上所有的 table,IO 开销太大。所以为了降低 compaction 时的 IO 开销,需要进行分层的设计。
3. compaction 过程源码分析
3.1 compaction 触发时机
读写数据时都会触发 compaction ,我们首先看下写数据时的触发流程:
写数据时主要调用 DBImpl::Write()
接口,在 Write()
接口中会调用 MakeRoomForWrite()
接口,我们看下这个接口的内部逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
Status DBImpl::MakeRoomForWrite(bool force) { //写数据时该值为false mutex_.AssertHeld(); assert(!writers_.empty()); bool allow_delay = !force; //true Status s; while (true) { if (!bg_error_.ok()) { // Yield previous error s = bg_error_; break; } else if ( allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { //L0文件达到8个时会减慢写入速度,延迟1ms // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several // seconds when we hit the hard limit, start delaying each // individual write by 1ms to reduce latency variance. Also, // this delay hands over some CPU to the compaction thread in // case it is sharing the same core as the writer. mutex_.Unlock(); env_->SleepForMicroseconds(1000); allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { // memtable的大小还没有达到指定size,可以直接往里写数据 // There is room in current memtable break; } else if (imm_ != nullptr) { // 走到这一步说明memtable中已经没有空间可以写数据了,需要将memtable转化为immutable // 但是immutable不为空,表明后台正在进行compact,需要等待compaction完成再进行转化 // We have filled up the current memtable, but the previous // one is still being compacted, so we wait. Log(options_.info_log, "Current memtable full; waiting...\n"); background_work_finished_signal_.Wait(); } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { // level 0上的文件数过多,需要compaction // There are too many level-0 files. Log(options_.info_log, "Too many L0 files; waiting...\n"); background_work_finished_signal_.Wait(); } else { //走到这一步表明可以将memtable转化为immutable了 // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); uint64_t new_log_number = versions_->NewFileNumber(); WritableFile* lfile = nullptr; s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); //生成一个新的log文件 if (!s.ok()) { // Avoid chewing through file number space in a tight loop. versions_->ReuseFileNumber(new_log_number); break; } delete log_; delete logfile_; logfile_ = lfile; logfile_number_ = new_log_number; log_ = new log::Writer(lfile); imm_ = mem_; // 将memtable转化为immutable has_imm_.Release_Store(imm_); mem_ = new MemTable(internal_comparator_); //生成一个新的memtable mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); // 判断要不要进行compaction } } return s; } |
可见这个函数的主要作用是判断 Memtable 中有没有空间可写,当没有空间时将 Memtable 转化为 Immutable,最后调用MaybescheduleCompaction()
函数判断要不要进行 compaction,这个函数最终会调用到BackgroundCompaction()
,在这个函数中进行具体的 compaction 逻辑,那接下来我们就看下触发 compaction 的具体逻辑吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); if (imm_ != nullptr) { //如果immutable不为空,需要将immutable dump到level 0 CompactMemTable(); return; } Compaction* c; bool is_manual = (manual_compaction_ != nullptr); // 默认为null,除非手动执行compaction InternalKey manual_end; if (is_manual) { ManualCompaction* m = manual_compaction_; c = versions_->CompactRange(m->level, m->begin, m->end); m->done = (c == nullptr); if (c != nullptr) { manual_end = c->input(0, c->num_input_files(0) - 1)->largest; } //... } else { c = versions_->PickCompaction(); // 选取需要进行compaction的level和sst } Status status; if (c == nullptr) { // Nothing to do } else if (!is_manual && c->IsTrivialMove()) { // trivial compaction,直接将当前level上需要compaction的sst移动到下一层,不需要进行合并操作 // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; //... } else { // 一般的compaction操作 CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); // 真正进行compaction的地方 if (!status.ok()) { RecordBackgroundError(status); } CleanupCompaction(compact); c->ReleaseInputs(); DeleteObsoleteFiles(); } delete c; //... } } |
从上面可以看出 LevelDB 中的 compaction 主要分为三种
- Memtable 的 compaction
- Trivial compaction
- 一般的 compaction
3.2 选取需要被 compaction 的 sst
我们直接分析一般 compaction 的过程,前两种 compaction 都是在一般 compaction 的基础上实现的。
首先看下 LevelDB 是如何选取要被 compaction 的 sst 的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
Compaction* VersionSet::PickCompaction() { Compaction* c; int level; const bool size_compaction = (current_->compaction_score_ >= 1); const bool seek_compaction = (current_->file_to_compact_ != nullptr); if (size_compaction) { // size_compaction的值主要根据level中sst的总大小计算出的,也就是说当某一level中tables的总大小超过一定限制时,就需要被compaction到下一层,保证这一层上tables的数量不会太大 //具体的计算方式为: /*if (level == 0) { score = v->files_[level].size() // level 0上根据文件数进行计算 static_cast<double>(config::kL0_CompactionTrigger); } else { // 其他level根据sstable大小计算 const uint64_t level_bytes = TotalFileSize(v->files_[level]); score = static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level); }*/ level = current_->compaction_level_; assert(level >= 0); assert(level+1 < config::kNumLevels); c = new Compaction(options_, level); // Pick the first file that comes after compact_pointer_[level] // compact_pointer_是指level上开始compaction的最小key,下一次compaction从这个key开始 for (size_t i = 0; i < current_->files_[level].size(); i++) { FileMetaData* f = current_->files_[level][i]; if (compact_pointer_[level].empty() || icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { // 找到第一个需要被compact的sst,加入到inputs_[0], inputs_[0]中存储的是要被合并到下一层的sst文件 c->inputs_[0].push_back(f); break; } } if (c->inputs_[0].empty()) { // 没有找到就合并第一个文件 c->inputs_[0].push_back(current_->files_[level][0]); } } else if (seek_compaction) { // 当sst文件被访问的次数超过allow_seeks,就会触发seek_compaction.我理解leveldb进行 //seek_compaction的原因是这样的:一个sst被访问了太多次有可能是每次get都会查找这个sst, //但是又没有找到,那不如把这个sst合并到下一层,这样下次就不用做无用的查找了 level = current_->file_to_compact_level_; c = new Compaction(options_, level); c->inputs_[0].push_back(current_->file_to_compact_); } else { return nullptr; } c->input_version_ = current_; c->input_version_->Ref(); // Files in level 0 may overlap each other, so pick up all overlapping ones if (level == 0) { // 对于level 0, 由于sst中的key相互重叠,所以我们要将所有相互重叠的sst都选出来 InternalKey smallest, largest; GetRange(c->inputs_[0], &smallest, &largest); // Note that the next call will discard the file we placed in // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); assert(!c->inputs_[0].empty()); } SetupOtherInputs(c); // 根据inputs_[0]找到level+1 上有哪些sst与inputs_[0]中的sst相重叠,并放在inputs_[1]中 return c; } |
3.3 compaction 的具体过程
找到了需要进行 compaction 的 sst,下面我们再来看下具体的 compaction 过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
Status DBImpl::DoCompactionWork(CompactionState* compact) { const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions //... if (snapshots_.empty()) { compact->smallest_snapshot = versions_->LastSequence(); } else { compact->smallest_snapshot = snapshots_.oldest()->sequence_number(); } // Release mutex while we're actually doing the compaction work mutex_.Unlock(); // 这里生成一个MergingIterator,相当于在遍历要合并的sst文件时,同时进行多路归并排序 // MergingIterator内部维护了n个Iterator,每个Iterator指向一个sst,进行迭代时,MergingIterator // 会找所有Iterators所指key中的最小那个,这样就完成了多路归并排序 Iterator* input = versions_->MakeInputIterator(compact->compaction); input->SeekToFirst(); Status status; ParsedInternalKey ikey; std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work if (has_imm_.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); if (imm_ != nullptr) { CompactMemTable(); // memtable到level 0上的compaction优先级比较高 // Wake up MakeRoomForWrite() if necessary. background_work_finished_signal_.SignalAll(); } mutex_.Unlock(); imm_micros += (env_->NowMicros() - imm_start); } Slice key = input->key(); if (compact->compaction->ShouldStopBefore(key) && compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } } // Handle key/value, add to state, etc. bool drop = false; if (!ParseInternalKey(key, &ikey)) { // key解析出错,直接drop // Do not hide error keys current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; } else { if (!has_current_user_key || user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != 0) { // First occurrence of this user key // key第一次出现 current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; } if (last_sequence_for_key <= compact->smallest_snapshot) { // 出现了key值相同但sequence num比当前key小的情况,直接drop // Hidden by an newer entry for same user key drop = true; // (A) } else if (ikey.type == kTypeDeletion && ikey.sequence <= compact->smallest_snapshot && compact->compaction->IsBaseLevelForKey(ikey.user_key)) { // 该key已经被标记删除 // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger sequence numbers // (3) data in layers that are being compacted here and have // smaller sequence numbers will be dropped in the next // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. drop = true; } last_sequence_for_key = ikey.sequence; } // ... if (!drop) { // 如果这个key不需要被删除,就把这个key加入到新的sst中 // Open output file if necessary if (compact->builder == nullptr) { status = OpenCompactionOutputFile(compact); // 创建sst,用于后面存放数据,同时将新创建的文件加入到compact->outputs中,outputs域存放了新生成的sst if (!status.ok()) { break; } } if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); compact->builder->Add(key, input->value()); // 将key加入到sst中 // Close output file if it is big enough if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) { // sst文件大小达到了限度大小,就dump到磁盘 status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } } } input->Next(); // 继续迭代 } //... CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; for (int which = 0; which < 2; which++) { for (int i = 0; i < compact->compaction->num_input_files(which); i++) { stats.bytes_read += compact->compaction->input(which, i)->file_size; } } for (size_t i = 0; i < compact->outputs.size(); i++) { stats.bytes_written += compact->outputs[i].file_size; } mutex_.Lock(); stats_[compact->compaction->level() + 1].Add(stats); if (status.ok()) { status = InstallCompactionResults(compact); // 将新生成的sst 加入到Version中 } //... return status; } Status DBImpl::InstallCompactionResults(CompactionState* compact) { mutex_.AssertHeld(); // ... // compaction完成后被合并的sst在新版本中就没有用了,将这些文件加入到VersionEdit的deleted_files_中 compact->compaction->AddInputDeletions(compact->compaction->edit()); const int level = compact->compaction->level(); for (size_t i = 0; i < compact->outputs.size(); i++) { // 将新生成的sst文件加入到VersionEdit的new_files_中 const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile( level + 1, out.number, out.file_size, out.smallest, out.largest); } // LogAndApply会根据VerionEdit中deleted_files_和new_files_生成一个新的Version return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } |
可见 compaction 过程主要将需要合并的 sst 按 key 值顺序生成新的 sst,同时删除标记为 delete 的 key,最后将新的 sst 加入到 Version 中,完成合并。
3.4 如何生成新的 Version
每次合并完成后都会生成一个新的 Version,新 Version= 当前 Verson + VersionEdit。VersionEdit 中保存了要被删除和添加的 sst,通过 LogAndApply 生成新的 Version:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { //... Version* v = new Version(this); // 首先生成一个新的Version { Builder builder(this, current_); builder.Apply(edit); // 将要增加和删除的文件加入到build中 builder.SaveTo(v); //将这些变化保存到新的Version中 } Finalize(v); //... return s; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
// Apply all of the edits in *edit to the current state. void Apply(VersionEdit* edit) { // Update compaction pointers // 更新每层compact起始位置 for (size_t i = 0; i < edit->compact_pointers_.size(); i++) { const int level = edit->compact_pointers_[i].first; vset_->compact_pointer_[level] = edit->compact_pointers_[i].second.Encode().ToString(); } // Delete files const VersionEdit::DeletedFileSet& del = edit->deleted_files_; for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin(); iter != del.end(); ++iter) { // 将每一层需要被删除的文件加入到deleted_files中 const int level = iter->first; const uint64_t number = iter->second; levels_[level].deleted_files.insert(number); } // Add new files for (size_t i = 0; i < edit->new_files_.size(); i++) { const int level = edit->new_files_[i].first; FileMetaData* f = new FileMetaData(edit->new_files_[i].second); f->refs = 1; // We arrange to automatically compact this file after // a certain number of seeks. Let's assume: // (1) One seek costs 10ms // (2) Writing or reading 1MB costs 10ms (100MB/s) // (3) A compaction of 1MB does 25MB of IO: // 1MB read from this level // 10-12MB read from next level (boundaries may be misaligned) // 10-12MB written to next level // This implies that 25 seeks cost the same as the compaction // of 1MB of data. I.e., one seek costs approximately the // same as the compaction of 40KB of data. We are a little // conservative and allow approximately one seek for every 16KB // of data before triggering a compaction. // 在这里更新allowed_seeks, 主要用于seek_compaction f->allowed_seeks = (f->file_size / 16384); if (f->allowed_seeks < 100) f->allowed_seeks = 100; // 将每层需要加入的文件保存到added_files中 levels_[level].deleted_files.erase(f->number); levels_[level].added_files->insert(f); } } // Save the current state in *v. void SaveTo(Version* v) { BySmallestKey cmp; cmp.internal_comparator = &vset_->icmp_; // 遍历每一层 for (int level = 0; level < config::kNumLevels; level++) { // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. const std::vector<FileMetaData*>& base_files = base_->files_[level]; // 找到当前Version中各个level的sst文件 std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();//迭代 std::vector<FileMetaData*>::const_iterator base_end = base_files.end(); const FileSet* added = levels_[level].added_files; v->files_[level].reserve(base_files.size() + added->size()); // 扩展当前层的空间,因为有新的文件要加入了 for (FileSet::const_iterator added_iter = added->begin(); added_iter != added->end(); ++added_iter) { // 迭代要加入的sst,依次加入到当前level // Add all smaller files listed in base_ for (std::vector<FileMetaData*>::const_iterator bpos = std::upper_bound(base_iter, base_end, *added_iter, cmp); base_iter != bpos; ++base_iter) { // 这里先将原来version中比added_iter所指sst小的sst加入到新的version,再把added sst加入新verion,这样可以保证version中每一层保存的sst都是有序的。方便查找。 // 比如原来version中sstkey范围是 [1, 3] [6, 7] [9,10], 现在added sst的key范围是[4, 5],那sst的加入顺序为: // 1. 将[1, 3]加入到新version MaybeAddFile(v, level, *base_iter); } //2. 将[4, 5]加入到新version MaybeAddFile(v, level, *added_iter); } //3. 将[6, 7] [9,10]加入新verison for (; base_iter != base_end; ++base_iter) { MaybeAddFile(v, level, *base_iter); } // ... } } // MaybeAddFile中执行实际的加入操作,但会过滤需要被删除的文件 void MaybeAddFile(Version* v, int level, FileMetaData* f) { if (levels_[level].deleted_files.count(f->number) > 0) { // File is deleted: do nothing } else { std::vector<FileMetaData*>* files = &v->files_[level]; if (level > 0 && !files->empty()) { // Must not overlap assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest, f->smallest) < 0); } f->refs++; files->push_back(f); } } }; |
至此,LevelDB 的 compaction 流程就分析完了。
本文作者:张迪