这篇文章是我们的 Parquet 和 ClickHouse 博客系列的第二部分。在这篇文章中,我们将更详细地探讨 Parquet 格式,重点介绍使用 ClickHouse 读写文件时要考虑的关键细节。对于更有经验的 Parquet 用户,我们还讨论了用户在编写 Parquet 文件时可以进行的优化,以最大限度地提高压缩率,以及使用并行化优化读取性能的一些最新进展。
对于我们的示例,我们继续使用英国房价数据集。其中包含 1995 年至撰写本文时英格兰和威尔士房地产支付价格的数据。我们以 Parquet 格式将其分发到公共 s3 存储桶中s3://datasets-documentation/uk-house-prices/parquet/。我们使用 ClickHouse Local 读取和写入本地和 S3 托管的 Parquet 文件。ClickHouse Local 是 ClickHouse 的易于使用的版本,非常适合需要使用 SQL 对本地和远程文件执行快速处理而无需安装完整数据库服务器的开发人员。最重要的是,ClickHouse Local 和 ClickHouse Server 共享相同的 Parquet 读写代码,因此任何细节都适用于两者。有关更多详细信息,请参阅本系列的上一篇文章。
Parquet 格式依赖于三个分层相关的主要概念:行组、列块和页面。
在较高级别上,Parquet 文件被分为多个行组。行组最多包含 N 行,在编写本文时确定。
在每个行组中,每列都有一个块 - 每个块都包含其各自列的数据,从而提供列方向。虽然理论上每个列块的行数可能不同,但为了简化,我们假设这是相同的。这些块由页面组成。原始数据存储在这些数据页中。每个数据页的最大大小是可以配置的,但目前在 ClickHouse 中尚未公开,它使用默认值 1MB。数据块在写入之前也会被压缩(见下文)。
我们将这些概念和逻辑结构可视化如下。出于说明目的,我们假设行组大小为 10,总共 19 行,每行三列。我们假设我们的数据页大小导致每页一致三个值(除了第一行组的每页上的最后一个块):

有两种类型的页面:数据页面和字典页面。当字典编码应用于数据页中的值时,就会产生字典页。对于 ClickHouse,写入 Parquet 文件时默认启用此功能。对于已字典编码的数据页,它们前面将是字典页。这实际上意味着字典和数据页交替,如下所示。可以对字典页面大小施加限制,默认为 1MB。如果超过此值,写入器将恢复写入包含值的纯数据页。

以上是 Parquet 格式的简化。对于寻求更深入理解的用户,我们建议阅读有关重复和定义级别的内容,因为这些对于充分理解数据页如何与数组和嵌套类型以及空值相关的工作方式也至关重要。
请注意,虽然 Parquet 被正式描述为基于列,但行组的引入和列块的顺序存储意味着它通常被描述为基于混合的格式。这使得该格式的读者可以轻松地实现投影和下推,如下所述。
除了存储数据值之外,Parquet 格式还包括元数据。这被写在文件末尾的页脚中,以方便单遍写入(更有效),并包括对行组、块和页面的引用。

