Reliably utilizing Spark, S3 and Parquet: Everybody says ‘I love you’; not sure they know what that entails

Post after post has been written about the wonders of Spark and Parquet: how one can simply save RDDs/DataFrames in Parquet format to HDFS or S3. In many cases the job output is persisted to HDFS volumes located on the same machines as the Spark cluster. However, HDFS comes with a price:

  • Disk volume resources (called EBS, Elastic Block Store, in AWS) are expensive.
  • Disk volumes are limited to a certain read/write throughput and number of I/O operations. These bottlenecks always need to be kept in check to ensure optimal performance, and they in turn limit the number of jobs that can run simultaneously.
  • Simply maintaining an HDFS cluster (or any other storage cluster) is time consuming.

For these reasons we decided to go with S3 as deep storage for the input and output of our data instead of HDFS. We chose Parquet as the file format for its obvious advantages over raw JSONs. There are other column-based file formats around, but Parquet seems to have the largest community around it.

The documentation around Spark SQL and Parquet is sufficient to set up a data processing flow. While it works pretty much out of the box, there are additional tweaks to be made and, above all, a serious caveat, which I will discuss in this post.

The first tweak for interacting with S3 is using the s3a:// URI scheme rather than the s3:// or s3n:// schemes, which are deprecated as well as simply slower than S3a. It is possible to achieve S3a integration for Spark either with the ancient AWS SDK (1.7.4) already bundled with Spark, or with a newer AWS SDK version as described in a previous post. Once this is set up, one can be sure that access to/from S3 is optimal. There is active work in Hadoop 2.8.x to improve the S3a support introduced in 2.7.x, so further improvements can be expected in the future.
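
For reference, a minimal sketch of enabling S3a from the driver, assuming the hadoop-aws module and a matching AWS SDK JAR are on the classpath; the property names below are standard S3A settings and the bucket is a placeholder:

    val hc = sparkContext.hadoopConfiguration
    hc.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hc.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))      // or rely on an IAM instance role
    hc.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    // From here on, read and write with s3a:// paths, e.g. "s3a://my-bucket/input/"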

The second tweak which is often brought up is changing the Hadoop config mapreduce.fileoutputcommitter.algorithm.version to “2”. This reduces the time the driver application waits until all files are moved from the temporary folder to the final folder, by having tasks write directly to the final output folder and thus skipping the rename step during job commit. This page elaborates further on the topic and is worth a read.
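
This is how one might flip the setting from the driver; note that if you adopt the S3 committer described later in this post, the value must stay at its default of “1” (see step 5 below):

    // Hadoop's FileOutputCommitter algorithm v2 skips the rename step during job commit.
    sparkContext.hadoopConfiguration
      .set("mapreduce.fileoutputcommitter.algorithm.version", "2")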

So now, after we have tweaked some essentials, persisting RDDs/DataFrames to S3 should work smoothly. And it does… until it doesn’t.

Soon after running a job with many tasks that writes a DataFrame to S3, it is highly probable that some tasks will fail and yield all kinds of nondeterministic FileNotFound exceptions, as described in this Hadoop issue. The problem stems from the output committer operations being executed: they were originally intended for HDFS, a (distributed) file system, but were later ported to S3, which is not a file system.

Why is S3 not a file system?

  • changes to the file structure are not reflected immediately
  • operations are not atomic
  • some operations, such as RENAME, should take constant time, but in S3 they take time proportional to the amount of data

In order to overcome these limitations of S3, and in particular its eventual consistency model, a feature called S3Guard will be introduced. This feature uses an external, strongly consistent store, such as DynamoDB (chosen for its kinship to AWS) or HDFS, to supply an up-to-date view of the files that should be in S3. Work is being done under this ticket to develop a committer which uses S3Guard. According to the Jira ticket much of the work has already been done, and the planned release versions are 2.9.x and 3.x. However, looking at the latest Hadoop releases, it could take time until it is production ready: the Hadoop 2.8.x line is still not production ready even though more than 6 months have passed since the 2.8.1 release, and the 3.x line has only recently entered beta.

So what can be done in the meantime, I hear you ask. The way I see it, there are three options:

  • Suck it up. An occasional failing task is tolerable, especially in light of Spark’s ability to replay failed tasks. However, this increases the overall running time of the job and also feels shaky. As explained, due to eventual consistency, changes to S3 are made but not immediately reflected, so when a task fails and is re-executed, it might (or might not) cause issues with the integrity of the persisted data.
  • This open-source committer from a Netflix employee is actually the inspiration for S3Guard, but it has not been maintained for some years (since work on it has effectively moved to the Apache team). I try to avoid unmaintained libraries for fast-moving projects, so this option was off the table.
  • Luckily, a recently updated and, most importantly, working S3 committer exists – another committer developed by a Netflix employee. Though it works and has so far proved useful, it does not come without a preparation process. In the rest of the post I will explain how to use it.

How does this committer work? At the end of each task, in the commitTask phase, the committer saves the task’s output to S3 using the multipart upload API. This means the data is already on S3 but not yet committed, and therefore not yet visible. Once a task’s output is uploaded, the committer saves the necessary metadata about the upload to HDFS. Once all tasks are done, in the commitJob phase, the driver application traverses the metadata previously saved in HDFS and finalizes the uploads, thus making them visible.
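
To make the mechanism concrete, here is a rough sketch of the multipart upload pattern the committer builds on, using the AWS Java SDK (1.x) from Scala. This is an illustration only, not the committer’s actual code: the bucket, key and file names are placeholders, and the real committer adds retries, metadata handling and abort logic.

    import java.io.File
    import scala.collection.JavaConverters._
    import com.amazonaws.services.s3.AmazonS3ClientBuilder
    import com.amazonaws.services.s3.model.{CompleteMultipartUploadRequest, InitiateMultipartUploadRequest, UploadPartRequest}

    val s3 = AmazonS3ClientBuilder.defaultClient()
    val (bucket, key) = ("my-bucket", "output/part-00000.parquet")    // hypothetical names

    // commitTask side: upload the task output as a pending multipart upload.
    // The data lands on S3, but the object is not visible yet.
    val uploadId = s3.initiateMultipartUpload(
      new InitiateMultipartUploadRequest(bucket, key)).getUploadId
    val localPart = new File("/tmp/task-output.parquet")              // the task's local output file
    val uploadResult = s3.uploadPart(new UploadPartRequest()
      .withBucketName(bucket).withKey(key).withUploadId(uploadId)
      .withPartNumber(1).withFile(localPart).withPartSize(localPart.length()))
    val etag = uploadResult.getPartETag

    // The committer would now persist (bucket, key, uploadId, part ETags) to HDFS.

    // commitJob side (driver): complete the upload; only now does the object become visible.
    s3.completeMultipartUpload(
      new CompleteMultipartUploadRequest(bucket, key, uploadId, List(etag).asJava))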

The committer project exposes two concrete committers, S3DirectoryOutputCommitter and S3PartitionedOutputCommitter, which should be used when persisting non-partitioned and partitioned data respectively.
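
In Spark terms the distinction simply maps to whether the DataFrame is written with partitionBy; a short sketch (bucket and column names are placeholders):

    // Non-partitioned output: pair with S3DirectoryOutputCommitter.
    df.write.parquet("s3a://my-bucket/events/")

    // Partitioned output: pair with S3PartitionedOutputCommitter.
    df.write.partitionBy("date").parquet("s3a://my-bucket/events_by_date/")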

In my fork of this S3 committer I made some further changes necessary for it to work with Parquet in my setup:

  1. S3MultipartOutputCommitter should extend ParquetOutputCommitter instead of FileOutputCommitter. This is a Spark requirement, as can be seen here (see the sketch after this list).
  2. Merged in this essential commit from Scravy, fixing a concurrency issue.
  3. Downgraded the AWS SDK and upgraded the Hadoop version according to my cluster setup. If the multipart upload API is not available in your AWS SDK version, you will need to upgrade it.
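
Regarding the first change, here is a rough sketch of the inheritance adjustment, assuming the committer keeps the usual FileOutputCommitter-style (Path, TaskAttemptContext) constructor; the actual class in the project is, of course, far more involved:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.TaskAttemptContext
    import org.apache.parquet.hadoop.ParquetOutputCommitter

    // Extending ParquetOutputCommitter (itself a FileOutputCommitter subclass)
    // satisfies Spark's expectation for spark.sql.parquet.output.committer.class.
    class S3MultipartOutputCommitter(outputPath: Path, context: TaskAttemptContext)
      extends ParquetOutputCommitter(outputPath, context) {
      // ... multipart upload commit logic as in the upstream project ...
    }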

These are the steps I executed to reach a final working result:

  1. Install HDFS datanodes on the Spark slaves, and configure Spark to use this HDFS setup.
  2. Package the S3 committer and include it in Spark’s classpath. I usually place such JARs in Spark’s /lib folder, which is scanned at startup anyway.
  3. In order to speed up the commitJob phase it is possible to simply allocate more threads which will finalize the commits in parallel:
    sparkContext.hadoopConfiguration.set("s3.multipart.committer.num-threads", numThreads) 
    
  4. Use the required committer class:
    sparkContext.hadoopConfiguration.set("spark.sql.parquet.output.committer.class",classOf[S3DirectoryOutputCommitter].getName)
    

    or

    sparkContext.hadoopConfiguration.set("spark.sql.parquet.output.committer.class",classOf[S3PartitionedOutputCommitter].getName)
    
  5. Make sure that “mapreduce.fileoutputcommitter.algorithm.version” is “1”, the default value. If this value is set to “2”, the committer will fail. This has been observed and reported by others.
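
Putting steps 3–5 together, here is a minimal sketch of the driver-side configuration as I use it. The import assumes the upstream project’s package name (adjust it for your fork), and the bucket, partition column and thread count are placeholders:

    import com.netflix.bdp.s3.S3PartitionedOutputCommitter   // package name per the upstream project

    val hc = sparkContext.hadoopConfiguration
    hc.set("s3.multipart.committer.num-threads", "16")               // step 3: parallel finalization in commitJob
    hc.set("spark.sql.parquet.output.committer.class",
      classOf[S3PartitionedOutputCommitter].getName)                 // step 4: partitioned data in this example
    hc.set("mapreduce.fileoutputcommitter.algorithm.version", "1")   // step 5: keep the default; "2" breaks the committer

    df.write.partitionBy("date").parquet("s3a://my-bucket/output/")  // hypothetical output path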

Phew, that was some work! If you are still with me at this point, then you should have an up and running Spark cluster safely committing Parquet DataFrames/RDDs to S3. In the long run I think it’s worth it – setting up such a pipeline is a hassle, but the value of S3 as storage for Spark outputs is high.

5 thoughts on “Reliably utilizing Spark, S3 and Parquet: Everybody says ‘I love you’; not sure they know what that entails”

  1. Hi,
    Very nice article. I followed the steps and it works locally; however, when deployed to AWS EMR, I am getting:
    java.io.IOException: Mkdirs failed to create file:/mnt1/mapred/application_1515162689027_0001/_temporary/0/_temporary/attempt_20180105160947_0002_m_000097_0/mmsi=310 (exists=false, cwd=file:/mnt/yarn/usercache/hadoop/appcache/application_1515162689027_0001/container_1515162689027_0001_01_000050)

    Can you provide more info regarding this step:
    Install HDFS datanodes on the spark slaves, and Spark is configured to use this HDFS setup

    1. Hi Ozz,
      If you attach the stack trace, I can try to understand where the problem lies (driver or executor) and at which stage.

      In my setup we are using Spark standalone mode, with a master and slaves. On the master node we have a namenode running, and on the executors we have datanodes connecting to this master.
      Additionally, one must tell the Spark executors where the local HDFS is, using the same config files as HDFS, namely core-site.xml and hdfs-site.xml.
      This is done by adding the HADOOP_CONF_DIR variable to spark-env.sh, pointing to the folder containing these files. I picked this up from the official documentation. It is also relevant for YARN deployments, if that helps you.

      I can also attach the config files themselves for reference on request, though they are standard Hadoop deployment XMLs.
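
      If it helps, a quick way to check which default file system Spark resolved from those config files is to print it from the driver (a sketch):

        // Prints something like hdfs://<namenode-host>:8020 when core-site.xml was picked up,
        // and file:/// when Spark is still falling back to the local file system.
        import org.apache.hadoop.fs.FileSystem
        println(FileSystem.get(sparkContext.hadoopConfiguration).getUri)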

      1. Stack trace below; it is taken from an executor log:

        ERROR Utils: Aborting task
        java.io.IOException: Mkdirs failed to create file:/mnt1/mapred/application_1515162689027_0001/_temporary/0/_temporary/attempt_20180105160947_0002_m_000097_0/mmsi=310 (exists=false, cwd=file:/mnt/yarn/usercache/hadoop/appcache/application_1515162689027_0001/container_1515162689027_0001_01_000050)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793)
        at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:241)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.org$apache$spark$sql$execution$datasources$FileFormatWriter$DynamicPartitionWriteTask$$newOutputWriter(FileFormatWriter.scala:416)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$2.apply(FileFormatWriter.scala:449)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$2.apply(FileFormatWriter.scala:438)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator.foreach(AbstractScalaRowIterator.scala:26)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:438)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        18/01/05 16:09:57 ERROR S3MultipartOutputCommitter: Failed to delete task attempt data: file:/mnt1/mapred/application_1515162689027_0001/_temporary/0/_temporary/attempt_20180105160947_0002_m_000097_0
        18/01/05 16:09:58 WARN FileOutputCommitter: Could not delete hdfs://ip-10-22-1-147.eu-west-1.compute.internal:8020/tmp/application_1515162689027_0001/pending-uploads/_temporary/0/_temporary/attempt_20180105160947_0002_m_000097_0
        18/01/05 16:09:58 ERROR FileFormatWriter: Job job_20180105160947_0002 aborted.
        18/01/05 16:09:58 ERROR Executor: Exception in task 97.0 in stage 2.0 (TID 11308)
        org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        Caused by: java.io.IOException: Mkdirs failed to create file:/mnt1/mapred/application_1515162689027_0001/_temporary/0/_temporary/attempt_20180105160947_0002_m_000097_0/mmsi=310 (exists=false, cwd=file:/mnt/yarn/usercache/hadoop/appcache/application_1515162689027_0001/container_1515162689027_0001_01_000050)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:793)
        at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:241)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:159)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.org$apache$spark$sql$execution$datasources$FileFormatWriter$DynamicPartitionWriteTask$$newOutputWriter(FileFormatWriter.scala:416)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$2.apply(FileFormatWriter.scala:449)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$2.apply(FileFormatWriter.scala:438)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator.foreach(AbstractScalaRowIterator.scala:26)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:438)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
        … 8 more

        1. Hi Ozz,
          The committer uses a local HDFS folder so that the namenode (or any node) has a full view of all the pending multipart upload parts initiated by the executors, which need to be completed by the driver application at the end of the job. Since this log is taken from the executors, it seems there is a problem saving the multipart upload data on the local datanode. Make sure the folder /mnt1/mapred/ is indeed the data directory that backs your local HDFS, and not a regular local folder (which you might not have permission to write to).
