背景及需求
工作中我们遇到了把 Kafka 数据同步到 Clickhouse(CH) 的业务需求 一开始我们是使用 CH 提供的 Kafka 引擎来实现 经过短暂的使用发现,CH 的 Kafka 引擎虽然能够满足同步数据的需求 但实际业务场景中的有些重要的需求还是无法满足的
数据类型转换、值转化等,然后再插入 CH
数据入库失败转储
自动化开启和暂停数据同步功能
同步数据时的日志分析
这不是ETL要干的事儿吗?!
但是说到ETL,不是有专业强大的 Logstash 吗,为什么要重复造轮子?
经过一番调研,还真有人写了 Logstash 插件 logstash-output-clickhouse
插件地址:https://github.com/mikechris/logstash-output-clickhouse
安装指南
在线安装
按照文档说的,执行 bin/logstash-plugin install logstash-output-clickhouse
sh-4.2$: bin/logstash-plugin install logstash-output-clickhouseValidating logstash-output-clickhousePlugin logstash-output-clickhouse does not existERROR: Installation aborted, verification failed for logstash-output-clickhouse
从上面的输出信息中可以看出,该插件已经不存在了,所以我们只能换种方式,使用源码进行安装。
构建 gem 并安装本地版本
先 clone 源码
sh-4.2$: git clone https://github.com/mikechris/logstash-output-clickhouse.gitsh-4.2$: cd logstash-output-clickhouse
编译 gemspec 文件
若没有安装 ruby 环境,请先执行 yum install rubygems -y
sh-4.2$: yum install rubygems -ysh-4.2$: gem build logstash-output-clickhouse.gemspe
编译成功后,会多出一个文件 logstash-output-clickhouse-0.1.0.gem
使用 .gem 文件安装插件_
sh-4.2$: logstash-plugin install logstash-output-clickhouse-0.1.0.gemValidating logstash-output-clickhouse-0.1.0.gemInstalling logstash-output-clickhouseInstallation successful
如果输出以上日志,说明 logstash-output-clickhouse 插件安装完成。
避坑指南
logstash-mixin-http_client 版本兼容问题
sh-4.2$: logstash-plugin install logstash-output-clickhouse-0.1.0.gemValidating ./logstash-output-clickhouse/logstash-output-clickhouse-0.1.0.gemInstalling logstash-output-clickhousePlugin version conflict, abortingERROR: Installation Aborted, message: Bundler could not find compatible versions for gem "logstash-mixin-http_client":In snapshot (Gemfile.lock):logstash-mixin-http_client (= 5.2.0)In Gemfile:logstash-mixin-http_client (< 6.0.0, >= 5.0.0) javalogstash-output-clickhouse (= 0.1.0) java depends onlogstash-mixin-http_client (< 7.0.0, >= 6.0.0) javaRunning `bundle update` will rebuild your snapshot from scratch, using onlythe gems in your Gemfile, which may resolve the conflict.
logstash-core-plugin-api 版本兼容问题
sh-4.2$: logstash-plugin install logstash-output-clickhouse-0.1.0.gemERROR: Installation Aborted, message: Bundler could not find compatible versions for gem "logstash-core-plugin-api":In snapshot (Gemfile.lock):logstash-core-plugin-api (= 2.1.16)In Gemfile:logstash-devutils (= 1.3.5) java was resolved to 1.3.5, which depends onlogstash-core-plugin-api (>= 2.0, <= 2.99) javalogstash-output-json_batch (= 0.2.0) java was resolved to 0.2.0, which depends onlogstash-core-plugin-api (~> 1.0) javaRunning `bundle update` will rebuild your snapshot from scratch, using only
如果 Logstash 版本是5.x,可能会遇到以下两种错误: 遇到以上两种问题,是因为 Logstash 版本兼容问题,解决方案有两种
按照提示,修改 logstash-output-clickhouse-0.1.0.gem 即可
选择兼容性更高的 Logstash,推荐使用 Logstash 6.0.0 到 Logstash 7.0.0 之间版本
ruby 的 gem 源问题
sh-4.2$: logstash-plugin install logstash-output-clickhouse-0.1.0.gemFetching source index from https://rubygems.org/Retrying fetcher due to error (2/4): Bundler::HTTPError Could not fetch specs from https://rubygems.org/Retrying fetcher due to error (3/4): Bundler::HTTPError Could not fetch specs from https://rubygems.org/Retrying fetcher due to error (4/4): Bundler::HTTPError Could not fetch specs from https://rubygems.org/Could not fetch specs from https://rubygems.org/
如果出现上边的情况,是因为 ruby 的 gem 源默认使用的是国外源地址,只需将源地址修改为国内源即可
# 第一步:查看当前使用的源地址sh-4.2$: gem source*** CURRENT SOURCES ***https://rubygems.org/# 第二步:删除默认的源地址 https://rubygems.org/sh-4.2$: gem sources -r https://rubygems.org/https://rubygems.org/ removed from sources# 第三步:添加国内源 https://gems.ruby-china.com/sh-4.2$: gem sources -a https://gems.ruby-china.com/https://gems.ruby-china.com/ added to sources# 第四步:更新源的缓存sh-4.2: gem sources -usource cache successfully updated+
插件配置信息
output {clickhouse {http_hosts => ["http://127.0.0.1:8123/"]table => "default.table_name"request_tolerance => 1mutations => {id => idname => name}}}
其他自定义选项
save_on_failure:(默认值:true) - 启用/禁用请求正文保存失败
save_dir:(默认: tmp) - 失败的请求正文将被保存的目录
automatic_retries:(默认值: 1) - 连接重试到每个主机的次数 http_hosts
request_tolerance:默认值:5)- 如果响应状态代码不是 200,则 http 请求发送重试的次数
backoff_time:(默认值:3)- 等待下一次连接或请求重试尝试的时间(以秒为单位)
flush_size:(默认值:5)批量大小为 5
idle_flush_time:(默认值:5)每次发送最多等待时间(以秒为单位)
实战部分
本人使用的是 Docker 容器构建,在此附上 Dockerfile 配置文件
FROM logstash:6.8.0WORKDIR data/logstash-output-clickhouse/# 使用 root 用户访问USER rootRUN yum install git -y \&& git clone https://github.com/mikechris/logstash-output-clickhouse.git ./ \&& logstash-plugin install logstash-filter-prune \# 安装 rubygem(ruby 包管理器)&& yum install rubygems -y \# 删除默认的 rubygem 源地址&& gem sources -r https://rubygems.org/ \# 添加 rubygem 国内源地址&& gem sources -a https://gems.ruby-china.com/ \# 更新源的缓存&& gem sources -u \&& gem build logstash-output-clickhouse.gemspec \&& logstash-plugin install logstash-output-clickhouse-0.1.0.gemENTRYPOINT ["/usr/local/bin/docker-entrypoint","-f ","your.conf"]
额外补充
入库失败告警
因为 CH 对数据要求严格 所以需要预先创建对应的表结构 之后才能进行导入数据的操作 如果单条数据因为某个字段的类型不匹配 导致入库失败,整条日志将被舍弃
上边提到,logstash-output-clickhouse 插件在与 CH 交互失败时,会将日志保存到 save_dir 对应的目录中 我们可以通过编写脚本,定时查询 save_dir 对应的目录下是否用文件产生,如果有,及时发送报警信息。
及时监控 CH 中未创建的字段
如果单条数据中的某个参数,未在 CH 表中创建对应的字段 则该参数将会将被舍弃,但不影响整条数据入库
在 Logstash 的 filter 中,判断接收到的数据参数中,是否有 CH 表中未创建字段 如果有的,可以采用如下方案
将该条数据写入指定的文件,编写脚本定时监控文件内容并报警,从而驱动开发人员主动去添加字段,之后可以将文件中的内容重新导入到 CH 中。
同时也可以将该条数据同步到 CH 中,缺点是会舍弃未在 CH 中创建的字段对应的参数。




