Computer Science Ilab Hadoop Support

The main computer science instructional cluster includes a small Hadoop cluster intended for small class projects. It has three fairly substantial nodes. That's enough to handle class assignments, but wouldn't be appropriate for research or theses. (For research, we'd be happy to put you in contact with OARC, the University HPC group. They have a small cluster, and can work with cloud providers.)

Here are the major parts:

NOTE: Hadoop uses a special file system, HDFS. Files on HDFS are backed up nightly to a second HDFS file system in a separate building. Snapshots are taken nightly, so it is possible to restore deleted files within 60 days. However we are not yet sure what our policy on retention of files is going to be. It is possible that we might reset the file system each summer. Please contact help@cs.rutgers.edu if you need to keep files in HDFS on an ongoing basis, and we'll arrange to preserve them if we decide to clean the file system.

Note on python for this cluster

Python for the Hadoop cluster is completely separate from other Computer Science systems. If you install your own packages using "pip install --user", as described below, they will work for Jupyter, data1, data2, and data3, but will not be available on other ilab systems.

For the Hadoop cluster, we have three versions of Python. Versions of python2 and python3 come with the operating system. We haven't removed them. But the one you probably want is a more recent Python 3, which we have installed using Anaconda.

When you use Python from Jupyterhub you automatically get the new Python 3. For data1, data2, and data3, we have set the default environment to use the new Python 3 as well.

If you set PATH yourself in .bashrc, make sure you include /usr/lib/anaconda3/bin before /bin and /usr/bin. We also set PYSPARK_PYTHON=/usr/lib/anaconda3/bin/python3. This will make sure that when you run Python interactively you get the same version that you get with Jupyterhub.
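To see why the ordering matters, here's a small self-contained sketch. It uses temporary directories as stand-ins for /usr/lib/anaconda3/bin and /usr/bin, and shows that command lookup stops at the first PATH entry that contains the command:

```python
import os
import shutil
import tempfile

# Two stand-in directories, each containing its own "python3" executable.
anaconda_bin = tempfile.mkdtemp()   # plays the role of /usr/lib/anaconda3/bin
system_bin = tempfile.mkdtemp()     # plays the role of /usr/bin
for d in (anaconda_bin, system_bin):
    exe = os.path.join(d, "python3")
    with open(exe, "w") as f:
        f.write("#!/bin/sh\n")
    os.chmod(exe, 0o755)

# With the Anaconda stand-in first on PATH, that copy wins the lookup.
search_path = os.pathsep.join([anaconda_bin, system_bin])
found = shutil.which("python3", path=search_path)
print(found == os.path.join(anaconda_bin, "python3"))  # True
```

Reversing the two directories in search_path would make the "system" copy win, which is exactly what happens if /bin or /usr/bin ends up ahead of /usr/lib/anaconda3/bin in your .bashrc.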

If you need to install your own python packages, we suggest that you use the command

pip install --user PACKAGE
Jupyter, data1, data2 and data3 (but not other ilab systems) are set up so that "install --user" automatically installs packages to /common/clusterdata/USER/local. That makes sure that python will use the packages whether you call it interactively, via Jupyterhub, or for jobs submitted to the cluster.

Normally, "install --user" installs in ~/.local. That location won't work for jobs running on the cluster. So we are using a special location that is available in all Hadoop contexts. It is not available outside the Hadoop system, i.e. outside jupyterhub, data1, data2 and data3.
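The standard knob for redirecting "pip install --user" is the PYTHONUSERBASE environment variable, from which Python derives the user site-packages directory. (Whether the cluster uses this exact mechanism isn't stated here; the /tmp/demo_userbase path below is just a stand-in for demonstration.)

```python
import os
import subprocess
import sys

# Setting PYTHONUSERBASE moves the user site-packages directory,
# which is where "pip install --user" puts packages.
env = dict(os.environ, PYTHONUSERBASE="/tmp/demo_userbase")  # stand-in path
out = subprocess.run(
    [sys.executable, "-c", "import site; print(site.getusersitepackages())"],
    env=env, capture_output=True, text=True,
).stdout.strip()
print(out)  # e.g. /tmp/demo_userbase/lib/python3.X/site-packages
```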

WARNING: When you use pip, it may suggest that you upgrade it to a newer version. Do NOT try to do this. You can't actually upgrade pip, because it is installed in a system directory. In attempting to do so, you will end up with an inconsistent set of packages.

HDFS files

HDFS is a distributed file system used by most of the Hadoop tools. You have a home directory in HDFS that is completely separate from your normal home directory. In fact you can't even use normal Linux tools such as the "ls" command to look at it. To see what files are in HDFS you have to use "hdfs dfs -ls".

There's no concept of a current directory in HDFS. So there's nothing like "cd" to change directories. If you omit the directory name, you'll always get /user/NETID.

hdfs dfs -ls /user/NETID
hdfs dfs -ls
where NETID is your netid. Both commands do the same thing.

The advantage of HDFS is that it's a distributed file system. For many tasks that allows the system to distribute file access across the nodes. In a large cluster that gives much greater total bandwidth than with a conventional file system. This cluster doesn't actually do that, so there's no performance advantage to HDFS. However the Hadoop tools expect data to be there, so you still have to use it.

NOTE: Spark and other tools that run on the cluster normally put their files in HDFS or SQL. If you need a Spark job to access a file from your home directory, you should copy it into HDFS first. Otherwise Spark won't be able to see it. If it's small you can upload it through the files view in Ambari. (See above.)

To copy larger files into HDFS, log in to data1, data2 or data3, and do something like this:

hdfs dfs -put FILE /user/YOURNETID
By default HDFS commands refer to your main directory, /user/YOURNETID. So this command could also be written as
hdfs dfs -put FILE 
To look at your files in HDFS
hdfs dfs -ls /user/YOURNETID
or simply
hdfs dfs -ls
or if you want to see everything, including subdirectories, do
hdfs dfs -ls -R /user/YOURNETID
For more help on HDFS options do
hdfs dfs -help
[Technical note: Spark can actually access non-HDFS files using a URI that starts with files:///. The problem is that our cluster uses Kerberos security. While the cluster nodes can see your home directory, jobs running in the cluster won't normally have Kerberos tickets for you, so they won't be able to read files from your home directory. You can get around this by putting files in /common/clusterdata/NETID. /common/clusterdata is mounted by all of the nodes in the Hadoop cluster without Kerberos security. It is slightly less secure than our normal home directories, but the difference is probably not significant.]

Software versions

Hadoop is open-source software, coordinated by the Apache Foundation. However the whole Hadoop ecosystem consists of many separate projects. Most people use distributions that put together compatible versions of all the pieces and tie them together. There are two major distributions: Hortonworks and Cloudera.

This cluster is based on Hortonworks version 2.6.5. It is Hadoop version 2.7.3.2.6.3.0-235. Installation and management is done by a web management system called Ambari. Hortonworks 2.6.5 uses Ambari 2.6.0.

There are several copies of Java on the system. Hadoop itself uses /usr/jdk64/jdk1.8.0_112. If you simply type "java" you get 1.8.0_161.

"python" is the standard Centos 7.4 version of python, which is python 2.7.5. "python3" is from EPEL (a supplementary library for Centos). It is Python 3.4.8. Anaconda 5.2.0 is also installed, in /usr/lib/anaconda3. It has python 3.6 in /usr/lib/anaconda3/bin.

The Hadoop nodes use Centos 7.4 as their Linux version. The most recent version of Centos is 7.5. However Hortonworks hasn't certified their software with 7.5, so we are sticking with Centos 7.4 for the moment. (Jupyterhub, however, is running on Centos 7.5.)

How to Run Spark Programs on the Cluster

Consider using Jupyter notebook, https://jupyter.cs.rutgers.edu, or Zeppelin, https://data-services2.cs.rutgers.edu:9995, for running Python and Scala code that uses Spark. This section describes how to do things from the command line. The notebook is probably best for small programs and testing things out; the command line is better for more substantial programs.

These examples all use Spark, but building and running Scala and Python that use other Hadoop components should be similar.

For a good introduction to programming with Spark in all the supported languages, see the official Spark documentation, particularly the Spark programming guide. Here are examples of programming in all three languages.

You are probably familiar with both Python and Java, but perhaps not Scala. While Spark can be used with Java, the API was really developed with Scala in mind. You should consider learning Scala. It's quite similar to Java, with somewhat more modern syntax and language features. Here's the official Scala documentation.

Spark with Scala

See A Lap Around Spark for a quick introduction to Spark. Ignore instructions to do "su spark." This demonstrates typing Scala Spark programs interactively.

As documented in that introduction, an easy way to try Spark is to type commands interactively.

export PATH="/usr/hdp/current/spark2-client/bin/:$PATH"
spark-shell
The export PATH command is only needed once in a session, and can be put in your .bashrc.

However for significant programs you probably don't want to type them in every time. Spark programs using Scala are built just like Java applications, because Scala runs in a Java JVM. It's really just a different syntax for Java functionality. Thus programs are put into .jar files.

Here are a couple of examples of using example programs, where the .jar files are already installed in the system.

This is an example of running Spark code just on the current node. You should be logged into data1, data2, or data3:

export SPARK_MAJOR_VERSION=2
cd /usr/hdp/current/spark2-client
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 'local[3]' examples/jars/spark-examples*.jar 10
Here's what it does:

If for some reason you want to use Spark 1, use "cd /usr/hdp/current/spark-client" rather than "cd /usr/hdp/current/spark2-client." However I haven't done any tests with that.

Please use "--master local" for most course assignments. If you're doing research, or a major student project, you may want to submit your job to the whole cluster. See Submitting Spark to the cluster for how to do that.

NOTE: Anytime you do spark-submit with --master set to local or local[N], you'll get two errors at the end of the job. Spark creates temporary directories for each job and tries to delete them at the end, but is unable to do so. This error is harmless.

Building and running your own program

Before doing anything, we recommend starting with

export SPARK_MAJOR_VERSION=2
PATH=/usr/hdp/current/spark2-client/bin:$PATH
This only has to be done once, and can be put in your .bashrc.

Set up your build area as follows:

NOTE: Despite the name "SparkWordCount," the program is actually a bit more complex. It ends up outputting character counts, not word counts.

Now that you have built a jar file, you can run it just like the samples.

First you need a file for the program to read. You'll need to copy some file into HDFS:

hdfs dfs -put SOMEFILE /user/NETID/data.txt

Now you can run the program. This will run it on the computer you're logged into

spark-submit --class SparkWordCount --master local --driver-memory 4g --name wordcount target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt 1
where NETID is your netid. The --driver-memory and --name options aren't required, but show the kinds of things you can specify.

This is a word count program. /user/NETID/data.txt is the file whose words it should count. Note that Spark expects the input to be in the HDFS file system. So before running the program you have to copy some file into HDFS. That's what the "hdfs dfs -put" command does. The "1" says to count all words that are at least 1 character long.

Note that you must specify both the jar file that was built by "mvn package" and the name of the main class. The class name is specified at the beginning of the program in the main Object declaration. Spark-submit will run the method called "main" in the class you specify. It will pass any arguments that you include on the command line after the jar file.

The only option you're likely to need beyond "--master local" is --driver-memory. The default is 512m. That might well be enough for many programs. Feel free to use 4G or 8G if necessary if you run out of memory. If you need more than that, please consult your instructor or a staff member. There's likely to be a problem with your program, and you could end up using excessive resources.

See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them.

Python

NOTE: We have both python 2.7.5 and python 3.4.8. By default you will get Python 2.7.5. If you want to use Python 3 from Anaconda, execute the following before doing any programming:

export PYSPARK_PYTHON=/usr/lib/anaconda3/bin/python3
You may want to put that in your .bashrc file. Both the pyspark shell and spark-submit will use the version of Python you specify. (If you prefer to use one of the pythons that come with Centos, you can use /usr/bin/python or /usr/bin/python3. However we'll be installing new packages primarily in Anaconda.)

To use python with Spark interactively, do the following:

export PATH="/usr/hdp/current/spark2-client/bin/:$PATH"
pyspark
The export PATH command is only needed once in a session, and can be put in your .bashrc. Pyspark is python with Spark-related libraries preloaded and a SparkContext already created as "sc".

However for longer programs, you probably want to put them in a file and run them. Here's how to run an example program that they supply:

cd /usr/hdp/current/spark2-client
./bin/spark-submit --master 'local[3]' examples/src/main/python/pi.py 100

Please use "--master local" for most course assignments. If you're doing research, or a major student project, you may want to submit your job to the whole cluster. See Submitting Spark to the cluster for how to do that.

NOTE: Anytime you do spark-submit with --master set to local or local[N], you'll get two errors at the end of the job. Spark creates temporary directories for each job and tries to delete them at the end, but is unable to do so. This error is harmless.

Running your own code

Consider using Jupyter notebook, https://jupyter.cs.rutgers.edu. This is a good approach for small programs, and for interactive exploration. The rest of this section assumes you want to do more conventional programming.

Before doing anything, we recommend starting with

PATH=/usr/hdp/current/spark2-client/bin:$PATH
This only has to be done once, and can be put in your .bashrc.

If you want to try out simple programs, you can try

export SPARK_MAJOR_VERSION=2
pyspark
Assuming you've set the path, that will give you a copy of python with Spark accessible to it.

However for serious programs you probably don't want to type them in each time. You'll want to put the program in a file. Here's how:

Download this sample code: WordCount.py.
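The contents of WordCount.py aren't reproduced here, but the classic Spark pipeline such a script builds (flatMap to split lines into words, map to pair each word with a 1, reduceByKey to sum the counts) can be sketched in plain Python, no cluster required:

```python
from collections import Counter
from itertools import chain

lines = ["to be or not to be", "that is the question"]

# flatMap: one line in, many words out
words = chain.from_iterable(line.split() for line in lines)
# map + reduceByKey: pair each word with 1, then sum counts per word
counts = Counter(words)

print(counts["to"], counts["be"], counts["question"])  # 2 2 1
```

Spark's version of the same pipeline runs each stage in parallel across partitions of the input, but the per-word logic is identical.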

You'll also need a data file in HDFS for the program to read. This will copy SOMEFILE into HDFS

hdfs dfs -put SOMEFILE /user/NETID/data.txt

At this point you can use the same kind of commands as above, e.g. to run it just on the current computer

spark-submit --master 'local[3]' --driver-memory 4g WordCount.py /user/NETID/data.txt

The only option you're likely to need beyond "--master local" is --driver-memory. The default is 512m. That might well be enough for many programs. Feel free to use 4G or 8G if necessary if you run out of memory. If you need more than that, please consult your instructor or a staff member. There's likely to be a problem with your program, and you could end up using excessive resources.

If you need to use additional libraries that we haven't installed, one approach is to pass them to spark-submit with the --py-files argument. This takes a comma-separated list of files. They can be either .py files or ZIP files. We're working on a way to use an environment created with "pip install --user". That will be documented here once we've verified it.
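What --py-files does under the hood is ship the listed files to each executor and put them on the Python path. The effect is easy to see locally, because a ZIP of .py files is directly importable once it's on sys.path. (The module name and contents below are made up for the demonstration.)

```python
import os
import sys
import tempfile
import zipfile

# Build a tiny dependency ZIP of the kind --py-files accepts.
workdir = tempfile.mkdtemp()
dep_zip = os.path.join(workdir, "deps.zip")
with zipfile.ZipFile(dep_zip, "w") as z:
    z.writestr("mylib.py", "def triple(x):\n    return 3 * x\n")

# Spark adds shipped ZIPs to sys.path on every executor; emulate that here.
sys.path.insert(0, dep_zip)
import mylib

print(mylib.triple(4))  # 12
```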

Java

Java is used much like Scala, because Scala actually uses the Java JVM and build process. Thus we suggest that you look at the Scala section and try running the samples. That will get you used to the tools.

Building and running your own program

Before doing anything, we recommend starting with

export SPARK_MAJOR_VERSION=2
PATH=/usr/hdp/current/spark2-client/bin:$PATH
This only has to be done once, and can be put in your .bashrc.

Set up your build area as follows:

Now that you have built a jar file, you can run it just like the samples.

First you need a file for the program to read. You'll need to copy some file into HDFS:

hdfs dfs -put SOMEFILE /user/NETID/data.txt

Now you can run the program. This will run it on the computer you're logged into

spark-submit --class JavaWordCount --master local --driver-memory 4g  target/javawordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt
where NETID is your netid. Unlike the Scala example, I've used the minimum number of arguments to the spark-submit command. See the Scala section for the kinds of things you can specify.

Everything up to and including the .jar file is needed every time you run a Java program, except --driver-memory, which is optional. The last item is an argument to the program. Your program won't necessarily expect arguments, and you can also write programs that take more than one.

This is a word count program. /user/NETID/data.txt is the file whose words it should count. Note that Spark expects the input to be in the HDFS file system. So before running the program you have to copy some file into HDFS. That's what the "hdfs dfs -put" command does.

Note that you must specify both the jar file that was built by "mvn package" and the name of the main class. The class name is specified at the beginning of the program in the main Class declaration. Spark-submit will run the method called "main" in the class you specify. It will pass any arguments that you include on the command line after the jar file.

The only option you're likely to need beyond "--master local" is --driver-memory. The default is 512m. That might well be enough for many programs. Feel free to use 4G or 8G if necessary if you run out of memory. If you need more than that, please consult your instructor or a staff member. There's likely to be a problem with your program, and you could end up using excessive resources.

See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them.

About pom.xml

When you specify a "import" statement in your program, this implicitly refers to a jar file that has the code implementing the package. That jar file has to be available both when your program is being compiled / built, and when it is run. Since programs are typically run on the cluster, that requires the jar files to be on every node in the cluster.

The pom.xml file declares what packages you need. The "mvn package" command will retrieve them from various network repositories, and make them available to the Java compiler when building your program.

pom.xml does several other things in addition:

In the pom.xml files I supply, I've included a few Spark packages, and in comments I've shown how to use additional ones.

But suppose you want to use packages that aren't part of Spark. In that case you'll need to locate the necessary jar files and add them as dependencies in pom.xml. Generally you'll be using standard open-source libraries. (See below for how to add your own .jar files.) There's a very large network repository, Maven Central, which has pretty much every package known to man. The "mvn package" command knows how to find packages in that repository, download them, and use them.

But first you have to find the location of the package in the repository. Generally a bit of searching in Google shows you the Maven specification for a package. You're looking for what is called a "Maven dependency." Normally they look like this:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.2.0.2.6.3.0-235</version>
    </dependency>
If you look at pom.xml, it should be obvious where to add them.

More complex programs may use more than one source file. If you add additional files in src/main/java, they will be compiled and added to the jar file in target.

Note that the version numbers in the dependencies have been chosen to match the precise version of Hortonworks we have installed. These versions will change next time we upgrade the software. In practice such an exact match probably isn't necessary. You could build against generic libraries.

Including packages at runtime

Unfortunately listing a package in pom.xml is only half the battle. That only makes the .jar file available when your program is built. It also has to be available when the program runs. In cluster mode that means it has to be on every node in the cluster.

Many common packages are already present in the cluster. I believe you can see which ones by looking at /usr/hdp/2.6.3.0-235/spark2/jars on data1, 2 and 3. If your package is already present, you need to put it in pom.xml, but you don't need this section. This section shows how to supply a package in the execution environment if it's not already there.

If you need to add the package to the execution environment, you'll need to use the --packages argument to spark-submit.

For example, suppose you wanted to use the "JSON simple" package to work with JSON objects:

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

...

    JSONObject obj = new JSONObject();
    obj.put("name", "Rutgers University");
    obj.put("age", new Integer(100));
    System.out.println(obj.toJSONString());

The JSON simple documentation gives you the Maven dependency. But you could also search Google for "maven json simple." That will point you to a Maven web page. You'll find that the most recent version is 1.1.1. It shows the following dependency:
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
</dependency>

Add that to the <dependencies> section of your pom.xml. You should now be able to do "mvn package".

But there's more .... If you do

spark-submit --class JavaWordCount --master local target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/hedrick/data.txt
you'll get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/json/simple/JSONObject
That's because you had the jar files available through Maven when you compiled the program, but not when you ran it. This won't always be a problem. The execution environment has many of the most common libraries, e.g. most of the Apache Commons packages. It also has a couple of recent JSON libraries. I actually picked JSON Simple specifically because it's an older library that isn't present in the execution environment. (I needed something that isn't there to do this demonstration.)

To make the necessary jar file available during execution, you need to include the package in the "spark-submit" command

spark-submit --packages com.googlecode.json-simple:json-simple:1.1.1 --class JavaWordCount --master local target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/hedrick/data.txt
Note the --packages argument. It simply puts together the group ID, artifact ID and version number. If you have more than one package, list all of them separated by commas, without any whitespace.
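Since the format is easy to get wrong, here's a two-line sketch that assembles a valid --packages value from Maven coordinates (the second package is an arbitrary example added for illustration):

```python
packages = [
    ("com.googlecode.json-simple", "json-simple", "1.1.1"),
    ("org.apache.commons", "commons-csv", "1.5"),  # second package, for illustration
]
# group:artifact:version triples, joined by commas, with no whitespace
arg = ",".join(":".join(coord) for coord in packages)
print(arg)
```

The printed string is exactly what you would pass after --packages.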

Some packages are in repositories other than Maven Central. E.g. there's spark-packages.org, which collects packages for Spark. They have their own repository. To load packages from there, you need to specify --repositories=https://dl.bintray.com/spark-packages/maven in addition to the --packages argument. For both packages and repositories, you can specify more than one value, separated by commas, with no whitespace.

If you have jar files that aren't in maven, you'll need to have copies of them in your directory. Then you can supply a "--jars" argument to "spark-submit", using the name of the jar files separated by commas.

Note that jars loaded from --packages are cached in .ivy2, so they don't have to be downloaded every time. For jobs submitted to the cluster, there's a per-user cache on each cluster node.

Putting options in a file

If you need lots of packages, you may find it inconvenient to type long arguments to spark-shell or spark-submit each time. You can put arguments in a properties file. E.g. here's one that adds a package from the Spark packages collection. Put the following in spark.properties

spark.yarn.appMasterEnv.PYSPARK_PYTHON /usr/lib/anaconda3/bin/python3
spark.jars.packages graphframes:graphframes:0.5.0-spark2.1-s_2.11
spark.jars.repositories https://dl.bintray.com/spark-packages/maven
Note that the property names are different from the arguments. That is, --packages on the command line corresponds to the property spark.jars.packages. You can see the property names in the Spark documentation. You'll see that there are properties to control many things: how many nodes your job uses, how many cores, how much memory, etc.

You can then point spark-shell or spark-submit to this file, e.g.

spark-shell --properties-file spark.properties
To avoid typing --properties-file each time, you can alias the spark-shell and spark-submit commands:
alias spark-shell='spark-shell --properties-file ~/spark.properties'
alias spark-submit='spark-submit --properties-file ~/spark.properties'
To get this each time, put it in your .bashrc and .profile files.

Submitting Spark to the cluster

The examples so far have all used --master local. That causes the program to be run in a single JVM on the system you're logged into. The only option that makes sense is --driver-memory, which controls the size of the JVM.

To get more parallelism, you need to submit your job to the cluster. There it will be split across several "executors," possibly running on different nodes of the cluster. One additional process, the "driver," coordinates the job as a whole. (With --master local, the single JVM is considered the driver.)

To use the cluster, you'll specify "--master yarn". (Yarn is the name of the scheduling system for the cluster.) You may also want to specify the number of executors and the amount of memory each takes. We recommend 3 executors. The default size is 512m, which may be enough, but you should feel free to go up to 4g for each executor and 8g for the driver if the default isn't enough.

Here's an example

export SPARK_MAJOR_VERSION=2
cd /usr/hdp/current/spark2-client
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 3 --driver-memory 4g --executor-memory 4g  examples/jars/spark-examples*.jar 10

You can watch what's going on in yarn by using "yarn top". This will show you all jobs running on the cluster. You'll see -1 in some entries, because you only have the permissions to see details of your own job. Note that our copy of yarn only schedules memory. Every task is shown as using 1 vcore, no matter what it actually does. There's no maximum number of vcores, despite the implication that there is.

Some documentation talks about setting the number of cores. That has no effect in our environment. (Indeed it doesn't have any effect for most clusters.)

NOTE: During periods when students are doing assignments, please restrict yourself to

During the summer, or the beginning of the term, larger numbers are OK. Please do not use more than 75 G of memory total. The number of executors doesn't matter: it's total memory across all tasks that's the issue. Note that yarn won't schedule a task (i.e. a driver or executor) using more than 20 G of memory. Since there's additional overhead beyond the jvm, that means you can't go above 17g in your specifications. Please contact help@cs.rutgers.edu if you really need a larger task.
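A quick way to sanity-check a job's footprint before submitting: the total is the driver plus all executors, and each individual task must also stay under the 20 G scheduling cap. A small helper (the function name is my own, not part of any Spark or yarn API):

```python
def job_memory_ok(num_executors, executor_gb, driver_gb,
                  total_limit_gb=75, per_task_limit_gb=20):
    # every task (driver or executor) must fit under the per-task cap,
    # and the sum across all of them under the cluster-wide limit
    tasks = [driver_gb] + [executor_gb] * num_executors
    return max(tasks) <= per_task_limit_gb and sum(tasks) <= total_limit_gb

print(job_memory_ok(3, 4, 8))    # True: 3*4 + 8 = 20 G total
print(job_memory_ok(4, 18, 24))  # False: the 24 G driver exceeds the 20 G cap
```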

If you use these arguments, please watch with "yarn top" to verify that you have the number of containers and memory usage you expect.

Note that Spark only has one memory argument for drivers and executors. The JVM size is set automatically from this argument. This contrasts with MapReduce, where you have to set both.

In principle, you can run into a situation where yarn kills your job because you didn't allocate enough memory. This will be very rare. It could occur if you use lots of memory-mapped files. SQL usage could also trigger it. But I don't expect it to happen. Here's what the error would probably look like:

18/12/21 11:36:10 INFO mapreduce.Job: Task Id : attempt_1545409883169_0001_m_000007_0, Status : FAILED
Container [pid=2738,containerID=container_e64_1545409883169_0001_01_000009] is running beyond virtual memory limits. Current usage: 100.6 MB of 1 GB physical memory used; 5.3 GB of 2.1 GB virtual memory used. Killing container.
...
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143. 

Running MapReduce Jobs

(This section is about running MapReduce jobs written in Java. If you're interested in using Python, see Writing An Hadoop Mapreduce Program in Python.)
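For orientation, the Streaming contract referenced above is simple: the mapper reads lines on stdin and writes tab-separated key/value lines to stdout, the framework sorts by key, and the reducer reads the sorted lines. A minimal mapper/reducer pair, wired together locally instead of through Hadoop:

```python
from itertools import groupby

def mapper(lines):
    # emit "word<TAB>1" for every word, as a Streaming mapper writes to stdout
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def reducer(pairs):
    # Streaming sorts mapper output by key before the reducer reads it
    keyed = (p.split("\t") for p in sorted(pairs))
    for word, group in groupby(keyed, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"

result = dict(line.split("\t") for line in reducer(mapper(["a b a", "b c"])))
print(result)  # {'a': '2', 'b': '2', 'c': '1'}
```

Under Hadoop, the same two functions would each be a standalone script reading sys.stdin, with the sort done by the framework between stages.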

WARNING: If your job doesn't run immediately, please don't change the jar file for it before it runs. Once the job is ready to run, if the jar file has changed since the job was submitted, the job fails.

WARNING 2: Once you have submitted your job with the "yarn" command, typing ^C won't terminate it. If you change your mind, use "yarn application -list" to list all of your applications. Note the application ID of the one you don't want. Use "yarn application -kill ID" to kill it. (This doesn't apply if you're running in local mode.)

These instructions assume that you have assembled all the classes in your program into a jar file. Simply run that file with "yarn", e.g. to run one of the examples you can do

yarn jar /usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi 16 10000
Everything after the jar file name is an argument, passed to the program. In this case the jar file has a number of example programs. "pi" tells it to calculate the value of pi. 16 and 10000 specify the number of maps to use and the number of samples in each map.
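The pi example is a Monte Carlo estimate: each map throws random darts at the unit square and counts how many land inside the quarter circle. The same computation, with the 16 maps of 10000 samples used above, fits in a few lines of plain Python:

```python
import random

random.seed(0)  # fixed seed so the run is repeatable

def one_map(samples):
    # one "map" task: count darts landing inside the quarter circle
    return sum(1 for _ in range(samples)
               if random.random() ** 2 + random.random() ** 2 <= 1.0)

maps, samples = 16, 10000
inside = sum(one_map(samples) for _ in range(maps))
pi_estimate = 4.0 * inside / (maps * samples)
print(pi_estimate)  # close to 3.14
```

In the Hadoop version the 16 calls to one_map run as separate map tasks, and the reduce stage does the final sum.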

By default, 5 processes are created:

The job is scheduled through "yarn," a scheduler that distributes tasks across the 3 nodes, and makes sure no more jobs are running than the system can handle.

Local execution

While you're still debugging, you may prefer to avoid yarn, and just run in a single JVM. When you do this, you get one JVM with one map and one reduce running at a time. The memory defaults to 1 GB, which may not be enough, unless you do your debugging with small files. If you need more memory than this, you can set the environment variable YARN_CLIENT_OPTS. Here's an example of running the pi demo in local mode with 4G of memory. (It won't work with the default of 1G.)

export YARN_CLIENT_OPTS="-Xmx4000m"
yarn jar /usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi -Dmapreduce.framework.name=local 16 10000
The key is "-Dmapreduce.framework.name=local". This still uses 16 maps. They just won't be processed in parallel. Note that 'export YARN_CLIENT_OPTS="-Xmx4000m"' only has to be done once. It will continue for the rest of your session.

4 or 8G should be enough for course assignments. If that isn't enough, please talk with your instructor. For research or major projects, you can use up to 24 G. However for problems of that size you probably want to use distributed mode.

NOTE: "-Dmapreduce.framework.name=local" will only be processed if you implement the Tool interface, as noted below. Otherwise you can use conf.set("mapreduce.framework.name", "local") in your program.

Other configuration options described below don't apply with local execution.

Details of parallel execution

Once you've verified that your program works, you can try it without "-Dmapreduce.framework.name=local". Without that, your job will be run in parallel across the cluster. For typical coursework, it will probably be faster in local mode than distributed, but it's still worth getting experience running the way real Hadoop jobs would be run. For larger jobs parallelism definitely speeds things up.

You can watch what's going on in yarn by using "yarn top". You'll see -1 in some entries, because you only have the permissions to see details of your own job. Note that our copy of yarn only schedules memory. Every task is shown as using 1 vcore, no matter what it actually does. There's no maximum number of vcores, despite the implication that there is.

This system is set up for instructional use. We need to be able to execute a number of jobs at the same time, to handle periods before assignments are due. That means that we've had to limit both the size of processes and the number running at the same time. The parameters are listed above.

You shouldn't need more than the resources we supply for course assignments. If you do, please consult your instructor.

You may be able to override these configurations, depending upon how your program is written. The main class has to include

extends Configured implements Tool
and the run method has to use getConf()
Job job = new Job(getConf());
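Putting those two pieces together, a minimal driver class might look like the following sketch. (The class and job names are placeholders; this needs the Hadoop client libraries to compile, and it uses Job.getInstance, which newer Hadoop versions prefer over "new Job".)

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // getConf() returns a Configuration that already reflects any
        // -D or -conf options parsed from the command line.
        Job job = Job.getInstance(getConf(), "my job");
        job.setJarByClass(MyJob.class);
        // ... set mapper, reducer, and input/output paths here ...
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic options (-D, -conf) from args
        // and applies them to the Configuration before calling run().
        System.exit(ToolRunner.run(new Configuration(), new MyJob(), args));
    }
}
```

ToolRunner is what makes the -D options work: without it (or without implementing Tool), the generic options are passed to your program unparsed.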
If you do that, then you can specify configuration arguments. Here's an example using the examples program:
yarn jar /usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi -Dmapreduce.job.running.map.limit=16 -Dmapreduce.job.running.reduce.limit=9 -Dmapreduce.job.reduces=9 -Dmapreduce.map.memory.mb=4096 -Dmapreduce.map.java.opts=-Xmx3481m 16 100000
Here is what the options do:

mapreduce.job.running.map.limit: the maximum number of map tasks allowed to run at the same time.
mapreduce.job.running.reduce.limit: the maximum number of reduce tasks allowed to run at the same time.
mapreduce.job.reduces: the total number of reduce tasks for the job.
mapreduce.map.memory.mb: the amount of memory (in MB) that yarn allocates to each map task's container.
mapreduce.map.java.opts: JVM options for each map task; here, the maximum heap size.

You may wonder why both mapreduce.map.memory.mb and mapreduce.map.java.opts are needed. mapreduce.map.memory.mb tells yarn how much memory to allocate to the task's container. mapreduce.map.java.opts tells Java how much heap to use. The heap is set smaller than the container (about 85% of it in this example) because the task uses additional memory beyond the Java heap. The format of the two arguments is different: the first is a plain number, interpreted as MB; the second is a standard JVM argument, typically something like -Xmx4g for 4 GB. The example uses m (MB) to make it easier to keep the two parameters consistent.
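As a quick sanity check of the numbers in the example above (a standalone snippet, not Hadoop code), 3481m is what you get by taking roughly 85% of the 4096 MB container:

```java
public class HeapCheck {
    public static void main(String[] args) {
        // Container size yarn allocates per map task (mapreduce.map.memory.mb).
        int containerMb = 4096;
        // The heap (-Xmx) is set below the container size to leave room for
        // the task's off-heap memory; the example above uses about 85%.
        int heapMb = (int) (containerMb * 0.85);
        System.out.println(heapMb); // prints 3481
    }
}
```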

If the amount of memory allocated is too small, your task will likely crash. When we saw this happen, there was no usable error message; the job simply stopped without producing usable output.

NOTE: During periods when students are doing assignments, please restrict yourself to 24 G. During the summer, or at the beginning of the term, you can use up to 75 G. The number of tasks doesn't matter: it's total memory that's the issue. (For example, six map tasks at 4 G each already total 24 G.) However yarn won't schedule a single task (i.e. one map or reduce task) with more than 20 G of memory. If you need more than 20 G, please contact help@cs.rutgers.edu.

If you use these arguments, please watch with "yarn top" to verify that you have the number of containers and the memory usage you expect.

Using a configuration file for a MapReduce job

If you want to specify options for your MapReduce job, but you don't like typing all the options each time, you can put the options in a file. For example, instead of

yarn jar /usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi -Dmapreduce.job.running.map.limit=16 16 10000
you could use
yarn jar /usr/hdp/2.6.3.0-235/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi -conf mapred.xml 16 10000
This probably isn't worth it for one option, but might be if you use many. Here's mapred.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
     <name>mapreduce.job.running.map.limit</name>
     <value>16</value>
  </property>
</configuration>

Here's a second approach. Instead of "Job job = new Job(getConf());", use "Configuration conf = getConf();" and create the Job from conf. Then you can use set to set parameters, e.g. conf.set("mapreduce.job.running.map.limit", "16");

Here's a third approach: Create a simple shell script to run your program, e.g. create a file dopi.sh

#!/bin/sh
MRDIR=/usr/hdp/2.6.3.0-235/hadoop-mapreduce

yarn jar $MRDIR/hadoop-mapreduce-examples.jar pi -Dmapreduce.job.running.map.limit=16 16 10000
Now do "chmod +x" dopi.sh to make it executable. You can now run it by typing "./dopi.sh".

Technical information about the cluster

The Hadoop system is contained in 7 VMs, running on ilab1, ilab2 and ilab3 using KVM.

Because this cluster is intended for use in a student environment, Kerberos security is enabled on it. In order to do any operations you need Kerberos credentials. Our systems are set up to get credentials for you when you log in, whether via ssh or the web tools.