[Spark By Example] Schema

Spark can infer the structure of the data, but you can also specify it explicitly by providing a schema when you create the DataFrame.

Schema

  • You can use “StructType” to define Schema.
  • You can define nested “StructType”s.
  • A “StructType” is a collection of “StructField”s.
  • The “StructField” defines a column
    • name
    • data type
    • nullable
  • “StructField” Data Types
    • StringType
    • BooleanType
    • IntegerType
    • BinaryType
    • ArrayType
    • MapType

Data File

  • Create a JSON file named products.json with the following content.
[
    { 
      "id": 1, 
      "category":"Appliances", 
      "noTax": false,
      "manager": { "firstname": "Paul", "lastname": "Henderson" },
      "products": ["Washer","Dryer","Refrigerator"]
    },
    { 
      "id": 2, 
      "category":"Clothing", 
      "manager": { "firstname": "Grace", "lastname": "Carr" },
      "products": ["Sweater","Jacket"]
    },
    { 
      "id": 3, 
      "category":"Grocery", 
      "noTax": true,
      "manager": { "firstname": "Julia", "lastname": "Jackson" },
      "products": ["Bread","Coffee","Milk"]
    }
  ]

Result


Python Application

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

if __name__ == "__main__":
  # Entry point: read the products JSON file using an explicit schema,
  # print every row, then print only the rows selected by the noTax column.

  # create a session with the builder
  spark = (SparkSession
          .builder
          .appName("dataframe-schema")
          .getOrCreate())

  # make sure the session is stopped even if reading or showing fails
  try:
    data_file = "test/data/products.json"

    # schema: one StructField per column (name, data type, nullable);
    # 'manager' is a nested StructType and 'products' is an array of strings
    custom_schema = StructType([
          StructField('id', IntegerType(), False),
          StructField('category', StringType(), False),
          StructField('noTax', BooleanType(), True),
          StructField('manager', StructType([
              StructField('firstname', StringType(), False),
              StructField('lastname', StringType(), False)
          ]), False),
          StructField('products', ArrayType(StringType()), True)])

    # all data
    df = (spark.read
      .format('json')
      .option('multiline', True) # important! the file is a single JSON array spread over many lines
      .schema(custom_schema)
      .load(data_file))
    df.show()

    # products with no tax
    # show() only prints and returns None, so keep the filtered DataFrame
    # in its own variable and display it in a separate step
    df_no_tax = df.where(
      col('noTax')
    )
    df_no_tax.show()
  finally:
    spark.stop()

C# Application

using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
using Microsoft.Spark.Sql.Types;

namespace MySpark.Examples.Basics
{
    /// <summary>
    /// Example that loads a JSON file into a DataFrame using an explicitly
    /// declared schema instead of relying on schema inference.
    /// </summary>
    internal class DataFrameSchema
    {
        /// <summary>
        /// Builds the schema, reads the products file with it, shows every row,
        /// and then shows the rows filtered on the "noTax" column.
        /// </summary>
        public static void Run()
        {
            SparkSession session = SparkSession
                .Builder()
                .AppName("dataframe-schema")
                .GetOrCreate();

            string productsPath = "data/products.json";

            // nested struct used for the "manager" column
            var managerType = new StructType(new[]
            {
                new StructField("firstname", new StringType(), false),
                new StructField("lastname", new StringType(), false)
            });

            // "products" is an array whose elements are strings
            var productsType = new ArrayType(new StringType());

            // top-level schema: one StructField (name, type, nullable) per column
            var productSchema = new StructType(new[]
            {
                new StructField("id", new IntegerType(), false),
                new StructField("category", new StringType(), false),
                new StructField("noTax", new BooleanType(), true),
                new StructField("manager", managerType, false),
                new StructField("products", productsType, true)
            });

            // configure the reader: JSON source, multiline enabled, explicit schema
            DataFrameReader reader = session.Read()
                .Format("json")
                .Option("multiline", true)
                .Schema(productSchema);

            // all data
            DataFrame allRows = reader.Load(productsPath);
            allRows.Show();

            // products with no tax
            Column noTaxFlag = Col("noTax");
            DataFrame untaxedRows = allRows.Where(noTaxFlag);
            untaxedRows.Show();

            session.Stop();
        }
    }
}

Leave a Comment

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s