[Spark By Example] Aggregation

The following sample code (by Python and C#) shows how to group DataFrame and compute the aggregated values such as Max, Min, or Average.

Aggregation

  • Grouping: groups the DataFrame using the specified columns, so we can run aggregation on them.
    • groupby(list of columns …)
  • Filtering:
    • where(condition)
    • filter(condition)
  • Aggregate Functions
    • count(), count(column)
    • max(column)
    • min(column)
    • avg(column)
    • sum(column)
  • Aggregation: computes aggregates and returns the result as a DataFrame
    • agg(expr)

Data File

  • Create a csv file with the following content.
product,price
Milk,3.99
Bread,4.5
Bread,4.25
Egg,2.99
Milk,4.3
Egg,3.49
Bread,4.15
Egg,3.75
Milk,3.89
Egg,3.15

Result


Python Application

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
  
  spark = (SparkSession
          .builder
          .appName("aggregation-test")
          .getOrCreate())

  data_file = "test/data/grocery-list.csv"

  # schema
  custom_schema = StructType([
        StructField("product", StringType(), False),
        StructField("price", DoubleType(), False)])

  grocery_price = (spark.read.format("csv")
          .option("header", True)
          .schema(custom_schema)
          .load(data_file))
  grocery_price.show()
    
  # no grouping
  # just find the max, min, and average price
  grocery_price.select(
        format_number(max(col("price")), 2).alias("MaxPrice"),
        format_number(min(col("price")), 2).alias("MinPrice"),
        format_number(avg(col("price")), 2).alias("AvgPrice")
  ).show()

  # get the number of items of each group
  grocery_price.groupBy(col("product")).count().show()

  # get the aggregate values of each group
  grouped_price = (grocery_price
        .groupBy(col("product"))
        .agg(
          count(col("price")),
          max(col("price")).alias("MaxPrice"),
          min(col("price")).alias("MinPrice"),
          avg(col("price")).alias("AveragePrice"),
          sum(col("price")).alias("TotalPrice")
        )
  )
  grouped_price.show()

  spark.stop()

C# Application

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class AggregationTest
    {
        static public void Run()
        {
            // Create Spark session
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("aggregation-test")
                    .GetOrCreate();

            // Create initial DataFrame
            string dataFile = "data/grocery-list.csv";

            // schema
            StructField[] fields = {
                new StructField("product", new StringType(), false),
                new StructField("price", new DoubleType(), false)
            };
            StructType schema = new StructType(fields);

            // initial data frame
            DataFrame groceryList= spark.Read()
                .Format("csv")
                .Option("header", true)
                .Schema(schema)
                .Load(dataFile);
            groceryList.Show();

            // no grouping
            // just find the max, min, and average price
            groceryList.Select(
                    FormatNumber(Max(Col("price")), 2).Alias("MaxPrice"),
                    FormatNumber(Min(Col("price")), 2).Alias("MinPrice"),
                    FormatNumber(Avg(Col("price")), 2).Alias("AvgPrice")
                ).Show();

            // get the number of items of each group
            groceryList
                .GroupBy(Col("product"))
                .Count()
                .Show();

            // # get the aggregate values of each group
            DataFrame groupedPrice = groceryList
                .GroupBy(Col("product"))
                .Agg(
                    Count(Col("price")),
                    Max(Col("price")).Alias("MaxPrice"),
                    Min(Col("price")).Alias("MinPrice"),
                    Avg(Col("price")).Alias("AveragePrice"),
                    Sum(Col("price")).Alias("TotalPrice")
                );
            groupedPrice.Show();

            // Stop Spark session
            spark.Stop();
        }
    }
}

Leave a Comment

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s