[Spark By Example] Explode and Collect

In Spark, you often have to work with array or map data as it arrives. The following example shows how to convert collection data to rows and back.


Explode and Collect Functions

You can use the following functions.

  • explode()
    • The explode() function returns a new row for each element in the given array or map.
  • collect_list()
    • The collect_list() function aggregates values into an ArrayType column, typically after a groupBy. It keeps all duplicates.
  • collect_set()
    • The collect_set() function is similar to collect_list() but eliminates duplicates, so each value appears only once in the result.
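
To make the three descriptions above concrete, here is a small plain-Python model of their behaviour. It does not use Spark at all; the helper names (explode_rows, collect) and the sample rows are made up for illustration, and it only approximates what the real functions do on a DataFrame.

```python
from collections import defaultdict

# Hypothetical sample rows: (name, technologies)
rows = [
    ("Ann", ["spark", "python", "spark"]),
    ("Bob", ["c#"]),
    ("Cid", []),  # explode() emits no row for an empty array
]

def explode_rows(rows):
    """Model of explode(): one output row per array element."""
    return [(name, tech) for name, techs in rows for tech in techs]

def collect(pairs, unique=False):
    """Model of groupBy(key).agg(collect_list(...)) or collect_set(...)."""
    groups = defaultdict(list)
    for key, value in pairs:
        if unique and value in groups[key]:
            continue  # collect_set keeps a single copy of each value
        groups[key].append(value)
    return dict(groups)

exploded = explode_rows(rows)
tech_list = collect(exploded)              # like collect_list: duplicates kept
tech_set = collect(exploded, unique=True)  # like collect_set: duplicates dropped

print(exploded)
print(tech_list)
print(tech_set)
```

Note that this model keeps insertion order, whereas in Spark the order of the elements returned by collect_list() and collect_set() is not guaranteed.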
explode(column) 

collect_list(column)

collect_set(column)

Result


Python Application

from pyspark.sql import SparkSession
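from pyspark.sql.types import (
    ArrayType, IntegerType, MapType, StringType, StructField, StructType)
# explicit imports avoid shadowing Python built-ins such as sum() and max()
from pyspark.sql.functions import (
    col, collect_list, collect_set, concat_ws, explode)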

if __name__ == "__main__":

  spark = (SparkSession
    .builder
    .appName("ExplodeAndCollect")
    .getOrCreate())

  data_file = "test/data/employees.json"

  # schema
  custom_schema = StructType(fields=[
    StructField("id", IntegerType(), False),     
    StructField("name", MapType(StringType(), StringType()), False),     
    StructField("technologies", ArrayType(StringType()), False),   
  ])

  # read json with schema
  employees = (spark.read.format("json")
    .option("multiline", True)
    .schema(custom_schema)
    .load(data_file))
  
  employees.printSchema()

  # 3 rows
  employees.show()
  
  # explode name -- maps
  # the two aliases name the key and value columns produced by explode()
  name_df = employees.select(
    col("id"),
    explode(col("name")).alias("key", "value"))
  name_df.show()

  # concat first name and last name + explode technologies (list)
  employees_fullname = (employees
    .withColumn("fullName",
      concat_ws(" ", col("name.first"), col("name.last")))
    .select(
      col("id"),
      col("fullName"),
      explode(col("technologies")).alias("technology")))
  employees_fullname.show()

  # collect_list - allow duplicates
  employees_tech_list = employees_fullname.groupBy("fullName").agg(
    collect_list("technology").alias("tech-list"))
  employees_tech_list.show()

  # collect_set - remove duplicates
  employees_tech_set = employees_fullname.groupBy("fullName").agg(
    collect_set("technology").alias("tech-set"))
  employees_tech_set.show()

  spark.stop()
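
The listing above reads test/data/employees.json, which is not shown in this post. As an assumption for illustration only, a file compatible with the schema (three records, name as a map with first and last keys, technologies as an array) could be generated as follows. All values are invented.

```python
import json

# Invented records that match the schema used above: id (int),
# name (map of string to string), technologies (array of strings).
employees = [
    {"id": 1, "name": {"first": "Ann", "last": "Lee"},
     "technologies": ["spark", "python", "spark"]},
    {"id": 2, "name": {"first": "Bob", "last": "Ray"},
     "technologies": ["c#", "spark"]},
    {"id": 3, "name": {"first": "Cid", "last": "Moe"},
     "technologies": ["scala"]},
]

# With the multiline option, the JSON reader accepts an array
# of objects that spans several lines.
text = json.dumps(employees, indent=2)
print(text)

# Exploding the "name" map of the first record would give
# one (id, key, value) row per map entry.
name_rows = [(employees[0]["id"], k, v)
             for k, v in employees[0]["name"].items()]
print(name_rows)
```

Saving the printed JSON as test/data/employees.json gives the application something to read. The duplicate "spark" entry in the first record is there so that collect_list() and collect_set() produce different results.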

C# Application

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class ExplodeAndCollect
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("ExplodeAndCollect")
                    .GetOrCreate();

            string filePath = "data/employees.json";

            // schema
            StructField[] fields = {
                new StructField("id", new IntegerType(), false),
                new StructField("name", 
                    new MapType(new StringType(), new StringType()), false),
                new StructField("technologies",
                    new ArrayType(new StringType()), false)
            };
            StructType schema = new StructType(fields);

            // initial data frame
            DataFrame employees = spark.Read()
                .Format("json")
                .Option("multiline", true)
                .Schema(schema)
                .Load(filePath);
            employees.PrintSchema();

            // 3 rows
            employees.Show();

            // explode name -- maps
            DataFrame nameDf =
                employees.Select(
                    Col("id"),
                    // the aliases name the key and value columns produced by Explode()
                    Explode(Col("name")).As(new[] { "key", "value" }));
            nameDf.Show();

            // concat first name and last name + explode technologies (list)
            DataFrame employeesFullName =
                employees
                .WithColumn("fullName", 
                    ConcatWs(" ", Col("name.first"), Col("name.last")))
                .Select(
                    Col("id"),
                    Col("fullName"),
                    Explode(Col("technologies")).Alias("technology"));
            employeesFullName.Show();

            // collect_list - allow duplicates
            DataFrame employeesTechList =
                employeesFullName.GroupBy("fullName").Agg(
                    CollectList("technology").Alias("tech-list"));
            employeesTechList.Show();

            // collect_set - remove duplicates
            DataFrame employeesTechSet =
                employeesFullName.GroupBy("fullName").Agg(
                    CollectSet("technology").Alias("tech-set"));
            employeesTechSet.Show();

            spark.Stop();
        }
    }
}
