暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

LightDB23.2新特性加强存储过程PL/OraSQL对分布式版本支持

原创 姚崇 2023-06-26
224

从23.2版本开始LightDB加强对LightDB分布式canopy的支持兼容,具体兼容项如下:

解决问题1

sql语句:
create table tab1(a int, b int);
select create_distributed_table('tab1', 'a');
insert into tab1 (values (1,1), (2,2), (3,3));

-- Package spec for the repro: a collection type over tab1 rows, one
-- package-level variable of that type, and a function returning the collection.
create or replace package typkg
as
type nt is table of tab1%rowtype;  -- collection whose elements have tab1's row type
res nt;                            -- package-level variable of type nt
function myfunc(v int) return nt;
end;
/

-- Package body: myfunc assigns three rows to the package variable res and
-- returns it. The parameter v is not referenced in the body.
create or replace package body typkg
as
function myfunc(v int) return nt
is
begin
res(1) := ROW(1,1);  -- same (a, b) values as the rows inserted into tab1
res(2) := ROW(2,2);
res(3) := ROW(3,3);
return res;
end;
end;
/

select * from table(typkg.myfunc(1)) t0 where t0.a in (select a from tab1) order by t0.a;

原因是canopy会重新生成查询语句,把包名丢了
调用顺序如下:
ExecutePlan
-> ExecProcNode
-> CitusExecScan(CustomScanState node)
->AdaptiveExecutor
-> RunLocalExecution
-> ExecuteLocalTaskListExtended
-> ParseQueryString
-> DeparseTaskQuery
-> pg_get_query_def
-> get_select_query_def
-> get_basic_select_query
-> get_from_clause
->get_from_clause_item
->get_rule_expr_funccall
-> get_func_expr
->generate_function_name
其中generate_function_name函数用于获取函数名称以及namespace名称。由于这里的函数属于package,按namespace获取名称时获取不到,所以再加上下面的逻辑即可:
if ((p_result == FUNCDETAIL_NORMAL ||
p_result == FUNCDETAIL_AGGREGATE ||
p_result == FUNCDETAIL_WINDOWFUNC) &&
p_funcid == funcid)
nspname = NULL;
else
{
nspname = get_namespace_name(procform->pronamespace);
/* lightdb add, possible package name */
if (!nspname)
{
nspname = get_package_name(procform->pronamespace, true);
}
}
其中 if (!nspname) 分支(原文中标红)为新增逻辑:当获取不到namespace名称时,再去获取一下包名即可。

解决问题2

CREATE TABLE test1 (a int);
select create_distributed_table('test1','a');

-- Multiplies both INOUT arguments by a; the results are handed back to the
-- caller through b and c.
CREATE PROCEDURE test_proc6(a int, INOUT b int, INOUT c int)
LANGUAGE plorasql
AS $$
BEGIN
    b := b * a;
    c := c * a;
END;
$$;

-- Trigger function: calls test_proc6 with NEW.a bound to both INOUT
-- parameters, then returns NEW.
CREATE FUNCTION triggerfunc1() RETURNS trigger
LANGUAGE plorasql
AS $$
DECLARE
    z int := 0;  -- declared but not referenced in the body
BEGIN
    CALL test_proc6(2, NEW.a, NEW.a);  -- the same field is passed for both b and c
    RETURN NEW;
END;
$$;

CREATE TRIGGER t1 BEFORE INSERT ON test1 EXECUTE PROCEDURE triggerfunc1();
ERROR:  triggers are not supported on distributed tables when "canopy.enable_unsafe_triggers" is set to "false"
INSERT INTO test1 VALUES (1), (2), (3);
UPDATE test1 SET a = 22 WHERE a = 2;
ERROR:  modifying the partition value of rows is not allowed

其中第一个错误,将参数canopy.enable_unsafe_triggers设置为true即可。
因为分布式表的分区键是不能修改的,所以第二个错误属于正常报错。

解决问题3

