SDK Quickstart
After completing this quickstart, you will have:
- Added the Responsive SDK to a Kafka Streams application.
- Got the application to use a local MongoDB instance as a state store.
Note: You will need a free Responsive Trial License to complete this quickstart. Instructions for obtaining this license are a part of the quickstart.
We've built a simple repository that contains the skeleton code you need to run the Kafka
Streams WordCount
example as well as migrate that code to Responsive. Following that will
run through the following quickstart:
- Spin up a Kafka Broker and a MongoDB instance
- Run a
KafkaStreams
application - Migrate to
ResponsiveKafkaStreams
and run it - Query MongoDB to demonstrate disaggregated state
Initial setup
Required tools
In order to run this quickstart, you must have access to the following systems:
- Git to clone the repository
- Docker and Docker Compose (or a compatible engine)
- Gradle (it will install JDK 21 if you do not have it installed)
Get the Code
Get the quickstart code locally with:
$ git clone https://github.com/responsivedev/java-sdk-quickstart.git
$ cd java-sdk-quickstart
Prepare your dependencies
Setup Kafka and MongoDB
The first step is to deploy a Kafka broker and a MongoDB instance. You can use the
following docker-compose.yml
file (which is also included in <root>/etc
of this repository):
---
services:
broker:
image: confluentinc/cp-kafka:7.7.1
container_name: kafka-broker
ports:
- "9092:9092"
environment:
CLUSTER_ID: 'jHS82zyorYvKMntfzD4XRQ'
KAFKA_PROCESS_ROLES: 'controller,broker'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTER_BROKER'
KAFKA_LISTENERS: 'INTER_BROKER://:29092,PUBLIC://:9092,CONTROLLER://:9093'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:9093'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTER_BROKER:PLAINTEXT,PUBLIC:PLAINTEXT,CONTROLLER:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'INTER_BROKER://localhost:29092,PUBLIC://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000
mongo:
image: mongo:5.0
container_name: mongo
ports:
- "27017:27017"
You can spin this up by running docker compose up
in the directory
with this docker-compose.yml
file
After these spin up, we will need to create topics in the broker for the Kafka Streams application to use. You can do this by executing into the docker container and running the following commands:
$ docker exec -it kafka-broker bash
$ kafka-topics --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic words
$ kafka-topics --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--config cleanup.policy=compact \
--topic output
Build & Run your Kafka Streams app
Start by building the code.
./gradlew build
The application dev.responsive.quickstart.WordCount
is identical
to the open source word count app
and counts the number of occurrences of each word in the input
topic, split by whitespace. To run the application, use:
./gradlew run
Test your setup: Produce & Consume events
Let's produce some data in the words
topic, and then verify that
the application is counting those words! First, produce using kafka-console-producer
# if you do not have the kafka-console-producer locally, it is available
# on the kafka-broker docker container we started earlier in this tutorial
$ docker exec -it kafka-broker bash
$ kafka-console-producer --bootstrap-server localhost:9092 --topic words
> all streams flow through kafka streams
You can verify that the word count app is working as designed by consuming
from the output topic using kafka-console-consumer
:
$ docker exec -it kafka-broker bash
$ kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic output \
--from-beginning \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Wait a while to make sure the buffering time limits have elapsed, and you should see the following output:
all 1
flow 1
through 1
kafka 1
streams 2
Add the Responsive SDK
Shortcut: No Editor Necessary
Copy the follow diff and run pbpaste | git apply
to apply it.
diff --git a/app/src/main/java/dev/responsive/quickstart/WordCount.java b/app/src/main/java/dev/responsive/quickstart/WordCount.java
index f53d193..a2bbe9a 100644
--- a/app/src/main/java/dev/responsive/quickstart/WordCount.java
+++ b/app/src/main/java/dev/responsive/quickstart/WordCount.java
@@ -1,5 +1,6 @@
package dev.responsive.quickstart;
+import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
import java.util.Arrays;
@@ -44,7 +45,7 @@ public class WordCount {
final Topology topology = builder.build();
- final KafkaStreams streams = new KafkaStreams(topology, props);
+ final KafkaStreams streams = new ResponsiveKafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
To update the license in the code to your trial license, you can run the following command:
LICENSE='<YOUR_LICENSE>'; sed -i '' "s|<YOUR_LICENSE>|${LICENSE}|" app/src/main/java/dev/responsive/quickstart/WordCount.java
You can now skip to the next section.
Introducing the Responsive SDK for an app that only uses the DSL is as simple as it gets:
- Add the required imports:
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.StorageBackend;
- Change
new KafkaStreams
tonew ResponsiveKafkaStreams
:
- final KafkaStreams streams = new KafkaStreams(topology, props);
+ final KafkaStreams streams = new ResponsiveKafkaStreams(topology, props);
- Add the configurations required to talk to MongoDB:
props.put(ResponsiveConfig.STORAGE_BACKEND_TYPE_CONFIG, StorageBackend.MONGO_DB.name());
props.put(ResponsiveConfig.MONGO_ENDPOINT_CONFIG, "mongodb://localhost:27017");
props.put(ResponsiveConfig.RESPONSIVE_LICENSE_CONFIG, "<your-license>");
If you have not yet obtained a trial license for the ResponsiveSDK, make sure to get a free one here (it takes 2 minutes and is free).
That's all you have to do! Underneath the hood, this replaces a bunch of components of Kafka Streams from the state storage to the way restoration and commits work so that you can leverage remote state.
These lines of code are the only changes you need to make if you only use the DSL (this example fits
this category), if you manually
specify stores using the Materialized
API or use the Processor API
read the documentation to get a sense of how
to migrate individual stores. We promise it's just as easy.
Build & Run your Responsive App
When you run the Responsive version of the app for the first time, it will read the existing changelog topic and bootstrap the remote store the same way a normal restore works with RocksDB. The difference is that this is the last restore your app will ever need to do:
./gradlew build
./gradlew run
You can confirm that the bootstrap was successful by querying MongoDB directly:
$ docker exec -it mongo mongosh
test> use counts_store;
switched to db counts_store
counts_store> db.kv_data.countDocuments();
5
Finally, you can produce more data using the console producer and confirm that it works using the console producer with the same steps outlined above.
Next Steps
Congrats! You have successfully run through the quickstart for the Responsive SDK. Here's what you can do next:
- Test this on one of your existing apps. Read the more detailed migration instructions for onboarding more complicated apps.
- Read about our advanced functionality like async processing or row-level TTL.
- Explore our other products like the Control Plane and RS3.