DataFrameReader is the interface for loading a DataFrame from an external data source.
You cannot construct a DataFrameReader object yourself; instead, you access it through the “SparkSession.read” property.
DataFrameReader
The basic syntax for using the DataFrameReader is:
data_frame = (spark.read.format(...)
    .option("key", value)
    .option("key", value)
    .schema(...)
    .load(...))
- format: “csv”, “json”, “parquet”, “orc”, “text”, etc.
- option: (“header”, { true | false }), (“inferSchema”, { true | false }), …
- schema: a StructType object describing the columns
- load: the path to the data file or directory (a concrete example follows this list)
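For example, a minimal CSV read with a header row and an inferred schema might look like this (the file path here is illustrative):
df = (spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("data/people.csv"))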
One particularly useful option is “mode”, which controls how corrupted (malformed) records are handled during parsing. It accepts the following values:
- PERMISSIVE (default): keeps corrupted records, setting the fields that cannot be parsed to null (see the sketch after this list)
- FAILFAST: throws an exception as soon as a corrupted record is encountered
- DROPMALFORMED: silently drops all corrupted records
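In PERMISSIVE mode, Spark can also capture the raw text of each corrupted record in a dedicated string column via the “columnNameOfCorruptRecord” option. The sketch below assumes the same imports and data file as the Python application later in this section; note that the extra column must be declared in the schema:
corrupt_schema = StructType([
    StructField("firstName", StringType(), True),
    StructField("lastName", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True)])  # holds the raw text of a malformed record

df_debug = (spark.read.format("csv")
    .option("mode", "PERMISSIVE")
    .option("header", True)
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(corrupt_schema)
    .load("test/data/people-error.csv"))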
Data File
- Create a CSV file with the following content. Note that the last record’s age, “NaN”, cannot be parsed as an integer, so that record is treated as corrupted.
firstName,lastName,age
Paul,Henderson,20
Grace,Carr,32
Julia,Jackson,14
Alexandra,May,43
Anna,Jones,NaN
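If you prefer to create the file from code, a few lines of plain Python will do (the path matches the one used in the Python application below):
import os

os.makedirs("test/data", exist_ok=True)
with open("test/data/people-error.csv", "w") as f:
    f.write("firstName,lastName,age\n"
            "Paul,Henderson,20\n"
            "Grace,Carr,32\n"
            "Julia,Jackson,14\n"
            "Alexandra,May,43\n"
            "Anna,Jones,NaN\n")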
Result
With this data file, PERMISSIVE returns all five records (the last one with a null age), DROPMALFORMED returns only the first four, and FAILFAST aborts with the exception shown in the note at the end of this section.
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":
    spark = (SparkSession
        .builder
        .appName("dataframereader")
        .getOrCreate())

    data_file = "test/data/people-error.csv"

    # schema
    custom_schema = StructType([
        StructField("firstName", StringType(), False),
        StructField("lastName", StringType(), False),
        StructField("age", IntegerType(), False)])

    # DataFrameReader
    data_frame_reader = spark.read

    # PERMISSIVE - default
    print("=== PERMISSIVE ===")
    df_permissive = (data_frame_reader.format("csv")
        .option("mode", "PERMISSIVE")
        .option("header", True)
        .schema(custom_schema)
        .load(data_file))
    df_permissive.show()

    # DROPMALFORMED
    print("=== DROPMALFORMED ===")
    df_dropmalformed = (data_frame_reader.format("csv")
        .option("mode", "DROPMALFORMED")
        .option("header", True)
        .schema(custom_schema)
        .load(data_file))
    df_dropmalformed.show()

    # FAILFAST - throws on the malformed record, so catch the
    # error to let the application shut down cleanly
    print("=== FAILFAST ===")
    try:
        df_failfast = (data_frame_reader.format("csv")
            .option("mode", "FAILFAST")
            .option("header", True)
            .schema(custom_schema)
            .load(data_file))
        df_failfast.show()
    except Exception as e:
        print(e)

    spark.stop()
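As an aside, whether a record counts as malformed depends on the schema you supply. If you let Spark infer the schema instead of passing custom_schema, the “NaN” value is typically read as a floating-point NaN (the CSV reader’s nanValue option defaults to “NaN”), so none of the records are treated as corrupted. A quick sketch to verify:
# Sketch: with inferSchema, age is inferred as a floating-point column
# and "NaN" parses cleanly, so no record is dropped or rejected
df_inferred = (spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(data_file))
df_inferred.printSchema()
df_inferred.show()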
C# Application
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

namespace MySpark.Examples
{
    internal class PeopleDataFrameReader
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("dataframereader")
                    .GetOrCreate();

            string filePath = "data/people-error.csv";

            // schema
            StructField[] fields = {
                new StructField("firstName", new StringType(), false),
                new StructField("lastName", new StringType(), false),
                new StructField("age", new IntegerType(), false)
            };
            StructType schema = new StructType(fields);

            DataFrameReader reader = spark.Read();

            // PERMISSIVE - default
            Console.WriteLine("=== PERMISSIVE ===");
            DataFrame dfPermissive = reader
                .Format("csv")
                .Option("mode", "PERMISSIVE")
                .Option("header", true)
                .Schema(schema)
                .Load(filePath);
            dfPermissive.Show();

            // DROPMALFORMED
            Console.WriteLine("=== DROPMALFORMED ===");
            DataFrame dfDropMalformed = reader
                .Format("csv")
                .Option("mode", "DROPMALFORMED")
                .Option("header", true)
                .Schema(schema)
                .Load(filePath);
            dfDropMalformed.Show();

            // FAILFAST - throws on the malformed record, so catch the
            // error to let the application shut down cleanly
            Console.WriteLine("=== FAILFAST ===");
            try
            {
                DataFrame dfFailfast = reader
                    .Format("csv")
                    .Option("mode", "FAILFAST")
                    .Option("header", true)
                    .Schema(schema)
                    .Load(filePath);
                dfFailfast.Show();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }

            spark.Stop();
        }
    }
}
[Note] With malformed data, the “FAILFAST” mode throws an exception like this, which is why the applications above wrap the FAILFAST read in a try/except (Python) or try/catch (C#) block:
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.