暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Logstash将数据从kafka导入es

晨起临风 2021-07-28
1191

主要面向使用ruby插件处理数据的开发者。

Logstash从数据流中抽象出event对象来表示数据流中的基本数据单元。目标是提供尽可能简单的api。方便操作event的内容。典型场景是使用ruby插件处理数据-数据清洗,数据转换等操作。Event对象包括原始数据和数据过滤时的一些额外数据。


5.0版本后的Logstash可以使用Ruby操作hash对象的方式直接操作数据。

event[field] = foo

尽可能的抽象,忽略细节,提供简单的getter和setter API。


Get API

Event.get(key)

如果值不存在会返回空。


可以像使用多维数组的方式使用hash对象。

event.get(“[foo][bar]”)


Set API

event.set(key,value) 改变对象的值,如果key不存在,则新建key。


value 的值可以为普通值或者对象。

推荐的写法:

    h={“a”=>1, “b”=>2, “c”=>[1,2]}
    event.set(“[foo][bar]”, h)




    h[“c”] = [3,4]
    event.set(“[foo][bar]”, h)




    event.get(“[foo][bar][c]”) # => [3,4]




    Ruby Filter




    filter {
    ruby {
    code => 'event.set("lowercase_field", event.get("message").downcase)'
    }
    }



    将message转为小写


    2 实战部分

    1 将内容解压缩

      input {
      kafka {
      bootstrap_servers => ["ip:port"]
      group_id => “group-id"
      auto_offset_reset => "latest"
      topics => ["topic"]
      consumer_threads => 6
      decorate_events => false
      codec => "json"
      }
      kafka {
      bootstrap_servers => [“ip:port"]
      group_id => "ddc-collect-to-es"
      auto_offset_reset => "latest"
      topics => ["topic"]
      consumer_threads => 6
      decorate_events => false
      codec => "json"
      }

      }




      filter {
      ruby {
      init => "require 'base64';require 'zlib';require 'stringio'"
      code => "
      event.set('Result',Zlib::GzipReader.new(StringIO.new(Base64.decode64(event.get('Result')))).read())

              log = event.get('Log')

              log['Detail']=Zlib::GzipReader.new(StringIO.new(Base64.decode64(log['Detail']))).read()

              event.set('TraceLog', log)
      "
      }
      }


      output {
      elasticsearch {
      hosts => [“es-host:port”]
      index => “index-name"
      document_type => “index-name"
      codec => "json"
      }
      }


      遍历内容字段

        input {
        kafka {
        bootstrap_servers => ["ip:port"]
        group_id => “log_test"
        auto_offset_reset => "latest"
        topics => [“topic1,topic2"]
        consumer_threads => 6
        decorate_events => false
        codec => "json"
        }
        }


        filter {
        ruby {
        init => "require 'base64';require 'zlib';require 'stringio'"
        code => "
        array = event.get('Html')
        array.each do |value|
        html_base64=Base64.decode64(value['Html'])
        html_ungzip = Zlib::GzipReader.new(StringIO.new(html_base64)).read()
        value['PageSource']=html_ungzip
        end


                       kv = envet.get('Log')
        kv.each do |k,v|
        event.set(k,v)
        end
        "
        }

        json{
        source => "Result"
                  target => "result" # 使用多层结构
        }


        mutate{
                  remove_field => ["Result","Log"# 已转为多层结构,原始数据可以删除
        }
        mutate{
        add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }
        }
        mutate{
        convert => { "RequestTimeSpan" => "float" }
        }

        urldecode{
        field => "Url"
        }

        }


        output {
        stdout { codec => rubydebug }
        elasticsearch {
        hosts => [“es-ip:port”]
        index => “result-index"
        document_type => "result-index"
        codec => "json"
        }
        }




        文章转载自晨起临风,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论