使用 DuckDB 快速进行 Top N 聚合和过滤
作者: Alex Monahan
原文:https://duckdb.org/2024/10/25/topn.html
摘录: 通过 min
、max
、min_by
和 max_by
聚合函数中的 N
参数,更快速、轻松地找到前 N 个值或过滤到最新的 N 行。

Top N 概述
在数据分析中,通常我们会关注某个指标的最高或最低几行数据。如果我们想获取整个数据集中最高或最低的 N
行,SQL 的标准 ORDER BY
和 LIMIT
子句可以按照所选的指标进行排序,并返回前 N
行。例如,使用 TPC-H 基准[1]的规模因子 1(SF1)数据集:
INSTALL tpch;
LOAD tpch;
-- 生成一个示例的 TPC-H 数据集
CALL dbgen(sf = 1);
-- 返回按 l_shipdate 排序的最近三行
FROM lineitem
ORDER BY
l_shipdate DESC
LIMIT 3;
| l_orderkey | l_partkey | … | l_shipmode | l_comment |
| 354528 | 6116 | … | wake according to the u | |
| 413956 | 16402 | … | SHIP | usual patterns. carefull |
| 484581 | 10970 | … | TRUCK | ccounts maintain. dogged accounts a |
这种方法可以帮助我们快速找到数据集中最早或最新的几行记录,或者发现某个指标中的异常值。
另一种常用的方法是查询一列或多列的最大值或最小值,以计算汇总统计信息。虽然这能帮助发现一些异常值,但每列的异常数据可能不同,因此它解决的是不同的问题。DuckDB 的 COLUMNS
表达式能够帮助我们计算所有列的最大值。
FROM lineitem
SELECT
MAX(COLUMNS(*));
本文中的查询广泛使用了 DuckDB 的 FROM-first 语法[2]。此语法允许交换
FROM
和SELECT
子句的位置,甚至可以完全省略SELECT
子句。
| l_orderkey | l_partkey | … | l_shipmode | l_comment |
| 600000 | 20000 | … | TRUCK | zzle. slyly |
然而,这两种方法只能回答某些特定类型的问题。在很多情况下,分析的目标是了解分组内的前 N 个值。例如,如何查询每个供应商的最近 10 个发货?SQL 的 LIMIT
子句无法解决这种需求。我们可以将这种分析称为分组内的 Top N。
这种分析方法常用于探索新数据集。使用场景包括提取每组的最近几行记录,或找到每组中最极端的几个值。继续以发货为例,我们可以查询每个零件号的最近 10 次发货,或查找每个客户价格最高的 5 笔订单。
传统的分组内 Top N
在大多数数据库中,过滤分组内 Top N 的常用方法是使用 窗口函数[3] 结合 公用表表达式 (CTE)[4]。这种方法在 DuckDB 中同样适用。以下示例查询返回每个供应商的最近 3 次发货:
WITH ranked_lineitem AS (
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
)
FROM ranked_lineitem
WHERE
my_ranking <= 3;
| l_orderkey | l_partkey | l_suppkey | … | l_shipmode | l_comment | my_ranking |
| 1310688 | 169532 | 7081 | … | RAIL | ully final exc | 1 |
| 910561 | 194561 | 7081 | … | SHIP | ly bold excuses caj | 2 |
| 4406883 | 179529 | 7081 | … | RAIL | tions. furious | 3 |
| 4792742 | 52095 | 7106 | … | RAIL | onic, ironic courts. final deposits sleep | 1 |
| 4010212 | 122081 | 7106 | … | accounts cajole finally ironic instruc | 2 | |
| 1220871 | 94596 | 7106 | … | TRUCK | regular requests above t | 3 |
| … | … | … | … | … | … | … |
在 DuckDB 中,这可以通过 QUALIFY 子句[5] 简化。QUALIFY
类似于 WHERE
子句,但专门用于处理窗口函数的结果。这样就不需要使用 CTE,同时可以返回相同的结果。
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
QUALIFY
my_ranking <= 3;
这种方法当然是可行的!但它的缺点是什么呢?尽管查询只需要最近的 3 个发货记录,它仍然必须对所有发货记录进行排序才能获取这 3 个结果。DuckDB 的排序复杂度是 O(kn)
,这是因为 DuckDB 创新的 基数排序算法[6],但这仍然高于 DuckDB 哈希聚合[7] 的 O(n)
。此外,排序比聚合需要更多的内存资源。
DuckDB 中的 Top N
DuckDB 1.1 引入了一个全新功能,可以显著简化 Top N 计算并提升其性能。具体来说,现在 min
、max
、min_by
和 max_by
函数都可以接受一个可选参数 N
。如果 N
大于默认的 1,它们将返回包含前 N 个值的数组。
举个简单的例子,我们可以查询最近的 3 个发货日期:
FROM lineitem
SELECT
max(l_shipdate, 3) AS top_3_shipdates;
| top_3_shipdates |
| [1998-12-01, 1998-12-01, 1998-12-01] |
DuckDB 中按列的 Top N
得益于 COLUMNS
表达式,Top N 的使用变得更加灵活,我们可以同时检索每列中的前 3 个值。我们称这种分析为按列的 Top N 分析。若使用传统的 SQL 实现这个分析,会非常复杂!每一列都需要一个子查询或窗口函数,而在 DuckDB 中,只需:
FROM lineitem
SELECT
max(COLUMNS(*), 3) AS "top_3_\0";
| top_3_l_orderkey | top_3_l_partkey | … | top_3_l_shipmode | top_3_l_comment |
| [600000, 600000, 599975] | [20000, 20000, 20000] | … | [TRUCK, TRUCK, TRUCK] | [zzle. slyly, zzle. quickly bold a, zzle. pinto beans boost slyly slyly fin] |
DuckDB 中的按组 Top N
利用新加入的 N
参数,如何有效加快分组内的 Top N 分析呢?
如果你想直接查看最终结果,请点击这里![8]
我们将结合 DuckDB 的三个其他 SQL 功能来实现这一点:
• max_by 函数[9](也称为
arg_max
)• unnest 函数[10]
• 自动将整个行打包到一个 STRUCT 列[11]
max
函数用于返回某列的最大值(现在可以返回前 N 个最大值)。与此不同,max_by
函数可以在一列中找到最大值,然后从同一行的其他列中检索值。例如,以下查询将返回每个供应商最近发货的 3 个订单 ID:
FROM lineitem
SELECT
l_suppkey,
max_by(l_orderkey, l_shipdate, 3) AS recent_orders
GROUP BY
l_suppkey;
| l_suppkey | recent_orders |
| 2992 | [233573, 3597639, 3060227] |
| 8516 | [4675968, 5431174, 4626530] |
| 3205 | [3844610, 4396966, 3405255] |
| 2152 | [1672000, 4209601, 3831138] |
| 1880 | [4852999, 2863747, 1650084] |
| … | … |
max_by
函数是一个聚合函数,它可以利用 DuckDB 的快速哈希聚合,而无需排序。不同于按 l_shipdate
排序,max_by
函数只需扫描数据集一次,并跟踪前 N 个 l_shipdate
值,再返回与这些最晚发货日期相对应的订单 ID。DuckDB 的基数排序算法每个字节需要遍历数据集一次,而这种方法只需扫描一次即可,极大地提高了效率。例如,如果按 64 位整数排序,传统算法需要遍历 8 次数据集,而使用这种方法只需 1 次!在性能比较
部分包含了一个简单的基准测试。
不过,这个 SQL 查询仍然有一些不足。返回的结果是以 LIST
形式呈现,而不是单独的行。幸运的是,unnest
函数可以将 LIST
拆分为单独的行:
FROM lineitem
SELECT
l_suppkey,
unnest(
max_by(l_orderkey, l_shipdate, 3)
) AS recent_orders
GROUP BY
l_suppkey;
| l_suppkey | recent_orders |
| 2576 | 930468 |
| 2576 | 2248354 |
| 2576 | 3640711 |
| 5559 | 4022148 |
| 5559 | 1675680 |
| 5559 | 4976259 |
| … | … |
另一个问题是,无法轻松查看与返回的 l_orderkey
相关联的 l_shipdate
。这个查询只返回了一个列,而分组内的 Top N 分析通常需要整个数据行。
幸运的是,DuckDB 允许我们将整个数据行视为一个单独的列来引用!通过引用表名(如这里的 lineitem
),而不是具体的列名,max_by
函数可以检索所有列。
FROM lineitem
SELECT
l_suppkey,
unnest(
max_by(lineitem, l_shipdate, 3)
) AS recent_orders
GROUP BY
l_suppkey;
| l_suppkey | recent_orders |
| 5411 | {‘l_orderkey’: 2543618, ‘l_partkey’: 105410, ‘l_suppkey’: 5411, … |
| 5411 | {‘l_orderkey’: 580547, ‘l_partkey’: 130384, ‘l_suppkey’: 5411, … |
| 5411 | {‘l_orderkey’: 3908642, ‘l_partkey’: 132897, ‘l_suppkey’: 5411, … |
| 90 | {‘l_orderkey’: 4529697, ‘l_partkey’: 122553, ‘l_suppkey’: 90, … |
| 90 | {‘l_orderkey’: 4473346, ‘l_partkey’: 160089, ‘l_suppkey’: 90, … |
| … | … |
通过将 STRUCT
进一步拆分为单独的列,我们可以使输出格式更易读,类似于原始数据集。
最终的分组内 Top N 查询
只需再向 UNNEST
传递一个参数,它就可以将结果递归拆分为单独的列。在这种情况下,UNNEST
将运行两次:第一次将每个 LIST
转换为单独的行,第二次将每个 STRUCT
转换为单独的列。可以排除 l_suppkey
列,因为它已经包含在输出中。
FROM lineitem
SELECT
unnest(
max_by(lineitem, l_shipdate, 3),
recursive := 1
) AS recent_orders
GROUP BY
l_suppkey;
| l_orderkey | l_partkey | l_suppkey | … | l_shipinstruct | l_shipmode | l_comment |
| 1234726 | 6875 | 6876 | … | COLLECT COD | FOB | cajole carefully slyly fin |
| 2584193 | 51865 | 6876 | … | TAKE BACK RETURN | TRUCK | fully regular deposits at the q |
| 2375524 | 26875 | 6876 | … | DELIVER IN PERSON | AIR | nusual ideas. busily bold deposi |
| 5751559 | 95626 | 8136 | ||||
| … | NONE | SHIP | ers nag fluffily against the spe | |||
| 3103457 | 103115 | 8136 | … | TAKE BACK RETURN | FOB | y slyly express warthogs-- unusual, e |
| 5759105 | 178135 | 8136 | … | COLLECT COD | TRUCK | es. regular pinto beans haggle. |
| … | … | … | … | … | … | … |
此方法在去重任务中也非常有用,例如在事件表中查找每组的最新事件,只需设置
N=1
即可。
我们现在有了一种使用聚合函数计算每个组的前 N 行的方法!那么,这种方法的效率如何呢?
性能比较
我们将比较使用 QUALIFY
方法和 max_by
方法来解决分组内的 Top N 问题。两种查询我们之前已经讨论过,下面再次列出供参考。
QUALIFY 查询
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
QUALIFY
my_ranking <= 3;
max_by 查询
FROM lineitem
SELECT
unnest(
max_by(lineitem, l_shipdate, 3),
recursive := 1
)
GROUP BY
l_suppkey;
在查询执行期间,我们还将启动一个后台线程,定期测量 DuckDB 的内存使用情况。这是通过内置的表函数 duckdb_memory()
完成的,并包括内存和临时磁盘使用信息。基准测试使用的小型 Python 脚本附在结果之后。基准测试使用的设备是一台 16 GB RAM 的 M1 MacBook Pro。
| SF | max_memory | 指标 | QUALIFY | max_by | 提升 |
| 1 | 默认 | 总时间 | 0.58 s | 0.24 s | 2.4× |
| 5 | 默认 | 总时间 | 6.15 s | 1.26 s | 4.9× |
| 10 | 36GB | 总时间 | 36.8 s | 25.4 s | 1.4× |
| 1 | 默认 | 内存使用 | 1.7 GB | 0.2 GB | 8.5× |
| 5 | 默认 | 内存使用 | 7.9 GB | 1.5 GB | 5.3× |
| 10 | 36GB | 内存使用 | 15.7 GB | 17.1 GB | 0.9× |
可以看到,在每种情况下,max_by
方法都更快,有时快近 5 倍!不过,随着数据量增大,max_by
方法的表现开始减弱,相较于 QUALIFY
方法的优势变小。
在某些情况下,max_by
方法的内存使用量也显著降低。但随着规模的增加,max_by
方法的内存使用量也变得更大,因为 l_suppkey
的唯一值数量随着规模因子线性增长。内存使用的增加可能解释了性能下降的原因,因为两种算法都接近机器的最大 RAM 并开始交换到磁盘。
为了减少内存压力,我们可以通过减少线程数量重新运行规模因子 10(SF10)的基准测试(分别使用 4 个线程和 1 个线程)。我们继续使用 36 GB 的 max_memory
设置。之前的 SF10 结果(使用全部 10 个线程)供参考。
| SF | 线程 | 指标 | QUALIFY | max_by | 提升 |
| 10 | 10 | 总时间 | 36.8 s | 25.4 s | 1.4× |
| 10 | 4 | 总时间 | 49.0 s | 21.0 s | 2.3× |
| 10 | 1 | 总时间 | 115.7 s | 12.7 s | 9.1× |
| 10 | 10 | 内存使用 | 15.7 GB | 17.1 GB | 0.9× |
| 10 | 4 | 内存使用 | 15.9 GB | 17.3 GB | 0.9× |
| 10 | 1 | 内存使用 | 14.5 GB | 1.8 GB | 8.1× |
max_by
方法如此高效,以至于即使只使用 1 个线程,其速度也远远快于使用 10 个线程的 QUALIFY
方法!减少线程数也非常有效地降低了内存使用(减少了近 10 倍)。
那么,什么时候应该使用哪种方法呢?正如在数据库中常见的情况一样,视情况而定! 如果内存受限,特别是在调节线程数避免交换到磁盘的情况下,max_by
可能会带来显著优势。但如果组的数量与行数接近,考虑使用 QUALIFY
,因为此时 max_by
方法的内存效率会降低。
Python 基准测试脚本
import duckdb
import pandas as pd
from threading import Thread
from time import sleep
from datetime import datetime
from os import remove
def check_memory(stop_function, filepath, sleep_seconds, results_dict):
print("Starting background thread")
background_con = duckdb.connect(filepath)
max_memory = 0
max_temporary_storage = 0
while True:
if stop_function():
break
# Profile the memory
memory_profile = background_con.sql("""
FROM duckdb_memory()
SELECT
tag,
round(memory_usage_bytes / (1000000), 0)::bigint AS memory_usage_mb,
round(temporary_storage_bytes / (1000000), 0)::bigint AS temporary_storage_mb;
""").df()
print(memory_profile)
total_memory = background_con.sql("""
FROM memory_profile
select
sum(memory_usage_mb) AS total_memory_usage_mb,
sum(temporary_storage_mb) AS total_temporary_storage_mb
""").fetchall()
print('Current memory:', total_memory[0][0])
print('Current temporary_storage:', total_memory[0][1])
if total_memory[0][0] > max_memory:
max_memory = total_memory[0][0]
if total_memory[0][1] > max_temporary_storage:
max_temporary_storage = total_memory[0][1]
print('Maximum memory:', max_memory)
print('Maximum temporary_storage:', max_temporary_storage)
sleep(sleep_seconds)
results_dict["max_memory"] = max_memory
results_dict["max_temporary_storage"] = max_temporary_storage
background_con.close()
return
def query_and_profile(filepath, sql):
con = duckdb.connect(filepath)
con.sql("set max_memory='36GB'")
results_dict = {}
stop_threads = False
background_memory_thread = Thread(target=check_memory,
args=(lambda : stop_threads, filepath, 0.1, results_dict, ))
background_memory_thread.start()
print("Starting query:")
start_time = datetime.now()
results_df = con.sql(sql).df()
results_dict["total_time_seconds"] = (datetime.now() - start_time).total_seconds()
print(results_df.head(10))
stop_threads = True
background_memory_thread.join()
con.close()
return results_dict
filepath = './arg_max_check_duckdb_memory_v3.duckdb'
con = duckdb.connect(filepath)
print("Begin initial tpch load")
con.sql("""call dbgen(sf=1);""")
con.close()
sql = """
FROM lineitem
SELECT
UNNEST(
max_by(lineitem, l_shipdate, 3),
recursive := 1
)
GROUP BY
l_suppkey
;"""
max_by_results = query_and_profile(filepath, sql)
sql = """
FROM lineitem
SELECT
*,
row_number() OVER
(PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
AS my_ranking
QUALIFY
my_ranking <= 3
;"""
qualify_results = query_and_profile(filepath, sql)
print('max_by_results:', max_by_results)
print('qualify_results:', qualify_results)
remove(filepath)
结论
DuckDB 现在提供了一种便捷的方法来计算 min
和 max
聚合函数的 Top N 值,以及它们的高级版本 min_by
和 max_by
。这些功能非常易于使用,且支持更复杂的分析场景,例如按列计算 Top N 或按组计算 Top N。此外,与使用窗口函数的方法相比,这些方法在性能上也可能带来显著优势。
我们期待听到你们如何创造性地使用这个新功能!
祝你分析愉快!
引用链接
[1]
TPC-H 基准: https://duckdb.org/docs/extensions/tpch[2]
FROM-first 语法: https://duckdb.org/docs/sql/query_syntax/from#from-first-syntax[3]
窗口函数: https://duckdb.org/docs/sql/functions/window_functions[4]
公用表表达式 (CTE): https://duckdb.org/docs/sql/query_syntax/with[5]
QUALIFY 子句: https://duckdb.org/docs/sql/query_syntax/qualify[6]
基数排序算法: https://duckdb.org/2021/08/27/external-sorting.html[7]
DuckDB 哈希聚合: https://duckdb.org/2024/03/29/external-aggregation.html[8]
请点击这里!: #the-final-top-n-by-group-query[9]
max_by 函数: https://duckdb.org/docs/sql/functions/aggregates#max_byarg-val-n[10]
unnest 函数: https://duckdb.org/docs/sql/query_syntax/unnest[11]
STRUCT 列: https://duckdb.org/docs/sql/data_types/struct#creating-structs




