[Spark By Example] SparkSession

The following sample code (in Python and C#) shows how to create and use a SparkSession.

SparkSession

  • Since Spark 2.0, SparkSession is the entry point to a Spark application.
  • SparkSession wraps the different contexts (SparkContext, SQLContext, HiveContext, …) into a single entry point.
  • You can create more than one SparkSession (for example, with newSession()), but all sessions in an application share the same underlying SparkContext.
  • In the Spark shell (for example, the PySpark shell), a SparkSession object named “spark” is created for you.
  • In a standalone application, you need to create the SparkSession yourself, typically with SparkSession.builder.

Builder Class

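The Builder class (reached through SparkSession.builder) provides the following methods. Each of them except getOrCreate() returns the builder itself, so the calls can be chained.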

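class Builder(object):

    # sets the master URL
    # local: run locally with one worker thread
    # local[4]: run locally with 4 worker threads
    # local[*]: run locally with as many worker threads as logical cores
    # spark://master:7077: run on a Spark standalone cluster
    def master(self, master): ...

    # sets a name for the application (shown in the Spark web UI)
    def appName(self, name): ...

    # returns the existing global default SparkSession if there is a valid one;
    # otherwise, creates a new SparkSession from the options set on this builder
    def getOrCreate(self): ...

    # sets a config option from a key/value pair, or every option of a SparkConf
    def config(self, key=None, value=None, conf=None): ...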

Spark Properties

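The available properties (such as spark.sql.shuffle.partitions and spark.executor.memory) are documented at:
https://spark.apache.org/docs/latest/configuration.html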


SparkSession Methods and Properties

  • version: returns the Spark version
  • conf: returns the runtime configuration interface (RuntimeConfig), used to get and set Spark SQL options
  • sparkContext: returns the underlying SparkContext
  • sqlContext: returns a SQLContext

  • createDataFrame(): creates a DataFrame from a local collection (such as a list) or an RDD
  • read: returns a “DataFrameReader”, used to read data from various sources (Parquet, CSV, JSON, …)
  • readStream: returns a “DataStreamReader”, used to read streaming data
  • newSession(): returns a new SparkSession that shares the same SparkContext but has its own SQL configuration, temporary views, and registered functions
  • stop(): stops the underlying SparkContext
  • udf: returns a “UDFRegistration”, used to register UDFs (User Defined Functions) so they can be called in SQL queries

Data File

  • N/A


Python Application

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

if __name__ == "__main__":

    # create a session with the builder
    spark = (SparkSession
             .builder
             .master('local[2]')
             .appName("spark-session")
             .config('spark.sql.shuffle.partitions', 5)  # 200 default
             .config('spark.executor.memory', '512m')    # 1g default (no effect in local mode)
             .getOrCreate())

    print('Spark Version: ' + spark.version)

    # get SparkContext options as (key, value) tuples
    print('==================')
    for conf in spark.sparkContext.getConf().getAll():
        print(conf)

    # get Spark SQL options (SET -v lists SQL properties with their descriptions)
    print('==================')
    (spark.sql('SET -v')
        .select(col('key'), col('value'))
    ).show()

    # update the config at runtime; conf.get() returns the value as a string
    print('==================')
    print('partitions: ' + spark.conf.get('spark.sql.shuffle.partitions'))
    spark.conf.set('spark.sql.shuffle.partitions', 6)
    print('partitions: ' + spark.conf.get('spark.sql.shuffle.partitions'))

    spark.stop()

C# Application

using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
using System;

namespace MySpark.Examples.Basics
{
    internal class SparkSessionCheck
    {
        public static void Run()
        {
            SparkSession spark =
                SparkSession
                .Builder()
                .Master("local[2]")
                .AppName("spark-session")
                .Config("spark.sql.shuffle.partitions", 5) // 200 default
                .Config("spark.executor.memory", "512m") // 1g default (no effect in local mode)
                .GetOrCreate();

            Console.WriteLine("Spark Version: " + spark.Version());

            // get Spark Context options
            Console.WriteLine("==============");
            foreach (var conf in spark.SparkContext.GetConf().GetAll())
            {
                Console.WriteLine(conf);
            }

            // get Spark SQL options
            Console.WriteLine("==============");
            spark.Sql("SET -v")
                .Select(Col("key"), Col("value"))
                .Show();

            // update the config
            Console.WriteLine("==============");
            Console.WriteLine("Partitions: " +
                spark.Conf().Get("spark.sql.shuffle.partitions"));

            spark.Conf().Set("spark.sql.shuffle.partitions", 6);

            Console.WriteLine("Partitions: " +
                spark.Conf().Get("spark.sql.shuffle.partitions"));

            spark.Stop();
        }
    }
}
