一张 100 列的表,执行 SELECT AVG(salary) FROM employees——行存要读取全部 100 列的数据,而查询只用到了 1 列。99% 的 I/O 浪费了。列式存储把同一列的值连续存放,这条查询只需读取 salary 列,I/O 量直接缩减到 1/100。这就是分析场景下列存比行存快 10–100 倍的根本原因。
Parquet 和 ORC 是列存的两大文件格式。它们的物理布局、编码方式、Zone Map 和谓词下推机制,共同决定了分析引擎能跑多快。
一、为什么需要列式存储
1.1 行存 vs 列存
1.2 查询场景对比
-- 分析查询:只查平均薪资SELECT AVG(salary) FROM employees WHERE city = '北京';
-- 行存:需要读取所有列的数据-- 100 列 × 1M 行 = 读取 100M 个值(假设每列 8B = 800MB)
-- 列存:只需读取 salary 和 city 两列-- 2 列 × 1M 行 = 读取 2M 个值(= 16MB)-- I/O 减少 50 倍!| 维度 | 行存 | 列存 |
|---|---|---|
| 点查 | 快(整行读取) | 慢(需重组多列) |
| 范围扫描 | 一般 | 快(只读相关列) |
| 聚合分析 | 慢(读取所有列) | 快(只读聚合列) |
| 更新 | 快(原地更新) | 慢(需修改多列文件) |
| 压缩比 | 低(混合类型) | 高(同类型连续) |
| 适用场景 | OLTP | OLAP |
二、Parquet 文件格式
2.1 Parquet 文件结构
2.2 Parquet 关键参数
| 参数 | 默认值 | 说明 | 影响 |
|---|---|---|---|
| Row Group 大小 | 128MB | 一组行的数据块集合 | 越大,I/O 效率越高 |
| Page 大小 | 1MB | 列数据的最小读取单元 | 越大,压缩比越高 |
| Dictionary Page | 可选 | 字典编码的字典值 | 低基数列压缩比高 |
| Column Index | 可选 | 每页的 min/max/null_count | 谓词下推过滤 |
2.3 Parquet 元数据
# Parquet 文件元数据结构class ParquetMetadata: version: int # 文件版本 schema: SchemaDescriptor # 列定义 num_rows: int # 总行数 row_groups: list[RowGroupMetadata]
class RowGroupMetadata: num_rows: int # 行数 total_byte_size: int # 总大小 columns: list[ColumnChunkMetadata]
class ColumnChunkMetadata: file_offset: int # 文件偏移 total_compressed_size: int # 压缩后大小 total_uncompressed_size: int # 压缩前大小 num_values: int # 值数量 encoding: list[str] # 编码方式 statistics: ColumnStatistics # 统计信息
class ColumnStatistics: min_value: any # 最小值 max_value: any # 最大值 null_count: int # NULL 数量 distinct_count: int # 唯一值数量三、ORC 文件格式
3.1 ORC vs Parquet
| 维度 | Parquet | ORC |
|---|---|---|
| 起源 | Twitter/Cloudera | Hortonworks |
| Row Group | Row Group(128MB) | Stripe(256MB) |
| 索引 | Column Index(可选) | Row Index(内置) |
| 压缩 | Snappy/ZSTD/Gzip | Snappy/ZSTD/LZ4 |
| 编码 | 字典/Delta/RLE | 字典/Delta/RLE/BitPack |
| ACID | 不支持 | 支持(Hive) |
| 生态 | Spark/Impala/Arrow | Hive/Presto/Trino |
3.2 ORC Stripe 结构
四、向量化执行
4.1 为什么向量化
传统行式执行一次处理一行,向量化执行一次处理一批(向量):
# 行式执行(一次一行)def row_oriented_execution(table): total = 0 for row in table: if row.city == '北京': total += row.salary return total
# 向量化执行(一次一批)def vectorized_execution(salary_column, city_column, batch_size=1024): total = 0 for i in range(0, len(salary_column), batch_size): # 一次处理 1024 个值 salary_batch = salary_column[i:i+batch_size] city_batch = city_column[i:i+batch_size]
# 向量化过滤 mask = city_batch == '北京' # 一次比较 1024 个值
# 向量化聚合 total += salary_batch[mask].sum()
return total| 维度 | 行式执行 | 向量化执行 |
|---|---|---|
| 每次处理 | 1 行 | 1024+ 行 |
| CPU 利用率 | 低(分支预测失败) | 高(SIMD 指令) |
| 函数调用 | 每行一次 | 每批一次 |
| 缓存利用 | 差 | 好 |
4.2 SIMD 加速
// SIMD 向量化聚合(AVX2)// 一次处理 8 个 int32 值void vectorized_sum_avx2(int32_t *values, int n, int32_t *result) { __m256i sum = _mm256_setzero_si256(); // 8 × 32bit = 256bit
for (int i = 0; i < n; i += 8) { __m256i vec = _mm256_loadu_si256((__m256i*)(values + i)); sum = _mm256_add_epi32(sum, vec); // 一次加 8 个值 }
// 横向求和 int32_t temp[8]; _mm256_storeu_si256((__m256i*)temp, sum); *result = 0; for (int i = 0; i < 8; i++) *result += temp[i];}4.3 Parquet 编码细节
Parquet 的 Data Page 内部采用分层编码——先列级编码,再块级压缩:
# Parquet 编码流程(以整数列为例)def parquet_encode_int_column(values): # 1. 统计分析 cardinality = len(set(values)) is_sorted = all(values[i] <= values[i+1] for i in range(len(values)-1))
# 2. 选择列级编码 if cardinality < len(values) * 0.01: # 低基数:字典编码 dict_page, indices = dictionary_encode(values) # 索引再用 RLE + 位打包 encoded = rle_encode(bit_pack_encode(indices)) return dict_page, encoded elif is_sorted: # 排序整数:Delta + 位打包 delta = delta_encode(values) return bit_pack_encode(delta) else: # 通用:Plain 编码 return plain_encode(values)
# 3. 块级压缩(对编码后的字节流) # compressed = zstd_compress(encoded_bytes)| 编码组合 | 适用列类型 | 压缩比 | Parquet 版本 |
|---|---|---|---|
| Dictionary + RLE + BitPack | 枚举、状态、国家 | 50–100x | V1/V2 |
| Delta + BitPack | 自增 ID、时间戳 | 10–30x | V2 |
| RLE + BitPack | 排序后低基数 | 5–20x | V1 |
| Plain + ZSTD | 随机浮点、JSON | 2–4x | V1/V2 |
| ByteRunSplit + LZ4 | 重复字节模式 | 3–8x | V2 |
4.4 列式存储的更新策略
列式存储天然不适合频繁更新,但实际场景中数据并非完全不可变:
| 更新策略 | 原理 | 适用场景 | 代表系统 |
|---|---|---|---|
| 重写文件 | 读取→修改→写新文件 | 低频批量更新 | Parquet + Spark |
| Delta 文件 | 增量文件 + 合并读取 | 中频追加/更新 | Delta Lake、Iceberg |
| 行存 + 列存双写 | 写入行存,异步转列存 | HTAP | TiFlash |
| Copy-on-Write | 修改时重写受影响的文件 | ACID 事务 | Iceberg |
| Merge-on-Read | 增量文件在读取时合并 | 高频更新 | Hudi MOR 表 |
4.5 Apache Arrow 零拷贝
Apache Arrow 定义了跨语言的列式内存格式,核心优势是零拷贝读取:
// Arrow 列式内存布局struct ArrowArray { int64_t length; // 元素数量 int64_t null_count; // NULL 数量 int64_t offset; // 偏移量 byte *buffers[]; // 缓冲区数组 // 对于 int32 列: // buffers[0] = 有效性位图(1 bit/值) // buffers[1] = 数据缓冲区(4 bytes/值,连续存储)};
// 零拷贝:多个进程可 mmap 同一 Arrow 内存区域// 无需序列化/反序列化,直接读取 buffers 中的数据// Spark → Arrow → Python: 避免了 JVM 到 Python 的数据拷贝| 特性 | Arrow | Parquet |
|---|---|---|
| 存储位置 | 内存 | 磁盘 |
| 是否压缩 | 否(可选 LZ4) | 是 |
| 读取方式 | 零拷贝(指针直读) | 需解码+解压 |
| 跨语言 | C++/Java/Python/Rust/Go | C++/Java/Python |
| 典型用途 | 进程间数据交换 | 持久化存储 |
Arrow 的零拷贝特性使其成为数据分析的”内存总线”——Spark、Pandas、Polars 之间可通过 Arrow 共享数据而无需序列化。PyArrow 的 pandas 兼容层将 DataFrame 到 Arrow 的转换时间从秒级降到毫秒级。
4.6 数据湖格式
现代数据湖在 Parquet/ORC 之上增加了事务和版本管理能力:
| 格式 | 开源方 | ACID | 时间旅行 | Schema 演化 | 更新策略 | 元数据存储 |
|---|---|---|---|---|---|---|
| Delta Lake | Databricks | CoW / MoR | 事务日志(JSON) | |||
| Apache Iceberg | Netflix | CoW / MoR | 跨层元数据树 | |||
| Apache Hudi | Uber | MoR 为主 | 时间线日志 |
# 数据湖读取流程(以 Iceberg 为例)def iceberg_read(table, snapshot_id=None): # 1. 获取快照元数据 snapshot = table.snapshots.get(snapshot_id or table.current_snapshot)
# 2. 从 Manifest List 获取数据文件列表 manifest_list = read_manifest_list(snapshot.manifest_list_path)
# 3. 过滤:利用文件级统计跳过不匹配的文件 matching_files = [] for manifest in manifest_list: for file in manifest.data_files: if zone_map_matches(file.stats, query_filter): matching_files.append(file)
# 4. 读取匹配的 Parquet 文件 return read_parquet_files(matching_files, columns, filters)五、Zone Map 与谓词下推
5.1 Zone Map(Min/Max 统计)
Zone Map 在每个数据页/Row Group 级别存储 min/max 统计:
# Zone Map 过滤def zone_map_filter(query_filter, row_group_stats): """利用 Zone Map 跳过不相关的 Row Group""" skipped = 0 read = 0
for rg_stats in row_group_stats: # 检查过滤条件是否可能匹配 if query_filter.column == 'salary': if query_filter.op == '>' and query_filter.value > rg_stats.salary_max: skipped += 1 # 所有值都小于阈值,跳过 continue if query_filter.op == '<' and query_filter.value < rg_stats.salary_min: skipped += 1 # 所有值都大于阈值,跳过 continue
read += 1 # 可能匹配,需要读取
print(f"跳过 {skipped} 个 Row Group,读取 {read} 个") return read
# 示例:SELECT * WHERE salary > 50000# Row Group 1: salary min=30000, max=45000 → 跳过# Row Group 2: salary min=40000, max=60000 → 读取# Row Group 3: salary min=55000, max=80000 → 读取# Row Group 4: salary min=20000, max=35000 → 跳过# 4 个 Row Group 中跳过 2 个,I/O 减少 50%5.2 谓词下推
| 过滤层级 | 说明 | 效果 |
|---|---|---|
| 文件级 | 文件元数据中的统计 | 跳过整个文件 |
| Row Group 级 | Zone Map min/max | 跳过整个 Row Group |
| Page 级 | Page 级统计 | 跳过整个 Page |
| Bloom Filter | 列级 Bloom Filter | 跳过不包含值的 Page |
| 行级 | 读取后逐行过滤 | 最精确但最慢 |
谓词下推是列式存储最重要的优化。通过 Zone Map + Bloom Filter,Spark/Trino 可以在读取数据前就跳过 90%+ 的数据块,使全表扫描实际上变成了”智能扫描”。
六、行列混存
6.1 PAX 格式
PAX(Partition Attributes Across)是行列混存格式:
| 格式 | 行组织 | 列组织 | 适用场景 |
|---|---|---|---|
| NSM(行存) | 整行连续 | OLTP | |
| DSM(列存) | 整列连续 | OLAP | |
| PAX(混存) | 行组内 | 行组内列连续 | HTAP |
6.2 HTAP 场景的选择
| 场景 | 推荐 | 原因 |
|---|---|---|
| 纯 OLTP | 行存 | 点查和更新效率高 |
| 纯 OLAP | 列存(Parquet/ORC) | 分析查询 I/O 少 |
| HTAP | 混存或双存储 | TiDB: 行存 + 列存副本 |
七、实战:Parquet 文件操作
7.1 创建 Parquet 文件
# 使用 PyArrow 创建 Parquet 文件import pyarrow as paimport pyarrow.parquet as pq
# 创建表table = pa.table({ 'id': range(1_000_000), 'name': ['user_%d' % i for i in range(1_000_000)], 'age': [20 + i % 60 for i in range(1_000_000)], 'city': ['北京' if i % 3 == 0 else '上海' if i % 3 == 1 else '广州' for i in range(1_000_000)], 'salary': [5000 + i * 10 for i in range(1_000_000)],})
# 写入 Parquet 文件pq.write_table( table, 'employees.parquet', row_group_size=128 * 1024 * 1024, # 128MB Row Group compression='zstd', # ZSTD 压缩 compression_level=3, use_dictionary=True, # 字典编码 write_statistics=True, # 写入统计信息)
# 查看文件元数据parquet_file = pq.ParquetFile('employees.parquet')print(f"行数: {parquet_file.metadata.num_rows}")print(f"Row Group 数: {parquet_file.metadata.num_row_groups}")print(f"压缩后大小: {parquet_file.metadata.total_compressed_size / 1024 / 1024:.1f} MB")print(f"压缩前大小: {parquet_file.metadata.total_uncompressed_size / 1024 / 1024:.1f} MB")7.2 谓词下推读取
# 使用谓词下推读取 Parquet# 只读取 salary > 50000 的行table = pq.read_table( 'employees.parquet', columns=['name', 'salary'], # 列裁剪 filters=[('salary', '>', 50000)], # 谓词下推)
# Spark SQL 中的谓词下推# spark.read.parquet('employees.parquet')# .filter('salary > 50000')# .select('name', 'salary')7.3 Parquet 文件统计信息
# 查看 Parquet 列统计信息parquet_file = pq.ParquetFile('employees.parquet')
for i, rg in enumerate(parquet_file.metadata.row_groups): for j, col in enumerate(rg.columns): stats = col.statistics print(f"RG{i} Col{j}: min={stats.min}, max={stats.max}, " f"null_count={stats.null_count}, distinct={stats.distinct_count}")八、总结
| 主题 | 核心要点 | 关键词 |
|---|---|---|
| 行存 vs 列存 | OLTP 用行存(点查快),OLAP 用列存(分析快) | 行存, 列存 |
| Parquet | Row Group + Column Chunk + Data Page 三级结构,丰富的编码和统计 | Row Group, Column Chunk |
| ORC | Stripe + Row Index,内置索引支持谓词下推 | Stripe, Row Index |
| 向量化执行 | 一次处理一批数据,SIMD 加速,CPU 利用率高 | SIMD, 批处理 |
| Zone Map | min/max 统计跳过不相关的数据块,减少 90%+ I/O | min/max, 跳块 |
| 谓词下推 | 过滤条件在存储层执行,避免读取不匹配数据 | 存储层过滤, I/O 裁剪 |
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






