[Spark By Example] DataFrameReader

DataFrameReader is the interface for loading a DataFrame from external data sources such as CSV, JSON, or Parquet files.

You cannot instantiate a DataFrameReader directly; instead, you access it through the "SparkSession.read" property.

DataFrameReader

The basic syntax to use the DataFrameReader is

data_frame = (spark.read.format(...)
          .option("key", value)
          .option("key", value)
          .schema(...)
          .load(...))
  • format: "csv", "text", "json", "parquet", etc.
  • option: ("header", { true | false }), ("inferSchema", { true | false }) …
  • schema: StructType object
  • load: {path}

One of the interesting options is "mode", which controls how corrupted records are handled during parsing. The mode has the following values:

  • PERMISSIVE (default): keeps corrupted records, setting the malformed fields to null
  • FAILFAST: throws an exception as soon as a corrupted record is encountered
  • DROPMALFORMED: drops all corrupted records

Data File

  • Create a CSV file (people-error.csv) with the following content. Note that the age field of the last record is not a valid integer.
firstName,lastName,age
Paul,Henderson,20
Grace,Carr,32
Julia,Jackson,14
Alexandra,May,43
Anna,Jones,NaN
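If you prefer to create the file programmatically, a small sketch (writing to the test/data path that the Python application uses):

```python
from pathlib import Path

# Write the sample file to the path the Python application expects.
data_dir = Path("test/data")
data_dir.mkdir(parents=True, exist_ok=True)
(data_dir / "people-error.csv").write_text(
    "firstName,lastName,age\n"
    "Paul,Henderson,20\n"
    "Grace,Carr,32\n"
    "Julia,Jackson,14\n"
    "Alexandra,May,43\n"
    "Anna,Jones,NaN\n")
```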

Result

With the malformed age in the last record, PERMISSIVE keeps Anna's row with a null age, DROPMALFORMED drops that row entirely, and FAILFAST throws an exception before any rows are shown.
Python Application

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == "__main__":

  spark = (SparkSession
        .builder
        .appName("dataframereader")
        .getOrCreate())

  data_file = "test/data/people-error.csv"

  # schema
  custom_schema = StructType([
        StructField("firstName", StringType(), False),
        StructField("lastName", StringType(), False),
        StructField("age", IntegerType(), False)])

  # DataFrameReader
  dataFrameReader = spark.read

  # PERMISSIVE - default
  print("=== PERMISSIVE ===")
  df_permissive = (dataFrameReader.format("csv")
          .option("mode", "PERMISSIVE")
          .option("header", True)
          .schema(custom_schema)
          .load(data_file))
  df_permissive.show()

  # DROPMALFORMED
  print("=== DROPMALFORMED ===")
  df_dropmalformed = (dataFrameReader.format("csv")
          .option("mode", "DROPMALFORMED")
          .option("header", True)
          .schema(custom_schema)
          .load(data_file))
  df_dropmalformed.show()

  # FAILFAST
  print("=== FAILFAST ===")
  df_failfast = (dataFrameReader.format("csv")
          .option("mode", "FAILFAST")
          .option("header", True)
          .schema(custom_schema)
          .load(data_file))
  df_failfast.show()
  spark.stop()

C# Application

using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

namespace MySpark.Examples
{
    internal class PeopleDataFrameReader
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("dataframereader")
                    .GetOrCreate();

            string filePath = "data/people-error.csv";

            // schema
            StructField[] fields = {
                new StructField("firstName", new StringType(), false),
                new StructField("lastName", new StringType(), false),
                new StructField("age", new IntegerType(), false)
            };
            StructType schema = new StructType(fields);

            DataFrameReader reader = spark.Read();

            // PERMISSIVE - default
            Console.WriteLine("=== PERMISSIVE ===");
            DataFrame dfPermissive = reader
                .Format("csv")
                .Option("mode", "PERMISSIVE")
                .Option("header", true)
                .Schema(schema)
                .Load(filePath);
            dfPermissive.Show();

            // DROPMALFORMED
            Console.WriteLine("=== DROPMALFORMED ===");
            DataFrame dfDropMalformed = reader
                .Format("csv")
                .Option("mode", "DROPMALFORMED")
                .Option("header", true)
                .Schema(schema)
                .Load(filePath);
            dfDropMalformed.Show();

            // FAILFAST
            Console.WriteLine("=== FAILFAST ===");
            DataFrame dfFailfast = reader
                .Format("csv")
                .Option("mode", "FAILFAST")
                .Option("header", true)
                .Schema(schema)
                .Load(filePath);
            dfFailfast.Show();

            spark.Stop();
        }
    }
}

[Note] With malformed data, the "FAILFAST" mode throws an exception like this:

org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
