Named Parameters in User-Defined Functions (UDFs)
When calling User-Defined Functions (UDFs) in Flink SQL, you can use named parameters instead of relying solely on the order of arguments. This feature can improve development efficiency and reduce maintenance costs, especially for functions with many parameters or optional ones.
Overview
Normally, you call a function by passing values in the exact order the parameters are defined: MY_FUNCTION(value1, value2, value3). If the function has many parameters, or some are optional, keeping track of the correct order can be difficult.
Named parameters solve this by allowing you to explicitly link a value to its corresponding parameter name using the => syntax:
MY_FUNCTION(parameterName1 => value1, parameterName3 => value3)
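By contrast, a purely positional call that skips the optional second parameter would need an explicit placeholder (illustrative syntax, reusing the hypothetical MY_FUNCTION above and assuming it treats NULL as "not provided"):
MY_FUNCTION(value1, NULL, value3)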
The key benefits of using named parameters include:
- Readability: Your SQL code becomes much clearer, as you can see exactly which value corresponds to which parameter.
- Flexibility: You can easily skip optional parameters without needing placeholders, and you can provide parameters in any order.
- Maintainability: Calls are less likely to break if the function's parameter order changes in the future.
Using Named Parameters in a Java UDF
To call a Java UDF with named parameters in Flink SQL, first define parameter hints in your Java function, and then use the parameterName => value syntax when calling the function in SQL.
1. Define Parameter Hints in the Java UDF.
In your Java ScalarFunction (or other UDF type), use the @ArgumentHint annotation for each parameter in the eval method. This annotation defines:
- name: The name you will use in the SQL call.
- isOptional: Set to true if the parameter is optional.
- type: Use @DataTypeHint to specify the expected Flink SQL data type.
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// A scalar UDF in which the second and third parameters are optional
public class MyFuncWithNamedArgs extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    // Define parameter names and optionality using @ArgumentHint
    public String eval(
            @ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
            @ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
            @ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("BIGINT")) Long l3) {
        if (i2 != null) {
            return "i2#" + i2;
        }
        if (l3 != null) {
            return "l3#" + l3;
        }
        return "default#" + f1;
    }
}
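When an optional parameter is omitted from the SQL call, Flink passes NULL to the corresponding eval argument, which is why the method guards each optional value with a null check. For illustration, the following calls map to the branches above (assuming the function has been registered as MyNamedUdf, as shown in step 3 below):
SELECT MyNamedUdf(f1 => 'x');                            -- 'default#x' (f2 and f3 arrive as NULL)
SELECT MyNamedUdf(f1 => 'x', f2 => 7);                   -- 'i2#7'
SELECT MyNamedUdf(f1 => 'x', f3 => CAST(42 AS BIGINT));  -- 'l3#42'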
2. Package and Upload the UDF.
To use your UDF, you must first package your Java project into a JAR file and upload it to your Ververica workspace as an Artifact. This makes the JAR available as a dependency for your Flink jobs.
For detailed steps on this process, see the Manage User-Defined Functions topic.
3. Register and Call the UDF in SQL.
After uploading the Artifact, register the function within your SQL draft using the CREATE TEMPORARY FUNCTION statement. Once it is registered, you can call it using the parameterName => value syntax.
-- 1. Register the Java UDF, referencing the uploaded Artifact
CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';
-- 2. Example source table
CREATE TEMPORARY TABLE s1 (
  a INT,
  b BIGINT,
  c VARCHAR,
  d VARCHAR, -- Assuming 'd' contains values convertible to BIGINT
  PRIMARY KEY(a) NOT ENFORCED
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
-- 3. Example sink table
CREATE TEMPORARY TABLE sink (
  a INT,
  res1 VARCHAR,
  res2 VARCHAR,
  res3 VARCHAR
) WITH (
  'connector' = 'print'
);
-- 4. Call the UDF using named parameters
INSERT INTO sink
SELECT
  a,
  -- Specify only the first (required) parameter
  MyNamedUdf(f1 => c) AS res1,
  -- Specify the required parameter and the second, optional parameter
  MyNamedUdf(f1 => c, f2 => a) AS res2,
  -- Specify the required and third parameters (order doesn't matter)
  MyNamedUdf(f3 => CAST(d AS BIGINT), f1 => c) AS res3
FROM s1;
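All three parameters can also be supplied at once, in any order. As a sketch (reusing the tables above), the following call returns 'i2#...' values, because the eval method checks f2 before f3:
SELECT
  a,
  MyNamedUdf(f2 => a, f3 => CAST(d AS BIGINT), f1 => c) AS res
FROM s1;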