The following sample code (in Python and C#) shows how to read a JSON file with a schema. With JSON, it is always a good idea to provide the schema for your data.
Data File
- Create a JSON file with the following content (the code below expects it to be named grocery-price.json).
[{"product": "Milk", "price": 3.99},
{"product": "Bread", "price": 4.5},
{"product": "Bread", "price": 4.25},
{"product": "Egg", "price": 2.99},
{"product": "Milk", "price": 4.3},
{"product": "Egg", "price": 3.49},
{"product": "Bread", "price": 4.15},
{"product": "Egg", "price": 3.75},
{"product": "Milk", "price": 3.89},
{"product": "Egg", "price": 3.15}]
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
if __name__ == "__main__":
    # Entry point: read the grocery price JSON file with an explicit schema,
    # print the rows, then print the average price per product.
    spark = (SparkSession
             .builder
             .appName("grocery-price")
             .getOrCreate())

    # Everything that uses the session runs inside try/finally so the
    # session is stopped even when reading or aggregating fails.
    try:
        data_file = "test/data/grocery-price.json"

        # schema: one string column and one double column
        custom_schema = StructType([
            StructField("product", StringType(), False),
            StructField("price", DoubleType(), False)])

        # read json with schema
        grocery_price = (spark.read.format("json")
                         # important! the sample file is a single JSON array
                         # spread over several lines, not one object per line
                         .option("multiline", True)
                         .schema(custom_schema)
                         .load(data_file))

        # drop rows in which every column is null
        grocery_price = grocery_price.na.drop("all")
        grocery_price.show()

        # group by product and get the average price
        average_price = (grocery_price
                         .groupBy("product")
                         .agg(avg("price")))
        average_price.show()
    finally:
        spark.stop()
C# Application
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples
{
    /// <summary>
    /// Sample that reads the grocery price JSON file with an explicit schema
    /// and prints the average price per product.
    /// </summary>
    internal class GroceryListJson
    {
        /// <summary>
        /// Loads <c>data/grocery-price.json</c> using a two-column schema
        /// (product, price), shows the rows, then shows the average price
        /// grouped by product. The Spark session is stopped before returning,
        /// including when an exception is thrown.
        /// </summary>
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("grocery-price")
                    .GetOrCreate();

            // Run all DataFrame work inside try/finally so the session is
            // stopped even if loading or aggregating throws.
            try
            {
                string filePath = "data/grocery-price.json";

                // schema: one string column and one double column
                StructField[] fields = {
                    new StructField("product", new StringType(), false),
                    new StructField("price", new DoubleType(), false)
                };
                StructType schema = new StructType(fields);

                // initial data frame; "multiline" is needed because the sample
                // file is a single JSON array spread over several lines
                DataFrame groceryPrice = spark.Read()
                    .Format("json")
                    .Option("multiline", true)
                    .Schema(schema)
                    .Load(filePath);

                // drop rows in which every column is null
                groceryPrice = groceryPrice.Na().Drop("all");
                groceryPrice.Show();

                // Group by product and get the average price
                DataFrame averagePrice =
                    groceryPrice.GroupBy("product")
                        .Agg(Avg("price"));
                averagePrice.Show();
            }
            finally
            {
                spark.Stop();
            }
        }
    }
}