** [Camel-Quarkus] Shop Example using Kafka, JPA Component 예제에서 이어지는 내용입니다.

 

Orderform 정보(주문자 Id, 주문한 Item Id, 주문 수량)를 가지고 Order를 생성하는 과정은 아래와 같다.

1) Member Id로 주문자 정보를 조회 → 주문자(회원) 정보가 없으면 Exception
2) Item Id로 상품 정보를 조회 → Item 정보가 없으면 Exeption
3) Member.address 정보로 Delivery(배송정보) 생성
4) 주문 수량과 Item.price 정보로 OrderItem(주문상품 정보) 생성
5) Order 생성 → 이 때 주문 수량만큼 재고를 줄이는데, 재고가 부족하면 Exception

 

위 프로세스에서 밑줄 친 부분은 예외 case에 해당한다.


아래 코드는, 위 과정에서 발생할 수 있는 Exception에 대한 처리가 되어있지 않은 상태이다.

Exception이 발생하면 Camel Route가 Exception이 발생한 시점에 동작을 멈추게 되고, 당연히 DB가 Update되지 않는다.

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
	.process(new Processor() {
    	@Override
        public void process(Exchange exchange) throws Exception {
        	Message message = exchange.getIn();
            System.out.println("[message header] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());

            Orderform orderform = message.getBody(Orderform.class);

            // Message Transformation
            Long memberId = orderform.getMemberId();
            Long itemId = orderform.getItemId();
            Long orderCount = orderform.getOrderCount();

            Map<String, Object> orderFormMap = Map.of(
            	"memberId", memberId,"itemId", itemId, "orderCount", orderCount
            );
            message.setHeaders(orderFormMap);

            System.out.println("[message header(changed)] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());
        }
    })
    // Search Member Info. (주문 회원 정보를 DB에서 select)
    .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
    .to("bean:orderProcessingBean?method=checkMember")
    // Search Item Info. (주문상품 정보를 DB에서 select)
    .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}" )
    .to("bean:orderProcessingBean?method=checkItem")
    // Make Order (주문 회원, 배송정보, 주문상품의 정보로 실제 주문 엔티티를 생성)
    .to("bean:orderProcessingBean?method=makeOrder")
    // JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
    .log("--- ${body} ---");

 

예를 들어 Make Order(주문 Entity 생성) 부분에서 재고가 없는 Exception이 발생할 경우,
위에 정의된 Camel Route를 따라 processing이 진행되는 중에  NotEnoughStockExeption이 발생한다.

//package org.acme.routes.beans;
public void makeOrder(Message message) {
    Map<String, Object> orderFormMap = message.getHeaders();
    Long orderCount = (Long) orderFormMap.get("orderCount");

    OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
    Order order = Order.createOrder(this.member, this.delivery, orderItem);

    message.setBody(order);
}

//package org.acme.domain;
public static OrderItem createOrderItem(Item item, int orderPrice, int count) {
    OrderItem orderItem = new OrderItem();
    orderItem.setItem(item);
    orderItem.setOrderPrice(orderPrice);
    orderItem.setCount(count);

    item.removeStock(count);
    return orderItem;
}

//package org.acme.domain.Item;
public void removeStock(int quantity) {
    int restStock = this.stockQuantity - quantity;
    if (restStock < 0) {
        throw new NotEnoughStockException("need more stock");
    }
    this.stockQuantity = restStock;
}

Camel onException() 메소드를 사용한 Exception Handling

 

아래 예시와 같이 onException() 메소드를 사용해서 간단히 Exception Handling이 가능하다.

단, onException() 메소드는 해당 라우트에서 onException 구문이 나오기 전까지 발생한 exception만 handling하므로,

특정 Exception이 발생할 것으로 예상되는 DSL문 다음에 사용해야 한다.

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                Message message = exchange.getIn();
                System.out.println("[message header] -- " + message.getHeaders());
                System.out.println("[message body] -- " + message.getBody());

                Orderform orderform = message.getBody(Orderform.class);

                // Message Transformation
                Long memberId = orderform.getMemberId();
                Long itemId = orderform.getItemId();
                Long orderCount = orderform.getOrderCount();

                Map<String, Object> orderFormMap = Map.of(
                        "memberId", memberId, "itemId", itemId, "orderCount", orderCount
                );
                message.setHeaders(orderFormMap);

                System.out.println("[message header(changed)] -- " + message.getHeaders());
                System.out.println("[message body] -- " + message.getBody());
            }
        })
        // Search Member Info. (주문자 정보를 DB에서 select)
        .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
        .to("bean:orderProcessingBean?method=checkMember")
        // Search Item Info (주문상품 정보를 DB에서 select)
        .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
        .to("bean:orderProcessingBean?method=checkItem")
        // Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
        .to("bean:orderProcessingBean?method=makeOrder")
        // JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
        .to("jpa://" + Order.class.getName() + "?usePersist=true")
        .onException(NotEnoughStockException.class)
            .transform(simple("exception"))
            .to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/xxx...");

