关键字:
客户端编程接口、LIBKCI、COPY、例程、人大金仓、KingbaseES、
一、环境准备
安装KES数据库,libkci驱动测试可以正常使用, libkci驱动无法正常使用的请参照作者的文章《产品使用-金仓数据库KingbaseES LIBKCI驱动使用》。
COPY说明:在libkci的手册无COPY例程,本文记录COPY例程的开发过程和测试过程。COPY包括COPY IN和COPY OUT,包括从字符串与数据库表之间的复制,以及文件和数据库表之间的复制。
二、COPY用例
2.1 数据库连接模块
1、连接模块首先创建连接参数信息,这里有两种方式,一是通过命令行输入参数作为数据库连接信息,二是直接修改conninfo中的字符串信息。
图2-1 连接参数
2、连接数据库,建立连接并检查是否连接成功,如果没成功调用函数终止程序。此处的exit_nicely可以参照作者的写法。
static void exit_nicely(KCIConnection *conn)
{
KCIConnectionDestory(conn);
exit(1);
}
图2-2 建立连接
3、设置安全的搜索路径,这样恶意用户就无法取得控制。可根据程序和个人新增schema和设置search_path。
create schema schemaname;
set search_path schemaname;
图2-3 设定安全的搜索路径
4、创建所需的表,建议提前创建,避免在函数中出错,这里创建时插入了一条测试语句,并记录插入数据条数。
图2-4 创建所需表
5、创建程序入口,在主函数中调用批量插入和修改的程序,函数返回值为bool类型,打印函数执行的结果为success或failed,并且调用完程序关闭数据库连接。
图2-5 程序入口
2.2 字符串COPY
1、创建函数testCOPY,定义变量,这里定义了向数据库COPY IN有10条数据,创建文件指针,向数据库发送SQL指令,启动数据库的testQuery表的COPY IN,其中stdin是键盘输入。
图2-6 开启COPY IN
2、向数据库复制数据,循环语句,先sprintf拼接字符串到inbuffer中,然后启动SQL指令往表testQuery中COPY IN数据,累计从字符串复制10条数据进入,需要修改数据的条数只需要修改colnum的值即可。
图2-8 COPY IN数据
3、结束COPY IN,调用函数KCICopySendEOF来发送COPY的END命令,然后利用KCIConnectionFetchResult函数来检查命令是否完成。
图2-9 结束COPY IN
4、启动COPY OUT,向数据库发送指令,开始表testQuery的COPY OUT,stdout为标准输出,整个语句为将整个表拷贝至标准输出。
图2-10 开启COPY OUT
5、复制数据到buffer中,在while循环中使用KCICopyReceiveData函数,每次复制一行数据到buffer中,此函数有返回值,返回
图2-11 复制数据到buffer中
6、在while循环中做判断,其中参照KCI源码中的KCICopyReceiveData函数返回值,0是继续复制,-1是正常复制结束,-2是复制出错。
图2-12 查看源码函数功能
图2-13 根据相关返回值编写程序
2.3 文件COPY
1、这里演示复制两个文件,逻辑会稍微复杂一些,首先创建所需的表,西斯表tmptable中有解决40列。
图2-12 创建预备语句
2、定义一个for循环,因为有两个文件,其中copydata1.txt文件的数据行比较多,而copydata2.txt中的每一列数据比较多。在for循环中首先启动表的COPY IN,是从文件中来,并且定义文件中数据列的分隔为单引号。
图2-13 绑定参数
3、打开文件,在循环中先组文件名字符串,然后利用fopen函数打开文件。
图2-14 打开文件
4、将文件中数据复制到表中,首先利用fgets函数将fp[j]文件指针多代表的文件中读取1024*1024大小的数据,放到容器str中,再使用KCICopySendData函数进行发送数据,将容器str中的数据计算长度后发送到数据库。
KCICopySendData的返回值:如果成功则返回1,如果数据无法发送则返回0(只可能在非块模式下),如果发生错误则返回-1。
图2-15 COPY IN
5、结束COPY IN,调用函数KCICopySendEOF来发送COPY的END命令,KCICopySendEOF的返回值:如果成功则返回1,如果数据无法发送(仅可能在非块模式下)则返回0,如果发生错误则返回-1。结束COPY之后需要关闭文件,输出COPY END日志,再返回COPY函数整个执行结果。
图2-14 结束COPY
2.4 程序执行
1、定义Makefile文件,在Makefile中加上需要编译的文件,比如作者这里是libkcitest6.c,加入数据库的lib和include路径。
图2-15 编辑Makefile文件
2、编译程序,使用make进行程序编译,编译时提示openssl未定义的引用解决方法是添加openssl 1.1.1q的环境变量,如图2-17,编译完成后生成可执行文件libkcitest6。
图2-16 编译文件报错
图2-17 添加环境变量后继续编译通过
3、执行程序,在启动数据库的前提下直接执行./testlibkci6,可以看到COPY IN和COPY OUT10条数据,COPY成功两个文件。
图2-18 程序执行结果
三、总结
1、libkci的驱动文件在安装完数据库后已经生成,不需要再去单独编译libkci,使用libkci例程时,需要在Makefile中定义数据库的lib和include路径。
2、执行例程时需要执行openssl的环境变量配置,将openssl版本从1.1.1k转到1.1.1q,并且还需要执行数据库lib环境变量配置。
export PATH=openssl地址/bin:$PATH
export LD_LIBRARY_PATH=openssl地址/lib:$LD_LIBRARY_PATH
export LD_LIBRARY_PATH=数据库地址/lib:LD_LIBRARY_PATH
3、本libkci例程完成了libkci的COPY用例(IN和OUT,文件和字符串),作者接下来将完善libkci负载均衡用例、大对象的二进制写入与读取等,用例的编写和使用可参考作者的同系列文章。
附录一:本文创建的例程源码
/*
* testlibkci6.c
* 测试 libkci(KingbaseES 前端库)的 COPY 功能
* 运行 COPY 需要 copydata1.txt 和 copydata2.txt 两个文件
* copydata1.txt 中数据行比较多
* copydata2.txt 中数据列中的数据比较多
* 将上述两个文件放到testlibkci6.c的当前路径
* 测试过程包括 COPY IN 和 COPY OUT
* 分为字符串的 COPY 和文件的 COPY
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "libkci_fe.h"
/* 定义全局变量 */
KCIResult *res = NULL;
KCIConnection *conn;
int icols = 0;
/* 遇到错误关闭数据库连接,并终止程序 */
static void exit_nicely(KCIConnection *conn)
{
KCIConnectionDestory(conn);
exit(1);
}
/* 使用结束后释放KCIResult */
void exit_PQ(KCIConnection *conn, KCIResult *res)
{
if (res)
KCIResultDealloc(res);
}
/*
* COPY 在表和标准文件系统文件之间移动数据。
* COPY TO 把一个表的内容复制到一个文件,
* 而 COPY FROM 则从一个文件复制数据到一个表(把数据追加到表中原有数据),
* COPY TO 也能复制一个 SELECT 查询的结果。
* 如果指定了一个列列表,COPY TO 将只把指定列的数据复制到文件。
* 对于 COPY FROM,文件中的每个字段将按顺序插入到指定列中。
* COPY FROM 命令的列列表中没有指定的表列则会采纳其默认值。
*/
bool testCOPY()
{
int i, j;
int ret;
int colnum = 10;
char *buffer[1];
char inbuffer[100];
FILE *fp[2];
char filename[20] = {0};
char sSql[1024];
char str[1024*1024+1];
printf("****test copy begin****\n");
printf("COPY IN add %d col\n", colnum);
/* 启动 COPY IN */
res = KCIStatementExecute(conn, "copy testQuery from stdin");
if (KCIResultGetStatusCode(res) != EXECUTE_COPY_IN)
{
fprintf(stderr, "copy in error:[%s]", KCIConnectionGetLastError(conn));
exit_PQ(conn, res);
return false;
}
KCIResultDealloc(res);
/* 将字符串中数据 COPY IN 到表 testQuery 中 */
for (j = 0; j < colnum; j++)
{
memset(inbuffer, 0x00, sizeof(inbuffer));
sprintf(inbuffer, "%d auto auto auto auto autoooooooooo auto\n", icols);
ret = KCICopySendData(conn, inbuffer, strlen(inbuffer));
if (ret < 0)
{
printf("errmessage:[%s]\n", KCIConnectionGetLastError(conn));
printf("put copy data error! [%d]\n", ret);
}
icols++;
}
ret = KCICopySendEOF(conn, NULL);
if (ret < 0)
{
printf("put copy end error! [%d]\n", ret);
}
res = KCIConnectionFetchResult(conn);
KCIResultDealloc(res);
/* 启动 COPY OUT */
printf("COPY OUT %d col\n", colnum);
res = KCIStatementExecute(conn, "copy testQuery to stdout");
if (KCIResultGetStatusCode(res) != EXECUTE_COPY_OUT)
{
fprintf(stderr, "copy out error:[%s]", KCIConnectionGetLastError(conn));
exit_PQ(conn, res);
return false;
}
KCIResultDealloc(res);
i = 0;
/* 从数据库表中数据 COPY OUT 到 buffer 中 */
while(1)
{
memset(buffer, 0x00, sizeof(buffer));
ret = KCICopyReceiveData(conn, buffer, 1);
if (ret > 0)
{
printf("%s", buffer[0]);
i++;
}
else if (ret == 0)
{
continue;
}
else if (ret == -1)
{
printf("no data now[%d]\n", ret);
break;
}
else if (ret == -2)
{
res = KCIConnectionFetchResult(conn);
printf("errmessage:[%s]\n", KCIResultGetErrorString(res));
KCIResultDealloc(res);
break;
}
}
/* 删除和创建表 tmptable */
res = KCIStatementExecute(conn, "drop table if exists tmptable");
KCIResultDealloc(res);
memset(sSql, 0, sizeof(sSql));
strcpy(sSql, "CREATE TABLE tmptable (VAL1 NUMERIC(24,0), VAL2 NUMERIC(10,0), VAL3 NUMERIC(10,0), VAL4 NUMERIC(20,0), VAL5 VARCHAR2(4096), VAL6 VARCHAR2(4096), VAL7 VARCHAR2(4096), VAL8 VARCHAR2 (4096), VAL9 VARCHAR2(4096), VAL10 VARCHAR2(100), VAL11 VARCHAR2(50), VAL12 VARCHAR2(100), VAL13 VARCHAR2(100), VAL14 VARCHAR2 (50), VAL15 VARCHAR2 (50), VAL16 TIMESTAMP(6), VAL17 TIMESTAMP(6), VAL18 NUMERIC(10,0), VAL19 VARCHAR2 (100), VAL20 NUMERIC (24,0), VAL21 NUMERIC (24,0), VAL22 NUMERIC (5,2), VAL23 NUMERIC (10,4), VAL24 NUMERIC (5,2), VAL25 NUMERIC (6,1), VAL26 NUMERIC(5,2), VAL27 NUMERIC(10,4), VAL28 NUMERIC (5,2), VAL29 NUMERIC (6,1), VAL30 NUMERIC(5,2), VAL31 NUMERIC(11,8), VAL32 NUMERIC(11,8), VAL33 NUMERIC (8,1), VAL34 VARCHAR2 (400), VAL35 TIMESTAMP(6), VAL36 BYTEA);");
res = KCIStatementExecute(conn, sSql);
if (KCIResultGetStatusCode(res) != EXECUTE_COMMAND_OK)
{
fprintf(stderr, "create table tmptable error:[%s]\n", KCIConnectionGetLastError(conn));
exit_PQ(conn, res);
return false;
}
KCIResultDealloc(res);
for (j = 0; j < 2; j++)
{
/*
* 启动 COPY IN ,
* 从 txt、csv 文件进行数据导入
* delimiter AS ' ':以空格来分割文件每行中的列
* QUOTE AS '''':指定数据中使用的引号为单引号 ''
* 例如文件中的数据存放格式为 1 'a' 'b' ...
*/
res = KCIStatementExecute(conn, "COPY tmptable FROM STDIN with csv delimiter as ' ' QUOTE AS ''''");
if (KCIResultGetStatusCode(res) != EXECUTE_COPY_IN)
{
fprintf(stderr, "copy in error:[%s]", KCIConnectionGetLastError(conn));
exit_PQ(conn, res);
return false;
}
KCIResultDealloc(res);
sprintf(filename,"copydata%d.txt", j + 1);
/* 打开文件 copydata.txt */
if ((fp[j] = fopen(filename, "r")) == NULL)
{
printf("open file error.\n");
return false;
}
printf("COPY FILE%d\n", j + 1);
/*
* 将文件中数据 COPY IN 到表 tmptable 中
*/
memset(str, 0, sizeof(str));
while (fgets(str, 1024 * 1024, fp[j]))
{
ret = KCICopySendData(conn, str, strlen(str));
if (ret < 0)
{
printf("errmessage:[%s]\n", KCIConnectionGetLastError(conn));
printf("put copy data error! [%d]\n", ret);
}
}
ret = KCICopySendEOF(conn, NULL);
if (ret < 0)
{
printf("put copy file error! [%d]\n", ret);
}
fclose(fp[j]);
}
printf("****test copy end****\n");
return true;
}
int main(int argc, char **argv)
{
const char *conninfo;
/*
* 如果用户在命令行上提供了一个参数,将它用作连接信息串。如:
* ./testlibkci6 "hostaddr=127.0.0.1 port=54321 user=system dbname=test password=123456"
* 否则默认用设置 dbname=kingbase 并为所有其他链接参数使用环境变量或默认值。
*/
if (argc > 1)
conninfo = argv[1];
else
conninfo = "host=10.12.1.30 port=52222 user=system password=123456 dbname=test sslmode=disable";
//conninfo = "dbname = kingbase";
/* 建立一个到数据库的连接 */
conn = KCIConnectionCreate(conninfo);
/* 检查看后端连接是否成功建立 */
if (KCIConnectionGetStatus(conn) != CONNECTION_OK)
{
fprintf(stderr, "Connection to database failed: %s",
KCIConnectionGetLastError(conn));
exit_nicely(conn);
}
/* 设置总是安全的搜索路径,这样恶意用户就无法取得控制。*/
res = KCIStatementExecute(conn,"SELECT sys_catalog.set_config('search_path', '', false)");
if (KCIResultGetStatusCode(res) != EXECUTE_TUPLES_OK)
{
fprintf(stderr, "SET failed: %s", KCIConnectionGetLastError(conn));
KCIResultDealloc(res);
exit_nicely(conn);
}
/*
* 任何时候不再需要 KCIResult 时,应该 KCIResultDealloc 它来避免内存泄露
*/
KCIResultDealloc(res);
/* 创建所需的表,并往表中插入一条数据 */
res = KCIStatementExecute(conn, "drop table if exists testQuery");
KCIResultDealloc(res);
res = KCIStatementExecute(conn, "create table testQuery (id int, val1 text, val2 text, val3 text, val4 text, val5 text, val6 text)");
KCIResultDealloc(res);
res = KCIStatementExecute(conn, "insert into testQuery values(0, '人大\', '金仓', '数据库', '000','000','000');");
KCIResultDealloc(res);
icols++;
/*
* 测试 COPY 的程序入口
* 定义 bool 类型变量 b 来接收 testCOPY 的结果,成功则返回 true
*/
bool b = testCOPY();
printf("testCOPY %s\n", (b?"success.":"failed!"));
/* 关闭数据库的连接并且清理 */
KCIConnectionDestory(conn);
return 0;
}




