StarRocks

This topic describes how to use the StarRocks connector.

Background Information

StarRocks is a new-generation Massively Parallel Processing (MPP) data warehouse that is designed for extremely fast queries and a unified analytics experience. StarRocks provides the following benefits:

  • Compatibility: Compatible with the MySQL protocol. You can use a MySQL client or a common business intelligence (BI) tool to access StarRocks for data analytics.
  • Distributed Architecture:
    • Horizontally splits tables and stores data in multiple replicas.
    • Scales clusters in a flexible manner to support analytics of up to 10 PB of data.
    • Supports MPP architecture to accelerate data computing.
    • Supports multiple replicas to ensure fault tolerance.

For sink tables, the Flink connector caches data and imports it into StarRocks in batches by using Stream Load. For source tables, the connector reads data from StarRocks in batches. The following table describes the capabilities supported by the StarRocks connector.

Capabilities Table

Item | Description
Table type | Source table and sink table
Running mode | Streaming mode and batch mode
Data format | CSV and JSON
Metric | N/A
Data update or deletion in a sink table | Supported

Features

StarRocks of E-MapReduce (EMR) supports the CREATE TABLE AS and CREATE DATABASE AS statements. The CREATE TABLE AS statement can be used to synchronize the schema and data of a single table. The CREATE DATABASE AS statement can be used to synchronize data of an entire database or the schema and data of multiple tables in the same database.
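
As a rough sketch of what these two statements can look like, the following assumes that a source catalog named mysql_catalog and a StarRocks catalog named sr_catalog are already registered. Both catalog names and the orders_db and orders names are hypothetical, and the clauses that are actually available should be checked against the CREATE TABLE AS and CREATE DATABASE AS references for your engine version.

```sql
-- Hypothetical names: sr_catalog, mysql_catalog, orders_db, orders.
USE CATALOG sr_catalog;

-- Synchronize the schema and data of a single table.
CREATE TABLE IF NOT EXISTS `orders`
AS TABLE `mysql_catalog`.`orders_db`.`orders`;

-- Synchronize the schema and data of all tables in one source database.
CREATE DATABASE IF NOT EXISTS `orders_db`
AS DATABASE `mysql_catalog`.`orders_db` INCLUDING ALL TABLES;
```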

Prerequisites

  • A StarRocks cluster is created. The StarRocks cluster can be a StarRocks cluster of EMR or a self-managed StarRocks cluster that is hosted on Elastic Compute Service (ECS) instances.

Limits

  • Only Ververica Cloud for Apache Flink that uses vera-1-0-6-flink-1.17 or later supports the StarRocks connector.
  • The StarRocks connector supports only the at-least-once and exactly-once semantics.

Syntax

CREATE TABLE USER_RESULT (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port',
  'load-url' = 'fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx'
);

Parameters in the WITH Clause

Category | Parameter | Description | Data type | Required | Default value | Remarks
Common parameters | connector | The type of tables. | STRING | Yes | No default value | Set the value to starrocks.
Common parameters | jdbc-url | The Java Database Connectivity (JDBC) URL that is used to connect to the database. | STRING | Yes | No default value | The specified IP address and JDBC port of a frontend (FE) are used. The value of this parameter is in the jdbc:mysql://ip:port format.
Common parameters | database-name | The name of the StarRocks database. | STRING | Yes | No default value | N/A
Common parameters | table-name | The name of the StarRocks table. | STRING | Yes | No default value | N/A
Common parameters | username | The username that is used to connect to the StarRocks database. | STRING | Yes | No default value | N/A
Common parameters | password | The password that is used to connect to the StarRocks database. | STRING | Yes | No default value | N/A
Common parameters | starrocks.create.table.properties | The properties of the StarRocks table. | STRING | No | No default value | The initial properties of the StarRocks table, such as the engine and the number of replicas, are specified. Example: starrocks.create.table.properties = buckets 8, starrocks.create.table.properties = replication_num=1
Parameters only for source tables | scan-url | The URL for data scan. | STRING | No | No default value | The specified IP address and HTTP port of an FE are used. The value of this parameter is in the fe_ip:http_port;fe_ip:http_port format.
Parameters only for source tables | scan.connect.timeout-ms | The timeout period for the StarRocks connector of Flink to connect to the StarRocks database. | STRING | No | 1000 | Unit: milliseconds.
Parameters only for source tables | scan.params.keep-alive-min | The keep-alive period of the query task. | STRING | No | 10 | N/A
Parameters only for source tables | scan.params.query-timeout-s | The timeout period of the query task. | STRING | No | 600 | Unit: seconds.
Parameters only for source tables | scan.params.mem-limit-byte | The maximum memory for a single query in a backend (BE) node. | STRING | No | 1073741824 (1 GB) | Unit: byte.
Parameters only for source tables | scan.max-retries | The maximum number of retries when a query fails. | STRING | No | 1 | N/A
Parameters only for sink tables | load-url | The URL for data import. | STRING | Yes | No default value | The specified IP address and HTTP port of an FE are used. The value of this parameter is in the fe_ip:http_port;fe_ip:http_port format.
Parameters only for sink tables | sink.semantic | The semantics for data writing. | STRING | No | at-least-once | Valid values: at-least-once, exactly-once.
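
To show how the optional parameters fit into a table definition, here is a minimal sketch that sets a few of them explicitly. The table names, columns, and chosen values are illustrative assumptions and not recommendations. Only the parameter keys and their defaults come from the parameter list above.

```sql
-- Source table: override the connection timeout, query timeout, and retry count.
CREATE TEMPORARY TABLE score_source (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:query_port',
  'scan-url' = 'fe_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'scan.connect.timeout-ms' = '3000',      -- default: 1000 (milliseconds)
  'scan.params.query-timeout-s' = '1200',  -- default: 600 (seconds)
  'scan.max-retries' = '3'                 -- default: 1
);

-- Sink table: switch from the default at-least-once to exactly-once.
CREATE TEMPORARY TABLE score_sink (
  name VARCHAR,
  score BIGINT
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:query_port',
  'load-url' = 'fe_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.semantic' = 'exactly-once'
);
```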

Data Type Mappings

Data type of StarRocks | Data type of Flink
NULL | NULL
BOOLEAN | BOOLEAN
TINYINT | TINYINT
SMALLINT | SMALLINT
INT | INT
BIGINT | BIGINT
LARGEINT | STRING
FLOAT | FLOAT
DOUBLE | DOUBLE
DATE | DATE
DATETIME | TIMESTAMP
DECIMAL | DECIMAL
DECIMALV2 | DECIMAL
DECIMAL32 | DECIMAL
DECIMAL64 | DECIMAL
DECIMAL128 | DECIMAL
CHAR | CHAR
VARCHAR | STRING
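
The mappings above translate into Flink column declarations roughly as in the following sketch. The table and column names are hypothetical, and the DECIMAL precision and scale are placeholders that would need to match the corresponding StarRocks column.

```sql
-- Hypothetical source table; each comment names the assumed StarRocks column type.
CREATE TEMPORARY TABLE account_source (
  account_id STRING,        -- LARGEINT maps to STRING
  is_active  BOOLEAN,       -- BOOLEAN
  login_cnt  INT,           -- INT
  balance    DECIMAL(27, 9),-- DECIMAL (precision and scale are placeholders)
  created_at TIMESTAMP,     -- DATETIME maps to TIMESTAMP
  birthday   DATE,          -- DATE
  nickname   STRING         -- VARCHAR maps to STRING
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://fe_ip:query_port',
  'scan-url' = 'fe_ip:http_port',
  'database-name' = 'xxx',
  'table-name' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx'
);
```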

Sample Code

-- Source table: reads rows from StarRocks through the scan URL.
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);

-- Sink table: writes rows to StarRocks through the load URL.
-- The primary key is declared NOT ENFORCED, as Flink requires.
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL,
  PRIMARY KEY(`runoob_id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

-- Copy all rows from the source table to the sink table.
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
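
The sink table in the sample declares a primary key, which pairs naturally with a Primary Key table on the StarRocks side when rows are to be updated or deleted. A minimal sketch of such a target table, created in StarRocks through a MySQL client, might look like the following. The VARCHAR lengths, bucket count, and replica count are assumptions for illustration, and db_name and table_name simply mirror the placeholders in the sample.

```sql
-- Run in StarRocks, not in Flink. Lengths and counts are illustrative assumptions.
CREATE TABLE IF NOT EXISTS `db_name`.`table_name` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` VARCHAR(255) NOT NULL,
  `runoob_author` VARCHAR(255) NOT NULL,
  `submission_date` DATE NULL
)
PRIMARY KEY (`runoob_id`)
DISTRIBUTED BY HASH (`runoob_id`) BUCKETS 8
PROPERTIES ("replication_num" = "1");
```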