Thursday, May 14, 2020

Hadoop / Spark2 snippets that took way too long to figure out


This is a collection of links and snippets that took me way too long to figure out; I've copied them here with a bit of documentation in the hope that others find them useful.

Copy data from one cluster to another

This snippet was made to copy a large dataset (42 TB) from one cluster to another; the default options don't play very well at that scale.
We basically increase the number of mappers (-m100 seems to work well with 40 data nodes), increase the timeout (-D mapred.task.timeout), run in update mode to be able to recover and retry (-update), ignore errors (-i), log the errors to HDFS for investigation (-log), and disable the CRC check (-skipcrccheck) since we are copying between two different encryption zones.

hadoop distcp  -D mapred.task.timeout=1800000 -pb -pp -m100 -update -copybuffersize 65536 -i -log /application/distcp -skipcrccheck -v  hdfs://namenode1.com/application/logs/year=2018/month=11 /application/logs/year=2018/month=11

A snippet of python to convert data from Avro (old cluster) to parquet (new cluster)

Note that this requires Spark 2.3, and to run it you'll need to add an extra package to be able to read the Avro files:

/bin/spark-submit --queue batch_queue --packages com.databricks:spark-avro_2.11:3.2.0 sparkConversion.py
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, types
from pyspark.sql.functions import udf
import logging

logger = logging.getLogger()

appName = "spark test"

conf = SparkConf().setAppName(appName)
conf.set('spark.hadoop.avro.mapred.ignore.inputs.without.extension', 'false')

sc = SparkContext(conf=conf)
sc.setLogLevel('ERROR')
sqlContext = SQLContext(sc)

# read the Avro files produced on the old cluster
df = sqlContext.read.format("com.databricks.spark.avro").load("/application/day=26")
logger.info("Data frame loaded, it contains {0} elements".format(df.count()))

# write them back as Parquet on the new cluster
df.write.format("parquet").save("/application/app1/year=2018/month=11/day=26/")



The starting point for a _start_ script:

In order, this script will:
  1. initialize Kerberos from a keytab
  2. submit the job in client mode (usually the best for reading from HDFS)
  3. force the use of the official HortonWorks repo to pull external artefacts
  4. add a properties file to override some Spark defaults
  5. set up an HTTP proxy
  6. force the use of a given jaas file for the Kafka connection
  7. pass on the external python modules in a zip
  8. invoke the job from the src directory


#!/bin/bash -xp

export SPARK_MAJOR_VERSION=2
#export PYSPARK_PYTHON=python3.4

BASE_DIR=$(dirname $0)
TOOL_DIRECTORY=$(realpath ${BASE_DIR})
ROOT_DIRECTORY=$(realpath ${BASE_DIR}/../)
DIST_DIRECTORY=$(realpath ${TOOL_DIRECTORY}/../dist/)
SRC_DIRECTORY=$(realpath ${TOOL_DIRECTORY}/../src)

cd  ${TOOL_DIRECTORY}

kinit -kt ~/user.keytab user@REALM.NET

/bin/spark-submit --master=yarn --deploy-mode=client \
--repositories http://repo.hortonworks.com/content/groups/public \
--properties-file ${TOOL_DIRECTORY}/spark-streaming.conf \
--executor-memory 4g \
--driver-memory 60g \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./kafkajaas.conf -Dhttp.proxyHost=proxy.net -Dhttp.proxyPort=3128 -Dhttps.proxyHost=proxy.net -Dhttps.proxyPort=3128 -Dhttp.nonProxyHosts='localhost|*.realm.net|10.*.*.*'  -Djava.io.tmpdir=/scratch/tmp " \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafkajaas.conf -Djavax.net.ssl.trustStore=cacerts.jks -Dhttps.proxyHost=proxy.net -Dhttps.proxyPort=3128 -Dhttp.proxyHost=proxy.net -Dhttp.proxyPort=3128 -Dhttp.nonProxyHosts='localhost|*.realm.net|10.*.*.*'  -Djava.io.tmpdir=/scratch/tmp" \
--keytab ~/user.keytab \
--principal user@REALM.NET \
--supervise \
--py-files  /scratch/libs.zip \
${SRC_DIRECTORY}/$1

Problem: reading from a remote HDFS cluster which has Kerberos enabled.
You start the Spark job on a DR/newer cluster and you want to access data located on the other DR site or on an older cluster; as soon as Spark tries to access the remote FS you are greeted with this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5.0 (TID 42, host3.net, executor 1): java.io.IOException: DestHost:destPort host1.net:8020 , LocalHost:localPort host2.net/10.4.16.51:0. Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]

hdfs dfs commands perform exactly as they should and the user can access the files using other methods; only Spark fails.
This error means that Spark failed to obtain a delegation token when accessing the HDFS RPC services on the remote cluster...
So what is a delegation token?

When a cluster is Kerberized, applications need to obtain tickets before accessing services and running jobs; Kerberos is not only used for authentication, it is also used to allow users to access services like a MapReduce job.
If all the workers' tasks had to authenticate via Kerberos using a delegated TGT (Ticket Granting Ticket), the Kerberos Key Distribution Center (KDC) would be overwhelmed by the amount of requests, so Delegation Tokens were introduced as a lightweight authentication method to complement Kerberos.
The way Delegation Tokens work is:

1. The client initially authenticates with each server via Kerberos, and obtains a Delegation Token from that server.
2. The client uses the Delegation Tokens for subsequent authentications with the servers instead of reaching out to the KDC again.


In this case, Spark was not able to get the delegation tokens for the HDFS and YARN services, which are the two main components for writing data and running jobs.

So we need to force Spark to obtain a delegation token for both clusters; we can do this by overriding a property (as always):
spark.yarn.access.hadoopFileSystems=hdfs://<active namenode of cluster 1>,hdfs://<active namenode of cluster 2>

Note that Spark must have access to the filesystems listed and Kerberos must be properly configured to reach them (either in the same realm or in a trusted realm); Spark will then acquire security tokens for each of the filesystems so that the Spark application can access those remote Hadoop filesystems.
This also means that you can only list *ACTIVE* namenodes in that property, not the standby ones.
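
As a minimal sketch (the namenode URIs below are placeholders, list your own active namenodes), the property can be set in the SparkConf of the job, in the properties file passed to spark-submit, or directly with --conf:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("cross-cluster read")
# placeholder URIs: list the *active* namenode of every remote cluster the job
# must reach, so Spark can fetch a delegation token for each of them
conf.set("spark.yarn.access.hadoopFileSystems",
         "hdfs://namenode1.cluster-a.realm.net:8020,hdfs://namenode1.cluster-b.realm.net:8020")

sc = SparkContext(conf=conf)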



Useful resources

  • http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/ for all things related to monitoring your Spark jobs once they reach production maturity.
  • https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/ on all things Kerberos and hadoop-ish. REALLY A MUST.
  • https://spark.apache.org/docs/latest/running-on-yarn.html#kerberos on Spark & Kerberos.
  • https://bigdatapath.wordpress.com/2018/01/09/hadoop-delegation-tokens-explained/ on delegation tokens.
  • http://lzhbrian.me/mywiki_mkdocs/pyspark/ convenient PySpark snippet to get you started the right way

Tuesday, March 27, 2018

Lessons learned building a big data solution oriented towards security logs AKA a "security data lake"

My company decided to give the big data security log search a go. Most information out there talks about implementing machine learning and other features using (cool) Jupyter notebooks and proof-of-concept setups; however, trying to implement an infrastructure that does that reliably and at large scale turned out to be an epic story.

The high-level requirements go like this:

  • We want to be able to search events in near real time and perform full text search on events and create dashboards for specific use cases depending on the investigation.
  • We want to store both the data in pristine and parsed format for at least a year for audit and investigation purposes.
  • We want to be able to replay older logs through the pipeline to fix/update the parser-augmenter pipeline.
At this stage some of you might suggest Apache Metron or Apache Spot, and you would be right to do so. The problem we found was that the list of supported log parsers for Metron isn't that long and they are tied to specific versions. That is not abnormal, but in our large and mixed environment (the telecom world has a lot of strange devices) the benefits were not so clear; on top of that, using two tools means two different data models, interfaces and so on to work with. The full story is a lot longer and, in retrospect, some points were overlooked and some difficulties were over-estimated: in particular, the importance of having a tool for the analysts to manipulate and query the data extensively (it's called Stellar on the Metron side) was overlooked.

Laying out the tracks

The bird's eye overview comes down to three steps: ingestion, parsing and augmentation, and storage.

Ingestion is typically done through a syslog server in a Linux environment and a Windows event collector in a Microsoft shop; this is already a problem, since none of these provide high availability and scaling can only be done via a divide-and-conquer approach (hosts A & B go to that server, C & D to that other server, and so on). One approach to address that issue is to use MiNiFi to read the log files from the syslog servers (active-active) and ship them to the NiFi cluster, where a detect-duplicate processor ships only one copy of each log to the next layer. For Windows, you might want to use NXLog to ship your Windows logs either to files (and then use MiNiFi) or to emit them directly towards Kafka. Once the logs have reached NiFi, the first thing to do is to wrap them in a JSON envelope to add the metadata (time of ingestion, server it is coming from, type of logs, ...) and then ship them to Kafka. This allows you to handle back pressure nicely and to do point-in-time recovery if the subsequent parts of the pipeline ever have problems or slow down.
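
As a rough illustration of that envelope (in practice this is done with NiFi processors; the Raw and IngestionTimestamp fields are the ones the parsing job further down this page expects, the other field names are mine):

import json
import socket
from datetime import datetime

def wrap_event(raw_line, log_type):
    # wrap a raw log line in a JSON envelope carrying the ingestion metadata
    return json.dumps({
        "Raw": raw_line.rstrip("\n"),
        "IngestionTimestamp": datetime.utcnow().isoformat(),
        "Collector": socket.gethostname(),   # which relay the log came through
        "LogType": log_type,                 # e.g. "proxy", "firewall", ...
    })
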
The above architecture results in something like this:



Parsing and augmentation is where the heavy duty processing is done.
To parse an event is to extract business data out of it, like IPs, usernames and URLs. You can do that with various tools depending on the skills and constraints of your shop; our proof of concept used Logstash for that task. The pros are that it's easy to implement and very near-real-time oriented (low latency); the cons are that it doesn't handle back pressure very well (one slow parser will slow down all log sources) and it will generally require more firepower than the others. NiFi, on the other hand, is micro-batch oriented, which means more delay; if you have a hard requirement on low latency (less than 60 seconds) it is most likely a no-go. On top of that, any non-trivial parsing of logs that aren't record oriented (Linux auth logs or Windows logs) is a massive pain IMHO; there's some work being done on that with record-oriented processors that can use grok expressions to do the parsing, but it's still suboptimal. It's a tough problem: Metron uses Apache Storm for its parsing in order to have enough firepower and flexibility to implement complex logic, but Storm is primarily Java, and since our team doesn't have enough Java devs we settled on Spark since it has a Python API (PySpark).
To augment an event is to add more information to it: a simple example is adding the GeoIP information for an IP address; a more complex example would be adding the username associated with an internal IP address, or an internal user and IP reputation system (there's a lot to be done here so I'll probably expand on the subject in another post).
Again we'll use Spark for the job; there is a fair amount of boilerplate code to help you start properly and focus on the business logic rather than fight the tools to get them to work.
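
To make the GeoIP example above concrete, here is a minimal, hedged sketch of an augmenter (it assumes the geoip2 package and a local GeoLite2 database; the path and the output field names are mine):

import geoip2.database

# assumed location of the GeoLite2 database
reader = geoip2.database.Reader("/opt/geoip/GeoLite2-City.mmdb")

def augment_geoip(event, ip_field="SourceIp"):
    # add country and city information for the given IP field, when resolvable
    try:
        rec = reader.city(event[ip_field])
        event[ip_field + "Country"] = rec.country.iso_code
        event[ip_field + "City"] = rec.city.name
    except Exception:   # private address, missing field, address not in the DB...
        event[ip_field + "GeoipError"] = True
    return event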

Storage is where we keep the events for later display and reporting, and again we face an amusing issue in that the tools for long-term archival and near-real-time search are very different. For the real-time search the candidates were Solr and Elasticsearch; at the time we settled on Solr since it was included in the stack we use (Ambari), with HDFS for the long-term retention. While Solr is not a bad product, feature-wise it is lagging behind: on the UI front, the only generic interface that we found is "banana", which is a fork of Kibana 3, which is old to say the least... On top of all that, Elastic has been moving extremely fast with the machine-learning component in x-pack and the Vega scriptable UI. Furthermore, the core business of Solr seems to be content-management search, where the typical workload is read-heavy with few writes: exactly the opposite of a log search tool, where there will be massive and continuous writes and much fewer reads. This means that a lot of default values are not suitable for near-real-time search; see the Solr wiki link on the topic.

No matter which one you choose, keep in mind to spread your data over a large number of disks to maximize the number of IOPS the cluster can perform and the effect of the buffer cache, to increase the number of shards to match the number of physical disks to improve write and read performance, and to keep the number of replicas down to a manageable size.
On the long-term storage front, there are also a couple of things to consider, like the file format. There's a lot of discussion about which file format is best, but basically you want one of the packed/binary file formats like Avro, ORC or Parquet, preferably one of the column-oriented ones, which let you change the schema (add or remove fields) over time without it being a breaking change (which leaves out Avro). Knowing all that, we settled on Parquet because it has good support in Spark and its compression ratios are pretty good, which lets us archive a bit more before using up all the disk space. You will also want to partition the data, which essentially means storing the files in different directories based on the time, something like /hadoop/proxy/year=2018/month=02/day=01/file.parquet. This makes it easy to write a scrubber job that deletes old data per type depending on the retention period of your logs, pretty much the same way we used to do it on a syslog server.
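
In PySpark that layout falls out of a single partitioned write; a minimal sketch (assuming the data frame already carries year, month and day columns, and reusing the path from the example above):

# write the parsed events as Parquet, partitioned by year/month/day, so a
# scrubber job can later drop whole directories once they exceed retention
df.write.mode("append") \
    .partitionBy("year", "month", "day") \
    .parquet("/hadoop/proxy")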

Now that we have laid out the tracks, future posts will cover metrics, out-of-memory issues, Kerberos, configuration management, writing a Spark job for the parsing, setting up Hive tables as external (because you want to keep the data), ...

Spark streaming with Kerberos enabled on Ambari

In the scope of a research project, I'm parsing syslog logs to stuff them into HDFS (another post on that will follow), and one of the requirements is to have Kerberos enabled on the Kafka brokers; well, it turns out it's a bit more trouble than what I asked for, and not so well documented.
So here's a brain dump in case you find yourself in the same situation.

My pySpark (2) job flow is as follows: it uses a Kafka stream to read from one topic, parses the log line (the raw field), adds some fields and then emits the resulting JSON message to another topic. I kept the thing close to what I'm actually doing so that it's a somewhat relevant example and not just a simple word count that doesn't illustrate much in my mind.

Setup the jaas file

We need to tell the JVM we're using Kerberos, which credentials to use and all that. Once again, all the examples I've seen use password-based auth, which is irrelevant in any production context (again, YMMV), so here's a keytab-based jaas file:
Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/home/user/user.keytab"
        useTicketCache=true
        serviceName="zookeeper"
        debug=true
        principal="user@DOMAIN.NET";
};
KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/home/user/user.keytab"
        useTicketCache=true
        serviceName="kafka"
        debug=true
        principal="user@DOMAIN.NET";
};

Create a keytab

We need to create the keytab file referenced by the jaas file; you can create it with these instructions:
$ ktutil
ktutil:  addent -password -p user@DOMAIN.NET -k 1 -e aes256-cts-hmac-sha1-96
password for user@DOMAIN.NET: (ktutil is prompting for the password)
ktutil:  wkt user.keytab
ktutil:  q
$ kinit -kt user.keytab user@DOMAIN.NET

Now that the JVM Kerberos setup is OK and the keytab is initialized, we can make the adjustments on the Python side.

Setup spark streaming


ssc = StreamingContext(sc, 1)

kafkaStream = KafkaUtils.createStream(ssc, 'zookeeper:2181', consumer_name, {'ingest-topic': 1},
  kafkaParams = {'security.protocol': 'PLAINTEXTSASL',
   'sasl.kerberos.service.name': 'kafka',
   'group.id': consumer_name,
   'rebalance.backoff.ms': '5000',
   'zookeeper.session.timeout.ms': '10000' },
  storageLevel = StorageLevel.MEMORY_AND_DISK_SER)

kafkaStream.foreachRDD(handler)

The important bit for Kerberos is the kafkaParams dictionary; note that the security.protocol really is 'PLAINTEXTSASL': some other parts of the ecosystem use SASL_PLAINTEXT, but this one uses PLAINTEXTSASL.
The SASL service name is linked to the content of the jaas file, so make sure they match, as it's how the JVM finds the other options it needs.

Setup kafka producer

Now we want to emit to Kafka, using KafkaProducer:
producer = KafkaProducer(security_protocol = "SASL_PLAINTEXT",
  sasl_mechanism="GSSAPI",
  bootstrap_servers="kafkabroker:6667",
  client_id=consumer_name)
Note that this time the security protocol is "SASL_PLAINTEXT" and no longer PLAINTEXTSASL; even though these are two different libraries, a bit of consistency would be nice. Also note that you'll need the GSSAPI libraries (dev packages) and the Python gssapi module for Kerberos support.

Fire it up

Now we need to start our spark job with the following invocation:
Launch script:
#!/bin/bash -xp
export SPARK_MAJOR_VERSION=2
/bin/spark-submit --master=yarn --deploy-mode=cluster \
--files kafka_jaas.conf,myuser.keytab \
--repositories http://repo.hortonworks.com/content/groups/public \
--packages  org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1.2.6.1.0-129 \
--properties-file spark-streaming.conf \
--driver-memory 5g \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_jaas.conf -Djavax.net.ssl.trustStore=ca_certs.jks" \
main.py

The important bits here are the extraJavaOptions pointing to the jaas file we want to use, and the --packages option that tells Spark to load the extra jars needed for Kafka streaming to work.
Note that running a pyspark job like this isn't really practical; I recommend using something like https://github.com/ekampf/PySpark-Boilerplate which sets up a zip file with all your data files and dependencies. Remember, since the Spark job will be distributed all over your workers, they all need the exact same copy of the script and dependencies.
You'll also need to bootstrap the keytab somewhere in your job, simply using
print('[+] Setting up Kerberos: \n%s\n' % (subprocess.check_output(['kinit', '-kt', 'user.keytab', 'user@DOMAIN.NET'])))
did the trick for me.

Debugging

One additional trick, if it doesn't work, is to add the following lines before doing anything else in your script:

# to debug kerberos problems
import logging
logging.basicConfig(filename='/tmp/example.log',level=logging.DEBUG)
It will generate a massive amount of logs but at least it gives you some data to help you debug the thing.

Putting everything together

# Import dependencies
from __future__ import print_function
from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaProducer
import json
import subprocess
from pygrok import Grok
from datetime import *
import pytz
from dateutil.parser import *

consumer_name = "SyslogImporter"
syslog_pattern = "(<%{NUMBER:SyslogPriority}>)?%{TIMESTAMP_ISO8601:Timestamp}\s%{HOSTNAME:SourceHostname}\s(%{SYSLOGPROG:SyslogProgram}\[%{NUMBER:SyslogProgramId}\])?"

def grok_syslog(event):
    grok = Grok(syslog_pattern)
    raw = json.loads(event)
    parsed_syslog = grok.match(raw['Raw'])
    if parsed_syslog:
     parsed_syslog["tags"] = ["syslog"]
     parsed_syslog["parsers"] = [consumer_name]
    else:
     parsed_syslog = json.loads(event)
     parsed_syslog["Timestamp"] = parsed_syslog["IngestionTimestamp"]
     parsed_syslog["tags"] = ["not_syslog"]
     parsed_syslog["parsers"] = [consumer_name]    
    new_event = {key: value for (key, value) in (raw.items() + parsed_syslog.items())}
    return new_event

def parse_syslog(event):
    utc_tz = pytz.timezone('UTC')
    log = grok_syslog(event)
    ts = log['Timestamp']
    ts_dt = parse(ts)
    log['Timestamp'] = ts_dt.astimezone(utc_tz).isoformat()
    return log

def handler(message):
 records = message.collect()
 for raw_record in records:
  #saving the original to send it as such in case of exception
  syslog_record_string = raw_record[1]
  event_id = "0"
  try:
   syslog_record = parse_syslog(raw_record[1])
   event_id = syslog_record["Id"]
   syslog_record_string = json.dumps(syslog_record, ensure_ascii=False).encode('utf8')
  except Exception as e:
   print("[-] Something barfed: " + str(e))
   print(raw_record)
   pass
  producer.send(b'work-topic', bytes(syslog_record_string))
  producer.flush()

def main():
 sc = SparkContext(appName=consumer_name) 
 ssc = StreamingContext(sc, 1)

 kafkaStream = KafkaUtils.createStream(ssc, 'zookeeper:2181', consumer_name, {'ingest-topic': 1}, 
  kafkaParams = {'security.protocol': 'PLAINTEXTSASL',
   'sasl.kerberos.service.name': 'kafka',
   'group.id': consumer_name,
   'rebalance.backoff.ms': '5000',
   'zookeeper.session.timeout.ms': '10000' },
  storageLevel = StorageLevel.MEMORY_AND_DISK_SER)

 kafkaStream.foreachRDD(handler)

 # Start the streaming context
 ssc.start()
 ssc.awaitTermination()

if __name__ == "__main__":
 print('[+] Setting up Kerberos: \n%s\n' % (subprocess.check_output(['kinit', '-kt', 'user.keytab', 'user@DOMAIN.NET'])))
 producer = KafkaProducer(security_protocol = "SASL_PLAINTEXT",
  sasl_mechanism="GSSAPI",
  bootstrap_servers="kafkabroker:6667",
  client_id=consumer_name)
 main()

Monday, October 2, 2017

Incident response on a shoestring budget

This blog post is the first in a series that aims to set up basic security monitoring on a low budget: nothing but a few servers, gray matter and a lot of open source software, while trying to avoid vendor$ and all.

One of the problems to solve on a large network is to know who is talking to whom. Host-to-host communications are easy to monitor using Bro or netflow, but knowing which software communicates over which connection is a whole different problem (sometimes called endpoint visibility).
The good news is that Facebook released osquery and it is now available on all platforms; osquery allows you to access pretty much all the data of a computer using SQL.
For instance something like 
 
osquery> SELECT uid, name FROM listening_ports l, processes p WHERE l.pid=p.pid;
+------+-------------+
| uid  | name        |
+------+-------------+
| 1000 | dbus-daemon |
+------+-------------+

would give you the uid and process name of every process that has a listening port (server processes if you want).
It also has a daemon (osqueryd) that lets you schedule queries, and you can save the output of those queries in Elasticsearch for stacking and analysis (a minimal sketch of that is shown after the example output below).

The plan is to have queries running at regular intervals that give you the process name, uid, checksum (sha1) and the connections.
Something like this:
osquery> select action, protocol, local_address, local_port, remote_address, remote_port, uid, name FROM processes p, socket_events s WHERE s.pid=p.pid;
scheduled at a regular interval would do the trick.
Once set up, it will return something like this at regular intervals:
{
 "name": "network_info",
  "hostIdentifier": "goldorak",
  "calendarTime": "Tue Dec 13 15:10:08 2016 UTC",
  "unixTime": "1481641808",
  "decorations": {
   "host_uuid": "FC1D7B01-5138-11CB-B85D-C04D3A0C6645",
   "username": "someusername"
  },
 "columns": {
    "action": "connect",
    "local_address": "",
    "local_port": "0",
    "name": "DNS Res~er #115",
    "protocol": "14578",
    "remote_address": "192.168.1.1",
    "remote_port": "53",
    "uid": "1000"
  },
  "action": "added"
}
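
If you don't want a full logstash pipeline in front of it, a minimal sketch for shipping those result lines to Elasticsearch could look like this (the results-log path is osquery's default on Linux, the index name and node address are assumptions, and it relies on the elasticsearch Python package):

import json
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])  # assumed local Elasticsearch node

# osqueryd appends one JSON document per line to its results log
with open("/var/log/osquery/osqueryd.results.log") as results:
    for line in results:
        doc = json.loads(line)
        es.index(index="osquery", doc_type="result", body=doc)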

The next thing I want to look at is the network forensic problem, which I define like this: I want to know, at any point in time, which host on my network talks to which other host; ideally, I also want to know the protocol and the associated metadata like SSL/TLS certs, file extraction and so on. Since we already have the same sort of information from osquery (pid, process name and connection ...), we should ideally be able to stitch it all together to get a global view; the source could be netflow or Bro metadata, and the same stitching could be done for Snort or Suricata alerts to add more context and details for the analyst.

Friday, September 29, 2017

Hive external tables show no data

I'm currently working on a project that ingests a lot of security related data (firewall logs, http proxy logs, ...); the goal is to keep it for a year or so to be able to do analytics and go back in time in case of a security incident and have a clue what the "bad guy" did (follow a compromised asset or user).

The plan is to store the data in Avro format on HDFS and then create Hive tables on top; ideally the tables should be external so that Hive doesn't remove the original files in case I need them for something else.

To create an external table using avro formatted data you typically do this:

CREATE EXTERNAL TABLE cyber.proxy_logs
PARTITIONED BY (year STRING, month STRING, day STRING)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/cyber/proxy_logs/'
TBLPROPERTIES ('avro.schema.literal'=' { "type" : "record", "name" : "proxy", "fields" : [ { "name" : "DeviceHostname", "type" : "string", "default" : "" }, { "name" : "ResponseTime", "type" : "string", "default" : "" }, { "name" : "SourceIp", "type" : "string", "default" : "" }, { "name" : "SourcePort", "type" : "string", "default" : "" }, { "name" : "Username", "type" : "string", "default" : "" } ] } ');

This will create a table mapped to our data; the files in HDFS are organized like this:

/hadoop/proxy/year=2017/month=09/day=19
/hadoop/proxy/year=2017/month=09/day=20
...

So we have three levels of partitioning, making it easy to do operations on a yearly, monthly or daily basis, or at least that's the plan.
Once you've done that, you need to tell Hive to add your partitions using:

ALTER TABLE proxy_logs ADD PARTITION(year=2017, month="09", day=19);
...

We need to do that every day since our partitioning is daily. At this point, when I ran a
select count(*) from proxy_logs
I didn't get any results at all... It turns out you also need to recompute the statistics for the table metadata to be up to date; I did that with
ANALYZE TABLE proxy_logs partition(year=2017, month="09", day=19) compute statistics;
and only then was I able to access my logs. I will now need to do this for every partition of every table; time to get scripting!
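
A hedged sketch of that script, generating the daily ALTER TABLE / ANALYZE statements for a date range and feeding them to the hive CLI (the table name and dates come from the example above; beeline would work the same way):

import subprocess
from datetime import date, timedelta

def add_partitions(table, start, end):
    statements = []
    day = start
    while day <= end:
        part = 'year={0}, month="{1:02d}", day="{2:02d}"'.format(day.year, day.month, day.day)
        statements.append('ALTER TABLE {0} ADD IF NOT EXISTS PARTITION({1});'.format(table, part))
        statements.append('ANALYZE TABLE {0} PARTITION({1}) COMPUTE STATISTICS;'.format(table, part))
        day += timedelta(days=1)
    subprocess.check_call(['hive', '-e', '\n'.join(statements)])

add_partitions('cyber.proxy_logs', date(2017, 9, 19), date(2017, 9, 30))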

Monday, May 4, 2015

setup 'bro' to log into elasticsearch

A recurring question on the bro mailing list is: "how do you set up bro to ship logs to elasticsearch?"
I'll explain how I've set up my bro network monitor, and hopefully it will be useful to others as well.

Setup bro to log in json format:

There are other recipes where you use grok to parse the text logs and json-ify them, but I find it easier and more stable if the application can log directly in json.
In your local.bro add the following lines:

redef LogAscii::json_timestamps = JSON::TS_ISO8601;
@load tuning/json-logs

The first line changes the format of the timestamp from UNIX epoch to ISO 8601, which makes it easier for logstash to parse the date into @timestamp; the second line loads a tuning script that will turn your logs into json.

Restart bro with broctl restart --clean and you should be set.

Setup logstash input to feed on the logs:

Note that a few assumptions are made here.
I like to organise my logstash conf in different files, named [input, output, filter]-<purpose>.conf; this makes it easy for templating and debugging. Of course, you can always stuff everything into one gigantic file, that's entirely up to you :-) In output-default.conf you will need to change the cluster name to whatever name your cluster has; you will also need to change the input path to match yours, and so on...
Final gotcha: on Debian you will need to apt-get install logstash-contrib to have the translate plugin.

In /etc/logstash/conf.d/input-bro.conf, we specify the source files, the type and that the logs are in json.

input {
    file {
       type => 'bro_logs' 
       path => [ '/opt/spool/manager/*.log' ]
        codec => "json"
    }
}

In /etc/logstash/conf.d/filter-bro.conf, the translate filter adds a field named conn_state_full with the full text based on the content of conn_state, and the grok adds a field named bro_type with the type of bro log (conn, weird, dns, ssl ...) based on the file name. We could do that in the input part as well by listing every file name and adding a specific type, but I'm lazy and I always forget one file, so...

filter {
        if [type] == "bro_logs" {
                date { 
                        match => [ "ts", "ISO8601" ]
                }
                translate { field => "conn_state" destination => "conn_state_full" dictionary => [ "S0", "Attempt", "S1", "Established", "S2", "Originator close only", "S3", "Responder close only", "SF", "SYN/FIN completion", "REJ", "Rejected", "RSTO", "Originator aborted", "RSTR", "Responder aborted", "RSTOS0", "Originator SYN +  RST", "RSTRH", "Responder SYN ACK + RST", "SH", "Originator SYN + FIN", "SHR", "Responder SYN ACK + FIN", "OTH", "Midstream traffic" ] }
                grok {
                        match => { "path" => ".*\/(?<bro_type>[a-zA-Z0-9]+)\.log$"} 
                } 
        }
}


In /etc/logstash/conf.d/output-default.conf

output { 
        elasticsearch {
                cluster => 'elasticsearch'
        }
        #stdout { codec => rubydebug } 
}

restart logstash.

Conclusion:

By now you should have logs in json and logstash shipping them to elasticsearch; you can now start kibana and visualize your bro logs:
pretty pictures ^_^

Wednesday, April 22, 2015

Honeypot malware collection

The setup

I've been toying with the idea of installing a kippo honeypot in my company's network for a while, but 6 months ago I had some time to do a basic setup.
By basic, I mean that kippo runs:

  • as a captive user
  • in a chroot
  • listening on a non-privileged port (I used iptables forwarding to redirect external traffic targeted at port 22 to the port kippo listens on)
  • on a host that's blacklisted in the IDS and iptables config of the rest of the machines (to prevent a lateral move)

The non-basic version would involve SELinux and a strong auditing policy, in case you were wondering.

All that is to prevent some skilled people from exploiting a vulnerability in kippo and moving further into my network.

And then I let it run for a while ...

The deception

It turns out that kippo is very popular, and that means most scanning scripts have evolved to detect kippo and quickly run away.
Some scanners will flag you as a honeypot if the password is too easy (root/root), some will try to run commands that kippo doesn't support, many use sftp to try to log in ...
I had to use this fork instead; note that this fork also takes care of json logging (logstash!) and of the ssh algorithm fingerprint, which is also a popular detection method.

The hit

After switching to a more capable version of kippo, it was like I had opened a big malware tap: I collected 2898 samples, most of them (2101) are 32-bit ELF executables, 382 are 64-bit executables, around 200 are perl IRC bots, and so on; all in all pretty good!
The 10 most common ones account for nearly 2/3 of the total:


183 1a8712007f9ef593044350226b829a9fb25f91ad ==> elf32
186 acbe528883175ce934df4edd4fff045a0e2d2d8f ==> elf32
187 bcf7c4b4621a6452f8ace5e1c0df78c71f7ae4bb ==> "C" source jessica_biel_naked_in_my_bed.c
187 dc063902fc457a2d13b0d91ebc4d508bf6bfd118 ==> elf32
188 ba61480ec4062c3386a7b1e559dab2f0baf5e98f ==> elf32 (always fdsfsfvff, left handed?)
188 ec22fac0510d0dc2c29d56c55ff7135239b0aeee ==> elf32
189 059964612c1ac7c928b81a99da67aed3f3a41865 ==> elf64 (always rewgtf3er4t)
189 44e569a191a5d7bd720c7af06c2fd81a501a245b ==> elf32 to replace udevd
190 0e76f4c72295fe851b775dac8c49ec53108f1df6 ==> elf64
236 27e67a31ffc2797340a02133a4bfab5584faa65d ==> elf32

There is a very popular vmsplice exploit (jessica biel ...); so far nothing spectacular, all the sha1's already existed on VirusTotal around the same date I received my 'copy'.

The weird

Notable mention: some automated tools assumed that because the username / password combination succeeded, the device they just logged into must be the expected one.
As an amusing side effect, I now have malware built for MIPS and ARM:

25dc278de8f8b80cea05e9afc4faac6df9b0638b  ==> MIPS32
2a63299784407db16cd3168a197fe57070b1ff83 ==> ARM
3e73c1a31580a6d0e65c4c8a436ec4be8f00c496 ==> MIPS32
4d90877a832ae21befd5a5556b2bfec3c2404c35  ==> ARM
7130128fc2bcafa4b4fe0ba6159399432d833dfd  ==> MIPS-I
7d0ab04aa3c835956d3fe6549ec7bf0223931468  ==> ARM
7e54ec563e186f225b2d04e0f8f1d28dffdd6fb1  ==> ARM
8103179432bcc189f81810bacd191e4440f3a0a3  ==> ARM
9b3b2c7eecad5ab0bf5cb37f094ceb951d7ae52c  ==> ARM
c421bfebe129aa4179e769b56fccb37bb026a1b2  ==> ARM (not stripped)
db79126d667109c3df6138d55b8669fe1a1f10f4  ==> MIPS32

Targeted at routers, ADSL modems, IP cameras and other embedded devices ...

The end?

I'm still working on that project as time allows. I'm very curious to understand the purpose of the MIPS/ARM malware; I suspect some of them change the DNS settings to redirect / MITM the traffic of the appliance, but I suspect there's more to it than that.
