Processing 150 Mn short files with Hadoop and Spark

This is my journey in building a data pipeline on Amazon Elastic Map Reduce cluster to analyze historic sensor data.

Overview: The objective is to analyze historic sensor data saved on AWS s3 in hourly format. To save data in hourly format made sense from the business use case stand point but as we know Hadoop Map Reduce performance gets effected with many small files.

Just to give an idea for a single sensor for an year: 24*365 = 8760 files. We can linearly extrapolate the number of files with number of sensors.

Example : 10,000 Sensors Approx for 5 years (8760*5*10000) i.e, 435 Million files

Photo by Mr Cup / Fabien Barral on Unsplash

Choice of tools:

To perform the analysis after doing the research and based on the skills the following options were chosen:

  1. EMR (Elastic Map Reduce) service provided by AWS is chosen for the analysis as it makes life very easy in setting up the Hadoop cluster and scale it as needed.
  2. EMR also provides options to add bootstrap scripts to install required packages on the nodes as they are added to the cluster.
  3. AWS is preferred over other public cloud providers as the data is on s3 and pricing for internal region data transfer is free of charge, other than S3 Get requests.

Key thing to keep in mind is the size of the cluster, mainly the number of core data nodes based on the replication factor of the HDFS file system.

Example: If you are downloading 100 GB of data and the replication factor is 3, make sure that data Core nodes in combined have at least 350 GB.

Also AWS is chosen for few other reasons, which I will highlight down the line.

After choosing the infrastructure, to download the s3 data on to the HDFS file system,tools that we found to do the s3 download at scale are:

  1. aws cli s3 cp: By setting default.s3.max_concurrent_requests on the master node. In this case the master node should be able to hold the entire data or we need to constantly move the data from local file system to HDFS , which also adds overhead.
  2. Distcp : DistCp (distributed copy) is bundled with Hadoop and uses MapReduce to copy files in a distributed manner. This is much faster than the above approach.
  3. s3distCP: s3 Distcp is optimized for s3,however, I could only see a very minor improvement from DistCP in throughput.

Testing the small amount of data using the above tools picked s3 DistCP, In a hope to observe improved performance at scale.

Then the next step comes to Aggregation of hourly data to monthly or yearly to avoid small file problem.

“Map tasks usually process a block of input at a time (using the default FileInputFormat). If the file is very small and there are a lot of them, then each map task processes very little input, and there are a lot more map tasks, each of which imposes extra bookkeeping overhead. Compare a 1GB file broken into 16 64MB blocks, and 10,000 or so 100KB files. The 10,000 files use one map each, and the job time can be tens or hundreds of times slower than the equivalent one with a single input file.”

Read this blog to understand small file problem

s3distCP provides options to aggregate data based on folder structure of the s3 using the groupBy command. This is very helpful in case if your data needs to be aggregated.

s3-dist-cp -s3Endpoint -src xxxxxxxxxxxx $sensor_id -dest $Intermediate_Path groupBy ‘.*(‘$month’).*’ -targetSize 1024-outputCodec=none

Also the number mappers or reducers for a cluster is based on the mapred-site configuration.

Example: For a m5 Xlarge default number of mappers are yarn.scheduler.maximum-allocation mb/ , i.e: 12288/3072 default 4 mappers per node.


Note: One thing to keep in mind is newline “\n” , while aggregation using groupBy s3DistCp doesn’t add new line while appending the files. We need to perform this operation manually later, either using Hadoop streaming or bash command line based on the regex pattern.

Also in-case if your files on s3 are gzipped then you need to unzip it and then apply the newline step.

After this step you will have your data in HDFS file system . Please re-verify , if there are any corrupt or missing blocks using:

hadoop fsck /path/to/files/

hdfs fsck -list-corruptfileblocks /path/to/files/

If there are any corrupt files or missing blocks recheck the replication is 100% . one way to fix the corrupt blocks is to delete them

hadoop fsck /path/to/files/ -d

I have faced this issue, when I used EC2 spot instances for data nodes and they are decommissioned from the cluster. So I would highly recommend to use on-demand instances for Data nodes , if no data loss is acceptable for your analysis.

Once you reach here, the major chunk of work is done and the fun stuff starts…

You can attach a Jupyter notebook to your cluster, EMR provides the option to do that.

Caveat: Make sure all the python packages that are being used in the Map Reduce functions are installed on all the core nodes and task nodes using the bootstrap script.

Spark session is by default created in YARN mode, when you import pyspark in the EMR notebook. If you want to use local mode you need to redefine the session with local mode.

In the next article, I would introduce few more bottlenecks when working with large scale data and dealing with them using python pure functions.

Happy Spark-lings with PySpark and Hadoop!

Photo by Erwan Hesry on Unsplash