Elasticsearch
Background Information
Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents.
The Elasticsearch connector supports the following:
Prerequisites
- An Elasticsearch index has been created.
- The public network or private network access whitelist of the Elasticsearch instance has been configured.
Usage Restrictions
- Source tables and dimension tables support only Elasticsearch 5.5 and later.
- Result tables support only Elasticsearch 6.x and 7.x.
- Only Flink compute engine VERA 2.0.0 and later supports the Elasticsearch connector.
- Only full Elasticsearch source tables are supported. Incremental Elasticsearch source tables are not supported.
Syntax
Source table:
```sql
CREATE TABLE elasticsearch_source(
  name STRING,
  location STRING,
  `value` FLOAT  -- VALUE is a reserved word in Flink SQL, so it is quoted with backticks
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);
```
The fields in the DDL correspond to the fields in the Elasticsearch document. Writing the document ID into the table is not supported.
Dimension Table
```sql
CREATE TABLE es_dim(
  field1 STRING,  -- used as the JOIN key; must be of type STRING
  field2 FLOAT,
  field3 BIGINT,
  PRIMARY KEY (field1) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);
```
Description:
- If a primary key is specified, the dimension table JOIN can use only one key, and that key is the document ID in the corresponding Elasticsearch index.
- If no primary key is specified, the dimension table JOIN can use one or more keys, and these keys are fields of the documents in the corresponding Elasticsearch index.
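The second case can be sketched as follows: a dimension table declared without a primary key and joined on two ordinary document fields. This is a minimal sketch, assuming the index contains documents with `city`, `category`, and `discount` fields; the table and column names (`orders_source`, `es_city_dim`, `city`, `category`, `discount`) are hypothetical.

```sql
-- Hypothetical stream of orders with a processing-time attribute for the lookup JOIN.
CREATE TEMPORARY TABLE orders_source (
  city STRING,
  category STRING,
  amount DOUBLE,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

-- No PRIMARY KEY: the JOIN keys are document fields, not the document ID.
CREATE TEMPORARY TABLE es_city_dim (
  city STRING,      -- hypothetical document field, used as a JOIN key
  category STRING,  -- hypothetical document field, used as a JOIN key
  discount FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  city STRING,
  category STRING,
  amount DOUBLE,
  discount FLOAT
) WITH (
  'connector' = 'blackhole'
);

-- Two equality conditions: both fields are looked up in the Elasticsearch documents.
INSERT INTO blackhole_sink
SELECT o.city, o.category, o.amount, d.discount
FROM orders_source AS o
JOIN es_city_dim FOR SYSTEM_TIME AS OF o.proctime AS d
  ON o.city = d.city AND o.category = d.category;
```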
Result Table
```sql
CREATE TABLE es_sink(
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  -- The primary key is optional. If it is defined, it is used as the document ID;
  -- otherwise, the document ID is a random value.
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',  -- for Elasticsearch 6.x, use 'elasticsearch-6'
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);
```
The fields in the DDL correspond to the fields in the Elasticsearch document. Writing the document ID into the table is not supported.
WITH Parameter
Common to Source and Dimension
Source Exclusive
Sink Exclusive
Type Mapping
Flink uses JSON to parse Elasticsearch data.
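As an illustration, the source table from the syntax section can be read against a document like the one in the comment below. This is a sketch, assuming each column is matched by name to a top-level JSON key of the document; the document content is hypothetical.

```sql
-- Hypothetical document stored in <yourIndexName>:
--   {"name": "sensor-01", "location": "hangzhou", "value": 23.5}
-- Each column below is filled from the JSON key with the same name.
CREATE TEMPORARY TABLE elasticsearch_source (
  name STRING,      -- from "name"
  location STRING,  -- from "location"
  `value` FLOAT     -- from "value"; quoted because VALUE is a reserved word
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'indexName' = '<yourIndexName>'
);
```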
Example of Use
Source example:
```sql
CREATE TEMPORARY TABLE elasticsearch_source (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  name STRING,
  location STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;
```
Dimension example:
```sql
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  data STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_dim (
  id STRING,
  `value` FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'indexName' = '<yourIndexName>',
  'typeNames' = '<yourTypeName>'
);

CREATE TEMPORARY TABLE blackhole_sink (
  id STRING,
  data STRING,
  `value` FLOAT
) WITH (
  'connector' = 'blackhole'
);

-- Select the columns explicitly so that they match the three sink columns
-- (e.* would also include proctime, and w.* would repeat id).
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
  ON e.id = w.id;
```
Sink example:
```sql
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  name STRING,
  uv BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_sink (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  -- The primary key is optional. If it is defined, it is used as the document ID;
  -- otherwise, the document ID is a random value.
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-type' = '<yourElasticsearch.types>',
  'username' = '<yourElasticsearch.accessId>',
  'password' = '<yourElasticsearch.accessKey>'
);

INSERT INTO es_sink
SELECT id, name, uv
FROM datagen_source;
```
Document ID
Elasticsearch Sink works in upsert mode or append mode depending on whether a primary key is defined. If a primary key is defined, Elasticsearch Sink works in upsert mode and can consume messages that include UPDATE and DELETE. If no primary key is defined, Elasticsearch Sink works in append mode and can consume only INSERT messages.
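The upsert case can be sketched as follows, assuming the `elasticsearch-7` sink options from the result table syntax above. The GROUP BY aggregation emits UPDATE messages as each count changes, and the declared primary key lets the sink overwrite the document for each `user_id`. The source table `clicks` and its columns are hypothetical.

```sql
-- Hypothetical click stream.
CREATE TEMPORARY TABLE clicks (
  user_id STRING,
  url STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE es_pv_sink (
  user_id STRING,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED  -- defined primary key: upsert mode
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>'
);

-- The running count per user produces UPDATE messages; in upsert mode each
-- update overwrites the document whose ID is the user_id value.
INSERT INTO es_pv_sink
SELECT user_id, COUNT(*) AS pv
FROM clicks
GROUP BY user_id;
```

Without the PRIMARY KEY clause, the sink would run in append mode, which is meant only for INSERT-only queries such as a plain SELECT without aggregation.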
In Elasticsearch Sink, the primary key is used to compute the Elasticsearch document ID. The document ID is a string of up to 512 bytes that contains no spaces. For each row, Elasticsearch Sink generates the document ID by concatenating all primary key fields, in the order defined in the DDL, with the key delimiter specified by `document-id.key-delimiter`. Some types, such as BYTES, ROW, ARRAY, and MAP, are not allowed as primary key fields because they have no corresponding string representation. If no primary key is specified, Elasticsearch automatically generates a random document ID.
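For example, with a composite primary key and a custom delimiter, each document ID is the primary key values joined in DDL order. This is a sketch: the table and column names are hypothetical, and it assumes `document-id.key-delimiter` behaves as in the open-source Flink Elasticsearch connector, where it defaults to `_`.

```sql
CREATE TEMPORARY TABLE es_sink_composite (
  user_id STRING,
  region STRING,
  uv BIGINT,
  -- Document ID = user_id || '$' || region,
  -- e.g. 'u001$east' for user_id = 'u001' and region = 'east'.
  PRIMARY KEY (user_id, region) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-id.key-delimiter' = '$'  -- overrides the default '_'
);
```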
Dynamic Index
Elasticsearch Sink supports both static and dynamic indexes:
- If you use a static index, the value of the `index` option is a plain string, such as `myusers`, and all records are written to the `myusers` index.
- If you use a dynamic index, you can use `{field_name}` to reference a field value in the record and generate the target index dynamically. You can also use `{field_name|date_format_string}` to convert field values of type TIMESTAMP, DATE, and TIME into the format specified by `date_format_string`. `date_format_string` is compatible with Java's `DateTimeFormatter`. For example, if the option is set to `myusers-{log_ts|yyyy-MM-dd}`, a record whose `log_ts` field value is `2020-03-27 12:25:55` is written to the `myusers-2020-03-27` index.
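The date-based dynamic index above can be sketched as a sink table whose `index` option uses the `myusers-{log_ts|yyyy-MM-dd}` pattern. The table name and the columns other than `log_ts` are hypothetical, and the `elasticsearch-7` options are the ones from the result table syntax.

```sql
CREATE TEMPORARY TABLE es_dynamic_sink (
  user_id STRING,
  user_name STRING,
  log_ts TIMESTAMP(3)
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '<yourHosts>',
  -- The target index is derived per record from the date part of log_ts.
  'index' = 'myusers-{log_ts|yyyy-MM-dd}'
);

-- This row is routed to the myusers-2020-03-27 index.
INSERT INTO es_dynamic_sink
VALUES ('u001', 'alice', TIMESTAMP '2020-03-27 12:25:55');
```

A row whose `log_ts` falls on the following day would go to `myusers-2020-03-28`, so each day's records end up in their own index.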