ELK中logstash从kafka中读写数据

考虑到发布和订阅消息的实时性问题,有时需要消费者重新消费之前的历史日志消息,所以后面肯定需要使用kafka替换掉原来使用的redis,所以这里暂时先mark下logstash清洗过滤日志以后发布到kafka主题中的方式

将日志数据导入到kafka

input {
  beats {
    port => 5044
    codec => plain {
          charset => "UTF-8"
    }
  }
}
filter {
  if "nginx-logs" in [tags] {
    grok {
      match => ["message", '%{IP:client} (%{WORD:ident}|-) (%{USERNAME:auth}|-) \[%{HTTPDATE:timestamp}\] \"(%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion})\" %{NUMBER:response_code:int} (?:%{NUMBER:response_time:int}|-) "(?:%{URI:referrer}|-)" (?:%{QS:user_agent}|-) \"(%{WORD:x_forword_for}|-)\"'] 
    }
    mutate {
      remove_field => ["message"]
    }
  }

  if "beats_input_codec_plain_applied" in [tags] {
    mutate {
      remove_tag => ["beats_input_codec_plain_applied"]
    }
  }
}

output {
  if "nginx-logs" in [tags] {
    kafka {
      bootstrap_servers => "122.51.230.106:9092"
      topic_id => "nginx-log"
      batch_size => 5
      codec => "json"
    }
  }
}

将kafka中日志写入到es中

input {
  kafka {
     bootstrap_servers => "122.51.230.106:9092"
     topics => ["nginx-log"]
     batch_size => 5
     codec => "json"
     group_id => "nginx" 
     consumer_threads => 1
     decorate_events => true 
  }
}

elasticsearch {
  hosts => "122.51.230.106:9200"
  manage_template => false
  index => "nginx-log-%{+YYYY.MM.dd}"
  #document_type => "%{[@metadata][type]}"
}

我们在消费kafka中的消息时需要定义一个消费者组,不填写的话默认会分配到一个group的消费者分组,在同一个分组中只会有一个消费者能够消费到一个topic下的消息,所以我们这里需要注意,一定要填写一个和其他消费端不要一致

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://sulao.cn/post/754.html

我要评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。