
背景
客户应用大量使用存储过程,并且创建连接后每次只调用一个函数,获得结果关闭连接,也就是大量短连接场景。
为了尽可能高地实现最大并发数与TPS,我们做了很多尝试。
PL/pgSQL引擎介绍
PostgreSQL 存储过程引擎设计非常先进,预留的接口可以实现非常丰富的语言支持,我们熟知的包括:C、PL/pgSQL、Python、Perl,还有非官方的 Java、PHP、JavaScript 等等。PL/pgSQL是最常用的官方过程语言,性能虽然无法与C语言相比,胜在开发效率高,有测试表明它比其他几种官方语言引擎要快得多。为最大可能的提高函数调用的吞吐量,飞象数据对函数引擎进行了深入细致的分析,并且做了大量尝试和改进。
函数执行过程PL/pgSQL引擎的执行入口是在:plpgsql_call_handler(src/pl/plpgsql/src/pl_handler.c),源代码中可以看到,在第一次执行时需要对函数体进行编译,也就是对创建函数时的 $$ ... $$ 部分进行词法和语法解析。然后的执行过程还有更多处理,不在本文讨论范围之内。对于我们提到的项目背景,每次调用都会有一次编译,而后随着连接关闭后端进程退出这个编译结果被舍弃,新连接无法重用。
预编译本着尽少的影响 PostgreSQL 原有架构的原则,我们利用validator函数来实现创建时的编译。这样的方式可以最大程度不影响 PostgreSQL 原有系统结构,而不必像下边提到的 pg_proc 系统表改动,当然也不够优雅。当 validator 函数的参数大于1时,进入预编译逻辑:
if (PG_NARGS()>1) { char *proc_source = PG_GETARG_CSTRING(1); Oid returnType = PG_GETARG_OID(2); char *ret; StringInfoData buf; PLpgSQL_function *func = plpgsql_precompile(proc_source, returnType); initStringInfo(&buf); encodeBlock(&buf, func->action); ret = palloc(buf.len + VARHDRSZ); SET_VARSIZE(ret, buf.len + VARHDRSZ); memcpy(VARDATA(ret), buf.data, buf.len); PG_RETURN_BYTEA_P(ret); }
plpgsql_precompile 原型是 plpgsql_compile,唯一的不同是将编译结果返回,这里不再赘述。
encodeBlock函数将编译结果转换为可以连续存储的二进制类型,最终存放于 pg_proc 表新增的 bytea 类型字段,它递归调用 encodeExpr 函数。
static voidencodeExpr(StringInfo buf, PLpgSQL_expr *expr) { int intdata; const char *intptr = (const char *)&intdata; intdata = expr->dtype; appendBinaryStringInfo(buf, intptr, sizeof(intdata)); if (expr->query) { intdata = strlen(expr->query) + 1; appendBinaryStringInfo(buf, intptr, sizeof(intdata)); appendBinaryStringInfo(buf, expr->query, intdata); } else { intdata = 0; appendBinaryStringInfo(buf, intptr, sizeof(intdata)); } }
新增系统列记录函数编译结果在系统表 pg_proc 增加一列,类型为 bytea,记录函数编译结果,下次调用该函数时,可以重用该编译结果。通过编译结果重用提高系统性能。
反向语法结构体
存放于 pg_proc 的预编译结果需要在函数执行时重新转换成正常的结构体:
static PLpgSQL_expr* decodeExpr(const char *expr_precomp, int *currpos) { int intdata; PLpgSQL_expr *expr = palloc0(sizeof(PLpgSQL_expr)); intdata = *(int *)(expr_precomp + *currpos); *currpos += sizeof(int); expr->dtype = intdata; expr->plan = NULL; expr->paramnos = NULL; expr->rwparam = -1; intdata = *(int *)(expr_precomp + *currpos); *currpos += sizeof(int); if (intdata > 0) { expr->query = pstrdup(expr_precomp + *currpos); *currpos += intdata; } else expr->query = NULL; expr->ns = plpgsql_ns_top(); return expr; }
这里仅作示例,实际的代码量非常大。
测试效果
原生
duration: 600 s number of transactions actually processed: 527512 latency average = 4.550 ms tps = 879.178722 (including connections establishing) tps = 2463.536531 (excluding connections establishing)
预编译
duration: 600 s number of transactions actually processed: 544672 latency average = 4.406 ms tps = 907.780907 (including connections establishing) tps = 2533.390855 (excluding connections establishing)
tps提升2.8%((2533-2463)/2463=2.8%),提升幅度不大,但很明显它确实起到了提升tps的作用。




