# sFlow to Elasticsearch
# Logstash pipeline: reads lines produced by a capture script, parses them
# with grok, enriches the destination address with GeoIP data, and indexes
# the resulting events into Elasticsearch (also echoing them to stdout).

input {
  # Runs the script and turns each line it writes into an event.
  # NOTE(review): the grok patterns below expect quoted, comma-separated
  # fields; confirm that this is what capture.sh emits.
  pipe {
    command => "/etc/logstash/capture.sh"
  }
}

filter {
  # Parse the date
  # NOTE(review): nothing visible in this pipeline creates a "timestamp"
  # field (grok below captures the second column as "field1"), so this
  # filter appears to be a no-op as written. Confirm which field carries the
  # packet time before relying on @timestamp being the capture time.
  date {
    match => [
      "timestamp",
      "MMM dd HH:mm:ss",
      "MMM d HH:mm:ss",
      "MMM dd yyyy HH:mm:ss",
      "MMM d yyyy HH:mm:ss"
    ]
  }

  # Three layouts of the same record are accepted; they differ only in where
  # the run of empty columns sits relative to the two port columns. Patterns
  # are tried in the order listed and the first match wins.
  grok {
    match => {
      "message" => [
        "\"%{INT:pktNo}\",\"%{GREEDYDATA:field1}\",\"%{IP:source_ip}\",\"%{IP:destination_ip}\",\"%{WORD:Protocol}\",\"%{INT:pktLen}\",,,,,\"%{INT:source_port}\",\"%{INT:destination_port}\",\"%{GREEDYDATA:syslog_message}",
        "\"%{INT:pktNo}\",\"%{GREEDYDATA:field1}\",\"%{IP:source_ip}\",\"%{IP:destination_ip}\",\"%{WORD:Protocol}\",\"%{INT:pktLen}\",\"%{INT:source_port}\",\"%{INT:destination_port}\",,,\"%{GREEDYDATA:syslog_message}",
        "\"%{INT:pktNo}\",\"%{GREEDYDATA:field1}\",\"%{IP:source_ip}\",\"%{IP:destination_ip}\",\"%{WORD:Protocol}\",\"%{INT:pktLen}\",,,\"%{INT:source_port}\",\"%{INT:destination_port}\",\"%{GREEDYDATA:syslog_message}"
      ]
    }
  }

  # Only attempt the lookup when grok actually extracted a destination
  # address; events that matched none of the patterns have no such field.
  if [destination_ip] {
    geoip {
      database => "/etc/logstash/GeoLite2-City.mmdb"
      source => "destination_ip"
    }
  }

  # Discard events whose source address contains a comma.
  # NOTE(review): source_ip is captured with %{IP}, which should never
  # include a comma, so this branch looks unreachable; kept as in the
  # original in case other pipelines feed this field.
  if "," in [source_ip] {
    drop { }
  }
}

output {
  elasticsearch {
    hosts => ["http://xxx.xxx.xxx.xxx:9200"]
    user => "elastic"
    # NOTE(review): plaintext credential in the config file; consider moving
    # it to the Logstash keystore or an environment variable reference.
    password => "changeme"
    action => "index"
    # One index per day, named from the event's @timestamp.
    index => "indexname-%{+YYYY.MM.dd}"
  }

  # Also print every event in a human-readable form.
  stdout {
    codec => rubydebug
  }
}