CREATE TABLE regions(region_id NUMBER PRIMARY KEY,region_name VARCHAR2(25));
select create_distributed_table('regions','region_id');
CREATE TABLE countries(country_id CHAR(2) PRIMARY KEY,country_name VARCHAR2(40),region_id NUMBER REFERENCES regions(region_id));
CREATE TABLE locations(location_id NUMBER(4) PRIMARY KEY,street_address VARCHAR2(40), postal_code VARCHAR2(12), city VARCHAR2(30) NOT NULL, state_province VARCHAR2(25), country_id CHAR(2) REFERENCES countries(country_id));
CREATE TABLE departments(department_id NUMBER(4) PRIMARY KEY,department_name VARCHAR2(30) NOT NULL, manager_id NUMBER(6), location_id NUMBER(4) REFERENCES locations(location_id));--5.create JOBS table
CREATE TABLE jobs(job_id VARCHAR2(10) PRIMARY KEY,jod_title VARCHAR2(35) NOT NULL,min_salary NUMBER(6),max_salary NUMBER(6));
select create_distributed_table('jobs','job_id');
CREATE TABLE employees(employee_id NUMBER(6) PRIMARY KEY,first_name VARCHAR2(20),last_name VARCHAR2(25) NOT NULL,email VARCHAR2(25) NOT NULL UNIQUE,phone_number VARCHAR2(20),hire_date DATE NOT NULL,job_id VARCHAR2(10) NOT NULL REFERENCES jobs(job_id),salary NUMBER(8,2) CHECK(salary>0),commission_pct NUMBER(2,2),manager_id NUMBER(6),department_id NUMBER(4)REFERENCES departments(department_id));
CREATE TABLE job_history(employee_id NUMBER(6) NOT NULL REFERENCES employees(employee_id),start_date DATE NOT NULL,end_date DATE NOT NULL,job_id VARCHAR2(10) NOT NULL REFERENCES jobs(job_id),department_id NUMBER(4) REFERENCES departments(department_id),CONSTRAINT jhist CHECK(end_date > start_date),CONSTRAINT jhist2 PRIMARY KEY (employee_id,start_date));
CREATE TABLE sal_grades(grade NUMBER PRIMARY KEY,min_salary NUMBER(8,2),max_salary NUMBER(8,2));
select create_distributed_table('sal_grades','grade');

INSERT INTO employees VALUES( 101,'Steven', 'King', 'SKING', '515.123.4567',TO_DATE('17-JUN-1987','dd-MON-yyyy'),'AD_PRES',2400,NULL,NULL,11);

报错:
ERROR: could not run distributed query with FOR UPDATE/SHARE commands
HINT: Consider using an equality filter on the distributed table's partition column.
CONTEXT: SQL statement "SELECT 1 FROM ONLY "public"."jobs" x WHERE "job_id"::pg_catalog.text OPERATOR(pg_catalog.=) $1::pg_catalog.text FOR KEY SHARE OF x"

canopy源码

/*
 * DeferErrorIfQueryNotSupported runs a fixed series of checks on queryTree
 * and returns a deferred error (ERRCODE_FEATURE_NOT_SUPPORTED) when at least
 * one of them fires, or NULL when none does.
 *
 * The checks are not short-circuited: every check is evaluated, and each
 * failing check overwrites errorMessage (and, for most checks, errorHint).
 * The error that is finally returned therefore describes the LAST failing
 * check in the order written below.
 */
