暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

时序数据合并场景加速分析和实现 - 复合索引,窗口分组查询加速,变态递归加速

digoal 2016-11-28
143
TAG 18

作者

digoal

日期

2016-11-28

标签

PostgreSQL , 数据合并 , 时序数据 , 复合索引 , 窗口查询


背景

在很多场景中,都会有数据合并的需求。

例如记录了表的变更明细(insert,update,delete),需要合并明细,从明细中快速取到每个PK的最新值。

又比如有很多传感器,不断的在上报数据,要快速的取出每个传感器的最新状态。

对于这种需求,可以使用窗口查询,但是如何加速,如何快速的取出批量数据呢?

这个是有优化的门道的。

传感器例子

假设传感器数据不断的上报,用户需要查询当前最新的,每个传感器上报的值。

创建测试表如下,

``` create unlogged table sort_test( id serial8 primary key, -- 主键 c2 int, -- 传感器ID c3 int -- 传感器值 );

写入1000万传感器测试数据 postgres=# insert into sort_test (c2,c3) select random()100000, random()100 from generate_series(1,10000000); INSERT 0 10000000 ```

查询语句如下

``` postgres=# explain (analyze,verbose,timing,costs,buffers) select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1; QUERY PLAN


Subquery Scan on t (cost=10001512045.83..10001837045.83 rows=50000 width=16) (actual time=23865.363..44033.984 rows=100001 loops=1) Output: t.id, t.c2, t.c3 Filter: (t.rn = 1) Rows Removed by Filter: 9899999 Buffers: shared hit=54055, temp read=93801 written=93801 -> WindowAgg (cost=10001512045.83..10001712045.83 rows=10000000 width=24) (actual time=23865.351..41708.460 rows=10000000 loops=1) Output: sort_test.id, sort_test.c2, sort_test.c3, row_number() OVER (?) Buffers: shared hit=54055, temp read=93801 written=93801 -> Sort (cost=10001512045.83..10001537045.83 rows=10000000 width=16) (actual time=23865.335..31540.089 rows=10000000 loops=1) Output: sort_test.id, sort_test.c2, sort_test.c3 Sort Key: sort_test.c2, sort_test.id DESC Sort Method: external merge Disk: 254208kB Buffers: shared hit=54055, temp read=93801 written=93801 -> Seq Scan on public.sort_test (cost=10000000000.00..10000154055.00 rows=10000000 width=16) (actual time=0.021..1829.135 rows=10000000 loops=1) Output: sort_test.id, sort_test.c2, sort_test.c3 Buffers: shared hit=54055 Planning time: 0.194 ms Execution time: 44110.560 ms (18 rows) ```

优化手段,新增复合索引,避免SORT,注意,id需要desc

postgres=# create index sort_test_1 on sort_test(c2,id desc); CREATE INDEX

优化后的SQL性能

``` postgres=# explain (analyze,verbose,timing,costs,buffers) select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1; QUERY PLAN


Subquery Scan on t (cost=0.43..542565.80 rows=50000 width=16) (actual time=0.048..33844.843 rows=100001 loops=1) Output: t.id, t.c2, t.c3 Filter: (t.rn = 1) Rows Removed by Filter: 9899999 Buffers: shared hit=10029020 read=1 -> WindowAgg (cost=0.43..417564.59 rows=10000097 width=24) (actual time=0.042..30490.662 rows=10000000 loops=1) Output: sort_test.id, sort_test.c2, sort_test.c3, row_number() OVER (?) Buffers: shared hit=10029020 read=1 -> Index Scan using sort_test_1 on public.sort_test (cost=0.43..242562.89 rows=10000097 width=16) (actual time=0.030..18347.482 rows=10000000 loops=1) Output: sort_test.id, sort_test.c2, sort_test.c3 Buffers: shared hit=10029020 read=1 Planning time: 0.216 ms Execution time: 33865.321 ms (13 rows) ```

如果被取出的数据需要后续的处理,可以使用游标,分批获取,因为不需要显示sort,所以分批获取速度很快,从而加快整个的处理速度。

