文章目录
环境准备日志读取及保存日志解析grok调试工具grok自定义格式
环境准备
准备kafka、logstash 、es 环境并启动服务,参考之前的文章这里不重复
日志读取及保存
启动zookeeper:[root@localhost kafka]# bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 启动kafka:[root@localhost kafka]# bin/kafka-server-start.sh -daemon config/server.properties
假设日志格式如下,包括基本字段(时间、IP、级别等)和业务字段(字段1,字段2,字段3)。
2021-05-21 15:28:50.093 192.168.8.96 1 error 4 [#字段1#][#字段2#][#字段3#]
消费消息并打印 配置文件如下,从kafka获取日志并直接打印:
input {
kafka {
bootstrap_servers => "192.168.195.11:9092"
topics => ["guardlog"]
group_id => "GuardLogGroup"
client_id => "guard_cli"
security_protocol => "SSL"
}
}
filter {
}
output {
stdout { } # 控制台输出
}
启动logstash
[root@localhost logstash]# bin/logstash -f config/logstash.conf --config.reload.automatic
向kakfa发送消息后,logstash接收并打印了消息。
消息输出到ES 启动ES[es@localhost es7.12]$ bin/elasticsearch -d 修改配置文件.output 增加 es输出如下:
output {
stdout { } # 控制台输出
elasticsearch {
hosts => ["192.168.195.12:9200"]
index => "guardlog"
}
}
插入kafka消息控制台有输出 es中也有了这条消息,postman查询结果如下
日志解析
上边我们从kafka读取日志并保存到了es 中但是可以看到,日志格式是一段文本,很多时候我们需要结构化的数据方便数据查询过滤等,因此我们在将日志保存到es前要对日志做解析和转换。 logstash提供了很多插件可以用来解析日志,对于常见的apache 等日志有现成的解析插件,这里我们使用grok插件来解析基本日志格式内容,并通过正则表达式和字符串分割解析出业务字段。
可以直接调用grok内置的正则格式,来解析日志内容,如下:通过IP 来解析日志中的请求者IP":%{IP:sag_ip}"
默认提供的格式见:patterns groke基于正则表达式库是Oniguruma,你可以在Oniguruma网站上看到完整支持的regexp语法。 中文学习正则表达式网站:http://www.runoob.com/regexp/regexp-tutorial.html
调试工具:调试
观察输出内容可以看到,当日志内容中有ip时会解析出来并在日志中增加一个字段sag_ip 来保存这个值,但是如果日志中没有找到IP那么就会增加tags字段,提示groke解析失败。测试发现如果有一个表达式没有匹配到内容那么就是报错(提示解析失败)其他字段也无法正确解析。实际中可以通过是否存在tags字段来判断是否处理成功,如果未处理成功那么可以将数据单独保存到一个地方方便分析
如下,我们使用 grok自带的表达式来解析时间、IP、tag、loglevel等内容,最后一个字段是使用我们自定义的正则表达式(\[#?)(?
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?
}
}
}
输入测试数据可以看到 ,数据被正常解析
{
"sag_tag" => "1",
"@version" => "1",
"message" => "2021-05-21 15:28:50.093 192.168.8.96 1 error 4 [#字段1#][#字段2#][#字段3#]",
"sag_ip" => "192.168.8.96",
"log_level" => "error",
"op_date" => "2021-05-21 15:28:50.093",
"log_src" => "4",
"@timestamp" => 2022-05-08T03:33:16.271Z,
"source" => "字段1#][#字段2#][#字段3"
}
可以在souce字段的基础上根据#][# 进行字段分割就可以得到 所有的业务字段,如下使用ruby来完成这个功能
filter {
grok {
pattern_definitions =>{
# MYNUM => "\d+",
FIELD => "(?:\[#).*(?:#\])"
SAGTAG => "\w+"
}
#patterns_dir =>["/root/logstash/patterns"]
match => {
"message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?
}
}
ruby{
code =>"
ipStr = event.get('source')
ipArray = ipStr.split('#][#',-1)
length = ipArray.length
path = ''
if length != 3
event.set('tags','mainMsgLengthError')
else
event.set('f1',ipArray[0])
event.set('f2',ipArray[1])
event.set('f3',ipArray[2])
end
"
}
}
再次测试可以发现业务字段已经都完成了解析
[2022-05-08T11:48:23,133][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=guard_cli-0, groupId=GuardLogGroup] Setting newly assigned partitions [guardlog-0]
{
"sag_tag" => "1",
"@version" => "1",
"f2" => "字段2",
"op_date" => "2021-05-21 15:28:50.093",
"source" => "字段1#][#字段2#][#字段3",
"f3" => "字段3",
"message" => "2021-05-21 15:28:50.093 192.168.8.96 1 error 4 [#字段1#][#字段2#][#字段3#]",
"sag_ip" => "192.168.8.96",
"log_level" => "error",
"log_src" => "4",
"@timestamp" => 2022-05-08T03:48:40.473Z,
"f1" => "字段1"
}
最后删除不必要的字段,同时增加输出判断 如果包含tags字段则输出到异常数据索引
filter {
grok {
pattern_definitions =>{
# MYNUM => "\d+",
FIELD => "(?:\[#).*(?:#\])"
SAGTAG => "\w+"
}
#patterns_dir =>["/root/logstash/patterns"]
match => {
"message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?
}
}
ruby{
code =>"
ipStr = event.get('source')
elasticsearch {
hosts => ["192.168.195.12:9200"]
index => "guardlog"
}
ipArray = ipStr.split('#][#',-1)
length = ipArray.length
path = ''
if length != 3
event.set('tags','mainMsgLengthError')
else
event.set('f1',ipArray[0])
event.set('f2',ipArray[1])
event.set('f3',ipArray[2])
end
"
}
if [tags] {
#如果出错了,不删除内容直接保存到
} else {
#如果没有出错择删除不必要的字段 并保存到es
mutate {remove_field => ["source","message","@timestamp","@version"]}
}
}
output {
stdout { } # 控制台输出
if [tags] {
elasticsearch {
hosts => ["192.168.195.12:9200"]
index => "guarderrlog"
}
} else {
elasticsearch {
hosts => ["192.168.195.12:9200"]
index => "guardlog"
}
}
}
再次测试,没有了多余的字段,同时一旦解析出错就会保存到guarderrlog索引中
[2022-05-08T11:55:47,875][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=guard_cli-0, groupId=GuardLogGroup] Setting newly assigned partitions [guardlog-0]
{
"sag_tag" => "1",
"f2" => "字段2",
"op_date" => "2021-05-21 15:28:50.093",
"f3" => "字段3",
"sag_ip" => "192.168.8.96",
"log_level" => "error",
"log_src" => "4",
"f1" => "字段1"
}
grok调试工具
上述过程中我们每次都通过运行logstash来验证我们解析的正确性,很繁琐,实际中我们有必要搞一套grok测试工具。以方便对解析过程的测试。网络上的工具有时候不稳定,可以在本地启动调试服务。 docker环境下运行一下命令就可以运行本地的测试服务,
[root@localhost ~]# docker pull qiudev/grokdebugger
[root@localhost ~]# docker run -d --name grokdebugger -p 19999:9999
启动后我们访问 http://192.168.195.10:19999/ 就可以使用本地调试服务
grok自定义格式
如果grok自带的表达式不能满足需求,还可以自定义表达式,在logstash 中自定义表达式有多种方式 通过pattern_definitions 属性来定义
grok {
pattern_definitions =>{
# MYNUM => "\d+",
FIELD => "(?:\[#).*(?:#\])"
SAGTAG => "\w+"
}
match => {
"message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?
}
}
通过文件来定义
定义一个模式文件里边每行都是一个模式
[root@192 patterns]# pwd
/root/logstashacc/patterns
[root@192 patterns]# cat pattern
#LOG_TIME (?>\d\d){1,2}[/-](?:0[1-9]|1[0-2])[/-](?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
MYNUM [-]\d+
然后再grok配置文件中通过patterns_dir 属性配置好模式文件所在的目录即可
grok {
patterns_dir =>["/root/logstash/patterns"]
match => {
"message" => "%{TIMESTAMP_ISO8601:op_date} \s*%{IP:sag_ip} \s*%{DATA:sag_tag} \s*%{LOGLEVEL:log_level} \s*%{DATA:log_src} \s*(\[#?)(?
}
}
直接在配置文件中使用 如上述配置中的 (\[#?)(?
关于表达式的详细规则和语法 请参考这个不错的学习网站 编程胶囊
参考文章
发表评论