Overview
This chapter covers advanced RDD operations and focuses on key–value RDDs, a powerful abstraction for manipulating data. We also touch on some more advanced topics like custom partitioning, a reason you might want to use RDDs in the first place. With a custom partitioning function, you can control exactly how data is laid out on the cluster and manipulate individual partitions accordingly. Before we get there, let’s summarize the key topics we will cover:
- Aggregations and key–value RDDs
- Custom partitioning
- RDD joins
The example dataset:
# in Python
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
.split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
Key-Value Basics (Key-Value RDDs)
HINT
Whenever you see ByKey in a method name, it means that you can perform this only on a PairRDD type.
words.map(lambda word: (word.lower(), 1))
keyBy
The preceding example demonstrated a simple way to create a key.
However, you can also use the keyBy function to achieve the same result by specifying a function that creates the key from your current value.
keyword = words.keyBy(lambda word: word.lower()[0])
Mapping over Values
INFO
If we have a tuple, Spark will assume that the first element is the key, and the second is the value. When in this format, you can explicitly choose to map over the values (and ignore the individual keys).
keyword.mapValues(lambda word: word.upper()).collect()
# Output
[
('s', 'SPARK'),
('t', 'THE'),
('d', 'DEFINITIVE'),
('g', 'GUIDE'),
(':', ':'),
('b', 'BIG'),
('d', 'DATA'),
('p', 'PROCESSING'),
('m', 'MADE'),
('s', 'SIMPLE')
]
You can flatMap over the rows to expand the number of rows that you have so that each row represents a character. In the following example, we will omit the output, but it would simply be each character as we converted them into arrays:
keyword.flatMapValues(lambda word: word.upper()).collect()
Extracting Keys and Values
When we are in the key–value pair format, we can also extract the specific keys or values by using the following methods:
keyword.keys().collect()
keyword.values().collect()
lookup
One interesting task you might want to do with an RDD is look up the result for a particular key.
NOTE
There is no enforcement mechanism with respect to there being only one key for each input.
If we lookup “s”, we are going to get both values associated with that—“Spark” and “Simple”:
keyword.lookup("s")
sampleByKey
There are two ways to sample an RDD by a set of keys: via an approximation or exactly.
This is done via simple random sampling with one pass over the RDD, which produces a sample of size that’s approximately equal to the sum of math.ceil(numItems * samplingRate) over all key values:
import random
distinctChars = words.flatMap(
lambda word: list(word.lower())
).distinct().collect()
sampleMap = dict(
map(lambda c: (c, random.random()), distinctChars)
)
words.map(lambda word: (word.lower()[0], word))\
.sampleByKey(True, sampleMap, 6).collect()
The sampleByKeyExact method (available in the Scala and Java APIs) differs from sampleByKey in that it makes additional passes over the RDD to create a sample whose size is exactly equal to the sum of math.ceil(numItems * samplingRate) over all key values, with 99.99% confidence.
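The expected-size formula is easy to work through by hand. The following pure-Python sketch uses hypothetical per-key counts and sampling fractions (not derived from the dataset above) just to illustrate the arithmetic:

```python
import math

# Hypothetical per-key item counts and sampling fractions
num_items = {"s": 2, "t": 1, "d": 2}
fractions = {"s": 0.5, "t": 1.0, "d": 0.25}

# Approximate expected sample size: sum of ceil(numItems * samplingRate)
expected_size = sum(
    math.ceil(num_items[k] * fractions[k]) for k in num_items
)
print(expected_size)  # ceil(1.0) + ceil(1.0) + ceil(0.5) = 3
```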
NOTE
When sampling without replacement, you need one additional pass over the RDD to guarantee sample size.
When sampling with replacement, you need two additional passes:
words.map(lambda word: (word.lower()[0], word))\
.sampleByKey(True, sampleMap, 6).collect()
Aggregations
Example:
chars = words.flatMap(lambda word: word.lower())
KVcharacters = chars.map(lambda letter: (letter, 1))
def maxFunc(left, right):
return max(left, right)
def addFunc(left, right):
return left + right
nums = sc.parallelize(range(1,31), 5)
countByKey
KVcharacters.countByKey()
Understanding Aggregation Implementations
The implementation is actually quite important for job stability. There are two fundamental approaches: groupBy and reduce.
groupByKey
Looking at the API documentation, you might think groupByKey with a map over each grouping is the best way to sum up the counts for each key:
from functools import reduce
KVcharacters.groupByKey().map(
lambda row: (row[0], reduce(addFunc, row[1]))).collect()
However, this is, for the majority of cases, the wrong way to approach the problem.
The fundamental issue with the example above
The fundamental issue here is that each executor must hold all values for a given key in memory before applying the function to them.
Why is this problematic?
If you have massive key skew, some partitions might be completely overloaded with a ton of values for a given key, and you will get
OutOfMemoryErrors. This obviously doesn’t cause an issue with our current dataset, but it can cause serious problems at scale. This is not guaranteed to happen, but it can happen.
There are use cases when groupByKey does make sense. If you have consistent value sizes for each key and know that they will fit in the memory of a given executor, you’re going to be just fine. It’s just good to know exactly what you’re getting yourself into when you do this. There is a preferred approach for additive use cases: reduceByKey.
reduceByKey
Because we are performing a simple count, a much more stable approach is to perform the same flatMap, then map each letter instance to the number one, and then perform a reduceByKey with a summation function to collect back the results.
Why is the reduce method much more stable?
The reduce happens within each partition and doesn’t need to put everything in memory. There is no incurred shuffle during this operation; everything happens at each worker individually before performing a final reduce.
KVcharacters.reduceByKey(addFunc).collect()
# The output
Array((d,4), (p,3), (t,3), (b,1), (h,1), (n,2),
...
(a,4), (i,7), (k,1), (u,1), (o,1), (g,3), (m,2), (c,1))
The reduceByKey method returns an RDD of a group (the key) and a sequence of elements that are not guaranteed to have an ordering. Therefore, this method is completely appropriate when our workload is associative but inappropriate when the order matters.
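The contrast between the two implementations can be sketched in pure Python (a toy model with hypothetical partitions, not Spark's actual machinery): the groupBy-style path materializes every value for a key before reducing, whereas the reduce-style path only ever holds a running total per key inside each partition:

```python
from collections import defaultdict

# Two hypothetical partitions of (letter, 1) pairs
partitions = [[("a", 1), ("b", 1), ("a", 1)],
              [("a", 1), ("b", 1)]]

# groupByKey-style: every value for a key is gathered in memory first
grouped = defaultdict(list)
for part in partitions:
    for k, v in part:
        grouped[k].append(v)
groupByKeyResult = {k: sum(vs) for k, vs in grouped.items()}

# reduceByKey-style: fold values into a running total inside each
# partition, then merge the small per-partition totals
merged = defaultdict(int)
for part in partitions:
    partial = defaultdict(int)
    for k, v in part:
        partial[k] += v
    for k, v in partial.items():
        merged[k] += v

print(groupByKeyResult)  # {'a': 3, 'b': 2}
print(dict(merged))      # {'a': 3, 'b': 2}
```

Both paths give the same answer here, but with massive key skew only the second one keeps per-partition memory bounded.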
Other Aggregation Methods
aggregate
IMPORTANT
This function requires a null and start value and then requires you to specify two different functions. The first aggregates within partitions, the second aggregates across partitions. The start value will be used at both aggregation levels
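A pure-Python sketch of the two levels (a toy model of aggregate, assuming parallelize splits range(1, 31) evenly across five partitions):

```python
from functools import reduce

def maxFunc(left, right):
    return max(left, right)

def addFunc(left, right):
    return left + right

# range(1, 31) split evenly into 5 partitions, as parallelize would do
partitions = [list(range(1 + 6 * i, 7 + 6 * i)) for i in range(5)]

# First level: aggregate within each partition, starting from the zero value
perPartition = [reduce(maxFunc, part, 0) for part in partitions]
# Second level: combine the per-partition results on the driver
result = reduce(addFunc, perPartition, 0)
print(perPartition)  # [6, 12, 18, 24, 30]
print(result)        # 90
```

The actual Spark call below performs exactly these two steps.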
nums.aggregate(0, maxFunc, addFunc)
Performance implication
aggregate does have some performance implications because it performs the final aggregation on the driver. If the results from the executors are too large, they can take down the driver with an OutOfMemoryError.
There is another method, treeAggregate that does the same thing as aggregate (at the user level) but does so in a different way.
treeAggregate
It basically “pushes down” some of the subaggregations (creating a tree from executor to executor) before performing the final aggregation on the driver. Having multiple levels can help you ensure that the driver does not run out of memory during the aggregation. These tree-based implementations are often used to improve stability in certain operations.
depth = 3
nums.treeAggregate(0, maxFunc, addFunc, depth)
aggregateByKey
This function does the same as aggregate but instead of doing it partition by partition, it does it by key. The start value and functions follow the same properties:
KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()
combineByKey
Instead of specifying an aggregation function, you can specify a combiner.
The combiner
This combiner operates on a given key and merges the values according to some function. It then merges the different outputs of the combiners to give us our result.
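A pure-Python sketch of the roles the three functions play (a toy model with hypothetical partitions; the Spark example that follows defines the same three functions):

```python
def valToCombiner(value):
    return [value]                 # create a combiner from a key's first value

def mergeValuesFunc(vals, valToAppend):
    vals.append(valToAppend)       # fold another value into an existing combiner
    return vals

def mergeCombinerFunc(vals1, vals2):
    return vals1 + vals2           # merge two combiners from different partitions

# Two hypothetical partitions of (key, value) pairs
partitions = [[("a", 1), ("a", 1), ("b", 1)], [("a", 1)]]

# Within each partition: start a combiner on first sight of a key,
# then merge subsequent values into it
perPartition = []
for part in partitions:
    combiners = {}
    for k, v in part:
        if k in combiners:
            combiners[k] = mergeValuesFunc(combiners[k], v)
        else:
            combiners[k] = valToCombiner(v)
    perPartition.append(combiners)

# Across partitions: merge the per-partition combiners into the final result
final = {}
for combiners in perPartition:
    for k, c in combiners.items():
        final[k] = mergeCombinerFunc(final[k], c) if k in final else c
print(final)  # {'a': [1, 1, 1], 'b': [1]}
```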
def valToCombiner(value):
return [value]
def mergeValuesFunc(vals, valToAppend):
vals.append(valToAppend)
return vals
def mergeCombinerFunc(vals1, vals2):
return vals1 + vals2
outputPartitions = 6
KVcharacters\
.combineByKey(
valToCombiner,
mergeValuesFunc,
mergeCombinerFunc,
outputPartitions)\
.collect()
foldByKey
foldByKey merges the values for each key using an associative function and a neutral “zero value,” which can be added to the result an arbitrary number of times, and must not change the result (e.g., 0 for addition, or 1 for multiplication):
KVcharacters.foldByKey(0, addFunc).collect()
CoGroups
cogroup
cogroup gives you the ability to group together up to three key–value RDDs in Scala and two in Python. This joins the given values by key.
This is effectively just a group-based join on an RDD. When doing this, you can also specify a number of output partitions or a custom partitioning function to control exactly how this data is distributed across the cluster.
import random
distinctChars = words.flatMap(
lambda word: word.lower()).distinct()
charRDD = distinctChars.map(
lambda c: (c, random.random()))
charRDD2 = distinctChars.map(
lambda c: (c, random.random()))
charRDD.cogroup(charRDD2).take(5)
The result is a group with our key on one side, and all of the relevant values on the other side.
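The semantics can be sketched in pure Python (a toy model, not Spark's implementation): for each key, gather the values from each RDD into separate groups, including keys that appear in only one side:

```python
def cogroup(left, right):
    # For every key in either dataset, collect each side's values separately
    keys = {k for k, _ in left} | {k for k, _ in right}
    return {k: ([v for kk, v in left if kk == k],
                [v for kk, v in right if kk == k])
            for k in keys}

rdd1 = [("a", 1), ("b", 2), ("a", 3)]
rdd2 = [("a", 10), ("c", 20)]
result = cogroup(rdd1, rdd2)
print(result["a"])  # ([1, 3], [10])
print(result["c"])  # ([], [20])
```

Unlike a join, keys missing from one side still appear in the output, paired with an empty group.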
Joins
RDDs have much the same joins as we saw in the Structured API.
Inner Join
We’ll demonstrate an inner join now. Notice how we are setting the number of output partitions we would like to see:
keyedChars = distinctChars.map(lambda c: (c, random.random()))
outputPartitions = 10
KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()
cartesian
This, again, is very dangerous! cartesian pairs every element of one RDD with every element of the other; it does not accept a join key and can have a massive output.
zips
zip isn’t really a join, but it combines two RDDs element-wise.
NOTE
The two RDDs must have the same length and the same number of partitions. This creates a PairRDD.
numRange = sc.parallelize(range(10), 2)
words.zip(numRange).collect()
# The output
[
('Spark', 0),
('The', 1),
('Definitive', 2),
('Guide', 3),
(':', 4),
('Big', 5),
('Data', 6),
('Processing', 7),
('Made', 8),
('Simple', 9)
]
Controlling Partitions
With RDDs, you have control over exactly how data is physically distributed across the cluster.
The key addition (that does not exist in the Structured APIs) is the ability to specify a partitioning function (formally a custom Partitioner, which we discuss later when we look at basic methods).
coalesce
coalesce effectively collapses partitions on the same worker in order to avoid a shuffle of the data when repartitioning.
For instance, our words RDD currently has two partitions; we can collapse it to one partition by using coalesce without bringing about a shuffle of the data:
words.coalesce(1).getNumPartitions() # 1
repartition
The repartition operation allows you to repartition your data up or down but performs a shuffle across nodes in the process. Increasing the number of partitions can increase the level of parallelism when performing map- and filter-type operations:
words.repartition(10) # gives us 10 partitions
repartitionAndSortWithinPartitions
This operation gives you the ability to repartition as well as specify the ordering of each one of those output partitions.
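There is no code sample here in the original, so the following is a pure-Python sketch of the idea with a hypothetical dataset: assign records to partitions by key (using a deterministic ord-based hash for illustration), then sort inside each partition only, producing no global ordering:

```python
records = [("d", 4), ("a", 1), ("c", 3), ("b", 2), ("a", 5)]
numPartitions = 2

# Assign each record to a partition by hashing its key
partitions = [[] for _ in range(numPartitions)]
for k, v in records:
    partitions[sum(map(ord, k)) % numPartitions].append((k, v))

# Sort within each partition independently; partitions stay unordered
# relative to one another
for part in partitions:
    part.sort(key=lambda kv: kv[0])
print(partitions)
```

In Spark, the partitioning and the per-partition sort happen in a single shuffle, which is more efficient than calling repartition followed by a sort.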
Custom Partitioning (Important)
This ability is one of the primary reasons you’d want to use RDDs. Custom partitioners are not available in the Structured APIs because they don’t really have a logical counterpart. The canonical example to motivate custom partitioning is PageRank, whereby we seek to control the layout of the data on the cluster and avoid shuffles.
In our shopping dataset, this might mean partitioning by each customer ID (we’ll get to this example in a moment). In short, the sole goal of custom partitioning is to even out the distribution of your data across the cluster so that you can work around problems like data skew.
If you’re going to use custom partitioners, you should drop down to RDDs from the Structured APIs, apply your custom partitioner, and then convert it back to a DataFrame or Dataset. This way, you get the best of both worlds, only dropping down to custom partitioning when you need to.
Custom Partitioner
To perform custom partitioning you need to implement your own class that extends
Partitioner.
WARNING
You need to do this only when you have lots of domain knowledge about your problem space
df = spark.read\
.option("header", "true")\
.option("inferSchema", "true")\
.csv("/data/retail-data/all/")
rdd = df.coalesce(10).rdd
df.printSchema()
Spark has two built-in Partitioners:
- HashPartitioner, for discrete values
- RangePartitioner, for continuous values
Spark’s Structured APIs already use these under the hood, and we can use the same thing in RDDs:
import org.apache.spark.HashPartitioner
rdd.map(r => r(6)).take(5).foreach(println)
val keyedRDD = rdd.keyBy(row => row(6).asInstanceOf[Int].toDouble)
keyedRDD.partitionBy(new HashPartitioner(10)).take(10)
Although the hash and range partitioners are useful, they’re fairly rudimentary. At times, you will need to perform some very low-level partitioning because you’re working with very large data and large key skew. You want to break these keys up as much as possible to improve parallelism and prevent OutOfMemoryErrors during the course of execution.
One instance might be that you need to partition more keys if and only if the key matches a certain format. For instance, we might know that there are two customers in your dataset that always crash your analysis and we need to break them up further than other customer IDs. In fact, these two are so skewed that they need to be operated on alone, whereas all of the others can be lumped into large groups. This is obviously a bit of a caricatured example, but you might see similar situations in your data, as well:
import org.apache.spark.Partitioner
class DomainPartitioner extends Partitioner {
def numPartitions = 3
def getPartition(key: Any): Int = {
val customerId = key.asInstanceOf[Double].toInt
if (customerId == 17850.0 || customerId == 12583.0) {
return 0
} else {
return new java.util.Random().nextInt(2) + 1
}
}
}
keyedRDD
.partitionBy(new DomainPartitioner)
.map(_._1)
.glom()
.map(_.toSet.toSeq.length)
.take(5)
After you run this, you will see the count of results in each partition. The second two numbers will vary, because we’re distributing them randomly (as you will see when we do the same in Python), but the same principles apply:
def partitionFunc(key):
import random
if key == 17850 or key == 12583:
return 0
else:
return random.randint(1,2)
keyedRDD = rdd.keyBy(lambda row: row[6])
keyedRDD\
.partitionBy(3, partitionFunc)\
.map(lambda x: x[0])\
.glom()\
.map(lambda x: len(set(x)))\
.take(5)
This custom key distribution logic is available only at the RDD level. Of course, this is a simple example, but it does show the power of using arbitrary logic to distribute the data around the cluster in a physical manner.
Custom Serialization
The last advanced topic is serialization. Any object (or function) that you hope to parallelize must be serializable:
class SomeClass extends Serializable {
var someValue = 0
def setSomeValue(i:Int) = {
someValue = i
this
}
}
sc.parallelize(1 to 10)
.map(num => new SomeClass().setSomeValue(num))
The default serialization can be quite slow. Spark can use the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all serializable types and requires you to register the classes you’ll use in the program in advance for best performance.
Initializing to use Kryo
You can use Kryo by initializing your job with a SparkConf and setting the value of spark.serializer to org.apache.spark.serializer.KryoSerializer.
INFO
This setting configures the serializer used for shuffling data between worker nodes and serializing RDDs to disk.
Why is the Kryo not default serializer?
The only reason Kryo is not the default is because of the custom registration requirement, but we recommend trying it in any network-intensive application. Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
To register your own custom classes with Kryo, use the registerKryoClasses method:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
References
- Spark: The Definitive Guide by Bill Chambers and Matei Zaharia.
