The following sample code (in Python and C#) shows how to read a JSON file containing complex objects.
CSV handles only flat data structures, but with JSON you can read a complex, nested data structure into a DataFrame.
Data File
- Create a JSON file (e.g., contacts.json) with the following content.
[
{
"name": {
"firstName": "Carl",
"lastName": "Adams"
},
"phone": "101-123-4567",
"address": {
"street": "1 Main St.",
"city": "New York",
"state": "NY"
}
},
{
"name": {
"firstName": "Abigail",
"lastName": "Hills"
},
"phone": "401-799-1234",
"address": {
"street": "34 Timer Dr.",
"city": "Cambria",
"state": "CA"
}
},
{
"name": {
"firstName": "Anne",
"lastName": "Smith"
},
"phone": "343-100-1698",
"address": {
"street": "100 King St.",
"city": "El Paso",
"state": "TX"
}
},
{
"name": {
"firstName": "Eric",
"lastName": "Baker"
},
"phone": "771-324-2345",
"address": {
"street": "402 Bolman Court",
"city": "Southfield",
"state": "MI"
}
}
]
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    # Entry point: read a multi-line JSON file of contacts with an explicit
    # nested schema, then flatten the nested fields into a plain table.
    spark = (SparkSession
             .builder
             .appName("contacts")
             .getOrCreate())

    data_file = "test/data/contacts.json"

    # Explicit schema: "name" and "address" are nested StructTypes,
    # mirroring the nested objects in the JSON sample.
    custom_schema = StructType([
        StructField("name", StructType([
            StructField("firstName", StringType(), False),
            StructField("lastName", StringType(), False)
        ]), False),
        StructField("phone", StringType(), False),
        StructField("address", StructType([
            StructField("street", StringType(), False),
            StructField("city", StringType(), False),
            StructField("state", StringType(), False)
        ]), False),
    ])

    # Read the JSON with the schema. The file is a pretty-printed JSON
    # array spanning many lines, so "multiline" must be enabled —
    # otherwise Spark expects one JSON object per line.
    contacts = (spark.read.format("json")
                .option("multiline", True)  # important!
                .schema(custom_schema)
                .load(data_file))

    # Drop rows where every column is null (artifacts of multiline parsing).
    contacts = contacts.na.drop("all")
    contacts.show()
    contacts.printSchema()

    # Select nested values with dotted paths; use col() consistently
    # for both name fields (the original mixed col() and expr()).
    contacts.withColumn(
        "fullName",
        concat_ws(" ", col("name.firstName"), col("name.lastName"))
    ).select(
        col("fullName"),
        col("phone"),
        col("address.street"),
        col("address.city"),
        col("address.state")).show()

    spark.stop()
C# Application
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System.Net;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples
{
    /// <summary>
    /// Demonstrates reading a multi-line JSON file of contacts with an
    /// explicit nested schema, then flattening the nested fields.
    /// </summary>
    internal class Contacts
    {
        /// <summary>
        /// Runs the example: reads contacts.json, drops all-null rows,
        /// and selects the nested name/address values as flat columns.
        /// </summary>
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("contacts")
                    .GetOrCreate();

            string filePath = "data/contacts.json";

            // Explicit schema: "name" and "address" are nested StructTypes,
            // mirroring the nested objects in the JSON sample.
            StructField[] fields = {
                new StructField("name", new StructType(new[] {
                    new StructField("firstName", new StringType(), false),
                    new StructField("lastName", new StringType(), false)
                }), false),
                new StructField("phone", new StringType(), false),
                new StructField("address", new StructType(new[] {
                    new StructField("street", new StringType(), false),
                    new StructField("city", new StringType(), false),
                    new StructField("state", new StringType(), false)
                }), false),
            };
            StructType schema = new StructType(fields);

            // Read the JSON with the schema. The file is a pretty-printed
            // JSON array spanning many lines, so "multiline" must be
            // enabled — otherwise Spark expects one JSON object per line.
            // (The original comment here referred to "products as string",
            // a copy-paste leftover from a different example.)
            DataFrame contacts = spark.Read()
                .Format("json")
                .Option("multiline", true) // important!
                .Schema(schema)
                .Load(filePath);

            // Drop rows where every column is null (artifacts of
            // multiline parsing).
            contacts = contacts.Na().Drop("all");
            contacts.Show();
            contacts.PrintSchema();

            // Select nested values with dotted paths.
            contacts.WithColumn("fullName",
                ConcatWs(" ", Col("name.firstName"), Col("name.lastName")))
                .Select(
                    Col("fullName"),
                    Col("phone"),
                    Col("address.street"),
                    Col("address.city"),
                    Col("address.state")
                ).Show();

            spark.Stop();
        }
    }
}