Adding dependencies

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>

 

What are we going to exchange?

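package org.acme.domain;

import javax.persistence.Entity;
import javax.persistence.Id;
import lombok.Getter;
import lombok.ToString;

@Entity
@Getter @ToString // Lombok generates the getters and toString()
public class Employee {
    @Id
    private Integer empId;
    private String empName;
}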

 

Configure the Application

When you use Reactive Messaging, you send messages to a channel and receive them from another channel. These channels are mapped to the underlying messaging technology by configuration. In our application, we must indicate that both our incoming and outgoing channels use the employees Kafka topic:

#Kafka Server
kafka.bootstrap.servers=localhost:9092
#Kafka Topic Name
kafka.topic.name=employees

# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.employees-in.connector=smallrye-kafka
mp.messaging.incoming.employees-in.topic=employees
mp.messaging.incoming.employees-in.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.employees-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.employees-out.connector=smallrye-kafka
mp.messaging.outgoing.employees-out.topic=employees
mp.messaging.outgoing.employees-out.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.employees-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer

No serializer configuration is required for String keys and values, since String is the default. For Integer keys or values, however, data will be lost if the serializer/deserializer is not set to match.
For object types (JSON), the serializer/deserializer built into Quarkus is applied automatically, so the key/value types do not need to be configured.
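
To illustrate why a mismatched serializer/deserializer pair corrupts Integer data, here is a minimal plain-Java sketch that does not use Kafka at all. It assumes the wire formats used by Kafka's built-in classes (4 big-endian bytes for IntegerSerializer, UTF-8 for StringSerializer); the class and method names are hypothetical and exist only for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SerializerMismatchSketch {

    // Same layout as Kafka's IntegerSerializer: 4 bytes, big-endian.
    static byte[] serializeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Like Kafka's IntegerDeserializer, this rejects anything that is not exactly 4 bytes.
    static int deserializeInt(byte[] data) {
        if (data.length != Integer.BYTES) {
            throw new IllegalArgumentException("Expected 4 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    // Same encoding as Kafka's StringSerializer default: UTF-8.
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        int empId = 1001;

        // Matching pair: the key survives the round trip.
        System.out.println(deserializeInt(serializeInt(empId)));

        // Integer bytes read as a String: the result is not the text "1001".
        String misread = deserializeString(serializeInt(empId));
        System.out.println("Readable as \"1001\"? " + misread.equals("1001"));

        // String bytes read as an Integer: a 4-character key silently becomes a different number,
        // and any other length is rejected outright.
        System.out.println(deserializeInt(serializeString("1001")) == empId);
        try {
            deserializeInt(serializeString("42"));
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that neither direction fails loudly in every case, which is why the key.serializer and key.deserializer properties above should always be set as a matching pair.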

Publishing to Kafka

Create the org.acme.kafka.EmployeeProducer class with the following content:

package org.acme.kafka;

import io.smallrye.reactive.messaging.kafka.Record;
import org.acme.domain.Employee;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped //-> @Component in Spring
public class EmployeeProducer {

    @Inject //-> @Autowired in Spring
    @Channel("employees-out") // messaging channel
    Emitter<Record<Integer, String>> emitter;

    public void sendEmployeeToKafka(Employee employee) {
        emitter.send(Record.of(employee.getEmpId(), employee.getEmpName()));
    }
}

In this class, we inject an Emitter, i.e., an object responsible for sending a message to a channel. This emitter is attached to the employees-out channel (and so will send messages to Kafka).

So, the rest of our application can simply use the sendEmployeeToKafka method to send an employee's information to our Kafka topic.

 

Consuming from Kafka

package org.acme.kafka;

import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped //-> @Component in Spring
public class EmployeeConsumer {

    private final Logger logger = Logger.getLogger(EmployeeConsumer.class);

    @Incoming("employees-in") // messaging channel
    public void receive(Record<Integer, String> record) {
        logger.infof("Got an employee: %s - %s", record.key(), record.value());
    }
}

Here, we use the @Incoming annotation to tell Quarkus to call the receive method for every record received on the employees-in channel.

 

Sending employees from a REST endpoint

package org.acme.kafka.resource;

import org.acme.kafka.EmployeeProducer;
import org.acme.domain.Employee;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class EmployeeResource {

    @Inject //-> @Autowired in Spring
    EmployeeProducer producer;

    @POST
    public Response send(Employee employee) {
        producer.sendEmployeeToKafka(employee);
        // Return a 202 - Accepted response.
        return Response.accepted().build();
    }
}

 

Run the application

 

Send Post Request
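
One way to send the request is with the JDK's built-in HTTP client (Java 11+). The sketch below is only an illustration under a few assumptions: the application listens on http://localhost:8080 (the Quarkus default HTTP port), the JSON field names match the Employee fields empId and empName, and the sample values 1 and "Alice" are made up. The class and helper names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class SendEmployeeSketch {

    // Builds the JSON body by hand; assumes empName needs no JSON escaping.
    static String toJson(int empId, String empName) {
        return "{\"empId\":" + empId + ",\"empName\":\"" + empName + "\"}";
    }

    // POST to the root path, matching @Path("/") on EmployeeResource.
    static HttpRequest buildRequest(String baseUrl, int empId, String empName) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(empId, empName)))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed base URL; adjust it if the application runs elsewhere.
        HttpRequest request = buildRequest("http://localhost:8080", 1, "Alice");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Typically means the application is not running at the assumed address.
            System.out.println("Could not reach the application: " + e);
        }
    }
}
```

Since EmployeeResource returns Response.accepted(), a successful call should report status 202.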

 

Check the consumer's logs when receiving messages
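
The message to look for comes from the logger.infof call in EmployeeConsumer, which uses String.format-style placeholders. As a rough sketch, the snippet below reproduces only the message text for a made-up record (key 1, value "Alice"); the timestamp, level, and logger-name prefix of the real log line depend on the logging configuration and are not shown. The class and method names are hypothetical.

```java
class ConsumerLogSketch {

    // Mirrors the format string passed to logger.infof(...) in EmployeeConsumer.
    static String formatLogMessage(Integer key, String value) {
        return String.format("Got an employee: %s - %s", key, value);
    }

    public static void main(String[] args) {
        // Sample values are assumptions for illustration only.
        System.out.println(formatLogMessage(1, "Alice"));
    }
}
```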

 
