[Spark By Example] DataFrame Query

The following sample code (in Python and C#) shows how to query data in a DataFrame. Once you have a DataFrame object, you can query it with the same SQL-like operations (projection, filtering, sorting) regardless of where the data came from.

Query

  • Projection:
    • select(list of columns …)
  • Filtering: where() and filter() are interchangeable.
    • where(condition)
    • filter(condition)
  • Sorting: ascending by default; call a column’s asc() or desc() to set the direction explicitly. orderBy() and sort() are interchangeable.
    • orderBy(list of columns …)
    • sort(list of columns …)
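How a multi-column sort with mixed directions resolves is easiest to see with a plain-Python analogue. The sketch below is not Spark code: `order_by` is a hypothetical helper working on a list of dicts, and it ignores Spark's null ordering (nulls first for asc, last for desc). The first key wins and later keys only break ties; because Python's sort is stable, sorting by the least significant key first and the most significant key last gives that precedence, even when directions differ.

```python
from typing import Dict, List, Tuple

Row = Dict[str, str]


def order_by(rows: List[Row], keys: List[Tuple[str, bool]]) -> List[Row]:
    """Sort rows by several columns; each key is (column, ascending).

    Hypothetical helper mirroring orderBy()/sort() semantics. Returns a new
    list and leaves the input untouched.
    """
    result = list(rows)
    # Apply keys from least to most significant; stable sorts (including
    # reverse=True) keep ties in the order produced by the previous pass.
    for column, ascending in reversed(keys):
        result.sort(key=lambda r: r[column], reverse=not ascending)
    return result


rows = [
    {"code": "403", "country": "Canada"},
    {"code": "201", "country": "US"},
    {"code": "204", "country": "Canada"},
    {"code": "407", "country": "US"},
]

# Analogue of orderBy(col("country").desc(), col("code").asc())
for r in order_by(rows, [("country", False), ("code", True)]):
    print(r["country"], r["code"])
# prints, one per line: US 201, US 407, Canada 204, Canada 403
```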

Data File

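  • Create a CSV file with the following content. The Python application below reads it from test/data/area-code.csv and the C# application from data/area-code.csv.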
code,territory,country
403,Calgary,Canada
201,New Jersey,US
407,Orlando,US
204,Manitoba,Canada
211,"Manhattan, New York",US
306,Saskatchewan,Canada
404,Atlanta,US
418,Quebec City,Canada
202,Washington D.C,US
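If you would rather generate the file than type it, here is a minimal sketch using Python's standard csv module. The function name `write_area_codes` is hypothetical; its default path matches the one the Python application reads. `csv.writer` puts "Manhattan, New York" inside double quotes on its own because the value contains a comma, which is the quoting the Spark CSV reader expects by default.

```python
import csv
from pathlib import Path

# Sample data from the listing above (the header row is written separately).
AREA_CODES = [
    ("403", "Calgary", "Canada"),
    ("201", "New Jersey", "US"),
    ("407", "Orlando", "US"),
    ("204", "Manitoba", "Canada"),
    ("211", "Manhattan, New York", "US"),
    ("306", "Saskatchewan", "Canada"),
    ("404", "Atlanta", "US"),
    ("418", "Quebec City", "Canada"),
    ("202", "Washington D.C", "US"),
]


def write_area_codes(path="test/data/area-code.csv"):
    """Write the sample area-code CSV (hypothetical helper) and return its Path."""
    target = Path(path)
    # Create the parent directory (e.g. test/data/) if it does not exist yet.
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", newline="", encoding="utf-8") as f:
        # The default QUOTE_MINIMAL quotes only values that need it,
        # such as "Manhattan, New York", which contains a comma.
        writer = csv.writer(f, lineterminator="\n")
        writer.writerow(["code", "territory", "country"])
        writer.writerows(AREA_CODES)
    return target
```

Calling `write_area_codes()` once from the project root creates test/data/area-code.csv.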


Python Application

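from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
# import only what is needed; "import *" from functions shadows built-ins such as sum() and filter()
from pyspark.sql.functions import col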

if __name__ == "__main__":
  
  spark = (SparkSession
          .builder
          .appName("dataframe-query")
          .getOrCreate())

  data_file = "test/data/area-code.csv"

  # schema
  custom_schema = StructType([
        StructField("code", StringType(), False),
        StructField("territory", StringType(), False),
        StructField("country", StringType(), False)])

  area_code = (spark.read.format("csv")
          .option("header", True) 
          .option("escape", '"')
          .schema(custom_schema)
          .load(data_file))
  area_code.show()
    
  # simple projection - select
  area_code.select(
        col("code"), col("territory")
  ).show()

  # filtering - where
  area_code.select(
        col("code").alias("code in US"),
        col("territory")
  ).where(col("country") == "US").show()

  # sorting - ascending
  area_code.orderBy(
        col("country"), # asc default
        col("code").asc()
  ).show()

  # sorting - descending
  area_code.orderBy(
        col("country").desc(),
        col("code").desc()
  ).show()

  spark.stop()
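To check the show() output without a cluster, here is a plain-Python cross-check of what the filter and the two sorts should return for the sample data. It is not Spark code: it embeds the CSV text, parses it with the standard csv module, and reproduces the where() and orderBy() logic with a list comprehension and sorted(). Codes are compared as strings, as they are under the StringType schema; since every code has three digits, this matches numeric order. Spark does not guarantee row order without orderBy(), so the filtered rows may appear in a different order there.

```python
import csv
import io

# The sample file from the "Data File" section, embedded so this runs standalone.
AREA_CODE_CSV = """code,territory,country
403,Calgary,Canada
201,New Jersey,US
407,Orlando,US
204,Manitoba,Canada
211,"Manhattan, New York",US
306,Saskatchewan,Canada
404,Atlanta,US
418,Quebec City,Canada
202,Washington D.C,US
"""

# header=True analogue: DictReader uses the first line as column names.
rows = list(csv.DictReader(io.StringIO(AREA_CODE_CSV)))

# where(col("country") == "US"), projected to (code, territory)
us_rows = [(r["code"], r["territory"]) for r in rows if r["country"] == "US"]

# orderBy(col("country"), col("code").asc()): both keys ascending
ascending = sorted(rows, key=lambda r: (r["country"], r["code"]))

# orderBy(col("country").desc(), col("code").desc()): both keys descending
descending = sorted(rows, key=lambda r: (r["country"], r["code"]), reverse=True)

for r in descending:
    print(r["code"], r["territory"], r["country"])
```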

C# Application

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System;
using static Microsoft.Spark.Sql.Functions;

namespace MySpark.Examples
{
    internal class DataFrameQuery
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("dataframe-query")
                    .GetOrCreate();

            string filePath = "data/area-code.csv";

            // schema
            StructField[] fields = {
                new StructField("code", new StringType(), false),
                new StructField("territory", new StringType(), false),
                new StructField("country", new StringType(), false)
            };
            StructType schema = new StructType(fields);

            // initial data frame
            DataFrame areaCode = spark.Read()
                .Format("csv")
                .Option("header", true)
                .Option("escape", "\"")
                .Schema(schema)
                .Load(filePath);

            areaCode.Show();

            // simple projection - select
            areaCode.Select(
                Col("code"), Col("territory")
            ).Show();

            // filtering - where
            areaCode.Select(
                Col("code").Alias("code in US"),
                Col("territory")
            ).Where(Col("country") == "US")
            .Show();

            // sorting - ascending
            areaCode.OrderBy(
                Col("country"), // asc default
                Col("code").Asc()
            ).Show();

            // sorting - descending
            areaCode.OrderBy(
                Col("country").Desc(),
                Col("code").Desc()
            ).Show();

            spark.Stop();
        }
    }
}
