Migrate From Kafka Streams

This documentation covers the information you need to know to migrate an existing Kafka Streams application to Responsive. If you would like to see an example of an end-to-end app, see the Quickstart.

Dependencies

Add the following dependency to your project build.

<dependency>
<groupId>dev.responsive</groupId>
<artifactId>kafka-client</artifactId>
<version>0.22.0</version>
</dependency>

Migration

Migrate KafkaStreams

Replace usages of new KafkaStreams with new ResponsiveKafkaStreams. Since ResponsiveKafkaStreams extends KafkaStreams, you should only need to change invocations of the constructor:

- KafkaStreams streams = new KafkaStreams(...);
+ KafkaStreams streams = new ResponsiveKafkaStreams(...);
  // Everything else stays the same
  streams.start();

Migrate Code

Every method in org.apache.kafka.streams.state.Stores has a corresponding method in ResponsiveStores, so you can replace existing store creation with its Responsive equivalent. The examples below cover the different Kafka Streams APIs:

Using Stateful Operators (DSL)

KIP-954

KIP-954 was released in Kafka 3.7 and will make migrating DSL applications to Responsive as simple as a single configuration change. Until Responsive supports it, follow this guide.

Avoid Unnecessary State

🦦 You may want to examine your original topology via Topology#describe in order to locate the exact operators which are connected to state stores; Streams may not always materialize state for all stateful operators. Adding Materialized where there was not previously a store is not incorrect, but it may cause unnecessary duplication of stored data that would otherwise be optimized away.

When using the DSL, state stores are either created implicitly as needed or materialized explicitly via the Materialized class. You can create a Responsive store using ResponsiveStores#materialized:

  KTable<Long, Object> table = builder.table(
    topic,
-   Materialized.as(Stores.persistentKeyValueStore("table"))
+   ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("table"))
  );

Note that Materialized.as(ResponsiveStores.keyValueStore("table")) will compile but will not perform important validation steps that prevent errors. Make sure to always use the ResponsiveStores.materialized method.
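To act on the advice about Topology#describe, it can help to list only the processors that are actually connected to a state store. The following is a minimal sketch, not part of the Responsive API: the class name DescribeStores, the topic name "input", and the sample count() topology are hypothetical stand-ins for your own application, and the code assumes kafka-streams is on the classpath.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;

public class DescribeStores {

  // Hypothetical topology for illustration; substitute your application's own.
  static Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .count(); // stateful operator, no explicit Materialized
    return builder.build();
  }

  // Maps each processor node name to the state stores it is connected to,
  // skipping processors that have no stores.
  static Map<String, Set<String>> storesByProcessor(Topology topology) {
    Map<String, Set<String>> result = new TreeMap<>();
    for (TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
      for (TopologyDescription.Node node : sub.nodes()) {
        if (node instanceof TopologyDescription.Processor) {
          Set<String> stores = ((TopologyDescription.Processor) node).stores();
          if (!stores.isEmpty()) {
            result.put(node.name(), stores);
          }
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Print one line per store-backed processor.
    storesByProcessor(buildTopology())
        .forEach((processor, stores) -> System.out.println(processor + " -> " + stores));
  }
}
```

Processors that appear in this listing are the ones whose stores need a Responsive replacement; operators that do not appear have no materialized state to migrate.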

Using Stateful Transformers (DSL + PAPI)

If you are using the DSL in conjunction with the PAPI, you must register stores manually with the StreamsBuilder. These stores should be modified to use ResponsiveStores:

  StreamsBuilder builder = new StreamsBuilder();
  builder.addStateStore(
-   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(...), ...)
+   ResponsiveStores.keyValueStoreBuilder(ResponsiveStores.keyValueStore(...), ...)
  );

  // this stays the same
  builder.process(() -> new Processor() {
    @Override public void init(final ProcessorContext context) {
      store = context.getStateStore(...);
    }
  });

Using Processor API (PAPI)

If you are using the PAPI, you must register stores manually with the Topology. These stores should be modified to use ResponsiveStores:

  Topology topology = new Topology();

  topology.addStateStore(
-   Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(...), ...),
+   ResponsiveStores.keyValueStoreBuilder(ResponsiveStores.keyValueStore(...), ...),
    "processor-1"
  );

  // this stays the same
  topology.addProcessor("processor-1", () -> new Processor() {
    @Override public void init(final ProcessorContext context) {
      store = context.getStateStore(...);
    }
  });

See the State Stores API Reference for more details, as well as some additional state store configurations that are exclusive to Responsive.

Migrate Tests

Responsive provides its own version of the TopologyTestDriver that exposes the same APIs and is used in exactly the same way, so all you need to do is swap in ResponsiveTopologyTestDriver:

  Topology topology = appTopology();
  TopologyTestDriver testDriver =
-     new TopologyTestDriver(topology, props, Instant.EPOCH);
+     new ResponsiveTopologyTestDriver(topology, props, Instant.EPOCH);

Like the TopologyTestDriver, the Responsive equivalent requires no additional infrastructure: no Kafka cluster, and no Responsive platform components such as the storage server or operator. The driver still runs with real Responsive state stores; only the innermost layer that connects to the remote storage engine is swapped out for a simple in-memory map. This makes it a viable option for testing correctness and application logic.
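The layering described above can be pictured with a small, purely conceptual sketch. None of these names (LayeredStoreSketch, StorageBackend, InMemoryBackend, KeyValueStore) are Responsive classes; they are hypothetical stand-ins that only show how the outer store logic can stay identical while the innermost storage layer is replaced by an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

public class LayeredStoreSketch {

  // Innermost layer: the only part that talks to storage.
  // In production this would be backed by a remote storage engine.
  interface StorageBackend {
    void put(String key, String value);
    String get(String key);
  }

  // Test stand-in: a plain in-memory map replaces the remote engine.
  static final class InMemoryBackend implements StorageBackend {
    private final Map<String, String> data = new HashMap<>();

    @Override public void put(String key, String value) { data.put(key, value); }
    @Override public String get(String key) { return data.get(key); }
  }

  // Outer store logic (validation, etc.) is the same in production and in tests.
  static final class KeyValueStore {
    private final StorageBackend backend;

    KeyValueStore(StorageBackend backend) { this.backend = backend; }

    void put(String key, String value) {
      if (key == null) {
        throw new IllegalArgumentException("key must not be null");
      }
      backend.put(key, value);
    }

    String get(String key) { return backend.get(key); }
  }

  public static void main(String[] args) {
    // Only the backend is swapped; the store code under test is unchanged.
    KeyValueStore store = new KeyValueStore(new InMemoryBackend());
    store.put("user-1", "clicked");
    System.out.println(store.get("user-1"));
  }
}
```

Because everything above the backend is exercised as-is, tests written this way still cover the store's own behavior rather than a mock of it.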

Note

If you are writing new tests from scratch, see ResponsiveTopologyTestDriverTest for complete examples of using the ResponsiveTopologyTestDriver.