I also put together a small example, processing JSON containing Facebook FQL data, just to try something different from the usual Twitter Statuses JSON ;)
It was incredibly easy to process even this fairly complex JSON, thanks to the JSON schema inference available on SQLContext.
I was able to do a quick likes-per-hour count and save the result in Parquet format in just a few lines of code.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val postsRDD = sqlContext.jsonFile("/Users/arjones/Projects/spark-summit/mydata/facebook/posts/2014/05/14/14/facebook.1400076455720")

// Checking the Schema
println(postsRDD.schemaString)

// register the SchemaRDD as a table
postsRDD.registerAsTable("posts")

// run a SQL query, which creates another SchemaRDD
val likesPerPost = sql("SELECT ((smx.crawled_time / 3600) * 3600) AS crawled_at, like_info.like_count AS likes FROM posts WHERE like_info.like_count > 0").cache()
likesPerPost.registerAsTable("likesPerPost")

// Run an aggregation query
val likesPerHour = sql("""
  SELECT crawled_at, SUM(likes) AS likes
  FROM likesPerPost
  GROUP BY crawled_at
""")

// print results
likesPerHour.collect().foreach(println)

// save them to a Parquet file
likesPerHour.saveAsParquetFile("fb-posts-by-hour.parquet")
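As a quick sanity check, the Parquet output can be read back and queried like any other table; here is a minimal sketch using sqlContext.parquetFile (the Parquet counterpart of jsonFile), with savedLikes as a made-up name:

// load the Parquet file back as a SchemaRDD and register it as a table
val savedLikes = sqlContext.parquetFile("fb-posts-by-hour.parquet")
savedLikes.registerAsTable("savedLikes")
sql("SELECT * FROM savedLikes").collect().foreach(println)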
There are two things I haven't been able to figure out so far:
1. I'm using the .cache() method, but looking at the logs it seems Spark still reads the JSON file every time I run a command (see the first sketch below).
2. I couldn't register a function to round the date to the hour; I couldn't find any such hook even when grepping the snapshot code base, i.e. $ grep -r -i "registerFunction" * turns up nothing (see the second sketch below).
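On the first point, here is a sketch of something worth trying, assuming the re-reads happen because .cache() is lazy (nothing is materialized until the first action runs) rather than because caching is broken:

// cacheTable keeps the table in Spark SQL's in-memory columnar format
sqlContext.cacheTable("likesPerPost")
// count() is an action, so it forces the data to be materialized;
// queries after this point should hit the cache instead of the JSON file
likesPerPost.count()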
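On the second point, since registerFunction doesn't show up in the snapshot, here is a hypothetical sketch of what the hourly rounding might look like as a UDF on a build that does support function registration; roundToHour is a made-up name, and crawled_time is assumed to be a Unix timestamp in seconds:

// hypothetical: assumes a Spark SQL build with UDF registration support
sqlContext.registerFunction("roundToHour", (ts: Long) => (ts / 3600) * 3600)

val likesPerPostUdf = sql("""
  SELECT roundToHour(smx.crawled_time) AS crawled_at,
         like_info.like_count AS likes
  FROM posts
  WHERE like_info.like_count > 0
""")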