In this lab you'll build a MapReduce library as an introduction to programming in Go and to building fault tolerant distributed systems. In the first part you will write a simple MapReduce program. In the second part you will write a Master that hands out tasks to MapReduce workers, and handles failures of workers. The interface to the library and the approach to fault tolerance is similar to the one described in the original MapReduce paper.
To get up and running, execute the following commands:
$ cd ~/golabs $ git pull ... $ cd src/mapreduce $ export GOPATH=$HOME/golabs $ go test ... FAIL
The Map/Reduce implementation we give you has support for two modes of operation, sequential and distributed. In the former, the map and reduce tasks are executed one at a time: first, the first map task is executed to completion, then the second, then the third, etc. When all the map tasks have finished, the first reduce task is run, then the second, etc. This mode, while not very fast, is useful for debugging. The distributed mode runs many worker threads that first execute map tasks in parallel, and then reduce tasks. This is much faster, but also harder to implement and debug.
The mapreduce package provides a simple Map/Reduce library. Applications should normally call Distributed() [located in master.go] to start a job, but may instead call Sequential() [also in master.go] to get a sequential execution for debugging.
The code executes a job as follows:
mrtmp.xxx-0-0 mrtmp.xxx-0-1 mrtmp.xxx-0-2 mrtmp.xxx-1-0 mrtmp.xxx-1-1 mrtmp.xxx-1-2Each worker must be able to read files written by any other worker, as well as the input files. Real deployments use distributed storage systems such as GFS to allow this access even though workers run on different machines. In this lab you'll run all the workers on the same machine, and use the local file system.
Over the course of the following exercises, you will have to write/modify doMap, doReduce, and schedule yourself. These are located in common_map.go, common_reduce.go, and schedule.go respectively. You will also have to write the map and reduce functions in ../main/wc.go.
You should not need to modify any other files, but reading them might be useful in order to understand how the other methods fit into the overall architecture of the system.
The Map/Reduce implementation you are given is missing some pieces. Before you can write your first Map/Reduce function pair, you will need to fix the sequential implementation. In particular, the code we give you is missing two crucial pieces: the function that divides up the output of a map task, and the function that gathers all the inputs for a reduce task. These tasks are carried out by the doMap() function in common_map.go, and the doReduce() function in common_reduce.go respectively. The comments in those files should point you in the right direction.
Once you've finished the sequential implementation, test by running the command
$ go test -v -run Sequential ok mapreduce 2.694s
If the output did not show ok next to the tests, you can print out more verbose output by setting debugEnabled = true in common.go. You will get much more output along the lines of:
$ env "GOPATH=$PWD/../../" go test -v -run Sequential === RUN TestSequentialSingle master: Starting Map/Reduce task test Merge: read mrtmp.test-res-0 master: Map/Reduce task completed --- PASS: TestSequentialSingle (1.34s) === RUN TestSequentialMany master: Starting Map/Reduce task test Merge: read mrtmp.test-res-0 Merge: read mrtmp.test-res-1 Merge: read mrtmp.test-res-2 master: Map/Reduce task completed --- PASS: TestSequentialMany (1.33s) PASS ok mapreduce 2.672s
Now you will implement word count — a simple Map/Reduce example. Look in main/wc.go; you'll find empty mapF() and reduceF() functions. Your job is to insert code so that wc.go reports the number of occurrences of each word in its input. A word is any contiguous sequence of letters, as determined by unicode.IsLetter.
There are some input files with pathnames of the form pg-*.txt in ~/golabs/src/main, downloaded from Project Gutenberg. Here's how to run wc with the input files:
$ cd golabs/src/main/ $ go run wc.go master sequential pg-*.txt # command-line-arguments ./wc.go:14: missing return at end of function ./wc.go:21: missing return at end of function
The compilation fails because mapF() and reduceF() are not complete.
Review Section 2 of the MapReduce paper. Your mapF() and reduceF() functions will differ a bit from those in the paper's Section 2.1. Your mapF() will be passed the name of a file, as well as that file's contents; it should split the contents into words, and return a Go slice of mapreduce.KeyValue. While you can choose what to put in the keys and values for the mapF output, for word count it only makes sense to use words as the keys. Your reduceF() will be called once for each key, with a slice of all the values generated by mapF() for that key. It must return a string containing the total number of occurences of the key.
You can test your solution using:
$ cd ~/golabs/src/main/ $ time go run wc.go master sequential pg-*.txt master: Starting Map/Reduce task wcseq Merge: read mrtmp.wcseq-res-0 Merge: read mrtmp.wcseq-res-1 Merge: read mrtmp.wcseq-res-2 master: Map/Reduce task completed 14.59user 3.78system 0:14.81elapsed
The output will be in the file "mrtmp.wcseq". Your implementation is correct if the following command produces the output shown here:
$ sort -n -k2 mrtmp.wcseq | tail -10 he: 34077 was: 37044 that: 37495 I: 44502 in: 46092 a: 60558 to: 74357 of: 79727 and: 93990 the: 154024
You can remove the output file and all intermediate files with:
$ rm mrtmp.*
To make testing easy for you, run:
$ bash ./test-wc.sh
and it will report if your solution is correct or not.
Your current implementation runs the map and reduce tasks one at a time. One of Map/Reduce's biggest selling points is that it can automatically parallelize ordinary sequential code without any extra work by the developer. In this part of the lab, you will complete a version of MapReduce that splits the work over a set of worker threads that run in parallel on multiple cores. While not distributed across multiple machines as in real Map/Reduce deployments, your implementation will use RPC to simulate distributed computation.
The code in mapreduce/master.go does most of the work of managing a MapReduce job. We also supply you with the complete code for a worker thread, in mapreduce/worker.go, as well as some code to deal with RPC in mapreduce/common_rpc.go.
Your job is to implement schedule() in mapreduce/schedule.go. The master calls schedule() twice during a MapReduce job, once for the Map phase, and once for the Reduce phase. schedule()'s job is to hand out tasks to the available workers. There will usually be more tasks than worker threads, so schedule() must give each worker a sequence of tasks, one at a time. schedule() should wait until all tasks have completed, and then return.
schedule() learns about the set of workers by reading its registerChan argument. That channel yields a string for each worker, containing the worker's RPC address. Some workers may exist before schedule() is called, and some may start while schedule() is running; all will appear on registerChan. schedule() should use all the workers, including ones that appear after it starts.
schedule() tells a worker to execute a task by sending a Worker.DoTask RPC to the worker. This RPC's arguments are defined by DoTaskArgs in mapreduce/common_rpc.go. The File element is only used by Map tasks, and is the name of the file to read; schedule() can find these file names in mapFiles.
Use the call() function in mapreduce/common_rpc.go to send an RPC to a worker. The first argument is the the worker's address, as read from registerChan. The second argument should be "Worker.DoTask". The third argument should be the DoTaskArgs structure, and the last argument should be nil.
Your solution to Part III should only involve modifications to schedule.go. If you modify other files as part of debugging, please restore their original contents and then test before submitting.
To test your solution, you should use the same Go test suite as you did in Part I, but replace -run Sequential with -run TestBasic. This will execute the distributed test case without worker failures instead of the sequential ones you ran before:
$ go test -run TestBasic
The code we give you runs the workers as threads within a single UNIX process, and can exploit multiple cores on a single machine. Some modifications would be needed in order to run the workers on multiple machines communicating over a network. The RPCs would have to use TCP rather than UNIX-domain sockets; there would need to be a way to start worker processes on all the machines; and all the machines would have to share storage through some kind of network file system.
In this part you will make the master handle failed workers. MapReduce makes this relatively easy because workers don't have persistent state. If a worker fails, any RPCs that the master issued to that worker will fail (e.g., due to a timeout). Thus, if the master's RPC to the worker fails, the master should re-assign the task given to the failed worker to another worker.
An RPC failure doesn't necessarily mean that the worker didn't execute the task; the worker may have executed it but the reply was lost, or the worker may still be executing but the master's RPC timed out. Thus, it may happen that two workers receive the same task, compute it, and generate output. Two invocations of a map or reduce function are required to generate the same output for a given input (i.e. the map and reduce functions are "functional"), so there won't be inconsistencies if subsequent processing sometimes reads one output and sometimes the other. In addition, the MapReduce framework ensures that map and reduce function output appears atomically: the output file will either not exist, or will contain the entire output of a single execution of the map or reduce function (the lab code doesn't actually implement this, but instead only fails workers at the end of a task, so there aren't concurrent executions of a task).
You don't have to handle failures of the master. Making the master fault-tolerant is more difficult because it keeps state that would have to be recovered in order to resume operations after a master failure. Much of the later labs are devoted to this challenge.
Your implementation must pass the two remaining test cases in test_test.go. The first case tests the failure of one worker, while the second test case tests handling of many failures of workers. Periodically, the test cases start new workers that the master can use to make forward progress, but these workers fail after handling a few tasks. To run these tests:
$ go test -run Failure
Your solution to Part IV should only involve modifications to schedule.go. If you modify other files as part of debugging, please restore their original contents and then test before submitting.
For this optional challenge exercise, you will build Map and Reduce functions for generating an inverted index.
Inverted indices are widely used in computer science, and are particularly useful in document searching. Broadly speaking, an inverted index is a map from interesting facts about the underlying data, to the original location of that data. For example, in the context of search, it might be a map from keywords to documents that contain those words.
We have created a second binary in main/ii.go that is very similar to the wc.go you built earlier. You should modify mapF and reduceF in main/ii.go so that they together produce an inverted index. Running ii.go should output a list of tuples, one per line, in the following format:
$ go run ii.go master sequential pg-*.txt $ head -n5 mrtmp.iiseq A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt ABRAHAM: 1 pg-dracula.txt ABSOLUTE: 1 pg-les_miserables.txtIf it is not clear from the listing above, the format is:
word: #documents documents,sorted,and,separated,by,commasFor full credit on this challenge, you must pass bash ./test-ii.sh, which runs:
$ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10 women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt won: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt wonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt words: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt worked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt worse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt wounded: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt yes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt younger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt yours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
You can run all the tests by running the script src/main/test-mr.sh. With a correct solution, your output should resemble:
$ bash ./test-mr.sh ==> Part I ok mapreduce 3.053s ==> Part II Passed test ==> Part III ok mapreduce 1.851s ==> Part IV ok mapreduce 10.650s ==> Part V (challenge) Passed test
$ git commit -am "Submit lab-3" $ git push origin