The following sample code (in Python and C#) shows how to group a DataFrame and compute aggregate values such as the maximum, minimum, and average.
Aggregation
- Grouping: groups the DataFrame by the specified columns so that aggregations can be run on each group.
  - groupBy(list of columns …)
- Filtering: keeps only the rows that satisfy a condition; the two methods are equivalent, as the sketch after this list shows.
  - where(condition)
  - filter(condition)
- Aggregate Functions
  - count(), count(column)
  - max(column)
  - min(column)
  - avg(column)
  - sum(column)
- Aggregation: computes aggregates and returns the result as a DataFrame.
  - agg(expr, …)
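For example, the filtering methods can be applied either before grouping (to restrict the input rows) or after aggregation (to restrict the results). A minimal PySpark sketch, assuming a DataFrame df with the product and price columns used below; the 4.00 threshold and the names cheap_avg and popular are just illustrative:

from pyspark.sql.functions import avg, col

# filter the input rows first, then aggregate:
# average price per product, counting only rows cheaper than 4.00
cheap_avg = (df
    .where(col("price") < 4.00)   # filter(col("price") < 4.00) is equivalent
    .groupBy(col("product"))
    .agg(avg(col("price")).alias("AvgPrice")))

# filter the aggregated results: keep products that appear at least 3 times
popular = (df
    .groupBy(col("product"))
    .count()                      # adds a "count" column
    .where(col("count") >= 3))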
Data File
- Create a CSV file named grocery-list.csv with the following content. (The Python example loads it from test/data/, the C# example from data/.)
product,price
Milk,3.99
Bread,4.5
Bread,4.25
Egg,2.99
Milk,4.3
Egg,3.49
Bread,4.15
Egg,3.75
Milk,3.89
Egg,3.15
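If you would rather create the file from code, a small sketch (assuming the test/data/ directory used by the Python example below):

import os

csv_content = """product,price
Milk,3.99
Bread,4.5
Bread,4.25
Egg,2.99
Milk,4.3
Egg,3.49
Bread,4.15
Egg,3.75
Milk,3.89
Egg,3.15
"""

# create the directory and write the sample data
os.makedirs("test/data", exist_ok=True)
with open("test/data/grocery-list.csv", "w") as f:
    f.write(csv_content)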
Result
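Computed by hand from the data above, the ungrouped query should report MaxPrice 4.50, MinPrice 2.99, and AvgPrice 3.85, and the grouped aggregation should produce one row per product (Spark prints raw doubles at full precision, so the displayed decimals may differ slightly):

product  count(price)  MaxPrice  MinPrice  AveragePrice  TotalPrice
Bread    3             4.5       4.15      4.30          12.90
Egg      4             3.75      2.99      3.345         13.38
Milk     3             4.3       3.89      4.06          12.18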
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    spark = (SparkSession
             .builder
             .appName("aggregation-test")
             .getOrCreate())

    data_file = "test/data/grocery-list.csv"

    # schema
    custom_schema = StructType([
        StructField("product", StringType(), False),
        StructField("price", DoubleType(), False)])

    grocery_price = (spark.read.format("csv")
                     .option("header", True)
                     .schema(custom_schema)
                     .load(data_file))
    grocery_price.show()

    # no grouping
    # just find the max, min, and average price
    grocery_price.select(
        format_number(max(col("price")), 2).alias("MaxPrice"),
        format_number(min(col("price")), 2).alias("MinPrice"),
        format_number(avg(col("price")), 2).alias("AvgPrice")
    ).show()

    # get the number of items in each group
    grocery_price.groupBy(col("product")).count().show()

    # get the aggregate values of each group
    grouped_price = (grocery_price
        .groupBy(col("product"))
        .agg(
            count(col("price")),
            max(col("price")).alias("MaxPrice"),
            min(col("price")).alias("MinPrice"),
            avg(col("price")).alias("AveragePrice"),
            sum(col("price")).alias("TotalPrice")
        )
    )
    grouped_price.show()

    spark.stop()
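Assuming the script is saved as aggregation_test.py next to the test/data/ directory (the file name is just an example), it can be run with spark-submit aggregation_test.py, or directly with python if the pyspark package is installed.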
C# Application
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class AggregationTest
    {
        public static void Run()
        {
            // Create Spark session
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("aggregation-test")
                    .GetOrCreate();

            // Create initial DataFrame
            string dataFile = "data/grocery-list.csv";

            // schema
            StructField[] fields = {
                new StructField("product", new StringType(), false),
                new StructField("price", new DoubleType(), false)
            };
            StructType schema = new StructType(fields);

            // initial data frame
            DataFrame groceryList = spark.Read()
                .Format("csv")
                .Option("header", true)
                .Schema(schema)
                .Load(dataFile);
            groceryList.Show();

            // no grouping
            // just find the max, min, and average price
            groceryList.Select(
                FormatNumber(Max(Col("price")), 2).Alias("MaxPrice"),
                FormatNumber(Min(Col("price")), 2).Alias("MinPrice"),
                FormatNumber(Avg(Col("price")), 2).Alias("AvgPrice")
            ).Show();

            // get the number of items in each group
            groceryList
                .GroupBy(Col("product"))
                .Count()
                .Show();

            // get the aggregate values of each group
            DataFrame groupedPrice = groceryList
                .GroupBy(Col("product"))
                .Agg(
                    Count(Col("price")),
                    Max(Col("price")).Alias("MaxPrice"),
                    Min(Col("price")).Alias("MinPrice"),
                    Avg(Col("price")).Alias("AveragePrice"),
                    Sum(Col("price")).Alias("TotalPrice")
                );
            groupedPrice.Show();

            // Stop Spark session
            spark.Stop();
        }
    }
}
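Running the C# version requires the .NET for Apache Spark runner: publish the project (with a Main method that calls AggregationTest.Run()) and launch it through spark-submit, roughly spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-<version>.jar dotnet <YourApp>.dll, where the jar and assembly names depend on your installed versions.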