Overview

There are two sets of low-level APIs: there is one for manipulating distributed data (RDDs), and another for distributing and manipulating distributed shared variables (broadcast variables and accumulators).

When to Use the Low-Level APIs?

In three situations:

  • You need some functionality that you cannot find in the higher-level APIs; for example, if you need very tight control over physical data placement across the cluster.
  • You need to maintain some legacy codebase written using RDDs.
  • You need to do some custom shared variable manipulation.

INFORMATION

When you’re calling a DataFrame transformation, it actually just becomes a set of RDD transformations.

How to Use the Low-Level APIs?

A SparkContext is the entry point for low-level API functionality. You access it through the SparkSession, which is the tool you use to perform computation across a Spark cluster.

spark.sparkContext

About RDDs

In short, an RDD represents an immutable, partitioned collection of records (dataset distributed into each worker) that can be operated on in parallel. Unlike DataFrames though, where each record is a structured row containing fields with a known schema, in RDDs the records are just Java, Scala, or Python objects of the programmer’s choosing. RDDs give you complete control.

HINT

Every record in an RDD is a just a Java or Python object. You can store anything you want in these objects, in any format you want.

Every manipulation and interaction between values must be defined by hand. 1. Why Apache Spark RDD is immutable?

Types of RDDs

Two types of RDDs:

  • The “generic” RDD type
  • A key-value RDD Internally, each RDD is characterized by five main properties:
  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g., to say that the RDD is hash partitioned)
  • Optionally, a list of preferred locations on which to compute each split (e.g., block locations for a Hadoop Distributed File System HDFS file)

The Partitioner

The Partitioner is probably one of the core reasons why you might want to use RDDs in your code. Specifying your own custom Partitioner can give you significant performance and stability improvements if you use it correctly.

NOTE

There is no concept of “rows” in RDDs; individual records are just raw Java/Scala/Python objects, and you manipulate those manually instead of tapping into the repository of functions that you have in the structured APIs.

HINT

For Scala and Java, the performance is for the most part the same, the large costs incurred in manipulating the raw objects.

Running Python RDDs equates to running Python user-defined functions (UDFs) row by row (We serialize the data to the Python process, operate on it in Python, and then serialize it back to the Java Virtual Machine (JVM))

Creating RDDs

Interoperating Between DataFrames, Datasets, and RDDs. One of the easiest ways to get RDDs is from an existing DataFrame or Dataset. Converting these to an RDD is simple: just use the rdd method on any of these data types.

spark.range(10).rdd

To operate on this data, you will need to convert this Row object to the correct data type or extract values out of it.

spark.range(10).toDF("id").rdd.map(lambda row: row[0])

You can use the same methodology to create a DataFrame or Dataset from an RDD.

spark.range(10).rdd.toDF()

From a Local Collection To create an RDD from a collection, you will need to use the parallelize method on a SparkContext (within a SparkSession). This turns a single node collection into a parallel collection. When creating this parallel collection, you can also explicitly state the number of partitions into which you would like to distribute this array. In this case, we are creating two partitions:

myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\ 
.split(" ") 
 
words = spark.sparkContext.parallelize(myCollection, 2)
 
# you can then name this RDD to show up in the Spark UI according to a given name
words.setName("myWords") 
words.name() # myWords

From Data Sources

# each record in the RDD represents a line in that text file or files.
spark.sparkContext.textFile("/some/path/withTextFiles")
 
# each file is a file that consists of a large JSON object or some document that you will operate on as an individual
spark.sparkContext.wholeTextFiles("/some/path/withTextFiles")

In this RDD, the name of the file is the first object and the value of the text file is the second string object.

Manipulating RDDs

You manipulate RDDs in much the same way that you manipulate DataFrames. The core difference being that you manipulate raw Java or Scala objects instead of Spark types. There is also a dearth of “helper” methods or functions that you can draw upon to simplify calculations. Rather, you must define each filter, map functions, aggregation, and any other manipulation that you want as a function.

Transformations

distinct

words.distinct().count()

filter

def startsWithS(individual): 
	return individual.startswith("S")
 
words.filter(lambda word: startsWithS(word)).collect()

map You specify a function that returns the value that you want, given the correct input. You then apply that, record by record.

words2 = words.map(lambda word: (word, word[0], word.startswith("S")))

You can subsequently filter on this by selecting the relevant Boolean value in a new function:

words2.filter(lambda record: record[2]).take(5)

flatMap Sometimes, each current row should return multiple rows, instead. For example, you might want to take your set of words and flatMap it into a set of characters. Because each word has multiple characters, you should use flatMap to expand it. flatMap requires that the ouput of the map function be an iterable that can be expanded:

words.flatMap(lambda word: list(word)).take(5)

sort

words.sortBy(lambda word: len(word) * -1).take(2)

Random Splits We can also randomly split an RDD into an Array of RDDs by using the randomSplit method, which accepts an Array of weights and a random seed:

