暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

人大金仓数据库KingbaseES LIBKCI多线程负载均衡用例支持

原创 数据猿 2023-12-26
463


关键字:

客户端编程接口、LIBKCI、多线程、负载均衡、人大金仓、KingbaseES、

一、环境准备

安装KES数据库,libkci驱动测试可以正常使用,libkci驱动无法正常使用的请参照作者的文章《产品使用-金仓数据库KingbaseES LIBKCI驱动使用》。

程序说明:在libkci的手册无多线程负载均衡例程,本文记录多线程负载均衡例程的开发过程和测试过程。包含Linux和Windows的负载均衡,本文的思路是将两个环境集成到一个程序中。

负载均衡:通过算法调整负载,尽力均匀分配应用集群中各个节点的工作量,以此提高应用集群的并发处理能力。

二、多线程负载均衡

2.1 多主机连接准备

1、连接参数信息,这里作者给出两种多主机的方式,一是连接字符串,二是连接URI。

(1)连接字符串:

host=192.168.1.1,192.168.1.2 port=54321,54322 user=system password=123456 dbname=test loadbalance = on target_session_attrs = read-write

(2)连接URI:

kingbase://system:123456@192.168.1.1:54323,192.168.1.2:54300,192.168.1.3:54321/test?loadbalance=on

图2-1 连接参数

2、相关库文件引用,通过相关资料搜集和学习网上的Linux和Windows负载均衡方法,作者使用了以下负载均衡所需的头文件。#indef WIN32表示这部分头文件是Windows所独有,#else表示这部分为Linux所独有,#endif表示后面的内容共同使用。

图2-2 库文件使用

3、全局变量定义,定义主机的数量,定义需要开启的线程数量,定义线程内连接的次数。

图2-3 全局变量定义

4、定义打印KCIResult状态的函数和错误保护函数。

图2-4 状态输出函数和程序退出函数

5、创建表的函数,因为是多主机多线程,如果每一个主机都手动去创建表比较麻烦,把创建表这部分单独提取出来封装成一个函数,方便在多线程中调用。

图2-5 创建表

6、检查表是否存在的函数,构造该函数目的是避免在多主机多线程中重复创建表引发错误,创建表之前做表存在与否的判断,不存在再创建即可。

图2-6 检查表是否存在

7、主程序和函数入口构建,在main中写入以下程序,即可在不同平台调用不同的负载均衡程序。

#ifdef WIN32

/* Windows 平台多线程负载均衡程序入口 */

test_mult_thread_Windows();

#else

/* Linux 平台多线程负载均衡程序入口 */

test_mult_thread_Linux();

#endif

图2-7 不同平台程序入口

2.2 Linux平台多线程负载均衡

1、创建Linux平台的函数,Linux程序的位置如下面示例。

#ifdef WIN32

Windows程序

#else

Linux程序

#endif

在上方标红色位置创建函数,然后定义相关变量,定义线程变量(标识符),然后分配内存,这里的num在2.1中定义为线程数量,即根据线程的数量来分配合适内存。

图2-8 函数创建

2、创建线程,调用pthread_create来创建线程,线程执行的函数为my_thread函数。

图2-9 创建线程

3、线程执行函数。

(1)首先在#endif下方创建*my_thread多线程执行函数。

图2-10 线程执行函数

(2)然后在线程执行函数中构建一个while循环,循环参数为counts,全局变量里面设置线程内的执行次数,然后建立到数据库的连接。

图2-11 线程内循环

(3)然后是获取socket连接信息。

图2-12 scoket连接信息

(4)打印出对端的IP,并将IP与不同的主机进行字符串对比,检测是哪个主机,那么该主机的线程执行次数加一。

图2-13 打印并检测IP

(5)线程内执行数据插入。

图2-14 线程内循环执行

4、执行完上述线程循环创建和线程内循环构建数据库连接之后,打印相关的程序执行结果,阻塞等待线程执行,释放内存,最后返回程序执行结果。

图2-15 等待线程执行完毕释放

2.3 Windows平台多线程负载均衡

1、同2.2,创建函数入口,在下方红色地方创建函数。

#ifdef WIN32

Windows程序

#else

Linux程序

#endif

在上方标红色位置创建函数,然后定义相关变量,定义线程变量(标识符),然后分配内存,这里的num在2.1中定义为线程数量,即根据线程的数量来分配合适内存。

下方程序灰色是因为我在Linux中来写程序,而在Linux中该部分程序是不执行的。

图2-16 Windows多线程函数创建

2、创建线程,循环创建num个线程,利用函数_beginthreadex进行线程创建,程序的执行函数依旧是*my_thread(同2.2是同一个函数),不过注意这里传入的是程序地址。

图2-13 绑定参数

3、执行完上述线程循环创建和线程内循环构建数据库连接之后,打印相关的程序执行结果,阻塞等待线程执行,释放内存,最后返回程序执行结果。

图2-14 打开文件

2.4 Linux中程序执行

1、定义Makefile文件,在Makefile中加上需要编译的文件,比如作者这里是libkcitest6.c,加入数据库的lib和include路径。

图2-15 编辑Makefile文件

2、编译程序,使用make进行程序编译,记得添加openssl环境变量和数据库的lib环境变量,编译完成后生成可执行文件libkcitest5。

图2-17 编译

3、执行程序,在启动数据库的前提下直接执行./testlibkci5,可以看到程序执行结果,总共连接了20次,程序中只写了两个主机,程序将连接负载均衡到每一个主机了。

图2-18 程序执行结果

2.4 Windows中程序执行

Windows的程序编辑和运行,可以参照作者的文章《官网V9各版本LIBKCI驱动提供-赵微》。

(1)创建好工程之后,添加lib和include,然后创建一个test.c文件,把程序复制到test.c中。

(2)执行程序,先右键项目,重新生成,然后再点击上方运行,测试成功。

图3-7 生成解决方案

图3-8 运行测试程序

三、总结

1、负载均衡在Windows上会有所不同,且需要的头文件不一样,使用时要注意使用#ifdef、#else、#endif。

2、执行例程时需要执行openssl的环境变量配置,将openssl版本从1.1.1k转到1.1.1q,并且还需要执行数据库lib环境变量配置。

export PATH=openssl地址/bin:$PATH

export LD_LIBRARY_PATH=openssl地址/lib:$LD_LIBRARY_PATH

export LD_LIBRARY_PATH=数据库地址/lib:LD_LIBRARY_PATH

附录一:本文创建的例程源码

/*

* testlibkci5.c

* 多线程负载均衡。

* 分 Windows 平台和 Linux 平台

* #ifdef WIN32 为 Windows 平台

* #else 为 Linux 平台

* 要求数据库中没有 test1 表

* 如有请删除或修改程序中的表名

*

* 输出结果为:

*

* remote IP [192.168.1.1] [44244]

* remote IP [192.168.1.2] [65227]

* ......

* total connections: 20

* 192.168.1.1 ---> 6

* 192.168.1.2 ---> 7

* 192.168.1.3 ---> 6

* 上述的不同 IP 连接次数以实际情况为准

*

*/

#ifdef WIN32

#include <WS2tcpip.h>

#include <windows.h>

/* 扩展,把这个库加到工程文件中,提供对网络相关的API支持 */

#pragma comment(lib,"ws2_32.lib")

#else

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <unistd.h>

#include <pthread.h> /* 注意:编译需要链接线程系统库 -lpthread */

#endif

#include <stdio.h>

#include <stdlib.h>

#include <stdint.h>

#include <string.h>

#include "libkci_fe.h"

#define SA struct sockaddr

KCIResult *res = NULL;

int nConn1, nConn2, nConn3;

char IP1[] = "10.12.1.30";

char IP2[] = "10.10.12.252";

char IP3[] = "10.10.11.153";

/* 此处定义线程和线程内连接次数 */

int num = 10; /* 开启线程的数量 */

int counts = 2; /* 线程内循环次数 */

/*

* 多主机连接串

* loadbalance 参数开启负载均衡

* target_session_attrs 参数要求主机具有读写权限

*/

// char connInfo[] = "host=192.168.1.1,192.168.1.2 port=54321,54322 user=system password=123456 dbname=test loadbalance = on target_session_attrs = read-write ";

char connInfo[] = "host=10.12.1.30,10.10.12.252 port=52222,54444 user=system password=123456 dbname=test loadbalance = on connect_timeout=3 ";

/* URI 连接串:char connInfo[] = "kingbase://system:123456@192.168.1.1:54323,192.168.1.2:54300,192.168.1.3:54321/test?loadbalance=on"; */

void* my_thread(void *arg);

/* 打印状态字符串 */

void printStatus(KCIResult *res)

{

KCIExecuteStatus status;

status = KCIResultGetStatusCode(res);

printf("STATUS[%s]\n", KCIResultGetStatusString(status));

}

/* 遇到错误销毁连接并退出程序 */

void exit_KB(KCIConnection *conn, KCIResult *res)

{

if (res)

KCIResultDealloc(res);

KCIConnectionDestory(conn);

exit(1);

}

/* 创建表 */

void createTbl(KCIConnection *conn)

{

char crtTbl[] = "CREATE TABLE test1( ID serial, content text, primary key (ID))";

/* 检查是否成功建立 */

res = KCIStatementExecute(conn, crtTbl);

if (KCIResultGetStatusCode(res) != EXECUTE_COMMAND_OK)

{

fprintf(stderr, "CREATE TABLE error:[%s]", KCIConnectionGetLastError(conn));

exit_KB(conn, res);

}

else

printf("CREATE TABLE test1.\n");

KCIResultDealloc(res);

}

/* 检查表是否存在 */

int isTblExist(KCIConnection *conn, char *tbname)

