Overview

Shop Example Overview

 

DataSource 

Adding Dependencies

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-hibernate-orm</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-jdbc-mysql</artifactId>
    </dependency>

DataSource Configuration

quarkus.datasource.db-kind=mysql
quarkus.datasource.username=root
quarkus.datasource.password=0103
quarkus.datasource.jdbc.url=jdbc:mysql://localhost:3306/my-database?createDatabaseIfNotExist=true
quarkus.hibernate-orm.database.generation.create-schemas=true
quarkus.hibernate-orm.database.generation=drop-and-create

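** quarkus.hibernate-orm.database.generation.create-schemas=true

Lets Hibernate ORM create the database schemas automatically at startup (for databases that support schemas).

** quarkus.hibernate-orm.database.generation=drop-and-create

When the server starts, Hibernate ORM finds the classes annotated with @Entity, generates the DDL statements for them, and applies them to the DB. With drop-and-create, anything that already exists is dropped first and then recreated.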

 

Shop Domain Design 

 

Send Order(Publishing) to Kafka from REST Endpoint

package org.acme.kafka;

import org.acme.domain.Orderform;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

/**
 * Publishes {@link Orderform} messages through the {@code orderform-out}
 * messaging channel.
 */
@ApplicationScoped // counterpart of @Component in Spring
public class OrderformProducer {

    /** Emitter bound to the {@code orderform-out} messaging channel. */
    @Inject // counterpart of @Autowired in Spring
    @Channel("orderform-out")
    Emitter<Orderform> emitter;

    /**
     * Hands the given order form to the channel emitter.
     *
     * @param orderForm the order form to publish
     */
    public void sendOrderformToKafka(final Orderform orderForm) {
        this.emitter.send(orderForm);
    }
}

application.properties

mp.messaging.outgoing.orderform-out.connector=smallrye-kafka
mp.messaging.outgoing.orderform-out.topic=orders

Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It also generates the serializer and deserializer for you, so you do not need to configure 'mp.messaging.outgoing.orderform-out.value.serializer' yourself.

 

Receiving Order from Kafka -> Camel Route

Adding Camel Dependencies

    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-jdbc</artifactId>
    </dependency>
	<dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-jpa</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-bean</artifactId>
    </dependency>

 

Define Route

// Endpoint URIs used by the route below; the ${header.*} placeholders are
// resolved per message by toD().
String memberLookupUri = "jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}";
String itemLookupUri = "jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}";
String orderPersistUri = "jpa://" + Order.class.getName() + "?usePersist=true";

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
    // Print the incoming message, then replace its headers with the order-form fields.
    .process(exchange -> {
        Message incoming = exchange.getIn();
        System.out.println("[message header] -- " + incoming.getHeaders());
        System.out.println("[message body] -- " + incoming.getBody());

        Orderform form = incoming.getBody(Orderform.class);

        // Message transformation: expose the order-form fields as headers so the
        // following endpoints can reference them.
        Map<String, Object> orderFormMap = Map.of(
            "memberId", form.getMemberId(),
            "itemId", form.getItemId(),
            "orderCount", form.getOrderCount());
        incoming.setHeaders(orderFormMap);

        System.out.println("[message header(changed)] -- " + incoming.getHeaders());
        System.out.println("[message body] -- " + incoming.getBody());
    })
    // Look up the ordering member in the DB.
    .toD(memberLookupUri)
    .to("bean:orderProcessingBean?method=checkMember")
    // Look up the ordered item in the DB.
    .toD(itemLookupUri)
    .to("bean:orderProcessingBean?method=checkItem")
    // Build the actual order entity from the member, delivery and item information.
    .to("bean:orderProcessingBean?method=makeOrder")
    // Persist the order through JPA (the associated tables are updated along with it).
    .to(orderPersistUri)
    .log("--- ${body} ---");

By moving the message-processing steps into a bean,
you can keep the route concise and reuse the same processing code later.

package org.acme.routes.beans;

import io.quarkus.runtime.annotations.RegisterForReflection;
import org.acme.domain.*;
import org.acme.domain.Item.Item;
import org.apache.camel.Message;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.Map;

/**
 * Bean holding the message-processing steps of the order route.
 *
 * <p>The steps are meant to be called in sequence for one order:
 * {@link #checkMember(ArrayList)}, {@link #checkItem(ArrayList)} and then
 * {@link #makeOrder(Message)}. The first two remember the looked-up entities
 * in fields; the last one combines them into an {@code Order}.
 *
 * <p>NOTE(review): the bean is {@code @ApplicationScoped}, so these fields are
 * shared by every message that passes through it. Each check step therefore
 * clears its own state first, so a failed lookup can never leave the previous
 * order's data in place. If the route is ever consumed concurrently, the
 * per-order state should move to exchange properties instead - confirm the
 * consumer concurrency before relying on this class as is.
 */
@ApplicationScoped
@Named("orderProcessingBean")
@RegisterForReflection
public class OrderProcessingBean {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingBean.class);

    // State collected for the order currently being processed.
    private Member member;
    private Delivery delivery;
    private Item item;

    /**
     * Validates the member lookup result and remembers the member together
     * with a new {@code READY} delivery to the member's address.
     *
     * <p>When the list does not contain exactly one member, an error is logged
     * and no member/delivery is remembered.
     *
     * @param memberList result of the member query; may be null or empty
     */
    public void checkMember(ArrayList<Member> memberList) {
        // Forget the previous order's member so a failed lookup cannot reuse it.
        this.member = null;
        this.delivery = null;

        if (ObjectUtils.isEmpty(memberList)) {
            logger.error("member가 존재하지 않습니다.");
            return;
        }
        if (memberList.size() > 1) {
            logger.error("하나의 member_id key에 두 명 이상의 member가 조회되었습니다.");
            return;
        }

        Member found = memberList.get(0);
        this.member = found;
        this.delivery = new Delivery(found.getAddress(), DeliveryStatus.READY);
    }

    /**
     * Validates the item lookup result and remembers the item.
     *
     * <p>When the list does not contain exactly one item, an error is logged
     * and no item is remembered.
     *
     * @param itemList result of the item query; may be null or empty
     */
    public void checkItem(ArrayList<Item> itemList) {
        // Forget the previous order's item so a failed lookup cannot reuse it.
        this.item = null;

        if (ObjectUtils.isEmpty(itemList)) {
            logger.error("item이 존재하지 않습니다.");
            return;
        }
        if (itemList.size() > 1) {
            logger.error("하나의 item_id에 두 개 이상의 item이 조회되었습니다.");
            return;
        }

        this.item = itemList.get(0);
    }

    /**
     * Builds the order from the remembered member, delivery and item plus the
     * {@code orderCount} header, and sets it as the message body.
     *
     * @param message the message whose headers carry {@code orderCount} and
     *                whose body is replaced by the created order
     * @throws IllegalStateException if the member or item lookup did not yield
     *                               exactly one result, or if the
     *                               {@code orderCount} header is missing
     */
    public void makeOrder(Message message) {
        // Fail with a clear reason instead of a bare NullPointerException or
        // an order built from incomplete data.
        if (this.member == null || this.delivery == null) {
            throw new IllegalStateException(
                    "Cannot create order: member lookup did not yield exactly one member");
        }
        if (this.item == null) {
            throw new IllegalStateException(
                    "Cannot create order: item lookup did not yield exactly one item");
        }

        Map<String, Object> orderFormMap = message.getHeaders();
        Long orderCount = (Long) orderFormMap.get("orderCount");
        if (orderCount == null) {
            throw new IllegalStateException(
                    "Cannot create order: 'orderCount' header is missing");
        }

        OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
        Order order = Order.createOrder(this.member, this.delivery, orderItem);

        message.setBody(order);
    }
}

To receive JSON data through the Camel Kafka component, you must define a custom deserializer.

package org.acme.json;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import org.acme.domain.Orderform;

/**
 * Kafka value deserializer for {@link Orderform} messages.
 *
 * <p>All deserialization logic is inherited from
 * {@link ObjectMapperDeserializer}; this subclass only fixes the target type.
 */
public class OrderformDeserializer extends ObjectMapperDeserializer<Orderform> {
    /** Creates a deserializer that targets {@code Orderform}. */
    public OrderformDeserializer() {
        // Tell the parent which class to deserialize into.
        super(Orderform.class);
    }
}

 

Run the Application and Check Results

 

[Send POST Request] The application publishes an orderform message.

[Message Transformation] The header contents are changed.

[After JPA persist] The Order, Delivery, and OrderItem tables are updated.

 

 

https://quarkus.io/guides/hibernate-orm

https://medium.com/nerd-for-tech/apache-camel-crud-with-jpa-fa9603430ff5

https://quarkus.io/guides/kafka#kafka-serialization

+ Recent posts