CloudberryDB | 第5期 | 执行器算子ShareInputScan
WITH查询是PgSQL支持的高级特性之一,即CTE,当然继承于PgSQL的Cloudberry也支持该特性。WITH查询在复杂查询中定义一个辅助语句,比如:
WITH t as (SELECT generate_series(1,10)) SELECT * from t;
这样不必每次都执行SELECT generate_series,而是WITH物化后,只从物化中读取即可。在复杂的查询场景下可以提升性能。
继承自GreenPlum的Cloudberry在分布式场景下,由Motion算子进行数据传输,并且由Motion算子将执行计划树分割成slice(slice内不需要数据传输)。不同slice在同一个segment上执行时,由不同进程提供服务。WITH物化的内容在同一个slice时,可以在同一个进程中进行共享,那么不同slice时也需要进行共享。这就是ShareInputScan算子要干的活儿。
ShareInputScan采用生产者-消费者模式进行共享,生产者完成WITH数据的生成,即将他放到tuplestore中;消费者从tuplestore中取数据。跨slice时,即不同进程共享时,将tuplestore持久化到磁盘临时文件,在同一个segment的共享内存中进行通信,从而到磁盘临时文件中读取,继而完成数据的共享。下面解析ShareInputScan的实现原理。
1、ShareInputScan与ShareInputScanState

同一个slice下使用local_state来管理共享信息:
1) 消费者的ShareInputScan无子节点,因为他是从共享tuplestore中取数据的,在优化器中会将消费这的ShareInputScan的左分支裁剪掉
2) 同一个进程的消费者ShareInputScan直接从estate->sharenode链表取,初始化后放到它的ShareInputScanState的local_state中
3) local_state中childState是WITH子查询的执行计划节点状态树,注意这个是最底层的WITH,也就是驱动WITH。比如在嵌套WITH中的t2:WITH t1 as(select XXX),t2 as(select *from t1)select * from t2;
跨slice时使用shareinput_Xslice_reference *ref来管理共享信息:
1) 跨slice不同进程时,消费者进程自己仍旧需要初始化shareinput_local_state,然后放到它的ShareInputScanState的local_state中
2) 跨slice时,第一个消费者初始化好shareinput_Xslice_state并由tag作为hash key将其存放到共享内存中的shareinput_Xslice_hash哈希表中,该哈希表存放该segment所有的shareinput_Xslice_state。后面的消费者从该hash表取处shareinput_Xslice_state并将它传递给后续消费者的ref中
ShareInputScan算子的初始化完成上面信息的场景,生成ShareInputScanState,以供执行时使用。
2、跨slice下共享
如下图所示:
消费者:
1)shareinput_reader_waitready函数中从ref.xslice_state中的ready中原子获取其值,该值此时为false,需要生产者准备好后才将他原子更新为true
2)ready为false,所以进入ConditionVariableSleep,可以看到会进入ConditionVariablePrepareToSleep将消费者进程放到共享内存的cv的wakeup链表中,以便生产者准备好后可以唤醒该进程,然后就会返回
3)重新进入shareinput_reader_waitready函数的for循环中,此时仍旧发现ready为false,那么继续进入ConditionVariablePrepareToSleep函数,此时cv_sleep_target已经==cv了,所以会进入while循环中通过WaitLatch调用epoll_wait,进入等待
生产者:
1)首先生产者需要创建tuplestore,并在temp_tablespace配置项下打开临时文件;
2)循环从WITH子查询中取数据并将他放到tuplestore中
3)去完后需要通过tuplestore_freeze将tuplestore中的数据持久化到磁盘临时文件中
4)通过shareinput_writer_notifyready原子更新ref.xslice_state的ready,以便消费者知道可以从共享临时文件取数据了
5)然后通过ConditionVariableBroadcast从共享内存中的cv->wakeup链表取出等待的消费者的进程序号,然后通过kill向他发送SIGUSG信号
消费者:
1)消费者接收到SIGURG信号后,通过latch_sigurg_handler函数处理该信号,由sendSelfPipeByte向管道写一字节
2)消费者在WaitLatch->epoll_wait中感知到管道写了一字节后,就会唤醒
接着就可以从tuplestore的临时文件中读取数据了

3、同一个进程下共享
同一个进程时消费者和生产者不会并发访问数据,此时消费者和生产者是同一个人。生产者将数据准备好后,作为消费者角色就可以从tuplestore中使用另一个读指针读取。