此外,可以在行组级别包括统计数据,描述每列的最小和最大值。这允许读取器考虑这些信息与任何谓词(如果在SQL中查询,为WHERE子句)相对比,进一步跳过列块。这个谓词下推目前还没有在ClickHouse中实现,但是计划添加[1][2][3]。最后,官方规范允许使用单独的元数据文件引用多个Parquet文件,例如,每列一个。ClickHouse目前不支持这个,尽管我们计划增加支持。
min_insert_block_size_bytes:设置块中的最小字节数,较小的块会被压缩为较大的块。这有效地通过字节大小限制了行数。默认为 256MB 未压缩。 min_insert_block_size_rows:传递给箭头客户端的块中的最小行数,较小尺寸的块被压缩为较大尺寸的块。默认为 1m。 output_format_parquet_row_group_size:行组大小默认为 1m。
字典编码构建了一个列的所有不同值的字典,并用字典中的相应索引替换原始值。这对于低基数列特别有效,并帮助在列类型之间提供稳定的性能。这可以应用于数字和字节数组为基础的类型。 字典编码的值、布尔值以及重复和定义级别都是用Run Length Encoded(RLE)进行编码的。这通过用一个发生次数和一个表示重复的数字来替换连续重复的值来压缩列。在这种情况下,当更多的相同值连续出现时,可以实现更高的压缩。列的基数因此也直接影响压缩效率。注意,这个RLE与Bit Packing结合使用,以最小化所需的存储位数。 Delta编码可以应用于整数值。在这种情况下,存储值之间的差值而不是实际值(除了第一个值)。当连续的值有小的或恒定的变化时,这特别有效,例如以毫秒为粒度的DateTime值,因为差值占用更少的位。 现在也有其他编码技术可用,包括Byte Stream Split。
./clickhouse local --query "SELECT * FROM file('house_prices.parquet', ParquetMetadata) FORMAT PrettyJSONEachRow"{"num_columns": "14","num_rows": "28113076","num_row_groups": "53","format_version": "2.6","metadata_size": "65503","total_uncompressed_size": "365131681","total_compressed_size": "255323648","columns": [{"name": "price","path": "price","max_definition_level": "0","max_repetition_level": "0","physical_type": "INT32","logical_type": "Int(bitWidth=32, isSigned=false)","compression": "LZ4","total_uncompressed_size": "53870143","total_compressed_size": "54070424","space_saved": "-0.3718%","encodings": ["RLE_DICTIONARY","PLAIN","RLE"]},...],"row_groups": [{"num_columns": "14","num_rows": "1000000","total_uncompressed_size": "10911703","total_compressed_size": "8395071","columns": [{"name": "price","path": "price","total_compressed_size": "1823285","total_uncompressed_size": "1816162","have_statistics": 1,"statistics": {"num_values": "1000000","null_count": "0","distinct_count": null,"min": "50","max": "6250000"}},...]},...]}
INSERT INTO FUNCTION file('house_prices.<format>.<compression>') SELECT * FROM uk_price_paid

如图所示,即使没有压缩,Parquet 也仅比使用 BZIP2 的基于文本的 CSV 最佳替代方案大 40%。通过 Brotli 压缩,Parquet 比此压缩的 CSV 小 30%。虽然 BZIP2 实现了文本格式的最佳压缩率,但这种压缩方法也相当慢。如下图所示,我们显示了上述时间(3 次运行中最快的一次)。虽然这取决于 ClickHouse 实现和硬件 (Mac Pro 2021),但 Parquet 的写入速度可与所有压缩文本格式相媲美,并且压缩时的开销最小。与采用 BZIP2 的 CSV 相比,Parquet 的最佳压缩 (Brotli) 速度几乎快 10 倍。对于 Parquet,这种压缩技术的速度也是其他格式的两倍,ZSTD 和 GZIP 提供与 BZIP2 的 CSV 类似的压缩,速度超过 25 倍。尽管 Parquet 编码目前是单线程的(与文本格式不同),但该格式在写入性能方面具有优势。我们预计未来对并行 Parquet 编码的改进将对这些写入时序产生重大影响。

INSERT INTO FUNCTION file('house_prices.native.zst') SELECT *FROM uk_price_paid-rw-r--r-- 1 dalemcdiarmid wheel 189M 26 Apr 14:44 house_prices.native.zst
利用设置max_bytes_before_external_sort。如果它被设置为0(默认值),则禁用外部排序。如果启用它,当需要排序的数据量达到指定的字节数时,收集的数据将被排序并转储到临时文件中。这将比内存中的排序慢得多。这个值应该谨慎设置,且小于max_memory_usage设置。 如果ORDER BY表达式具有与表排序键相一致的前缀,您可以使用optimize_read_in_order设置。默认情况下,此设置是启用的,这意味着利用了数据的排序,避免了内存问题。注意,禁用此设置有性能优势,特别是对于具有大LIMIT的查询,以及在WHERE条件匹配之前需要读取许多行的情况。
INSERT INTO FUNCTION file('house_prices-ordered.parquet') SELECT *FROM uk_price_paidORDER BYpostcode1 ASC,postcode2 ASC,addr1 ASC,addr2 ASC0 rows in set. Elapsed: 38.812 sec. Processed 28.11 million rows, 2.68 GB (724.34 thousand rows/s., 69.07 MB/s.)-rw-r--r-- 1 dalemcdiarmid wheel 148M 26 Apr 13:42 house_prices-ordered.parquet-rw-r--r-- 1 dalemcdiarmid wheel 183M 26 Apr 13:44 house_prices.parquet
./clickhouse local --query "SELECT * FROM file('house_prices.parquet', ParquetMetadata) FORMAT PrettyJSONEachRow"{"num_columns": "14","num_rows": "28113076","num_row_groups": "53","format_version": "2.6","metadata_size": "65030","total_uncompressed_size": "365131618","total_compressed_size": "191777958","columns": [{"name": "postcode1","path": "postcode1","max_definition_level": "0","max_repetition_level": "0","physical_type": "BYTE_ARRAY","logical_type": "None","compression": "GZIP","total_uncompressed_size": "191694","total_compressed_size": "105224","space_saved": "45.11%","encodings": ["RLE_DICTIONARY","PLAIN","RLE"]},INSERT INTO FUNCTION file('house_prices-ordered.parquet') SELECT *FROM uk_price_paidORDER BYpostcode1 ASC,postcode2 ASC,addr1 ASC,addr2 ASC./clickhouse local --query "SELECT * FROM file('house_prices-ordered.parquet', ParquetMetadata) FORMAT PrettyJSONEachRow"{"num_columns": "14","num_rows": "28113076","num_row_groups": "51","format_version": "2.6","metadata_size": "62305","total_uncompressed_size": "241299186","total_compressed_size": "155551987","columns": [{"name": "postcode1","path": "postcode1","max_definition_level": "0","max_repetition_level": "0","physical_type": "BYTE_ARRAY","logical_type": "None","compression": "GZIP","total_uncompressed_size": "29917","total_compressed_size": "19563","space_saved": "34.61%","encodings": ["RLE_DICTIONARY","PLAIN","RLE"]},
从历史上看,ClickHouse中的Parquet文件的读取是一个顺序操作。这限制了性能,这意味着希望并行读取的用户需要拆分他们的Parquet文件 —— 当在路径中提供一个全局模式时,ClickHouse会在一组文件之间并行读取。通过使用ClickHouse Local在一个文件和29个文件(按年分区)上计算每年的平均价格,可以显示下面的区别。这里的所有文件都使用了先前显示的ORDER BY键和GZIP进行编写,我们使用三次运行中最快的一次。
SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80)FROM file('house_prices.parquet')GROUP BY yearORDER BY year ASC┌─year─┬──price─┬─bar(round(avg(price)), 0, 1000000, 80)─┐│ 1995 │ 67937 │ █████▍ ││ 1996 │ 71513 │ █████▋ ││ 1997 │ 78538 │ ██████▎ ││ 1998 │ 85443 │ ██████▊ ││ 1999 │ 96040 │ ███████▋ ││ 2000 │ 107490 │ ████████▌ ││ 2001 │ 118892 │ █████████▌ ││ 2002 │ 137957 │ ███████████ ││ 2003 │ 155895 │ ████████████▍ ││ 2004 │ 178891 │ ██████████████▎ ││ 2005 │ 189361 │ ███████████████▏ ││ 2006 │ 203533 │ ████████████████▎ ││ 2007 │ 219376 │ █████████████████▌ ││ 2008 │ 217043 │ █████████████████▎ ││ 2009 │ 213423 │ █████████████████ ││ 2010 │ 236115 │ ██████████████████▉ ││ 2011 │ 232807 │ ██████████████████▌ ││ 2012 │ 238385 │ ███████████████████ ││ 2013 │ 256926 │ ████████████████████▌ ││ 2014 │ 280024 │ ██████████████████████▍ ││ 2015 │ 297285 │ ███████████████████████▊ ││ 2016 │ 313548 │ █████████████████████████ ││ 2017 │ 346521 │ ███████████████████████████▋ ││ 2018 │ 351037 │ ████████████████████████████ ││ 2019 │ 352769 │ ████████████████████████████▏ ││ 2020 │ 377149 │ ██████████████████████████████▏ ││ 2021 │ 383034 │ ██████████████████████████████▋ ││ 2022 │ 391590 │ ███████████████████████████████▎ ││ 2023 │ 365523 │ █████████████████████████████▏ │└──────┴────────┴────────────────────────────────────────┘29 rows in set. Elapsed: 0.182 sec. Processed 14.75 million rows, 118.03 MB (81.18 million rows/s., 649.41 MB/s.)SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80)FROM file('house_prices_*.parquet')GROUP BY yearORDER BY year ASC…29 rows in set. Elapsed: 0.116 sec. Processed 26.83 million rows, 214.63 MB (232.17 million rows/s., 1.86 GB/s.)
SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80)FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_all.parquet')GROUP BY yearORDER BY year ASC29 rows in set. Elapsed: 18.017 sec. Processed 28.11 million rows, 224.90 MB (1.56 million rows/s., 12.48 MB/s.)//with changesSET input_format_parquet_preserve_order = 0SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80)FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_all.parquet')GROUP BY yearORDER BY year ASC29 rows in set. Elapsed: 8.428 sec. Processed 26.69 million rows, 213.49 MB (3.17 million rows/s., 25.33 MB/s.)
clickhouse@dclickhouse % ./clickhouse local --query "SELECT num_row_groups FROM file('house_prices.parquet', ParquetMetadata)"53
请参阅前面的内容,了解在使用 ClickHouse 编写 Parquet 文件时如何使用设置来控制行组的数量。
SET input_format_parquet_preserve_order = 0SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 1000000, 80)FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices-1-row-group.parquet')GROUP BY yearORDER BY year ASC29 rows in set. Elapsed: 19.367 sec. Processed 26.64 million rows, 213.12 MB (1.05 million rows/s., 8.40 MB/s.)
反之,行组的高数量远远超过核心的数量也可能对性能产生不利影响。这可能会导致许多微小的读取,相对于实际的解码工作,增加IO延迟的量。如果只读取少数列,这种情况将最为明显,因为读取的碎片化。当选择所有列时,可以减轻这一问题,因为相邻的读取会被合并。使用s3和URL函数(参见“关于S3的简短说明”)进行读取时,这些行为最为明显。因此需要平衡并行解码和高效读取。行组的大小在100 KB到10MB的范围内可以被视为合理的大小。通过进一步的测试,我们希望我们在此处的建议可以更具具体性。
跟踪表随时间变化的模式演变。 创建定义特定版本的数据的快照的能力。这些版本可以被查询,允许用户在代之间"时间旅行"。 支持快速回滚到数据的先前版本。 自动分区文件以帮助过滤 - 历史上,用户需要手工完成这个容易出错的任务,并在更新中维护它。 查询引擎可以使用的元数据提供高级计划和过滤。
利用任何WHERE子句中的条件的元数据可能会显著提高包含范围条件的查询的性能,例如日期过滤。这些元数据还可以用于改进特定的聚合函数,如计数。 在写Parquet文件时,我们目前不允许用户控制用于列的编码,而是使用合理的默认值。未来的改进将允许我们利用其他压缩技术,如Delta用于日期时间和数值的编码,或为特定列关闭字典编码。 Arrow API在写文件时提供了几个设置,包括限制字典大小的能力。我们欢迎用户提供哪些值得公开的想法。 并行化读取是一个持续的努力,有多种低级改进可能[1][2]。我们预期并行化编码对写性能有很大的影响。 我们对Parquet的支持正在不断改进。除了在这篇博客中突出的一些行为上的不一致性外,我们还计划其他的改进,如改进逻辑类型的支持[1]和确保Null列被正确识别[2]。
总体而言,理解和优化Parquet格式与ClickHouse的交互是确保最佳性能和效率的关键。随着技术的不断发展和改进,用户和开发人员应保持警觉,不断适应和探索新的方法来提高他们的数据处理和查询能力。




