In Spark, you often need to deal with array or map data in the form it arrives. The following example shows how to convert collection data to rows and vice versa.
Explode and Collect Functions
You can use the following functions.
- explode()
- The explode() function returns a new row for each element in the given array or map.
- collect_list()
- The collect_list() function aggregates the values into an ArrayType column, typically after a group-by. It keeps all duplicates.
- collect_set()
- The collect_set() function is similar to collect_list(), but it eliminates duplicates so each value appears only once.
explode(column)
collect_list(column)
collect_set(column)
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    # Create (or reuse) the Spark session for this example.
    spark = (SparkSession
             .builder
             .appName("ExplodeAndCollect")
             .getOrCreate())

    data_file = "test/data/employees.json"

    # Explicit schema: an int id, a map of name parts (first/last),
    # and an array of technology strings.
    custom_schema = StructType(fields=[
        StructField("id", IntegerType(), False),
        StructField("name", MapType(StringType(), StringType()), False),
        StructField("technologies", ArrayType(StringType()), False),
    ])

    # Read the multi-line JSON file with the explicit schema
    # (avoids a separate schema-inference pass).
    employees = (spark.read.format("json")
                 .option("multiline", True)
                 .schema(custom_schema)
                 .load(data_file))
    employees.printSchema()
    # 3 rows
    employees.show()

    # explode() on a map column yields one row per map entry, producing
    # two columns: key and value.
    # BUG FIX: the two-name alias must be applied to the RESULT of
    # explode(), not to the input column. The original
    # explode(col("name").alias("key", "value")) is invalid because a
    # plain column cannot take two alias names.
    name_df = employees.select(
        col("id"),
        explode(col("name")).alias("key", "value"))
    name_df.show()

    # Concatenate first and last name, then explode the technologies
    # array so each (employee, technology) pair becomes its own row.
    # Column references use "fullName" consistently (resolution is
    # case-insensitive by default, but mixed casing is confusing).
    employees_fullname = employees.withColumn(
        "fullName",
        concat_ws(" ", col("name.first"), col("name.last"))
    ).select(
        col("id"),
        col("fullName"),
        explode(col("technologies")).alias("technology"))
    employees_fullname.show()

    # collect_list - aggregates per employee and keeps duplicates
    employees_tech_list = employees_fullname.groupBy("fullName").agg(
        collect_list("technology").alias("tech-list"))
    employees_tech_list.show()

    # collect_set - same aggregation but duplicates are removed
    employees_tech_set = employees_fullname.groupBy("fullName").agg(
        collect_set("technology").alias("tech-set"))
    employees_tech_set.show()

    spark.stop()
C# Application
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples
{
    /// <summary>
    /// Demonstrates Explode() on map/array columns and the
    /// CollectList()/CollectSet() aggregations with Microsoft.Spark.
    /// </summary>
    internal class ExplodeAndCollect
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("ExplodeAndCollect")
                    .GetOrCreate();

            string filePath = "data/employees.json";

            // Explicit schema: an int id, a map of name parts (first/last),
            // and an array of technology strings.
            StructField[] fields = {
                new StructField("id", new IntegerType(), false),
                new StructField("name",
                    new MapType(new StringType(), new StringType()), false),
                new StructField("technologies",
                    new ArrayType(new StringType()), false)
            };
            StructType schema = new StructType(fields);

            // Read the multi-line JSON file with the explicit schema
            // (avoids a separate schema-inference pass).
            DataFrame employees = spark.Read()
                .Format("json")
                .Option("multiline", true)
                .Schema(schema)
                .Load(filePath);
            employees.PrintSchema();
            // 3 rows
            employees.Show();

            // Explode() on a map column yields one row per map entry,
            // producing two columns: key and value.
            // BUG FIX: the multi-name As() must be applied to the RESULT
            // of Explode(), not to the input column — a plain column
            // cannot take two alias names.
            DataFrame nameDf =
                employees.Select(
                    Col("id"),
                    Explode(Col("name")).As(new[] { "key", "value" }));
            nameDf.Show();

            // Concatenate first and last name, then explode the
            // technologies array so each (employee, technology) pair
            // becomes its own row.
            DataFrame employeesFullName =
                employees
                    .WithColumn("fullName",
                        ConcatWs(" ", Col("name.first"), Col("name.last")))
                    .Select(
                        Col("id"),
                        Col("fullName"),
                        Explode(Col("technologies")).Alias("technology"));
            employeesFullName.Show();

            // CollectList - aggregates per employee and keeps duplicates.
            // Column name "fullName" used consistently (resolution is
            // case-insensitive by default, but mixed casing is confusing).
            DataFrame employeesTechList =
                employeesFullName.GroupBy("fullName").Agg(
                    CollectList("technology").Alias("tech-list"));
            employeesTechList.Show();

            // CollectSet - same aggregation but duplicates are removed.
            DataFrame employeesTechSet =
                employeesFullName.GroupBy("fullName").Agg(
                    CollectSet("technology").Alias("tech-set"));
            employeesTechSet.Show();

            spark.Stop();
        }
    }
}