Flink ML
Flink Machine Learning (Flink ML) is a machine learning library in the Apache Flink project. It provides standardized ML APIs and a set of ML operators, which you can use to build ML algorithms and to assemble ML pipelines for both training and inference. This guide describes how to create and start a Flink ML deployment that uses the Python API in the console of fully managed Flink.
Step 1: Develop a Flink ML deployment
Before you deploy, develop the Flink ML Python program on your local machine. For more information, see Flink ML: Apache Flink Machine Learning Library.
The following example uses the Flink ML k-means operator for model training and prediction. K-means is an iterative clustering algorithm that is widely used for tasks such as customer segmentation, fraud detection, and automatic alert clustering. To keep the demo self-contained, it trains and predicts on a small amount of in-memory data. Depending on your business needs, you can instead use Flink connectors to read training data from external storage and to store the model. Save the code to a local file named kmeans_example.py.
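The following pure-Python sketch is for intuition only and is not part of kmeans_example.py; the Flink ML program to save is the one that follows this sketch. It runs Lloyd's algorithm, the classic k-means iteration (assign each point to its nearest centroid, then move each centroid to the mean of its points), on the same six points as the Flink ML example. The names KMeansSketch and KMeansModelSketch are hypothetical: they only mimic the fit/transform shape of the Flink ML API, and Flink ML's own initialization, distance measure, and stopping rule may differ.

```python
import random
from typing import List, Sequence, Tuple

Point = Tuple[float, ...]


def squared_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Squared Euclidean distance between two points of the same dimension."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


class KMeansModelSketch:
    """Hypothetical fitted model: stores centroids and labels each point
    with the index of its nearest centroid."""

    def __init__(self, centroids: List[Point]) -> None:
        self.centroids = centroids

    def predict(self, point: Sequence[float]) -> int:
        # Nearest centroid; on a tie, min() keeps the lower index.
        return min(range(len(self.centroids)),
                   key=lambda i: squared_distance(point, self.centroids[i]))

    def transform(self, points: Sequence[Sequence[float]]) -> List[Tuple[Point, int]]:
        # (features, cluster id) pairs, like the rows printed by the Flink ML example.
        return [(tuple(p), self.predict(p)) for p in points]


class KMeansSketch:
    """Hypothetical estimator running Lloyd's algorithm from a seeded random start."""

    def __init__(self, k: int = 2, seed: int = 1, max_iter: int = 20) -> None:
        self.k = k
        self.seed = seed
        self.max_iter = max_iter

    def fit(self, points: Sequence[Sequence[float]]) -> KMeansModelSketch:
        rng = random.Random(self.seed)
        # Initialization: k distinct input points picked by the seeded RNG.
        model = KMeansModelSketch([tuple(p) for p in rng.sample(list(points), self.k)])
        for _ in range(self.max_iter):
            # Assignment step: group the points by their nearest centroid.
            clusters: List[List[Sequence[float]]] = [[] for _ in range(self.k)]
            for p in points:
                clusters[model.predict(p)].append(p)
            # Update step: move each centroid to the mean of its cluster;
            # an empty cluster keeps its previous centroid.
            new_centroids = [
                tuple(sum(axis) / len(members) for axis in zip(*members)) if members else old
                for members, old in zip(clusters, model.centroids)
            ]
            converged = new_centroids == model.centroids
            model = KMeansModelSketch(new_centroids)
            if converged:  # no centroid moved, so the assignment is stable
                break
        return model


points = [(0.0, 0.0), (0.0, 0.3), (0.3, 3.0), (9.0, 0.0), (9.0, 0.6), (9.6, 0.0)]
model = KMeansSketch(k=2, seed=1).fit(points)
for features, cluster_id in model.transform(points):
    print('Features: ' + str(list(features)) + ' \tCluster Id: ' + str(cluster_id))
```

On this data the three points near the origin end up in one cluster and the three points near (9, 0) in the other. Which of the two receives ID 0 depends on the initialization, so compare groupings rather than raw IDs when checking results against the Flink ML job.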
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.clustering.kmeans import KMeans
from pyflink.table import StreamTableEnvironment

# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input data
input_data = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense([0.0, 0.0]),),
        (Vectors.dense([0.0, 0.3]),),
        (Vectors.dense([0.3, 3.0]),),
        (Vectors.dense([9.0, 0.0]),),
        (Vectors.dense([9.0, 0.6]),),
        (Vectors.dense([9.6, 0.0]),),
    ],
        type_info=Types.ROW_NAMED(
            ['features'],
            [DenseVectorTypeInfo()])))

# create a kmeans object and initialize its parameters
kmeans = KMeans().set_k(2).set_seed(1)

# train the kmeans model
model = kmeans.fit(input_data)

# use the kmeans model for predictions
output = model.transform(input_data)[0]

# extract and display the results
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
    features = result[field_names.index(kmeans.get_features_col())]
    cluster_id = result[field_names.index(kmeans.get_prediction_col())]
    print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))

Step 2: Submit a Flink ML draft and start the deployment for the draft
- In the Ververica console, upload the kmeans_example.py file. For detailed instructions, see the guide on how to create a Python deployment.
- Create a Flink ML Python deployment by following the same guide on how to create a Python deployment. The parameters that need to be configured are outlined in the following table.
- On the Deployments page, click the target deployment. On the Configuration tab, go to the Logging section and turn on "Allow Log Archives". This keeps the logs accessible after the deployment finishes.
- On the Deployments page, find the target deployment and click "Start" in the Actions column.
After you click "Start", the deployment status changes to "RUNNING" or "FINISHED", which indicates that the deployment is running as expected.
Step 3: Inspect the deployment results
The demo code reads a bounded (finite) data stream, so the Flink ML deployment finishes on its own after it has processed all of the input. After it finishes, open the Logs tab of the deployment on the Deployments page and search the JobManager logs for "Cluster Id", the label that kmeans_example.py prints with each prediction result.
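If you copy the relevant JobManager log lines into a local text file, a small script can group the prediction results by cluster. This is a minimal sketch under stated assumptions: each result line contains the text printed by kmeans_example.py, "Features: <vector> \tCluster Id: <id>", possibly preceded by log metadata such as a timestamp. The exact string form of the vector is not guaranteed, so the parser takes every number between the two labels. parse_results is a hypothetical helper, not part of Flink or the Ververica console.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Assumed line shape, based on the print() call in kmeans_example.py:
#   ... Features: <vector> \tCluster Id: <int>
# Anything before "Features:" (timestamp, log level, logger name) is ignored.
RESULT_LINE = re.compile(r"Features:\s*(?P<features>.*?)\s*Cluster Id:\s*(?P<cluster>-?\d+)")
NUMBER = re.compile(r"-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?")


def parse_results(log_lines: Iterable[str]) -> Dict[int, List[Tuple[float, ...]]]:
    """Hypothetical helper: group the feature vectors in log lines by cluster ID."""
    clusters: Dict[int, List[Tuple[float, ...]]] = defaultdict(list)
    for line in log_lines:
        match = RESULT_LINE.search(line)
        if match is None:
            continue  # not a prediction line
        # The vector's string form may vary, so take every number between the labels.
        features = tuple(float(x) for x in NUMBER.findall(match.group("features")))
        clusters[int(match.group("cluster"))].append(features)
    return dict(clusters)
```

To use it, pass any iterable of lines, for example an open file handle of your local log copy (with open(path) as f: parse_results(f)). Lines that do not contain a prediction are skipped, so the whole JobManager log can be passed in unfiltered.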
References
For more information about Flink ML operators and draft development, see Flink ML: Apache Flink Machine Learning Library.