Mutable Ideas

Notes and ideas about Java, Scala, Big Data, NoSQL, Quality and Software Deploy

Processing JSON With Spark SQL

After our first day at Spark Summit 2014, I was very excited to try JSON manipulation with Spark SQL, so I downloaded and compiled the SNAPSHOT version of Spark to try this feature.

[EDIT]: Thanks to @BigsnarfDude, who posted about the same topic and referred to the SQL json-datasets 1.0.1-rc1-docs. I still couldn’t figure out how to register a function in Spark SQL, though.

$ git clone git@github.com:apache/spark.git
$ cd spark/
$ ./sbt/sbt assembly
$ ./bin/spark-shell

Then I put together a small example that processes a JSON file containing Facebook FQL data, just to try something different from Twitter Statuses JSON ;)

It was incredibly easy to process even such complex JSON, thanks to the schema inference for JSON available on sqlContext. I was able to do a quick likes-per-hour count and save it in Parquet format in just a few lines of code.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val postsRDD = sqlContext.jsonFile("/Users/arjones/Projects/spark-summit/mydata/facebook/posts/2014/05/14/14/facebook.1400076455720")

// Checking the Schema
println(postsRDD.schemaString)


// register SchemaRDD as table
postsRDD.registerAsTable("posts")

// run a SQL query, which creates another SchemaRDD
val likesPerPost = sql("SELECT ((smx.crawled_time / 3600) * 3600) AS crawled_at, like_info.like_count AS likes FROM posts WHERE like_info.like_count > 0").cache()

likesPerPost.registerAsTable("likesPerPost")

// run an aggregation query
val likesPerHour = sql("""
SELECT 
crawled_at,
SUM(likes) AS likes
FROM likesPerPost
GROUP BY crawled_at
""")

// print results
likesPerHour.collect().foreach(println)

// save them to a Parquet file
likesPerHour.saveAsParquetFile("fb-posts-by-hour.parquet")

There were two things I couldn’t figure out so far:

  1. I’m calling the .cache() method, but judging by the logs, Spark still seems to read the JSON file every time I run a command.
  2. I couldn’t register a function to round a date to the hour; I couldn’t find one even by grep’ing the snapshot code base, i.e.: $ grep -r -i "registerFunction" *
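
For reference, here is what the hour rounding and the aggregation are meant to compute, written as a minimal plain Scala sketch with no Spark involved. It assumes crawled_time is an epoch timestamp in seconds and that the division discards the remainder; I have not verified either for the Spark SQL query above. The Post case class, the HourBuckets object and the sample values are hypothetical and exist only for illustration.

```scala
// Hypothetical stand-in for one row of the "posts" table (illustrative field names).
final case class Post(crawledTime: Long, likeCount: Long)

object HourBuckets {
  val SecondsPerHour = 3600L

  // Truncate an epoch timestamp (assumed to be in seconds) to the start of its hour.
  // Long division drops the remainder, so multiplying back gives the hour boundary.
  def truncateToHour(epochSeconds: Long): Long =
    (epochSeconds / SecondsPerHour) * SecondsPerHour

  // Sum likes per hour bucket, skipping posts without likes,
  // which mirrors the WHERE ... GROUP BY pair of queries.
  def likesPerHour(posts: Seq[Post]): Map[Long, Long] =
    posts
      .filter(_.likeCount > 0)
      .groupBy(p => truncateToHour(p.crawledTime))
      .map { case (hour, ps) => hour -> ps.map(_.likeCount).sum }

  def main(args: Array[String]): Unit = {
    // Made-up sample data, not taken from the real Facebook file.
    val sample = Seq(
      Post(1400076455L, 3L),
      Post(1400076999L, 2L),
      Post(1400080000L, 5L),
      Post(1400080100L, 0L)
    )
    likesPerHour(sample).toSeq.sortBy(_._1).foreach(println)
  }
}
```

If the SQL division turned out to return a fractional value instead, the multiplication would simply give back the original timestamp and every row would land in its own bucket, which is why a registered rounding function would be the more robust option.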

Here is the shell transcript: