Introducing the new Amazon Kinesis source connector for Apache Flink


On November 11, 2024, the Apache Flink community released a new version of the AWS services connectors, an AWS open source contribution. This new release, version 5.0.0, introduces a new source connector to read data from Amazon Kinesis Data Streams. In this post, we explain how the new features of this connector can improve the performance and reliability of your Apache Flink application.

Apache Flink has both a source and a sink connector to read from and write to Kinesis Data Streams. In this post, we focus on the new source connector, because version 5.0.0 doesn't introduce new functionality for the sink.

Apache Flink is a framework and distributed stream processing engine designed to perform computation at in-memory speed and at any scale. Amazon Managed Service for Apache Flink offers a fully managed, serverless experience to run your Flink applications, implemented in Java, Python, or SQL, and using all the APIs available in Flink: SQL, Table, DataStream, and ProcessFunction API.

Apache Flink connectors

Flink supports reading and writing data to external systems through connectors, which are components that allow your application to interact with stream-storage message brokers, databases, or object stores. Kinesis Data Streams is a popular source and destination for streaming applications. Flink provides both source and sink connectors for Kinesis Data Streams.

The following diagram illustrates a sample architecture.

Before proceeding further, it's important to clarify three terms often used interchangeably in data streaming and in the Apache Flink documentation:

  • Kinesis Data Streams refers to the Amazon service
  • Kinesis source and Kinesis consumer refer to the Apache Flink components, namely the source connectors, that allow reading data from Kinesis Data Streams
  • In this post, we use the term stream to refer to a single Kinesis data stream

Introducing the new Flink Kinesis source connector

The release of version 5.0.0 of the AWS connectors introduces a new connector for reading events from Kinesis Data Streams. The new connector is called Kinesis Streams Source and supersedes the Kinesis Consumer as the source connector for Kinesis Data Streams.

The new connector introduces several new features, adheres to the new Flink Source interface, and is compatible with Flink 2.x, the first new major version released by the Flink community. Flink 2.x introduces a number of breaking changes, including removing the SourceFunction interface used by legacy connectors. The legacy Kinesis Consumer will no longer work with Flink 2.x.

Setting up the connector is slightly different than with the legacy Kinesis connector. Let's start with the DataStream API.

Use the new connector with the DataStream API

To add the new connector to your application, you need to update the connector dependency. For the DataStream API, the dependency has changed its name to flink-connector-aws-kinesis-streams.

At the time of writing, the latest connector version is 5.0.0 and it supports the latest stable Flink versions, 1.19 and 1.20. The connector is also compatible with Flink 2.0, but no connector has been officially released for Flink 2.x yet. Assuming you're using Flink 1.20, the new dependency is the following:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

The connector uses the new Flink Source interface. This interface implements the new FLIP-27 standard and replaces the legacy SourceFunction interface, which has been deprecated. SourceFunction will be completely removed in Flink 2.x.

In your application, you can now use a fluent and expressive builder interface to instantiate and configure the source. The minimal setup only requires the stream Amazon Resource Name (ARN) and the deserialization schema:

KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
    .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
    .setDeserializationSchema(new SimpleStringSchema())
    .build();

The new source class is called KinesisStreamsSource, not to be confused with the legacy source, FlinkKinesisConsumer.
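Beyond the minimal setup, the builder also accepts a Configuration object for additional options, such as the initial position in the stream. The following is a sketch based on the pattern in the connector documentation; the option values are illustrative:

```java
// Configure the source to start reading from the beginning of the stream
Configuration sourceConfig = new Configuration();
sourceConfig.set(
        KinesisSourceConfigOptions.STREAM_INITIAL_POSITION,
        KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON);

KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
        .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
        .setSourceConfig(sourceConfig)
        .setDeserializationSchema(new SimpleStringSchema())
        .build();
```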

You can then add the source to the execution environment using the new fromSource() method. This method requires explicitly specifying the watermark strategy, along with a name for the source:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ...
DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(
    kdsSource,
    WatermarkStrategy.<String>forMonotonousTimestamps()
        .withIdleness(Duration.ofSeconds(1)),
    "Kinesis source");

These few lines of code introduce some of the most important changes in the interface of the connector, which we discuss in the following sections.

Stream ARN

You can now define the Kinesis data stream ARN, as opposed to the stream name. This makes it simpler to consume from streams cross-Region and cross-account.

When running in Amazon Managed Service for Apache Flink, you only need to add to the application's AWS Identity and Access Management (IAM) role the permissions to access the stream. The ARN allows pointing to a stream located in a different AWS Region or account, without assuming roles or passing any external credentials.

Explicit watermark

One of the most important characteristics of the new Source interface is that you have to explicitly define a watermark strategy when you attach the source to the execution environment. If your application only implements processing-time semantics, you can specify WatermarkStrategy.noWatermarks().
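For a processing-time-only application, attaching the source with no watermarks looks like the following (variable names match the earlier example):

```java
// No watermarks: the application relies on processing-time semantics only
DataStream<String> records = env.fromSource(
        kdsSource,
        WatermarkStrategy.noWatermarks(),
        "Kinesis source");
```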

This is an improvement in terms of code readability. Looking at the source, you know immediately which type of watermark you have, or whether you have none. Previously, many connectors provided some sort of default watermarks that the user could override. However, the default watermark of each connector was slightly different, which was confusing for the user.

With the new connector, you can achieve the same behavior as the legacy FlinkKinesisConsumer default watermarks using WatermarkStrategy.forMonotonousTimestamps(), as shown in the previous example. This strategy generates watermarks based on the approximateArrivalTimestamp returned by Kinesis Data Streams. This timestamp corresponds to the time when the record was published to Kinesis Data Streams.

Idleness and watermark alignment

With the watermark strategy, you can additionally define an idleness, which allows the watermark to progress even when some shards of the stream are idle and receiving no records. Refer to Dealing With Idle Sources for more details about idleness and watermark generators.

A feature introduced by the new Source interface, and fully supported by the new Kinesis source, is watermark alignment. Watermark alignment works in the opposite direction of idleness: it slows down consumption from a shard that is progressing faster than others. This is particularly useful when replaying data from a stream, to reduce the amount of data buffered in the application state. Refer to Watermark alignment for more details.
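Both settings are applied on the watermark strategy passed to fromSource(). A sketch combining idleness with watermark alignment might look like the following; the alignment group name and durations are illustrative:

```java
// Mark shards idle after 1 second without records, and align all sources in
// the "kinesis-group" group within 30 seconds of drift, checked every second
WatermarkStrategy<String> strategy = WatermarkStrategy
        .<String>forMonotonousTimestamps()
        .withIdleness(Duration.ofSeconds(1))
        .withWatermarkAlignment("kinesis-group",
                Duration.ofSeconds(30), Duration.ofSeconds(1));
```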

Set up the connector with the Table API and SQL

Assuming you're using Flink 1.20, the dependency containing both the Kinesis source and sink for the Table API and SQL is the following (both Flink 1.19 and 1.20 are supported; adjust the version accordingly):


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

This dependency contains both the new source and the legacy source. Refer to Versioning if you're planning to use both in the same application.

When defining the source in SQL or the Table API, you use the connector name kinesis, as with the legacy source. However, many parameters have changed with the new source:

CREATE TABLE KinesisTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `category_id` BIGINT,
    `behavior` STRING,
    `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
    'connector' = 'kinesis',
    'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
    'aws.region' = 'us-east-1',
    'source.init.position' = 'LATEST',
    'format' = 'csv'
);

A couple of notable connector options that changed from the legacy source are:

  • stream.arn specifies the stream ARN, as opposed to the stream name used in the legacy source.
  • source.init.position defines the starting position. This option works similarly to the legacy source, but the option name is different. It was previously scan.stream.initpos.

For the full list of connector options, refer to Connector Options.

New features and improvements

In this section, we discuss the most important features introduced by the new connector. These features are available in the DataStream API, as well as the Table API and SQL.

Ordering ensures

The most important improvement introduced by the new connector concerns ordering guarantees.

With Kinesis Data Streams, the order of messages is retained per partitionId. This is achieved by putting all records with the same partitionId in the same shard. However, when the stream scales, splitting or merging shards, records with the same partitionId end up in a new shard. Kinesis keeps track of the parent-child lineage when resharding happens.

Stream resharding

One known limitation of the legacy Kinesis source is that it was unable to follow the parent-child shard lineage. As a consequence, ordering couldn't be guaranteed when resharding happens. The problem was particularly relevant when the application replayed old messages from a stream that had been resharded, because ordering would be lost. This also made watermark generation and event-time processing non-deterministic.

With the new connector, ordering is retained also when resharding happens. This is achieved by following the parent-child shard lineage, consuming all records from a parent shard before proceeding with the child shard.

How the connector follows shard lineage

A greater default shard assigner

Each Kinesis data stream comprises many shards, and the Flink source operator runs in multiple parallel subtasks. The shard assigner is the component that decides how to assign the shards of the stream across the source subtasks. The shard assigner's job is non-trivial, because shard split or merge operations (resharding) might happen when the stream scales up or down.

The new connector comes with a new default assigner, UniformShardAssigner. This assigner maintains a uniform distribution of the stream's partitionIds across parallel subtasks, also when resharding happens. This is achieved by looking at the range of partition keys (HashKeyRange) of each shard.

This shard assigner was already available in the previous connector version, but for backward compatibility, it was not the default and you had to set it up explicitly. This is no longer the case with the new source. The old default shard assigner of the legacy FlinkKinesisConsumer distributed shards (not partitionIds) evenly across subtasks. In that case, the actual data distribution might become uneven after resharding, because of the mix of open and closed shards in the stream. Refer to Shard Assignment Strategy for more details.

Reduced JAR size

The size of the JAR file has been reduced by 99%, from about 60 MB down to 200 KB. This considerably reduces the size of the fat JAR of your application using the connector. A smaller JAR can speed up many operations that require redeploying the application.

AWS SDK for Java 2.x

The new connector is based on the newer AWS SDK for Java 2.x, which adds several features and improves support for non-blocking I/O. This makes the connector future-proof, because the AWS SDK v1 will reach end-of-support by the end of 2025.

AWS SDK built-in retry strategy

The new connector relies on the AWS SDK built-in retry strategy, as opposed to the custom strategy implemented by the legacy connector. Relying on the AWS SDK improves the classification of some errors as retriable or non-retriable.

Removed dependency on the Kinesis Client Library and Kinesis Producer Library

The new connector package no longer includes the Kinesis Client Library (KCL) and Kinesis Producer Library (KPL), contributing to the substantial reduction of the JAR size mentioned earlier.

An implication of this change is that the new connector no longer supports de-aggregation out of the box. Unless you're publishing records to the stream using the KPL with aggregation enabled, this will not make any difference for you. If your producers use KPL aggregation, you might consider implementing a custom DeserializationSchema to de-aggregate the records in the source.
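A skeleton of such a schema might look like the following sketch. The de-aggregation logic itself is left as a placeholder: the KplDeaggregator helper is a hypothetical stand-in for code that parses the KPL aggregated-record envelope, which you could implement yourself or adapt from a de-aggregation library added as a separate dependency.

```java
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

public class DeaggregatingStringSchema implements DeserializationSchema<String> {

    @Override
    public String deserialize(byte[] message) {
        // De-aggregation produces multiple records, so the Collector variant is used
        throw new UnsupportedOperationException("Use deserialize(byte[], Collector)");
    }

    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        // Placeholder: split the KPL aggregated payload into individual user
        // records; KplDeaggregator is hypothetical, not part of the connector
        for (byte[] userRecord : KplDeaggregator.split(message)) {
            out.collect(new String(userRecord, StandardCharsets.UTF_8));
        }
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```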

Migrating from the legacy connector

Flink sources typically save their position in checkpoints and savepoints, called snapshots in Amazon Managed Service for Apache Flink. When you stop and restart the application, or when you update the application to deploy a change, the default behavior is to save the source position in a snapshot just before stopping the application, and to restore the position when the application restarts. This allows Flink to provide exactly-once guarantees at the source.

However, due to the major changes introduced by the new KinesisStreamsSource, the saved state is no longer compatible with the legacy FlinkKinesisConsumer. This means that when you upgrade the source of an existing application, you can't directly restore the source position from the snapshot.

For this reason, migrating your application to the new source requires some attention. The exact migration process depends on your use case. There are two general scenarios:

  • Your application uses the DataStream API and you're following Flink best practices, defining a UID on each operator
  • Your application uses the Table API or SQL, or your application uses the DataStream API without defining a UID on each operator

Let's cover each of these scenarios.

Your application uses the DataStream API and you're defining a UID on each operator

In this case, you might consider selectively resetting the state of the source operator, retaining any other application state. The general approach is as follows:

  1. Update your application dependencies and code, replacing the FlinkKinesisConsumer with the new KinesisStreamsSource.
  2. Change the UID of the source operator (use a different string). Leave all other operators' UIDs unchanged. This will selectively reset the state of the source while retaining the state of all other operators.
  3. Configure the source starting position using AT_TIMESTAMP and set the timestamp to just before the moment you will deploy the change. See Configuring Starting Position to learn how to set the starting position. We recommend passing the timestamp as a runtime property to make this more flexible. The configured source starting position is used only when the application can't restore the state from a savepoint (or snapshot). In this case, we're deliberately forcing this by changing the UID of the source operator.
  4. Update the Amazon Managed Service for Apache Flink application, selecting the new JAR containing the modified application. Restart from the latest snapshot (default behavior) and select allowNonRestoredState = true. Without this flag, Flink would prevent restarting the application, not being able to restore the state of the old source that was saved in the snapshot. See Savepointing for more details about allowNonRestoredState.
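Steps 1 through 3 might look like the following sketch in code. The configuration option names follow the connector documentation, but verify them against the version you use; the timestamp value and UID string are illustrative:

```java
// Start from a timestamp just before the deployment; this position is used
// only because the new UID prevents restoring the source state from the snapshot
Configuration sourceConfig = new Configuration();
sourceConfig.set(
        KinesisSourceConfigOptions.STREAM_INITIAL_POSITION,
        KinesisSourceConfigOptions.InitialPosition.AT_TIMESTAMP);
sourceConfig.set(
        KinesisSourceConfigOptions.STREAM_INITIAL_TIMESTAMP,
        "2024-11-11T12:00:00+00:00");

KinesisStreamsSource<String> kdsSource = KinesisStreamsSource.<String>builder()
        .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
        .setSourceConfig(sourceConfig)
        .setDeserializationSchema(new SimpleStringSchema())
        .build();

env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source")
        .uid("kinesis-source-v2"); // a new UID selectively resets the source state
```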

This approach will cause the reprocessing of some records from the source, and internal state exactly-once consistency may be broken. Carefully evaluate the impact of reprocessing on your application, and the impact of duplicates on the downstream systems.

Your application uses the Table API or SQL, or your application uses the DataStream API without defining a UID on each operator

In this case, you can't selectively reset the state of the source operator.

Why does this happen? When using the Table API or SQL, or the DataStream API without defining the operators' UIDs explicitly, Flink automatically generates identifiers for all operators based on the structure of the job graph of your application. These identifiers are used to identify the state of each operator when saved in the snapshots, and to restore it to the correct operator when you restart the application.

Changes to the application might cause changes in the underlying dataflow, which in turn changes the auto-generated identifiers. If you are using the DataStream API and you're specifying the UIDs, Flink uses your identifiers instead of the auto-generated ones and is able to map the state back to the operators, even when you make changes to the application. This is an intrinsic limitation of Flink, explained in Set UUIDs For All Operators. Enabling allowNonRestoredState doesn't solve this problem, because Flink isn't able to map the state saved in the snapshot to the actual operators after the changes.

In this migration scenario, the only option is resetting the state of your application. You can achieve this in Amazon Managed Service for Apache Flink by selecting Skip restore from snapshot (SKIP_RESTORE_FROM_SNAPSHOT) when you deploy the change that replaces the source connector.

After the application using the new source is up and running, you can switch back to the default behavior when restarting the application, using the latest snapshot (RESTORE_FROM_LATEST_SNAPSHOT). This way, no data loss happens when the application is restarted.

Choosing the right connector package and version

The dependency version you need to pick is normally <connector-version>-<flink-version>. For example, the latest Kinesis connector version is 5.0.0. If you are using a Flink runtime version 1.20.x, your dependency for the DataStream API is 5.0.0-1.20.

For the most up-to-date connector versions, see Use Apache Flink connectors with Managed Service for Apache Flink.

Connector artifact

In earlier versions of the connector (4.x and before), there were separate packages for the source and sink. This additional level of complexity has been removed with version 5.x.

For your Java application, or Python applications where you package JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository, the following dependency contains the new version of both source and sink connectors:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-aws-kinesis-streams</artifactId>
    <version>5.0.0-1.20</version>
</dependency>

Make sure you're using the latest available version. At the time of writing, this is 5.0.0. You can verify the available artifact versions in Maven Central. Also, use the correct version depending on your Flink runtime version. The previous example is for Flink 1.20.0.

Connector artifacts for Python applications

If you use Python, we recommend packaging JAR dependencies using Maven, as shown in the Amazon Managed Service for Apache Flink examples GitHub repository. However, if you're passing a single JAR directly to your Amazon Managed Service for Apache Flink application, you need to use the artifact that includes all transitive dependencies. In the case of the new Kinesis source and sink, this is called flink-sql-connector-aws-kinesis-streams. This artifact includes only the new source. Refer to Amazon Kinesis Data Streams SQL Connector for the correct package in case you want to use both the new and the legacy source.

Conclusion

The new Flink Kinesis source connector introduces many new features that improve stability and performance, and prepares your application for Flink 2.x. Support for watermark idleness and alignment is a particularly important feature if your application uses event-time semantics. The ability to retain record ordering improves data consistency, in particular when stream resharding happens, and when you replay old data from a stream that has been resharded.

You should carefully plan the change if you're migrating your application from the legacy Kinesis source connector, and make sure you follow Flink's best practices, like specifying a UID on all DataStream operators.

You can find a working example of a Java DataStream API application using the new connector in the Amazon Managed Service for Apache Flink samples GitHub repository.

To learn more about the new Flink Kinesis source connector, refer to Amazon Kinesis Data Streams Connector and Amazon Kinesis Data Streams SQL Connector.


About the Author

Lorenzo Nicora works as a Senior Streaming Solutions Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working across industries both through consultancies and product companies. He has used open source technologies extensively and contributed to several projects, including Apache Flink.
