关键字:
客户端编程接口、LIBKCI、多线程、负载均衡、人大金仓、KingbaseES、
一、环境准备
安装KES数据库,libkci驱动测试可以正常使用,libkci驱动无法正常使用的请参照作者的文章《产品使用-金仓数据库KingbaseES LIBKCI驱动使用》。
程序说明:在libkci的手册无多线程负载均衡例程,本文记录多线程负载均衡例程的开发过程和测试过程。包含Linux和Windows的负载均衡,本文的思路是将两个环境集成到一个程序中。
负载均衡:通过算法调整负载,尽力均匀分配应用集群中各个节点的工作量,以此提高应用集群的并发处理能力。
二、多线程负载均衡
2.1 多主机连接准备
1、连接参数信息,这里作者给出两种多主机的方式,一是连接字符串,二是连接URI。
(1)连接字符串:
host=192.168.1.1,192.168.1.2 port=54321,54322 user=system password=123456 dbname=test loadbalance = on target_session_attrs = read-write
(2)连接URI:
kingbase://system:123456@192.168.1.1:54323,192.168.1.2:54300,192.168.1.3:54321/test?loadbalance=on
图2-1 连接参数
2、相关库文件引用,通过相关资料搜集和学习网上的Linux和Windows负载均衡方法,作者使用了以下负载均衡所需的头文件。#indef WIN32表示这部分头文件是Windows所独有,#else表示这部分为Linux所独有,#endif表示后面的内容共同使用。
图2-2 库文件使用
3、全局变量定义,定义主机的数量,定义需要开启的线程数量,定义线程内连接的次数。
图2-3 全局变量定义
4、定义打印KCIResult状态的函数和错误保护函数。
图2-4 状态输出函数和程序退出函数
5、创建表的函数,因为是多主机多线程,如果每一个主机都手动去创建表比较麻烦,把创建表这部分单独提取出来封装成一个函数,方便在多线程中调用。
图2-5 创建表
6、检查表是否存在的函数,构造该函数目的是避免在多主机多线程中重复创建表引发错误,创建表之前做表存在与否的判断,不存在再创建即可。
图2-6 检查表是否存在
7、主程序和函数入口构建,在main中写入以下程序,即可在不同平台调用不同的负载均衡程序。
#ifdef WIN32
/* Windows 平台多线程负载均衡程序入口 */
test_mult_thread_Windows();
#else
/* Linux 平台多线程负载均衡程序入口 */
test_mult_thread_Linux();
#endif
图2-7 不同平台程序入口
2.2 Linux平台多线程负载均衡
1、创建Linux平台的函数,Linux程序的位置如下面示例。
#ifdef WIN32
Windows程序
#else
Linux程序
#endif
在上方标红色位置创建函数,然后定义相关变量,定义线程变量(标识符),然后分配内存,这里的num在2.1中定义为线程数量,即根据线程的数量来分配合适内存。
图2-8 函数创建
2、创建线程,调用pthread_create来创建线程,线程执行的函数为my_thread函数。
图2-9 创建线程
3、线程执行函数。
(1)首先在#endif下方创建*my_thread多线程执行函数。
图2-10 线程执行函数
(2)然后在线程执行函数中构建一个while循环,循环参数为counts,全局变量里面设置线程内的执行次数,然后建立到数据库的连接。
图2-11 线程内循环
(3)然后是获取socket连接信息。
图2-12 scoket连接信息
(4)打印出对端的IP,并将IP与不同的主机进行字符串对比,检测是哪个主机,那么该主机的线程执行次数加一。
图2-13 打印并检测IP
(5)线程内执行数据插入。
图2-14 线程内循环执行
4、执行完上述线程循环创建和线程内循环构建数据库连接之后,打印相关的程序执行结果,阻塞等待线程执行,释放内存,最后返回程序执行结果。
图2-15 等待线程执行完毕释放
2.3 Windows平台多线程负载均衡
1、同2.2,创建函数入口,在下方红色地方创建函数。
#ifdef WIN32
Windows程序
#else
Linux程序
#endif
在上方标红色位置创建函数,然后定义相关变量,定义线程变量(标识符),然后分配内存,这里的num在2.1中定义为线程数量,即根据线程的数量来分配合适内存。
下方程序灰色是因为我在Linux中来写程序,而在Linux中该部分程序是不执行的。
图2-16 Windows多线程函数创建
2、创建线程,循环创建num个线程,利用函数_beginthreadex进行线程创建,程序的执行函数依旧是*my_thread(同2.2是同一个函数),不过注意这里传入的是程序地址。
图2-13 绑定参数
3、执行完上述线程循环创建和线程内循环构建数据库连接之后,打印相关的程序执行结果,阻塞等待线程执行,释放内存,最后返回程序执行结果。
图2-14 打开文件
2.4 Linux中程序执行
1、定义Makefile文件,在Makefile中加上需要编译的文件,比如作者这里是libkcitest6.c,加入数据库的lib和include路径。
图2-15 编辑Makefile文件
2、编译程序,使用make进行程序编译,记得添加openssl环境变量和数据库的lib环境变量,编译完成后生成可执行文件libkcitest5。
图2-17 编译
3、执行程序,在启动数据库的前提下直接执行./testlibkci5,可以看到程序执行结果,总共连接了20次,程序中只写了两个主机,程序将连接负载均衡到每一个主机了。
图2-18 程序执行结果
2.4 Windows中程序执行
Windows的程序编辑和运行,可以参照作者的文章《官网V9各版本LIBKCI驱动提供-赵微》。
(1)创建好工程之后,添加lib和include,然后创建一个test.c文件,把程序复制到test.c中。
(2)执行程序,先右键项目,重新生成,然后再点击上方运行,测试成功。
图3-7 生成解决方案
图3-8 运行测试程序
三、总结
1、负载均衡在Windows上会有所不同,且需要的头文件不一样,使用时要注意使用#ifdef、#else、#endif。
2、执行例程时需要执行openssl的环境变量配置,将openssl版本从1.1.1k转到1.1.1q,并且还需要执行数据库lib环境变量配置。
export PATH=openssl地址/bin:$PATH
export LD_LIBRARY_PATH=openssl地址/lib:$LD_LIBRARY_PATH
export LD_LIBRARY_PATH=数据库地址/lib:LD_LIBRARY_PATH
附录一:本文创建的例程源码
/*
* testlibkci5.c
* 多线程负载均衡。
* 分 Windows 平台和 Linux 平台
* #ifdef WIN32 为 Windows 平台
* #else 为 Linux 平台
* 要求数据库中没有 test1 表
* 如有请删除或修改程序中的表名
*
* 输出结果为:
*
* remote IP [192.168.1.1] [44244]
* remote IP [192.168.1.2] [65227]
* ......
* total connections: 20
* 192.168.1.1 ---> 6
* 192.168.1.2 ---> 7
* 192.168.1.3 ---> 6
* 上述的不同 IP 连接次数以实际情况为准
*
*/
#ifdef WIN32
#include <WS2tcpip.h>
#include <windows.h>
/* 扩展,把这个库加到工程文件中,提供对网络相关的API支持 */
#pragma comment(lib,"ws2_32.lib")
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h> /* 注意:编译需要链接线程系统库 -lpthread */
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include "libkci_fe.h"
#define SA struct sockaddr
KCIResult *res = NULL;
int nConn1, nConn2, nConn3;
char IP1[] = "10.12.1.30";
char IP2[] = "10.10.12.252";
char IP3[] = "10.10.11.153";
/* 此处定义线程和线程内连接次数 */
int num = 10; /* 开启线程的数量 */
int counts = 2; /* 线程内循环次数 */
/*
* 多主机连接串
* loadbalance 参数开启负载均衡
* target_session_attrs 参数要求主机具有读写权限
*/
// char connInfo[] = "host=192.168.1.1,192.168.1.2 port=54321,54322 user=system password=123456 dbname=test loadbalance = on target_session_attrs = read-write ";
char connInfo[] = "host=10.12.1.30,10.10.12.252 port=52222,54444 user=system password=123456 dbname=test loadbalance = on connect_timeout=3 ";
/* URI 连接串:char connInfo[] = "kingbase://system:123456@192.168.1.1:54323,192.168.1.2:54300,192.168.1.3:54321/test?loadbalance=on"; */
void* my_thread(void *arg);
/* 打印状态字符串 */
void printStatus(KCIResult *res)
{
KCIExecuteStatus status;
status = KCIResultGetStatusCode(res);
printf("STATUS[%s]\n", KCIResultGetStatusString(status));
}
/* 遇到错误销毁连接并退出程序 */
void exit_KB(KCIConnection *conn, KCIResult *res)
{
if (res)
KCIResultDealloc(res);
KCIConnectionDestory(conn);
exit(1);
}
/* 创建表 */
void createTbl(KCIConnection *conn)
{
char crtTbl[] = "CREATE TABLE test1( ID serial, content text, primary key (ID))";
/* 检查是否成功建立 */
res = KCIStatementExecute(conn, crtTbl);
if (KCIResultGetStatusCode(res) != EXECUTE_COMMAND_OK)
{
fprintf(stderr, "CREATE TABLE error:[%s]", KCIConnectionGetLastError(conn));
exit_KB(conn, res);
}
else
printf("CREATE TABLE test1.\n");
KCIResultDealloc(res);
}
/* 检查表是否存在 */
int isTblExist(KCIConnection *conn, char *tbname)
{
int ret = 0;
char *value;
char sqlstr[100];
memset(sqlstr, 0x00, sizeof(sqlstr));
/* 在系统表中查找 test1 表是否存在 */
sprintf(sqlstr, "select count(*) from sys_class where relname = '%s'", tbname);
res = KCIStatementExecute(conn, sqlstr);
if (KCIResultGetStatusCode(res) != EXECUTE_TUPLES_OK)
{
fprintf(stderr, "isTblExist error [%s]", KCIConnectionGetLastError(conn));
exit_KB(conn, res);
}
value = KCIResultGetColumnValue(res, 0, 0);
ret = atoi(value);
KCIResultDealloc(res);
return ret;
}
#ifdef WIN32
/* Windows 平台多线程操作 */
int test_mult_thread_Windows()
{
int i = 0;
int err;
HANDLE *hThread = NULL;
hThread = (HANDLE *)malloc(sizeof(HANDLE) * num);
memset(hThread, 0, sizeof(HANDLE) * num);
for (i = 0; i < num; i++)
{
/* 创建线程 */
hThread[i] = (HANDLE)_beginthreadex(NULL, 0, my_thread, NULL, 0, NULL);
if (hThread[i] == NULL)
{
printf("create thread hThread[%d] fail\n", i);
return -1;
}
Sleep(1000);
}
printf("total connections: %d\n", num * counts);
printf("%s ---> %d\n", IP1, nConn1);
printf("%s ---> %d\n", IP2, nConn2);
printf("%s ---> %d\n", IP3, nConn3);
/* 以阻塞方式等待 hThread[i] 指定的线程结束 */
for (i = 0; i < num; i++)
{
system("pause");
}
free(hThread);
return 0;
}
#else
/* Linux 平台多线程操作 */
int test_mult_thread_Linux()
{
int i = 0;
int err;
pthread_t *hThread = NULL;
hThread = (pthread_t *)malloc(sizeof(pthread_t) * num);
memset(hThread, 0, sizeof(pthread_t) * num);
for (i = 0; i < num; i++)
{
/* 创建线程,创建成功返回 0 */
err = pthread_create(&hThread[i], NULL, &my_thread, NULL);
if (err != 0)
{
printf("create thread hThread[%d] fail\n", i);
return err;
}
sleep(1);
}
printf("total connections: %d\n", num * counts);
printf("%s ---> %d\n", IP1, nConn1);
printf("%s ---> %d\n", IP2, nConn2);
printf("%s ---> %d\n", IP3, nConn3);
/* 以阻塞方式等待 hThread[i] 指定的线程结束 */
for (i = 0; i < num; i++)
{
pthread_join(hThread[i], NULL);
}
free(hThread);
return 0;
}
#endif
void *my_thread(void *arg)
{
int i;
int ret;
int socket;
char sIP[32 + 1];
char sqlstr[100];
struct sockaddr_in remote;
socklen_t len;
KCIConnection *conn;
len = sizeof(struct sockaddr_in);
/* 多连接数据库 */
i = 0;
while (i < counts)
{
memset(&remote, 0, sizeof(struct sockaddr_in));
/*
* 线程内建立连接
* 不允许两个线程同时尝试操纵同一个 conn 对象
*/
conn = KCIConnectionCreate(connInfo);
if (KCIConnectionGetStatus(conn) != CONNECTION_OK)
{
fprintf(stderr, "KCIConnectionCreate error:[%s]", KCIConnectionGetLastError(conn));
exit_KB(conn, NULL);
}
//printf("建立第%d次连接\n", i);
/* 获取 socket 信息 */
socket = KCIConnectionGetSocket(conn);
ret = getpeername(socket, (SA *)&remote, &len);
if (ret < 0)
{
fprintf(stdout, "getpeername error.");
exit_KB(conn, NULL);
}
/* 打印对端 IP */
memset(sIP, 0x00, sizeof(sIP));
printf("remote IP [%s] [%d]\n", inet_ntop(AF_INET, &remote.sin_addr, sIP, sizeof(sIP)), remote.sin_port);
if (strcmp(IP1, sIP) == 0)
nConn1++;
else if (strcmp(IP2, sIP) == 0)
nConn2++;
// else if (strcmp(IP3, sIP) == 0)
// nConn3++;
if (!isTblExist(conn, "test1"))
createTbl(conn);
memset(sqlstr, 0, sizeof(sqlstr));
sprintf(sqlstr, "insert into test1(content) values('%s')", sIP);
/* 插入数据 */
res = KCIStatementExecute(conn, sqlstr);
if (KCIResultGetStatusCode(res) != EXECUTE_COMMAND_OK)
{
fprintf(stderr, "insert error:[%s]", KCIConnectionGetLastError(conn));
exit_KB(conn, res);
}
KCIResultDealloc(res);
KCIConnectionDestory(conn);
i++;
}
return 0;
}
int main()
{
#ifdef WIN32
/* Windows 平台多线程负载均衡程序入口 */
test_mult_thread_Windows();
#else
/* Linux 平台多线程负载均衡程序入口 */
test_mult_thread_Linux();
#endif
}