fiftyFiftySplit = words.randomSplit([0.5, 0.5]) # This returns an array of RDDs

Actions

reduce

spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y) # 210

You can also use this to get something like the longest word in our set of words that we defined a moment ago. The key is just to define the correct function:

def wordLengthReducer(leftWord, rightWord): 
	if len(leftWord) > len(rightWord): 
		return leftWord 
	else: 
		return rightWord 
 
words.reduce(wordLengthReducer)

This reducer is a good example because you can get one of two outputs. Because the reduce operation on the partitions is not deterministic, you can have either “definitive” or “processing” (both of length 10) as the “left” word. This means that sometimes you can end up with one, whereas other times you end up with the other.

count, countApprox, countApproxDistinct, countByValue and countByValueApprox

# count
words.count()
 
# countApprox
confidence = 0.95 
timeoutMilliseconds = 400 
words.countApprox(timeoutMilliseconds, confidence)
 
# countApproxDistinct
words.countApproxDistinct(0.05)
 
# countByValue
words.countByValue()
 
# countByValueApprox
words.countByValueApprox(1000, 0.95)

first

words.first()

max and min

spark.sparkContext.parallelize(1 to 20).max()
spark.sparkContext.parallelize(1 to 20).min()

take take and its derivative methods take a number of values from your RDD. This works by first scanning one partition and then using the results from that partition to estimate the number of additional partitions needed to satisfy the limit. There are many variations on this function, such as takeOrdered, takeSample, and top. You can use takeSample to specify a fixed-size random sample from your RDD. You can specify whether this should be done by using withReplacement, the number of values, as well as the random seed. top is effectively the opposite of takeOrdered in that it selects the top values according to the implicit ordering:

words.take(5) 
words.takeOrdered(5) 
words.top(5) 
withReplacement = true 
numberToTake = 6 
randomSeed = 100L
words.takeSample(withReplacement, numberToTake, randomSeed)

Saving Files

IMPORTANT

Saving files means writing to plain-text files. With RDDs, you cannot actually “save” to a data source in the conventional sense.

You must iterate over the partitions in order to save the contents of each partition to some external database.

saveAsTextFile

words.saveAsTextFile("file:/tmp/bookTitle")

SequenceFiles A sequenceFile is a flat file consisting of binary key–value pairs. It is extensively used in MapReduce as input/output formats.

words.saveAsObjectFile("/tmp/my/sequenceFilePath")

Caching

By default, cache and persist only handle data in memory.

words.cache()

Checkpointing

One feature not available in the DataFrame API is the concept of checkpointing.

INFO

Checkpointing is the act of saving an RDD to disk so that future references to this RDD point to those intermediate partitions on disk rather than recomputing the RDD from its original source. This is similar to caching except that it’s not stored in memory, only disk. This can be helpful when performing iterative computation, similar to the use cases for caching

spark.sparkContext.setCheckpointDir("/some/path/for/checkpointing") 
words.checkpoint()

Pipe RDDs to System Commands

With pipe, you can return an RDD created by piping elements to a forked external process. The resulting RDD is computed by executing the given process once per partition. All elements of each input partition are written to a process’s stdin as lines of input separated by a newline. The resulting partition consists of the process’s stdout output, with each line of stdout resulting in one element of the output partition. A process is invoked even for empty partitions.

We can use a simple example and pipe each partition to the command wc. Each row will be passed in as a new line, so if we perform a line count, we will get the number of lines, one per partition:

words.pipe("wc -l").collect()

mapPartitions The previous command revealed that Spark operates on a per-partition basis when it comes to actually executing code. You also might have noticed earlier that the return signature of a map function on an RDD is actually MapPartitionsRDD. This is because map is just a row-wise alias for mapPartitions, which makes it possible for you to map an individual partition (represented as an iterator). That’s because physically on the cluster we operate on each partition individually (and not a specific row).

A simple example creates the value “1” for every partition in our data, and the sum of the following expression will count the number of partitions we have:

words.mapPartitions(lambda part: [1]).sum() # 2

Naturally, this means that we operate on a per-partition basis and allows us to perform an operation on that entire partition. This is valuable for performing something on an entire subdataset of your RDD. You can gather all values of a partition class or group into one partition and then operate on that entire group using arbitrary functions and controls.

Other functions similar to mapPartitions include mapPartitionsWithIndex. With this you specify a function that accepts an index (within the partition) and an iterator that goes through all items within the partition. The partition index is the partition number in your RDD, which identifies where each record in our dataset sits (and potentially allows you to debug). You might use this to test whether your map functions are behaving correctly:

def indexedFunc(partitionIndex, withinPartIterator): 
	return ["partition: {} => {}".format(partitionIndex, x) 
			for x in withinPartIterator] 
			
words.mapPartitionsWithIndex(indexedFunc).collect()

References