Logstash是Elastic stack 中的一个开源组件,其不仅能够对日志进行抓取收集,还能对抓取的日志进行过滤输出。Logstash的过滤插件有多种,如:grok、date、json、geoip等等。其中最为常用的为grok正则表达式过滤
grok的匹配语法分为两种:grok自带的基本匹配模式、用户自定义的匹配模式。
Grok的基本匹配模式
Grok模块提供了默认内嵌了一些基本匹配模式,其使用语法为:
%{SYNTAX:SEMANTIC}
其中SYNTAX为匹配模式的名称,用于调用相应的匹配模式匹配文本,如:3.44 会被NUBER模式所匹配,而10.10.10.1会被IP模式所匹配。
而SEMANTIC则是用于标记被匹配到的文本内容,如10.10.10.1被IP模式所匹配,并编辑为ClientIP。
其使用例子:
%{NUMBER:duration} %{IP:ClientIP}
Grok的自定义模式
Grok模块是基于正则表达式做匹配的,因此当基本匹配模式不足以满足我们的需求时,我们还自定义模式编译相应的匹配规则。
Grok的自定义模式的语法:
(?<field_name>the pattern here)
其中filed_name为自定义模式的名称,pattern即指正则表达式。
如:
#匹配时间的自定义模式
(?<Date>(\d*[./-]\d*[./-]\d* \d*:\d*:\d*[.,][0-9]+))
grok支持下述的过滤配置选项
选项 | 类型 | 是否为必须 | 描述 |
---|---|---|---|
break_on_match | 布尔型 | 否 | 默认值为true,只会匹配第一个符合匹配条件的值,如果需要匹配多个值,则需要设置为false |
keep_empty_captures | 布尔型 | 否 | 默认值为false,如果为true,则保留空字段为事件字段 |
match | 哈希型 | 否 | 意思为匹配一个字段的哈希值,单一字段可设置匹配多个匹配模式 |
named_captures_only | 布尔型 | 否 | 默认值为true,意味着只保存从grok中获取的名称 |
overwrite | 数组 | 否 | 此选项用于复写字段中的值 |
pattern_definitions | 哈希型 | 否 | 定义被当前过滤器所使用的自动模式的名称和元组名称,如果命名的名称已存在,则会覆盖此前配置 |
patterns_dir | 数组 | 否 | 指定用于保存定义好的匹配模式的文件目录 |
patterns_files_glob | 字符串 | 否 | 用于在patterns_dir指定的目录中过滤匹配的文件 |
tag_on_failure | 数组 | 否 | 默认值为_grokparsefailure,当匹配不成功时追加指定值到tags字段 |
tag_on_timeout | 字符串 | 否 | 默认值为_groktimeout,当grok正则表达式匹配超时追加的tag |
timeout_millis | 数值 | 否 | 默认值为30000毫秒,当正则匹配运行超过指定的时间后,尝试终结此匹配操作。设置为0将关闭超时 |
下述选项是被所有过滤插件都支持的通用选项。
选项 | 类型 | 是否为必须 | 描述 |
---|---|---|---|
add_field | 哈希型 | 否 | 如果此过滤选项匹配成功,则会向匹配的事件中添加指定的字段,字段名和内容可以调用相关的变量进行定义命名 |
add_tag | 数组 | 否 | 用于当过滤成功时,向匹配的事件中添加tag |
enable_metric | 布尔型 | 否 | 默认值为true,默认情况下,启用或禁用此功能,能记录特定插件的相关度量值。 |
id | 字符串 | 否 | 添加一个唯一ID到指定的插件配置中,当有多个同一类型的插件时,可更好地去区别监控logstash |
periodic_flush | 布尔型 | 否 | 默认值为false,可选项,用于在规定的间隔时间调用过滤器的刷新功能 |
remove_field | 数组 | 否 | 当此插件匹配成功时,从事件中移除指定的字段 |
remove_tag | 数组 | 否 | 当此插件匹配成功时,从事件中移除指定的tags |
下面是grok基本表达式映射
名称 | 含义 | 示例 |
---|---|---|
USERNAME 或 USER | 用户名,由数字、大小写及特殊字符(._-)组成的字符串 | |
EMAILLOCALPART | 电子邮件用户名部分 | abc-123等 |
EMAILADDRESS | 电子邮件 | abc-123@163.com |
HTTPDUSER | Apache服务器的用户 | |
INT | 包括0和正负整数 | |
BASE10NUM 或 NUMBER | 十进制数字,包括整数和小数 | 18、5.23等 |
BASE16NUM | 十六进制数字,整数 | 0x0045fa2d、-0x3F8709等 |
BASE16FLOAT | 十六进制数字,整数和小数 | |
WORD | 字符串,包括数字和大小写字母 | ILoveYou等 |
NOTSPACE | 不带任何空格的字符串 | |
SPACE | 空格字符串 | |
QUOTEDSTRING 或 QS | 带引号的字符串 | ‘What is your name?’ |
UUID | 标准UUID | 550E8400-E29B-11D4-A716-446655440000 |
MAC | MAC地址 | |
IP | IPv4或IPv6地址 | 127.0.0.1、FE80:0000:0000:0000:AAAA:0000:00C2:0002等 |
HOSTNAME | 主机名称 | |
IPORHOST | IP或者主机名称 | |
HOSTPORT | 主机名(IP)+端口 | 127.0.0.1:3306、api.stozen.net:8000等 |
PATH | Unix系统或者Windows系统里的路径格式 | /usr/local/nginx/sbin/nginx、c:\windows\system32\clr.exe等 |
URIPROTO | URI协议 | http、ftp等 |
URIHOST | URI主机 | www.sulao.cn、10.0.0.1:22等 |
URIPATH | URI路径 | //www.sulao.cn/abc/、/api.php等 |
URIPARAM | URI里的GET参数 | ?a=1&b=2&c=3 |
URIPATHPARAM | URI路径+GET参数 | //www.sulao.cn/abc/api.php?a=1&b=2&c=3 |
URI | 完整的URI | http://www.sulao.cn/abc/api.php?a=1&b=2&c=3 |
MONTH | 月份名称 | Jan、January等 |
MONTHNUM | 月份数字 | 4 |
MONTHDAY | 日期数字 | 31 |
DAY | 星期几名称 | Mon、Monday等 |
YEAR | 年份数字 | 1992 |
HOUR | 小时数字 | 12 |
MINUTE | 分钟数字 | 59 |
SECOND | 秒数字 | 47 |
TIME | 时间 | 00:01:23 |
DATE_US | 美国日期格式 | 10-15-1982、10/15/1982等 |
DATE_EU | 欧洲日期格式 | 15-10-1982、15/10/1982、15.10.1982 |
ISO8601_TIMEZONE | ISO8601时间格式 | +10:23、-1023 |
TIMESTAMP_ISO8601 | ISO8601时间戳格式 | 2016-07-03T00:34:06+08:00 |
DATE | 美国日期或者欧洲日期 | %{DATE_US}%{DATE_EU} |
DATESTAMP | 完整日期+时间 | 07-03-2016 00:34:06 |
HTTPDATE | http默认日期格式 | 03/Jul/2016:00:36:53 +0800 |
LOGLEVEL | 日志等级 | Alert、alert、ALERT、Error等 |
下面我来看看grok使用的例子,我们来匹配下nginx的日志
113.57.176.61 - - [07/Nov/2019:10:06:08 +0800] "POST /zabbix.php?sid=f7e1f4f8d197181a&action=widget.problems.view HTTP/1.1" 200 5385 "http://49.233.176.23:8888/zabbix.php?action=dashboard.view" "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0" "-"
可以通过下面的grok正则匹配
%{IP:client} (%{WORD:ident}|-) (%{USERNAME:auth}|-) \[%{HTTPDATE:timestamp}\] \"(%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion})\" %{NUMBER:response_code} (?:%{NUMBER:response_time}|-) "(?:%{URI:referrer}|-)" (?:%{QS:user_agent}|-) \"(%{WORD:x_forword_for}|-)\"
匹配的结果是
{ "client": [ [ "113.57.176.61" ] ], "IPV6": [ [ null, null ] ], "IPV4": [ [ "113.57.176.61", null ] ], "ident": [ [ null ] ], "auth": [ [ "-" ] ], "timestamp": [ [ "07/Nov/2019:10:06:08 +0800" ] ], "MONTHDAY": [ [ "07" ] ], "MONTH": [ [ "Nov" ] ], "YEAR": [ [ "2019" ] ], "TIME": [ [ "10:06:08" ] ], "HOUR": [ [ "10" ] ], "MINUTE": [ [ "06" ] ], "SECOND": [ [ "08" ] ], "INT": [ [ "+0800" ] ], "method": [ [ "POST" ] ], "request": [ [ "/zabbix.php?sid=f7e1f4f8d197181a&action=widget.problems.view" ] ], "httpversion": [ [ "1.1" ] ], "BASE10NUM": [ [ "1.1", "200", "5385" ] ], "response_code": [ [ "200" ] ], "response_time": [ [ "5385" ] ], "referrer": [ [ "http://49.233.176.23:8888/zabbix.php?action=dashboard.view" ] ], "URIPROTO": [ [ "http" ] ], "USER": [ [ null ] ], "USERNAME": [ [ null ] ], "URIHOST": [ [ "49.233.176.23:8888" ] ], "IPORHOST": [ [ "49.233.176.23" ] ], "HOSTNAME": [ [ "49.233.176.23" ] ], "IP": [ [ null ] ], "port": [ [ "8888" ] ], "URIPATHPARAM": [ [ "/zabbix.php?action=dashboard.view" ] ], "URIPATH": [ [ "/zabbix.php" ] ], "URIPARAM": [ [ "?action=dashboard.view" ] ], "user_agent": [ [ ""Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0"" ] ], "QUOTEDSTRING": [ [ ""Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0"" ] ], "x_forword_for": [ [ null ] ] }
我们可以通过一个grok在线的调试进行调试,地址是http://grokdebug.herokuapp.com/,其次官方已经有一些正则匹配模板,我们可以在这里获取https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
上面一个例子已经匹配出来很多字段,其实使用匹配是可以让我们有更多的字段信息可以使用,我们来看看grok的过滤方法
input { beats { port => 5044 codec => plain { charset => "UTF-8" } } } filter { grok { match => ["message", '%{IP:client} (%{WORD:ident}|-) (%{USERNAME:auth}|-) \[%{HTTPDATE:timestamp}\] \"(%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion})\" %{NUMBER:response_code} (?:%{NUMBER:response_time}|-) "(?:%{URI:referrer}|-)" (?:%{QS:user_agent}|-) \"(%{WORD:x_forword_for}|-)\"'] } if "beats_input_codec_plain_applied" in [tags] { mutate { remove_tag => ["beats_input_codec_plain_applied"] } } } output { elasticsearch { hosts => "172.26.61.61:9200" manage_template => false index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" #document_type => "%{[@metadata][type]}" } }
通过上述修改,再登陆kibana查看发现已经多了刚才匹配出来的字段
{ "_index": "filebeat-2019.10.28", "_type": "_doc", "_id": "WtuCEW4BW_ylP464mifs", "_version": 1, "_score": null, "_source": { "method": "POST", "Client": "172.26.61.49", "agent": { "type": "filebeat", "id": "9b6a51a8-a6ed-413a-8f7e-2fe7a172b450", "version": "7.4.0", "ephemeral_id": "35d9e2f2-8e8c-4408-a034-433e310a3018", "hostname": "localhost.localdomain" }, "request": "/zabbix.php?sid=37ae6650e649a1f2&action=widget.problems.view", "@version": "1", "ecs": { "version": "1.1.0" }, "input": { "type": "log" }, "@timestamp": "2019-10-28T08:36:41.241Z", "log": { "offset": 1146484, "file": { "path": "/usr/local/nginx/logs/access.log" } }, "response": "200", "httpversion": "1.1", "tags": [ "nginx-access" ], "referrer": "\"http://172.26.61.58/zabbix.php?action=dashboard.view\"", "timestamp": "28/Oct/2019:16:36:39 +0800", "bytes": "629", "host": { "id": "7017e1f0128944ceaedb6189b101351c", "name": "localhost.localdomain", "os": { "family": "redhat", "platform": "centos", "name": "CentOS Linux", "version": "7 (Core)", "kernel": "3.10.0-1062.1.2.el7.x86_64", "codename": "Core" }, "containerized": false, "architecture": "x86_64", "hostname": "localhost.localdomain" }, "message": "172.26.61.49 - - [28/Oct/2019:16:36:39 +0800] \"POST /zabbix.php?sid=37ae6650e649a1f2&action=widget.problems.view HTTP/1.1\" 200 629 \"http://172.26.61.58/zabbix.php?action=dashboard.view\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0\"" }, "fields": { "@timestamp": [ "2019-10-28T08:36:41.241Z" ] }, "sort": [ 1572251801241 ] }