[Spark By Example] Read CSV – Array Type

The following sample code (in Python and C#) shows how to read a CSV file that contains an array column.

CSV does not support complex types such as arrays. To work around this, store the array as a JSON array string in a CSV field and parse it after loading. This post shows that trick.

A JSON array looks like this:

["Ford", "Toyota", "BMW", "Fiat"]

Data File

  • Create a CSV file with the following content.

When you embed a JSON string in a CSV field, wrap the whole field in double quotes (so that the commas inside it are not treated as separators) and escape each inner double quote by doubling it.

id,category,products
1,Appliances,"[""Washer"",""Dryer"",""Refrigerator""]"
2,Clothing,"[""Sweater"",""Jacket""]"
3,Grocery,"[""Bread"",""Coffee"",""Milk""]"
4,Electronics,"[""TV"",""Computer"",""Camera"", ""Speaker""]"
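
The quoting rule above does not have to be applied by hand. As a minimal sketch, assuming the data is generated from Python rather than typed manually, the standard `csv` and `json` modules produce the same escaping. The `rows` list and the `to_csv_text` helper are illustrative names, not part of the original example.

```python
import csv
import io
import json

# Two sample rows taken from the data file above: (id, category, products).
rows = [
    (1, "Appliances", ["Washer", "Dryer", "Refrigerator"]),
    (2, "Clothing", ["Sweater", "Jacket"]),
]


def to_csv_text(rows):
    """Serialize rows to CSV, storing each product list as a JSON array string."""
    buffer = io.StringIO()
    # The default dialect quotes any field that contains a comma or a quote
    # and doubles the inner quotes, which is the escaping described above.
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["id", "category", "products"])
    for row_id, category, products in rows:
        # Compact separators avoid spaces after the commas in the JSON text.
        json_array = json.dumps(products, separators=(",", ":"))
        writer.writerow([row_id, category, json_array])
    return buffer.getvalue()


csv_text = to_csv_text(rows)
print(csv_text)
```

Only the products field is quoted, because it is the only field that contains commas and double quotes.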

Python Application

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType
from pyspark.sql.functions import col, explode, from_json

if __name__ == "__main__":
  
  spark = (SparkSession
          .builder
          .appName("product-category")
          .getOrCreate())

  data_file = "test/data/product-category.csv"

  # schema
  custom_schema = StructType([        
        StructField("id", IntegerType(), False),
        StructField("category", StringType(), False),
        StructField("products", StringType(), True)])

  # initial data frame - products as string
  category = (spark.read.format("csv")
          .option("header", True)
          .option("escape", '"')
          .schema(custom_schema)
          .load(data_file))
  category.show()

  # parse the products column (a JSON string) into an array of strings
  category_products = category.withColumn("products", 
        from_json("products", ArrayType(StringType())))
  category_products.show()
  category_products.printSchema()
  print(category_products.schema)
  print()

  # expand products with the category
  category_products.select(
        col("id"), 
        col("category"), 
        explode(col("products")).alias("product")).show()

  spark.stop()
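
As a cross-check on the rows the final `select` should show, the same parse-then-explode logic can be sketched in plain Python without a Spark session. This is not how Spark implements `from_json` or `explode`; it only mirrors the expected result for two rows of the sample file. The `explode_products` helper is a hypothetical name introduced for illustration.

```python
import csv
import io
import json

# Two rows copied from the sample data file, with the same quoting.
csv_text = (
    'id,category,products\n'
    '1,Appliances,"[""Washer"",""Dryer"",""Refrigerator""]"\n'
    '2,Clothing,"[""Sweater"",""Jacket""]"\n'
)


def explode_products(text):
    """Return one (id, category, product) tuple per array element."""
    exploded = []
    for record in csv.DictReader(io.StringIO(text)):
        # The csv module un-doubles the inner quotes, leaving a JSON array string.
        products = json.loads(record["products"])
        for product in products:
            exploded.append((int(record["id"]), record["category"], product))
    return exploded


exploded_rows = explode_products(csv_text)
for row in exploded_rows:
    print(row)
```

Each input row yields as many output rows as its array has elements, so the two rows here expand to five.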

C# Application

using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class ProductCategory
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("product-category")
                    .GetOrCreate();

            string filePath = "data/product-category.csv";

            // schema
            StructField[] fields = {
                new StructField("id", new IntegerType(), false),
                new StructField("category", new StringType(), false),
                new StructField("products", new StringType(), true)
            };
            StructType schema = new StructType(fields);

            // initial data frame - products as string
            DataFrame csvDataDF = spark.Read()
                .Format("csv")
                .Option("header", true)
                .Option("escape", "\"")
                .Schema(schema)
                .Load(filePath);

            csvDataDF.Show();

            // parse the products column (a JSON string) into an array of strings
            string arrayOfStringJsonSchema = (new ArrayType(new StringType())).Json;
            Console.WriteLine(arrayOfStringJsonSchema);

            DataFrame dataDF = csvDataDF
                .WithColumn("products",
                    FromJson(Col("products"), arrayOfStringJsonSchema));

            dataDF.Show();
            dataDF.PrintSchema();
            Console.WriteLine(dataDF.Schema().SimpleString);

            // expand products with the category
            dataDF.Select(
                Col("id"),
                Col("category"), 
                Explode(Col("products")).Alias("product")
            ).Show();

            spark.Stop();
        }
    }
}
