Apache Parquet的官方描述为我们提供了一个关于其设计和属性的出色概述:“Apache Parquet是一个开源的、面向列的数据文件格式,旨在实现高效的数据存储和检索。”
与ClickHouse的MergeTree格式类似,数据是面向列存储的。这实际上意味着同一列的值存储在一起,与面向行的文件格式(例如,Avro)形成对比,后者的行数据是共同存放的。
这种数据方向,加上支持多种适合现代处理器管道的编码技术,允许实现高压缩率和高效存储属性。列方向还最大限度地减少了读取的数据量,因为只从存储中读取必要的列以进行分组等分析查询。当与高压缩率和每列提供的内部统计数据(存储为元数据)结合时,Parquet还承诺快速检索。
这后一个属性在很大程度上取决于完全利用元数据、任何查询引擎中的并行化程度,以及存储数据时所做的决策。我们将在下面讨论与ClickHouse的关系。
在我们深入了解Parquet的内部结构之前,我们将介绍ClickHouse如何支持写入和读取这种格式。
在下面的示例中,我们假设我们的房价数据已导出到一个名为house_prices.parquet的文件,并使用ClickHouse Local进行查询,除非另有说明。
可以使用DESCRIBE语句和文件表函数来识别任何文件的模式:
DESCRIBE TABLE file('house_prices.parquet')┌─name──────┬─type─────────────┬│ price │ Nullable(UInt32) ││ date │ Nullable(UInt16) ││ postcode1 │ Nullable(String) ││ postcode2 │ Nullable(String) ││ type │ Nullable(Int8) ││ is_new │ Nullable(UInt8) ││ duration │ Nullable(Int8) ││ addr1 │ Nullable(String) ││ addr2 │ Nullable(String) ││ street │ Nullable(String) ││ locality │ Nullable(String) ││ town │ Nullable(String) ││ district │ Nullable(String) ││ county │ Nullable(String) |└───────────┴──────────────────┴
SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 2000000, 100)FROM file('house_prices.parquet')WHERE town = 'LONDON'GROUP BY yearORDER BY year ASC┌─year─┬───price─┬─bar(round(avg(price)), 0, 2000000, 100)──────────────┐│ 1995 │ 109120 │ █████▍ ││ 1996 │ 118672 │ █████▉ ││ 1997 │ 136530 │ ██████▊ ││ 1998 │ 153014 │ ███████▋ ││ 1999 │ 180639 │ █████████ ││ 2000 │ 215860 │ ██████████▊ ││ 2001 │ 232998 │ ███████████▋ ││ 2002 │ 263690 │ █████████████▏ ││ 2003 │ 278423 │ █████████████▉ ││ 2004 │ 304666 │ ███████████████▏ ││ 2005 │ 322886 │ ████████████████▏ ││ 2006 │ 356189 │ █████████████████▊ ││ 2007 │ 404065 │ ████████████████████▏ ││ 2008 │ 420741 │ █████████████████████ ││ 2009 │ 427767 │ █████████████████████▍ ││ 2010 │ 480329 │ ████████████████████████ ││ 2011 │ 496293 │ ████████████████████████▊ ││ 2012 │ 519473 │ █████████████████████████▉ ││ 2013 │ 616182 │ ██████████████████████████████▊ ││ 2014 │ 724107 │ ████████████████████████████████████▏ ││ 2015 │ 792274 │ ███████████████████████████████████████▌ ││ 2016 │ 843685 │ ██████████████████████████████████████████▏ ││ 2017 │ 983673 │ █████████████████████████████████████████████████▏ ││ 2018 │ 1016702 │ ██████████████████████████████████████████████████▊ ││ 2019 │ 1041915 │ ████████████████████████████████████████████████████ ││ 2020 │ 1060936 │ █████████████████████████████████████████████████████││ 2021 │ 968152 │ ████████████████████████████████████████████████▍ ││ 2022 │ 967439 │ ████████████████████████████████████████████████▎ ││ 2023 │ 830317 │ █████████████████████████████████████████▌ │└──────┴─────────┴──────────────────────────────────────────────────────┘29 rows in set. Elapsed: 0.625 sec. Processed 28.11 million rows, 750.65 MB (44.97 million rows/s., 1.20 GB/s.)
SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 2000000, 100)FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_all.parquet')WHERE town = 'LONDON'GROUP BY yearORDER BY year ASC┌─year─┬───price─┬─bar(round(avg(price)), 0, 2000000, 100)───────────────┐│ 1995 │ 109120 │ █████▍ ││ 1996 │ 118672 │ █████▉ ││ 1997 │ 136530 │ ██████▊ ││ 1998 │ 153014 │ ███████▋ ││ 1999 │ 180639 │ █████████ │...29 rows in set. Elapsed: 2.069 sec. Processed 28.11 million rows, 750.65 MB (13.59 million rows/s., 362.87 MB/s.)
这两个功能都支持通配符模式,允许选择文件的子集。正如我们稍后将讨论的,这不仅仅提供了跨文件查询的优势 - 主要是读取的并行化。下面我们将查询限制在所有带有年份后缀的house_prices_文件上 - 这假设我们每年有一个文件(见下文)。
SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 2000000, 100)FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')WHERE town = 'LONDON'GROUP BY yearORDER BY year ASC29 rows in set. Elapsed: 3.387 sec. Processed 28.11 million rows, 750.65 MB (8.30 million rows/s., 221.66 MB/s.)
用户还应该了解s3Cluster功能,该功能允许从集群中的多个节点并行处理文件 - 尤其与ClickHouse Cloud用户相关。在需要读取许多文件的情况下,这可以提供显著的性能优势(允许工作分布)。
在ClickHouse中将表数据写入Parquet文件可以通过几种方式实现。这里的首选选项通常取决于你是使用ClickHouse Server还是ClickHouse Local。在下面的示例中,我们假设uk_price_paid表已经填充了数据。关于加载这些数据的详细信息,请看这里。
使用INTO FUNCTION子句,我们可以使用与读取相同的文件函数写parquet。这最适合于ClickHouse Local,文件可以写入本地文件系统的任何位置。ClickHouse服务器将它们写入由配置参数user_files_path指定的目录。
INSERT INTO FUNCTION file('house_prices.parquet') SELECT *FROM uk_price_paid0 rows in set. Elapsed: 12.490 sec. Processed 28.11 million rows, 1.32 GB (2.25 million rows/s., 105.97 MB/s.)dalemcdiarmid@dales-mac houseprices % ls -lh house_prices.parquet-rw-r----- 1 dalemcdiarmid staff 243M 17 Apr 16:59 house_prices.parquet
SELECT *FROM uk_price_paidINTO OUTFILE 'house_prices.parquet'28113076 rows in set. Elapsed: 15.690 sec. Processed 28.11 million rows, 2.47 GB (1.79 million rows/s., 157.47 MB/s.)clickhouse@clickhouse-mac ~ % ls -lh house_prices.parquet-rw-r--r-- 1 dalemcdiarmid staff 291M 17 Apr 18:23 house_prices.parquet
clickhouse@clickhouse-mac ~ % ./clickhouse client --query "SELECT * FROM uk_price_paid FORMAT Parquet" > house_price.parquet
通常,客户端存储是有限的。在这些情况下,用户可能希望将文件写入对象存储,例如 S3 和 GCS。这些都通过用于读取的相同s3 函数来支持。请注意,需要凭证 - 在下面的示例中,我们将它们作为函数参数传递,但也支持 IAM 凭证。
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') SELECT *FROM uk_price_paidLIMIT 10000 rows in set. Elapsed: 0.726 sec. Processed 2.00 thousand rows, 987.86 KB (2.75 thousand rows/s., 1.36 MB/s.)
INSERT INTO FUNCTION file('house_prices_{_partition_id}.parquet') PARTITION BY toYear(date) SELECT * FROM uk_price_paid0 rows in set. Elapsed: 23.281 sec. Processed 28.11 million rows, 1.32 GB (1.21 million rows/s., 56.85 MB/s.)clickhouse@clickhouse-mac houseprices % ls house_prices_*house_prices_1995.parquet house_prices_2001.parquet house_prices_2007.parquet house_prices_2013.parquet house_prices_2019.parquethouse_prices_1996.parquet house_prices_2002.parquet house_prices_2008.parquet house_prices_2014.parquet house_prices_2020.parquethouse_prices_1997.parquet house_prices_2003.parquet house_prices_2009.parquet house_prices_2015.parquet house_prices_2021.parquethouse_prices_1998.parquet house_prices_2004.parquet house_prices_2010.parquet house_prices_2016.parquet house_prices_2022.parquethouse_prices_1999.parquet house_prices_2005.parquet house_prices_2011.parquet house_prices_2017.parquet house_prices_2023.parquethouse_prices_2000.parquet house_prices_2006.parquet house_prices_2012.parquet house_prices_2018.parquet
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample_{_partition_id}.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') PARTITION BY toYear(date) SELECT *FROM uk_price_paidLIMIT 10000 rows in set. Elapsed: 2.247 sec. Processed 2.00 thousand rows, 987.86 KB (889.92 rows/s., 439.56 KB/s.)
结合以上内容,我们可以使用ClickHouse Local在格式之间转换文件。在下面的示例中,我们使用ClickHouse Local和文件函数读取CSV格式的本地副本房价数据集,该数据集包含所有的2800万行,然后将其写入S3作为Parquet。正如之前所示,这些文件按年对数据进行了分区。
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample_{_partition_id}.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') PARTITION BY toYear(date) SELECT *FROM file('house_prices.csv')0 rows in set. Elapsed: 223.864 sec. Processed 28.11 million rows, 5.87 GB (125.58 thousand rows/s., 26.24 MB/s.)

前面的所有示例都假设用户正在查询本地和 S3 托管的文件以进行临时分析,或将数据从 ClickHouse 迁移到 Parquet 进行分发。虽然 Parquet 是一种与数据存储无关的文件分发格式,但它的查询效率不如 ClickHouse MergeTree 表,后者能够利用索引和特定于格式的优化。考虑以下查询的性能,该查询使用本地 Parquet 文件和具有推荐架构的MergeTree 表(均在 Macbook Pro 2021 上运行)计算伦敦房产的每年平均价格:
SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 2000000, 100)FROM file('house_prices.parquet')WHERE town = 'LONDON'GROUP BY yearORDER BY year ASC29 rows in set. Elapsed: 0.625 sec. Processed 28.11 million rows, 750.65 MB (44.97 million rows/s., 1.20 GB/s.)SELECTtoYear(toDate(date)) AS year,round(avg(price)) AS price,bar(price, 0, 2000000, 100)FROM uk_price_paidWHERE town = 'LONDON'GROUP BY yearORDER BY year ASC29 rows in set. Elapsed: 0.022 sec.
这里的差异是巨大的,并且解释了为什么对于需要实时性能的大型数据集,用户将 Parquet 文件加载到 ClickHouse 中。下面我们假设该uk_price_paid表已预先创建。
可以使用该子句从客户端计算机加载文件INFILE。clickhouse-client以下查询从本地客户端的文件系统执行并读取数据。
INSERT INTO uk_price_paid FROM INFILE 'house_price.parquet' FORMAT Parquet28113076 rows in set. Elapsed: 15.412 sec. Processed 28.11 million rows, 1.27 GB (1.82 million rows/s., 82.61 MB/s.)
如果用户的数据分布在多个 Parquet 文件中,此方法还支持 glob 模式。clickhouse-client或者,可以使用以下参数将 Parquet 文件重定向到--query:
clickhouse@clickhouse-mac ~ % ~/clickhouse client --query "INSERT INTO uk_price_paid FORMAT Parquet" < house_price.parquet
由于客户端存储通常有限,并且随着基于对象存储的数据湖的兴起,Parquet 文件通常驻留在 S3 或 GCS 上。同样,我们可以使用 s3 函数读取这些文件,并使用INSERT INTO SELECT子句将它们的数据插入到 MergeTree 表中。在下面的示例中,我们利用 glob 模式读取按年份分区的文件,并在三节点 ClickHouse Cloud 集群上执行此查询。
INSERT INTO uk_price_paid SELECT *FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')0 rows in set. Elapsed: 12.028 sec. Processed 28.11 million rows, 4.64 GB (2.34 million rows/s., 385.96 MB/s.)
SET parallel_distributed_insert_select=1INSERT INTO uk_price_paid SELECT *FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')0 rows in set. Elapsed: 6.425 sec. Processed 28.11 million rows, 4.64 GB (4.38 million rows/s., 722.58 MB/s.)
在本系列的第一部分中,我们介绍了 Parquet 格式,并展示了如何使用 ClickHouse 查询和编写该格式。在下一篇文章中,我们将更详细地探讨该格式,进一步探索 ClickHouse 集成以及最近的性能改进和优化查询的技巧。




