Monday, May 4, 2015

set up 'bro' to log into Elasticsearch

A recurring question on the Bro mailing list is, "how do you set up Bro to ship logs to Elasticsearch?"
I'll explain how I've set up my Bro network monitor and hopefully it will be useful to others as well.

Set up Bro to log in JSON format:

There are other recipes where you use grok to parse the text logs and JSON-ify them, but I find it easier and more stable if the application can log directly in JSON.
In your local.bro, add the following lines:

redef LogAscii::json_timestamps = JSON::TS_ISO8601;
@load tuning/json-logs

The first line changes the format of the timestamp from UNIX epoch to ISO 8601, which makes it easier for Logstash to parse the date into @timestamp; the second line loads a tuning script that turns your logs into JSON.

Restart Bro with broctl restart --clean and you should be set.
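
With those two lines in place, each conn.log entry then looks roughly like this (one JSON object per line in the real file; wrapped here for readability and with made-up values):

{"ts":"2015-05-04T09:12:42.123456Z","uid":"CHhAvVGS1DHFjwGM9",
 "id.orig_h":"192.0.2.10","id.orig_p":51234,
 "id.resp_h":"198.51.100.20","id.resp_p":80,
 "proto":"tcp","conn_state":"S0"}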

Set up the Logstash input to feed on the logs:

Note that a few assumptions are made here.
I like to organise my Logstash conf in different files, named [input, output, filter]-<purpose>.conf; this makes templating and debugging easy. Of course you can always stuff everything into one gigantic file, that's entirely up to you :-). In output-default.conf you will need to change the cluster name to whatever name your cluster has, you will also need to change the input path to match yours, and so on ...
Final gotcha: on Debian you will need to apt-get install logstash-contrib to get the translate plugin.
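
For reference, the plugin install and the conf.d layout I end up with look something like this (the file names are just my naming convention):

# Debian: the translate filter lives in the contrib package
sudo apt-get install logstash-contrib

ls /etc/logstash/conf.d/
input-bro.conf  filter-bro.conf  output-default.conf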

In /etc/logstash/conf.d/input-bro.conf, we specify the source files, the type, and that the logs are in JSON.

input {
    file {
        type => "bro_logs"
        path => [ "/opt/spool/manager/*.log" ]
        codec => "json"
    }
}
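
One gotcha with the stock file input: by default it only tails files from the end, so anything already in the logs when Logstash starts is skipped. If you want the existing content indexed on the first run, you can add the standard start_position option, shown here as a variation on the config above:

input {
    file {
        type => "bro_logs"
        path => [ "/opt/spool/manager/*.log" ]
        codec => "json"
        # read pre-existing files from the beginning on first run (default is "end")
        start_position => "beginning"
    }
}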

In /etc/logstash/conf.d/filter-bro.conf, the translate filter adds a field named conn_state_full containing the full text corresponding to conn_state, and the grok filter adds a field named bro_type with the type of Bro log (conn, weird, dns, ssl, ...) based on the file name. We could do that in the input part as well by listing every file name and giving each a specific type, but I'm lazy and I always forget one file, so ...

filter {
    if [type] == "bro_logs" {
        date {
            match => [ "ts", "ISO8601" ]
        }
        translate {
            field       => "conn_state"
            destination => "conn_state_full"
            dictionary  => [ "S0", "Attempt",
                             "S1", "Established",
                             "S2", "Originator close only",
                             "S3", "Responder close only",
                             "SF", "SYN/FIN completion",
                             "REJ", "Rejected",
                             "RSTO", "Originator aborted",
                             "RSTR", "Responder aborted",
                             "RSTOS0", "Originator SYN + RST",
                             "RSTRH", "Responder SYN ACK + RST",
                             "SH", "Originator SYN + FIN",
                             "SHR", "Responder SYN ACK + FIN",
                             "OTH", "Midstream traffic" ]
        }
        grok {
            match => { "path" => ".*\/(?<bro_type>[a-zA-Z0-9]+)\.log$" }
        }
    }
}
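
To make the effect concrete, an event coming out of a conn.log with conn_state "S0" ends up with fields along these lines (abbreviated, values made up for illustration; this is roughly what the commented-out rubydebug output in the next section would print):

{
                 "ts" => "2015-05-04T09:12:42.123456Z",
         "conn_state" => "S0",
    "conn_state_full" => "Attempt",
           "bro_type" => "conn",
               "path" => "/opt/spool/manager/conn.log",
               "type" => "bro_logs",
         "@timestamp" => "2015-05-04T09:12:42.123Z"
}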


In /etc/logstash/conf.d/output-default.conf, we simply send the events to the Elasticsearch cluster.

output {
    elasticsearch {
        cluster => 'elasticsearch'
    }
    #stdout { codec => rubydebug }
}

Restart Logstash.
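
On Debian with the package install that is roughly the following; the --configtest pass is optional but has saved me from silly typos more than once (adjust the logstash path and flags to your version):

# sanity-check the config first (optional)
sudo /opt/logstash/bin/logstash agent --configtest -f /etc/logstash/conf.d/
# then restart the service
sudo service logstash restart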

Conclusion:

By now you should have logs in JSON and Logstash shipping them to Elasticsearch; you can now start Kibana and visualize your Bro logs:
pretty pictures ^_^
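
If Kibana stays empty, a quick sanity check is to ask Elasticsearch which indices it has; with the default Logstash output you should see daily logstash-YYYY.MM.DD indices growing:

curl 'localhost:9200/_cat/indices?v'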
