The following sample code (in Python and C#) shows how to query data in a DataFrame. Once you have a DataFrame object, you can query the data using a SQL-like syntax, regardless of the origin of the data.
Query
- Projection
- select(list of columns …)
- Filtering:
- where(condition)
- filter(condition)
- Sorting: ascending by default. Use column’s asc() and desc() functions.
- orderBy(list of columns …)
- sort(list of columns …)
Data File
- Create a CSV file with the following content.
code,territory,country
403,Calgary,Canada
201,New Jersey,US
407,Orlando,US
204,Manitoba,Canada
211,"Manhattan, New York",US
306,Saskatchewan,Canada
404,Atlanta,US
418,Quebec City,Canada
202,Washington D.C,US
Result
Python Application
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

if __name__ == "__main__":
    # Build (or reuse) the SparkSession for this application.
    spark = (SparkSession
             .builder
             .appName("dataframe-query")
             .getOrCreate())

    data_file = "test/data/area-code.csv"

    # Explicit schema: all three columns are read as non-nullable strings.
    custom_schema = StructType([
        StructField("code", StringType(), False),
        StructField("territory", StringType(), False),
        StructField("country", StringType(), False)])

    # Read the CSV. The escape option keeps a quoted comma
    # (e.g. "Manhattan, New York") inside a single field.
    area_code = (spark.read.format("csv")
                 .option("header", True)
                 .option("escape", '"')
                 .schema(custom_schema)
                 .load(data_file))
    area_code.show()

    # simple projection - select
    area_code.select(
        col("code"), col("territory")
    ).show()

    # filtering - where
    area_code.select(
        col("code").alias("code in US"),
        col("territory")
    ).where(col("country") == "US").show()

    # sorting - ascending
    area_code.orderBy(
        col("country"),  # asc default
        col("code").asc()
    ).show()

    # sorting - descending
    area_code.orderBy(
        col("country").desc(),
        col("code").desc()
    ).show()

    spark.stop()
C# Application
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using System;
using static Microsoft.Spark.Sql.Functions;
namespace MySpark.Examples
{
internal class DataFrameQuery
{
    /// <summary>
    /// Demonstrates DataFrame query operations — projection, filtering and
    /// sorting — against a small CSV file of telephone area codes.
    /// </summary>
    public static void Run()
    {
        SparkSession session = SparkSession
            .Builder()
            .AppName("dataframe-query")
            .GetOrCreate();

        string dataPath = "data/area-code.csv";

        // Explicit schema: all three columns are non-nullable strings.
        StructType customSchema = new StructType(new[]
        {
            new StructField("code", new StringType(), false),
            new StructField("territory", new StringType(), false),
            new StructField("country", new StringType(), false)
        });

        // Read the CSV. The escape option keeps a quoted comma
        // (e.g. "Manhattan, New York") inside a single field.
        DataFrame areaCodes = session.Read()
            .Format("csv")
            .Option("header", true)
            .Option("escape", "\"")
            .Schema(customSchema)
            .Load(dataPath);
        areaCodes.Show();

        // Projection: keep only the code and territory columns.
        areaCodes
            .Select(Col("code"), Col("territory"))
            .Show();

        // Filtering: restrict the projection to US rows.
        areaCodes
            .Select(Col("code").Alias("code in US"), Col("territory"))
            .Where(Col("country") == "US")
            .Show();

        // Ascending sort (ascending is the default direction).
        areaCodes
            .OrderBy(Col("country"), Col("code").Asc())
            .Show();

        // Descending sort on both columns.
        areaCodes
            .OrderBy(Col("country").Desc(), Col("code").Desc())
            .Show();

        session.Stop();
    }
}
}