Elasticsearch

Background information

Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents.

The Elasticsearch connector supports the following:

| Category | Description |
| --- | --- |
| Supported types | Source table, dimension table, result (sink) table |
| Supported modes | Batch mode, streaming mode |
| Data format | JSON |
| Monitoring metrics | Source: numBytesOut, numRecordsOutPerSecond, numBytesOutPerSecond, currentSendTime. Dimension: none. Result (sink): none |
| API types | SQL, DataStream |

Prerequisites

  • An Elasticsearch index has been created.
  • The Elasticsearch public network or private network access whitelist has been configured.

Usage restrictions

  • Source and dimension tables only support Elasticsearch 5.5 and later versions.
  • Result tables only support Elasticsearch 6.x and 7.x versions.
  • Only Flink compute engine VERA 2.0.0 and later supports the Elasticsearch connector.
  • Only full Elasticsearch source tables are supported; incremental Elasticsearch source tables are not.

Syntax

Source table:

    CREATE TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
Note: The fields in the DDL correspond to the fields in the Elasticsearch document; mapping the document ID to a table field is not supported.

Dimension table:

    CREATE TABLE es_dim (
      field1 STRING, -- used as the JOIN key; must be of type STRING
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );

Description:

  • If a primary key is specified, the dimension table JOIN can use only that one key, and it corresponds to the document ID of the matching Elasticsearch index.
  • If no primary key is specified, the dimension table JOIN can use one or more keys, and they correspond to fields in the documents of the matching Elasticsearch index, as sketched below.
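
For illustration, a minimal sketch of the no-primary-key case, joining on two document fields. The source table, its columns, and the endpoint/index values are placeholders rather than part of the original example:

    -- Dimension table without a primary key: one or more document
    -- fields may serve as JOIN keys.
    CREATE TABLE es_dim_multi (
      field1 STRING,
      field2 STRING,
      field3 BIGINT
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );

    -- JOIN on two document fields instead of the document ID.
    -- source_table is assumed to declare a proctime attribute.
    SELECT s.k1, s.k2, d.field3
    FROM source_table AS s
    JOIN es_dim_multi FOR SYSTEM_TIME AS OF s.proctime AS d
    ON s.k1 = d.field1 AND s.k2 = d.field2;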

Result table:

    CREATE TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      pv BIGINT,
      -- The primary key is optional. If defined, it is used as the document ID;
      -- otherwise the document ID is a random value.
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- for Elasticsearch 6.x, use elasticsearch-6
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
Note: The fields in the DDL correspond to the fields in the Elasticsearch document; the document ID cannot be written as a table field.

WITH parameters

Common to source and dimension tables

| Parameter | Description | Data type | Required | Default | Additional info |
| --- | --- | --- | --- | --- | --- |
| connector | Source table or dimension table type. | String | Yes | None | The fixed value is elasticsearch. |
| endPoint | Server address. | String | Yes | None | For example: http://127.0.0.1:9200 |
| indexName | Index name. | String | Yes | None | None |
| accessId | The username of the Elasticsearch instance. | String | No | None | None |
| accessKey | The password of the Elasticsearch instance. | String | No | None | None |
| typeNames | The type name. | String | No | _doc | Setting this parameter is not recommended on Elasticsearch 7.0 and later versions. |

Source exclusive

| Parameter | Description | Data type | Required | Default | Additional info |
| --- | --- | --- | --- | --- | --- |
| batchSize | The maximum number of documents to fetch from the Elasticsearch cluster per scroll request. | Int | No | 2000 | None |
| keepScrollAliveSecs | The maximum time to keep the scroll context alive. | Int | No | 3600 | The unit is seconds. |
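
For illustration, a minimal source table that tunes both scroll parameters; the endpoint/index values and the chosen numbers are placeholders, not recommendations:

    CREATE TABLE elasticsearch_source (
      name STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>',
      'batchSize' = '500',           -- fetch at most 500 documents per scroll request
      'keepScrollAliveSecs' = '600'  -- keep the scroll context alive for 10 minutes
    );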

Sink exclusive

| Parameter | Description | Data type | Required | Default | Additional info |
| --- | --- | --- | --- | --- | --- |
| connector | The result (sink) table type. | String | Yes | None | Fixed to elasticsearch-6 or elasticsearch-7. |
| hosts | Server address. | String | Yes | None | For example: http://127.0.0.1:9200 |
| index | Index name. | String | Yes | None | None |
| document-type | Document type. | String | elasticsearch-6: required. elasticsearch-7: not supported. | None | When the connector type is elasticsearch-6, the value must match the value of the type parameter on the Elasticsearch side. |
| username | Username. | String | No | Empty | Empty by default, in which case no permission verification is performed. |
| password | Password. | String | No | Empty | If a username is defined, a non-empty password must also be defined. |
| document-id.key-delimiter | Delimiter for document IDs. | String | No | None | None |
| failure-handler | Failure handling strategy when an Elasticsearch request fails. | String | No | fail | fail (default): the job fails if a request fails. ignore: ignore the failure and drop the request. retry-rejected: re-add requests that failed because the queue was full. custom class name: use an ActionRequestFailureHandler subclass for failure handling. |
| sink.flush-on-checkpoint | Whether to flush on checkpoint. | Boolean | No | true | If disabled, the connector will not wait at checkpoint time for confirmation that all pending requests have completed, so it provides no at-least-once guarantee for requests. |
| sink.bulk-flush.backoff.strategy | Retry strategy when a flush fails due to a temporary request error. | Enum | No | DISABLED | DISABLED (default): no retries, i.e. fail after the first request error. CONSTANT: constant backoff; the wait time between retries is constant. EXPONENTIAL: exponential backoff; the wait time between retries grows exponentially. |
| sink.bulk-flush.backoff.max-retries | The maximum number of backoff retries. | Int | No | None | None |
| sink.bulk-flush.backoff.delay | The delay between backoff attempts. | Duration | No | None | For the CONSTANT strategy: the delay between retries. For the EXPONENTIAL strategy: the initial baseline delay. |
| sink.bulk-flush.max-actions | The maximum number of buffered actions per bulk request. | Int | No | 1000 | 0 disables the feature. |
| sink.bulk-flush.max-size | The maximum buffer memory size for buffered requests. | String | No | 2mb | The unit is MB; 0 disables the feature. |
| sink.bulk-flush.interval | Flush interval. | Duration | No | 1s | 0 disables the feature. |
| connection.path-prefix | A prefix string added to every REST communication. | String | No | None | None |
| retry-on-conflict | The maximum number of retries allowed for version conflict exceptions during an update operation; beyond this number, an exception is thrown and the job fails. | Int | No | 0 | Only Flink engine VERA 1.0.3 and later versions support this parameter. It takes effect only when a primary key is defined. |
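
As an illustration of how the flush and backoff parameters combine, here is a minimal sink sketch; the host and index values are placeholders and the chosen numbers are examples, not recommendations:

    CREATE TABLE es_sink_tuned (
      user_id STRING,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'sink.bulk-flush.max-actions' = '500',              -- flush after 500 buffered actions
      'sink.bulk-flush.interval' = '5s',                  -- or every 5 seconds, whichever comes first
      'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
      'sink.bulk-flush.backoff.max-retries' = '3',
      'sink.bulk-flush.backoff.delay' = '1s',             -- initial baseline delay for exponential backoff
      'failure-handler' = 'retry-rejected'                -- re-add requests rejected by a full queue
    );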

Type mapping

Flink parses Elasticsearch data as JSON.

Usage examples

Source example:

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '<yourAccessId>',
      'accessKey' = '<yourAccessSecret>',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );

    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole'
    );

    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;

Dimension example:

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      data STRING,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );

    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '<yourAccessId>',
      'accessKey' = '<yourAccessSecret>',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );

    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole'
    );

    INSERT INTO blackhole_sink
    SELECT e.id, e.data, w.`value`
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;

Sink example:

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );

    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      -- The primary key is optional. If defined, it is used as the document ID;
      -- otherwise the document ID is a random value.
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' = '<yourElasticsearch.accessId>',
      'password' = '<yourElasticsearch.accessKey>'
    );

    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;

Document ID

The Elasticsearch sink works in upsert mode or append mode depending on whether a primary key is defined. If a primary key is defined, the sink works in upsert mode and can consume UPDATE and DELETE messages in addition to INSERT. If no primary key is defined, the sink works in append mode and can only consume INSERT messages.
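
For illustration, a minimal sketch of upsert mode: a grouped aggregation emits UPDATE messages, and the sink applies each one to the document keyed by user_id. The clicks_source table is a hypothetical input, not part of the original examples:

    CREATE TEMPORARY TABLE es_upsert_sink (
      user_id STRING,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED  -- upsert mode: document ID = user_id
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );

    -- The aggregation produces an updating stream; each new count for a
    -- user overwrites the previous document with the same document ID.
    INSERT INTO es_upsert_sink
    SELECT user_id, COUNT(*) AS pv
    FROM clicks_source
    GROUP BY user_id;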

In the Elasticsearch sink, the primary key is used to compute the document ID for Elasticsearch. The document ID is a string of up to 512 bytes with no spaces. The sink generates the document ID for each row by concatenating all primary key fields, in the order they are defined in the DDL, using the delimiter specified by document-id.key-delimiter. Some types (such as BYTES, ROW, ARRAY, and MAP) are not allowed as primary key fields because they have no corresponding string representation. If no primary key is specified, Elasticsearch automatically generates a random document ID.
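
For illustration, a minimal sketch of a composite primary key with an explicit key delimiter; the table and field names are placeholders. With the delimiter shown, a row with user_id = 'u1' and item_id = 'i2' gets the document ID u1#i2:

    CREATE TABLE es_sink_composite (
      user_id STRING,
      item_id STRING,
      clicks BIGINT,
      -- Document ID = user_id + delimiter + item_id, in DDL order.
      PRIMARY KEY (user_id, item_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-id.key-delimiter' = '#'
    );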

Dynamic index

Elasticsearch Sink supports both static and dynamic indexes:

  • If you use a static index, the index option value should be a plain string, such as myusers; all records will be written to the myusers index.
  • If you use a dynamic index, you can use {field_name} to reference a field value in the record and dynamically generate the target index. You can also use {field_name|date_format_string} to convert a field value of type TIMESTAMP, DATE, or TIME to the format specified by date_format_string, which is compatible with Java's DateTimeFormatter. For example, with index set to myusers-{log_ts|yyyy-MM-dd}, a record whose log_ts field value is 2020-03-27 12:25:55 is written to the myusers-2020-03-27 index; a sketch of this configuration follows the list.
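
A minimal sketch of the dynamic index example above; the hosts value is a placeholder, and log_ts is assumed to be a TIMESTAMP field in the record:

    -- Records are routed to a daily index derived from log_ts,
    -- e.g. log_ts = 2020-03-27 12:25:55 -> index myusers-2020-03-27.
    CREATE TABLE es_sink_dynamic (
      user_id STRING,
      log_ts TIMESTAMP(3),
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = '<yourHosts>',
      'index' = 'myusers-{log_ts|yyyy-MM-dd}'
    );
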
Note: This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.