Spark can infer a DataFrame's structure from the data, but you can also specify it explicitly by providing a schema when reading the data into a DataFrame.
Schema
- Use a “StructType” to define a schema.
- “StructType”s can be nested: a field's data type can itself be a “StructType”.
- A “StructType” is a collection of “StructField”s.
- Each “StructField” defines one column by its:
- name
- data type
- nullable
- Common “StructField” data types include:
- StringType
- BooleanType
- IntegerType
- BinaryType
- ArrayType
- MapType
Data File
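- Create a JSON file named products.json with the following content. Note that the second record has no "noTax" key, which is why that field is declared nullable in the schema below.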
[
{
"id": 1,
"category":"Appliances",
"noTax": false,
"manager": { "firstname": "Paul", "lastname": "Henderson" },
"products": ["Washer","Dryer","Refrigerator"]
},
{
"id": 2,
"category":"Clothing",
"manager": { "firstname": "Grace", "lastname": "Carr" },
"products": ["Sweater","Jacket"]
},
{
"id": 3,
"category":"Grocery",
"noTax": true,
"manager": { "firstname": "Julia", "lastname": "Jackson" },
"products": ["Bread","Coffee","Milk"]
}
]
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
if __name__ == "__main__":
    # Create (or reuse) a session with the builder.
    spark = (SparkSession
             .builder
             .appName("dataframe-schema")
             .getOrCreate())

    data_file = "test/data/products.json"

    try:
        # Explicit schema instead of letting Spark infer it.
        # 'noTax' and 'products' are nullable; the second record of the
        # sample file has no 'noTax' key at all.
        custom_schema = StructType([
            StructField('id', IntegerType(), False),
            StructField('category', StringType(), False),
            StructField('noTax', BooleanType(), True),
            StructField('manager', StructType([
                StructField('firstname', StringType(), False),
                StructField('lastname', StringType(), False)
            ]), False),
            StructField('products', ArrayType(StringType()), True)])

        # All data.
        # 'multiline' is required: the sample file is one JSON array spread
        # over many lines, whereas Spark's JSON reader by default expects one
        # complete JSON object per line (JSON Lines).
        df = (spark.read
              .format('json')
              .option('multiline', True)
              .schema(custom_schema)
              .load(data_file))
        df.show()

        # Products with no tax (rows whose 'noTax' is true; null is dropped).
        # show() only prints and returns None, so keep the DataFrame in a
        # variable and show it separately.
        df_no_tax = df.where(col('noTax'))
        df_no_tax.show()
    finally:
        # Release the session even if reading or showing the data fails.
        spark.stop()
C# Application
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
using Microsoft.Spark.Sql.Types;
namespace MySpark.Examples.Basics
{
    /// <summary>
    /// Example that reads a JSON file into a <c>DataFrame</c> using an explicitly
    /// declared schema instead of letting Spark infer it.
    /// </summary>
    internal class DataFrameSchema
    {
        /// <summary>
        /// Builds the product schema, loads <c>data/products.json</c>, shows all rows,
        /// then shows only the rows whose <c>noTax</c> column is true.
        /// The Spark session is stopped even if any step throws.
        /// </summary>
        public static void Run()
        {
            SparkSession spark =
                SparkSession.Builder()
                    .AppName("dataframe-schema")
                    .GetOrCreate();

            try
            {
                string filePath = "data/products.json";

                // Schema: "noTax" and "products" are nullable because not every
                // record is guaranteed to carry them; "manager" is a nested struct.
                StructField[] fields =
                {
                    new StructField("id", new IntegerType(), false),
                    new StructField("category", new StringType(), false),
                    new StructField("noTax", new BooleanType(), true),
                    new StructField("manager", new StructType(new StructField[]
                    {
                        new StructField("firstname", new StringType(), false),
                        new StructField("lastname", new StringType(), false)
                    }), false),
                    new StructField("products", new ArrayType(new StringType()), true)
                };
                StructType schema = new StructType(fields);

                // All data. "multiline" is needed because the file is a single JSON
                // array spanning several lines rather than one object per line.
                DataFrame df = spark.Read()
                    .Format("json")
                    .Option("multiline", true)
                    .Schema(schema)
                    .Load(filePath);
                df.Show();

                // Products with no tax: keeps rows where "noTax" is true (null is dropped).
                DataFrame dfNoTax = df.Where(Col("noTax"));
                dfNoTax.Show();
            }
            finally
            {
                // Always release the session, even when loading or showing fails.
                spark.Stop();
            }
        }
    }
}