PostgreSQL pg_clickhouse 0.1.0 插件的安装和使用
clickhouse公司开发的pg_clickhouse可以由PostgreSQL数据库调用clickhouse数据库的能力,实现快速的查询。
https://github.com/ClickHouse/pg_clickhouse/blob/main/doc/pg_clickhouse.md
该库包含一个 PostgreSQL 扩展,支持在 ClickHouse 数据库上执行远程查询,包括一个外部数据包装器(foreign data wrapper)。其兼容 PostgreSQL 13 及以上版本,以及 ClickHouse 23 及以上版本。
版本控制策略
pg_clickhouse 的公开版本遵循语义化版本控制(Semantic Versioning):
- 主版本号(Major):当 API 发生不兼容变更时递增
- 次版本号(Minor):当新增向后兼容的 SQL 功能时递增
- 修订版本号(Patch):仅发生二进制层面的兼容变更时递增
安装后,PostgreSQL 会跟踪该扩展的两个版本变体:
- 库版本(Library Version):在 PostgreSQL 18 及以上版本中由
PG_MODULE_MAGIC定义,包含完整的语义化版本号,可通过pg_get_loaded_modules()函数查看 - 扩展版本(Extension Version):在控制文件中定义,仅包含主版本号和次版本号,可通过
pg_catalog.pg_extension表、pg_available_extension_versions()函数或\dx pg_clickhouse命令查看
实际使用中:
- 仅修订版本号递增的更新(如从 v0.1.0 到 v0.1.1):所有已加载 v0.1 版本的数据库无需执行
ALTER EXTENSION即可享受更新带来的优化 - 次版本号或主版本号递增的更新:会附带 SQL 升级脚本,所有已安装该扩展的数据库必须执行
ALTER EXTENSION pg_clickhouse UPDATE才能完成升级
安装
快速开始
体验 pg_clickhouse 最简单的方式是使用 Docker 镜像,该镜像基于标准 PostgreSQL Docker 镜像并集成了 pg_clickhouse 扩展:
docker run --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
-d ghcr.io/clickhouse/pg_clickhouse:18
docker exec -it pg_clickhouse psql -U postgres
使用
CREATE EXTENSION pg_clickhouse;
CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
OPTIONS (user 'default');
CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;
从源代码编译
PostgreSQL 和 curl 开发包会在环境变量中包含 pg_config 和 curl_config,因此您只需运行 make(或 gmake),然后执行 make install,最后在数据库中运行 CREATE EXTENSION pg_clickhouse 即可。
RedHat / CentOS / Yum 系统
sudo yum install \ postgresql-server \ libcurl-devel \ libuuid-devel \ openssl-libs \ automake \ cmake \ gcc
从 PGXN 安装
满足上述依赖后,使用 PGXN 客户端(可通过 Homebrew、Apt、Yum 安装,包名均为 pgxnclient)下载、编译并安装 pg_clickhouse:
pgxn install pg_clickhouse
编译与安装
要构建并安装 ClickHouse 库和 pg_clickhouse 扩展,运行以下命令:
make sudo make install
特殊情况处理
若主机存在多个 PostgreSQL 安装版本,需指定对应的 pg_config 路径:
export PG_CONFIG=/usr/lib/postgresql/18/bin/pg_config
make
sudo make install
若 curl_config 未在环境变量中,可显式指定路径:
export CURL_CONFIG=/opt/homebrew/opt/curl/bin/curl-config
make
sudo make install
若遇到如下错误:
"Makefile", line 8: Need an operator
需使用 GNU make(系统中可能以 gmake 命名):
gmake gmake install gmake installcheck
若遇到如下错误:
make: pg_config: Command not found
需确保 pg_config 已安装且在环境变量中。若通过 RPM 等包管理工具安装 PostgreSQL,需同时安装 -devel 开发包。必要时显式指定 pg_config 路径:
export PG_CONFIG=/path/to/pg_config
make
sudo make install
若需在 PostgreSQL 18 及以上版本中安装到自定义路径,可在 install 目标中指定 prefix 参数(其他 make 目标无需指定):
sudo make install prefix=/usr/local/extras
然后在 postgresql.conf 中配置以下参数,确保自定义路径被识别:
extension_control_path = '/usr/local/extras/postgresql/share:$system'
dynamic_library_path = '/usr/local/extras/postgresql/lib:$libdir'
测试
安装扩展后,运行以下命令执行测试套件:
make installcheck
若遇到如下错误:
ERROR: must be owner of database regression
需使用超级用户(如默认的 postgres 超级用户)运行测试:
make installcheck PGUSER=postgres
加载扩展
pg_clickhouse 安装完成后,可通过以下步骤加载到数据库:
以超级用户身份连接数据库
执行以下 SQL 语句:
CREATE EXTENSION pg_clickhouse;
若需将 pg_clickhouse 及其所有支持对象安装到指定 schema,使用 SCHEMA 子句指定:
CREATE SCHEMA env;
CREATE EXTENSION pg_clickhouse SCHEMA env;
依赖项
- 运行依赖:PostgreSQL 13+、libcurl、libuuid
- 构建依赖:C/C++ 编译器、libSSL、GNU make、CMake
使用 pg_clickhouse
以下 SQL 语句用于操作 pg_clickhouse 扩展。
CREATE EXTENSION
使用 CREATE EXTENSION 将 pg_clickhouse 添加到数据库:
CREATE EXTENSION pg_clickhouse;
推荐使用 WITH SCHEMA 指定扩展安装的 schema:
CREATE SCHEMA ch;
CREATE EXTENSION pg_clickhouse WITH SCHEMA ch;
ALTER EXTENSION
使用 ALTER EXTENSION 修改 pg_clickhouse 扩展配置:
安装新版本后,使用 UPDATE 子句升级:
ALTER EXTENSION pg_clickhouse UPDATE;
使用 SET SCHEMA 将扩展迁移到新的 schema:
CREATE SCHEMA ch;
ALTER EXTENSION pg_clickhouse SET SCHEMA ch;
DROP EXTENSION
使用 DROP EXTENSION 从数据库中移除 pg_clickhouse:
DROP EXTENSION pg_clickhouse;
若存在依赖该扩展的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:
DROP EXTENSION pg_clickhouse CASCADE;
CREATE SERVER
使用 CREATE SERVER 创建连接 ClickHouse 服务器的外部服务器:
CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
支持的选项:
driver:ClickHouse 连接驱动,可选值为 “binary” 或 “http”,必填项dbname:连接时使用的 ClickHouse 数据库,默认值为 “default”host:ClickHouse 服务器主机名,默认值为 “localhost”port:连接端口,默认值如下:- 若
driver为 “binary” 且host是 ClickHouse Cloud 主机:9440 - 若
driver为 “binary” 且host非 ClickHouse Cloud 主机:9004 - 若
driver为 “http” 且host是 ClickHouse Cloud 主机:8443 - 若
driver为 “http” 且host非 ClickHouse Cloud 主机:8123
- 若
ALTER SERVER
使用 ALTER SERVER 修改外部服务器配置:
ALTER SERVER taxi_srv OPTIONS (SET driver 'http');
支持的选项与 CREATE SERVER 一致。
DROP SERVER
使用 DROP SERVER 移除外部服务器:
DROP SERVER taxi_srv;
若存在依赖该服务器的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:
DROP SERVER taxi_srv CASCADE;
CREATE USER MAPPING
使用 CREATE USER MAPPING 将 PostgreSQL 用户映射到 ClickHouse 用户。例如,将当前 PostgreSQL 用户映射到 taxi_srv 外部服务器对应的 ClickHouse 用户:
CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
OPTIONS (user 'demo');
支持的选项:
user:ClickHouse 用户名,默认值为 “default”password:ClickHouse 用户密码
ALTER USER MAPPING
使用 ALTER USER MAPPING 修改用户映射配置:
ALTER USER MAPPING FOR CURRENT_USER SERVER taxi_srv
OPTIONS (SET user 'default');
支持的选项与 CREATE USER MAPPING 一致。
DROP USER MAPPING
使用 DROP USER MAPPING 移除用户映射:
DROP USER MAPPING FOR CURRENT_USER SERVER taxi_srv;
IMPORT FOREIGN SCHEMA
使用 IMPORT FOREIGN SCHEMA 将 ClickHouse 数据库中的所有表导入为 PostgreSQL 中的外部表:
CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA demo FROM SERVER taxi_srv INTO taxi;
使用 LIMIT TO 限制仅导入指定表:
IMPORT FOREIGN SCHEMA demo LIMIT TO (trips) FROM SERVER taxi_srv INTO taxi;
使用 EXCEPT 排除指定表:
IMPORT FOREIGN SCHEMA demo EXCEPT (users) FROM SERVER taxi_srv INTO taxi;
pg_clickhouse 会执行以下操作:
- 获取指定 ClickHouse 数据库(上述示例中为 “demo”)的所有表列表
- 获取每个表的列定义
- 执行
CREATE FOREIGN TABLE命令创建外部表,列会使用支持的数据类型,并在可检测的情况下包含CREATE FOREIGN TABLE支持的选项
⚠️ 导入标识符的大小写保留规则
IMPORT FOREIGN SCHEMA会对导入的表名和列名执行quote_identifier()函数,这意味着包含大写字符或空格的标识符会被双引号包裹。因此,在 PostgreSQL 查询中必须对这类表名和列名使用双引号。全小写且无空格的标识符无需引号。示例:
ClickHouse 中的表定义:
CREATE OR REPLACE TABLE test ( id UInt64, Name TEXT, updatedAt DateTime DEFAULT now() ) ENGINE = MergeTree ORDER BY id;通过
IMPORT FOREIGN SCHEMA创建的外部表:CREATE TABLE test ( id BIGINT NOT NULL, "Name" TEXT NOT NULL, "updatedAt" TIMESTAMPTZ NOT NULL );查询时需正确使用双引号:
SELECT id, "Name", "updatedAt" FROM test;若需创建名称不同或全小写(大小写不敏感)的对象,请使用
CREATE FOREIGN TABLE。
CREATE FOREIGN TABLE
使用 CREATE FOREIGN TABLE 创建可查询 ClickHouse 数据的外部表:
CREATE FOREIGN TABLE uact (
user_id bigint NOT NULL,
page_views int,
duration smallint,
sign smallint
) SERVER taxi_srv OPTIONS(
table_name 'uact',
engine 'CollapsingMergeTree'
);
支持的表选项:
database:远程 ClickHouse 数据库名,默认值为外部服务器定义的数据库table_name:远程 ClickHouse 表名,默认值为外部表的名称engine:ClickHouse 表使用的引擎。对于CollapsingMergeTree()和AggregatingMergeTree(),pg_clickhouse 会自动将参数应用到该表上执行的函数表达式中
列数据类型需与远程 ClickHouse 表的列类型匹配。对于 AggregateFunction 类型和 SimpleAggregateFunction 类型的列,需将数据类型映射到函数对应的 ClickHouse 类型,并通过列选项指定聚合函数名称:
AggregateFunction:应用于AggregateFunction类型列的聚合函数名称SimpleAggregateFunction:应用于SimpleAggregateFunction类型列的聚合函数名称
示例:
CREATE FOREIGN TABLE test (
column1 bigint OPTIONS(AggregateFunction 'uniq'),
column2 integer OPTIONS(AggregateFunction 'anyIf'),
column3 bigint OPTIONS(AggregateFunction 'quantiles(0.5, 0.9)')
) SERVER clickhouse_srv;
对于带有 AggregateFunction 函数的列,pg_clickhouse 会自动在评估该列的聚合函数后追加 Merge。
ALTER FOREIGN TABLE
使用 ALTER FOREIGN TABLE 修改外部表定义:
ALTER TABLE table ALTER COLUMN b OPTIONS (SET AggregateFunction 'count');
支持的表选项和列选项与 CREATE FOREIGN TABLE 一致。
DROP FOREIGN TABLE
使用 DROP FOREIGN TABLE 移除外部表:
DROP FOREIGN TABLE uact;
若存在依赖该外部表的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:
DROP FOREIGN TABLE uact CASCADE;
函数和运算符参考
数据类型映射
pg_clickhouse 将 ClickHouse 数据类型映射为以下 PostgreSQL 数据类型:
| ClickHouse 类型 | PostgreSQL 类型 | 备注 |
|---|---|---|
| Bool | boolean | |
| Date | date | |
| DateTime | timestamp | |
| Decimal | numeric | |
| Float32 | real | |
| Float64 | double precision | |
| IPv4 | inet | |
| IPv6 | inet | |
| Int16 | smallint | |
| Int32 | integer | |
| Int64 | bigint | |
| Int8 | smallint | |
| JSON | jsonb | 仅支持 HTTP 引擎 |
| String | text | |
| UInt16 | integer | |
| UInt32 | bigint | |
| UInt64 | bigint | 当值超过 BIGINT 最大值时会报错 |
| UInt8 | smallint | |
| UUID | uuid |
函数
以下函数提供查询 ClickHouse 数据库的接口。
clickhouse_raw_query
SELECT clickhouse_raw_query(
'CREATE TABLE t1 (x String) ENGINE = Memory',
'host=localhost port=8123'
);
通过 HTTP 接口连接 ClickHouse 服务,执行单个查询后断开连接。可选的第二个参数为连接字符串,默认值为 host=localhost port=8123。支持的连接参数:
host:连接的主机名,必填项port:HTTP 端口,默认值为 8123;若host是 ClickHouse Cloud 主机,默认值为 8443dbname:连接的数据库名username:连接使用的用户名,默认值为 “default”password:认证密码,默认值为空
该函数适用于无返回结果的查询;若查询有返回值,会以单个文本值形式返回:
SELECT clickhouse_raw_query(
'SELECT schema_name, schema_owner from information_schema.schemata',
'host=localhost port=8123'
);
返回结果:
clickhouse_raw_query --------------------------------- INFORMATION_SCHEMA default+ default default + git default + information_schema default+ system default + (1 row)
下推函数(Pushdown Functions)
所有在查询 ClickHouse 外部表的条件子句(HAVING 和 WHERE)中使用的 PostgreSQL 内置函数,都会自动以相同名称和签名下推到 ClickHouse 执行。但部分函数的名称或签名不同,需映射为对应的 ClickHouse 函数。pg_clickhouse 支持以下函数映射:
date_part:date_part('day')→toDayOfMonthdate_part('doy')→toDayOfYeardate_part('dow')→toDayOfWeekdate_part('year')→toYeardate_part('month')→toMonthdate_part('hour')→toHourdate_part('minute')→toMinutedate_part('second')→toSeconddate_part('quarter')→toQuarterdate_part('isoyear')→toISOYeardate_part('week')→toISOYeardate_part('epoch')→toISOYear
date_trunc:date_trunc('week')→toMondaydate_trunc('second')→toStartOfSeconddate_trunc('minute')→toStartOfMinutedate_trunc('hour')→toStartOfHourdate_trunc('day')→toStartOfDaydate_trunc('month')→toStartOfMonthdate_trunc('quarter')→toStartOfQuarterdate_trunc('year')→toStartOfYear
array_position→indexOfbtrim→trimBothstrpos→positionregexp_like→match
自定义函数(Custom Functions)
pg_clickhouse 提供以下自定义函数,用于为无 PostgreSQL 等效函数的 ClickHouse 函数提供外部查询下推支持。若这些函数无法下推,会抛出异常:
dictGet
下推类型转换(Pushdown Casts)
pg_clickhouse 支持将 CAST(x AS bigint) 等类型转换操作下推到 ClickHouse(仅适用于兼容的数据类型)。若类型不兼容,下推会失败(例如,若 x 是 ClickHouse 的 UInt64 类型,ClickHouse 会拒绝该转换)。
为支持将类型转换下推到不兼容的数据类型,pg_clickhouse 提供以下函数。若这些函数无法下推,会在 PostgreSQL 中抛出异常:
toUInt8toUInt16toUInt32toUInt64toUInt128
下推聚合函数(Pushdown Aggregates)
以下 PostgreSQL 聚合函数支持下推到 ClickHouse 执行:
count
自定义聚合函数(Custom Aggregates)
pg_clickhouse 提供以下自定义聚合函数,用于为无 PostgreSQL 等效函数的 ClickHouse 聚合函数提供外部查询下推支持。若这些函数无法下推,会抛出异常:
argMaxargMinuniquniqCombineduniqCombined64uniqExactuniqHLL12uniqThetaquantilequantileExact
下推有序集合聚合函数(Pushdown Ordered Set Aggregates)
以下有序集合聚合函数会映射到 ClickHouse 的参数化聚合函数,将直接参数作为参数传递,将 ORDER BY 表达式作为参数传入。例如:
PostgreSQL 查询:
SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY a) FROM t1;
映射为 ClickHouse 查询:
SELECT quantile(0.25)(a) FROM t1;
注意:不支持非默认的 ORDER BY 后缀(DESC 和 NULLS FIRST),否则会抛出错误。
支持的函数:
percentile_cont(double)→quantilequantile(double)→quantilequantileExact(double)→quantileExact
会话设置
通过设置 pg_clickhouse.session_settings 运行时参数,可配置后续查询中 ClickHouse 的会话设置。示例:
SET pg_clickhouse.session_settings = 'join_use_nulls 1, final 1';
默认值为 join_use_nulls 1。若需使用 ClickHouse 服务器的默认设置,可将其设为空字符串:
SET pg_clickhouse.session_settings = '';
语法规则:
以逗号分隔的键值对列表,键和值之间用一个或多个空格分隔
键必须是 ClickHouse 支持的设置项
若值中包含空格、逗号或反斜杠,需用反斜杠转义:
SET pg_clickhouse.session_settings = 'join_algorithm grace_hash\,hash';
也可使用单引号包裹值以避免转义空格和逗号;若需避免双引号转义,可使用美元引号(dollar quoting):
SET pg_clickhouse.session_settings = $$join_algorithm 'grace_hash,hash'$$;
若需设置多个配置项以提高可读性,可使用多行格式:
SET pg_clickhouse.session_settings TO $$
connect_timeout 2,
count_distinct_implementation uniq,
final 1,
group_by_use_nulls 1,
join_algorithm 'prefer_partial_merge',
join_use_nulls 1,
log_queries_min_type QUERY_FINISH,
max_block_size 32768,
max_execution_time 45,
max_result_rows 1024,
metrics_perf_events_list 'this,that',
network_compression_method ZSTD,
poll_interval 5,
totals_mode after_having_auto
$$;
pg_clickhouse 不会验证这些设置项,仅会在每个查询中传递给 ClickHouse。因此,该参数支持各个 ClickHouse 版本的所有设置项。
注意:设置 pg_clickhouse.session_settings 前需确保 pg_clickhouse 已加载,可通过库预加载(shared library preloading)或使用扩展中的任意对象来确保加载。
测试用例
启动 ClickHouse
首先,若尚未部署 ClickHouse 数据库,可通过 Docker 快速启动:
docker run -d --network host --name clickhouse -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse
docker exec -it clickhouse clickhouse-client
执行:
[root@bigdata ~]# docker run -d --network host --name clickhouse -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse
Unable to find image 'clickhouse:latest' locally
latest: Pulling from library/clickhouse
7e49dc6156b0: Pull complete
66a9f27340b0: Pull complete
4ea1943fd65c: Pull complete
c4e91a1791f4: Pull complete
b95838ab0602: Pull complete
e70e29afad90: Pull complete
596aaa9e4cb7: Pull complete
077cf31b66a6: Pull complete
Digest: sha256:0c1acee29c905829331544ec71342ed4346a6383da60f0c488f68b6b45009010
Status: Downloaded newer image for clickhouse:latest
WARNING: Published ports are discarded when using host network mode
22ec632560f4d7ddafc04610623ceb80e9588ac5fa702c13eff57b15ad8578ac
[root@bigdata ~]# docker exec -it clickhouse clickhouse-client
ClickHouse client version 25.11.2.24 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 25.11.2.
Warnings:
* Delay accounting is not enabled, OSIOWaitMicroseconds will not be gathered. You can enable it using `sudo sh -c 'echo 1 > /proc/sys/kernel/task_delayacct'` or by using sysctl.
* Linux transparent hugepages are set to "always". Check /sys/kernel/mm/transparent_hugepage/enabled
* Available disk space for data at server startup is too low (1GiB): /var/lib/clickhouse/
* Available disk space for logs at server startup is too low (1GiB): /var/log/clickhouse-server
bigdata :)
创建表
参考 ClickHouse 官方教程,创建存储纽约市出租车数据集的数据库和表:
CREATE DATABASE taxi;
CREATE TABLE taxi.trips
(
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;
导入数据集
执行以下语句导入数据(数据来源于 S3 存储的纽约市出租车数据集):
INSERT INTO taxi.trips
SELECT * FROM s3(
'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
'TabSeparatedWithNames', "
trip_id UInt32,
vendor_id Enum8(
'1' = 1, '2' = 2, '3' = 3, '4' = 4,
'CMT' = 5, 'VTS' = 6, 'DDS' = 7, 'B02512' = 10,
'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
'' = 15
),
pickup_date Date,
pickup_datetime DateTime,
dropoff_date Date,
dropoff_datetime DateTime,
store_and_fwd_flag UInt8,
rate_code_id UInt8,
pickup_longitude Float64,
pickup_latitude Float64,
dropoff_longitude Float64,
dropoff_latitude Float64,
passenger_count UInt8,
trip_distance Float64,
fare_amount Decimal(10, 2),
extra Decimal(10, 2),
mta_tax Decimal(10, 2),
tip_amount Decimal(10, 2),
tolls_amount Decimal(10, 2),
ehail_fee Decimal(10, 2),
improvement_surcharge Decimal(10, 2),
total_amount Decimal(10, 2),
payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
trip_type UInt8,
pickup FixedString(25),
dropoff FixedString(25),
cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
pickup_nyct2010_gid Int8,
pickup_ctlabel Float32,
pickup_borocode Int8,
pickup_ct2010 String,
pickup_boroct2010 String,
pickup_cdeligibil String,
pickup_ntacode FixedString(4),
pickup_ntaname String,
pickup_puma UInt16,
dropoff_nyct2010_gid UInt8,
dropoff_ctlabel Float32,
dropoff_borocode UInt8,
dropoff_ct2010 String,
dropoff_boroct2010 String,
dropoff_cdeligibil String,
dropoff_ntacode FixedString(4),
dropoff_ntaname String,
dropoff_puma UInt16
") SETTINGS input_format_try_infer_datetimes = 0
验证数据导入成功后退出 ClickHouse 客户端:
SELECT count() FROM taxi.trips; -- 查看数据总行数
quit -- 退出客户端
安装 pg_clickhouse
可通过以下方式安装 pg_clickhouse:
docker run -d --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \ ghcr.io/clickhouse/pg_clickhouse:18
连接 pg_clickhouse
首先连接到 PostgreSQL 数据库:
docker exec -it pg_clickhouse psql -U postgres
启用 pg_clickhouse 扩展:
CREATE EXTENSION pg_clickhouse;
创建外部服务器
创建连接 ClickHouse 的外部服务器,需指定 ClickHouse 的主机、端口和数据库:
CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
driver:连接驱动,可选binary(使用 ClickHouse 二进制协议)或http(使用 HTTP 接口)host:ClickHouse 主机名(此处为本地)dbname:目标 ClickHouse 数据库名(即前文创建的taxi)
创建用户映射
将 PostgreSQL 用户映射到 ClickHouse 用户,最简单的方式是映射当前 PostgreSQL 用户:
CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
OPTIONS (user 'default'); -- ClickHouse 默认用户名
如需认证,可添加 password 选项(例如 OPTIONS (user 'default', password 'your_pass'))。
导入外部表
创建 PostgreSQL 模式(schema),并将 ClickHouse 的 taxi 数据库中所有表导入该模式:
CREATE SCHEMA taxi; -- 创建本地模式
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi; -- 导入外部表
验证导入结果
在 psql 中使用以下命令验证外部表是否导入成功:
- 查看外部表列表(
\det+):
taxi=# \det+ taxi.*
外部表列表
模式 | 表名 | 服务器 | FDW 选项 | 描述
-------+--------+-------------+-----------------------------------------------+--------
taxi | trips | taxi_srv | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null]
(1 行记录)
- 查看表结构(
\d):
taxi=# \d taxi.trips
外部表 "taxi.trips"
列名 | 类型 | 排序规则 | 可为空 | 默认值 | FDW 选项
---------------------+--------------------------+----------+--------+---------+----------
trip_id | bigint | | 否 | |
vendor_id | text | | 否 | |
pickup_date | date | | 否 | |
pickup_datetime | timestamp without time zone | | 否 | |
dropoff_date | date | | 否 | |
dropoff_datetime | timestamp without time zone | | 否 | |
store_and_fwd_flag | smallint | | 否 | |
rate_code_id | smallint | | 否 | |
pickup_longitude | double precision | | 否 | |
pickup_latitude | double precision | | 否 | |
dropoff_longitude | double precision | | 否 | |
dropoff_latitude | double precision | | 否 | |
passenger_count | smallint | | 否 | |
trip_distance | double precision | | 否 | |
fare_amount | numeric(10,2) | | 否 | |
extra | numeric(10,2) | | 否 | |
mta_tax | numeric(10,2) | | 否 | |
tip_amount | numeric(10,2) | | 否 | |
tolls_amount | numeric(10,2) | | 否 | |
ehail_fee | numeric(10,2) | | 否 | |
improvement_surcharge | numeric(10,2) | | 否 | |
total_amount | numeric(10,2) | | 否 | |
payment_type | text | | 否 | |
trip_type | smallint | | 否 | |
pickup | character varying(25) | | 否 | |
dropoff | character varying(25) | | 否 | |
cab_type | text | | 否 | |
pickup_nyct2010_gid | smallint | | 否 | |
pickup_ctlabel | real | | 否 | |
pickup_borocode | smallint | | 否 | |
pickup_ct2010 | text | | 否 | |
pickup_boroct2010 | text | | 否 | |
pickup_cdeligibil | text | | 否 | |
pickup_ntacode | character varying(4) | | 否 | |
pickup_ntaname | text | | 否 | |
pickup_puma | integer | | 否 | |
dropoff_nyct2010_gid | smallint | | 否 | |
dropoff_ctlabel | real | | 否 | |
dropoff_borocode | smallint | | 否 | |
dropoff_ct2010 | text | | 否 | |
dropoff_boroct2010 | text | | 否 | |
dropoff_cdeligibil | text | | 否 | |
dropoff_ntacode | character varying(4) | | 否 | |
dropoff_ntaname | text | | 否 | |
dropoff_puma | integer | | 否 | |
服务器:taxi_srv
FDW 选项:(database 'taxi', table_name 'trips', engine 'MergeTree')
执行查询
查询数据总行数,验证查询下推功能:
SELECT count(*) FROM taxi.trips;
count
---------
1999657
(1 行记录)
使用 EXPLAIN 查看查询计划(确认查询是否下推到 ClickHouse):
EXPLAIN select count(*) from taxi.trips;
查询计划
-------------------------------------------------
外部扫描 (cost=1.00..-0.90 rows=1 width=8)
Relations: 聚合 on (trips)
(2 行记录)
- 若 “外部扫描(Foreign Scan)” 位于计划根节点,说明整个查询已下推至 ClickHouse 执行,仅返回结果给 PostgreSQL,性能最优。
数据分析
以下示例演示如何通过 pg_clickhouse 分析出租车数据,可直接执行或自定义 SQL 查询。
1. 计算平均小费金额
taxi=# \timing -- 开启计时功能
计时已开启。
taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips;
round
-------
1.68
(1 行记录)
时间:9.438 毫秒
2. 按乘客数量分组计算平均总费用
taxi=# SELECT
passenger_count,
avg(total_amount)::NUMERIC(10, 2) AS average_total_amount
FROM taxi.trips
GROUP BY passenger_count;
passenger_count | average_total_amount
-----------------+----------------------
0 | 22.68
1 | 15.96
2 | 17.14
3 | 16.75
4 | 17.32
5 | 16.34
6 | 16.03
7 | 59.79
8 | 36.40
9 | 9.79
(10 行记录)
时间:27.266 毫秒
3. 按日期和社区分组统计每日接单量
taxi=# SELECT
pickup_date,
pickup_ntaname,
SUM(1) AS number_of_trips
FROM taxi.trips
GROUP BY pickup_date, pickup_ntaname
ORDER BY pickup_date ASC LIMIT 10;
pickup_date | pickup_ntaname | number_of_trips
-------------+--------------------------------+-----------------
2015-07-01 | Williamsburg | 1
2015-07-01 | park-cemetery-etc-Queens | 6
2015-07-01 | Maspeth | 1
2015-07-01 | Stuyvesant Town-Cooper Village | 44
2015-07-01 | Rego Park | 1
2015-07-01 | Greenpoint | 7
2015-07-01 | Highbridge | 1
2015-07-01 | Briarwood-Jamaica Hills | 3
2015-07-01 | Airport | 550
2015-07-01 | East Harlem North | 32
(10 行记录)
时间:30.978 毫秒
4. 按行程时长分组分析费用和乘客量
taxi=# SELECT
avg(tip_amount) AS avg_tip,
avg(fare_amount) AS avg_fare,
avg(passenger_count) AS avg_passenger,
count(*) AS count,
round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes
FROM taxi.trips
WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0
GROUP BY trip_minutes
ORDER BY trip_minutes DESC
LIMIT 5;
avg_tip | avg_fare | avg_passenger | count | trip_minutes
-------------------+------------------+------------------+-------+--------------
1.96 | 8 | 1 | 1 | 27512
0 | 12 | 2 | 1 | 27500
0.562727272727273 | 17.4545454545455 | 2.45454545454545 | 11 | 1440
0.716564885496183 | 14.2786259541985 | 1.94656488549618 | 131 | 1439
1.00945205479452 | 12.8787671232877 | 1.98630136986301 | 146 | 1438
(5 行记录)
时间:45.477 毫秒
- 注:
date_part('epoch', datetime)用于将时间转换为 Unix 时间戳(秒数),再计算行程时长(分钟)。
5. 按社区和小时分组统计接单量
taxi=# SELECT
pickup_ntaname,
date_part('hour', pickup_datetime) as pickup_hour,
SUM(1) AS pickups
FROM taxi.trips
WHERE pickup_ntaname != ''
GROUP BY pickup_ntaname, pickup_hour
ORDER BY pickup_ntaname, date_part('hour', pickup_datetime)
LIMIT 5;
pickup_ntaname | pickup_hour | pickups
----------------+-------------+---------
Airport | 0 | 3509
Airport | 1 | 1184
Airport | 2 | 401
Airport | 3 | 152
Airport | 4 | 213
(5 行记录)
时间:36.895 毫秒
6. 查询前往拉瓜迪亚 / 肯尼迪机场的行程
taxi=# SELECT
pickup_datetime,
dropoff_datetime,
total_amount,
pickup_nyct2010_gid,
dropoff_nyct2010_gid,
CASE
WHEN dropoff_nyct2010_gid = 138 THEN 'LGA' -- 拉瓜迪亚机场
WHEN dropoff_nyct2010_gid = 132 THEN 'JFK' -- 肯尼迪机场
END AS airport_code,
EXTRACT(YEAR FROM pickup_datetime) AS year,
EXTRACT(DAY FROM pickup_datetime) AS day,
EXTRACT(HOUR FROM pickup_datetime) AS hour
FROM taxi.trips
WHERE dropoff_nyct2010_gid IN (132, 138)
ORDER BY pickup_datetime
LIMIT 5;
pickup_datetime | dropoff_datetime | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour
---------------------+---------------------+--------------+---------------------+----------------------+--------------+------+-----+------
2015-07-01 00:04:14 | 2015-07-01 00:15:29 | 13.30 | -34 | 132 | JFK | 2015 | 1 | 0
2015-07-01 00:09:42 | 2015-07-01 00:12:55 | 6.80 | 50 | 138 | LGA | 2015 | 1 | 0
2015-07-01 00:23:04 | 2015-07-01 00:24:39 | 4.80 | -125 | 132 | JFK | 2015 | 1 | 0
2015-07-01 00:27:51 | 2015-07-01 00:39:02 | 14.72 | -101 | 138 | LGA | 2015 | 1 | 0
2015-07-01 00:32:03 | 2015-07-01 00:55:39 | 39.34 | 48 | 138 | LGA | 2015 | 1 | 0
(5 行记录)
时间:17.450 毫秒
创建字典
字典(Dictionary)是 ClickHouse 的特殊对象,用于存储维度数据(如地区映射表)。以下创建纽约市社区 - 行政区映射字典,并通过 pg_clickhouse 访问。
1. 创建 ClickHouse 字典
通过 clickhouse_raw_query 函数在 ClickHouse 中创建字典(数据来源于 S3 存储的 CSV 文件):
SELECT clickhouse_raw_query($$
CREATE DICTIONARY taxi.taxi_zone_dictionary (
LocationID Int64 DEFAULT 0,
Borough String, -- 行政区(如 Manhattan、Brooklyn)
zone String, -- 社区名称
service_zone String -- 服务区域
)
PRIMARY KEY LocationID -- 主键(映射 trips 表的 gid 字段)
SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames'))
LIFETIME(MIN 0 MAX 0) -- 禁用自动更新(避免频繁访问 S3)
LAYOUT(HASHED_ARRAY()) -- 字典存储结构
$$, 'host=localhost dbname=taxi'); -- 连接参数
- 字典数据说明:CSV 文件包含纽约市社区与行政区的映射关系,
LocationID对应trips表的pickup_nyct2010_gid和dropoff_nyct2010_gid字段。
2. 导入字典到 PostgreSQL
IMPORT FOREIGN SCHEMA taxi LIMIT TO (taxi_zone_dictionary) FROM SERVER taxi_srv INTO taxi;
3. 验证字典查询
taxi=# SELECT * FROM taxi.taxi_zone_dictionary limit 3;
LocationID | Borough | Zone | service_zone
------------+-----------+-----------------------------------------------+--------------
77 | Brooklyn | East New York/Pennsylvania Avenue | Boro Zone
106 | Brooklyn | Gowanus | Boro Zone
103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone
(3 行记录)
4. 使用字典查询(dictGet 函数)
通过 dictGet 函数(pg_clickhouse 自定义函数)关联字典数据,统计前往两大机场的行程按出发行政区分组的数量:
taxi=# SELECT
count(1) AS total,
COALESCE(NULLIF(dictGet(
'taxi.taxi_zone_dictionary', 'Borough',
toUInt64(pickup_nyct2010_gid) -- 转换为字典主键类型
), ''), 'Unknown') AS borough_name
FROM taxi.trips
WHERE dropoff_nyct2010_gid = 132 OR dropoff_nyct2010_gid = 138 -- 目的地为 JFK/LGA 机场
GROUP BY borough_name
ORDER BY total DESC;
total | borough_name
-------+---------------
23683 | Unknown
7053 | Manhattan
6828 | Brooklyn
4458 | Queens
2670 | Bronx
554 | Staten Island
53 | EWR
(7 行记录)
时间:66.245 毫秒
dictGet(dict_name, col_name, key):从字典中根据主键key查询指定列col_name的值。
执行关联查询(JOIN)
将 trips 表与字典表关联,实现更复杂的分析。
1. 简单关联查询(替代 dictGet)
taxi=# SELECT
count(1) AS total,
"Borough" AS borough_name
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
WHERE pickup_nyct2010_gid > 0 -- 过滤无效 gid
AND dropoff_nyct2010_gid IN (132, 138) -- 目的地为机场
GROUP BY "Borough"
ORDER BY total DESC;
total | borough_name
-------+---------------
7053 | Manhattan
6828 | Brooklyn
4458 | Queens
2670 | Bronx
554 | Staten Island
53 | EWR
(6 行记录)
时间:48.449 毫秒
- 说明:该查询结果与前文
dictGet示例一致(不含Unknown行),底层 ClickHouse 会自动优化为字典查询,JOIN 语法更符合 SQL 开发者习惯。
2. 查看关联查询计划
taxi=# explain SELECT
count(1) AS total,
"Borough"
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
WHERE pickup_nyct2010_gid > 0
AND dropoff_nyct2010_gid IN (132, 138)
GROUP BY "Borough"
ORDER BY total DESC;
查询计划
-----------------------------------------------------------------------
外部扫描 (cost=1.00..5.10 rows=1000 width=40)
Relations: 聚合 on ((trips) INNER JOIN (taxi_zone_dictionary))
(2 行记录)
时间:2.012 毫秒
- 结果表明:关联查询已完全下推至 ClickHouse 执行,PostgreSQL 仅接收最终结果。
3. 复杂关联查询(筛选高小费行程)
查询小费金额前 1000 的行程,并关联字典获取下车点行政区信息:
taxi=# SELECT *
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID"
WHERE tip_amount > 0
ORDER BY tip_amount DESC
LIMIT 1000;
注意事项
- 避免使用
SELECT *:查询时应明确指定所需列,减少数据传输量,提升性能。 - 数据类型兼容性:关联查询时需确保关联字段类型一致(如使用
toUInt64转换类型),避免类型不匹配导致查询失败。 - 字典与表关联:ClickHouse 字典本质是内存中的维度表,关联查询性能优于普通表 JOIN,适合高频访问的维度数据。




