curl -O https://artifacts.elastic.co/downloads/beats/filebeat/logstash-8.9.2-linux-x86_64.tar.gz
tar -xzvf logstash-8.9.2-linux-x86_64.tar.gz
mv logstash-8.9.2-linux-x86_64 logstash
cd logstash/config
vi logstash.yml
# 文末附内容
vi kafka_into_es.conf
# 文末附内容
cd ..
bin/logstash -f config/kafka_into_es.conf
logstash.yml
node.name: 1eventlog pipeline.id: 1eventlog
xpack.monitoring.enabled: true xpack.monitoring.elasticsearch.username: logstash_system xpack.monitoring.elasticsearch.password: Glkib_1 xpack.monitoring.elasticsearch.hosts: ["https://10.1.5.13:9200","https://10.1.5.14:9200","https://10.1.5.15:9200"] xpack.monitoring.elasticsearch.ssl.certificate_authority: "/home/app/logstash/config/elasticsearch-ca.pem" xpack.monitoring.elasticsearch.ssl.verification_mode: certificate xpack.monitoring.elasticsearch.sniffing: false
kafka_into_es.conf
input { kafka { bootstrap_servers => "10.1.5.9:9092" group_id => "oracle_logs" client_id => "oracle_log" id => "oracle_log" topics => ["oracle_database","oracle_listener","oracle_asm"] codec => json { charset => "UTF-8" } consumer_threads => 1 add_field => { "[@metadata][appname]" => "oraclelog" } } kafka { bootstrap_servers => "10.1.5.9:9092" group_id => "oracle_logs" client_id => "oracle_log1" id => "oracle_log1" topics => ["oracle_clusterware"] codec => json { charset => "UTF-8" } consumer_threads => 1 add_field => { "[@metadata][appname]" => "oracle_log" } } } filter { if [@metadata][appname] == "oraclelog" { grok{ match => ["message","(?m)%{DAY}\ %{MONTH:month}\ %{MONTHDAY:day}\ %{TIME:time}\ %{YEAR:year}\n%{GREEDYDATA:info}[^\n]+"] } mutate { add_field => {"timestamp" => "%{year} %{month} %{day} %{time}"} } date { match => ["timestamp", "yyyy MMM dd HH:mm:ss"] } mutate { gsub => ["message", "\\x", "\\\x"] remove_field => ["@version","agent","event","ecs","input","tags","year","month","day","time","timestamp","message"] } } if [@metadata][appname] == "oracle_log" { grok{ match => ["message","%{YEAR:year}\-%{MONTHNUM:month}\-%{MONTHDAY:day}\ %{TIME:time}\:\ \n%{GREEDYDATA:info}[^\n]+"] } mutate { add_field => {"timestamp" => "%{year} %{month} %{day} %{time}"} } date { match => ["timestamp", "yyyy MM dd HH:mm:ss.SSS"] } mutate { gsub => ["message", "\\x", "\\\x"] remove_field => ["@version","agent","event","ecs","input","tags","year","month","day","time","timestamp","message"] } } } output { if [@metadata][appname] == "oraclelog" { elasticsearch { hosts => ["10.1.5.13:9200","10.1.5.14:9200","10.1.5.15:9200"] user => "elastic" password => "app!236" ssl_enabled => true ssl_certificate_authorities => "/home/app/logstash/config/elasticsearch-ca.pem" index => "%{[@metadata][appname]}-%{+YYYY}" } } else if [@metadata][appname] == "oracle_log" { elasticsearch { hosts => ["10.1.5.13:9200","10.1.5.14:9200","10.1.5.15:9200"] user => "elastic" password => "app!236" ssl_enabled => true ssl_certificate_authorities => "/home/app/logstash/config/elasticsearch-ca.pem" index => "oraclelog-%{+YYYY}" } } }
参考文章
发表评论