暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

PostgreSQL源码分析——CREATE SCHEMA

原创 chirpyli 2023-03-13
990

本文对PostgreSQL中创建模式(CREATE SCHEMA)的源码进行分析。所有的schema都记录在pg_namespace系统表中,因此创建一个新的schema,本质上就是向pg_namespace系统表中插入一条元组,其中包含模式名、属主以及为其分配的namespace oid等信息。创建schema的主流程如下:

exec_simple_query
--> pg_parse_query
    --> raw_parser
--> pg_analyze_and_rewrite
    --> transformStmt
--> pg_plan_queries
--> PortalStart
--> PortalRun
    --> ProcessUtility
        --> standard_ProcessUtility
            --> CreateSchemaCommand
                --> NamespaceCreate
--> PortalDrop

所有的模式名都在pg_namespace系统表中。

postgres@hangzhou=# \d pg_namespace;
            Table "pg_catalog.pg_namespace"
  Column  |   Type    | Collation | Nullable | Default
----------+-----------+-----------+----------+---------
 oid      | oid       |           | not null |
 nspname  | name      |           | not null |
 nspowner | oid       |           | not null |
 nspacl   | aclitem[] |           |          |
Indexes:
    "pg_namespace_nspname_index" UNIQUE, btree (nspname)
    "pg_namespace_oid_index" UNIQUE, btree (oid)

解析层

可查看gram.y中的定义

/***********************************
 *
 * Manipulate a schema
 *
 * Grammar for CREATE SCHEMA.  Four alternatives are accepted: with or
 * without IF NOT EXISTS, each either with an AUTHORIZATION clause (schema
 * name optional) or with a bare schema name.  Every action builds a
 * CreateSchemaStmt node and returns it as $$.
 *
 ************************************/
CreateSchemaStmt:
            /* $3 = OptSchemaName, $5 = RoleSpec, $6 = OptSchemaEltList */
            CREATE SCHEMA OptSchemaName AUTHORIZATION RoleSpec OptSchemaEltList
                {
                    CreateSchemaStmt *n = makeNode(CreateSchemaStmt);
                    /* One can omit the schema name or the authorization id. */
                    n->schemaname = $3;
                    n->authrole = $5;
                    n->schemaElts = $6;
                    n->if_not_exists = false;
                    $$ = (Node *)n;
                }
            /* $3 = ColId (mandatory schema name), $4 = OptSchemaEltList */
            | CREATE SCHEMA ColId OptSchemaEltList
                {
                    CreateSchemaStmt *n = makeNode(CreateSchemaStmt);
                    /* ...but not both */
                    n->schemaname = $3;
                    n->authrole = NULL;
                    n->schemaElts = $4;
                    n->if_not_exists = false;
                    $$ = (Node *)n;
                }
            /* $6 = OptSchemaName, $8 = RoleSpec, $9 = OptSchemaEltList */
            | CREATE SCHEMA IF_P NOT EXISTS OptSchemaName AUTHORIZATION RoleSpec OptSchemaEltList
                {
                    CreateSchemaStmt *n = makeNode(CreateSchemaStmt);
                    /* schema name can be omitted here, too */
                    n->schemaname = $6;
                    n->authrole = $8;
                    /* a non-empty element list is rejected in the IF NOT EXISTS form */
                    if ($9 != NIL)
                        ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("CREATE SCHEMA IF NOT EXISTS cannot include schema elements"),
                                 parser_errposition(@9)));
                    n->schemaElts = $9;
                    n->if_not_exists = true;
                    $$ = (Node *)n;
                }
            /* $6 = ColId (mandatory schema name), $7 = OptSchemaEltList */
            | CREATE SCHEMA IF_P NOT EXISTS ColId OptSchemaEltList
                {
                    CreateSchemaStmt *n = makeNode(CreateSchemaStmt);
                    /* ...but not here */
                    n->schemaname = $6;
                    n->authrole = NULL;
                    /* a non-empty element list is rejected in the IF NOT EXISTS form */
                    if ($7 != NIL)
                        ereport(ERROR,
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("CREATE SCHEMA IF NOT EXISTS cannot include schema elements"),
                                 parser_errposition(@7)));
                    n->schemaElts = $7;
                    n->if_not_exists = true;
                    $$ = (Node *)n;
                }
        ;

