
# Reading compressed data with Spark using unknown file extensions

This post could also be called Reading .gz.tmp files with Spark. At Socialmetrix we have several pipelines writing logs to AWS S3. Sometimes Apache Flume fails at the last step, renaming the final archive from .gz.tmp to .gz, which leaves those files unreadable through the SparkContext.textFile API. This post presents our workaround to process them.

## Our Problem

The diagram below shows the sink part of our architecture:

*Pipeline: Flume to S3*

  1. Flume is listening to an AMQP queue, dequeuing logs as soon as they arrive;

  2. Every 10 minutes, Flume gzips the accumulated content and saves it to an S3 bucket;

  3. For some reason, unknown to us at this moment, some files don't end up with the desired final extension .gz; instead they are saved with the .gz.tmp extension.

  4. If you try to read these files with Spark (or Hadoop), all you get is gibberish, because any unknown extension is read as plain text. The sketch below illustrates the symptom.
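To make the symptom concrete, here is a minimal sketch, assuming an existing SparkContext `sc` (e.g. from spark-shell) and hypothetical file names under the same bucket layout as above:

```scala
// Hypothetical paths, for illustration only.

// The .gz file matches the Gzip codec and is decompressed transparently,
// so this prints readable log lines.
sc.textFile("s3n://my-data-bucket/2015/09/21/13/events.gz").take(3).foreach(println)

// The .gz.tmp file matches no codec and is read as plain text,
// so this prints the raw gzip bytes as gibberish.
sc.textFile("s3n://my-data-bucket/2015/09/21/13/events.gz.tmp").take(3).foreach(println)
```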

## Our Workaround

The reason you can't read a .gz.tmp file is that Spark tries to match the file extension against the registered compression codecs, and no codec handles the .gz.tmp extension!
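You can see that matching step directly with Hadoop's CompressionCodecFactory, which is what resolves extensions to codecs under the hood. This is just an illustrative sketch, with made-up paths:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

val factory = new CompressionCodecFactory(new Configuration())

// ".gz" is a registered extension, so this resolves to GzipCodec.
val known = factory.getCodec(new Path("events.gz"))

// No registered codec claims ".gz.tmp", so this resolves to null and the
// file falls back to being read as plain text.
val unknown = factory.getCodec(new Path("events.gz.tmp"))

println(known)    // the GzipCodec instance
println(unknown)  // null
```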

With this in mind, the solution is really simple: all we had to do was extend GzipCodec and override the getDefaultExtension method.

Here is our TmpGzipCodec.scala:

```scala
package smx.ananke.spark.util.codecs

import org.apache.hadoop.io.compress.GzipCodec

class TmpGzipCodec extends GzipCodec {

  override def getDefaultExtension(): String = ".gz.tmp"

}
```

Now we just register this codec by setting spark.hadoop.io.compression.codecs on SparkConf:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()

// Custom codec that processes the .gz.tmp extension as the common Gzip format
conf.set("spark.hadoop.io.compression.codecs", "smx.ananke.spark.util.codecs.TmpGzipCodec")

val sc = new SparkContext(conf)

val data = sc.textFile("s3n://my-data-bucket/2015/09/21/13/*")
```
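As a quick sanity check (a sketch on our side, not required by the workaround): Spark strips the spark.hadoop. prefix and copies the property into the Hadoop configuration that textFile uses, so the codec factory can pick up our class:

```scala
// Should print our codec class name, confirming the property reached Hadoop.
println(sc.hadoopConfiguration.get("io.compression.codecs"))
// smx.ananke.spark.util.codecs.TmpGzipCodec
```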

Now it is just a matter of assembling this codec as part of your project, in our case with sbt assembly, and running your code as usual.
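If you are packaging with sbt-assembly, a minimal build looks roughly like this; the project name, plugin and library versions here are illustrative assumptions, not our exact build:

```scala
// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt
name := "tmp-gzip-codec"

scalaVersion := "2.10.5"

// Spark (and its Hadoop dependency) is provided by the cluster at runtime,
// so it is kept out of the assembled jar.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.0" % "provided"
```

Running sbt assembly then produces a single jar containing TmpGzipCodec that you ship to the cluster as usual.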

## Final Note

From the tests we ran in our environment, registering this codec does not affect Spark's default configuration, so we can still process the .gz, .bz2, etc. extensions.