总结
query 子命令在连不上数据库时,会读取 gaussdb.state 文件的内容,该文件中存储的是 GaussState 结构体;连接正常时则直接通过 SQL 查询。后来我发现函数 pg_stat_get_stream_replications 已经实现了类似功能,可以直接通过 SQL 展示,不用自己实现,用它就行了。
命令
这是一个在单节点上执行的命令,多节点或者有主备节点的环境和这个完全不一样。执行这个命令时经过了哪些过程,我很想知道,下面我梳理一下;这里只梳理 query 的实现,其他部分请根据需要自己去看。
[2025-03-20 10:14:21.791][8484][][gs_ctl]: gs_ctl query ,datadir is var/lib/opengauss/data
HA state:
local_role : Normal
static_connections : 0
db_state : Normal
detail_information : Normal
Senders info:
No information
Receiver info:
No information
[opengauss@vbox ~]$
源代码文件位置
src/bin/pg_ctl/pg_ctl.cpp
src/include/replication/replicainternal.h
入口函数main
int main(int argc, char** argv)
执行query指令会触发QUERY_COMMAND
这是一个枚举类型
ctl_command = NOTIFY_COMMAND;
else if (strcmp(argv[optind], "query") == 0)
ctl_command = QUERY_COMMAND;
这是枚举类型的代码样子关注在QUERY_COMMAND
typedef enum {
NO_COMMAND = 0,
INIT_COMMAND,
START_COMMAND,
RESTART_COMMAND,
STOP_COMMAND,
RELOAD_COMMAND,
STATUS_COMMAND,
PROMOTE_COMMAND,
KILL_COMMAND,
REGISTER_COMMAND,
UNREGISTER_COMMAND,
RUN_AS_SERVICE_COMMAND,
FAILOVER_COMMAND,
SWITCHOVER_COMMAND,
NOTIFY_COMMAND,
QUERY_COMMAND,
BUILD_COMMAND,
BUILD_QUERY_COMMAND,
RESTORE_COMMAND,
HOTPATCH_COMMAND,
FINISH_REDO_COMMAND
} CtlCommand;
我们执行 query 这个操作真正对应的是 QUERY_COMMAND,然后 QUERY_COMMAND 对应 do_query() 函数,这才是真正干活的“打工人”——do_query()
case QUERY_COMMAND:
pg_log(PG_PROGRESS, _("gs_ctl query ,datadir is %s \n"), pg_data);
do_query();
break;
case BUILD_QUERY_COMMAND:
pg_log(PG_PROGRESS, _("gs_ctl build query ,datadir is %s \n"), pg_data);
do_build_query();
break;
do_query()函数实现
其中,显示部分由 display_query() 负责最终结果的展示
static void do_query(void);
static void do_query(void);
{
#define NumCommands 3
PGconn* conn = NULL;
GaussState state;
pid_t pid = 0;
int commandIndex = 0;
errno_t tnRet = 0;
const char* infoTitle[NumCommands] = {"HA state:", "Senders info:", "Receiver info:"};
const char* sqlCommands[NumCommands] = {"SELECT local_role,static_connections,db_state,detail_information "
"FROM pg_stat_get_stream_replications();",
"SELECT sender_pid,local_role,peer_role,peer_state, "
"state,sender_sent_location,sender_write_location, "
"sender_flush_location,sender_replay_location, "
"receiver_received_location,receiver_write_location, "
"receiver_flush_location,receiver_replay_location, "
"sync_percent,sync_state,sync_priority, "
"sync_most_available,channel "
"FROM pg_stat_get_wal_senders();",
"SELECT receiver_pid,local_role,peer_role,peer_state, "
"state,sender_sent_location,sender_write_location, "
"sender_flush_location,sender_replay_location, "
"receiver_received_location,receiver_write_location, "
"receiver_flush_location,receiver_replay_location, "
"sync_percent,channel "
"FROM pg_stat_get_wal_receiver();"};
if (islsnquery) {
do_lsn_query();
return;
}
pid = get_pgpid();
if (pid == 0) { /* No pid file */
pg_log(PG_WARNING, _(" PID file \"%s\" does not exist\n"), pid_file);
pg_log(PG_WARNING, _("Is server running?\n"));
exit(1);
} else if (!is_process_alive(pid)) {
pg_log(PG_WARNING, _(" could not connect to the local server, the postmaster process %d is not running"), pid);
exit(1);
} else if (!IsMyPostmasterPid(pid, pg_config)) {
pg_log(PG_WARNING, _(" The process recorded in PID file \"%s\" is not current server\n"), pid_file);
pg_log(PG_WARNING, _("Is server running?\n"));
exit(1);
}
/* connect to database */
conn = get_connectionex();
if ((conn == NULL) || (PQstatus(conn) != CONNECTION_OK)) {
tnRet = memset_s(&state, sizeof(state), 0, sizeof(state));
securec_check_c(tnRet, "\0", "\0");
ReadDBStateFile(&state);
display_query(&state, conn != NULL ? (const char*)PQerrorMessage(conn) : (const char*)NULL);
PQfinish(conn);
conn = NULL;
return;
}
/* print query results */
for (commandIndex = 0; commandIndex < NumCommands; commandIndex++) {
pg_log(PG_PRINT, _(" %-20s\n"), infoTitle[commandIndex]);
exe_sql(conn, sqlCommands[commandIndex]);
}
PQfinish(conn);
conn = NULL;
}
display_query()函数实现
/*
 * display_query
 *
 * Prints the "HA state" report from an already-populated GaussState.
 *
 * state    - state to print; if NULL an error is printed and the program
 *            exits with status 1.
 * errormsg - text shown as detail_information; when NULL or empty the
 *            fixed text "Access denied" is shown instead.
 *
 * The sender and receiver sections are always printed as "No information".
 */
