# Download the Logstash 8.9.2 Linux x86_64 tarball.
# The artifact lives under downloads/logstash/ (not downloads/beats/filebeat/).
# -f makes curl fail on an HTTP error instead of saving the error page,
# -L follows redirects.
curl -fLO https://artifacts.elastic.co/downloads/logstash/logstash-8.9.2-linux-x86_64.tar.gz

tar -xzvf logstash-8.9.2-linux-x86_64.tar.gz

# The archive unpacks into "logstash-8.9.2" (no platform suffix in the directory name).
mv logstash-8.9.2 logstash

cd logstash/config

# Edit the instance settings; the file contents are listed at the end of this document.
vi logstash.yml

# Create the pipeline definition; the file contents are listed at the end of this document.
vi kafka_into_es.conf

cd ..

# Start Logstash with the Kafka -> Elasticsearch pipeline.
bin/logstash -f config/kafka_into_es.conf

logstash.yml

# Instance identity. Each setting must be on its own line to be valid YAML.
node.name: 1eventlog
pipeline.id: 1eventlog

# Monitoring data is sent to the Elasticsearch nodes below over HTTPS,
# validated against the CA certificate at the given path.
# NOTE(review): the password is stored in plain text here; consider moving it
# into the Logstash keystore and referencing it as ${VAR} instead.
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: logstash_system
xpack.monitoring.elasticsearch.password: Glkib_1
xpack.monitoring.elasticsearch.hosts: ["https://10.1.5.13:9200","https://10.1.5.14:9200","https://10.1.5.15:9200"]
xpack.monitoring.elasticsearch.ssl.certificate_authority: "/home/app/logstash/config/elasticsearch-ca.pem"
xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
xpack.monitoring.elasticsearch.sniffing: false

 kafka_into_es.conf

 input {     kafka {         bootstrap_servers => "10.1.5.9:9092"         group_id => "oracle_logs"         client_id => "oracle_log"         id => "oracle_log"         topics => ["oracle_database","oracle_listener","oracle_asm"]         codec => json {            charset => "UTF-8"         }         consumer_threads => 1         add_field => { "[@metadata][appname]" => "oraclelog" }     }     kafka {         bootstrap_servers => "10.1.5.9:9092"         group_id => "oracle_logs"         client_id => "oracle_log1"         id => "oracle_log1"         topics => ["oracle_clusterware"]         codec => json {            charset => "UTF-8"         }         consumer_threads => 1         add_field => { "[@metadata][appname]" => "oracle_log" }     } } filter {     if [@metadata][appname] == "oraclelog" {       grok{         match => ["message","(?m)%{DAY}\ %{MONTH:month}\ %{MONTHDAY:day}\ %{TIME:time}\ %{YEAR:year}\n%{GREEDYDATA:info}[^\n]+"]       }       mutate {         add_field => {"timestamp" => "%{year} %{month} %{day} %{time}"}       }       date {         match => ["timestamp", "yyyy MMM dd HH:mm:ss"]       }       mutate {         gsub => ["message", "\\x", "\\\x"]         remove_field => ["@version","agent","event","ecs","input","tags","year","month","day","time","timestamp","message"]       }     }     if [@metadata][appname] == "oracle_log" {       grok{         match => ["message","%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day}\ %{TIME:time}\:\ \n%{GREEDYDATA:info}[^\n]+"]       }       mutate {         add_field => {"timestamp" => "%{year} %{month} %{day} %{time}"}       }       date {         match => ["timestamp", "yyyy MM dd HH:mm:ss.SSS"]       }       mutate {         gsub => ["message", "\\x", "\\\x"]         remove_field => ["@version","agent","event","ecs","input","tags","year","month","day","time","timestamp","message"]       }     } } output {     if [@metadata][appname] == "oraclelog" {         elasticsearch {             hosts => 
["10.1.5.13:9200","10.1.5.14:9200","10.1.5.15:9200"]             user => "elastic"             password => "app!236"             ssl_enabled => true             ssl_certificate_authorities => "/home/app/logstash/config/elasticsearch-ca.pem"             index => "%{[@metadata][appname]}-%{+YYYY}"         }     } else if [@metadata][appname] == "oracle_log" {         elasticsearch {             hosts => ["10.1.5.13:9200","10.1.5.14:9200","10.1.5.15:9200"]             user => "elastic"             password => "app!236"             ssl_enabled => true             ssl_certificate_authorities => "/home/app/logstash/config/elasticsearch-ca.pem"             index => "oraclelog-%{+YYYY}"         }     } }

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: