logstash filter mutate 组件实战案例
1.编写生成日志的脚本
cat > generate_log.py <<EOF
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : Jason Yin
import datetime
import random
import logging
import time
import sys
LOG_FORMAT = "%(levelname)s %(asctime)s [com.baimeidashu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# 配置root的logging.Logger实例的基本配置
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1]
, filemode='a',)
actions = ["浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券",
"搜索", "查看订单", "付款", "清空购物车"]
while True:
time.sleep(random.randint(1, 5))
user_id = random.randint(1, 10000)
# 对生成的浮点数保留2位有效数字.
price = round(random.uniform(15000, 30000),2)
action = random.choice(actions)
svip = random.choice([0,1])
logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action,svip,price))
EOF
2.生成测试日志
python generate_log.py /tmp/app.log
3.使用mutate组件分析日志
[root@baimeidashu-elk113 /etc/logstash/conf.d]#cat apps-to-es.conf
input {
file {
start_position => "beginning"
path => ["/tmp/app.log"]
}
}
filter {
# 对文本数据进行处理
mutate {
# 对message字段按照"|"进行切割
split => { "message" => "|" }
}
mutate {
# 添加字段
add_field => {
user_id => "%{[message][1]}"
action => "%{[message][2]}"
svip => "%{[message][3]}"
price => "%{[message][4]}"
}
}
mutate {
# 进行数据类型转换,将指定字段换成为期望的数据类型
convert => {
"user_id" => "integer"
"svip" => "boolean"
"price" => "float"
}
}
mutate {
# 对字段进行重命名
rename => { "path" => "filepath" }
}
mutate {
# 移除指定的字段
remove_field => [ "@version","message" ]
}
}
output {
elasticsearch {
hosts => ["10.0.0.111:19200","10.0.0.112:19200","10.0.0.113:19200"]
index => "baimei-apps-%{+yyyy.MM.dd}"
}
stdout {
codec => rubydebug
}
}
启动:
logstash -rf /etc/logstash/conf.d/apps-to-es.conf --path.data /tmp/d7
5.kiban查看数据
4.启动logstash
[root@elk112 ~]# logstash -rf config/11-apps-to-es.conf
5.kiban查看数据
菜单栏 ---> Visualize library ---> 创建可视化 ---> Lens ---> 根据字段选择即可。
---> 基于聚合 ---> 指标 ---> 选择索引 --->
聚合: 把不同的 索引 结果合并到一起
欢迎来撩 : 汇总all