logstash根据message中的不同类型的日志输出到不同的es中

当我们从beats输入数据后,然后通过grok把需要的字段抽取出来,然后输出到es中,下面是简单的一个配置文件
input {
beats {
port => "5044"
}
}

filter {
grok {
match => { "message" => "uid:%{WORD:uid}[T ]channel:%{WORD:channel}[T ]os:%{WORD:os}[T ]money:%{WORD:money}[T ]timestamp:%{WORD:timestamp}"}
}

}

output {

elasticsearch {
	hosts => ["http://192.168.1.29:9200"]
	index => "recharge-%{+YYYY.MM.dd}"
}

stdout { codec => rubydebug }

}

geoip {

source => "clientip"

}

#https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html

目前有一个需求,现在我们的业务日志是存放在一个日志文件中,然后每行日志里面有一个logtype的字段指明日志的类型。这就需要我们filter的时候匹配到各种日志格式,然后根据logtype来分发到不同的index中。

看下面的例子:

我们的数据源是这个:
[2020/04/01 19:49:00 CST] [DEFAULT] [DEBG] (main.addUser:172) logtype:reg uid:10000 os:web channel:meizu version:1 udid:g8tnwtu0k7qdcc80 source:8 timestamp:1585741740 networkType:5g ip:205.48.79.22 area:10

grok匹配的模式是这个:
logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}

然后在output节点可以根据某些字段来处理不同的逻辑,最后我们的配置文件为:
input {
beats {
port => "5044"
}
}

filter {
grok {
match => { "message" => [
#充值
"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]money:%{WORD:money}",

		#道具产出
		"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]itemid:%{WORD:cardid}[T ]number:%{WORD:lvto}",

		#卡牌升级
		"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]cardid:%{WORD:cardid}[T ]lvto:%{WORD:lvto}",

		#新手引导
		"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]process:%{WORD:process}",


		#金币消耗
		"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]cost:%{WORD:cost}[T ]itemid:%{WORD:itemid}",


		#登录login
		"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}[T ]regtime:%{WORD:regtime}[T ]lv:%{WORD:lv}[T ]rest_coin:%{WORD:rest_coin}",


		#注册,这个要放最后,不然前后的都可以匹配 reg
		"logtype:%{WORD:logtype}[T ]uid:%{WORD:uid}[T ]os:%{WORD:os}[T ]channel:%{WORD:channel}[T ]version:%{WORD:version}[T ]udid:%{WORD:udid}[T ]source:%{WORD:source}[T ]timestamp:%{WORD:timestamp}[T ]networkType:%{WORD:networkType}[T ]ip:%{IP:ip}[T ]area:%{WORD:area}"
	]
	}
}

}

output {

if [logtype] == "reg"{
	elasticsearch {
		hosts => ["http://192.168.1.29:9200"]
		index => "reg-%{+YYYY.MM.dd}"
	}
}
if [logtype] == "login"{
	elasticsearch {
		hosts => ["http://192.168.1.29:9200"]
		index => "login-%{+YYYY.MM.dd}"
	}
}
if [logtype] == "cost"{
	elasticsearch {
		hosts => ["http://192.168.1.29:9200"]
		index => "cost-%{+YYYY.MM.dd}"
	}
}
if [logtype] == "guide"{
	elasticsearch {
		hosts => ["http://192.168.1.29:9200"]
		index => "guide-%{+YYYY.MM.dd}"
	}
}
if [logtype] == "cardlevelup"{
	elasticsearch {
		hosts => ["http://192.168.1.29:9200"]
		index => "cardlevelup-%{+YYYY.MM.dd}"
	}
}
if [logtype] == "output"{
	elasticsearch {
		hosts => ["http://192.168.1.29:9200"]
		index => "output-%{+YYYY.MM.dd}"
	}
}
if [logtype] == "charge"{
	elasticsearch {
		hosts => ["http://192.168.1.29:9200"]
		index => "charge-%{+YYYY.MM.dd}"
	}
}


stdout { codec => rubydebug }

}

geoip {

source => "clientip"

}

#https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html