Skip to main content
Version: 2.13

PyFlink development with Apache Flink®

PyFlink is a Python API for Apache Flink®. It provides Python bindings for a subset of the Flink API, so you can write Python code that uses Flink functions and that can be executed on a Flink cluster. Your Python code executes as a PyFlink job, and runs just like other Flink jobs.

Prerequisites

On Ververica Platform 2.10, follow the Getting Started guide to deploy a platform instance and set up the example playground. This will configure a local working environment that includes Python and PyFlink where you can test the examples in this guide.

note

If you are running an older version of Ververica Platform (2.8 or 2.9) you may need to build a custom Docker image to include Python and PyFlink, depending on your platform/Flink combination. To do so, follow the steps in Building a Platform Image, below.

If you are just getting started with Ververica Platform and its core concepts, then this short video is a good overview of basic deployment and job management.

Concepts

Using PyFlink, you can write Python code that interoperates with core Flink functionality.

Jobs

Jobs are the high level executable resource abstraction in Flink. At a lower level, pipelines execute sequences of highly parallelised tasks, for example database queries, map functions, reduce functions and so on, across source data streams. Jobs bundle tasks with input streams and manage task execution and statefulness, via checkpoints and snapshots.

PyFlink jobs are Flink jobs you create from Python code using PyFlink.

UDFs are User Defined Functions. Most SQL queries you write use functions to perform logic inside the query, and Flink includes a rich set of built-in functions. When you need to extend these to implement custom logic you define and register UDFs.

Using PyFlink, you can write your custom functions in Python.

Table API

The Flink Table API provides a unified SQL API for Java applications that abstracts the differences between different kinds of data source, for example stream or batch sources, and creates a common semantics.

PyFlink allows you to call Python UDFs from Java Table API Flink applications.

Getting Started

In this tutorial we take you through the steps to:

  • Write and register a custom Python UDF.
  • Configure and run the custom UDF in a PyFlink Job from Ververica Platform.
  • Call your Python UDF from a Java Table API application.

In Ververica Platform v2.10 PyFlink and Python are already included. For earlier versions of the platform, or for any version if you are using a Flink version earlier than 1.15.3 (e.g. Flink 1.13/1.14 with platform versions 2.8/2.9), then follow the steps to build and push a custom platform image. With PyFlink and Python included in your image you can follow this tutorial.

Create a Python UDF

In this example we will:

  • Write a custom Python UDF.
  • Use the UDF in a Python program.

User-defined functions (UDFs) allow you to add your own custom logic to a Flink application. PyFlink provides APIs and bindings that allow you to write UDFs in Python and access Python libraries to write queries and process data.

Here we write a simple Python function that takes a string as input and returns it in upper case form, and declare it as a UDF:

from pyflink.table.udf import udf
from pyflink.table import DataTypes

@udf(result_type=DataTypes.STRING())
def py_upper( str ):
"This capitalizes the whole string"
return str.upper()

The annotation @udf(result_type=DataTypes.STRING()) is the glue that makes this function available as a Python UDF.

note

Annotation is the simplest way to define a Python UDF, but you can also use the py_upper = udf(...) syntax, for example if you want to define a lambda function on the fly. See the official Flink documentation for details.

Let's see how to use this UDF in an example Python program.

The example program pyflink_table_example.py, available on GitHub defines some simple table data and a SQL query over it. It's a simple but complete Python program that uses the sys and logging standard Python libraries , a PyFlink library pyflink.table, and that imports and invokes our Python UDF to change the results of a table query to upper case (we just show a fragment of the code here):

t_env.create_temporary_function("PY_UPPER", my_udfs.py_upper)
...
t_env.sql_query("SELECT id, PY_UPPER(city) city FROM %s" % my_table) \
.execute() \
.print()

This is a simple example, but you can use the full power of Python and Python libraries to create complex functions and call them from a complete analytic or data processing application written in Python. The Python code interoperates with Flink.

In this example we will:

  • Upload our Python UDF and the program that calls it to Universal Blob Storage.
  • Create a deployment that uses Ververica Platform's native PyFlink support to run our Python program in a PyFlink Job.
  • Run the job and check the results.

First we save our Python UDF in a file my_udfs.py. Also, download the file pyflink_table_example.py from GitHub.

To run the Python code, we need to upload both files to Universal Blob Storage in a platform instance.

In your platform instance, select Deployments | Artifacts from the Deployments menu, and drag the files to upload them:

Artifacts Menu

Next, create your deployment. There are minor configuration differences for Ververica Platform versions before 2.10, so use the appropriate example.

note

A deployment target should automatically load from the deployment configuration drop-down menu in Deployment Target. If you don't see a target, go to Administration | Deployment Targets. Click Add Deployment Target, create a target deployment name and a Kubernetes namespace, for example vvp-jobs.

Create your deployment with the following values:

We specify the Python files we uploaded as Additional Dependencies.

PropertyValue
Python URIpyflink_table_example.py
Additional Python LibrariesSelect my_udfs.py in the dropdown menu
Flink registry<your own container registry>
Flink repositoryflink

When the Flink job starts, in this case our PyFlink job, Ververica Platform downloads the artifacts from its Universal Blob Storage to the user directory specified in the Entrypoint main args, here /flink/usrlib.

