State Stores

This page introduces the different Kafka Streams State Store implementations available in Responsive and how to configure them.

Supported Stores

Store Type | Supported | Description
KeyValueStore | | Key-value stores are stores that support simple key-value operations, as well as some additional query functionality such as retrieving all or a certain range of keys. Check out the javadocs for more details.
WindowStore | | Window stores support key-value operations on a defined "window", which can be either hopping or tumbling.
SessionStore | | Session stores are akin to Window stores but on a specially defined session window.
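
To make the hopping versus tumbling distinction concrete, here is a minimal plain-Java sketch of the window arithmetic. It is illustrative only: the class and method names are hypothetical, it does not use Kafka Streams or Responsive APIs, and it assumes non-negative millisecond timestamps with windows aligned to time zero.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Kafka Streams or Responsive.
final class WindowStarts {

    // Tumbling windows do not overlap: each timestamp belongs to exactly one
    // window [start, start + sizeMs).
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Hopping windows have a fixed size but begin every advanceMs, so one
    // timestamp can belong to several overlapping windows.
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long latest = timestampMs - (timestampMs % advanceMs);
        // Walk backwards while the window still covers the timestamp.
        for (long start = latest; start >= 0 && start + sizeMs > timestampMs; start -= advanceMs) {
            starts.add(0, start); // keep the result in ascending order
        }
        return starts;
    }
}
```

With a 60-second size and a 30-second advance, a record at 125 seconds falls into the hopping windows starting at 90 and 120 seconds, while a 60-second tumbling window places it only in the window starting at 120 seconds.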

Using Responsive Stores

All Responsive state stores can be created through the ResponsiveStores factory, covered in more detail below. Use it when building your org.apache.kafka.streams.Topology to swap in Responsive stores wherever state is used. You can plug these into your topology in exactly the same way you normally would.

We also include type-specific parameters that can be used to enable features that are exclusive to Responsive, such as ttl (time-to-live).
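
As a rough illustration of what a time-to-live means for a key-value store, the toy in-memory sketch below hides an entry once it is older than the configured duration. This is not Responsive's implementation: the class name is hypothetical, and it assumes expiry is measured from the last write using a clock value supplied by the caller. How Responsive actually measures and enforces ttl is not described here.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy store for illustration; not a Responsive or Kafka Streams class.
final class TtlKeyValueSketch {

    // A stored value together with the time it was written.
    private static final class Entry {
        final String value;
        final long writtenAtMs;

        Entry(String value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final long ttlMs;

    TtlKeyValueSketch(Duration ttl) {
        this.ttlMs = ttl.toMillis();
    }

    // Writing a key (re)starts its time-to-live (an assumption of this sketch).
    void put(String key, String value, long nowMs) {
        entries.put(key, new Entry(value, nowMs));
    }

    // Returns null when the key is absent or its entry has outlived the ttl.
    String get(String key, long nowMs) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMs - entry.writtenAtMs >= ttlMs) {
            entries.remove(key); // lazily drop the expired entry
            return null;
        }
        return entry.value;
    }
}
```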

DSL operators

For DSL operators, you will need to use the ResponsiveStores#materialized API to obtain a Materialized object for each stateful operator in your topology.

Example Usage (DSL)
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("counts-store")));

Processor API

For PAPI processors and DSL transformers, use ResponsiveStores#keyValueStore to get a StoreSupplier, then pass it to ResponsiveStores#keyValueStoreBuilder or ResponsiveStores#timestampedKeyValueStoreBuilder to get a StoreBuilder. As with the DSL, you need one StoreBuilder (with a unique StoreSupplier) for each state store.

Example Usage (PAPI)
builder.addStateStore(
    ResponsiveStores.timestampedKeyValueStoreBuilder(
        // StoreSupplier for the store, with a 30-day time-to-live
        ResponsiveStores.keyValueStore(
            ResponsiveKeyValueParams.keyValue(STATE_STORE)
                .withTimeToLive(Duration.ofDays(30))),
        new StringSerde(), // key serde
        new StringSerde()  // value serde
    )
);