[Spark By Example] Spark SQL – UDFs

In Spark SQL, you can define your own custom functions, called user-defined functions (UDFs), and use them in SQL statements. The following examples show how to create a very simple UDF, register it, and call it from SQL.


User-Defined Functions (UDFs)

  • Use the following function to register a UDF so that it can be called by name in SQL statements. In PySpark it is available as spark.udf.register.
UDFRegistration.register(name, function, returnType)
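A minimal sketch of the register-then-query flow in PySpark, assuming PySpark 3.x and an already-running SparkSession passed in as `spark`. The helper `register_and_query` and the view name `udf_sum` are illustrative only, and the pyspark import is deferred so the module loads even where Spark is not installed.

```python
def sum_to_n(n: int) -> int:
    """Return 1 + 2 + ... + n via the closed form (exact, since n * (n + 1) is even)."""
    return n * (n + 1) // 2


def register_and_query(spark):
    """Hypothetical helper: register sum_to_n as a SQL function and call it from SQL.

    Assumes `spark` is an active pyspark.sql.SparkSession (PySpark 3.x).
    """
    from pyspark.sql.types import LongType  # deferred: only needed when Spark is available

    # spark.udf is a UDFRegistration; register(name, f, returnType) makes "sumToN" callable in SQL
    spark.udf.register("sumToN", sum_to_n, LongType())

    # spark.range() yields a bigint column named "id"; expose it to SQL as a temp view
    spark.range(1, 11).createOrReplaceTempView("udf_sum")
    return spark.sql("SELECT id AS n, sumToN(id) AS sum_1_to_n FROM udf_sum")


if __name__ == "__main__":
    # Local check of the plain Python function; no Spark needed
    print([sum_to_n(n) for n in range(1, 11)])  # [1, 3, 6, 10, 15, 21, 28, 36, 45, 55]
```

With a live session, register_and_query(spark).show() lists n = 1..10 next to their sums, the same SQL-based pattern the C# application uses.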



Python Application

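  • Plain Python UDFs in PySpark are slow because every value has to be serialized between the JVM and the Python worker process, and the function is invoked once per row. The usual remedy is a pandas (vectorized) UDF, which moves data in columnar batches via Apache Arrow and processes a whole pandas Series per call.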
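A sketch of the vectorized variant, assuming PySpark 3.x with pandas and pyarrow installed: a pandas UDF can also be registered and called from SQL. The helper name `register_pandas_sum_to_n` is hypothetical, and the imports are deferred so the module can be loaded without pandas or Spark.

```python
def sum_to_n(n):
    """Closed form 1 + 2 + ... + n; works element-wise on a pandas Series or on a plain int."""
    return n * (n + 1) // 2


def register_pandas_sum_to_n(spark):
    """Hypothetical helper: wrap sum_to_n as a pandas (vectorized) UDF and use it from SQL.

    Assumes PySpark 3.x with pandas and pyarrow installed.
    """
    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    # The Series -> Series type hints mark this as a scalar pandas UDF;
    # "long" is the DDL name of the return type.
    @pandas_udf("long")
    def sum_to_n_udf(n: pd.Series) -> pd.Series:
        # Invoked once per Arrow batch with a whole Series, not once per row
        return sum_to_n(n)

    # register() also accepts an existing pandas UDF (no returnType argument in that case)
    spark.udf.register("sumToN", sum_to_n_udf)
    return spark.sql("SELECT id AS n, sumToN(id) AS sum_1_to_n FROM range(1, 11)")
```

Keeping the arithmetic in a plain function means the same logic can be checked locally with ordinary integers, while Spark applies it to whole batches.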
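from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
import pandas as pd  # pandas UDFs need pandas and pyarrow: pip install pandas pyarrow

# UDF: receives a whole pandas Series of ids and returns the sums 1 + 2 + ... + n.
# Integer division keeps the result integral (n * (n + 1) is always even).
def sumToN(n: pd.Series) -> pd.Series:
  return (n * (n + 1)) // 2

if __name__ == "__main__":

  # Create (or reuse) a SparkSession
  spark = (SparkSession
        .builder
        .appName("udf-test")
        .getOrCreate())

  # Rather than registering a plain Python UDF
  # spark.udf.register("sumToN", sumToN, LongType())
  # create a pandas UDF. spark.range() produces a bigint (LongType) "id" column,
  # so the UDF returns LongType as well.
  sumToN_udf = pandas_udf(sumToN, returnType=LongType())

  # Create a DataFrame with ids 1..10
  numbers_df = spark.range(1, 11)
  numbers_df.show()

  # Query with the UDF
  numbers_df.select("id", sumToN_udf(col("id")).alias("sum_1_to_n")).show()

  spark.stop()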
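The UDF computes the closed form of the n-th triangular number. Because n and n + 1 are consecutive integers, one of them is even, so n(n + 1) is always divisible by 2 and the division is exact:

```latex
\sum_{k=1}^{n} k = \frac{n(n+1)}{2}, \qquad \text{e.g. } n = 10:\ \frac{10 \cdot 11}{2} = 55
```

This is why integer division (// in Python, / on integer operands in C#) loses nothing here, and why the result can be declared with an integer return type.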

C# Application

using System; // Func<T, TResult>
using Microsoft.Spark.Sql;

namespace MySpark.Examples
{
    internal class SumUDF
    {
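        // Sum of 1 + 2 + ... + n. spark.Range() produces a bigint "id" column,
        // so the UDF takes and returns long to match the column type.
        public static long SumToN(long n)
        {
            return (n * (n + 1)) / 2;
        }

        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("udf-test")
                    .GetOrCreate();

            // Register the UDF so that SQL statements can call it as sumToN(...)
            Func<long, long> sumUDF = SumUDF.SumToN;
            spark.Udf().Register<long, long>("sumToN", sumUDF);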

            // Create a temp view
            spark.Range(1, 11).CreateOrReplaceTempView("udf_sum");
            spark.Sql("SELECT * FROM udf_sum")
                .Show();

            // query with the UDF
            spark.Sql("SELECT id AS n, sumToN(id) AS sum_1_to_num FROM udf_sum")
                .Show();

            spark.Stop();
        }
    }
}
