Redis

Background Information

Redis is an open source, in-memory data structure store used as a database, cache, message broker, and streaming engine. Flink can write streaming data to Redis and can read Redis as a dimension table. The Redis connector supports the following:

| Category | Description |
|---|---|
| Supported types | Dimension table, result table |
| Supported modes | Streaming mode |
| Data format | String |
| Monitoring metrics | Result table: numBytesOut, numRecordsOutPerSecond, numBytesOutPerSecond, currentSendTime. Dimension table: none |

Prerequisite

  • A Redis instance has been created.

Usage Restrictions

  • The Redis connector is supported only by the Flink computing engine VERA 1.0.3 and above.
  • The connector can be used only as a result table or a dimension table, not as a source table.
  • Configuring multiple hosts is currently not supported.

Syntax

Dimension table:

    CREATE TABLE redis_table (
        a STRING,
        b STRING,
        PRIMARY KEY (a) NOT ENFORCED -- required.
    ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>'
    );

Description:

The DDL of a Redis dimension table must meet the following requirements:

  • Exactly one primary key must be declared.
  • Only two fields can be declared, and both must be of type STRING.
  • Only Redis data of the STRING and HASHMAP types can be read.
  • The ON condition of the join must include an equality condition on every primary key column.
  • Only two cache strategies are supported: None and LRU.

WITH Parameter

Universal for Sink and Dimension

| Parameter | Description | Data type | Required | Default | Additional info |
|---|---|---|---|---|---|
| connector | Connector type. | String | yes | none | Fixed to redis. |
| host | Redis server connection address. | String | yes | none | none |
| port | Redis server connection port. | int | no | 6379 | none |
| password | Redis database password. | String | no | empty string | An empty string means that no authentication is performed. |
| dbNum | Number of the database to operate on. | int | no | 0 | none |
| clusterMode | Whether the Redis instance is in cluster mode. | Boolean | no | false | none |
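
As a minimal sketch of how the shared connection options combine, the declaration below sets each optional parameter explicitly. The table name redis_conn_demo, the column names, and the values 6379 and 1 are illustrative assumptions; the option keys come from the parameter list above, and the placeholders in angle brackets must be replaced with your own values.

```sql
-- Hypothetical dimension table used only to illustrate the shared options.
CREATE TEMPORARY TABLE redis_conn_demo (
    k STRING,
    v STRING,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = '<yourHost>',
    'port' = '6379',               -- the default port, written out explicitly
    'password' = '<yourPassword>', -- omit if the instance needs no password
    'dbNum' = '1',                 -- operate on database 1 instead of the default 0
    'clusterMode' = 'false'        -- the default value
);
```

Option values are written as quoted string literals in the WITH clause even when the parameter type is int or Boolean, as the examples later in this page do for port.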

Sink Exclusive

| Parameter | Description | Data type | Required | Default | Additional info |
|---|---|---|---|---|---|
| mode | The Redis data structure to write to. | String | yes | none | Allowed values: string, list, set, hashmap, sortedset. |
| ignoreDelete | Whether to ignore retraction messages. | Boolean | no | false | If set to false, a received retraction message deletes both the key and the data that was inserted for it. |
| expiration | TTL for the key of the written data. (Only supported by VERA 1.0.3 and above.) | Long | no | 0 | The default value 0 means that no TTL is set. If the value is greater than 0, the key of the written data is given that TTL, in milliseconds. |
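
The sink-only options can be sketched as follows. This is an illustration under stated assumptions rather than a recommended configuration: the table name redis_ttl_sink, the column names, and the one-hour TTL are hypothetical, while the option keys are the ones listed above.

```sql
-- Hypothetical result table that writes Redis strings with a TTL.
CREATE TEMPORARY TABLE redis_ttl_sink (
    k STRING,
    v STRING,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = '<yourHost>',
    'mode' = 'string',          -- required for a result table
    'ignoreDelete' = 'true',    -- retraction messages are ignored, so keys are not deleted
    'expiration' = '3600000'    -- TTL in milliseconds (one hour)
);
```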

Exclusive to Dimension

| Parameter | Description | Data type | Required | Default | Additional info |
|---|---|---|---|---|---|
| hashName | Hash key name in hash mode. | String | no | none (empty) | Normally the data read by a Redis dimension table is of the STRING type, that is, a key-value pair. If hashName is set, the data is of the HASHMAP type, that is, a key-{field-value} pair, where the key is the hashName value, the field is the key specified in CREATE TABLE, and the value is the value assigned to that field, with the same semantics as the value of a STRING key-value pair. |
| cache | Caching strategy. | String | no | None | Two strategies are supported. None (default): no caching. LRU: caches part of the dimension table data. For each record from the source table, the system searches the cache first and queries the physical dimension table only if the record is not found. LRU requires the cache size (cacheSize) and the cache timeout (cacheTTLMs) to be configured. |
| cacheSize | Cache size. | Long | no | 10000 | Applies when the LRU cache strategy is selected. The default is 10,000 rows. |
| cacheTTLMs | Cache timeout, in milliseconds. | Long | no | none | Depends on cache. If cache is None, cacheTTLMs does not need to be configured. If cache is LRU, cacheTTLMs is the cache timeout; by default, cached entries do not expire. |
| cacheEmpty | Whether to cache empty results. | Boolean | no | true | none |
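
To make the hashName and cache options concrete, the following sketch declares a dimension table that reads from a Redis hash and enables the LRU cache. The table name redis_hash_dim, the column names, the hash key user_names, and the cache values are all hypothetical; only the option keys come from the parameter list above.

```sql
-- Hypothetical dimension table backed by a single Redis hash.
CREATE TEMPORARY TABLE redis_hash_dim (
    id STRING,          -- used as the field inside the hash
    user_name STRING,   -- the value stored under that field
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = '<yourHost>',
    'hashName' = 'user_names',  -- hypothetical hash key; switches reads to HASHMAP data
    'cache' = 'LRU',            -- search the cache before querying Redis
    'cacheSize' = '5000',       -- keep at most 5,000 rows in the cache
    'cacheTTLMs' = '60000',     -- cached rows time out after 60 seconds
    'cacheEmpty' = 'true'       -- also cache lookups that returned no result
);
```

Under these assumptions, a lookup for id = 'u1' would read the field u1 of the hash user_names, which is conceptually what the Redis command HGET user_names u1 returns.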

Type Mapping

| Type | Flink field type | Redis field type |
|---|---|---|
| Universal | VARCHAR | STRING |
| Sink exclusive | DOUBLE | SCORE |

Description:

  1. The Redis result table supports five Redis data structures. The DDL must follow the format below for the chosen structure, and a primary key must be defined:
       • STRING: the DDL has two columns. The first column is key (STRING type) and the second column is value (STRING type). The command used to insert data in Redis is set key value.
       • LIST: the DDL has two columns. The first column is key (STRING type) and the second column is value (STRING type). The command used to insert data in Redis is lpush key value.
       • SET: the DDL has two columns. The first column is key (STRING type) and the second column is value (STRING type). The command used to insert data in Redis is sadd key value.
       • HASHMAP: the DDL has three columns. The first column is key (STRING type), the second column is hash_key (STRING type), and the third column is the hash_value (STRING type) that corresponds to hash_key. The command used to insert data in Redis is hmset key hash_key hash_value.
       • SORTEDSET: the DDL has three columns. The first column is key (STRING type), the second column is score (DOUBLE type), and the third column is value (STRING type). The command used to insert data in Redis is zadd key score value.
  2. The Redis SCORE type applies to SORTEDSET (sorted set). A DOUBLE score must therefore be set for each value so that the values are sorted by score in ascending order.
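
The three-column layouts described above can be sketched as the following result table declarations. The table and column names are hypothetical; the column order and types follow the description, and the option keys are the ones documented in the WITH parameter section.

```sql
-- hashmap mode: columns are key, hash_key, hash_value (all STRING).
CREATE TEMPORARY TABLE redis_hash_sink (
    k STRING,
    hash_key STRING,
    hash_value STRING,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = '<yourHost>',
    'mode' = 'hashmap'
);

-- sortedset mode: columns are key (STRING), score (DOUBLE), value (STRING).
CREATE TEMPORARY TABLE redis_zset_sink (
    k STRING,
    score DOUBLE,
    v STRING,
    PRIMARY KEY (k) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = '<yourHost>',
    'mode' = 'sortedset'
);
```

The short column names k and v are used here because KEY and VALUE are reserved keywords in Flink SQL and would otherwise need to be quoted with backticks.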

Example

Sink example:

    CREATE TEMPORARY TABLE datagen_source (
        v STRING,
        p STRING
    ) WITH (
        'connector' = 'datagen'
    );

    CREATE TEMPORARY TABLE redis_sink (
        a STRING,
        b STRING,
        PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
        'connector' = 'redis',
        'mode' = 'string',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
    );

    INSERT INTO redis_sink
    SELECT v, p
    FROM datagen_source;

Dimension example:

    CREATE TEMPORARY TABLE datagen_source (
        id STRING,
        data STRING,
        proctime AS PROCTIME()
    ) WITH (
        'connector' = 'datagen'
    );

    CREATE TEMPORARY TABLE redis_dim (
        id STRING,
        name STRING,
        PRIMARY KEY (id) NOT ENFORCED -- The key in Redis.
    ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
    );

    CREATE TEMPORARY TABLE blackhole_sink (
        id STRING,
        data STRING,
        name STRING
    ) WITH (
        'connector' = 'blackhole'
    );

    -- Select exactly the three columns that blackhole_sink declares.
    INSERT INTO blackhole_sink
    SELECT e.id, e.data, w.name
    FROM datagen_source AS e
    JOIN redis_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;