从RocksDB到LevelDB:手把手教你用C++实现一个简易的LSM-Tree存储引擎 从RocksDB到LevelDB手把手教你用C实现一个简易的LSM-Tree存储引擎在当今数据爆炸式增长的时代高效可靠的存储引擎成为各类系统的核心组件。Google开源的LevelDB作为LSM-Tree(Log-Structured Merge-Tree)的经典实现以其出色的写入性能和紧凑的设计吸引了大量开发者。而Facebook基于LevelDB开发的RocksDB则通过引入多线程压缩、分层压缩等优化进一步提升了性能表现。本文将带您从零开始用C实现一个具备核心功能的简易LSM-Tree存储引擎深入理解其设计精髓。1. LSM-Tree核心设计解析LSM-Tree的核心思想是将随机写入转换为顺序写入从而大幅提升IO性能。其架构通常包含以下几个关键组件MemTable内存中的有序数据结构负责接收最新写入Immutable MemTable只读的内存表等待持久化到磁盘SSTable(Sorted String Table)磁盘上的有序数据文件WAL(Write-Ahead Log)预写日志确保数据持久性与传统B-Tree类存储引擎相比LSM-Tree在写入性能上通常有数量级的提升。下表展示了主要性能对比特性LSM-TreeB-Tree写入吞吐量极高中等读取延迟较高低空间放大较高低写放大中等高实现一个基础LSM-Tree引擎需要解决几个关键问题内存数据结构的选择与实现SSTable文件格式设计压缩(Compaction)策略崩溃恢复机制2. 内存表(MemTable)实现MemTable作为LSM-Tree的内存组件需要支持高效的插入和查询操作。LevelDB选择了跳表(SkipList)作为其实现相比平衡树有以下优势templatetypename Key, typename Value class SkipList { private: struct Node { Key key; Value value; std::vectorNode* forward; // 多层指针数组 Node(const Key k, const Value v, int level) : key(k), value(v), forward(level, nullptr) {} }; Node* head_; int max_level_; std::random_device rd_; std::mt19937 gen_; std::uniform_real_distribution dis_; public: SkipList() : max_level_(12), gen_(rd_()) { head_ new Node(Key(), Value(), max_level_); } // 随机生成节点层数 int RandomLevel() { int level 1; while (dis_(gen_) 0.5 level max_level_) { level; } return level; } void Insert(const Key key, const Value value) { std::vectorNode* update(max_level_, nullptr); Node* current head_; // 从最高层开始查找插入位置 for (int i max_level_ - 1; i 0; --i) { while (current-forward[i] ! nullptr current-forward[i]-key key) { current current-forward[i]; } update[i] current; } int level RandomLevel(); Node* newNode new Node(key, value, level); // 更新各层指针 for (int i 0; i level; i) { newNode-forward[i] update[i]-forward[i]; update[i]-forward[i] newNode; } } };跳表的平均时间复杂度为O(log n)虽然理论上不如平衡树的O(log n)稳定但实现简单且并发友好。在实际应用中还需要考虑线程安全问题LevelDB通过在外部加锁的方式保证线程安全。3. SSTable文件格式设计SSTable是LSM-Tree在磁盘上的持久化存储形式其核心特点是按键有序排列。一个完整的SSTable文件通常包含以下几个部分数据块(Data Blocks)存储实际的键值对元数据块(Meta Blocks)如布隆过滤器等索引块(Index Block)指向数据块的索引文件尾部(Footer)指向元数据块和索引块下面是一个简化的SSTable写入实现class SSTableBuilder { public: void Add(const Slice key, const Slice value) { if (block_.size() block_size_) { FlushBlock(); } // 记录前一个key的共享前缀长度节省空间 int shared 0; if (last_key_.size() 0) { shared PrefixMatch(last_key_, key); } int non_shared key.size() - shared; // 写入共享前缀长度、非共享长度和value长度 block_.append(EncodeVarint32(shared)); block_.append(EncodeVarint32(non_shared)); block_.append(EncodeVarint32(value.size())); // 写入非共享key部分和value block_.append(key.data() shared, non_shared); block_.append(value.data(), value.size()); last_key_.assign(key.data(), key.size()); } void Finish() { if (!block_.empty()) { FlushBlock(); } WriteIndex(); WriteFooter(); } private: void FlushBlock() { // 压缩并写入数据块 std::string compressed; Compress(block_, compressed); uint64_t offset writer_-Write(compressed); // 记录块位置信息用于构建索引 BlockHandle handle; handle.set_offset(offset); handle.set_size(compressed.size()); index_entries_.emplace_back(last_key_, handle); block_.clear(); } void WriteIndex() { // 构建并写入索引块 SSTableBuilder index_builder; for (const auto entry : index_entries_) { index_builder.Add(entry.key, entry.handle.Encode()); } index_builder.Finish(); } };注意实际实现中需要考虑校验和(Checksum)、压缩选项等细节这里做了适当简化。4. 压缩(Compaction)策略实现压缩是LSM-Tree的核心操作之一主要解决以下问题清理已删除的数据合并多个SSTable文件减少文件数量优化数据布局提高查询效率LevelDB采用分层压缩策略(Leveled Compaction)将SSTable分为多个层级Level文件大小限制文件数量限制L0无4L110MB10L2100MB100.........下面是一个简化的压缩过程实现class Compactor { public: Status Compact(Version* current, VersionEdit* edit) { // 选择需要压缩的文件 Compaction* c PickCompaction(current); if (c nullptr) return Status::OK(); // 执行压缩 Iterator* input current-MakeInputIterator(c); Status s DoCompactionWork(c, input); // 安装新版本 if (s.ok()) { InstallCompactionResults(c, edit); } delete c; return s; } private: Status DoCompactionWork(Compaction* c, Iterator* input) { std::string current_user_key; bool has_current_user_key false; SequenceNumber last_sequence_for_key kMaxSequenceNumber; while (input-Valid()) { Slice key input-key(); // 检查是否需要停止压缩(手动触发停止) if (compact_-ShouldStop()) { break; } // 处理相同key的不同版本 if (has_current_user_key user_comparator_-Compare(key, current_user_key) 0) { // 对于相同key只保留最新版本 if (last_sequence_for_key compact_-smallest_snapshot) { builder_-Add(key, input-value()); } last_sequence_for_key input-sequence(); } else { current_user_key.assign(key.data(), key.size()); has_current_user_key true; last_sequence_for_key input-sequence(); if (last_sequence_for_key compact_-smallest_snapshot) { builder_-Add(key, input-value()); } } input-Next(); } return builder_-Finish(); } };压缩策略的选择对性能影响很大RocksDB在LevelDB基础上引入了以下优化并行压缩利用多线程加速压缩过程分层压缩更精细地控制各层文件大小压缩优先级根据文件热度决定压缩顺序5. 崩溃恢复与WAL实现为确保数据安全LSM-Tree采用预写日志(WAL)机制。每次写入操作会先记录到日志文件再应用到MemTable。崩溃恢复时通过重放日志文件恢复数据。日志文件格式设计要点使用CRC32校验和检测数据损坏记录类型标识(如首记录、正常记录等)批量写入提高性能下面是一个简化的日志写入实现class LogWriter { public: Status AddRecord(const Slice record) { const char* ptr record.data(); size_t left record.size(); // 分片写入每个片段有头部信息 bool begin true; do { const int leftover kBlockSize - block_offset_; if (leftover kHeaderSize) { // 填充空位 if (leftover 0) { dest_-append(kZeroBytes, leftover); } block_offset_ 0; } const size_t avail kBlockSize - block_offset_ - kHeaderSize; const size_t fragment_length (left avail) ? left : avail; RecordType type; const bool end (left fragment_length); if (begin end) { type kFullType; } else if (begin) { type kFirstType; } else if (end) { type kLastType; } else { type kMiddleType; } Status s EmitPhysicalRecord(type, ptr, fragment_length); if (!s.ok()) return s; ptr fragment_length; left - fragment_length; begin false; } while (left 0); return Status::OK(); } private: Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { // 构造记录头 char buf[kHeaderSize]; buf[4] static_castchar(n 0xff); buf[5] static_castchar(n 8); buf[6] static_castchar(t); // 计算CRC32校验和 uint32_t crc crc32c::Value(ptr, n); crc crc32c::Extend(crc, buf 4, 3); // 扩展计算头部 EncodeFixed32(buf, crc32c::Mask(crc)); // 写入头部和记录内容 Status s dest_-Append(Slice(buf, kHeaderSize)); if (s.ok()) { s dest_-Append(Slice(ptr, n)); if (s.ok()) { s dest_-Flush(); } } block_offset_ kHeaderSize n; return s; } };提示实际实现中需要考虑日志轮转、批量提交等优化手段以进一步提升性能。6. 性能优化技巧在完成基础功能后可以考虑以下优化手段提升性能布隆过滤器为每个SSTable添加布隆过滤器加速查询class BloomFilter { public: void Add(const Slice key) { uint32_t h hash_func_(key); for (int i 0; i k_; i) { bits_[(h i * h) % bits_.size()] true; } } bool MayContain(const Slice key) const { uint32_t h hash_func_(key); for (int i 0; i k_; i) { if (!bits_[(h i * h) % bits_.size()]) { return false; } } return true; } };块缓存缓存热点数据块减少IOclass BlockCache { public: Slice* Get(const CacheKey key) { auto it cache_.find(key); if (it ! cache_.end()) { // 更新LRU队列 lru_list_.erase(it-second.lru_pos); lru_list_.push_front(key); it-second.lru_pos lru_list_.begin(); return it-second.data; } return nullptr; } void Insert(const CacheKey key, const Slice value) { if (cache_.size() capacity_) { // 淘汰最久未使用的数据 CacheKey old_key lru_list_.back(); lru_list_.pop_back(); cache_.erase(old_key); } lru_list_.push_front(key); cache_[key] {value, lru_list_.begin()}; } };多线程压缩利用多核CPU并行执行压缩任务class CompactionScheduler { public: void ScheduleCompaction(CompactionTask* task) { std::lock_guardstd::mutex lock(mutex_); pending_tasks_.push(task); cond_.notify_one(); } void WorkerThread() { while (true) { CompactionTask* task nullptr; { std::unique_lockstd::mutex lock(mutex_); cond_.wait(lock, [this]{ return !pending_tasks_.empty() || shutdown_; }); if (shutdown_) break; task pending_tasks_.front(); pending_tasks_.pop(); } ExecuteCompaction(task); } } };写入批处理合并小写入减少IO次数class WriteBatch { public: void Put(const Slice key, const Slice value) { rep_.push_back(static_castchar(kTypeValue)); PutLengthPrefixedSlice(rep_, key); PutLengthPrefixedSlice(rep_, value); count_; } void Clear() { rep_.clear(); count_ 0; } Status Execute(DBImpl* db) { Writer w(this); Status s db-Write(w); if (s.ok()) { s w.status; } return s; } };7. 测试与性能评估完成实现后需要设计全面的测试验证系统正确性和性能。主要测试类型包括功能测试基本CRUD操作迭代器功能快照隔离并发控制性能测试随机写入吞吐量随机读取延迟顺序扫描性能压缩对性能的影响稳定性测试长时间运行测试崩溃恢复测试边界条件测试(如大value、空key等)下面是一个简单的性能测试示例void RunBenchmark(DB* db, int num_entries) { // 写入测试 auto start std::chrono::high_resolution_clock::now(); for (int i 0; i num_entries; i) { std::string key key std::to_string(i); std::string value value std::to_string(i); db-Put(WriteOptions(), key, value); } auto end std::chrono::high_resolution_clock::now(); std::chrono::durationdouble elapsed end - start; std::cout Write throughput: num_entries / elapsed.count() ops/s\n; // 读取测试 start std::chrono::high_resolution_clock::now(); for (int i 0; i num_entries; i) { std::string key key std::to_string(i); std::string value; db-Get(ReadOptions(), key, value); } end std::chrono::high_resolution_clock::now(); elapsed end - start; std::cout Read throughput: num_entries / elapsed.count() ops/s\n; }在实际项目中建议使用更专业的基准测试工具如YCSB(雅虎云服务基准测试)进行更全面的评估。