Skip to main content

Amazon Kinesis

Background Information

The Kinesis connector allows for reading data from and writing data into Amazon Kinesis Data Streams (KDS).

How to Create a Kinesis Data Stream Table

The following example shows how to create a table backed by a Kinesis data stream:

    CREATE TABLE KinesisTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
'connector' = 'kinesis',
'stream' = 'user_behavior',
'aws.region' = 'us-east-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'csv'
);

Available Metadata

KeyDescriptionData type
timestampThe approximate time when the record was inserted into the stream.TIMESTAMP_LTZ(3) NOT NULL
shard-idThe unique identifier of the shard within the stream from which the record was read.VARCHAR(128) NOT NULL
sequence-numberThe unique identifier of the record within its shard.VARCHAR(128) NOT NULL

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

    CREATE TABLE KinesisTable (
`user_id` BIGINT,
`item_id` BIGINT,
`category_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3),
`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
`shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
`sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
)
PARTITIONED BY (user_id, item_id)
WITH (
'connector' = 'kinesis',
'stream' = 'user_behavior',
'aws.region' = 'us-east-2',
'scan.stream.initpos' = 'LATEST',
'format' = 'csv'
);

Common Options

ParameterDescriptionRequiredForwardedDefaultType
connectorSpecify what connector to use. For Kinesis use ‘kinesis’.yesnononeString
streamName of the Kinesis data stream backing this table.yesyesnoneString
formatThe format used to deserialize and serialize Kinesis data stream records. See Data type mapping for details.yesnononeString
aws.regionThe AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or aws.region are required.nonononeString
aws.endpointThe AWS region where the stream is defined. Either this or aws.endpoint are required.nonononeString
aws.trust.all.certificatesIf true, accepts all SSL certificates.nonoFALSEString

Authentication Options

ParameterDescriptionRequiredForwardedDefaultType
aws.credentials.providerA credentials provider to use when authenticating against the Kinesis endpoint. See Authentication for details.nonoAUTOString
aws.credentials.basic.accesskeyidThe AWS access key ID to use when setting credentials provider type to BASIC.nonononeString
aws.credentials.basic.secretkeyThe AWS secret key to use when setting credentials provider type to BASIC.nonononeString
aws.credentials.profile.pathOptional configuration for profile path if credential provider type is set to be PROFILE.nonononeString
aws.credentials.profile.nameOptional configuration for profile name if credential provider type is set to be PROFILE.nonononeString
aws.credentials.role.arnThe role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.nonononeString
aws.credentials.role.sessionNameThe role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.nonononeString
aws.credentials.role.externalIdThe external ID to use when credential provider type is set to ASSUME_ROLE.nonononeString
aws.credentials.role.providerThe credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.nonononeString
aws.credentials.webIdentityToken.fileThe absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.nonononeString

Source Options

