StateStore Config Reference
This page introduces the different Kafka Streams State Store implementations available in Responsive and how to configure them.
Supported Stores
Store Type | Supported | Description |
---|---|---|
KeyValueStore | | Key-value stores support simple key-value operations, as well as additional query functionality such as retrieving all keys or a certain range of keys. Check out the javadocs for more details. |
WindowStore | | Window stores support key-value operations on a defined "window", which can be either hopping or tumbling. |
SessionStore | | Session stores are akin to window stores but operate on a specially defined session window. |
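The difference between tumbling and hopping windows is easiest to see in how a record timestamp maps to window start times. The following is a minimal plain-Java sketch that is independent of both Kafka Streams and Responsive. The class and method names are hypothetical, timestamps are assumed to be non-negative epoch milliseconds, and windows are assumed to be aligned to the epoch and half-open, [start, start + size).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Kafka Streams or Responsive APIs.
final class WindowAssignmentSketch {

    // A tumbling window never overlaps its neighbours, so each timestamp
    // belongs to exactly one window. Returns that window's start time.
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Hopping windows advance by less than their size, so they overlap and a
    // timestamp can belong to several. Returns every matching start time.
    static List<Long> hoppingWindowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest advance-aligned start whose window still covers the timestamp.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingWindowStart(12, 10));
        System.out.println(hoppingWindowStarts(12, 10, 5));
    }
}
```

With a 10 ms window, a record at timestamp 12 falls into exactly one tumbling window (starting at 10), but into two hopping windows when the advance is 5 ms (starting at 5 and at 10).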
Using Responsive Stores
All Responsive state stores can be generated through the ResponsiveStores factory, covered in more detail below. Use these when building your org.apache.kafka.streams.Topology to easily swap in Responsive stores wherever state is used. You can plug these into your topology in exactly the same way you would normally.
We also include type-specific parameters that can be used to enable features that are exclusive to Responsive, such as ttl (time-to-live).
DSL operators
For DSL operators, you will need to use the ResponsiveStores#materialized API to obtain a Materialized object for each stateful operator in your topology.
KTable<String, Long> wordCounts = textLines
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    .groupBy((key, word) -> word)
    .count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("counts-store")));
Processor API
For PAPI processors and DSL transformers, you will instead use ResponsiveStores#keyValueStore to get a StoreSupplier and then pass it into ResponsiveStores#keyValueStoreBuilder or ResponsiveStores#timestampedKeyValueStoreBuilder to get a StoreBuilder. As with the DSL, you should have one StoreBuilder (with a unique StoreSupplier) for each state store.
builder.addStateStore(
    ResponsiveStores.timestampedKeyValueStoreBuilder(
        // StoreSupplier for a key-value store with a constant 30-day ttl
        ResponsiveStores.keyValueStore(
            ResponsiveKeyValueParams.keyValue(STATE_STORE)
                .withTimeToLive(Duration.ofDays(30))),
        new StringSerde(), // key serde
        new StringSerde()  // value serde
    )
);
Responsive StateStore Features
Responsive state stores offer some features beyond what's available for the built-in stores of Kafka Streams:
Time-to-live for key-value stores
Key-value stores in Kafka Streams have infinite retention, but this can be undesirable for data sets with an unbounded keyspace or timeseries data. To address these use cases and minimize storage needs, Responsive allows you to define a ttl (time-to-live) on any KeyValueStore to set a retention time for the data inside it.
To define a ttl for your Responsive KeyValueStore, apply it to the ResponsiveKeyValueParams and pass those params to the appropriate ResponsiveStores method (which one depends on whether you use the DSL or the PAPI, as shown in the section above).
There are two ways of doing this, corresponding to the two different types of ttl we offer:
Constant ttl
The first type of ttl that Responsive provides is a simple, constant ttl. In this case the retention is applied equally to all records in the store. You can use the ResponsiveKeyValueParams#withTimeToLive(Duration) API to define a constant ttl for your store, as in the following example:
final ResponsiveKeyValueParams ttlKeyValueParams = ResponsiveKeyValueParams
    .keyValue("my-store")
    .withTimeToLive(Duration.ofDays(30));
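As a mental model for constant ttl, the sketch below implements a tiny in-memory map that hides a record once a fixed duration has elapsed since it was last written. This is a plain-Java illustration, not how Responsive implements ttl. The ConstantTtlSketch class is hypothetical, and measuring expiry from the last write with a caller-supplied clock is an assumption that this page does not confirm.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for illustration; not a Responsive or Kafka Streams class.
final class ConstantTtlSketch<K, V> {

    // A stored value together with the time it was last written.
    private static final class Entry<V> {
        final V value;
        final Instant writtenAt;

        Entry(V value, Instant writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Duration ttl;

    ConstantTtlSketch(Duration ttl) {
        this.ttl = ttl;
    }

    // Writing a key (re)starts its retention period.
    void put(K key, V value, Instant now) {
        entries.put(key, new Entry<>(value, now));
    }

    // Returns the value only if the ttl has not yet fully elapsed.
    Optional<V> get(K key, Instant now) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return Optional.empty();
        }
        // Assumption: a record expires exactly one ttl after its last write.
        if (!now.isBefore(entry.writtenAt.plus(ttl))) {
            entries.remove(key);
            return Optional.empty();
        }
        return Optional.of(entry.value);
    }

    public static void main(String[] args) {
        ConstantTtlSketch<String, Long> store = new ConstantTtlSketch<>(Duration.ofDays(30));
        Instant written = Instant.parse("2024-01-01T00:00:00Z");
        store.put("my-key", 1L, written);
        System.out.println(store.get("my-key", written.plus(Duration.ofDays(29))).isPresent());
        System.out.println(store.get("my-key", written.plus(Duration.ofDays(30))).isPresent());
    }
}
```

In this model, a record written on 1 January is still readable 29 days later but is no longer returned once the full 30 days have passed.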
Row-level ttl
The second type of ttl that Responsive provides is "row-level" ttl, in which a different retention can be applied to each individual record (or "row") in your state store. The ttl function can use either the key or value, or both, to compute the ttl for that row and override the default ttl. It's also possible to define a row-level ttl function with no default ttl, in other words, to apply no retention by default and only expire specific records. Conversely, you can apply a finite default ttl but override this to retain certain records infinitely.
This kind of ttl is defined using a TtlProvider passed in via the ResponsiveKeyValueParams#withTtlProvider API.
The following example shows a store with a default ttl of 30 days, where records with a key containing "OTTER" are retained indefinitely and records with a key containing "SQUIRREL" are kept for only 24 hours.
final TtlProvider<String, ?> ttlProvider =
    TtlProvider.<String, Object>withDefault(Duration.ofDays(30))
        .fromKey(k -> {
            if (k.contains("OTTER")) {
                // keep forever
                return Optional.of(TtlDuration.infinite());
            } else if (k.contains("SQUIRREL")) {
                // keep for only a day
                return Optional.of(TtlDuration.of(Duration.ofDays(1)));
            } else {
                // keep for the default (30 days)
                return Optional.empty();
            }
        });

final ResponsiveKeyValueParams params = ResponsiveKeyValueParams
    .keyValue("my-store")
    .withTtlProvider(ttlProvider);
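The override rule described above (a per-row result wins, and an empty result falls back to the default) can be modeled in a few lines of plain Java. This is only a sketch of the resolution logic under that reading of the text. RowLevelTtlSketch, its nested Ttl type, and the resolve and exampleOverride methods are hypothetical names that do not come from the Responsive API.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model of row-level ttl resolution; not a Responsive class.
final class RowLevelTtlSketch {

    // A ttl value that is either a finite duration or "retain forever".
    static final class Ttl {
        private final Duration duration; // null means infinite retention

        private Ttl(Duration duration) {
            this.duration = duration;
        }

        static Ttl of(Duration duration) {
            return new Ttl(Objects.requireNonNull(duration));
        }

        static Ttl infinite() {
            return new Ttl(null);
        }

        boolean isFinite() {
            return duration != null;
        }

        Duration duration() {
            if (duration == null) {
                throw new IllegalStateException("infinite ttl has no duration");
            }
            return duration;
        }
    }

    // The per-row override wins; an empty override means "use the default".
    static <K> Ttl resolve(K key, Ttl defaultTtl, Function<K, Optional<Ttl>> override) {
        return override.apply(key).orElse(defaultTtl);
    }

    // Same key rules as the example above, expressed against the model.
    static Optional<Ttl> exampleOverride(String key) {
        if (key.contains("OTTER")) {
            return Optional.of(Ttl.infinite());
        } else if (key.contains("SQUIRREL")) {
            return Optional.of(Ttl.of(Duration.ofDays(1)));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Ttl defaultTtl = Ttl.of(Duration.ofDays(30));
        for (String key : new String[] {"OTTER-1", "SQUIRREL-7", "BADGER-3"}) {
            Ttl ttl = resolve(key, defaultTtl, RowLevelTtlSketch::exampleOverride);
            System.out.println(key + " finite=" + ttl.isFinite());
        }
    }
}
```

Passing Ttl.infinite() as the default to resolve models the other case mentioned above, in which nothing expires unless the function returns a finite ttl for a specific row.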