Version: 2.12

Getting Started - Flink SQL

note

Please see Getting Started - Installation to install Ververica Platform before going through this guide.

note

If you have installed the optional logging or metrics integrations, please check the logging setup and the metrics setup to ensure you have the correct services port-forwarded to access them on your local machine.

Ververica Platform is an end-to-end platform for the development and operations of Apache Flink® SQL. You can interactively develop SQL scripts, manage catalogs, connectors and user-defined functions and deploy the resulting long-running queries as Ververica Platform Deployments.

Writing SQL Queries

Begin by navigating to the SQL editor in the web user interface of the platform by clicking SQL in the left sidebar.

Flink SQL queries operate on tables, which records are read from and written to, similar to any database. In contrast to relational databases, though, tables are always backed by an external system (e.g. Apache Kafka® or Elasticsearch®) and are not stored in Apache Flink® itself.

As a first step, execute the CREATE TABLE statement below to create a table that you can query afterward. The connector property determines the system that backs this table. datagen is a special type of connector that is not backed by an external system but generates random data on demand. Because they have no external dependencies, datagen tables are very convenient for testing and prototyping. Instead of copying the complete statement, you can also click the + icon next to Tables in the schema explorer and choose datagen to insert a template for a datagen table into the editor.

  CREATE TABLE orders (
    id BIGINT,
    ordertime TIMESTAMP(3),
    totalprice DECIMAL(6,2),
    customerid BIGINT,
    WATERMARK FOR ordertime AS ordertime
  ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10'
  )

The table definition is now stored in the default database in the vvp catalog, and we can query it by running the following statement.

  SELECT * FROM orders;
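
By default, datagen fills every column with unconstrained random values, which can make query results hard to read. The following is a minimal sketch of a variant with more readable values. It assumes that the per-field options of the Apache Flink® datagen connector (fields.<column>.kind, fields.<column>.start, fields.<column>.end, fields.<column>.min, fields.<column>.max) are available in the Flink version of your session cluster. The table name orders_bounded and the chosen value ranges are hypothetical and are not used elsewhere in this guide.

```sql
-- Hypothetical variant of the orders table with constrained values.
-- Assumption: the datagen per-field options below are supported by
-- the Flink version in use.
CREATE TABLE orders_bounded (
  id BIGINT,
  ordertime TIMESTAMP(3),
  totalprice DECIMAL(6,2),
  customerid BIGINT,
  WATERMARK FOR ordertime AS ordertime
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  -- Emit ids 1, 2, 3, ... instead of random numbers.
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '1000000',
  -- Restrict random customer ids to a small range.
  'fields.customerid.min' = '1',
  'fields.customerid.max' = '100'
);
```

Note that a sequence field makes the table bounded: the source finishes once the sequence reaches its end value, so choose an end value that is large enough for the time you want the query to run.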

Executing a SELECT statement requires a Session Cluster that is assigned to execute all queries from the editor. If no cluster is yet assigned, you will be asked to create a session cluster now.

If you have not used this namespace before, you'll need to create a Deployment Target when creating the session cluster. A Deployment Target links a Deployment to a Kubernetes namespace, into which your Flink applications will be deployed. In this case you can use the vvp-jobs namespace.

After creating the session cluster (the default configuration is sufficient for this guide), return to the editor and re-run your SELECT statement. You can now experiment with different queries. For example, you can count orders and sum up their total price in tumbling (non-overlapping) 10-second windows.

  SELECT
    TUMBLE_ROWTIME(ordertime, INTERVAL '10' SECONDS) AS windowtime,
    COUNT(*) AS numorders,
    SUM(totalprice) AS sumtotalprice
  FROM orders
  GROUP BY
    TUMBLE(ordertime, INTERVAL '10' SECONDS)
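
Newer Apache Flink® versions also offer windowing table-valued functions as an alternative to the group window syntax above. The following is a sketch of the same aggregation in that style, assuming your session cluster runs a Flink version that supports the TUMBLE table-valued function. Instead of a single windowtime column, it exposes the window bounds as the columns window_start and window_end.

```sql
-- Sketch: the same 10-second tumbling window aggregation, written
-- with the TUMBLE table-valued function (assumes a Flink version
-- that supports windowing table-valued functions).
SELECT
  window_start,
  window_end,
  COUNT(*) AS numorders,
  SUM(totalprice) AS sumtotalprice
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(ordertime), INTERVAL '10' SECONDS)
)
GROUP BY window_start, window_end;
```

The rest of this guide continues with the group window syntax, so the later statements do not depend on this variant.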

Deploying SQL Queries

So far, you have written the results of your long-running queries "to the screen". This is great during development, but a production query needs to write its results to a table that can be consumed by downstream applications: either by another Flink SQL query or by an application that directly accesses the system that stores the table (e.g. a dashboard that queries a table stored in an Elasticsearch® index).
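
As an illustration of such a production output table, the following sketch defines a table backed by an Apache Kafka® topic, assuming the Kafka connector and the JSON format are available in your installation. The table name order_stats_kafka, the topic order-stats and the broker address kafka.example.internal:9092 are hypothetical placeholders, and this table is not used in the rest of this guide.

```sql
-- Sketch of an output table backed by Apache Kafka.
-- Table name, topic and broker address are hypothetical placeholders;
-- replace them with values from your own environment.
CREATE TABLE order_stats_kafka (
  windowtime TIMESTAMP(3),
  numorders BIGINT,
  sumtotalprice DECIMAL(20,2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-stats',
  'properties.bootstrap.servers' = 'kafka.example.internal:9092',
  'format' = 'json'
);
```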

For the output table in this guide, we propose using a temporary table of type print. A temporary table is not stored in any catalog and can only be used within the script where it is explicitly defined. A table of type print prints every row to STDOUT (of the TaskManagers). Again, instead of copying the CREATE TABLE statement, you can use the + icon in the schema explorer. You will notice that the editor automatically fills in the correct schema for the query that is currently selected in the editor.

  CREATE TEMPORARY TABLE order_stats (
    windowtime TIMESTAMP(3),
    numorders BIGINT,
    sumtotalprice DECIMAL(20,2)
  ) WITH (
    'connector' = 'print'
  );

  INSERT INTO order_stats
  SELECT
    TUMBLE_ROWTIME(ordertime, INTERVAL '10' SECONDS) AS windowtime,
    COUNT(*) AS numorders,
    SUM(totalprice) AS sumtotalprice
  FROM orders
  GROUP BY
    TUMBLE(ordertime, INTERVAL '10' SECONDS)

If you Run the script above, Ververica Platform will tell you that this requires a Deployment. A Deployment is one of Ververica Platform's core abstractions and represents a long-running Apache Flink® application. To learn more about Deployments, check out Getting Started - Flink Operations. In the Deployment creation form, you can configure parallelism, resources, Flink configuration, and much more. For now, you only need to give your Deployment a name like "Order Statistics" and hit Create SQL Deployment.

It will take about a minute until your Deployment enters the RUNNING state. During this time Ververica Platform spins up a dedicated Apache Flink® cluster and deploys your query including its dependencies to it.

If you installed the platform with logging, you can check out the results of the query in Kibana (remember: a print table prints to STDOUT of the TaskManager containers). To access Kibana, you need to forward a local port to the Kibana Service.

kubectl --namespace vvp port-forward services/kibana 5601:5601

Otherwise, you can run

kubectl logs -n vvp-jobs <taskmanager-pod>

to see the results of the query.

Cleaning Up

If you want to clean up all resources created during this getting started guide, first cancel and then delete the Deployment that you just started. Second, delete the table definition of the orders table from the catalog by running the following statement (or right-click the table in the schema explorer and choose Drop).

  DROP TABLE orders;

Next Steps

In this guide you have only scratched the surface of what is possible with Flink SQL on Ververica Platform. Here are a few suggestions on how to continue:

  • Explore the different packaged connectors that are included in the platform, or bundle your own custom connectors.
  • Extend the functionality of Flink SQL by uploading your own user-defined functions.
  • Use multiple catalogs and databases to organize your tables and functions and integrate external catalogs like the Hive Metastore.
  • Find out more about the different kinds of SQL queries that are supported in Flink SQL.
  • Read about how you can use SQL Deployments to manage your long-running queries.
  • Check out our Jupyter Extension to write and run Flink SQL queries directly from Jupyter instead of the web-based editor.