Overview
This is meant as a tutorial on running an elastic-mapreduce job on AWS, from scratch. You can find lots of resources on the topic, but this is intended as a start-to-finish guide.
We are going to use google ngrams to look for words which were coined in the year 1999 – and we are going to do it with streaming mapreduce in python.
Furthermore, we are going to do it from scratch, assuming you’ve never used AWS at all. That means everything including:
- Getting an account and S3 storage bucket
- Using s3cmd to interact with S3
- Downloading data from google and uploading to S3 from an EC2 instance
- Setting up the elastic-mapreduce command line interface (CLI)
- Understanding the data, and writing a mapper and reducer
- Submitting jobs from the command line, and retrieving output.
Disclaimer: AWS costs money. Make sure you don’t leave instances running that you aren’t using, and don’t occupy S3 space that you don’t need. This tutorial is for educational purposes only. In the process of trying this code and a few other things, it cost me about 4 dollars.
Getting Started
- First you will need to make your account at http://aws.amazon.com/ – you will need a phone number and credit card. This exercise shouldn’t cost more than a couple dollars.
- Then go to http://console.aws.amazon.com/ec2 . Under “Network & Security” click “Key Pairs” and then click “Create Key Pair”. Name it MyKeyPair and download it (I will assume it goes to the ~/Downloads/ directory).
- Next, let’s make a local working directory and put your key there:
mkdir MyAWSTest
cd MyAWSTest
cp ~/Downloads/MyKeyPair.pem .
chmod 700 MyKeyPair.pem
- Then, download and set up the elastic-mapreduce command-line interface (CLI):
mkdir EMR
cd EMR
wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
unzip elastic-mapreduce-ruby.zip
- Still in the EMR directory, make a credentials.json file. For this you will need your access ID and private key from https://console.aws.amazon.com/iam/home?#security_credential (click on “Access Keys” and then “Download Key File”). My credentials.json looks like the following (a quick sanity check for this file appears after this list):
{
"access_id": "YOURACCESSIDHERE",
"private_key": "YOURPRIVATEKEYHERE",
"keypair": "MyKeyPair",
"key-pair-file": "/PATH/TO/YOUR/WORKINGDIRECTORY/MyAWSTest/MyKeyPair.pem",
"region": "us-west-2"
}
- The last setup item is installing s3cmd, a convenient command-line tool for accessing S3 from your local computer. I am on an Ubuntu-based Linux distro, so this is easy. You will need to configure it once with your access ID and private key from the previous step:
sudo apt-get install s3cmd
s3cmd --configure
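That is all the setup. Before moving on, it can save a failed job submission later to confirm that credentials.json is at least valid JSON and contains every field the EMR CLI expects. Here is a minimal sketch of mine (it assumes you run it from the EMR directory where credentials.json lives):

import json

# Quick sanity check (my own addition): parse credentials.json and warn
# about any missing field.
with open('credentials.json') as f:
    creds = json.load(f)

for field in ('access_id', 'private_key', 'keypair', 'key-pair-file', 'region'):
    if field not in creds:
        print 'credentials.json is missing the field: %s' % field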
Writing map and reduce code in python
We can write map and reduce code in python, which will take the ngrams data files, map the lines into a more useful format, and reduce them to our desired result. To get a better idea of this, let’s look at a small subset of the data. To do so, we will download and glance at the 1grams beginning with the letter “x”:
wget http://storage.googleapis.com/books/ngrams/books/googlebooks-eng-all-1gram-20120701-x.gz
gunzip googlebooks-eng-all-1gram-20120701-x.gz
head googlebooks-eng-all-1gram-20120701-x
Which displays:
X’rays 1914 1 1
X’rays 1917 1 1
X’rays 1919 1 1
X’rays 1921 1 1
X’rays 1922 2 1
X’rays 1923 1 1
X’rays 1927 1 1
X’rays 1930 5 3
X’rays 1931 2 2
X’rays 1932 3 2
Here we can see some of the early appearances of xrays in the 1900s. The first column is the word, the second is the year, the third is the total number of occurrences in that year, and the last is the number of distinct books it occurred in that year.
We are going to look for normal words (consisting of alphabetic characters only), and see which words started occurring in the year 1999. So we will ignore the last column for this exercise.
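To make the format concrete, here is a tiny sketch (my own, not part of the job code) of how one line of a 1gram file splits into its fields:

# One line of the 1gram data: word, year, occurrences, distinct books.
# split() with no argument handles the tab-separated fields.
line = "X'rays\t1930\t5\t3"
word, year, occurrences, books = line.split()
print word, int(year), int(occurrences)   # we ignore the last column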
The first stage of map-reduce is the mapper. Here we will clean up the word (make it lower case, get rid of weird words with special characters, etc.), and simply output the clean word, the year, and the number of occurrences. The code for the mapper looks like this:
#!/usr/bin/env python
import sys

def CleanWord(aword):
    """
    Function input: A string which is meant to be
    interpreted as a single word.
    Output: a clean, lower-case version of the word
    """
    # Make lower case
    aword = aword.lower()
    # Remove special characters from word
    for character in '.,;:\'?':
        aword = aword.replace(character, '')
    # No empty words
    if len(aword) == 0:
        return None
    # Restrict word to the standard english alphabet
    for character in aword:
        if character not in 'abcdefghijklmnopqrstuvwxyz':
            return None
    # Return the word
    return aword

# Now we loop over lines in the system input
for line in sys.stdin:
    # Strip the line of whitespace and split into a list
    line = line.strip().split()
    # Skip blank or malformed lines
    if len(line) < 3:
        continue
    # Use CleanWord function to clean up the word
    word = CleanWord(line[0])
    # If CleanWord didn't return a string, move on
    if word is None:
        continue
    # Get the year and the number of occurrences from
    # the ngram line
    year = int(line[1])
    occurrences = int(line[2])
    # Print the output: word, year, and number of occurrences
    print '%s\t%s\t%s' % (word, year, occurrences)
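Before handing this to EMR, you can smoke-test the mapper on a single made-up line. This is just a quick sketch of mine; it assumes mapper.py is saved in the current directory and made executable (chmod +x mapper.py):

# Pipe one fake ngram line through mapper.py and look at what comes back.
import subprocess

sample_line = "X'rays\t1999\t5\t3\n"
proc = subprocess.Popen(['./mapper.py'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE)
output, _ = proc.communicate(sample_line)
print output   # expected output: xrays\t1999\t5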
The next step is the reducer. It will take the output of the mapper (which is sorted alphabetically), and run through it. Our goal is to add up the pre-1999 occurrences and the 1999 occurrences, and if the word only occurred in 1999, we will output it. It’s pretty straightforward:
#!/usr/bin/env python
import sys

# current_word will be the word in each loop iteration
current_word = ''
# word_in_progress will be the word we have been working
# on for the last few iterations
word_in_progress = ''
# target_year_count is the number of word occurrences
# in the target year
target_year_count = 0
# prior_year_count is the number of word occurrences
# in the years prior to the target year
prior_year_count = 0
# Define the target year, in our case 1999
target_year = 1999

# Loop over lines of input from STDIN
for line in sys.stdin:
    # Get the items in the line as a list
    line = line.strip().split('\t')
    # If for some reason there are not 3 items,
    # then move on to the next line
    if len(line) != 3:
        continue
    # The line consists of a word, a year, and
    # a number of occurrences
    current_word, year, occurrences = line

    # If we are on a new word, check the info of the last word.
    # Print if it is a newly minted word, and zero our counters.
    if current_word != word_in_progress:
        # Word exists in the target year
        if target_year_count > 0:
            # Word doesn't exist in any prior year
            if prior_year_count == 0:
                # Print the cool new word and its occurrences
                print '%s\t%s' % (word_in_progress, target_year_count)
        # Zero our counters
        target_year_count = 0
        prior_year_count = 0
        word_in_progress = current_word

    # Get the year and occurrences as integers.
    # Continue if there is a problem
    try:
        year = int(year)
    except ValueError:
        continue
    try:
        occurrences = int(occurrences)
    except ValueError:
        continue

    # Update our counters
    if year == target_year:
        target_year_count += occurrences
    if year < target_year:
        prior_year_count += occurrences

# Since the loop is over, print the last word if applicable
if target_year_count > 0:
    # Word doesn't exist in any prior year
    if prior_year_count == 0:
        # Print the cool new word and its occurrences
        print '%s\t%s' % (word_in_progress, target_year_count)
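To see the reducer's logic in action, here is a small sketch of mine that feeds it a few made-up, already-sorted lines of mapper output (it assumes reducer.py is in the current directory and executable). "xdcam" only appears in 1999, so it gets printed; "xray" also appears before 1999, so it is suppressed:

import subprocess

# Made-up, pre-sorted mapper output: word, year, occurrences
mapper_output = ('xdcam\t1999\t25\n'
                 'xray\t1950\t10\n'
                 'xray\t1999\t40\n')
proc = subprocess.Popen(['./reducer.py'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE)
output, _ = proc.communicate(mapper_output)
print output   # expected output: xdcam\t25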
A local test of the code
Remember our “x” 1gram data that we downloaded? We can use it to test the code. You’ll note that the code reads from stdin – this is equivalent to just “cat”-ing the file and taking that streaming input line by line. This is what hadoop or elastic-mapreduce will do, so this is what we can try on the command line:
cat googlebooks-eng-all-1gram-20120701-x | ./mapper.py | sort -k1,1 | ./reducer.py | sort -k2,2n
So, the mapper is run on the streaming input, its output is sorted, and the reducer is run on that. The final sort is just something I added to put the most common output last – these are the words coined in 1999 which were used the most. Example output is:
xdcam 25
xmlparser 83
xadatasource 338
As you might expect from “x” words – these are mostly tech words. Now we can try a similar test with elastic-mapreduce.
An EMR test of the code
First, we can use s3cmd to upload our necessary files to S3. This is very easy. We make a bucket called “ngramstest” and then upload our mapper, reducer, and data file to the bucket.
s3cmd mb s3://ngramstest
s3cmd put reducer.py s3://ngramstest/code/reducer.py
s3cmd put mapper.py s3://ngramstest/code/mapper.py
s3cmd put googlebooks-eng-all-1gram-20120701-x s3://ngramstest/input/NGramsX.txt
Then, we can use these inputs to launch an EMR job. From the EMR directory:
./elastic-mapreduce --create --stream \
  --input s3n://ngramstest/input \
  --output s3n://ngramstest/output-streaming-full-cli \
  --mapper s3n://ngramstest/code/mapper.py \
  --reducer s3n://ngramstest/code/reducer.py
Now it is just a matter of waiting a couple of minutes. Here are the commands to check your status and view the results when it is complete:
# Check status
./elastic-mapreduce --list

# When done, ls the output, and copy the output file locally
s3cmd ls s3://ngramstest/output-streaming-full-cli/
s3cmd get s3://ngramstest/output-streaming-full-cli/part-00000 results.txt

# We can view the results just as before!
cat results.txt | sort -k2,2n
Getting lots of data
The google ngrams are already available on S3, and elastic-mapreduce streaming can read compressed input directly, as long as the file extension (e.g. .gz) is in the file name. Unfortunately, the files provided there do not have it 😦
But no worries, we can do it from scratch! To make this process faster, we can launch an EC2 instance for a few moments, and run a script to download the data and move it to S3.
To launch an EC2 instance:
- Go to console.aws.amazon.com/ec2 and click the big blue “Launch Instance” button
- Choose an AMI (machine image). I chose Amazon Linux AMI 2014.03 – ami-b8f69f88 (64-bit)
- The next step will be choosing an instance type. I chose “General Purpose” m1.small. More info at: http://aws.amazon.com/ec2/instance-types/
- Just hit “Review and launch”
- Click “Launch” to launch the instance. When prompted, select your key pair “MyKeyPair”.
From the EC2 console you can get the Public DNS, and use it to ssh into the instance. My command looked like this; substitute your own Public DNS:
ssh ec2-user@ec2-54-186-211-19.us-west-2.compute.amazonaws.com -i MyKeyPair.pem
Once logged in, run “aws configure” and fill out the “AWS Access Key ID” and “AWS Secret Access Key” prompts. This will allow you to interact with S3 from the ssh session.
Make a folder “ngramstest/input_gz” from the AWS S3 console.
Now it is just a matter of downloading the google 1grams data for every letter of the alphabet and putting it in your S3 folder. I made this quick script to do that:
import os

for letter in 'abcdefghijklmnopqrstuvwxyz':
    ngram_file = 'googlebooks-eng-all-1gram-20120701-' + letter + '.gz'
    os.system('wget http://storage.googleapis.com/books/ngrams/books/' + ngram_file)
    os.system('aws s3 cp ' + ngram_file + ' s3://ngramstest/input_gz/' + ngram_file)
    os.system('rm ' + ngram_file)
Now we have all the data we need! Exit the ssh session, and be sure to stop or terminate the m1.small instance from the AWS console. You are paying 6 cents an hour for it, after all!
Launch the full map-reduce work
This is similar to our previous test, except now we will run on the full dataset, using 5 instances.
./elastic-mapreduce --create --stream \
  --input s3n://ngramstest/input_gz \
  --output s3n://ngramstest/output-streaming-fullrun \
  --mapper s3n://ngramstest/code/mapper.py \
  --reducer s3n://ngramstest/code/reducer.py \
  --num-instances 5
Wait for the job to finish. Then you can get the output, and read it!
s3cmd get s3://ngramstest/output-streaming-fullrun/part*
cat part* | sort -k2,2n
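If you would rather do that last merge-and-sort step in Python than in the shell, here is a small sketch of mine that reads the downloaded part-* files and prints the most-used 1999-only words last, mirroring the cat/sort pipeline above:

import glob

# Collect (count, word) pairs from every downloaded part file
results = []
for filename in glob.glob('part-*'):
    for line in open(filename):
        fields = line.strip().split('\t')
        if len(fields) != 2:
            continue
        results.append((int(fields[1]), fields[0]))

# Sort by count so the most common words come last
for count, word in sorted(results):
    print '%s\t%s' % (word, count)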
A few of the more interesting results:
- podracer (357) and sebulba (432), both Star Wars references
- charizard (95) and pokedex (72)
- buckbeak (99), from Harry Potter and the Prisoner of Azkaban
Thanks to http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/ for a brilliantly simple python streaming example.