As we have talked about in lecture, Spark is built on the concept of a Resilient Distributed Dataset (RDD).
PySpark allows us to interface with these RDDs in Python. Think of it as an API. In fact, it is an API: check out its documentation. PySpark is built on top of Spark's Java API and exposes the Spark programming model to Python. It does this by using a library called Py4J, which enables Python programs to dynamically access Java objects in a Java Virtual Machine.
This allows data to be processed in Python and cached in the JVM. This model lets us combine the performance of the JVM with the expressiveness of Python.
Note that Spark has many ways of processing data. One of the more exciting innovations of recent Spark versions is the “DataFrame”. However, this week we’ll only be using RDDs, which are the original unit of processing data in Spark.
When testing your code, it can be helpful to work with only a small sample of the input. One way to do this is `rdd = sc.parallelize(rdd.take(100))`. This pulls 100 items from the input RDD and creates a new RDD from them through the `.parallelize` function. You may also use the `rdd.sample` function if you wish to take a fraction of an RDD's items.

You aren't limited to just these functions; here are some you should definitely know. (It may be more efficient to look up other options!)
- `.map(func)` — returns a new RDD formed by applying `func` to each item of the source RDD.
- `.filter(func)` — returns a new RDD containing only the items for which calling `func` on the input value returned `True`.
- `.reduce(func)` — aggregates the items of the RDD using `func` (similar to how `.reduce` works in Javascript).

Other useful functions are described in the RDD API documentation.
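As a quick illustration of how these compose (a minimal sketch, assuming `sc` is an existing SparkContext, as it is in the `pyspark` shell):

```python
nums = sc.parallelize(range(10))                      # build an RDD from a Python iterable
squares = nums.map(lambda x: x * x)                   # transformation: square each item
even_squares = squares.filter(lambda x: x % 2 == 0)   # transformation: keep the even squares
total = even_squares.reduce(lambda a, b: a + b)       # action: sum what's left
print(total)                                          # 0 + 4 + 16 + 36 + 64 = 120
```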
You should use the sparkContext.textFile
method to load in data using the filename provided to your function. Read more about the textFile
method in its documentation.
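For instance, a job that receives its input path as a command-line argument might start like this (a minimal sketch; the application name and variable names are placeholders, not part of the MP spec):

```python
import sys
from pyspark import SparkContext

sc = SparkContext(appName='my_spark_job')  # placeholder application name
lines = sc.textFile(sys.argv[1])           # one RDD element per line of the input file
```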
Similar to what we saw in the past weeks with the Hadoop Job web interface, Spark has a really useful web interface to view how jobs are executed. Accessing this web interface is similar to accessing the Hadoop interface.
UNIX users can access this UI by appending -L 18080:ip-172-31-7-221.ec2.internal:18080
to their ssh command, and then accessing localhost:18080
in their local web browser. Windows users can use PuTTY in a similar fashion.
For example, your “full” ssh command may look like this:
ssh -i /path/to/ssh_key <netid>@<cluster_address> -L 18080:ip-172-31-7-221.ec2.internal:18080
NOTE: To view “in-progress” jobs, you’ll need to click on the “Show incomplete applications” link at the bottom of the page.
We’ll be using spark-submit
to run our spark jobs on the cluster. spark-submit
has a couple command line options that you can tweak.
The `--master` flag: this option tells `spark-submit` where to run your job, as Spark can run in several modes:
- `--master local` — run the job locally on the machine you launched `spark-submit` from; handy for quick tests on small inputs.
- `--master yarn --deploy-mode client` — run the job on the YARN cluster, with the driver on the machine you submitted from, so your program's output shows up in your terminal.
- `--master yarn --deploy-mode cluster` — run the job on the YARN cluster, with the driver on one of the cluster's nodes; you'll have to pull the YARN logs to see its output.
In the output of spark-submit --master yarn --deploy-mode cluster
you’ll find an applicationId
. (This is similar to when you ran jobs on Hadoop). You can issue this command to get the logs for your job:
yarn logs -applicationId <YOUR_APPLICATION_ID> | less
When debugging Python applications, it’s useful to grep
for Traceback
in your logs, as this will likely be the actual debug information you’re looking for.
yarn logs -applicationId <YOUR_APPLICATION_ID> | grep -A 50 Traceback
NOTE: In cluster mode, normal “local” IO operations like opening files will behave unexpectedly! This is because you’re not guaranteed which node the driver will run on. You must use the PySpark API for saving files to get reliable results. You also have to coalesce your RDD into one partition before asking PySpark to write to a file (why do you think this is?). Additionally, you should save your results to HDFS.
<my_rdd>.saveAsTextFile('hdfs:///user/MY_USERNAME/foo')
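Putting those requirements together, the save step might look roughly like this (a sketch; `results` and the output path are placeholders):

```python
# Collapse the RDD to a single partition so the output lands in one part file,
# then write it to HDFS rather than the driver's local filesystem.
results.coalesce(1).saveAsTextFile('hdfs:///user/MY_USERNAME/foo')
```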
In summary, use --master yarn --deploy-mode client
, unless we tell you not to. 😄
Submitting a Spark job will usually look something like this:
spark-submit --master yarn --deploy-mode client <my_spark_job>.py <job_arguments>
Because Spark isn’t tied to HDFS, we can run Spark using input data that exists on the local filesystem.
However, because we’re using Spark on EMR, the default is to use HDFS paths. We can get around this using the file://
prefix, and using an absolute path to the file we want to reference.
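For example, a local-mode test run might reference a file in your current directory like this (mirroring the sample commands later in this page; the filenames are placeholders):

```
spark-submit --master local[2] my_job.py file://`pwd`/input.json file://`pwd`/output
```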
While spark-submit
is the way we’ll be endorsing to run PySpark jobs, there is an option to run jobs in an interactive shell. Use the pyspark
command to load into the PySpark interactive shell. You can use many of the same options listed above to tweak pyspark
settings, such as --num-executors
and --master
.
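For example, you might start an interactive session on the cluster like this (the executor count here is arbitrary):

```
pyspark --master yarn --num-executors 4
```

Inside the shell, `sc` is already defined for you, so you can experiment with RDD transformations interactively.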
Note: If you start up the normal python
interpreter on the cluster, you won’t be able to use any of the PySpark features.
This week, we’ll be working off of a set of released Yelp data.
The dataset is located in /data/yelp
in HDFS. We’ll be using the following files for this MP:
/data/yelp/yelp_academic_dataset_business.json
/data/yelp/yelp_academic_dataset_checkin.json
/data/yelp/yelp_academic_dataset_review.json
/data/yelp/yelp_academic_dataset_user.json
We’ll give more details about the data in these files as we continue with the MP, but the general schema is this: each line in each of these JSON files is an independent JSON object that represents a distinct entity, whether it be a business, a review, or a user.
Hint: JSON is parsed with json.loads
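For example, turning one of these files into an RDD of Python dictionaries might look like this (a minimal sketch; `input_path` is whatever path your job receives):

```python
import json

# Each line of the file is one independent JSON object, so parse line by line.
businesses = sc.textFile(input_path).map(json.loads)
```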
This problem uses the yelp_academic_dataset_business.json
dataset.
In planning your next road trip, you want to find the cities that will be the least expensive to dine at.
It turns out that Yelp keeps track of a handy metric for this: many restaurants have the attribute RestaurantsPriceRange2
that gives the business a score from 1-4 on a scale of ‘priciness’. Note that the “attributes” section of the business schema is messy, so you’ll have to manually parse out the RestaurantsPriceRange2
value.
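Exactly how you dig that value out depends on how the "attributes" field is encoded in your copy of the data. As one illustration only, if it arrives as a dictionary of strings, a helper along these lines could work (a hypothetical sketch under that assumption, not the required approach):

```python
def get_price(business):
    # Hypothetical helper: return the price range as an int, or None if it's missing/unparseable.
    attrs = business.get('attributes') or {}
    value = attrs.get('RestaurantsPriceRange2')
    try:
        return int(value)
    except (TypeError, ValueError):
        return None
```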
Write a PySpark application that, for each city, determines the average price attribute of its businesses/restaurants.
Notes:
- Not every business has a `RestaurantsPriceRange2` attribute.
- Your average restaurant price should be rounded to 2 decimal places.
- Each city should be output as a tuple of `("City, State", Price)`:

CITY_STATE, PRICE
Example:
("Champaign, IL", 1.23)
Example Data:
Input: ./data/yelp_business_sample.json
Output: ./data/city_expensiveness_sample_solution.txt
Running Your Code:
spark-submit --master local[2] city_expensiveness.py file://`pwd`/data/yelp_business_sample.json file://`pwd`/city_expensiveness
spark-submit --master yarn city_expensiveness.py hdfs:///data/yelp/yelp_academic_dataset_business.json city_expensiveness
This problem uses the yelp_academic_dataset_review.json
dataset.
In selecting a restaurant, you might want to find the review that Yelp users have decided is the best review. For each business in the review
dataset, find and output the review that has the greatest engagement. That is, find the review that has the greatest sum of useful
+ funny
+ cool
interactions. You will find these values as fields of the review.
Break ties by selecting the review with the lexicographically "largest" review ID.
Your output should be an RDD of tuples in the form (business_id, review_id):
```
BUSINESS_ID, REVIEW_ID
Example:
KYasaF1nov1bn7phfSgWeg EmuqmSacByt96t8G5GK0KQ
```
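One way to structure this is to key each review by its business and keep the best (engagement, review_id) pair per business; the snippet below is only a sketch of that idea (it assumes `reviews` is already an RDD of parsed review dictionaries), not a complete solution:

```python
best = (reviews
        .map(lambda r: (r['business_id'],
                        (r['useful'] + r['funny'] + r['cool'], r['review_id'])))
        # Tuple comparison picks the higher engagement first, and on ties
        # the lexicographically larger review_id, matching the tie-break rule.
        .reduceByKey(max)
        .mapValues(lambda pair: pair[1]))   # keep only the review_id
```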
Example Data:
Input: ./data/yelp_review_sample.json
Output: ./data/engaging_reviews_sample_solution.txt
Running Your Code:
spark-submit --master local[2] engaging_reviews.py file://`pwd`/data/yelp_review_sample.json file://`pwd`/engaging_reviews
spark-submit --master yarn engaging_reviews.py hdfs:///data/yelp/yelp_academic_dataset_review.json engaging_reviews
For this activity, we’ll be looking at Yelp reviews. 😱 Namely, we want to find out which Yelp reviewers are more or less harsh than they should be.
To do this we will calculate the average review score of each business in our dataset, and find how far away users’ ratings are from the average of a business. You can find the rating for each review in the “stars” attribute of the review.
The average_business_rating
of a business is the sum of the ratings of the business divided by the count of the ratings for that business. A user's review offset score (the average_review_offset in your output) is the average of the differences between each of their ratings and the corresponding business's average rating. A positive score indicates that a user tends to give higher ratings than the average; a negative score indicates that the user tends to give lower ratings than the average.
Your output should be an RDD of tuples in the form (user_id, average_review_offset).
Notes:
- This problem uses the `yelp_academic_dataset_review.json` dataset.
- Each review has a `user_id` and a `business_id`.
- Example output: `("KpkOkG6RIf4Ra25Lhhxf1A", 1.2)`
Example Data:
Input: ./data/yelp_review_sample.json
Output: ./data/yelp_reviewer_accuracy_sample_solution.txt
Running Your Code:
spark-submit --master local[2] yelp_reviewer_accuracy.py file://`pwd`/data/yelp_review_sample.json file://`pwd`/yelp_reviewer_accuracy
spark-submit --master yarn yelp_reviewer_accuracy.py hdfs:///data/yelp/yelp_academic_dataset_review.json yelp_reviewer_accuracy
MP 3 is due on Tuesday, February 13th, 2018 at 11:59PM.
You can find starter code and the Git repository where you should submit your code here:
If you have issues accessing your repository, make sure that you’ve signed into Gitlab. If you still do not have access, please make a private post on the course Piazza.
Please write your solutions as indicated in the following files:
city_expensiveness.py
engaging_reviews.py
yelp_reviewer_accuracy.py
… and commit your code to Gitlab like this:
git add .
git commit -m "Completed MP" # Your commit message doesn't need to match this
git push
WARNING: Our autograder runs on Python3. If you write code for this assignment that only works on Python2, we will be unable to grade your code. Code that contains syntax errors (i.e. unable to be interpreted by the Python interpreter) will receive no credit.