State Stores
This page introduces the different Kafka Streams State Store implementations available in Responsive and how to configure them.
Supported Stores
Store Type | Supported | Description |
---|---|---|
KeyValueStore | | Key-value stores support simple key-value operations, as well as some additional query functionality such as retrieving all keys or a certain range of keys. Check out the javadocs for more details. |
WindowStore | | Window stores support key-value operations on a defined "window", which can be either hopping or tumbling. |
SessionStore | | Session stores are akin to window stores, but operate on session windows, which are bounded by gaps of inactivity rather than by a fixed size. |
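
To make the hopping versus tumbling distinction concrete, the sketch below computes which window start times a record timestamp falls into. It assumes epoch-aligned windows with non-negative start times, which is how Kafka Streams time windows behave; the class and method names (WindowAssignment, windowStarts) are hypothetical and are not part of the Responsive or Kafka Streams APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: not Responsive or Kafka Streams code.
final class WindowAssignment {

    // Returns the start times of every window [start, start + sizeMs) that
    // contains timestampMs. A tumbling window is the case advanceMs == sizeMs;
    // a hopping window has advanceMs < sizeMs, so windows overlap.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long advanceMs) {
        if (timestampMs < 0 || sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("invalid timestamp or window definition");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned window that can still contain the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Tumbling: size 10, advance 10. Timestamp 12 belongs to one window, [10, 20).
        System.out.println(windowStarts(12L, 10L, 10L)); // prints [10]
        // Hopping: size 10, advance 5. Timestamp 12 belongs to [5, 15) and [10, 20).
        System.out.println(windowStarts(12L, 10L, 5L));  // prints [5, 10]
    }
}
```

With a tumbling window each record lands in exactly one window, while a hopping window can place the same record in several overlapping windows, so a window store may hold more than one entry per key and timestamp.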
Using Responsive Stores
All Responsive state stores can be created through the ResponsiveStores factory,
covered in more detail below. Use it when building your
org.apache.kafka.streams.Topology to swap in Responsive stores wherever state is
used. You can plug these stores into your topology in exactly the same way you
normally would.
We also include type-specific parameters that can be used to enable features that
are exclusive to Responsive, such as ttl (time-to-live).
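
As a rough illustration of what a time-to-live means for a key-value store, here is a minimal in-memory sketch. It assumes expiry is measured from the time of the last write to a key, which this page does not confirm for Responsive; the class TtlStoreSketch and its methods are hypothetical and do not reflect how Responsive implements ttl.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a toy store, not the Responsive implementation.
final class TtlStoreSketch<K, V> {

    // A value together with the time it was written.
    private static final class Entry<V> {
        final V value;
        final long writtenAtMs;

        Entry(V value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMs;

    TtlStoreSketch(Duration ttl) {
        this.ttlMs = ttl.toMillis();
    }

    // Timestamps are passed in explicitly so the behavior is deterministic.
    void put(K key, V value, long nowMs) {
        entries.put(key, new Entry<>(value, nowMs));
    }

    // Returns the value, or null if the key is absent or its entry has expired.
    // Assumption: an entry expires once ttlMs has elapsed since its last write.
    V get(K key, long nowMs) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (nowMs - entry.writtenAtMs >= ttlMs) {
            entries.remove(key); // lazily drop the expired entry
            return null;
        }
        return entry.value;
    }
}
```

For example, with a ttl of Duration.ofDays(30), a value written at time 0 is still readable 29 days later and is no longer returned once 30 days have passed.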
DSL operators
For DSL operators, you will need to use the ResponsiveStores#materialized API to
obtain a Materialized object for each stateful operator in your topology.
    // Count words, materializing the result in a Responsive key-value store.
    KTable<String, Long> wordCounts = textLines
        .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("counts-store")));
Processor API
For Processor API (PAPI) processors and DSL transformers, you will instead use
ResponsiveStores#keyValueStore to get a StoreSupplier, then pass it into
ResponsiveStores#keyValueStoreBuilder or
ResponsiveStores#timestampedKeyValueStoreBuilder to get a StoreBuilder. As with
DSL operators, you should have one StoreBuilder (with a unique StoreSupplier) for
each state store.
    // Register a timestamped key-value store with a 30-day time-to-live.
    builder.addStateStore(
        ResponsiveStores.timestampedKeyValueStoreBuilder(
            ResponsiveStores.keyValueStore(
                ResponsiveKeyValueParams.keyValue(STATE_STORE)
                    .withTimeToLive(Duration.ofDays(30))),
            new StringSerde(),
            new StringSerde()
        )
    );