[Spark By Example] Read JSON – Complex Type

The following sample code (by Python and C#) shows how to read JSON file with complex objects.

CSV handles the flat data structure. But with JSON, you can read complex data structure into the DataFrame.


Data File

  • Create a json file with the following content.
[
  {
    "name": {
      "firstName": "Carl",
      "lastName": "Adams"
    },
    "phone": "101-123-4567",
    "address": {
      "street": "1 Main St.",
      "city": "New York",
      "state": "NY"
    }
  },
  {
    "name": {
      "firstName": "Abigail",
      "lastName": "Hills"
    },
    "phone": "401-799-1234",
    "address": {
      "street": "34 Timer Dr.",
      "city": "Cambria",
      "state": "CA"
    }
  },
  {
    "name": {
      "firstName": "Anne",
      "lastName": "Smith"
    },
    "phone": "343-100-1698",
    "address": {
      "street": "100 King St.",
      "city": "El Paso",
      "state": "TX"
    }
  },
  {
    "name": {
      "firstName": "Eric",
      "lastName": "Baker"
    },
    "phone": "771-324-2345",
    "address": {
      "street": "402 Bolman Court",
      "city": "Southfield",
      "state": "MI"
    }
  }
]

Result


Python Application

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
  
  spark = (SparkSession
          .builder
          .appName("contacts")
          .getOrCreate())

  data_file = "test/data/contacts.json"

  # schema  
  custom_schema = StructType([        
        StructField("name", StructType([        
            StructField("firstName", StringType(), False),
            StructField("lastName", StringType(), False)
            ]), False),
        StructField("phone", StringType(), False),
        StructField("address", StructType([        
            StructField("street", StringType(), False),
            StructField("city", StringType(), False),
            StructField("state", StringType(), False)
            ]), False),
        ])

  # read json with schema
  contacts = (spark.read.format("json")
          .option("multiline", True) # important!
          .schema(custom_schema)
          .load(data_file))
  contacts = contacts.na.drop("all")

  contacts.show()
  contacts.printSchema()

  # select nested values 
  contacts.withColumn("fullName", 
        (concat_ws(" ", col("name.firstName"), expr("name.lastName"))
        )).select(
      col("fullName"),
      col("phone"),
      col("address.street"),
      col("address.city"),
      col("address.state")).show()

  spark.stop()


C# Application

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System.Net;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class Contacts
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("contacts")
                    .GetOrCreate();

            string filePath = "data/contacts.json";

            // schema
            StructField[] fields = {
                new StructField("name", new StructType(new[] {
                    new StructField("firstName", new StringType(), false),
                    new StructField("lastName", new StringType(), false)
                }), false),
                new StructField("phone", new StringType(), false),
                new StructField("address", new StructType(new[] {
                    new StructField("street", new StringType(), false),
                    new StructField("city", new StringType(), false),
                    new StructField("state", new StringType(), false)
                }), false),
            };
            StructType schema = new StructType(fields);

            // initial data frame - products as string
            DataFrame contacts = spark.Read()
                .Format("json")
                .Option("multiline", true) // important!
                .Schema(schema)
                .Load(filePath);
            contacts = contacts.Na().Drop("all");

            contacts.Show();
            contacts.PrintSchema();

            // select nested values 
            contacts.WithColumn("fullName",
                    ConcatWs(" ", Col("name.firstName"), Col("name.lastName")))
                .Select(
                    Col("fullName"),
                    Col("phone"), 
                    Col("address.street"),
                    Col("address.city"),
                    Col("address.state")
                ).Show();

            spark.Stop();
        }
    }
}

Leave a Comment

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s