/* Optional schema name: yields NULL when the name is omitted. */
OptSchemaName:
            ColId                                   { $$ = $1; }
            | /* EMPTY */                           { $$ = NULL; }
        ;

/*
 * Optional list of embedded statements, built left-recursively with
 * lappend(); yields NIL when no statements are present.
 */
OptSchemaEltList:
            OptSchemaEltList schema_stmt
                {
                    if (@$ < 0)         /* see comments for YYLLOC_DEFAULT */
                        @$ = @2;
                    $$ = lappend($1, $2);
                }
            | /* EMPTY */
                { $$ = NIL; }
        ;

/*
 *  schema_stmt are the ones that can show up inside a CREATE SCHEMA
 *  statement (in addition to by themselves).
 */
schema_stmt:
            CreateStmt
            | IndexStmt
            | CreateSeqStmt
            | CreateTrigStmt
            | GrantStmt
            | ViewStmt
        ;

CreateSchemaStmt的定义:

/* ----------------------
 *      Create Schema Statement
 *
 * Parse node produced by the CREATE SCHEMA grammar rules.
 *
 * NOTE: the schemaElts list contains raw parsetrees for component statements
 * of the schema, such as CREATE TABLE, GRANT, etc.  These are analyzed and
 * executed after the schema itself is created.
 * ----------------------
 */
typedef struct CreateSchemaStmt
{
    NodeTag     type;
    char       *schemaname;     /* the name of the schema to create; NULL
                                 * when the name was omitted in the
                                 * statement */
    RoleSpec   *authrole;       /* the owner of the created schema; NULL
                                 * when no AUTHORIZATION clause was given */
    List       *schemaElts;     /* schema components (list of parsenodes) */
    bool        if_not_exists;  /* just do nothing if schema already exists? */
} CreateSchemaStmt;

执行层

执行层的逻辑可以简单理解为:向pg_namespace中插入一条元组,表明新定义了一个模式;随后依次执行CREATE SCHEMA语句中内嵌的子命令(如CREATE TABLE、GRANT等)。

/*
 * CREATE SCHEMA
 *
 * Creates the schema described by stmt and then executes the commands
 * embedded in the statement.
 *
 * Returns the OID of the new namespace, or InvalidOid when if_not_exists is
 * set and a schema of that name already exists.
 *
 * Note: caller should pass in location information for the whole
 * CREATE SCHEMA statement, which in turn we pass down as the location
 * of the component commands.  This comports with our general plan of
 * reporting location/len for the whole command even when executing
 * a subquery.
 */
