暂无图片
暂无图片
2
暂无图片
暂无图片
暂无图片

PostgreSQL pg_clickhouse 0.1.0 插件的安装和使用

原创 ByteHouse 2025-12-19
913

PostgreSQL pg_clickhouse 0.1.0 插件的安装和使用

clickhouse公司开发的pg_clickhouse可以由PostgreSQL数据库调用clickhouse数据库的能力,实现快速的查询。

https://github.com/ClickHouse/pg_clickhouse/blob/main/doc/pg_clickhouse.md

该库包含一个 PostgreSQL 扩展,支持在 ClickHouse 数据库上执行远程查询,包括一个外部数据包装器(foreign data wrapper)。其兼容 PostgreSQL 13 及以上版本,以及 ClickHouse 23 及以上版本。

版本控制策略

pg_clickhouse 的公开版本遵循语义化版本控制(Semantic Versioning):

  • 主版本号(Major):当 API 发生不兼容变更时递增
  • 次版本号(Minor):当新增向后兼容的 SQL 功能时递增
  • 修订版本号(Patch):仅发生二进制层面的兼容变更时递增

安装后,PostgreSQL 会跟踪该扩展的两个版本变体:

  • 库版本(Library Version):在 PostgreSQL 18 及以上版本中由 PG_MODULE_MAGIC 定义,包含完整的语义化版本号,可通过 pg_get_loaded_modules() 函数查看
  • 扩展版本(Extension Version):在控制文件中定义,仅包含主版本号和次版本号,可通过 pg_catalog.pg_extension 表、pg_available_extension_versions() 函数或 \dx pg_clickhouse 命令查看

实际使用中:

  • 仅修订版本号递增的更新(如从 v0.1.0 到 v0.1.1):所有已加载 v0.1 版本的数据库无需执行 ALTER EXTENSION 即可享受更新带来的优化
  • 次版本号或主版本号递增的更新:会附带 SQL 升级脚本,所有已安装该扩展的数据库必须执行 ALTER EXTENSION pg_clickhouse UPDATE 才能完成升级

安装

快速开始

体验 pg_clickhouse 最简单的方式是使用 Docker 镜像,该镜像基于标准 PostgreSQL Docker 镜像并集成了 pg_clickhouse 扩展:

docker run --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \ -d ghcr.io/clickhouse/pg_clickhouse:18 docker exec -it pg_clickhouse psql -U postgres

使用

CREATE EXTENSION pg_clickhouse; CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw OPTIONS(driver 'binary', host 'localhost', dbname 'taxi'); CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv OPTIONS (user 'default'); CREATE SCHEMA taxi; IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;

从源代码编译

PostgreSQL 和 curl 开发包会在环境变量中包含 pg_configcurl_config,因此您只需运行 make(或 gmake),然后执行 make install,最后在数据库中运行 CREATE EXTENSION pg_clickhouse 即可。

RedHat / CentOS / Yum 系统

sudo yum install \ postgresql-server \ libcurl-devel \ libuuid-devel \ openssl-libs \ automake \ cmake \ gcc

从 PGXN 安装

满足上述依赖后,使用 PGXN 客户端(可通过 Homebrew、Apt、Yum 安装,包名均为 pgxnclient)下载、编译并安装 pg_clickhouse

pgxn install pg_clickhouse

编译与安装

要构建并安装 ClickHouse 库和 pg_clickhouse 扩展,运行以下命令:

make sudo make install

特殊情况处理

若主机存在多个 PostgreSQL 安装版本,需指定对应的 pg_config 路径:

export PG_CONFIG=/usr/lib/postgresql/18/bin/pg_config make sudo make install

curl_config 未在环境变量中,可显式指定路径:

export CURL_CONFIG=/opt/homebrew/opt/curl/bin/curl-config make sudo make install

若遇到如下错误:

"Makefile", line 8: Need an operator

需使用 GNU make(系统中可能以 gmake 命名):

gmake gmake install gmake installcheck

若遇到如下错误:

make: pg_config: Command not found

需确保 pg_config 已安装且在环境变量中。若通过 RPM 等包管理工具安装 PostgreSQL,需同时安装 -devel 开发包。必要时显式指定 pg_config 路径:

export PG_CONFIG=/path/to/pg_config make sudo make install

若需在 PostgreSQL 18 及以上版本中安装到自定义路径,可在 install 目标中指定 prefix 参数(其他 make 目标无需指定):

sudo make install prefix=/usr/local/extras

然后在 postgresql.conf 中配置以下参数,确保自定义路径被识别:

extension_control_path = '/usr/local/extras/postgresql/share:$system'
dynamic_library_path   = '/usr/local/extras/postgresql/lib:$libdir'

测试

安装扩展后,运行以下命令执行测试套件:

make installcheck

若遇到如下错误:

ERROR: must be owner of database regression

需使用超级用户(如默认的 postgres 超级用户)运行测试:

make installcheck PGUSER=postgres

加载扩展

pg_clickhouse 安装完成后,可通过以下步骤加载到数据库:

以超级用户身份连接数据库

