Computer Science Spark Support

This file documents details of how to program with Spark on our systems.

This document does not describe interactive use of Spark. Our installation of Jupyter supports Spark, for Java, Scala and Python. See the Spark section in Data Science Support for information on that, as well as more information on the interactive shells, pyspark and spark-shell.

In comparison with a Hadoop cluster, Spark as described here runs on a single machine, using multiple cores rather than multiple nodes.

Our larger systems, e.g. ilab1 - 4, have enough cores that you can get a good deal of parallelism without needing a cluster.

Our installation of Spark is currently version 3.4.0, with Scala 2.12, although Spark 2.4.5 and Scala 2.11 are still available using spark2 versions of commands, e.g. spark2-submit rather than spark-submit.

WARNING: Only the most recent combination, Spark 3.4.0 with Python 3.10, is tested. (We have 3.11, but a number of packages haven't been updated to support it, so we do most testing with 3.10.)



Spark with Scala

See Spark Quick Start for a quick introduction to Spark with Scala. (You can also choose Python in that introduction.) It demonstrates typing Scala Spark programs interactively.

As documented in that introduction, an easy way to try Spark is to type commands interactively.

spark-shell

However, for significant programs you probably don't want to type them in every time. Spark programs using Scala are built just like Java applications, because Scala runs in the Java JVM; it's really just a different syntax for Java functionality. Thus programs are put into .jar files, as they are for Java.

Here's an example of running one of the sample programs that come with Spark, where the .jar files are already installed on the system.

spark-submit --class org.apache.spark.examples.SparkPi /common/system/spark3/examples/jars/spark-examples*.jar 10
Here's what it does: it runs the SparkPi sample, which estimates the value of pi in parallel. The final argument (10) controls how many partitions the work is split into.

Building and running your own program

The following instructions apply to Ubuntu 22.

WARNING: these instructions will not work on Ubuntu 20. All systems will be upgraded to 22 in the summer of 2023. If you need to do this for Ubuntu 20, set JAVA_HOME to /usr/lib/jvm/java-11-openjdk-amd64/.

This uses 304 MB of space in ~/.m2

Set up your build area as follows:

NOTE: Despite the name "SparkWordCount," the program is actually a bit more complex. It ends up outputting character counts, not word counts.

Now that you have built a jar file, you can run it just like the samples. For this program, the input can be any text file. Here I'm using data.txt. This copy of Spark defaults to reading local files, so a normal Linux file name works.

spark-submit --class SparkWordCount --master local --driver-memory 4g --name wordcount target/sparkwordcount-0.0.1-SNAPSHOT.jar  data.txt 1

This is a word count program. data.txt is the file whose words it should count; note that Spark expects the input file name as the first argument after the jar file. The "1" says to count all words that are at least 1 character long.
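For illustration, the counting this job does can be sketched in plain Python, without Spark (here words are just whitespace-separated tokens, and the minimum-length argument plays the same role as the "1" on the command line):

```python
from collections import Counter

def count_words(text, min_length=1):
    """Count words of at least min_length characters, splitting on whitespace."""
    words = [w for w in text.split() if len(w) >= min_length]
    return Counter(words)

counts = count_words("to be or not to be", 2)
print(counts["to"])  # 2
```

The Spark version does the same thing, but distributes the splitting and counting across tasks so large inputs are processed in parallel.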

Note that you must specify both the jar file that was built by "mvn package" and the name of the main class. The class name is specified at the beginning of the program in the main Object declaration. Spark-submit will run the method called "main" in the class you specify. It will pass any arguments that you include on the command line after the jar file.

The only option you're likely to need is --driver-memory. The default is 8G. That should be enough for most programs. If you need more than that, please consult your instructor or a staff member. There's likely to be a problem with your program, and you could end up using excessive resources.

See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them.

Python

Setting your python version

You must select a version of python. We have several versions of python. By default, "python" gives you an old python2. You really don't want to use that with Spark. You almost certainly want to use our most recent Anaconda environment. See Using Python on CS Linux systems for the official way to pick your version. The following steps are an alternate shortcut that will work for Spark, but may not be enough for other things.

Both the pyspark shell and spark-submit will use the version of Python you specify.

You can check this by typing "pyspark" and verifying that it prints the version number you were expecting.
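You can also confirm the interpreter version from inside pyspark (or plain python):

```python
import sys

# Prints the major.minor version, e.g. "3 10". Spark needs the driver
# and workers to run the same major.minor version of Python.
print(sys.version_info.major, sys.version_info.minor)
```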

Using Spark with Python

If you want to use Python interactively, type "pyspark." This will give you an interactive copy of python. It's like typing "python", except that a Spark environment is set up for you. The following are predefined: spark (a SparkSession) and sc (a SparkContext).

However, for longer programs you probably want to put them in a file and run them. Here's how to run an example program that Spark supplies:

spark-submit /common/system/spark3/examples/src/main/python/pi.py 100
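pi.py estimates pi by Monte Carlo sampling, and the argument (100 here) controls how many partitions of samples Spark uses. The core idea, sketched in plain Python without Spark:

```python
import random

def estimate_pi(num_samples, seed=42):
    """Estimate pi by sampling points in the unit square and counting
    how many fall inside the quarter circle of radius 1."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / num_samples

print(estimate_pi(100_000))  # roughly 3.14
```

The Spark version splits the samples across partitions, counts in parallel, and sums the counts at the end; more partitions means more parallel tasks.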

If you need to use additional libraries that we haven't installed, one approach is to pass them to spark-submit with the --py-files argument. This takes a comma-separated list of files. They can be either .py files or ZIP files. However it's probably easier to install the libraries using "python -m pip install --user". (This obviously assumes that your path is set so you get the right version of python.)
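If you do go the --py-files route, the ZIP is just an ordinary archive of your helper modules. A minimal sketch (the module name helpers.py is hypothetical; substitute your own files):

```python
import pathlib
import zipfile

# "helpers.py" stands in for a module your Spark job imports.
pathlib.Path("helpers.py").write_text("def greet():\n    return 'hello'\n")

# Bundle it into deps.zip, then run something like:
#   spark-submit --py-files deps.zip yourscript.py
with zipfile.ZipFile("deps.zip", "w") as zf:
    zf.write("helpers.py")
```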

Here's a sample Python Spark program, as usual a word count: WordCount.py This program is written to work under both Python 2 and 3. Since we're now using Python 3 only, the line starting "from __future__" is no longer needed.

To run it, specify a file to count. It can be any text file.

spark-submit WordCount.py data.txt

Java

Java is used much like Scala, because Scala actually uses the Java JVM and build process. Thus we suggest that you look at the Scala section and try running the samples. That will get you used to the tools.

Building and running your own program

The following instructions apply to Ubuntu 22.

WARNING: these instructions will not work on Ubuntu 20. All systems will be upgraded to 22 in the summer of 2023. If you need to do this for Ubuntu 20, set JAVA_HOME to /usr/lib/jvm/java-11-openjdk-amd64/, and edit pom.xml, changing "source" and "target" under "maven-compiler-plugin" from 17 to 11.

This uses 304 MB of space in ~/.m2

Set up your build area as follows:

Now that you have built a jar file, you can run it just like the samples.

spark-submit --class JavaWordCount --driver-memory 4g target/javawordcount-1.jar data.txt

This is a word count program. data.txt is the file whose words it should count. This version of Spark is configured to expect normal Linux files.

See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them. You can use a pom.xml file that has support for Scala. However the sample provided here supports only Java.

About pom.xml (Scala and Java)

When you specify an "import" statement in your program, this implicitly refers to a jar file that has the code implementing the package. That jar file has to be available both when your program is being compiled / built, and when it is run.

The pom.xml file declares what packages you need. The "mvn package" command will retrieve them from various network repositories and make them available to the Java compiler when building your program.

pom.xml does several other things in addition: it names your project and its version (which determine the name of the jar file in target), sets the Java source and target versions for the compiler, and controls how the jar file is built.

In the pom.xml files I supply, I've included a few Spark packages, and in comments I've shown how to use additional ones.

NOTE: Libraries come in lots of versions. As people make changes and fixes they move to a new version number. In the POM file you have to specify the specific version you want. I've used versions that match the specific version of Spark we have installed.

But suppose you want to use packages that aren't part of Spark. In that case you'll need to locate the necessary jar files and add them as dependencies in pom.xml. Generally you'll be using standard open-source libraries. (See below for how to add your own .jar files.) There's a very large network repository, Maven Central, which has pretty much every package known to man. The "mvn package" command knows how to find packages in that repository, download them, and use them.

But first you have to find the location of the package in the repository. Generally a bit of searching in Google shows you the Maven specification for a package. You're looking for what is called a "Maven dependency." Normally they look like this:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.4.0</version>
    </dependency>
If you look at pom.xml, it should be obvious where to add them.
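If it helps to see the overall shape, here is a rough sketch of a minimal pom.xml. The coordinates are placeholders, and the supplied pom.xml files are more complete (for instance, they also configure the compiler plugin):

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>

  <!-- These coordinates name your project and determine the jar name in target -->
  <groupId>example</groupId>
  <artifactId>sparkwordcount</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- Spark dependencies; additional <dependency> entries go here -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.4.0</version>
    </dependency>
  </dependencies>
</project>
```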

More complex programs may use more than one source file. If you add additional files in src/main/java, they will be compiled and added to the jar file in target.

Including packages at runtime

Unfortunately listing a package in pom.xml is only half the battle. That only makes the .jar file available when your program is built. It also has to be available when the program runs.

Many common packages are already present in the Spark system. You can see which ones are there by looking at /common/system/spark/jars/. If your package is already present, you need to put it in pom.xml, but you don't need this section. This section shows how to supply a package in the execution environment if it's not already there.

If you need to add the package to the execution environment, you'll need to use the --packages argument to spark-submit.

For example, suppose you wanted to use the "JSON simple" package to work with JSON objects:

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

...

    JSONObject obj = new JSONObject();
    obj.put("name", "Rutgers University");
    obj.put("age", Integer.valueOf(100));
    System.out.println(obj.toJSONString());

The JSON simple documentation gives you the Maven dependency. But you could also look in Google for "maven json simple." That will point you to a Maven web page. You'll find that the most recent version is 1.1.1. It shows the following dependency:
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1.1</version>
</dependency>

Add that to the <dependency> section of your pom.xml. You should now be able to do "mvn package".

But there's more .... If you do

spark-submit --class JavaWordCount --driver-memory 4g target/javawordcount-1.jar data.txt
you'll get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/json/simple/JSONObject
That's because you had the jar files available through maven when you compiled the program, but not when you ran it. This won't always be a problem. The execution environment has many of the most common libraries, e.g. most of the Apache Commons packages. It also has a couple of recent JSON libraries. I actually picked JSON Simple specifically because it's an older library that isn't present in the execution environment. (I needed something that isn't there to do this demonstration.)

To make the necessary jar file available during execution, you need to include the package in the "spark-submit" command:

spark-submit --packages com.googlecode.json-simple:json-simple:1.1.1 --class JavaWordCount --driver-memory 4g target/javawordcount-1.jar data.txt
Note the --packages argument. It simply puts together the group ID, artifact ID and version number. If you have more than one package, list all of them separated by commas, without any whitespace.

Some packages are in repositories other than Maven Central. E.g. there's a spark-packages.org that collects packages for Spark. They have their own repository. To load packages from there, you need to specify --repositories=https://dl.bintray.com/spark-packages/maven in addition to the --packages argument. For both packages and repositories, you can specify more than one value, separated by commas. No white space.

If you have jar files that aren't in maven, you'll need to have copies of them in your directory. Then you can supply a "--jars" argument to "spark-submit", using the name of the jar files separated by commas.

Note that jars loaded from --packages are cached in .ivy2, so they don't have to be downloaded every time. For jobs submitted to the cluster, there's a per-user cache on each cluster node.

Putting options in a file

If you need lots of packages, you may find it inconvenient to type long arguments to spark-shell or spark-submit each time. You can put arguments in a properties file. E.g. here's one that adds a package from the Spark packages collection. Put the following in spark.properties

spark.yarn.appMasterEnv.PYSPARK_PYTHON /common/system/anaconda/envs/python39/bin/python
spark.jars.packages graphframes:graphframes:0.8.0-spark3.0-s_2.12
spark.jars.repositories https://dl.bintray.com/spark-packages/maven
Note that the property names are different from the arguments. That is, --packages on the command line corresponds to the property spark.jars.packages. You can see the property names in the Spark documentation. You'll see that there are properties to control many things: how many nodes your job uses, how many cores, how much memory, etc.

You can then point spark-shell or spark-submit to this file, e.g.

spark-shell --properties-file spark.properties
To avoid typing --properties-file each time, you can alias the spark-shell and spark-submit commands:
alias spark-shell='spark-shell --properties-file ~/spark.properties'
alias spark-submit='spark-submit --properties-file ~/spark.properties'
To get these aliases each time, put them in your .bashrc and .profile files.

SQL

Spark has builtin ability to do SQL queries. If you have a DataFrame, you can use df.createOrReplaceTempView("name") to make the data from that DataFrame accessible to SQL as a table called "name". Of course there are native DataFrame calls to do most of the searching you might want, so it's your choice whether to use SQL.

You can use df.write.saveAsTable("name") to save data from the DataFrame into a file. By default it saves it in Parquet format, which is a directory of files, in spark-warehouse in your home directory. You can, of course, choose other formats and specify another location.

You can also work with tables stored in Hive. See Hive from Spark.

There's also a separate spark-sql program, which gives you a simple SQL interface. It's part of the overall Spark infrastructure, so it uses Spark-style parallel execution for SQL commands. By default, it deals with Hive tables. The data from a Hive table is normally stored in spark-warehouse in your home directory. However there is some additional meta-data stored in metastore_db.

In the spark-sql program, if you do "create table", it will by default put the table in the spark-warehouse directory, in Parquet format, and create the appropriate Hive meta-data in metastore_db. Hive is able to deal with other formats as well.

If you have a DataFrame in Spark, and do df.write.saveAsTable("name"), it will put the data into spark-warehouse. However it won't create the Hive metadata in metastore_db.

To allow spark-sql to access tables saved by saveAsTable, you'll need to tell Hive about them. Here's an example. It assumes that you did saveAsTable("tab"):

create table mytable using org.apache.spark.sql.parquet options (path "/ilab/users/NETID/spark-warehouse/tab");
You can use any name you want in place of "mytable." Use your own NETID in place of NETID. This command only needs to be done once. After that, you can access the data from spark-sql using the table name mytable.