Spark SQL is a powerful way to interact with large amounts of structured data. Spark SQL gives us the concept of “DataFrames”, which will be familiar if you’ve ever worked with Pandas or R. DataFrames can also be thought of as similar to tables in databases. Since the release of Spark 2.0, DataFrames have become increasingly popular, and are often chosen over RDDs.
With Spark SQL DataFrames we can interact with our data using the Structured Query Language (SQL). This gives us a declarative way to query our data, as opposed to the imperative methods we’ve studied in past weeks (i.e. discrete operations on sets of RDDs).
The Spark SQL documentation describes DataFrames as follows:
A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
SQL is a declarative language used for querying data. The simplest SQL query is a SELECT - FROM - WHERE query. This selects a set of attributes (SELECT) from a specific table (FROM) where a given set of conditions holds (WHERE).
However, SQL also has a series of more advanced aggregation commands for grouping data. This is accomplished with the GROUP BY keyword. We can also join tables on attributes or conditions with the set of JOIN ... ON commands. We won’t be expecting advanced knowledge of these topics, but developing a working understanding of how they work will be useful in completing this MP.
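These query shapes are easiest to see on a toy example. The snippet below uses Python’s built-in sqlite3 module rather than Spark, purely to illustrate SELECT - FROM - WHERE and GROUP BY; the table and column names are invented for the example, not part of the MP datasets.

```python
import sqlite3

# In-memory database purely for illustration; table/column names invented.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (user_id INTEGER, stars INTEGER)")
conn.executemany("INSERT INTO reviews VALUES (?, ?)",
                 [(1, 5), (1, 4), (2, 3), (3, 5), (3, 5)])

# SELECT - FROM - WHERE: pick attributes from a table where conditions hold.
five_star = conn.execute(
    "SELECT user_id FROM reviews WHERE stars = 5").fetchall()
# five_star -> the user_ids 1, 3, 3 (one row per 5-star review)

# GROUP BY: collapse rows that share a key, aggregating over each group.
counts = conn.execute(
    "SELECT user_id, COUNT(*) FROM reviews GROUP BY user_id").fetchall()
# counts -> one (user_id, count) row per user
```

The same SQL text works unchanged inside `sqlContext.sql(...)` once a table is registered, which is what makes the declarative style portable.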
Spark SQL has a pretty good Programming Guide that’s worth looking at.
Additionally, you may find these SQL tutorials useful for this assignment.
The easiest way to get data into Spark SQL is by registering a DataFrame as a table. A DataFrame is essentially an instance of a table: it has a schema (columns with data types and names), and data. We can create a DataFrame by passing an RDD of data tuples and a schema to sqlContext.createDataFrame:
data = sc.parallelize([('Alice', 1), ('Bob', 2), ('Eve', 3)])
df = sqlContext.createDataFrame(data, ['name', 'user_id'])
This creates a DataFrame with 2 columns: name and user_id.
We can then register this frame with the sqlContext to be able to query it generally:
df.createOrReplaceTempView("users")
Now we can query the table:
sqlContext.sql("SELECT name FROM users WHERE user_id=3")
Suppose we want to find all the businesses located in Champaign, IL that have 5 star ratings. We can do this with a simple SELECT - FROM - WHERE query:
sqlContext.sql("SELECT * "
"FROM businesses "
"WHERE stars=5 "
"AND city='Champaign' AND state='IL'").collect()
This selects all the rows from the businesses table that match the criteria described in the WHERE clause.
Alternatively, we can use the DataFrame API to perform this same query:
df.filter((df['stars'] == 5) & (df['city'] == 'Champaign') & (df['state'] == 'IL')).collect()
Suppose we want to rank users by how many reviews they’ve written. We can do this query with aggregation and grouping:
sqlContext.sql("SELECT user_id, COUNT(*) AS c "
"FROM reviews "
"GROUP BY user_id "
"ORDER BY c DESC "
"LIMIT 10").collect()
This query groups rows by the user_id column, and collapses those rows into tuples of (user_id, COUNT(*)), where COUNT(*) is the number of collapsed rows per grouping. This gives us the review count of each user. We then use ORDER BY c DESC to show the top counts first, and LIMIT 10 to only show the top 10 results.
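The same aggregate query can be inspected outside Spark. Here is the identical SQL run against an in-memory SQLite table (the user ids are invented for the example), which makes the group-then-order-then-limit behavior easy to check:

```python
import sqlite3

# A tiny stand-in for the reviews table; user ids invented for the example.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (user_id TEXT)")
conn.executemany("INSERT INTO reviews VALUES (?)",
                 [("u1",), ("u2",), ("u1",), ("u3",), ("u1",), ("u2",)])

# Same shape as the Spark query: group, count, order descending, limit.
top = conn.execute(
    "SELECT user_id, COUNT(*) AS c "
    "FROM reviews "
    "GROUP BY user_id "
    "ORDER BY c DESC "
    "LIMIT 10").fetchall()
# top -> [('u1', 3), ('u2', 2), ('u3', 1)]
```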
Alternatively, we can use the DataFrame API to perform this same query:
df.groupBy(df['user_id']).count().orderBy(desc('count')).select(df['user_id'], 'count').limit(10).collect()
(Note that we have to import pyspark.sql.functions.desc.)
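If it helps to see what that DataFrame chain computes, a plain-Python analogue (no Spark; the data is invented) is a counter over the user_id values followed by taking the largest counts:

```python
from collections import Counter

# Stand-in for the user_id column of the reviews DataFrame (values invented).
user_ids = ["u1", "u2", "u1", "u3", "u1", "u2"]

# groupBy('user_id').count()        ~ Counter(user_ids)
# orderBy(desc('count')).limit(10)  ~ most_common(10)
top10 = Counter(user_ids).most_common(10)
# top10 -> [('u1', 3), ('u2', 2), ('u3', 1)]
```

The difference, of course, is that Spark performs this grouping in parallel across a cluster rather than in one process.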
For each of the following problems, there is a function called setup_table that you should complete first. This function should take the filename(s) provided and create a DataFrame representing the dataset.
From here, you should register this DataFrame as a SparkSQL table, which you will use in your subsequent queries.
For this problem, we’ll construct some simple SQL queries on the Amazon Review dataset. Your first task is to create a DataFrame from the CSV data. Once you’ve done this, write queries that get the requested information about the data.
Note: For this problem, you must use sqlContext.sql to run your queries. This means you have to run sqlContext.registerDataFrameAsTable on your constructed DataFrame and write your queries in raw SQL.
22010
?B000E5C1YE
have?
Running your code:
spark-submit quizzical_queries.py hdfs:///data/amazon/amazon_food_reviews.csv
Use sqlContext.read.csv(filename, header=True, inferSchema=True) to parse your data.

For this problem, we’ll use some more complicated parts of the SQL language. Oftentimes, we’ll want to learn aggregate statistics about our data. We’ll use GROUP BY and aggregation methods like COUNT, MAX, and AVG to find out more interesting information about our dataset.
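Since the queries below filter on per-group properties (e.g. “more than 25 reviews”), the HAVING clause, which filters groups after aggregation, is worth knowing. Here is a minimal sqlite3 sketch (data invented; a threshold of 2 stands in for the MP’s 25 to keep the example small):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (ProductId TEXT, Score INTEGER)")
conn.executemany("INSERT INTO reviews VALUES (?, ?)",
                 [("p1", 5), ("p1", 5), ("p1", 4),
                  ("p2", 5),                       # only one review
                  ("p3", 3), ("p3", 3), ("p3", 4)])

# Average score per product, keeping only products with more than 2 reviews,
# highest average first, ties broken by review count.
rows = conn.execute(
    "SELECT ProductId, AVG(Score) AS avg_score, COUNT(*) AS num_reviews "
    "FROM reviews "
    "GROUP BY ProductId "
    "HAVING COUNT(*) > 2 "
    "ORDER BY avg_score DESC, num_reviews DESC").fetchall()
# p2 is filtered out by HAVING; p1 (avg ~4.67) sorts above p3 (avg ~3.33)
```

Note the distinction: WHERE filters individual rows before grouping, while HAVING filters whole groups after aggregation.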
Write queries to find:

- The ProductIds of the products with the top 25 highest average review scores, among products that each have more than 25 reviews, ordered by average product score, with ties broken by number of reviews.
- The Ids of the reviews with the top 25 highest ratios between HelpfulnessNumerator and HelpfulnessDenominator, which have HelpfulnessDenominator greater than 10, ordered by that ratio, with ties broken by HelpfulnessDenominator.

Running your code:
spark-submit aggregation_aggrevation.py hdfs:///data/amazon/amazon_food_reviews.csv
Use sqlContext.read.csv(filename, header=True, inferSchema=True) to parse your data. You do not have to use sqlContext.sql, but you must still do all your computations on DataFrames.

For this problem, we’ll switch back to the Yelp dataset. Note that you can use the very handy json DataFrame reader method to load the dataset as a DataFrame.
There are times when we need to access data that is split across multiple tables. For instance, when we look at a single Yelp review, we cannot directly get the user’s name, because we only have their id. But we can match users with their reviews by “joining” on their user id. The database does this by looking for rows with matching values in the join columns.
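As a concrete, non-Spark illustration of matching reviews to user names on user id (all table and column names here are invented; the Yelp data uses its own schema):

```python
import sqlite3

# Miniature users/reviews tables to show an INNER JOIN.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (user_id TEXT, name TEXT)")
conn.execute("CREATE TABLE reviews (user_id TEXT, stars INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [("u1", "Alice"), ("u2", "Bob")])
conn.executemany("INSERT INTO reviews VALUES (?, ?)",
                 [("u1", 5), ("u2", 3), ("u1", 4), ("u9", 2)])

# INNER JOIN keeps only review rows whose user_id also appears in users,
# so the review by the unknown user "u9" is dropped.
joined = conn.execute(
    "SELECT u.name, r.stars "
    "FROM reviews r "
    "INNER JOIN users u ON r.user_id = u.user_id "
    "ORDER BY u.name, r.stars").fetchall()
# joined -> [('Alice', 4), ('Alice', 5), ('Bob', 3)]
```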
You’ll want to look up the JOIN (specifically INNER JOIN) SQL commands for these problems.
Running your code:
spark-submit jaunting_with_joins.py hdfs:///data/yelp/yelp_academic_dataset_user.json hdfs:///data/yelp/yelp_academic_dataset_business.json hdfs:///data/yelp/yelp_academic_dataset_review.json
Use sqlContext.read.json() to parse your data. You do not have to use sqlContext.sql, but you must still do all your computations on DataFrames.

MP 5 is due on Tuesday, February 27th, 2018 at 11:59PM.
You can find starter code and the Git repository where you should submit your code here:
If you have issues accessing your repository, make sure that you’ve signed into Gitlab. If you still do not have access, please make a private post on the course Piazza.
Please write your solutions as indicated in the following files:
quizzical_queries.py
aggregation_aggrevation.py
jaunting_with_joins.py
… and commit your code to Gitlab like this:
git add .
git commit -m "Completed MP" # Your commit message doesn't need to match this
git push
WARNING: Our autograder runs on Python3. If you write code for this assignment that only works on Python2, we will be unable to grade your code. Code that contains syntax errors (i.e. code that cannot be parsed by the Python interpreter) will receive no credit.