执行以下 SQL 语句:

CREATE EXTENSION pg_clickhouse;

若需将 pg_clickhouse 及其所有支持对象安装到指定 schema,使用 SCHEMA 子句指定:

CREATE SCHEMA env; CREATE EXTENSION pg_clickhouse SCHEMA env;

依赖项

  • 运行依赖:PostgreSQL 13+、libcurl、libuuid
  • 构建依赖:C/C++ 编译器、libSSL、GNU make、CMake

使用 pg_clickhouse

以下 SQL 语句用于操作 pg_clickhouse 扩展。

CREATE EXTENSION

使用 CREATE EXTENSION 将 pg_clickhouse 添加到数据库:

CREATE EXTENSION pg_clickhouse;

推荐使用 WITH SCHEMA 指定扩展安装的 schema:

CREATE SCHEMA ch; CREATE EXTENSION pg_clickhouse WITH SCHEMA ch;

ALTER EXTENSION

使用 ALTER EXTENSION 修改 pg_clickhouse 扩展配置:

安装新版本后,使用 UPDATE 子句升级:

ALTER EXTENSION pg_clickhouse UPDATE;

使用 SET SCHEMA 将扩展迁移到新的 schema:

CREATE SCHEMA ch;
ALTER EXTENSION pg_clickhouse SET SCHEMA ch;

DROP EXTENSION

使用 DROP EXTENSION 从数据库中移除 pg_clickhouse:

DROP EXTENSION pg_clickhouse;

若存在依赖该扩展的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:

DROP EXTENSION pg_clickhouse CASCADE;

CREATE SERVER

使用 CREATE SERVER 创建连接 ClickHouse 服务器的外部服务器:

CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');

支持的选项:

  • driver:ClickHouse 连接驱动,可选值为 “binary” 或 “http”,必填项
  • dbname:连接时使用的 ClickHouse 数据库,默认值为 “default”
  • host:ClickHouse 服务器主机名,默认值为 “localhost
  • port:连接端口,默认值如下:
    • driver 为 “binary” 且 host 是 ClickHouse Cloud 主机:9440
    • driver 为 “binary” 且 host 非 ClickHouse Cloud 主机:9004
    • driver 为 “http” 且 host 是 ClickHouse Cloud 主机:8443
    • driver 为 “http” 且 host 非 ClickHouse Cloud 主机:8123

ALTER SERVER

使用 ALTER SERVER 修改外部服务器配置:

ALTER SERVER taxi_srv OPTIONS (SET driver 'http');

支持的选项与 CREATE SERVER 一致。

DROP SERVER

使用 DROP SERVER 移除外部服务器:

DROP SERVER taxi_srv;

若存在依赖该服务器的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:

DROP SERVER taxi_srv CASCADE;

CREATE USER MAPPING

使用 CREATE USER MAPPING 将 PostgreSQL 用户映射到 ClickHouse 用户。例如,将当前 PostgreSQL 用户映射到 taxi_srv 外部服务器对应的 ClickHouse 用户:

CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv OPTIONS (user 'demo');

支持的选项:

  • user:ClickHouse 用户名,默认值为 “default”
  • password:ClickHouse 用户密码

ALTER USER MAPPING

使用 ALTER USER MAPPING 修改用户映射配置:

ALTER USER MAPPING FOR CURRENT_USER SERVER taxi_srv OPTIONS (SET user 'default');

支持的选项与 CREATE USER MAPPING 一致。

DROP USER MAPPING

使用 DROP USER MAPPING 移除用户映射:

DROP USER MAPPING FOR CURRENT_USER SERVER taxi_srv;

IMPORT FOREIGN SCHEMA

使用 IMPORT FOREIGN SCHEMA 将 ClickHouse 数据库中的所有表导入为 PostgreSQL 中的外部表:

CREATE SCHEMA taxi; IMPORT FOREIGN SCHEMA demo FROM SERVER taxi_srv INTO taxi;

使用 LIMIT TO 限制仅导入指定表:

IMPORT FOREIGN SCHEMA demo LIMIT TO (trips) FROM SERVER taxi_srv INTO taxi;

使用 EXCEPT 排除指定表:

IMPORT FOREIGN SCHEMA demo EXCEPT (users) FROM SERVER taxi_srv INTO taxi;

pg_clickhouse 会执行以下操作:

  1. 获取指定 ClickHouse 数据库(上述示例中为 “demo”)的所有表列表
  2. 获取每个表的列定义
  3. 执行 CREATE FOREIGN TABLE 命令创建外部表,列会使用支持的数据类型,并在可检测的情况下包含 CREATE FOREIGN TABLE 支持的选项

⚠️ 导入标识符的大小写保留规则

IMPORT FOREIGN SCHEMA 会对导入的表名和列名执行 quote_identifier() 函数,这意味着包含大写字符或空格的标识符会被双引号包裹。因此,在 PostgreSQL 查询中必须对这类表名和列名使用双引号。全小写且无空格的标识符无需引号。

示例:

ClickHouse 中的表定义:

