An elastic-mapreduce streaming example with python and ngrams on AWS

Overview

This is meant as a tutorial on running an elastic-mapreduce job on AWS, from scratch. You can find lots of resources on individual pieces of this, but this is intended as a start-to-finish guide.

We are going to use google ngrams to look for words which were coined in the year 1999 – and we are going to do it with streaming mapreduce in python.

Furthermore, we are going to do it from scratch, assuming you’ve never used AWS at all. That means everything including:

  • Getting an account and S3 storage bucket
  • Using s3cmd to interact with S3
  • Downloading data from google and uploading to S3 from an EC2 instance
  • Setting up the elastic-mapreduce command line interface (CLI)
  • Understanding the data, and writing a mapper and reducer
  • Submitting jobs from the command line, and retrieving output.

 

Disclaimer: AWS costs money. Make sure you don’t leave instances running that you aren’t using, and don’t occupy S3 space that you don’t need. This tutorial is for educational purposes only. In the process of trying this code and a few other things, it cost me about 4 dollars.

 


Getting Started

  1. First you will need to make your account at http://aws.amazon.com/ – you will need a phone number and credit card. This exercise shouldn’t cost more than a couple dollars.
  2. Then go to http://console.aws.amazon.com/ec2. Under “Network & Security” click “Key Pairs”, then click “Create Key Pair”. Name it MyKeyPair and download it (I will assume it goes to the ~/Downloads/ directory).
  3. Next, let’s make a local working directory and put your key there, like so:
          mkdir MyAWSTest
          cd MyAWSTest
          cp ~/Downloads/MyKeyPair.pem .
          chmod 700 MyKeyPair.pem
          
  4. Then, download and set up the elastic-mapreduce command-line interface:
    mkdir EMR
    cd EMR
    wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
    unzip elastic-mapreduce-ruby.zip
    
  5. Still in the EMR directory, make a credentials.json file. For this you will need your access ID and private key from https://console.aws.amazon.com/iam/home?#security_credential (click on access keys and click download key file). My credentials.json looks like:

    {
    "access_id": "YOURACCESSIDHERE",
    "private_key": "YOURPRIVATEKEYHERE",
    "keypair": "MyKeyPair",
    "key-pair-file": "/PATH/TO/YOUR/WORKINGDIRECTORY/MyAWSTest/MyKeyPair.pem",
    "region": "us-west-2"
    }

  6. The last setup item is installing s3cmd, a convenient command-line tool for accessing S3 from your local computer. I am on an Ubuntu-based Linux distro, so this is easy. You will need to configure it once with your access ID and private key from the previous step (a quick sanity check for both tools follows this list):
    sudo apt-get install s3cmd
    s3cmd --configure
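
Before moving on, it is worth a quick sanity check that both tools can reach AWS. These are the same commands that appear later in this tutorial, so nothing here is new:

# From the EMR directory: should return without an authentication error
# (the list will be empty until you launch a job)
./elastic-mapreduce --list
# From anywhere: should list your S3 buckets (possibly none yet)
s3cmd ls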
    

Writing map and reduce code in python

We can write map and reduce code in python, which will take the ngrams data files, map the lines into a more useful format, and reduce them to our desired result. To get a better idea of this, let’s look at a small subset of the data. To do so, we will download and glance at the 1grams beginning with the letter “x”:

wget http://storage.googleapis.com/books/ngrams/books/googlebooks-eng-all-1gram-20120701-x.gz
gunzip googlebooks-eng-all-1gram-20120701-x.gz
head googlebooks-eng-all-1gram-20120701-x

Which displays:

X'rays 1914 1 1
X'rays 1917 1 1
X'rays 1919 1 1
X'rays 1921 1 1
X'rays 1922 2 1
X'rays 1923 1 1
X'rays 1927 1 1
X'rays 1930 5 3
X'rays 1931 2 2
X'rays 1932 3 2

Here we can see some of the early appearances of X'rays in the early 1900s. The first column is the word, the second is the year, the third is the total number of occurrences in that year, and the last is the number of distinct books it occurred in that year.

We are going to look for normal words (consisting of alphabetic characters only), and see which words started occurring in the year 1999. So we will ignore the last column for this exercise.
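
To make the format concrete, here is a minimal sketch (not part of the actual job) of how one of these lines breaks into its four fields; the sample line and its counts are made up for illustration:

#!/usr/bin/env python

# A made-up ngram line, just to illustrate the four fields
sample_line = "xenon\t1999\t123\t45\n"

fields = sample_line.strip().split()
word = fields[0]              # the 1gram itself
year = int(fields[1])         # the year these counts refer to
occurrences = int(fields[2])  # total occurrences in that year
volumes = int(fields[3])      # distinct books it occurred in that year

print '%s appeared %d times in %d, across %d books' % (word, occurrences, year, volumes)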

The first stage of map-reduce is the mapper. Here we will clean up the word (make it lower case, drop weird words with special characters, etc.), and simply output the clean word, the year, and the number of occurrences. The mapper code looks like this:

#!/usr/bin/env python

import sys

def CleanWord(aword):
	"""
	Function input: A string which is meant to be
	   interpreted as a single word.
	Output: a clean, lower-case version of the word
	"""
	# Make Lower Case
	aword = aword.lower()
	# Remove special characters from the word
	for character in '.,;:\'?':
		aword = aword.replace(character,'')
	# No empty words
	if len(aword)==0:
		return None
	# Restrict the word to the standard English alphabet
	for character in aword:
		if character not in 'abcdefghijklmnopqrstuvwxyz':
			return None
	# return the word
	return aword

# Now we loop over lines in the system input
for line in sys.stdin:
	# Strip the line of whitespace and split into a list
	line = line.strip().split()
	# Use CleanWord function to clean up the word
	word = CleanWord(line[0])

	# If CleanWord didn't return a string, move on
	if word is None:
		continue

	# Get the year and the number of occurrences from
	# the ngram line
	year = int(line[1])
	occurrences = int(line[2])

	# Print the output: word, year, and number of occurrences
	print '%s\t%s\t%s' % (word, year, occurrences)

The next step is the reducer. It will take the output of the mapper (which hadoop sorts by key before it reaches the reducer) and run through it. Our goal is to add up the pre-1999 occurrences and the 1999 occurrences for each word, and if the word only occurred in 1999, we will output it. It’s pretty straightforward:

#!/usr/bin/env python
import sys

# current_word will be the word in each loop iteration
current_word = ''
# word_in_progress will be the word we have been working
# on for the last few iterations
word_in_progress = ''

# target_year_count is the number of word occurrences
# in the target year
target_year_count = 0
# prior_year_count is the number of word occurrences
# in the years prior to the target year
prior_year_count = 0

# Define the target year, in our case 1999
target_year = 1999

# Loop over lines of input from STDIN
for line in sys.stdin:

	# Get the items in the line as a list
	line = line.strip().split('\t')

	# If for some reason there are not 3 items,
	# then move on to next line
	if len(line)!=3:
		continue

	# The line consists of a word, a year, and
	# a number of occurrences
	current_word, year, occurrences = line

	# If we are on a new word, check the info of the last word
	# Print if it is a newly minted word, and zero our counters
	if current_word != word_in_progress:
		# Word exists in target year
		if target_year_count > 0:
			# Word doesn't exist in any prior year
			if prior_year_count == 0:
				# Print the cool new word and its occurrences
				print '%s\t%s' % (word_in_progress,target_year_count)

		# Zero our counters
		target_year_count = 0
		prior_year_count = 0
		word_in_progress = current_word

	# Get the year and occurrences as integers
	# Continue if there is a problem
	try:
		year = int(year)
	except ValueError:
		continue
	try:
		occurrences = int(occurrences)
	except ValueError:
		continue

	# Update our variables
	if year == target_year:
		target_year_count += occurrences
	if year < target_year:
		prior_year_count += occurrences

# Since the loop is over, print the last word if applicable
if target_year_count > 0:
	# Word doesn't exist in any prior year
	if prior_year_count == 0:
		# Print the cool new word and its occurrences
		print '%s\t%s' % (word_in_progress,target_year_count)


A local test of the code

Remember our “x” 1gram data that we downloaded? We can use that to test the code. You’ll note that the code reads from stdin – this is equivalent to just “cat”-ing the file and taking that streaming input line by line. This is what hadoop or elastic-mapreduce will do, so this is what we can try on the command line (after making both scripts executable):

chmod +x mapper.py reducer.py
cat googlebooks-eng-all-1gram-20120701-x | ./mapper.py | sort -k1,1 | ./reducer.py | sort -k2,2n

So, the mapper is run on the streaming input, its output is sorted, and the reducer runs on that. The final sort is one I added to put the most common output last – these are the words created in 1999 that were used the most. Example output is:

xdcam 25
xmlparser 83
xadatasource 338

As you might expect from “x” words – these are mostly tech words. Now we can try a similar test with elastic-mapreduce.


An EMR test of the code

First, we can use s3cmd to upload our necessary files to S3. This is very easy. We make a bucket called “ngramstest” and then upload our mapper, reducer, and data file to the bucket. (S3 bucket names are globally unique, so you may need to pick a different name and adjust it in the commands that follow.)

s3cmd mb s3://ngramstest

s3cmd put reducer.py s3://ngramstest/code/reducer.py
s3cmd put mapper.py s3://ngramstest/code/mapper.py
s3cmd put googlebooks-eng-all-1gram-20120701-x s3://ngramstest/input/NGramsX.txt
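
As a quick check that everything landed where we expect, you can list the bucket contents with the same s3cmd ls command used later on:

s3cmd ls s3://ngramstest/code/
s3cmd ls s3://ngramstest/input/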

Then, we can use these inputs to launch an EMR job. From the EMR directory:

./elastic-mapreduce --create --stream \
--input s3n://ngramstest/input \
--output s3n://ngramstest/output-streaming-full-cli \
--mapper s3n://ngramstest/code/mapper.py \
--reducer s3n://ngramstest/code/reducer.py

Now it is just a matter of waiting a couple of minutes. Here are the commands to check your status and view the results when it is complete:

# Check status
./elastic-mapreduce --list
# When done, ls the output, and copy the output file locally.
s3cmd ls s3://ngramstest/output-streaming-full-cli/
s3cmd get s3://ngramstest/output-streaming-full-cli/part-00000 results.txt
# We can view the results just as before!
cat results.txt | sort -k2,2n

Getting lots of data

The google ngrams are already available on S3, and elastic-mapreduce streaming can read compressed files as long as the file name carries the right extension (.gz in our case). Unfortunately, the files provided do not 😦

But no worries, we can do it from scratch! To make this process faster, we can launch an EC2 instance for a few moments, and run a script to download the data and move it to S3.

To launch an EC2 instance from the console (a command-line alternative follows this list):

  • Go to console.aws.amazon.com/ec2 and click the big blue “Launch Instance” button
  • Choose an AMI (Amazon Machine Image). I chose Amazon Linux AMI 2014.03 – ami-b8f69f88 (64-bit)
  • The next step will be choosing an instance type. I chose “General Purpose” m1.small. More info at: http://aws.amazon.com/ec2/instance-types/
  • Just hit “Review and launch”
  • Click “Launch” to launch the instance. When prompted, select your key pair “MyKeyPair”.
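
If you prefer the command line and already have the AWS CLI installed and configured on your local machine (we only set it up on the EC2 instance later in this tutorial), an equivalent launch would look roughly like the sketch below, using the same AMI, instance type, and key pair chosen above:

aws ec2 run-instances \
    --image-id ami-b8f69f88 \
    --instance-type m1.small \
    --key-name MyKeyPair \
    --region us-west-2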

From the EC2 console you can get the Public DNS, and use it to ssh into the instance. My command looked like this, but switch out your Public DNS:

ssh ec2-user@ec2-54-186-211-19.us-west-2.compute.amazonaws.com -i MyKeyPair.pem

Once logged in, run “aws configure” and fill out the “AWS Access Key ID” and “AWS Secret Access Key” prompts. This will allow you to interact with S3 from the ssh session.
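
The configure session looks roughly like this (the exact prompts can vary slightly between CLI versions); use the same keys as in your credentials.json, set the region to match, and the output format can be left blank:

aws configure
# AWS Access Key ID [None]: YOURACCESSIDHERE
# AWS Secret Access Key [None]: YOURPRIVATEKEYHERE
# Default region name [None]: us-west-2
# Default output format [None]: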

Make a folder “input_gz” in the “ngramstest” bucket from the AWS S3 console. Now it is just a matter of downloading the google 1grams data for the whole alphabet and putting it in your S3 folder. I made this quick script to do that:

import os
for letter in 'abcdefghijklmnopqrstuvwxyz':
	ngram_file = 'googlebooks-eng-all-1gram-20120701-'+letter+'.gz'
	os.system('wget http://storage.googleapis.com/books/ngrams/books/'+ngram_file)
	os.system('aws s3 cp '+ngram_file+' s3://ngramstest/input_gz/'+ngram_file)
	os.system('rm  '+ngram_file)

Now we have all the data we need! Exit the ssh session, and be sure to stop or terminate the m1.small instance from the AWS console. You are paying 6 cents an hour for it, after all!
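
If you would rather terminate it from the command line (assuming the AWS CLI is configured on the machine you run this from), it looks roughly like this; the instance ID shown is a placeholder for your own, which you can find in the EC2 console or in the run-instances output:

aws ec2 terminate-instances \
    --instance-ids i-0123456789abcdef0 \
    --region us-west-2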


Launch the full map-reduce work

This is similar to our previous test, except now we will run on the full dataset, this time using 5 instances.

./elastic-mapreduce --create --stream \
--input s3n://ngramstest/input_gz \
--output s3n://ngramstest/output-streaming-fullrun \
--mapper s3n://ngramstest/code/mapper.py \
--reducer s3n://ngramstest/code/reducer.py \
--num-instances 5

Wait for the job to finish. Then you can get the output, and read it!

s3cmd get s3://ngramstest/output-streaming-fullrun/part*
cat part*  | sort -k2,2n

A few of the more interesting results:


Thanks to http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/ for a brilliantly simple python streaming example.