ParameterDescriptionRequiredForwardedDefaultType
scan.stream.initposInitial position to be used when reading from the table. See Start Reading Position for details.nonoLATESTString
scan.stream.initpos-timestampThe initial timestamp to start reading Kinesis stream from (when scan.stream.initpos is AT_TIMESTAMP). See Start Reading Position for details.nonononeString
scan.stream.initpos-timestamp-formatThe date format of initial timestamp to start reading Kinesis stream from (when scan.stream.initpos is AT_TIMESTAMP). See Start Reading Position for details.nonoyyyy-MM-dd’T’HH:mm:ss.SSSXXXString
scan.stream.recordpublisherThe RecordPublisher type to use for sources. See Enhanced Fan-Out for details.nonoPOLLINGString
scan.stream.efo.consumernameThe name of the EFO consumer to register with KDS. See Enhanced Fan-Out for details.nonononeString
scan.stream.efo.registrationDetermine how and when consumer de-/registration is performednonoLAZYString
scan.stream.efo.consumerarnMaximum number of allowed concurrent requests for the EFO client. See Enhanced Fan-Out for details.nono10000Integer
scan.stream.describe.maxretriesThe maximum number of describeStream attempts if we get a recoverable exception.nono50Integer
scan.stream.describe.backoff.baseThe base backoff time (in milliseconds) between each describeStream attempt (for consuming from DynamoDB streams).nono2000Long
scan.stream.describe.backoff.maxThe maximum backoff time (in milliseconds) between each describeStream attempt (for consuming from DynamoDB streams).nono5000Long
scan.stream.describe.backoff.expconstThe power constant for exponential backoff between each describeStream attempt (for consuming from DynamoDB streams).nono1,5Double
scan.list.shards.maxretriesThe maximum number of listShards attempts if we get a recoverable exception.nono10Integer
scan.list.shards.backoff.baseThe base backoff time (in milliseconds) between each listShards attempt.nono1000Long
scan.list.shards.backoff.maxThe maximum backoff time (in milliseconds) between each listShards attempt.nono5000Long
aws.regionThe AWS region where the stream is defined. Either this or aws.endpoint are required.nonononeString
scan.list.shards.backoff.expconstThe power constant for exponential backoff between each listShards attempt.nono1.5Double
scan.stream.describestreamconsumer.maxretriesThe maximum number of describeStreamConsumer attempts if we get a recoverable exception.nono50Integer
scan.stream.describestreamconsumer.backoff.baseThe base backoff time (in milliseconds) between each describeStreamConsumer attempt.nono2000Long
scan.stream.describestreamconsumer.backoff.maxThe maximum backoff time (in milliseconds) between each describeStreamConsumer attempt.nono5000Long
scan.stream.describestreamconsumer.backoff.expconstThe power constant for exponential backoff between each describeStreamConsumer attempt.nono1.5Double
scan.stream.registerstreamconsumer.maxretriesThe maximum number of registerStream attempts if we get a recoverable exception.nono10Integer
scan.stream.registerstreamconsumer.timeoutThe maximum time in seconds to wait for a stream consumer to become active before giving up.nono60Integer
scan.stream.registerstreamconsumer.backoff.baseThe base backoff time (in milliseconds) between each registerStream attempt.nono500Long
scan.stream.registerstreamconsumer.backoff.maxThe maximum backoff time (in milliseconds) between each registerStream attempt.nono2000Long
scan.stream.registerstreamconsumer.backoff.expconstThe power constant for exponential backoff between each registerStream attempt.nono1.5Double
aws.regionThe AWS region where the stream is defined. Either this or aws.endpoint are required.nonononeString
scan.stream.deregisterstreamconsumer.maxretriesThe maximum number of deregisterStream attempts if we get a recoverable exception.nono10Integer
scan.stream.deregisterstreamconsumer.timeoutThe maximum time in seconds to wait for a stream consumer to deregister before giving up.nono60Integer
scan.stream.deregisterstreamconsumer.backoff.baseThe base backoff time (in milliseconds) between each deregisterStream attempt.nono500Long
scan.stream.deregisterstreamconsumer.backoff.maxThe maximum backoff time (in milliseconds) between each deregisterStream attempt.nono2000Long
scan.stream.deregisterstreamconsumer.backoff.expconstThe power constant for exponential backoff between each deregisterStream attempt.nono1.5Double
scan.shard.subscribetoshard.maxretriesThe maximum number of subscribeToShard attempts if we get a recoverable exception.nono10Integer
scan.shard.subscribetoshard.backoff.baseThe base backoff time (in milliseconds) between each subscribeToShard attempt.nono1000Long
scan.shard.subscribetoshard.backoff.maxThe maximum backoff time (in milliseconds) between each subscribeToShard attempt.nono2000Long
scan.shard.subscribetoshard.backoff.expconstThe power constant for exponential backoff between each subscribeToShard attempt.nono1.5Double
scan.shard.getrecords.maxrecordcountThe maximum number of records to try to get each time we fetch records from a AWS Kinesis shard.nono10000Integer
scan.shard.getrecords.maxretriesThe maximum number of getRecords attempts if we get a recoverable exception.nono3Integer
scan.shard.getrecords.backoff.baseThe base backoff time (in milliseconds) between getRecords attempts if we get a ProvisionedThroughputExceededException.nono300Long
scan.shard.getrecords.backoff.maxThe maximum backoff time (in milliseconds) between getRecords attempts if we get a ProvisionedThroughputExceededException.nono1000Long
scan.shard.getrecords.backoff.expconstThe power constant for exponential backoff between each getRecords attempt.nono1.5Double
scan.shard.getrecords.intervalmillisThe interval (in milliseconds) between each getRecords request to a AWS Kinesis shard in milliseconds.nono200Long
aws.regionThe AWS region where the stream is defined. Either this or aws.endpoint are required.nonononeString
scan.shard.getiterator.maxretriesThe maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException.nono3Integer
scan.shard.getiterator.backoff.baseThe base backoff time (in milliseconds) between getShardIterator attempts if we get a ProvisionedThroughputExceededException.nono300Long
scan.shard.getiterator.backoff.maxThe maximum backoff time (in milliseconds) between getShardIterator attempts if we get a ProvisionedThroughputExceededException.nono1000Long
scan.shard.getiterator.backoff.expconstThe power constant for exponential backoff between each getShardIterator attempt.nono1.5Double
aws.regionThe AWS region where the stream is defined. Either this or aws.endpoint are required.nonononeString
scan.shard.discovery.intervalmillisThe interval between each attempt to discover new shards.nono10000Integer
scan.shard.adaptivereadsThe config to turn on adaptive reads from a shard. See the AdaptivePollingRecordPublisher documentation for details.nonoFALSEBoolean
scan.shard.idle.intervalThe interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don’t receive new records.nono-1Long
scan.watermark.sync.intervalThe interval (in milliseconds) for periodically synchronizing the shared watermark state.nono30000Long
scan.watermark.lookahead.millisThe maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark.nono0Long
aws.regionThe AWS region where the stream is defined. Either this or aws.endpoint are required.nonononeString
scan.watermark.sync.queue.capacityThe maximum number of records that will be buffered before suspending consumption of a shard.nono100Integer

