暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用 DuckDB 快速进行 Top N 聚合和过滤

alitrack 2024-10-28
637

使用 DuckDB 快速进行 Top N 聚合和过滤

作者: Alex Monahan


原文:https://duckdb.org/2024/10/25/topn.html


摘录: 通过 min
max
min_by
 和 max_by
 聚合函数中的 N
 参数,更快速、轻松地找到前 N 个值或过滤到最新的 N 行。

DuckDB Top N

Top N 概述

在数据分析中,通常我们会关注某个指标的最高或最低几行数据。如果我们想获取整个数据集中最高或最低的 N
 行,SQL 的标准 ORDER BY
 和 LIMIT
 子句可以按照所选的指标进行排序,并返回前 N
 行。例如,使用 TPC-H 基准[1]的规模因子 1(SF1)数据集:

INSTALL tpch;
LOAD tpch;
-- 生成一个示例的 TPC-H 数据集
CALL dbgen(sf = 1);

-- 返回按 l_shipdate 排序的最近三行
FROM lineitem 
ORDER BY 
    l_shipdate DESC 
LIMIT 3;

l_orderkeyl_partkeyl_shipmodel_comment
3545286116MAILwake according to the u
41395616402SHIPusual patterns. carefull
48458110970TRUCKccounts maintain. dogged accounts a

这种方法可以帮助我们快速找到数据集中最早或最新的几行记录,或者发现某个指标中的异常值。

另一种常用的方法是查询一列或多列的最大值或最小值,以计算汇总统计信息。虽然这能帮助发现一些异常值,但每列的异常数据可能不同,因此它解决的是不同的问题。DuckDB 的 COLUMNS
 表达式能够帮助我们计算所有列的最大值。

FROM lineitem
SELECT 
    MAX(COLUMNS(*));

本文中的查询广泛使用了 DuckDB 的 FROM-first 语法[2]。此语法允许交换 FROM
 和 SELECT
 子句的位置,甚至可以完全省略 SELECT
 子句。

l_orderkeyl_partkeyl_shipmodel_comment
60000020000TRUCKzzle. slyly

然而,这两种方法只能回答某些特定类型的问题。在很多情况下,分析的目标是了解分组内的前 N 个值。例如,如何查询每个供应商的最近 10 个发货?SQL 的 LIMIT
 子句无法解决这种需求。我们可以将这种分析称为分组内的 Top N。

这种分析方法常用于探索新数据集。使用场景包括提取每组的最近几行记录,或找到每组中最极端的几个值。继续以发货为例,我们可以查询每个零件号的最近 10 次发货,或查找每个客户价格最高的 5 笔订单。

传统的分组内 Top N

在大多数数据库中,过滤分组内 Top N 的常用方法是使用 窗口函数[3] 结合 公用表表达式 (CTE)[4]。这种方法在 DuckDB 中同样适用。以下示例查询返回每个供应商的最近 3 次发货:

WITH ranked_lineitem AS (
    FROM lineitem 
    SELECT 
        *,
        row_number() OVER
            (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
            AS my_ranking
)
FROM ranked_lineitem
WHERE 
    my_ranking <= 3;

l_orderkeyl_partkeyl_suppkeyl_shipmodel_commentmy_ranking
13106881695327081RAILully final exc1
9105611945617081SHIPly bold excuses caj2
44068831795297081RAILtions. furious3
4792742520957106RAILonic, ironic courts. final deposits sleep1
40102121220817106MAILaccounts cajole finally ironic instruc2
1220871945967106TRUCKregular requests above t3

在 DuckDB 中,这可以通过 QUALIFY 子句[5] 简化。QUALIFY
 类似于 WHERE
 子句,但专门用于处理窗口函数的结果。这样就不需要使用 CTE,同时可以返回相同的结果。

FROM lineitem 
SELECT
    *,
    row_number() OVER
        (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
        AS my_ranking
QUALIFY
    my_ranking <= 3;

这种方法当然是可行的!但它的缺点是什么呢?尽管查询只需要最近的 3 个发货记录,它仍然必须对所有发货记录进行排序才能获取这 3 个结果。DuckDB 的排序复杂度是 O(kn)
,这是因为 DuckDB 创新的 基数排序算法[6],但这仍然高于 DuckDB 哈希聚合[7] 的 O(n)
。此外,排序比聚合需要更多的内存资源。

DuckDB 中的 Top N

DuckDB 1.1 引入了一个全新功能,可以显著简化 Top N 计算并提升其性能。具体来说,现在 min
max
min_by
 和 max_by
 函数都可以接受一个可选参数 N
。如果 N
 大于默认的 1,它们将返回包含前 N 个值的数组。

举个简单的例子,我们可以查询最近的 3 个发货日期:

FROM lineitem
SELECT
    max(l_shipdate, 3AS top_3_shipdates;

top_3_shipdates
[1998-12-01, 1998-12-01, 1998-12-01]

DuckDB 中按列的 Top N

得益于 COLUMNS
 表达式,Top N 的使用变得更加灵活,我们可以同时检索每列中的前 3 个值。我们称这种分析为按列的 Top N 分析。若使用传统的 SQL 实现这个分析,会非常复杂!每一列都需要一个子查询或窗口函数,而在 DuckDB 中,只需:

FROM lineitem
SELECT 
    max(COLUMNS(*), 3AS "top_3_\0";

top_3_l_orderkeytop_3_l_partkeytop_3_l_shipmodetop_3_l_comment
[600000, 600000, 599975][20000, 20000, 20000][TRUCK, TRUCK, TRUCK][zzle. slyly, zzle. quickly bold a, zzle. pinto beans boost slyly slyly fin]

DuckDB 中的按组 Top N

利用新加入的 N
 参数,如何有效加快分组内的 Top N 分析呢?

如果你想直接查看最终结果,请点击这里![8]

我们将结合 DuckDB 的三个其他 SQL 功能来实现这一点:

  • • max_by 函数[9](也称为 arg_max

  • • unnest 函数[10]

  • • 自动将整个行打包到一个 STRUCT 列[11]

max
 函数用于返回某列的最大值(现在可以返回前 N 个最大值)。与此不同,max_by
 函数可以在一列中找到最大值,然后从同一行的其他列中检索值。例如,以下查询将返回每个供应商最近发货的 3 个订单 ID:

FROM lineitem
SELECT 
    l_suppkey,
    max_by(l_orderkey, l_shipdate, 3AS recent_orders
GROUP BY
    l_suppkey;

l_suppkeyrecent_orders
2992[233573, 3597639, 3060227]
8516[4675968, 5431174, 4626530]
3205[3844610, 4396966, 3405255]
2152[1672000, 4209601, 3831138]
1880[4852999, 2863747, 1650084]

max_by
 函数是一个聚合函数,它可以利用 DuckDB 的快速哈希聚合,而无需排序。不同于按 l_shipdate
 排序,max_by
 函数只需扫描数据集一次,并跟踪前 N 个 l_shipdate
 值,再返回与这些最晚发货日期相对应的订单 ID。DuckDB 的基数排序算法每个字节需要遍历数据集一次,而这种方法只需扫描一次即可,极大地提高了效率。例如,如果按 64 位整数排序,传统算法需要遍历 8 次数据集,而使用这种方法只需 1 次!在性能比较
部分包含了一个简单的基准测试。

不过,这个 SQL 查询仍然有一些不足。返回的结果是以 LIST
 形式呈现,而不是单独的行。幸运的是,unnest
 函数可以将 LIST
 拆分为单独的行:

FROM lineitem
SELECT 
    l_suppkey,
    unnest(
        max_by(l_orderkey, l_shipdate, 3)
    ) AS recent_orders
GROUP BY
    l_suppkey;

l_suppkeyrecent_orders
2576930468
25762248354
25763640711
55594022148
55591675680
55594976259

另一个问题是,无法轻松查看与返回的 l_orderkey
 相关联的 l_shipdate
。这个查询只返回了一个列,而分组内的 Top N 分析通常需要整个数据行。

幸运的是,DuckDB 允许我们将整个数据行视为一个单独的列来引用!通过引用表名(如这里的 lineitem
),而不是具体的列名,max_by
 函数可以检索所有列。

FROM lineitem
SELECT 
    l_suppkey,
    unnest(
        max_by(lineitem, l_shipdate, 3)
    ) AS recent_orders
GROUP BY
    l_suppkey;

l_suppkeyrecent_orders
5411{‘l_orderkey’: 2543618, ‘l_partkey’: 105410, ‘l_suppkey’: 5411, …
5411{‘l_orderkey’: 580547, ‘l_partkey’: 130384, ‘l_suppkey’: 5411, …
5411{‘l_orderkey’: 3908642, ‘l_partkey’: 132897, ‘l_suppkey’: 5411, …
90{‘l_orderkey’: 4529697, ‘l_partkey’: 122553, ‘l_suppkey’: 90, …
90{‘l_orderkey’: 4473346, ‘l_partkey’: 160089, ‘l_suppkey’: 90, …

通过将 STRUCT
 进一步拆分为单独的列,我们可以使输出格式更易读,类似于原始数据集。

最终的分组内 Top N 查询

只需再向 UNNEST
 传递一个参数,它就可以将结果递归拆分为单独的列。在这种情况下,UNNEST
 将运行两次:第一次将每个 LIST
 转换为单独的行,第二次将每个 STRUCT
 转换为单独的列。可以排除 l_suppkey
 列,因为它已经包含在输出中。

FROM lineitem
SELECT 
    unnest(
        max_by(lineitem, l_shipdate, 3),
        recursive := 1
    ) AS recent_orders
GROUP BY
    l_suppkey;

l_orderkeyl_partkeyl_suppkeyl_shipinstructl_shipmodel_comment
123472668756876COLLECT CODFOBcajole carefully slyly fin
2584193518656876TAKE BACK RETURNTRUCKfully regular deposits at the q
2375524268756876DELIVER IN PERSONAIRnusual ideas. busily bold deposi
5751559956268136




NONESHIPers nag fluffily against the spe

31034571031158136TAKE BACK RETURNFOBy slyly express warthogs-- unusual, e
57591051781358136COLLECT CODTRUCKes. regular pinto beans haggle.

此方法在去重任务中也非常有用,例如在事件表中查找每组的最新事件,只需设置 N=1
 即可。

我们现在有了一种使用聚合函数计算每个组的前 N 行的方法!那么,这种方法的效率如何呢?

性能比较

我们将比较使用 QUALIFY
 方法和 max_by
 方法来解决分组内的 Top N 问题。两种查询我们之前已经讨论过,下面再次列出供参考。

QUALIFY 查询

FROM lineitem 
SELECT 
    *,
    row_number() OVER
        (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
        AS my_ranking
QUALIFY 
    my_ranking <= 3;

max_by 查询

FROM lineitem 
SELECT 
    unnest(
        max_by(lineitem, l_shipdate, 3),
        recursive := 1
    )
GROUP BY 
    l_suppkey;

在查询执行期间,我们还将启动一个后台线程,定期测量 DuckDB 的内存使用情况。这是通过内置的表函数 duckdb_memory()
 完成的,并包括内存和临时磁盘使用信息。基准测试使用的小型 Python 脚本附在结果之后。基准测试使用的设备是一台 16 GB RAM 的 M1 MacBook Pro。

SFmax_memory
指标QUALIFY
max_by
提升
1默认总时间0.58 s0.24 s2.4×
5默认总时间6.15 s1.26 s4.9×
1036GB总时间36.8 s25.4 s1.4×
1默认内存使用1.7 GB0.2 GB8.5×
5默认内存使用7.9 GB1.5 GB5.3×
1036GB内存使用15.7 GB17.1 GB0.9×

可以看到,在每种情况下,max_by
 方法都更快,有时快近 5 倍!不过,随着数据量增大,max_by
 方法的表现开始减弱,相较于 QUALIFY
 方法的优势变小。

在某些情况下,max_by
 方法的内存使用量也显著降低。但随着规模的增加,max_by
 方法的内存使用量也变得更大,因为 l_suppkey
 的唯一值数量随着规模因子线性增长。内存使用的增加可能解释了性能下降的原因,因为两种算法都接近机器的最大 RAM 并开始交换到磁盘。

为了减少内存压力,我们可以通过减少线程数量重新运行规模因子 10(SF10)的基准测试(分别使用 4 个线程和 1 个线程)。我们继续使用 36 GB 的 max_memory
 设置。之前的 SF10 结果(使用全部 10 个线程)供参考。

SF线程指标QUALIFY
max_by
提升
1010总时间36.8 s25.4 s1.4×
104总时间49.0 s21.0 s2.3×
101总时间115.7 s12.7 s9.1×
1010内存使用15.7 GB17.1 GB0.9×
104内存使用15.9 GB17.3 GB0.9×
101内存使用14.5 GB1.8 GB8.1×

max_by
 方法如此高效,以至于即使只使用 1 个线程,其速度也远远快于使用 10 个线程的 QUALIFY
 方法!减少线程数也非常有效地降低了内存使用(减少了近 10 倍)。

那么,什么时候应该使用哪种方法呢?正如在数据库中常见的情况一样,视情况而定! 如果内存受限,特别是在调节线程数避免交换到磁盘的情况下,max_by
 可能会带来显著优势。但如果组的数量与行数接近,考虑使用 QUALIFY
,因为此时 max_by
 方法的内存效率会降低。

Python 基准测试脚本

import duckdb 
import pandas as pd 
from threading import Thread
from time import sleep
from datetime import datetime
from os import remove 

def check_memory(stop_function, filepath, sleep_seconds, results_dict):
    print("Starting background thread")
    background_con = duckdb.connect(filepath)
    max_memory = 0
    max_temporary_storage = 0
    while True:
        if stop_function():
            break
        # Profile the memory
        memory_profile = background_con.sql("""
            FROM duckdb_memory() 
            SELECT 
                tag,
                round(memory_usage_bytes / (1000000), 0)::bigint AS memory_usage_mb,
                round(temporary_storage_bytes / (1000000), 0)::bigint AS temporary_storage_mb;
            """
).df()
        print(memory_profile)
        total_memory = background_con.sql("""
            FROM memory_profile 
            select 
                sum(memory_usage_mb) AS total_memory_usage_mb,
                sum(temporary_storage_mb) AS total_temporary_storage_mb
            """
).fetchall()
        print('Current memory:', total_memory[0][0])
        print('Current temporary_storage:', total_memory[0][1])

        if total_memory[0][0] > max_memory:
            max_memory = total_memory[0][0]
        if total_memory[0][1] > max_temporary_storage:
            max_temporary_storage = total_memory[0][1]
        
        print('Maximum memory:', max_memory)
        print('Maximum temporary_storage:', max_temporary_storage)

        sleep(sleep_seconds)
    
    results_dict["max_memory"] = max_memory
    results_dict["max_temporary_storage"] = max_temporary_storage
    background_con.close()

    return 

def query_and_profile(filepath, sql):
    con = duckdb.connect(filepath)
    con.sql("set max_memory='36GB'")

    results_dict = {}
    stop_threads = False
    background_memory_thread = Thread(target=check_memory,
                                      args=(lambda : stop_threads, filepath, 0.1, results_dict, ))
    background_memory_thread.start()

    print("Starting query:")
    start_time = datetime.now()
    results_df = con.sql(sql).df()
    results_dict["total_time_seconds"] = (datetime.now() - start_time).total_seconds()
    print(results_df.head(10))

    stop_threads = True
    background_memory_thread.join()
    con.close()

    return results_dict

filepath = './arg_max_check_duckdb_memory_v3.duckdb'

con = duckdb.connect(filepath)
print("Begin initial tpch load")
con.sql("""call dbgen(sf=1);""")
con.close()

sql = """
    FROM lineitem 
    SELECT 
        UNNEST(
            max_by(lineitem, l_shipdate, 3),
            recursive := 1
        )
    GROUP BY 
        l_suppkey
;"""


max_by_results = query_and_profile(filepath, sql)

sql = """
    FROM lineitem 
    SELECT 
        *,
        row_number() OVER
            (PARTITION BY l_suppkey ORDER BY l_shipdate DESC)
            AS my_ranking
    QUALIFY 
        my_ranking <= 3
;"""


qualify_results = query_and_profile(filepath, sql)

print('max_by_results:', max_by_results)
print('qualify_results:', qualify_results)

remove(filepath)

结论

DuckDB 现在提供了一种便捷的方法来计算 min
 和 max
 聚合函数的 Top N 值,以及它们的高级版本 min_by
 和 max_by
。这些功能非常易于使用,且支持更复杂的分析场景,例如按列计算 Top N 或按组计算 Top N。此外,与使用窗口函数的方法相比,这些方法在性能上也可能带来显著优势。

我们期待听到你们如何创造性地使用这个新功能!

祝你分析愉快!

引用链接

[1]
 TPC-H 基准: https://duckdb.org/docs/extensions/tpch
[2]
 FROM-first 语法: https://duckdb.org/docs/sql/query_syntax/from#from-first-syntax
[3]
 窗口函数: https://duckdb.org/docs/sql/functions/window_functions
[4]
 公用表表达式 (CTE): https://duckdb.org/docs/sql/query_syntax/with
[5]
 QUALIFY 子句: https://duckdb.org/docs/sql/query_syntax/qualify
[6]
 基数排序算法: https://duckdb.org/2021/08/27/external-sorting.html
[7]
 DuckDB 哈希聚合: https://duckdb.org/2024/03/29/external-aggregation.html
[8]
 请点击这里!: #the-final-top-n-by-group-query
[9]
 max_by 函数: https://duckdb.org/docs/sql/functions/aggregates#max_byarg-val-n
[10]
 unnest 函数: https://duckdb.org/docs/sql/query_syntax/unnest
[11]
 STRUCT 列: https://duckdb.org/docs/sql/data_types/struct#creating-structs


文章转载自alitrack,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论