Parquet 源码解析

Parquet 源码解析

Parquet文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此Parquet格式文件是自解析的。在HDFS文件系统和Parquet文件中存在如下几个概念。

  • HDFS块(Block):它是HDFS上的最小的副本单位,HDFS会把一个Block存储在本地的一个文件并且维护分散在不同的机器上的多个副本,通常情况下一个Block的大小为256M、512M等。
  • HDFS文件(File):一个HDFS的文件,包括数据和元数据,数据分散存储在多个Block中。
  • 行组(Row Group):按照行将数据物理上划分为多个单元,每一个行组包含一定的行数,在一个HDFS文件中至少存储一个行组,Parquet读写的时候会将整个行组缓存在内存中,所以如果每一个行组的大小是由内存大的小决定的,例如记录占用空间比较小的Schema可以在每一个行组中存储更多的行。
  • 列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩。
  • 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。

一般按照 Block 大小来设置行组的大小,每一个mapper处理数据的最小单位是一个 block,这样就可以把每一个行组由一个mapper处理,提高任务执行并行度

MapredParquetOutputFormat.getHiveRecordWriter()
-> DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert((List)columnNames, columnTypes), jobConf) 根据列名和类型,生成schema
-> ParquetRecordWriterWrapper.ParquetRecordWriterWrapper() 生成一个 ParquetOutputFormat 对象
-> ParquetFileWriter
创建一个文件写入对象,
ParquetFileWriter(Configuration configuration, MessageType schema, Path file, ParquetFileWriter.Mode mode, long rowGroupSize, int maxPaddingSize)
参数包括:conf配置,schema表结构,file文件路径,mode文件写入的模式(新建或覆写),blockSizeHDFS块大小,也就是一个行组的大小
之后会先在这个文件的最开始的位置写入四个字节的 “PAR1” 表示该文件为 parquet 格式,”parquet.writer.max-padding”, 8388608
根据这个文件写入对象,去创建一个
-> InternalParquetRecordWriter
创建一个 InternalParquetRecordWriter(fileWriter, writeSupport, schema, writeContext.getExtraMetaData(), (long)blockSize, compressor, validating, encodingProps); 对象,每读取一条数据,调用该对象的 write() 方法写入,底层实现是调用 DataWritableWriter.write(T value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
InternalParquetRecordWriter 类
public void write(T value) throws IOException, InterruptedException {
this.writeSupport.write(value);
++this.recordCount; // 每写入一条数据,记录数+1
this.checkBlockSizeReached();
}

private void checkBlockSizeReached() throws IOException {
if (this.recordCount >= this.recordCountForNextMemCheck) { // 默认值 this.recordCountForNextMemCheck = 100L,每调用一次修改为新值
// this.pageStore = new ColumnChunkPageWriteStore(this.compressor, this.schema, this.props.getAllocator());
// this.columnStore = this.props.newColumnWriteStore(this.schema, this.pageStore);
// props 就是 ParquetProperties
// 已写入到内存中的记录的总大小,除以记录数,得到平均一条记录的大小
long memSize = this.columnStore.getBufferedSize();
long recordSize = memSize / this.recordCount;
if (memSize > this.nextRowGroupSize - 2L * recordSize) {
// 如果内存中的记录总大小 > 行组大小 - 2*平均一条记录大小 ???
// 当 memSize > GroupSize(约等于blocksize),就可以刷到磁盘
LOG.debug("mem size {} > {}: flushing {} records to disk.", new Object[]{memSize, this.nextRowGroupSize, this.recordCount});
// 刷写内存的记录
this.flushRowGroupToStore();
this.initStore(); // 重置 pageStore、columnStore 等信息
this.recordCountForNextMemCheck = Math.min(Math.max(100L, this.recordCount / 2L), 10000L);
this.lastRowGroupEndPos = this.parquetFileWriter.getPos();
} else {
// 当目前内存中的记录的总大小还不够大时,修改 recordCountForNextMemCheck 的值,每次会增大一点,差不多相当于之前增量的一半,如果recordsize变化不大的话
this.recordCountForNextMemCheck = Math.min(Math.max(100L, (this.recordCount + (long)((float)this.nextRowGroupSize / (float)recordSize)) / 2L), this.recordCount + 10000L);
LOG.debug("Checked mem at {} will check again at: {}", this.recordCount, this.recordCountForNextMemCheck);
}
}
}

private void flushRowGroupToStore() throws IOException {
// 先将null值刷写出去???
this.recordConsumer.flush();
LOG.debug("Flushing mem columnStore to file. allocated memory: {}", this.columnStore.getAllocatedSize());
if (this.columnStore.getAllocatedSize() > 3L * this.rowGroupSizeThreshold) {
LOG.warn("Too much memory used: {}", this.columnStore.memUsageString());
}

if (this.recordCount > 0L) {
// 获取block的元数据信息
this.parquetFileWriter.startBlock(this.recordCount);
// 把每一列的值写到文件
this.columnStore.flush();
this.pageStore.flushToFileWriter(this.parquetFileWriter);
this.recordCount = 0L;
this.parquetFileWriter.endBlock();
this.nextRowGroupSize = Math.min(this.parquetFileWriter.getNextRowGroupSize(), this.rowGroupSizeThreshold);
}

this.columnStore = null;
this.pageStore = null;
}

ColumnWriteStoreV1 类
this.columnStore = ColumnWriteStoreV1 时,执行的方法
public void flush() {
// private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap();
Collection<ColumnWriterV1> values = this.columns.values();
Iterator var2 = values.iterator();

while(var2.hasNext()) {
ColumnWriterV1 memColumn = (ColumnWriterV1)var2.next();
memColumn.flush();
}
}
ColumnWriterV1 类
public void flush() {
if (this.valueCount > 0) {
this.writePage();
// 将所有数据都转换成了Bytes
// this.pageWriter.writePage(BytesInput.concat(new BytesInput[]{this.repetitionLevelColumn.getBytes(), this.definitionLevelColumn.getBytes(), this.dataColumn.getBytes()}), this.valueCount, this.statistics, this.repetitionLevelColumn.getEncoding(), this.definitionLevelColumn.getEncoding(), this.dataColumn.getEncoding());
}

DictionaryPage dictionaryPage = this.dataColumn.toDictPageAndClose();
if (dictionaryPage != null) {
try {
this.pageWriter.writeDictionaryPage(dictionaryPage);
} catch (IOException var3) {
throw new ParquetEncodingException("could not write dictionary page for " + this.path, var3);
}
this.dataColumn.resetDictionary();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
ParquetProperties 默认变量信息
private Builder() {
this.pageSize = 1048576;
this.dictPageSize = 1048576;
this.enableDict = true;
this.writerVersion = ParquetProperties.DEFAULT_WRITER_VERSION => "v1";
this.minRowCountForPageSizeCheck = 100;
this.maxRowCountForPageSizeCheck = 10000;
this.estimateNextSizeCheck = true;
this.allocator = new HeapByteBufferAllocator();
this.valuesWriterFactory = ParquetProperties.DEFAULT_VALUES_WRITER_FACTORY;
}

org.apache.parquet.hadoop.api 包下

  • ReadSupport
    • GroupReadSupport
    • DataWritableReadSupport
  • WriteSupport
    • GroupWriteSupport
    • DataWritableWriteSupport

映射下推(Project PushDown)
说到列式存储的优势,映射下推是最突出的,它意味着在获取表中原始数据时只需要扫描查询中需要的列,由于每一列的所有值都是连续存储的,所以分区取出每一列的所有值就可以实现TableScan算子,而避免扫描整个表文件内容。

在Parquet中原生就支持映射下推,执行查询的时候可以通过Configuration传递需要读取的列的信息,这些列必须是Schema的子集,映射每次会扫描一个Row Group的数据,然后一次性得将该Row Group里所有需要的列的Cloumn Chunk都读取到内存中,每次读取一个Row Group的数据能够大大降低随机读的次数,除此之外,Parquet在读取的时候会考虑列是否连续,如果某些需要的列是存储位置是连续的,那么一次读操作就可以把多个列的数据读取到内存。

谓词下推(Predicate PushDown)
在数据库之类的查询系统中最常用的优化手段就是谓词下推了,通过将一些过滤条件尽可能的在最底层执行可以减少每一层交互的数据量,从而提升性能,例如”select count(1) from A Join B on A.id = B.id where A.a >数据 10 and B.b < 100”SQL查询中,在处理Join操作之前需要首先对A和B执行TableScan操作,然后再进行Join,再执行过滤,最后计算聚合函数返回,但是如果把过滤条件A.a > 10和B.b < 100分别移到A表的TableScan和B表的TableScan的时候执行,可以大大降低Join操作的输入数据。

无论是行式存储还是列式存储,都可以在将过滤条件在读取一条记录之后执行以判断该记录是否需要返回给调用者,在Parquet做了更进一步的优化,优化的方法时对每一个Row Group的每一个Column Chunk在存储的时候都计算对应的统计信息,包括该Column Chunk的最大值、最小值和空值个数。通过这些统计值和该列的过滤条件可以判断该Row Group是否需要扫描。另外Parquet未来还会增加诸如Bloom Filter和Index等优化数据,更加有效的完成谓词下推。

在使用Parquet的时候可以通过如下两种策略提升查询性能:1、类似于关系数据库的主键,对需要频繁过滤的列设置为有序的,这样在导入数据的时候会根据该列的顺序存储数据,这样可以最大化的利用最大值、最小值实现谓词下推。2、减小行组大小和页大小,这样增加跳过整个行组的可能性,但是此时需要权衡由于压缩和编码效率下降带来的I/O负载。

参考地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
4-byte magic number "PAR1"
<Column 1 Chunk 1 + Column Metadata>
<Column 2 Chunk 1 + Column Metadata>
...
<Column N Chunk 1 + Column Metadata>
<Column 1 Chunk 2 + Column Metadata>
<Column 2 Chunk 2 + Column Metadata>
...
<Column N Chunk 2 + Column Metadata>
...
<Column 1 Chunk M + Column Metadata>
<Column 2 Chunk M + Column Metadata>
...
<Column N Chunk M + Column Metadata>
File Metadata
4-byte length in bytes of file metadata
4-byte magic number "PAR1"