Join Types
- Inner joins (keep rows with keys that exist in the left and right datasets)
- Outer joins (keep rows with keys in either the left or right datasets)
- Left outer joins (keep rows with keys in the left dataset)
- Right outer joins (keep rows with keys in the right dataset)
- Left semi joins (keep the rows in the left, and only the left, dataset where the key appears in the right dataset)
- Left anti joins (keep the rows in the left, and only the left, dataset where they do not appear in the right dataset)
- Natural joins (perform a join by implicitly matching the columns between the two datasets with the same names)
- Cross (or Cartesian) joins (match every row in the left dataset with every row in the right dataset)
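The definitions above can be checked with a small model in plain Python. This is only a sketch of the semantics, not Spark code: the `join` helper is hypothetical, rows are tuples, and `None` stands in for Spark's null. The keys mirror the sample data used in the rest of these notes (`person.graduate_program` on the left, `graduateProgram.id` on the right).

```python
# Plain-Python model of which rows each join type keeps (not Spark code).
left = [("Bill Chambers", 0), ("Matei Zaharia", 1), ("Michael Armbrust", 1)]
right = [(0, "Masters"), (2, "Masters"), (1, "Ph.D.")]

JOIN_TYPES = ("inner", "outer", "left_outer", "right_outer",
              "left_semi", "left_anti", "cross")


def join(left_rows, right_rows, how):
    """Join on left_row[1] == right_row[0]; None stands in for null."""
    if how not in JOIN_TYPES:
        raise ValueError(f"unknown join type: {how}")
    left_keys = {lrow[1] for lrow in left_rows}
    right_keys = {rrow[0] for rrow in right_rows}
    if how == "left_semi":  # filter: keep left rows that have a match
        return [lrow for lrow in left_rows if lrow[1] in right_keys]
    if how == "left_anti":  # filter: keep left rows that have no match
        return [lrow for lrow in left_rows if lrow[1] not in right_keys]
    if how == "cross":      # every left row paired with every right row
        return [(lrow, rrow) for lrow in left_rows for rrow in right_rows]
    # Matched pairs are shared by the inner join and all outer joins.
    out = [(lrow, rrow) for lrow in left_rows for rrow in right_rows
           if lrow[1] == rrow[0]]
    if how in ("left_outer", "outer"):
        out += [(lrow, None) for lrow in left_rows if lrow[1] not in right_keys]
    if how in ("right_outer", "outer"):
        out += [(None, rrow) for rrow in right_rows if rrow[0] not in left_keys]
    return out


for how in JOIN_TYPES:
    print(how, len(join(left, right, how)))
```

Note that the semi and anti branches return left rows only, which is why they behave like filters rather than conventional joins.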
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")
Inner Join
Inner joins (the default join type) evaluate the keys in both of the DataFrames or tables and include (and join together) only the rows for which the join expression evaluates to true.
joinType = "inner"
joinExpression = person["graduate_program"] == graduateProgram['id']
person.join(graduateProgram, joinExpression, joinType).show()
+---+----------------+----------------+---------------+---+-------+--------------
| id| name|graduate_program| spark_status| id| degree| department...
+---+----------------+----------------+---------------+---+-------+--------------
| 0| Bill Chambers| 0| [100]| 0|Masters| School...
| 1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS...
| 2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS...
+---+----------------+----------------+---------------+---+-------+--------------
Outer Join
Outer joins evaluate the keys in both of the DataFrames or tables and include (and join together) the rows that evaluate to true or false. If there is no equivalent row in either the left or right DataFrame, Spark will insert null:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()
+----+----------------+----------------+---------------+---+-------+-------------
| id| name|graduate_program| spark_status| id| degree| departmen...
+----+----------------+----------------+---------------+---+-------+-------------
| 1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EEC...
| 2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EEC...
|null| null| null| null| 2|Masters| EEC...
| 0| Bill Chambers| 0| [100]| 0|Masters| School...
+----+----------------+----------------+---------------+---+-------+-------------
Left Outer Joins
Left outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame. If there is no equivalent row in the right DataFrame, Spark will insert null:
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+----------+-----------+----+----------------+----------------+---
| id| degree|department| school| id| name|graduate_program|...
+---+-------+----------+-----------+----+----------------+----------------+---
| 0|Masters| School...|UC Berkeley| 0| Bill Chambers| 0|...
| 2|Masters| EECS|UC Berkeley|null| null| null|...
| 1| Ph.D.| EECS|UC Berkeley| 2|Michael Armbrust| 1|...
| 1| Ph.D.| EECS|UC Berkeley| 1| Matei Zaharia| 1|...
+---+-------+----------+-----------+----+----------------+----------------+---
Right Outer Joins
Right outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the right DataFrame as well as any rows in the left DataFrame that have a match in the right DataFrame. If there is no equivalent row in the left DataFrame, Spark will insert null:
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()
+----+----------------+----------------+---------------+---+-------+------------+
| id| name|graduate_program| spark_status| id| degree| department|
+----+----------------+----------------+---------------+---+-------+------------+
| 0| Bill Chambers| 0| [100]| 0|Masters| School of...|
|null| null| null| null| 2|Masters| EECS|
| 2|Michael Armbrust| 1| [250, 100]| 1| Ph.D.| EECS|
| 1| Matei Zaharia| 1|[500, 250, 100]| 1| Ph.D.| EECS|
+----+----------------+----------------+---------------+---+-------+------------+
Left Semi Joins
Semi joins are a bit of a departure from the other joins. They do not actually include any values from the right DataFrame. They only compare values to see if the value exists in the second DataFrame. If the value does exist, those rows will be kept in the result, even if there are duplicate keys in the left DataFrame. Think of left semi joins as filters on a DataFrame, as opposed to the function of a conventional join:
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+--------------------+-----------+
| id| degree| department| school|
+---+-------+--------------------+-----------+
| 0|Masters|School of Informa...|UC Berkeley|
| 1| Ph.D.| EECS|UC Berkeley|
+---+-------+--------------------+-----------+
Left Anti Joins
Left anti joins are the opposite of left semi joins. Like left semi joins, they do not actually include any values from the right DataFrame. They only compare values to see if the value exists in the second DataFrame. However, rather than keeping the values that exist in the second DataFrame, they keep only the values that do not have a corresponding key in the second DataFrame. Think of anti joins as a NOT IN SQL-style filter:
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+----------+-----------+
| id| degree|department| school|
+---+-------+----------+-----------+
| 2|Masters| EECS|UC Berkeley|
+---+-------+----------+-----------+
Natural Joins
Natural joins make implicit guesses at the columns on which you would like to join. They find matching columns and return the results. Left, right, and outer natural joins are all supported.
Warning
Implicit is always dangerous! The following query will give us incorrect results because the two DataFrames/tables share a column name (id), but it means different things in the datasets. You should always use this join with caution.
SELECT * FROM graduateProgram NATURAL JOIN person
Cross (Cartesian) Joins
The last of our joins are cross joins, or Cartesian products. In simplest terms, cross joins are inner joins that do not specify a predicate. A cross join pairs every single row in the left DataFrame with every single row in the right DataFrame, which causes an absolute explosion in the number of rows in the resulting DataFrame. If you have 1,000 rows in each DataFrame, the cross join of these will result in 1,000,000 (1,000 x 1,000) rows.
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()
+---+-------+----------+-----------+---+----------------+----------------+-------
| id| degree|department| school| id| name| graduate_program|spar...
+---+-------+----------+-----------+---+----------------+----------------+-------
| 0|Masters| School...|UC Berkeley| 0| Bill Chambers| 0| ...
| 1| Ph.D.| EECS|UC Berkeley| 2|Michael Armbrust| 1| [2...
| 1| Ph.D.| EECS|UC Berkeley| 1| Matei Zaharia| 1|[500...
+---+-------+----------+-----------+---+----------------+----------------+-------
Warning
You should use cross-joins only if you are absolutely, 100 percent sure that this is the join you need. There is a reason why you need to be explicit when defining a cross-join in Spark. They’re dangerous! Advanced users can set the session-level configuration spark.sql.crossJoin.enabled to true in order to allow cross-joins without warnings or without Spark trying to perform another join for you.
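To make the row explosion concrete, here is a minimal sketch in plain Python (not Spark code). It builds the Cartesian product of the three `id` values from each sample table and checks the 1,000 x 1,000 arithmetic; the `cross_join_size` helper is hypothetical and exists only for illustration.

```python
from itertools import product

graduate_program_ids = [0, 2, 1]
person_ids = [0, 1, 2]

# A Cartesian product pairs every left row with every right row.
pairs = list(product(graduate_program_ids, person_ids))
print(len(pairs))  # 3 * 3 = 9 pairs


def cross_join_size(left_count, right_count):
    """Row count of a cross join: the product of the two input sizes."""
    return left_count * right_count


print(cross_join_size(1_000, 1_000))  # 1000000
```

The `joinType = "cross"` example earlier in this section shows only three rows because a join expression was supplied along with it. In PySpark, `DataFrame.crossJoin` (for example `person.crossJoin(graduateProgram)`) requests the full Cartesian product explicitly, which for the sample data would be the nine pairs counted above.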
Challenges When Using Joins
Joins on Complex Types
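A join predicate does not have to be a simple equality; any expression that returns a Boolean can serve. The PySpark snippet in this section joins on `array_contains(spark_status, id)`. As a rough model of what that predicate computes, here is the same match in plain Python (not Spark code), using the sample rows with the unused columns dropped:

```python
person = [
    (0, "Bill Chambers", [100]),
    (1, "Matei Zaharia", [500, 250, 100]),
    (2, "Michael Armbrust", [250, 100]),
]
spark_status = [(500, "Vice President"), (250, "PMC Member"), (100, "Contributor")]

# Keep a (person, status) pair when the status id appears in the person's array.
matches = [
    (name, status)
    for _, name, statuses in person
    for status_id, status in spark_status
    if status_id in statuses  # plays the role of array_contains(spark_status, id)
]

for row in matches:
    print(row)
```

Each person appears once per status in their array, so the model yields six pairs, matching the six rows in the Spark output.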
from pyspark.sql.functions import expr
person.withColumnRenamed("id", "personId")\
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()
+--------+----------------+----------------+---------------+---+--------------+
|personId| name|graduate_program| spark_status| id| status|
+--------+----------------+----------------+---------------+---+--------------+
| 0| Bill Chambers| 0| [100]|100| Contributor|
| 1| Matei Zaharia| 1|[500, 250, 100]|500|Vice President|
| 1| Matei Zaharia| 1|[500, 250, 100]|250| PMC Member|
| 1| Matei Zaharia| 1|[500, 250, 100]|100| Contributor|
| 2|Michael Armbrust| 1| [250, 100]|250| PMC Member|
| 2|Michael Armbrust| 1| [250, 100]|100| Contributor|
+--------+----------------+----------------+---------------+---+--------------+
SELECT * FROM
(SELECT id AS personId, name, graduate_program, spark_status FROM person)
INNER JOIN sparkStatus ON array_contains(spark_status, id)
Handling Duplicate Column Names
One of the tricky things that come up in joins is dealing with duplicate column names in your results DataFrame. This can occur in two distinct situations:
- The join expression that you specify does not remove one key from one of the input DataFrames and the keys have the same column name
- Two columns on which you are not performing the join have the same name
Let’s create a problem dataset that we can use to illustrate these problems:
gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")
joinExpr = gradProgramDupe["graduate_program"] == person["graduate_program"]
Note that there are now two graduate_program columns, even though we joined on that key:
person.join(gradProgramDupe, joinExpr).show()
The challenge arises when we refer to one of these columns:
person.join(gradProgramDupe, joinExpr).select("graduate_program").show()
Given the previous code snippet, we will receive an error. In this particular example, Spark generates this message:
org.apache.spark.sql.AnalysisException: Reference 'graduate_program' is ambiguous, could be: graduate_program#40, graduate_program#1079.;
Approach 1: Different join expression
person.join(gradProgramDupe, "graduate_program").select("graduate_program").show()
Approach 2: Dropping the column after the join
person.join(gradProgramDupe, joinExpr).drop(person["graduate_program"])\
.select("graduate_program").show()
Approach 3: Renaming a column before the join
gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
joinExpr = person["graduate_program"] == gradProgram3["grad_id"]
person.join(gradProgram3, joinExpr).show()
How Spark Performs Joins
The two core resources at play:
- The node-to-node communication strategy
- The per-node computation strategy
Communication Strategies
Spark approaches cluster communication in two different ways during joins:
- Shuffle join (all-to-all communication)
- Broadcast join
The core foundation of our simplified view of joins is that in Spark you will have either a big table or a small table. Although this is obviously a spectrum (and things do happen differently if you have a “medium-sized table”), it can help to be binary about the distinction for the sake of this explanation.
Big table–to–big table
When you join a big table to another big table, you end up with a shuffle join, such as the one illustrated in Figure 8-1.

In a shuffle join, every node talks to every other node and they share data according to which node has a certain key or set of keys (on which you are joining).
Important
These joins are expensive because the network can become congested with traffic, especially if your data is not partitioned well.
This join describes taking a big table of data and joining it to another big table of data. An example of this might be a company that receives billions of messages every day from the Internet of Things, and needs to identify the day-over-day changes that have occurred. The way to do this is by joining on deviceId, messageType, and date in one column, and date - 1 day in the other column.
In Figure 8-1, DataFrame 1 and DataFrame 2 are both large DataFrames. This means that all worker nodes (and potentially every partition) will need to communicate with one another during the entire join process (with no intelligent partitioning of data).
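The shuffle described in this section can be modeled in a few lines of plain Python. This is a toy sketch, not Spark's implementation: the three-node cluster, the sample rows, and the use of `key % NUM_NODES` as a stand-in for a hash partitioner are all assumptions made for illustration. It sends each row to the node that owns its key, counts how many rows had to cross the network, and then joins locally on each node.

```python
from collections import defaultdict

NUM_NODES = 3  # hypothetical cluster size


def shuffle(rows_by_node):
    """Send every row to the node that owns its key; count the rows that moved."""
    placed = defaultdict(list)
    moved = 0
    for node, rows in rows_by_node.items():
        for row in rows:
            target = row[0] % NUM_NODES  # stand-in for a hash partitioner
            placed[target].append(row)
            moved += target != node
    return placed, moved


def shuffle_join(left_by_node, right_by_node):
    """Shuffle both sides by key, then join each node's rows locally."""
    left, left_moved = shuffle(left_by_node)
    right, right_moved = shuffle(right_by_node)
    joined = []
    for node in range(NUM_NODES):
        # After the shuffle, rows with equal keys sit on the same node.
        for lrow in left[node]:
            for rrow in right[node]:
                if lrow[0] == rrow[0]:
                    joined.append((lrow, rrow))
    return joined, left_moved + right_moved


left_by_node = {0: [(1, "a"), (2, "b")], 1: [(3, "c"), (4, "d")], 2: [(5, "e"), (0, "f")]}
right_by_node = {0: [(4, "x")], 1: [(5, "y")], 2: [(0, "z"), (1, "w")]}

joined, moved = shuffle_join(left_by_node, right_by_node)
print(len(joined), "joined rows;", moved, "rows moved between nodes")
```

In this toy run, 8 of the 10 input rows change nodes to produce 4 joined rows, which is the network cost the note above warns about.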
Big table–to–small table
When the table is small enough to fit into the memory of a single worker node, we can optimize our join.
Although we can use a big table–to–big table communication strategy, it can often be more efficient to use a broadcast join. What this means is that we will replicate our small DataFrame onto every worker node in the cluster (be it located on one machine or many).
This sounds expensive, but it prevents us from performing the all-to-all communication during the entire join process.
Instead, we perform it only once at the beginning and then let each individual worker node perform the work without having to wait for or communicate with any other worker node.
There is one large communication at the beginning of this join, and no further communication between nodes after that. The join is then performed on every node individually, which makes CPU the biggest bottleneck.
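As a rough illustration of the idea (plain Python, not Spark's implementation), the sketch below copies a small table to every node as a hash lookup and lets each node join its own rows without talking to the others. The two-node layout and the `broadcast_join` helper with its parameters are hypothetical; the rows come from the sample `person` and `graduateProgram` data.

```python
def broadcast_join(big_by_node, small_rows, big_key, small_key):
    """Copy the small table to every node, then join locally with a hash lookup."""
    lookup = {}
    for row in small_rows:
        lookup.setdefault(row[small_key], []).append(row)
    copies_sent = len(big_by_node)  # one copy of the small table per node
    joined_by_node = {}
    for node, rows in big_by_node.items():
        # No big-table row leaves its node; each node probes its own copy.
        joined_by_node[node] = [
            (big, small) for big in rows for small in lookup.get(big[big_key], [])
        ]
    return joined_by_node, copies_sent


# (id, name, graduate_program) rows spread over two hypothetical nodes.
person_by_node = {
    0: [(0, "Bill Chambers", 0)],
    1: [(1, "Matei Zaharia", 1), (2, "Michael Armbrust", 1)],
}
graduate_program = [(0, "Masters"), (2, "Masters"), (1, "Ph.D.")]

joined_by_node, copies_sent = broadcast_join(
    person_by_node, graduate_program, big_key=2, small_key=0
)
print(copies_sent, {node: len(rows) for node, rows in joined_by_node.items()})
```

The only network traffic in this model is the initial copy of the small table, once per node; the large side never moves.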
For our current set of data, we can see that Spark has automatically set this up as a broadcast join by looking at the explain plan:
joinExpr = person["graduate_program"] == graduateProgram["id"]
person.join(graduateProgram, joinExpr).explain()
#== Physical Plan ==
#*BroadcastHashJoin [graduate_program#40], [id#5....
#:- LocalTableScan [id#38, name#39, graduate_progr...
#+- BroadcastExchange HashedRelationBroadcastMode(....
#+- LocalTableScan [id#56, degree#57, departmen....
With the DataFrame API, we can also explicitly give the optimizer a hint that we would like to use a broadcast join by wrapping the small DataFrame in question in the broadcast function. In this example, this results in the same plan we just saw; however, this is not always the case:
from pyspark.sql.functions import broadcast
joinExpr = person["graduate_program"] == graduateProgram["id"]
person.join(broadcast(graduateProgram), joinExpr).explain()
The SQL interface also includes the ability to provide hints to perform joins. These are not enforced, however, so the optimizer might choose to ignore them. You can set one of these hints by using a special comment syntax. MAPJOIN, BROADCAST, and BROADCASTJOIN all do the same thing and are all supported:
SELECT /*+ MAPJOIN(graduateProgram) */
*
FROM person JOIN graduateProgram ON person.graduate_program = graduateProgram.id
If you try to broadcast something too large, you can crash your driver node (because that collect is expensive).
Little table–to–little table
When performing joins with small tables, it’s usually best to let Spark decide how to join them. You can always force a broadcast join if you’re noticing strange behavior.
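One input to that decision is the `spark.sql.autoBroadcastJoinThreshold` setting, which defaults to 10 MB: when Spark estimates one side of a join to be at or below it, a broadcast join becomes a candidate, and a value of -1 turns automatic broadcasting off. The function below is a deliberately simplified, hypothetical model of that one rule in plain Python. Spark's actual planner also weighs hints, the join type, and the available statistics, so treat this as a sketch of the idea only.

```python
# Default value of spark.sql.autoBroadcastJoinThreshold: 10 MB.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024


def pick_strategy(left_bytes, right_bytes, threshold=DEFAULT_THRESHOLD_BYTES):
    """Hypothetical helper: broadcast when the smaller side fits under the threshold."""
    smaller = min(left_bytes, right_bytes)
    # A threshold of -1 can never be satisfied, which models "disabled".
    if 0 <= smaller <= threshold:
        return "broadcast"
    return "shuffle"


print(pick_strategy(5_000_000_000, 2_000))          # small right side
print(pick_strategy(5_000_000_000, 4_000_000_000))  # two big tables
print(pick_strategy(1_000, 2_000, threshold=-1))    # automatic broadcast disabled
```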
Conclusion
- If you partition your data correctly prior to a join, you can end up with much more efficient execution: even if a shuffle is planned, Spark can avoid it when data from the two DataFrames is already located on the same machine.
- Experiment with some of your data and try partitioning beforehand to see if you can notice the increase in speed when performing those joins.
- There are additional implications when you decide what order joins should occur in. Because some joins act as filters, this can be a low-hanging improvement in your workloads, as you are guaranteed to reduce data exchanged over the network.
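The first point can be illustrated with a toy count in plain Python (not Spark code). Assuming a hypothetical three-node cluster where the node that owns a key is `key % num_nodes`, the sketch counts how many rows would have to move before a join if the data is laid out arbitrarily versus already partitioned by the join key.

```python
def rows_to_move(keys_by_node, num_nodes):
    """Count rows whose current node differs from the node that owns their key."""
    return sum(
        1
        for node, keys in keys_by_node.items()
        for key in keys
        if key % num_nodes != node
    )


# Join keys as they happen to sit on each node.
unpartitioned = {0: [1, 2, 3], 1: [0, 4, 5], 2: [6, 7, 8]}
# The same keys, already placed on the node that owns them.
prepartitioned = {0: [0, 3, 6], 1: [1, 4, 7], 2: [2, 5, 8]}

print(rows_to_move(unpartitioned, 3))
print(rows_to_move(prepartitioned, 3))
```

In this model the pre-partitioned layout needs no rows moved, while the arbitrary layout moves six of nine. In PySpark, `DataFrame.repartition` with a column argument is one way to partition by a join key ahead of time; whether it pays off depends on the data, so measure before and after.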
References
- Spark: The Definitive Guide by Bill Chambers and Matei Zaharia.