Camel Branch 구문(choice ~ when)을 사용한 예외 처리 

 

아래와 같이 camel branch를 활용하여 exception을 처리할 수도 있다.

일반적으로 message body나 header의 내용에 따라 분기를 나눌 것이다.

// Search Member Info. (주문자 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.choice()
.when(simple("${body} is 'org.acme.domain.Order'"))
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
.otherwise() // Exception이 발생한 경우, message body가 Order type이 아니므로 slack으로 메시지 송신 
    .to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/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");

 

Camel Route에서 message header나 body의 내용을 보고 Exception 여부를 확인할 수 있도록,
processing bean에서 exception 처리를 해준다

// package org.acme.routes.beans;
public void makeOrder(Message message) {
    try {
        Map<String, Object> orderFormMap = message.getHeaders();
        Long orderCount = (Long) orderFormMap.get("orderCount");

        OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
        Order order = Order.createOrder(this.member, this.delivery, orderItem);
		
        // 정상적으로 order을 생성한 경우, message body는 order 객체가 들어가게 된다
        message.setBody(order);
    } catch (RuntimeException e) {
    	// 예외가 발생한 경우, message body를 "exception" string으로 transform 한다
        message.setBody("exception");
    }
}

 

'~2022 > Camel with Quarkus' 카테고리의 다른 글

[Camel-Quarkus] Shop Example using Kafka, JPA Component  (0) 2022.03.28

OverView

Shop Example Overview

 

DataSource 

Adding Dependencies

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-hibernate-orm</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-jdbc-mysql</artifactId>
    </dependency>

DatasSource Configuration 

quarkus.datasource.db-kind=mysql
quarkus.datasource.username=root
quarkus.datasource.password=0103
quarkus.datasource.jdbc.url=jdbc:mysql://localhost:3306/my-database?createDatabaseIfNotExist=true
quarkus.hibernate-orm.database.generation.create-schemas=true
quarkus.hibernate-orm.database.generation=drop-and-create

** quarkus.hibernate-orm.database.generation.create-schemas
Find the class with @Entity annotation and create a DDL statement at the start of the server and apply it to the DB.

** quarkus.hibernate-orm.database.generation=drop-and-create

If schema exists, delete it and recreate it.

 

Shop Domain Design 

 

Send Order(Publishing) to Kafka from REST Endpoint

package org.acme.kafka;

import org.acme.domain.Orderform;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped//-> @Component in Spring
public class OrderformProducer {

    @Inject //-> @Autowired in Spring
    @Channel("orderform-out") // messaging channel
    Emitter<Orderform> emitter;

    public void sendOrderformToKafka(Orderform orderForm) {
        emitter.send(orderForm);
    }
}

application.properties

mp.messaging.outgoing.orderform-out.connector=smallrye-kafka
mp.messaging.outgoing.orderform-out.topic=orders

Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It will also generate the serializer and deserializer for you, so you do not have to configure anything about 'mp.messaging.outgoing.orderform-out.value.serializer'

 

