Overview

This chapter covers the advanced RDD operations and focuses on key–value RDDs, a powerful abstraction for manipulating data. We also touch on some more advanced topics like custom partitioning, a reason you might want to use RDDs in the first place. With a custom partitioning function, you can control exactly how data is laid out on the cluster and manipulate that individual partition accordingly. Before we get there, let’s summarize the key topics we will cover:

  • Aggregations and key–value RDDs
  • Custom partitioning
  • RDD joins

The example dataset:

# in Python
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
	.split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

Key-Value Basics (Key-Value RDDs)

HINT

Whenever you see ByKey in a method name, it means that you can perform this only on a PairRDD type.

words.map(lambda word: (word.lower(), 1))

keyBy

The preceding example demonstrated a simple way to create a key. However, you can also use the keyBy function to achieve the same result by specifying a function that creates the key from your current value.

keyword = words.keyBy(lambda word: word.lower()[0])

Mapping over Values

INFO

If we have a tuple, Spark will assume that the first element is the key and the second is the value. When the data is in this format, you can explicitly choose to map over the values (and ignore the individual keys).

keyword.mapValues(lambda word: word.upper()).collect()
 
# Output
[
	 ('s', 'SPARK'), 
	 ('t', 'THE'), 
	 ('d', 'DEFINITIVE'), 
	 ('g', 'GUIDE'), 
	 (':', ':'), 
	 ('b', 'BIG'),
	 ('d', 'DATA'), 
	 ('p', 'PROCESSING'), 
	 ('m', 'MADE'), 
	 ('s', 'SIMPLE')
]

You can also flatMap over the values to expand the number of rows, so that each row represents a single character. In the following example we omit the output, but it would simply be each character paired with its key, because flatMapValues iterates over every string:

keyword.flatMapValues(lambda word: word.upper()).collect()
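To make the omitted output concrete, here is a minimal plain-Python sketch of what flatMapValues does to each pair. It does not use Spark: `flat_map_values` is a hypothetical helper and the `keyword` list is a local stand-in for the RDD of the same name.

```python
# Local stand-in for the keyword RDD (no Spark needed).
words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
keyword = [(word.lower()[0], word) for word in words]


def flat_map_values(pairs, func):
    """Hypothetical helper: emit (key, item) for every item in func(value)."""
    return [(key, item) for key, value in pairs for item in func(value)]


chars = flat_map_values(keyword, lambda word: word.upper())
print(chars[:5])  # [('s', 'S'), ('s', 'P'), ('s', 'A'), ('s', 'R'), ('s', 'K')]
```

Each word is expanded into one row per character, and every new row keeps the key of the word it came from.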

Extracting Keys and Values

When we are in the key–value pair format, we can also extract the specific keys or values by using the following methods:

keyword.keys().collect() 
keyword.values().collect()

lookup

One interesting task you might want to do with an RDD is look up the result for a particular key.

NOTE

There is no enforcement mechanism with respect to there being only one key for each input.

If we lookup “s”, we are going to get both values associated with that—“Spark” and “Simple”:

keyword.lookup("s")
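The behavior can be mimicked locally. This is a plain-Python sketch rather than Spark code, and `lookup` here is a hypothetical helper that scans a list of pairs:

```python
words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
keyword = [(word.lower()[0], word) for word in words]


def lookup(pairs, key):
    """Hypothetical helper: return every value stored under the given key."""
    return [value for k, value in pairs if k == key]


print(lookup(keyword, "s"))  # ['Spark', 'Simple']
print(lookup(keyword, "z"))  # []
```

Because nothing forces keys to be unique, the result is always a list, which is empty when the key is absent.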

sampleByKey

There are two ways to sample an RDD by a set of keys: approximately or exactly. sampleByKey is the approximate version. It is done via simple random sampling with one pass over the RDD, which produces a sample whose size is approximately equal to the sum of math.ceil(numItems * samplingRate) over all key values:

import random 
 
distinctChars = words.flatMap(
	lambda word: list(word.lower())
).distinct().collect() 
sampleMap = dict(
	map(lambda c: (c, random.random()), distinctChars)
) 
words.map(lambda word: (word.lower()[0], word))\
	.sampleByKey(True, sampleMap, 6).collect()

The sampleByKeyExact method differs from sampleByKey in that you make additional passes over the RDD to create a sample size that’s exactly equal to the sum of math.ceil(numItems * samplingRate) over all key values with a 99.99% confidence.

NOTE

When sampling without replacement, you need one additional pass over the RDD to guarantee sample size.

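When sampling with replacement, you need two additional passes. sampleByKeyExact belongs to the Scala and Java RDD API; PySpark's RDD exposes only sampleByKey, so the Python call below is the approximate version again:

words.map(lambda word: (word.lower()[0], word))\
	.sampleByKey(True, sampleMap, 6).collect()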
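The target sample size mentioned above, the sum of math.ceil(numItems * samplingRate) over all keys, can be worked out by hand. The sketch below is plain Python, and it assumes a fixed sampling rate of 0.5 for every key so that the result is reproducible (the snippets above draw random rates instead).

```python
import math
from collections import Counter

words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")

# Number of items stored under each key (the lowercase first character).
items_per_key = Counter(word.lower()[0] for word in words)

# Assumption for illustration: the same sampling rate for every key.
sampling_rates = {key: 0.5 for key in items_per_key}

target_size = sum(
    math.ceil(items_per_key[key] * sampling_rates[key]) for key in items_per_key
)
print(target_size)  # 8
```

There are eight distinct keys, and each contributes ceil(1 * 0.5) or ceil(2 * 0.5), which is 1 in both cases.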

Aggregations

Example:

chars = words.flatMap(lambda word: word.lower()) 
KVcharacters = chars.map(lambda letter: (letter, 1)) 
 
def maxFunc(left, right): 
	return max(left, right)
	 
def addFunc(left, right): 
	return left + right
 
nums = spark.sparkContext.parallelize(range(1, 31), 5)

countByKey

KVcharacters.countByKey()
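countByKey counts the elements for each key and returns the result to the driver as a dictionary. A plain-Python sketch of the same count over the example sentence, using a local list in place of the KVcharacters RDD:

```python
from collections import Counter

words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")

# Local stand-in for KVcharacters: one (letter, 1) pair per character.
kv_characters = [(letter, 1) for word in words for letter in word.lower()]

# Count how many pairs share each key.
counts = Counter(key for key, _ in kv_characters)
print(counts["i"], counts["d"])  # 7 4
```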

Understanding Aggregation Implementations

The implementation is actually quite important for job stability. There are two fundamental choices: groupBy and reduce.

groupByKey

Looking at the API documentation, you might think groupByKey with a map over each grouping is the best way to sum up the counts for each key:

from functools import reduce
 
KVcharacters.groupByKey().map(
	lambda row: (row[0], reduce(addFunc, row[1]))).collect()

However, this is, for the majority of cases, the wrong way to approach the problem.

The fundamental issue with the above example

The fundamental issue here is that each executor must hold all values for a given key in memory before applying the function to them.

Why is this problematic?

If you have massive key skew, some partitions might be completely overloaded with a ton of values for a given key, and you will get OutOfMemoryErrors. This obviously doesn’t cause an issue with our current dataset, and it is not guaranteed to happen, but it can cause serious problems at scale.

There are use cases when groupByKey does make sense. If you have consistent value sizes for each key and know that they will fit in the memory of a given executor, you’re going to be just fine. It’s just good to know exactly what you’re getting yourself into when you do this. There is a preferred approach for additive use cases: reduceByKey.

reduceByKey

Because we are performing a simple count, a much more stable approach is to perform the same flatMap, map each letter instance to the number one, and then call reduceByKey with a summation function to collect the counts back:

Why is the reduce method much more stable?

The reduce happens within each partition first and doesn’t need to hold every value for a key in memory. This partial reduce incurs no shuffle: each worker combines its own data locally, and only the per-key partial results are shuffled for the final reduce.

KVcharacters.reduceByKey(addFunc).collect()
 
# The output (as printed by the Scala shell; element order may vary)
Array((d,4), (p,3), (t,3), (b,1), (h,1), (n,2), 
... 
(a,4), (i,7), (k,1), (u,1), (o,1), (g,3), (m,2), (c,1))

The reduceByKey method returns an RDD of (key, reduced value) pairs whose elements are not guaranteed to have any particular ordering. This method is therefore appropriate when the workload is associative and commutative, but inappropriate when the order matters.
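The difference between the two approaches can be sketched in plain Python. This is a simulation, not Spark: the two-partition split and the helper names (`kv_pairs`, `reduce_within_partition`) are assumptions made for illustration.

```python
from functools import reduce

words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
# Assumed layout: two partitions of five words each.
partitions = [words[:5], words[5:]]


def add_func(left, right):
    return left + right


def kv_pairs(partition):
    """One (letter, 1) pair per character in the partition."""
    return [(letter, 1) for word in partition for letter in word.lower()]


def reduce_within_partition(pairs, func):
    """Combine values per key inside a single partition, one value at a time."""
    partial = {}
    for key, value in pairs:
        partial[key] = func(partial[key], value) if key in partial else value
    return partial


# reduceByKey style: each partition is reduced locally first ...
partials = [reduce_within_partition(kv_pairs(p), add_func) for p in partitions]

# ... and only the small per-partition results are merged afterwards.
totals = {}
for partial in partials:
    for key, value in partial.items():
        totals[key] = add_func(totals[key], value) if key in totals else value

# groupByKey style: every value for a key is gathered before any reduction.
grouped = {}
for p in partitions:
    for key, value in kv_pairs(p):
        grouped.setdefault(key, []).append(value)
grouped_totals = {key: reduce(add_func, values) for key, values in grouped.items()}

print(totals == grouped_totals)  # True
print(len(grouped["i"]))  # 7
```

Both routes give the same counts, but the grouped route holds all seven values for "i" at once, whereas the reduce route only ever holds one running total per key per partition.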

Other Aggregation Methods

aggregate

IMPORTANT

This function requires a start (zero) value and then requires you to specify two different functions. The first aggregates within partitions, the second aggregates across partitions. The start value will be used at both aggregation levels.

nums.aggregate(0, maxFunc, addFunc)
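To see how the two functions and the start value interact, here is a plain-Python sketch of the call above. It assumes that the 30 numbers are split evenly into five partitions of six, and it uses local lists instead of an RDD.

```python
from functools import reduce

nums = list(range(1, 31))
# Assumed layout: five partitions of six consecutive numbers each.
partitions = [nums[i:i + 6] for i in range(0, len(nums), 6)]


def max_func(left, right):
    return max(left, right)


def add_func(left, right):
    return left + right


zero = 0

# First function: aggregate within each partition, starting from zero.
per_partition = [reduce(max_func, part, zero) for part in partitions]
print(per_partition)  # [6, 12, 18, 24, 30]

# Second function: aggregate across partitions, again starting from zero.
result = reduce(add_func, per_partition, zero)
print(result)  # 90
```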

Performance implication

aggregate does have some performance implications because it performs the final aggregation on the driver. If the results from the executors are too large, they can take down the driver with an OutOfMemoryError.

There is another method, treeAggregate, that does the same thing as aggregate (at the user level) but does so in a different way.

treeAggregate

It basically “pushes down” some of the subaggregations (creating a tree from executor to executor) before performing the final aggregation on the driver. Having multiple levels can help you to ensure that the driver does not run out of memory in the process of the aggregation. These tree-based implementations are often used to try to improve stability in certain operations.

depth = 3
nums.treeAggregate(0, maxFunc, addFunc, depth)
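The idea of combining partial results level by level can be sketched in plain Python. This is a simplification: `tree_combine` is a hypothetical helper with a fixed fan-in of 2, whereas Spark derives the shape of the tree from the depth and the number of partitions.

```python
from functools import reduce

nums = list(range(1, 31))
# Assumed layout: five partitions of six consecutive numbers each.
partitions = [nums[i:i + 6] for i in range(0, len(nums), 6)]


def max_func(left, right):
    return max(left, right)


def add_func(left, right):
    return left + right


def tree_combine(values, comb, fan_in=2):
    """Hypothetical helper: merge partial results in groups, level by level."""
    while len(values) > 1:
        values = [
            reduce(comb, values[i:i + fan_in])
            for i in range(0, len(values), fan_in)
        ]
    return values[0]


# Within-partition aggregation is the same as for aggregate.
per_partition = [reduce(max_func, part, 0) for part in partitions]

# Levels: [6, 12, 18, 24, 30] -> [18, 42, 30] -> [60, 30] -> [90]
result = tree_combine(per_partition, add_func)
print(result)  # 90
```

The answer matches the flat aggregation, but no single step has to receive all partial results at once.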

aggregateByKey

This function does the same as aggregate, but instead of doing it partition by partition, it does it by key. The start value and functions follow the same properties:

KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()
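A plain-Python sketch of what this call computes: the first function (addition) runs per key inside each partition, and the second (max) merges the per-partition results for each key. The two-partition split of the words is an assumption for illustration, and no Spark is involved.

```python
words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
# Assumed layout: two partitions of five words each.
partitions = [words[:5], words[5:]]


def max_func(left, right):
    return max(left, right)


def add_func(left, right):
    return left + right


zero = 0

# Within each partition: add up the 1s per key, starting from zero.
per_partition = []
for part in partitions:
    acc = {}
    for word in part:
        for letter in word.lower():
            acc[letter] = add_func(acc.get(letter, zero), 1)
    per_partition.append(acc)

# Across partitions: keep the maximum per-partition count for each key.
result = {}
for acc in per_partition:
    for key, value in acc.items():
        result[key] = max_func(result[key], value) if key in result else value

print(result["i"], result["s"])  # 4 3
```

The letter "i" appears four times in the first partition and three times in the second, so the result is 4 rather than the overall count of 7.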

combineByKey

Instead of specifying an aggregation function, you can specify a combiner.

The combiner

This combiner operates on a given key and merges the values according to some function. It then goes to merge the different outputs of the combiners to give us our result.

def valToCombiner(value): 
	return [value]
 
def mergeValuesFunc(vals, valToAppend):
	vals.append(valToAppend) 
	return vals 
 
def mergeCombinerFunc(vals1, vals2): 
	return vals1 + vals2 
	
outputPartitions = 6
KVcharacters\
	.combineByKey(
		valToCombiner,
		mergeValuesFunc,
		mergeCombinerFunc,
		outputPartitions)\
	.collect()

foldByKey

foldByKey merges the values for each key using an associative function and a neutral “zero value,” which can be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication):

KVcharacters.foldByKey(0, addFunc).collect()

CoGroups

The CoGroups

CoGroups give you the ability to group up to three key–value RDDs together in Scala and two in Python. This joins the given values by key.

This is effectively just a group-based join on an RDD. When doing this, you can also specify a number of output partitions or a custom partitioning function to control exactly how this data is distributed across the cluster.

import random 
 
distinctChars = words.flatMap(
	lambda word: word.lower()).distinct() 
charRDD = distinctChars.map(
	lambda c: (c, random.random())) 
charRDD2 = distinctChars.map(
	lambda c: (c, random.random()))
 
charRDD.cogroup(charRDD2).take(5)

The result is a group with our key on one side, and all of the relevant values on the other side.
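The shape of that result can be illustrated with a plain-Python sketch. `cogroup` below is a hypothetical helper over local lists, and the sample pairs are made up; the real method returns iterables rather than lists and does not guarantee any key order.

```python
def cogroup(left, right):
    """Hypothetical helper: for each key, collect the values from both sides."""
    keys = sorted({key for key, _ in left} | {key for key, _ in right})
    return [
        (
            key,
            (
                [value for k, value in left if k == key],
                [value for k, value in right if k == key],
            ),
        )
        for key in keys
    ]


left = [("a", 1), ("b", 2), ("a", 3)]
right = [("a", "x"), ("c", "y")]

print(cogroup(left, right))
# [('a', ([1, 3], ['x'])), ('b', ([2], [])), ('c', ([], ['y']))]
```

A key that appears on only one side still shows up, with an empty group for the other side.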

Joins

RDDs have much the same joins as we saw in the Structured API.

Inner Join

We’ll demonstrate an inner join now. Notice how we are setting the number of output partitions we would like to see:

keyedChars = distinctChars.map(lambda c: (c, random.random()))
outputPartitions = 10 
KVcharacters.join(keyedChars).count() 
KVcharacters.join(keyedChars, outputPartitions).count()
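As a rough check on what count() should return, the inner join can be simulated in plain Python. `inner_join` is a hypothetical helper, and the lists stand in for the KVcharacters and keyedChars RDDs.

```python
import random

words = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
chars = [letter for word in words for letter in word.lower()]

# Local stand-ins for the two RDDs being joined.
kv_characters = [(letter, 1) for letter in chars]
keyed_chars = [(letter, random.random()) for letter in sorted(set(chars))]


def inner_join(left, right):
    """Hypothetical helper: pair up values whose keys match on both sides."""
    return [
        (lkey, (lvalue, rvalue))
        for lkey, lvalue in left
        for rkey, rvalue in right
        if lkey == rkey
    ]


joined = inner_join(kv_characters, keyed_chars)
print(len(joined))  # 51
```

Every character occurs exactly once in keyed_chars, so each of the 51 pairs in kv_characters finds exactly one match.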

cartesian

cartesian computes the Cartesian product of two RDDs. This, again, is very dangerous! It does not accept a join key and can have a massive output.

zips

zip isn’t really a type of join, but it does combine two RDDs.

NOTE

The two RDDs must have the same number of elements and the same number of partitions. zip creates a PairRDD.

numRange = spark.sparkContext.parallelize(range(10), 2)
words.zip(numRange).collect()
 
# The output
[
	 ('Spark', 0), 
	 ('The', 1), 
	 ('Definitive', 2), 
	 ('Guide', 3), 
	 (':', 4), 
	 ('Big', 5), 
	 ('Data', 6), 
	 ('Processing', 7), 
	 ('Made', 8), 
	 ('Simple', 9)
]

Controlling Partitions

You have control over exactly how data is physically distributed across the cluster. The key addition (that does not exist in the Structured APIs) is the ability to specify a partitioning function (formally a custom Partitioner, which we discuss later when we look at basic methods).

coalesce

coalesce effectively collapses partitions on the same worker in order to avoid a shuffle of the data when repartitioning. For instance, our words RDD currently has two partitions; we can collapse that to one partition by using coalesce without bringing about a shuffle of the data:

words.coalesce(1).getNumPartitions() # 1

repartition

The repartition operation allows you to repartition your data up or down but performs a shuffle across nodes in the process. Increasing the number of partitions can increase the level of parallelism when operating in map- and filter-type operations:

words.repartition(10) # gives us 10 partitions

repartitionAndSortWithinPartitions

This operation gives you the ability to repartition as well as specify the ordering of each one of those output partitions.
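What the operation produces can be sketched in plain Python on local lists. The sample pairs and the modulo partitioning function are assumptions for illustration.

```python
pairs = [(5, "e"), (2, "b"), (4, "d"), (1, "a"), (3, "c"), (6, "f")]
num_partitions = 2


def partition_func(key):
    """Assumed partitioning function: even keys to 0, odd keys to 1."""
    return key % num_partitions


# Step 1: route every pair to its partition.
partitions = [[] for _ in range(num_partitions)]
for key, value in pairs:
    partitions[partition_func(key)].append((key, value))

# Step 2: order each partition by key, independently of the others.
sorted_partitions = [sorted(part, key=lambda kv: kv[0]) for part in partitions]

print(sorted_partitions)
# [[(2, 'b'), (4, 'd'), (6, 'f')], [(1, 'a'), (3, 'c'), (5, 'e')]]
```

Each partition is sorted on its own, so the data as a whole is not globally ordered.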

Custom Partitioning (Important)

This ability is one of the primary reasons you’d want to use RDDs. Custom partitioners are not available in the Structured APIs because they don’t really have a logical counterpart. The canonical example that motivates custom partitioning is PageRank, where we seek to control the layout of the data on the cluster and avoid shuffles.

In our shopping dataset, this might mean partitioning by each customer ID (we’ll get to this example in a moment). In short, the sole goal of custom partitioning is to even out the distribution of your data across the cluster so that you can work around problems like data skew.

If you’re going to use custom partitioners, you should drop down to RDDs from the Structured APIs, apply your custom partitioner, and then convert it back to a DataFrame or Dataset. This way, you get the best of both worlds, only dropping down to custom partitioning when you need to.

Custom Partitioner

To perform custom partitioning you need to implement your own class that extends Partitioner.

WARNING

You need to do this only when you have lots of domain knowledge about your problem space.

df = spark.read\
	.option("header", "true")\
	.option("inferSchema", "true")\
	.csv("/data/retail-data/all/")
 
rdd = df.coalesce(10).rdd 
 
df.printSchema()

Spark has two built-in Partitioners:

  • HashPartitioner
  • RangePartitioner

These two work for discrete values and continuous values, respectively. Spark’s Structured APIs will already use these, although we can use the same thing in RDDs.
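The difference between the two strategies can be sketched in plain Python. Both functions are simplified stand-ins: the sample keys and the fixed range bounds are assumptions, whereas Spark's RangePartitioner chooses its bounds by sampling the data.

```python
import bisect

keys = [3, 17, 42, 8, 25, 31]
num_partitions = 3


def hash_partition(key):
    """Hash style: the partition depends only on the key's hash value."""
    return hash(key) % num_partitions


# Assumed upper bounds for the first two ranges; larger keys go to the last one.
range_bounds = [10, 30]


def range_partition(key):
    """Range style: the partition depends on where the key falls in sorted order."""
    return bisect.bisect_left(range_bounds, key)


print([hash_partition(key) for key in keys])   # [0, 2, 0, 2, 1, 1]
print([range_partition(key) for key in keys])  # [0, 1, 2, 0, 1, 2]
```

Hash partitioning scatters neighboring keys, while range partitioning keeps keys of similar size together, which is what makes it suit ordered, continuous values.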

// in Scala (assumes rdd was created from df in a Scala session)
import org.apache.spark.HashPartitioner
 
rdd.map(r => r(6)).take(5).foreach(println) 
val keyedRDD = rdd.keyBy(row => row(6).asInstanceOf[Int].toDouble) 
keyedRDD.partitionBy(new HashPartitioner(10)).take(10)

Although the hash and range partitioners are useful, they’re fairly rudimentary. At times, you will need to perform some very low-level partitioning because you’re working with very large data and large key skew. You want to break these keys as much as possible to improve parallelism and prevent OutOfMemoryErrors during the course of execution.

One instance might be that you need to partition more keys if and only if the key matches a certain format. For instance, we might know that there are two customers in your dataset that always crash your analysis and we need to break them up further than other customer IDs. In fact, these two are so skewed that they need to be operated on alone, whereas all of the others can be lumped into large groups. This is obviously a bit of a caricatured example, but you might see similar situations in your data, as well:

// in Scala
import org.apache.spark.Partitioner
 
class DomainPartitioner extends Partitioner { 
	def numPartitions = 3 
	def getPartition(key: Any): Int = { 
		val customerId = key.asInstanceOf[Double].toInt 
		if (customerId == 17850.0 || customerId == 12583.0) {
			return 0 
		} else { 
			return new java.util.Random().nextInt(2) + 1 
		} 
	} 
} 
keyedRDD 
	.partitionBy(new DomainPartitioner)
	.map(_._1)
	.glom()
	.map(_.toSet.toSeq.length) 
	.take(5)

After you run this, you will see the count of results in each partition. The second two numbers will vary, because we’re distributing them randomly (as you will see when we do the same in Python) but the same principles apply:

def partitionFunc(key): 
	import random 
	if key == 17850 or key == 12583: 
		return 0 
	else: 
		return random.randint(1,2)
 
keyedRDD = rdd.keyBy(lambda row: row[6]) 
keyedRDD\
	.partitionBy(3, partitionFunc)\
	.map(lambda x: x[0])\
	.glom()\
	.map(lambda x: len(set(x)))\
	.take(5)

This custom key distribution logic is available only at the RDD level. Of course, this is a simple example, but it does show the power of using arbitrary logic to distribute the data around the cluster in a physical manner.
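The routing logic can be tried without a cluster. The sketch below is plain Python with two stated assumptions: the customer IDs other than 17850 and 12583 are made up, and it swaps the random choice for a hash so that a given key always lands in the same partition.

```python
SKEWED_CUSTOMERS = {17850, 12583}


def partition_func(key):
    """Send the two skewed customers to partition 0, everyone else to 1 or 2."""
    if key in SKEWED_CUSTOMERS:
        return 0
    # Deterministic variant of the random choice used in the snippets above.
    return hash(key) % 2 + 1


# Hypothetical customer IDs, apart from the two skewed ones.
customer_ids = [17850, 12583, 13047, 13748, 15100, 17850, 14688]

partitions = [[] for _ in range(3)]
for customer_id in customer_ids:
    partitions[partition_func(customer_id)].append(customer_id)

# Same summary as glom() followed by counting the distinct keys per partition.
distinct_per_partition = [len(set(part)) for part in partitions]
print(distinct_per_partition)  # [2, 3, 1]
```

With the random version, rows for the same customer can end up in different partitions; whether that matters depends on whether later steps expect a key to live in exactly one partition.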

Custom Serialization

The last advanced topic is Kryo serialization. Any object (or function) that you hope to parallelize must be serializable:

class SomeClass extends Serializable { 
	var someValue = 0 
	def setSomeValue(i:Int) = { 
		someValue = i 
		this 
	} 
} 
 
sc.parallelize(1 to 10)
	.map(num => new SomeClass().setSomeValue(num))

The default serialization can be quite slow. Spark can use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all serializable types and requires you to register the classes you’ll use in the program in advance for best performance.

Initializing to use Kryo

You can use Kryo by initializing your job with a SparkConf and setting the value of spark.serializer to org.apache.spark.serializer.KryoSerializer.

INFO

This setting configures the serializer used for shuffling data between worker nodes and serializing RDDs to disk.

Why is Kryo not the default serializer?

The only reason Kryo is not the default is the custom registration requirement, but it is worth trying in any network-intensive application. Since Spark 2.0.0, Spark internally uses the Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.

To register your own custom classes with Kryo, use the registerKryoClasses method:

val conf = new SparkConf().setMaster(...).setAppName(...) 
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) 
val sc = new SparkContext(conf)
