Redis
Background Information
Redis is an open-source, in-memory data structure store used as a database, cache, message broker, and streaming engine. Flink can write streaming data to Redis and can look up data in Redis as a dimension table. The following table summarizes the capabilities of the Redis connector:
Category | Description |
---|---|
Supported types | Dimension table, result table |
Supported modes | Streaming mode |
Data format | String |
Monitoring metrics | Result table: numBytesOut, numRecordsOutPerSecond, numBytesOutPerSecond, currentSendTime. Dimension table: none |
Prerequisite
- A Redis instance has been created.
Usage Restrictions
- The Redis connector requires Flink compute engine VERA 1.0.3 or later.
- The connector can be used only as a result table or a dimension table; it cannot be used as a source table.
- Configuring multiple hosts is currently not supported.
Syntax
Dimension table:
CREATE TABLE redis_table (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED -- required.
) WITH (
'connector' = 'redis',
'host' = '<yourHost>'
);
Description:
- The DDL of a Redis dimension table must meet the following requirements:
  - Exactly one primary key must be declared.
  - Only two fields can be declared, and both must be of type STRING.
  - Only Redis data of type STRING or HASHMAP can be read.
  - The ON condition of the join must include an equality condition on every primary key field.
  - Only two cache strategies are supported: None and LRU.
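A result table uses the same basic layout but additionally requires the mode parameter, which selects the Redis data structure to write to. The skeleton below is a sketch that assumes the two-column layout used for STRING, LIST, and SET; the table and column names are placeholders.

```sql
-- Sketch of a Redis result table (sink) DDL.
CREATE TABLE redis_sink (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- required.
) WITH (
  'connector' = 'redis',
  'mode' = '<yourMode>',   -- one of: string, list, set, hashmap, sortedset
  'host' = '<yourHost>'
);
```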
WITH Parameters
Common to Sink and Dimension Tables
Parameter | Description | Data type | Required | Default | Remarks |
---|---|---|---|---|---|
connector | Connector type. | String | Yes | None | Fixed value: redis. |
host | Address of the Redis server. | String | Yes | None | None |
port | Port of the Redis server. | Integer | No | 6379 | None |
password | Password of the Redis database. | String | No | Empty string | An empty string means no authentication is performed. |
dbNum | Index of the Redis database to operate on. | Integer | No | 0 | None |
clusterMode | Whether the Redis instance runs in cluster mode. | Boolean | No | false | None |
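As a minimal sketch of the common parameters, the following result table writes to a Redis instance running in cluster mode. The table name, column names, and port value are hypothetical placeholders. Because Redis Cluster supports only database 0, dbNum is left at its default. As in the other examples in this document, every WITH value is given as a string literal.

```sql
-- Sketch: result table writing to a Redis instance in cluster mode.
-- Redis Cluster only supports database 0, so dbNum keeps its default (0).
CREATE TEMPORARY TABLE redis_cluster_sink (
  user_id STRING,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'mode' = 'string',
  'host' = '<yourHost>',          -- a single node address; multiple hosts are not supported
  'port' = '6379',
  'password' = '<yourPassword>',
  'clusterMode' = 'true'
);
```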
Sink Only
Parameter | Description | Data type | Required | Default | Remarks |
---|---|---|---|---|---|
mode | Redis data structure to write to. | String | Yes | None | Valid values: string, list, set, hashmap, sortedset. |
ignoreDelete | Whether to ignore retraction messages. | Boolean | No | false | If false, when a retraction message is received, both the key of the retracted record and the data written under it are deleted. |
expiration | TTL set on the key of each written record. Supported only by VERA 1.0.3 and later. | Long | No | 0 | 0 means no TTL is set. A value greater than 0 sets that TTL, in milliseconds, on the key of each written record. |
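The following sketch combines the sink-only parameters: it writes to Redis hashes, keeps existing data in Redis when retractions arrive, and sets a one-hour TTL (3,600,000 ms) on every written key. The table and column names are hypothetical, and declaring only the first column as the primary key is an assumption; the three-column layout follows the HASHMAP format described for result tables.

```sql
-- Sketch: result table writing Redis hashes (mode = hashmap).
-- Each row (redis_key, hash_field, hash_value) maps to: hmset redis_key hash_field hash_value
CREATE TEMPORARY TABLE redis_hash_sink (
  redis_key STRING,
  hash_field STRING,
  hash_value STRING,
  PRIMARY KEY (redis_key) NOT ENFORCED  -- assumption: the key column is the primary key
) WITH (
  'connector' = 'redis',
  'mode' = 'hashmap',
  'host' = '<yourHost>',
  'port' = '<yourPort>',
  'password' = '<yourPassword>',
  'ignoreDelete' = 'true',     -- retraction messages do not delete data in Redis
  'expiration' = '3600000'     -- TTL of 1 hour, in milliseconds, on each written key
);
```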
Dimension Table Only
Parameter | Description | Data type | Required | Default | Remarks |
---|---|---|---|---|---|
hashName | Name of the hash key in hash mode. | String | No | Empty | By default, the dimension table reads Redis STRING data, that is, key-value pairs. If hashName is set, it reads HASHMAP data in the form key-{field: value}, where the key is the value of hashName, the field is the key (primary key column) declared in CREATE TABLE, and the value is the value stored for that field, with the same semantics as the value of a STRING key-value pair. |
cache | Cache strategy. | String | No | None | Two strategies are supported. None (default): no caching. LRU: part of the dimension table data is cached; for each record from the source table, the cache is searched first and, on a miss, the physical dimension table is queried. With LRU, also configure cacheSize and cacheTTLMs. |
cacheSize | Cache size, in rows. | Long | No | 10000 | Takes effect when the LRU strategy is selected. |
cacheTTLMs | Cache timeout, in milliseconds. | Long | No | None | If cache is None, cacheTTLMs can be left unset, meaning the cache does not time out. If cache is LRU, cacheTTLMs is the cache timeout; by default, cached entries do not expire. |
cacheEmpty | Whether to cache empty lookup results. | Boolean | No | true | None |
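A sketch of a dimension table that reads from a single Redis hash and caches lookups with the LRU strategy. The hash name user_names, the table name, and the cache values are hypothetical, and the exact spelling accepted for the cache value ('LRU') is assumed from the table above. With hashName set, a lookup for id = 'u1' reads field u1 of the hash user_names.

```sql
-- Sketch: dimension table backed by one Redis hash, with an LRU cache.
CREATE TEMPORARY TABLE redis_hash_dim (
  id STRING,     -- hash field to look up
  name STRING,   -- value stored under that field
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'port' = '<yourPort>',
  'password' = '<yourPassword>',
  'hashName' = 'user_names',   -- hypothetical hash key name
  'cache' = 'LRU',             -- assumption: value spelled as in the parameter table
  'cacheSize' = '5000',        -- keep up to 5,000 rows in the cache
  'cacheTTLMs' = '60000',      -- expire cached rows after 60 seconds
  'cacheEmpty' = 'false'       -- do not cache lookups that found nothing
);
```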
Type Mapping
Scope | Flink type | Redis type |
---|---|---|
Common | VARCHAR (STRING) | STRING |
Sink only | DOUBLE | SCORE |
Description:
- A Redis result table supports five Redis data structures. Its DDL must declare a primary key and use one of the following layouts:
  - STRING: two columns, key (STRING) and value (STRING). Data is written with set key value.
  - LIST: two columns, key (STRING) and value (STRING). Data is written with lpush key value.
  - SET: two columns, key (STRING) and value (STRING). Data is written with sadd key value.
  - HASHMAP: three columns, key (STRING), hash_key (STRING), and hash_value (STRING), the value for hash_key. Data is written with hmset key hash_key hash_value.
  - SORTEDSET: three columns, key (STRING), score (DOUBLE), and value (STRING). Data is written with zadd key score value.
- Redis scores apply only to SORTEDSET, so each value must be given a DOUBLE score explicitly; Redis orders the values by score in ascending order.
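The SORTEDSET layout can be sketched end to end as follows. The datagen source, table names, and column names are hypothetical, and declaring only the key column as the primary key is an assumption; the point is the column order (key, DOUBLE score, value) that the sink expects.

```sql
-- Sketch: write scored values into Redis sorted sets (mode = sortedset).
CREATE TEMPORARY TABLE datagen_scores (
  board STRING,
  player STRING,
  points DOUBLE
) WITH (
  'connector' = 'datagen'
);

-- Each row (redis_key, member_score, member_name) maps to: zadd redis_key member_score member_name
CREATE TEMPORARY TABLE redis_zset_sink (
  redis_key STRING,
  member_score DOUBLE,                  -- the score column must be DOUBLE
  member_name STRING,
  PRIMARY KEY (redis_key) NOT ENFORCED  -- assumption: the key column is the primary key
) WITH (
  'connector' = 'redis',
  'mode' = 'sortedset',
  'host' = '<yourHost>',
  'port' = '<yourPort>',
  'password' = '<yourPassword>'
);

-- Column order must match the sink layout: key, score, value.
INSERT INTO redis_zset_sink
SELECT board, points, player
FROM datagen_scores;
```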
Examples
Sink example:
CREATE TEMPORARY TABLE datagen_source (
v STRING,
p STRING
) with (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE redis_sink (
a STRING,
b STRING,
PRIMARY KEY (a) NOT ENFORCED
) with (
'connector' = 'redis',
'mode' = 'string',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
INSERT INTO redis_sink
SELECT v, p
FROM datagen_source;
Dimension example:
CREATE TEMPORARY TABLE datagen_source (
id STRING,
data STRING,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE redis_dim (
id STRING,
name STRING,
PRIMARY KEY (id) NOT ENFORCED -- Corresponds to the key in Redis.
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'port' = '<yourPort>',
'password' = '<yourPassword>'
);
CREATE TEMPORARY TABLE blackhole_sink (
id STRING,
data STRING,
name STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.name  -- e.* would also include proctime and w.id, which blackhole_sink does not declare
FROM datagen_source AS e
JOIN redis_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;