Computer Science Ilab Hadoop Support

The main computer science instructional cluster includes a small Hadoop cluster intended for small class projects. It has three fairly substantial nodes: enough to handle class assignments, but not appropriate for research or theses. (For research, we'd be happy to put you in contact with OARC, the University HPC group. They have a small cluster and can work with cloud providers.) This cluster runs Hadoop version 3.1.1, from Hortonworks.

You don't need Hadoop in order to use Spark, unless you want to run Spark programs across multiple nodes. To use Spark outside the Hadoop cluster, see Spark outside Hadoop.

Here are the major parts:

Yarn Memory Limit: The cluster scheduler (YARN) limits each user to 32 GB when many users are running jobs. This will probably only matter around final due dates for classes. Our default for Spark is 3 executors of 4 GB and one master of 8 GB. Similarly, our default for MapReduce is 4 GB for map and 8 GB for reduce. So these should work OK by default. However, if you run a MapReduce job with more than 3 maps within a couple of hours of a due date, you could run into trouble. If your MapReduce job gets killed, try submitting it with "-Dmapreduce.job.running.map.limit=3".

HDFS Backup: Hadoop uses a special file system, HDFS. Files on HDFS are backed up nightly to a second HDFS file system in a separate building. Snapshots are taken nightly, so it is possible to restore deleted files within 60 days. However, we are not yet sure what our policy on retention of files is going to be. It is possible that we might reset the file system each summer. Please contact help@cs.rutgers.edu if you need to keep files in HDFS on an ongoing basis, and we'll arrange to preserve them if we decide to clean the file system.

Note on python for this cluster

Please make sure you use the copy of python from Anaconda. By default your path is set appropriately. When you type "python," you should see

Python 3.7.1 (default, Dec 14 2018, 19:28:38) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
If for some reason you don't, make sure /usr/lib/anaconda3/bin is in your path.
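A quick way to check, and to fix your path for the current session (put the export line in your .bashrc to make it permanent; this assumes the usual bash shell):

which python
# if that doesn't print /usr/lib/anaconda3/bin/python, fix your path:
export PATH=/usr/lib/anaconda3/bin:"$PATH"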

If you want to install your own package, you can use "pip install --user PACKAGE". Normally, "install --user" installs in ~/.local. That location won't work for jobs running on the cluster. So we are using a special location that is available in all Hadoop contexts. It is not available outside the Hadoop system, i.e. outside jupyterhub, data4, data5 and data6.

Don't try to update pip: When you use pip, it will suggest that you upgrade it to a new version. Do NOT try to do this. You can't actually upgrade PIP, because it is installed in a system directory. In attempting to do so, you will end up with an inconsistent set of packages.

HDFS files

HDFS is a distributed file system used by most of the Hadoop tools. You have a home directory in HDFS that is completely separate from your normal home directory. In fact you can't even use normal Linux tools such as the "ls" command to look at it. To see what files are in HDFS you have to use "hdfs dfs -ls".

There's no concept of a current directory in HDFS. So there's nothing like "cd" to change directories. If you omit the directory name, you'll always get /user/NETID.

hdfs dfs -ls /user/NETID
hdfs dfs -ls
where NETID is your NetID. These two commands do the same thing.

The advantage of HDFS is that it's a distributed file system. For many tasks that allows the system to distribute file access across the nodes. In a large cluster that gives much greater total bandwidth than with a conventional file system. This cluster doesn't actually do that, so there's no performance advantage to HDFS. However the Hadoop tools expect data to be there, so you still have to use it.

There's a new HDFS file system for the new version. To look at files, it's probably most convenient to login via ssh or X2Go to data4.cs.rutgers.edu, data5, or data6. From there, you can copy files from the old system to the new system using a command like

hdfs dfs -cp hdfs://data-services2/user/NETID/file /user/NETID/file
Note that if you copy a directory, all the files in it are also copied.

To look at your old directory from the new systems, you can use a command like

hdfs dfs -ls hdfs://data-services2/user/NETID

NOTE: Spark and other tools that run on the cluster normally put their files in HDFS or SQL. If you need a Spark job to access a file in your home directory, you should copy it from your home directory into HDFS. Otherwise Spark won't be able to see it. If it's small, you can upload it through the Files View in Ambari.

To copy larger files into HDFS, login to data4, data5 or data6 and do something like this

hdfs dfs -put FILE /user/YOURNETID
By default HDFS commands refer to your home directory, /user/YOURNETID. So this command could also be written as
hdfs dfs -put FILE 
To look at your files in HDFS
hdfs dfs -ls /user/YOURNETID
or simply
hdfs dfs -ls
or, if you want to see everything including subdirectories, do
hdfs dfs -ls -R /user/YOURNETID
For more help on HDFS options do
hdfs dfs -help
[Technical note: Spark can actually access non-HDFS files using a URI that starts with file:///. The problem is that our cluster uses Kerberos security. While the cluster nodes can see your home directory, jobs running in the cluster won't normally have Kerberos tickets for you, so they won't be able to read files from your home directory. You can get around this by putting files in /common/clusterdata/NETID. /common/clusterdata is mounted by all of the nodes in the Hadoop cluster without Kerberos security. It is slightly less secure than our normal home directories, but the difference is probably not significant.]
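For example, here's a minimal sketch of staging a file there, assuming you can create (or already have) a directory for your NetID under /common/clusterdata (NETID and data.txt are placeholders):

mkdir -p /common/clusterdata/NETID
cp ~/data.txt /common/clusterdata/NETID/
A Spark job can then read it with the URI file:///common/clusterdata/NETID/data.txt.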

How to Run Spark Programs on the Cluster

Consider using Jupyter notebook, https://jupyter.cs.rutgers.edu, or Zeppelin, https://zeppelin.cs.rutgers.edu, for running Python and Scala code that uses Spark. This section describes how to do things using the command line. The notebook is probably best for small programs and testing things out; this section is better for more substantial programs.

Spark is actually available on all of our computers. You don't need to use Hadoop unless you want to run your programs across multiple nodes. See the Jupyter section of Data Science Facilities for using Spark in the Jupyter notebook, and Spark outside Hadoop for other Spark programming. Currently Zeppelin is only available within the Hadoop cluster.

In comparison with Hadoop, Spark on our other systems:

These examples all use Spark, but building and running Scala and Python that use other Hadoop components should be similar.

For a good introduction to programming with Spark in all the supported languages, see the official Spark documentation, particularly the Spark programming guide. Here are examples of programming in all three languages.

You are probably familiar with both Python and Java, but not Scala. While Spark can be used with Java, the API was really developed with Scala in mind. You should consider learning Scala. It's quite similar to Java, with somewhat more modern syntax and language features. Here's the official Scala documentation .

Spark with Scala

See A Lap Around Spark for a quick introduction to Spark. Ignore instructions to do "su spark." This demonstrates typing Scala Spark programs interactively.

As documented in that introduction, an easy way to try Spark is to type commands interactively.

spark-shell

However for significant programs you probably don't want to type them in every time. Spark programs using Scala are built just like Java applications, because Scala runs in a Java JVM. It's really just a different syntax for Java functionality. Thus programs are put into .jar files.

Here are a couple of examples of using example programs, where the .jar files are already installed in the system.

This is an example of running Spark code just on the current node. You should be logged into data4, data5, or data6:

spark-submit --class org.apache.spark.examples.SparkPi --master 'local[3]' /usr/hdp/current/spark2-client/examples/jars/spark-examples*.jar 10
Here's what it does: --class names the main class to run (the SparkPi example), --master 'local[3]' runs the job on the machine you're logged into using 3 worker threads, the jar file contains the example programs, and the final 10 is an argument passed to SparkPi (the number of partitions to use).

If for some reason you want to use Spark 1, use "/usr/hdp/current/spark-client" rather than "/usr/hdp/current/spark2-client". However, I haven't done any tests with that.

Please use "--master local" for most course assignments. If you're doing research, or a major student project, you may want to submit your job to the whole cluster. See Submitting Spark to the cluster for how to do that.

NOTE: When you run with --master local, you may get two error messages at the end complaining that it can't delete files. You can ignore them.

Building and running your own program

Set up your build area as follows:

NOTE: Despite the name "SparkWordCount," the program is actually a bit more complex. It ends up outputting character counts, not word counts.

Now that you have built a jar file, you can run it just like the samples.

First you need a file for the program to read. You'll need to copy some file into HDFS:

hdfs dfs -put SOMEFILE /user/NETID/data.txt

Now you can run the program. This will run it on the computer you're logged into

spark-submit --class SparkWordCount --master local --driver-memory 4g --name wordcount target/sparkwordcount-0.0.2-SNAPSHOT.jar /user/NETID/data.txt 1
where NETID is your netid. The --driver-memory and --name options aren't required, but show the kinds of things you can specify.

You'll probably get two error messages complaining that it can't delete 2 files. You can ignore them.

This is a word count program. /user/NETID/data.txt is the file whose words it should count. Note that Spark expects the input to be in the HDFS file system. So before running the program you have to copy some file into HDFS. That's what the "hdfs dfs -put" command does. The "1" says to count all words that are at least 1 character long.

Note that you must specify both the jar file that was built by "mvn package" and the name of the main class. The class name is specified at the beginning of the program in the main Object declaration. Spark-submit will run the method called "main" in the class you specify. It will pass any arguments that you include on the command line after the jar file.

The only option you're likely to need beyond "--master local" is --driver-memory. The default is 512m. That might well be enough for many programs. Feel free to use 4g or 8g if you run out of memory. If you need more than that, please consult your instructor or a staff member. There's likely to be a problem with your program, and you could end up using excessive resources.

See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them.

I've done this example with a file in HDFS, so that you can run it on the cluster. Actually, if you're running locally (which we recommend most of the time) it can read files from your home directory. Here's an example of running the program to read a file data.txt in your home directory:

spark-submit --class SparkWordCount --master local --driver-memory 4g --name wordcount target/sparkwordcount-0.0.2-SNAPSHOT.jar file:///ilab/users/NETID/data.txt 1
To get Spark to use a local file, you need to prefix the file name with "file:". You also need an absolute name. To find the absolute name for data.txt in your home directory you can do "echo ~/data.txt".

Python

Before doing this, verify that python is set up properly (see the note on Python for this cluster, above).

Both the pyspark shell and spark-submit will use the version of Python you specify.
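For example, if you'd rather be explicit than rely on your PATH, you can point both tools at the Anaconda interpreter with the standard PYSPARK_PYTHON environment variable (using the same interpreter path mentioned above):

export PYSPARK_PYTHON=/usr/lib/anaconda3/bin/python3
pyspark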

To use python with Spark interactively, do the following:

pyspark
Pyspark is python with Spark-related libraries preloaded and a SparkContext already created as "sc".

However for longer programs, you probably want to put them in a file and run them. Here's how to run an example program supplied with Spark:

spark-submit --master 'local[3]' /usr/hdp/current/spark2-client/examples/src/main/python/pi.py 100

Please use "--master local" or local[3] for most course assignments. If you're doing research, or a major student project, you may want to submit your job to the whole cluster. See Submitting Spark to the cluster for how to do that.

NOTE: Any time you do spark-submit with --master set to local or local[N], you may get two errors at the end of the job. Spark creates temporary directories for each job and tries to delete them at the end, but is unable to do so. This error is harmless.

Consider using Jupyter notebook, https://jupyter.cs.rutgers.edu. This is a good approach for small programs, and for interactive exploration. The rest of this section assumes you want to do more conventional programming.

If you need to use additional libraries that we haven't installed, one approach is to pass them to spark-submit with the --py-files argument. This takes a comma-separated list of files. They can be either .py files or ZIP files. However it's probably easier to install the libraries using "pip install --user".
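For example, suppose your program imports a local module mylib.py and some helpers bundled in deps.zip (the program and file names here are hypothetical); a local run might look like this:

spark-submit --master local --py-files mylib.py,deps.zip myprogram.py /user/NETID/data.txt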

Java

Java is used much like Scala, because Scala actually uses the Java JVM and build process. Thus we suggest that you look at the Scala section and try running the samples. That will get you used to the tools.

Building and running your own program

Set up your build area as follows:

Now that you have built a jar file, you can run it just like the samples.

First you need a file for the program to read. You'll need to copy some file into HDFS:

hdfs dfs -put SOMEFILE /user/NETID/data.txt

Now you can run the program. This will run it on the computer you're logged into

spark-submit --class JavaWordCount --master local --driver-memory 4g  target/javawordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt
where NETID is your netid. Unlike the Scala example, I've used the minimum number of arguments to the spark-submit command. See the Scala section for the kinds of things you can specify.

You'll probably get two error messages complaining that it can't delete 2 files. You can ignore them.

This is a word count program. /user/NETID/data.txt is the file whose words it should count. Note that Spark expects the input to be in the HDFS file system. So before running the program you have to copy some file into HDFS. That's what the "hdfs dfs -put" command does.

I've used HDFS for the example, because it will work if you submit the job to the cluster. However if you're running locally (which we normally recommend), it can read files from your home directory. Here's an example. To get Spark to use a local file, you need to prefix the file name with "file:". You also need an absolute name. To find the absolute name for data.txt. in your home directory you can do "echo ~/data.txt".

spark-submit --class JavaWordCount --master local --driver-memory 4g  target/javawordcount-0.0.1-SNAPSHOT.jar file:/ilab/users/NETID/data.txt

Note that you must specify both the jar file that was built by "mvn package" and the name of the main class. The class name is specified at the beginning of the program in the main Class declaration. Spark-submit will run the method called "main" in the class you specify. It will pass any arguments that you include on the command line after the jar file.

The only option you're likely to need beyond "--master local" is --driver-memory. The default is 512m. That might well be enough for many programs. Feel free to use 4g or 8g if you run out of memory. If you need more than that, please consult your instructor or a staff member. There's likely to be a problem with your program, and you could end up using excessive resources.

Everything up to and including the .jar file is needed every time you run a Java program, except --driver-memory, which is optional. The last item is an argument to the program. Your program won't necessarily expect arguments, and you can also write programs that take more than one.

As above, I've not used as many arguments as in the Scala examples. See the Scala section for what you can specify.

See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them.

About pom.xml (Scala and Java)

When you specify an "import" statement in your program, it implicitly refers to a jar file that has the code implementing the package. That jar file has to be available both when your program is being compiled / built, and when it is run. Since programs are typically run on the cluster, that requires the jar files to be on every node in the cluster.

The pom.xml file declares what packages you need. The "mvn package" command will retrieve them from various network repositories and make them available to the Java compiler when building your program.

pom.xml does several other things in addition:

In the pom.xml files I supply, I've included a few Spark packages, and in comments I've shown how to use additional ones.

But suppose you want to use packages that aren't part of Spark. In that case you'll need to locate the necessary jar files and add them as dependencies in pom.xml. Generally you'll be using standard open-source libraries. (See below for how to add your own .jar files.) There's a very large network repository, which has pretty much every package known to man: Maven Central. The "mvn package" command knows how to find packages in that repository, download them, and use them.

But first you have to find the location of the package in the repository. Generally a bit of searching in Google will show you the Maven specification for a package. You're looking for what is called a "Maven dependency." Normally they look like this:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.2.0.2.6.3.0-235</version>
    </dependency>
If you look at pom.xml, it should be obvious where to add them.

More complex programs may use more than one source file. If you add additional files in src/main/java, they will be compiled and added to the jar file in target.

Note that the version numbers in the dependencies have been chosen to match the precise version of Hortonworks we have installed. These versions will change next time we upgrade the software. In practice such an exact match probably isn't necessary. You could build against generic libraries.

Including packages at runtime

Unfortunately listing a package in pom.xml is only half the battle. That only makes the .jar file available when your program is built. It also has to be available when the program runs. In cluster mode that means it has to be on every node in the cluster.

Many common packages are already present in the cluster. I believe you can see which ones by looking at /usr/hdp/current/spark2-client/jars on data4, 5, and 6. If your package is already present, you need to put it in pom.xml, but you don't need this section. This section shows how to supply a package in the execution environment if it's not already there.

If you need to add the package to the execution environment, you'll need to use the --packages argument to spark-submit.

For example, suppose you wanted to use the "JSON simple" package to work with JSON objects:

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

...

    JSONObject obj = new JSONObject();
    obj.put("name", "Rutgers University");
    obj.put("age", new Integer(100));
    System.out.println(obj.toJSONString());

The JSON simple documentation gives you the Maven dependency. But you could also search Google for "maven json simple." That will point you to a Maven web page. You'll find that the most recent version is 1.1.1. It shows the following dependency:
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
</dependency>

Add that to the <dependency> section of your pom.xml. You should now be able to do "mvn package".

But there's more .... If you do

spark-submit --class JavaWordCount --master local target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt
you'll get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/json/simple/JSONObject
That's because you had the jar files available through maven when you compiled the program, but not when you ran it. This won't always be a problem. The execution environment has many of the most common libraries, e.g. most of the Apache Commons packages. It also has a couple of recent JSON libraries. I actually picked JSON Simple specifically because it's an older library that isn't present in the execution environment. (I needed something that isn't there to do this demonstration.)

To make the necessary jar file available during execution, you need to include the package in the "spark-submit" command

spark-submit --packages com.googlecode.json-simple:json-simple:1.1.1 --class JavaWordCount --master local target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/hedrick/data.txt
Note the --packages argument. It simply puts together the group ID, artifact ID and version number. If you have more than one package, list all of them separated by commas, without any whitespace.

Some packages are in repositories other than Maven Central. E.g. there's a spark-packages.org that collects packages for Spark. They have their own repository. To load packages from there, you need to specify --repositories=https://dl.bintray.com/spark-packages/maven in addition to the --packages argument. For both packages and repositories, you can specify more than one value, separated by commas, with no whitespace.

If you have jar files that aren't in maven, you'll need to have copies of them in your directory. Then you can supply a "--jars" argument to "spark-submit", using the name of the jar files separated by commas.
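For example, assuming you've downloaded mylib.jar and otherlib.jar (hypothetical names) into the directory you're running from:

spark-submit --jars mylib.jar,otherlib.jar --class JavaWordCount --master local target/javawordcount-0.0.1-SNAPSHOT.jar /user/NETID/data.txt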

Note that jars loaded from --packages are cached in .ivy2, so they don't have to be downloaded every time. For jobs submitted to the cluster, there's a per-user cache on each cluster node.
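If you're curious what has been cached locally, you can look at that directory (this assumes the default cache location in your home directory):

ls ~/.ivy2/jars
du -sh ~/.ivy2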

Putting options in a file

If you need lots of packages, you may find it inconvenient to type long arguments to spark-shell or spark-submit each time. You can put arguments in a properties file. E.g. here's one that adds a package from the Spark packages collection. Put the following in spark.properties

spark.yarn.appMasterEnv.PYSPARK_PYTHON /usr/lib/anaconda3/bin/python3
spark.jars.packages graphframes:graphframes:0.5.0-spark2.1-s_2.11
spark.jars.repositories https://dl.bintray.com/spark-packages/maven
Note that the property names are different from the arguments. That is, --packages on the command line corresponds to the property spark.jars.packages. You can see the property names in the Spark documentation. You'll see that there are properties to control many things: how many nodes your job uses, how many cores, how much memory, etc.

You can then point spark-shell or spark-submit to this file, e.g.

spark-shell --properties-file spark.properties
To avoid typing --properties-file each time, you can alias the spark-shell and spark-submit commands:
alias spark-shell='spark-shell --properties-file ~/spark.properties'
alias spark-submit='spark-submit --properties-file ~/spark.properties'
To get these aliases each time you log in, put them in your .bashrc and .profile files.

Submitting Spark to the cluster

The examples so far have all used --master local. That causes the program to be run in a single JVM on the system you're logged into. The only option that makes sense is --driver-memory, which controls the size of the JVM.

To get more parallelism, you need to submit your job to the cluster. There it will be split across several "executors," possibly running on different nodes of the cluster. One additional process, the "driver," coordinates the job as a whole. (With --master local, the single JVM is considered the driver.)

To use the cluster, specify "--master yarn". (Yarn is the name of the scheduling system for the cluster.) You may also want to specify the number of executors and the amount of memory each takes. We recommend 3 executors. The default size is 512m, which may be enough, but feel free to go up to 4g for each executor and 8g for the driver if the default isn't enough.

Here's an example

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 3 --driver-memory 4g --executor-memory 4g  /usr/hdp/current/spark2-client/examples/jars/spark-examples*.jar 10

You can watch what's going on in yarn by using "yarn top". This will show you all jobs running on the cluster. You'll see -1 in some entries, because you only have permission to see details of your own job. Note that our copy of yarn only schedules memory. Every task is shown as using 1 vcore, no matter what it actually does. There's no maximum number of vcores, despite the implication that there is.

Some documentation talks about setting the number of cores. That has no effect in our environment. (Indeed it doesn't have any effect for most clusters.)

NOTE: During periods when students are doing assignments, please restrict yourself to about 24 G of memory total, e.g. the defaults above: 3 executors of 4g each and a driver of 8g.

During the summer, or the beginning of the term, larger numbers are OK. Please do not use more than 75 G of memory total. The number of executors doesn't matter: it's total memory across all tasks that's the issue. Note that yarn won't schedule a task (i.e. a driver or executor) using more than 20G of memory. Since there's additional overhead beyond the JVM, that means you can't go above 17g in your specifications. Please contact help@cs.rutgers.edu if you really need a larger task.
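For example, an off-peak run with larger resources might look like the following (just a sketch; the sizes are chosen to stay under the 17g-per-task and 75 G total limits described above):

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --num-executors 4 --executor-memory 16g --driver-memory 8g /usr/hdp/current/spark2-client/examples/jars/spark-examples*.jar 100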

If you use these arguments, please watch with "yarn top" to verify that you have the number of containers and memory usage you expect.

Note that Spark only has one memory argument for drivers and executors. The JVM size is set automatically from this argument. This contrasts with MapReduce, where you have to set both.

In principle, you can run into a situation where yarn kills your job because you didn't allocate enough memory. This will be very rare. It could occur if you use lots of memory-mapped files. SQL usage could also trigger it. But I don't expect it to happen. Here's what the error would probably look like:

18/12/21 11:36:10 INFO mapreduce.Job: Task Id : attempt_1545409883169_0001_m_000007_0, Status : FAILED
Container [pid=2738,containerID=container_e64_1545409883169_0001_01_000009] is running beyond virtual memory limits. Current usage: 100.6 MB of 1 GB physical memory used; 5.3 GB of 2.1 GB virtual memory used. Killing container.
...
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143. 

Running MapReduce Jobs

(This section is about running MapReduce jobs written in Java. If you're interested in using Python, see Writing An Hadoop Mapreduce Program in Python.)

WARNING: If your job doesn't run immediately, please don't change the jar file for it before it runs. Once the job is ready to run, if the jar file has changed since the job was submitted, the job fails.

WARNING 2: Once you have submitted your job with the "yarn" command, typing ^C won't terminate it. If you change your mind, use "yarn application -list" to list all of your applications. Note the application ID of the one you don't want. Use "yarn application -kill ID" to kill it. (This doesn't apply if you're running in local mode.)
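For example (the application ID shown here is just illustrative):

yarn application -list
yarn application -kill application_1545409883169_0001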

These instructions assume that you have assembled all the classes in your program into a jar file. Simply run that file with "yarn", e.g. to run one of the examples you can do

yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar pi 3 10000
Everything after the jar file name is an argument, passed to the program. In this case the jar file has a number of example programs. "pi" tells it to calculate the value of pi. 3 and 10000 specify the number of maps to use and the number of samples in each map. Please don't use more than 3 maps for MapReduce jobs unless you're sure there's no class doing an assignment at the time.

By default, 5 processes are created: an application master that coordinates the job, the 3 map tasks we asked for, and 1 reduce task.

The job is scheduled through "yarn," a scheduler that distributes tasks across the 3 nodes, and makes sure no more jobs are running than the system can handle.

Details of parallel execution

You can watch what's going on in yarn by using "yarn top". You'll see -1 in some entries, because you only have the permissions to see details of your own job. Note that our copy of yarn only schedules memory. Every task is shown as using 1 vcore, no matter what it actually does. There's no maximum number of vcores, despite the implication that there is.

This system is set up for instructional use. We need to be able to execute a number of jobs at the same time, to handle periods before assignments are due. That means that we've had to limit both the size of processes and the number running at the same time. The parameters are listed above.

You shouldn't need more than the resources we supply for course assignments. If you do, please consult your instructor.

You may be able to override these configurations, depending upon how your program is written. The main class has to include

extends Configured implements Tool
and the run method has to use getConf()
Job job = new Job(getConf());
If you do that, then you can specify configuration arguments. Here's an example from the examples program:
yarn jar /usr/hdp/3.1.0.0-78/hadoop-mapreduce/hadoop-mapreduce-examples.jar pi -Dmapreduce.job.running.map.limit=3 -Dmapreduce.job.running.reduce.limit=9 -Dmapreduce.job.reduces=9 -Dmapreduce.map.memory.mb=4096 -Dmapreduce.map.java.opts=-Xmx3481m 16 100000
Here is what the options do: mapreduce.job.running.map.limit limits how many map tasks run at once (3 here), mapreduce.job.running.reduce.limit does the same for reduce tasks, mapreduce.job.reduces sets the total number of reduce tasks, mapreduce.map.memory.mb sets the memory yarn allocates to each map task, and mapreduce.map.java.opts sets the Java heap size for each map task. The final 16 and 100000 are arguments to the pi program: the number of maps and the number of samples per map.

You may wonder why both mapreduce.map.memory.mb and mapreduce.map.java.opts are needed. mapreduce.map.memory.mb tells yarn how much memory to allocate to the process. mapreduce.map.java.opts tells Java how much memory to use. It's 3/4 of the memory assigned, because the task actually uses additional memory beyond the Java heap. The format of the two arguments is different: the first is a number, which is interpreted as MB. The second is a standard JVM argument, typically something like -Xmx4g for 4 G. The example uses m, to make it easier to keep the two parameters consistent.

If the amount of memory allocated is too small, your task will likely crash. When I saw this, there was no usable error message. It just stopped without producing usable output.

NOTE: During periods when students are doing assignments, please restrict yourself to 24 G. During the summer, or the beginning of the term, you can use up to 75 G. The number of tasks doesn't matter: it's total memory that's the issue. However yarn won't schedule jobs with more than 20 G of memory for a single task (i.e. one map or reduce task). If you need more than 20 G, please contact help@cs.rutgers.edu.

If you use these arguments, please watch with "yarn top" to verify that you have the number of containers and memory usage you expect.

Using a configuration file for a MapReduce job

If you want to specify options for your MapReduce job, but you don't like typing all the options each time, you can put the options in a file. For example, instead of

yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar pi -Dmapreduce.job.running.map.limit=3 16 10000
you could use
yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar pi -conf mapred.xml 16 10000
This probably isn't worth it for one option, but might be if you use many. Here's mapred.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
     <name>mapreduce.job.running.map.limit</name>
     <value>3</value>
  </property>
</configuration>

Here's a second approach. Instead of "Job job = new Job(getConf());", use "Configuration conf = getConf();", set parameters on it with set, e.g. conf.set("mapreduce.job.running.map.limit","3");, and then create the job with "Job job = new Job(conf);".

Here's a third approach: Create a simple shell script to run your program, e.g. create a file dopi.sh

#!/bin/sh
MRDIR=/usr/hdp/current/hadoop-mapreduce-client

yarn jar $MRDIR/hadoop-mapreduce-examples.jar pi -Dmapreduce.job.running.map.limit=3 16 10000
Now do "chmod +x" dopi.sh to make it executable. You can now run it by typing "./dopi.sh".

Kafka

Currently we have a Kafka broker running on data5.cs.rutgers.edu. Only data5 has all of the Kafka client software, so we suggest working on that system. Many of the examples here won't work on systems other than data5. (If anyone plans to use Kafka for a large course, please contact help. We can put it on all cluster nodes if necessary.)

Our Kafka uses Kerberos security, as do the rest of our tools. Unfortunately, Kafka programs don't adapt to this automatically. You must add a couple of parameters to your code if it's going to run successfully in our cluster.

This section is fairly long. That's not because Kafka is particularly important. Rather, it's because using Kafka in a secure cluster requires more work than other Hadoop tools. Generally for Spark and other technologies you can follow instructions you find on the web. With Kafka, you'll almost always have to add a couple of security parameters to code you find on the net.

NOTE: Kafka code generally uses Java's JAAS security system. If you find examples on the web for using Kafka in a secure environment, they will tell you how to create key tables, and how to write JAAS configuration files. This is not necessary for our cluster. We have a single system-wide JAAS configuration file, /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf. It will work without needing a key table. Indeed creating a key table on our systems is fairly difficult, intentionally, because it has security risks.

Our Kafka does not use SSL. We verify users' identity, but we do not encrypt data on the network. Since all Kafka activity is occurring on a single system, data5.cs.rutgers.edu, it does not appear that SSL would provide much extra security.

Fixed version of the Kafka Quick Start

Here's an example of creating a topic, writing a couple of lines to it, and reading them. This is the equivalent of the Kafka Quickstart, but with parameters appropriate for our installation. Please note: topic6 is an example. Use your own topic name. We suggest including your netid in the name, so we know who is responsible for it.

# the following export only needs to be done once. You can put
# it in your .bashrc if you want to use kafka every time you login
export PATH=/usr/hdp/current/kafka-broker/bin:"$PATH"

kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic topic6 --zookeeper data4.cs.rutgers.edu:2181
kafka-topics.sh --describe --topic topic6 --zookeeper data4.cs.rutgers.edu:2181
kafka-console-producer.sh --topic topic6 --broker-list data5.cs.rutgers.edu:6667 --producer-property security.protocol=SASL_PLAINTEXT
stuff
more stuff
^D
kafka-console-consumer.sh --from-beginning --topic topic6 --bootstrap-server data5.cs.rutgers.edu:6667 --consumer-property security.protocol=SASL_PLAINTEXT

If no one is currently using Kafka, the first time you run this, the consumer will take a while before it shows any data.

These commands point to two services: Zookeeper on data4.cs.rutgers.edu:2181, which is used for creating and describing topics, and the Kafka broker on data5.cs.rutgers.edu:6667, which is used for writing and reading data.

We have modified the kafka-topics script so that when you create a topic you are made its owner.
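Since you own topics you create, you should also be able to delete them yourself when you're done, e.g. (using the same Zookeeper address as above):

kafka-topics.sh --delete --topic topic6 --zookeeper data4.cs.rutgers.edu:2181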

The example of connect-standalone.sh needs some work, because the builtin configuration files it refers to don't have the necessary parameters. Note that what this command actually does is connect one source to one sink. In this case they're connecting a file to a file. So this really just copies the file, except that if you keep the program running, it will watch the source file. If you append lines to the source file, it will also append them to the destination file. Here's an example that will work:

connect-standalone.sh ./connect-standalone.properties ./connect-file-source.properties ./connect-file-sink.properties
where connect-standalone.properties contains
bootstrap.servers=data5.cs.rutgers.edu:6667
security.protocol=SASL_PLAINTEXT
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.kerberos.service.name=kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/topic.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
NOTE: The offset storage file should be specific to this topic. You might want to use the topic name in it. The first time you run the connect-standalone.sh command, this file should not exist.

Here's connect-file-source.properties. It defines the file that the command is going to read from

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=mytopic
Obviously the file name can be anything. The file should exist and have some lines in it. The topic can already exist, but if so, it shouldn't have any contents in it. If the topic doesn't exist, it will be created. But if it's created this way it will be public, so anyone else can read, write or delete it.

Here's connect-file-sink.properties. It defines the file the command is going to write to.

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=mytopic
The file name can be anything. I recommend using a file that doesn't exist yet. The topic should be the same as in the source file.

Note that the demo app described there can't actually work. It has hardcoded the URL of the Kafka broker, and it isn't right.

Example in Java

If you go to the Streams Tutorial on the Kafka web site, you'll see a sample of a Java application. It gives a maven command, "mvn archetype:generate", which will download various files and set up a source tree for you.

Once you have the code, you can compile it using "mvn compile" and run it using "mvn exec:java -Dexec.mainClass=myapps.XXX", where XXX is the name of the program, e.g. WordCount. (mvn exec:java takes care of setting up a class path with all the necessary jar files needed for the program.)

However there are some issues. First, the programs don't all compile, and the failure causes the one I used (WordCount) not to be built. The problem is in line 48 of LineSplit.java, which is

        builder.stream("streams-plaintext-input")
but should be
        builder.<String, String>stream("streams-plaintext-input")

The program I worked with was WordCount.java. To work in our environment it needs some changes.

1) The most significant is near the beginning, where there are a number of props.put statements. Here's what they should look like:

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "NETID-streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "data5.cs.rutgers.edu:6667");
        props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/NETIDstate");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        System.setProperty("java.security.auth.login.config", "/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf");
There are two places where you should use your NetID, to avoid having your program interfere with other users. We've changed the server hostname:port to be appropriate for our site, and added two properties needed for our security, SECURITY_PROTOCOL_CONFIG and the system property java.security.auth.login.config. We added STATE_DIR_CONFIG to define a directory that's private to each user; otherwise it uses a common directory, which could result in issues. Note that you must do "mkdir /tmp/NETIDstate" to create the directory. Obviously, use your own NetID rather than NETID.

2) Change the names of the topics from streams-plaintext-input and streams-wordcount-output to names that have your NetID in them. This is to avoid conflict if several users are trying this.

3) I recommend adding print statements so you can see whether the program is actually reading input. If there's a problem with the security setup, normally you won't get an error. It just won't see any input. Add a "peek" call before the first function that processes the stream:

       builder.stream("...")
              .peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
              .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))

Once you've built the program, you need to do some things to set up: create the input and output topics with "kafka-topics.sh --create" (as shown in the quick start, using the NetID-based names you put in the program), create the state directory with "mkdir /tmp/NETIDstate", and use kafka-console-producer.sh to put a few lines of text into the input topic.

Now run the program using "mvn exec:java -Dexec.mainClass=myapps.WordCount". You should see it read data from the input topic. If you run the program again, it won't see any data it's already processed, so you'll have to use kafka-console-producer to add more data. You'll need to add more data each time you try it (or move to a new topic).
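For example, to push a few more lines into the input topic between runs, you can use the same console producer shown in the quick start (the topic name below assumes you prefixed it with your NetID, as suggested above):

kafka-console-producer.sh --topic NETID-streams-plaintext-input --broker-list data5.cs.rutgers.edu:6667 --producer-property security.protocol=SASL_PLAINTEXT
a few more words to count
^D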

There are several different ways to use Kafka in Java. Some may not need precisely the same changes. E.g. you sometimes create producers and consumers separately. But in general you'll always need to do

        System.setProperty("java.security.auth.login.config", "/usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf");
For separate consumers and producers, you'll need to add to the configuration properties
security.protocol="SASL_PLAINTEXT"
sasl.kerberos.service.name="kafka"
either in a configuration file or using props.put, depending upon how your program is written.

Spark Streaming from Kafka

Here's a sample program that will read from a Kafka topic into a Spark Stream: JavaDirectKafkaWordCount. In this case it prints word counts from the stream, although the output will have so much logging information that it will be hard to see the actual output. It will process the input in batches every 2 seconds, so the counts will be only of data read since the last batch. It will start at the end of the topic, so you'll need to add new data once the program starts.

Changes: as with the WordCount example above, you'll need to add the security parameters to the Kafka configuration (security.protocol of SASL_PLAINTEXT and sasl.kerberos.service.name of kafka), set the system property java.security.auth.login.config to /usr/hdp/current/kafka-broker/config/kafka_client_jaas.conf, and use topic and group names that include your NetID.

Building: Create a directory. It will have two files in it: pom.xml and src/main/java/JavaDirectKafkaWordCount.java. You'll want to do "mkdir -p src/main/java" to create the directory for JavaDirectKafkaWordCount.java. Here's pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>sparkwordcount</groupId>
  <artifactId>sparkwordcount</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>"Spark Word Count"</name>
  
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  </properties>

  <build>
    <plugins>
<!-- this plugin is for java code. the source and target versions are Java versions -->
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>  
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.3.2</version>
    </dependency>
  </dependencies>
</project>

In the main directory, do "export JAVA_HOME=/usr/jdk64/jdk1.8.0_112" then "mvn package." The command "mvn package" will compile the program and produce a jar file target/sparkwordcount-0.0.1-SNAPSHOT.jar.

To run the program, use the spark-submit command:

spark-submit --class JavaDirectKafkaWordCount --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.2 --master 'local[*]' target/sparkwordcount-0.0.1-SNAPSHOT.jar data5.cs.rutgers.edu:6667 java topic6
The last three items (the broker address, a consumer group ID, and the topic name) are arguments to the program.

Example in Python

Here is sample code in python to write to a topic and read from it. If you're using your own Python environment, you may need to install additional packages; our environment already has them installed. These commands should set them up in your own copy of Anaconda:

conda install -c conda-forge kafka-python
pip install gssapi

Here's a sample producer:

from kafka import KafkaProducer
from kafka import KafkaClient

params = {
  'bootstrap_servers': ['data5.cs.rutgers.edu:6667'],
  'security_protocol': 'SASL_PLAINTEXT',
  'sasl_mechanism': 'GSSAPI',
}

producer = KafkaProducer(**params)
producer.send("topic6", b"test data")
producer.flush()
producer.close()
Again, please don't use "topic6." Use your own topic name. Create the topic with "kafka-topics.sh --create," as shown above before running this program. Kafka will actually create topics for you if you use a new topic name in this Python program. However topics created "on the fly" will be public: anyone can read and write to them and even delete them. Since we can't tell who owns them, we reserve the right to delete them.

Here's example code to read data from a topic. Note that the parameter "'auto_offset_reset': 'earliest'" causes the program to read all data, starting from the earliest available. That's fine for testing. In a real application you probably don't want this, since you probably just want to process each piece of data once.

from kafka import KafkaConsumer
from kafka import KafkaClient

params = {
  'bootstrap_servers': ['data5.cs.rutgers.edu:6667'],
  'security_protocol': 'SASL_PLAINTEXT',
  'auto_offset_reset': 'earliest',
  'sasl_mechanism': 'GSSAPI',
}

consumer = KafkaConsumer('newtopic6', **params)
for msg in consumer:
   print (msg.value)

Technical information about the cluster

The Hadoop system is contained in 7 VMs, running on ilab1, ilab2 and ilab3 using KVM.

Because this cluster is intended for use in a student environment, Kerberos security is enabled on it. In order to do any operations you need Kerberos credentials. Our systems are set up to get credentials for you when you login, whether via ssh or the web tools.