Hands-on Practice
2025-09-02

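`Put("key1", "value1")`: what actually happens behind this one line of code? Is the WAL written first, or the MemTable? How is a full MemTable swapped out? How is an SSTable's index organized? When does compaction kick in, and how does it merge files? How is the Bloom filter's bit array sized? Over the previous 18 chapters we took each of these mechanisms apart. Now we assemble them into an engine that actually runs.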

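We will build a mini LSM storage engine in Go from scratch. It has a WAL, a MemTable, SSTables, Bloom filters and compaction, in under 800 lines of code. Each component is a simplified version of its counterpart in a real storage engine. Once you have written it, RocksDB's source code is no longer a black box.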

1. Design Overview

1.1 What We Are Building

We implement a mini LSM storage engine in Go, called MiniLSM. It is made up of the following core components:

```mermaid
graph TB
    subgraph "MiniLSM Architecture"
        PUT["Put(key, value)"] --> WAL["WAL<br/>write-ahead log"]
        PUT --> MEM["MemTable<br/>skip list"]
        GET["Get(key)"] --> MEM
        GET --> IMMU["Immutable<br/>MemTable"]
        GET --> BF["Bloom Filter"]
        BF --> SST["SSTable<br/>L0-L2"]
        MEM -->|"full"| IMMU
        IMMU -->|"Flush"| SST
        SST -->|"Compaction"| SST2["SSTable<br/>merged"]
    end
    style WAL fill:#fff9c4,stroke:#f9a825
    style MEM fill:#e3f2fd,stroke:#1565c0
    style BF fill:#c8e6c9,stroke:#2e7d32
    style SST fill:#e1bee7,stroke:#6a1b9a
```

1.2 Feature Scope

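| Feature | Notes |
|---|---|
| Put(key, value) | Write a key-value pair |
| Get(key) | Read a key-value pair |
| Delete(key) | Delete a key (tombstone marker) |
| Scan(start, end) | Range scan |
| WAL | Crash recovery |
| MemTable | Skip-list implementation |
| SSTable | Data blocks + index block |
| Bloom Filter | Skip lookups in files that cannot contain the key |
| Leveled Compaction | L0→L1→L2 |
| Block Cache | Skipped (for simplicity) |
| Transactions | Skipped (for simplicity) |
| Compression | Skipped (for simplicity) |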
Note

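MiniLSM is a teaching implementation, not a production-grade storage engine. It leaves out key features such as concurrency control, compression and caching. It does keep the core design ideas of the LSM tree. Once you understand MiniLSM, the RocksDB source code becomes much easier to read.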

1.3 Core Data Structures

```go
package minilsm

import "time"

// Key and Value are plain byte slices.
type Key = []byte
type Value = []byte

// CommandType identifies the kind of operation recorded in the WAL.
type CommandType byte

const (
	CommandPut    CommandType = 0
	CommandDelete CommandType = 1
)

// WALRecord is one decoded WAL entry.
type WALRecord struct {
	CommandType CommandType
	Key         Key
	Value       Value // nil for Delete
	Timestamp   uint64
	CRC         uint32
}

// Entry is one MemTable entry.
type Entry struct {
	Key       Key
	Value     Value
	Timestamp uint64
	Deleted   bool // tombstone marker
}

// SSTableMeta describes one SSTable file.
type SSTableMeta struct {
	Level     int
	FileID    uint64
	MinKey    Key
	MaxKey    Key
	Size      int64
	NumKeys   int
	CreatedAt time.Time
}
```

2. MemTable Implementation

2.1 Skip List Implementation

The MemTable is implemented as a skip list, which gives expected O(log N) lookups and inserts:

```go
package minilsm

import (
	"bytes"
	"math/rand"
)

const maxLevel = 12 // maximum tower height

type skipListNode struct {
	key       Key
	value     Value
	timestamp uint64
	deleted   bool
	forward   []*skipListNode // forward[i] is the next node at level i
}

type SkipList struct {
	head  *skipListNode // sentinel node with maxLevel forward pointers
	level int           // highest level currently in use
	size  int           // number of distinct keys
}

func NewSkipList() *SkipList {
	head := &skipListNode{
		forward: make([]*skipListNode, maxLevel),
	}
	return &SkipList{head: head, level: 1}
}

// randomLevel picks a tower height; each extra level has probability 1/2.
func (sl *SkipList) randomLevel() int {
	level := 1
	for rand.Float64() < 0.5 && level < maxLevel {
		level++
	}
	return level
}

// Put inserts a key or overwrites it in place (a Delete is a Put with deleted=true).
func (sl *SkipList) Put(key Key, value Value, timestamp uint64, deleted bool) {
	// update[i] is the rightmost node at level i whose key is < key.
	update := make([]*skipListNode, maxLevel)
	current := sl.head
	for i := sl.level - 1; i >= 0; i-- {
		for current.forward[i] != nil &&
			bytes.Compare(current.forward[i].key, key) < 0 {
			current = current.forward[i]
		}
		update[i] = current
	}
	// Key already present: overwrite it so only the newest version is kept.
	if next := current.forward[0]; next != nil && bytes.Equal(next.key, key) {
		next.value = value
		next.timestamp = timestamp
		next.deleted = deleted
		return
	}
	newLevel := sl.randomLevel()
	if newLevel > sl.level {
		// Levels above the current height are linked directly from head.
		for i := sl.level; i < newLevel; i++ {
			update[i] = sl.head
		}
		sl.level = newLevel
	}
	newNode := &skipListNode{
		key:       key,
		value:     value,
		timestamp: timestamp,
		deleted:   deleted,
		forward:   make([]*skipListNode, newLevel),
	}
	// Splice the new node in at every level of its tower.
	for i := 0; i < newLevel; i++ {
		newNode.forward[i] = update[i].forward[i]
		update[i].forward[i] = newNode
	}
	sl.size++
}

// Get returns (value, timestamp, deleted, found).
func (sl *SkipList) Get(key Key) (Value, uint64, bool, bool) {
	current := sl.head
	for i := sl.level - 1; i >= 0; i-- {
		for current.forward[i] != nil &&
			bytes.Compare(current.forward[i].key, key) < 0 {
			current = current.forward[i]
		}
	}
	if node := current.forward[0]; node != nil && bytes.Equal(node.key, key) {
		return node.value, node.timestamp, node.deleted, true
	}
	return nil, 0, false, false
}
```
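The engine in section 7 calls a `MemTable` type that wraps this skip list (`NewMemTable`, `Put`, `Delete`, `ShouldFlush`). That type is never shown. One plausible way to decide that the MemTable is full is to keep a running byte count of everything written and compare it with the limit passed to `NewMemTable` (4 MB in `Open`). The sketch below shows only that accounting. `memTableSizer` and its 16-byte per-entry overhead are assumptions for illustration, not MiniLSM code:

```go
package main

import "fmt"

// entryOverhead is an assumed fixed cost per entry (timestamp, tombstone
// flag, node pointers); it is a rough guess, not a measured value.
const entryOverhead = 16

// memTableSizer is a hypothetical helper that tracks how many bytes a
// MemTable has absorbed and decides when it should be frozen and flushed.
type memTableSizer struct {
	limit int // flush threshold in bytes, e.g. 4 * 1024 * 1024
	used  int // approximate bytes written so far
}

// Record accounts for one Put or Delete. Overwrites of an existing key are
// counted again, so the estimate errs on the side of flushing early.
func (s *memTableSizer) Record(key, value []byte) {
	s.used += len(key) + len(value) + entryOverhead
}

// ShouldFlush reports whether the MemTable has reached its size limit.
func (s *memTableSizer) ShouldFlush() bool {
	return s.used >= s.limit
}

func main() {
	s := &memTableSizer{limit: 100}
	for i := 0; !s.ShouldFlush(); i++ {
		s.Record([]byte(fmt.Sprintf("key%d", i)), []byte("value"))
	}
	fmt.Println("flush after", s.used, "bytes") // 4 entries of 25 bytes each
}
```

Because overwrites are counted twice, this estimate is conservative, so a MemTable may flush slightly before it truly holds 4 MB of live data.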

3. WAL Implementation

3.1 Writing the WAL

```go
package minilsm

import (
	"encoding/binary"
	"hash/crc32"
	"os"
)

type WAL struct {
	file *os.File
}

func NewWAL(path string) (*WAL, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return nil, err
	}
	return &WAL{file: f}, nil
}

// WAL record format (all integers big-endian):
// [CRC 4B][Type 1B][KeyLen 4B][Key][ValueLen 4B][Value][Timestamp 8B]
// The CRC covers every byte after it.
func (w *WAL) Append(cmdType CommandType, key Key, value Value, timestamp uint64) error {
	// Build the record body (everything except the CRC).
	buf := make([]byte, 0, 1+4+len(key)+4+len(value)+8)
	// Type
	buf = append(buf, byte(cmdType))
	// KeyLen + Key
	var lenBuf [4]byte
	binary.BigEndian.PutUint32(lenBuf[:], uint32(len(key)))
	buf = append(buf, lenBuf[:]...)
	buf = append(buf, key...)
	// ValueLen + Value (len(nil) == 0, so a Delete writes ValueLen = 0)
	binary.BigEndian.PutUint32(lenBuf[:], uint32(len(value)))
	buf = append(buf, lenBuf[:]...)
	buf = append(buf, value...)
	// Timestamp
	var tsBuf [8]byte
	binary.BigEndian.PutUint64(tsBuf[:], timestamp)
	buf = append(buf, tsBuf[:]...)
	// Prepend the CRC of the body: record = CRC + body
	record := make([]byte, 4, 4+len(buf))
	binary.BigEndian.PutUint32(record, crc32.ChecksumIEEE(buf))
	record = append(record, buf...)
	if _, err := w.file.Write(record); err != nil {
		return err
	}
	// fsync on every write guarantees durability, at the cost of one disk flush per write.
	return w.file.Sync()
}
```
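To make the record format concrete, the sketch below encodes one record with the same layout as `Append` and checks its size. It then shows that a single flipped bit is caught by the CRC. `encodeRecord` and `crcOK` are illustrative names, not part of MiniLSM. `binary.BigEndian.AppendUint32`/`AppendUint64` require Go 1.19 or later:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// encodeRecord builds one WAL record: a 4-byte CRC followed by the body
// [Type][KeyLen][Key][ValueLen][Value][Timestamp], integers big-endian.
func encodeRecord(cmd byte, key, value []byte, ts uint64) []byte {
	body := []byte{cmd}
	body = binary.BigEndian.AppendUint32(body, uint32(len(key)))
	body = append(body, key...)
	body = binary.BigEndian.AppendUint32(body, uint32(len(value)))
	body = append(body, value...)
	body = binary.BigEndian.AppendUint64(body, ts)
	rec := binary.BigEndian.AppendUint32(nil, crc32.ChecksumIEEE(body))
	return append(rec, body...)
}

// crcOK recomputes the checksum over the body and compares it with the header.
func crcOK(rec []byte) bool {
	return binary.BigEndian.Uint32(rec[:4]) == crc32.ChecksumIEEE(rec[4:])
}

func main() {
	rec := encodeRecord(0, []byte("key1"), []byte("value1"), 42)
	// Fixed overhead is 4+1+4+4+8 = 21 bytes, plus 4 key bytes and 6 value bytes.
	fmt.Println(len(rec), crcOK(rec)) // 31 true
	rec[10] ^= 0x01                   // flip one bit inside the key (bytes 9..12)
	fmt.Println(crcOK(rec))           // false
}
```

Every record therefore costs 21 bytes of framing on top of the key and value. For tiny values, that overhead dominates the log size.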

3.2 WAL Recovery

```go
// Replay reads every intact record from the start of the log. It stops at the
// first truncated or corrupt record, which is how a torn write at crash time shows up.
func (w *WAL) Replay() ([]WALRecord, error) {
	data, err := os.ReadFile(w.file.Name())
	if err != nil {
		return nil, err
	}
	var records []WALRecord
	offset := 0
	validEnd := 0 // end of the last record that passed the CRC check
	for offset < len(data) {
		// CRC
		if offset+4 > len(data) {
			break // incomplete record: treat the rest as a torn tail
		}
		expectedCRC := binary.BigEndian.Uint32(data[offset : offset+4])
		offset += 4
		// The record body (covered by the CRC) starts here
		bodyStart := offset
		if offset+1 > len(data) {
			break
		}
		cmdType := CommandType(data[offset])
		offset++
		// KeyLen + Key
		if offset+4 > len(data) {
			break
		}
		keyLen := int(binary.BigEndian.Uint32(data[offset : offset+4]))
		offset += 4
		if offset+keyLen > len(data) {
			break
		}
		key := make(Key, keyLen)
		copy(key, data[offset:offset+keyLen])
		offset += keyLen
		// ValueLen + Value
		if offset+4 > len(data) {
			break
		}
		valLen := int(binary.BigEndian.Uint32(data[offset : offset+4]))
		offset += 4
		var value Value
		if valLen > 0 {
			if offset+valLen > len(data) {
				break
			}
			value = make(Value, valLen)
			copy(value, data[offset:offset+valLen])
			offset += valLen
		}
		// Timestamp
		if offset+8 > len(data) {
			break
		}
		timestamp := binary.BigEndian.Uint64(data[offset : offset+8])
		offset += 8
		// Verify the CRC
		if crc32.ChecksumIEEE(data[bodyStart:offset]) != expectedCRC {
			break // CRC mismatch: stop replaying
		}
		records = append(records, WALRecord{
			CommandType: cmdType,
			Key:         key,
			Value:       value,
			Timestamp:   timestamp,
			CRC:         expectedCRC,
		})
		validEnd = offset
	}
	// Cut off a torn or corrupt tail. Otherwise new appends (O_APPEND) would
	// land after the garbage and be unreachable on the next replay.
	if validEnd < len(data) {
		if err := w.file.Truncate(int64(validEnd)); err != nil {
			return records, err
		}
	}
	return records, nil
}
```
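The key property of `Replay` is that a record cut short by a crash (a torn write) ends the replay instead of producing garbage. The self-contained sketch below writes two records and chops the last 3 bytes off the second. It then decodes the result with the same bounds checks. `encode` and `replay` are illustrative names, and the decoder is a condensed model of the code above, not a drop-in replacement. Unlike `Replay`, it slices keys and values out of the input buffer instead of copying them:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

type record struct {
	cmd   byte
	key   []byte
	value []byte
	ts    uint64
}

// encode uses the same layout as WAL.Append.
func encode(r record) []byte {
	body := []byte{r.cmd}
	body = binary.BigEndian.AppendUint32(body, uint32(len(r.key)))
	body = append(body, r.key...)
	body = binary.BigEndian.AppendUint32(body, uint32(len(r.value)))
	body = append(body, r.value...)
	body = binary.BigEndian.AppendUint64(body, r.ts)
	out := binary.BigEndian.AppendUint32(nil, crc32.ChecksumIEEE(body))
	return append(out, body...)
}

// replay decodes records until it meets a truncated or corrupt one.
func replay(data []byte) []record {
	var out []record
	off := 0
	// need reports whether n more bytes are available at the current offset.
	need := func(n int) bool { return off+n <= len(data) }
	for off < len(data) {
		start := off
		if !need(4 + 1 + 4) { // CRC + Type + KeyLen
			break
		}
		crc := binary.BigEndian.Uint32(data[off:])
		cmd := data[off+4]
		keyLen := int(binary.BigEndian.Uint32(data[off+5:]))
		off += 9
		if !need(keyLen + 4) { // Key + ValueLen
			break
		}
		key := data[off : off+keyLen]
		off += keyLen
		valLen := int(binary.BigEndian.Uint32(data[off:]))
		off += 4
		if !need(valLen + 8) { // Value + Timestamp
			break
		}
		value := data[off : off+valLen]
		off += valLen
		ts := binary.BigEndian.Uint64(data[off:])
		off += 8
		if crc32.ChecksumIEEE(data[start+4:off]) != crc {
			break // corrupt record: stop replaying
		}
		out = append(out, record{cmd, key, value, ts})
	}
	return out
}

func main() {
	wal := encode(record{0, []byte("a"), []byte("1"), 1})
	wal = append(wal, encode(record{0, []byte("b"), []byte("2"), 2})...)
	torn := wal[:len(wal)-3] // simulate a crash in the middle of the second write
	fmt.Println(len(replay(wal)), len(replay(torn))) // 2 1
}
```

The torn second record was never acknowledged to the caller, because `Append` returns only after `Sync`. Dropping it therefore loses no committed write.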

4. SSTable Implementation

4.2 Writing an SSTable

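```go
package minilsm

import (
	"bytes"
	"encoding/binary"
	"io"
	"os"
)

type SSTableWriter struct {
	file         *os.File
	dataBlocks   [][]byte     // data blocks
	indexBlock   []IndexEntry // index entries, one per data block
	bloom        *BloomFilter
	currentBlock []byte // block being filled
	blockSize    int    // target block size in bytes
	minKey       Key
	maxKey       Key
	numKeys      int
}

// IndexEntry locates one data block: its first key, file offset and size.
type IndexEntry struct {
	MinKey Key
	Offset int64
	Size   int32
}

// Add appends one entry. Keys must arrive in ascending order (from a MemTable
// iterator or a compaction merge), so blocks and the index stay sorted.
func (w *SSTableWriter) Add(key Key, value Value, deleted bool) error {
	w.bloom.Add(key)
	entry := w.serializeEntry(key, value, deleted)
	// Seal the current block if this entry would overflow it.
	if len(w.currentBlock)+len(entry) > w.blockSize && len(w.currentBlock) > 0 {
		w.finishBlock()
	}
	w.currentBlock = append(w.currentBlock, entry...)
	w.numKeys++
	if w.minKey == nil || bytes.Compare(key, w.minKey) < 0 {
		w.minKey = key
	}
	if w.maxKey == nil || bytes.Compare(key, w.maxKey) > 0 {
		w.maxKey = key
	}
	return nil
}

// Finish writes the rest of the file layout:
// [data blocks][Bloom filter][index block][footer 48B]
func (w *SSTableWriter) Finish() (*SSTableMeta, error) {
	// Write the last data block
	if len(w.currentBlock) > 0 {
		w.finishBlock()
	}
	// Bloom filter (meta block)
	metaOffset, err := w.file.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, err
	}
	if _, err := w.file.Write(w.bloom.Marshal()); err != nil {
		return nil, err
	}
	// Index block
	indexOffset, err := w.file.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, err
	}
	for _, entry := range w.indexBlock {
		if _, err := w.file.Write(w.serializeIndexEntry(entry)); err != nil {
			return nil, err
		}
	}
	// Footer: [metaOffset 8B][indexOffset 8B][magic "MINILSM01"][zero padding]
	footer := make([]byte, 48)
	binary.BigEndian.PutUint64(footer[0:8], uint64(metaOffset))
	binary.BigEndian.PutUint64(footer[8:16], uint64(indexOffset))
	copy(footer[16:], "MINILSM01") // magic number
	if _, err := w.file.Write(footer); err != nil {
		return nil, err
	}
	if err := w.file.Sync(); err != nil {
		return nil, err
	}
	size, err := w.file.Seek(0, io.SeekCurrent)
	if err != nil {
		return nil, err
	}
	return &SSTableMeta{
		MinKey:  w.minKey,
		MaxKey:  w.maxKey,
		Size:    size, // used by Compaction.ShouldCompact
		NumKeys: w.numKeys,
	}, nil
}
```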

5. Bloom Filter Implementation

```go
package minilsm

import (
	"hash/fnv"
	"math"
)

type BloomFilter struct {
	bits      []uint64 // bit array packed into 64-bit words
	numBits   uint     // m
	numHashes int      // k
}

// NewBloomFilter sizes the filter for expectedItems (n) keys at the target
// false-positive rate (p) using the optimal parameters:
//
//	m = -n * ln(p) / (ln2)^2
//	k = m/n * ln2
func NewBloomFilter(expectedItems int, falsePositiveRate float64) *BloomFilter {
	if expectedItems < 1 {
		expectedItems = 1
	}
	bitsF := math.Ceil(-float64(expectedItems) * math.Log(falsePositiveRate) /
		(math.Ln2 * math.Ln2))
	if bitsF < 1 {
		bitsF = 1 // guard against p >= 1
	}
	m := uint(bitsF)
	k := int(math.Round(float64(m) / float64(expectedItems) * math.Ln2))
	if k < 1 {
		k = 1
	}
	numWords := (m + 63) / 64
	return &BloomFilter{
		bits:      make([]uint64, numWords),
		numBits:   m,
		numHashes: k,
	}
}

func (bf *BloomFilter) Add(key Key) {
	h1, h2 := bf.hashPair(key)
	for i := 0; i < bf.numHashes; i++ {
		// Double hashing: h(i) = h1 + i*h2 (mod m)
		idx := (h1 + uint64(i)*h2) % uint64(bf.numBits)
		bf.bits[idx/64] |= 1 << (idx % 64)
	}
}

func (bf *BloomFilter) MayContain(key Key) bool {
	h1, h2 := bf.hashPair(key)
	for i := 0; i < bf.numHashes; i++ {
		idx := (h1 + uint64(i)*h2) % uint64(bf.numBits)
		if bf.bits[idx/64]&(1<<(idx%64)) == 0 {
			return false // definitely absent
		}
	}
	return true // possibly present
}

// hashPair derives two 64-bit hashes: plain FNV-1a and FNV-1a with a salt prefix.
func (bf *BloomFilter) hashPair(key Key) (uint64, uint64) {
	h1 := fnv.New64a()
	h1.Write(key)
	h2 := fnv.New64a()
	h2.Write([]byte("salt"))
	h2.Write(key)
	return h1.Sum64(), h2.Sum64()
}
```

6. Compaction Implementation

6.1 Leveled Compaction

```mermaid
graph LR
    subgraph "Leveled Compaction"
        L0["L0<br/>4 files<br/>ranges may overlap"] -->|"Compaction"| L1["L1<br/>10x size<br/>non-overlapping ranges"]
        L1 -->|"Compaction"| L2["L2<br/>10x size<br/>non-overlapping ranges"]
        L2 -->|"Compaction"| L3["L3<br/>10x size<br/>non-overlapping ranges"]
    end
    style L0 fill:#ffcdd2,stroke:#c62828
    style L1 fill:#fff9c4,stroke:#f9a825
    style L2 fill:#c8e6c9,stroke:#2e7d32
    style L3 fill:#e3f2fd,stroke:#1565c0
```
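
```go
package minilsm

import (
	"bytes"
	"container/heap"
)

type Compaction struct {
	levels    []*Level
	l0Trigger int // L0 file-count threshold
	ratio     int // size ratio between adjacent levels
}

type Level struct {
	files   []*SSTableReader
	maxSize int64 // size limit for L1 and below (unused for L0)
}

// ShouldCompact returns the first level that needs compaction.
func (c *Compaction) ShouldCompact() (int, bool) {
	// L0: too many files (a lookup may have to check every L0 file)
	if len(c.levels[0].files) >= c.l0Trigger {
		return 0, true
	}
	// L1+: total level size exceeds its limit
	for i := 1; i < len(c.levels); i++ {
		totalSize := int64(0)
		for _, f := range c.levels[i].files {
			totalSize += f.meta.Size
		}
		if totalSize > c.levels[i].maxSize {
			return i, true
		}
	}
	return -1, false
}

// Run merges `level` into `level+1`. For simplicity it takes all files of the
// source level (real engines pick a single file for L1+), plus every file in
// level+1 whose key range overlaps them.
func (c *Compaction) Run(level int) error {
	if level >= len(c.levels)-1 {
		return nil // the last level has nowhere to compact into
	}
	upperFiles := c.levels[level].files
	if len(upperFiles) == 0 {
		return nil
	}
	// Overall key range of the input files
	minKey := upperFiles[0].meta.MinKey
	maxKey := upperFiles[0].meta.MaxKey
	for _, f := range upperFiles[1:] {
		if bytes.Compare(f.meta.MinKey, minKey) < 0 {
			minKey = f.meta.MinKey
		}
		if bytes.Compare(f.meta.MaxKey, maxKey) > 0 {
			maxKey = f.meta.MaxKey
		}
	}
	// Files in the next level that overlap that range
	lowerFiles := c.findOverlappingFiles(level+1, minKey, maxKey)
	// K-way merge. Tombstones can be dropped only if level+1 is the bottom level.
	merged := c.mergeSort(upperFiles, lowerFiles, level+1 == len(c.levels)-1)
	// Write the merged entries as new SSTables in level+1
	newFiles := c.writeNewSSTables(level+1, merged)
	// Swap the old files out for the new ones
	c.levels[level].files = c.removeFiles(level, upperFiles)
	c.levels[level+1].files = c.replaceFiles(level+1, lowerFiles, newFiles)
	return nil
}
```

6.2 Merge Sort

```go
// HeapEntry is the current entry of one SSTable iterator.
type HeapEntry struct {
	Entry
	iter     *SSTableIterator // iterator returned by NewIterator (not shown)
	priority int              // lower = newer source; breaks ties between equal keys
}

// EntryHeap orders entries by key, then newest source first.
type EntryHeap []HeapEntry

func (h EntryHeap) Len() int { return len(h) }
func (h EntryHeap) Less(i, j int) bool {
	if c := bytes.Compare(h[i].Key, h[j].Key); c != 0 {
		return c < 0
	}
	return h[i].priority < h[j].priority
}
func (h EntryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *EntryHeap) Push(x any)   { *h = append(*h, x.(HeapEntry)) }
func (h *EntryHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// mergeSort does a K-way merge of the input SSTables with a min-heap and keeps
// only the newest version of each key. Tombstones are dropped only when the
// output is the bottommost level; above it they must keep hiding older values.
func (c *Compaction) mergeSort(upperFiles, lowerFiles []*SSTableReader, bottommost bool) []Entry {
	// Order sources newest first. L0 files are appended on flush, so the last one
	// is the newest, and any upper-level file is newer than any lower-level file.
	// A fresh slice avoids appending into the backing array of a level's file list.
	sources := make([]*SSTableReader, 0, len(upperFiles)+len(lowerFiles))
	for i := len(upperFiles) - 1; i >= 0; i-- {
		sources = append(sources, upperFiles[i])
	}
	sources = append(sources, lowerFiles...)

	h := &EntryHeap{}
	for prio, f := range sources {
		iter := f.NewIterator()
		if iter.Next() {
			*h = append(*h, HeapEntry{Entry: iter.Entry(), iter: iter, priority: prio})
		}
	}
	heap.Init(h)

	var result []Entry
	var lastKey Key
	seen := false
	for h.Len() > 0 {
		top := heap.Pop(h).(HeapEntry)
		// The first (newest) occurrence of a key wins; older versions are skipped.
		if !seen || !bytes.Equal(top.Key, lastKey) {
			if !top.Deleted || !bottommost {
				result = append(result, top.Entry)
			}
			lastKey, seen = top.Key, true
		}
		// Advance the iterator that produced this entry
		if top.iter.Next() {
			heap.Push(h, HeapEntry{Entry: top.iter.Entry(), iter: top.iter, priority: top.priority})
		}
	}
	return result
}
```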
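The dedup rule in `mergeSort` only works if the heap pops entries in key order and, for equal keys, newest first. The runnable sketch below shows that ordering with `container/heap` over in-memory sorted runs, which stand in for SSTable iterators. It makes these assumptions:

- `runs[0]` is the newest run, and a run's index acts as its priority.
- Tombstones are dropped only when `bottommost` is true.

The `item`, `cursor` and `mergeHeap` types are illustrative stand-ins, not MiniLSM types:

```go
package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

type item struct {
	key, value []byte
	deleted    bool
}

// cursor points at the next unread item of one sorted run.
type cursor struct {
	run  []item
	pos  int
	prio int // lower = newer source
}

type mergeHeap []*cursor

func (h mergeHeap) Len() int { return len(h) }
func (h mergeHeap) Less(i, j int) bool {
	a, b := h[i].run[h[i].pos], h[j].run[h[j].pos]
	if c := bytes.Compare(a.key, b.key); c != 0 {
		return c < 0
	}
	return h[i].prio < h[j].prio // same key: newer source first
}
func (h mergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x any)   { *h = append(*h, x.(*cursor)) }
func (h *mergeHeap) Pop() any {
	old := *h
	c := old[len(old)-1]
	*h = old[:len(old)-1]
	return c
}

// merge does a K-way merge of sorted runs (runs[0] newest), keeping only the
// newest version of each key and dropping tombstones when bottommost is true.
func merge(runs [][]item, bottommost bool) []item {
	h := &mergeHeap{}
	for i, r := range runs {
		if len(r) > 0 {
			*h = append(*h, &cursor{run: r, prio: i})
		}
	}
	heap.Init(h)
	var out []item
	var lastKey []byte
	seen := false
	for h.Len() > 0 {
		c := (*h)[0]
		it := c.run[c.pos]
		if !seen || !bytes.Equal(it.key, lastKey) {
			if !it.deleted || !bottommost {
				out = append(out, it)
			}
			lastKey, seen = it.key, true
		}
		// Advance this cursor: restore heap order, or remove it when exhausted.
		c.pos++
		if c.pos < len(c.run) {
			heap.Fix(h, 0)
		} else {
			heap.Pop(h)
		}
	}
	return out
}

func main() {
	newer := []item{{key: []byte("a"), value: []byte("2")}, {key: []byte("b"), deleted: true}}
	older := []item{{key: []byte("a"), value: []byte("1")}, {key: []byte("b"), value: []byte("x")}, {key: []byte("c"), value: []byte("3")}}
	for _, it := range merge([][]item{newer, older}, true) {
		fmt.Printf("%s=%s\n", it.key, it.value)
	}
	// Output:
	// a=2
	// c=3
}
```

With `bottommost` set to false, the tombstone for `b` would survive into the output. That is required above the last level, because a deeper level might still hold an older `b`.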

7. Putting the Engine Together

7.1 Core Engine

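```go
package minilsm

// These imports cover the whole engine file (sections 7.1 to 7.4).
import (
	"bytes"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// ErrNotFound is returned by Get for missing or deleted keys.
var ErrNotFound = errors.New("minilsm: key not found")

type MiniLSM struct {
	dir        string
	memTable   *MemTable
	immuTable  *MemTable // Immutable MemTable being flushed
	wal        *WAL
	levels     []*Level
	compaction *Compaction
	timestamp  uint64 // logical clock, incremented on every write
	mu         sync.RWMutex
}

func Open(dir string) (*MiniLSM, error) {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, err
	}
	// L0, L1, L2. Levels must be allocated: make() alone leaves nil pointers.
	levels := make([]*Level, 3)
	for i := range levels {
		levels[i] = &Level{}
	}
	lsm := &MiniLSM{
		dir:        dir,
		memTable:   NewMemTable(4 * 1024 * 1024), // flush at about 4 MB
		levels:     levels,
		compaction: &Compaction{levels: levels, l0Trigger: 4, ratio: 10},
	}
	// Open the WAL
	wal, err := NewWAL(filepath.Join(dir, "wal.log"))
	if err != nil {
		return nil, err
	}
	lsm.wal = wal
	// Rebuild the MemTable from the WAL
	if err := lsm.recover(); err != nil {
		return nil, err
	}
	// Load existing SSTables
	if err := lsm.loadSSTables(dir); err != nil {
		return nil, err
	}
	return lsm, nil
}
```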

7.2 Write Path

```go
// Put appends to the WAL first and only then updates the MemTable, so any
// acknowledged write can be rebuilt from the log after a crash.
func (lsm *MiniLSM) Put(key Key, value Value) error {
	lsm.mu.Lock()
	defer lsm.mu.Unlock()
	lsm.timestamp++
	ts := lsm.timestamp
	// 1. Write the WAL (fsynced)
	if err := lsm.wal.Append(CommandPut, key, value, ts); err != nil {
		return err
	}
	// 2. Write the MemTable
	lsm.memTable.Put(key, value, ts)
	// 3. Flush if the MemTable is full
	if lsm.memTable.ShouldFlush() {
		return lsm.flush()
	}
	return nil
}

// Delete writes a tombstone instead of removing anything in place.
func (lsm *MiniLSM) Delete(key Key) error {
	lsm.mu.Lock()
	defer lsm.mu.Unlock()
	lsm.timestamp++
	ts := lsm.timestamp
	// Write the tombstone to the WAL, then to the MemTable
	if err := lsm.wal.Append(CommandDelete, key, nil, ts); err != nil {
		return err
	}
	lsm.memTable.Delete(key, ts)
	if lsm.memTable.ShouldFlush() {
		return lsm.flush()
	}
	return nil
}
```

7.3 Read Path

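```go
// Get checks sources from newest to oldest and stops at the first hit.
func (lsm *MiniLSM) Get(key Key) (Value, error) {
	lsm.mu.RLock()
	defer lsm.mu.RUnlock()
	// 1. Active MemTable (newest data)
	if val, _, deleted, ok := lsm.memTable.Get(key); ok {
		if deleted {
			return nil, ErrNotFound // tombstone
		}
		return val, nil
	}
	// 2. Immutable MemTable
	if lsm.immuTable != nil {
		if val, _, deleted, ok := lsm.immuTable.Get(key); ok {
			if deleted {
				return nil, ErrNotFound
			}
			return val, nil
		}
	}
	// 3. SSTables, from L0 down to the last level
	for levelNum, level := range lsm.levels {
		for i := range level.files {
			sst := level.files[i]
			if levelNum == 0 {
				// L0 files overlap: check the newest (last flushed) file first
				sst = level.files[len(level.files)-1-i]
			}
			// Skip files whose key range cannot contain the key
			if bytes.Compare(key, sst.meta.MinKey) < 0 ||
				bytes.Compare(key, sst.meta.MaxKey) > 0 {
				continue
			}
			// Bloom filter: "no" is definitive, "yes" only means "maybe"
			if !sst.bloom.MayContain(key) {
				continue
			}
			// Look the key up in the file (err == nil means it was found)
			if val, deleted, err := sst.Get(key); err == nil {
				if deleted {
					return nil, ErrNotFound
				}
				return val, nil
			}
		}
	}
	return nil, ErrNotFound
}
```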
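Why does `Get` stop at the first hit, and why must overlapping L0 files be searched newest first? The sketch below models each source (MemTable, L0 files, deeper levels) as a map, ordered from newest to oldest. The first source that knows the key decides the answer, and a tombstone there hides any older value. The `version` type and the map-based sources are simplifications for illustration only:

```go
package main

import "fmt"

type version struct {
	value   string
	deleted bool
}

// get walks sources from newest to oldest. The first source containing the
// key wins, and a tombstone there means "not found" even if older data exists.
func get(sources []map[string]version, key string) (string, bool) {
	for _, src := range sources {
		if v, ok := src[key]; ok {
			if v.deleted {
				return "", false
			}
			return v.value, true
		}
	}
	return "", false
}

func main() {
	memTable := map[string]version{"a": {deleted: true}}
	l0Newer := map[string]version{"b": {value: "b2"}}
	l0Older := map[string]version{"a": {value: "a1"}, "b": {value: "b1"}}
	newestFirst := []map[string]version{memTable, l0Newer, l0Older}
	for _, k := range []string{"a", "b"} {
		v, ok := get(newestFirst, k)
		fmt.Printf("%s: %q %v\n", k, v, ok)
	}
	// a: "" false   (the tombstone in the MemTable hides a1)
	// b: "b2" true  (the newer L0 file wins)
	v, _ := get([]map[string]version{memTable, l0Older, l0Newer}, "b")
	fmt.Println("oldest-first L0 scan returns", v) // the stale b1
}
```

Deeper levels need no such ordering: each one has non-overlapping ranges, so at most one file per level can contain the key.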

7.4 Flush and Crash Recovery

```go
// flush turns the full MemTable into a new L0 SSTable. It runs synchronously
// while Put/Delete hold lsm.mu, which keeps this sketch free of data races.
func (lsm *MiniLSM) flush() error {
	// 1. Freeze the current MemTable and start a new one
	lsm.immuTable = lsm.memTable
	lsm.memTable = NewMemTable(4 * 1024 * 1024)
	// 2. Write the Immutable MemTable, in key order, to a new SSTable
	path := filepath.Join(lsm.dir, fmt.Sprintf("sst_%d.sst", time.Now().UnixNano()))
	writer := NewSSTableWriter(path)
	iter := lsm.immuTable.table.NewIterator()
	for iter.Next() {
		if err := writer.Add(iter.Key(), iter.Value(), iter.Deleted()); err != nil {
			return err
		}
	}
	if _, err := writer.Finish(); err != nil {
		return err
	}
	// 3. Add the SSTable to L0
	reader, err := OpenSSTable(path)
	if err != nil {
		return err
	}
	lsm.levels[0].files = append(lsm.levels[0].files, reader)
	// 4. The data is now durable in the SSTable, so the old WAL can be discarded.
	//    Reopening the same path keeps recover() pointed at the live log.
	walPath := lsm.wal.file.Name()
	if err := lsm.wal.file.Close(); err != nil {
		return err
	}
	if err := os.Remove(walPath); err != nil {
		return err
	}
	newWAL, err := NewWAL(walPath)
	if err != nil {
		return err
	}
	lsm.wal = newWAL
	lsm.immuTable = nil
	// 5. Compact L0 once it has accumulated 4 files
	if len(lsm.levels[0].files) >= 4 {
		return lsm.compaction.Run(0)
	}
	return nil
}

func (lsm *MiniLSM) recover() error {
	records, err := lsm.wal.Replay()
	if err != nil {
		return err
	}
	// Replay WAL records into the MemTable and restore the logical clock
	for _, rec := range records {
		switch rec.CommandType {
		case CommandPut:
			lsm.memTable.Put(rec.Key, rec.Value, rec.Timestamp)
		case CommandDelete:
			lsm.memTable.Delete(rec.Key, rec.Timestamp)
		}
		if rec.Timestamp > lsm.timestamp {
			lsm.timestamp = rec.Timestamp
		}
	}
	return nil
}
```
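Recovery is the write path replayed in log order. Each record is applied to a fresh MemTable, and the logical clock advances to the largest timestamp seen, so writes after a restart get strictly larger timestamps. The minimal model below makes two hypothetical simplifications: a map replaces the skip list, and in-memory records replace the decoded WAL file:

```go
package main

import "fmt"

const (
	cmdPut    byte = 0
	cmdDelete byte = 1
)

type walRecord struct {
	cmd   byte
	key   string
	value string
	ts    uint64
}

type memEntry struct {
	value   string
	ts      uint64
	deleted bool
}

// recoverMemTable replays records in order. A later record for the same key
// overwrites the earlier one, and a Delete leaves a tombstone behind.
func recoverMemTable(records []walRecord) (map[string]memEntry, uint64) {
	mem := make(map[string]memEntry)
	var clock uint64
	for _, r := range records {
		switch r.cmd {
		case cmdPut:
			mem[r.key] = memEntry{value: r.value, ts: r.ts}
		case cmdDelete:
			mem[r.key] = memEntry{ts: r.ts, deleted: true}
		}
		if r.ts > clock {
			clock = r.ts
		}
	}
	return mem, clock
}

func main() {
	mem, clock := recoverMemTable([]walRecord{
		{cmdPut, "key1", "value1", 1},
		{cmdPut, "key2", "value2", 2},
		{cmdDelete, "key1", "", 3},
	})
	fmt.Println(mem["key1"].deleted, mem["key2"].value, clock) // true value2 3
	// The next write after the restart would use timestamp clock+1 = 4.
}
```

The tombstone for `key1` must survive replay rather than simply removing the key. Otherwise an older `key1` in an SSTable would reappear after the restart.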

8. Performance Benchmarks

8.2 Comparison with RocksDB

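| Metric | MiniLSM | RocksDB | Gap |
|---|---|---|---|
| Random write QPS | ~50K | ~500K | 10x |
| Random read QPS | ~20K | ~300K | 15x |
| Range scan | ~100K rows/s | ~5M rows/s | 50x |
| Write amplification | ~15x | ~10x | 1.5x |
| Read amplification | ~20x | ~8x | 2.5x |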
Warning

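MiniLSM is far slower than RocksDB. The gap comes mainly from five things: no Block Cache, a simplistic compaction strategy, no vectorized execution, no compression, and the language difference between Go and C++. MiniLSM's value lies in understanding the principles, not in chasing performance.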

```mermaid
flowchart TB
    WRITE["Write request"] --> WAL["WAL log"] --> MEM["MemTable<br/>in-memory table"]
    MEM -->|"full"| IMM["Immutable MemTable"] --> FLUSH["Flush to disk"] --> SST["SSTable file"]
    SST -->|"Compaction"| COMP["Merge sort"] --> NEWSST["New SSTable"]
    style MEM fill:#bbdefb,stroke:#1565c0
    style SST fill:#c8e6c9,stroke:#2e7d32
```
```mermaid
flowchart LR
    READ["Read request"] --> MEM2["MemTable<br/>in-memory lookup"] --> FOUND{"Found?"}
    FOUND -->|"Yes"| RET["Return"]
    FOUND -->|"No"| BLOOM["Bloom Filter<br/>filter check"] --> PROB{"Maybe present?"}
    PROB -->|"Yes"| BLOCK["Block Cache<br/>SSTable block"] --> SEARCH["Binary search"]
    PROB -->|"No"| MISS["Miss"]
    style BLOOM fill:#fff9c4,stroke:#f9a825
    style BLOCK fill:#c8e6c9,stroke:#2e7d32
```

9. Summary

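| Component | Implementation | Chapter |
|---|---|---|
| MemTable | Skip list | Ch6 LSM Trees in Depth |
| WAL | Sequential append + CRC check | Ch9 WAL and Crash Recovery |
| SSTable | Data blocks + index block + footer | Ch6 LSM Trees in Depth |
| Bloom Filter | Double hashing + bit array | Ch8 Read Path |
| Compaction | Leveled merge sort | Ch6 LSM Trees in Depth |
| Crash recovery | WAL replay | Ch9 WAL and Crash Recovery |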

Writing MiniLSM by hand brings together what we covered in the previous 18 chapters.

Tip

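If you are interested in how storage engines are implemented, these open-source projects are worth reading:

  • RocksDB: facebook/rocksdb, an industrial-grade LSM implementation

  • LevelDB: google/leveldb, the predecessor of RocksDB, with a simpler codebase

  • badger: dgraph-io/badger, an LSM engine written in Go

  • mini-lsm: skyzh/mini-lsm, a teaching LSM engine written in Rust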


https://blog.souloss.com/posts/storage/storage-hands-on-lsm/
Author
Souloss
Published
2025-09-02
License
CC BY-NC-SA 4.0
