User Defined Functions (UDFs)
In Spark SQL, you can define custom functions and use them in SQL statements. The following examples show how to create a simple UDF, register it, and call it from SQL.
- You can use the following method to register a UDF:
  UDFRegistration.register(name, function, returnType)
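For example, a regular (row-at-a-time) Python UDF can be registered through spark.udf (the session's UDFRegistration) and then called from SQL. A minimal sketch, where the sumToN function and the numbers view name are illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.types import LongType

def sumToN(n):
    return n * (n + 1) // 2

spark = SparkSession.builder.appName("udf-register").getOrCreate()
# register(name, function, returnType)
spark.udf.register("sumToN", sumToN, LongType())
spark.range(1, 11).createOrReplaceTempView("numbers")
spark.sql("SELECT id, sumToN(id) FROM numbers").show()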
Python Application
- UDFs in PySpark are slow because every row is serialized between the JVM and the Python worker. The solution is to use a pandas UDF (a vectorized UDF backed by Apache Arrow), as in the application below.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
import pandas as pd  # pip install pandas pyarrow

# pandas UDF: n arrives as a pandas Series, so the arithmetic is vectorized
def sumToN(n: pd.Series) -> pd.Series:
    return n * (n + 1) // 2

if __name__ == "__main__":
    spark = (SparkSession
             .builder
             .appName("udf-test")
             .getOrCreate())

    # a regular UDF would be registered like this instead:
    # spark.udf.register("sumToN", sumToN, LongType())

    # create a pandas UDF; spark.range() produces bigint ids,
    # so the return type is LongType
    sumToN_udf = pandas_udf(sumToN, returnType=LongType())

    # create a DataFrame with ids 1..10
    numbers_df = spark.range(1, 11)
    numbers_df.show()

    # query with the UDF
    numbers_df.select("id", sumToN_udf(col("id"))).show()

    spark.stop()
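In Spark 3.x, the same pandas UDF can also be declared with the @pandas_udf decorator and Python type hints, which reads a bit more cleanly. An equivalent sketch, reusing numbers_df from the application above:

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

@pandas_udf(LongType())
def sumToN(n: pd.Series) -> pd.Series:
    return n * (n + 1) // 2

numbers_df.select("id", sumToN(col("id"))).show()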
C# Application
using System;
using Microsoft.Spark.Sql;

namespace MySpark.Examples
{
    internal class SumUDF
    {
        // spark.Range() produces bigint (long) ids, so the UDF
        // takes and returns long to match
        public static long SumToN(long n)
        {
            return n * (n + 1) / 2;
        }

        public static void Run()
        {
            SparkSession spark =
                SparkSession
                    .Builder()
                    .AppName("udf-test")
                    .GetOrCreate();

            // register the UDF
            Func<long, long> sumUDF = SumToN;
            spark.Udf().Register<long, long>("sumToN", sumUDF);

            // create a temp view
            spark.Range(1, 11).CreateOrReplaceTempView("udf_sum");
            spark.Sql("SELECT * FROM udf_sum")
                 .Show();

            // query with the UDF
            spark.Sql("SELECT id AS n, sumToN(id) AS sum_1_to_num FROM udf_sum")
                 .Show();

            spark.Stop();
        }
    }
}
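Note that, unlike the PySpark script, a .NET for Apache Spark application is not run directly: it is launched through spark-submit using the org.apache.spark.deploy.dotnet.DotnetRunner class and the microsoft-spark worker jar that ships alongside the Microsoft.Spark NuGet package.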