rsyslog

服务器较少时使用,rsyslog日志收集,统一存放在专门存放日志的收集器中;

ELK 简介

ELK平台是一套完整的日志集中处理解决方案,将 ElasticSearch、Logstash 和 Kiabana 三个开源工具配合使用, 完成更强大的用户对日志的查询、排序、统计需求

ElasticSearch:是基于Lucene(一个全文检索引擎的架构)开发的分布式存储检索引擎,用来存储各类日志;(用来存储日志数据和创建全文索引);

Kiabana:Kibana 通常与 Elasticsearch 一起部署,Kibana 是 Elasticsearch 的一个功能强大的数据可视化 Dashboard,Kibana 提供图形化的 web 界面来浏览 Elasticsearch 日志数据,可以用来汇总、分析和搜索重要数据;(用来实现可视化的日志监控);

Logstash:作为数据收集引擎。它支持动态的从各种数据源搜集数据,并对数据进行过滤、分析、丰富、统一格式等操作,然后存储到用户指定的位置,一般会发送给 Elasticsearch。 Logstash 由 Ruby 语言编写,运行在 Java 虚拟机(JVM)上,是一款强大的数据处理工具, 可以实现数据传输、格式处理、格式化输出。Logstash 具有强大的插件功能,常用于日志处理;(数据收集、数据过滤、格式化统一)。

完整日志系统基本特征 收集:能够采集多种来源的日志数据 传输:能够稳定的把日志数据解析过滤并传输到存储系统 存储:存储日志数据 分析:支持 UI 分析

ELK 的工作原理: (1)在所有需要收集日志的服务器上部署Logstash;或者先将日志进行集中化管理在日志服务器上,在日志服务器上部署 Logstash。 (2)Logstash 收集日志,将日志格式化并输出到 Elasticsearch 群集中。 (3)Elasticsearch 对格式化后的数据进行索引和存储。 (4)Kibana 从 ES 群集中查询数据生成图表,并进行前端数据的展示。

总结:logstash作为日志搜集器,从数据源采集数据,并对数据进行过滤,格式化处理,然后交由Elasticsearch存储,kibana对日志进行可视化处理。

安装elasticsearch:

①环境监测:

设置Java环境

java -version                                        #如果没有安装,yum -y install java

1)部署 Elasticsearch 软件

cd /opt rpm -ivh elasticsearch-6.7.2.rpm

2)修改elasticsearch主配置文件(vim /etc/elasticsearch/elasticsearch.yml)

cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak 做个备份

--17--取消注释,指定集群名字 cluster.name: my-elk-cluster --23--取消注释,指定节点名字:Node1节点为node1,Node2节点为node2 node.name: node1 node.master: true        #是否master节点,false为否 node.data: true            #是否数据节点,false为否 --33--取消注释,指定数据存放路径 path.data: /var/lib/elasticsearch --37--取消注释,指定日志存放路径 path.logs: /var/log/elasticsearch --43--取消注释,避免es使用swap交换分区 bootstrap.memory_lock: true --55--取消注释,设置监听地址,0.0.0.0代表所有地址 network.host: 0.0.0.0 --59--取消注释,ES 服务的默认监听端口为9200 http.port: 9200                    #指定es集群提供外部访问的接口 transport.tcp.port: 9300        #指定es集群内部通信接口 --68--取消注释,集群发现通过单播实现,指定要发现的节点 discovery.zen.ping.unicast.hosts: ["192.168.169.10:9300", "192.168.169.30:9300"]

grep -v "^#" /etc/elasticsearch/elasticsearch.yml 过滤监测有效信息

3)es 性能调优参数

vim /etc/security/limits.conf

*  soft    nofile             65536 *  hard    nofile            65536 *  soft    nproc             32000 *  hard    nproc            32000 *  soft    memlock        unlimited *  hard    memlock       unlimited

 vim /etc/systemd/system.conf

DefaultLimitNOFILE=65536 DefaultLimitNPROC=32000 DefaultLimitMEMLOCK=infinity

需重启系统生效reboot!!!!!

vim /etc/sysctl.conf #一个进程可以拥有的最大内存映射区域数,参考数据(分配 2g/262144,4g/4194304,8g/8388608) vm.max_map_count=262144

sysctl -p sysctl -a | grep vm.max_map_count

4)启动elasticsearch是否成功开启 systemctl start elasticsearch.service systemctl enable elasticsearch.service netstat -antp | grep 9200

 ③测试:http://192.168.169.10:9200 和 http://192.168.169.10:9200/_cluster/health?pretty 和http://192.168.169.10:9200/_cluster/state?pretty

使用上述方式查看群集的状态对用户并不友好,可以通过安装 Elasticsearch-head 插件,可以更方便地管理群集:

安装 Elasticsearch-head 插件 Elasticsearch 在 5.0 版本后,Elasticsearch-head 插件需要作为独立服务进行安装,需要使用npm工具(NodeJS的包管理工具)安装。 安装 Elasticsearch-head 需要提前安装好依赖软件 node 和 phantomjs。 node:是一个基于 Chrome V8 引擎的 JavaScript 运行环境。 phantomjs:是一个基于 webkit 的JavaScriptAPI,可以理解为一个隐形的浏览器,任何基于 webkit 浏览器做的事情,它都可以做到;

编译安装 node(时间较久,建议提前安装):

#上传软件包 node-v8.2.1.tar.gz 到/opt yum install gcc gcc-c++ make -y

cd /opt tar zxvf node-v8.2.1.tar.gz

cd node-v8.2.1/ ./configure make && make install

安装 phantomjs #上传软件包 phantomjs-2.1.1-linux-x86_64.tar.bz2 到 cd /opt tar jxvf phantomjs-2.1.1-linux-x86_64.tar.bz2 cd /opt/phantomjs-2.1.1-linux-x86_64/bin cp phantomjs /usr/local/bin

安装 Elasticsearch-head 数据可视化工具 #上传软件包 elasticsearch-head-master.zip 到/opt cd /opt unzip elasticsearch-head-master.zip cd /opt/elasticsearch-head-master/ npm install         //安装依赖包

修改 Elasticsearch 主配置文件(vim /etc/elasticsearch/elasticsearch.yml)

--末尾添加以下内容-- http.cors.enabled: true                #开启跨域访问支持,默认为 false http.cors.allow-origin: "*"

然后systemctl restart elasticsearch

启动 elasticsearch-head 服务 #必须在解压后的 elasticsearch-head 目录下启动服务,进程会读取该目录下的 gruntfile.js 文件,否则可能启动失败。 cd /opt/elasticsearch-head-master npm run start &

 netstat -natp |grep 9100

 测试:

通过命令插入一个测试索引,索引为 index-demo,类型为 test。 curl -X PUT 'localhost:9200/index-demo/test/1?pretty&pretty' -H 'content-Type: application/json' -d '{"user":"zhangsan","mesg":"hello world"}' //输出结果如下: { "_index" : "index-demo", "_type" : "test", "_id" : "1", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 2, "failed" : 0 }, "created" : true }  

Logstash 部署(apache):

①安装Apahce服务(httpd) yum -y install httpd systemctl start httpd

②java -version

③安装logstash #上传软件包 logstash-6.7.2.rpm 到/opt目录下 cd /opt rpm -ivh logstash-6.7.2.rpm                           systemctl start logstash.service                       systemctl enable logstash.service

④ln -s /usr/share/logstash/bin/logstash /usr/local/bin/

测试:logstash -e 'input { stdin{} } output { stdout{ codec=>rubydebug } }'

 ELK Kiabana 部署:

cd /opt rpm -ivh kibana-6.7.2-x86_64.rpm

设置 Kibana 的主配置文件 vim /etc/kibana/kibana.yml

--2--取消注释,Kiabana 服务的默认监听端口为5601 server.port: 5601 --7--取消注释,设置 Kiabana 的监听地址,0.0.0.0代表所有地址 server.host: "0.0.0.0" --28--取消注释,配置es服务器的ip,如果是集群则配置该集群中master节点的ip elasticsearch.url:  ["http://192.168.80.10:9200","http://192.168.80.11:9200"]  --37--取消注释,设置在 elasticsearch 中添加.kibana索引 kibana.index: ".kibana" --96--取消注释,配置kibana的日志文件路径(需手动创建),不然默认是messages里记录日志 logging.dest: /var/log/kibana.log

创建日志文件,启动 Kibana 服务

touch /var/log/kibana.log chown kibana:kibana /var/log/kibana.log

测试:

 Filebeat+ELK 部署:

/opt

tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz mv filebeat-6.7.2-linux-x86_64/ /usr/local/filebeat

设置 filebeat 的主配置文件 cd /usr/local/filebeat

vim filebeat.yml filebeat.inputs: - type: log         #指定 log 类型,从日志文件中读取消息   enabled: true   paths:     - /var/log/httpd/access_log       #指定监控的日志文件     - /var/log/*.log   tags: ["filebeat"]        #设置索引标签  自己添加   fields:           #可以使用 fields 配置选项设置一些参数字段添加到 output 中     service_name: httpd     log_type: access     from: 192.168.169.60

--------------Elasticsearch output------------------- (全部注释掉)

----------------Logstash output--------------------- output.logstash:   hosts: ["192.168.80.12:5044"]      #指定 logstash 的 IP 和端口

然后去logstash里面去做日志的输入输出过滤流:

filter {   grok {     match => ["message", "(?%{IPV6}|%{IPV4})[\s\-]+\[(?.*)\]\s+\"(?\S+)\s+(?.+)\"\s+(?\d+) \d+ \"(?.+)\" \"(?.*)\""]   } }

output {     elasticsearch {         hosts => ["192.168.169.20:9200","192.168.169.50:9200"]         index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"     }     stdout {         codec => rubydebug     } }

 然后施行:logstash -f filebeat.conf进行日志的事件创建

filter的四个插件:

①grok:大字段分割成小字段(可以使用内置正则,也可以使用自定义的正则) match => {"大字段名"} => ".* \- \- \[(?.+)\].+"

②date:对数据中的时间格式进行统一和格式化,把日志生成时间和事件生成时间统一 先做grok  然后match => ["acess_time(这是grok分割出的时间小字段)" ...] target => "@timestamp" (默认也是他) timezone => "Asia/Shanghai"

③mutate:对字段增删改查 add_field  添加新字段 remove_field 删除字段 replace     替换字段 rename      重命名 gsub        使用正则

④mutiline:对多行数据进行统一 pattern => "n\d..." 匹配第一行的信息 negate => true  取反 what => "previous"  向上合并  next向下合并

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: