As we have talked about in lecture, Spark is built on the concept of a Resilient Distributed Dataset (RDD).
PySpark allows us to interface with these RDDs in Python. Think of it as an API. In fact, it is an API: check out its documentation. PySpark is built on top of Spark's Java API and exposes the Spark programming model to Python. It does this by using a library called Py4J, which enables Python programs to dynamically access Java objects in a Java Virtual Machine.
This allows data to be processed in Python and cached in the JVM. This model lets us combine the performance of the JVM with the expressiveness of Python.
Note that Spark has many ways of processing data. One of the more exciting innovations of recent Spark versions is the “DataFrame”. However, this week we’ll only be using RDDs, which are the original unit of processing data in Spark.
`rdd = sc.parallelize(rdd.take(100))`

This pulls 100 items from the input RDD (via the `take` action) and creates a new RDD from them with the `sc.parallelize` function. You may also use the `rdd.sample` function if you wish to take a fraction of an RDD's items.
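For intuition, `rdd.sample(withReplacement=False, fraction)` behaves roughly like an independent coin flip per element. Here is a minimal plain-Python sketch of that behavior (not PySpark's actual implementation):

```python
import random

def bernoulli_sample(items, fraction, seed=42):
    """Keep each item independently with probability `fraction`,
    mimicking rdd.sample(withReplacement=False, fraction, seed)."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]

# The sample size is close to, but not exactly, fraction * n.
subset = bernoulli_sample(range(1000), 0.1)
print(len(subset))
```

Because each element is kept or dropped independently, the sample size varies from run to run; use `take` if you need an exact count.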
You aren't limited to just these functions; here are some you should definitely know. (It may be more efficient to look up other options!) For example, `map(func)` returns a new RDD formed by applying `func` to each input value.
Other useful functions:
You should use the `sparkContext.textFile` method to load in data using the filename provided to your function. Read more about the `textFile` method in its documentation.
Similar to what we saw in the past weeks with the Hadoop Job web interface, Spark has a really useful web interface to view how jobs are executed. Accessing this web interface is similar to accessing the Hadoop interface.
UNIX users can access this UI by appending `-L 18080:ip-172-31-7-221.ec2.internal:18080` to their ssh command, and then accessing `localhost:18080` in their local web browser. Windows users can use PuTTY in a similar fashion.
For example, your “full” ssh command may look like this:
ssh -i /path/to/ssh_key <netid>@<cluster_address> -L 18080:ip-172-31-7-221.ec2.internal:18080
NOTE: To view “in-progress” jobs, you’ll need to click on the “Show incomplete applications” link at the bottom of the page.
We'll be using `spark-submit` to run our Spark jobs on the cluster. `spark-submit` has a couple of command-line options that you can tweak.
This option tells `spark-submit` where to run your job, as Spark can run in several modes:
--master yarn --deploy-mode client
--master yarn --deploy-mode cluster
In the output of `spark-submit --master yarn --deploy-mode cluster` you'll find an `applicationId`. (This is similar to when you ran jobs on Hadoop.) You can issue this command to get the logs for your job:
yarn logs -applicationId <YOUR_APPLICATION_ID> | less
When debugging Python applications, it's useful to search for `Traceback` in your logs, as this will likely be the actual debug information you're looking for:
yarn logs -applicationId <YOUR_APPLICATION_ID> | grep -A 50 Traceback
NOTE: In cluster mode, normal “local” IO operations like opening files will behave unexpectedly! This is because you’re not guaranteed which node the driver will run on. You must use the PySpark API for saving files to get reliable results. You also have to coalesce your RDD into one partition before asking PySpark to write to a file (why do you think this is?). Additionally, you should save your results to HDFS.
In summary, use `--master yarn --deploy-mode client`, unless we tell you not to. 😄
Submitting a Spark job will usually look something like this:
spark-submit --master yarn --deploy-mode client <my_spark_job>.py <job_arguments>
Because Spark isn’t tied to HDFS, we can run Spark using input data that exists on the local filesystem.
However, because we're using Spark on EMR, the default is to use HDFS paths. We can get around this by using the `file://` prefix and an absolute path to the file we want to reference.
While `spark-submit` is the way we'll be endorsing to run PySpark jobs, there is also an option to run jobs in an interactive shell. Use the `pyspark` command to load into the PySpark interactive shell. You can use many of the same options listed above (such as `--master`) to tweak `pyspark` settings.
Note: If you start up the normal `python` interpreter on the cluster, you won't be able to use any of the PySpark features.
This week, we’ll be working off of a set of released Yelp data.
The dataset is located in `/data/yelp` in HDFS. We'll be using the following files for this MP:

```
/data/yelp/yelp_academic_dataset_business.json
/data/yelp/yelp_academic_dataset_checkin.json
/data/yelp/yelp_academic_dataset_review.json
/data/yelp/yelp_academic_dataset_user.json
```
We’ll give more details about the data in these files as we continue with the MP, but the general schema is this: each line in each of these JSON files is an independent JSON object that represents a distinct entity, whether it be a business, a review, or a user.
Hint: JSON is parsed with
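In Python, each of these lines can be parsed with the standard `json` module; in an RDD pipeline this usually looks like `rdd.map(json.loads)`. A small sketch (the fields shown are illustrative, not the exact Yelp schema):

```python
import json

# One line from a JSON-lines file; each line is a complete JSON object.
line = '{"business_id": "abc123", "city": "Champaign", "state": "IL", "stars": 4.0}'

record = json.loads(line)  # parse one line into a dict
print(record["city"], record["stars"])  # → Champaign 4.0
```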
This problem uses the business dataset.
In planning your next road trip, you want to find the cities that will be the least expensive to dine at.
It turns out that Yelp keeps track of a handy metric for this: many restaurants have the attribute `RestaurantsPriceRange2` that gives the business a score from 1-4 on a scale of 'priciness'. Note that the "attributes" section of the business schema is messy, so you'll have to manually parse out the price range value.
Write a PySpark application that determines, for each city, the average price attribute of its businesses/restaurants. Your average restaurant price should be rounded to 2 decimal places. Each city should be output as a tuple of `("City, State", Price)`:

```
CITY_STATE, PRICE
Example: ("Champaign, IL", 1.23)
```
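Before writing the PySpark version, it can help to sketch the aggregation in plain Python. This is a minimal sketch assuming a simplified `attributes` layout (the real field is messier, as noted above); in Spark, the same shape is typically a `map` to `(key, (price, 1))` pairs followed by `reduceByKey`:

```python
import json

# Tiny stand-in for the business file; the attribute layout here is
# an assumption for illustration only.
lines = [
    '{"city": "Champaign", "state": "IL", "attributes": {"RestaurantsPriceRange2": "1"}}',
    '{"city": "Champaign", "state": "IL", "attributes": {"RestaurantsPriceRange2": "2"}}',
    '{"city": "Chicago", "state": "IL", "attributes": {"RestaurantsPriceRange2": "4"}}',
]

# Accumulate (sum, count) per "City, State" key, then divide.
totals = {}
for line in lines:
    b = json.loads(line)
    price = int(b["attributes"]["RestaurantsPriceRange2"])
    key = "{}, {}".format(b["city"], b["state"])
    s, c = totals.get(key, (0, 0))
    totals[key] = (s + price, c + 1)

averages = [(key, round(s / c, 2)) for key, (s, c) in totals.items()]
print(averages)  # → [('Champaign, IL', 1.5), ('Chicago, IL', 4.0)]
```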
Running Your Code:
spark-submit --master local city_expensiveness.py file://`pwd`/data/yelp_business_sample.json file://`pwd`/city_expensiveness
spark-submit --master yarn city_expensiveness.py hdfs:///data/yelp/yelp_academic_dataset_business.json city_expensiveness
This problem uses the review dataset.
In selecting a restaurant, you might want to find the review that Yelp users have decided is the best review. For each business in the review dataset, find and output the review that has the greatest engagement. That is, find the review that has the greatest sum of `cool` interactions. You will find these values as fields of the review.
Break ties by selecting the review ID with the lexicographically “largest” ID.
Your output should be an RDD of tuples in the form `(business_id, review_id)`:
```
BUSINESS_ID, REVIEW_ID
Example: KYasaF1nov1bn7phfSgWeg EmuqmSacByt96t8G5GK0KQ
```
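A plain-Python sketch of the per-business reduction follows. Treating the engagement sum as `useful + funny + cool` is an assumption here; in Spark this maps each review to `(business_id, (engagement, review_id))` and reduces by key, with tuple comparison handling the tie-break:

```python
# Made-up reviews for illustration; field names are assumptions.
reviews = [
    {"business_id": "b1", "review_id": "rA", "useful": 3, "funny": 1, "cool": 0},
    {"business_id": "b1", "review_id": "rB", "useful": 2, "funny": 1, "cool": 1},
    {"business_id": "b2", "review_id": "rC", "useful": 0, "funny": 0, "cool": 5},
]

best = {}
for r in reviews:
    engagement = r["useful"] + r["funny"] + r["cool"]
    # Comparing (engagement, review_id) tuples breaks engagement ties by
    # the lexicographically largest review_id automatically.
    candidate = (engagement, r["review_id"])
    if r["business_id"] not in best or candidate > best[r["business_id"]]:
        best[r["business_id"]] = candidate

result = [(b, rid) for b, (_, rid) in best.items()]
print(result)  # → [('b1', 'rB'), ('b2', 'rC')]
```

Note that `rA` and `rB` tie at engagement 4, and `rB` wins because it is lexicographically larger.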
Running Your Code:
spark-submit --master local engaging_reviews.py file://`pwd`/data/yelp_review_sample.json file://`pwd`/engaging_reviews
spark-submit --master yarn engaging_reviews.py hdfs:///data/yelp/yelp_academic_dataset_review.json engaging_reviews
For this activity, we’ll be looking at Yelp reviews. 😱 Namely, we want to find out which Yelp reviewers are more or less harsh than they should be.
To do this we will calculate the average review score of each business in our dataset, and find how far away users’ ratings are from the average of a business. You can find the rating for each review in the “stars” attribute of the review.
The `average_business_rating` of a business is the sum of the ratings of the business divided by the count of the ratings for that business. A user's review offset score is the sum of the differences between their rating and the average business rating. A positive score indicates that a user tends to give higher ratings than the average; a negative score indicates that the user tends to give lower ratings than the average.
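One way to sketch this in plain Python is two passes: compute each business's average, then accumulate each user's offsets. (In Spark, a natural translation is two aggregations joined on `business_id`.) The data below is made up for illustration:

```python
# (user_id, business_id, stars) triples.
reviews = [
    ("u1", "b1", 5.0), ("u2", "b1", 3.0),   # b1 average: 4.0
    ("u1", "b2", 4.0), ("u2", "b2", 2.0),   # b2 average: 3.0
]

# Pass 1: average_business_rating = sum of ratings / count of ratings.
sums = {}
for _, business, stars in reviews:
    s, c = sums.get(business, (0.0, 0))
    sums[business] = (s + stars, c + 1)
avg = {b: s / c for b, (s, c) in sums.items()}

# Pass 2: a user's offset score is the sum of (rating - business average).
offsets = {}
for user, business, stars in reviews:
    offsets[user] = offsets.get(user, 0.0) + (stars - avg[business])

print(offsets)  # → {'u1': 2.0, 'u2': -2.0}
```

Here `u1` rates one star above each business's average (offset +2.0), while `u2` is one star below (offset -2.0).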
Your output should be an RDD of tuples containing a `user_id` and that user's review offset score:
Running Your Code:
spark-submit --master local yelp_reviewer_accuracy.py file://`pwd`/data/yelp_review_sample.json file://`pwd`/yelp_reviewer_accuracy
spark-submit --master yarn yelp_reviewer_accuracy.py hdfs:///data/yelp/yelp_academic_dataset_review.json yelp_reviewer_accuracy
MP 3 is due on Tuesday, February 13th, 2018 at 11:59PM.
You can find starter code and the Git repository where you should submit your code here:
If you have issues accessing your repository, make sure that you’ve signed into Gitlab. If you still do not have access, please make a private post on the course Piazza.
Please write your solutions as indicated in the following files:
… and commit your code to Gitlab like this:
```
git add .
git commit -m "Completed MP"  # Your commit message doesn't need to match this
git push
```
WARNING: Our autograder runs on Python3. If you write code for this assignment that only works on Python2, we will be unable to grade your code. Code that contains syntax errors (i.e. unable to be interpreted by the Python interpreter) will receive no credit.