考虑到发布和订阅消息的实时性问题,有时需要消费者重新消费之前的历史日志消息,所以后面肯定需要使用kafka替换掉原来使用的redis,所以这里暂时先mark下logstash清洗过滤日志以后发布到kafka主题中的方式
将日志数据导入到kafka
input {
beats {
port => 5044
codec => plain {
charset => "UTF-8"
}
}
}
filter {
if "nginx-logs" in [tags] {
grok {
match => ["message", '%{IP:client} (%{WORD:ident}|-) (%{USERNAME:auth}|-) \[%{HTTPDATE:timestamp}\] \"(%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion})\" %{NUMBER:response_code:int} (?:%{NUMBER:response_time:int}|-) "(?:%{URI:referrer}|-)" (?:%{QS:user_agent}|-) \"(%{WORD:x_forword_for}|-)\"']
}
mutate {
remove_field => ["message"]
}
}
if "beats_input_codec_plain_applied" in [tags] {
mutate {
remove_tag => ["beats_input_codec_plain_applied"]
}
}
}
output {
if "nginx-logs" in [tags] {
kafka {
bootstrap_servers => "122.51.230.106:9092"
topic_id => "nginx-log"
batch_size => 5
codec => "json"
}
}
}将kafka中日志写入到es中
input {
kafka {
bootstrap_servers => "122.51.230.106:9092"
topics => ["nginx-log"]
batch_size => 5
codec => "json"
group_id => "nginx"
consumer_threads => 1
decorate_events => true
}
}
elasticsearch {
hosts => "122.51.230.106:9200"
manage_template => false
index => "nginx-log-%{+YYYY.MM.dd}"
#document_type => "%{[@metadata][type]}"
}我们在消费kafka中的消息时需要定义一个消费者组,不填写的话默认会分配到一个group的消费者分组,在同一个分组中只会有一个消费者能够消费到一个topic下的消息,所以我们这里需要注意,一定要填写一个和其他消费端不要一致
内容版权声明:除非注明,否则皆为本站原创文章。
转载注明出处:https://sulao.cn/post/751
评论列表