Skip to main content

Flink ML

Flink Machine Learning (ML), a library of Apache Flink, offers standardized ML APIs along with a range of ML operators. These capabilities allow you to construct ML algorithms using these standard ML APIs and assemble ML pipelines for both training and inference deployments. This guide provides instructions on creating a Flink ML deployment utilizing Python APIs within the console of a fully managed Flink environment.

Before deploying, it's necessary to first create a Flink ML Python deployment on your local machine. For additional information on this, refer to the Flink ML: Apache Flink Machine Learning Library.

Below is an example demonstrating the utilization of the Flink ML k-means operator for model training and prediction. The k-means algorithm, being an iterative clustering algorithm, is extensively applied in various fields including customer segmentation, fraud detection, and automatic alert clustering. In this demonstration, a limited amount of in-memory data is used for model training and prediction. Depending on your specific business needs, you could use the Flink connectors to pull training data from external storage and store the model. Save the code to a local file named

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from import Vectors, DenseVectorTypeInfo
from import KMeans
from pyflink.table import StreamTableEnvironment

# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input data
input_data = t_env.from_data_stream(
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([0.0, 0.3]),),
(Vectors.dense([0.3, 3.0]),),
(Vectors.dense([9.0, 0.0]),),
(Vectors.dense([9.0, 0.6]),),
(Vectors.dense([9.6, 0.0]),),

# create a kmeans object and initialize its parameters
kmeans = KMeans().set_k(2).set_seed(1)

# train the kmeans model
model =

# use the kmeans model for predictions
output = model.transform(input_data)[0]

# extract and display the results
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
features = result[field_names.index(kmeans.get_features_col())]
cluster_id = result[field_names.index(kmeans.get_prediction_col())]
print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))
  1. In the Ververica console, upload the file. For detailed instructions, refer to the guide on how to create a Python deployment.
  2. Proceed to create a Flink ML Python deployment.

For further guidance, consult the instructions in a how-to create a Python deployment doc. The parameters that need to be configured are outlined in the following table.

Deployment TypeSelect PYTHON.
Deployment NameEnter the name of the Flink ML deployment.
Engine VersionSelect a VERA version.
Python UriThe Uniform Resource Identifier (URI) that is used to access the uploaded Python deployment file
  1. On the Deployments page, select the deployment you're interested in. Under the Configuration tab, navigate to the Logging section and enable the "Allow Log Archives" option. This lets you access logs even after the deployment has finished.
  2. Navigate to the Deployments page, locate the target deployment, and select "Start" from the Actions column.

Once you select "Start", the deployment status will switch to either "RUNNING" or "FINISHED", signifying that the deployment is executing as anticipated.

Step 3: Inspect the Deployment Outcome​

The demo code defines a finite data stream, meaning the Flink ML deployment will conclude after running for a specified duration. Once finished, navigate to the "Logs" tab on the Deployments page. There, use the cluster ID to search within the JobManager logs for the prediction results.


To delve deeper into Flink ML operators and the nuances of draft development, refer to Flink ML: Apache Flink Machine Learning Library.