UDFs

User-defined functions are a widely used concept in the SQL world. A user-defined function is a user-programmable routine that extends SQL.

There are two types of user-defined functions in SQL:

  • UDF: a user-defined function that acts on one row and returns a single value.

  • UDAF: a user-defined aggregate function that acts on multiple rows at once and returns a single aggregated value as the result.

Easy SQL provides support to define UDFs/UDAFs, register them, and then use them in ETLs.

Since there are multiple SQL engine backends in Easy SQL, UDFs/UDAFs need to be implemented for the specific backend being used.

This document guides you through creating and using UDFs/UDAFs.

For the Spark backend

We can create Spark UDFs/UDAFs using Scala or Python.

It is easy to create a Spark UDF. A Scala example is as below:

val random = udf(() => Math.random())
spark.udf.register("random", random.asNondeterministic())

For details, please refer to the Spark UDF / UDAF introduction page.

A Spark UDF can also be created with pyspark. An example is as below:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import random
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
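
To make such a UDF callable by name from SQL (as the Scala example above does), it can also be registered on the active SparkSession. A minimal sketch, assuming a SparkSession named spark is available:

# register the UDF so it can be referenced as random_udf(...) in SQL statements
spark.udf.register("random_udf", random_udf)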

For details, please refer to the pyspark UDF introduction page. There is no support to create a UDAF from pyspark yet. We can use collect_list together with a pyspark UDF to simulate a UDAF, as sketched below.
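
Here is a minimal sketch of that workaround. The sample DataFrame and the distinct_count UDF below are made up for illustration; the idea is to collect each group's values into an array with collect_list and let a plain UDF reduce the array to a single value:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1), ("a", 1), ("a", 2), ("b", 3)], ["key", "value"])

# a plain UDF that aggregates the collected list of values of one group
distinct_count = udf(lambda values: len(set(values)), IntegerType())

# collect_list gathers each group's values into an array; the UDF then reduces
# that array to a single value, which mimics a UDAF
agg_df = (df.groupBy("key")
            .agg(F.collect_list("value").alias("values"))
            .select("key", distinct_count("values").alias("distinct_values")))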

Easy SQL provides support for registering UDFs/UDAFs written in both Scala and Python.

Register and use Scala UDF/UDAF

To define and register a Scala UDF/UDAF, we need to create a Scala file with a function named initUdfs. Below is an example:

// udfs.scala
package your.company

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

object udfs {
    def initUdfs(spark: SparkSession): Unit = {
        val string_set = udf((s: Seq[String]) => s.filter(_ != null).toSet.toArray)
        spark.udf.register("string_set", string_set)
    }
}

The next thing is to compile this file and package it into a jar file. An example command line could be:

# SCALA_BIN is the path to the bin folder of the Scala SDK directory.
# SCALA_CP is the Java classpath used for compilation, usually a list of jars (e.g. the Spark jars).
${SCALA_BIN}/scalac -nobootcp -cp ${SCALA_CP} -d classes your/company/*.scala
cd classes && jar -cvf ../udf.jar .

After the command above, you’ll get a jar file named udf.jar.

We can register the above UDFs in the ETL configuration block. Below is an example:

-- backend: spark
-- config: spark.jars=udf.jar
-- config: easy_sql.scala_udf_initializer=your.company.udfs

-- target=log.test_udf
select string_set(array("a", "a", "b")) as stringset

Save the content above to a file named etl_with_udf.sql, and then run the command below to execute the ETL.

bash -c "$(python3 -m easy_sql.data_process -f etl_with_udf.sql -p)"

You’ll get a result like:

===================== REPORT FOR step-1 ==================
config: StepConfig(target=log.test_udf, condition=None, line_no=5)
sql: select string_set(array("a", "a", "b")) as stringset
status: SUCCEEDED
start time: xxxx-xx-xx xx:xx:xx, end time: xxxx-xx-xx xx:xx:xx, execution time: 2.194115s - 65.29%
messages:
stringset=['a', 'b']

Register and use Python UDF

Registering a Python UDF is much easier. However, if you choose to implement UDFs in Python, there may be a performance cost, since the Spark application runs on the JVM and has to talk to a Python process whenever the UDF is called.

First, we need to define a UDF in a Python file:

from typing import List

__all__ = ['string_set']

def string_set(string_arr: List[str]) -> List[str]:
    return list(set(string_arr))

Let’s assume the file name is udf.py.
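
Since the UDF is plain Python, it can be sanity-checked locally before being wired into an ETL. A small sketch, assuming udf.py is in the current directory:

from udf import string_set

# a set is used internally, so the order of the result is not guaranteed
print(sorted(string_set(["a", "a", "b"])))  # ['a', 'b']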

We can register the above UDFs in the ETL configuration block. Below is an example:

-- backend: spark
-- config: easy_sql.udf_file_path=udf.py

-- target=log.test_udf
select string_set(array("a", "a", "b")) as stringset

Save the content above to a file named etl_with_udf.sql, and then run the command below to execute the ETL.

bash -c "$(python3 -m easy_sql.data_process -f etl_with_udf.sql -p)"

You’ll get a result like:

===================== REPORT FOR step-1 ==================
config: StepConfig(target=log.test_udf, condition=None, line_no=5)
sql: select string_set(array("a", "a", "b")) as stringset
status: SUCCEEDED
start time: xxxx-xx-xx xx:xx:xx, end time: xxxx-xx-xx xx:xx:xx, execution time: 2.194115s - 65.29%
messages:
stringset=['a', 'b']

For other backends

For other backends, we can create UDFs/UDAFs with SQL.

Easy SQL provides a way to define the UDF-creation SQL in a Python file.

An example for the ClickHouse backend is as follows:

def translate():
    return "CREATE FUNCTION IF NOT EXISTS translate AS (input, from, to) -> replaceAll(input, from, to)"

Let’s assume the file name is udf.py.

We can register the above UDFs in the ETL configuration block. Below is an example:

-- backend: clickhouse
-- config: easy_sql.udf_file_path=udf.py

-- target=log.test_udf
select translate('abcad', 'a', '') as translated_str

Save the content above to a file named etl_with_udf.sql, and then run the command below to execute the ETL.

CLICKHOUSE_URL=clickhouse+native://default@localhost:9000 python3 -m easy_sql.data_process -f etl_with_udf.sql

You’ll get a result like:

===================== REPORT FOR step-1 ==================
config: StepConfig(target=log.test_udf, condition=None, line_no=4)
sql: select translate('abcad', 'a', '') as translated_str
status: SUCCEEDED
start time: xxxx-xx-xx xx:xx:xx, end time: xxxx-xx-xx xx:xx:xx, execution time: 0.048148s - 70.54%
messages:
(translated_str='bcd')

Notes:

  • We need to follow the syntax provided by the backend to create UDFs / UDAFs (a PostgreSQL example is sketched after this list).

    • For PostgreSQL, refer to the CREATE FUNCTION documentation.

    • For ClickHouse, refer to the CREATE FUNCTION documentation.

    • For BigQuery, refer to the user-defined functions documentation.
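
As an illustration, a UDF file for the PostgreSQL backend could look like the sketch below, mirroring the ClickHouse example above. The remove_all_whitespaces function is made up for this example; the returned statement follows PostgreSQL's CREATE FUNCTION syntax:

# udf.py: each function returns the SQL statement that creates one UDF
def remove_all_whitespaces():
    return r"""
    CREATE OR REPLACE FUNCTION remove_all_whitespaces(s text) RETURNS text
    AS $$ SELECT regexp_replace(s, '\s', '', 'g') $$
    LANGUAGE SQL IMMUTABLE
    """

The file would then be registered through the easy_sql.udf_file_path config in the ETL configuration block, the same way as in the ClickHouse example above, with the backend set to PostgreSQL.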

Register and use UDF programmatically

We can also register and use UDFs programmatically. Below is some sample code:

from pyspark.sql import SparkSession

from easy_sql.sql_processor import SqlProcessor
from easy_sql.sql_processor.backend import SparkBackend

if __name__ == '__main__':
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    backend = SparkBackend(spark)
    sql = '''
-- target=log.test_udf
select string_set(array("a", "a", "b")) as stringset
    '''
    sql_processor = SqlProcessor(backend, sql, scala_udf_initializer='your.company.udfs')
    sql_processor.register_udfs_from_pyfile('udf.py')
    sql_processor.run()

For a detailed sample implementation and for other backends, please refer to the implementation of the data_process module.

UDF reference

There are several UDFs implemented in Easy SQL. Below is a list of them for reference.

Spark UDFs

PostgreSQL UDFs

Clickhouse UDFs