\timing begin; declare c1 cursor for select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1; postgres=# fetch 100 from c1; id | c2 | c3 ---------+----+----- 9962439 | 0 | 93 9711199 | 1 | 52 9987709 | 2 | 65 9995611 | 3 | 34 9998766 | 4 | 12 9926693 | 5 | 81 .... 9905064 | 98 | 44 9991592 | 99 | 99 (100 rows) Time: 31.408 ms -- 很快就返回

优化前,需要显示SORT,所以使用游标并不能加速,拿到第一条记录是在SORT后的。

``` drop index sort_test_1;

begin; declare c1 cursor for select id,c2,c3 from (select id,c2,c3,row_number() over(partition by c2 order by id desc) rn from sort_test) t where rn=1;

postgres=# fetch 100 from c1; .... Time: 22524.783 ms -- sort结束后才开始返回,很慢 ```

增量合并数据同步例子

类似Oracle的物化视图,apply时,对于同一条记录的update并不需要每次update的中间过程都需要执行,只需要执行最后一次的。

因此,也可以利用类似的操作手段,分组取最后一条,

``` create extension hstore;

create unlogged table sort_test1( id serial8 primary key, -- 主键 c2 int, -- 目标表PK c3 text, -- insert or update or delete c4 hstore -- row );

create index idx_sort_test1_1 on sort_test1(c2,id desc);

select c2,c3,c4 from (select c2,c3,c4,row_number() over(partition by c2 order by id desc) rn from sort_test1) t where rn=1;

postgres=# explain select c2,c3,c4 from (select c2,c3,c4,row_number() over(partition by c2 order by id desc) rn from sort_test1) t where rn=1; QUERY PLAN


Subquery Scan on t (cost=0.15..46.25 rows=4 width=68) Filter: (t.rn = 1) -> WindowAgg (cost=0.15..36.50 rows=780 width=84) -> Index Scan using idx_sort_test1_1 on sort_test1 (cost=0.15..22.85 rows=780 width=76) (4 rows) ```

稀疏列的变态优化方法

我们看到前面的优化手段,其实只是消除了SORT,并没有消除扫描的BLOCK数。

如果分组很少时,即稀疏列,还有一种更变态的优化方法,递归查询。

优化方法与这篇文档类似,

《distinct xx和count(distinct xx)的变态递归优化方法》

例子

``` create type r as (c2 int, c3 int);

postgres=# explain (analyze,verbose,timing,costs,buffers) with recursive skip as (
(
select (c2,c3)::r as r from sort_test where id in (select id from sort_test where c2 is not null order by c2,id desc limit 1) )
union all
(
select ( select (c2,c3)::r as r from sort_test where id in (select id from sort_test t where t.c2>(s.r).c2 and t.c2 is not null order by c2,id desc limit 1) ) from skip s where (s.r).c2 is not null ) -- 这里的where (s.r).c2 is not null 一定要加, 否则就死循环了. )
select (t.r).c2, (t.r).c3 from skip t where t.* is not null;

                                                                                       QUERY PLAN

CTE Scan on skip t (cost=302.97..304.99 rows=100 width=8) (actual time=0.077..4184.770 rows=100001 loops=1) Output: (t.r).c2, (t.r).c3 Filter: (t.* IS NOT NULL) Rows Removed by Filter: 1 Buffers: shared hit=800947, temp written=476 CTE skip -> Recursive Union (cost=0.91..302.97 rows=101 width=32) (actual time=0.066..3970.580 rows=100002 loops=1) Buffers: shared hit=800947 -> Nested Loop (cost=0.91..2.95 rows=1 width=32) (actual time=0.064..0.066 rows=1 loops=1) Output: ROW(sort_test_1.c2, sort_test_1.c3)::r Buffers: shared hit=8 -> HashAggregate (cost=0.47..0.48 rows=1 width=8) (actual time=0.044..0.044 rows=1 loops=1) Output: sort_test_2.id Group Key: sort_test_2.id Buffers: shared hit=4 -> Limit (cost=0.43..0.46 rows=1 width=12) (actual time=0.036..0.036 rows=1 loops=1) Output: sort_test_2.id, sort_test_2.c2 Buffers: shared hit=4 -> Index Only Scan using sort_test_1 on public.sort_test sort_test_2 (cost=0.43..267561.43 rows=10000000 width=12) (actual time=0.034..0.034 rows=1 loops=1) Output: sort_test_2.id, sort_test_2.c2 Index Cond: (sort_test_2.c2 IS NOT NULL) Heap Fetches: 1 Buffers: shared hit=4 -> Index Scan using sort_test_pkey on public.sort_test sort_test_1 (cost=0.43..2.45 rows=1 width=16) (actual time=0.011..0.012 rows=1 loops=1) Output: sort_test_1.id, sort_test_1.c2, sort_test_1.c3 Index Cond: (sort_test_1.id = sort_test_2.id) Buffers: shared hit=4 -> WorkTable Scan on skip s (cost=0.00..29.80 rows=10 width=32) (actual time=0.037..0.038 rows=1 loops=100002) Output: (SubPlan 1) Filter: ((s.r).c2 IS NOT NULL) Rows Removed by Filter: 0 Buffers: shared hit=800939 SubPlan 1 -> Nested Loop (cost=0.92..2.96 rows=1 width=32) (actual time=0.034..0.035 rows=1 loops=100001) Output: ROW(sort_test.c2, sort_test.c3)::r Buffers: shared hit=800939 -> HashAggregate (cost=0.49..0.50 rows=1 width=8) (actual time=0.023..0.023 rows=1 loops=100001) Output: t_1.id Group Key: t_1.id Buffers: shared hit=400401 -> Limit (cost=0.43..0.48 rows=1 width=12) (actual time=0.021..0.021 rows=1 loops=100001) Output: t_1.id, t_1.c2 Buffers: shared hit=400401 -> Index Only Scan using sort_test_1 on public.sort_test t_1 (cost=0.43..133557.76 rows=3333333 width=12) (actual time=0.019..0.019 rows=1 loops=100001) Output: t_1.id, t_1.c2 Index Cond: ((t_1.c2 > (s.r).c2) AND (t_1.c2 IS NOT NULL)) Heap Fetches: 100000 Buffers: shared hit=400401 -> Index Scan using sort_test_pkey on public.sort_test (cost=0.43..2.45 rows=1 width=16) (actual time=0.006..0.007 rows=1 loops=100000) Output: sort_test.id, sort_test.c2, sort_test.c3 Index Cond: (sort_test.id = t_1.id) Buffers: shared hit=400538 Planning time: 0.970 ms Execution time: 4209.026 ms (54 rows) ```

依旧支持快速的FETCH

``` postgres=# begin; BEGIN Time: 0.079 ms postgres=# declare cur cursor for with recursive skip as (
(
select (c2,c3)::r as r from sort_test where id in (select id from sort_test where c2 is not null order by c2,id desc limit 1) )
union all
(
select ( select (c2,c3)::r as r from sort_test where id in (select id from sort_test t where t.c2>(s.r).c2 and t.c2 is not null order by c2,id desc limit 1) ) from skip s where (s.r).c2 is not null ) -- 这里的where (s.r).c2 is not null 一定要加, 否则就死循环了. )
select (t.r).c2, (t.r).c3 from skip t where t.* is not null; DECLARE CURSOR Time: 1.240 ms postgres=# fetch 100 from cur; r


(0,93) (1,52) (2,65) ..... (97,78) (98,44) (99,99) (100 rows)

Time: 4.314 ms ```

使用变态的递归优化,性能提升了10倍,仅仅花了4秒,完成了1000万记录的筛选。

PostgreSQL 许愿链接

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

9.9元购买3个月阿里云RDS PostgreSQL实例

PostgreSQL 解决方案集合

德哥 / digoal's github - 公益是一辈子的事.

digoal's wechat

文章转载自digoal,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论