After our first day at Spark Summit 2014 I was very excited to try the JSON support in Spark SQL, so I downloaded and compiled the SNAPSHOT version of Spark to try this feature.
$ git clone git@github.com:apache/spark.git
$ cd spark/
$ ./sbt/sbt assembly
$ ./bin/spark-shell
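Once the shell is up, a quick sanity check (just my habit, not strictly needed) is to confirm it is really the freshly built snapshot and not an older Spark already on the PATH:

scala> sc.version
// should print the SNAPSHOT version you just built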
And I put together a small example, processing JSON containing Facebook FQL data - just to try something different from the usual Twitter Statuses JSON ;)
It was incredibly easy to process even this fairly complex JSON, thanks to the schema inference for JSON that is now available in Spark SQL.
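To get a feel for the inference itself, here is a tiny sketch - assuming the snapshot also exposes jsonRDD next to jsonFile, and with made-up field names just for the illustration - that feeds in a couple of nested JSON strings and prints the schema Spark SQL derives:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// two hand-written records with a nested object and an optional field
val sample = sc.parallelize(Seq(
  """{"id": 1, "like_info": {"like_count": 3}}""",
  """{"id": 2, "like_info": {"like_count": 7}, "message": "hi"}"""
))

// the inferred schema merges the fields seen across all records
val sampleRDD = sqlContext.jsonRDD(sample)
println(sampleRDD.schemaString)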
I’ve been able to do a quick likes-per-hour count and save it to Parquet format in just a few lines of code.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val postsRDD = sqlContext.jsonFile("/Users/arjones/Projects/spark-summit/mydata/facebook/posts/2014/05/14/14/facebook.1400076455720")

// check the inferred schema
println(postsRDD.schemaString)

// register the SchemaRDD as a table
postsRDD.registerAsTable("posts")

// run a SQL query and create another SchemaRDD
val likesPerPost = sql("SELECT ((smx.crawled_time / 3600) * 3600) AS crawled_at, like_info.like_count AS likes FROM posts WHERE like_info.like_count > 0").cache()
likesPerPost.registerAsTable("likesPerPost")

// run an aggregation query
val likesPerHour = sql("""
  SELECT crawled_at, SUM(likes) AS likes
  FROM likesPerPost
  GROUP BY crawled_at
""")

// print the results
likesPerHour.collect().foreach(println)

// save them to a Parquet file
likesPerHour.saveAsParquetFile("fb-posts-by-hour.parquet")
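As a quick check that the Parquet output is actually usable, it can be loaded right back and queried; parquetFile is already part of SQLContext in 1.0, so this should also work on the snapshot (the table name here is just something I picked for the check):

// read the Parquet file back and register it as a table
val savedLikes = sqlContext.parquetFile("fb-posts-by-hour.parquet")
savedLikes.registerAsTable("likesPerHourParquet")

// same numbers as above, now coming from Parquet instead of JSON
sql("SELECT * FROM likesPerHourParquet ORDER BY crawled_at").collect().foreach(println)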
There are two things I haven’t been able to figure out so far; I sketch possible workarounds for both below:
- I’m using the .cache() method, but looking at the logs it seems Spark is still reading the JSON file every time I run a command.
- I couldn’t register a function to round dates to the hour; I couldn’t even find registerFunction when grep’ing the snapshot code base, i.e.:
$ grep -r -i "registerFunction" *
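For what it’s worth, here is how I would probably work around both points - just a sketch against the snapshot API, not something I have verified end to end. My guess on the first issue is that .cache() on the SchemaRDD isn’t picked up by queries that go through the registered table, so caching by table name may behave better. For the second, in the absence of registerFunction, the hour rounding can be done on the Scala side with a case class; the column names and Long types below are assumptions about my own data.

// first issue: try caching through the table name instead of the SchemaRDD
sqlContext.cacheTable("likesPerPost")

// second issue: round to the hour in Scala instead of in SQL
// (assumes crawled_time is in epoch seconds and both fields were inferred as longs)
case class HourlyLike(crawledAt: Long, likes: Long)

val rawLikes = sql("SELECT smx.crawled_time, like_info.like_count FROM posts WHERE like_info.like_count > 0")

val roundedLikes = rawLikes.map { row =>
  // integer division truncates the timestamp down to the start of the hour
  HourlyLike((row.getLong(0) / 3600) * 3600, row.getLong(1))
}

// the createSchemaRDD implicit from `import sqlContext._` lets a case-class RDD be registered
roundedLikes.registerAsTable("likesPerPostRounded")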
Here is the shell transcript: