Computer Science Ilab Hadoop Support

The main computer science instructional cluster has a small Hadoop cluster as part of it. This cluster is intended for small class projects. It has three fairly substantial nodes. That's enough to handle class assignments, but wouldn't be appropriate for research or theses. (For research, we'd be happy to put you in contact with OARC, the University HPC group. They have a small cluster, and can work with cloud providers.)


WARNING: Hadoop uses a special file system, HDFS. Files on HDFS are not backed up well, and the HDFS file system may be reinitialized over the summer. We urge you to keep copies of files you care about on your normal ilab home directory or (if they're big) /common/users.


Note on python for this cluster

Python for the Hadoop cluster is completely separate from other Computer Science systems. If you install your own packages using "pip install --user", as described below, they will work for Jupyter, data1, data2, and data3, but the packages will not be available on other ilab systems.

For the Hadoop cluster, we have three versions of Python. Versions of python2 and python3 come with the operating system. We haven't removed them. But the one you probably want is a more recent Python 3, which we have installed using Anaconda.

When you use Python from Jupyterhub you automatically get the new Python 3. For data1, data2, and data3, we have set the default environment to use the new Python 3 as well.

If you set PATH yourself in .bashrc, make sure you include /usr/lib/anaconda3/bin before /bin and /usr/bin. We also set PYSPARK_PYTHON=/usr/lib/anaconda3/bin/python3. This will make sure that when you run Python interactively you get the same version that you get with Jupyterhub.
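
A quick way to check which Python you're getting is to start python3 and look at its version and location. This is just a sketch; if PATH and PYSPARK_PYTHON are set as described above, both values should point at the Anaconda installation.

import sys

# If PATH is set as described above, these should report the Anaconda Python
print(sys.version)       # expect a 3.6.x version string
print(sys.executable)    # expect /usr/lib/anaconda3/bin/python3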

If you need to install your own python packages, we suggest that you use the command

pip install --user PACKAGE
Jupyter, data1, data2 and data3 (but not other ilab systems) are set up so that "install --user" automatically installs packages to /common/clusterdata/USER/local. That makes sure that python will use the packages whether you call it interactively, via Jupyterhub, or for jobs submitted to the cluster.

Normally, "install --user" installs in ~/.local. That location won't work for jobs running on the cluster. So we are using a special location that is available in all Hadoop contexts. It is not available outside the Hadoop system, i.e. outside jupyterhub, data1, data2 and data3.

WARNING: When you use pip, it will suggest that you upgrade it to a newer version. Do NOT do this. You can't actually upgrade pip, because it is installed in a system directory; if you try, you will end up with an inconsistent set of packages.

HDFS files

HDFS is a distributed file system used by most of the Hadoop tools. You have a home directory in HDFS that is completely separate from your normal home directory. In fact you can't even use normal Linux tools such as the "ls" command to look at it. To see what files are in HDFS you have to use "hdfs dfs -ls".

There's no concept of a current directory in HDFS. So there's nothing like "cd" to change directories. If you omit the directory name, you'll always get /user/NETID.

hdfs dfs -ls /user/NETID
hdfs dfs -ls
where NETID is your netid. Both commands do the same thing.

The advantage of HDFS is that it's a distributed file system. For many tasks that allows the system to distribute file access across the nodes. In a large cluster that gives much greater total bandwidth than with a conventional file system. This cluster doesn't actually do that, so there's no performance advantage to HDFS. However the Hadoop tools expect data to be there, so you still have to use it.

NOTE: Spark and other tools that run on the cluster normally put their files in HDFS or SQL. If you need a Spark job to access a file in your home directory, you should copy it from your home directory into HDFS. Otherwise Spark won't be able to see it. If it's small you can upload it through the Files view in Ambari. (See above.)

To copy larger files into HDFS, log in to data1, data2 or data3, and do something like this:

hdfs dfs -put FILE /user/YOURNETID
By default HDFS commands will refer to your main directory, /user/YOURNETID. So this command could also be written as
hdfs dfs -put FILE 
To look at your files in HDFS
hdfs dfs -ls /user/YOURNETID
or simply
hdfs dfs -ls
or if you want to see everything, including subdirectories, do
hdfs dfs -ls -R /user/YOURNETID
For more help on HDFS options do
hdfs dfs -help
[Technical note: Spark can actually access non-HDFS files using a URI that starts with file:///. The problem is that our cluster uses Kerberos security. While the cluster nodes can see your home directory, jobs running in the cluster won't normally have Kerberos tickets for you, so they won't be able to read files from your home directory. You can get around this by putting files in /common/clusterdata/NETID. /common/clusterdata is mounted by all of the nodes in the Hadoop cluster without Kerberos security. It is slightly less secure than our normal home directories, but the difference is probably not significant.]
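
For example, a PySpark job could read such a file directly. The following is just a sketch; the NETID and data.txt in the path are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("local-file-demo").getOrCreate()
# The file lives outside HDFS, on /common/clusterdata, which every
# cluster node mounts without Kerberos; note the file:/// prefix.
lines = spark.sparkContext.textFile("file:///common/clusterdata/NETID/data.txt")
print(lines.count())
spark.stop()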

Software versions

Hadoop is open-source software, coordinated by the Apache Foundation. However the whole Hadoop ecosystem consists of many separate projects. Most people use distributions that put together compatible versions of all the pieces and tie them together. There are two major distributions: Hortonworks and Cloudera.

This cluster is based on Hortonworks version 2.6.5. It is Hadoop version 2.7.3.2.6.3.0-235. Installation and management is done by a web management system called Ambari. Hortonworks 2.6.5 uses Ambari 2.6.0.

There are several copies of Java on the system. Hadoop itself uses /usr/jdk64/jdk1.8.0_112. If you simply type "java" you get 1.8.0_161.

"python" is the standard Centos 7.4 version of python, which is python 2.7.5. "python3" is from EPEL (a supplementary library for Centos). It is Python 3.4.8. Anaconda 5.2.0 is also installed, in /usr/lib/anaconda3. It has python 3.6 in /usr/lib/anaconda3/bin.

The Hadoop nodes use Centos 7.4 as their Linux version. The most recent version of Centos is 7.5. However Hortonworks hasn't certified their software with 7.5, so we are sticking with Centos 7.4 for the moment. (Jupyterhub, however, is running on Centos 7.5.)

How to Run Spark Programs on the Cluster

Consider using Jupyter notebook, https://jupyter.cs.rutgers.edu, or Zeppelin, https://data-services2.cs.rutgers.edu:9995, for running Python and Scala code that use Spark. This section will describe how to do things using the command line. The notebook is probably best for small programs and testing things out. This section is better for more substantial programs.

These examples all use Spark, but building and running Scala and Python programs that use other Hadoop components should be similar.

For a good introduction to programming with Spark in all the supported languages, see the official Spark documentation, particularly the Spark programming guide. Here are examples of programming in all three languages.

You are probably familiar with both Python and Java, but not Scala. While Spark can be used with Java, the API was really developed with Scala in mind. You should consider learning Scala. It's quite similar to Java, with somewhat more modern syntax and language features. Here's the official Scala documentation.

Spark with Scala

See A Lap Around Spark for a quick introduction to Spark. Ignore instructions to do "su spark." This demonstrates typing Scala Spark programs interactively.

As documented in that introduction, an easy way to try Spark is to type commands interactively.

export PATH="/usr/hdp/current/spark2-client/bin/:$PATH"
spark-shell
The export PATH command is only needed once in a session, and can be put in your .bashrc

However for significant programs you probably don't want to type them in every time. Spark programs using Scala are built just like Java applications, because Scala runs in a Java JVM. It's really just a different syntax for Java functionality. Thus programs are put into .jar files.

Here are a couple of examples that run sample programs whose .jar files are already installed on the system.

This is an example of running Spark code just on the current node. You should be logged into data1, data2, or data3:

export SPARK_MAJOR_VERSION=2
cd /usr/hdp/current/spark2-client
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 'local[3]' examples/jars/spark-examples*.jar 10
Here's what it does: --class gives the name of the main class to run, --master 'local[3]' runs the job on the current node using 3 cores, the .jar file contains the example code, and the final argument (10) is passed to the program itself.

If for some reason you want to use Spark 1, use "cd /usr/hdp/current/spark-client" rather than "cd /usr/hdp/current/spark2-client." However I haven't done any tests with that.

Here's an example of running Spark code on the cluster. That is, it will run the program in parallel using all three nodes. To use this example, you need to be logged in on data1, data2, or data3.

export SPARK_MAJOR_VERSION=2
cd /usr/hdp/current/spark2-client
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 examples/jars/spark-examples*.jar 10
Here's what it does: --master yarn submits the job to the cluster through the YARN scheduler, --num-executors 3 asks for three executors, --driver-memory and --executor-memory give the driver and each executor 512 MB, --executor-cores 1 gives each executor one core, and the final argument (10) is passed to the program itself.

Building and running your own program

Before doing anything, we recommend starting with

export SPARK_MAJOR_VERSION=2
PATH=/usr/hdp/current/spark2-client/bin:$PATH
This only has to be done once, and can be put in your .bashrc

Set up your build area as follows:

NOTE: Despite the name "SparkWordCount," the program is actually a bit more complex. It ends up outputting character counts, not word counts.

Now that you have built a jar file, you can run it just like the samples.

First you need a file for the program to read. You'll need to copy some file into HDFS:

hdfs dfs -put SOMEFILE /user/NETID/data.txt

Now you can run the program. This will run it on the computer you're logged into

spark-submit --class SparkWordCount --master local [--deploy-mode client --executor-memory 1g --name wordcount --conf "spark.app.id=wordcount"] target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt 1
where NETID is your netid. Don't actually include the []. The part in [] isn't needed, but shows the kinds of things you can specify.

This is a word count program. /user/NETID/data.txt is the file whose words it should count. Note that Spark expects the input to be in the HDFS file system. So before running the program you have to copy some file into HDFS. That's what the "hdfs dfs -put" command does. The "1" says to count all words that are at least 1 character long.

Note that you must specify both the jar file that was built by "mvn package" and the name of the main class. The class name is specified at the beginning of the program in the main Object declaration. Spark-submit will run the method called "main" in the class you specify. It will pass any arguments that you include on the command line after the jar file.

The example above runs the program only on the current system. To run it on all nodes of the cluster, do

spark-submit --class SparkWordCount --master yarn --num-executors 3 [--driver-memory 512m --executor-memory 512m] --executor-cores 5 target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt 1

Don't actually include the []. The two memory options, shown in [], aren't needed. Without them memory use is a bit larger, but not enough to be a problem. The number of executors and executor cores aren't strictly needed either; they default to all the nodes in the cluster and 1 core per executor, respectively.

For this short job, using the cluster takes longer. It has to schedule jobs on all three nodes using a scheduling system. However for a large job, it should be faster to spread the work out over 3 nodes.

See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them.

Python

NOTE: The operating system supplies both python 2.7.5 and python 3.4.8, and by default you will get Python 2.7.5. If you want to use Python 3 from Anaconda, execute the following before doing any programming:

export PYSPARK_PYTHON=/usr/lib/anaconda3/bin/python3
You may want to put that in your .bashrc file. Both the pyspark shell and spark-submit will use the version of Python you specify. (If you prefer to use one of the pythons that come with Centos, you can use /usr/bin/python or /usr/bin/python3. However we'll be installing new packages primarily in Anaconda.)

To use python with Spark interactively, do the following:

export PATH="/usr/hdp/current/spark2-client/bin/:$PATH"
pyspark
The export PATH command is only needed once in a session, and can be put in your .bashrc. Pyspark is Python with the Spark libraries preloaded and a SparkContext already created as "sc".
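
Once the pyspark prompt comes up, you can try a toy computation to confirm that Spark is working. This is just an illustration:

# sc is the SparkContext that pyspark creates for you
rdd = sc.parallelize(range(1000))
print(rdd.filter(lambda x: x % 2 == 0).count())   # should print 500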

However for longer programs, you probably want to put them in a file and run them. Here's how to run one of the example programs supplied with Spark:

cd /usr/hdp/current/spark2-client
./bin/spark-submit --master 'local[3]' examples/src/main/python/pi.py 100
As above, "--master=local[3]" means to run it on the current node, using 3 cores.

Here's an example of running on the cluster:

cd /usr/hdp/current/spark2-client
./bin/spark-submit --master yarn --num-executors 3 --executor-cores 5 examples/src/main/python/pi.py 1000
Note: While the second example ran, it didn't print its output to the terminal. However the correct value was printed to the log. See the Yarn queue view for access to the log file.

Running your own code

Consider using Jupyter notebook, https://jupyter.cs.rutgers.edu. This is a good approach for small programs, and for interactive exploration. The rest of this section assumes you want to do more conventional programming.

Before doing anything, we recommend starting with

PATH=/usr/hdp/current/spark2-client/bin:$PATH
This only has to be done once, and can be put in your .bashrc

If you want to try out simple programs, you can try

export SPARK_MAJOR_VERSION=2
pyspark
Assuming you've set the path, that will give you a copy of Python with Spark accessible to it.

However for serious programs you probably don't want to type them in each time. You'll want to put the program in a file. Here's how:

Download this sample code: WordCount.py.
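
If you just want to see the general shape of such a program, here is a minimal PySpark word-count sketch. It is only a sketch; the supplied WordCount.py may differ in its details:

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="WordCount")
    # sys.argv[1] is an HDFS path, e.g. /user/NETID/data.txt
    lines = sc.textFile(sys.argv[1])
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    for word, count in counts.collect():
        print(word, count)
    sc.stop()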

You'll also need a data file in HDFS for the program to read. This will copy SOMEFILE into HDFS

hdfs dfs -put SOMEFILE /user/NETID/data.txt

At this point you can use the same kind of commands as above, e.g. to run it just on the current computer

spark-submit --master 'local[3]' spark1.py /user/NETID/data.txt

To run it on the cluster (i.e. on all three nodes)

spark-submit --master yarn --num-executors 3 --executor-cores 5 spark1.py /user/NETID/data.txt

As with the Scala example, with a job this short it takes a lot longer to run it on the cluster. However with very large files distributing the work would get you better results.

When you're running in distributed mode, output goes to the log file, not your terminal. So look at YARN: find your job, which should be near the top, click on it, and then in the display for the job look for the "logs" column. The output will be near the end of the log.

You might find it more convenient for your program to write its output to a file.
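
For example, a job can write its results back into HDFS rather than printing them. The following is a minimal sketch; the output path is just an illustration, and the directory must not already exist:

from pyspark import SparkContext

sc = SparkContext(appName="WriteDemo")
# saveAsTextFile writes to HDFS by default; each partition becomes a part-NNNNN file
sc.parallelize(["hello", "world"]).saveAsTextFile("/user/NETID/output-demo")
sc.stop()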

If you need to use additional libraries that we haven't installed, one approach is to pass them to spark-submit with the --py-files argument. This takes a comma-separated list of files. They can be either .py files or ZIP files. We're working on a way to use an environment created with "pip install --user". That will be documented here once we've verified it.
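
As a rough illustration of the --py-files approach (helpers.py and the normalize function are made up for this example), you might split your code like this:

# helpers.py -- ship it to the executors with: --py-files helpers.py
def normalize(word):
    return word.strip().lower()

# main script -- the import works on the executors because helpers.py
# was shipped along with the job
from pyspark import SparkContext
from helpers import normalize

sc = SparkContext(appName="PyFilesDemo")
print(sc.parallelize(["Apple", " Banana "]).map(normalize).collect())
sc.stop()

You would then add something like "--py-files helpers.py" to your spark-submit command so the extra file is shipped along with the job.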

Java

Java is used much like Scala, because Scala actually uses the Java JVM and build process. Thus we suggest that you look at the Scala section and try running the samples. That will get you used to the tools.

Building and running your own program

Before doing anything, we recommend starting with

export SPARK_MAJOR_VERSION=2
PATH=/usr/hdp/current/spark2-client/bin:$PATH
This only has to be done once, and can be put in your .bashrc

Set up your build area as follows:

Now that you have built a jar file, you can run it just like the samples.

First you need a file for the program to read. You'll need to copy some file into HDFS:

hdfs dfs -put SOMEFILE /user/NETID/data.txt

Now you can run the program. This will run it on the computer you're logged into

spark-submit --class JavaWordCount --master local target/javawordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt
where NETID is your netid. Unlike the Scala example, I've used the minimum number of arguments to the spark-submit command. See the Scala section for the kinds of things you can specify.

Everything up to and including the .jar file is needed every time you run a Java program. The last item is an argument to the program. Your program won't necessarily expect arguments, and you can also write programs that take more than one.

This is a word count program. /user/NETID/data.txt is the file whose words it should count. Note that Spark expects the input to be in the HDFS file system. So before running the program you have to copy some file into HDFS. That's what the "hdfs dfs -put" command does.

Note that you must specify both the jar file that was built by "mvn package" and the name of the main class. The class name is specified at the beginning of the program in the main Class declaration. Spark-submit will run the method called "main" in the class you specify. It will pass any arguments that you include on the command line after the jar file.

The example above runs the program only on the current system. To run it on all nodes of the cluster, do

spark-submit --class JavaWordCount --master yarn --num-executors 3 --executor-cores 5 target/javawordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt

Everything up to and including the .jar file is needed every time you run a Java program, though you could actually omit the number of executors and number of cores, and take the default (all 3 systems, one core on each system). The last item is an argument to the program. Your program won't necessarily expect arguments, and you can also write programs that take more than one.

As above, I've not used as many arguments as in the Scala examples. See the Scala section for what you can specify.

For this short job, using the cluster takes longer. It has to schedule jobs on all three nodes using a scheduling system. However for a large job, it should be faster to spread the work out over 3 nodes.

See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them.

About pom.xml

When you specify a "import" statement in your program, this implicitly refers to a jar file that has the code implementing the package. That jar file has to be available both when your program is being compiled / built, and when it is run. Since programs are typically run on the cluster, that requires the jar files to be on every node in the cluster.

The pom.xml file declares what packages you need. The "mvn package" command will retrieve them from various network repositories, and make them available to the Java compiler when building your program.

pom.xml does several other things in addition.

In the pom.xml files I supply, I've included a few Spark packages, and in comments I've shown how to use additional ones.

But suppose you want to use packages that aren't part of Spark. In that case you'll need to locate the necessary jar files and add them as dependencies in pom.xml. Generally you'll be using standard open-source libraries. (See below for how to add your own .jar files.) There's a very large network repository, Maven Central, which has pretty much every package known to man. The "mvn package" command knows how to find packages in that repository, download them, and use them.

But first you have to find the location of the package in the repository. Generally a bit of searching in Google will show you the Maven specification for a package. You're looking for what is called a "Maven dependency." Normally they look like this:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.2.0.2.6.3.0-235</version>
    </dependency>
If you look at pom.xml, it should be obvious where to add them.

More complex programs may use more than one source file. If you add additional files in src/main/java, they will be compiled and added to the jar file in target.

Note that the version numbers in the dependencies have been chosen to match the precise version of Hortonworks we have installed. These versions will change the next time we upgrade the software. In practice such an exact match probably isn't necessary. You could build against generic libraries.

Including packages at runtime

Unfortunately listing a package in pom.xml is only half the battle. That only makes the .jar file available when your program is built. It also has to be available when the program runs. In cluster mode that means it has to be on every node in the cluster.

Many common packages are already present in the cluster. I believe you can see which ones by looking at /usr/hdp/2.6.3.0-235/spark2/jars on data1, 2 and 3. If your package is already present, you need to put it in pom.xml, but you don't need this section. This section shows how to supply a package in the execution environment if it's not already there.

If you need to add the package to the execution environment, you'll need to use the --packages argument to spark-submit.

For example, suppose you wanted to use the "JSON simple" package to work with JSON objects:

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

...

    JSONObject obj = new JSONObject();
    obj.put("name", "Rutgers University");
    obj.put("age", new Integer(100));
    System.out.println(obj.toJSONString());

The JSON simple documentation gives you the Maven dependency. But you could also search Google for "maven json simple." That will point you to a Maven web page. You'll find that the most recent version is 1.1.1. It shows the following dependency:
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
</dependency>

Add that to the <dependencies> section of your pom.xml. You should now be able to do "mvn package".

But there's more .... If you do

spark-submit --class JavaWordCount --master local target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/hedrick/data.txt
you'll get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/json/simple/JSONObject
That's because you had the jar files available through maven when you compiled the program, but not when you ran it. This won't always be a problem. The execution environment has many of the most common libraries, e.g. most of the Apache Commons packages. It also has a couple of recent JSON libraries. I actually picked JSON Simple specifically because it's an older library that isn't present in the execution environment. (I needed something that isn't there to do this demonstration.)

To make the necessary jar file available during execution, you need to include the package in the "spark-submit" command

spark-submit --packages com.googlecode.json-simple:json-simple:1.1.1 --class JavaWordCount --master local target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/hedrick/data.txt
Note the --packages argument. It simply puts together the group ID, artifact ID and version number. If you have more than one package, list all of them separated by commas, without any whitespace.

Some packages are in repositories other than Maven Central. E.g. there's spark-packages.org, which collects packages for Spark. They have their own repository. To load packages from there, you need to specify --repositories=https://dl.bintray.com/spark-packages/maven in addition to the --packages argument. For both packages and repositories, you can specify more than one value, separated by commas, with no whitespace.

If you have jar files that aren't in maven, you'll need to have copies of them in your directory. Then you can supply a "--jars" argument to "spark-submit", using the name of the jar files separated by commas.

Note that jars loaded from --packages are cached in .ivy2, so they don't have to be downloaded every time. For jobs submitted to the cluster, there's a per-user cache on each cluster node.

Putting options in a file

If you need lots of packages, you may find it inconvenient to type long arguments to spark-shell or spark-submit each time. You can put arguments in a properties file. E.g. here's one that adds a package from the Spark packages collection. Put the following in spark.properties

spark.yarn.appMasterEnv.PYSPARK_PYTHON /usr/lib/anaconda3/bin/python3
spark.jars.packages graphframes:graphframes:0.5.0-spark2.1-s_2.11
spark.jars.repositories https://dl.bintray.com/spark-packages/maven
Note that the property names are different from the arguments. That is, --packages on the command line corresponds to the property spark.jars.packages. You can see the property names in the Spark documentation. You'll see that there are properties to control many things: how many nodes your job uses, how many cores, how much memory, etc.

You can then point spark-shell or spark-submit to this file, e.g.

spark-shell --properties-file spark.properties
To avoid typing --properties-file each time, you can alias the spark-shell and spark-submit commands:
alias spark-shell='spark-shell --properties-file ~/spark.properties'
alias spark-submit='spark-submit --properties-file ~/spark.properties'
To get these aliases each time you log in, put them in your .bashrc and .profile files.

Technical information about the cluster

The Hadoop system is contained in 7 VMs, running on ilab1, ilab2 and ilab3 using KVM.

Because this cluster is intended for use in a student environment, Kerberos security is enabled on it. In order to do any operations you need Kerberos credentials. Our systems are set up to get credentials for you when you login, whether via ssh or the web tools.