Amazon Kinesis
Background Information
The Kinesis connector allows for reading data from and writing data into Amazon Kinesis Data Streams (KDS).
How to Create a Kinesis Data Stream Table
The following example shows how to create a table backed by a Kinesis data stream:
```sql
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```
Available Metadata
Key | Description | Data type |
---|---|---|
timestamp | The approximate time when the record was inserted into the stream. | TIMESTAMP_LTZ(3) NOT NULL |
shard-id | The unique identifier of the shard within the stream from which the record was read. | VARCHAR(128) NOT NULL |
sequence-number | The unique identifier of the record within its shard. | VARCHAR(128) NOT NULL |
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
```sql
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3),
  `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
  `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```
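Once declared, metadata columns can be queried like physical columns. The following query is a sketch that assumes the KinesisTable definition with the three VIRTUAL metadata columns; the alias `ingest_delay_s` is a hypothetical name chosen for illustration.

```sql
-- Assumes KinesisTable declares arrival_time, shard_id and sequence_number
-- as VIRTUAL metadata columns.
SELECT
  `shard_id`,
  `sequence_number`,
  `arrival_time`,
  `ts`,
  -- Seconds between the event time and the approximate time the record
  -- was inserted into the stream (positive if the record arrived later).
  TIMESTAMPDIFF(SECOND, `ts`, `arrival_time`) AS `ingest_delay_s`
FROM KinesisTable;
```

Because the metadata columns are declared VIRTUAL, they are read-only: an INSERT INTO KinesisTable statement supplies only the five physical columns.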
Common Options
Parameter | Description | Required | Forwarded | Default | Type |
---|---|---|---|---|---|
connector | Specify what connector to use. For Kinesis use 'kinesis'. | yes | no | none | String |
stream | Name of the Kinesis data stream backing this table. | yes | yes | none | String |
format | The format used to deserialize and serialize Kinesis data stream records. See Data type mapping for details. | yes | no | none | String |
aws.region | The AWS region where the stream is defined. Either this or aws.endpoint is required. | no | no | none | String |
aws.endpoint | The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or aws.region is required. | no | no | none | String |
aws.trust.all.certificates | If true, accepts all SSL certificates. | no | no | FALSE | Boolean |
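The sketch below points the connector at a custom Kinesis-compatible endpoint, for example a local emulator used in tests. The table name KinesisLocalTable and the endpoint URL are hypothetical placeholders. aws.region is kept alongside aws.endpoint, which the table above allows because only one of the two is mandatory. Setting aws.trust.all.certificates to true accepts every SSL certificate, so it is only appropriate for test setups with self-signed certificates.

```sql
-- Hypothetical test table reading from a local Kinesis-compatible endpoint.
CREATE TABLE KinesisLocalTable (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'format' = 'csv',
  'aws.region' = 'us-east-2',
  -- Placeholder URL; replace with the endpoint of your emulator or proxy.
  'aws.endpoint' = 'https://localhost:4567',
  -- Accept self-signed certificates. Do not enable this in production.
  'aws.trust.all.certificates' = 'true'
);
```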
Authentication Options
Parameter | Description | Required | Forwarded | Default | Type |
---|---|---|---|---|---|
aws.credentials.provider | A credentials provider to use when authenticating against the Kinesis endpoint. See Authentication for details. | no | no | AUTO | String |
aws.credentials.basic.accesskeyid | The AWS access key ID to use when setting credentials provider type to BASIC. | no | no | none | String |
aws.credentials.basic.secretkey | The AWS secret key to use when setting credentials provider type to BASIC. | no | no | none | String |
aws.credentials.profile.path | Optional profile path to use when the credentials provider type is set to PROFILE. | no | no | none | String |
aws.credentials.profile.name | Optional profile name to use when the credentials provider type is set to PROFILE. | no | no | none | String |
aws.credentials.role.arn | The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN. | no | no | none | String |
aws.credentials.role.sessionName | The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN. | no | no | none | String |
aws.credentials.role.externalId | The external ID to use when credential provider type is set to ASSUME_ROLE. | no | no | none | String |
aws.credentials.role.provider | The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE. | no | no | none | String |
aws.credentials.webIdentityToken.file | The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN. | no | no | none | String |
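As a sketch of how the authentication options combine, the statements below declare one table with static BASIC credentials and one that assumes an IAM role. The table names, key placeholders, role ARN, and session name are all hypothetical. Keys written into DDL may be visible to anyone who can read the table definition, so the role-based variant is usually preferable.

```sql
-- 1) BASIC: static keys supplied in the table options (placeholder values).
CREATE TABLE KinesisBasicAuth (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'format' = 'csv',
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = '<access-key-id>',
  'aws.credentials.basic.secretkey' = '<secret-key>'
);

-- 2) ASSUME_ROLE: obtain temporary credentials for an IAM role.
--    The ARN and session name below are hypothetical placeholders.
CREATE TABLE KinesisAssumeRole (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'format' = 'csv',
  'aws.credentials.provider' = 'ASSUME_ROLE',
  'aws.credentials.role.arn' = 'arn:aws:iam::123456789012:role/example-kinesis-reader',
  'aws.credentials.role.sessionName' = 'flink-kinesis-session',
  -- Credentials used to call AssumeRole come from the AUTO provider chain.
  'aws.credentials.role.provider' = 'AUTO'
);
```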
Source Options
Parameter | Description | Required | Forwarded | Default | Type |
---|---|---|---|---|---|
scan.stream.initpos | Initial position to be used when reading from the table. See Start Reading Position for details. | no | no | LATEST | String |
scan.stream.initpos-timestamp | The initial timestamp to start reading Kinesis stream from (when scan.stream.initpos is AT_TIMESTAMP). See Start Reading Position for details. | no | no | none | String |
scan.stream.initpos-timestamp-format | The date format of the initial timestamp to start reading the Kinesis stream from (when scan.stream.initpos is AT_TIMESTAMP). See Start Reading Position for details. | no | no | yyyy-MM-dd'T'HH:mm:ss.SSSXXX | String |
scan.stream.recordpublisher | The RecordPublisher type to use for sources. See Enhanced Fan-Out for details. | no | no | POLLING | String |
scan.stream.efo.consumername | The name of the EFO consumer to register with KDS. See Enhanced Fan-Out for details. | no | no | none | String |
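scan.stream.efo.registration | Determines how and when consumer de-/registration is performed (LAZY, EAGER, or NONE). | no | no | LAZY | String |
scan.stream.efo.consumerarn | The prefix of the consumer ARN for a given stream. See Enhanced Fan-Out for details. | no | no | none | String |
scan.stream.efo.http-client.max-concurrency | Maximum number of allowed concurrent requests for the EFO client. See Enhanced Fan-Out for details. | no | no | 10000 | Integer |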
scan.stream.describe.maxretries | The maximum number of describeStream attempts if we get a recoverable exception. | no | no | 50 | Integer |
scan.stream.describe.backoff.base | The base backoff time (in milliseconds) between each describeStream attempt (for consuming from DynamoDB streams). | no | no | 2000 | Long |
scan.stream.describe.backoff.max | The maximum backoff time (in milliseconds) between each describeStream attempt (for consuming from DynamoDB streams). | no | no | 5000 | Long |
scan.stream.describe.backoff.expconst | The power constant for exponential backoff between each describeStream attempt (for consuming from DynamoDB streams). | no | no | 1.5 | Double |
scan.list.shards.maxretries | The maximum number of listShards attempts if we get a recoverable exception. | no | no | 10 | Integer |
scan.list.shards.backoff.base | The base backoff time (in milliseconds) between each listShards attempt. | no | no | 1000 | Long |
scan.list.shards.backoff.max | The maximum backoff time (in milliseconds) between each listShards attempt. | no | no | 5000 | Long |
scan.list.shards.backoff.expconst | The power constant for exponential backoff between each listShards attempt. | no | no | 1.5 | Double |
scan.stream.describestreamconsumer.maxretries | The maximum number of describeStreamConsumer attempts if we get a recoverable exception. | no | no | 50 | Integer |
scan.stream.describestreamconsumer.backoff.base | The base backoff time (in milliseconds) between each describeStreamConsumer attempt. | no | no | 2000 | Long |
scan.stream.describestreamconsumer.backoff.max | The maximum backoff time (in milliseconds) between each describeStreamConsumer attempt. | no | no | 5000 | Long |
scan.stream.describestreamconsumer.backoff.expconst | The power constant for exponential backoff between each describeStreamConsumer attempt. | no | no | 1.5 | Double |
scan.stream.registerstreamconsumer.maxretries | The maximum number of registerStream attempts if we get a recoverable exception. | no | no | 10 | Integer |
scan.stream.registerstreamconsumer.timeout | The maximum time in seconds to wait for a stream consumer to become active before giving up. | no | no | 60 | Integer |
scan.stream.registerstreamconsumer.backoff.base | The base backoff time (in milliseconds) between each registerStream attempt. | no | no | 500 | Long |
scan.stream.registerstreamconsumer.backoff.max | The maximum backoff time (in milliseconds) between each registerStream attempt. | no | no | 2000 | Long |
scan.stream.registerstreamconsumer.backoff.expconst | The power constant for exponential backoff between each registerStream attempt. | no | no | 1.5 | Double |
scan.stream.deregisterstreamconsumer.maxretries | The maximum number of deregisterStream attempts if we get a recoverable exception. | no | no | 10 | Integer |
scan.stream.deregisterstreamconsumer.timeout | The maximum time in seconds to wait for a stream consumer to deregister before giving up. | no | no | 60 | Integer |
scan.stream.deregisterstreamconsumer.backoff.base | The base backoff time (in milliseconds) between each deregisterStream attempt. | no | no | 500 | Long |
scan.stream.deregisterstreamconsumer.backoff.max | The maximum backoff time (in milliseconds) between each deregisterStream attempt. | no | no | 2000 | Long |
scan.stream.deregisterstreamconsumer.backoff.expconst | The power constant for exponential backoff between each deregisterStream attempt. | no | no | 1.5 | Double |
scan.shard.subscribetoshard.maxretries | The maximum number of subscribeToShard attempts if we get a recoverable exception. | no | no | 10 | Integer |
scan.shard.subscribetoshard.backoff.base | The base backoff time (in milliseconds) between each subscribeToShard attempt. | no | no | 1000 | Long |
scan.shard.subscribetoshard.backoff.max | The maximum backoff time (in milliseconds) between each subscribeToShard attempt. | no | no | 2000 | Long |
scan.shard.subscribetoshard.backoff.expconst | The power constant for exponential backoff between each subscribeToShard attempt. | no | no | 1.5 | Double |
scan.shard.getrecords.maxrecordcount | The maximum number of records to request from an AWS Kinesis shard in each getRecords call. | no | no | 10000 | Integer |
scan.shard.getrecords.maxretries | The maximum number of getRecords attempts if we get a recoverable exception. | no | no | 3 | Integer |
scan.shard.getrecords.backoff.base | The base backoff time (in milliseconds) between getRecords attempts if we get a ProvisionedThroughputExceededException. | no | no | 300 | Long |
scan.shard.getrecords.backoff.max | The maximum backoff time (in milliseconds) between getRecords attempts if we get a ProvisionedThroughputExceededException. | no | no | 1000 | Long |
scan.shard.getrecords.backoff.expconst | The power constant for exponential backoff between each getRecords attempt. | no | no | 1.5 | Double |
scan.shard.getrecords.intervalmillis | The interval (in milliseconds) between consecutive getRecords requests to an AWS Kinesis shard. | no | no | 200 | Long |
scan.shard.getiterator.maxretries | The maximum number of getShardIterator attempts if we get a ProvisionedThroughputExceededException. | no | no | 3 | Integer |
scan.shard.getiterator.backoff.base | The base backoff time (in milliseconds) between getShardIterator attempts if we get a ProvisionedThroughputExceededException. | no | no | 300 | Long |
scan.shard.getiterator.backoff.max | The maximum backoff time (in milliseconds) between getShardIterator attempts if we get a ProvisionedThroughputExceededException. | no | no | 1000 | Long |
scan.shard.getiterator.backoff.expconst | The power constant for exponential backoff between each getShardIterator attempt. | no | no | 1.5 | Double |
scan.shard.discovery.intervalmillis | The interval (in milliseconds) between attempts to discover new shards. | no | no | 10000 | Integer |
scan.shard.adaptivereads | The config to turn on adaptive reads from a shard. See the AdaptivePollingRecordPublisher documentation for details. | no | no | FALSE | Boolean |
scan.shard.idle.interval | The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don’t receive new records. | no | no | -1 | Long |
scan.watermark.sync.interval | The interval (in milliseconds) for periodically synchronizing the shared watermark state. | no | no | 30000 | Long |
scan.watermark.lookahead.millis | The maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark. | no | no | 0 | Long |
scan.watermark.sync.queue.capacity | The maximum number of records that will be buffered before suspending consumption of a shard. | no | no | 100 | Integer |
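The statements below sketch two common source configurations: starting from a fixed point in time instead of LATEST, and reading through Enhanced Fan-Out instead of polling. The table names, the start timestamp, and the consumer name are hypothetical placeholders. The timestamp string follows the default scan.stream.initpos-timestamp-format listed above. With LAZY registration (the default), the EFO consumer is registered when the job starts running and reused if it already exists.

```sql
-- Start reading at a fixed point in time. The string must match
-- scan.stream.initpos-timestamp-format (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX).
CREATE TABLE KinesisFromTimestamp (
  `user_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'format' = 'csv',
  'scan.stream.initpos' = 'AT_TIMESTAMP',
  'scan.stream.initpos-timestamp' = '2024-01-01T00:00:00.000+00:00'
);

-- Read through Enhanced Fan-Out; the consumer name is a placeholder.
CREATE TABLE KinesisEfoTable (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'format' = 'csv',
  'scan.stream.recordpublisher' = 'EFO',
  'scan.stream.efo.consumername' = 'flink-efo-consumer',
  'scan.stream.efo.registration' = 'LAZY'
);
```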
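Each backoff.base / backoff.max / backoff.expconst triple in the table above describes a capped exponential delay. The formula below is a sketch that assumes the connector applies full-jitter backoff, as the legacy Kinesis consumer's FullJitterBackoff utility does, with a retry counter n starting at 0; verify this against the connector version you run. Here b_base, b_max and c stand for the base, max and expconst options, and U is a uniform random draw.

```latex
d_n = U \cdot \min\!\left(b_{\max},\; b_{\text{base}} \cdot c^{\,n}\right),
\qquad U \sim \mathrm{Uniform}[0, 1)

% Upper bound of d_n (ms) with the describeStream defaults
% b_base = 2000, b_max = 5000, c = 1.5:
\begin{aligned}
n = 0 &: \min(5000,\ 2000 \cdot 1.5^{0}) = 2000 \\
n = 1 &: \min(5000,\ 2000 \cdot 1.5^{1}) = 3000 \\
n = 2 &: \min(5000,\ 2000 \cdot 1.5^{2}) = 4500 \\
n = 3 &: \min(5000,\ 2000 \cdot 1.5^{3}) = 5000
\end{aligned}
```

Under this assumption, from the fourth attempt onward the cap stays at backoff.max, and the random factor spreads concurrent retries across the interval instead of synchronizing them.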
Sink Options
Parameter | Description | Required | Forwarded | Default | Type |
---|---|---|---|---|---|
sink.partitioner | Optional output partitioning from Flink’s partitions into Kinesis shards. See Sink Partitioning for details. | no | yes | random or row-based | String |
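sink.partitioner-field-delimiter | Optional field delimiter for a fields-based partitioner derived from a PARTITIONED BY clause. See Sink Partitioning for details. | no | yes | \| | String |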
sink.producer.* | Deprecated options previously used by the legacy connector. Options with equivalent alternatives in KinesisStreamsSink are mapped to their respective properties. Unsupported options are logged as warnings. | no | no | none | |
sink.http-client.max-concurrency | Maximum number of allowed concurrent requests by KinesisAsyncClient. | no | no | 10000 | Integer |
sink.http-client.read-timeout | Maximum amount of time in ms for requests to be sent by KinesisAsyncClient. | no | no | 360000 | Integer |
sink.http-client.protocol.version | HTTP version used by the Kinesis client. | no | no | HTTP2 | String |
sink.batch.max-size | Maximum batch size of elements to be passed to KinesisAsyncClient to be written downstream. | no | yes | 500 | Integer |
sink.requests.max-inflight | Threshold for uncompleted (in-flight) requests by KinesisAsyncClient before blocking new write requests and applying backpressure. | no | yes | 16 | Integer |
sink.requests.max-buffered | Threshold for buffered requests by KinesisAsyncClient before blocking new write requests and applying backpressure. | no | yes | 10000 | Integer |
sink.flush-buffer.size | Threshold value in bytes for writer buffer in KinesisAsyncClient before flushing. | no | yes | 5242880 | Long |
sink.flush-buffer.timeout | Threshold time in milliseconds for an element to be in a buffer of KinesisAsyncClient before flushing. | no | yes | 5000 | Long |
sink.fail-on-error | Flag that disables retrying of failed requests. If set to true, any request failure is not retried and fails the job. | no | yes | FALSE | Boolean |
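As a sketch of the sink options, the statements below write generated rows into a Kinesis-backed table. UserBehaviorGen and KinesisSinkTable are hypothetical names, and datagen is Flink's built-in test data connector. Because the sink table is PARTITIONED BY (user_id, item_id), sink.partitioner is left unset so the row-based default applies; this assumes the row-based partitioner joins the partition field values with sink.partitioner-field-delimiter to form the Kinesis partition key.

```sql
-- Hypothetical upstream table producing random rows for testing.
CREATE TABLE UserBehaviorGen (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Kinesis-backed sink partitioned by user_id and item_id.
CREATE TABLE KinesisSinkTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'format' = 'csv',
  -- Delimiter placed between the partition field values.
  'sink.partitioner-field-delimiter' = ';',
  -- Flush elements that have waited 1000 ms in the buffer.
  'sink.flush-buffer.timeout' = '1000',
  -- Retry failed requests instead of failing the job.
  'sink.fail-on-error' = 'false'
);

INSERT INTO KinesisSinkTable
SELECT `user_id`, `item_id`, `category_id`, `behavior`, `ts`
FROM UserBehaviorGen;
```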
Note: This page is derived from the official Apache Flink® documentation.
Refer to the Credits page for more information.