Overview
The spark’s core data sources:
- CSV
- JSON
- Parquet
- ORC
- JDBC/ODBC connections
- Plain-text files And spark have numerous community-created data sources.
The Structure of the Data Sources API
Read API Structure
The core structure for reading data is as follows:
DataFrameReader.format(...).option("key", "value").schema(...).load()We will use this format to read from all of our data sources.
format is optional because by default Spark will use the Parquet format.
option allows you to set key-value configurations to parameterize how you will read data.
schema is optional if the data source provides a schema or if you intend to use schema inference.
Basics of Reading Data
The foundation for reading data in Spark is the DataFrameReader. We access this through the SparkSession via the read attribute:
spark.readAfter we have a DataFrame reader, we specify several values:
- The
format - The
schema - The read
mode - A series of
optionsThe format, options, and schema each return aDataFrameReader
spark.read.format("csv")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema) .load()Read mode
| Read mode | Description |
|---|---|
| permissive | Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column called _corrupt_record |
| dropMalformed | Drops the row that contains malformed records |
| failFast | Fails immediately upon encountering malformed records |
| The default is permissive. |
Write API Structure
The core structure for writing data is as follows:
DataFrameWriter
.format(...)
.option(...)
.partitionBy(...)
.bucketBy(...)
.sortBy( ...)
.save()We will use this format to write to all of our data sources.
formatis optional because by default, Spark will use the parquet format.option, again, allows us to configure how to write out our given data.partitionBy,bucketBy, andsortBywork only for file-based data sources; you can use them to control the specific layout of files at the destination.
Basics of Writing Data
we access the DataFrameWriter on a per-DataFrame basis via the write attribute:
dataFrame.writeAfter we have a DataFrameWriter, we specify three values: the format, a series of options, and the save mode. At a minimum, you must supply a path. We will cover the potential for options, which vary from data source to data source, shortly.
dataframe.write.format("csv")
.option("mode", "OVERWRITE")
.option("dateFormat", "yyyy-MM-dd")
.option("path", "path/to/file(s)")
.save()Save modes
| Save mode | Description |
|---|---|
| append | Appends the output files to the list of files that already exist at that location |
| overwrite | Will completely overwrite any data that already exists there |
| errrorIfExists | Throws an error and fails the write if data or files already exist at the specified location |
| ignore | If data or files exist at the location, do nothing with the current DataFrame |
The default is errorIfExists. |
CSV Files
CSV Options
| Read/Write | Key | Potential Values | Default | Description |
|---|---|---|---|---|
| Both | sep | Any single string character | , | The single character that is used as a separator for each field and value. |
| Both | header | true, false | false | A Boolean flag that declares whether the first line in the file(s) are the names of the columns. |
| Read | escape | Any string character | \ | The character Spark should use to escape other characters in the file. |
| Read | inferSchema | true, false | false | Specifies whether Spark should infer column types when reading the file. |
| Read | ignoreLeadingWhiteSpace | true, false | false | Declares whether leading spaces from values being read should be skipped. |
| Read | ignoreTrailingWhiteSpace | true, false | false | Declares whether trailing spaces from values being read should be skipped. |
| Both | nullValue | Any string character | “” | Declares what character represents a null value in the file. |
| Both | nanValue | Any string character | NaN | Declares what character represents a NaN or missing character in the CSV file. |
| Both | positiveInf | Any string or character | Inf | Declares what character(s) represent a positive infinite value. |
| Both | negativeInf | Any string or character | -Inf | Declares what character(s) represent a negative infinite value. |
| Both | compressionCodec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file. |
| Both | dateFormat | Any string or character that conforms to java’s SimpleDataFormat. | yyyy-MM-dd | Declares the date format for any columns that are date type. |
| Both | timestampFormat | Any string or character that conforms to java’s SimpleDataFormat. | yyyy-MM-dd’T’HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are timestamp type. |
| Read | maxColumns | Any integer | 20480 | Declares the maximum number of columns in the file. |
| Read | maxCharsPerColumn | Any integer | 1000000 | Declares the maximum number of characters in a column. |
| Read | escapeQuotes | true, false | true | Declares whether Spark should escape quotes that are found in lines. |
| Read | maxMalformedLogPerPartition | Any integer | 10 | Sets the maximum number of malformed rows Spark will log for each partition. Malformed records beyond this number will be ignored. |
| Write | quoteAll | true, false | FALSE | Specifies whether all values should be enclosed in quotes, as opposed to just escaping values that have a quote character. |
| Read | multiline | true, false | FALSE | This option allows you to read multiline CSV files where each logical row in the CSV file might span multiple rows in the file itself. |
Reading CSV Files
Example We’ll set the header to true for our CSV file, the mode to be FAILFAST, and inferSchema to true:
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.option("inferSchema", "true")
.load("some/path/to/file.csv")we can use the mode to specify how much tolerance we have for malformed data.
import pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType, true),
StructField("ORIGIN_COUNTRY_NAME", StringType, true),
StructField("count", LongType, false)
])
spark.read.format("csv")
.option("header", "true")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/data/flight-data/csv/2010-summary.csv") .show(5)In general, Spark will fail only at job execution time rather than DataFrame definition time— even if, for example, we point to a file that does not exist.
Writing CSV Files
This is a subset of the reading options because many do not apply when writing data (like maxColumns and inferSchema).
csvFile = spark.read.format("csv")\
.option("header", "true")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/data/flight-data/csv/2010-summary.csv")
# Write to TSV
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
.save("/tmp/my-tsv-file.tsv")The output files will be seen at my-tsv-file, but that actually a folder with numerous files within it:
ls /tmp/my-tsv-file.tsv/
/tmp/my-tsv-file.tsv/part-00000-35cf9453-1943-4a8c-9c82-9f6ea9742b29.csvThis actually reflects the number of partitions in our DataFrame at the time we write it out. If we were to repartition our data before then, we would end up with a different number of files.
JSON Files
The line-delimited versus multiline trade-off is controlled by a single option: multiLine.
When you set this option to true, you can read an entire file as one json object and Spark will go through the work of parsing that into a DataFrame.
The best format is Line-delimited.
JSON Options
Ref: https://spark.apache.org/docs/latest/sql-data-sources-json.html
Reading JSON Files
spark.read.format("json").option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/data/flight-data/json/2010-summary.json").show(5)Writing JSON Files
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")ls /tmp/my-json-file.json/
/tmp/my-json-file.json/part-00000-tid-543....jsonParquet Files
Parquet is an open source column-oriented data store that provides a variety of storage optimizations, especially for analytics workloads. It provides columnar compression, which saves storage space and allows for reading individual columns instead of entire files. It is a file format that works exceptionally well with Apache Spark and is in fact the default file format.
Reading Parquet Files
The parquet format enforces its own schema when storing data.
Oftentimes, we can set schema. However, with Parquet files, we can use inferSchema or not, which is powerful because the schema is built into the file itself.
spark.read.format("parquet")\
.load("/data/flight-data/parquet/2010-summary.parquet").show(5)Parquet Optionss
Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option
WARNING
Even though there are only two options, you can still encounter problems if you’re working with incompatible Parquet files. Be careful when you write out Parquet files with different versions of Spark (especially older ones) because this can cause significant headache.
Writing Parquet Files
csvFile.write.format("parquet").mode("overwrite")\
.save("/tmp/my-parquet-file.parquet")ORC Files
ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads. It is optimized for large streaming reads, but with integrated support for finding required rows quickly. ORC actually has no options for reading in data because Spark understands the file format quite well.
What is the difference between ORC and Parquet?
The fundamental difference is that Parquet is further optimized for use with Spark, whereas ORC is further optimized for Hive.
Reading ORC Files
spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)Writing ORC Files
csvFile.write.format("orc").mode("overwrite").save("/tmp/my-json-file.orc")SQL Databases
To read and write from these databases, you need to do two things:
- The Java Database Connectivity (JDBC) driver
- The proper JAR for the driver itself For example, to be able to read and write from PostgreSQL:
./bin/spark-shell \
--driver-class-path postgresql-9.4.1207.jar \
--jars postgresql-9.4.1207.jarJDBC Options
Ref: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option
Reading from SQL Databases
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://database_server")\
.option("dbtable", "schema.tablename")\
.option("user", "username")\
.option("password", "my-secret-password")\
.load()Schema
You’ll also notice that there is already a schema, as well. That’s because Spark gathers this information from the table itself and maps the types to Spark data types.
Query Pushdown
First, Spark makes a best-effort attempt to filter data in the database itself before creating the DataFrame. For example, in the previous sample query, we can see from the query plan that it selects only the relevant column name from the table:
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().explain== Physical Plan ==
*HashAggregate(keys=[DEST_COUNTRY_NAME#8108], functions=[])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#8108, 200)
+- *HashAggregate(keys=[DEST_COUNTRY_NAME#8108], functions=[])
+- *Scan JDBCRelation(flight_info) [numPartitions=1] ...
Spark can actually do better than this on certain queries. For example, if we specify a filter on our DataFrame, Spark will push that filter down into the database. We can see this in the explain plan under PushedFilters.
dbDataFrame.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain()== Physical Plan ==
*Scan JDBCRel... PushedFilters: [*In(DEST_COUNTRY_NAME, [Anguilla,Sweden])],
Note
Spark can’t translate all of its own functions into the functions available in the SQL database in which you’re working. Therefore, sometimes you’re going to want to pass an entire query into your SQL that will return the results as a DataFrame.
pushdownQuery = """
(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info
"""
dbDataFrame = spark.read.format("jdbc")\
.option("url", url)\
.option("dbtable", pushdownQuery)\
.option("driver", driver)\
.load()Spark doesn’t even know about the actual schema of the table, just the one that results from our previous query:
dbDataFrame.explain()== Physical Plan ==
*Scan JDBCRelation(
(SELECT DISTINCT(DEST_COUNTRY_NAME)
FROM flight_info) as flight_info
) [numPartitions=1] [DEST_COUNTRY_NAME#788] ReadSchema: ...
Reading from databases in parallel
Spark has an underlying algorithm that can read multiple files into one partition, or conversely, read multiple partitions out of one file, depending on the file size and the “splitability” of the file type and compression. When we are working with SQL must configure it a bit more manually. What you can configure, as seen in the previous options, is the ability to specify a maximum number of partitions to allow you to limit how much you are reading and writing in parallel:
dbDataFrame = spark.read.format("jdbc")\
.option("url", url)\
.option("dbtable", tablename)\
.option("driver", driver)\
.option("numPartitions", 10)\
.load()In this case, this will still remain as one partition because there is not too much data.
There are several other optimizations that unfortunately only seem to be under another API set. You can explicitly push predicates down into SQL databases through the connection itself. This optimization allows you to control the physical location of certain data in certain partitions by specifying predicates. Example: We only need data from two countries in our data: Anguilla and Sweden. We could filter these down and have them pushed into the database, but we can also go further by having them arrive in their own partitions in Spark. We do that by specifying a list of predicates when we create the data source
props = {"driver":"org.sqlite.JDBC"}
predicates = [
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"
]
spark.read.jdbc(url, tablename, predicates=predicates, properties=props).show() spark.read.jdbc(url,tablename,predicates=predicates,properties=props)\
.rdd.getNumPartitions() # 2
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| Sweden| United States| 65|
| United States| Sweden| 73|
| Anguilla| United States| 21|
| United States| Anguilla| 20|
+-----------------+-------------------+-----+Partitioning based on a sliding window
How we can partition based on predicates.
Example:
we specify a minimum and a maximum for both the first partition and last partition.
Anything outside of these bounds will be in the first partition or final partition. Then, we set the number of partitions we would like total (this is the level of parallelism).
Spark then queries our database in parallel and returns numPartitions partitions.
colName = "count"
lowerBound = 0L
upperBound = 348113L # this is the max count in our database
numPartitions = 10
props = {"driver":"org.sqlite.JDBC"}
spark.read.jdbc(url, tablename, column=colName, properties=props,
lowerBound=lowerBound, upperBound=upperBound,
numPartitions=numPartitions).count() # 255Writing to SQL Databases
newPath = "jdbc:sqlite://tmp/my-sqlite.db"
props = {"driver":"org.sqlite.JDBC"}
csvFile.write.jdbc(newPath, tablename, mode="overwrite", properties=props)
# append to the table
csvFile.write.jdbc(newPath, tablename, mode="append", properties=props)Text Files
Each line in the file becomes a record in the DataFrame.
Reading Text Files
You simply specify the type to be textFile.
With textFile, partitioned directory names are ignored.
To read and write text files according to partitions, you should use text, which respects partitioning on reading and writing:
spark.read.textFile("/data/flight-data/csv/2010-summary.csv")
.selectExpr("split(value, ',') as rows").show()
+--------------------+
| rows|
+--------------------+
|[DEST_COUNTRY_NAM...|
|[United States, R...|
...
|[United States, A...|
|[Saint Vincent an...|
|[Italy, United St...|
+--------------------+Writing Text Files
When you write a text file, you need to be sure to have only one string column; otherwise, the write will fail:
csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")If you perform some partitioning when performing your write, you can write more columns. However, those columns will manifest as directories in the folder to which you’re writing out to, instead of columns on every single file:
csvFile.limit(10).select("DEST_COUNTRY_NAME", "count")\
.write.partitionBy("count").text("/tmp/five-csv-files2py.csv")Advanced I/O Concepts (Recommended)
We saw previously that we can control the parallelism of files that we write by controlling the partitions prior to writing. We can also control specific data layout by controlling two things: bucketing and partitioning.
Splittable File Types and Compression
Certain file formats are fundamentally splittable This can improve speed because it makes it possible for Spark to avoid reading an entire file, and access only the parts of the file necessary to satisfy your query. Additionally if you’re using something like Hadoop Distributed File System (HDFS), splitting a file can provide further optimization if that file spans multiple blocks. In conjunction with this is a need to manage compression. Not all compression schemes are splittable. How you store your data is of immense consequence when it comes to making your Spark jobs run smoothly. We recommend Parquet with gzip compression.
Reading Data in Parallel
Hint
Multiple executors cannot read from the same file at the same time necessarily, but they can read different files at the same time.
In general, this means that when you read from a folder with multiple files in it, each one of those files will become a partition in your DataFrame and be read in by available executors in parallel (with the remaining queueing up behind the others).
Writing Data in Parallel
The number of files or data written is dependent on the number of partitions the DataFrame has at the time you write out the data. By default, one file is written per partition of the data.
Writing a file
This means that although we specify a “file”, it’s actually a number of files within a folder, with the name of the specified file, with one file per each partition that is written.
csvFile.repartition(5).write.format("csv").save("/tmp/multiple.csv")will end up with five files inside of that folder. As you can see from the list call:
ls /tmp/multiple.csv
/tmp/multiple.csv/part-00000-767df509-ec97-4740-8e15-4e173d365a8b.csv /tmp/multiple.csv/part-00001-767df509-ec97-4740-8e15-4e173d365a8b.csv /tmp/multiple.csv/part-00002-767df509-ec97-4740-8e15-4e173d365a8b.csv /tmp/multiple.csv/part-00003-767df509-ec97-4740-8e15-4e173d365a8b.csv /tmp/multiple.csv/part-00004-767df509-ec97-4740-8e15-4e173d365a8b.csvPartitioning
Partitioning is a tool that allows you to control what data is stored (and where) as you write it. When you write a file to a partitioned directory (or table), you basically encode a column as a folder. What this allows you to do is skip lots of data when you go to read it in later, allowing you to read in only the data relevant to your problem instead of having to scan the complete dataset. These are supported for all file-based data sources:
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")\
.save("/tmp/partitioned-files.parquet")Upon writing, you get a list of folders in your Parquet “file”:
ls /tmp/partitioned-files.parquet
...
DEST_COUNTRY_NAME=Costa Rica/
DEST_COUNTRY_NAME=Egypt/
DEST_COUNTRY_NAME=Equatorial Guinea/
DEST_COUNTRY_NAME=Senegal/
DEST_COUNTRY_NAME=United States/Each of these will contain Parquet files that contain that data where the previous predicate was true:
ls /tmp/partitioned-files.parquet/DEST_COUNTRY_NAME=Senegal/
part-00000-tid.....parquetThis is probably the lowest-hanging optimization that you can use when you have a table that readers frequently filter by before manipulating.
Bucketing
Bucketing is another file organization approach with which you can control the data that is specifically written to each file.
Info
This can help avoid shuffles later when you go to read the data because data with the same bucket ID will all be grouped together into one physical partition. This means that the data is prepartitioned according to how you expect to use that data later on, meaning you can avoid expensive shuffles when joining or aggregating.
Rather than partitioning on a specific column (which might write out a ton of directories), it’s probably worthwhile to explore bucketing the data instead. This will create a certain number of files and organize our data into those “buckets”:
numberBuckets = 10
columnToBucketBy = "count"
csvFile.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")ls /user/hive/warehouse/bucketedfiles/
part-00000-tid-1020575097626332666-8....parquet
part-00000-tid-1020575097626332666-8....parquet
part-00000-tid-1020575097626332666-8....parquet
...Bucketing is supported only for Spark-managed tables.
Writing Complex Types
CSV files do not support complex types, whereas Parquet and ORC do.
Managing File Size
Managing file sizes is an important factor not so much for writing data but reading it later on. When you’re writing lots of small files, there’s a significant metadata overhead that you incur managing all of those files. Spark especially does not do well with small files, although many file systems (like HDFS) don’t handle lots of small files well, either. You might hear this referred to as the “12. Spark small file problem.” The opposite is also true: you don’t want files that are too large either, because it becomes inefficient to have to read entire blocks of data when you need only a few rows.
Spark 2.2 introduced a new method for controlling file sizes in a more automatic way.
Now, you can take advantage of another tool in order to limit output file sizes so that you can target an optimum file size. You can use the maxRecordsPerFile option and specify a number of your choosing. This allows you to better control file sizes by controlling the number of records that are written to each file.
For example, if you set an option for a writer as Spark will ensure that files will contain at most 5,000 records.
df.write.option("maxRecordsPerFile", 5000)References
- Spark: The Definitive Guide by Bill Chambers and Matei Zaharia.
