Overview
DataSource
Adding Dependencies
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-orm</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-mysql</artifactId>
</dependency>
DataSource Configuration
quarkus.datasource.db-kind=mysql
quarkus.datasource.username=root
quarkus.datasource.password=0103
quarkus.datasource.jdbc.url=jdbc:mysql://localhost:3306/my-database?createDatabaseIfNotExist=true
quarkus.hibernate-orm.database.generation.create-schemas=true
quarkus.hibernate-orm.database.generation=drop-and-create
** quarkus.hibernate-orm.database.generation.create-schemas
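Lets Hibernate ORM create missing database schemas automatically at server startup (for databases that support them). The DDL statements for the classes annotated with @Entity are generated and applied to the DB according to quarkus.hibernate-orm.database.generation.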
** quarkus.hibernate-orm.database.generation=drop-and-create
If the schema already exists, it is dropped and recreated every time the application starts.
Shop Domain Design
Send Order(Publishing) to Kafka from REST Endpoint
package org.acme.kafka;
import org.acme.domain.Orderform;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
/**
 * Publishes {@link Orderform} messages through the {@code orderform-out}
 * reactive-messaging channel.
 */
@ApplicationScoped // CDI counterpart of @Component in Spring
public class OrderformProducer {

    /** Emitter bound to the {@code orderform-out} messaging channel. */
    @Inject // CDI counterpart of @Autowired in Spring
    @Channel("orderform-out")
    Emitter<Orderform> emitter;

    /**
     * Sends the given order form to the {@code orderform-out} channel.
     *
     * @param orderForm the order form to publish
     */
    public void sendOrderformToKafka(Orderform orderForm) {
        this.emitter.send(orderForm);
    }
}
application.properties
mp.messaging.outgoing.orderform-out.connector=smallrye-kafka
mp.messaging.outgoing.orderform-out.topic=orders
Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It also generates the serializer and deserializer for you, so you do not have to configure 'mp.messaging.outgoing.orderform-out.value.serializer' yourself.
Receiving Order from Kafka -> Camel Route
Adding Camel Dependencies
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-bean</artifactId>
</dependency>
Define Route
// Consume order forms from the "orders" Kafka topic; the value is deserialized
// into an Orderform by the custom OrderformDeserializer.
from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
    .process(exchange -> {
        Message in = exchange.getIn();
        System.out.println("[message header] -- " + in.getHeaders());
        System.out.println("[message body] -- " + in.getBody());

        // Message transformation: replace the message headers with the order
        // form's fields so later steps can read them as ${header.*}.
        Orderform form = in.getBody(Orderform.class);
        Long memberKey = form.getMemberId();
        Long itemKey = form.getItemId();
        Long quantity = form.getOrderCount();
        Map<String, Object> orderFormMap =
                Map.of("memberId", memberKey, "itemId", itemKey, "orderCount", quantity);
        in.setHeaders(orderFormMap);

        System.out.println("[message header(changed)] -- " + in.getHeaders());
        System.out.println("[message body] -- " + in.getBody());
    })
    // Look up the ordering member in the DB.
    .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
    .to("bean:orderProcessingBean?method=checkMember")
    // Look up the ordered item in the DB.
    .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
    .to("bean:orderProcessingBean?method=checkItem")
    // Build the actual Order entity from the member, delivery and item information.
    .to("bean:orderProcessingBean?method=makeOrder")
    // JPA persist: store the order in the DB (associated tables are updated as well).
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
    .log("--- ${body} ---");
By moving the message-processing logic into a bean,
you can keep the route concise and reuse the same processing code later.
package org.acme.routes.beans;
import io.quarkus.runtime.annotations.RegisterForReflection;
import org.acme.domain.*;
import org.acme.domain.Item.Item;
import org.apache.camel.Message;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.Map;
/**
 * Camel bean that validates the member/item lookup results of the order route
 * and assembles the {@link Order} entity to be persisted.
 *
 * <p>The three route steps are expected to run in this order for each message:
 * {@link #checkMember}, {@link #checkItem}, {@link #makeOrder}. The results of
 * the first two are kept in instance fields until {@code makeOrder} consumes
 * them.
 *
 * <p>NOTE(review): the bean is application-scoped, so these fields are shared
 * by every exchange. This presumably relies on the route processing one message
 * at a time; confirm before enabling concurrent consumers.
 */
@ApplicationScoped
@Named("orderProcessingBean")
@RegisterForReflection
public class OrderProcessingBean {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingBean.class);

    /** Member found for the order currently being processed, or null if the lookup failed. */
    private Member member;
    /** Delivery derived from the member's address, or null if the member lookup failed. */
    private Delivery delivery;
    /** Item found for the order currently being processed, or null if the lookup failed. */
    private Item item;

    /**
     * Accepts the member query result only when it holds exactly one member,
     * and prepares a READY delivery to that member's address.
     *
     * @param memberList result of the member query; may be null or empty
     */
    public void checkMember(ArrayList<Member> memberList) {
        // Drop whatever a previous order left behind, so a failed lookup can
        // never be silently combined with another order's member.
        this.member = null;
        this.delivery = null;

        if (ObjectUtils.isEmpty(memberList)) {
            logger.error("member가 존재하지 않습니다.");
            return;
        }
        if (memberList.size() > 1) {
            logger.error("하나의 member_id key에 두 명 이상의 member가 조회되었습니다.");
            return;
        }

        Member found = memberList.get(0);
        this.member = found;
        this.delivery = new Delivery(found.getAddress(), DeliveryStatus.READY);
    }

    /**
     * Accepts the item query result only when it holds exactly one item.
     *
     * @param itemList result of the item query; may be null or empty
     */
    public void checkItem(ArrayList<Item> itemList) {
        // Same reasoning as in checkMember: never keep a stale item around.
        this.item = null;

        if (ObjectUtils.isEmpty(itemList)) {
            logger.error("item이 존재하지 않습니다.");
            return;
        }
        if (itemList.size() > 1) {
            logger.error("하나의 item_id에 두 개 이상의 item이 조회되었습니다.");
            return;
        }

        this.item = itemList.get(0);
    }

    /**
     * Builds the order from the previously validated member, delivery and item
     * plus the {@code orderCount} header, and sets it as the message body.
     *
     * @param message the in-flight Camel message; its body is replaced by the new order
     * @throws IllegalStateException if the member or item lookup did not yield
     *         exactly one result, or the {@code orderCount} header is missing
     */
    public void makeOrder(Message message) {
        if (this.member == null || this.delivery == null || this.item == null) {
            throw new IllegalStateException(
                    "Cannot create order: member or item lookup did not yield exactly one result");
        }

        Map<String, Object> orderFormMap = message.getHeaders();
        Long orderCount = (Long) orderFormMap.get("orderCount");
        if (orderCount == null) {
            throw new IllegalStateException("Cannot create order: 'orderCount' header is missing");
        }

        OrderItem orderItem =
                OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
        Order order = Order.createOrder(this.member, this.delivery, orderItem);
        message.setBody(order);
    }
}
To receive JSON data from Camel Kafka Component, you must define a custom deserializer.
package org.acme.json;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import org.acme.domain.Orderform;
/**
 * Kafka value deserializer that turns the JSON payload into an {@link Orderform}.
 * It is referenced by class name from the route's {@code valueDeserializer} option.
 */
public class OrderformDeserializer extends ObjectMapperDeserializer<Orderform> {

    /** Binds the inherited deserializer to the {@link Orderform} type. */
    public OrderformDeserializer() {
        super(Orderform.class);
    }
}
Run the Application and Check Results
[Send POST Request] The application publishes the order form message.
[Message Transformation] The header contents are changed.
[After JPA persist] The Order, Delivery, and OrderItem tables are updated.
https://quarkus.io/guides/hibernate-orm
https://medium.com/nerd-for-tech/apache-camel-crud-with-jpa-fa9603430ff5
'~2022 > Camel with Quarkus' 카테고리의 다른 글
[Camel-Quarkus] Exception Handling & Branch Statement (feat. Slack Component) (0) | 2022.04.01 |
---|