Quickstart
This guide demonstrates how to get a minimal Responsive environment set up and running. The example use case implements the open source Kafka Streams Demo Application on the Responsive platform.
Currently, this quickstart only covers the compute functionality of the Responsive Platform. Autoscaling is only available in our managed cloud offering.
Prerequisites
You will need Docker Compose, which you can install by following the Docker Compose documentation. To check if you have it installed, run:
docker compose version
You should see an output resembling Docker Compose version v2.19.1
.
Deploy Kafka & Scylla
Responsive for Kafka Streams requires a Kafka broker and a storage backend
compatible with Apache Cassandra. In this quickstart, we will spin up
confluentinc/cp-kafka
and scylladb/scylla
containers.
Copy and paste the following YAML content into a file named
docker-compose.yaml
:Docker Compose YAML
docker-compose.yml---
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000
scylla:
image: scylladb/scylla:latest
container_name: scylla
ports:
- "9042:9042"Run the
docker-compose.yaml
file:$ docker compose up -d
[+] Running 3/3
✔ Container scylla Started 0.2s
✔ Container zookeeper Started 0.2s
✔ Container broker Started 0.4sInitialize Scylla by creating a
KEYSPACE
to use for this quickstart:docker exec scylla cqlsh -e \
"CREATE KEYSPACE quickstart \
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"
Add Responsive Dependencies
This section covers adding the required dependencies to an existing Java development environment. If you do not already have one, you can follow the Maven Quickstart Archetype or the Gradle Quickstart to initialize your project.
- Maven
- Gradle
<dependency>
<groupId>dev.responsive</groupId>
<artifactId>kafka-client</artifactId>
<version>0.14.0</version>
</dependency>
implementation 'dev.responsive:kafka-client:0.14.0'
Implement Word Count
This part of the quickstart will walk you through implementing a simple Kafka Streams "Word Count" application that runs on Responsive.
Streams Topology
Take a look at the code below, which constructs the topology:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("plaintext-input");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("counts-store")));
wordCounts
.toStream()
.to("wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
If you are familiar with Kafka Streams, you should feel right at home. The only difference is on line 6 (highlighted), where we specifically indicate that the state store should be materialized using the Responsive state implementation.
Creating the Application
Instead of using new KafkaStreams(Topology, Map<?,?>)
to get your KafkaStreams
object, simply swap that out for new ResponsiveKafkaStreams(Topology, Map<?,?>)
and pass in the same inputs.
Properties props = new Properties();
// ...
KafkaStreams streams = new ResponsiveKafkaStreams(builder.build(), props);
streams.start();
Full Code
To run the ResponsiveWordCountApplication
, copy the code snippet below into your own Java project:
Toggle the Full Code Snippet
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.ResponsiveStores;
import dev.responsive.kafka.config.ResponsiveConfig;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
public class ResponsiveWordCountApplication {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
// Kafka Streams Configs
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Responsive Configs
props.put(ResponsiveConfig.STORAGE_HOSTNAME_CONFIG, "localhost");
props.put(ResponsiveConfig.STORAGE_PORT_CONFIG, "9042");
props.put(ResponsiveConfig.STORAGE_DATACENTER_CONFIG, "datacenter1");
props.put(ResponsiveConfig.TENANT_ID_CONFIG, "quickstart");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("plaintext-input");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("counts-store")));
wordCounts
.toStream()
.to("wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new ResponsiveKafkaStreams(builder.build(), props);
streams.start();
}
}
Run Word Count
This section will guide you through creating the Kafka topics, producing data, running the Responsive Kafka Streams Application and finally consuming data.
- Create the source topic:
docker exec broker /bin/kafka-topics --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic plaintext-input
- Run the
WordCountApplication
class:
- Maven
- Gradle
mvn exec:java -Dexec.mainClass="dev.responsive.quickstart.WordCountApplication"
./gradlew :quickstart:WordCountApplication.main()
- Produce source data to the
plaintext-input
topic:
docker exec -it broker /bin/kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic plaintext-input
Type the following lines, hitting <ENTER>
after each line:
all streams lead to kafka
hello kafka streams
- Read the data from the output topic:
docker exec broker /bin/kafka-console-consumer --bootstrap-server localhost:9092 \
--topic wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Your output should resemble:
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
For an explanation on why there are multiple outputs for each key, read the excellent Kafka Streams documentation
Clean Up
When you're done playing around with the quickstart application, type ^C
to terminate the Streams application and terminate the Kafka Broker and
Scylla database by running:
docker compose down
Next Steps
- Learn more about the Responsive platform
- Read the detailed migration guide to migrate from Kafka Streams
- See the full API reference