Working with Booleans

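When you chain filters with successive where calls, Spark flattens them into one statement and applies them at the same time, creating the and statement for us. An or condition, by contrast, has to be specified within a single statement: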

from pyspark.sql.functions import col, instr

priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

Boolean expressions are not just reserved to filters. To filter a DataFrame, you can also just specify a Boolean column:

from pyspark.sql.functions import col, instr

DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
	.where("isExpensive")\
	.select("unitPrice", "isExpensive").show(5)

-- in SQL
SELECT UnitPrice, (StockCode = 'DOT' AND
		(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
	   (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))

Working with null data

One “gotcha” that can come up is if you’re working with null data when creating Boolean expressions. If there is a null in your data, you’ll need to treat things a bit differently. Here’s how you can ensure that you perform a null-safe equivalence test:

df.where(col("Description").eqNullSafe("hello")).show()
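
The difference between ordinary equality and the null-safe test can be sketched in plain Python. This is only a model of the semantics, with None standing in for null; the helper names sql_equals and eq_null_safe are hypothetical and are not part of the Spark API:

```python
from typing import Optional


def sql_equals(left, right) -> Optional[bool]:
    """Model of SQL `=`: the result is null (None) if either side is null."""
    if left is None or right is None:
        return None
    return left == right


def eq_null_safe(left, right) -> bool:
    """Model of eqNullSafe: always True or False, and two nulls compare equal."""
    if left is None or right is None:
        return left is None and right is None
    return left == right


descriptions = ["hello", None, "world"]

# Ordinary equality yields null for the null row, so a filter silently drops it.
plain = [sql_equals(d, "hello") for d in descriptions]

# The null-safe test yields a definite False for the null row.
safe = [eq_null_safe(d, "hello") for d in descriptions]

# Two nulls are considered equal by the null-safe test.
both_null = eq_null_safe(None, None)

print(plain, safe, both_null)
```

In a filter both versions keep only the "hello" row here; the null-safe form matters when the value being compared against may itself be null.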

Working with numbers

We can do all of this as a SQL expression, as well:

df.selectExpr(
	"CustomerId", 
	"(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)

-- in SQL
SELECT customerId, (POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity
FROM dfTable

A number of statistical functions are available in the StatFunctions package, accessible through the stat attribute of a DataFrame:

df.stat

Working with Strings

The initcap function will capitalize every word in a given string when that word is separated from another by a space.

from pyspark.sql.functions import col, initcap

df.select(initcap(col("Description"))).show()

You can also cast strings to uppercase and lowercase:

from pyspark.sql.functions import lower, upper 
df.select(
	col("Description"), 
	lower(col("Description")), 
	upper(lower(col("Description")))).show(2)

Another trivial task is adding or removing spaces around a string. You can do this by using lpad, ltrim, rpad, rtrim, and trim:

from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim

df.select(
	ltrim(lit("    HELLO    ")).alias("ltrim"),
	rtrim(lit("    HELLO    ")).alias("rtrim"),
	trim(lit("    HELLO    ")).alias("trim"),
	lpad(lit("HELLO"), 3, " ").alias("lp"),
	rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

#+---------+---------+-----+---+----------+
#|    ltrim|    rtrim| trim| lp|        rp|
#+---------+---------+-----+---+----------+
#|HELLO    |    HELLO|HELLO|HEL|HELLO     |
#|HELLO    |    HELLO|HELLO|HEL|HELLO     |
#+---------+---------+-----+---+----------+
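
The padding functions are easy to misread: when the requested length is shorter than the string, lpad and rpad truncate rather than pad. Below is a minimal plain-Python sketch of that behavior, assuming a non-empty pad string. The helpers only model the Spark functions of the same name and say nothing about how Spark implements them:

```python
def lpad(value: str, length: int, pad: str = " ") -> str:
    """Left-pad value to exactly `length` characters, truncating if too long."""
    if len(value) >= length:
        return value[:length]  # characters are removed from the right side
    fill = (pad * length)[: length - len(value)]
    return fill + value


def rpad(value: str, length: int, pad: str = " ") -> str:
    """Right-pad value to exactly `length` characters, truncating if too long."""
    if len(value) >= length:
        return value[:length]
    fill = (pad * length)[: length - len(value)]
    return value + fill


def trim(value: str) -> str:
    """Remove leading and trailing spaces, as trim does."""
    return value.strip(" ")


short = lpad("HELLO", 3, " ")     # shorter than the input, so it is truncated
padded = rpad("HELLO", 10, " ")   # longer than the input, so spaces are added
cycled = lpad("hi", 5, "ab")      # a multi-character pad is repeated and cut
print(repr(short), repr(padded), repr(cycled), repr(trim("    HELLO    ")))
```

Because of the truncation, a too-small length argument silently loses data, so it is worth checking the longest value in a column before padding it.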

Regular Expressions

There are two key functions in Spark that you’ll need in order to perform regular expression tasks: regexp_extract and regexp_replace. These functions extract values and replace values, respectively.

from pyspark.sql.functions import regexp_replace 
regex_string = "BLACK|WHITE|RED|GREEN|BLUE" 
df.select( 
	regexp_replace(col("Description"), 
	regex_string, "COLOR").alias("color_clean"), 
	col("Description")).show(2)
 
#+--------------------+--------------------+ 
#|         color_clean|         Description| 
#+--------------------+--------------------+ 
#|COLOR HANGING HEA...|WHITE HANGING HEA...| 
#| COLOR METAL LANTERN| WHITE METAL LANTERN| 
#+--------------------+--------------------+

from pyspark.sql.functions import regexp_extract
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)" 
df.select( 
	regexp_extract(col("Description"), extract_str, 1).alias("color_clean"), 
	col("Description")).show(2)
 
#+-------------+--------------------+ 
#|  color_clean|         Description| 
#+-------------+--------------------+ 
#|        WHITE|WHITE HANGING HEA...| 
#|        WHITE| WHITE METAL LANTERN| 
#+-------------+--------------------+
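
For readers more familiar with Python's re module, the two Spark functions behave roughly like re.sub and re.search applied to each value. The sketch below is a plain-Python analogue under that assumption. Spark itself uses Java regular expressions, which agree with Python for a simple alternation like this one. The helper names replace_colors and extract_color are hypothetical:

```python
import re

regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"


def replace_colors(description: str) -> str:
    """Analogue of regexp_replace: every match is replaced with COLOR."""
    return re.sub(regex_string, "COLOR", description)


def extract_color(description: str) -> str:
    """Analogue of regexp_extract with group index 1.

    Returns the first match of the capture group, or an empty string when
    nothing matches (regexp_extract also returns an empty string then).
    """
    match = re.search(extract_str, description)
    return match.group(1) if match else ""


cleaned = replace_colors("RED WOOLLY HOTTIE WHITE HEART.")
first_color = extract_color("RED WOOLLY HOTTIE WHITE HEART.")
no_color = extract_color("CREAM CUPID HEARTS COAT HANGER")
print(cleaned, first_color, repr(no_color))
```

Note that the pattern matches substrings, so a word such as CREDIT would also match RED. Adding word boundaries (\b) to the pattern is one way to avoid that.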

Sometimes, rather than extracting values, we simply want to check for their existence. We can do this with the contains method on each column or, as in the following Python example, with the instr function:

from pyspark.sql.functions import col, instr

containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
	.where("hasSimpleColor")\
	.select("Description").show(3, False)

#+----------------------------------+
#|Description                       |
#+----------------------------------+
#|WHITE HANGING HEART T-LIGHT HOLDER|
#|WHITE METAL LANTERN               |
#|RED WOOLLY HOTTIE WHITE HEART.    |
#+----------------------------------+

When we convert a list of values into a set of arguments and pass them into a function, we use a language feature called varargs. Using this feature, we can effectively unravel an array of arbitrary length and pass it as arguments to a function. This, coupled with select makes it possible for us to create arbitrary numbers of columns dynamically:

from pyspark.sql.functions import expr, locate

simpleColors = ["black", "white", "red", "green", "blue"]

def color_locator(column, color_string):
	# locate returns a 1-based position, or 0 when the string is absent
	return locate(color_string.upper(), column)\
		.cast("boolean")\
		.alias("is_" + color_string)

selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*")) # has to be a Column type

df.select(*selectedColumns).where(expr("is_white OR is_red"))\
	.select("Description").show(3, False)
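
The unpacking step is ordinary Python and can be seen in isolation without Spark. In this minimal sketch, select is a hypothetical stand-in that simply reports the arguments it received. It is not the DataFrame method:

```python
def select(*columns):
    """Hypothetical stand-in for a varargs function: returns its arguments."""
    return list(columns)


simple_colors = ["black", "white", "red", "green", "blue"]

# Build the argument list dynamically, one entry per color plus a wildcard.
selected_columns = ["is_" + color for color in simple_colors]
selected_columns.append("*")

# The star unpacks the list into six separate positional arguments.
unpacked = select(*selected_columns)

# Without the star, the function receives one argument: the list itself.
not_unpacked = select(selected_columns)

print(len(unpacked), len(not_unpacked))
```

PySpark's own select happens to accept a single list as well, but unpacking with * works with any function that takes a variable number of arguments.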

Working with Dates and Timestamps

Timezone

There are a lot of caveats, unfortunately, when working with dates and timestamps, especially when it comes to timezone handling. In version 2.1 and before, Spark parsed according to the machine’s timezone if timezones were not explicitly specified in the value being parsed. You can set a session-local timezone if necessary by setting spark.sql.session.timeZone in the SQL configurations. This should be set according to the Java TimeZone format.

Spark works with Java dates and timestamps and therefore conforms to those standards. Let’s begin with the basics and get the current date and the current timestamp:

from pyspark.sql.functions import current_date, current_timestamp 
dateDF = spark.range(10)\
	.withColumn("today", current_date())\
	.withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema() 
 
#root 
#|-- id: long (nullable = false) 
#|-- today: date (nullable = false) 
#|-- now: timestamp (nullable = false)

Let’s add and subtract five days from today:

from pyspark.sql.functions import col, date_add, date_sub

dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

Another common task is to take a look at the difference between two dates. We can do this with the datediff function that will return the number of days in between two dates.

from pyspark.sql.functions import col, date_sub, datediff, lit, months_between, to_date

dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
	.select(datediff(col("week_ago"), col("today"))).show(1)

dateDF.select(
	to_date(lit("2016-01-01")).alias("start"),
	to_date(lit("2017-05-22")).alias("end"))\
	.select(months_between(col("start"), col("end"))).show(1)
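
The sign of the datediff result depends on argument order: the first argument is the end date and the second is the start date. A small plain-Python sketch of the day arithmetic follows, using the standard datetime module and a fixed date so that the result is reproducible. The helpers mirror the Spark function names but are only a model:

```python
from datetime import date, timedelta


def date_add(start: date, days: int) -> date:
    """Model of date_add: the date `days` days after start."""
    return start + timedelta(days=days)


def date_sub(start: date, days: int) -> date:
    """Model of date_sub: the date `days` days before start."""
    return start - timedelta(days=days)


def datediff(end: date, start: date) -> int:
    """Model of datediff: number of days from start to end."""
    return (end - start).days


today = date(2017, 5, 22)  # fixed value chosen for the example
week_ago = date_sub(today, 7)

# week_ago is passed as the end date, so the difference is negative.
difference = datediff(week_ago, today)
print(week_ago, difference)
```

Swapping the two arguments gives 7 instead of -7, which is usually what is wanted when measuring elapsed days.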

The to_date function allows you to convert a string to a date, optionally with a specified format.

from pyspark.sql.functions import to_date, lit 
 
spark.range(5).withColumn("date", lit("2017-01-01"))\ 
.select(to_date(col("date"))).show(1)

What happens when Spark cannot parse a date?

Spark will not throw an error if it cannot parse the date; rather, it will just return null. This can be a bit tricky in larger pipelines because you might be expecting your data in one format and getting it in another.

dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1) 
 
#+-------------------+-------------------+ 
#|to_date(2016-20-12)|to_date(2017-12-11)| 
#+-------------------+-------------------+ 
#|               null|         2017-12-11|
#+-------------------+-------------------+

In the previous example, notice how the second date appears as December 11th instead of the correct day, November 12th. Spark doesn’t throw an error because it cannot know whether the days are mixed up or that specific row is incorrect. We will use two functions to fix this: to_date and to_timestamp. Both accept a format string, and the examples here pass one explicitly:

from pyspark.sql.functions import to_date 
 
dateFormat = "yyyy-dd-MM" 
cleanDateDF = spark.range(1).select( 
				to_date(lit("2017-12-11"), dateFormat).alias("date"), 
				to_date(lit("2017-20-12"), dateFormat).alias("date2")) 
				
cleanDateDF.createOrReplaceTempView("dateTable2")
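
The effect of the format string can be reproduced in plain Python with datetime.strptime. This is a sketch of the parsing behavior only: the hypothetical to_date helper below returns None where Spark would return null, and the Java pattern yyyy-dd-MM is assumed to correspond to the Python pattern %Y-%d-%m:

```python
from datetime import date, datetime
from typing import Optional


def to_date(value: str, fmt: str = "%Y-%m-%d") -> Optional[date]:
    """Parse a date string, returning None instead of raising on failure."""
    try:
        return datetime.strptime(value, fmt).date()
    except ValueError:
        return None


# Default year-month-day format: month 20 does not exist, so parsing fails.
unparseable = to_date("2016-20-12")

# This string parses, but as December 11th, which is not what was meant.
misread = to_date("2017-12-11")

# With an explicit year-day-month format both strings are read as intended.
date_format = "%Y-%d-%m"
fixed_1 = to_date("2017-12-11", date_format)
fixed_2 = to_date("2017-20-12", date_format)

print(unparseable, misread, fixed_1, fixed_2)
```

The misread value is the more dangerous case, because nothing signals that it is wrong. Only the values that fail to parse show up as nulls.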

Now let’s use an example of to_timestamp, here with the format specified explicitly:

from pyspark.sql.functions import to_timestamp 
 
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()

After we have our date or timestamp in the correct format and type, comparing between them is actually quite easy. We just need to be sure to either use a date/timestamp type or specify our string according to the right format of yyyy-MM-dd if we’re comparing a date:

cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()

One minor point is that we can also set this as a string, which Spark parses to a literal:

cleanDateDF.filter(col("date2") > "2017-12-12").show()

Cast date or timestamp

Implicit type casting is an easy way to shoot yourself in the foot, especially when dealing with null values or dates in different timezones or formats. We recommend that you parse them explicitly instead of relying on implicit conversions.

Working with nulls

Spark can optimize working with null values more than it can if you use empty strings or other values.

Warning

Nulls are a challenging part of all programming, and Spark is no exception. In our opinion, being explicit is always better than being implicit when handling null values.

When we declare a column as not nullable, that is not actually enforced. To reiterate, when you define a schema in which all columns are declared to not have null values, Spark will not enforce that and will happily let null values into that column. The nullable signal is simply to help Spark SQL optimize for handling that column. If you have null values in columns that should not have null values, you can get an incorrect result or see strange exceptions that can be difficult to debug.

There are two things you can do with null values: you can explicitly drop nulls or you can fill them with a value. Spark includes a function to allow you to select the first non-null value from a set of columns by using the coalesce function.

from pyspark.sql.functions import coalesce 
 
df.select(coalesce(col("Description"), col("CustomerId"))).show()

ifnull, nullif, nvl, and nvl2

There are several other SQL functions that you can use to achieve similar things. ifnull allows you to select the second value if the first is null, and defaults to the first. Alternatively, you could use nullif, which returns null if the two values are equal or else returns the first if they are not. nvl returns the second value if the first is null, but defaults to the first. Finally, nvl2 returns the second value if the first is not null; otherwise, it will return the last specified value (else_value in the following example):

SELECT 
	ifnull(null, 'return_value'), 
	nullif('value', 'value'),
	nvl(null, 'return_value'), 
	nvl2('not_null', 'return_value', "else_value") 
FROM dfTable 
LIMIT 1
 
+------------+----+------------+------------+
|           a|   b|           c|           d|
+------------+----+------------+------------+
|return_value|null|return_value|return_value|
+------------+----+------------+------------+
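
The rules for these functions are compact enough to write out in plain Python. The sketch below models them with None standing in for null. The function names mirror the SQL ones, but this is an illustration of the semantics and not Spark code:

```python
def coalesce(*values):
    """First non-null value, or None if every value is null."""
    for value in values:
        if value is not None:
            return value
    return None


def ifnull(first, second):
    """The second value if the first is null, otherwise the first."""
    return second if first is None else first


def nullif(first, second):
    """Null if the two values are equal, otherwise the first value."""
    return None if first == second else first


def nvl(first, second):
    """Same rule as ifnull."""
    return ifnull(first, second)


def nvl2(first, second, else_value):
    """The second value if the first is not null, otherwise else_value."""
    return second if first is not None else else_value


# The same four calls as in the SQL query above.
row = (
    ifnull(None, "return_value"),
    nullif("value", "value"),
    nvl(None, "return_value"),
    nvl2("not_null", "return_value", "else_value"),
)
print(row)
```

The printed tuple lines up with columns a through d of the result table.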

drop

The simplest function is drop, which removes rows that contain nulls. The default is to drop any row in which any value is null:

df.na.drop()

Specifying any as an argument drops a row if any of the values are null. Using all drops the row only if all values are null or NaN for that row:

df.na.drop("any")
df.na.drop("all")
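
The difference between the two modes can be modeled on a list of dictionaries. This is a plain-Python sketch with hypothetical helper names and made-up rows, in which None and NaN both count as missing:

```python
import math


def is_missing(value) -> bool:
    """True for None and for floating-point NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def drop_rows(rows, how="any"):
    """Keep the rows that would survive a drop with the given mode."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    check = any if how == "any" else all
    return [
        row for row in rows
        if not check(is_missing(value) for value in row.values())
    ]


# Illustrative rows, not taken from the dataset.
rows = [
    {"StockCode": "DOT", "Description": "POSTAGE"},
    {"StockCode": "85123A", "Description": None},
    {"StockCode": None, "Description": None},
]

kept_any = drop_rows(rows, "any")  # only the fully populated row survives
kept_all = drop_rows(rows, "all")  # only the fully empty row is removed
print(len(kept_any), len(kept_all))
```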

fill

Using the fill function, you can fill one or more columns with a set of values. This can be done by specifying a map, that is, a particular value and a set of columns. For example, to fill all null values in columns of type String, you might specify the following (the second form maps individual column names to fill values):

df.na.fill("All Null values become this string")
 
fill_cols_vals = {"StockCode": 5, "Description" : "No Value"} 
df.na.fill(fill_cols_vals)

replace

In addition to replacing null values like we did with drop and fill, there are more flexible options that you can use with more than just null values.

df.na.replace([""], ["UNKNOWN"], "Description")

Working with Complex Types

Structs

from pyspark.sql.functions import struct 
 
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex")) 
complexDF.createOrReplaceTempView("complexDF")
 
# df.selectExpr("struct(Description, InvoiceNo) as complex", "*")

We now have a DataFrame with a column complex. We can query it just as we might another DataFrame; the only difference is that we use a dot syntax to do so, or the column method getField:

complexDF.select("complex.Description") 
complexDF.select(col("complex").getField("Description"))

We can also query all values in the struct by using *. This brings up all the columns to the top-level DataFrame:

complexDF.select("complex.*") 
 
#-- in SQL 
#SELECT complex.* FROM complexDF

Arrays

split

from pyspark.sql.functions import split 
 
df.select(split(col("Description"), " ")).show(2)
 
#+---------------------+ 
#| split(Description, )|
#+---------------------+ 
#| [WHITE, HANGING, ...| 
#| [WHITE, METAL, LA...| 
#+---------------------+

-- in SQL
SELECT split(Description, ' ') FROM dfTable

You can query the values of the array by position, using Python-like syntax:

df.select(split(col("Description"), " ").alias("array_col"))\
	.selectExpr("array_col[0]").show(2)
 
#+------------+ 
#|array_col[0]| 
#+------------+ 
#|       WHITE|
#|       WHITE|
#+------------+

-- in SQL
SELECT split(Description, ' ')[0] FROM dfTable

Array Length

from pyspark.sql.functions import size
 
df.select(size(split(col("Description"), " "))).show(2) # shows 5 and 3

array_contains

from pyspark.sql.functions import array_contains 
 
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
 
#+--------------------------------------------+ 
#| array_contains(split(Description, ), WHITE)| 
#+--------------------------------------------+
#|                                        true|
#|                                        true|
#+--------------------------------------------+

-- in SQL
SELECT array_contains(split(Description, ' '), 'WHITE') FROM dfTable

explode

The explode function takes a column that consists of arrays and creates one row (with the rest of the values duplicated) per value in the array.

from pyspark.sql.functions import split, explode 
 
df.withColumn("splitted", split(col("Description"), " "))\ 
.withColumn("exploded", explode(col("splitted")))\ 
.select("Description", "InvoiceNo", "exploded").show(2)

-- in SQL
SELECT Description, InvoiceNo, exploded
FROM (SELECT *, split(Description, " ") as splitted FROM dfTable) 
LATERAL VIEW explode(splitted) as exploded
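
What split followed by explode does to each row can be sketched with ordinary lists and dictionaries. This is a plain-Python model with a hypothetical helper name and made-up rows. Rows whose array is null produce no output rows, which is also how explode treats them:

```python
def explode_words(rows):
    """One output row per word in Description, other values duplicated."""
    exploded_rows = []
    for row in rows:
        description = row["Description"]
        if description is None:
            continue  # a null array contributes no rows
        for word in description.split(" "):
            # Copy the original row and add the single array element.
            exploded_rows.append({**row, "exploded": word})
    return exploded_rows


# Illustrative input rows.
rows = [
    {"InvoiceNo": "536365", "Description": "WHITE METAL LANTERN"},
    {"InvoiceNo": "536366", "Description": None},
]

result = explode_words(rows)
words = [row["exploded"] for row in result]
print(words)
```

One input row with a three-word description becomes three rows that share the same InvoiceNo and Description, so row counts grow with the array lengths.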

Maps

Maps are created by using the map function (create_map in Python) and key-value pairs of columns. You then can select them just like you might select from an array:

from pyspark.sql.functions import create_map 
 
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\ 
.show(2)
 
#+--------------------+ 
#|         complex_map|
#+--------------------+ 
#|Map(WHITE HANGING...|
#|Map(WHITE METAL L...| 
#+--------------------+

-- in SQL
SELECT map(Description, InvoiceNo) as complex_map
FROM dfTable WHERE Description IS NOT NULL

You can query them by using the proper key. A missing key returns null:

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
	.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
 
#+--------------------------------+ 
#|complex_map[WHITE METAL LANTERN]| 
#+--------------------------------+ 
#|                            null|
#|                          536365| 
#+--------------------------------+

You can also explode map types, which will turn them into columns:

df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
	.selectExpr("explode(complex_map)").show(2)
 
#+--------------------+------+ 
#|                 key| value| 
#+--------------------+------+
#|WHITE HANGING HEA...|536365|
#| WHITE METAL LANTERN|536365|
#+--------------------+------+

Working with JSON

You can operate directly on strings of JSON in Spark and parse from JSON or extract JSON objects.

jsonDF = spark.range(1).selectExpr("""
	'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")

You can use the get_json_object to inline query a JSON object, be it a dictionary or array. You can use json_tuple if this object has only one level of nesting:

from pyspark.sql.functions import col, get_json_object, json_tuple

jsonDF.select(
	get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]").alias("column"),
	json_tuple(col("jsonString"), "myJSONKey")).show(2)
 
#+------+--------------------+ 
#|column|                  c0|
#+------+--------------------+ 
#|     2|{"myJSONValue":[1...| 
#+------+--------------------+
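
To see which parts of the document the two columns correspond to, the same string can be parsed with Python's standard json module. This is only an illustration of what the path and the top-level key refer to. The Spark functions return their results as strings and do not use this module:

```python
import json

json_string = '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}'
parsed = json.loads(json_string)

# The path $.myJSONKey.myJSONValue[1] walks two keys, then takes index 1.
path_value = parsed["myJSONKey"]["myJSONValue"][1]

# Extracting only the top-level key myJSONKey yields the nested object,
# shown here re-serialized without extra whitespace.
top_level = json.dumps(parsed["myJSONKey"], separators=(",", ":"))

print(path_value, top_level)
```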

# the equivalent path query through a SQL expression
jsonDF.selectExpr(
	"get_json_object(jsonString, '$.myJSONKey.myJSONValue[1]') as column").show(2)

You can also turn a StructType into a JSON string by using the to_json function:

from pyspark.sql.functions import to_json 
 
df.selectExpr("(InvoiceNo, Description) as myStruct")\ 
.select(to_json(col("myStruct")))

This function also accepts a dictionary (map) of parameters that are the same as the JSON data source. You can use the from_json function to parse this (or other JSON data) back in. This naturally requires you to specify a schema, and optionally you can specify a map of options, as well:

from pyspark.sql.functions import from_json 
from pyspark.sql.types import * 
 
parseSchema = StructType(( 
	StructField("InvoiceNo",StringType(),True), 
	StructField("Description",StringType(),True)))
 
df.selectExpr("(InvoiceNo, Description) as myStruct")\ 
.select(to_json(col("myStruct")).alias("newJSON"))\ 
.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)
 
#+----------------------+--------------------+ 
#|jsontostructs(newJSON)|             newJSON|
#+----------------------+--------------------+ 
#|  [536365,WHITE HAN...|{"InvoiceNo":"536...| 
#|  [536365,WHITE MET...|{"InvoiceNo":"536...| 
#+----------------------+--------------------+

User-Defined Functions

UDFs operate on the data record by record. By default, these functions are registered as temporary functions to be used in that specific SparkSession or Context.

Let’s write a power3 function that takes a number and raises it to a power of three:

udfExampleDF = spark.range(5).toDF("num") 
 
def power3(double_value): 
	return double_value ** 3 
 
power3(2.0)

Registering functions with Spark

Spark will serialize the function on the driver and transfer it over the network to all executor processes. This happens regardless of language.

Important

When you use the function, essentially one of two things occurs, depending on the language it is written in:

  • If the function is written in Scala or Java, you can use it within the Java Virtual Machine (JVM). This means that there will be little performance penalty aside from the fact that you can’t take advantage of code generation capabilities that Spark has for built-in functions. There can be performance issues if you create or use a lot of objects.
  • If the function is written in Python, Spark starts a Python process on the worker, serializes all of the data to a format that Python can understand, executes the function row by row on that data in the Python process, and then finally returns the results of the row operations to the JVM and Spark.

UDFs with Python

Starting this Python process is expensive, but the real cost is in serializing the data to Python. This is costly for two reasons:

  • It is an expensive computation.
  • After the data enters Python, Spark cannot manage the memory of the worker. This means that you could potentially cause a worker to fail if it becomes resource constrained (because both the JVM and Python are competing for memory on the same machine).

Optimizing UDFs with Python

We recommend that you write your UDFs in Scala or Java. The small amount of time it should take you to write the function in Scala will usually yield significant speedups, and on top of that, you can still use the function from Python.

Let’s work through an example:

from pyspark.sql.functions import udf, col
 
power3udf = udf(power3)
 
udfExampleDF.select(power3udf(col("num"))).show(2)

We can also register this UDF as a Spark SQL function. This is valuable because it makes it simple to use this function within SQL as well as across languages.

// in Scala
spark.udf.register("power3", power3(_:Double):Double)

# in Python, using the function registered in Scala
udfExampleDF.selectExpr("power3(num)").show(2)

We can also register our Python function to be available as a SQL function and use that in any language, as well. One way to help ensure that our functions are working correctly is to specify a return type.

Defining Spark types with Python

Spark manages its own type information, which does not align exactly with Python’s types. Therefore, it’s a best practice to define the return type for your function when you define it. If you specify the type that doesn’t align with the actual type returned by the function, Spark will not throw an error but will just return null to designate a failure.

from pyspark.sql.types import IntegerType, DoubleType
 
spark.udf.register("power3py", power3, DoubleType())
 
udfExampleDF.selectExpr("power3py(num)").show(2)

The result is null because the range creates integers. When integers are operated on in Python, Python won’t convert them into floats (the corresponding type to Spark’s double type), therefore we see null. We can remedy this by ensuring that our Python function returns a float instead of an integer, and the function will then behave correctly.

Naturally, we can use either of these from SQL, too, after we register them. When you want to optionally return a value from a UDF, you should return None in Python and an Option type in Scala.

How to use PySpark with Scala or Java UDFs?

Default number of partitions in Spark

The default number of shuffle partitions for DataFrame and Spark SQL operations is 200, which is defined by the configuration setting spark.sql.shuffle.partitions. This number can be adjusted based on your specific needs and the size of your dataset.

How can I specify the number of partitions in an RDD?

You can specify the number of partitions when creating an RDD using the parallelize(), textFile(), or wholeTextFiles() methods. For example, you can use spark.sparkContext.parallelize(data, 4) to create an RDD with 4 partitions.

Can I use Spark coalesce() to increase the number of partitions?

No, Spark coalesce() can only be used to decrease the number of partitions. If you need to increase the number of partitions, you should use Spark repartition() instead.

References