[Spark By Example] Read JSON – Array Type

The following sample code (in Python and C#) shows how to read a JSON file that contains array data.

With JSON, it is easy to specify the schema. Therefore, you can directly parse the array data into the DataFrame.


Data File

  • Create a JSON file with the following content.
[
  {
    "id": 1,
    "category": "Appliances",
    "products": [ "Washer", "Dryer", "Refrigerator" ]
  },
  {
    "id": 2,
    "category": "Clothing",
    "products": [ "Sweater", "Jacket" ]
  },
  {
    "id": 3,
    "category": "Grocery",
    "products": [ "Bread", "Coffee", "Milk" ]
  },
  {
    "id": 4,
    "category": "Electronics",
    "products": [ "TV", "Computer", "Camera", "Speaker" ]
  }
]

Result


Python Application

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
  # Example: read a multi-line JSON file whose "products" field is an array,
  # print the resulting DataFrame and its schema, then expand the array so
  # that each product appears on its own row next to its category.

  spark = (SparkSession
          .builder
          .appName("product-category")
          .getOrCreate())

  # Everything that uses the session runs inside try/finally so the session
  # is stopped even when reading or displaying the data raises.
  try:
    data_file = "test/data/product-category.json"

    # schema: "products" is declared as an array of strings, so the JSON
    # array is parsed straight into an array column.
    custom_schema = StructType([
          StructField("id", IntegerType(), False),
          StructField("category", StringType(), False),
          StructField("products", ArrayType(StringType()), True)])

    # read json with schema
    # "multiline" is required here: the data file is a single JSON array
    # spread over many lines rather than one JSON object per line.
    category = (spark.read.format("json")
            .option("multiline", True)
            .schema(custom_schema)
            .load(data_file))

    # discard rows in which every column is null
    category = category.na.drop("all")
    category.show()
    category.printSchema()
    print()

    # expand products with the category: explode() emits one output row
    # per element of the "products" array.
    category.select(
          col("id"),
          col("category"),
          explode(col("products")).alias("product")).show()
  finally:
    spark.stop()

C# Application

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    /// <summary>
    /// Example that reads a multi-line JSON file whose <c>products</c> field
    /// is an array, shows the resulting data frame and its schema, and then
    /// expands the array so each product is listed on its own row.
    /// </summary>
    internal class ProductCategoryJson
    {
        /// <summary>
        /// Runs the example: creates a Spark session, loads
        /// <c>data/product-category.json</c> with an explicit schema, prints
        /// the data, and prints one row per (id, category, product).
        /// The session is stopped even if reading or displaying the data throws.
        /// </summary>
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("product-category")
                    .GetOrCreate();

            // try/finally guarantees the session is stopped on every exit path.
            try
            {
                string filePath = "data/product-category.json";

                // schema: "products" is declared as an array of strings, so the
                // JSON array is parsed straight into an array column.
                StructField[] fields = {
                    new StructField("id", new IntegerType(), false),
                    new StructField("category", new StringType(), false),
                    new StructField("products", new ArrayType(new StringType()), true)
                };
                StructType schema = new StructType(fields);

                // initial data frame - products as an array of strings.
                // "multiline" is required here: the data file is a single JSON
                // array spread over many lines, not one JSON object per line.
                DataFrame category = spark.Read()
                    .Format("json")
                    .Option("multiline", true)
                    .Schema(schema)
                    .Load(filePath);

                // discard rows in which every column is null
                category = category.Na().Drop("all");

                category.Show();
                category.PrintSchema();

                // expand products with the category: Explode() emits one output
                // row per element of the "products" array.
                category.Select(
                    Col("id"),
                    Col("category"),
                    Explode(Col("products")).Alias("product")
                ).Show();
            }
            finally
            {
                spark.Stop();
            }
        }
    }
}

Leave a Comment

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s