Introduction

In this page, you will discover about how Spark runs your code on the cluster and deploy it on a cluster.

Writing Spark Applications

There are two parts: a Spark cluster and your code.

Writing Python Applications

To write a spark applications, Spark provided a package to do that. It’s PySpark. Writing an application with PySpark, it’s quite similar normal Python application.

To facilitate code reuse, it is common to package multiple Python files into egg or ZIP files of Spark code. To do that, you can use the --py-files argument of spark-submit to be distributed with your application.

When it’s time to run your code, you create the equivalent of a “Scala/Java main class” in Python. Specify a certain script as an executable script that builds the SparkSession. This is the one that we will pass as the main argument to spark-submit:

from __future__ import print_function 
 
if __name__ == '__main__': 
	from pyspark.sql import SparkSession 
	
	spark = SparkSession.builder \ 
		.master("local") \ 
		.appName("Word Count") \ 
		.config("spark.some.config.option", "some-value") \
		.getOrCreate() 
	print(spark.range(5000).where("id > 500").selectExpr("sum(id)").collect())

When you do this, you’re going to get a SparkSession that you can pass around your application. It is best practice to pass around this variable at runtime rather than instantiating it within every Python class.

Running the application

$SPARK_HOME/bin/spark-submit --master local pyspark_template/main.py

Testing Spark Application

Strategic Principles

In this section, we’ll first discuss what you might want to test in a typical Spark Application, then discuss how to organize your code for easy testing.

Input data resilience

Being resilient to different kinds of input data is something that is quite fundamental to how you write your data pipelines. The data will change because the business needs will change. Therefore your Spark Applications and pipelines should be resilient to at least some degree of change in the input data or otherwise ensure that these failures are handled in a graceful and resilient way.

Business logic resilience and evolution

The business logic in your pipelines will likely change as well as the input data. Even more importantly, you want to be sure that what you’re deducing from the raw data is what you actually think that you’re deducing.

Resilience in output and atomicity

Assuming that you’re prepared for departures in the structure of input data and that your business logic is well tested, you now want to ensure that your output structure is what you expect. This means you will need to gracefully handle output schema resolution. It’s not often that data is simply dumped in some location, never to be read again—most of your Spark pipelines are probably feeding other Spark pipelines. For this reason you’re going to want to make certain that your downstream consumers understand the “state” of the data—this could mean how frequently it’s updated as well as whether the data is “complete” (e.g., there is no late data) or that there won’t be any last-minute corrections to the data.

Tactical Takeaways

Managing SparkSessions

Testing your Spark code using a unit test framework such as: pyspark.testing or pytest, … The Spark supports Spark’s local mode, which just create a local model SparkSession as part of your test harness to run it.

Dependency Injection

Dependency injection (DI) is a design pattern where objects receive their dependencies from external sources rather than creating them themselves.

You should try to perform dependency injection as much as possible when managing SparkSessions in your code.

In the context of Spark, it means that your code should not directly create a SparkSession. Instead, it should receive a SparkSession instance as a parameter.

That is, initialize the SparkSession only once and pass it around to relevant functions and classes at runtime in a way that makes it easy to substitute during testing. This makes it much easier to test each individual function with a dummy SparkSession in unit tests.

class MySparkJob:
    def __init__(self, spark):
        self.spark = spark
 
    def process_data(self, input_path, output_path):
        df = self.spark.read.csv(input_path)
        # ... perform transformations on df ...
        df.write.csv(output_path)
 
from pyspark.sql import SparkSession
from pytest import fixture
 
class TestMySparkJob:
    @fixture(scope="class")
    def spark(self):
        spark = SparkSession.builder().master("local[*]") \
	        .appName("test-app").getOrCreate()
        yield spark
        spark.stop()
 
    def test_process_data(self, spark):
        job = MySparkJob(spark)
        input_path = "src/test/resources/input.csv"
        output_path = "target/test-output"
 
        job.process_data(input_path, output_path)
        # ... assert the output file's contents ...

