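Let’s explore Spark SQL a little further. We will query a DataFrame through a temporary view, add a derived column, and group by that column.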
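[Note] A temporary view is bound to the DataFrame it was created from. DataFrames are immutable, so adding a column (for example with `withColumn`) produces a new DataFrame, and the existing view does not see the new column. Call `createOrReplaceTempView` again on the new DataFrame before querying the new column.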
Data File
- Create a CSV file named `people.csv` with the following content.
first name,last name,age
Paul,Henderson,20
Grace,Carr,32
Julia,Jackson,14
Alexandra,May,43
Anna,Jones,24
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
def main():
    """Show that a temp view must be re-registered after deriving a new DataFrame.

    Steps:
    1. Load ``people.csv`` with an explicit schema and rename its columns.
    2. Register the result as ``people_view`` and run a projection through it.
    3. Add an ``ageGroup`` column.
    4. Re-register the view so SQL can see the new column.
    5. Count people per age group.

    The Spark session is always stopped, even if a step fails.
    """
    spark = (SparkSession
             .builder
             .appName("people-sql-group")
             .getOrCreate())
    try:
        data_file = "test/data/people.csv"

        # Explicit schema; the CSV header names contain spaces.
        custom_schema = StructType([
            StructField("first name", StringType(), False),
            StructField("last name", StringType(), False),
            StructField("age", IntegerType(), False),
        ])

        data_df = (spark.read.format("csv")
                   .option("header", True)
                   .schema(custom_schema)
                   .load(data_file))

        # Drop the spaces so SQL can reference the columns without backtick quoting.
        data_df = (data_df
                   .withColumnRenamed("first name", "firstname")
                   .withColumnRenamed("last name", "lastname"))

        # create a view
        data_df.createOrReplaceTempView("people_view")

        # spark sql - projection
        query = """SELECT firstname, lastname, age
        FROM people_view
        """
        spark.sql(query).show()

        # Add a new column: age rounded down to its decade (e.g. 32 -> 30).
        # withColumn returns a NEW DataFrame; people_view still refers to the old one.
        data_df = data_df.withColumn("ageGroup", floor(col("age") / 10) * 10)

        # Running this query now would fail: the view was registered before
        # ageGroup existed, so the column cannot be resolved.
        query = "SELECT ageGroup FROM people_view"
        # spark.sql(query).show()

        # Re-register the view against the new DataFrame so ageGroup is visible.
        data_df.createOrReplaceTempView("people_view")
        spark.sql(query).show()

        # spark sql - grouping
        query = """SELECT ageGroup, COUNT(*) AS count
        FROM people_view
        GROUP BY ageGroup
        ORDER BY ageGroup DESC
        """
        spark.sql(query).show()
    finally:
        # Release the session even when a read or query above raises.
        spark.stop()


if __name__ == "__main__":
    main()
C# Application
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples
{
    /// <summary>
    /// Shows that a temporary view must be re-registered after a new column
    /// is added to the DataFrame it was created from.
    /// </summary>
    internal class PeopleSqlGrouping
    {
        /// <summary>
        /// Loads people.csv, runs a projection through <c>people_view</c>,
        /// adds an <c>ageGroup</c> column, re-registers the view, and counts
        /// people per age group. The Spark session is always stopped.
        /// </summary>
        public static void Run()
        {
            // App name matches the class and the Python version of this example.
            SparkSession spark = SparkSession
                .Builder()
                .AppName("people-sql-group")
                .GetOrCreate();

            try
            {
                string filePath = "data/people.csv";

                // Explicit schema; the CSV header names contain spaces.
                StructField[] fields =
                {
                    new StructField("first name", new StringType(), false),
                    new StructField("last name", new StringType(), false),
                    new StructField("age", new IntegerType(), false)
                };
                StructType schema = new StructType(fields);

                // initial data frame
                DataFrame dataDF = spark.Read()
                    .Format("csv")
                    .Option("header", true)
                    .Schema(schema)
                    .Load(filePath);

                // Drop the spaces so SQL can reference the columns without backtick quoting.
                dataDF = dataDF
                    .WithColumnRenamed("first name", "firstname")
                    .WithColumnRenamed("last name", "lastname");

                // create a view
                dataDF.CreateOrReplaceTempView("people_view");

                // spark sql - projection
                string query = @"
                    SELECT firstname, lastname, age
                    FROM people_view";
                spark.Sql(query).Show();

                // Add a new column: age rounded down to its decade (e.g. 32 -> 30).
                // WithColumn returns a NEW DataFrame; people_view still refers to the old one.
                dataDF = dataDF.WithColumn(
                    "ageGroup",
                    Floor(Col("age") / 10) * 10);

                // Running this query now would fail: the view was registered
                // before ageGroup existed, so the column cannot be resolved.
                query = "SELECT ageGroup FROM people_view";
                // spark.Sql(query).Show();

                // Re-register the view against the new DataFrame so ageGroup is visible.
                dataDF.CreateOrReplaceTempView("people_view");
                spark.Sql(query).Show();

                // spark sql - grouping
                query = @"
                    SELECT ageGroup, COUNT(*) AS count
                    FROM people_view
                    GROUP BY ageGroup
                    ORDER BY ageGroup DESC";
                spark.Sql(query).Show();
            }
            finally
            {
                // Release the session even when a read or query above throws.
                spark.Stop();
            }
        }
    }
}