Elasticsearch
Background information
Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents.
The Elasticsearch Connector supports the following:
Category | Description |
---|---|
Supported types | Source, Dimension, Result table |
Supported modes | Batch mode, Streaming mode |
Data format | JSON |
Monitoring indicators | Source: numBytesOut, numRecordsOutPerSecond, numBytesOutPerSecond, currentSendTime. Dimension: None. Result(Sink): None |
Types of APIs | SQL, DataStream |
Prerequisites
- An Elasticsearch index has been created.
- The Elasticsearch public network or private network access whitelist has been configured.
Usage restrictions
- Source and dimension tables support only Elasticsearch 5.5 and later.
- Result tables support only Elasticsearch 6.x and 7.x.
- Only the Flink computing engine VERA 2.0.0 and above supports the Elasticsearch Connector.
- Only full Elasticsearch source tables are supported; incremental Elasticsearch source tables are not.
Syntax
Source table:
CREATE TABLE elasticsearch_source(
name STRING,
location STRING,
`value` FLOAT
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'indexName' = '<yourIndexName>'
);
The fields in the DDL correspond to fields in the Elasticsearch document; mapping the document ID to a table field is not supported.
Dimension table:
CREATE TABLE es_dim(
field1 STRING, -- as a JOIN key, must be of type STRING.
field2 FLOAT,
field3 BIGINT,
PRIMARY KEY (field1) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'indexName' = '<yourIndexName>'
);
Description:
- If a primary key is specified, the dimension table JOIN can use only that single key, and it corresponds to the document ID of the Elasticsearch index.
- If no primary key is specified, the dimension table JOIN can use one or more keys, which correspond to fields in the indexed Elasticsearch documents, as in the sketch below.
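A minimal sketch of the second case, assuming illustrative index and field names: a dimension table declared without a primary key and joined on two document fields instead of the document ID.
CREATE TABLE es_dim_multi_key(
field1 STRING,
field2 STRING,
field3 FLOAT
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'indexName' = '<yourIndexName>'
);
-- JOIN on field1 and field2, both of which are document fields:
-- ... JOIN es_dim_multi_key FOR SYSTEM_TIME AS OF s.proctime AS d
--     ON s.k1 = d.field1 AND s.k2 = d.field2;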
Result table:
CREATE TABLE es_sink(
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED -- The primary key is optional. If it is defined, it is used as the document ID; otherwise the document ID is a random value.
) WITH (
'connector' = 'elasticsearch-7', -- for Elasticsearch 6.x, use elasticsearch-6
'hosts' = '<yourHosts>',
'index' = '<yourIndex>'
);
The fields in the DDL correspond to fields in the Elasticsearch document; declaring a field that maps to the document ID is not supported.
WITH parameters
Common to source and dimension tables
Parameter | Description | Type of data | Required | Defaults | Additional info |
---|---|---|---|---|---|
connector | The type of the source or dimension table. | String | yes | none | The fixed value is elasticsearch. |
endPoint | Server address. | String | yes | none | For example: http://127.0.0.1:9200 |
indexName | Index name. | String | yes | none | none |
accessId | The username of the Elasticsearch instance. | String | no | none | none |
accessKey | The password for the Elasticsearch instance. | String | no | none | none |
typeNames | The name of type. | String | No | _doc | It is not recommended to set this parameter in versions above Elasticsearch 7.0. |
Source exclusive
Parameter | Description | Type of data | Required | Defaults | Additional info |
---|---|---|---|---|---|
batchSize | The maximum number of documents to fetch from the Elasticsearch cluster per scroll request. | Int | no | 2000 | none |
keepScrollAliveSecs | The maximum time to keep the scroll context. | Int | no | 3600 | The unit is seconds. |
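For instance, a minimal sketch (endpoint, index name, and option values are placeholders, not recommendations) of a source table that tunes scroll reads with these two parameters:
CREATE TABLE elasticsearch_scroll_source(
name STRING,
`value` FLOAT
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'indexName' = '<yourIndexName>',
'batchSize' = '5000', -- fetch up to 5000 documents per scroll request
'keepScrollAliveSecs' = '7200' -- keep the scroll context alive for up to 2 hours
);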
Sink exclusive
Parameter | Description | Type of data | Required | Defaults | Additional info |
---|---|---|---|---|---|
connector | The type of the result (sink) table. | String | yes | none | Fixed to elasticsearch-6 or elasticsearch-7. |
hosts | Server address. | String | yes | none | For example: http://127.0.0.1:9200 |
index | index name | String | yes | none | none |
document-type | document type | String | elasticsearch-6: required. elasticsearch-7: not supported. | none | When the Connector type is elasticsearch-6, the value of the parameter here needs to be consistent with the value of the type parameter on the Elasticsearch side. |
username | username | String | no | null | The default is empty, no permission verification is performed. |
password | password | String | no | null | If a username is defined, a non-empty password must be defined. |
document-id.key-delimiter | Separator used to join primary key fields into the document ID. | String | no | none | none |
failure-handler | Failure handling strategy when an Elasticsearch request fails. | String | no | fail | The optional strategies are as follows: fail (default): if the request fails, the job fails. ignore: ignore the failure and drop the request. retry-rejected: re-add requests that failed because the queue capacity was full. custom class name: use a subclass of ActionRequestFailureHandler for failure handling. |
sink.flush-on-checkpoint | Whether to perform a flush at checkpoint. | Boolean | no | true | The default value is true. If disabled, the connector will not wait at checkpoint time for confirmation that all pending requests have completed, so it does not provide at-least-once guarantees for requests. |
sink.bulk-flush.backoff.strategy | If the flush operation fails due to a temporary request error, set sink.bulk-flush.backoff.strategy to specify the retry strategy. | Enum | no | DISABLED | DISABLED (default): do not retry, i.e. fail after the first request error. CONSTANT: constant backoff, i.e. the waiting time before each retry is the same. EXPONENTIAL: exponential backoff, i.e. the waiting time before each retry increases exponentially. |
sink.bulk-flush.backoff.max-retries | The maximum number of backoff retries. | Int | no | none | none |
sink.bulk-flush.backoff.delay | The delay between each backoff attempt. | Duration | no | none | For the CONSTANT backoff strategy: the value is the delay between each retry. For the EXPONENTIAL backoff strategy: the value is the initial baseline delay. |
sink.bulk-flush.max-actions | The maximum number of buffered operations per bulk request. | Int | no | 1000 | 0 means disable the feature. |
sink.bulk-flush.max-size | The maximum buffer memory size for storing requests. | String | no | 2mb | The unit is MB, the default value is 2, 0 means disable this function. |
sink.bulk-flush.interval | Flush interval. | Duration | no | 1s | The default value is 1s; 0 means the function is disabled. |
connection.path-prefix | A prefix string to add to every REST communication | String | no | null | none |
retry-on-conflict | The maximum number of retries allowed due to version conflict exceptions during an update operation. After this number is exceeded, an exception will be thrown and the job will fail. | Int | no | 0 | Only the Flink engine VERA 1.0.3 and later versions support this parameter. This parameter takes effect only when a primary key is defined. |
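As an illustration (hosts, index, and option values are placeholders, not recommendations), a result table that tunes bulk flushing and failure handling could be declared as follows:
CREATE TABLE es_sink_tuned (
user_id STRING,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '<yourHosts>',
'index' = '<yourIndex>',
'failure-handler' = 'retry-rejected', -- re-add requests rejected because the queue was full
'sink.bulk-flush.max-actions' = '5000', -- flush after 5000 buffered actions
'sink.bulk-flush.max-size' = '10mb', -- or after 10 MB of buffered requests
'sink.bulk-flush.interval' = '5s', -- or every 5 seconds
'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
'sink.bulk-flush.backoff.max-retries' = '3',
'sink.bulk-flush.backoff.delay' = '1s'
);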
Type mapping
Elasticsearch stores documents as JSON, so Flink uses the JSON format to map Flink SQL data types to and from Elasticsearch data.
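For example, assuming a hypothetical index whose documents look like {"name": "a", "score": 98.5, "ok": true}, the corresponding columns could be declared with matching Flink SQL types:
CREATE TABLE es_typed_source (
name STRING, -- JSON string
score DOUBLE, -- JSON number
ok BOOLEAN -- JSON boolean
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'indexName' = '<yourIndexName>'
);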
Usage examples
Source example:
CREATE TEMPORARY TABLE elasticsearch_source (
name STRING,
location STRING,
`value` FLOAT
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
CREATE TEMPORARY TABLE blackhole_sink (
name STRING,
location STRING,
`value` FLOAT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT name, location, `value`
FROM elasticsearch_source;
Dimension example:
CREATE TEMPORARY TABLE datagen_source (
id STRING,
data STRING,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE es_dim (
id STRING,
`value` FLOAT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'indexName' = '<yourIndexName>',
'typeNames' = '<yourTypeName>'
);
CREATE TEMPORARY TABLE blackhole_sink (
id STRING,
data STRING,
`value` FLOAT
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink
SELECT e.id, e.data, w.`value`
FROM datagen_source AS e
JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
ON e.id = w.id;
Sink example:
CREATE TEMPORARY TABLE datagen_source (
id STRING,
name STRING,
uv BIGINT
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE es_sink (
user_id STRING,
user_name STRING,
uv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED -- The primary key is optional. If it is defined, it is used as the document ID; otherwise the document ID is a random value.
) WITH (
'connector' = 'elasticsearch-6',
'hosts' = '<yourHosts>',
'index' = '<yourIndex>',
'document-type' = '<yourElasticsearch.types>',
'username' = '<yourElasticsearch.accessId>',
'password' = '<yourElasticsearch.accessKey>'
);
INSERT INTO es_sink
SELECT id, name, uv
FROM datagen_source;
Document Id
Elasticsearch Sink can determine whether it works in upsert mode or append mode according to whether the primary key is defined. If a primary key is defined, Elasticsearch Sink will work in upsert mode, which can consume messages including UPDATE and DELETE. If no primary key is defined, Elasticsearch Sink will work in append mode, which can only consume INSERT messages.
In Elasticsearch Sink, the primary key is used to calculate the document ID for Elasticsearch. The document ID is a string of up to 512 bytes that contains no spaces. Elasticsearch Sink generates a document ID string for each row by concatenating all primary key fields in the order defined in the DDL, using the key delimiter specified by document-id.key-delimiter. Some types (such as BYTES, ROW, ARRAY, and MAP) are not allowed as primary key fields because they do not have a corresponding string representation. If no primary key is specified, Elasticsearch automatically generates a random document ID.
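A minimal sketch of a composite primary key, assuming illustrative column names and a '$' delimiter: the document ID for each row is the concatenation of user_id and item_id.
CREATE TABLE es_sink_composite_key (
user_id STRING,
item_id STRING,
pv BIGINT,
PRIMARY KEY (user_id, item_id) NOT ENFORCED -- document ID = user_id + '$' + item_id
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '<yourHosts>',
'index' = '<yourIndex>',
'document-id.key-delimiter' = '$'
);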
Dynamic index
Elasticsearch Sink supports both static and dynamic indexes:
- If using a static index, the index option value should be a plain string, such as `myusers`; all records will be written to the `myusers` index.
- If using a dynamic index, you can use `{field_name}` to refer to a field value in the record and dynamically generate the target index. You can also use `{field_name|date_format_string}` to convert field values of type TIMESTAMP, DATE, and TIME to the format specified by `date_format_string`, which is compatible with Java's `DateTimeFormatter`. For example, if the index option is set to `myusers-{log_ts|yyyy-MM-dd}`, a record whose `log_ts` field value is `2020-03-27 12:25:55` will be written into the `myusers-2020-03-27` index (see the sketch below).
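A minimal sketch of a dynamic index, assuming placeholder hosts and an illustrative log_ts column:
CREATE TABLE es_sink_dynamic (
user_id STRING,
log_ts TIMESTAMP(3),
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '<yourHosts>',
'index' = 'myusers-{log_ts|yyyy-MM-dd}' -- a row whose log_ts is 2020-03-27 12:25:55 is written to index myusers-2020-03-27
);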
This page is derived from the official Apache Flink® documentation.
Refer to the Credits page for more information.