Tuesday, March 27, 2018

Spark streaming with Kerberos enabled on Ambari

As part of a research project, I'm parsing syslog logs and stuffing them into HDFS (another post on that will follow), and one of the requirements is to have Kerberos enabled on the Kafka brokers. It turns out that's a bit more trouble than I bargained for, and not so well documented.
So here's a brain dump in case you find yourself in the same situation.

My pySpark(2) job flow is as follows: it uses a Kafka stream to read from one topic, parses the log line (the raw field), adds some fields and then emits the resulting JSON message to another topic. I kept the example close to what I'm actually doing so that it's somewhat relevant, and not just a simple word count that doesn't illustrate much in my mind.

Setup the jaas file

To tell the JVM we're using Kerberos, which credentials to use and so on, we need a jaas file. Once again, all the examples I've seen use password based auth, which is irrelevant in most production contexts (again, ymmv), so here's a keytab based jaas file:
Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/home/user/user.keytab"
        useTicketCache=true
        serviceName="zookeeper"
        debug=true
        principal="user@DOMAIN.NET";
};
KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="/home/user/user.keytab"
        useTicketCache=true
        serviceName="kafka"
        debug=true
        principal="user@DOMAIN.NET";
};

Create a keytab

We need to create the keytab file referenced by the jaas file; you can create it with these instructions:
$ ktutil
ktutil:  addent -password -p user@DOMAIN.NET -k 1 -e aes256-cts-hmac-sha1-96
password for user@DOMAIN.NET: (ktutil is prompting for the password)
ktutil:  wkt user.keytab
ktutil:  q
$ kinit -kt user.keytab user@DOMAIN.NET

Now that the JVM Kerberos setup is in place and the keytab is initialized, we can make the adjustments on the Python side.
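Before touching the Spark code, it doesn't hurt to sanity check the keytab from Python. The snippet below is just a quick sketch to run on the edge node before submitting, assuming the MIT client tools (kinit, klist) are installed and the file and principal names match the jaas file above:
# quick keytab sanity check, run on the edge node before submitting the job
import subprocess

def check_keytab(keytab='/home/user/user.keytab', principal='user@DOMAIN.NET'):
    # list the entries stored in the keytab
    print(subprocess.check_output(['klist', '-kt', keytab]))
    # try to get a ticket with it; raises CalledProcessError if it fails
    subprocess.check_call(['kinit', '-kt', keytab, principal])
    print(subprocess.check_output(['klist']))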

Setup spark streaming


ssc = StreamingContext(sc, 1)

kafkaStream = KafkaUtils.createStream(ssc, 'zookeeper:2181', consumer_name, {'ingest-topic': 1},
    kafkaParams={'security.protocol': 'PLAINTEXTSASL',
                 'sasl.kerberos.service.name': 'kafka',
                 'group.id': consumer_name,
                 'rebalance.backoff.ms': '5000',
                 'zookeeper.session.timeout.ms': '10000'},
    storageLevel=StorageLevel.MEMORY_AND_DISK_SER)

kafkaStream.foreachRDD(handler)

The important bit for Kerberos is the kafkaParams dict. Note that the security.protocol *really* is 'PLAINTEXTSASL': other parts of the ecosystem use SASL_PLAINTEXT, but this one uses PLAINTEXTSASL.
The sasl.kerberos.service.name is tied to the content of the jaas file, so make sure they match, as that's how the JVM finds the other options it needs.
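kafkaStream.foreachRDD(handler) hands every micro-batch to a handler function. The real handler is in the complete script at the end of this post; a minimal sketch of its shape looks like this (the (key, value) tuples are what createStream delivers):
def handler(message):
    # message is the RDD for one micro-batch; each record is a (key, value) tuple
    records = message.collect()
    for raw_record in records:
        print(raw_record[1])  # the value, i.e. the raw message pulled from the topic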

Setup kafka producer

Now we want to emit to Kafka, using KafkaProducer:
producer = KafkaProducer(security_protocol = "SASL_PLAINTEXT",
  sasl_mechanism="GSSAPI",
  bootstrap_servers="kafkabroker:6667",
  client_id=consumer_name)
Note that this time the security protocol is "SASL_PLAINTEXT" and no longer PLAINTEXTSASL. Granted, these are two different libraries, but a bit of consistency would be nice. Also note that you'll need the GSSAPI libraries (dev packages) and the Python gssapi module for kafka-python to support Kerberos.
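Before plugging the producer into the streaming job, a quick end-to-end test message is a cheap way to confirm the Kerberos handshake with the broker actually works; a minimal sketch, using the work-topic output topic from later in this post:
# send a test message to make sure the kerberized producer can reach the broker
producer.send(b'work-topic', b'{"test": "hello from a kerberized producer"}')
producer.flush()  # block until the message is actually delivered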

Fire it up

Now we need to start our Spark job with the following launch script:
#!/bin/bash -xp
export SPARK_MAJOR_VERSION=2
/bin/spark-submit --master=yarn --deploy-mode=cluster \
--files kafka_jaas.conf,myuser.keytab \
--repositories http://repo.hortonworks.com/content/groups/public \
--packages  org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1.2.6.1.0-129 \
--properties-file spark-streaming.conf \
--driver-memory 5g \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_jaas.conf -Djavax.net.ssl.trustStore=ca_certs.jks" \
main.py

The important bits here are the extraJavaOptions that point to the jaas file we want to use, and the --packages option that tells Spark to load the extra jars needed for Kafka streaming to work.
Note that running a pyspark job like this isn't really practical; I recommend using something like https://github.com/ekampf/PySpark-Boilerplate, which sets up a zip file with all your data files and dependencies. Remember that since the Spark job will be distributed all over your workers, they all need the exact same copy of the script and its dependencies.
You'll also need to bootstrap the keytab somewhere in your job; simply using
print('[+] Setting up Kerberos: \n%s\n' % subprocess.check_output(['kinit', '-kt', 'user.keytab', 'user@DOMAIN.NET']))
did the trick for me (note that this needs import subprocess).
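If you want something slightly more defensive, it's easy to wrap that call so the job fails loudly when no ticket can be obtained. This is just a sketch, assuming the keytab was shipped with --files and therefore sits in the container's working directory:
import subprocess

def setup_kerberos(keytab='user.keytab', principal='user@DOMAIN.NET'):
    # kinit with the keytab shipped via --files; raise a clear error if it fails
    try:
        out = subprocess.check_output(['kinit', '-kt', keytab, principal],
                                      stderr=subprocess.STDOUT)
        print('[+] Setting up Kerberos:\n%s\n' % out)
    except subprocess.CalledProcessError as e:
        raise RuntimeError('[-] kinit failed for %s: %s' % (principal, e.output))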

Debugging

One additional trick if it doesn't work is to add the following lines before doing anything in your script:
# to debug kerberos problems
import logging
logging.basicConfig(filename='/tmp/example.log',level=logging.DEBUG)
It will generate a massive amount of logs but at least it gives you some data to help you debug the thing.
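If the full DEBUG firehose is too much, you can also scope it down; a small sketch, assuming kafka-python's loggers live under the 'kafka' namespace:
# root logger at INFO, but keep full detail for the kafka client
import logging
logging.basicConfig(filename='/tmp/example.log', level=logging.INFO)
logging.getLogger('kafka').setLevel(logging.DEBUG)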

Putting everything together

# Import dependencies
from __future__ import print_function
from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from kafka import KafkaProducer
import json
import subprocess
from pygrok import Grok
from datetime import *
import pytz
from dateutil.parser import *

consumer_name = "SyslogImporter"
syslog_pattern = "(<%{NUMBER:SyslogPriority}>)?%{TIMESTAMP_ISO8601:Timestamp}\s%{HOSTNAME:SourceHostname}\s(%{SYSLOGPROG:SyslogProgram}\[%{NUMBER:SyslogProgramId}\])?"

def grok_syslog(event):
    grok = Grok(syslog_pattern)
    raw = json.loads(event)
    parsed_syslog = grok.match(raw['Raw'])
    if parsed_syslog:
        parsed_syslog["tags"] = ["syslog"]
        parsed_syslog["parsers"] = [consumer_name]
    else:
        # the line didn't match the syslog pattern, keep the original event and tag it
        parsed_syslog = json.loads(event)
        parsed_syslog["Timestamp"] = parsed_syslog["IngestionTimestamp"]
        parsed_syslog["tags"] = ["not_syslog"]
        parsed_syslog["parsers"] = [consumer_name]
    # merge the original fields with the parsed ones (python 2 style dict merge)
    new_event = {key: value for (key, value) in (raw.items() + parsed_syslog.items())}
    return new_event

def parse_syslog(event):
    utc_tz = pytz.timezone('UTC')
    log = grok_syslog(event)
    ts = log['Timestamp']
    ts_dt = parse(ts)
    log['Timestamp'] = ts_dt.astimezone(utc_tz).isoformat()
    return log

def handler(message):
    records = message.collect()
    for raw_record in records:
        # saving the original to send it as such in case of exception
        syslog_record_string = raw_record[1]
        event_id = "0"
        try:
            syslog_record = parse_syslog(raw_record[1])
            event_id = syslog_record["Id"]
            syslog_record_string = json.dumps(syslog_record, ensure_ascii=False).encode('utf8')
        except Exception as e:
            print("[-] Something barfed: " + str(e))
            print(raw_record)
        producer.send(b'work-topic', bytes(syslog_record_string))
        producer.flush()

def main():
    sc = SparkContext(appName=consumer_name)
    ssc = StreamingContext(sc, 1)

    kafkaStream = KafkaUtils.createStream(ssc, 'zookeeper:2181', consumer_name, {'ingest-topic': 1},
        kafkaParams={'security.protocol': 'PLAINTEXTSASL',
                     'sasl.kerberos.service.name': 'kafka',
                     'group.id': consumer_name,
                     'rebalance.backoff.ms': '5000',
                     'zookeeper.session.timeout.ms': '10000'},
        storageLevel=StorageLevel.MEMORY_AND_DISK_SER)

    kafkaStream.foreachRDD(handler)

    # Start the streaming context
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    print('[+] Setting up Kerberos: \n%s\n' % subprocess.check_output(['kinit', '-kt', 'user.keytab', 'user@DOMAIN.NET']))
    producer = KafkaProducer(security_protocol="SASL_PLAINTEXT",
                             sasl_mechanism="GSSAPI",
                             bootstrap_servers="kafkabroker:6667",
                             client_id=consumer_name)
    main()

