Migrate From Kafka Streams
This guide covers what you need to know to migrate an existing Kafka Streams application to Responsive. For an end-to-end example application, see the Quickstart.
Dependencies
Add the following dependency to your project build.
Maven:
<dependency>
  <groupId>dev.responsive</groupId>
  <artifactId>kafka-client</artifactId>
  <version>0.29.0</version>
</dependency>
Gradle:
implementation 'dev.responsive:kafka-client:0.29.0'
Migration
Migrate KafkaStreams
Replace usages of new KafkaStreams with new ResponsiveKafkaStreams. Since ResponsiveKafkaStreams extends KafkaStreams, you should only need to change invocations of the constructor:
- KafkaStreams streams = new KafkaStreams(...);
+ KafkaStreams streams = new ResponsiveKafkaStreams(...);
  // Everything else stays the same
  streams.start();
Migrate Code
Every method in org.apache.kafka.streams.state.Stores has a corresponding method in ResponsiveStores, so replacing your existing store creation with Responsive stores is a like-for-like substitution. The examples below cover the different Kafka Streams APIs:
Using Stateful Operators (DSL)
KIP-954, released in Apache Kafka 3.7, will make migrating DSL applications to Responsive as simple as a single configuration change. Until then, follow this guide.
🦦 You may want to examine your original topology via Topology#describe in order to locate the exact operators which are connected to state stores; Streams may not always materialize state for all stateful operators. Adding Materialized where there was not previously a store is not incorrect, but it may cause unnecessary duplication of stored data that would otherwise be optimized away.
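One way to act on this tip is to list which processors in the described topology are connected to stores. The following is only a minimal sketch and is not part of Responsive or Kafka Streams: StoreFinder and storesByProcessor are hypothetical names, the node and store names in the sample are made up, and the parsing assumes the text layout that Topology#describe prints, which is not a guaranteed contract.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: scans the text form of a topology description and
// reports which processors are connected to state stores.
final class StoreFinder {

  // Assumed line shape, e.g.:
  //   Processor: KSTREAM-AGGREGATE-0000000002 (stores: [counts-store])
  private static final Pattern PROCESSOR_LINE =
      Pattern.compile("Processor:\\s+(\\S+)\\s+\\(stores:\\s+\\[([^\\]]*)\\]\\)");

  // Returns processor name -> connected store names, in order of appearance.
  // Processors with an empty store list are skipped.
  static Map<String, List<String>> storesByProcessor(final String description) {
    final Map<String, List<String>> result = new LinkedHashMap<>();
    final Matcher matcher = PROCESSOR_LINE.matcher(description);
    while (matcher.find()) {
      final List<String> stores = new ArrayList<>();
      for (final String store : matcher.group(2).split(",")) {
        if (!store.trim().isEmpty()) {
          stores.add(store.trim());
        }
      }
      if (!stores.isEmpty()) {
        result.put(matcher.group(1), stores);
      }
    }
    return result;
  }

  public static void main(final String[] args) {
    // Made-up sample in the assumed layout; in an application this would be
    // the string form of topology.describe().
    final String description = String.join("\n",
        "Topologies:",
        "   Sub-topology: 0",
        "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])",
        "      --> KSTREAM-AGGREGATE-0000000002",
        "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [counts-store])",
        "      --> KTABLE-TOSTREAM-0000000003",
        "      <-- KSTREAM-SOURCE-0000000000",
        "    Processor: KTABLE-TOSTREAM-0000000003 (stores: [])",
        "      --> none",
        "      <-- KSTREAM-AGGREGATE-0000000002");

    // Prints: KSTREAM-AGGREGATE-0000000002 -> [counts-store]
    storesByProcessor(description)
        .forEach((processor, stores) -> System.out.println(processor + " -> " + stores));
  }
}
```

Each processor this reports is an operator whose store needs to be replaced with a Responsive store; operators that do not appear have no state materialized. If you would rather not depend on the printed layout, the TopologyDescription object returned by Topology#describe exposes the same information through its subtopologies and their processor nodes.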
When using the DSL, state stores are either implicitly created as necessary or explicitly materialized via the Materialized class. You can create a Responsive store using ResponsiveStores#materialized:
  KTable<Long, Object> table = builder.table(
      topic,
-     Materialized.as(Stores.persistentKeyValueStore("table"))
+     ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("table"))
  );
Note that Materialized.as(ResponsiveStores.keyValueStore("table")) will compile but will not perform important validation steps that prevent errors. Make sure to always use the ResponsiveStores.materialized method.
Using Stateful Transformers (DSL + PAPI)
If you are using the DSL in conjunction with the PAPI, you must register stores manually with the StreamsBuilder. These stores should be modified to use ResponsiveStores:
  StreamsBuilder builder = new StreamsBuilder();
  builder.addStateStore(
-     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(...), ...)
+     ResponsiveStores.keyValueStoreBuilder(ResponsiveStores.keyValueStore(...), ...)
  );
  // this stays the same; process is called on a KStream and is passed the store names
  builder.stream(...).process(() -> new Processor() {
    @Override public void init(final ProcessorContext context) {
      store = context.getStateStore(...);
    }
  }, ...);
Using Processor API (PAPI)
If you are using the PAPI, you must register stores manually with the Topology. These stores should be modified to use ResponsiveStores:
  Topology topology = new Topology();
  topology.addStateStore(
-     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(...), ...),
+     ResponsiveStores.keyValueStoreBuilder(ResponsiveStores.keyValueStore(...), ...),
      "processor-1"
  );
  // this stays the same
  topology.addProcessor("processor-1", () -> new Processor() {
    @Override public void init(final ProcessorContext context) {
      store = context.getStateStore(...);
    }
  });
See the State Stores API Reference for more details, as well as some additional state store configurations that are exclusive to Responsive.
Migrate Tests
Responsive provides its own version of the TopologyTestDriver that is used in exactly the same way, with all the same APIs, so all you need to do is swap in ResponsiveTopologyTestDriver:
  Topology topology = appTopology();
  TopologyTestDriver testDriver =
-     new TopologyTestDriver(topology, props, Instant.EPOCH);
+     new ResponsiveTopologyTestDriver(topology, props, Instant.EPOCH);
Like the TopologyTestDriver, the Responsive equivalent does not require any additional infrastructure such as a Kafka cluster, or Responsive platform components such as the storage server or operator. The driver does run with real Responsive state stores; only the innermost layer that connects to the remote storage engine is swapped out for a simple in-memory map. This makes it a viable option for testing correctness and application logic.
If you are writing new tests from scratch, see ResponsiveTopologyTestDriverTest for some full examples of using the ResponsiveTopologyTestDriver.