CREATE OR REPLACE TABLE test
(
    id UInt64,
    Name TEXT,
    updatedAt DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY id;

通过 IMPORT FOREIGN SCHEMA 创建的外部表:

CREATE TABLE test
(
    id          BIGINT      NOT NULL,
    "Name"      TEXT        NOT NULL,
    "updatedAt" TIMESTAMPTZ NOT NULL
);

查询时需正确使用双引号:

SELECT id, "Name", "updatedAt" FROM test;

若需创建名称不同或全小写(大小写不敏感)的对象,请使用 CREATE FOREIGN TABLE

CREATE FOREIGN TABLE

使用 CREATE FOREIGN TABLE 创建可查询 ClickHouse 数据的外部表:

CREATE FOREIGN TABLE uact ( user_id bigint NOT NULL, page_views int, duration smallint, sign smallint ) SERVER taxi_srv OPTIONS( table_name 'uact', engine 'CollapsingMergeTree' );

支持的表选项:

  • database:远程 ClickHouse 数据库名,默认值为外部服务器定义的数据库
  • table_name:远程 ClickHouse 表名,默认值为外部表的名称
  • engine:ClickHouse 表使用的引擎。对于 CollapsingMergeTree()AggregatingMergeTree(),pg_clickhouse 会自动将参数应用到该表上执行的函数表达式中

列数据类型需与远程 ClickHouse 表的列类型匹配。对于 AggregateFunction 类型和 SimpleAggregateFunction 类型的列,需将数据类型映射到函数对应的 ClickHouse 类型,并通过列选项指定聚合函数名称:

  • AggregateFunction:应用于 AggregateFunction 类型列的聚合函数名称
  • SimpleAggregateFunction:应用于 SimpleAggregateFunction 类型列的聚合函数名称

示例:

CREATE FOREIGN TABLE test ( column1 bigint OPTIONS(AggregateFunction 'uniq'), column2 integer OPTIONS(AggregateFunction 'anyIf'), column3 bigint OPTIONS(AggregateFunction 'quantiles(0.5, 0.9)') ) SERVER clickhouse_srv;

对于带有 AggregateFunction 函数的列,pg_clickhouse 会自动在评估该列的聚合函数后追加 Merge

ALTER FOREIGN TABLE

使用 ALTER FOREIGN TABLE 修改外部表定义:

ALTER TABLE table ALTER COLUMN b OPTIONS (SET AggregateFunction 'count');

支持的表选项和列选项与 CREATE FOREIGN TABLE 一致。

DROP FOREIGN TABLE

使用 DROP FOREIGN TABLE 移除外部表:

DROP FOREIGN TABLE uact;

若存在依赖该外部表的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:

DROP FOREIGN TABLE uact CASCADE;

函数和运算符参考

数据类型映射

pg_clickhouse 将 ClickHouse 数据类型映射为以下 PostgreSQL 数据类型:

ClickHouse 类型 PostgreSQL 类型 备注
Bool boolean
Date date
DateTime timestamp
Decimal numeric
Float32 real
Float64 double precision
IPv4 inet
IPv6 inet
Int16 smallint
Int32 integer
Int64 bigint
Int8 smallint
JSON jsonb 仅支持 HTTP 引擎
String text
UInt16 integer
UInt32 bigint
UInt64 bigint 当值超过 BIGINT 最大值时会报错
UInt8 smallint
UUID uuid

函数

以下函数提供查询 ClickHouse 数据库的接口。

clickhouse_raw_query

SELECT clickhouse_raw_query( 'CREATE TABLE t1 (x String) ENGINE = Memory', 'host=localhost port=8123' );

通过 HTTP 接口连接 ClickHouse 服务,执行单个查询后断开连接。可选的第二个参数为连接字符串,默认值为 host=localhost port=8123。支持的连接参数:

  • host:连接的主机名,必填项
  • port:HTTP 端口,默认值为 8123;若 host 是 ClickHouse Cloud 主机,默认值为 8443
  • dbname:连接的数据库名
  • username:连接使用的用户名,默认值为 “default”
  • password:认证密码,默认值为空

该函数适用于无返回结果的查询;若查询有返回值,会以单个文本值形式返回:

SELECT clickhouse_raw_query( 'SELECT schema_name, schema_owner from information_schema.schemata', 'host=localhost port=8123' );

返回结果:

clickhouse_raw_query --------------------------------- INFORMATION_SCHEMA default+ default default + git default + information_schema default+ system default + (1 row)

下推函数(Pushdown Functions)

所有在查询 ClickHouse 外部表的条件子句(HAVINGWHERE)中使用的 PostgreSQL 内置函数,都会自动以相同名称和签名下推到 ClickHouse 执行。但部分函数的名称或签名不同,需映射为对应的 ClickHouse 函数。pg_clickhouse 支持以下函数映射:

  • date_part
    • date_part('day')toDayOfMonth
    • date_part('doy')toDayOfYear
    • date_part('dow')toDayOfWeek
    • date_part('year')toYear
    • date_part('month')toMonth
    • date_part('hour')toHour
    • date_part('minute')toMinute
    • date_part('second')toSecond
    • date_part('quarter')toQuarter
    • date_part('isoyear')toISOYear
    • date_part('week')toISOYear
    • date_part('epoch')toISOYear
  • date_trunc
    • date_trunc('week')toMonday
    • date_trunc('second')toStartOfSecond
    • date_trunc('minute')toStartOfMinute
    • date_trunc('hour')toStartOfHour
    • date_trunc('day')toStartOfDay
    • date_trunc('month')toStartOfMonth
    • date_trunc('quarter')toStartOfQuarter
    • date_trunc('year')toStartOfYear
  • array_positionindexOf
  • btrimtrimBoth
  • strposposition
  • regexp_likematch

自定义函数(Custom Functions)

pg_clickhouse 提供以下自定义函数,用于为无 PostgreSQL 等效函数的 ClickHouse 函数提供外部查询下推支持。若这些函数无法下推,会抛出异常:

  • dictGet

下推类型转换(Pushdown Casts)

pg_clickhouse 支持将 CAST(x AS bigint) 等类型转换操作下推到 ClickHouse(仅适用于兼容的数据类型)。若类型不兼容,下推会失败(例如,若 x 是 ClickHouse 的 UInt64 类型,ClickHouse 会拒绝该转换)。

为支持将类型转换下推到不兼容的数据类型,pg_clickhouse 提供以下函数。若这些函数无法下推,会在 PostgreSQL 中抛出异常:

  • toUInt8
  • toUInt16
  • toUInt32
  • toUInt64
  • toUInt128

下推聚合函数(Pushdown Aggregates)

以下 PostgreSQL 聚合函数支持下推到 ClickHouse 执行:

  • count

自定义聚合函数(Custom Aggregates)

pg_clickhouse 提供以下自定义聚合函数,用于为无 PostgreSQL 等效函数的 ClickHouse 聚合函数提供外部查询下推支持。若这些函数无法下推,会抛出异常:

  • argMax
  • argMin
  • uniq
  • uniqCombined
  • uniqCombined64
  • uniqExact
  • uniqHLL12
  • uniqTheta
  • quantile
  • quantileExact

下推有序集合聚合函数(Pushdown Ordered Set Aggregates)

以下有序集合聚合函数会映射到 ClickHouse 的参数化聚合函数,将直接参数作为参数传递,将 ORDER BY 表达式作为参数传入。例如:

PostgreSQL 查询:

SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY a) FROM t1;