{

int ret = 0;

char *value;

char sqlstr[100];

memset(sqlstr, 0x00, sizeof(sqlstr));

/* 在系统表中查找 test1 表是否存在 */

sprintf(sqlstr, "select count(*) from sys_class where relname = '%s'", tbname);

res = KCIStatementExecute(conn, sqlstr);

if (KCIResultGetStatusCode(res) != EXECUTE_TUPLES_OK)

{

fprintf(stderr, "isTblExist error [%s]", KCIConnectionGetLastError(conn));

exit_KB(conn, res);

}

value = KCIResultGetColumnValue(res, 0, 0);

ret = atoi(value);

KCIResultDealloc(res);

return ret;

}

#ifdef WIN32

/* Windows 平台多线程操作 */

int test_mult_thread_Windows()

{

int i = 0;

int err;

HANDLE *hThread = NULL;

hThread = (HANDLE *)malloc(sizeof(HANDLE) * num);

memset(hThread, 0, sizeof(HANDLE) * num);

for (i = 0; i < num; i++)

{

/* 创建线程 */

hThread[i] = (HANDLE)_beginthreadex(NULL, 0, my_thread, NULL, 0, NULL);

if (hThread[i] == NULL)

{

printf("create thread hThread[%d] fail\n", i);

return -1;

}

Sleep(1000);

}

printf("total connections: %d\n", num * counts);

printf("%s ---> %d\n", IP1, nConn1);

printf("%s ---> %d\n", IP2, nConn2);

printf("%s ---> %d\n", IP3, nConn3);

/* 以阻塞方式等待 hThread[i] 指定的线程结束 */

for (i = 0; i < num; i++)

{

system("pause");

}

free(hThread);

return 0;

}

#else

/* Linux 平台多线程操作 */

int test_mult_thread_Linux()

{

int i = 0;

int err;

pthread_t *hThread = NULL;

hThread = (pthread_t *)malloc(sizeof(pthread_t) * num);

memset(hThread, 0, sizeof(pthread_t) * num);

for (i = 0; i < num; i++)

{

/* 创建线程,创建成功返回 0 */

err = pthread_create(&hThread[i], NULL, &my_thread, NULL);

if (err != 0)

{

printf("create thread hThread[%d] fail\n", i);

return err;

}

sleep(1);

}

printf("total connections: %d\n", num * counts);

printf("%s ---> %d\n", IP1, nConn1);

printf("%s ---> %d\n", IP2, nConn2);

printf("%s ---> %d\n", IP3, nConn3);

/* 以阻塞方式等待 hThread[i] 指定的线程结束 */

for (i = 0; i < num; i++)

{

pthread_join(hThread[i], NULL);

}

free(hThread);

return 0;

}

#endif

void *my_thread(void *arg)

{

int i;

int ret;

int socket;

char sIP[32 + 1];

char sqlstr[100];

struct sockaddr_in remote;

socklen_t len;

KCIConnection *conn;

len = sizeof(struct sockaddr_in);

/* 多连接数据库 */

i = 0;

while (i < counts)

{

memset(&remote, 0, sizeof(struct sockaddr_in));

/*

* 线程内建立连接

* 不允许两个线程同时尝试操纵同一个 conn 对象

*/

conn = KCIConnectionCreate(connInfo);

if (KCIConnectionGetStatus(conn) != CONNECTION_OK)

{

fprintf(stderr, "KCIConnectionCreate error:[%s]", KCIConnectionGetLastError(conn));

exit_KB(conn, NULL);

}

//printf("建立第%d次连接\n", i);

/* 获取 socket 信息 */

socket = KCIConnectionGetSocket(conn);

ret = getpeername(socket, (SA *)&remote, &len);

if (ret < 0)

{

fprintf(stdout, "getpeername error.");

exit_KB(conn, NULL);

}

/* 打印对端 IP */

memset(sIP, 0x00, sizeof(sIP));

printf("remote IP [%s] [%d]\n", inet_ntop(AF_INET, &remote.sin_addr, sIP, sizeof(sIP)), remote.sin_port);

if (strcmp(IP1, sIP) == 0)

nConn1++;

else if (strcmp(IP2, sIP) == 0)

nConn2++;

// else if (strcmp(IP3, sIP) == 0)

// nConn3++;

if (!isTblExist(conn, "test1"))

createTbl(conn);

memset(sqlstr, 0, sizeof(sqlstr));

sprintf(sqlstr, "insert into test1(content) values('%s')", sIP);

/* 插入数据 */

res = KCIStatementExecute(conn, sqlstr);

if (KCIResultGetStatusCode(res) != EXECUTE_COMMAND_OK)

{

fprintf(stderr, "insert error:[%s]", KCIConnectionGetLastError(conn));

exit_KB(conn, res);

}

KCIResultDealloc(res);

KCIConnectionDestory(conn);

i++;

}

return 0;

}

int main()

{

#ifdef WIN32

/* Windows 平台多线程负载均衡程序入口 */

test_mult_thread_Windows();

#else

/* Linux 平台多线程负载均衡程序入口 */

test_mult_thread_Linux();

#endif

}

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论