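Filters process the data inside Logstash, that is, the Events mentioned earlier: for example, deleting fields or converting field types. They are extremely powerful.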

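Processing methods

Common filter  Description
date           Date parsing
grok           Regex pattern matching and parsing
dissect        Delimiter-based parsing
mutate         Field manipulation
json           Parses a field's JSON content into a specified field
geoip          Adds geographic location data
ruby           Modifies the Logstash Event with Ruby code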

Date format handling

  Create the configuration file filterDate.conf to run:

input{
  stdin{
    codec => json
  }
}
filter{
  date{
    match => [ "error", "MMM dd yyyy HH:mm:ss" ]
  }
}
output{
  stdout{
    codec => plain
  }
}
# Using the rubydebug codec for output is recommended, since it is easier to read

  Test JSON data: {"error":"Jul 25 2019 17:01:01"}. The result is shown in rubydebug format.

{
    "@timestamp" => 2019-07-25T09:01:01.000Z,
          "host" => "erhuadaangdeMBP",
      "@version" => "1",
         "error" => "Jul 25 2019 17:01:01"
}

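  The @timestamp field now holds the parsed time, and it is 8 hours behind the input. @timestamp is stored in UTC, and without a timezone setting the input is interpreted in the machine's local time zone (UTC+8 here). There are many date formats; every format the field may use can be listed in match.

  Remember: the format strings in match must use the syntax described in the documentation. Made-up formats produce hard-to-read error messages.

# Match the two date formats that the error field may contain; more can be listed
# Store the converted timestamp in a new field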
filter{
  date{
    match => [
      "error",
      "MMM dd yyyy HH:mm:ss",
      "ISO8601"
    ]
    target => 'newtime'
  }
}
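
  When a value matches none of the listed formats, the date filter by default tags the event with _dateparsefailure and leaves the timestamp alone. As a minimal sketch, assuming you only want to see the failed events while testing, the output can be limited to events carrying that tag:

```conf
output{
  # _dateparsefailure is the date filter's default failure tag
  if "_dateparsefailure" in [tags] {
    stdout{
      codec => rubydebug
    }
  }
}
```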

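Time zone issues

  I won't copy the long passage from the documentation; read it on the official site. If you really need to change this, copy the configuration below. Setting the time zone to +00:00 is enough: the input is then treated as UTC, so no other time adjustment is needed.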

filter{
  date{
    match => [
      "error",
      "MMM dd yyyy HH:mm:ss",
      "ISO8601"
    ]
    locale => "en"
    timezone => "+00:00"
  }
}
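
  If the goal is a correct instant rather than a matching clock reading, another option is to tell the filter which zone the source time was written in. The sketch below assumes the log times are in Asia/Shanghai, which is a guess based on the 8-hour offset seen earlier; @timestamp then stays in UTC but points at the right moment.

```conf
filter{
  date{
    match => [ "error", "MMM dd yyyy HH:mm:ss" ]
    locale => "en"
    # Assumption: the log was written in China Standard Time (UTC+8)
    timezone => "Asia/Shanghai"
  }
}
```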

Grok

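   grok is a collection of named regular expressions. For example, the regular expression for USERNAME is [a-zA-Z0-9._-]+, and in grok you can write USERNAME in place of the regular expression. The shorthand keeps configurations short. See the corresponding documentation for the full list of patterns.

  The syntax is: %{SYNTAX:SEMANTIC:TYPE}

Part      Description
SYNTAX    Name of the pattern to match
SEMANTIC  Name of the field that receives the matched value
TYPE      int or float. Every result is a string by default; it can only be converted to an integer or a float.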
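
  A minimal sketch of the three parts together, assuming an input line such as "106.11.152.49 200 64464". The field names status and bytes are illustrative only:

```conf
filter{
  grok{
    # clientip stays a string; status becomes an integer, bytes a float
    match => { "message" => "%{IPORHOST:clientip} %{INT:status:int} %{NUMBER:bytes:float}" }
  }
}
```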

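  The patterns repository on GitHub has long had no Nginx pattern. Regular expressions get long, and a big blob of regex in a config file is unpleasant to read, so the best approach is to give our expression a custom name and use that.

# Location of logstash-patterns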
cd /usr/local/Cellar/logstash/vendor
cd bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
ll
-rw-r--r--  1 qvbilam  admin   1.8K  6 17 23:14 aws
-rw-r--r--  1 qvbilam  admin   4.7K  6 17 23:14 bacula
-rw-r--r--  1 qvbilam  admin   260B  6 17 23:14 bind
-rw-r--r--  1 qvbilam  admin   2.1K  6 17 23:14 bro
-rw-r--r--  1 qvbilam  admin   879B  6 17 23:14 exim
-rw-r--r--  1 qvbilam  admin   9.9K  6 17 23:14 firewalls
-rw-r--r--  1 qvbilam  admin   5.2K  6 17 23:14 grok-patterns
-rw-r--r--  1 qvbilam  admin   3.2K  6 17 23:14 haproxy
-rw-r--r--  1 qvbilam  admin   987B  6 17 23:14 httpd
-rw-r--r--  1 qvbilam  admin   1.2K  6 17 23:14 java
-rw-r--r--  1 qvbilam  admin   1.1K  6 17 23:14 junos
-rw-r--r--  1 qvbilam  admin   1.0K  6 17 23:14 linux-syslog
-rw-r--r--  1 qvbilam  admin    74B  6 17 23:14 maven
-rw-r--r--  1 qvbilam  admin    49B  6 17 23:14 mcollective
-rw-r--r--  1 qvbilam  admin   190B  6 17 23:14 mcollective-patterns
-rw-r--r--  1 qvbilam  admin   614B  6 17 23:14 mongodb
-rw-r--r--  1 qvbilam  admin   9.4K  6 17 23:14 nagios
-rw-r--r--  1 qvbilam  admin   142B  6 17 23:14 postgresql
-rw-r--r--  1 qvbilam  admin   845B  6 17 23:14 rails
-rw-r--r--  1 qvbilam  admin   224B  6 17 23:14 redis
-rw-r--r--  1 qvbilam  admin   188B  6 17 23:14 ruby
-rw-r--r--  1 qvbilam  admin   404B  6 17 23:14 squid

  Below is an access log line picked at random from my Nginx. We will use Grok to give it some simple structure.

106.11.152.49 - - [25/Jul/2019:17:09:01 +0800] "GET /static/index/css/fonts/font-awesome/fontawesome-webfont.woff2?v=4.4.0 HTTP/1.1" 200 64464 "http://angel.qvbilam.xin/static/index/css/fonts/font-awesome.css" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 YisouSpider/5.0 Safari/537.36"

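  Write your own grok expression against the Nginx log. Be sure to split the log into several parts and match them one at a time, then join the parts once each one works; do not rush. Everyone's Nginx log format differs a little, so build the expression around your own needs instead of blindly copying and pasting.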

#vim nginxtest.conf
input{
  http{
    port => 8601
  }
}
filter{
  grok{
    match => {"message" => '%{IPORHOST:clientip}'}
  }
}
output{
  stdout{
    codec => rubydebug
  }
}

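  Test the match piece by piece.

Matched content: 106.11.152.49
Expression:      %{IPORHOST:clientip}

Matched content: - - (the user and user group)
Expression:      %{USER:ident} %{USER:auth}

Matched content: [25/Jul/2019:17:09:01 +0800]
Expression:      \[%{HTTPDATE:timestamp}\]

Matched content: "GET
Expression:      "%{WORD:request_method}

Matched content: /static/index/css/fonts/font-awesome/fontawesome-webfont.woff2?v=4.4.0
Expression:      %{URIPATHPARAM:request}

Matched content: HTTP/1.1"
Expression:      HTTP/%{NUMBER:httpversion}"

Matched content: 200 (server status code)
Expression:      %{INT:status}

Matched content: 64464 (bytes of response body)
Expression:      %{INT:body_bytes_sent}

Matched content: "http://angel.qvbilam.xin/static/index/css/fonts/font-awesome.css"
Expression:      "%{DATA:referer}"

Matched content: "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 YisouSpider/5.0 Safari/537.36"
Expression:      "%{GREEDYDATA:agent}"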

  Write the assembled pattern into the patterns directory shown earlier.

# cd /usr/local/Cellar/logstash/vendor
# cd bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
# vim nginx
NGINXACCESS %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:request_method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{INT:status} %{INT:body_bytes_sent} "%{DATA:referer}" "%{GREEDYDATA:agent}"

  The pattern contains double quotes. Either escape them or wrap the %{NGINXACCESS} reference in single quotes when calling it from grok. Then edit the configuration file that Logstash is started with, nginxtest.conf.

# vim nginxtest.conf
input{
  http{
    port => 8601
  }
}
filter{
  grok{
    match => {"message" => '%{NGINXACCESS}'}
  }
}
output{
  stdout{
    codec => rubydebug
  }
}

# Hot reload: -r reloads the config automatically when it changes
logstash -f nginxtest.conf -r
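
  Editing files inside the bundled gem directory works, but those files may be replaced when the plugin is upgraded. The grok filter also has a patterns_dir option that loads extra pattern files from a directory of your choice. A sketch, where ./patterns is a hypothetical directory holding the nginx pattern file:

```conf
filter{
  grok{
    # ./patterns is a hypothetical directory containing the nginx pattern file
    patterns_dir => ["./patterns"]
    match => { "message" => '%{NGINXACCESS}' }
  }
}
```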

  Send a request to http://127.0.0.1:8601 with the earlier log line as the request body.

  Result:

{
               "auth" => "-",
             "status" => "200",
    "body_bytes_sent" => "64464",
              "agent" => "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 YisouSpider/5.0 Safari/537.36",
         "@timestamp" => 2019-07-26T04:03:26.089Z,
           "clientip" => "106.11.152.49",
           "@version" => "1",
            "referer" => "http://angel.qvbilam.xin/static/index/css/fonts/font-awesome.css",
          "timestamp" => "25/Jul/2019:17:09:01 +0800",
     "request_method" => "GET",
            "headers" => {
           "http_version" => "HTTP/1.1",
        "http_user_agent" => "PostmanRuntime/7.6.0",
           "request_path" => "/",
           "content_type" => "text/plain",
              "http_host" => "127.0.0.1:8601",
         "content_length" => "341",
          "cache_control" => "no-cache",
         "request_method" => "POST",
            "http_accept" => "*/*",
          "postman_token" => "a048f980-07f4-48d1-96a7-00d1124a3af9",
        "accept_encoding" => "gzip, deflate",
             "connection" => "keep-alive"
    },
            "request" => "/static/index/css/fonts/font-awesome/fontawesome-webfont.woff2?v=4.4.0",
               "host" => "127.0.0.1",
              "ident" => "-",
            "message" => "106.11.152.49 - - [25/Jul/2019:17:09:01 +0800] \"GET /static/index/css/fonts/font-awesome/fontawesome-webfont.woff2?v=4.4.0 HTTP/1.1\" 200 64464 \"http://angel.qvbilam.xin/static/index/css/fonts/font-awesome.css\" \"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 YisouSpider/5.0 Safari/537.36\"",
        "httpversion" => "1.1"
}

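  The result is quite good. The headers field holds the headers of the HTTP request; it can be hidden if you don't want it shown. This is only a simple parse; field processing will be covered in later posts.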
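
  As a sketch of two common follow-up steps, the filter section below also parses the Nginx time into @timestamp and drops the headers field. It assumes the NGINXACCESS pattern defined above and the field names it produces:

```conf
filter{
  grok{
    match => { "message" => '%{NGINXACCESS}' }
  }
  date{
    # Parse the Nginx time, e.g. 25/Jul/2019:17:09:01 +0800, into @timestamp
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    locale => "en"
  }
  mutate{
    # Drop the request headers added by the http input
    remove_field => ["headers"]
  }
}
```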

Dissect

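  Regular expression matching consumes a lot of CPU wherever it is used. Dissect parses data by splitting on delimiters instead, so it is much faster than Grok: in the official performance comparison, Dissect is roughly three times as fast. Its limitation is that it only suits simple cases where every line has a similar format and the same delimiters.

Syntax

Syntax/Parameter        Description
%{field}string%{field}  Basic format
%{field}                Field
%{+field}               Append to a field
%{+field/num}           Append to a field in the given order
string                  Delimiter
%{?field}               Use the matched value as a key
%{&field}               Use the matched value as a value

%{field}string%{field}
# field: a field
# +field: append to a field
# +field/num: append to a field in the given order
# string: the delimiter
# Empty braces, %{}, act as a placeholder and produce no output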

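Example 1

  Suppose we have the log line below and want to extract the time and what the user said.

# Hypothetical sample content
[2019-08-02 15:18:21]qvbilam:love you.

  Create a new configuration file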

# vim dissect_test.conf
input{
  stdin{
    codec => line
  }
}
filter{
  dissect{
    mapping => {"message" => "[%{Y}-%{m}-%{d} %{H}:%{i}:%{s}]%{user}:%{connect}."}
  }
}
output{
  stdout{
    codec => rubydebug
  }
}
# logstash -f dissect_test.conf

  Parsed result:

{
             "H" => "15",
             "Y" => "2019",
       "connect" => "love you",
             "i" => "18",
             "d" => "02",
      "@version" => "1",
          "host" => "erhuadaangdeMBP",
          "user" => "qvbilam",
             "s" => "21",
       "message" => "[2019-08-02 15:18:21]qvbilam:love you.",
             "m" => "08",
    "@timestamp" => 2019-08-02T07:37:02.339Z
}
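
  The append syntax from the table can keep the date and time together instead of producing six separate fields. A sketch under the same sample line, where ts is a hypothetical field name; the two parts are expected to end up in a single ts field such as "2019-08-02 15:18:21":

```conf
filter{
  dissect{
    # %{ts} takes the date part, %{+ts} appends the time part to the same field
    mapping => { "message" => "[%{ts} %{+ts}]%{user}:%{connect}." }
  }
}
```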

Example 2

  Suppose we have a log line made of key/value pairs.

# Hypothetical sample content
angel=gy&sb=zcx&num=123

  Modify the original configuration file

# vim dissect_test.conf
input{
  stdin{
    codec => line
  }
}
filter{
  dissect{
    #mapping => {"message" => "[%{Y}-%{m}-%{d} %{H}:%{i}:%{s}]%{user}:%{connect}."}
    mapping => {"message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}&%{?key3}=%{&key3}"}
  }
}
output{
  stdout{
    codec => rubydebug
  }
}
# logstash -f dissect_test.conf

  Parsed result:

{
       "message" => "angel=gy&sb=zcx&num=123",
      "@version" => "1",
            "sb" => "zcx",
           "num" => "123",
          "host" => "erhuadaangdeMBP",
         "angel" => "gy",
    "@timestamp" => 2019-08-02T08:04:07.530Z
}

  As the output shows, num is a string. Dissect supports type conversion as well, through the convert_datatype option.

Type conversion

# vim dissect_test.conf
input{
  stdin{
    codec => line
  }
}
filter{
  dissect{
    #mapping => {"message" => "[%{Y}-%{m}-%{d} %{H}:%{i}:%{s}]%{user}:%{connect}."}
    mapping => {"message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}&%{?key3}=%{&key3}"}
    # Convert num to an integer
    convert_datatype => { num => "int" }
  }
}
output{
  stdout{
    codec => rubydebug
  }
}
# logstash -f dissect_test.conf

  Type conversion test result:

{
       "message" => "angel=gy&sb=zcx&num=123",
      "@version" => "1",
            "sb" => "zcx",
          "host" => "erhuadaangdeMBP",
         "angel" => "gy",
           "num" => 123,
    "@timestamp" => 2019-08-02T08:07:36.639Z
}

Mutate

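  Mutate performs all kinds of operations on fields: adding, removing, modifying, and so on.

Operation       Description
convert         Type conversion, e.g. to integer, float, string, or boolean
gsub            String replacement
split           Split a string into an array
join            Join an array into a string
merge           Merge arrays
rename          Rename a field
update/replace  Update or replace a field's content
remove_field    Remove fields

Type conversion

# Configuration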
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    # Convert age to integer and num to float
    convert => {
      "age" => "integer"
      "num" => "float"
    }
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'num=123&age=16'
# Response
{
       "message" => "num=123&age=16",
           "num" => 123.0,
           "age" => 16
}

String replacement

# Configuration
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    # In the field named string, replace @ with _ and ! with ?
    gsub => [
      "string", "@", "_",
      "string", "!", "?"
    ]
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'num=123&string=a@bc@dd!'
# Response
{
       "message" => "num=123&string=a@bc@dd!",
           "num" => "123",
        "string" => "a_bc_dd?"
}

String to array

# Configuration
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    # Split the string into an array on *
    split => { "str_to_arr" => "*" }
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'num=123&str_to_arr=1*2*3*4'
# Response
{
       "message" => "num=123&str_to_arr=1*2*3*4",
           "num" => "123",
    "str_to_arr" => [
        [0] "1",
        [1] "2",
        [2] "3",
        [3] "4"
    ]
}

Array to string

# Configuration
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    # Split the string into an array, then join the array back into a string
    split => ["str_to_arr","*"]
    join => ["str_to_arr",","]
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'num=123&str_to_arr=1*2*3*4'
# Response
{
       "message" => "num=123&str_to_arr=1*2*3*4",
           "num" => "123",
    "str_to_arr" => "1,2,3,4"
}
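
  Within a single mutate block, operations run in a fixed internal order rather than in the order they are written; split happens to run before join, which is why the config above works. When the order matters, one common approach is to use separate mutate blocks, since filters run top to bottom. A sketch using the same field:

```conf
filter{
  mutate{
    split => ["str_to_arr","*"]
  }
  # The second mutate block runs only after the first one has finished
  mutate{
    join => ["str_to_arr",","]
  }
}
```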

Merging arrays

# Configuration
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    split => ["string1","|"]
    split => ["string2","*"]
    merge => ["string1","string2"]
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'string1=a|b|c&string2=1*2*3'
# Response
{
       "message" => "string1=a|b|c&string2=1*2*3",
       "string1" => [
        [0] "a",
        [1] "b",
        [2] "c",
        [3] "1",
        [4] "2",
        [5] "3"
    ],
       "string2" => [
        [0] "1",
        [1] "2",
        [2] "3"
    ]
}

Renaming a field

# Description
Renames a field. If the destination field already exists, it is overwritten.
# Configuration
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    rename => ["num","height"]
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'num=190&age=16'
# Response
{
       "message" => "num=190&age=16",
        "height" => "190",
           "age" => "16"
}

Updating a field: update

# Description
Updates the content of a field. If the field does not exist, it is not created.
# Configuration
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    update => ["num",100]
    convert => {"num" => "integer" }
    convert => {"age" => "integer" }
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'num=190&age=16'
# Response
{
       "message" => "num=190&age=16",
           "num" => 100,
           "age" => 16
}

Updating a field: replace

# Description
Works like update, but when the field does not exist it behaves like add_field and creates the field.
# Configuration
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    replace => ["num_test",100]
    convert => {"num" => "integer" }
    convert => {"age" => "integer" }
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'num=190&age=16'
# Response
{
       "message" => "num=190&age=16",
           "num" => 190,
      "num_test" => "100",
           "age" => 16
}
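
  The new value given to replace may also reference other fields with the %{field} syntax. A small sketch, assuming the num and age fields from the request above; summary is a hypothetical field name:

```conf
filter{
  mutate{
    # %{num} and %{age} are filled in from the event's existing fields
    replace => { "summary" => "num=%{num}, age=%{age}" }
  }
}
```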

Removing fields

# Configuration
filter{
  dissect{
    mapping => { "message" => "%{?key1}=%{&key1}&%{?key2}=%{&key2}"  }
  }
  mutate{
    replace => ["num_test",100]
    convert => {"num" => "integer" }
    convert => {"age" => "integer" }
    remove_field => ["num","age"]
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d 'num=190&age=16'
# Response
{
       "message" => "num=190&age=16",
      "num_test" => "100"
}

Json

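  When an incoming field contains a JSON string, the JSON plugin can parse it.

Option  Description
source  Name of the field to parse
target  Name of the field that stores the parsed result; defaults to the root of the event

Example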

# vim json.conf
input{
  http{
    port => 8602
  }
}
filter{
  json{
    source => 'message'
    target => 'new_json'
  }
}
output{
  stdout{
    codec => rubydebug
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d '{"name":"qvbilam","age":123}'
# Response
{
       "message" => "{\"name\":\"qvbilam\",\"age\":123}",
      "new_json" => {
        "name" => "qvbilam",
         "age" => 123
    }
}
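
  A variation worth knowing, sketched here under the same request: leaving out target places the parsed keys at the top level of the event, and remove_field on the filter itself is applied only when parsing succeeds, so the raw string is kept for events that fail to parse.

```conf
filter{
  json{
    source => "message"
    # No target: name and age become top-level fields
    # The raw JSON string is removed only if parsing succeeds
    remove_field => ["message"]
  }
}
```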

Geoip

  Looks up geographic location information for an IP address. It is simple to use, so let's go straight to the code.

# vim geoip.conf
input{
  http{
    port => 8602
  }
}
filter{
  geoip{
    source => 'message'
  }
}
output{
  stdout{
    codec => rubydebug
  }
}
# Request
curl http://127.0.0.1:8602 -X POST -d '220.184.204.221'
# Response
{
      "@version" => "1",
         "geoip" => {
           "region_name" => "Zhejiang",
          "country_name" => "China",
             "longitude" => 120.1619,
         "country_code2" => "CN",
              "latitude" => 30.294,
              "timezone" => "Asia/Shanghai",
                    "ip" => "220.184.204.221",
           "region_code" => "ZJ",
              "location" => {
            "lon" => 120.1619,
            "lat" => 30.294
        },
         "country_code3" => "CN",
             "city_name" => "Hangzhou",
        "continent_code" => "AS"
    },
       "message" => "220.184.204.221"
}
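
  In practice the IP address usually comes from a parsed field rather than the whole message. The sketch below assumes the NGINXACCESS grok pattern from earlier, which produces a clientip field, and a Logstash version like the one used in this post; newer versions with ECS compatibility enabled may name fields differently. The fields option limits which lookups are added.

```conf
filter{
  grok{
    match => { "message" => '%{NGINXACCESS}' }
  }
  geoip{
    source => "clientip"
    target => "geoip"
    # Keep only a few of the available fields
    fields => ["country_name", "city_name", "location"]
  }
}
```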

Ruby

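  When the plugins above cannot meet your needs, and you know Ruby, you can use this plugin to modify the Logstash Event however you like.

/*
 * I really could not find a Ruby example worth showing.
 * I can't write one myself either, which is sad.
 * Copying the official documentation did not work either.
 * So much for doing as I please.
 * Making a promise: learn Ruby in my spare time.
 */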
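
  For reference, a minimal sketch of what a ruby filter might look like. It assumes the event has a message field, and message_length is a hypothetical field name; event.get and event.set are the Event API calls available inside the code string.

```conf
filter{
  ruby{
    # Store the length of the message in a new field
    code => "event.set('message_length', event.get('message').to_s.length)"
  }
}
```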
Last modification: February 18th, 2020 at 10:16 pm