PostgreSQL提供了客户端和其他客户端通过数据库服务器进行消息通信的机制,即异步通知,PostgreSQL通过LISTEN和NOTIFY命令提供了异步通知。
使用方法
客户端通过LISTEN命令监听一个通道,当通道有NOTIFY消息时,客户端会收到通知。
-- 监听mychannel通道
postgres=# listen mychannel;
LISTEN
其他客户端通过NOTIFY命令向一个通道发送消息,所有监听该通道的客户端都会收到通知。如果在事务中发生NOTIFY,那么只有事务提交后,才会发送消息,如果事务回滚,则不会发送消息。另外需要注意的是,如果发送通知时,没有监听该通道的客户端,那么消息会丢失,不会像消息队列一样存储在队列中,等待消费者消费,即使在之后有客户端监听该通道,也不会重新收到该消息。
-- 向mychannel通道发送消息
postgres=# notify mychannel,'hello world';
NOTIFY
也可以通过函数pg_notify发送消息,效果与notify相同。pg_notify函数的参数是通道名称和消息内容。消息内容的长度是有限制的,最大长度为8000字节(#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128))。
postgres=# select pg_notify('mychannel','use pg_notify notify message');
pg_notify
-----------
(1 row)
此时,监听的客户端还没有反应,需要执行任意一条SQL语句,才会收到通知。
postgres=# select now();
now
-------------------------------
2025-04-03 10:25:20.027341+08
(1 row)
-- 监听的客户端收到通知
Asynchronous notification "mychannel" with payload "hello world" received from server process with PID 6425.
如何某个客户端要取消监听,可以使用UNLISTEN命令。
-- 取消监听mychannel通道
postgres=# unlisten mychannel;
UNLISTEN
可通过函数pg_listening_channels查看当前客户端监听的通道。
postgres=# select pg_listening_channels();
pg_listening_channels
-----------------------
mychannel
(1 row)
关于事务,需要注意的是,NOTIFY能够保证来自同一个事务的消息按照发送时的顺序通知,也能保证来自不同事务的消息按照事务提交的顺序通知。
在客户端监听通道时,一个客户端应用检测通知事件的必用方法取决于它使用的PostgreSQL应用编程接口。如果使用libpq库,应用会将LISTEN作为一个普通 SQL 命令发出,并且接着必须周期性地调用函数PQnotifies来查看是否接收到通知事件。其他诸如libpgtcl的接口提供了更高层次上的处理通知事件的方法。
LISTEN NOTIFY机制的使用场景
LISTEN NOTIFY机制可以用于实现数据库的异步通知,但是其通知不是持久化的,没有监听者时消息会丢失,也就是与消息队列相比,其不适合持久化存储消息。建议可考虑PGQMQ(PostgreSQL Queue Manager)等持久化消息队列。
LISTEN NOTIFY机制的工作原理
要想PostgreSQL实现不同客户端进程之间的异步通信,数据库本身需要有一个各进程都可以访问的数据结构,来保存通知消息或者监听者列表。对此,PostgreSQL选择使用共享内存来保存这些信息,并且使用SLRU页面缓冲池来管理这些共享内存。同时,各客户端怎么知道什么时候有新的通知消息呢?PostgreSQL通过信号来实现,当有新的通知消息时,PostgreSQL进程会向监听该通道的客户端进程发送信号,客户端进程收到信号后,会去共享内存中查看是否有新的通知消息。
PostmasterMain(int argc, char *argv[])
--> reset_shared(); // 初始化共享内存和信号量
--> CreateSharedMemoryAndSemaphores();
--> AsyncShmemInit(); // 初始化异步通知相关共享内存和信号量
我们看一下以下的AsyncShmemInit函数,该函数用于初始化异步通知相关共享内存。
void AsyncShmemInit(void)
{
bool found;
Size size;
size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
size = add_size(size, offsetof(AsyncQueueControl, backend));
asyncQueueControl = (AsyncQueueControl *)
ShmemInitStruct("Async Queue Control", size, &found);
// 为所有Backend初始化
if (!found)
{
/* First time through, so initialize it */
SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
QUEUE_STOP_PAGE = 0;
QUEUE_FIRST_LISTENER = InvalidBackendId;
asyncQueueControl->lastQueueFillWarn = 0;
/* zero'th entry won't be used, but let's initialize it anyway */
for (int i = 0; i <= MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = InvalidBackendId;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
}
}
/* Set up SLRU management of the pg_notify data. */
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
SYNC_HANDLER_NONE);
if (!found)
{
/* During start or reboot, clean out the pg_notify directory. */
(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
}
}
static AsyncQueueControl *asyncQueueControl;
// 共享内存数据结构,用于保存异步通知相关数据
typedef struct AsyncQueueControl
{
QueuePosition head; /* head points to the next free location */ // 指向下一个空闲位置,表示下一个通知将被写入的位置
QueuePosition tail; /* tail must be <= the queue position of every
* listening backend */ // 表示所有监听者后端都已读取的位置
int stopPage; /* oldest unrecycled page; must be <=
* tail.page */ // SLRU回收机制使用,标识可回收的最旧页面
BackendId firstListener; /* id of first listener, or InvalidBackendId */ // 第一个监听者的ID
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl;
LISTEN命令
我们首先分析一下LISTEN命令的实现,在src/backend/commands/async.c中,Async_Listen函数实现了LISTEN命令。主流程如下:
exec_simple_query
--> PortalRun
--> standard_ProcessUtility
--> Async_Listen(stmt->conditionname); // 插入LISTEN动作到pendingActions链表
--> queue_listen(LISTEN_LISTEN, channel);
--> finish_xact_command() // 事务提交时,处理pendingActions链表中的动作
--> PreCommit_Notify();
// 1. 注册监听者
// 2. 初始化监听位置指针
// 3. 确保进程退出时正确清理监听状态
--> Exec_ListenPreCommit();
--> before_shmem_exit(Async_UnlistenOnExit, 0); // 确保进程退出时正确清理监听状态
--> AtCommit_Notify(); // 发送通知信号给所有监听者
--> Exec_ListenCommit(actrec->channel); // 如果当前channel中不存在,则将新的channel加入到listenChannels中
--> ClearPendingActionsAndNotifies(); // 清理pendingActions和pendingNotifies链表
LISTEN命令的实现主要通过queue_listen函数实现,该函数将LISTEN动作插入到pendingActions链表中,该链表在事务提交时才被处理。
// 事务级临时存储的链表,用于缓存当前事务中所有待处理的动作,直到事务提交时才批量处理。
static ActionList *pendingActions = NULL;
// 待处理的通知列表
static NotificationList *pendingNotifies = NULL;
// 插入LISTEN动作到pendingActions链表
void Async_Listen(const char *channel)
{
if (Trace_notify)
elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
queue_listen(LISTEN_LISTEN, channel);
}
/*
* queue_listen
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
* Actual update of the listenChannels list happens during transaction
* commit.
*/
static void queue_listen(ListenActionKind action, const char *channel);
// 通道列表,当事务提交时,会更新该列表
/*
* listenChannels identifies the channels we are actually listening to
* (ie, have committed a LISTEN on). It is a simple list of channel names,
* allocated in TopMemoryContext.
*/
static List *listenChannels = NIL; /* list of C strings */
NOTIFY命令
我们再分析一下NOTIFY命令的实现,在src/backend/commands/async.c中,Async_Notify函数实现了NOTIFY命令。主流程如下:
exec_simple_query
--> PortalRun
--> standard_ProcessUtility
--> Async_Notify(stmt->conditionname, stmt->payload); // 插入NOTIFY动作到pendingNotifies链表,事务提交时才被处理
--> finish_xact_command()
--> CommitTransactionCommand();
--> CommitTransaction();
--> PreCommit_Notify(); // 处理待处理的通知列表
--> AtCommit_Notify();
--> SignalBackends(); // 发送通知信号给所有监听者
--> SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i])
--> ClearPendingActionsAndNotifies();
我们继续看一下函数PreCommit_Notify是如何将消息写入到队列中的。
// 待处理的通知列表,当事务提交时,会处理该列表中的通知
static NotificationList *pendingNotifies = NULL;
// 处理待处理的通知列表,pendingNotifies以及pendingActions链表
void PreCommit_Notify(void)
{
ListCell *p;
if (!pendingActions && !pendingNotifies)
return; /* no relevant statements in this xact */
/* Preflight for any pending listen/unlisten actions */
if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
{
ListenAction *actrec = (ListenAction *) lfirst(p);
switch (actrec->action)
{
case LISTEN_LISTEN:
Exec_ListenPreCommit();
break;
case LISTEN_UNLISTEN:
break;
case LISTEN_UNLISTEN_ALL:
break;
}
}
}
/* Queue any pending notifies (must happen after the above) */
if (pendingNotifies)
{
ListCell *nextNotify;
(void) GetCurrentTransactionId();
LockSharedObject(DatabaseRelationId, InvalidOid, 0, AccessExclusiveLock);
/* Now push the notifications into the queue */
nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
{
/*
* Add the pending notifications to the queue. We acquire and
* release NotifyQueueLock once per page, which might be overkill
* but it does allow readers to get in while we're doing this.
*
* A full queue is very uncommon and should really not happen,
* given that we have so much space available in the SLRU pages.
* Nevertheless we need to deal with this possibility. Note that
* when we get here we are in the process of committing our
* transaction, but we have not yet committed to clog, so at this
* point in time we can still roll the transaction back.
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
asyncQueueFillWarning();
if (asyncQueueIsFull())
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify); // 将通知添加到队列中,写入page中
LWLockRelease(NotifyQueueLock);
}
}
}
异步通知
当NOTIFY消息后,监听者如何获取到通知呢?客户端进程收到了PROCSIG_NOTIFY_INTERRUPT信号后,会调用ProcessNotifyInterrupt函数,该函数会调用asyncQueueReadAllNotifications函数,从队列中读取通知,并通过NotifyMyFrontEnd函数将通知发送给客户端。
调用栈如下:
NotifyMyFrontEnd(const char * channel, const char * payload, int32 srcPid) (src\backend\commands\async.c:2311)
asyncQueueProcessPageEntries(volatile QueuePosition * current, QueuePosition stop, char * page_buffer, Snapshot snapshot) (src\backend\commands\async.c:2142)
asyncQueueReadAllNotifications() (src\backend\commands\async.c:2044)
ProcessIncomingNotify(_Bool flush) (src\backend\commands\async.c:2276)
ProcessNotifyInterrupt(_Bool flush) (src\backend\commands\async.c:1904)
ProcessClientReadInterrupt(_Bool blocked) (src\backend\tcop\postgres.c:510)
secure_read(Port * port, void * ptr, size_t len) (src\backend\libpq\be-secure.c:215)
pq_recvbuf() (src\backend\libpq\pqcomm.c:955)
pq_getbyte() (src\backend\libpq\pqcomm.c:1001)
SocketBackend(StringInfo inBuf) (src\backend\tcop\postgres.c:356)
ReadCommand(StringInfo inBuf) (src\backend\tcop\postgres.c:479)
PostgresMain(int argc, char ** argv, const char * dbname, const char * username) (src\backend\tcop\postgres.c:4538)
BackendRun(Port * port) (src\backend\postmaster\postmaster.c:4539)
BackendStartup(Port * port) (src\backend\postmaster\postmaster.c:4261)
ServerLoop() (src\backend\postmaster\postmaster.c:1748)
PostmasterMain(int argc, char ** argv) (src\backend\postmaster\postmaster.c:1420)
main(int argc, char ** argv) (src\backend\main\main.c:211)
函数源码如下:
void PostgresMain(int argc, char *argv[], const char *dbname, const char *username)
{
volatile bool send_ready_for_query = true;
// 注册信号处理函数
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
for (;;)
{
if (send_ready_for_query)
{
if (IsAbortedTransactionBlockState())
{
// 检查当前事务块状态是否处于已终止状态
}
else if (IsTransactionOrTransactionBlock())
{
// 检查当前是否处于活动事务或事务块中
}
else
{
if (notifyInterruptPending)
ProcessNotifyInterrupt(false); // 处理通知
}
}
}
}
/* 信号处理函数 - handle SIGUSR1 signal. */
void procsignal_sigusr1_handler(SIGNAL_ARGS)
{
int save_errno = errno;
if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
HandleNotifyInterrupt(); // 处理PROCSIG_NOTIFY_INTERRUPT信号
SetLatch(MyLatch);
errno = save_errno;
}
/* HandleNotifyInterrupt */
void HandleNotifyInterrupt(void)
{
/* signal that work needs to be done */
notifyInterruptPending = true;
/* make sure the event is processed in due course */
SetLatch(MyLatch);
}
void ProcessNotifyInterrupt(bool flush)
{
if (IsTransactionOrTransactionBlock())
return; /* not really idle */
/* Loop in case another signal arrives while sending messages */
while (notifyInterruptPending)
ProcessIncomingNotify(flush);
}
// 扫描队列,将通知发送给前端
static void ProcessIncomingNotify(bool flush)
{
MemoryContext oldcontext;
/* We *must* reset the flag */
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
if (listenChannels == NIL)
return;
/*
* We must run asyncQueueReadAllNotifications inside a transaction, else
* bad things happen if it gets an error. However, we need to preserve
* the caller's memory context (typically MessageContext).
*/
oldcontext = CurrentMemoryContext;
StartTransactionCommand();
asyncQueueReadAllNotifications();
CommitTransactionCommand();
/* Caller's context had better not have been transaction-local */
Assert(MemoryContextIsValid(oldcontext));
MemoryContextSwitchTo(oldcontext);
/*
* If this isn't an end-of-command case, we must flush the notify messages
* to ensure frontend gets them promptly.
*/
if (flush)
pq_flush();
}
/* 读取所有未处理的通知,并把通知发送给前端 */
static void asyncQueueReadAllNotifications(void)
{
volatile QueuePosition pos;
QueuePosition head;
Snapshot snapshot;
/* page_buffer must be adequately aligned, so use a union */
union
{
char buf[QUEUE_PAGESIZE];
AsyncQueueEntry align;
} page_buffer;
/* 获取当前队列状态 */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
pos = QUEUE_BACKEND_POS(MyBackendId);
head = QUEUE_HEAD;
LWLockRelease(NotifyQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
{
/* 如果已处理完所有通知则立即返回 */
return;
}
/*----------
* Get snapshot we'll use to decide which xacts are still in progress.
* This is trickier than it might seem, because of race conditions.
* Consider the following example:
*
* Backend 1: Backend 2:
*
* transaction starts
* UPDATE foo SET ...;
* NOTIFY foo;
* commit starts
* queue the notify message
* transaction starts
* LISTEN foo; -- first LISTEN in session
* SELECT * FROM foo WHERE ...;
* commit to clog
* commit starts
* add backend 2 to array of listeners
* advance to queue head (this code)
* commit to clog
*
* Transaction 2's SELECT has not seen the UPDATE's effects, since that
* wasn't committed yet. Ideally we'd ensure that client 2 would
* eventually get transaction 1's notify message, but there's no way
* to do that; until we're in the listener array, there's no guarantee
* that the notify message doesn't get removed from the queue.
*
* Therefore the coding technique transaction 2 is using is unsafe:
* applications must commit a LISTEN before inspecting database state,
* if they want to ensure they will see notifications about subsequent
* changes to that state.
*
* What we do guarantee is that we'll see all notifications from
* transactions committing after the snapshot we take here.
* Exec_ListenPreCommit has already added us to the listener array,
* so no not-yet-committed messages can be removed from the queue
* before we see them.
*----------
*/
snapshot = RegisterSnapshot(GetLatestSnapshot());
/*
* It is possible that we fail while trying to send a message to our
* frontend (for example, because of encoding conversion failure). If
* that happens it is critical that we not try to send the same message
* over and over again. Therefore, we place a PG_TRY block here that will
* forcibly advance our queue position before we lose control to an error.
* (We could alternatively retake NotifyQueueLock and move the position
* before handling each individual message, but that seems like too much
* lock traffic.)
*/
PG_TRY();
{
bool reachedStop;
do
{
int curpage = QUEUE_POS_PAGE(pos); // 当前页
int curoffset = QUEUE_POS_OFFSET(pos); // 当前页偏移
int slotno;
int copysize;
/*
* We copy the data from SLRU into a local buffer, so as to avoid
* holding the NotifySLRULock while we are examining the entries
* and possibly transmitting them to our frontend. Copy only the
* part of the page we will actually inspect.
*/
slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
InvalidTransactionId);
if (curpage == QUEUE_POS_PAGE(head))
{
/* we only want to read as far as head */
copysize = QUEUE_POS_OFFSET(head) - curoffset;
if (copysize < 0)
copysize = 0; /* just for safety */
}
else
{
/* fetch all the rest of the page */
copysize = QUEUE_PAGESIZE - curoffset;
}
memcpy(page_buffer.buf + curoffset,
NotifyCtl->shared->page_buffer[slotno] + curoffset,
copysize);
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
LWLockRelease(NotifySLRULock);
/*
* Process messages up to the stop position, end of page, or an
* uncommitted message.
*
* Our stop position is what we found to be the head's position
* when we entered this function. It might have changed already.
* But if it has, we will receive (or have already received and
* queued) another signal and come here again.
*
* We are not holding NotifyQueueLock here! The queue can only
* extend beyond the head pointer (see above) and we leave our
* backend's pointer where it is so nobody will truncate or
* rewrite pages under us. Especially we don't want to hold a lock
* while sending the notifications to the frontend.
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
page_buffer.buf,
snapshot);
} while (!reachedStop);
}
PG_FINALLY();
{
/* Update shared state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos;
LWLockRelease(NotifyQueueLock);
}
PG_END_TRY();
/* Done with snapshot */
UnregisterSnapshot(snapshot);
}
/*
* Fetch notifications from the shared queue, beginning at position current,
* and deliver relevant ones to my frontend.
*
* The current page must have been fetched into page_buffer from shared
* memory. (We could access the page right in shared memory, but that
* would imply holding the NotifySLRULock throughout this routine.)
*
* We stop if we reach the "stop" position, or reach a notification from an
* uncommitted transaction, or reach the end of the page.
*
* The function returns true once we have reached the stop position or an
* uncommitted notification, and false if we have finished with the page.
* In other words: once it returns true there is no need to look further.
* The QueuePosition *current is advanced past all processed messages.
*/
static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer,
Snapshot snapshot)
{
bool reachedStop = false;
bool reachedEndOfPage;
AsyncQueueEntry *qe;
do
{
QueuePosition thisentry = *current;
if (QUEUE_POS_EQUAL(thisentry, stop))
break;
qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry));
/*
* Advance *current over this message, possibly to the next page. As
* noted in the comments for asyncQueueReadAllNotifications, we must
* do this before possibly failing while processing the message.
*/
reachedEndOfPage = asyncQueueAdvance(current, qe->length);
/* Ignore messages destined for other databases */
if (qe->dboid == MyDatabaseId)
{
if (XidInMVCCSnapshot(qe->xid, snapshot))
{
/*
* The source transaction is still in progress, so we can't
* process this message yet. Break out of the loop, but first
* back up *current so we will reprocess the message next
* time. (Note: it is unlikely but not impossible for
* TransactionIdDidCommit to fail, so we can't really avoid
* this advance-then-back-up behavior when dealing with an
* uncommitted message.)
*
* Note that we must test XidInMVCCSnapshot before we test
* TransactionIdDidCommit, else we might return a message from
* a transaction that is not yet visible to snapshots; compare
* the comments at the head of heapam_visibility.c.
*
* Also, while our own xact won't be listed in the snapshot,
* we need not check for TransactionIdIsCurrentTransactionId
* because our transaction cannot (yet) have queued any
* messages.
*/
*current = thisentry;
reachedStop = true;
break;
}
else if (TransactionIdDidCommit(qe->xid))
{
/* qe->data is the null-terminated channel name */
char *channel = qe->data;
if (IsListeningOn(channel))
{
/* payload follows channel name */
char *payload = qe->data + strlen(channel) + 1;
NotifyMyFrontEnd(channel, payload, qe->srcPid);
}
}
else
{
/*
* The source transaction aborted or crashed, so we just
* ignore its notifications.
*/
}
}
/* Loop back if we're not at end of page */
} while (!reachedEndOfPage);
if (QUEUE_POS_EQUAL(*current, stop))
reachedStop = true;
return reachedStop;
}