Oid
CreateSchemaCommand(CreateSchemaStmt *stmt, const char *queryString,
                    int stmt_location, int stmt_len)
{
    const char *schemaName = stmt->schemaname;
    Oid         namespaceId;
    OverrideSearchPath *overridePath;
    List       *parsetree_list;
    ListCell   *parsetree_item;
    Oid         owner_uid;
    Oid         saved_uid;
    int         save_sec_context;
    AclResult   aclresult;
    ObjectAddress address;

    /*
     * Remember the current user id and security context; they are restored
     * at the end of this routine.
     */
    GetUserIdAndSecContext(&saved_uid, &save_sec_context);

    /*
     * Who is supposed to own the new schema?
     */
    if (stmt->authrole)
        owner_uid = get_rolespec_oid(stmt->authrole, false);
    else
        owner_uid = saved_uid;

    /* fill schema name with the user name if not specified */
    if (!schemaName)
    {
        HeapTuple   tuple;

        tuple = SearchSysCache1(AUTHOID, ObjectIdGetDatum(owner_uid));
        if (!HeapTupleIsValid(tuple))
            elog(ERROR, "cache lookup failed for role %u", owner_uid);
        /* copy the role name before the cache entry is released */
        schemaName =
            pstrdup(NameStr(((Form_pg_authid) GETSTRUCT(tuple))->rolname));
        ReleaseSysCache(tuple);
    }

    /*
     * To create a schema, must have schema-create privilege on the current
     * database and must be able to become the target role (this does not
     * imply that the target role itself must have create-schema privilege).
     * The latter provision guards against "giveaway" attacks.  Note that a
     * superuser will always have both of these privileges a fortiori.
     */
    aclresult = pg_database_aclcheck(MyDatabaseId, saved_uid, ACL_CREATE);
    if (aclresult != ACLCHECK_OK)
        aclcheck_error(aclresult, OBJECT_DATABASE,
                       get_database_name(MyDatabaseId));

    check_is_member_of_role(saved_uid, owner_uid);

    /* Additional check to protect reserved schema names */
    if (!allowSystemTableMods && IsReservedName(schemaName))
        ereport(ERROR,
                (errcode(ERRCODE_RESERVED_NAME),
                 errmsg("unacceptable schema name \"%s\"", schemaName),
                 errdetail("The prefix \"pg_\" is reserved for system schemas.")));

    /*
     * If if_not_exists was given and the schema already exists, bail out.
     * (Note: we needn't check this when not if_not_exists, because
     * NamespaceCreate will complain anyway.)  We could do this before making
     * the permissions checks, but since CREATE TABLE IF NOT EXISTS makes its
     * creation-permission check first, we do likewise.
     */
    if (stmt->if_not_exists)
    {
        /*
         * NOTE(review): the "true" argument is presumably missing_ok, so
         * that an absent schema yields an invalid OID instead of an error;
         * confirm against get_namespace_oid.
         */
        namespaceId = get_namespace_oid(schemaName, true);
        if (OidIsValid(namespaceId))
        {
            /*
             * If we are in an extension script, insist that the pre-existing
             * object be a member of the extension, to avoid security risks.
             */
            ObjectAddressSet(address, NamespaceRelationId, namespaceId);
            checkMembershipInCurrentExtension(&address);

            /* OK to skip */
            ereport(NOTICE,
                    (errcode(ERRCODE_DUPLICATE_SCHEMA),
                     errmsg("schema \"%s\" already exists, skipping",
                            schemaName)));
            return InvalidOid;
        }
    }

    /*
     * If the requested authorization is different from the current user,
     * temporarily set the current user so that the object(s) will be created
     * with the correct ownership.
     *
     * (The setting will be restored at the end of this routine, or in case of
     * error, transaction abort will clean things up.)
     */
    if (saved_uid != owner_uid)
        SetUserIdAndSecContext(owner_uid,
                               save_sec_context | SECURITY_LOCAL_USERID_CHANGE);

    /* Create the schema's namespace */
    namespaceId = NamespaceCreate(schemaName, owner_uid, false);

    /* Advance cmd counter to make the namespace visible */
    CommandCounterIncrement();

    /*
     * Temporarily make the new namespace be the front of the search path, as
     * well as the default creation target namespace.  This will be undone at
     * the end of this routine, or upon error.
     */
    overridePath = GetOverrideSearchPath(CurrentMemoryContext);
    overridePath->schemas = lcons_oid(namespaceId, overridePath->schemas);
    /* XXX should we clear overridePath->useTemp? */
    PushOverrideSearchPath(overridePath);

    /*
     * Report the new schema to possibly interested event triggers.  Note we
     * must do this here and not in ProcessUtilitySlow because otherwise the
     * objects created below are reported before the schema, which would be
     * wrong.
     */
    ObjectAddressSet(address, NamespaceRelationId, namespaceId);
    EventTriggerCollectSimpleCommand(address, InvalidObjectAddress,
                                     (Node *) stmt);

    /*
     * Examine the list of commands embedded in the CREATE SCHEMA command, and
     * reorganize them into a sequentially executable order with no forward
     * references.  Note that the result is still a list of raw parsetrees ---
     * we cannot, in general, run parse analysis on one statement until we
     * have actually executed the prior ones.
     */
    parsetree_list = transformCreateSchemaStmt(stmt);

    /*
     * Execute each command contained in the CREATE SCHEMA.  Since the grammar
     * allows only utility commands in CREATE SCHEMA, there is no need to pass
     * them through parse_analyze() or the rewriter; we can just hand them
     * straight to ProcessUtility.
     */
    foreach(parsetree_item, parsetree_list)
    {
        /*
         * Note: this local "stmt" shadows the CreateSchemaStmt parameter of
         * the same name for the duration of the loop body.
         */
        Node       *stmt = (Node *) lfirst(parsetree_item);
        PlannedStmt *wrapper;

        /* need to make a wrapper PlannedStmt */
        wrapper = makeNode(PlannedStmt);
        wrapper->commandType = CMD_UTILITY;
        wrapper->canSetTag = false;
        wrapper->utilityStmt = stmt;
        /* sub-commands carry the location of the whole CREATE SCHEMA */
        wrapper->stmt_location = stmt_location;
        wrapper->stmt_len = stmt_len;

        /* do this step */
        ProcessUtility(wrapper,
                       queryString,
                       PROCESS_UTILITY_SUBCOMMAND,
                       NULL,
                       NULL,
                       None_Receiver,
                       NULL);

        /* make sure later steps can see the object created here */
        CommandCounterIncrement();
    }

    /* Reset search path to normal state */
    PopOverrideSearchPath();

    /* Reset current user and security context */
    SetUserIdAndSecContext(saved_uid, save_sec_context);

    return namespaceId;
}

