** [Camel-Quarkus] Shop Example using Kafka, JPA Component 예제에서 이어지는 내용입니다.
Orderform 정보(주문자 Id, 주문한 Item Id, 주문 수량)를 가지고 Order를 생성하는 과정은 아래와 같다.
1) Member Id로 주문자 정보를 조회 → 주문자(회원) 정보가 없으면 Exception
2) Item Id로 상품 정보를 조회 → Item 정보가 없으면 Exeption
3) Member.address 정보로 Delivery(배송정보) 생성
4) 주문 수량과 Item.price 정보로 OrderItem(주문상품 정보) 생성
5) Order 생성 → 이 때 주문 수량만큼 재고를 줄이는데, 재고가 부족하면 Exception
위 프로세스에서 밑줄 친 부분은 예외 case에 해당한다.
아래 코드는, 위 과정에서 발생할 수 있는 Exception에 대한 처리가 되어있지 않은 상태이다.
Exception이 발생하면 Camel Route가 Exception이 발생한 시점에 동작을 멈추게 되고, 당연히 DB가 Update되지 않는다.
from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
System.out.println("[message header] -- " + message.getHeaders());
System.out.println("[message body] -- " + message.getBody());
Orderform orderform = message.getBody(Orderform.class);
// Message Transformation
Long memberId = orderform.getMemberId();
Long itemId = orderform.getItemId();
Long orderCount = orderform.getOrderCount();
Map<String, Object> orderFormMap = Map.of(
"memberId", memberId,"itemId", itemId, "orderCount", orderCount
);
message.setHeaders(orderFormMap);
System.out.println("[message header(changed)] -- " + message.getHeaders());
System.out.println("[message body] -- " + message.getBody());
}
})
// Search Member Info. (주문 회원 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info. (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}" )
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품의 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.to("jpa://" + Order.class.getName() + "?usePersist=true")
.log("--- ${body} ---");
예를 들어 Make Order(주문 Entity 생성) 부분에서 재고가 없는 Exception이 발생할 경우,
위에 정의된 Camel Route를 따라 processing이 진행되는 중에 NotEnoughStockExeption이 발생한다.
//package org.acme.routes.beans;
public void makeOrder(Message message) {
Map<String, Object> orderFormMap = message.getHeaders();
Long orderCount = (Long) orderFormMap.get("orderCount");
OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
Order order = Order.createOrder(this.member, this.delivery, orderItem);
message.setBody(order);
}
//package org.acme.domain;
public static OrderItem createOrderItem(Item item, int orderPrice, int count) {
OrderItem orderItem = new OrderItem();
orderItem.setItem(item);
orderItem.setOrderPrice(orderPrice);
orderItem.setCount(count);
item.removeStock(count);
return orderItem;
}
//package org.acme.domain.Item;
public void removeStock(int quantity) {
int restStock = this.stockQuantity - quantity;
if (restStock < 0) {
throw new NotEnoughStockException("need more stock");
}
this.stockQuantity = restStock;
}
Camel onException() 메소드를 사용한 Exception Handling
아래 예시와 같이 onException() 메소드를 사용해서 간단히 Exception Handling이 가능하다.
단, onException() 메소드는 해당 라우트에서 onException 구문이 나오기 전까지 발생한 exception만 handling하므로,
특정 Exception이 발생할 것으로 예상되는 DSL문 다음에 사용해야 한다.
from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
System.out.println("[message header] -- " + message.getHeaders());
System.out.println("[message body] -- " + message.getBody());
Orderform orderform = message.getBody(Orderform.class);
// Message Transformation
Long memberId = orderform.getMemberId();
Long itemId = orderform.getItemId();
Long orderCount = orderform.getOrderCount();
Map<String, Object> orderFormMap = Map.of(
"memberId", memberId, "itemId", itemId, "orderCount", orderCount
);
message.setHeaders(orderFormMap);
System.out.println("[message header(changed)] -- " + message.getHeaders());
System.out.println("[message body] -- " + message.getBody());
}
})
// Search Member Info. (주문자 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.to("jpa://" + Order.class.getName() + "?usePersist=true")
.onException(NotEnoughStockException.class)
.transform(simple("exception"))
.to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/xxx...");
Camel Branch 구문(choice ~ when)을 사용한 예외 처리
아래와 같이 camel branch를 활용하여 exception을 처리할 수도 있다.
일반적으로 message body나 header의 내용에 따라 분기를 나눌 것이다.
// Search Member Info. (주문자 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.choice()
.when(simple("${body} is 'org.acme.domain.Order'"))
.to("jpa://" + Order.class.getName() + "?usePersist=true")
.otherwise() // Exception이 발생한 경우, message body가 Order type이 아니므로 slack으로 메시지 송신
.to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/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");
Camel Route에서 message header나 body의 내용을 보고 Exception 여부를 확인할 수 있도록,
processing bean에서 exception 처리를 해준다
// package org.acme.routes.beans;
public void makeOrder(Message message) {
try {
Map<String, Object> orderFormMap = message.getHeaders();
Long orderCount = (Long) orderFormMap.get("orderCount");
OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
Order order = Order.createOrder(this.member, this.delivery, orderItem);
// 정상적으로 order을 생성한 경우, message body는 order 객체가 들어가게 된다
message.setBody(order);
} catch (RuntimeException e) {
// 예외가 발생한 경우, message body를 "exception" string으로 transform 한다
message.setBody("exception");
}
}
'~2022 > Camel with Quarkus' 카테고리의 다른 글
[Camel-Quarkus] Shop Example using Kafka, JPA Component (0) | 2022.03.28 |
---|