[Spark By Example] Read JSON with Schema

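The following sample code (in Python and C#) shows how to read a JSON file with an explicit schema. With JSON, it is a good idea to always provide the schema for your data. Otherwise Spark has to scan the data first to infer the column names and types, and the inferred types may not be the ones you want.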
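Without a schema, Spark has to read the JSON once just to work out the column names and types before it can load the data. The stdlib-only sketch below imitates that pass in a simplified way, so both the extra scan and the guesswork are visible. The names infer_schema, merge_types and TYPE_NAMES and the widening rules are assumptions for illustration, not Spark's code. Spark's real inference also handles nested structs, arrays and reader options such as samplingRatio.

```python
import json

# Simplified imitation of JSON schema inference, NOT Spark's implementation:
# every record is scanned and each field's type is widened when values disagree.
TYPE_NAMES = {bool: "boolean", int: "long", float: "double", str: "string"}


def merge_types(a, b):
    """Widen two inferred type names (None = only nulls seen) to one type name."""
    if a is None:
        return b
    if b is None or a == b:
        return a
    if {a, b} == {"long", "double"}:
        return "double"  # mixed integers and decimals widen to double
    return "string"  # any other conflict falls back to string


def infer_schema(records):
    """Return {field: type name} after one full pass over all records."""
    schema = {}
    for record in records:  # this full pass is the work an explicit schema saves
        for field, value in record.items():
            # nested lists/dicts are simply reported as string in this sketch
            inferred = None if value is None else TYPE_NAMES.get(type(value), "string")
            schema[field] = merge_types(schema.get(field), inferred)
    # fields that were null in every record are reported as string here
    return {field: kind or "string" for field, kind in schema.items()}


sample = json.loads('[{"product": "Milk", "price": 3.99}, {"product": "Bread", "price": 4}]')
print(infer_schema(sample))  # {'product': 'string', 'price': 'double'}
```

Note how a price written as 4 is only typed as double because another record happens to contain a decimal. When a schema is supplied to the reader, no inference pass is needed and every column gets exactly the declared type.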

Data File

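  • Create a JSON file with the following content (the Python application reads it from test/data/grocery-price.json, the C# application from data/grocery-price.json).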
[{"product": "Milk", "price": 3.99},
{"product": "Bread", "price": 4.5},
{"product": "Bread", "price": 4.25},
{"product": "Egg", "price": 2.99},
{"product": "Milk", "price": 4.3},
{"product": "Egg", "price": 3.49},
{"product": "Bread", "price": 4.15},
{"product": "Egg", "price": 3.75},
{"product": "Milk", "price": 3.89},
{"product": "Egg", "price": 3.15}]
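This file is a single JSON array spread over several lines, not JSON Lines (one complete object per line), which is what Spark's JSON reader expects by default. That is why both applications set the multiline option. The stdlib sketch below shows the difference and converts the array into JSON Lines, after which the file should be readable without multiline. The helpers to_json_lines and parses_line_by_line are hypothetical, written only for this illustration.

```python
import json


def to_json_lines(array_text):
    """Convert a JSON array document into JSON Lines (one object per line)."""
    records = json.loads(array_text)
    return "\n".join(json.dumps(record) for record in records) + "\n"


def parses_line_by_line(text):
    """True if every non-blank line is a standalone JSON value (JSON Lines)."""
    try:
        for line in text.splitlines():
            if line.strip():
                json.loads(line)
        return True
    except json.JSONDecodeError:
        return False


# First two records of the data file, in the same array layout.
array_text = """[{"product": "Milk", "price": 3.99},
{"product": "Bread", "price": 4.5}]"""

print(parses_line_by_line(array_text))                 # False
print(parses_line_by_line(to_json_lines(array_text)))  # True
print(to_json_lines(array_text), end="")
```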

Python Application

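from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# explicit imports: "from pyspark.sql.functions import *" would shadow built-ins such as sum and max
from pyspark.sql.functions import avg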

if __name__ == "__main__":
  
  spark = (SparkSession
          .builder
          .appName("grocery-price")
          .getOrCreate())

  data_file = "test/data/grocery-price.json"

  # schema
  custom_schema = StructType([
        StructField("product", StringType(), False),
        StructField("price", DoubleType(), False)])

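  # read json with schema
  grocery_price = (spark.read.format("json")
          # required: the file is one JSON array spanning several lines,
          # while Spark's JSON reader expects one object per line by default
          .option("multiline", True)
          .schema(custom_schema)
          .load(data_file))
  # drop rows in which every column is null
  grocery_price = grocery_price.na.drop("all")
  grocery_price.show()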
    
  # group by product and get the average price
  average_price = (grocery_price
          .groupBy("product")
          .agg(avg("price")))
  average_price.show()

  spark.stop()
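To sanity-check what average_price.show() prints, the same aggregation can be done in plain Python over the sample data. This is a stdlib sketch, not Spark; average_by_product is a hypothetical helper. Spark names the result column avg(price), and the last digits it prints may differ slightly because of floating-point arithmetic.

```python
import json
from collections import defaultdict

# The sample data from the "Data File" section.
DATA = """[{"product": "Milk", "price": 3.99},
{"product": "Bread", "price": 4.5},
{"product": "Bread", "price": 4.25},
{"product": "Egg", "price": 2.99},
{"product": "Milk", "price": 4.3},
{"product": "Egg", "price": 3.49},
{"product": "Bread", "price": 4.15},
{"product": "Egg", "price": 3.75},
{"product": "Milk", "price": 3.89},
{"product": "Egg", "price": 3.15}]"""


def average_by_product(records):
    """Plain-Python equivalent of groupBy("product").agg(avg("price"))."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for record in records:
        if record.get("price") is None:  # like SQL avg(), skip null prices
            continue
        totals[record["product"]] += record["price"]
        counts[record["product"]] += 1
    return {product: totals[product] / counts[product] for product in totals}


for product, average in sorted(average_by_product(json.loads(DATA)).items()):
    print(f"{product}: {average:.3f}")
# Bread: 4.300
# Egg: 3.345
# Milk: 4.060
```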

C# Application

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class GroceryListJson
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("grocery-price")
                    .GetOrCreate();

            string filePath = "data/grocery-price.json";

            // schema
            StructField[] fields = {
                new StructField("product", new StringType(), false),
                new StructField("price", new DoubleType(), false)
            };
            StructType schema = new StructType(fields);

            // read json with schema
            DataFrame groceryPrice = spark.Read()
                .Format("json")
                .Option("multiline", true) // the file is one JSON array spanning several lines
                .Schema(schema)
                .Load(filePath);
            // drop rows in which every column is null
            groceryPrice = groceryPrice.Na().Drop("all");
            groceryPrice.Show();

            // group by product and get the average price
            DataFrame averagePrice =
                groceryPrice.GroupBy("product")
                .Agg(Avg("price"));

            averagePrice.Show();

            spark.Stop();
        }
    }
}
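Both applications call na.drop("all") (Na().Drop("all") in C#) right after loading. The stdlib sketch below imitates the two how modes on plain dicts: "all" removes a row only when every column is null, while "any" (PySpark's default) removes a row that has at least one null. drop_rows is a hypothetical helper, not part of Spark, and it ignores Spark's extra thresh and subset parameters.

```python
def drop_rows(rows, how="any"):
    """Mimic DataFrame.na.drop(how=...) on a list of dicts (simplified sketch)."""
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")
    check = any if how == "any" else all
    # drop a row when any/all of its values are None
    return [row for row in rows if not check(value is None for value in row.values())]


rows = [
    {"product": "Milk", "price": 3.99},
    {"product": None, "price": None},  # e.g. what a malformed record can become
    {"product": "Egg", "price": None},
]
print(drop_rows(rows, "all"))  # keeps the Milk and Egg rows
print(drop_rows(rows, "any"))  # keeps only the Milk row
```

Using "all" keeps partially filled rows such as the Egg row above, which is why it is the gentler choice for a cleanup step.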