Sink Options

ParameterDescriptionRequiredForwardedDefaultType
sink.partitionerOptional output partitioning from Flink’s partitions into Kinesis shards. See Sink Partitioning for details.noyesrandom or row-basedString
sink.partitioner-field-delimiternoyesString
sink.producer.*Deprecated options previously used by the legacy connector. Options with equivalant alternatives in KinesisStreamsSink are matched to their respective properties. Unsupported options are logged out to user as warnings.nononone
sink.http-client.max-concurrencyMaximum number of allowed concurrent requests by KinesisAsyncClient.nono10000Integer
sink.http-client.read-timeoutMaximum amount of time in ms for requests to be sent by KinesisAsyncClient.nono360000Integer
sink.http-client.protocol.versionHttp version used by Kinesis Client.nonoHTTP2String
sink.batch.max-sizeMaximum batch size of elements to be passed to KinesisAsyncClient to be written downstream.noyes500Integer
sink.requests.max-inflightRequest threshold for uncompleted requests by KinesisAsyncClientbefore blocking new write requests and applying backpressure.noyes16Integer
sink.requests.max-bufferedRequest buffer threshold for buffered requests by KinesisAsyncClient before blocking new write requests and applying backpressure.noyes10000String
sink.flush-buffer.sizeThreshold value in bytes for writer buffer in KinesisAsyncClient before flushing.noyes5242880Long
sink.flush-buffer.timeoutThreshold time in milliseconds for an element to be in a buffer ofKinesisAsyncClient before flushing.noyes5000Long
sink.fail-on-errorFlag used for retrying failed requests. If set any request failure will not be retried and will fail the job.noyesFALSEBoolean
note

This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.