StateStore Config Reference

This page introduces the different Kafka Streams State Store implementations available in Responsive and how to configure them.

Supported Stores

Store Type | Description
KeyValueStore | Key-value stores are stores that support simple key-value operations, as well as some additional query functionality such as retrieving all or a certain range of keys. Check out the javadocs for more details.
WindowStore | Window stores support key-value operations on a defined "window", which can be either hopping or tumbling.
SessionStore | Session stores are akin to window stores but operate on a specially defined session window.
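
To make the hopping versus tumbling distinction concrete, here is a minimal sketch in plain Java with no Kafka Streams or Responsive dependency. The names WindowSketch and windowStartsFor are hypothetical and exist only for this illustration. It assumes windows are aligned to time 0, include their start and exclude their end, and that the advance is no larger than the window size.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: finds the windows that contain a given timestamp. */
final class WindowSketch {

    /**
     * Returns the start time of every window of length sizeMs, advancing by
     * advanceMs, that contains timestampMs.
     * advanceMs == sizeMs gives tumbling windows (exactly one result);
     * advanceMs < sizeMs gives hopping windows (overlapping, several results).
     */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest start that can still cover the timestamp, floored to the advance grid.
        long first = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (long start = first; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a window size of 5 ms, a record at timestamp 7 falls into the single tumbling window starting at 5. With the same size and an advance of 2 ms, it falls into two hopping windows, starting at 4 and 6.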

Using Responsive Stores

All Responsive state stores can be created through the ResponsiveStores factory, covered in more detail below. Use these when building your org.apache.kafka.streams.Topology to easily swap in Responsive stores wherever state is used. You can plug these into your topology in exactly the same way you would normally.

We also include type-specific parameters that can be used to enable features that are exclusive to Responsive, such as ttl (time-to-live).

DSL operators

For DSL operators, you will need to use the ResponsiveStores#materialized API to obtain a Materialized object for each stateful operator in your topology.

Example Usage (DSL)
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("counts-store")));

Processor API

For PAPI processors and DSL transformers, use ResponsiveStores#keyValueStore to get a StoreSupplier, then pass it to ResponsiveStores#keyValueStoreBuilder or ResponsiveStores#timestampedKeyValueStoreBuilder to get a StoreBuilder. As with the DSL, you need one StoreBuilder (with a unique StoreSupplier) for each state store.

Example Usage (PAPI)
builder.addStateStore(
    ResponsiveStores.timestampedKeyValueStoreBuilder(
        ResponsiveStores.keyValueStore(ResponsiveKeyValueParams.keyValue(STATE_STORE)
            .withTimeToLive(Duration.ofDays(30))),
        new StringSerde(),
        new StringSerde()
    )
);

Responsive StateStore Features

Responsive state stores offer some features beyond what's available for the built-in stores of Kafka Streams:

Time-to-live for key-value stores

Key-value stores in Kafka Streams have infinite retention, but this can be undesirable for data sets with an unbounded keyspace or timeseries data. To address these use cases and minimize storage needs, Responsive allows you to define a ttl (time-to-live) on any KeyValueStore to set a retention time for the data inside it.

To define a ttl for your Responsive KeyValueStore, simply apply it to the ResponsiveKeyValueParams and pass these params in to the appropriate ResponsiveStores method (depending on whether you use the DSL or PAPI, as shown in the above section). There are two ways of doing this, corresponding to the two different types of ttl we offer:

Constant ttl

The first type of ttl that Responsive provides is a simple, constant ttl. In this case the same retention is applied to all records in the store. You can use the ResponsiveKeyValueParams#withTimeToLive(Duration) API to define a constant ttl for your store, as in the following example:

final ResponsiveKeyValueParams ttlKeyValueParams = ResponsiveKeyValueParams
    .keyValue("my-store")
    .withTimeToLive(Duration.ofDays(30));
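
To illustrate what a constant ttl means for reads, the following is a minimal, self-contained sketch in plain Java. It is not Responsive's implementation: ConstantTtlSketch and its methods are hypothetical names, time is passed in explicitly as milliseconds, and the choice to treat a record as expired once its age reaches the ttl is an assumption made for this illustration.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: an in-memory map that hides entries older than a fixed ttl. */
final class ConstantTtlSketch {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> writeTimes = new HashMap<>();

    ConstantTtlSketch(Duration ttl) {
        this.ttlMs = ttl.toMillis();
    }

    /** Stores the value and remembers when it was written. */
    void put(String key, String value, long nowMs) {
        values.put(key, value);
        writeTimes.put(key, nowMs);
    }

    /** Returns the value, or null if the key is absent or its age has reached the ttl. */
    String get(String key, long nowMs) {
        Long writtenAt = writeTimes.get(key);
        if (writtenAt == null) {
            return null;
        }
        if (nowMs - writtenAt >= ttlMs) {
            // Assumed semantics: expired entries are dropped lazily on read.
            values.remove(key);
            writeTimes.remove(key);
            return null;
        }
        return values.get(key);
    }
}
```

In this sketch every record shares the same 30-day (or other) retention, which is the defining property of a constant ttl: the expiry check never looks at the key or the value.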

Row-level ttl

The second type of ttl that Responsive provides is "row-level" ttl, in which a different retention can be applied to each individual record (or "row") in your state store. The ttl function can use either the key or value, or both, to compute the ttl for that row and override the default ttl. It's also possible to define a row-level ttl function with no default ttl, in other words, to apply no retention by default and only expire specific records. Conversely, you can apply a finite default ttl but override this to retain certain records infinitely.

This kind of ttl is defined using a TtlProvider passed in via the ResponsiveKeyValueParams#withTtlProvider API.

The following example shows a store with a default ttl of 30 days, where records with a key containing "OTTER" are retained indefinitely and records with a key containing "SQUIRREL" are kept for only 24 hours.

final TtlProvider<String, ?> ttlProvider =
    TtlProvider.<String, Object>withDefault(Duration.ofDays(30))
        .fromKey(k -> {
          if (k.contains("OTTER")) {
            // keep forever
            return Optional.of(TtlDuration.infinite());
          } else if (k.contains("SQUIRREL")) {
            // keep for only a day
            return Optional.of(TtlDuration.of(Duration.ofDays(1)));
          } else {
            // keep for the default (30 days)
            return Optional.empty();
          }
        });

final ResponsiveKeyValueParams params = ResponsiveKeyValueParams
    .keyValue("my-store")
    .withTtlProvider(ttlProvider);
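
The override-or-default rule described in this section can be sketched in plain Java without any Responsive classes. Everything here is hypothetical and for illustration only: RowTtlSketch, its nested Ttl type, and the resolve and exampleRule methods are stand-ins, not the library's TtlProvider or TtlDuration. The sketch assumes that an empty Optional from the key function means "use the default", as the comments in the example above indicate.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Function;

/** Illustrative only: resolves a per-key ttl override against a default ttl. */
final class RowTtlSketch {

    /** Hypothetical stand-in for a ttl value: a finite duration, or infinite. */
    static final class Ttl {
        final Duration duration; // null means infinite retention

        private Ttl(Duration duration) {
            this.duration = duration;
        }

        static Ttl of(Duration duration) {
            return new Ttl(duration);
        }

        static Ttl infinite() {
            return new Ttl(null);
        }

        boolean isInfinite() {
            return duration == null;
        }
    }

    /** The per-key override wins when present; an empty Optional falls back to the default. */
    static Ttl resolve(String key, Ttl defaultTtl, Function<String, Optional<Ttl>> fromKey) {
        return fromKey.apply(key).orElse(defaultTtl);
    }

    /** Same rules as the example above: OTTER forever, SQUIRREL one day, otherwise the default. */
    static Optional<Ttl> exampleRule(String key) {
        if (key.contains("OTTER")) {
            return Optional.of(Ttl.infinite());
        }
        if (key.contains("SQUIRREL")) {
            return Optional.of(Ttl.of(Duration.ofDays(1)));
        }
        return Optional.empty();
    }
}
```

Passing Ttl.infinite() as the default in this sketch corresponds to the "no default ttl" case, where only the keys matched by the rule ever expire.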