Skip to main content

SDK Quickstart

Overview

After completing this quickstart, you will have:

  1. Added the Responsive SDK to a Kafka Streams application.
  2. Got the application to use a local MongoDB instance as a state store.

Note: You will need a free Responsive Trial License to complete this quickstart. Instructions for obtaining this license are a part of the quickstart.

We've built a simple repository that contains the skeleton code you need to run the Kafka Streams WordCount example as well as migrate that code to Responsive. Following that will run through the following quickstart:

  1. Spin up a Kafka Broker and a MongoDB instance
  2. Run a KafkaStreams application
  3. Migrate to ResponsiveKafkaStreams and run it
  4. Query MongoDB to demonstrate disaggregated state

Initial setup

Required tools

In order to run this quickstart, you must have access to the following systems:

  • Git to clone the repository
  • Docker and Docker Compose (or a compatible engine)
  • Gradle (it will install JDK 21 if you do not have it installed)

Get the Code

Get the quickstart code locally with:

$ git clone https://github.com/responsivedev/java-sdk-quickstart.git
$ cd java-sdk-quickstart

Prepare your dependencies

Setup Kafka and MongoDB

The first step is to deploy a Kafka broker and a MongoDB instance. You can use the following docker-compose.yml file (which is also included in <root>/etc of this repository):

docker-compose.yml
---
services:
broker:
image: confluentinc/cp-kafka:7.7.1
container_name: kafka-broker
ports:
- "9092:9092"
environment:
CLUSTER_ID: 'jHS82zyorYvKMntfzD4XRQ'
KAFKA_PROCESS_ROLES: 'controller,broker'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTER_BROKER'
KAFKA_LISTENERS: 'INTER_BROKER://:29092,PUBLIC://:9092,CONTROLLER://:9093'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:9093'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTER_BROKER:PLAINTEXT,PUBLIC:PLAINTEXT,CONTROLLER:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'INTER_BROKER://localhost:29092,PUBLIC://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000

mongo:
image: mongo:5.0
container_name: mongo
ports:
- "27017:27017"

You can spin this up by running docker compose up in the directory with this docker-compose.yml file

After these spin up, we will need to create topics in the broker for the Kafka Streams application to use. You can do this by executing into the docker container and running the following commands:

$ docker exec -it kafka-broker bash

$ kafka-topics --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic words

$ kafka-topics --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--config cleanup.policy=compact \
--topic output

Build & Run your Kafka Streams app

Start by building the code.

./gradlew build

The application dev.responsive.quickstart.WordCount is identical to the open source word count app and counts the number of occurrences of each word in the input topic, split by whitespace. To run the application, use:

./gradlew run

Test your setup: Produce & Consume events

Let's produce some data in the words topic, and then verify that the application is counting those words! First, produce using kafka-console-producer

# if you do not have the kafka-console-producer locally, it is available
# on the kafka-broker docker container we started earlier in this tutorial
$ docker exec -it kafka-broker bash
$ kafka-console-producer --bootstrap-server localhost:9092 --topic words
> all streams flow through kafka streams

You can verify that the word count app is working as designed by consuming from the output topic using kafka-console-consumer:

$ docker exec -it kafka-broker bash
$ kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic output \
--from-beginning \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Wait a while to make sure the buffering time limits have elapsed, and you should see the following output:

all	1
flow 1
through 1
kafka 1
streams 2

Add the Responsive SDK

Shortcut: No Editor Necessary

Copy the follow diff and run pbpaste | git apply to apply it.

patch.diff
diff --git a/app/src/main/java/dev/responsive/quickstart/WordCount.java b/app/src/main/java/dev/responsive/quickstart/WordCount.java
index f53d193..a2bbe9a 100644
--- a/app/src/main/java/dev/responsive/quickstart/WordCount.java
+++ b/app/src/main/java/dev/responsive/quickstart/WordCount.java
@@ -1,5 +1,6 @@
package dev.responsive.quickstart;

+import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import java.util.Arrays;
@@ -44,7 +45,7 @@ public class WordCount {

final Topology topology = builder.build();

- final KafkaStreams streams = new KafkaStreams(topology, props);
+ final KafkaStreams streams = new ResponsiveKafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c

To update the license in the code to your trial license, you can run the following command:

LICENSE='<YOUR_LICENSE>'; sed -i '' "s|<YOUR_LICENSE>|${LICENSE}|" app/src/main/java/dev/responsive/quickstart/WordCount.java

You can now skip to the next section.

Introducing the Responsive SDK for an app that only uses the DSL is as simple as it gets:

  1. Add the required imports:
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
  1. Change new KafkaStreams to new ResponsiveKafkaStreams:
- final KafkaStreams streams = new KafkaStreams(topology, props);
+ final KafkaStreams streams = new ResponsiveKafkaStreams(topology, props);
  1. Add the configurations required to talk to MongoDB:
props.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.MONGO_DB.name());
props.put(ResponsiveConfig.MONGO_ENDPOINT_CONFIG, "mongodb://localhost:27017");
props.put(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, "<your-license>");
Get A License

If you have not yet obtained a trial license for the ResponsiveSDK, make sure to get a free one here (it takes 2 minutes and is free).

That's all you have to do! Underneath the hood, this replaces a bunch of components of Kafka Streams from the state storage to the way restoration and commits work so that you can leverage remote state.

These lines of code are the only changes you need to make if you only use the DSL (this example fits this category), if you manually specify stores using the Materialized API or use the Processor API read the documentation to get a sense of how to migrate individual stores. We promise it's just as easy.

Build & Run your Responsive App

When you run the Responsive version of the app for the first time, it will read the existing changelog topic and bootstrap the remote store the same way a normal restore works with RocksDB. The difference is that this is the last restore your app will ever need to do:

./gradlew build
./gradlew run

You can confirm that the bootstrap was successful by querying MongoDB directly:

$ docker exec -it mongo mongosh

test> use counts_store;
switched to db counts_store
counts_store> db.kv_data.countDocuments();
5

Finally, you can produce more data using the console producer and confirm that it works using the console producer with the same steps outlined above.

Next Steps

Congrats! You have successfully run through the quickstart for the Responsive SDK. Here's what you can do next:

  1. Test this on one of your existing apps. Read the more detailed migration instructions for onboarding more complicated apps.
  2. Read about our advanced functionality like async processing or row-level TTL.
  3. Explore our other products like the Control Plane and RS3.