Getting Started - Flink SQL
Please see Getting Started - Installation to install Ververica Platform before going through this guide.
If you have installed the optional logging or metrics integrations, please check the logging setup and the metrics setup to ensure you have the correct services port-forwarded to access them on your local machine.
Ververica Platform is an end-to-end platform for the development and operations of Apache Flink® SQL. You can interactively develop SQL scripts, manage catalogs, connectors, and user-defined functions, and deploy the resulting long-running queries as Ververica Platform Deployments.
Writing SQL Queries
Begin by navigating to the SQL editor in the platform's web user interface by clicking SQL in the left sidebar.
Flink SQL queries operate on tables, from which records are read and into which records are written, similar to any database.
In contrast to relational databases, though, tables are always backed by an external system (e.g. Apache Kafka® or Elasticsearch) and not stored in Apache Flink® itself.
As a first step, you will create a table that you can afterward query by executing the CREATE TABLE statement below.
The connector property determines the system that this table is backed by.
datagen is a special type of connector that is not backed by an external system but generates random data on demand.
Without any external dependencies, datagen tables are very convenient for testing and prototyping.
Instead of copying the complete statement, you can also click the + icon next to Tables in the schema explorer and choose datagen to insert a template for a datagen table into the editor.
CREATE TABLE orders (
  id BIGINT,
  ordertime TIMESTAMP(3),
  totalprice DECIMAL(6,2),
  customerid BIGINT,
  WATERMARK FOR ordertime AS ordertime
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
)
The table definition is now stored in the default database in the vvp catalog, and we can query it by running the following statement.
SELECT * FROM orders;
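Because the table is registered in the default database of the vvp catalog, it should also be reachable by its fully qualified name. The following is a minimal sketch that assumes exactly those catalog and database names; the backticks are there because DEFAULT is a reserved keyword in Flink SQL.

```sql
-- Fully qualified form: catalog.database.table
-- `default` is quoted with backticks because DEFAULT is a reserved keyword.
SELECT * FROM vvp.`default`.orders;
```

Qualified names become useful once tables are spread across several catalogs or databases.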
A SELECT statement requires a session cluster that is assigned to execute all queries from the editor.
If no cluster is yet assigned, you will be asked to create a session cluster now.
If you have not used this namespace before, you’ll need to create a Deployment Target when creating the session cluster.
A Deployment Target links a Deployment to a Kubernetes namespace, which your Flink applications will be deployed into.
In this case you can use the
After creating the session cluster - the default configurations are good - return to the editor and re-run your
You can now experiment with different queries.
For example, you can count orders and sum up their total price in tumbling (non-overlapping) 10-second windows.
SELECT
  TUMBLE_ROWTIME(ordertime, INTERVAL '10' SECONDS) AS windowtime,
  COUNT(*) AS numorders,
  SUM(totalprice) AS sumtotalprice
FROM orders
GROUP BY TUMBLE(ordertime, INTERVAL '10' SECONDS)
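As a contrast to tumbling windows, the same aggregation can be sketched with hopping (overlapping) windows. This variant is not part of the original guide; it assumes the orders table defined earlier and uses Flink SQL's HOP group window function, whose arguments are the time attribute, the slide, and the window size. The interval unit is written in its singular form here.

```sql
-- 10-second windows that start every 5 seconds, so each order
-- is counted in two consecutive windows.
SELECT
  HOP_ROWTIME(ordertime, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS windowtime,
  COUNT(*) AS numorders,
  SUM(totalprice) AS sumtotalprice
FROM orders
GROUP BY HOP(ordertime, INTERVAL '5' SECOND, INTERVAL '10' SECOND);
```

With a slide smaller than the window size, results are emitted more often than with the tumbling version, at the cost of each row contributing to several windows.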
Deploying SQL Queries
So far, you have written the results of your long-running queries “to the screen”. This is great during development, but a production query needs to write its results to a table that downstream applications can consume: either another Flink SQL query or an application that directly accesses the system storing the table (e.g. a dashboard that queries a table stored in an Elasticsearch index).
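To make the idea of a production output table concrete, here is a rough sketch of a table backed by Apache Kafka®. It is an illustration only: the table name, topic name, and broker address are hypothetical placeholders, the option keys follow the open-source Apache Flink Kafka SQL connector, and whether that connector and the JSON format are available depends on your platform installation.

```sql
-- Hypothetical sink table for windowed order statistics.
CREATE TABLE order_stats_kafka (
  windowtime TIMESTAMP(3),
  numorders BIGINT,
  sumtotalprice DECIMAL(20,2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'order-stats',                        -- placeholder topic name
  'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder broker address
  'format' = 'json'
);
```

A table like this would be stored in the catalog, so any downstream query or application that can read the topic could consume the results. The rest of this guide uses a simpler output table that needs no external system.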
For the output table, we propose using a temporary table of type print.
A temporary table is not stored in any catalog and can only be used within the script in which it is defined.
A table of type print prints every row to STDOUT (of the TaskManagers).
Again, instead of copying the CREATE TABLE statement, you can use the + icon in the schema explorer.
You will notice that the editor automatically fills in the correct schema for the query that is currently selected in the editor.
CREATE TEMPORARY TABLE order_stats (
  windowtime TIMESTAMP(3),
  numorders BIGINT,
  sumtotalprice DECIMAL(20,2)
) WITH (
  'connector' = 'print'
);

INSERT INTO order_stats
SELECT
  TUMBLE_ROWTIME(ordertime, INTERVAL '10' SECONDS) AS windowtime,
  COUNT(*) AS numorders,
  SUM(totalprice) AS sumtotalprice
FROM orders
GROUP BY TUMBLE(ordertime, INTERVAL '10' SECONDS)
When you run the script above, Ververica Platform will tell you that it requires a Deployment.
A Deployment is one of Ververica Platform’s core abstractions and represents a long-running Apache Flink® application.
To learn more about Deployments, check out Getting Started - Flink Operations.
In the Deployment creation form you can configure parallelism, resources, Flink configuration and much more.
For now, you only need to give your Deployment a name like “Order Statistics” and hit Create SQL Deployment.
It will take about a minute until your Deployment enters the RUNNING state. During this time, Ververica Platform spins up a dedicated Apache Flink® cluster and deploys your query, including its dependencies, to it.
If you installed the platform with logging, you can check out the results of the query in Kibana (remember: a print table prints to STDOUT of the TaskManager containers). To access Kibana, you need to forward a local port to the Kibana Service.
kubectl --namespace vvp port-forward services/kibana 5601:5601
Otherwise, you can run
kubectl logs -n vvp-jobs <taskmanager-pod>
to see the results of the query.
If you want to clean up all resources created during this getting started guide, first cancel and then delete the Deployment that you just started. Second, delete the table definition of the orders table from the catalog by running the following statement, or by right-clicking the table in the schema explorer and choosing Drop.
DROP TABLE orders;
In this guide you have only scratched the surface of what is possible with Flink SQL on Ververica Platform. Here are a few suggestions on how to continue.
- Explore the different packaged connectors that are included in the platform, or bundle your own custom connectors.
- Extend the functionality of Flink SQL by uploading your own user-defined functions.
- Use multiple catalogs and databases to organize your tables and functions and integrate external catalogs like the Hive Metastore.
- Find out more about the different kinds of SQL queries that are supported in Flink SQL.
- Read about how you can use SQL Deployments to manage your long-running queries.
- Check out our Jupyter Extension to write and run Flink SQL queries directly from Jupyter instead of the web-based editor.