Getting Started - Flink SQL

See also

Please see Getting Started - Installation to install Ververica Platform before going through this guide.

Note

If you have installed the optional logging or metrics integrations, please check the logging setup and the metrics setup to ensure you have the correct services port-forwarded to access them on your local machine.

Ververica Platform is an end-to-end platform for the development and operations of Apache Flink® SQL. You can interactively develop SQL scripts, manage catalogs, connectors, and user-defined functions, and deploy the resulting long-running queries as Ververica Platform Deployments.

Writing SQL Queries

Begin by navigating to the SQL editor in the web user interface of the platform by clicking SQL in the left sidebar.

Flink SQL queries operate on tables, from which records are read and into which results are written, similar to any database. In contrast to relational databases, though, tables are always backed by an external system (e.g. Apache Kafka® or Elasticsearch®) and are not stored in Apache Flink® itself. As a first step, you will create a table that you can afterwards query by executing the CREATE TABLE statement below. The connector property determines the system that backs this table. datagen is a special type of connector that is not backed by an external system but generates random data on demand. Without any external dependencies, datagen tables are very convenient for testing and prototyping. The WATERMARK clause declares ordertime as the table’s event-time attribute, which the windowed queries further below rely on. Instead of copying the complete statement, you can also click the + icon next to Tables in the schema explorer and choose datagen to insert a template for a datagen table into the editor.

CREATE TABLE orders (
  id BIGINT,
  ordertime TIMESTAMP(3),
  totalprice DECIMAL(6,2),
  customerid BIGINT,
  WATERMARK FOR ordertime AS ordertime
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
)
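
If you would like to double-check the new table, Flink SQL’s catalog statements offer a quick way to inspect it. As an optional step, the following statements list the tables in the current database and print the schema of the orders table.

SHOW TABLES;
DESCRIBE orders;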

The table definition is now stored in the default database in the vvp catalog, and we can query it by running the following statement.

SELECT * FROM orders;

Executing a SELECT statement requires a Session Cluster that is assigned to execute all queries from the editor. If no cluster is yet assigned, you will be asked to create a session cluster now.

[Screenshot: No Preview Session Cluster]

If you have not used this namespace before, you’ll need to create a Deployment Target when creating the session cluster. A Deployment Target links a Deployment to a Kubernetes namespace into which your Flink applications will be deployed. In this case you can use the vvp-jobs namespace.

[Screenshot: Creating a Deployment Target]

After creating the session cluster (the default configuration is fine), return to the editor and re-run your SELECT statement. You can now experiment with different queries. For example, you can count orders and sum up their total price in tumbling (non-overlapping) 10-second windows.

SELECT
  TUMBLE_ROWTIME(ordertime, INTERVAL '10' SECONDS) AS windowtime,
  COUNT(*) AS numorders,
  SUM(totalprice) AS sumtotalprice
FROM orders
GROUP BY
  TUMBLE(ordertime, INTERVAL '10' SECONDS)
[Screenshot: Order Stats Results]
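
Windowed aggregations are not the only option. As another experiment (a small sketch that only uses the columns defined above), the following query counts orders and sums up the total price per customer, continuously updating its result as new rows are generated:

SELECT
  customerid,
  COUNT(*) AS numorders,
  SUM(totalprice) AS sumtotalprice
FROM orders
GROUP BY customerid;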

Deploying SQL Queries

So far, you have written the results of your long-running queries “to the screen”. This is great during development, but a production query needs to write its results to a table that can be consumed by downstream applications: either by another Flink SQL query or by an application that accesses the system storing the table directly (e.g. a dashboard that queries a table stored in an Elasticsearch® index).
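
For illustration, a sink table backed by Elasticsearch® could look roughly like the following sketch. It assumes the elasticsearch-7 connector is available in your setup; the hostname and index name are made up for the example.

CREATE TABLE order_stats_sink (
  windowtime TIMESTAMP(3),
  numorders BIGINT,
  sumtotalprice DECIMAL(20,2)
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://elasticsearch:9200',  -- hypothetical host
  'index' = 'order_stats'                 -- hypothetical index name
);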

For the output table in this guide, however, we propose using a temporary table of type print. A temporary table is not stored in any catalog and can only be used within the script in which it is defined. A table of type print prints every row to STDOUT (of the TaskManagers). Again, instead of copying the CREATE TABLE statement, you can use the + icon in the schema explorer. You will notice that the editor automatically fills in the correct schema for the query that is currently selected in the editor.

CREATE TEMPORARY TABLE order_stats (
  windowtime TIMESTAMP(3),
  numorders BIGINT,
  sumtotalprice DECIMAL(20,2)
) WITH (
  'connector' = 'print'
);

INSERT INTO order_stats
SELECT
  TUMBLE_ROWTIME(ordertime, INTERVAL '10' SECONDS) AS windowtime,
  COUNT(*) AS numorders,
  SUM(totalprice) AS sumtotalprice
FROM orders
GROUP BY
  TUMBLE(ordertime, INTERVAL '10' SECONDS)

If you Run the script above, Ververica Platform will tell you that this requires a Deployment. A Deployment is one of Ververica Platform’s core abstractions and represents a long-running Apache Flink® application. To learn more about Deployments, check out Getting Started - Flink Operations. In the Deployment creation form you can configure parallelism, resources, Flink configuration, and much more. For now, you only need to give your Deployment a name like “Order Statistics” and hit Create SQL Deployment.

[Screenshot: Order Statistics Deployment creation form]

It will take about a minute until your Deployment enters the RUNNING state. During this time Ververica Platform spins up a dedicated Apache Flink® cluster and deploys your query including its dependencies to it.

[Screenshot: Order Statistics Deployment]

If you installed the platform with logging, you can check out the results of the query in Kibana (remember: a print table prints to STDOUT of the TaskManager containers). To access Kibana, you need to forward a local port to the Kibana Service.

kubectl --namespace vvp port-forward services/kibana 5601:5601

Otherwise, you can look up the TaskManager pod of your Deployment (for example with kubectl get pods -n vvp-jobs) and run

kubectl logs -n vvp-jobs <taskmanager-pod>

to see the results of the query.

Cleaning Up

If you want to clean up all resources created during this getting started guide, first cancel and then delete the Deployment that you just started. Second, delete the definition of the orders table from the catalog by running the following statement, or right-click the table in the schema explorer and choose Drop.

DROP TABLE orders;

Next Steps

In this guide you have only scratched the surface of what is possible with Flink SQL on Ververica Platform. Here are a few suggestions on how to continue.

  • Explore the different packaged connectors that are included in the platform, or bundle your own custom connectors.
  • Extend the functionality of Flink SQL by uploading your own user-defined functions.
  • Use multiple catalogs and databases to organize your tables and functions and integrate external catalogs like the Hive Metastore (see the sketch below this list).
  • Find out more about the different kinds of SQL queries that are supported in Flink SQL.
  • Read about how you can use SQL Deployments to manage your long-running queries.
  • Check out our Jupyter Extension to write and run Flink SQL queries directly from Jupyter instead of the web-based editor.
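
As a small taste of the catalog support mentioned above, databases can be created and switched with standard Flink SQL statements. This is only a sketch; the database name is made up for illustration.

CREATE DATABASE marketing;
USE marketing;
SHOW TABLES;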