StarRocks connector
This topic describes how to use the StarRocks connector.
Background Information
StarRocks is a next-generation Massively Parallel Processing (MPP) data warehouse that is designed for fast queries and a unified analytics experience. StarRocks provides the following benefits:
- Compatibility: Compatible with the MySQL protocol. You can use a MySQL client or a common business intelligence (BI) tool to access StarRocks for data analytics.
- Distributed architecture:
  - Horizontally splits tables and stores data in multiple replicas.
  - Scales clusters in a flexible manner to support analytics of up to 10 PB of data.
  - Supports MPP architecture to accelerate data computing.
  - Supports multiple replicas to ensure fault tolerance.
For sink tables, the Flink connector caches data and uses Stream Load to import the data into StarRocks in batches. For source tables, the connector reads data from StarRocks in batches. The following table describes the capabilities supported by the StarRocks connector.
Capabilities Table
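The batched Stream Load behavior of sink tables is usually tuned through flush options. The following is a minimal sketch, not a definitive configuration: the table name `orders_sink` and its columns are hypothetical, `sink.buffer-flush.interval-ms` is the option used in the sample code of this topic, and `sink.buffer-flush.max-rows` and `sink.buffer-flush.max-bytes` are assumed to be available under the names used by the open source StarRocks Flink connector. Check the option names and allowed value ranges against the parameter reference for your engine version.

```sql
-- Hypothetical sink table that flushes cached rows to StarRocks in batches.
CREATE TEMPORARY TABLE orders_sink (
  order_id BIGINT,
  amount   BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx',
  -- Flush at least every 5 seconds (option taken from the sample code).
  'sink.buffer-flush.interval-ms' = '5000',
  -- Assumed option names: flush when the cache reaches this many rows or bytes.
  'sink.buffer-flush.max-rows' = '500000',
  'sink.buffer-flush.max-bytes' = '94371840'
);
```

A shorter interval lowers latency but produces more, smaller Stream Load requests. A longer interval does the opposite.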
Features
StarRocks of E-MapReduce (EMR) supports the CREATE TABLE AS and CREATE DATABASE AS statements. The CREATE TABLE AS statement can be used to synchronize the schema and data of a single table. The CREATE DATABASE AS statement can be used to synchronize data of an entire database or the schema and data of multiple tables in the same database.
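As an illustration of the two statements, the sketch below assumes that a StarRocks catalog named `sr_catalog` and a MySQL catalog named `mysql_catalog` are already registered. Both catalog names, and the database and table names, are hypothetical. The statement shapes follow the general CREATE TABLE AS and CREATE DATABASE AS syntax and should be checked against the reference for your engine version.

```sql
-- Synchronize the schema and data of a single table.
CREATE TABLE IF NOT EXISTS `sr_catalog`.`sink_db`.`orders`
AS TABLE `mysql_catalog`.`source_db`.`orders`;

-- Synchronize all tables of a source database.
CREATE DATABASE IF NOT EXISTS `sr_catalog`.`sink_db`
AS DATABASE `mysql_catalog`.`source_db` INCLUDING ALL TABLES;
```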
Prerequisites
- A StarRocks cluster is created. The StarRocks cluster can be a StarRocks cluster of EMR or a self-managed StarRocks cluster that is hosted on Elastic Compute Service (ECS) instances.
Limits
- Only Ververica Cloud for Apache Flink that uses vera-1-0-6-flink-1.17 or later supports the StarRocks connector.
- The StarRocks connector supports only the at-least-once and exactly-once semantics.
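The delivery semantics are normally selected per sink table. The following is a minimal sketch that assumes the option is named `sink.semantic`, as in the open source StarRocks Flink connector, and that checkpointing is enabled for the job. The table name `scores_sink` is hypothetical. Confirm the option name and any additional requirements in the parameter reference.

```sql
-- Hypothetical sink table that requests exactly-once delivery.
CREATE TEMPORARY TABLE scores_sink (
  name  VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'username' = 'xxx',
  'password' = 'xxx',
  -- Assumed option name; the other supported value is 'at-least-once'.
  'sink.semantic' = 'exactly-once'
);
```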
Syntax
CREATE TABLE USER_RESULT (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port',
  'load-url' = 'fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx'
);
Parameters in the WITH Clause
Data Type Mappings
Sample Code
-- Source table: reads data from StarRocks in batches.
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);

-- Sink table: writes data to StarRocks by using Stream Load.
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL,
  PRIMARY KEY (`runoob_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

-- Copy all rows from the source table to the sink table.
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
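Because StarRocks is compatible with the MySQL protocol, one way to check the result of the job is to query the destination table from a MySQL client. The following is a minimal sketch: `db_name` and `table_name` are the placeholders from the sample code, and the connection command in the comment assumes that the frontend listens on query port 9030, as in the `jdbc-url` above.

```sql
-- Connect with a MySQL client first, for example:
--   mysql -h ip -P 9030 -u xxxx -p
USE db_name;

-- Count the rows that were written by the sink.
SELECT COUNT(*) FROM table_name;

-- Inspect a few rows to confirm the column values.
SELECT runoob_id, runoob_title, runoob_author, submission_date
FROM table_name
ORDER BY runoob_id
LIMIT 10;
```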