logstash根据message中的不同类型的日志输出到不同的es中
当我们从beats输入数据后,然后通过grok把需要的字段抽取出来,然后输出到es中,下面是简单的一个配置文件
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => "uid:%{WORD:uid}[T ]channel:%{WORD:channel}[T ]os:%{WORD:os}[T ]money:%{WORD:money}[T ]timestamp:%{WORD:timestamp}"}
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.29:9200"]
index => "recharge-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}
geoip {
source => "clientip"
}
#https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html
目前有一个需求,现在我们的业务日志是存放在一个日志文件中,然后每行日志里面有一个logtype的字段指明日志的类型。这就需要我们filter的时候匹配到各种日志格式,然后根据logtype来分发到不同的index中。
看下面的例子:
我们的数据源是这个:
[2020/04/01 19:49:00 CST] [DEFAULT] [DEBG] (main.addUser:172) logtype:reg uid:10000 os:web channel:meizu version:1 udid:g8tnwtu0k7qdcc80 source:8 timestamp:1585741740 networkType:5g ip:205.48.79.22 area:10
grok匹配的模式是这个:
logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}
然后在output节点可以根据某些字段来处理不同的逻辑,最后我们的配置文件为:
input {
beats {
port => "5044"
}
}
filter {
grok {
match => { "message" => [
#充值
"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]money:%{WORD:money}",
#道具产出
"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]itemid:%{WORD:cardid}[T ]number:%{WORD:lvto}",
#卡牌升级
"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]cardid:%{WORD:cardid}[T ]lvto:%{WORD:lvto}",
#新手引导
"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]process:%{WORD:process}",
#金币消耗
"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]cost:%{WORD:cost}[T ]itemid:%{WORD:itemid}",
#登录login
"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]regtime:%{WORD:regtime}[T ]lv:%{WORD:lv}[T ]rest_coin:%{WORD:rest_coin}",
#注册,这个要放最后,不然前后的都可以匹配 reg
"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}"
]
}
}
}
output {
if [logtype] == "reg"{
elasticsearch {
hosts => ["http://192.168.1.29:9200"]
index => "reg-%{+YYYY.MM.dd}"
}
}
if [logtype] == "login"{
elasticsearch {
hosts => ["http://192.168.1.29:9200"]
index => "login-%{+YYYY.MM.dd}"
}
}
if [logtype] == "cost"{
elasticsearch {
hosts => ["http://192.168.1.29:9200"]
index => "cost-%{+YYYY.MM.dd}"
}
}
if [logtype] == "guide"{
elasticsearch {
hosts => ["http://192.168.1.29:9200"]
index => "guide-%{+YYYY.MM.dd}"
}
}
if [logtype] == "cardlevelup"{
elasticsearch {
hosts => ["http://192.168.1.29:9200"]
index => "cardlevelup-%{+YYYY.MM.dd}"
}
}
if [logtype] == "output"{
elasticsearch {
hosts => ["http://192.168.1.29:9200"]
index => "output-%{+YYYY.MM.dd}"
}
}
if [logtype] == "charge"{
elasticsearch {
hosts => ["http://192.168.1.29:9200"]
index => "charge-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug }
}
geoip {
source => "clientip"
}
#https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html