The following sample code (in Python and C#) shows how to read a CSV file with a column containing an array.
CSV does not support complex objects such as an array. To make it work, you need to pass a JSON array in CSV and parse it. You will learn this trick here.
JSON array format is like this.
["Ford", "Toyota", "BMW", "Fiat"]
Data File
- Create a csv file with the following content.
When you embed JSON strings in CSV, you need to make sure you properly wrap them in double quotes (so that the commas inside them are not treated as separators) and escape the inner double-quote characters.
id,category,products
1,Appliances,"[""Washer"",""Dryer"",""Refrigerator""]"
2,Clothing,"[""Sweater"",""Jacket""]"
3,Grocery,"[""Bread"",""Coffee"",""Milk""]"
4,Electronics,"[""TV"",""Computer"",""Camera"", ""Speaker""]"
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    # Start (or reuse) a Spark session for this example job.
    spark = (SparkSession
             .builder
             .appName("product-category")
             .getOrCreate())

    data_file = "test/data/product-category.csv"

    # Explicit schema: "products" arrives as a raw JSON string column,
    # to be parsed into an array afterwards.
    custom_schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("category", StringType(), False),
        StructField("products", StringType(), True)])

    # Initial data frame - products still as a plain string.
    # escape='"' lets the CSV reader handle the doubled quotes ("")
    # used inside the quoted JSON arrays in the data file.
    category = (spark.read.format("csv")
                .option("header", True)
                .option("escape", '"')
                .schema(custom_schema)
                .load(data_file))
    category.show()

    # Convert the products column from a JSON string to array<string>.
    category_products = category.withColumn(
        "products",
        from_json("products", ArrayType(StringType())))
    category_products.show()
    category_products.printSchema()
    print(category_products.schema)
    print()

    # Expand products with the category: one output row per
    # (id, category, product) combination.
    category_products.select(
        col("id"),
        col("category"),
        explode(col("products")).alias("product")).show()

    spark.stop()
C# Application
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples
{
    /// <summary>
    /// Reads a CSV file whose "products" column holds a JSON array string,
    /// parses it into an array&lt;string&gt; column, and explodes it into
    /// one row per (id, category, product) combination.
    /// </summary>
    internal class ProductCategory
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("product-category")
                    .GetOrCreate();

            string filePath = "data/product-category.csv";

            // Explicit schema: "products" arrives as a raw JSON string
            // column, to be parsed into an array afterwards.
            StructField[] fields = {
                new StructField("id", new IntegerType(), false),
                new StructField("category", new StringType(), false),
                new StructField("products", new StringType(), true)
            };
            StructType schema = new StructType(fields);

            // Initial data frame - products still as a plain string.
            // escape="\"" lets the CSV reader handle the doubled quotes
            // ("") used inside the quoted JSON arrays in the data file.
            DataFrame csvDataDF = spark.Read()
                .Format("csv")
                .Option("header", true)
                .Option("escape", "\"")
                .Schema(schema)
                .Load(filePath);
            csvDataDF.Show();

            // Convert the products column from a JSON string to
            // array<string>. FromJson takes the target type expressed
            // as a JSON schema string.
            string arrayOfStringJsonSchema = new ArrayType(new StringType()).Json;
            Console.WriteLine(arrayOfStringJsonSchema);

            DataFrame dataDF = csvDataDF
                .WithColumn("products",
                    FromJson(Col("products"), arrayOfStringJsonSchema));
            dataDF.Show();
            dataDF.PrintSchema();
            Console.WriteLine(dataDF.Schema().SimpleString);

            // Expand products with the category: one output row per
            // (id, category, product) combination.
            dataDF.Select(
                Col("id"),
                Col("category"),
                Explode(Col("products")).Alias("product")
            ).Show();

            spark.Stop();
        }
    }
}