[Spark By Example] Read JSON – Array Type

The following sample code (by Python and C#) shows how to read JSON file with array data.

With JSON, it is easy to specify the schema. Therefore, you can directly parse the array data into the DataFrame.


Data File

  • Create a json file with the following content.
[
  {
    "id": 1,
    "category": "Appliances",
    "products": [ "Washer", "Dryer", "Refrigerator" ]
  },
  {
    "id": 2,
    "category": "Clothing",
    "products": [ "Sweater", "Jacket" ]
  },
  {
    "id": 3,
    "category": "Grocery",
    "products": [ "Bread", "Coffee", "Milk" ]
  },
  {
    "id": 4,
    "category": "Electronics",
    "products": [ "TV", "Computer", "Camera", "Speaker" ]
  }
]

Result


Python Application

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
  
  spark = (SparkSession
          .builder
          .appName("product-category")
          .getOrCreate())

  data_file = "test/data/product-category.json"

  # schema
  custom_schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("category", StringType(), False),
        StructField("products", ArrayType(StringType()), True)])

  # read json with schema
  category = (spark.read.format("json")
          .option("multiline", True) # important!
          .schema(custom_schema)
          .load(data_file))
  category= category.na.drop("all")
  category.show()
  category.printSchema()
  print()

  # expand products with the category
  category.select(
        col("id"), 
        col("category"), 
        explode(col("products")).alias("product")).show()

  spark.stop()

C# Application

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class ProductCategoryJson
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("product-category")
                    .GetOrCreate();

            string filePath = "data/product-category.json";

            // schema
            StructField[] fields = {
                new StructField("id", new IntegerType(), false),
                new StructField("category", new StringType(), false),
                new StructField("products", new ArrayType(new StringType()), true)
            };
            StructType schema = new StructType(fields);

            // initial data frame - products as string
            DataFrame category = spark.Read()
                .Format("json")
                .Option("multiline", true)
                .Schema(schema)
                .Load(filePath);
            category = category.Na().Drop("all"); 

            category.Show();
            category.PrintSchema();

            // expand products with the category
            category.Select(
                Col("id"),
                Col("category"), 
                Explode(Col("products")).Alias("product")
            ).Show();

            spark.Stop();
        }
    }
}

Leave a Comment