主要面向使用ruby插件处理数据的开发者。
Logstash从数据流中抽象出event对象来表示数据流中的基本数据单元。目标是提供尽可能简单的api。方便操作event的内容。典型场景是使用ruby插件处理数据-数据清洗,数据转换等操作。Event对象包括原始数据和数据过滤时的一些额外数据。
5.0版本后的Logstash可以使用Ruby操作hash对象的方式直接操作数据。
event[field] = foo
尽可能的抽象,忽略细节,提供简单的getter和setter API。
Get API
Event.get(key)
如果值不存在会返回空。
可以像使用多维数组的方式使用hash对象。
event.get(“[foo][bar]”)
Set API
event.set(key,value) 改变对象的值,如果key不存在,则新建key。
value 的值可以为普通值或者对象。
推荐的写法:
h={“a”=>1, “b”=>2, “c”=>[1,2]}event.set(“[foo][bar]”, h)h[“c”] = [3,4]event.set(“[foo][bar]”, h)event.get(“[foo][bar][c]”) # => [3,4]Ruby Filterfilter {ruby {code => 'event.set("lowercase_field", event.get("message").downcase)'}}
将message转为小写
2 实战部分
1 将内容解压缩
input {kafka {bootstrap_servers => ["ip:port"]group_id => “group-id"auto_offset_reset => "latest"topics => ["topic"]consumer_threads => 6decorate_events => falsecodec => "json"}kafka {bootstrap_servers => [“ip:port"]group_id => "ddc-collect-to-es"auto_offset_reset => "latest"topics => ["topic"]consumer_threads => 6decorate_events => falsecodec => "json"}}filter {ruby {init => "require 'base64';require 'zlib';require 'stringio'"code => "event.set('Result',Zlib::GzipReader.new(StringIO.new(Base64.decode64(event.get('Result')))).read())log = event.get('Log')log['Detail']=Zlib::GzipReader.new(StringIO.new(Base64.decode64(log['Detail']))).read()event.set('TraceLog', log)"}}output {elasticsearch {hosts => [“es-host:port”]index => “index-name"document_type => “index-name"codec => "json"}}
2 遍历内容字段
input {kafka {bootstrap_servers => ["ip:port"]group_id => “log_test"auto_offset_reset => "latest"topics => [“topic1,topic2"]consumer_threads => 6decorate_events => falsecodec => "json"}}filter {ruby {init => "require 'base64';require 'zlib';require 'stringio'"code => "array = event.get('Html')array.each do |value|html_base64=Base64.decode64(value['Html'])html_ungzip = Zlib::GzipReader.new(StringIO.new(html_base64)).read()value['PageSource']=html_ungzipendkv = envet.get('Log')kv.each do |k,v|event.set(k,v)end"}json{source => "Result"target => "result" # 使用多层结构}mutate{remove_field => ["Result","Log"] # 已转为多层结构,原始数据可以删除}mutate{add_field => { "topic_name" => "%{[@metadata][kafka][topic]}" }}mutate{convert => { "RequestTimeSpan" => "float" }}urldecode{field => "Url"}}output {stdout { codec => rubydebug }elasticsearch {hosts => [“es-ip:port”]index => “result-index"document_type => "result-index"codec => "json"}}
文章转载自晨起临风,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