映射为 ClickHouse 查询:

SELECT quantile(0.25)(a) FROM t1;

注意:不支持非默认的 ORDER BY 后缀(DESCNULLS FIRST),否则会抛出错误。

支持的函数:

  • percentile_cont(double)quantile
  • quantile(double)quantile
  • quantileExact(double)quantileExact

会话设置

通过设置 pg_clickhouse.session_settings 运行时参数,可配置后续查询中 ClickHouse 的会话设置。示例:

SET pg_clickhouse.session_settings = 'join_use_nulls 1, final 1';

默认值为 join_use_nulls 1。若需使用 ClickHouse 服务器的默认设置,可将其设为空字符串:

SET pg_clickhouse.session_settings = '';

语法规则:

以逗号分隔的键值对列表,键和值之间用一个或多个空格分隔

键必须是 ClickHouse 支持的设置项

若值中包含空格、逗号或反斜杠,需用反斜杠转义:

SET pg_clickhouse.session_settings = 'join_algorithm grace_hash\,hash';

也可使用单引号包裹值以避免转义空格和逗号;若需避免双引号转义,可使用美元引号(dollar quoting):

SET pg_clickhouse.session_settings = $$join_algorithm 'grace_hash,hash'$$;

若需设置多个配置项以提高可读性,可使用多行格式:

SET pg_clickhouse.session_settings TO $$ connect_timeout 2, count_distinct_implementation uniq, final 1, group_by_use_nulls 1, join_algorithm 'prefer_partial_merge', join_use_nulls 1, log_queries_min_type QUERY_FINISH, max_block_size 32768, max_execution_time 45, max_result_rows 1024, metrics_perf_events_list 'this,that', network_compression_method ZSTD, poll_interval 5, totals_mode after_having_auto $$;

pg_clickhouse 不会验证这些设置项,仅会在每个查询中传递给 ClickHouse。因此,该参数支持各个 ClickHouse 版本的所有设置项。

注意:设置 pg_clickhouse.session_settings 前需确保 pg_clickhouse 已加载,可通过库预加载(shared library preloading)或使用扩展中的任意对象来确保加载。

测试用例

启动 ClickHouse

首先,若尚未部署 ClickHouse 数据库,可通过 Docker 快速启动:

docker run -d --network host --name clickhouse -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse docker exec -it clickhouse clickhouse-client

执行:

[root@bigdata ~]# docker run -d --network host --name clickhouse -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse
Unable to find image 'clickhouse:latest' locally
latest: Pulling from library/clickhouse
7e49dc6156b0: Pull complete 
66a9f27340b0: Pull complete 
4ea1943fd65c: Pull complete 
c4e91a1791f4: Pull complete 
b95838ab0602: Pull complete 
e70e29afad90: Pull complete 
596aaa9e4cb7: Pull complete 
077cf31b66a6: Pull complete 
Digest: sha256:0c1acee29c905829331544ec71342ed4346a6383da60f0c488f68b6b45009010
Status: Downloaded newer image for clickhouse:latest
WARNING: Published ports are discarded when using host network mode
22ec632560f4d7ddafc04610623ceb80e9588ac5fa702c13eff57b15ad8578ac
[root@bigdata ~]# docker exec -it clickhouse clickhouse-client
ClickHouse client version 25.11.2.24 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 25.11.2.

Warnings:
 * Delay accounting is not enabled, OSIOWaitMicroseconds will not be gathered. You can enable it using `sudo sh -c 'echo 1 > /proc/sys/kernel/task_delayacct'` or by using sysctl.
 * Linux transparent hugepages are set to "always". Check /sys/kernel/mm/transparent_hugepage/enabled
 * Available disk space for data at server startup is too low (1GiB): /var/lib/clickhouse/
 * Available disk space for logs at server startup is too low (1GiB): /var/log/clickhouse-server

bigdata :) 

创建表

参考 ClickHouse 官方教程,创建存储纽约市出租车数据集的数据库和表:

CREATE DATABASE taxi; CREATE TABLE taxi.trips ( trip_id UInt32, vendor_id Enum8( '1' = 1, '2' = 2, '3' = 3, '4' = 4, 'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14, '' = 15 ), pickup_date Date, pickup_datetime DateTime, dropoff_date Date, dropoff_datetime DateTime, store_and_fwd_flag UInt8, rate_code_id UInt8, pickup_longitude Float64, pickup_latitude Float64, dropoff_longitude Float64, dropoff_latitude Float64, passenger_count UInt8, trip_distance Float64, fare_amount Decimal(10, 2), extra Decimal(10, 2), mta_tax Decimal(10, 2), tip_amount Decimal(10, 2), tolls_amount Decimal(10, 2), ehail_fee Decimal(10, 2), improvement_surcharge Decimal(10, 2), total_amount Decimal(10, 2), payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4), trip_type UInt8, pickup FixedString(25), dropoff FixedString(25), cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3), pickup_nyct2010_gid Int8, pickup_ctlabel Float32, pickup_borocode Int8, pickup_ct2010 String, pickup_boroct2010 String, pickup_cdeligibil String, pickup_ntacode FixedString(4), pickup_ntaname String, pickup_puma UInt16, dropoff_nyct2010_gid UInt8, dropoff_ctlabel Float32, dropoff_borocode UInt8, dropoff_ct2010 String, dropoff_boroct2010 String, dropoff_cdeligibil String, dropoff_ntacode FixedString(4), dropoff_ntaname String, dropoff_puma UInt16 ) ENGINE = MergeTree PARTITION BY toYYYYMM(pickup_date) ORDER BY pickup_datetime;

导入数据集

执行以下语句导入数据(数据来源于 S3 存储的纽约市出租车数据集):

INSERT INTO taxi.trips SELECT * FROM s3( 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz', 'TabSeparatedWithNames', " trip_id UInt32, vendor_id Enum8( '1' = 1, '2' = 2, '3' = 3, '4' = 4, 'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10, 'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14, '' = 15 ), pickup_date Date, pickup_datetime DateTime, dropoff_date Date, dropoff_datetime DateTime, store_and_fwd_flag UInt8, rate_code_id UInt8, pickup_longitude Float64, pickup_latitude Float64, dropoff_longitude Float64, dropoff_latitude Float64, passenger_count UInt8, trip_distance Float64, fare_amount Decimal(10, 2), extra Decimal(10, 2), mta_tax Decimal(10, 2), tip_amount Decimal(10, 2), tolls_amount Decimal(10, 2), ehail_fee Decimal(10, 2), improvement_surcharge Decimal(10, 2), total_amount Decimal(10, 2), payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4), trip_type UInt8, pickup FixedString(25), dropoff FixedString(25), cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3), pickup_nyct2010_gid Int8, pickup_ctlabel Float32, pickup_borocode Int8, pickup_ct2010 String, pickup_boroct2010 String, pickup_cdeligibil String, pickup_ntacode FixedString(4), pickup_ntaname String, pickup_puma UInt16, dropoff_nyct2010_gid UInt8, dropoff_ctlabel Float32, dropoff_borocode UInt8, dropoff_ct2010 String, dropoff_boroct2010 String, dropoff_cdeligibil String, dropoff_ntacode FixedString(4), dropoff_ntaname String, dropoff_puma UInt16 ") SETTINGS input_format_try_infer_datetimes = 0

验证数据导入成功后退出 ClickHouse 客户端:

SELECT count() FROM taxi.trips; -- 查看数据总行数 quit -- 退出客户端

安装 pg_clickhouse

可通过以下方式安装 pg_clickhouse:

  1. PGXNGitHub 编译安装
  2. 直接使用 Docker 镜像(基于标准 PostgreSQL 镜像,已预装 pg_clickhouse):
docker run -d --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \ ghcr.io/clickhouse/pg_clickhouse:18

连接 pg_clickhouse

首先连接到 PostgreSQL 数据库:

docker exec -it pg_clickhouse psql -U postgres

启用 pg_clickhouse 扩展:

CREATE EXTENSION pg_clickhouse;

创建外部服务器

创建连接 ClickHouse 的外部服务器,需指定 ClickHouse 的主机、端口和数据库:

CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
  • driver:连接驱动,可选 binary(使用 ClickHouse 二进制协议)或 http(使用 HTTP 接口)
  • host:ClickHouse 主机名(此处为本地)
  • dbname:目标 ClickHouse 数据库名(即前文创建的 taxi

创建用户映射

将 PostgreSQL 用户映射到 ClickHouse 用户,最简单的方式是映射当前 PostgreSQL 用户:

CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv OPTIONS (user 'default'); -- ClickHouse 默认用户名

如需认证,可添加 password 选项(例如 OPTIONS (user 'default', password 'your_pass'))。

导入外部表

创建 PostgreSQL 模式(schema),并将 ClickHouse 的 taxi 数据库中所有表导入该模式:

CREATE SCHEMA taxi; -- 创建本地模式 IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi; -- 导入外部表

验证导入结果

在 psql 中使用以下命令验证外部表是否导入成功:

  1. 查看外部表列表(\det+):
taxi=# \det+ taxi.* 外部表列表 模式 | 表名 | 服务器 | FDW 选项 | 描述 -------+--------+-------------+-----------------------------------------------+-------- taxi | trips | taxi_srv | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null] (1 行记录)
  1. 查看表结构(\d):
taxi=# \d taxi.trips 外部表 "taxi.trips" 列名 | 类型 | 排序规则 | 可为空 | 默认值 | FDW 选项 ---------------------+--------------------------+----------+--------+---------+---------- trip_id | bigint | | 否 | | vendor_id | text | | 否 | | pickup_date | date | | 否 | | pickup_datetime | timestamp without time zone | | 否 | | dropoff_date | date | | 否 | | dropoff_datetime | timestamp without time zone | | 否 | | store_and_fwd_flag | smallint | | 否 | | rate_code_id | smallint | | 否 | | pickup_longitude | double precision | | 否 | | pickup_latitude | double precision | | 否 | | dropoff_longitude | double precision | | 否 | | dropoff_latitude | double precision | | 否 | | passenger_count | smallint | | 否 | | trip_distance | double precision | | 否 | | fare_amount | numeric(10,2) | | 否 | | extra | numeric(10,2) | | 否 | | mta_tax | numeric(10,2) | | 否 | | tip_amount | numeric(10,2) | | 否 | | tolls_amount | numeric(10,2) | | 否 | | ehail_fee | numeric(10,2) | | 否 | | improvement_surcharge | numeric(10,2) | | 否 | | total_amount | numeric(10,2) | | 否 | | payment_type | text | | 否 | | trip_type | smallint | | 否 | | pickup | character varying(25) | | 否 | | dropoff | character varying(25) | | 否 | | cab_type | text | | 否 | | pickup_nyct2010_gid | smallint | | 否 | | pickup_ctlabel | real | | 否 | | pickup_borocode | smallint | | 否 | | pickup_ct2010 | text | | 否 | | pickup_boroct2010 | text | | 否 | | pickup_cdeligibil | text | | 否 | | pickup_ntacode | character varying(4) | | 否 | | pickup_ntaname | text | | 否 | | pickup_puma | integer | | 否 | | dropoff_nyct2010_gid | smallint | | 否 | | dropoff_ctlabel | real | | 否 | | dropoff_borocode | smallint | | 否 | | dropoff_ct2010 | text | | 否 | | dropoff_boroct2010 | text | | 否 | | dropoff_cdeligibil | text | | 否 | | dropoff_ntacode | character varying(4) | | 否 | | dropoff_ntaname | text | | 否 | | dropoff_puma | integer | | 否 | | 服务器:taxi_srv FDW 选项:(database 'taxi', table_name 'trips', engine 'MergeTree')

执行查询

查询数据总行数,验证查询下推功能:

SELECT count(*) FROM taxi.trips; count --------- 1999657 (1 行记录)

使用 EXPLAIN 查看查询计划(确认查询是否下推到 ClickHouse):

EXPLAIN select count(*) from taxi.trips; 查询计划 ------------------------------------------------- 外部扫描 (cost=1.00..-0.90 rows=1 width=8) Relations: 聚合 on (trips) (2 行记录)
  • 若 “外部扫描(Foreign Scan)” 位于计划根节点,说明整个查询已下推至 ClickHouse 执行,仅返回结果给 PostgreSQL,性能最优。

数据分析

以下示例演示如何通过 pg_clickhouse 分析出租车数据,可直接执行或自定义 SQL 查询。

1. 计算平均小费金额

taxi=# \timing -- 开启计时功能 计时已开启。 taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips; round ------- 1.68 (1 行记录) 时间:9.438 毫秒

2. 按乘客数量分组计算平均总费用

taxi=# SELECT passenger_count, avg(total_amount)::NUMERIC(10, 2) AS average_total_amount FROM taxi.trips GROUP BY passenger_count; passenger_count | average_total_amount -----------------+---------------------- 0 | 22.68 1 | 15.96 2 | 17.14 3 | 16.75 4 | 17.32 5 | 16.34 6 | 16.03 7 | 59.79 8 | 36.40 9 | 9.79 (10 行记录) 时间:27.266 毫秒

3. 按日期和社区分组统计每日接单量

taxi=# SELECT pickup_date, pickup_ntaname, SUM(1) AS number_of_trips FROM taxi.trips GROUP BY pickup_date, pickup_ntaname ORDER BY pickup_date ASC LIMIT 10; pickup_date | pickup_ntaname | number_of_trips -------------+--------------------------------+----------------- 2015-07-01 | Williamsburg | 1 2015-07-01 | park-cemetery-etc-Queens | 6 2015-07-01 | Maspeth | 1 2015-07-01 | Stuyvesant Town-Cooper Village | 44 2015-07-01 | Rego Park | 1 2015-07-01 | Greenpoint | 7 2015-07-01 | Highbridge | 1 2015-07-01 | Briarwood-Jamaica Hills | 3 2015-07-01 | Airport | 550 2015-07-01 | East Harlem North | 32 (10 行记录) 时间:30.978 毫秒

4. 按行程时长分组分析费用和乘客量

taxi=# SELECT avg(tip_amount) AS avg_tip, avg(fare_amount) AS avg_fare, avg(passenger_count) AS avg_passenger, count(*) AS count, round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes FROM taxi.trips WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0 GROUP BY trip_minutes ORDER BY trip_minutes DESC LIMIT 5; avg_tip | avg_fare | avg_passenger | count | trip_minutes -------------------+------------------+------------------+-------+-------------- 1.96 | 8 | 1 | 1 | 27512 0 | 12 | 2 | 1 | 27500 0.562727272727273 | 17.4545454545455 | 2.45454545454545 | 11 | 1440 0.716564885496183 | 14.2786259541985 | 1.94656488549618 | 131 | 1439 1.00945205479452 | 12.8787671232877 | 1.98630136986301 | 146 | 1438 (5 行记录) 时间:45.477 毫秒
  • 注:date_part('epoch', datetime) 用于将时间转换为 Unix 时间戳(秒数),再计算行程时长(分钟)。

5. 按社区和小时分组统计接单量

taxi=# SELECT pickup_ntaname, date_part('hour', pickup_datetime) as pickup_hour, SUM(1) AS pickups FROM taxi.trips WHERE pickup_ntaname != '' GROUP BY pickup_ntaname, pickup_hour ORDER BY pickup_ntaname, date_part('hour', pickup_datetime) LIMIT 5; pickup_ntaname | pickup_hour | pickups ----------------+-------------+--------- Airport | 0 | 3509 Airport | 1 | 1184 Airport | 2 | 401 Airport | 3 | 152 Airport | 4 | 213 (5 行记录) 时间:36.895 毫秒

6. 查询前往拉瓜迪亚 / 肯尼迪机场的行程

taxi=# SELECT pickup_datetime, dropoff_datetime, total_amount, pickup_nyct2010_gid, dropoff_nyct2010_gid, CASE WHEN dropoff_nyct2010_gid = 138 THEN 'LGA' -- 拉瓜迪亚机场 WHEN dropoff_nyct2010_gid = 132 THEN 'JFK' -- 肯尼迪机场 END AS airport_code, EXTRACT(YEAR FROM pickup_datetime) AS year, EXTRACT(DAY FROM pickup_datetime) AS day, EXTRACT(HOUR FROM pickup_datetime) AS hour FROM taxi.trips WHERE dropoff_nyct2010_gid IN (132, 138) ORDER BY pickup_datetime LIMIT 5; pickup_datetime | dropoff_datetime | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour ---------------------+---------------------+--------------+---------------------+----------------------+--------------+------+-----+------ 2015-07-01 00:04:14 | 2015-07-01 00:15:29 | 13.30 | -34 | 132 | JFK | 2015 | 1 | 0 2015-07-01 00:09:42 | 2015-07-01 00:12:55 | 6.80 | 50 | 138 | LGA | 2015 | 1 | 0 2015-07-01 00:23:04 | 2015-07-01 00:24:39 | 4.80 | -125 | 132 | JFK | 2015 | 1 | 0 2015-07-01 00:27:51 | 2015-07-01 00:39:02 | 14.72 | -101 | 138 | LGA | 2015 | 1 | 0 2015-07-01 00:32:03 | 2015-07-01 00:55:39 | 39.34 | 48 | 138 | LGA | 2015 | 1 | 0 (5 行记录) 时间:17.450 毫秒

创建字典

字典(Dictionary)是 ClickHouse 的特殊对象,用于存储维度数据(如地区映射表)。以下创建纽约市社区 - 行政区映射字典,并通过 pg_clickhouse 访问。

1. 创建 ClickHouse 字典

通过 clickhouse_raw_query 函数在 ClickHouse 中创建字典(数据来源于 S3 存储的 CSV 文件):

SELECT clickhouse_raw_query($$ CREATE DICTIONARY taxi.taxi_zone_dictionary ( LocationID Int64 DEFAULT 0, Borough String, -- 行政区(如 Manhattan、Brooklyn) zone String, -- 社区名称 service_zone String -- 服务区域 ) PRIMARY KEY LocationID -- 主键(映射 trips 表的 gid 字段) SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames')) LIFETIME(MIN 0 MAX 0) -- 禁用自动更新(避免频繁访问 S3) LAYOUT(HASHED_ARRAY()) -- 字典存储结构 $$, 'host=localhost dbname=taxi'); -- 连接参数
  • 字典数据说明:CSV 文件包含纽约市社区与行政区的映射关系,LocationID 对应 trips 表的 pickup_nyct2010_giddropoff_nyct2010_gid 字段。

2. 导入字典到 PostgreSQL

IMPORT FOREIGN SCHEMA taxi LIMIT TO (taxi_zone_dictionary) FROM SERVER taxi_srv INTO taxi;

3. 验证字典查询

taxi=# SELECT * FROM taxi.taxi_zone_dictionary limit 3; LocationID | Borough | Zone | service_zone ------------+-----------+-----------------------------------------------+-------------- 77 | Brooklyn | East New York/Pennsylvania Avenue | Boro Zone 106 | Brooklyn | Gowanus | Boro Zone 103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone (3 行记录)

4. 使用字典查询(dictGet 函数)

通过 dictGet 函数(pg_clickhouse 自定义函数)关联字典数据,统计前往两大机场的行程按出发行政区分组的数量:

taxi=# SELECT count(1) AS total, COALESCE(NULLIF(dictGet( 'taxi.taxi_zone_dictionary', 'Borough', toUInt64(pickup_nyct2010_gid) -- 转换为字典主键类型 ), ''), 'Unknown') AS borough_name FROM taxi.trips WHERE dropoff_nyct2010_gid = 132 OR dropoff_nyct2010_gid = 138 -- 目的地为 JFK/LGA 机场 GROUP BY borough_name ORDER BY total DESC; total | borough_name -------+--------------- 23683 | Unknown 7053 | Manhattan 6828 | Brooklyn 4458 | Queens 2670 | Bronx 554 | Staten Island 53 | EWR (7 行记录) 时间:66.245 毫秒
  • dictGet(dict_name, col_name, key):从字典中根据主键 key 查询指定列 col_name 的值。

执行关联查询(JOIN)

trips 表与字典表关联,实现更复杂的分析。

1. 简单关联查询(替代 dictGet)

taxi=# SELECT count(1) AS total, "Borough" AS borough_name FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID") WHERE pickup_nyct2010_gid > 0 -- 过滤无效 gid AND dropoff_nyct2010_gid IN (132, 138) -- 目的地为机场 GROUP BY "Borough" ORDER BY total DESC; total | borough_name -------+--------------- 7053 | Manhattan 6828 | Brooklyn 4458 | Queens 2670 | Bronx 554 | Staten Island 53 | EWR (6 行记录) 时间:48.449 毫秒
  • 说明:该查询结果与前文 dictGet 示例一致(不含 Unknown 行),底层 ClickHouse 会自动优化为字典查询,JOIN 语法更符合 SQL 开发者习惯。

2. 查看关联查询计划

taxi=# explain SELECT count(1) AS total, "Borough" FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID") WHERE pickup_nyct2010_gid > 0 AND dropoff_nyct2010_gid IN (132, 138) GROUP BY "Borough" ORDER BY total DESC; 查询计划 ----------------------------------------------------------------------- 外部扫描 (cost=1.00..5.10 rows=1000 width=40) Relations: 聚合 on ((trips) INNER JOIN (taxi_zone_dictionary)) (2 行记录) 时间:2.012 毫秒
  • 结果表明:关联查询已完全下推至 ClickHouse 执行,PostgreSQL 仅接收最终结果。

3. 复杂关联查询(筛选高小费行程)

查询小费金额前 1000 的行程,并关联字典获取下车点行政区信息:

taxi=# SELECT * FROM taxi.trips JOIN taxi.taxi_zone_dictionary ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID" WHERE tip_amount > 0 ORDER BY tip_amount DESC LIMIT 1000;

注意事项

  • 避免使用 SELECT *:查询时应明确指定所需列,减少数据传输量,提升性能。
  • 数据类型兼容性:关联查询时需确保关联字段类型一致(如使用 toUInt64 转换类型),避免类型不匹配导致查询失败。
  • 字典与表关联:ClickHouse 字典本质是内存中的维度表,关联查询性能优于普通表 JOIN,适合高频访问的维度数据。
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论