Thursday, May 14, 2020

Hadoop / Spark2 snippets that took way too long to figure out


This is a collection of links and snippets that took me way too long to figure out; I've copied them here with a bit of documentation in the hope that others find them useful.

Copy data from one cluster to another

This snippet was made to copy a large dataset (42 TB) from one cluster to another; the default distcp options don't play well at that scale.
We basically increase the number of mappers (-m100 seems to work well with 40 data nodes), increase the timeout (-D mapred.task.timeout), run in update mode to be able to recover and retry (-update), ignore errors (-i), log the errors to HDFS for investigation (-log), and also disable the CRC check (-skipcrccheck) as we are copying between two different encryption zones.

hadoop distcp  -D mapred.task.timeout=1800000 -pb -pp -m100 -update -copybuffersize 65536 -i -log /application/distcp -skipcrccheck -v  hdfs://namenode1.com/application/logs/year=2018/month=11 /application/logs/year=2018/month=11
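Since -skipcrccheck turns off the usual checksum verification, a cheap sanity check I like to run afterwards (not required, just reassuring) is to compare sizes and file counts on both clusters:

# source cluster vs destination: byte counts and file counts should match
hdfs dfs -du -s hdfs://namenode1.com/application/logs/year=2018/month=11
hdfs dfs -du -s /application/logs/year=2018/month=11
hdfs dfs -count hdfs://namenode1.com/application/logs/year=2018/month=11
hdfs dfs -count /application/logs/year=2018/month=11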

A snippet of Python to convert data from Avro (old cluster) to Parquet (new cluster)

Note that this requires Spark 2.3, and to run it you'll need to add an extra package to be able to read the Avro files:

/bin/spark-submit --queue batch_queue --packages com.databricks:spark-avro_2.11:3.2.0 sparkConversion.py

And sparkConversion.py itself:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, types
from pyspark.sql.functions import udf
import logging

logger = logging.getLogger()

appName = "spark test"

conf = SparkConf().setAppName(appName)
# Without this, the Avro reader ignores any input file that doesn't end in .avro
conf.set('spark.hadoop.avro.mapred.ignore.inputs.without.extension', 'false')

sc = SparkContext(conf=conf)
sc.setLogLevel('ERROR')
sqlContext = SQLContext(sc)

# Read the Avro files copied over from the old cluster
df = sqlContext.read.format("com.databricks.spark.avro").load("/application/day=26")
logger.info("Data frame loaded, it contains {0} elements".format(df.count()))

# Write the same data back out as Parquet on the new cluster
df.write.format("org.apache.spark.sql.execution.datasources.parquet").save("/application/app1/year=2018/month=11/day=26/")
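Spark normally drops a _SUCCESS marker in the output directory when the write completes; a quick way to check the result from the shell (not part of the job itself, just a sanity check) is:

hdfs dfs -ls /application/app1/year=2018/month=11/day=26/
# expect a _SUCCESS file plus the part-* parquet files
hdfs dfs -du -s -h /application/app1/year=2018/month=11/day=26/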



The starting point for a _start_ script:

In order, this script will:
  1. initialize Kerberos from a keytab
  2. submit the job in client mode (usually the best option for reading from HDFS)
  3. force the use of the official HortonWorks repo to pull external artefacts
  4. add a properties file to override some Spark defaults
  5. set up an HTTP proxy
  6. force the use of a given JAAS file for the Kafka connection
  7. pass on the external Python modules in a zip
  8. invoke the job from the src directory


#!/bin/bash -xp

export SPARK_MAJOR_VERSION=2
#export PYSPARK_PYTHON=python3.4

# Resolve the tool / dist / src directories relative to this script
BASE_DIR=$(dirname $0)
TOOL_DIRECTORY=$(realpath ${BASE_DIR})
ROOT_DIRECTORY=$(realpath ${BASE_DIR}/../)
DIST_DIRECTORY=$(realpath ${TOOL_DIRECTORY}/../dist/)
SRC_DIRECTORY=$(realpath ${TOOL_DIRECTORY}/../src)

cd ${TOOL_DIRECTORY}

# 1. initialize Kerberos from the keytab
kinit -kt ~/user.keytab user@REALM.NET

# 2-8. submit in client mode, pull artefacts from the HortonWorks repo,
# override Spark defaults from a properties file, set the HTTP proxy and
# the Kafka JAAS file through the driver/executor java options, ship the
# external python modules as a zip and run the job passed as first argument
/bin/spark-submit --master=yarn --deploy-mode=client \
--repositories http://repo.hortonworks.com/content/groups/public \
--properties-file ${TOOL_DIRECTORY}/spark-streaming.conf \
--executor-memory 4g \
--driver-memory 60g \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./kafkajaas.conf -Dhttp.proxyHost=proxy.net -Dhttp.proxyPort=3128 -Dhttps.proxyHost=proxy.net -Dhttps.proxyPort=3128 -Dhttp.nonProxyHosts='localhost|*.realm.net|10.*.*.*'  -Djava.io.tmpdir=/scratch/tmp " \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafkajaas.conf -Djavax.net.ssl.trustStore=cacerts.jks -Dhttps.proxyHost=proxy.net -Dhttps.proxyPort=3128 -Dhttp.proxyHost=proxy.net -Dhttp.proxyPort=3128 -Dhttp.nonProxyHosts='localhost|*.realm.net|10.*.*.*'  -Djava.io.tmpdir=/scratch/tmp" \
--keytab ~/user.keytab \
--principal user@REALM.NET \
--supervise \
--py-files  /scratch/libs.zip \
${SRC_DIRECTORY}/$1
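Assuming the script above is saved as tools/start.sh and your job lives in src/ (both names are my own, the post doesn't give any), the invocation is simply:

./tools/start.sh streaming_job.py   # submits src/streaming_job.py

The post doesn't show spark-streaming.conf either; a plausible minimal sketch of such a properties file, using standard Spark settings, could look like this:

# spark-streaming.conf -- hypothetical content, adjust to your own job
spark.yarn.maxAppAttempts                      4
spark.yarn.am.attemptFailuresValidityInterval  1h
spark.streaming.backpressure.enabled           true
spark.streaming.kafka.maxRatePerPartition      1000
spark.serializer                               org.apache.spark.serializer.KryoSerializer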

Problem: reading from a remote HDFS cluster which has Kerberos enabled.
You start the Spark job on a DR/newer cluster, you want to access data located on the other DR site or on an older cluster, and as soon as Spark tries to access the remote filesystem you are greeted with this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5.0 (TID 42, host3.net, executor 1): java.io.IOException: DestHost:destPort host1.net:8020 , LocalHost:localPort host2.net/10.4.16.51:0. Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]

hdfs dfs commands perform exactly as they should and the user can access the files through other methods; only Spark fails.
This error means that Spark failed to obtain a delegation token when accessing the HDFS RPC services on the remote cluster...
So what is a delegation token?

When a cluster is Kerberized, applications need to obtain tickets before accessing services and running jobs: Kerberos is not only used for authentication, it is also what allows users to use services such as running a MapReduce job.
If every worker task had to authenticate via Kerberos using a delegated TGT (Ticket Granting Ticket), the Kerberos Key Distribution Center (KDC) would be overwhelmed by the number of requests, so Delegation Tokens were introduced as a lightweight authentication method to complement Kerberos.
The way Delegation Tokens work is (a small hands-on example follows the list):

1. The client initially authenticates with each server via Kerberos, and obtains a Delegation Token from that server.
2. The client uses the Delegation Tokens for subsequent authentications with the servers instead of reaching out to the KDC again.
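If you want to see what these tokens actually look like, the hdfs CLI can fetch and print one; this is just for poking around, the job itself never needs to do it:

# Ask the NameNode for an HDFS delegation token and store it in a file
hdfs fetchdt --renewer yarn /tmp/my.token
# Print the token's kind, service and expiry date
hdfs fetchdt --print /tmp/my.token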


In this case, Spark was not able to get the delegation tokens for the HDFS and YARN services, which are the two main components involved in writing data and running jobs.

So we need to force Spark to obtain a delegation token for both clusters; as always, we can do this by overriding a property:
spark.yarn.access.hadoopFileSystems=hdfs://<active namenode of the old cluster>,hdfs://<active namenode of the new cluster>

Note that Spark must have access to the filesystems listed and Kerberos must be properly configured to reach them (either in the same realm or in a trusted realm); Spark then acquires a security token for each of those filesystems so that the application can access them.
This also means that you can only list *ACTIVE* name nodes in that property, not the standby ones.
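For example, assuming the two active name nodes are namenode1.com (old cluster) and namenode2.com (new cluster, both hostnames made up for the example), the property can go straight on the spark-submit command line or into the properties file used earlier:

/bin/spark-submit --master yarn --deploy-mode client \
  --conf spark.yarn.access.hadoopFileSystems=hdfs://namenode1.com:8020,hdfs://namenode2.com:8020 \
  --keytab ~/user.keytab --principal user@REALM.NET \
  sparkConversion.py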



Useful resources

  • http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/ for all things related to monitoring your Spark jobs once they reach production maturity.
  • https://steveloughran.gitbooks.io/kerberos_and_hadoop/content/ on all things Kerberos and hadoop-ish. REALLY A MUST.
  • https://spark.apache.org/docs/latest/running-on-yarn.html#kerberos on Spark & Kerberos.
  • https://bigdatapath.wordpress.com/2018/01/09/hadoop-delegation-tokens-explained/ on delegation tokens.
  • http://lzhbrian.me/mywiki_mkdocs/pyspark/ for a convenient PySpark snippet to get you started the right way.
