Mutable Ideas

Notes and ideas about Java, Scala, Big Data, NoSQL, Quality and Software Deploy

Processing Twitter’s Top Stories With Apache Spark (Part 1)

Ever since I discovered News.me, it has surprised me with its simplicity and its power to recommend the best stories to read. I always thought it would be fun to try building something similar, so I decided to create a PoC of Twitter’s top stories using Apache Spark.

DISCLAIMER: this is a PoC, mainly focused on learning Spark. This architecture doesn’t represent a production-level product, nor do I consider recommending stories for a single user a big data problem.

TL;DR

The code is available on GitHub; just create valid Twitter API credentials and you can run it.

Solution

The solution is based on two stages:

  1. Collect tweets from the stream, analyze them, and store the tweets that contain a link, expanding each link to its final destination (removing URL shorteners and click counters)

  2. Run a batch job to process the data from the previous stage and create a top 10 list
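To make the goal of the second stage concrete, here is a minimal sketch of a top 10 ranking over plain Scala collections. It is not the batch job from the repository: the object name `TopStoriesSketch`, the function `topUrls`, and the alphabetical tie-break are all assumptions made for illustration.

```scala
object TopStoriesSketch {
  // Count how often each expanded URL was shared and keep the n most frequent.
  // Ties are broken alphabetically so the result is deterministic (an assumption).
  def topUrls(urls: Seq[String], n: Int = 10): Seq[(String, Int)] =
    urls
      .groupBy(identity)
      .map { case (url, hits) => (url, hits.size) }
      .toSeq
      .sortBy { case (url, count) => (-count, url) }
      .take(n)
}
```

For example, `TopStoriesSketch.topUrls(Seq("a", "b", "a"), 2)` ranks `a` ahead of `b`. On an RDD the same counting would typically be expressed with `map` and `reduceByKey`.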

Building & Running

I use Eclipse with Scala, and the Typesafe sbteclipse plugin to generate the Eclipse project.

$ git clone git@github.com:arjones/spark-news.git
$ cd spark-news

Now edit src/main/resources/twitter4j.properties.template adding your credentials and rename it to src/main/resources/twitter4j.properties:

twitter4j.oauth.consumerKey= //
twitter4j.oauth.consumerSecret= //
twitter4j.oauth.accessToken= //
twitter4j.oauth.accessTokenSecret= //

Let’s build our project:

$ sbt/sbt
> eclipse with-source=true
> assembly
> exit

And run it:

$ spark-submit \
 --class io.smx.spark.news.CollectLinksFromStreamDrive \
 target/scala-2.10/SparkNews-assembly-0.1-SNAPSHOT.jar \
 following.txt

Edit following.txt adding accounts that you find interesting!

If everything is working properly, every 5 minutes you will see a new folder at ./urls/999999999/, where the number is the Unix timestamp, rounded down to the minute. For example:

urls/1404627900/ --> 7/5/2014 11:25:00 PM GMT-7
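The mapping between a folder name and a wall-clock time can be checked with a few lines of Scala. This is only a sketch: `FolderTimestampSketch`, `folderName`, and `describe` are hypothetical names, it assumes the folder name is the batch time in whole seconds, and it uses `java.time`, which needs Java 8 or later.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object FolderTimestampSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss XXX")

  // Assumption: the folder name is the batch time in milliseconds
  // converted to whole Unix seconds.
  def folderName(batchTimeMs: Long): String = (batchTimeMs / 1000L).toString

  // Render a folder name as a readable time at the given UTC offset.
  def describe(folder: String, utcOffsetHours: Int): String =
    fmt.format(
      Instant.ofEpochSecond(folder.toLong).atOffset(ZoneOffset.ofHours(utcOffsetHours)))
}
```

`FolderTimestampSketch.describe("1404627900", -7)` yields `2014-07-05 23:25:00 -07:00`, which matches the example above.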

You can check the activity using the Spark UI.

Internals

Set up a StreamingContext with a 5-minute batch interval, load the accounts, and create the Twitter stream:

// Setup the Streaming Context
val ssc = new StreamingContext(new SparkConf(), Seconds(300))
val tweets = TwitterUtils.createStream(ssc, None, followingList)
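The snippet above does not show how `followingList` is built from following.txt. One possible way to load it is sketched below. The file format is an assumption (one account per line, blank lines and `#` comments ignored), and `FollowingListSketch`, `parseAccounts`, and `load` are hypothetical names, not code from the repository.

```scala
object FollowingListSketch {
  // Assumption: one account per line; blank lines and lines starting
  // with '#' are skipped, and duplicates are dropped.
  def parseAccounts(lines: Seq[String]): Seq[String] =
    lines
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      .distinct

  // Read the file passed on the command line, e.g. following.txt.
  def load(path: String): Seq[String] = {
    val source = scala.io.Source.fromFile(path, "UTF-8")
    try parseAccounts(source.getLines().toList)
    finally source.close()
  }
}
```

Keeping the parsing separate from the file access makes it easy to check the filtering rules without touching the disk.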

For each tweet that contains a URL, extract it; if there is more than one URL, only the first is used:

// Consider only 1st URL on the Tweet
val url = URLExpander.expandUrl(status.getURLEntities.head.getExpandedURL)
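The implementation of `URLExpander` is not shown here. As a rough idea of what expanding a link to its final destination can involve, the sketch below follows redirects one hop at a time. Everything in it is an assumption for illustration: the names `UrlExpanderSketch`, `expand`, and `httpNextHop`, the limit of 10 hops, and the 5-second timeouts. The project's own expander may work differently.

```scala
import java.net.{HttpURLConnection, URL}
import scala.annotation.tailrec

object UrlExpanderSketch {
  // Follow redirects until `nextHop` reports none, or `maxHops` is used up
  // (this guards against redirect loops).
  @tailrec
  def expand(url: String, nextHop: String => Option[String], maxHops: Int = 10): String =
    if (maxHops <= 0) url
    else nextHop(url) match {
      case Some(target) if target != url => expand(target, nextHop, maxHops - 1)
      case _                             => url
    }

  // One HTTP hop: send a HEAD request without auto-following redirects
  // and read the Location header of a 3xx response.
  def httpNextHop(url: String): Option[String] = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setInstanceFollowRedirects(false)
      conn.setRequestMethod("HEAD")
      conn.setConnectTimeout(5000)
      conn.setReadTimeout(5000)
      val code = conn.getResponseCode
      if (code >= 300 && code < 400)
        // Resolve relative Location values against the current URL.
        Option(conn.getHeaderField("Location")).map(loc => new URL(new URL(url), loc).toString)
      else None
    } finally conn.disconnect()
  }
}
```

Against real links it would be called as `UrlExpanderSketch.expand(shortUrl, UrlExpanderSketch.httpNextHop)`. Passing the hop function as a parameter also lets the redirect logic be exercised with an in-memory `Map` instead of the network.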

Use Spark SQL to implicitly convert an RDD[TweetContent] into a SchemaRDD:

val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
import sqlContext._

Then we can go through each SchemaRDD and save it to disk as a Parquet file:

urlsDStream.foreach { schemaRDD =>
  ...
  ...
  ...
  // Write this batch to urls/<timestamp>/ in Parquet format
  links.saveAsParquetFile("urls/" + folderTimestamp)
}

What’s next?

Leave this code running for a few hours so we have data for the next part, which involves calculating the best stories!