** [Camel-Quarkus] Shop Example using Kafka, JPA Component 예제에서 이어지는 내용입니다.

 

Orderform 정보(주문자 Id, 주문한 Item Id, 주문 수량)를 가지고 Order를 생성하는 과정은 아래와 같다.

1) Member Id로 주문자 정보를 조회 → 주문자(회원) 정보가 없으면 Exception
2) Item Id로 상품 정보를 조회 → Item 정보가 없으면 Exeption
3) Member.address 정보로 Delivery(배송정보) 생성
4) 주문 수량과 Item.price 정보로 OrderItem(주문상품 정보) 생성
5) Order 생성 → 이 때 주문 수량만큼 재고를 줄이는데, 재고가 부족하면 Exception

 

위 프로세스에서 밑줄 친 부분은 예외 case에 해당한다.


아래 코드는, 위 과정에서 발생할 수 있는 Exception에 대한 처리가 되어있지 않은 상태이다.

Exception이 발생하면 Camel Route가 Exception이 발생한 시점에 동작을 멈추게 되고, 당연히 DB가 Update되지 않는다.

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
	.process(new Processor() {
    	@Override
        public void process(Exchange exchange) throws Exception {
        	Message message = exchange.getIn();
            System.out.println("[message header] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());

            Orderform orderform = message.getBody(Orderform.class);

            // Message Transformation
            Long memberId = orderform.getMemberId();
            Long itemId = orderform.getItemId();
            Long orderCount = orderform.getOrderCount();

            Map<String, Object> orderFormMap = Map.of(
            	"memberId", memberId,"itemId", itemId, "orderCount", orderCount
            );
            message.setHeaders(orderFormMap);

            System.out.println("[message header(changed)] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());
        }
    })
    // Search Member Info. (주문 회원 정보를 DB에서 select)
    .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
    .to("bean:orderProcessingBean?method=checkMember")
    // Search Item Info. (주문상품 정보를 DB에서 select)
    .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}" )
    .to("bean:orderProcessingBean?method=checkItem")
    // Make Order (주문 회원, 배송정보, 주문상품의 정보로 실제 주문 엔티티를 생성)
    .to("bean:orderProcessingBean?method=makeOrder")
    // JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
    .log("--- ${body} ---");

 

예를 들어 Make Order(주문 Entity 생성) 부분에서 재고가 없는 Exception이 발생할 경우,
위에 정의된 Camel Route를 따라 processing이 진행되는 중에  NotEnoughStockExeption이 발생한다.

//package org.acme.routes.beans;
public void makeOrder(Message message) {
    Map<String, Object> orderFormMap = message.getHeaders();
    Long orderCount = (Long) orderFormMap.get("orderCount");

    OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
    Order order = Order.createOrder(this.member, this.delivery, orderItem);

    message.setBody(order);
}

//package org.acme.domain;
public static OrderItem createOrderItem(Item item, int orderPrice, int count) {
    OrderItem orderItem = new OrderItem();
    orderItem.setItem(item);
    orderItem.setOrderPrice(orderPrice);
    orderItem.setCount(count);

    item.removeStock(count);
    return orderItem;
}

//package org.acme.domain.Item;
public void removeStock(int quantity) {
    int restStock = this.stockQuantity - quantity;
    if (restStock < 0) {
        throw new NotEnoughStockException("need more stock");
    }
    this.stockQuantity = restStock;
}

Camel onException() 메소드를 사용한 Exception Handling

 

아래 예시와 같이 onException() 메소드를 사용해서 간단히 Exception Handling이 가능하다.

단, onException() 메소드는 해당 라우트에서 onException 구문이 나오기 전까지 발생한 exception만 handling하므로,

특정 Exception이 발생할 것으로 예상되는 DSL문 다음에 사용해야 한다.

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                Message message = exchange.getIn();
                System.out.println("[message header] -- " + message.getHeaders());
                System.out.println("[message body] -- " + message.getBody());

                Orderform orderform = message.getBody(Orderform.class);

                // Message Transformation
                Long memberId = orderform.getMemberId();
                Long itemId = orderform.getItemId();
                Long orderCount = orderform.getOrderCount();

                Map<String, Object> orderFormMap = Map.of(
                        "memberId", memberId, "itemId", itemId, "orderCount", orderCount
                );
                message.setHeaders(orderFormMap);

                System.out.println("[message header(changed)] -- " + message.getHeaders());
                System.out.println("[message body] -- " + message.getBody());
            }
        })
        // Search Member Info. (주문자 정보를 DB에서 select)
        .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
        .to("bean:orderProcessingBean?method=checkMember")
        // Search Item Info (주문상품 정보를 DB에서 select)
        .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
        .to("bean:orderProcessingBean?method=checkItem")
        // Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
        .to("bean:orderProcessingBean?method=makeOrder")
        // JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
        .to("jpa://" + Order.class.getName() + "?usePersist=true")
        .onException(NotEnoughStockException.class)
            .transform(simple("exception"))
            .to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/xxx...");

Camel Branch 구문(choice ~ when)을 사용한 예외 처리 

 

아래와 같이 camel branch를 활용하여 exception을 처리할 수도 있다.

일반적으로 message body나 header의 내용에 따라 분기를 나눌 것이다.

// Search Member Info. (주문자 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.choice()
.when(simple("${body} is 'org.acme.domain.Order'"))
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
.otherwise() // Exception이 발생한 경우, message body가 Order type이 아니므로 slack으로 메시지 송신 
    .to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/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");

 

Camel Route에서 message header나 body의 내용을 보고 Exception 여부를 확인할 수 있도록,
processing bean에서 exception 처리를 해준다

// package org.acme.routes.beans;
public void makeOrder(Message message) {
    try {
        Map<String, Object> orderFormMap = message.getHeaders();
        Long orderCount = (Long) orderFormMap.get("orderCount");

        OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
        Order order = Order.createOrder(this.member, this.delivery, orderItem);
		
        // 정상적으로 order을 생성한 경우, message body는 order 객체가 들어가게 된다
        message.setBody(order);
    } catch (RuntimeException e) {
    	// 예외가 발생한 경우, message body를 "exception" string으로 transform 한다
        message.setBody("exception");
    }
}

 

'~2022 > Camel with Quarkus' 카테고리의 다른 글

[Camel-Quarkus] Shop Example using Kafka, JPA Component  (0) 2022.03.28

Adding dependencies

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>

 

What are we going to exchange?

@Entity
@Getter @ToString
public class Employee {
    @Id
    private Integer empId;
    private String empName;
}

 

Configure the Application

When you use Reactive Messaging, you send messages to a channel and receive them from another channel. These channels are mapped to the underlying messaging technology by configuration. In our application, we must indicate that our reception and publication channels will use the movies Kafka channel

#Kafka Server
kafka.bootstrap.servers=localhost:9092
#Kafka Topic Name
kafka.topic.name=employees

# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.employees-in.connector=smallrye-kafka
mp.messaging.incoming.employees-in.topic=employees
mp.messaging.incoming.employees-in.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.employees-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.employees-out.connector=smallrye-kafka
mp.messaging.outgoing.employees-out.topic=employees
mp.messaging.outgoing.employees-out.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.employees-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer

No configuration required in case of String type(default configuration), but in Integer Type, data will be lost if the serial/deserializer is not set properly.
In case of Object Type (JSON), the serial/deserializer built into Quarkus automatically operates, so there is no need to set the key/value type.

String Type의 경우 Default이기 때문에 설정이 필요 없으나, Integer Type으로 key나 value를 consume/produce 할 경우 serializer를 알맞게 설정하지 않으면 data가 소실된다.
Object Type(JSON)의 경우 Quarkus에 built-in 된 serial/deserializer가 자동으로 동작하게 되어 key/value type 설정이 필요 없다.

Publishing to Kafka

Create the org.acme.domain.Employee class with the following content:

package org.acme.kafka;

import io.smallrye.reactive.messaging.kafka.Record;
import org.acme.domain.Employee;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped//-> @Component in Spring
public class EmployeeProducer {

    @Inject //-> @Autowired in Spring
    @Channel("employees-out") // messaging channel
    Emitter<Record<Integer, String>> emitter;

    public void sendEmployeeToKafka(Employee employee) {
        emitter.send(Record.of(employee.getEmpId(), employee.getEmpName()));
    }
}

In this class, we inject an Emitter, i.e., an object responsible for sending a message to a channel. This emitter is attached to the movies-out channel (and so will send messages to Kafka). 

So, the rest of our application can simply use the sendMovieToKafka method to send a Employee info. to our Kafka topic.

 

Consuming from Kafka

package org.acme.kafka;

import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped //-> @Component in Spring
public class EmployeeConsumer {

    private final Logger logger = Logger.getLogger(EmployeeConsumer.class);

    @Incoming("employees-in") // messaging channel
    public void receive(Record<Integer, String> record) {
        logger.infof("Got an employee: %s - %s", record.key(), record.value());
    }
}

Here, we use the @Incoming annotation to indicate to Quarkus to call the receive method for every received record.

 

Sending employees from a REST endpoint

package org.acme.kafka.resource;

import org.acme.kafka.EmployeeProducer;
import org.acme.domain.Employee;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class EmployeeResource {

    @Inject//-> @Autowired in Spring
    EmployeeProducer producer;

    @POST
    public Response send(Employee employee) {
        producer.sendEmployeeToKafka(employee);
        // Return an 202 - Accepted response.
        return Response.accepted().build();
    }
}

 

Run the application

 

Send Post Request

 

Check the producer's logs when receiving messages

 

+ Recent posts