[Database] Logstash mutate plugin (translation + hands-on practice)


Source: https://www.elastic.co/guide/en/logstash/2.2/plugins-filters-mutate.html

Contents

  • Syntax

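The mutate plugin performs transformations on fields, including renaming, removing, replacing, and modifying them. It is one of the most commonly used plugins.

For example:

You have used a Grok expression to split a Tomcat log line into fields, and want to convert the status code, byte count, or response time to integers;

You have used a regular expression to split a log line into fields, but the values come in mixed case, which is of little use for full-text search in Elasticsearch. The plugin can convert those field values to lowercase.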

Syntax


The plugin's settings must be wrapped in a mutate block, as follows:

mutate {}


The available configuration options are listed in the table below:

Setting         Input type  Required  Default
add_field       hash        No        {}
add_tag         array       No        []
convert         hash        No
gsub            array       No
join            hash        No
lowercase       array       No
merge           hash        No
periodic_flush  boolean     No        false
remove_field    array       No        []
remove_tag      array       No        []
rename          hash        No
replace         hash        No
split           hash        No
strip           array       No
update          hash        No
uppercase       array       No

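Of these, add_field, remove_field, add_tag, and remove_tag are common to all Logstash filter plugins. They take effect only after the filter has been applied successfully. Although Logstash calls these plugins "filters", they do much more than filter.

Tags exist so that, while processing fields, you can mark an event for later handling. Logstash keeps a built-in tags array that holds every tag produced along the way, whether added by Logstash itself or by you. For example, if grok fails to parse a log line, Logstash adds the tag _grokparsefailure on its own, so in the output stage you can skip events that failed to parse. Fields, by contrast, are about the data itself: for instance, you may want to build a new field from existing ones. More on this later.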
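
To make the tag mechanism concrete, here is a minimal sketch of an output section that prints only the events grok managed to parse. It assumes the default _grokparsefailure tag is in use (that is, grok's tag_on_failure setting has not been changed):

```
output {
    # print only events that grok parsed successfully
    if "_grokparsefailure" not in [tags] {
        stdout {
            codec => rubydebug
        }
    }
}
```

The same kind of condition can wrap a filter block, so later processing steps run only on events that were parsed correctly.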

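The individual options are described below.

You will notice that every option is either a verb or a verb-object phrase. As you have probably guessed, these options are essentially Ruby functions, and whatever follows the "=>" is their arguments. If you were writing the program yourself you would do the same: the first argument is the field the function should act on, and the remaining arguments are the specific parameters.

What is a field? Suppose you parse a Tomcat log: splitting one access-log line yields the client IP, byte count, response time, and so on, each stored in a named variable. That variable is a field.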

Test data


Suppose we have the following Tomcat access log:

192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET "/goLogin" "" 8080 200 1692 23 "http://10.1.8.193:8080/goMain" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0"
192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET "/js/common/jquery-1.10.2.min.js" "" 8080 304 - 67 "http://10.1.8.193:8080/goLogin" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0"
192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET "/css/common/login.css" "" 8080 304 - 75 "http://10.1.8.193:8080/goLogin" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0"
192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET "/js/system/login.js" "" 8080 304 - 53 "http://10.1.8.193:8080/goLogin" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0"


It was produced by the following configuration:

<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
        prefix="localhost_access_log." suffix=".txt"
        pattern="%h %l %u %t %m &quot;%U&quot; &quot;%q&quot; %p %s %b %D &quot;%{Referer}i&quot; &quot;%{User-Agent}i&quot;" />


Grok


Suppose the log is parsed with the following Grok pattern:

%{IPORHOST:clientip} %{NOTSPACE:identd} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] %{WORD:http_method} %{NOTSPACE:request} %{NOTSPACE:request_query|-} %{NUMBER:port} %{NUMBER:statusCode} (%{NOTSPACE:bytes}|-) %{NUMBER:reqTime} %{QS:referer} %{QS:userAgent}


This gives the following result:

{
     "message" => "192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET \"/goLogin\" \"\" 8080 200 1692 23 \"http://10.1.8.193:8080/goMain\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
     "@version" => "1",
    "@timestamp" => "2016-05-17T08:26:07.794Z",
       "host" => "vcyber",
     "clientip" => "192.168.6.25",
      "identd" => "-",
       "auth" => "-",
    "timestamp" => "24/Apr/2016:01:25:53 +0800",
   "http_method" => "GET",
     "request" => "\"/goLogin\"",
  "request_query" => "\"\"",
       "port" => "8080",
    "statusCode" => "200",
      "bytes" => "1692",
     "reqTime" => "23",
     "referer" => "\"http://10.1.8.193:8080/goMain\"",
    "userAgent" => "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\""
}


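The examples below all build on this.

Note the data types after the log line is split into fields. The port, statusCode, bytes, and reqTime fields should be (or at least are best stored as) numbers, but for now they are strings. They will be converted to integers later.

Configuration options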


add_field

  • Value type is hash, i.e. key/value pairs, for example add_field => { "name" => "value" }.
  • Default value is an empty hash, {}

Adds new fields.

Example:

input {
    stdin {
    }
}
filter {
    grok {
        match=>["message","%{IPORHOST:clientip} %{NOTSPACE:identd} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] %{WORD:http_method} %{NOTSPACE:request} %{NOTSPACE:request_query|-} %{NUMBER:port} %{NUMBER:statusCode} (%{NOTSPACE:bytes}|-) %{NUMBER:reqTime} %{QS:referer} %{QS:userAgent}"]
    }
    mutate {
        add_field=>{
             "SayHi"=>"Hello , %{clientip}"
        }
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}


Note the mutate block. Parsing the Tomcat access log above with this configuration gives the following result:
{
     "message" => "192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET \"/goLogin\" \"\" 8080 200 1692 23 \"http://10.1.8.193:8080/goMain\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
     "@version" => "1",
    "@timestamp" => "2016-05-17T04:52:02.031Z",
       "host" => "vcyber",
     "clientip" => "192.168.6.25",
      "identd" => "-",
       "auth" => "-",
    "timestamp" => "24/Apr/2016:01:25:53 +0800",
   "http_method" => "GET",
     "request" => "\"/goLogin\"",
  "request_query" => "\"\"",
       "port" => "8080",
    "statusCode" => "200",
      "bytes" => "1692",
     "reqTime" => "23",
     "referer" => "\"http://10.1.8.193:8080/goMain\"",
    "userAgent" => "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
      "SayHi" => "Hello , 192.168.6.25"
}


You will see an extra "SayHi" field. Its name is hard-coded here, but it can also be dynamic. If you change
"SayHi"=>"Hello , %{clientip}"


to:
"another_%{clientip}"=>"Hello , %{clientip}"


you will see the following result:
{
         "message" => "192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET \"/goLogin\" \"\" 8080 200 1692 23 \"http://10.1.8.193:8080/goMain\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
        "@version" => "1",
       "@timestamp" => "2016-05-17T06:38:04.427Z",
          "host" => "vcyber",
        "clientip" => "192.168.6.25",
         "identd" => "-",
          "auth" => "-",
        "timestamp" => "24/Apr/2016:01:25:53 +0800",
       "http_method" => "GET",
         "request" => "\"/goLogin\"",
      "request_query" => "\"\"",
          "port" => "8080",
       "statusCode" => "200",
          "bytes" => "1692",
         "reqTime" => "23",
         "referer" => "\"http://10.1.8.193:8080/goMain\"",
        "userAgent" => "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
  "another_192.168.6.25" => "Hello , 192.168.6.25"
}


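Although this example is somewhat contrived, it shows that the values of existing fields can be used to generate both the name and the value of a new field.
The examples above add only one field, but you can also add several at once: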
add_field=>{
    "another_%{clientip}"=>"Hello , %{clientip}"
    "another_%{http_method}"=>"Hello, %{http_method}"
}


add_tag

  • Value type is array
  • Default value is an empty array, []

Adds new tags.

Example:

mutate {
    add_tag=>[
        "foo_%{clientip}"
    ]
}


You will see the following result:
{
     "message" => "192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET \"/goLogin\" \"\" 8080 200 1692 23 \"http://10.1.8.193:8080/goMain\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
     "@version" => "1",
    "@timestamp" => "2016-05-17T06:48:43.278Z",
       "host" => "vcyber",
     "clientip" => "192.168.6.25",
      "identd" => "-",
       "auth" => "-",
    "timestamp" => "24/Apr/2016:01:25:53 +0800",
   "http_method" => "GET",
     "request" => "\"/goLogin\"",
  "request_query" => "\"\"",
       "port" => "8080",
    "statusCode" => "200",
      "bytes" => "1692",
     "reqTime" => "23",
     "referer" => "\"http://10.1.8.193:8080/goMain\"",
    "userAgent" => "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
       "tags" => [
    [0] "foo_192.168.6.25"
  ]
}


As with add_field, several tags can be added at once.
Note that add_tag takes an array [], not a hash {}.
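
As a small sketch of adding several tags in one go: unlike hash entries, array elements are separated by commas. The tag names below are made up for illustration; only the clientip and http_method fields come from the Grok pattern above.

```
mutate {
    add_tag => [
        "foo_%{clientip}",
        "method_%{http_method}"
    ]
}
```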

convert

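  • Value type is hash
  • There is no default value

Converts a field's value to a different data type.

The valid target types are integer, float, string, and boolean.

When converting to boolean, the accepted input values are:

  • true, t, yes, y, and 1
  • false, f, no, n, and 0

Example: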

mutate {
    #convert=>["reqTime","integer","statusCode","integer","bytes","integer"]
    convert=>{"port"=>"integer"}
}


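convert can be written in two ways: as a flat array (the commented-out line above) or as a hash. The hash form gives the following result: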

{
     "message" => "192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET \"/goLogin\" \"\" 8080 200 1692 23 \"http://10.1.8.193:8080/goMain\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
     "@version" => "1",
    "@timestamp" => "2016-05-17T09:06:25.360Z",
       "host" => "vcyber",
     "clientip" => "192.168.6.25",
      "identd" => "-",
       "auth" => "-",
    "timestamp" => "24/Apr/2016:01:25:53 +0800",
   "http_method" => "GET",
     "request" => "\"/goLogin\"",
  "request_query" => "\"\"",
       "port" => 8080,
    "statusCode" => "200",
      "bytes" => "1692",
     "reqTime" => "23",
     "referer" => "\"http://10.1.8.193:8080/goMain\"",
    "userAgent" => "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\""
}


Note that the port value no longer has double quotes: it is now an integer.
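
The hash form also accepts several fields at once. The following is a sketch using field names from the Grok pattern above; bytes is deliberately left out because it can be "-", a case the gsub section deals with.

```
mutate {
    convert => {
        "port"       => "integer"
        "statusCode" => "integer"
        "reqTime"    => "integer"
    }
}
```

Note that hash entries are separated by whitespace only, without commas.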

gsub

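  • Value type is array
  • There is no default value

String substitution. The pattern can be a plain string or a regular expression. gsub only works on string fields; for any other type it does nothing and raises no error.

The value is an array read in groups of three: the field name, the string (or regular expression) to match, and the replacement string.

Example: when parsing Tomcat logs, the byte size of a resource may be "-". It therefore needs to be replaced with 0 and then converted to a number with convert.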

input {
    stdin {
    }    
}
filter {
    grok {
        match=>["message","%{IPORHOST:clientip} %{NOTSPACE:identd} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] %{WORD:http_method} %{NOTSPACE:request} %{NOTSPACE:request_query|-} %{NUMBER:port} %{NUMBER:statusCode} (%{NOTSPACE:bytes}|-) %{NUMBER:reqTime} %{QS:referer} %{QS:userAgent}"]
    }
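    # Replace a bare "-" with "0" first. The two steps sit in separate mutate
    # blocks because, to my knowledge, a single block runs convert before gsub.
    mutate {
        gsub=>["bytes","^-$","0"]
    }
    mutate {
        convert=>["port","integer","reqTime","integer","statusCode","integer","bytes","integer"]
    }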
}
output{
    stdout{
        codec=>rubydebug
    }
}


This gives the following result:

{
     "message" => "192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET \"/js/common/jquery-1.10.2.min.js\" \"\" 8080 304 - 67 \"http://10.1.8.193:8080/goLogin\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
     "@version" => "1",
    "@timestamp" => "2016-05-17T09:17:21.745Z",
       "host" => "vcyber",
     "clientip" => "192.168.6.25",
      "identd" => "-",
       "auth" => "-",
    "timestamp" => "24/Apr/2016:01:25:53 +0800",
   "http_method" => "GET",
     "request" => "\"/js/common/jquery-1.10.2.min.js\"",
  "request_query" => "\"\"",
       "port" => 8080,
    "statusCode" => 304,
      "bytes" => 0,
     "reqTime" => 67,
     "referer" => "\"http://10.1.8.193:8080/goLogin\"",
    "userAgent" => "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\""
}
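
Because the value is read three elements at a time, one gsub can carry several substitutions. As a hedged sketch, the following should strip the double quotes that the Grok pattern above leaves around request and referer (so "\"/goLogin\"" would become "/goLogin"). The pattern is written in single quotes so that the double quote needs no escaping.

```
mutate {
    gsub => [
        # remove the quotes kept around the request path
        "request", '"', "",
        # do the same for the referer
        "referer", '"', ""
    ]
}
```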


join

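  • Value type is hash
  • There is no default value

Joins an array into a string using a separator. If the field is not an array, nothing happens.

Example: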

filter { mutate {  join =>{"fieldname"=>","}}}

lowercase

  • Value type is array
  • There is no default value

Converts a string to lowercase.

Example:

filter { mutate {  lowercase =>["fieldname"]}}
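
Applied to the test data, a sketch might look like this. With the sample log line, http_method should change from "GET" to "get"; the choice of fields is only an illustration.

```
filter {
    mutate {
        lowercase => ["http_method", "userAgent"]
    }
}
```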

merge

  • Value type is hash
  • There is no default value

Merges two fields of arrays or hashes. String fields are automatically converted into an array, so:

  • `array` + `string` will work
  • `string` + `string` will result in a 2-entry array in `dest_field`
  • `array` and `hash` will not work

Example:

filter { mutate {   merge =>{"dest_field"=>"added_field"}}}

periodic_flush

  • Value type is boolean
  • Default value is false

Calls the filter's flush method at regular intervals. Optional.

remove_field

  • Value type is array
  • Default value is an empty array, []

Removes fields.

Example:

filter { mutate {  remove_field =>["foo_%{somefield}"]}}
# You can also remove multiple fields at once:
filter { mutate {  remove_field =>["foo_%{somefield}","my_extraneous_field"]}}

If the event has field "somefield" == "hello", this filter, on success, would remove the field named foo_hello if it is present. The second example would also remove an additional, non-dynamic field.
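
With the test data from this article, a typical use is to drop fields that are no longer needed once grok has done its work. This is only a sketch; which fields count as disposable is an assumption about your own setup.

```
filter {
    mutate {
        # the raw line and the two always-"-" fields are dropped after parsing
        remove_field => ["message", "identd", "auth"]
    }
}
```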

remove_tag

  • Value type is array
  • Default value is an empty array, []

Removes tags.

Example:

filter { mutate {  remove_tag =>["foo_%{somefield}"]}}
# You can also remove multiple tags at once:
filter { mutate {  remove_tag =>["foo_%{somefield}","sad_unwanted_tag"]}}

rename

  • Value type is hash
  • There is no default value

Renames one or more fields.

Example:

input {
    stdin {
    }    
}
filter {
    grok {
        match=>["message","%{IPORHOST:clientip} %{NOTSPACE:identd} %{NOTSPACE:auth} \[%{HTTPDATE:timestamp}\] %{WORD:http_method} %{NOTSPACE:request} %{NOTSPACE:request_query|-} %{NUMBER:port} %{NUMBER:statusCode} (%{NOTSPACE:bytes}|-) %{NUMBER:reqTime} %{QS:referer} %{QS:userAgent}"]
    }
    mutate {
        rename=>{"clientip"=>"host"}
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}


This gives the following result:

{
     "message" => "192.168.6.25 - - [24/Apr/2016:01:25:53 +0800] GET \"/goLogin\" \"\" 8080 200 1692 23 \"http://10.1.8.193:8080/goMain\" \"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\"",
     "@version" => "1",
    "@timestamp" => "2016-05-17T09:29:44.018Z",
       "host" => "192.168.6.25",
      "identd" => "-",
       "auth" => "-",
    "timestamp" => "24/Apr/2016:01:25:53 +0800",
   "http_method" => "GET",
     "request" => "\"/goLogin\"",
  "request_query" => "\"\"",
       "port" => "8080",
    "statusCode" => "200",
      "bytes" => "1692",
     "reqTime" => "23",
     "referer" => "\"http://10.1.8.193:8080/goMain\"",
    "userAgent" => "\"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:46.0) Gecko/20100101 Firefox/46.0\""
}


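In the Grok pattern the client IP was captured as clientip, but mutate can rename it to host. Note that the event's original host value (vcyber) has been overwritten.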
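
Since the value is a hash, several fields can be renamed in one block. The target names below (status_code, request_time) are invented for this sketch:

```
mutate {
    rename => {
        "statusCode" => "status_code"
        "reqTime"    => "request_time"
    }
}
```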

replace

  • Value type is hash
  • There is no default value

Replace a field with a new value. The new value can include %{foo} strings to help you build a new value from other parts of the event.

Example:

filter { mutate {  replace =>{"message"=>"%{source_host}: My new message"}}}
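
Using the fields from the Grok pattern in this article, a sketch of replace could shorten message to a one-line summary. The summary format is an arbitrary choice made for illustration.

```
filter {
    mutate {
        replace => {
            "message" => "%{http_method} %{request} returned %{statusCode}"
        }
    }
}
```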

split

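  • Value type is hash
  • There is no default value

Splits a string into an array using a separator character or string. It only works on string fields.

Example: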

filter { mutate {   split =>{"fieldname"=>","}}}
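
As a sketch against the test data, splitting clientip on "." should turn "192.168.6.25" into an array of four strings. This is for illustration only; in practice you would probably want to keep the IP as a single value.

```
filter {
    mutate {
        split => { "clientip" => "." }
    }
}
```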

strip

  • Value type is array
  • There is no default value

Strips whitespace from the beginning and end of a field's value.

Example:

filter { mutate {   strip =>["field1","field2"]}}

update

  • Value type is hash
  • There is no default value

Update an existing field with a new value. If the field does not exist, then no action will be taken.

Example:

filter { mutate {  update =>{"sample"=>"My new message"}}}

uppercase

  • Value type is array
  • There is no default value

Converts a string to uppercase.

Example:

filter { mutate {  uppercase =>["fieldname"]}}