DeferredErrorMessage *
DeferErrorIfQueryNotSupported(Query *queryTree)
{
	char *errorMessage = NULL;
	bool preconditionsSatisfied = true;
	const char *errorHint = NULL;
	/* the two hint texts shared by the checks below */
	const char *joinHint = "Consider joining tables on partition column and have "
						   "equal filter on joining columns.";
	const char *filterHint = "Consider using an equality filter on the distributed "
							 "table's partition column.";

	/* set operations */
	if (queryTree->setOperations)
	{
		preconditionsSatisfied = false;
		errorMessage = "could not run distributed query with UNION, INTERSECT, or "
					   "EXCEPT";
		errorHint = filterHint;
	}

	/* hasRecursive flag set on the query */
	if (queryTree->hasRecursive)
	{
		preconditionsSatisfied = false;
		errorMessage = "could not run distributed query with RECURSIVE";
		errorHint = filterHint;
	}

	/* non-empty CTE list */
	if (queryTree->cteList)
	{
		preconditionsSatisfied = false;
		errorMessage = "could not run distributed query with common table expressions";
		errorHint = filterHint;
	}

	/* hasForUpdate flag set; this check emits the FOR UPDATE/SHARE message */
	if (queryTree->hasForUpdate)
	{
		preconditionsSatisfied = false;
		errorMessage = "could not run distributed query with FOR UPDATE/SHARE commands";
		errorHint = filterHint;
	}

	/* grouping sets present on the query */
	if (queryTree->groupingSets)
	{
		preconditionsSatisfied = false;
		errorMessage = "could not run distributed query with GROUPING SETS, CUBE, "
					   "or ROLLUP";
		errorHint = filterHint;
	}

	/* any node in the query tree matched by IsGroupingFunc */
	if (FindNodeMatchingCheckFunction((Node *) queryTree, IsGroupingFunc))
	{
		preconditionsSatisfied = false;
		errorMessage = "could not run distributed query with GROUPING";
		errorHint = filterHint;
	}

	/* join tree is inspected by HasUnsupportedJoinWalker; uses joinHint, not filterHint */
	bool hasUnsupportedJoin = HasUnsupportedJoinWalker((Node *) queryTree->jointree,
													   NULL);
	if (hasUnsupportedJoin)
	{
		preconditionsSatisfied = false;
		errorMessage = "could not run distributed query with join types other than "
					   "INNER or OUTER JOINS";
		errorHint = joinHint;
	}

	/* range table types are inspected by HasComplexRangeTableType */
	bool hasComplexRangeTableType = HasComplexRangeTableType(queryTree);
	if (hasComplexRangeTableType)
	{
		preconditionsSatisfied = false;
		errorMessage = "could not run distributed query with complex table expressions";
		errorHint = filterHint;
	}

	/*
	 * Subquery inside LIMIT. errorHint is not assigned here, so a hint set by
	 * an earlier failing check is kept as is.
	 */
	if (FindNodeMatchingCheckFunction((Node *) queryTree->limitCount, IsNodeSubquery))
	{
		preconditionsSatisfied = false;
		errorMessage = "subquery in LIMIT is not supported in multi-shard queries";
	}

	/* subquery inside OFFSET; as above, errorHint is left untouched */
	if (FindNodeMatchingCheckFunction((Node *) queryTree->limitOffset, IsNodeSubquery))
	{
		preconditionsSatisfied = false;
		errorMessage = "subquery in OFFSET is not supported in multi-shard queries";
	}

	/*
	 * Range table properties report a Citus local table or a Postgres local
	 * table. LOCAL_TABLE_SUBQUERY_CTE_HINT is defined outside this excerpt.
	 */
	RTEListProperties *queryRteListProperties = GetRTEListPropertiesForQuery(queryTree);
	if (queryRteListProperties->hasCitusLocalTable ||
		queryRteListProperties->hasPostgresLocalTable)
	{
		preconditionsSatisfied = false;
		errorMessage = "direct joins between distributed and local tables are "
					   "not supported";
		errorHint = LOCAL_TABLE_SUBQUERY_CTE_HINT;
	}

	/* finally check and error out if not satisfied */
	if (!preconditionsSatisfied)
	{
		/* ErrorHintRequired decides whether the hint is attached to the error */
		bool showHint = ErrorHintRequired(errorHint, queryTree);
		return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
							 errorMessage, NULL,
							 showHint ? errorHint : NULL);
	}

	return NULL;
}

这些场景下会报错。所以也不是问题。

解决问题4

