What is Spring Cloud Stream?
Spring Cloud Stream is a framework that decouples producer and consumer implementations from the underlying messaging platform. It also lets us switch to a different message broker by simply swapping the binder.
You can find the list of available binders here
What are we going to do?
We are going to create a simple stream processing application that generates a random number every second and puts it into a Kafka topic called numbers-topic. We will then listen to numbers-topic and perform the operations below:
- Compute the square root of each number and put it into the sqrt-topic
- Compute the cube of each number and put it into the cube-topic
Prerequisites
- Basics of Java 8 functional programming
- Basics of Kafka
Setting up the environment
Please refer to the official documentation to set up Kafka using Docker. Alternatively, you can use the docker-compose file from this demo project to bring Kafka up and running on your local machine.
I used https://start.spring.io to generate the structure of this project.
Once you generate the project, add the Kafka binder dependency (groupId org.springframework.cloud, artifactId spring-cloud-stream-binder-kafka) to the pom.xml, since we will be using the Kafka binder in this example.
You can find the complete code in the Git repositories below:
Producer service from here
Consumer service from here
To create a producer, we only need to define a bean of type Supplier.
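A minimal sketch of such a supplier follows. The class name NumberProducerSketch and the 0 to 99 range are assumptions for illustration, not taken from the demo project. In the Spring application, the method would live in a @Configuration class and carry @Bean so that the framework can discover it. Those annotations are left out here so the snippet runs with the JDK alone.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

// Hypothetical class name. In the real service this would be a Spring
// @Configuration class and numberProducer() would be annotated with @Bean.
class NumberProducerSketch {

    // The method name is the function name used in the binding
    // (numberProducer-out-0).
    static Supplier<Integer> numberProducer() {
        // Assumed range: a random integer from 0 (inclusive) to 100 (exclusive).
        return () -> ThreadLocalRandom.current().nextInt(100);
    }

    public static void main(String[] args) {
        Supplier<Integer> producer = numberProducer();
        // In the application the framework's poller calls get();
        // here we call it by hand a few times.
        for (int i = 0; i < 3; i++) {
            System.out.println("Produced: " + producer.get());
        }
    }
}
```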
That is all: we have created a producer that sends random numbers, written in Java's functional programming style.
Configuring Producer service
Let’s jump to the configuration part, where we will see how to configure the Kafka topic name for the producer.
Since we are using Kafka, the destination should be the Kafka topic name, in this case numbers-topic. And because we defined the producer in the functional programming style, the binding name must follow a particular format:
<functionName> + -out- + <index>
Here, out refers to the producer (the output side). The index is the index of the output binding and is 0 for a function with a single output. You can read more about the binding naming convention here
Following this convention, in our project we bind the supplier function numberProducer by setting the destination of the binding numberProducer-out-0 to numbers-topic.
In this case, the supplier function is invoked by a default polling mechanism provided by the framework, which calls the supplier every second. We can use StreamBridge to send messages on demand, which I will cover in an upcoming post.
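The naming convention can be sketched in a few lines of Java. The BindingNames helper below is hypothetical and exists only to spell out the format. Spring Cloud Stream derives binding names itself, and the resulting key would normally be written directly in application.properties or application.yml.

```java
// Hypothetical helper that spells out the naming convention. Spring Cloud
// Stream derives these names itself, so this is for illustration only.
class BindingNames {

    // direction is "in" for the consumer side and "out" for the producer side.
    static String bindingName(String functionName, String direction, int index) {
        return functionName + "-" + direction + "-" + index;
    }

    // Property key that sets the destination (here, the Kafka topic) of a binding.
    static String destinationKey(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        String binding = bindingName("numberProducer", "out", 0);
        // Prints the line as it would appear in application.properties.
        System.out.println(destinationKey(binding) + "=numbers-topic");
    }
}
```

Running it prints spring.cloud.stream.bindings.numberProducer-out-0.destination=numbers-topic, which is the property that points the producer at numbers-topic.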
To create a consumer, we can use either Consumer or Function, depending on our needs. Let's implement both models in this project.
We have one Consumer and two Functions. The Consumer acts as an event listener that is invoked for each message from numbers-topic. A Function acts as both a listener and a producer: it listens to a topic and produces messages to the same or a different topic, depending on the binding configuration. In this project, each Function writes to a different topic, as described below:
- numberConsumer: listens to numbers-topic and simply logs the incoming message
- consumeAndProcessSqrt: listens to numbers-topic, computes the square root, and returns the result
- consumeAndProcessCube: listens to numbers-topic, computes the cube, and returns the result
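Here is a minimal sketch of one Consumer and two Functions for this project. The class name NumberConsumerSketch and the payload types (Integer in, Double or Long out) are assumptions, and System.out stands in for a logger. In the Spring application, each method would be a @Bean in a @Configuration class.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical class name. In the real service each method would be a @Bean.
class NumberConsumerSketch {

    // Pure listener: logs every incoming number and produces nothing.
    static Consumer<Integer> numberConsumer() {
        return number -> System.out.println("Received: " + number);
    }

    // Listener plus producer: the returned value is what gets published
    // to the function's output binding.
    static Function<Integer, Double> consumeAndProcessSqrt() {
        return number -> Math.sqrt(number);
    }

    // Widens to long before multiplying so large inputs do not overflow int.
    static Function<Integer, Long> consumeAndProcessCube() {
        return number -> {
            long n = number;
            return n * n * n;
        };
    }

    public static void main(String[] args) {
        numberConsumer().accept(16);
        System.out.println("sqrt: " + consumeAndProcessSqrt().apply(16));
        System.out.println("cube: " + consumeAndProcessCube().apply(3));
    }
}
```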
Configuring Consumer service
As discussed in the Producer section, the destination of each consumer binding should also be a Kafka topic name: numbers-topic for the inputs, and sqrt-topic and cube-topic for the outputs of the two processors. As with the producer configuration, the input binding name follows a particular format:
<functionName> + -in- + <index>
Here, in refers to the consumer (the input side). The index is the index of the input binding and is 0 for a function with a single input.
Following this convention, in our project we bind the Consumer function numberConsumer as numberConsumer-in-0. Each Function needs both an in and an out binding, since it acts as both a consumer and a producer:
- consumeAndProcessSqrt-in-0 and consumeAndProcessSqrt-out-0
- consumeAndProcessCube-in-0 and consumeAndProcessCube-out-0
Note that consumeAndProcessSqrt-out-0 and consumeAndProcessCube-out-0 have different destinations, sqrt-topic and cube-topic respectively, as our project requires.
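The full set of consumer-side bindings can be sketched as a property map. The class name ConsumerBindingsSketch is hypothetical, and in the real service these entries would sit in application.properties or application.yml rather than in Java code. The spring.cloud.function.definition entry is included on the assumption that a recent Spring Cloud Stream version is in use: when an application declares several functional beans, they are listed there, separated by semicolons.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical class that lists the consumer service's binding properties.
class ConsumerBindingsSketch {

    static final String PREFIX = "spring.cloud.stream.bindings.";

    static Map<String, String> consumerProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        // Several functional beans in one application: name them explicitly.
        props.put("spring.cloud.function.definition",
                "numberConsumer;consumeAndProcessSqrt;consumeAndProcessCube");
        // All three functions read from the same input topic.
        props.put(PREFIX + "numberConsumer-in-0.destination", "numbers-topic");
        props.put(PREFIX + "consumeAndProcessSqrt-in-0.destination", "numbers-topic");
        props.put(PREFIX + "consumeAndProcessCube-in-0.destination", "numbers-topic");
        // Each Function publishes its result to its own output topic.
        props.put(PREFIX + "consumeAndProcessSqrt-out-0.destination", "sqrt-topic");
        props.put(PREFIX + "consumeAndProcessCube-out-0.destination", "cube-topic");
        return props;
    }

    public static void main(String[] args) {
        // Prints the entries in application.properties form.
        consumerProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```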
Running the Project
Once the Kafka setup is complete, make sure you can access the Control Center at http://localhost:9021.
Now, run the Producer-service and Consumer-service applications from your favorite IDE. Once the applications have started successfully, you should be able to see the stream of incoming data in the Control Center for our topics.
In this blog, we saw how to use Spring Cloud Stream to send and receive messages from a Kafka topic. We discussed how to define the bindings and then used the Kafka binder dependency to send messages to the Kafka broker.