Introduction

This page will walk through the Spark UI with an example query designed to help you understand how to trace your own jobs through the execution life cycle.

The Monitoring Landscape

The components we can monitor:

  • Spark Applications and Jobs: The Spark UI and the Spark logs report information about the applications currently running at the level of Spark concepts, such as RDDs and query plans.
  • JVM: Spark runs the executors in individual Java Virtual Machines (JVMs). JVM utilities such as jstack for providing stack traces, jmap for creating heap dumps, jstat for reporting time-series statistics, and jconsole for visually exploring various JVM properties are useful for those comfortable with JVM internals. You can also use a tool like jvisualvm to help profile Spark jobs.
  • OS/Machine: This includes monitoring things like CPU, network, and I/O. These are often reported in cluster-level monitoring solutions; however, there are more specific tools that you can use, including dstat, iostat, and iotop.
  • Cluster: Some popular cluster-level monitoring tools include Ganglia and Prometheus.

What to Monitor

There are two things to monitor: the processes running your application (at the level of CPU usage, memory usage, etc.), and the query execution inside it (e.g., jobs and tasks).

Driver and Executor Processes

  • Driver: Keep an eye on the driver. This is where all of the state of your application lives, and you’ll need to be sure it’s running in a stable manner.
  • Executor: Understanding the state of the executors is also extremely important for monitoring individual Spark jobs.

The metrics system is configured via a configuration file that Spark expects to find at:

$SPARK_HOME/conf/metrics.properties
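
As an illustration, the short Python sketch below renders a small metrics.properties file. The sink and source class names follow the template that ships with Spark (conf/metrics.properties.template); the reporting period, the output directory, and the helper name render_metrics_properties are assumptions chosen for the example.

```python
def render_metrics_properties(settings):
    """Render a dict of metrics settings as Java-properties text."""
    lines = [f"{key}={value}" for key, value in settings.items()]
    return "\n".join(lines) + "\n"


# Assumed example values: report every 10 seconds into a CSV directory.
EXAMPLE_SETTINGS = {
    # Enable the CSV sink for all instances (driver, executor, master, ...).
    "*.sink.csv.class": "org.apache.spark.metrics.sink.CsvSink",
    "*.sink.csv.period": "10",
    "*.sink.csv.unit": "seconds",
    "*.sink.csv.directory": "/tmp/spark-metrics",  # hypothetical path
    # Also expose JVM metrics from the driver and the executors.
    "driver.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
    "executor.source.jvm.class": "org.apache.spark.metrics.source.JvmSource",
}

print(render_metrics_properties(EXAMPLE_SETTINGS), end="")
```

The printed text can be saved as $SPARK_HOME/conf/metrics.properties; the CSV sink is used here only because it needs no extra services.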

Queries, Jobs, Stages, and Tasks

This information allows you to know exactly what’s running on the cluster at a given time. When you are tuning performance or debugging, this is where you are most likely to start. The two most common ways to inspect it are the Spark logs and the Spark UI.

Spark Logs

Naturally, strange events in Spark’s logs, or in the logging that you added to your Spark Application, can help you take note of exactly where jobs are failing or what is causing that failure.

Custom Spark Logs

You can add your own logging to a Spark application so that your messages show up alongside Spark’s own logs, making them very easy to correlate.

Challenge with Python

Python won’t be able to integrate directly with Spark’s Java-based logging library. Using Python’s logging module, or even simple print statements, will still print the results to standard error.
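
A minimal sketch of driver-side logging with Python’s standard logging module follows. The logger name, the helper name build_logger, and the message format are assumptions for illustration; the timestamp format is chosen only so the lines are easier to line up against Spark’s own log output.

```python
import logging
import sys


def build_logger(name="my_spark_app", stream=None, level=logging.INFO):
    """Return a logger that writes timestamped lines to stderr (or `stream`)."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.propagate = False  # avoid duplicate lines via the root logger
    if not logger.handlers:   # do not attach a second handler on re-use
        # StreamHandler(None) falls back to sys.stderr.
        handler = logging.StreamHandler(stream)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s: %(message)s",
            datefmt="%y/%m/%d %H:%M:%S",
        ))
        logger.addHandler(handler)
    return logger


log = build_logger()
log.info("starting retail-data aggregation")
```

These messages are emitted by the Python process, so they appear in the driver’s standard error rather than inside Spark’s Java-based logging.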

To change Spark’s log level, simply run the following command:

spark.sparkContext.setLogLevel("INFO")

You won’t always find the answer you need simply by searching logs, but it can help you pinpoint the given problem that you’re encountering and possibly add new log statements in your application to better understand it.

The Spark UI

The Spark UI provides a visual way to monitor applications while they are running as well as metrics about your Spark workload, at the Spark and JVM level.

Every SparkContext running launches a web UI, by default on port 4040, that displays useful information about the application.

SparkContext web UI

When you run Spark in local mode, for example, just navigate to http://localhost:4040 to see the UI when running a Spark Application on your local machine. If you’re running multiple applications, they will launch web UIs on increasing port numbers (4041, 4042, …). Cluster managers will also link to each application’s web UI from their own UI.

The Spark UI has a tab for each of the things that we’d like to monitor. For the most part, each of these should be self-explanatory:

  • The Jobs tab refers to Spark jobs.
  • The Stages tab pertains to individual stages (and their relevant tasks).
  • The Storage tab includes information about the data that is currently cached in our Spark Application.
  • The Environment tab contains relevant information about the configurations and current settings of the Spark application.
  • The SQL tab refers to our Structured API queries (including SQL and DataFrames).
  • The Executors tab provides detailed information about each executor running our application.

Open a new Spark shell, run the following code, and we will trace its execution through the Spark UI:

import os
from pyspark.sql import SparkSession

# Extra jars needed to read from S3-compatible storage through the s3a connector.
jars = [
    'containers/jars/hadoop-aws-3.4.0.jar',
    'containers/jars/aws-java-sdk-bundle-1.12.262.jar',
    'containers/jars/bundle-2.23.19.jar',
    'containers/jars/hadoop-client-api-3.4.0.jar',
    'containers/jars/hadoop-client-runtime-3.4.0.jar'
]
# This must be set before the SparkSession (and its JVM) is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--jars {",".join(jars)} pyspark-shell'

# Connect to the standalone cluster master.
spark = SparkSession.builder.master('spark://localhost:7077').getOrCreate()

# Configure the s3a filesystem; replace the placeholder endpoint and keys.
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.endpoint', 'endpoint')
hadoop_conf.set('fs.s3a.access.key', 'access_key')
hadoop_conf.set('fs.s3a.secret.key', 'secret_key')
hadoop_conf.set('fs.s3a.path.style.access', 'true')
hadoop_conf.set('fs.s3a.connection.ssl.enabled', 'false')
hadoop_conf.set('fs.s3a.connection.establish.timeout', '15000')
hadoop_conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

# Count rows by whether the description mentions GLASS; collect() triggers the job.
(
    spark.read
    .option("header", "true")
    .csv("s3a://spark/data/retail-data/all/online-retail-dataset.csv")
    .repartition(2)
    .selectExpr("instr(Description, 'GLASS') >= 1 as is_glass")
    .groupBy("is_glass")
    .count()
    .collect()
)

Let’s navigate to the SQL tab, where you should see the query we just ran along with summary details such as its succeeded jobs.

These details will become important in a minute, but first let’s take a look at the Directed Acyclic Graph (DAG) of Spark stages. Each blue box in these tabs represents a stage of Spark tasks. The entire group of these stages represents our Spark job. The box on top, labeled WholeStageCodegen, represents a full scan of the CSV file. The box below that represents a shuffle that we forced when we called repartition. This turned our original dataset (of a yet-to-be-specified number of partitions) into two partitions.

The next step is our projection (selecting/adding/filtering columns) and the aggregation. The number of output rows reported for this step conveniently lines up with the number of final output rows multiplied by the number of partitions at aggregation time (three rows times two partitions, or six). This is because Spark performs an aggregation for each partition (in this case a hash-based aggregation) before shuffling the data around in preparation for the final stage.

The last stage is the aggregation of the subaggregations that we saw happen on a per-partition basis in the previous stage. We combine those two partitions in the final three rows that are the output of our total query.
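
The two-phase aggregation described above can be imitated in a few lines of plain Python. This is only a model of the idea, not Spark’s implementation: the partition contents are made up, and the helper names partial_counts and merge_counts are hypothetical.

```python
from collections import Counter


def partial_counts(partition):
    """Phase 1: count rows per key inside a single partition."""
    return Counter(partition)


def merge_counts(partials):
    """Phase 2: combine the per-partition counts after the shuffle."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


# Two partitions of made-up is_glass values (True, False, or None when the
# Description is missing).
partitions = [
    [True, False, False, None, False],
    [False, True, None, False],
]

partials = [partial_counts(p) for p in partitions]
print(sum(len(p) for p in partials))  # 6 partial rows: 3 keys x 2 partitions
print(len(merge_counts(partials)))    # 3 final rows
```

Each partition emits at most one partial row per key, which is why the intermediate row count is bounded by the number of keys times the number of partitions.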

Let’s look further into the job’s execution. On the Jobs tab, next to Succeeded Jobs, our job breaks down into three stages (which corresponds to what we saw on the SQL tab).

The first stage has two tasks. CSV files are splittable, and Spark broke up the work to be distributed relatively evenly between the different cores on the machine. This happens at the cluster level and points to an important optimization: how you store your files. The next stage has two tasks because we explicitly called a repartition to move the data into two partitions. The last stage has one task.

Now that we have reviewed how we got here, click the first stage to see the next level of detail.

Spark provides a lot of detail about what this job did when it ran. Toward the top, notice the Summary Metrics section. This provides a synopsis of statistics regarding various metrics. In this case, everything looks very consistent; there are no wide swings in the distribution of values. In the table at the bottom, we can also examine the same metrics on a per-executor basis (one for every core on this particular machine, in this case). This can help identify whether a particular executor is struggling with its workload.
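
To make "no wide swings in the distribution" concrete, the sketch below computes the same kind of five-number summary over a list of task durations and flags outliers. The durations are made up, the helper names are hypothetical, and the "twice the median" threshold is an assumption for illustration rather than a rule Spark applies; the UI may also compute its percentiles slightly differently.

```python
import statistics


def summarize(durations_ms):
    """Return min, 25th percentile, median, 75th percentile, and max."""
    q1, median, q3 = statistics.quantiles(durations_ms, n=4, method="inclusive")
    return {"min": min(durations_ms), "p25": q1, "median": median,
            "p75": q3, "max": max(durations_ms)}


def find_stragglers(durations_ms, factor=2.0):
    """Return indexes of tasks slower than `factor` times the median."""
    median = statistics.median(durations_ms)
    return [i for i, d in enumerate(durations_ms) if d > factor * median]


durations = [410, 395, 402, 420, 398, 1650, 405, 415]  # made-up task times
print(summarize(durations))
print(find_stragglers(durations))  # only the 1650 ms task is flagged
```

A large gap between the 75th percentile and the maximum is the pattern to look for when hunting stragglers.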

Configuring the Spark user interface

There are a number of configurations that you can set regarding the Spark UI. Many of them are networking configurations such as enabling access control. Others let you configure how the Spark UI will behave (e.g., how many jobs, stages, and tasks are stored).

Spark Monitoring and Instrumentation

Spark REST API

In addition to the Spark UI, you can also access Spark’s status and metrics via a REST API. This is available at http://localhost:4040/api/v1. For the most part this API exposes the same information presented in the web UI, except that it doesn’t include any of the SQL-related information. This can be a useful tool if you would like to build your own reporting solution based on the information available in the Spark UI.
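
As a starting point for such a reporting tool, the sketch below lists every job of every application known to a running Spark UI, using only the standard library. The helper names and the opener parameter (a seam that makes the function easy to test) are hypothetical; the /applications and /applications/[app-id]/jobs endpoints and the id, jobId, and status fields are the ones the REST API documents.

```python
import json
import urllib.request

BASE_URL = "http://localhost:4040/api/v1"  # address given in the text


def fetch_json(url, opener=urllib.request.urlopen):
    """GET `url` and decode the JSON body."""
    with opener(url) as response:
        return json.load(response)


def list_jobs(base_url=BASE_URL, opener=urllib.request.urlopen):
    """Return (application id, job id, status) for every job of every app."""
    rows = []
    for app in fetch_json(f"{base_url}/applications", opener):
        jobs = fetch_json(f"{base_url}/applications/{app['id']}/jobs", opener)
        rows.extend((app["id"], job["jobId"], job["status"]) for job in jobs)
    return rows
```

Calling list_jobs() while the example application is running should return one tuple per job; if no application is listening on port 4040, urlopen raises an OSError subclass that the caller can catch.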

Spark UI History Server

Normally, the Spark UI is only available while a SparkContext is running, so how can you get to it after your application crashes or ends? To do this, Spark includes a tool called the Spark History Server, which lets you reconstruct the Spark UI and REST API, provided that the application was configured to save an event log.

Spark History Server Options

To use the history server:

  • First, configure your application to store event logs in a certain location. To do this, set spark.eventLog.enabled to true and set spark.eventLog.dir to the event log location.
  • Then, once you have stored the events, you can run the history server as a standalone application, and it will automatically reconstruct the web UI based on these logs. Some cluster managers and cloud services also configure logging automatically and run a history server by default.
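
The first step above can be sketched as a pair of small helpers that build the two properties and turn them into spark-submit arguments. The helper names and the hdfs:///spark-events directory are hypothetical; the property names are the ones mentioned in the list.

```python
def event_log_conf(log_dir):
    """Return the Spark properties that turn on event logging."""
    return {
        "spark.eventLog.enabled": "true",
        "spark.eventLog.dir": log_dir,
    }


def to_submit_args(conf):
    """Turn a dict of Spark properties into spark-submit `--conf` arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Hypothetical shared directory; the history server must read the same place
# (its spark.history.fs.logDirectory setting).
conf = event_log_conf("hdfs:///spark-events")
print(" ".join(to_submit_args(conf)))
```

The same key/value pairs can instead be passed to SparkSession.builder.config(key, value) before getOrCreate(). The history server itself is started with the sbin/start-history-server.sh script that ships with Spark.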

Debugging and Spark First Aid

Spark Jobs Not Starting

Signs and symptoms

  • Spark jobs don’t start.
  • The Spark UI doesn’t show any nodes on the cluster except the driver.
  • The Spark UI seems to be reporting incorrect information.

Potential treatments

Caution

This mostly occurs when your cluster or your application’s resource demands are not configured properly.

This might be because you didn’t specify which IP address and port are open, or didn’t open the correct ones. This is most likely a cluster-level, machine, or configuration issue. Another possibility is that your application requested more resources per executor than your cluster manager currently has free, in which case the driver will wait forever for executors to be launched.

  • Ensure that machines can communicate with one another on the ports that you expect. Ideally, you should open up all ports between the worker nodes unless you have more stringent security constraints.
  • Ensure that your Spark resource configurations are correct and that your cluster manager is properly set up for Spark. Try running a simple application first to see if that works. One common issue may be that you requested more memory per executor than the cluster manager has free to allocate, so check how much it is reporting free (in its UI) and your spark-submit memory configuration.
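
A quick way to test the first point is to attempt a TCP connection from one machine to another. The sketch below uses only the standard library; the helper names are hypothetical, and which hosts and ports to probe depends entirely on your cluster.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or host could not be resolved
        return False


def check_endpoints(endpoints, timeout=3.0):
    """Map each (host, port) pair to whether it is reachable."""
    return {(h, p): can_connect(h, p, timeout) for h, p in endpoints}
```

For the example earlier on this page, check_endpoints([("localhost", 7077), ("localhost", 4040)]) would probe the standalone master and the application’s web UI. A successful connection only shows that the port is open, not that the service behind it is healthy.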

Errors Before Execution

This can happen when you’re developing a new application and have previously run code on this cluster, but now some new code won’t work.

Signs and symptoms

  • Commands don’t run at all and output large error messages.
  • You check the Spark UI and no jobs, stages, or tasks seem to run.

Potential treatments

After checking and confirming that the Spark UI environment tab shows the correct information for your application, it’s worth double-checking your code.

  • You should take a look at the error returned by Spark.
  • Double-check to verify that the cluster has the network connectivity.
  • There might be issues with libraries or classpaths that are causing the wrong version of a library to be loaded for accessing storage. Try simplifying your application until you get a smaller version that reproduces the issue (e.g., just reading one dataset).

Errors During Execution

This kind of issue occurs when you already are working on a cluster or parts of your Spark Application run before you encounter an error.

Signs and symptoms

  • One Spark job runs successfully on the entire cluster but the next one fails.
  • A step in a multistep query fails.
  • A scheduled job that ran yesterday is failing today.
  • An error message that is difficult to parse.

Potential treatments

  • Check to see if your data exists or is in the format that you expect.
  • If an error quickly pops up when you run a query (i.e., before tasks are launched), it is most likely an analysis error while planning the query. This means that you likely misspelled a column name referenced in the query or that a column, view, or table you referenced does not exist.
  • Read through the stack trace to try to find clues about what components are involved.
  • Try to isolate the issue by progressively double-checking input data and ensuring the data conforms to your expectations. Also try removing logic until you can isolate the problem in a smaller version of your application.
  • If a job runs tasks for some time and then fails, it could be due to a problem with the input data itself, wherein the schema might be specified incorrectly or a particular row does not conform to the expected schema.
  • It’s also possible that your own code for processing the data is crashing, in which case Spark will show you the exception thrown by your code. In this case, you will see a task marked as “failed” on the Spark UI, and you can also view the logs on that machine to understand what it was doing when it failed. Try adding more logs inside your code to figure out which data record was being processed.

Slow Tasks or Stragglers

This issue is quite common when optimizing applications, and can occur either due to work not being evenly distributed across your machines (“skew”), or due to one of your machines being slower than the others (e.g., due to a hardware problem).

Signs and symptoms

  • Spark stages seem to execute until there are only a handful of tasks left. Those tasks then take a long time.
  • These slow tasks show up in the Spark UI and occur consistently on the same dataset(s).
  • These occur in stages, one after the other.
  • Scaling up the number of machines given to the Spark Application doesn’t really help; some tasks still take much longer than others.
  • In the Spark metrics, certain executors are reading and writing much more data than others.

Potential treatments

Slow tasks are often called “stragglers.”

The most common source of this issue

Most often, the source of this issue is that your data is partitioned unevenly into DataFrame or RDD partitions. When this happens, some executors might need to work on much larger amounts of data than others.

Common case

You use a group-by-key operation and one of the keys just has more data than others. In this case, when you look at the Spark UI, you might see that the shuffle data for some nodes is much larger than for others.

  • Try increasing the number of partitions to have less data per partition.
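  • Try repartitioning by another combination of columns. For example, stragglers can come up when you partition by a skewed ID column, or by a column where many values are null. In the latter case, it might make sense to first filter out the null values.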
  • Try increasing the memory allocated to your executors if possible.
  • Monitor the executor that is having trouble and see if it is the same machine across jobs; you might also have an unhealthy executor or machine in your cluster, for example, one whose disk is nearly full.
  • If this issue is associated with slow joins or slow aggregations, see the Slow Aggregations and Slow Joins sections that follow.
  • Check whether your user-defined functions (UDFs) are wasteful in their object allocation or business logic. Try to convert them to DataFrame code if possible.
  • Ensure that your UDFs or User-Defined Aggregate Functions (UDAFs) are running on a small enough batch of data. Oftentimes an aggregation can pull a lot of data into memory for a common key, leading to that executor having to do a lot more work than others.
  • Try turning on speculation (set spark.speculation to true). This can be helpful if the issue is due to a faulty node, because the task will get to run on a faster one. Speculation does come at a cost, however, because it consumes additional resources. In addition, for some storage systems that use eventual consistency, you could end up with duplicate output data if your writes are not idempotent.
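
The effect of a hot key on partition sizes can be seen without a cluster. The sketch below is a plain-Python simulation, not Spark code: the CRC-based partition_of function merely stands in for a hash partitioner, and the data is made up. It also shows key salting, a common workaround that the list above does not mention, as one concrete form of "repartitioning by another combination of columns".

```python
import random
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    """Deterministic stand-in for a hash partitioner."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    """Count how many records land in each partition."""
    sizes = Counter(partition_of(k, num_partitions) for k in keys)
    return [sizes.get(i, 0) for i in range(num_partitions)]


def salt(key, num_salts, rng):
    """Append a random suffix so one hot key spreads over several partitions."""
    return f"{key}#{rng.randrange(num_salts)}"


rng = random.Random(42)  # fixed seed so the run is repeatable
# Made-up data: one hot key with 9,000 records plus 1,000 other records.
keys = ["hot"] * 9000 + [f"k{i}" for i in range(1000)]

plain = partition_sizes(keys, 8)
salted = partition_sizes([salt(k, 16, rng) for k in keys], 8)
print("largest partition, plain: ", max(plain))
print("largest partition, salted:", max(salted))
```

Without salting, every record for the hot key lands in the same partition no matter how many partitions exist. Salting trades that for a second aggregation step, because results for the salted keys must be combined again afterward.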

Slow Aggregations

Signs and symptoms

  • Slow tasks during a groupBy call.
  • Jobs after the aggregation are slow, as well.

Potential treatments

Unfortunately, this issue can’t always be solved. Sometimes, the data in your job just has some skewed keys, and the operation you want to run on them needs to be slow.

  • Increasing the number of partitions, prior to an aggregation, might help by reducing the number of different keys processed in each task.
  • Increasing executor memory can help alleviate this issue, as well. If a single key has lots of data, this will allow its executor to spill to disk less often and finish faster, although it may still be much slower than executors processing other keys.
  • If you find that tasks after the aggregation are also slow, this means that your dataset might have remained unbalanced after the aggregation. Try inserting a repartition call to partition it randomly.
  • Placing every filter and SELECT statement that can go before the aggregation ahead of it helps ensure that you’re working only on the data that you need and nothing else. Spark’s query optimizer will automatically do this for the structured APIs.
  • Ensure null values are represented correctly (using Spark’s concept of null) and not as some default value like “ ” or “EMPTY”. Spark often optimizes for skipping nulls early in the job when possible, but it can’t do so for your own placeholder values.
  • Some aggregation functions are also just inherently slower than others. For instance, collect_list and collect_set are very slow aggregation functions because they must gather every matching value for a key into a single in-memory collection, and should be avoided in performance-critical code.

Slow Joins

Signs and symptoms

  • A join stage seems to be taking a long time. This can be one task or many tasks.
  • Stages before and after the join seem to be operating normally.

Potential treatments

  • Choose join types deliberately; many joins can be optimized (manually or automatically) into other types of joins.
  • Experimenting with different join orderings can really help speed up jobs, especially if some of those joins filter out a large amount of data; do those first.
  • Partitioning a dataset prior to joining can be very helpful for reducing data movement across the cluster, especially if the same dataset will be used in multiple join operations. It’s worth experimenting with different prejoin partitioning. Keep in mind, again, that this isn’t “free” and does come at the cost of a shuffle.
  • Slow joins can also be caused by data skew. There’s not always a lot you can do here, but sizing up the Spark application and/or increasing the size of executors can help, as described in earlier sections.
  • Placing every filter and select statement that can go before the join ahead of it helps ensure that you’re working only on the data that you need for the join.
  • Ensure that null values are handled correctly (that you’re using null) and not some default value like “ ” or “EMPTY”, as with aggregations.
  • Sometimes Spark can’t properly plan for a broadcast join if it doesn’t know any statistics about the input DataFrame or table. If you know that one of the tables that you are joining is small, you can try to force a broadcast, or use Spark’s statistics collection commands to let it analyze the table.

Slow Reads and Writes

Signs and symptoms

  • Slow reading of data from a distributed file system or external system.
  • Slow writes from network file systems or blob storage.

Potential treatments

  • Turning on speculation (set spark.speculation to true) can help with slow reads and writes. This will launch additional tasks with the same operation in an attempt to see whether it’s just some transient issue in the first task.

Duplicate data when using speculation

Speculation can cause duplicate data writes with some eventually consistent cloud services, such as Amazon S3, so check whether speculation is supported by the storage system connector you are using.

  • Ensuring sufficient network connectivity can be important. Your Spark cluster may simply not have enough total network bandwidth to get to your storage system.
  • For distributed file systems such as HDFS running on the same nodes as Spark, make sure Spark sees the same hostnames for nodes as the file system. This will enable Spark to do locality-aware scheduling, which you will be able to see in the “locality” column in the Spark UI.

Driver OutOfMemoryError or Driver Unresponsive

This often happens because too much data is collected back to the driver, making it run out of memory.

Signs and symptoms

  • Spark Application is unresponsive or crashed.
  • OutOfMemoryErrors or garbage collection messages in the driver logs.
  • Commands take a very long time to run or don’t run at all.
  • Interactivity is very low or non-existent.
  • Memory usage is high for the driver JVM.

Potential treatments

  • Your code might have tried to collect an overly large dataset to the driver node using operations such as collect.
  • You might be using a broadcast join where the data to be broadcast is too big. Use Spark’s maximum broadcast join configuration to better control the size it will broadcast.
  • A long-running application generated a large number of objects on the driver and is unable to release them.

Debugging with Java's jmap tool

Java’s jmap tool can be useful to see what objects are filling most of the memory of your driver JVM by printing a histogram of the heap. However, take note that jmap will pause that JVM while running.

  • Increase the driver’s memory allocation if possible to let it work with more data.
  • Issues with JVMs running out of memory can happen if you are using another language binding, such as Python, due to data conversion between the two requiring too much memory in the JVM. Try to see whether your issue is specific to your chosen language and bring back less data to the driver node, or write it to a file instead of bringing it back as in-memory objects.
  • If you are sharing a SparkContext with other users (e.g., through the SQL JDBC server and some notebook environments), ensure that people aren’t trying to do something that might be causing large amounts of memory allocation in the driver.

Executor OutOfMemoryError or Executor Unresponsive

Signs and symptoms

  • OutOfMemoryErrors or garbage collection messages in the executor logs. You can find these in the Spark UI.
  • Executors that crash or become unresponsive.
  • Slow tasks on certain nodes that never seem to recover.

Potential treatments

  • Try increasing the memory available to executors and the number of executors.
  • Try increasing PySpark worker size via the relevant Python configurations.
  • Look for garbage collection error messages in the executor logs. Repartition your data to increase parallelism, reduce the number of records per task, and ensure that all executors are getting the same amount of work.

Using UDFs

Some of the tasks that are running, especially if you’re using UDFs, can be creating lots of objects that need to be garbage collected.

  • Ensure that null values are handled correctly (that you’re using null) and not some default value like “ ” or “EMPTY”.
  • This is more likely to happen with RDDs or with Datasets because of object instantiations. Try using fewer UDFs and more of Spark’s structured operations when possible.
  • Use Java monitoring tools such as jmap to get a histogram of heap memory usage on your executors, and see which classes are taking up the most space.
  • If executors are being placed on nodes that also have other workloads running on them, such as a key-value store, try to isolate your Spark jobs from other jobs.

Unexpected Nulls in Results

Signs and symptoms

  • Unexpected null values after transformations.
  • Scheduled production jobs that used to work no longer work, or no longer produce the right results.

Potential treatments

  • It’s possible that your data format has changed without adjusting your business logic.
  • Use an accumulator to try to count records of certain types, as well as parsing or processing errors where you skip a record.

Using accumulator in a UDF

Most often, users will place the accumulator in a UDF when they are parsing their raw data into a more controlled format and perform the counts there. This allows you to count valid and invalid records and then operate accordingly after the fact.

  • Ensure that your transformations actually result in valid query plans. Spark SQL sometimes does implicit type coercions that can cause confusing results.
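
The accumulator pattern mentioned above can be sketched as follows. To keep the example runnable without a cluster, LocalCounter is a hypothetical stand-in that offers the same add() method as a Spark accumulator; make_parser and parse_quantity are likewise illustrative names, and the input values are made up.

```python
class LocalCounter:
    """Tiny stand-in with the same add() method as a Spark accumulator."""

    def __init__(self):
        self.value = 0

    def add(self, amount):
        self.value += amount


def make_parser(valid, invalid):
    """Build a parse function that counts good and bad records as it goes."""
    def parse_quantity(raw):
        try:
            result = int(raw)
        except (TypeError, ValueError):
            invalid.add(1)  # record skipped: this is where a null comes from
            return None
        valid.add(1)
        return result
    return parse_quantity


valid, invalid = LocalCounter(), LocalCounter()
parse = make_parser(valid, invalid)
parsed = [parse(raw) for raw in ["6", "8", "n/a", None, "12"]]
print(parsed, valid.value, invalid.value)  # [6, 8, None, None, 12] 3 2
```

In a PySpark job, the two counters would instead be created with spark.sparkContext.accumulator(0) and the parse function wrapped in a UDF. Accumulator updates made inside transformations can be applied more than once when tasks are retried, so treat the counts as diagnostics rather than exact totals.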

No Space Left on Disk Errors

Signs and symptoms

  • You see “no space left on disk” errors and your jobs fail.

Potential treatments

  • The easiest way to alleviate this, of course, is to add more disk space. You can do this by sizing up the nodes that you’re working on or attaching external storage in a cloud environment.
  • If you have a cluster with limited storage space, some nodes may run out first due to skew. Repartitioning the data as described earlier may help here.
  • There are also a number of storage configurations with which you can experiment. Some of these determine how long logs should be kept on the machine before being removed.
  • Try manually removing some old log files or old shuffle files from the machine(s) in question. This can help alleviate some of the issue although obviously it’s not a permanent fix.
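
Before jobs start failing, it can help to check how much room is left on the directories Spark writes to. The sketch below uses only the standard library; the helper names and the 10 percent threshold are assumptions, and the system temporary directory is checked only because Spark’s scratch space (spark.local.dir) defaults to /tmp.

```python
import shutil
import tempfile


def free_fraction(path):
    """Return the fraction of the filesystem holding `path` that is free."""
    usage = shutil.disk_usage(path)
    return usage.free / usage.total if usage.total else 0.0


def low_on_space(paths, min_free=0.10):
    """Return the paths whose filesystem has less than `min_free` free."""
    return [p for p in paths if free_fraction(p) < min_free]


# Assumed directory list; replace with the scratch and log directories
# that your own deployment actually uses.
print(low_on_space([tempfile.gettempdir()]))
```

Running a check like this on each worker shows which nodes are filling up first, which is also a hint that skew may be involved.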

Serialization Errors

Signs and symptoms

  • You see serialization errors and your jobs fail.

Potential treatments

  • This often happens when you’re working with either some code or data that cannot be serialized into a UDF or function, or if you’re working with strange data types that cannot be serialized. If you are using (or intend to be using) Kryo serialization, verify that you’re actually registering your classes so that they are indeed serialized.
  • Try not to refer to any fields of the enclosing object in your UDFs when creating UDFs inside a Java or Scala class. This can cause Spark to try to serialize the whole enclosing object, which may not be possible. Instead, copy the relevant fields to local variables in the same scope as the closure and use those.
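
The advice above is phrased for Java and Scala, but the same idea can be demonstrated in Python, where a function shipped to executors is serialized together with whatever its closure references. The sketch below is only an analogy: the Scaler class and the captured helper are hypothetical, and the code inspects closures directly instead of running Spark.

```python
class Scaler:
    def __init__(self, factor):
        self.factor = factor
        self.connection = object()  # stands in for something unserializable

    def bad_udf(self):
        # The lambda refers to self, so the whole Scaler object is captured.
        return lambda x: x * self.factor

    def good_udf(self):
        factor = self.factor  # copy the field to a local variable first
        return lambda x: x * factor


def captured(func):
    """Return the objects a closure carries along with it."""
    return [cell.cell_contents for cell in (func.__closure__ or ())]


scaler = Scaler(3)
print(captured(scaler.bad_udf()))   # contains the Scaler instance
print(captured(scaler.good_udf()))  # [3]
```

Both functions compute the same result, but only the second can be shipped without dragging the enclosing object and its connection field along.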

References