1. save_file 按天生成
save_file 字段,是用来配置 push 失败后,记录原始数据的文件名
在我们日常开发中,经常习惯将日志按天维度存放
这样既方便查询历史日志,也方便后续按天维度去修复数据
在源码 clickhouse.rb 文件中找到 save_to_disk 函数,如下
def save_to_disk(documents)beginfile = File.open("#{save_dir}/#{table}_#{save_file}", "a")file.write(documents)rescue IOError => elog_failure("An error occurred while saving file to disk: #{e}",:file_name => file_name)ensurefile.close unless file.nil?endend
修改之后
def save_to_disk(documents)beginday = Time.new.strftime("%Y-%m-%d")file = File.open("#{save_dir}/#{table}_#{day}_#{save_file}", "a")file.write(documents)rescue IOError => elog_failure("An error occurred while saving file to disk: #{e}",:file_name => file_name)ensurefile.close unless file.nil?endend
如果使用的是 docker 服务部署,获取到的可能是 UTC 时间
想要自定义时区,执行以下命令即可
rm -f etc/localtime \ln -sv usr/share/zoneinfo/Asia/Chongqing etc/localtime \# Asia/Chongqing 是重庆时区,根据实际需求,替换成想要的时区即可echo "Asia/Chongqing" > /etc/timezone
修改前生成的日志文件:table_failed.json
修改后生成的日志文件:table_2021-08-05_failed.json
2. push 失败日志记录
插件本身是支持将 push 失败的数据保存到日志文件中
但只会记录原始数据,具体失败的原因是没有记录的
很多时候,我们需要知道每条数据具体的失败原因
从而更有针对性的做出相应修改
这就需要将每条数据与失败原因一一对应起
修改思路如下
获取每个 push 请求失败后返回的内容
save_to_disk 函数增加传入失败原因的参数
原始数据增加 @err_msg 字段,用来标记失败原因
将组合后的数据写入到日志文件
具体代码如下:
# make_request 函数部分内容request.on_success do |response|# Make sure we return the token to the pool@request_tokens << tokenif response.code == 200@logger.debug("Successfully submitted",:size => documents.length,:response_code => response.code,:uuid => uuid)elseif req_count >= @request_tolerancelog_failure("Encountered non-200 HTTP code #{response}",:response_code => response.code,:message => response.message,:response => response.body,:url => url,:size => documents.length,:uuid => uuid)if @save_on_failuresave_to_disk(documents, response.body) # documents:原始日志。response.body:失败原因endelse@logger.info("Retrying request", :url => url, :message => response.message, :response => response.body, :uuid => uuid)delay_attempt(req_count, @backoff_time)make_request(documents, hosts, query, con_count, req_count+1, host, uuid)endendend# save_to_diskdef save_to_disk(documents, opts)begintime = Time.newday = time.strftime("%Y-%m-%d")documents_obj= JSON.parse(documents)documents_obj["@err_msg"] = optsdocuments_str = JSON.generate(documents_obj)file = File.open("#{save_dir}/#{table}_#{day}_#{save_file}", "a")file.write("#{documents_str}\n")rescue IOError => elog_failure("An error occurred while saving file to disk: #{e}",:file_name => file_name)ensurefile.close unless file.nil?endend
这儿需要注意三点
需要将 documents 转换成对象 documents_obj,只有对象才能添加字段。
在写入文件之前,需要将对象转换成字符串,否则写入文件的内容很有可能是这样的。
{"name" => "xiaoming","age"=> 18}每次写入文件的时候,都需要在尾行添加换行符。
3. 动态设置 mutations 内容
mutations 字段的作用有两个
用来配置 CH 表中的哪些字段可以被插入数据。
用来约束原始数据参数与 CH 表结构字段对应关系。
通过增加 mutation_path 字段,实现动态更改 mutations 字段内容的。
我们的实际业务需求是,将同一类型的数据,按照不同的条件,插入到不同的表中。
而数据格式和表结构是完全一样的
修改前,插件使用方式如下:
output {if [area] == "shanxi"{ # 山西用户插入山西的表中clickhouse {http_hosts => ["http://172.0.0.1:8123/"]table => "shanxi"mutations => {user_id => user_iduser_name => user_namedate => datetime => timeip => ipplatfrom => platfromversion => versionlevel => level}}}else if [area] == "beijing"{ # 北京用户插入北京的表中clickhouse {http_hosts => ["http://172.0.0.1:8123/"]table => "beijing"mutations => {user_id => user_iduser_name => user_namedate => datetime => timeip => ipplatfrom => platfromversion => versionlevel => level}}}else if [area] == "hebei"{ # 北京用户插入北京的表中clickhouse {http_hosts => ["http://172.0.0.1:8123/"]table => "hebei"mutations => {user_id => user_iduser_name => user_namedate => datetime => timeip => ipplatfrom => platfromversion => versionlevel => level}}}}
以上 mutations 字段只是举例,实际业务中可能有几十个以上
单单 output 这一块,大概就会有好几百行内容
后期随着业务不断增加,会有越来越多的表
整个 output 内容会越来越庞大。
可否将 mutations 字段内容以文件的方式动态传入呢?
答案是肯定的。
修改思路:
通过查看源码发现,mutations 字段是 hash 类型,默认为 {}。
增加 mutation_path 字段,传入文件路径。
将原 mutations 字段内容以 json 格式写入文件。
在插件初始化的时候,读取 mutation_path 内容,并转换为 hash 对象。
重新赋值给 mutations 变量,实现 mutations 参数的动态修改。
注意:为了插件使用多样性,同时也兼容旧版本,只有当 mutations 字段为空时,才会读取 mutation_path 文件中的内容。
def registerif @mutations.empty? == falseputs "使用 mutations 配置的内容"elsif @mutation_path != ""puts "mutations 字段为空,使用 #{mutation_path} 配置文件内容"mutation_str = File.read(@mutation_path)@mutations = JSON.parse(mutation_str)elseputs "mutations、mutation_path 都为空,使用默认值 {}"endputs @mutations@request_tokens = SizedQueue.new(@pool_max)@pool_max.times {|t| @request_tokens << true }@requests = Array.new@http_query = "/?query=INSERT%20INTO%20#{table}%20FORMAT%20JSONEachRow"@hostnames_pool =parse_http_hosts(http_hosts,ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger))buffer_initialize(:max_items => @flush_size,:max_interval => @idle_flush_time,:logger => @logger)print_plugin_info()end
修改后,插件使用方式如下:
output {if [area] == "shanxi"{ # 山西用户插入山西的表中clickhouse {http_hosts => ["http://172.0.0.1:8123/"]table => "shanxi"mutation_path => "/usr/share/logstash/config/clickhouse_mutation.json"}}else if [area] == "beijing"{ # 北京用户插入北京的表中clickhouse {http_hosts => ["http://172.0.0.1:8123/"]table => "beijing"mutation_path => "/usr/share/logstash/config/clickhouse_mutation.json"}}else if [area] == "hebei"{ # 北京用户插入北京的表中clickhouse {http_hosts => ["http://172.0.0.1:8123/"]table => "hebei"mutation_path => "/usr/share/logstash/config/clickhouse_mutation.json"}}}
/usr/share/logstash/config/clickhouse_mutation.json 内容
{"user_id":"user_id","user_name":"user_name","date":"date","time":"time","ip":"ip","platfrom":"platfrom","version":"version","level":"level"}
修改之后维护起来就很简单了
每次修改表结构完成后
只需要更改相应的 clickhouse_mutation.json 内容即可。




