暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

logstash-output-clickhouse 二次封装

吾以梦为马 2021-08-09
2047

1. save_file 按天生成

save_file 字段,是用来配置 push 失败后,记录原始数据的文件名

在我们日常开发中,经常习惯将日志按天维度存放

这样既方便查询历史日志,也方便后续按天维度去修复数据


在源码 clickhouse.rb 文件中找到 save_to_disk 函数,如下

      def save_to_disk(documents)
    begin
    file = File.open("#{save_dir}/#{table}_#{save_file}", "a")
    file.write(documents)
    rescue IOError => e
    log_failure("An error occurred while saving file to disk: #{e}",
    :file_name => file_name)
    ensure
    file.close unless file.nil?
    end
    end

    修改之后

        def save_to_disk(documents)
      begin
      day = Time.new.strftime("%Y-%m-%d")
      file = File.open("#{save_dir}/#{table}_#{day}_#{save_file}", "a")
      file.write(documents)
      rescue IOError => e
      log_failure("An error occurred while saving file to disk: #{e}",
      :file_name => file_name)
      ensure
      file.close unless file.nil?
      end
      end


      如果使用的是 docker 服务部署,获取到的可能是 UTC 时间

      想要自定义时区,执行以下命令即可

        rm -f etc/localtime \
        ln -sv usr/share/zoneinfo/Asia/Chongqing etc/localtime \
        # Asia/Chongqing 是重庆时区,根据实际需求,替换成想要的时区即可
        echo "Asia/Chongqing" > /etc/timezone

        修改前生成的日志文件:table_failed.json

        修改后生成的日志文件:table_2021-08-05_failed.json


        2. push 失败日志记录

        插件本身是支持将 push 失败的数据保存到日志文件中

        但只会记录原始数据,具体失败的原因是没有记录的


        很多时候,我们需要知道每条数据具体的失败原因

        从而更有针对性的做出相应修改

        这就需要将每条数据与失败原因一一对应起


        修改思路如下

        1. 获取每个 push 请求失败后返回的内容

        2. save_to_disk 函数增加传入失败原因的参数

        3. 原始数据增加 @err_msg 字段,用来标记失败原因

        4. 将组合后的数据写入到日志文件


        具体代码如下:

           # make_request 函数部分内容   
          request.on_success do |response|
          # Make sure we return the token to the pool
          @request_tokens << token


          if response.code == 200
          @logger.debug("Successfully submitted",
          :size => documents.length,
          :response_code => response.code,
          :uuid => uuid)
          else
          if req_count >= @request_tolerance
          log_failure(
          "Encountered non-200 HTTP code #{response}",
          :response_code => response.code,
          :message => response.message,
          :response => response.body,
          :url => url,
          :size => documents.length,
          :uuid => uuid)
          if @save_on_failure
          save_to_disk(documents, response.body) # documents:原始日志。response.body:失败原因
          end
          else
          @logger.info("Retrying request", :url => url, :message => response.message, :response => response.body, :uuid => uuid)
          delay_attempt(req_count, @backoff_time)
          make_request(documents, hosts, query, con_count, req_count+1, host, uuid)
          end
          end
          end


          # save_to_disk
          def save_to_disk(documents, opts)
          begin
          time = Time.new
          day = time.strftime("%Y-%m-%d")
          documents_obj= JSON.parse(documents)
          documents_obj["@err_msg"] = opts
          documents_str = JSON.generate(documents_obj)
          file = File.open("#{save_dir}/#{table}_#{day}_#{save_file}", "a")
          file.write("#{documents_str}\n")
          rescue IOError => e
          log_failure("An error occurred while saving file to disk: #{e}",
          :file_name => file_name)
          ensure
          file.close unless file.nil?
          end
          end


          这儿需要注意三点

          1. 需要将 documents 转换成对象 documents_obj,只有对象才能添加字段。

          2. 在写入文件之前,需要将对象转换成字符串,否则写入文件的内容很有可能是这样的。

              {"name" => "xiaoming","age"=> 18}
            • 每次写入文件的时候,都需要在尾行添加换行符。


            3. 动态设置 mutations 内容

            mutations 字段的作用有两个

            1. 用来配置 CH 表中的哪些字段可以被插入数据。

            2. 用来约束原始数据参数与 CH 表结构字段对应关系。

            通过增加 mutation_path 字段,实现动态更改 mutations 字段内容的。


            我们的实际业务需求是,将同一类型的数据,按照不同的条件,插入到不同的表中。

            而数据格式和表结构是完全一样的


            修改前,插件使用方式如下:

              output {
              if [area] == "shanxi"{ # 山西用户插入山西的表中
              clickhouse {
              http_hosts => ["http://172.0.0.1:8123/"]
              table => "shanxi"
              mutations => {
              user_id => user_id
              user_name => user_name
              date => date
              time => time
              ip => ip
              platfrom => platfrom
              version => version
              level => level
              }
              }
              }else if [area] == "beijing"{ # 北京用户插入北京的表中
              clickhouse {
              http_hosts => ["http://172.0.0.1:8123/"]
              table => "beijing"
              mutations => {
              user_id => user_id
              user_name => user_name
              date => date
              time => time
              ip => ip
              platfrom => platfrom
              version => version
              level => level
              }
              }
              }else if [area] == "hebei"{ # 北京用户插入北京的表中
              clickhouse {
              http_hosts => ["http://172.0.0.1:8123/"]
              table => "hebei"
              mutations => {
              user_id => user_id
              user_name => user_name
              date => date
              time => time
              ip => ip
              platfrom => platfrom
              version => version
              level => level
              }
              }
              }
              }


              以上 mutations 字段只是举例,实际业务中可能有几十个以上

              单单 output 这一块,大概就会有好几百行内容


              后期随着业务不断增加,会有越来越多的表

              整个 output 内容会越来越庞大。


              可否将  mutations 字段内容以文件的方式动态传入呢?


              答案是肯定的。


              修改思路:

              1. 通过查看源码发现,mutations 字段是 hash 类型,默认为 {}。

              2. 增加 mutation_path 字段,传入文件路径。

              3. 将原 mutations 字段内容以 json 格式写入文件。

              4. 在插件初始化的时候,读取 mutation_path 内容,并转换为 hash 对象。

              5. 重新赋值给 mutations 变量,实现 mutations 参数的动态修改。


              注意:为了插件使用多样性,同时也兼容旧版本,只有当 mutations 字段为空时,才会读取 mutation_path 文件中的内容。


                  def register
                if @mutations.empty? == false
                puts "使用 mutations 配置的内容"
                elsif @mutation_path != ""
                puts "mutations 字段为空,使用 #{mutation_path} 配置文件内容"
                mutation_str = File.read(@mutation_path)
                @mutations = JSON.parse(mutation_str)
                else
                puts "mutations、mutation_path 都为空,使用默认值 {}"
                end
                puts @mutations
                @request_tokens = SizedQueue.new(@pool_max)
                @pool_max.times {|t| @request_tokens << true }
                @requests = Array.new
                @http_query = "/?query=INSERT%20INTO%20#{table}%20FORMAT%20JSONEachRow"


                @hostnames_pool =
                parse_http_hosts(http_hosts,
                ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger))


                buffer_initialize(
                :max_items => @flush_size,
                :max_interval => @idle_flush_time,
                :logger => @logger
                )


                print_plugin_info()
                end


                修改后,插件使用方式如下:

                  output {
                  if [area] == "shanxi"{ # 山西用户插入山西的表中
                  clickhouse {
                  http_hosts => ["http://172.0.0.1:8123/"]
                  table => "shanxi"
                  mutation_path => "/usr/share/logstash/config/clickhouse_mutation.json"
                  }
                  }else if [area] == "beijing"{ # 北京用户插入北京的表中
                  clickhouse {
                  http_hosts => ["http://172.0.0.1:8123/"]
                  table => "beijing"
                  mutation_path => "/usr/share/logstash/config/clickhouse_mutation.json"
                  }
                  }else if [area] == "hebei"{ # 北京用户插入北京的表中
                  clickhouse {
                  http_hosts => ["http://172.0.0.1:8123/"]
                  table => "hebei"
                  mutation_path => "/usr/share/logstash/config/clickhouse_mutation.json"
                  }
                  }
                  }


                  /usr/share/logstash/config/clickhouse_mutation.json 内容

                    {
                    "user_id":"user_id",
                    "user_name":"user_name",
                    "date":"date",
                    "time":"time",
                    "ip":"ip",
                    "platfrom":"platfrom",
                    "version":"version",
                    "level":"level"
                    }


                    修改之后维护起来就很简单了

                    每次修改表结构完成后

                    只需要更改相应的 clickhouse_mutation.json 内容即可。

                    文章转载自吾以梦为马,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                    评论