Key Takeaways:
- Use Spark
repartition()when the data needs to be evenly distributed across partitions for better parallel processing efficiency. - Use Spark
coalesce()when the number of partitions needs to be reduced for improved performance without expensive full shuffling operations. It only moves data within the same executor. - When using
repartition()orcoalesce(), consider the resulting data movement, performance, and number of partitions for better optimization of Spark jobs.
What is the difference between Repartition and Coalesce?
Shuffle Behavior
- Full Shuffle:Ā Repartition()Ā involves a full shuffle, redistributing all data across the specified partitions.
- Partition 3: This specifies the number of partitions after repartitioning, for example, partitioning data into 3 parts.
- Partition 2:Ā Coalesce()Ā reduces the number of partitions to 2 without a full shuffle, combining data as needed.
- Partition 6: Using repartition to evenly distribute data across 6 partitions can aid in parallel processing.
- Partition 5: Coalesce() is efficient for reducing the number of partitions to 5 if the data can be combined without a full shuffle. For optimal performance, consider the data size and distribution before choosing between repartition() andĀ coalesce().
Performance
- Before using repartition() or coalesce(), consider the available resources and the size of data.
- For reducing the number of partitions without expensive operations, it is recommended to use coalesce() over repartition().
- Use repartition() for evenly distributing data, but be careful as it may result in costly shuffle operations.
- Efficient processing can be ensured by monitoring the number of partitions.
Spark optimize performance
Prioritize coalesce() over repartition() to improve efficiency and minimize expensive operations.
Number of partitions
- Adjusting Partition Size: Determine the optimal number of partitions based on data volume and processing requirements.
- Output from Local[5]: Consider output parallelize: 6 and output textfile: 10 to evaluate the impact of partition 5 on partition size.
- Part Files: Understand the impact on the number of part files generated with partition 5 and other partition sizes.
When to use repartition?
Repartition() is ideal when you need to increase or decrease the number of partitions in a DataFrame, such as going from partition 3 to partition 5. Itās useful for evenly redistributing data to optimize parallelism. Use coalesce() when reducing the number of partitions, like going from partition 5 to partition 2 or 5, to minimize shuffling and improve performance.
When to use Coalesce?
- Consider usingĀ
rdd.coalesce(numPartitions)Ā when you want to reduce the number of partitions in an RDD to improve performance. - Ensure to select anĀ improved versionĀ of the dataset to avoid unnecessary shuffle.
- When usingĀ
coalesce, specify the target number of partitions, e.g.,Ācoalesce(3), if you want to merge data intoĀ partition 3Ā for efficient processing. - Review the data distribution and processing requirements to decide the number of partitions that would optimize the operation, such asĀ partition 2,Ā partition 6, orĀ partition 5.
What Are The Best Practices For Using Repartition and Coalesce?
- Avoid usingĀ
repartition()Ā andĀcoalesce()Ā on small data, as it can degrade performance. - UseĀ
coalesce()Ā when possible to minimize shuffling and optimize performance. - UseĀ
repartition()Ā when data needs to be evenly distributed across partitions for balanced processing.
Monitor number of partitions
It is important to regularly monitor the number of partitions to ensure efficient data processing and prevent excessive partitioning.
we will create a dataset consisting of one million records with two columns: id and departmentName.
from pyspark.sql.functions import expr
employeeId = spark.range(1,1000001,1)
case_statement = "CASE WHEN id <= 100000 THEN 'FrontEnd' WHEN id <= 600000 THEN 'BackEnd' WHEN id <= 900000 THEN 'DataEngineering' WHEN id <= 950000 THEN 'DataScience' ELSE 'Devops' END"
departments = employeeId.withColumn("departmentName",expr(case_statement))
# To understand the data
departments = employeeId.withColumn("departmentName",expr(case_statement))
display(departments.groupBy("departmentName").count())Note: I will be using the following code to know the data distribution across partitions within a DataFrame or RDD.
rdd_partitions = departments.rdd.glom().collect()
for partition_id in range(len(rdd_partitions)):
print(
"partition_id :", partition_id,
"departments_present :",set(
row.departmentName for row in rdd_partitions[partition_id]),
"partition_dist_cnt :",len(rdd_partitions[partition_id]))departments.repartition(5).explain()
departments = departments.repartition(5)
print("No of Partitions : ", departments.rdd.getNumPartitions())
rdd_partitions = departments.rdd.glom().collect()
for partition_id in range(len(rdd_partitions)):
print(
"partition_id :", partition_id,
"departments_present :", set(
row.departmentName for row in rdd_partitions[partition_id]),
"partition_dist_cnt :",len(rdd_partitions[partition_id]))Repartitioning by specifying only the Partition Column : In this case, data distribution across partitions will occur using the Hash partitioning method. Data will be distributed across partitions based on the hash values of the āvalueā column.
departments.repartition("departmentName").explain()
departments = departments.repartition("departmentName")
print("No of Partitions : ",departments.rdd.getNumPartitions())
rdd_partitions = departments.rdd.glom().collect()
for partition_id in range(len(rdd_partitions)):
print(
"partition_id :", partition_id,
"departments_present :", set(
row.departmentName for row in rdd_partitions[partition_id]),
"partition_dist_cnt :",len(rdd_partitions[partition_id]))Spark's Adaptive Query Execution (AQE)
For the sake of this example, I have temporarily disabled Sparkās Adaptive Query Execution (AQE). If AQE is enabled, Spark may not create 200 partitions(AQE Internally uses Coalesce function to merge the smaller partitions), as this can lead to the generation of many empty partitions, which is not an optimal scenario. To follow this the code and its underlying principles, you can disable AQE during the learning process and enable it again once its done.
# To Turn off AQE
spark.conf.set("spark.sql.adaptive.enabled", "False")
# To Turn on AQE
spark.conf.set("spark.sql.adaptive.enabled", "True")Repartitioning using both Number of Partitions and Partition Column In this scenario, we will utilize both the number of partitions and the partition column to perform data repartitioning. Once again, the method employed for distribution is hash partitioning, but the number of partitions will align with the specified input parameter.
departments.repartition(4,"departmentName").explain()
departments = departments.repartition(4,"departmentName")
print("No of Partitions : ",departments.rdd.getNumPartitions())
rdd_partitions = departments.rdd.glom().collect()
for partition_id in range(len(rdd_partitions)):
print(
"partition_id :", partition_id,
"departments_present :", set(
row.departmentName for row in rdd_partitions[partition_id]),
"partition_dist_cnt :",len(rdd_partitions[partition_id]))