The following sample code (in Python and C#) shows how to read a JSON file containing array data.
With JSON, it is easy to specify the schema. Therefore, you can parse the array data directly into a DataFrame.
Data File
- Create a JSON file with the following content.
[
{
"id": 1,
"category": "Appliances",
"products": [ "Washer", "Dryer", "Refrigerator" ]
},
{
"id": 2,
"category": "Clothing",
"products": [ "Sweater", "Jacket" ]
},
{
"id": 3,
"category": "Grocery",
"products": [ "Bread", "Coffee", "Milk" ]
},
{
"id": 4,
"category": "Electronics",
"products": [ "TV", "Computer", "Camera", "Speaker" ]
}
]
Result
Python Application
from pyspark.sql import SparkSession
# Explicit imports instead of wildcard imports: makes the names used below
# traceable and avoids shadowing builtins (e.g. pyspark.sql.functions also
# exports names like `abs` and `round`).
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql.functions import col, explode

if __name__ == "__main__":
    # Start (or reuse) a Spark session for this example.
    spark = (SparkSession
             .builder
             .appName("product-category")
             .getOrCreate())

    data_file = "test/data/product-category.json"

    # Explicit schema: skips schema inference and parses the "products"
    # field directly as an array<string> column.
    custom_schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("category", StringType(), False),
        StructField("products", ArrayType(StringType()), True)])

    # Read the JSON file. "multiline" is required because each record
    # spans multiple lines in the source file.
    category = (spark.read.format("json")
                .option("multiline", True)  # important!
                .schema(custom_schema)
                .load(data_file))

    # Drop rows in which every column is null.
    category = category.na.drop("all")
    category.show()
    category.printSchema()
    print()

    # Expand the products array: one output row per (id, category, product).
    category.select(
        col("id"),
        col("category"),
        explode(col("products")).alias("product")).show()

    spark.stop()
C# Application
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    /// <summary>
    /// Reads a multiline JSON file using an explicit schema (including an
    /// array&lt;string&gt; column) and explodes the array into one row per element.
    /// </summary>
    internal class ProductCategoryJson
    {
        public static void Run()
        {
            SparkSession spark = SparkSession
                .Builder()
                .AppName("product-category")
                .GetOrCreate();

            string filePath = "data/product-category.json";

            // Explicit schema: the "products" field is parsed directly
            // as an array<string> column instead of a plain string.
            StructType schema = new StructType(new[]
            {
                new StructField("id", new IntegerType(), false),
                new StructField("category", new StringType(), false),
                new StructField("products", new ArrayType(new StringType()), true)
            });

            // "multiline" is required because each JSON record spans
            // several lines in the source file.
            DataFrame category = spark.Read()
                .Format("json")
                .Option("multiline", true)
                .Schema(schema)
                .Load(filePath);

            // Drop rows in which every column is null.
            category = category.Na().Drop("all");
            category.Show();
            category.PrintSchema();

            // Expand the products array: one row per (id, category, product).
            category.Select(
                Col("id"),
                Col("category"),
                Explode(Col("products")).Alias("product")
            ).Show();

            spark.Stop();
        }
    }
}

