[Spark By Example] SparkSession

The following sample code (in Python and C#) shows how to use SparkSession.


  • SparkSession has been the entry point to a Spark application since Spark 2.0.
  • SparkSession wraps the older contexts (SparkContext, SQLContext, HiveContext, …) into a single entry point.
  • You can create as many SparkSessions as you want.
  • In a Spark shell, such as the PySpark shell, a SparkSession object (named “spark”) is created for you.
  • In a standalone application, you need to create the SparkSession object yourself.

Builder Class

The Builder class provides the following methods.

class Builder(object):

	# sets the Spark master URL to connect to, e.g.
	#   local: run locally
	#   local[4]: run locally with 4 cores
	#   spark://master:7077: run on a Spark standalone cluster
	def master(self, master): ...

	# sets a name for the application
	def appName(self, name): ...

	# gets the existing global default SparkSession, or
	# creates a new one if there is none
	def getOrCreate(self): ...

	# sets a config option
	def config(self, key=None, value=None): ...

SparkSession Methods or Properties

  • version: returns the Spark version
  • conf: returns configurations
  • sparkContext: returns a SparkContext
  • sqlContext: returns a SQLContext

  • createDataFrame(): creates a DataFrame from a collection or an RDD
  • read(): returns a “DataFrameReader”, used to read records from various sources (parquet, csv, json, …)
  • readStream(): returns a “DataStreamReader”, used to read streaming data
  • newSession(): returns a new SparkSession that shares the underlying SparkContext
  • stop(): stops the underlying SparkContext
  • udf(): registers a UDF (User Defined Function) for use on DataFrames, Datasets, and in SQL

Data File

  • N/A


Python Application

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":
  # create a session with the builder
  spark = (SparkSession
          .builder
          .master('local[*]')                          # run locally, all cores
          .appName('spark-session-example')
          .config('spark.sql.shuffle.partitions', 5)   # 200 by default
          .config('spark.executor.memory', '512m')     # 1g by default
          .getOrCreate())

  print('Spark Version: ' + spark.version)

  # get Spark Context options
  for conf in spark.sparkContext.getConf().getAll():
      print(conf)

  # get Spark SQL options
  (spark.sql('SET -v')
      .select(col('key'), col('value'))
      .show(truncate=False))

  # update the config
  print('partitions: ' + spark.conf.get('spark.sql.shuffle.partitions'))
  spark.conf.set('spark.sql.shuffle.partitions', 6)
  print('partitions: ' + spark.conf.get('spark.sql.shuffle.partitions'))

  spark.stop()


C# Application

using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
using System;

namespace MySpark.Examples.Basics
{
    internal class SparkSessionCheck
    {
        public static void Run()
        {
            // create a session with the builder
            SparkSession spark = SparkSession
                .Builder()
                .Config("spark.sql.shuffle.partitions", 5)  // 200 by default
                .Config("spark.executor.memory", "512m")    // 1g by default
                .GetOrCreate();

            Console.WriteLine("Spark Version: " + spark.Version());

            // get Spark Context options
            foreach (var conf in spark.SparkContext.GetConf().GetAll())
            {
                Console.WriteLine(conf);
            }

            // get Spark SQL options
            spark.Sql("SET -v")
                .Select(Col("key"), Col("value"))
                .Show();

            // update the config
            Console.WriteLine("Partitions: " +
                spark.Conf().Get("spark.sql.shuffle.partitions"));

            spark.Conf().Set("spark.sql.shuffle.partitions", 6);

            Console.WriteLine("Partitions: " +
                spark.Conf().Get("spark.sql.shuffle.partitions"));

            spark.Stop();
        }
    }
}