/* ----------------
 * NamespaceCreate
 *
 * Create a namespace (schema) with the given name and owner OID.
 *
 * Inserts one tuple into pg_namespace, records the new object's
 * dependencies, and returns the OID assigned to the namespace.  Raises an
 * error if no name is supplied or if a schema of that name already exists.
 *
 * If isTemp is true, this schema is a per-backend schema for holding
 * temporary tables.  Currently, it is used to prevent it from being
 * linked as a member of any active extension.  (If someone does CREATE
 * TEMP TABLE in an extension script, we don't want the temp schema to
 * become part of the extension).  And to avoid checking for default ACL
 * for temp namespace (as it is not necessary).
 * ---------------
 */
Oid
NamespaceCreate(const char *nspName, Oid ownerId, bool isTemp)
{
    Relation    nspdesc;
    HeapTuple   tup;
    Oid         nspoid;
    bool        nulls[Natts_pg_namespace];
    Datum       values[Natts_pg_namespace];
    NameData    nname;
    TupleDesc   tupDesc;
    ObjectAddress myself;
    int         i;
    Acl        *nspacl;

    /* sanity checks */
    if (!nspName)
        elog(ERROR, "no namespace name supplied");

    /* make sure there is no existing namespace of same name */
    if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(nspName)))
        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_SCHEMA),
                 errmsg("schema \"%s\" already exists", nspName)));

    /* temp schemas skip the default-ACL lookup and get a NULL nspacl */
    if (!isTemp)
        nspacl = get_user_default_acl(OBJECT_SCHEMA, ownerId,
                                      InvalidOid);
    else
        nspacl = NULL;

    nspdesc = table_open(NamespaceRelationId, RowExclusiveLock);
    tupDesc = nspdesc->rd_att;

    /* initialize nulls and values */
    for (i = 0; i < Natts_pg_namespace; i++)
    {
        nulls[i] = false;
        values[i] = (Datum) NULL;
    }

    /*
     * Fill in the columns.  The Anum_* attribute numbers are decremented by
     * one to index the zero-based values[] and nulls[] arrays.
     */
    nspoid = GetNewOidWithIndex(nspdesc, NamespaceOidIndexId,
                                Anum_pg_namespace_oid);
    values[Anum_pg_namespace_oid - 1] = ObjectIdGetDatum(nspoid);
    namestrcpy(&nname, nspName);
    values[Anum_pg_namespace_nspname - 1] = NameGetDatum(&nname);
    values[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(ownerId);
    if (nspacl != NULL)
        values[Anum_pg_namespace_nspacl - 1] = PointerGetDatum(nspacl);
    else
        nulls[Anum_pg_namespace_nspacl - 1] = true;

    tup = heap_form_tuple(tupDesc, values, nulls);

    CatalogTupleInsert(nspdesc, tup);
    Assert(OidIsValid(nspoid));

    table_close(nspdesc, RowExclusiveLock);

    /* Record dependencies */
    myself.classId = NamespaceRelationId;
    myself.objectId = nspoid;
    myself.objectSubId = 0;

    /* dependency on owner */
    recordDependencyOnOwner(NamespaceRelationId, nspoid, ownerId);

    /* dependences on roles mentioned in default ACL */
    recordDependencyOnNewAcl(NamespaceRelationId, nspoid, 0, ownerId, nspacl);

    /* dependency on extension ... but not for magic temp schemas */
    if (!isTemp)
        recordDependencyOnCurrentExtension(&myself, false);

    /* Post creation hook for new schema */
    InvokeObjectPostCreateHook(NamespaceRelationId, nspoid, 0);

    return nspoid;
}
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论