MP4: Spark Streaming

Introduction

This MP will introduce Spark Streaming. You will be writing some simple Spark applications that process streaming data from Twitter and Reddit.

You’ll be using the DStream API to write streaming jobs. Note that recent versions of Spark have another API for writing streaming applications called Structured Streaming, but we won’t be using that this week.

Example

Here’s an example of a Spark Streaming application that does “word count” on incoming Tweets:

import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Initialize the Spark streaming context
conf = SparkConf().setAppName("Word count")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)  # Use a 10 second batch duration
# Checkpointing is required for windowed operations that use an inverse
# reduce function, like reduceByKeyAndWindow below
ssc.checkpoint('streaming_checkpoints')

# Stream window tunable parameters
window_length = 900  # Total length of each window, in seconds
slide_interval = 30  # How often the windowed operation is evaluated, in seconds


# Parse a tweet's JSON and return the list of words in its text
def get_tweet_word_count(tweet_json):
    try:
        data = json.loads(tweet_json)
        return data['text'].split(' ')
    except (ValueError, KeyError):
        return []


# Listen to the tweet stream
tweet_json_lines = ssc.socketTextStream("<stream-source-url>", 8000)

# Split each tweet into words and map each word to a (word, 1) pair
tweet_word_counts = tweet_json_lines.flatMap(get_tweet_word_count).map(lambda x: (x, 1))

# Do a windowed sum of the word counts. The second lambda is the inverse
# reduce function, which lets Spark "subtract" records that fall out of the
# window instead of recomputing the whole window on every slide.
windowed_word_count = tweet_word_counts.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, window_length, slide_interval)

# Save output to Hadoop
windowed_word_count.saveAsTextFiles('tweet_word_count')


# Signal to Spark Streaming that we've set up our streaming application and that it's ready to be run
ssc.start()

# Run the streaming application until it's terminated externally
ssc.awaitTermination()
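
While developing, it can be easier to inspect results on the driver console than in HDFS. DStream.pprint() prints the first few records of every batch; if you use it, register it before calling ssc.start(), for example:

# Print the first 10 records of each windowed result to the driver console
# (this can coexist with the saveAsTextFiles() call above)
windowed_word_count.pprint(10)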

Resources

For this MP, you’ll find the following resources useful:

Stream Formats

Twitter

You will receive Tweets as JSON blobs, one tweet per line, with the following schema:

{
    "text": "<tweet body>"
}

This stream can be accessed via TCP on ip-172-31-25-25.ec2.internal:8000 from within the cluster.

If you want to look at the raw stream, you can do so by running telnet ip-172-31-25-25.ec2.internal 8000 on the course cluster.

Reddit

You will receive Reddit comments as JSON blobs, one comment per line, with the following schema:

{
    "text": "<comment body>",
    "subreddit": "<subreddit name>",
    "author": "<comment author>"
}

This stream can be accessed via TCP on ip-172-31-25-25.ec2.internal:8001 from within the cluster.

If you want to look at the raw stream, you can do so by running telnet ip-172-31-25-25.ec2.internal 8001 on the course cluster.
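
As with the tweet stream, each line is a self-contained JSON document. A minimal parsing sketch, assuming a StreamingContext named ssc as in the example above (parse_comment is just an illustrative helper name):

import json


# Parse one line of the Reddit stream into a (subreddit, author, text) tuple,
# or None if the line is not valid JSON or is missing a field
def parse_comment(comment_json):
    try:
        data = json.loads(comment_json)
        return (data['subreddit'], data['author'], data['text'])
    except (ValueError, KeyError):
        return None


comment_json_lines = ssc.socketTextStream("ip-172-31-25-25.ec2.internal", 8001)
comments = comment_json_lines.map(parse_comment).filter(lambda c: c is not None)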

Tips

You can “listen” to the stream by running telnet <stream_address> <stream_port>. Spark Streaming treats each line as a distinct record, so you’ll see one JSON blob per line.

MP Activities

Problem 1 - Trending Hashtags

Write a Spark Streaming application that listens to an incoming stream of Tweets and counts the number of times each hashtag is used within intervals of 60 seconds. Return a DStream that contains the top 10 most used hashtags in the given stream interval, and evaluate the results every 10 seconds. (Hint: use a windowed operation.)

For example, if we saw stream batches like this:

# BATCH 1: t=[0, 10)
Testing my cool application #HelloWorld
#HelloWorld Foo #Bar
Here's another Tweet #HelloWorld

# BATCH 2: t=[10, 20)
Some more #tweets #HelloWorld

# BATCH 3: t=[20, 30)
Even more #tweets #HelloWorldAgain

# BATCH 4: t=[30, 40)
Hello #Bar, here's some #tweets

# BATCH 5: t=[40, 50)
No hashtags here

# BATCH 6: t=[50, 60)
None here either

# BATCH 7: t=[60, 70)
Here's a #hashtag

Then our hashtag count results would look like this:

# BATCH 1: t=[0, 10)
("#HelloWorld", 3)
("#Bar", 1)

# BATCH 2: t=[10, 20)
("#HelloWorld", 4)
("#Bar", 1)
("#tweets", 1)

# BATCH 3: t=[20, 30)
("#HelloWorld", 4)
("#Bar", 1)
("#tweets", 1)

# BATCH 4: t=[30, 40)
("#HelloWorld", 4)
("#Bar", 2)
("#tweets", 2)

# BATCH 5: t=[40, 50)
(("#HelloWorld", 4)
("#Bar", 2)
("#tweets", 2)

# BATCH 6: t=[50, 60)
("#HelloWorld", 4)
("#Bar", 2)
("#tweets", 2)

# BATCH 7: t=[60, 70)
("#HelloWorld", 1)
("#Bar", 1)
("#tweets", 2)
("#hashtag", 1)

Running your code:

spark-submit trending_hashtags.py --input_stream ip-172-31-25-25.ec2.internal --input_stream_port 8000 trending_hashtags_out
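
To give a sense of the shape of a solution, here is a minimal sketch of one possible approach. It assumes hashtags are simply whitespace-separated tokens beginning with '#', hardcodes the stream host and port for brevity (your actual script should take them from the command-line arguments shown above and write to the given output path), and uses illustrative names like get_hashtags that are not part of the starter code.

import json
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("Trending hashtags")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)          # 10 second batches
ssc.checkpoint('hashtag_checkpoints')   # needed by reduceByKeyAndWindow with an inverse function


# Extract hashtags from one line of the tweet stream
def get_hashtags(tweet_json):
    try:
        text = json.loads(tweet_json)['text']
        return [word for word in text.split() if word.startswith('#')]
    except (ValueError, KeyError):
        return []


tweets = ssc.socketTextStream("ip-172-31-25-25.ec2.internal", 8000)

# Count each hashtag over a 60 second window, re-evaluated every 10 seconds,
# and drop hashtags whose windowed count has fallen back to zero
hashtag_counts = (tweets.flatMap(get_hashtags)
                        .map(lambda tag: (tag, 1))
                        .reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 60, 10)
                        .filter(lambda kv: kv[1] > 0))

# Keep only the 10 most frequent hashtags in each window
top_hashtags = hashtag_counts.transform(
    lambda rdd: rdd.context.parallelize(rdd.takeOrdered(10, key=lambda kv: -kv[1])))

top_hashtags.pprint()
ssc.start()
ssc.awaitTermination()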

Problem 2 - Subreddit Topics

Note that Reddit comments have a field called “subreddit” that indicates the forum on Reddit where the comment was posted. Write a Spark Streaming application that listens to an incoming stream of Reddit comments and outputs the top 10 most used words in comments for each subreddit. Normalize the comments by transforming them to all lowercase, and use the stopwords list provided in the solution template to remove common words. Use the same “word regex” as in the word count examples in MP1. Order does not matter when outputting either the subreddit keys or the popular words within each key.

For this problem, use an evaluation window length of 900 seconds and evaluate the stream results every 10 seconds. (Hint: use a windowed operation.)

For example, if we saw a stream batch like this:

SUBREDDIT           TEXT
/r/todayilearned    Today, I learned something
/r/todayilearned    we learned something else
/r/uiuc             my life is a corn field
/r/pics             I took this picture

Our output for this batch would be:

("/r/todayilearned", ("today", "learned", "something", "else"))
("/r/uiuc", ("life", "corn", "field"))
("/r/pics", ("took", "picture"))

Running your code:

spark-submit subreddit_topics.py --input_stream ip-172-31-25-25.ec2.internal --input_stream_port 8001 subreddit_topics_out
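
One possible shape for this problem, again as a hedged sketch: the word regex and the STOPWORDS set below are placeholders (use the MP1 regex and the stopword list from the solution template in your actual solution), and the stream host/port and output handling are hardcoded for brevity.

import json
import re
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("Subreddit topics")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
ssc.checkpoint('subreddit_checkpoints')

WORD_RE = re.compile(r"[a-z']+")                  # placeholder; use the MP1 word regex
STOPWORDS = {'i', 'a', 'is', 'my', 'we', 'this'}  # placeholder; use the template's list


# Emit ((subreddit, word), 1) pairs for one comment line
def subreddit_words(comment_json):
    try:
        data = json.loads(comment_json)
        words = WORD_RE.findall(data['text'].lower())
        return [((data['subreddit'], w), 1) for w in words if w not in STOPWORDS]
    except (ValueError, KeyError):
        return []


comments = ssc.socketTextStream("ip-172-31-25-25.ec2.internal", 8001)

# Count words per subreddit over a 900 second window, evaluated every 10 seconds
counts = (comments.flatMap(subreddit_words)
                  .reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 900, 10)
                  .filter(lambda kv: kv[1] > 0))

# Regroup by subreddit and keep the 10 most frequent words for each one
top_words = (counts.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
                   .groupByKey()
                   .mapValues(lambda wcs: [w for w, c in sorted(wcs, key=lambda wc: -wc[1])[:10]]))

top_words.pprint()
ssc.start()
ssc.awaitTermination()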

Problem 3 - Reddit Bot Detection

Write a Spark Streaming application that listens to an incoming stream of Reddit comments and detects users that post multiple similar comments within a given stream period. This problem will not be autograded for accuracy, so feel free to come up with your own criteria for detecting bots given the Reddit comment stream.

As a suggestion, here are some criteria that you may consider:

The output of your program should be an RDD of users that have comment histories matching whatever criteria you decide to use.

(To sanity-check your output: you should see the “AutoModerator” user flagged as a bot after your job has been running for a few minutes.)

Running your code:

spark-submit reddit_bot_detection.py --input_stream ip-172-31-25-25.ec2.internal --input_stream_port 8001 reddit_bot_detection_out
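
Because the detection criteria are up to you, the sketch below is only one illustrative approach, not the required one: it flags any author who posts the exact same comment text three or more times within a window. The window sizes and the threshold here are arbitrary assumptions.

import json
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("Reddit bot detection")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
ssc.checkpoint('bot_checkpoints')


# Emit an ((author, comment text), 1) pair for one comment line
def author_comment(comment_json):
    try:
        data = json.loads(comment_json)
        return [((data['author'], data['text']), 1)]
    except (ValueError, KeyError):
        return []


comments = ssc.socketTextStream("ip-172-31-25-25.ec2.internal", 8001)

# Count identical (author, text) pairs over a 900 second window, every 30 seconds
repeats = (comments.flatMap(author_comment)
                   .reduceByKeyAndWindow(lambda a, b: a + b, None, 900, 30))

# Anyone who posted the same text 3 or more times in the window is flagged as a likely bot
suspected_bots = (repeats.filter(lambda kv: kv[1] >= 3)
                         .map(lambda kv: kv[0][0])
                         .transform(lambda rdd: rdd.distinct()))

suspected_bots.pprint()
ssc.start()
ssc.awaitTermination()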

Problem 4 - Additional Questions

Answer the following questions in questions.txt. Be sure to number your responses.

  1. Justify your choice of criteria for detecting bots in Problem 3.
  2. Briefly explain the significance of the batchDuration argument to the StreamingContext constructor and the windowLength and slideInterval arguments to windowed transformation functions.
  3. Briefly explain the difference between a normal RDD and a DStream.
  4. What is checkpointing in the context of Spark Streaming, and why might we choose to enable it? When are we forced to enable it?

Submission

MP 4 is due on Tuesday, February 20th, 2018 at 11:59PM.

You can find starter code and the Git repository where you should submit your code here:

If you have issues accessing your repository, make sure that you’ve signed into Gitlab. If you still do not have access, please make a private post on the course Piazza.

Please write your solutions as indicated in the following files:

  trending_hashtags.py
  subreddit_topics.py
  reddit_bot_detection.py
  questions.txt

… and commit your code to Gitlab like this:

git add .
git commit -m "Completed MP" # Your commit message doesn't need to match this
git push

WARNING: Our autograder runs on Python 3. If you write code for this assignment that only works on Python 2, we will be unable to grade it. Code that contains syntax errors (i.e. code that the Python interpreter cannot parse) will receive no credit.