
Interconnect Overview


libpq is a TCP-based control-flow protocol. The QD uses libpq to exchange control information with each QE: dispatching query plans, collecting error messages, handling cancel requests, and so on. libpq is PostgreSQL's standard protocol; Greenplum extends it, for example with the new 'M' message type, which the QD uses to send a query plan to a QE.

Interconnect data-flow protocol: tuple data between QD and QE, and between QE and QE, is transferred through the interconnect. Greenplum ships three interconnect implementations: one based on TCP, one based on UDP, and one based on a proxy. The default is the UDP interconnect.
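For orientation, a libpq message on the wire (standard PostgreSQL v3 protocol framing, which the 'M' extension also follows) is a one-byte type code, a four-byte big-endian length that counts itself plus the payload, then the payload. Below is a minimal, hypothetical parsing sketch for illustration only; it is not GPDB code:

#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>

typedef struct
{
    char        type;           /* e.g. 'Q' simple query; 'M' is the GPDB plan-dispatch extension */
    int32_t     payload_len;
    const char *payload;
} PqMessage;

/* Returns 0 on success, -1 if the buffer does not yet hold a full frame. */
static int
parse_pq_message(const char *buf, size_t buflen, PqMessage *out)
{
    int32_t     len;

    if (buflen < 5)
        return -1;                              /* incomplete header */
    out->type = buf[0];
    memcpy(&len, buf + 1, 4);
    len = (int32_t) ntohl((uint32_t) len);      /* length includes its own 4 bytes */
    if (len < 4 || (size_t) len + 1 > buflen)
        return -1;                              /* incomplete body */
    out->payload_len = len - 4;
    out->payload = buf + 5;
    return 0;
}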
Interconnect Initialization Flow

typedef enum GpVars_Interconnect_Type
{
    INTERCONNECT_TYPE_TCP = 0,
    INTERCONNECT_TYPE_UDPIFC,
    INTERCONNECT_TYPE_PROXY,
} GpVars_Interconnect_Type;

typedef struct ChunkTransportState
{
    /* array of per-motion-node chunk transport state */
    int         size;                   /* initial value comes from the macro CTS_INITIAL_SIZE */
    ChunkTransportStateEntry *states;   /* array of 'size' ChunkTransportStateEntry elements */

    /* keeps track of if we've "activated" connections via SetupInterconnect(). */
    bool        activated;

    bool        aggressiveRetry;

    /* whether we've logged when network timeout happens */
    bool        networkTimeoutIsLogged; /* defaults to false; only used by ic_udp */

    bool        teardownActive;
    List       *incompleteConns;

    /* slice table stuff. */
    struct SliceTable *sliceTable;
    int         sliceId;                /* index of the currently executing slice */

    /* Estate pointer for this statement */
    struct EState *estate;

    /* Function pointers to our send/receive functions */
    bool        (*SendChunk) (struct ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntry, MotionConn *conn, TupleChunkListItem tcItem, int16 motionId);
    TupleChunkListItem (*RecvTupleChunkFrom) (struct ChunkTransportState *transportStates, int16 motNodeID, int16 srcRoute);
    TupleChunkListItem (*RecvTupleChunkFromAny) (struct ChunkTransportState *transportStates, int16 motNodeID, int16 *srcRoute);
    void        (*doSendStopMessage) (struct ChunkTransportState *transportStates, int16 motNodeID);
    void        (*SendEos) (struct ChunkTransportState *transportStates, int motNodeID, TupleChunkListItem tcItem);

    /* ic_proxy backend context */
    struct ICProxyBackendContext *proxyContext;
} ChunkTransportState;
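The states array is indexed by motion node ID. As a minimal sketch of the lookup, assuming the usual PostgreSQL backend environment (the real code goes through GPDB's getChunkTransportState() helper; motion node IDs are 1-based while the array is 0-based):

/*
 * Illustrative sketch only, modeled on getChunkTransportState()
 * in the GPDB sources; error handling simplified.
 */
static ChunkTransportStateEntry *
lookupEntry(ChunkTransportState *transportStates, int16 motNodeID)
{
    if (motNodeID <= 0 || motNodeID > transportStates->size)
        elog(ERROR, "interconnect error: unknown motion node id %d", motNodeID);
    return &transportStates->states[motNodeID - 1];
}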
void
SetupInterconnect(EState *estate)
{
    interconnect_handle_t *h;
    MemoryContext oldContext;

    h = allocate_interconnect_handle();

    Assert(InterconnectContext != NULL);
    oldContext = MemoryContextSwitchTo(InterconnectContext);

    if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC)
        SetupUDPIFCInterconnect(estate);        /* here: UDP initialization path */
    else if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP ||
             Gp_interconnect_type == INTERCONNECT_TYPE_PROXY)
        SetupTCPInterconnect(estate);           /* here: TCP & proxy path */
    else
        elog(ERROR, "unsupported expected interconnect type");

    MemoryContextSwitchTo(oldContext);

    h->interconnect_context = estate->interconnect_context;
}

SetupUDPIFCInterconnect_Internal initializes a series of related structures: the interconnect context itself, the transportStates->states member (via createChunkTransportState), and the rx_buffer_queue members:

/* rx_buffer_queue */
/* key parameters of the receive-buffer initialization */
conn->pkt_q_capacity = Gp_interconnect_queue_depth;
conn->pkt_q_size = 0;
conn->pkt_q_head = 0;
conn->pkt_q_tail = 0;
conn->pkt_q = (uint8 **) palloc0(conn->pkt_q_capacity * sizeof(uint8 *));

/* update the max buffer count of our rx buffer pool. */
rx_buffer_pool.maxCount += conn->pkt_q_capacity;
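pkt_q is used as a fixed-capacity ring buffer. Here is a minimal sketch of the index arithmetic, illustrative only; the real ic_udpifc code additionally takes a lock, since the background rx thread and the executor touch the queue concurrently:

/* rx thread side: append a received packet at the tail */
static bool
rxQueuePut(MotionConn *conn, uint8 *pkt)
{
    if (conn->pkt_q_size == conn->pkt_q_capacity)
        return false;                   /* queue full; sender must retransmit */
    conn->pkt_q[conn->pkt_q_tail] = pkt;
    conn->pkt_q_tail = (conn->pkt_q_tail + 1) % conn->pkt_q_capacity;
    conn->pkt_q_size++;
    return true;
}

/* executor side: consume the packet at the head */
static uint8 *
rxQueueGet(MotionConn *conn)
{
    uint8 *pkt;

    if (conn->pkt_q_size == 0)
        return NULL;                    /* nothing pending */
    pkt = conn->pkt_q[conn->pkt_q_head];
    conn->pkt_q[conn->pkt_q_head] = NULL;
    conn->pkt_q_head = (conn->pkt_q_head + 1) % conn->pkt_q_capacity;
    conn->pkt_q_size--;
    return pkt;
}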
TCP & proxy:

interconnect_context->RecvTupleChunkFrom = RecvTupleChunkFromTCP;
interconnect_context->RecvTupleChunkFromAny = RecvTupleChunkFromAnyTCP;
interconnect_context->SendEos = SendEosTCP;
interconnect_context->SendChunk = SendChunkTCP;
interconnect_context->doSendStopMessage = doSendStopMessageTCP;

UDP:

interconnect_context->RecvTupleChunkFrom = RecvTupleChunkFromUDPIFC;
interconnect_context->RecvTupleChunkFromAny = RecvTupleChunkFromAnyUDPIFC;
interconnect_context->SendEos = SendEosUDPIFC;
interconnect_context->SendChunk = SendChunkUDPIFC;
interconnect_context->doSendStopMessage = doSendStopMessageUDPIFC;
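Because the protocol-specific functions sit behind these pointers, the Motion layer stays protocol-agnostic: it always dispatches through ChunkTransportState. A hypothetical call-site wrapper, for illustration only:

/*
 * The executor's Motion node never names RecvTupleChunkFromAnyTCP or
 * RecvTupleChunkFromAnyUDPIFC directly; it calls through the pointers
 * installed by SetupInterconnect().
 */
static TupleChunkListItem
recvTupleChunkFromAnyRoute(ChunkTransportState *transportStates,
                           int16 motNodeID, int16 *srcRoute)
{
    return transportStates->RecvTupleChunkFromAny(transportStates,
                                                  motNodeID, srcRoute);
}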
ic_udp Flow Analysis

MotionConn, core members:

/* send side queue for packets to be sent */
ICBufferList sndQueue;          /* staging queue; its buffers come from conn->curBuff, and so indirectly from snd_buffer_pool */
ICBufferList unackQueue;        /* packets sent but not yet acknowledged */
ICBuffer    *curBuff;           /* filled from snd_buffer_pool when the MotionConn is initialized */
uint8       *pBuff;             /* after initialization, points at curBuff->pkt */
struct icpkthdr conn_info;      /* built from the process info reached via aSlice->primaryProcesses (process id, IP, port, ...); hashed in the global ic_control_info.connHtab */
struct CdbProcess *cdbProc;     /* from aSlice->primaryProcesses */
uint8      **pkt_q;             /* array used as a ring buffer, indexed modulo its capacity */

static SendBufferPool snd_buffer_pool;  /* process-wide send buffer pool; each ICBuffer carries its payload in its pkt field */

These members form three layers:

Layer 1: when a MotionConn is initialized, it takes a buffer from snd_buffer_pool, stores it in curBuff, and points pBuff at it.

Layer 2: sndQueue is the staging area: its buffers come from conn->curBuff, and therefore indirectly from snd_buffer_pool.

Layer 3: data buffers and pkt_q. pkt_q is an array used as a ring buffer whose slot index is computed modulo its capacity; the rx thread stores each received packet with conn->pkt_q[pos] = (uint8 *) pkt. When prepareRxConnForRead is called, pBuff is repointed at the data held in the head slot, conn->pBuff = conn->pkt_q[conn->pkt_q_head], which closes the data path from a received packet to the connection's read buffer.
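A minimal sketch of that hand-off, modeled on prepareRxConnForRead() in ic_udpifc.c and simplified for illustration (msgPos, msgSize and recvBytes are further MotionConn read-cursor fields not shown in the excerpt above):

static void
prepareRxConnForReadSketch(MotionConn *conn)
{
    Assert(conn->pkt_q[conn->pkt_q_head] != NULL);

    /* no copy: just point the read cursor at the head slot of the ring */
    conn->pBuff = conn->pkt_q[conn->pkt_q_head];
    conn->msgPos = conn->pBuff;
    conn->msgSize = ((struct icpkthdr *) conn->pBuff)->len;
    conn->recvBytes = conn->msgSize;
}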
ic_udp buffer initialization: SetupUDPIFCInterconnect_Internal calls initSndBufferPool(&snd_buffer_pool) to set up the send buffer pool.

ic_udp buffer acquisition: buffers are obtained through getSndBuffer. During initialization, SetupUDPIFCInterconnect_Internal -> startOutgoingUDPConnections fetches one buffer per connection and stores it in the MotionConn's curBuff:

static ICBuffer *getSndBuffer(MotionConn *conn)

ic_udp buffer release: icBufferListReturn puts a buffer back onto snd_buffer_pool.freeList:

static void
icBufferListReturn(ICBufferList *list, bool inExpirationQueue)
{
    ...
    icBufferListAppend(&snd_buffer_pool.freeList, buf);    /* here: back onto the free list */
    ...
}

Cleanup: once buffers have been returned as above, cleanSndBufferPool(&snd_buffer_pool) frees the pool. handleAckedPacket also triggers a release for buffers sitting on the unackQueue.
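A minimal sketch of getSndBuffer()'s allocation policy, simplified from ic_udpifc.c (statistics bookkeeping and the memory-context switch omitted): serve from the free list first, otherwise grow the pool up to its maximum.

static ICBuffer *
getSndBufferSketch(MotionConn *conn)
{
    /* reuse a returned buffer if one is available */
    if (icBufferListLength(&snd_buffer_pool.freeList) > 0)
        return icBufferListPop(&snd_buffer_pool.freeList);

    /* otherwise grow the pool, but never beyond maxCount */
    if (snd_buffer_pool.count < snd_buffer_pool.maxCount)
    {
        snd_buffer_pool.count++;
        return (ICBuffer *) palloc0(Gp_max_packet_size + sizeof(ICBuffer));
    }

    /* pool exhausted: the caller must wait for ACKs to return buffers */
    return NULL;
}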
ic_proxy Flow Analysis

ic_proxy is configured through the gp_interconnect_proxy_addresses GUC, which must list a proxy endpoint for every peer; each comma-separated entry has the form dbid:content:hostname:port (content id -1 denotes the coordinator and its standby):

gpconfig --skipvalidation -c gp_interconnect_proxy_addresses -v "'1:-1:localhost:2000,2:0:localhost:2002,3:0:localhost:2003,4:-1:localhost:2001'"
In Ic-Tcp mode, TCP connections exist between the QEs (and between QD and QEs). Taking a gather motion as an example:

┌    ┐
│    │ <===== [ QE1 ]
│ QD │
│    │ <===== [ QE2 ]
└    ┘

In Ic-Udp mode there are no TCP connections, but the logical connections remain: if two QEs communicate with each other, a logical connection exists between them:

┌    ┐
│    │ <----- [ QE1 ]
│ QD │
│    │ <----- [ QE2 ]
└    ┘

In Ic-Proxy mode we still use the concept of logical connections, but they are carried through per-host proxies:

┌    ┐        ┌       ┐
│    │        │       │ <====> [ proxy ] <~~~~> [ QE1 ]
│ QD │ <~~~~> │ proxy │
│    │        │       │ <====> [ proxy ] <~~~~> [ QE2 ]
└    ┘        └       ┘

An N:1 gather motion uses N logical connections; an N:N redistribute or broadcast motion uses N*N. For example, with 4 segments a gather motion needs 4 logical connections and a redistribute motion needs 4*4 = 16; the counting rule is restated in the sketch after this paragraph.
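A trivial helper capturing the counting rule above (hypothetical, for illustration only):

/*
 * Number of logical connections a single motion node needs under the
 * interconnect model described above.
 */
static int
logicalConnCount(int nSenders, int nReceivers, bool isGatherMotion)
{
    if (isGatherMotion)
        return nSenders;                /* N:1 gather */
    return nSenders * nReceivers;       /* N:N redistribute / broadcast */
}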
Introduction to the ic_proxy Data Forwarding Flow



Source: HashData