Receiving Order from Kafka -> Camel Route

Adding Camel Dependencies

    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-jdbc</artifactId>
    </dependency>
	<dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-jpa</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-bean</artifactId>
    </dependency>

 

Define Route

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
	.process(new Processor() {
    	@Override
        public void process(Exchange exchange) throws Exception {
        	Message message = exchange.getIn();
            System.out.println("[message header] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());

            Orderform orderform = message.getBody(Orderform.class);

            // Message Transformation
            Long memberId = orderform.getMemberId();
            Long itemId = orderform.getItemId();
            Long orderCount = orderform.getOrderCount();

            Map<String, Object> orderFormMap = Map.of(
            	"memberId", memberId,"itemId", itemId, "orderCount", orderCount
            );
            message.setHeaders(orderFormMap);

            System.out.println("[message header(changed)] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());
        }
    })
    // Search Member Info. (주문 회원 정보를 DB에서 select)
    .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
    .to("bean:orderProcessingBean?method=checkMember")
    // Search Item Info. (주문상품 정보를 DB에서 select)
    .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}" )
    .to("bean:orderProcessingBean?method=checkItem")
    // Make Order (주문 회원, 배송정보, 주문상품의 정보로 실제 주문 엔티티를 생성)
    .to("bean:orderProcessingBean?method=makeOrder")
    // JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
    .log("--- ${body} ---");

By processing the Message Processing process using Bean,
you can organize the Route concisely and reuse the code for the same processing later.

package org.acme.routes.beans;

import io.quarkus.runtime.annotations.RegisterForReflection;
import org.acme.domain.*;
import org.acme.domain.Item.Item;
import org.apache.camel.Message;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.Map;

@ApplicationScoped
@Named("orderProcessingBean")
@RegisterForReflection
public class OrderProcessingBean {

    private static Logger logger = LoggerFactory.getLogger(OrderProcessingBean.class);
    private Member member;
    private Delivery delivery;
    private Item item;

    public void checkMember(ArrayList<Member> memberList) {
        if (ObjectUtils.isEmpty(memberList)) {
            logger.error("member가 존재하지 않습니다.");
        } else if (memberList.size() > 1) {
            logger.error("하나의 member_id key에 두 명 이상의 member가 조회되었습니다.");
        } else {
            Member member = memberList.get(0);
            this.member = member;
            this.delivery = new Delivery(member.getAddress(), DeliveryStatus.READY);
        }
    }

    public void checkItem(ArrayList<Item> itemList) {
        if (ObjectUtils.isEmpty(itemList)) {
            logger.error("item이 존재하지 않습니다.");
        } else if (itemList.size() > 1) {
            logger.error("하나의 item_id에 두 개 이상의 item이 조회되었습니다.");
        } else {
            Item item = itemList.get(0);
            this.item = item;
        }
    }

    public void makeOrder(Message message) {
        Map<String, Object> orderFormMap = message.getHeaders();
        Long orderCount = (Long) orderFormMap.get("orderCount");

        OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
        Order order = Order.createOrder(this.member, this.delivery, orderItem);

        message.setBody(order);
    }
}

To receive JSON data from Camel Kafka Component, you must define a custom deserializer.

package org.acme.json;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import org.acme.domain.Orderform;

public class OrderformDeserializer extends ObjectMapperDeserializer<Orderform> {
    public OrderformDeserializer() {
        super(Orderform.class);
    }
}

 

Run the Application and Check Results

 

[Send Post Request] The Application will publish orderform message.

[Message Transformation] header contents is changed.

[After JPA persist] Order, Delivery, OrderItem table is updated.

 

 

https://quarkus.io/guides/hibernate-orm

https://medium.com/nerd-for-tech/apache-camel-crud-with-jpa-fa9603430ff5

https://quarkus.io/guides/kafka#kafka-serialization

+ Recent posts