State Stores
This page introduces the different Kafka Streams State Store implementations available in Responsive and how to configure them.
Store Types
Kafka Streams has three main store types. Responsive currently supports KeyValueStore, with the other two coming soon:
- KeyValueStore
- WindowStore (coming soon!)
- SessionStore (coming soon!)
KeyValueStore
Key-value stores are stores that support simple key-value operations, as well as some additional query functionality such as retrieving all or a certain range of keys. Check out the javadocs for more details.
There are two sub-types of the key-value store:
- Key-Value Store (the default -- if you're not sure which one to use, pick the plain key-value store)
- Fact Store
Key-Value Store
This is your basic key-value store, with the same semantics and functionality as the default KeyValueStore in Kafka Streams. For the majority of use cases, this is the appropriate choice. In general, all stateful DSL operators should use this sub-type.
Fact Store
A fact store assumes that either all writes for a given key always have the same value, or the user of the store is value-agnostic and cares only about the keys. The implementation does not enforce this constraint. If it is violated, reads may return stale values, so use this store with caution.
For those applications that fit the described criteria, this store can show significantly better performance by optimizing the consistency protocol to allow split-brain writes that would otherwise have been fenced.
Examples of usage patterns that make good use of a fact store:
- A deduplication store that records whether or not a key has been seen.
- Sensor measurements reported as time-series data.
- Processing of keyed events with no payload, where the value is missing or ignored.
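To see why a fact store can tolerate split-brain writes, consider the deduplication pattern from the list above. The sketch below is a plain-Java model, not Responsive code: a `Map` stands in for the fact store, and the class name `DedupFilter` is hypothetical. Because every write for a key stores the same marker value, two instances that both record a key leave the store in the same state, whichever write lands last.

```java
import java.util.Map;

// Hypothetical plain-Java model of a deduplication processor backed by a
// fact store. The Map stands in for the store; this is not Responsive code.
class DedupFilter {
    // Marker written for every seen key. All writes for a key carry the same
    // value, which is exactly the fact-store assumption.
    static final Boolean SEEN = Boolean.TRUE;

    private final Map<String, Boolean> factStore;

    DedupFilter(Map<String, Boolean> factStore) {
        this.factStore = factStore;
    }

    // Returns true the first time a key is observed, false for duplicates.
    boolean firstOccurrence(String key) {
        // putIfAbsent returns null only if the key was not already present.
        return factStore.putIfAbsent(key, SEEN) == null;
    }
}
```

Only the presence of a key matters here, so a duplicate write from a stale ("zombie") instance stores the same `true` marker and cannot corrupt the result.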
Delete operations, while supported, are not recommended on fact stores, as they can prevent data from being cleaned up properly. If deletion is required to avoid unbounded growth, consider specifying a time-to-live (ttl) instead. This can be set using the ResponsiveKeyValueParams covered in more detail below.
WindowStore
Time-windowed stores are not yet implemented; check back soon or follow our blog for release announcements!
SessionStore
Session-windowed stores are not yet implemented; check back soon or follow our blog for release announcements!
Plugging Responsive into your Topology
All Responsive state stores can be generated through the ResponsiveStores factory, covered in more detail below. Use these when building your org.apache.kafka.streams.Topology to easily swap in Responsive stores wherever state is used. You can plug these into your topology in exactly the same way you would normally plug in a state store. See org.apache.kafka.streams.state.Stores for instructions on how to plug in custom state stores and configure them, or the Migrate Kafka Streams guide for some examples of migrating an existing application topology.
ResponsiveStores
A factory for creating Kafka Streams state stores on top of a Responsive storage backend. This class includes static APIs for all store types and usages, whether you are plugging them into a DSL operator, transformer, or PAPI processor.
We also include type-specific parameters that can be used to enable features that are exclusive to Responsive, such as ttl and changelog truncation for key-value stores. See Responsive Parameters for more information on the available options.
DSL operators
For DSL operators, you will need to use the ResponsiveStores#materialized API to obtain a Materialized object for each stateful operator in your topology.
```java
// textLines is a KStream<String, String> of input lines
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("counts-store")));
```
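To make the contents of "counts-store" concrete, the following plain-Java sketch (no Kafka dependency; `WordCountModel` is a hypothetical name) applies the same tokenization as the topology above to a batch of lines and returns the word-to-count mapping that the materialized store would hold once those records are processed. It is a model of the result, not of how Kafka Streams or Responsive execute the topology.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the word-count topology above. It computes,
// in memory, the word -> count mapping that "counts-store" would hold after
// processing the given lines. No Kafka classes are used.
class WordCountModel {
    static Map<String, Long> counts(List<String> textLines) {
        Map<String, Long> store = new TreeMap<>(); // stand-in for counts-store
        for (String textLine : textLines) {
            // Same tokenization as flatMapValues(...) in the topology.
            for (String word : Arrays.asList(textLine.toLowerCase().split("\\W+"))) {
                // groupBy(word) + count(): increment the count for this key.
                store.merge(word, 1L, Long::sum);
            }
        }
        return store;
    }
}
```

For the lines "Hello world" and "hello Kafka", the store ends up holding hello=2, kafka=1, world=1. Note that `split("\\W+")` keeps an empty leading token when a line starts with a non-word character, so a line such as "...hi" also contributes a count for the empty string.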
PAPI
For PAPI processors and DSL transformers, you will instead use ResponsiveStores#keyValueStore to get a StoreSupplier and then pass it into ResponsiveStores#keyValueStoreBuilder or ResponsiveStores#timestampedKeyValueStoreBuilder to get a StoreBuilder. Again, you should have one StoreBuilder (with a unique StoreSupplier) for each state store.
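```java
// `builder` is your StreamsBuilder; STATE_STORE is the store name
builder.addStateStore(
    ResponsiveStores.timestampedKeyValueStoreBuilder(
        // fact store whose records expire 30 days (wall-clock) after insertion
        ResponsiveStores.keyValueStore(
            ResponsiveKeyValueParams.fact(STATE_STORE)
                .withTimeToLive(Duration.ofDays(30))),
        new StringSerde(), // key serde (org.apache.kafka.common.serialization.Serdes.StringSerde)
        new StringSerde()  // value serde
    )
);
```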
Notable Methods
| Method Name | Notes |
|---|---|
| keyValueStore(ResponsiveKeyValueParams) | Returns a key-value StoreSupplier. This method should be preferred over keyValueStore(String), as it provides additional functionality for configuring features unique to Responsive, such as ttl, changelog truncation, and the fact variant. See also ResponsiveKeyValueParams. |
| materialized(...) | Similar to keyValueStore(ResponsiveKeyValueParams), but returns a Materialized for use with Kafka Streams DSL operators. |
| keyValueStoreBuilder(...) | Returns a key-value StoreBuilder that can be passed to custom processors or transformers. Make sure to pass in the StoreSupplier from #keyValueStore(ResponsiveKeyValueParams). |
| timestampedKeyValueStoreBuilder(...) | Returns a timestamped key-value StoreBuilder that can be passed to custom processors or transformers. A timestamped store keeps the timestamp associated with each record alongside its value. Make sure to pass in the StoreSupplier from #keyValueStore(ResponsiveKeyValueParams). |
Responsive Parameters
In addition to the usual configuration options for state stores (e.g., the store name), we provide several optional Responsive features that are specific to each store type.
ResponsiveKeyValueParams
A parameter class that contains the configuration options for all KeyValueStore types.
Notable Methods
| Method Name | Notes |
|---|---|
| keyValue(String) | Indicates that the desired store should be a plain key-value store. |
| fact(String) | Indicates that the desired store should be a fact store. |
| withTimeToLive(Duration) | Sets a time-to-live (ttl) on the store created with these parameters. ttl works on a wall-clock basis: inserted records are no longer retrievable once the ttl has elapsed, regardless of whether stream time has advanced. For more information on the distinction between stream time and wall-clock time, see the Kafka Streams docs. |
| withTruncateChangelog() | Indicates that Responsive should delete records from the store's changelog topic once they have been successfully committed to the remote state store. This optimization avoids keeping duplicate data that has already been persisted to the Responsive storage backend and is no longer needed by this Kafka Streams application. Not compatible with tables that use the source-topic changelog optimization. Caution: do not enable this if anything downstream consumes from the changelog. |
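
The trade-off behind withTruncateChangelog() can be pictured with a toy model: once a batch is committed to the remote store, changelog records below the committed offset are redundant for restoring state, so they can be deleted, but any other consumer of that topic loses them too. This is a conceptual sketch only; `TruncatingStore`, `commit`, and `restore` are hypothetical names and do not reflect Responsive's internals or the Kafka admin API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of changelog truncation. Hypothetical names throughout;
// this is not Responsive's implementation.
class TruncatingStore {
    // One changelog record: its offset plus the key/value written.
    static final class ChangelogRecord {
        final long offset;
        final String key;
        final String value;

        ChangelogRecord(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    final List<ChangelogRecord> changelog = new ArrayList<>(); // stand-in for the changelog topic
    final Map<String, String> remote = new HashMap<>();        // stand-in for the remote store
    private final boolean truncateChangelog;
    private long nextOffset = 0;
    private long committedOffset = 0;

    TruncatingStore(boolean truncateChangelog) {
        this.truncateChangelog = truncateChangelog;
    }

    // Every write is appended to the changelog first.
    void put(String key, String value) {
        changelog.add(new ChangelogRecord(nextOffset++, key, value));
    }

    // Persist uncommitted records (in offset order) to the remote store, then,
    // if truncation is enabled, drop changelog records below the committed offset.
    void commit() {
        for (ChangelogRecord r : changelog) {
            if (r.offset >= committedOffset) {
                remote.put(r.key, r.value);
            }
        }
        committedOffset = nextOffset;
        if (truncateChangelog) {
            changelog.removeIf(r -> r.offset < committedOffset);
        }
    }

    // Restoration: start from the remote store and replay the uncommitted tail.
    Map<String, String> restore() {
        Map<String, String> state = new HashMap<>(remote);
        for (ChangelogRecord r : changelog) {
            if (r.offset >= committedOffset) {
                state.put(r.key, r.value);
            }
        }
        return state;
    }
}
```

In this model, restoring works the same with or without truncation, but a downstream reader of `changelog` would only ever see the uncommitted tail when truncation is on, which is why the caution above applies.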
ResponsiveWindowParams
Time-windowed stores are not yet implemented; check back soon or follow our blog for release announcements!
ResponsiveSessionParams
Session-windowed stores are not yet implemented; check back soon or follow our blog for release announcements!