To run the PyFlink job, start this deployment. Because the job has a bounded data input, the deployment will end in the FINISHED state and the Flink pods will be torn down, but you should be able to find the following result in the logs of the jobmanager pod. The values in the city column have been uppercased as expected:

+----+----------------------+--------------------------------+
| op | id | city |
+----+----------------------+--------------------------------+
| +I | 1 | BERLIN |
| +I | 2 | SHANGHAI |
| +I | 3 | NEW YORK |
| +I | 4 | HANGZHOU |
+----+----------------------+--------------------------------+
note

For more about the deployment lifecycle and states, see Deployment Lifecycle.

Call Python UDFs in a Java Table API Job

In this example we:

  • Call Python UDFs from a Java based Table API program.
  • In our Java code, set the path to the Python file containing the UDF, and map the Python function so we can call it from Java.
  • Add the Python file as a deployment dependency.

On Ververica Platform versions earlier than 2.10, we needed an extra step:

  • Include Python Driver flink-python_2.12 as a dependency and package it into the job JAR file.

As well as calling Python UDFs from Python programs, you can also call them from Table API programs written in Java. This is useful when several teams collaborate on Python UDFs. For example, one team of domain experts may develop Python UDFs and provide them to a team of Java developers for use in their Table API jobs.

We will use the same Python example my_udfs.py, but this time with a Java job JavaTableExample.java which you can download here.

To make this work we need to do a few extra things.

First, we need to tell JavaTableExample.java to load the example UDF, which we do by calling .setString to set the value of PyFlink python.files configuration property. Remember, /flink/usrlib is the directory to which Ververica Platform downloads artifacts when it starts a Flink job:

tableEnv.getConfig().getConfiguration()
.setString("python.files", "/flink/usrlib/my_udfs.py");

We create a temporary mapping for the Python function by calling executeSql() to map the Python function to a Flink SQL system variable PY_UPPER:

tableEnv.executeSql("create temporary system function PY_UPPER as
'my_udfs.py_upper' language python");

Now we can use the function PY_UPPER in our SQL SELECT statement:

tableEnv.sqlQuery(
"SELECT user, PY_UPPER(product), amount FROM " + table)

As before, to run the job we need to upload my_udfs.py to our platform instance Universal Blob Storage. You can skip this if you already did so.

Finally, package the job as a JAR file:

To create the example JAR file, download pom.xml, available here.

Remove the PythonDriver dependency block, which is not needed:

<!-- Flink's PythonDriver -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-python_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>

Run mvn clean package. This creates JavaTableExample-1.0-SNAPSHOT.jar under the directory target.

Now create a deployment:

With the following difference, this is like a standard JAR deployment:

  • Add my_udfs.py under Additional Dependencies in the deployment configuration.

Start the deployment. The Python UDF will be called during the job run. As with the pure Python example, because the job has a bounded data input, the deployment will end in the FINISHED state with the Flink pods torn down. Again, check the jobmanager pod logs. The values in the product column have been uppercased as expected:

+----+----------------------+--------------------------------+-------------+
| op | user | product | amount |
+----+----------------------+--------------------------------+-------------+
| +I | 1 | BEER | 3 |
| +I | 2 | APPLE | 4 |
+----+----------------------+--------------------------------+-------------+

Building a Platform Image

If you are using older versions of Ververica Platform (earlier than 2.10), then depending on the combination of platform version and Flink version, PyFlink and Python may not be included in the standard platform images.

  • PyFlink and a Python interpreter are included in the image.
  • You do not need to build a custom image: you can just deploy a standard platform image from the Ververica registry.
  • For both 2.8 and 2.9 Ververica Platform versions with Flink 1.13 or 1.14, PyFlink and a Python interpreter are not included in the standard image.
  • You need to create a custom Docker image to include the PyFlink and Python packages, and then build and deploy the image as shown below.

Build and push a custom Ververica Platform image

In your terminal, create a new directory and cd into it. This is where you will build the new image.

Build the image

Create a new file named Dockerfile with the following content:

FROM registry.ververica.com/v2.8/flink:1.15.2-stream4-scala_2.12-java11

USER root:root

RUN apt update \
&& apt -y install python3.8 python3-pip \
&& python3.8 -m pip install apache-flink==1.15.2 \
&& apt clean \
&& ln -s /usr/bin/python3.8 /usr/bin/python \
&& rm -rf /var/lib/apt/lists/*

USER flink:flink

In this example, the base image we use for the Dockerfile is the Flink 1.15.2 image for Ververica Platform v2.8, provided by the Ververica public registry.

Now build the Docker image:

docker build -t <your container registry>/flink:1.15.2-stream4-scala_2.12-java11-python3.8-pyflink

where the trailing . is the directory where Docker will look for a file named Dockerfile (or another name if you override with the -f option).

The image will be built in the context of this directory i.e. any host or relative paths in the Dockerfile will start from this directory. The image will be saved to the local Docker repository.

Push the image

Push the image to the container registry named by the tag -t option you built with. Make sure the registry you choose is reachable by Ververica Platform:

docker push <your container registry>/flink:1.15.2-stream4-scala_2.12-java11-python3.8-pyflink