Which Spark API to Use?

To be perfectly honest, the right API depends on your team and its needs: some teams and projects will need the less strict SQL and DataFrame APIs for speed of development, while others will want to use type-safe Datasets or RDDs.

Tip

In general, we recommend documenting and testing the input and output types of each function regardless of which API you use.

Connecting to Data Sources

As much as possible, you should make sure your testing code does not connect to production data sources, so that developers can easily run it in isolation if these data sources change.

One easy way to make this happen is to have all your business logic functions take DataFrames or Datasets as input instead of directly connecting to various sources; after all, subsequent code will work the same way no matter what the data source was.

Tip

If you are using the structured APIs in Spark, another way to make this happen is named tables: you can simply register some dummy datasets (e.g., loaded from small text file or from in-memory objects) as various table names and go from there.

Configuring Applications

The majority of configurations fall into the following categories:

  • Application properties
  • Runtime environment
  • Shuffle behavior
  • Spark UI
  • Compression and serialization
  • Memory management
  • Execution behavior
  • Networking
  • Scheduling
  • Dynamic allocation
  • Security
  • Encryption
  • Spark SQL
  • Spark streaming
  • SparkR Spark provides three locations to configure the system:
  • Spark properties control most application parameters and can be set by using a SparkConf object
  • Java system properties
  • Hardcoded configuration files

References Spark Configuration

Environmental Variables

You can configure certain Spark settings through environment variables, which are read from the conf/spark-env.sh script in the directory where Spark is installed.

Spark env file

conf/spark-env.sh does not exist by default when Spark is installed. However, you can copy conf/spark-env.sh.template to create it. Be sure to make the copy executable.

References Spark Environment Variables

Running Spark on YARN

When running Spark on YARN in cluster mode, you need to set environment variables by using the spark.yarn.appMasterEnv.[EnvironmentVariableName] property in your conf/spark-defaults.conf file. Environment variables that are set in spark-env.sh will not be reflected in the YARN Application Master process in cluster mode. See the YARN-related Spark Properties for more information.

Job Scheduling Within an Application

Within a given Spark Application, multiple parallel jobs can run simultaneously if they were submitted from separate threads. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g., queries for multiple users).

FIFO

Spark Scheduler

By default, Spark’s scheduler runs jobs in FIFO fashion.

Priority Spark Jobs

If the jobs at the head of the queue don’t need to use the entire cluster, later jobs can begin to run right away, but if the jobs at the head of the queue are large, later jobs might be delayed significantly.

Fair sharing (round-robin)

It is also possible to configure fair sharing between jobs.

Fair sharing

Under fair sharing, Spark assigns tasks between jobs in a round-robin fashion so that all jobs get a roughly equal share of cluster resources.

Priority Spark Jobs

This means that short jobs submitted while a long job is running can begin receiving resources right away and still achieve good response times without waiting for the long job to finish. This mode is best for multiple users settings.

Enable the fair scheduler

To enable the fair scheduler, set the spark.scheduler.mode property to FAIR when configuring a SparkContext.

The fair scheduler also supports grouping jobs into pools, and setting different scheduling options, or weights, for each pool. This can be useful to create a high-priority pool for more important jobs or to group the jobs of each user together and give users equal shares regardless of how many concurrent jobs they have instead of giving jobs equal shares. This approach is modeled after the Hadoop Fair Scheduler.

Without any intervention, newly submitted jobs go into a default pool, but jobs pools can be set by adding the spark.scheduler.pool local property to the SparkContext in the thread that’s submitting them. This is done as follows (assuming sc is your SparkContext):

sc.setLocalProperty("spark.scheduler.pool", "pool1")

After setting this local property, all jobs submitted within this thread will use this pool name. The setting is per-thread to make it easy to have a thread run multiple jobs on behalf of the same user. If you’d like to clear the pool that a thread is associated with, set it to null.

References