sql语句
-- Regression case for trapping errors inside a nested block.
-- The inner BEGIN ... EXCEPTION ... END block handles division_by_zero
-- (x becomes -1) and numeric_value_out_of_range (x becomes -2); the function
-- then returns x. Errors without a matching handler are not caught here.
-- The comment marker and string delimiters are plain ASCII ("--" and ') so the
-- script can be executed as written.
-- No lines are added inside the function body, so line numbers reported in
-- error CONTEXT messages (e.g. "line 7 at assignment") stay the same.
create function trap_zero_divide(int) returns int as $$
declare x int;
    sx smallint;
begin
    begin -- start a subtransaction
        raise notice 'should see this';
        x := 100 / $1;
        raise notice 'should see this only if % <> 0', $1;
        sx := $1;
        raise notice 'should see this only if % fits in smallint', $1;
        if $1 < 0 then
            raise exception '% is less than zero', $1;
        end if;
    exception
        when division_by_zero then
            raise notice 'caught division_by_zero';
            x := -1;
        when NUMERIC_VALUE_OUT_OF_RANGE then
            raise notice 'caught numeric_value_out_of_range';
            x := -2;
    end;
    return x;
end$$ language plorasql;

select trap_zero_divide(0);
预期结果
NOTICE: should see this
NOTICE: caught division_by_zero
trap_zero_divide

           -1

(1 row)
实际结果
ltsql:plorasql_trap.sql:30: ERROR: division by zero
CONTEXT: PL/oraSQL function trap_zero_divide(integer) line 7 at assignment
原因是div_var函数中,针对Oracle和lt模式的报错机制不一样

当是Oracle模式时用的plsqlerrcode,当非Oracle模式时,用的正常sqlerrcode,所以该问题不算问题。

解决问题5

分布式列键值不能修改,不是问题

解决问题6

create table tcur_tb(f1 int);
insert into tcur_tb values (1),(2),(3);

-- Returns the f1 values of tcur_tb by walking a SCROLL cursor backwards:
-- fetch the last row first, then keep fetching the prior row while c%FOUND.
create function sc_test() return setof integer as
declare
  cursor c scroll for select f1 from tcur_tb;
  x integer;
begin
  open c;
  fetch last from c into x;
  while c%FOUND loop
    return next x;              -- emit the value fetched most recently
    fetch prior from c into x;  -- step one row back
  end loop;
  close c;
end;
/
select * from sc_test();

-- Same body as the previous sc_test definition, but the cursor is declared
-- NO SCROLL while the body still uses FETCH LAST / FETCH PRIOR. The article
-- expects the subsequent call to raise an error.
create or replace function sc_test() return setof integer as
declare
  cursor c no scroll for select f1 from tcur_tb;
  x integer;
begin
  open c;
  fetch last from c into x;
  while c%FOUND loop
    return next x;
    fetch prior from c into x;
  end loop;
  close c;
end;
/

-- 死循环
select * from sc_test(); -- 预期报错
经验证Ora_AT_Condition函数在清理游标相关内容时Assert掉了。
原因是当时做commit fetch时未考虑异常情况,正常流程记录游标位置是没问题的,但是异常时,则不需要记录位置(未做),所以这里判断一下,当异常流程时,无须记录游标位置。
修改Ora_AT_Condition函数,增加红色框的判断

解决问题7

根据函数参数找不到函数,这个不是分布式问题,oracle单机模式下也能复现。
本设计不解决。
复现方法:
构建一个函数,参数类型中有date类型的,当调用该函数时,用to_date函数即可复现。

解决问题8

CREATE TABLE people_source ( 
  person_id  INTEGER NOT NULL PRIMARY KEY, 
  first_name VARCHAR(20) NOT NULL, 
  last_name  VARCHAR(20) NOT NULL, 
  title      VARCHAR(10) NOT NULL 
);
select create_distributed_table('people_source','person_id');
INSERT INTO people_source VALUES (1, 'John', 'Smith', 'Mr');
INSERT INTO people_source VALUES (2, 'alice', 'jones', 'Mrs');
INSERT INTO people_source VALUES (3, 'Jane', 'Doe', 'Miss');
INSERT INTO people_source VALUES (4, 'Dave', 'Brown', 'Mr');

-- Package that only declares a type: a table of rowid values indexed by int4
-- (the associative-array type used by the statements that follow).
CREATE OR REPLACE PACKAGE name_pkg IS
  type type_rowid is table of rowid INDEX BY int4;

END;
/

-- SQL function parameters cannot accept associative arrays type
-- Declares a procedure whose single parameter is of the package type
-- name_pkg.type_rowid. The parameter names2 is not referenced in the body;
-- the body only raises a notice with the local variable i, which is never
-- assigned.
CREATE OR REPLACE PROCEDURE print_name(names2 name_pkg.type_rowid) IS
  i VARCHAR(60);
BEGIN
  raise notice '%',i;
END;
/

-- Anonymous block: opens a ref cursor over the rowids of people_source,
-- fetches them in batches of at most 10 with BULK COLLECT, prints each rowid
-- and deletes the corresponding row by rowid.
DECLARE
  type ref_cursor is ref cursor;
  cur ref_cursor;
  v_rowid name_pkg.type_rowid;  -- batch buffer of the package-declared type
begin
  open cur for 
      SELECT a.rowid from people_source a ;
      loop
        fetch cur bulk collect into v_rowid limit 10;

        for i in 1..v_rowid.count
        LOOP
          RAISE notice '% is %', i, v_rowid(i);
          delete from people_source where rowid=v_rowid(i); -- error is raised on the DN (data node)
        end loop;

        -- checked after the batch has been processed
        exit when cur%notfound;
      end loop;

  v_rowid.delete();
  CLOSE cur;
end;
/

报错内容:

原因是CN这边定义的联合数组类型是私有的,放在内存里的,当CN把SQL语句分发给DN时,DN并不能知道CN定义的联合数组类型,这就导致找不到类型,从而报错。
所以目前这个问题不解决。

create table test(x int, y varchar(100));
select create_distributed_table('test','x');
insert into test values(1, 'One');
insert into test values(2, 'Two');
insert into test values(3, 'Three');
 
-- Package spec: a package-level cursor over table test, a record variable
-- with test's row type, and a function declaration.
create or replace package pkgvar
as
	cursor emp_cur is select * from test;  -- package-level cursor
	emp_rec test%rowtype;                  -- record the cursor rows are fetched into
	function get_emp_info return int;
end;
/
 
-- Package body: get_emp_info opens the package cursor emp_cur, fetches every
-- row into emp_rec, raises a notice with the fields x and y of each row,
-- closes the cursor and returns 0.
create or replace package body pkgvar
as
	function get_emp_info return int as
	begin
		open emp_cur;
		loop
			fetch emp_cur into emp_rec;
			exit when emp_cur%notfound;  -- leave the loop once a fetch finds no row
			raise notice 'emp_rec = %, %', emp_rec.x, emp_rec.y; -- access record fields
		end loop;
		close emp_cur;
		return 0;
	end;
end;
/
 
select pkgvar.get_emp_info();
 
drop package pkgvar;
drop table if exists test;
最后修改时间:2023-06-26 11:24:00
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论