Functions are used in most SQL queries. Apache Flink® features a large set of built-in functions that cover a wide spectrum of applications. Moreover, you can implement and register user-defined functions to execute custom business logic in your SQL queries.
There are three types of functions in SQL:
- Scalar Functions
A scalar function is called once per row with one or more parameters and returns a single value for every call. A scalar function can be used at every place in a query that also accepts a table attribute.
POWER and SQRT are examples of scalar functions. The following query shows how they can be used.
    SELECT POWER(t.a, 2)   -- compute the power of 2 of t.a
    FROM myTable t
    WHERE SQRT(t.b) < 16;  -- compute the square root of t.b
- Aggregate Functions
An aggregate function is always called in the context of a grouped set of rows and returns a single value for every group. Hence, an aggregate function can only be used in combination with clauses that produce groups of rows, such as a GROUP BY or an OVER clause.
MAX is a commonly used aggregate function. The following query shows how it can be used.
    SELECT t.a, MAX(t.b)  -- compute maximum value of t.b for each row group
                          -- with the same t.a
    FROM myTable t
    GROUP BY t.a;
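As a complement to the GROUP BY query, the following sketch evaluates the same aggregate function over an OVER window, so that every input row keeps its own output row. It assumes that myTable has a time attribute column, here hypothetically named rowtime, because Flink orders OVER windows by a time attribute. The one-hour range is an arbitrary choice for illustration.

```sql
SELECT t.a,
       MAX(t.b) OVER (
         PARTITION BY t.a        -- one window per value of t.a
         ORDER BY t.rowtime      -- assumed time attribute (hypothetical column)
         RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
       ) AS max_b_last_hour      -- maximum of t.b over the preceding hour
FROM myTable t;
```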
- Table Functions
A table function is called for every row of another table and returns zero, one, or more rows with one or more columns for every call. A table function is always used in a LATERAL TABLE clause in the FROM part of a query.
In the following query, the table function tableFunc is called for every value of t.a and returns zero, one, or more rows with two attributes (x, y).
    SELECT t.a, t.b, s.y
    FROM myTable t,
      LATERAL TABLE(tableFunc(t.a)) AS s(x, y)  -- compute for every value of t.a
                                                -- a table with zero, one, or more
                                                -- rows with two attributes (x, y)
    WHERE t.b = s.x;
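The comma join in the query above drops a row of myTable whenever tableFunc returns zero rows for it. If those rows should be kept, Flink SQL also accepts a left outer join against the table function with ON TRUE as the join condition. A minimal sketch, reusing the names from the query above (the filter on s.x is left out because it would discard the padded rows again):

```sql
SELECT t.a, t.b, s.y
FROM myTable t
LEFT JOIN LATERAL TABLE(tableFunc(t.a)) AS s(x, y) ON TRUE;
-- rows of myTable for which tableFunc(t.a) returns no rows
-- are kept, with s.x and s.y set to NULL
```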
Apache Flink® provides a large set of built-in scalar and aggregate functions. Please refer to the official Flink documentation for a complete list of supported functions.
If you need to run computation logic that cannot be expressed using Apache Flink®’s built-in functions, you can implement, package, and register user-defined functions. Once a user-defined function is registered in the catalog, it can be used just like a built-in function.
Ververica Platform eases the management of user-defined functions via UDF Artifacts.
User-defined functions for Flink are implemented as Java or Scala classes. Depending on the type of function that you want to implement, you need to extend a different base class.
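- Scalar function: org.apache.flink.table.functions.ScalarFunction
- Aggregate function: org.apache.flink.table.functions.AggregateFunction
- Table function: org.apache.flink.table.functions.TableFunction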
The base classes are provided by the following Maven dependency:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
Ververica Platform’s supported Scala version is 2.12. Moreover, you should prefer Java’s boxed primitive types (java.lang.Double, …), which support NULL values, over Scala’s own types (scala.Double, …), which are mapped to primitive types and hence do not support NULL values.
Please check the UDF Implementation Guide of the official Flink documentation for details on how to write user-defined functions for Flink.
The standard way of packaging and distributing Java and Scala classes is as JAR files. For Ververica Platform, UDF classes need to be packaged as JAR files as well. You can package one or more UDF classes in a JAR file. All dependencies of the classes (except for dependencies provided by Flink) need to be included in the JAR as well.
Ververica Platform does not resolve dependency conflicts between UDFs. Such conflicts can arise, for example, when two function classes from different JAR files use different versions of the same library. A UDF JAR is only added to the classpath of a query if it contains the implementation of a function that the query uses. Therefore, you need to ensure that the implementations of all UDFs used by a query do not have conflicting dependencies.
Ververica Platform manages user-defined functions via UDF Artifact resources. A UDF Artifact represents a JAR file that contains one or more UDF implementation classes. You can create, update, and delete UDF Artifacts in the “Functions” section of the Ververica Platform UI or use the corresponding REST endpoints.
UDF Artifacts ease the management of user-defined functions. Ververica Platform ensures that the function definitions in the catalog are in sync with function implementations of the registered UDF Artifacts.
- When creating a UDF Artifact and providing a JAR file, Ververica Platform automatically analyzes the JAR, identifies all function implementations, and recommends registering them as functions in the catalog.
- When updating a UDF Artifact with a new JAR file to add, improve, or fix function implementations, Ververica Platform checks that the new JAR file still contains implementations for all functions that are registered in the catalog and offers to add unregistered functions to the catalog.
- When creating a new UDF Artifact or updating an existing UDF Artifact, Ververica Platform checks that the new JAR does not contain a function implementation class that is already provided by another UDF Artifact.
- When deleting a UDF Artifact, Ververica Platform ensures that all functions of a UDF Artifact are dropped from the catalog before the UDF Artifact is deleted.
Updating and deleting UDF Artifacts (as well as altering and dropping functions from the catalog) will only affect a running SQL Deployment when it is re-translated, which happens every time the Deployment transitions to RUNNING. This means, in particular, that catalog changes take effect when Autopilot applies a change to the Deployment.
Ververica Platform’s UI only supports uploading UDF JAR files of up to 50 MB. If you need to manage UDF JARs larger than 50 MB, you can manually upload them to Universal Blob Storage (UBS) and use the UDF Artifact REST API to reference the uploaded JAR file.
The “Functions” section of Ververica Platform UI for managing UDF Artifacts automatically creates and drops functions. If you manage your UDF Artifacts with the REST endpoints, you can manage user-defined functions via DDL statements.
Altering and dropping functions from the catalog (as well as updating and deleting UDF Artifacts) will only affect a running SQL Deployment when it is re-translated, which happens every time the Deployment transitions to RUNNING. This means, in particular, that catalog changes take effect when Autopilot applies a change to the Deployment.
At this point, functions can only be registered in VVP Catalogs.
CREATE FUNCTION [IF NOT EXISTS] [catalog_name.][db_name.]function_name AS class_name [LANGUAGE JAVA|SCALA]
Registers a function in the catalog that is implemented by the class_name Java or Scala class. The implementation class must be registered in Ververica Platform via a UDF Artifact. The create statement fails if the class does not exist.
The statement fails if a function with the same name already exists in the catalog, unless the IF NOT EXISTS clause was provided. Multiple functions can reference the same implementation class.
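As an illustration, the following sketch registers a function and then calls it like a built-in function. The function name to_fahrenheit, the class com.example.udf.ToFahrenheit, and the table readings with its columns sensor_id and temp_celsius are all hypothetical. The class is assumed to be a scalar function packaged in a JAR that is already registered as a UDF Artifact. Note that Flink SQL expects the class name as a quoted string.

```sql
-- register the function; nothing happens if the name is already taken
CREATE FUNCTION IF NOT EXISTS to_fahrenheit
  AS 'com.example.udf.ToFahrenheit'   -- hypothetical implementation class
  LANGUAGE JAVA;

-- once registered, the function is used like a built-in scalar function
SELECT r.sensor_id, to_fahrenheit(r.temp_celsius) AS temp_fahrenheit
FROM readings r;                      -- hypothetical table
```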
ALTER FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name AS class_name [LANGUAGE JAVA|SCALA]
Updates the implementation of a function to a new class class_name. The new implementation class must be registered in Ververica Platform via a UDF Artifact. The alter statement fails if the new class cannot be found.
The statement fails as well if the function does not exist in the catalog, unless the IF EXISTS clause was provided.
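For example, a function could be switched to a revised implementation as sketched below. Both the function name to_fahrenheit and the class com.example.udf.ToFahrenheitV2 are hypothetical, and the new class is assumed to be available through a registered UDF Artifact.

```sql
-- point the existing function at a new implementation class;
-- IF EXISTS keeps the statement from failing if the function is missing
ALTER FUNCTION IF EXISTS to_fahrenheit
  AS 'com.example.udf.ToFahrenheitV2'  -- hypothetical replacement class
  LANGUAGE JAVA;
```

A running SQL Deployment keeps using the previous class until it is re-translated.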
DROP FUNCTION [IF EXISTS] [catalog_name.][db_name.]function_name
Drops the function of the given name.
The statement fails if the function does not exist in the catalog, unless the IF EXISTS clause was provided.
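A short sketch of the fully qualified form follows. The names my_catalog, my_db, and to_fahrenheit are hypothetical, and my_catalog is assumed to be a VVP Catalog.

```sql
-- drop the function by its fully qualified name;
-- IF EXISTS keeps the statement from failing if it was already dropped
DROP FUNCTION IF EXISTS my_catalog.my_db.to_fahrenheit;
```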