All the state storage options you need.
Responsive extends Kafka Streams to use traditional Databases like MongoDB and Cassandra as state stores, as well as Responsive's cloud-native store, RS3. Learn more about how you can benefit from a disaggregated state architecture for your Kafka Streams applications.
Responsive Platform separates storage from compute as a means of improving operations without sacrificing transactional semantics and without imposing a performance penalty.
The problems with embedding RocksDB in Kafka Streams
Historically, Kafka Streams bundled storage locally on compute nodes by providing RocksDB based implementations of state stores. While storage on the compute nodes was not required to be durable (it could be recovered from a changelog topic in Kafka), performance necessitated storage to be persistent across restarts. The motivation for such a design was clear at the time: most deployments of Kafka Streams were running without access to the public cloud, and keeping the library free of external dependencies other than Kafka made it uniquely easy to deploy.
The limitations of coupling compute with storage are well known:
- Sizing your nodes becomes more of an art than a science, and many Kafka Streams deployments in the wild tend to overprovision on both to ensure that neither limitation is met.
- Elastic scaling is impossible since new nodes must bootstrap storage before they can become operational.
- Recovering from a fault or rebalancing is a challenge for the same reason as above.
You can read more about these limitations in this blog post.
Over time, the Kafka Streams developers have come up with elegant improvements to these problems: standby and warmup replicas, incremental rebalancing, sticky task assignment... the list goes on.
However, Responsive takes a different approach to solving this class of problems in Kafka Streams by improving the underlying architecture to separate compute and storage - effectively eliminating these concerns in one fell swoop.
Remote Storage Requirements
As managed services in the cloud have become mission-critical for many businesses, the original design goal of keeping Kafka Streams free of external dependencies holds less weight. To take advantage of this new paradigm, we built our storage solution to leverage existing distributed databases that are available in all public clouds such as MongoDB and ScyllaDB, as well as our very own RS3.
We selected these databases because the meet the following requirements:
- Can execute common state store operations efficiently such as Key-Value reads and range scans
- Provide atomic operations that allow us to implement
exactly_once_v2
processing guarantees - Are available in AWS, GCP and Azure
- Are battle-tested and come out of the box with world-class scalability, durability and fault-tolerance
Just as Kafka Streams must solve many tough problems despite delegating the storage layer to RocksDB, Responsive has developed solutions that ensure these databases operate well in the context of Kafka Streams. T
Benefits of disaggregated state storage
Instant Restoration and Rebalances
With a RocksDB-based deployment of Kafka Streams, restarting a node that has both a RocksDB store and a checkpoint file is easy: the application will know where from the changelog to start reading from and skip to that point before starting restoration.
Responsive's remote storage takes inspiration from that strategy, and makes it apply to any node - including those that don't have a checkpoint file. The latest offset stored in the remote storage is stored beside the remote store, so any new node can pick up from there.
This also doubles as a mechanism to ensure that all data written to the changelog topic is written to remote storage before the active task processes any data (see the next section for more information).
Flexible Partitioning
Our remote stores allow us to partition data independently of the Kafka partition. Responsive state stores takes advantages of this and allows flexibility with repartitioning - with a remote store, it is possible to increase the number of input partitions without changing the remote storage at all.
Row-level Time To Live
Responsive offers row-level time-to-live (or ttl
) for items in the state stores based on wall-clock - one
of the most highly requested Kafka Streams features. The traditional RocksDB
changelog restore mechanism means that implementing ttl
is difficult, as
the records in the changelog must be purged as well. Since records are only
ever read into our remote store once, we can leverage the remote storage engine's ttl
feature
and provide that functionality. Learn how to use this feature in our SDK documentation.
Interactive Queries
In the works is another highly requested feature: routing layers for Interactive Queries (IQ). In Kafka Streams, IQ only supports lookups on a single partition - any routing or scatter gather must be done by the client. Since Responsive's remote storage is backed by a fully distributed database, it can offer distributed queries that route the request to the correct node when queried from any node.
Patching Data
A common operation in data systems is to perform some manual surgery to remediate corrupted data. Since the Responsive remote stores are true databases, and do not rely on the recovering from the changelog topic, it is possible to patch the data in the remote stores directly. Check out the CLI for more details.
Challenges with Remote State, and how we solve them
Exactly Once Semantics
The section below is a summary of our talk at Kafka Summit London 2024 in which we dive deep into the design of
our remote StateStore
for Kafka Streams. The talk is free to watch
here.
Responsive for Kafka Streams leverages MongoDB's Atomic Writes
jand Scylla's Lightweight Transactions
along with the Kafka commit protocol to support processing.guarantee=exactly_once_v2
.
Specifically, three conditions must be met:
- Only the current active task may read uncommitted data while processing.
- A zombie task should not be able to commit
- All data committed to Kafka must be committed to the remote storage before any committed data can be read.
The first condition is ensured by buffering data in between Kafka commits. This means that the only time Responsive flushes data to the remote store is when data is committed to Kafka.
Zombies are fenced by using an epoch mechanism similar to that of the Kafka transaction protocol. Epochs are tracked differently for each different storage backend, but the concepts behind them are the same. The epoch is incremented whenever a new active task comes online using an atomic write operation (atomic writes in MongoDB and Lighweight Transactions in Scylla). This ensures that there will never be two tasks with the same epoch. Whenever a write is sent to the remote, the atomic operation also ensures that the current writer's epoch is greater than or equal to the stored one, which in turn ensures that zombies are fenced.
Finally, we leverage Kafka Streams' changelog topics to ensure that all data is committed to remote storage before an active can begin processing. After flushing data to remote storage, the client also stores the last offset in the changelog topic that was flushed. On a restore, we read that offset and restore the changelog topic from that point onwards into the remote store. This allows the changelog to function as a write-ahead-log.
Performance
We can have our cake and eat it too when it comes to performance. The Responsive storage solution buffers writes and flushes concurrently, which affords writes negligible overhead to throughput.
Reads are more complicated - physics dictates that reads from a remote store will never be as fast as reads from local storage. But will that affect your throughput? Responsive Platform provides many tools to ensure this isn't a problem:
- Most Kafka Streams applications benefit tremendously from caching: there is only a single writer, which means that reading from cache is consistent. Some applications, such as windowed stores and aggregates, often read a very predictable subset of the data, which makes caching extremely effective.
- Responsive Platform provides different types of stores that can be leveraged to meet your workload. In some scenarios, these specialized schemas may be more effective than the generic key-value stores provided by RocksDB. See the State Store API Reference for more details.
- We have implemented various mechanisms to prevent unnecessary lookups to the remote storage. In windowed operations, for example, we implemented a localized Bloom Filter that prevents many negative lookups.
- We implement Async Processing to concurrently process records within a single task, thus changing the upper limit of parallelization from the level of a Kafka partition to the level of a record key. This means that even in situations where remote reads cannot be avoided, they can be parallelized and have no effect on throughput.