Computer Science Spark Support
This file describes how to program with Spark on our systems.
This document does not describe interactive use of Spark. Our installation of Jupyter supports Spark, for Java, Scala and Python. See the Spark section in Data Science Support for information on that, as well as more information on the interactive shells, pyspark and spark-shell.
In comparison with a Hadoop cluster, Spark as described here:
- Always runs in local mode. There's no cluster and no YARN.
- Always uses local files. There's no HDFS.
Our installation of Spark is currently version 3.5.1, with Scala 2.12, although Spark 2.4.5 and Scala 2.11 are still available using spark2 versions of commands, e.g. spark2-submit rather than spark-submit.
WARNING: Only the most recent combination, Spark 3.5.1 with Python 3.11, is tested.
In addition:
- Spark is configured to use 8 GB of memory by default. This should be enough for all classwork. With the command used to run Spark jobs (spark-submit), you can add "--driver-memory NNNg" to override this (see the example after this list).
- Spark is currently set to use Java 17. However, if you choose Spark 2, you'll get Java 8, because Spark 2 won't work with Java 17. If you want to use another version of Java, set JAVA_HOME to it and set SPARK_USEMYJAVA to 1.
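For example, a job that needs 16 GB of memory and a different Java installation might be run roughly like this. The class and jar names are placeholders for your own program, and the Java 11 path is only an example (it's the path mentioned in the Ubuntu 20 notes below):
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64/
export SPARK_USEMYJAVA=1
spark-submit --driver-memory 16g --class MyClass target/myapp.jar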
Spark with Scala
See Spark Quick Start for a quick introduction to Spark with Scala. (You can also choose Python in that introduction.) This demonstrates typing Scala Spark programs interactively.
As documented in that introduction, an easy way to try Spark is to type commands interactively.
spark-shell
However for significant programs you probably don't want to type them in every time. Spark programs using Scala are built just like Java applications, because Scala runs in a Java JVM. It's really just a different syntax for Java functionality. Thus programs are put into .jar files, as they are for Java.
Here's an example of running one of the example programs supplied with Spark, where the .jar files are already installed on the system.
spark-submit --class org.apache.spark.examples.SparkPi /common/system/spark3/examples/jars/spark-examples*.jar 10
Here's what it does:
- Normally you'll create a jar file with your application in it. We're using a sample application that's supplied with Spark. In this case several jar files are specified.
- 10 is an argument to the program.
- --class indicates the specific class to be run. That class will be in one of the jar files. It will run the method called "main," as in Java.
Building and running your own program
The following instructions apply to Ubuntu 22.
WARNING: they will not work on Ubuntu 20. All systems will be upgraded to 22 in the summer of 2023. If you need to do this for Ubuntu 20, set JAVA_HOME to /usr/lib/jvm/java-11-openjdk-amd64/.
This uses 304 MB of space in ~/.m2
Set up your build area as follows:
- Create a directory (with mkdir) and cd into it
- Download pom.xml into it. pom.xml specifies how to get the various libraries needed by a basic Spark program in Scala.
- mkdir -p src/main/scala
- cd src/main/scala
- Download SparkWordCount.scala into it. This is just a sample. Once it works, you'll want to remove it and create your own program.
- cd ../../.., i.e. go back to the original directory you created
- You may need to do "export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64/". Do this if you get an error from "mvn package" that includes "error: invalid target release: 17".
- "mvn package". This will download a huge number of files, taking quite a while. (This only happens the first time.) Then it will build the program. The program ends up in target/sparkwordcount-0.0.2-SNAPSHOT.jar. The name of the jar file is specified in pom.xml. It comes from the artifactId and version at the beginning of the file. You'll want to change the name and probably the version number when you write your own program.
NOTE: Despite the name "SparkWordCount," the program is actually a bit more complex. It ends up outputting character counts, not word counts.
Now that you have built a jar file, you can run it just like the samples. For this program, the input can be any text file. Here I'm using data.txt. This copy of Spark defaults to reading local files, so a normal Linux file name works.
spark-submit --class SparkWordCount --master local --driver-memory 4g --name wordcount target/sparkwordcount-0.0.2-SNAPSHOT.jar data.txt 1
This is a word count program. data.txt is the file whose words it should count. The "1" says to count all words that are at least 1 character long.
Note that you must specify both the jar file that was built by "mvn package" and the name of the main class. The class name is specified at the beginning of the program in the main Object declaration. Spark-submit will run the method called "main" in the class you specify. It will pass any arguments that you include on the command line after the jar file.
The only option you're likely to need is --driver-memory. The default is 8G. That should be enough for most programs. If you need more than that, please consult your instructor or a staff member. There's likely to be a problem with your program, and you could end up using excessive resources.
See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them.
Python
Setting your python version
You must select a version of Python. We have several versions of Python. By default, "python" gives you an old Python 2. You really don't want to use that with Spark. You almost certainly want to use our most recent standard virtual environment. See Using Python on CS Linux systems for the official way to pick your version. The following steps are an alternate shortcut that will work for Spark, but may not be enough for other things.
- Type "ls /common/system/venv/". Unless you have a reason to use an older one, pick the most recent Python 3. New versions of Python come out about once a year, so you should check regularly for changes.
- Put the version in your path. E.g.
export PATH="/common/system/venv/python312/bin:$PATH"
This can go in your .bashrc so it happens automatically when you login.
- Set PYSPARK_PYTHON to this version, e.g.
export PYSPARK_PYTHON=/common/system/venv/python312/bin/python
You will also want to put that in your .bashrc file.
Both the pyspark shell and spark-submit will use the version of Python you specify.
You can check this by typing "pyspark" and verifying that it prints the version number you were expecting.
Using Spark with Python
If you want to use Python interactively, type "pyspark." This will give you an interactive copy of python. It's like typing "python", except that a Spark environment is set up for you. The following are predefined:
- spark - a pyspark.sql.SparkSession (using Hive)
- sc - a SparkContext
- sql - a bound method SparkSession.sql for the session
- sqlContext - an SQLContext [for compatibility]
- sqlCtx - an old name for sqlContext
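For example, once pyspark starts you can use these predefined objects directly, without importing or creating anything. Here's a minimal sketch typed at the pyspark prompt (the data and names are made up for illustration):
# sc is the predefined SparkContext: build a small RDD and count its elements
sc.parallelize([1, 2, 3, 4]).count()
# spark is the predefined SparkSession: create a DataFrame and query it with the sql method
df = spark.createDataFrame([("alice", 3), ("bob", 5)], ["name", "score"])
df.createOrReplaceTempView("scores")
sql("SELECT name FROM scores WHERE score > 4").show()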
However for longer programs, you probably want to put them in a file and run them. Here's how to run an example program supplied with Spark:
spark-submit /common/system/spark3/examples/src/main/python/pi.py 100
If you need to use additional libraries that we haven't installed, one approach is to pass them to spark-submit with the --py-files argument. This takes a comma-separated list of files. They can be either .py files or ZIP files. However it's probably easier to install the libraries using "python -m pip install --user". (This obviously assumes that your path is set so you get the right version of python.)
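For example, either approach might look like this. The package, module, and script names here are only placeholders for whatever you actually need:
python -m pip install --user requests
spark-submit --py-files mymodule.py,helpers.zip myjob.py data.txt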
Here's a sample Python Spark program, as usual a word count: WordCount.py. This program is written to work under both Python 2 and 3. Since we're now using Python 3 only, the line starting "from __future__" is no longer needed.
To run it, specify a file to count. It can be any text file.
spark-submit WordCount.py data.txt
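If you just want to see the general shape of such a program, here's a minimal sketch of a Python Spark word count. This is not the supplied WordCount.py, just an illustration; it takes the input file name as its only argument:
import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create (or reuse) a SparkSession; spark-submit sets up the environment for this
    spark = SparkSession.builder.appName("WordCountSketch").getOrCreate()
    # Read the file named on the command line and split each line into words
    lines = spark.sparkContext.textFile(sys.argv[1])
    counts = (lines.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    # Bring the results back to the driver and print them
    for word, count in counts.collect():
        print(word, count)
    spark.stop()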
Java
Java is used much like Scala, because Scala actually uses the Java JVM and build process. Thus we suggest that you look at the Scala section and try running the samples. That will get you used to the tools.
Building and running your own program
The following instructions apply to Ubuntu 22.
WARNING: they will not work on Ubuntu 20. All systems will be upgraded to 22 in the summer of 2023. If you need to do this for Ubuntu 20, set JAVA_HOME to /usr/lib/jvm/java-11-openjdk-amd64/, and edit pom.xml changing "source" and "target" under "maven-compiler-plugin" from 17 to 11.
This uses 304 MB of space in ~/.m2
Set up your build area as follows:
- Create a directory (with mkdir) and cd into it
- Download pom.xml into it. pom.xml specifies how to get the various libraries needed by a basic Spark program in Java.
- mkdir -p src/main/java
- cd src/main/java
- Download JavaWordCount.java into it. This is just a sample. Once it works, you'll want to remove it and create your own program.
- cd ../../.., i.e. go back to the original directory you created
- You may need to do "export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64/". Do this if you get an error from "mvn package" that includes "error: invalid target release: 17".
- "mvn package". This will download a huge number of files, taking quite a while. Then it will build the program. The program ends up in target/javawordcount-1.jar. The name of the jar file is specified in pom.xml. It comes from the artifactId and version at the beginning of the file. You'll want to change the name and probably the version number when you write your own program.
Now that you have built a jar file, you can run it just like the samples.
spark-submit --class JavaWordCount --driver-memory 4g target/javawordcount-1.jar data.txt
- At a minimum, you need to specify the name of the main class and the jar file containing the code.
- data.txt is an argument to the program. It's the name of the input file to read.
- --driver-memory specifies the size of the JVM to use. You won't normally need that. This is just to show you how to specify memory if you need to. The default is 8 GB, which should be enough.
This is a word count program. data.txt is the file whose words it should count. This version of Spark is configured to expect normal Linux files.
See the POM section for information about pom.xml. I've supplied pom.xml files that will work for simple examples. But as you start programming you may need to know more about them. You can use a pom.xml file that has support for Scala. However the sample provided here supports only Java.
About pom.xml (Scala and Java)
When you specify an "import" statement in your program, this implicitly refers to a jar file that has the code implementing the package. That jar file has to be available both when your program is being compiled / built, and when it is run.
The pom.xml file declares what packages you need. The "mvn package" command will retrieve them from various network repositories, and make them available to the Java compiler when building your program.
pom.xml does several other things in addition:
- At the beginning of the file, you'll see a group ID, artifact ID, and version. The artifact ID will be used for the name of the .jar file for your program. The version number will be added to it.
- In the <plugins> section, it specifies what compilers should be used. The pom.xml file for Scala includes both the Scala and Java compilers. The Java example includes only the Java compiler. You can actually use the Scala pom.xml file with the Java program. It's fine to declare both compilers even if you're only using one.
- In the plugin maven-compiler-plugin, the "source" and "target" properties specify the version of the Java compiler to use. At the moment we're using Java 17. (A typical entry is sketched just after this list.)
- The Scala compiler uses the version declared for the Scala library dependency.
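For reference, the maven-compiler-plugin entry typically looks something like this. This is only a sketch; check the supplied pom.xml for the exact plugin version and layout used there:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <source>17</source>
    <target>17</target>
  </configuration>
</plugin>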
NOTE: Libraries come in lots of versions. As people make changes and fixes they move to a new version number. In the POM file you have to specify the specific version you want. I've used versions that match the specific version of Spark we have installed.
But suppose you want to use packages that aren't part of Spark. In that case you'll need to locate the necessary jar files and add them as dependencies in pom.xml. Generally you'll be using standard open-source libraries. (See below for how to add your own .jar files.) There's a very large network repository, which has pretty much every package known to man: Maven Central. The "mvn package" command knows how to find packages in that repository, download them, and use them.
But first you have to find the location of the package in the repository. Generally a bit of searching in Google will show you the Maven specification for a package. You're looking for what is called a "Maven dependency." Normally they look like this:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.5.1</version>
</dependency>
If you look at pom.xml, it should be obvious where to add them.
More complex programs may use more than one source file. If you add additional files in src/main/java, they will be compiled and added to the jar file in target.
Including packages at runtime
Unfortunately listing a package in pom.xml is only half the battle. That only makes the .jar file available when your program is built. It also has to be available when the program runs.
Many common packages are already present in the Spark system. You can see which ones are there by looking at /common/system/spark/jars/. If your package is already present, you need to put it in pom.xml, but you don't need this section. This section shows how to supply a package in the execution environment if it's not already there.
If you need to add the package to the execution environment, you'll need to use the --packages argument to spark-submit.
For example, suppose you wanted to use the "JSON simple" package to work with JSON objects:
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
...
JSONObject obj = new JSONObject();
obj.put("name", "Rutgers University");
obj.put("age", new Integer(100));
System.out.println(obj.toJSONString());
The JSON simple documentation gives you the Maven dependency. But you could also look in Google for "maven json simple." That will point you to a Maven web page. You'll find that the most recent version is 1.1.1. It shows the following dependency:
<dependency>
  <groupId>com.googlecode.json-simple</groupId>
  <artifactId>json-simple</artifactId>
  <version>1.1.1</version>
</dependency>
Add that to the <dependencies> section of your pom.xml. You should now be able to do "mvn package".
But there's more .... If you do
spark-submit --class JavaWordCount --driver-memory 4g target/javawordcount-1.jar data.txt
you'll get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/json/simple/JSONObject
That's because you had the jar files available through maven when you compiled the program, but not when you ran it. This won't always be a problem. The execution environment has many of the most common libraries, e.g. most of the Apache Commons packages. It also has a couple of recent JSON libraries. I actually picked JSON Simple specifically because it's an older library that isn't present in the execution environment. (I needed something that isn't there to do this demonstration.)
To make the necessary jar file available during execution, you need to include the package in the "spark-submit" command:
spark-submit --packages com.googlecode.json-simple:json-simple:1.1.1 --class JavaWordCount --driver-memory 4g target/javawordcount-1.jar data.txt
Note the --packages argument. It simply puts together the group ID, artifact ID and version number. If you have more than one package, list all of them separated by commas, without any whitespace.
Some packages are in repositories other than Maven Central. E.g. there's a spark-packages.org that collects packages for Spark. They have their own repository. To load packages from there, you need to specify --repositories=https://dl.bintray.com/spark-packages/maven in addition to the --packages argument. For both packages and repositories, you can specify more than one value, separated by commas. No white space.
If you have jar files that aren't in maven, you'll need to have copies of them in your directory. Then you can supply a "--jars" argument to "spark-submit", using the name of the jar files separated by commas.
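For example, using the Java word count command from above with two hypothetical local jar files:
spark-submit --jars mylib.jar,extras.jar --class JavaWordCount target/javawordcount-1.jar data.txt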
Note that jars loaded from --packages are cached in .ivy2, so they don't have to be downloaded every time. For jobs submitted to the cluster, there's a per-user cache on each cluster node.
Putting options in a file
If you need lots of packages, you may find it inconvenient to type long arguments to spark-shell or spark-submit each time. You can put arguments in a properties file. E.g. here's one that adds a package from the Spark packages collection. Put the following in spark.properties:
spark.yarn.appMasterEnv.PYSPARK_PYTHON /common/system/venv/python312/bin/python
spark.jars.packages graphframes:graphframes:0.8.0-spark3.0-s_2.12
spark.jars.repositories https://dl.bintray.com/spark-packages/maven
Note that the property names are different from the arguments. That is, --packages on the command line corresponds to the property spark.jars.packages. You can see the property names in the Spark documentation. You'll see that there are properties to control many things: how many nodes your job uses, how many cores, how much memory, etc.
You can then point spark-shell or spark-submit to this file, e.g.
spark-shell --properties-file spark.properties
To avoid typing --properties-file each time, you can alias the spark-shell and spark-submit commands:
alias spark-shell='spark-shell --properties-file ~/spark.properties'
alias spark-submit='spark-submit --properties-file ~/spark.properties'
To get this each time, put it in your .bashrc and .profile files.
SQL
Spark has builtin ability to do SQL queries. If you have a DataFrame, you can use df.createOrReplaceTempView("name") to make the data from that DataFrame accessible to SQL as a table called "name". Of course there are native DataFrame calls to do most of the searching you might want, so it's your choice whether to use SQL.
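Here's a small Python sketch of that pattern (the data and column names are made up for illustration):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("alice", 3), ("bob", 5)], ["name", "score"])
# Make the DataFrame visible to SQL as a table called "scores"
df.createOrReplaceTempView("scores")
spark.sql("SELECT name, score FROM scores WHERE score > 4").show()
# The equivalent native DataFrame calls
df.filter(df.score > 4).select("name", "score").show()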
You can use df.write.saveAsTable("name") to save data from the DataFrame into a file. By default it saves it in Parquet format, which is a directory of files, in spark-warehouse in your home directory. You can, of course, choose other formats and specify another location.
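Continuing the sketch above, saving might look roughly like this. The table name and output path are placeholders; use your own NETID:
# Default: Parquet files under spark-warehouse in your home directory
df.write.saveAsTable("scores_table")
# Or pick a different format and location explicitly
df.write.format("csv").option("header", True).save("/ilab/users/NETID/scores_csv")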
You can also work with tables stored in Hive. See Hive from Spark.
There's also a separate spark-sql program, which gives you a simple SQL interface. It's part of the overall Spark infrastructure, so it uses Spark-style parallel execution for SQL commands. By default, it deals with Hive tables. The data from a Hive table is normally stored in spark-warehouse in your home directory. However there is some additional meta-data stored in metastore_db.
In the spark-sql program, if you do "create table", it will by default put the table in the spark-warehouse directory, in Parquet format, and create the appropriate Hive meta-data in metastore_db. Hive is able to deal with other formats as well.
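As a quick illustration, a spark-sql session might look roughly like this (the table name and data are made up):
spark-sql
spark-sql> create table words (word string, count int);
spark-sql> insert into words values ('spark', 1);
spark-sql> select * from words;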
If you have a DataFrame in Spark, and do df.write.saveAsTable("name"), it will put the data into spark-warehouse. However it won't create the Hive metadata in metastore_db.
To allow spark-sql to access tables saved by saveAsTable, you'll need to tell Hive about them. Here's an example. It assumes that you did saveAsTable("tab"):
create table mytable using org.apache.spark.sql.parquet options (path "/ilab/users/NETID/spark-warehouse/tab");
You can use any name you want in place of "mytable." Use your own NETID in place of NETID. This command only needs to be done once. After that, you can access the data from spark-sql using the table name mytable.