MP5: Spark SQL and DataFrames

Introduction

Spark SQL is a powerful way to interact with large amounts of structured data. Spark SQL gives us the concept of “DataFrames”, which will be familiar if you’ve ever worked with Pandas or R. DataFrames can also be thought of as similar to tables in databases. Since the release of Spark 2.0, DataFrames have become increasingly popular, and are often chosen over RDDs.

With Spark SQL DataFrames, we can interact with our data using the Structured Query Language (SQL). This gives us a declarative way to query our data, as opposed to the imperative methods we’ve studied in past weeks (i.e. discrete operations on sets of RDDs).

The Spark SQL documentation describes DataFrames as follows:

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

SQL Crash Course

SQL is a declarative language used for querying data. The simplest SQL query is a SELECT - FROM - WHERE query. This selects a set of attributes (SELECT) from a specific table (FROM) where a given set of conditions holds (WHERE).
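
For example, a query of this shape (the table and column names here are made up, purely for illustration) selects the names of everyone over 21:

SELECT name
FROM people
WHERE age > 21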

However, SQL also has more advanced commands for aggregating and grouping data, accomplished with the GROUP BY keyword. We can also join tables on attributes or conditions with the JOIN ... ON commands. We won’t expect advanced knowledge of these topics, but a working understanding of them will be useful in completing this MP.
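
As a quick sketch (again with made-up tables and columns), grouping and joining look like this:

-- Count how many people live in each city
SELECT city, COUNT(*) AS num_people
FROM people
GROUP BY city

-- Pair each purchase with the name of the person who made it
SELECT people.name, purchases.amount
FROM purchases
JOIN people ON purchases.person_id = people.person_id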

Spark SQL has a pretty good Programming Guide that’s worth looking at.

Additionally, you may find these SQL tutorials useful for this assignment.

Examples

Loading Tables

The easiest way to get data into Spark SQL is by registering a DataFrame as a table. A DataFrame is essentially an instance of a table: it has a schema (columns with names and data types) and data.

We can create a DataFrame by passing an RDD of data tuples and a schema to sqlContext.createDataFrame:

data = sc.parallelize([('Alice', 1), ('Bob', 2), ('Eve', 3)])
df = sqlContext.createDataFrame(data, ['name', 'user_id'])

This creates a DataFrame with 2 columns: name and user_id.
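
If you want to sanity-check the result, printSchema and show are handy (this is an optional check, not a required step):

# Print the column names and types, then display the rows
df.printSchema()
df.show()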

We can then register this DataFrame as a temporary view so that we can query it through the sqlContext:

df.createOrReplaceTempView("users")

Now we can query the table:

sqlContext.sql("SELECT name FROM users WHERE user_id=3")

Specific Business Subset

Suppose we want to find all the businesses located in Champaign, IL that have 5-star ratings. We can do this with a simple SELECT - FROM - WHERE query:

sqlContext.sql("SELECT * "
               "FROM businesses "
               "WHERE stars=5 "
               "AND city='Champaign' AND state='IL'").collect()

This selects all the rows from the businesses table that match the criteria described in the WHERE clause.

Alternatively, we can use the DataFrame API to perform this same query:

df.filter((df['stars'] == 5) & (df['city'] == 'Champaign') & (df['state'] == 'IL')).collect()

Highest Number of Reviews

Suppose we want to rank users by how many reviews they’ve written. We can do this query with aggregation and grouping:

sqlContext.sql("SELECT user_id, COUNT(*) AS c"
               "FROM reviews "
               "GROUP BY user_id "
               "ORDER BY c DESC "
               "LIMIT 10").collect()

This query groups rows by the user_id column, and collapses those rows into tuples of (user_id, COUNT(*)), where COUNT(*) is the number of collapsed rows per grouping. This gives us the review count of each user. We then do ORDER BY c DESC to show the top counts first, and LIMIT 10 to only show the top 10 results.

Alternatively, we can use the DataFrame API to perform this same query:

df.groupBy(df['user_id']).count().orderBy(desc('count')).select(df['user_id'], 'count').limit(10).collect()

(Note that we have to import desc first: from pyspark.sql.functions import desc)

MP Activities

Important Note

For each of the following problems, there is a function called setup_table that you should complete first. This function should take the filename(s) provided, and create a DataFrame representing the dataset.

From here, you should register this DataFrame as a SparkSQL table, which you will use in your subsequent queries.
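
As a rough sketch of this pattern (assuming a CSV input as in Problem 1; the exact function signature, reader options, and table name will depend on the starter code and dataset, so treat these as placeholders):

def setup_table(sqlContext, filename):
    # Read the CSV file into a DataFrame; header/inferSchema are assumptions
    # about the file format, so adjust them to match the actual data.
    df = sqlContext.read.csv(filename, header=True, inferSchema=True)
    # Register the DataFrame as a table so it can be queried via sqlContext.sql
    df.createOrReplaceTempView("reviews")  # "reviews" is a placeholder name
    return df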

1. Quizzical Queries

For this problem, we’ll construct some simple SQL queries on the Amazon Review dataset. Your first task is to create a DataFrame from the CSV data. Once you’ve done this, write queries that get the requested information about the data.

Note: For this problem, you must use sqlContext.sql to run your queries. This means you have to run sqlContext.registerDataFrameAsTable on your constructed DataFrame (or register it with createOrReplaceTempView, as in the examples above) and write your queries in raw SQL.

Queries:

  1. What is the review text of the review with id 22010?
  2. How many 5-star ratings does product B000E5C1YE have?
  3. How many unique (distinct) users have written reviews?

Running your code:

spark-submit quizzical_queries.py hdfs:///data/amazon/amazon_food_reviews.csv


2. Aggregation Aggravation

For this problem, we’ll use some more complicated parts of the SQL language. Often, we’ll want to compute aggregate statistics about our data. We’ll use GROUP BY and aggregation functions like COUNT, MAX, and AVG to find out more interesting information about our dataset.
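
For instance, reusing the businesses table from the earlier examples (purely as an illustration), a grouped aggregation might look like:

SELECT state, COUNT(*) AS num_businesses, AVG(stars) AS avg_stars, MAX(stars) AS max_stars
FROM businesses
GROUP BY state
ORDER BY avg_stars DESC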

Queries:

  1. How many reviews were written by the user who has written the most reviews?
  2. List the ProductIds of the 25 products with the highest average review scores, considering only products that have more than 25 reviews, ordered by average review score, with ties broken by number of reviews.
  3. List the Ids of the 25 reviews with the highest ratios of HelpfulnessNumerator to HelpfulnessDenominator, considering only reviews with HelpfulnessDenominator greater than 10, ordered by that ratio, with ties broken by HelpfulnessDenominator.

Running your code:

spark-submit aggregation_aggrevation.py hdfs:///data/amazon/amazon_food_reviews.csv


3. Jaunting with Joins

For this problem, we’ll switch back to the Yelp dataset. Note that you can use the very handy json DataFrame reader method to load in the dataset as a DataFrame.
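
For example (the variable and view names here are placeholders), loading one of the JSON files and registering it could look like this, assuming newline-delimited JSON, which is what Spark's json reader expects by default:

# Each line of the file is a JSON object; read.json turns them into rows
users_df = sqlContext.read.json(users_filename)
users_df.createOrReplaceTempView("users")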

There are times when we need to access data that is split across multiple tables. For instance, when we look at a single Yelp review, we cannot directly get the user’s name, because the review only contains their id. However, we can match users with their reviews by “joining” on the user id. The database does this by looking for rows with matching values in the join columns.

You’ll want to look up the JOIN (specifically INNER JOIN) SQL commands for these problems.
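
For reference, an inner join has this general shape (the table and column names below are illustrative, not necessarily the exact Yelp schema):

SELECT users.name, reviews.stars
FROM reviews
JOIN users ON reviews.user_id = users.user_id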

Queries:

  1. What is the maximum number of “funny” ratings left on a review created by someone who’s been yelping since 2012?
  2. List the user ids of anyone who has left a 1-star review, has created more than 250 reviews, and has left a review in Champaign, IL.

Running your code:

spark-submit jaunting_with_joins.py hdfs:///data/yelp/yelp_academic_dataset_user.json hdfs:///data/yelp/yelp_academic_dataset_business.json hdfs:///data/yelp/yelp_academic_dataset_review.json


Submission

MP 5 is due on Tuesday, February 27th, 2018 at 11:59PM.

You can find starter code and the Git repository where you should submit your code here:

If you have issues accessing your repository, make sure that you’ve signed into Gitlab. If you still do not have access, please make a private post on the course Piazza.

Please write your solutions as indicated in the following files:

… and commit your code to Gitlab like this:

git add .
git commit -m "Completed MP" # Your commit message doesn't need to match this
git push

WARNING: Our autograder runs on Python3. If you write code for this assignment that only works on Python2, we will be unable to grade your code. Code that contains syntax errors (i.e. unable to be interpreted by the Python interpreter) will receive no credit.