# sFlow to Elasticsearch
input {
  # Launch the capture script and read the events it writes through a pipe.
  pipe { command => "/etc/logstash/capture.sh" }
}
filter {
  # Parse the event time from the `timestamp` field.
  # The first entry of `match` is the field name; the remaining entries are
  # the accepted formats (one- or two-digit day, with or without a year).
  #
  # NOTE(review): nothing visible in this file creates a `timestamp` field
  # before this filter runs (the grok filter comes later and does not capture
  # one) -- confirm that the input supplies it, otherwise this filter
  # presumably never matches.
  date {
    match => [
      "timestamp",
      "MMM dd HH:mm:ss", "MMM d HH:mm:ss",
      "MMM dd yyyy HH:mm:ss", "MMM d yyyy HH:mm:ss"
    ]
  }
}
filter {
  # Split the quoted, comma-separated capture line into named fields.
  # The three patterns differ only in where the empty columns (",,") sit
  # relative to the port columns; grok tries them in the order listed and
  # stops at the first one that matches.
  grok {
    match => {
      "message" => [
        "\"%{INT:pktNo}\",\"%{GREEDYDATA:field1}\",\"%{IP:source_ip}\",\"%{IP:destination_ip}\",\"%{WORD:Protocol}\",\"%{INT:pktLen}\",,,,,\"%{INT:source_port}\",\"%{INT:destination_port}\",\"%{GREEDYDATA:syslog_message}",
        "\"%{INT:pktNo}\",\"%{GREEDYDATA:field1}\",\"%{IP:source_ip}\",\"%{IP:destination_ip}\",\"%{WORD:Protocol}\",\"%{INT:pktLen}\",\"%{INT:source_port}\",\"%{INT:destination_port}\",,,\"%{GREEDYDATA:syslog_message}",
        "\"%{INT:pktNo}\",\"%{GREEDYDATA:field1}\",\"%{IP:source_ip}\",\"%{IP:destination_ip}\",\"%{WORD:Protocol}\",\"%{INT:pktLen}\",,,\"%{INT:source_port}\",\"%{INT:destination_port}\",\"%{GREEDYDATA:syslog_message}"
      ]
    }
  }

  # Discard events whose source_ip contains a comma.
  # NOTE(review): source_ip is captured with %{IP} above, so a comma is not
  # expected here -- confirm whether this check is still needed.
  if "," in [source_ip] {
    drop { }
  }

  # Enrich with location data for the destination address. Only attempted when
  # grok actually extracted destination_ip, so unparsed lines are not also
  # tagged with a misleading geoip lookup failure.
  if [destination_ip] {
    geoip {
      database => "/etc/logstash/GeoLite2-City.mmdb"
      source => "destination_ip"
    }
  }
}
output {
  # Index every event into a daily Elasticsearch index.
  elasticsearch {
    hosts => ["http://xxx.xxx.xxx.xxx:9200"]
    user => "elastic"
    # Resolved from the ES_PASSWORD environment variable (or Logstash keystore
    # entry) so the real secret does not have to live in this file; falls back
    # to the previous literal value when ES_PASSWORD is not defined.
    password => "${ES_PASSWORD:changeme}"
    action => "index"
    # %{+YYYY.MM.dd} is expanded from the event's @timestamp.
    index => "indexname-%{+YYYY.MM.dd}"
  }

  # Also print each event to stdout in rubydebug form for inspection.
  stdout { codec => rubydebug }
}