static void display_query(GaussState* state, const char* errormsg)
{
#define MAX_INFO 1024
    const char* fallback_reason = "Access denied";
    const char* reason = NULL;
    char detail_info[MAX_INFO] = {0};
    errno_t rc = 0;

    if (state == NULL) {
        pg_log(PG_PRINT, _("%s: display query failed, could not get information from gaussdb.state\n"), progname);
        exit(1);
    }

    /* Choose the detail text first so that a single copy path is enough. */
    reason = (errormsg != NULL && *errormsg != '\0') ? errormsg : fallback_reason;
    rc = strncpy_s(detail_info, MAX_INFO, reason, strlen(reason) + 1);
    securec_check_c(rc, "\0", "\0");

    /* HA state section: one "name: value" line per field. */
    pg_log(PG_PRINT, _(" HA state:\n"));
    pg_log(PG_PRINT, _(" %-30s: %s\n"), "local_role", get_string_by_mode(state->mode));
    pg_log(PG_PRINT, _(" %-30s: %d\n"), "static_connections", state->conn_num);
    pg_log(PG_PRINT, _(" %-30s: %s\n"), "db_state", get_string_by_state(state->state));
    pg_log(PG_PRINT, _(" %-30s: %s\n"), "detail_information", detail_info);
    pg_log(PG_PRINT, _(" %-30s: %s\n"), "sync_mode", get_string_by_sync_mode(state->sync_stat));
    pg_log(PG_PRINT, _("\n"));

    /* Sender and receiver details are not reported by this function. */
    pg_log(PG_PRINT, _(" Senders info:\n"));
    pg_log(PG_PRINT, _(" No information\n"));
    pg_log(PG_PRINT, _(" Receiver info:\n"));
    pg_log(PG_PRINT, _(" No information\n"));
}
db_state&local_role 功能点
本次我们关注db_state&local_role这两个显示结果,因为我需要通过SQL的形式展现出来
pg_log(PG_PRINT, _(" %-30s: %s\n"), role_opts, get_string_by_mode(state->mode));
pg_log(PG_PRINT, _(" %-30s: %s\n"), state_opts, get_string_by_state(state->state));
难点
gaussstate struct
/*
 * GaussState
 *
 * Snapshot of the instance state as filled in by ReadDBStateFile() and
 * printed by display_query().
 *
 * NOTE(review): the struct is presumably stored in gaussdb.state in this
 * exact layout, so field order and types should not be changed without
 * checking the reader and writer.
 */
typedef struct gaussstate {
ServerMode mode; /* printed as "local_role" via get_string_by_mode() */
int conn_num; /* printed as "static_connections" */
DbState state; /* printed as "db_state" via get_string_by_state() */
bool sync_stat; /* printed as "sync_mode" via get_string_by_sync_mode() */
uint64 lsn; /* not printed by display_query() */
uint64 term; /* not printed by display_query() */
BuildState build_info; /* not printed by display_query() */
HaRebuildReason ha_rebuild_reason; /* not printed by display_query() */
} GaussState;
static void display_query(GaussState* state, const char* errormsg);
ReadDBStateFile(&state);
display_query(&state, conn != NULL ? (const char*)PQerrorMessage(conn) : (const char*)NULL);
get_string_by_state 函数实现
static char* get_string_by_state(DbState db_state);
/*
 * get_string_by_state
 *
 * Translates a DbState value into the text printed as "db_state".
 * Values without a dedicated label (including UNKNOWN_STATE) yield
 * "Unknown".
 *
 * The result points at a string literal: callers must neither modify
 * nor free it.
 */
static char* get_string_by_state(DbState db_state)
{
    const char* label = "Unknown";

    switch (db_state) {
        case NORMAL_STATE:
            label = "Normal";
            break;
        case NEEDREPAIR_STATE:
            label = "Need repair";
            break;
        case STARTING_STATE:
            label = "Starting";
            break;
        case WAITING_STATE:
            label = "Wait promoting";
            break;
        case DEMOTING_STATE:
            label = "Demoting";
            break;
        case PROMOTING_STATE:
            label = "Promoting";
            break;
        case BUILDING_STATE:
            label = "Building";
            break;
        case CATCHUP_STATE:
            label = "Catchup";
            break;
        case COREDUMP_STATE:
            label = "Coredump";
            break;
        case UNKNOWN_STATE:
        default:
            /* keep the preset "Unknown" */
            break;
    }

    return (char*)label;
}
get_string_by_mode 函数实现
static char* get_string_by_mode(ServerMode s_mode);
/*
 * get_string_by_mode
 *
 * Translates a ServerMode value into the text printed as "local_role".
 * Any value without a dedicated label yields "Unknown".
 *
 * The result points at a string literal: callers must neither modify
 * nor free it.
 */
static char* get_string_by_mode(ServerMode s_mode)
{
    const char* label = "Unknown";

    switch (s_mode) {
        case NORMAL_MODE:
            label = "Normal";
            break;
        case PRIMARY_MODE:
            label = "Primary";
            break;
        case STANDBY_MODE:
            label = "Standby";
            break;
        case CASCADE_STANDBY_MODE:
            label = "Cascade Standby";
            break;
        case PENDING_MODE:
            label = "Pending";
            break;
        default:
            /* keep the preset "Unknown" */
            break;
    }

    return (char*)label;
}
文章转载自SmallDB,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




