Because a pub/sub channel only delivers messages in real time, while consumers sometimes need to re-consume earlier historical log messages, we will eventually have to replace the Redis we currently use with Kafka. So for now I'll just note down how Logstash publishes logs to a Kafka topic after cleaning and filtering them.
Importing log data into Kafka
input {
  beats {
    port => 5044
    codec => plain { charset => "UTF-8" }
  }
}

filter {
  if "nginx-logs" in [tags] {
    grok {
      match => ["message", '%{IP:client} (%{WORD:ident}|-) (%{USERNAME:auth}|-) \[%{HTTPDATE:timestamp}\] \"(%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion})\" %{NUMBER:response_code:int} (?:%{NUMBER:response_time:int}|-) "(?:%{URI:referrer}|-)" (?:%{QS:user_agent}|-) \"(%{WORD:x_forword_for}|-)\"']
    }
    mutate {
      remove_field => ["message"]
    }
  }
  if "beats_input_codec_plain_applied" in [tags] {
    mutate {
      remove_tag => ["beats_input_codec_plain_applied"]
    }
  }
}

output {
  if "nginx-logs" in [tags] {
    kafka {
      bootstrap_servers => "122.51.230.106:9092"
      topic_id => "nginx-log"
      batch_size => 5
      codec => "json"
    }
  }
}
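Before pointing the output at Kafka, it can help to check that the grok pattern actually parses the nginx lines. A minimal debugging sketch, assuming the same beats input and "nginx-logs" tag as above, that only prints the parsed events to the Logstash console (stdout output with the rubydebug codec) instead of publishing them:

output {
  if "nginx-logs" in [tags] {
    # Print each parsed event to the console for inspection;
    # switch back to the kafka output once the fields look correct.
    stdout { codec => rubydebug }
  }
}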
Writing the logs from Kafka into ES
input {
  kafka {
    bootstrap_servers => "122.51.230.106:9092"
    topics => ["nginx-log"]
    codec => "json"
    group_id => "nginx"
    consumer_threads => 1
    decorate_events => true
  }
}

output {
  elasticsearch {
    hosts => "122.51.230.106:9200"
    manage_template => false
    index => "nginx-log-%{+YYYY.MM.dd}"
    #document_type => "%{[@metadata][type]}"
  }
}
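Because decorate_events => true is set, the kafka input attaches the message's topic, partition and offset under [@metadata][kafka], but metadata fields are not written to Elasticsearch on their own. A small optional sketch of a filter that copies them into regular fields (the target names kafka_topic/kafka_partition/kafka_offset are just my own choice) so they end up in the index:

filter {
  mutate {
    # Copy the Kafka metadata added by decorate_events into normal fields
    # so they are stored in the nginx-log-* index alongside the log data.
    add_field => {
      "kafka_topic"     => "%{[@metadata][kafka][topic]}"
      "kafka_partition" => "%{[@metadata][kafka][partition]}"
      "kafka_offset"    => "%{[@metadata][kafka][offset]}"
    }
  }
}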
When consuming messages from Kafka we need to define a consumer group (group_id). If it is left unset, the Logstash kafka input falls back to its default group ("logstash"). Within one group, each message of a topic is delivered to only one consumer, so take care to set a group_id that is different from the other consumers of the topic; otherwise this pipeline and those consumers will split the messages between them instead of each receiving a full copy.
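This is also where the replay mentioned at the beginning comes in: because Kafka retains the messages, an extra pipeline can re-read the whole topic simply by joining with a group_id that has no committed offsets yet and asking for the earliest offset. A minimal sketch, assuming a hypothetical group name nginx-replay:

input {
  kafka {
    bootstrap_servers => "122.51.230.106:9092"
    topics            => ["nginx-log"]
    codec             => "json"
    # A fresh group with no committed offsets...
    group_id          => "nginx-replay"
    # ...starts from the beginning of the topic instead of the latest offset.
    auto_offset_reset => "earliest"
  }
}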