** [Camel-Quarkus] Shop Example using Kafka, JPA Component 예제에서 이어지는 내용입니다.

 

Orderform 정보(주문자 Id, 주문한 Item Id, 주문 수량)를 가지고 Order를 생성하는 과정은 아래와 같다.

1) Member Id로 주문자 정보를 조회 → 주문자(회원) 정보가 없으면 Exception
2) Item Id로 상품 정보를 조회 → Item 정보가 없으면 Exeption
3) Member.address 정보로 Delivery(배송정보) 생성
4) 주문 수량과 Item.price 정보로 OrderItem(주문상품 정보) 생성
5) Order 생성 → 이 때 주문 수량만큼 재고를 줄이는데, 재고가 부족하면 Exception

 

위 프로세스에서 밑줄 친 부분은 예외 case에 해당한다.


아래 코드는, 위 과정에서 발생할 수 있는 Exception에 대한 처리가 되어있지 않은 상태이다.

Exception이 발생하면 Camel Route가 Exception이 발생한 시점에 동작을 멈추게 되고, 당연히 DB가 Update되지 않는다.

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
	.process(new Processor() {
    	@Override
        public void process(Exchange exchange) throws Exception {
        	Message message = exchange.getIn();
            System.out.println("[message header] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());

            Orderform orderform = message.getBody(Orderform.class);

            // Message Transformation
            Long memberId = orderform.getMemberId();
            Long itemId = orderform.getItemId();
            Long orderCount = orderform.getOrderCount();

            Map<String, Object> orderFormMap = Map.of(
            	"memberId", memberId,"itemId", itemId, "orderCount", orderCount
            );
            message.setHeaders(orderFormMap);

            System.out.println("[message header(changed)] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());
        }
    })
    // Search Member Info. (주문 회원 정보를 DB에서 select)
    .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
    .to("bean:orderProcessingBean?method=checkMember")
    // Search Item Info. (주문상품 정보를 DB에서 select)
    .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}" )
    .to("bean:orderProcessingBean?method=checkItem")
    // Make Order (주문 회원, 배송정보, 주문상품의 정보로 실제 주문 엔티티를 생성)
    .to("bean:orderProcessingBean?method=makeOrder")
    // JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
    .log("--- ${body} ---");

 

예를 들어 Make Order(주문 Entity 생성) 부분에서 재고가 없는 Exception이 발생할 경우,
위에 정의된 Camel Route를 따라 processing이 진행되는 중에  NotEnoughStockExeption이 발생한다.

//package org.acme.routes.beans;
public void makeOrder(Message message) {
    Map<String, Object> orderFormMap = message.getHeaders();
    Long orderCount = (Long) orderFormMap.get("orderCount");

    OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
    Order order = Order.createOrder(this.member, this.delivery, orderItem);

    message.setBody(order);
}

//package org.acme.domain;
public static OrderItem createOrderItem(Item item, int orderPrice, int count) {
    OrderItem orderItem = new OrderItem();
    orderItem.setItem(item);
    orderItem.setOrderPrice(orderPrice);
    orderItem.setCount(count);

    item.removeStock(count);
    return orderItem;
}

//package org.acme.domain.Item;
public void removeStock(int quantity) {
    int restStock = this.stockQuantity - quantity;
    if (restStock < 0) {
        throw new NotEnoughStockException("need more stock");
    }
    this.stockQuantity = restStock;
}

Camel onException() 메소드를 사용한 Exception Handling

 

아래 예시와 같이 onException() 메소드를 사용해서 간단히 Exception Handling이 가능하다.

단, onException() 메소드는 해당 라우트에서 onException 구문이 나오기 전까지 발생한 exception만 handling하므로,

특정 Exception이 발생할 것으로 예상되는 DSL문 다음에 사용해야 한다.

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                Message message = exchange.getIn();
                System.out.println("[message header] -- " + message.getHeaders());
                System.out.println("[message body] -- " + message.getBody());

                Orderform orderform = message.getBody(Orderform.class);

                // Message Transformation
                Long memberId = orderform.getMemberId();
                Long itemId = orderform.getItemId();
                Long orderCount = orderform.getOrderCount();

                Map<String, Object> orderFormMap = Map.of(
                        "memberId", memberId, "itemId", itemId, "orderCount", orderCount
                );
                message.setHeaders(orderFormMap);

                System.out.println("[message header(changed)] -- " + message.getHeaders());
                System.out.println("[message body] -- " + message.getBody());
            }
        })
        // Search Member Info. (주문자 정보를 DB에서 select)
        .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
        .to("bean:orderProcessingBean?method=checkMember")
        // Search Item Info (주문상품 정보를 DB에서 select)
        .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
        .to("bean:orderProcessingBean?method=checkItem")
        // Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
        .to("bean:orderProcessingBean?method=makeOrder")
        // JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
        .to("jpa://" + Order.class.getName() + "?usePersist=true")
        .onException(NotEnoughStockException.class)
            .transform(simple("exception"))
            .to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/xxx...");

Camel Branch 구문(choice ~ when)을 사용한 예외 처리 

 

아래와 같이 camel branch를 활용하여 exception을 처리할 수도 있다.

일반적으로 message body나 header의 내용에 따라 분기를 나눌 것이다.

// Search Member Info. (주문자 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.choice()
.when(simple("${body} is 'org.acme.domain.Order'"))
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
.otherwise() // Exception이 발생한 경우, message body가 Order type이 아니므로 slack으로 메시지 송신 
    .to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/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");

 

Camel Route에서 message header나 body의 내용을 보고 Exception 여부를 확인할 수 있도록,
processing bean에서 exception 처리를 해준다

// package org.acme.routes.beans;
public void makeOrder(Message message) {
    try {
        Map<String, Object> orderFormMap = message.getHeaders();
        Long orderCount = (Long) orderFormMap.get("orderCount");

        OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
        Order order = Order.createOrder(this.member, this.delivery, orderItem);
		
        // 정상적으로 order을 생성한 경우, message body는 order 객체가 들어가게 된다
        message.setBody(order);
    } catch (RuntimeException e) {
    	// 예외가 발생한 경우, message body를 "exception" string으로 transform 한다
        message.setBody("exception");
    }
}

 

'~2022 > Camel with Quarkus' 카테고리의 다른 글

[Camel-Quarkus] Shop Example using Kafka, JPA Component  (0) 2022.03.28

OverView

Shop Example Overview

 

DataSource 

Adding Dependencies

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-hibernate-orm</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-jdbc-mysql</artifactId>
    </dependency>

DatasSource Configuration 

quarkus.datasource.db-kind=mysql
quarkus.datasource.username=root
quarkus.datasource.password=0103
quarkus.datasource.jdbc.url=jdbc:mysql://localhost:3306/my-database?createDatabaseIfNotExist=true
quarkus.hibernate-orm.database.generation.create-schemas=true
quarkus.hibernate-orm.database.generation=drop-and-create

** quarkus.hibernate-orm.database.generation.create-schemas
Find the class with @Entity annotation and create a DDL statement at the start of the server and apply it to the DB.

** quarkus.hibernate-orm.database.generation=drop-and-create

If schema exists, delete it and recreate it.

 

Shop Domain Design 

 

Send Order(Publishing) to Kafka from REST Endpoint

package org.acme.kafka;

import org.acme.domain.Orderform;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped//-> @Component in Spring
public class OrderformProducer {

    @Inject //-> @Autowired in Spring
    @Channel("orderform-out") // messaging channel
    Emitter<Orderform> emitter;

    public void sendOrderformToKafka(Orderform orderForm) {
        emitter.send(orderForm);
    }
}

application.properties

mp.messaging.outgoing.orderform-out.connector=smallrye-kafka
mp.messaging.outgoing.orderform-out.topic=orders

Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It will also generate the serializer and deserializer for you, so you do not have to configure anything about 'mp.messaging.outgoing.orderform-out.value.serializer'

 

Receiving Order from Kafka -> Camel Route

Adding Camel Dependencies

    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-jdbc</artifactId>
    </dependency>
	<dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-jpa</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel.quarkus</groupId>
      <artifactId>camel-quarkus-bean</artifactId>
    </dependency>

 

Define Route

from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
	.process(new Processor() {
    	@Override
        public void process(Exchange exchange) throws Exception {
        	Message message = exchange.getIn();
            System.out.println("[message header] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());

            Orderform orderform = message.getBody(Orderform.class);

            // Message Transformation
            Long memberId = orderform.getMemberId();
            Long itemId = orderform.getItemId();
            Long orderCount = orderform.getOrderCount();

            Map<String, Object> orderFormMap = Map.of(
            	"memberId", memberId,"itemId", itemId, "orderCount", orderCount
            );
            message.setHeaders(orderFormMap);

            System.out.println("[message header(changed)] -- " + message.getHeaders());
            System.out.println("[message body] -- " + message.getBody());
        }
    })
    // Search Member Info. (주문 회원 정보를 DB에서 select)
    .toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
    .to("bean:orderProcessingBean?method=checkMember")
    // Search Item Info. (주문상품 정보를 DB에서 select)
    .toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}" )
    .to("bean:orderProcessingBean?method=checkItem")
    // Make Order (주문 회원, 배송정보, 주문상품의 정보로 실제 주문 엔티티를 생성)
    .to("bean:orderProcessingBean?method=makeOrder")
    // JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
    .to("jpa://" + Order.class.getName() + "?usePersist=true")
    .log("--- ${body} ---");

By processing the Message Processing process using Bean,
you can organize the Route concisely and reuse the code for the same processing later.

package org.acme.routes.beans;

import io.quarkus.runtime.annotations.RegisterForReflection;
import org.acme.domain.*;
import org.acme.domain.Item.Item;
import org.apache.camel.Message;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;
import java.util.ArrayList;
import java.util.Map;

@ApplicationScoped
@Named("orderProcessingBean")
@RegisterForReflection
public class OrderProcessingBean {

    private static Logger logger = LoggerFactory.getLogger(OrderProcessingBean.class);
    private Member member;
    private Delivery delivery;
    private Item item;

    public void checkMember(ArrayList<Member> memberList) {
        if (ObjectUtils.isEmpty(memberList)) {
            logger.error("member가 존재하지 않습니다.");
        } else if (memberList.size() > 1) {
            logger.error("하나의 member_id key에 두 명 이상의 member가 조회되었습니다.");
        } else {
            Member member = memberList.get(0);
            this.member = member;
            this.delivery = new Delivery(member.getAddress(), DeliveryStatus.READY);
        }
    }

    public void checkItem(ArrayList<Item> itemList) {
        if (ObjectUtils.isEmpty(itemList)) {
            logger.error("item이 존재하지 않습니다.");
        } else if (itemList.size() > 1) {
            logger.error("하나의 item_id에 두 개 이상의 item이 조회되었습니다.");
        } else {
            Item item = itemList.get(0);
            this.item = item;
        }
    }

    public void makeOrder(Message message) {
        Map<String, Object> orderFormMap = message.getHeaders();
        Long orderCount = (Long) orderFormMap.get("orderCount");

        OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
        Order order = Order.createOrder(this.member, this.delivery, orderItem);

        message.setBody(order);
    }
}

To receive JSON data from Camel Kafka Component, you must define a custom deserializer.

package org.acme.json;

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import org.acme.domain.Orderform;

public class OrderformDeserializer extends ObjectMapperDeserializer<Orderform> {
    public OrderformDeserializer() {
        super(Orderform.class);
    }
}

 

Run the Application and Check Results

 

[Send Post Request] The Application will publish orderform message.

[Message Transformation] header contents is changed.

[After JPA persist] Order, Delivery, OrderItem table is updated.

 

 

https://quarkus.io/guides/hibernate-orm

https://medium.com/nerd-for-tech/apache-camel-crud-with-jpa-fa9603430ff5

https://quarkus.io/guides/kafka#kafka-serialization

Adding dependencies

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>

 

What are we going to exchange?

@Entity
@Getter @ToString
public class Employee {
    @Id
    private Integer empId;
    private String empName;
}

 

Configure the Application

When you use Reactive Messaging, you send messages to a channel and receive them from another channel. These channels are mapped to the underlying messaging technology by configuration. In our application, we must indicate that our reception and publication channels will use the movies Kafka channel

#Kafka Server
kafka.bootstrap.servers=localhost:9092
#Kafka Topic Name
kafka.topic.name=employees

# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.employees-in.connector=smallrye-kafka
mp.messaging.incoming.employees-in.topic=employees
mp.messaging.incoming.employees-in.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.employees-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.employees-out.connector=smallrye-kafka
mp.messaging.outgoing.employees-out.topic=employees
mp.messaging.outgoing.employees-out.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.employees-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer

No configuration required in case of String type(default configuration), but in Integer Type, data will be lost if the serial/deserializer is not set properly.
In case of Object Type (JSON), the serial/deserializer built into Quarkus automatically operates, so there is no need to set the key/value type.

String Type의 경우 Default이기 때문에 설정이 필요 없으나, Integer Type으로 key나 value를 consume/produce 할 경우 serializer를 알맞게 설정하지 않으면 data가 소실된다.
Object Type(JSON)의 경우 Quarkus에 built-in 된 serial/deserializer가 자동으로 동작하게 되어 key/value type 설정이 필요 없다.

Publishing to Kafka

Create the org.acme.domain.Employee class with the following content:

package org.acme.kafka;

import io.smallrye.reactive.messaging.kafka.Record;
import org.acme.domain.Employee;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

@ApplicationScoped//-> @Component in Spring
public class EmployeeProducer {

    @Inject //-> @Autowired in Spring
    @Channel("employees-out") // messaging channel
    Emitter<Record<Integer, String>> emitter;

    public void sendEmployeeToKafka(Employee employee) {
        emitter.send(Record.of(employee.getEmpId(), employee.getEmpName()));
    }
}

In this class, we inject an Emitter, i.e., an object responsible for sending a message to a channel. This emitter is attached to the movies-out channel (and so will send messages to Kafka). 

So, the rest of our application can simply use the sendMovieToKafka method to send a Employee info. to our Kafka topic.

 

Consuming from Kafka

package org.acme.kafka;

import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped //-> @Component in Spring
public class EmployeeConsumer {

    private final Logger logger = Logger.getLogger(EmployeeConsumer.class);

    @Incoming("employees-in") // messaging channel
    public void receive(Record<Integer, String> record) {
        logger.infof("Got an employee: %s - %s", record.key(), record.value());
    }
}

Here, we use the @Incoming annotation to indicate to Quarkus to call the receive method for every received record.

 

Sending employees from a REST endpoint

package org.acme.kafka.resource;

import org.acme.kafka.EmployeeProducer;
import org.acme.domain.Employee;

import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class EmployeeResource {

    @Inject//-> @Autowired in Spring
    EmployeeProducer producer;

    @POST
    public Response send(Employee employee) {
        producer.sendEmployeeToKafka(employee);
        // Return an 202 - Accepted response.
        return Response.accepted().build();
    }
}

 

Run the application

 

Send Post Request

 

Check the producer's logs when receiving messages

 

Kafka의 종류

Apache Kafka
  . 오픈 소스, 자유롭게 수정과 배포 가능

Confluent Kafka
  . Community License: 소프트웨어 수정, 재배포 가능하지만 Saas 형태로 서비스 제공하는 것은 금지됨
  . Enterprise License: 연간 구독형

 

데이터 파이프라인(Data Pipeline)이란?

중간에 사람의 개입 없이
데이터를 오염, 중복, 유실과 같은 결함 없이
수집, 저장, ETL(Extract, Transform, Load)이 가능하도록
일련의 흐름을 만들어 주는 과정

Event는 비즈니스에서 일어나는 모든 일(데이터)를 의미

. 웹사이트에서 무언가를 클릭하는 것

. 청구서 발행

. 송금

. 배송 물건의 위치 정보

. 택시의 GPS 좌표

. 센서의 온도/압력 데이터

Event Stream은 연속적인 많은 이벤트들의 흐름

. BigData의 특징을 가짐

. 비즈니스의 모든 영역에서 광범위하게 발생

. 대용량의 데이터(Big Data 발생)

Apache Kafka의 3가지 주요 특징

. 이벤트 스트림을 안전하게 전송: Publish & Subscribe

. 이벤트 스트림을 디스크에 저장: Write to Disk

. 이벤트 스트림을 분석 및 처리: Processing & Analysis

Apache Kafka의 사용 사례: Event(메시지/데이터)가 사용되는 모든 곳

. Messaging Syetem
. IOT 디바이스로부터 데이터 수집
. 애플리케이션에서 발생하는 로그 수집
. Realtime Event Stream Processing (Fraud Detection, 이상 감지 등)
. DB 동기화(MSA 기반의 분리된 DB간 동기화)
. 실시간 ETL(Extract-Transform-Loda)
. Spark, Flink, Strom, Hadoop과 같은 빅데이터 기술과 같이 사용

 

요약

. Apache Kafka는 흐르는 데이터를 처리하기 위한 플랫폼(Event-Streaming Platform)

We will be making use of Spring Boot and Camel SQL Component for inserting and retrieving records from MySQL

The SQL component allows you to work with databases using JDBC queries. 

The SQL component allows you to work with databases using JDBC queries. 

The difference between this component and JDBC component is that in case of SQL the query is a property of the endpoint and it uses message payload as parameters passed to the query.

So it is much easier to use parameterized queries. In case of the scenes for the actual SQL handling, while JDBC component uses the standard JDBC API. 

 

So Camel SQL component is much advanced as we can also make use of features like Spring transaction


Gradle Project 생성 및 의존성, DB 설정 (application properties)

 

- Spring Web, Apche Camel, MySQL Driver dependency 추가

 

- Camel-SQL component dependency 추가

. SQL component는 camel core에 포함되어있지 않으므로 따로 추가해주어야 함

<dependency>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-sql</artifactId>
 <version>${camel.version}</version>
</dependency>

- application properties에 MySQL 연결 설정

. createDatabaseIfNotExist: DB가 없으면 자동으로 생성해주는 설정

. local에 설정한 username/password가 정확해야 함

spring.datasource.url=jdbc:mysql://localhost:3306/bootdb?createDatabaseIfNotExist=true
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=0103
spring.datasource.platform=mysql
spring.datasource.initialization-mode=always

- schema-mysql.sql 파일 생성

. Spring Boot JDBC가 resources/schema-mysql.sql script를 읽어들여 자동으로 table을 생성해준다

DROP TABLE IF EXISTS employee;

CREATE TABLE employee (
  empId VARCHAR(10) NOT NULL,
  empName VARCHAR(100) NOT NULL
);

Domain, Service, Controller Class 구성

 

- Employee Domain Class 생성

@Getter
@Setter
@ToString
public class Employee {

    private String empId;
    private String empName;

}

- Service Class 생성

. Camel Routebuilder를 상속받아 camel route를 정의하는 class

. Autowiring the datasource been created for us b y Spring Boot JDBC

. Spring Boot Controller가 받은 Http 요청을 Service Class에서 처리

@Service
public class EmployeeService extends RouteBuilder {

    @Autowired
    DataSource dataSource;

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // define the SQL Component bean which will be used as an endpoint in our route
    @Bean
    public SqlComponent sql(DataSource dataSource) {
        SqlComponent sql = new SqlComponent();
        sql.setDataSource(dataSource);
        return sql;
    }

    @Override
    public void configure() throws Exception {
        //Insert Route
        from("direct:insert").log("Processing message ").setHeader("message", body()).process(
            new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    //take the Employee object from the exchange and create the parameter map
                    Employee employee = exchange.getIn().getBody(Employee.class);
                    Map<String, Object> employeeMap = new HashMap<>();
                    employeeMap.put("EmpId", employee.getEmpId());
                    employeeMap.put("EmpName", employee.getEmpName());
                    exchange.getIn().setBody(employeeMap);
                }
            }).to("sql:INSERT INTO employee(EmpId, EmpName) VALUES (:#EmpId, :#EmpName)");
        //Select Route
        from("direct:select").to("sql:select * from employee").process(
            new org.apache.camel.Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    //the camel sql select query has been executed. We get the list of employees.
                    ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) exchange.getIn()
                        .getBody();
                    List<Employee> employees = new ArrayList<Employee>();
                    System.out.println(dataList);
                    for (Map<String, String> data : dataList) {
                        Employee employee = new Employee();
                        employee.setEmpId(data.get("empId"));
                        employee.setEmpName(data.get("empName"));
                        employees.add(employee);
                    }
                    exchange.getIn().setBody(employees);
                }
            });
    }
}

. POST(insert) 요청은 direct component(direct:insert)로 연결 → exchange에서 Employee 객체 정보를 꺼내서 parameter map  생성하고 exchange에 parameter map을 set해준다

. GET(select) 요청은 direct component(direct:select)연결하여 별도의 processing 없이 DB로 query를 날리게 되고, query가 실행된 후에 select 결과를 exchange에 담는다

 

- Controller Class 생성

. ProducerTemplate 를 사용해서 Camel Route를 호출하게 됨

@RestController
public class EmployeeController {
    @Autowired
    ProducerTemplate producerTemplate;

    @RequestMapping(value = "/employees", method = RequestMethod.GET)
    public List<Employee> getAllEmployees() {
        List<Employee> employees = producerTemplate.requestBody("direct:select", null, List.class);
        return employees;
    }

    @RequestMapping(value = "/employees", consumes = "application/json", method = RequestMethod.POST)
    public boolean insertEmployee(@RequestBody Employee employee) {
        producerTemplate.requestBody("direct:insert", employee, List.class);
        return true;
    }
}

API TEST

 

. POSTMAN으로 GET/POST 요청 후 결과 확인

POST API CALL
GET API CALL


ActiveMQ 설치


Camel Dependency 추가

 

- ActiveMQ 컴포넌트 추가

. ActiveMQ 컴포넌트는 Camel 컴포넌트가 아니기 때문에 pom.xml에 dependency 추가해주어야 한다.


Sender(Producer)

 

- Message Sender Route 정의

@Component
public class ActiveMqSenderRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        //timer
        from("timer:active-mq-timer?period=10000")
            .transform().constant("My message for Active MQ")
            .to("activemq:my-activemq-queue");
        //queue
    }
}

. sender(producer) route를 정의한 camel application을 구동하면, 아래와 같이 queue가 생성됨

. message가 생성되는 period는 30초로 설정 (timer 컴포넌트 옵션)

. 아직 receiver(consumer)가 없으므로 메시지가 pending messages로 쌓이는 것을 확인할 수 있음

- Message Receiver Route 정의 

@Component
public class ActiveMqReceiverRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("activemq:my-activemq-queue")
            .to("log:received-message-from-active-mq");
    }

}

. receiver(consumer) route를 정의한 camel application을 구동하면, 아래와 같이 queue에서 메시지를 가져와 log를 생성

. Queue 상태창을 보면 pending되었던 메시지들이 모두 소비된 것과, consumer application이 구동 중인 것을 확인할 수 있음

Route 정의

. file 컴포넌트 다른 Camel 컴포넌트에서 파일을 처리하거나, 다른 컴포넌트로부터 온 메시지를 디스크에 저장할 수 있게 해준다.

. log를 찍어보면 파일의 contents를 확인할 수 있다.

Message Transformation

 

- By Using transform() method

@Component
public class MyTimerRouterTransformation extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")

            //// Transformation by Processing Method  ////
            .transform().constant("Time now is " + LocalDateTime.now())
            .log("${body}")

            .to("log:first-timer"); // Exchange[ExchangePattern: InOnly, BodyType: String, Body: My Constant Message]
    }
}

 

- By Using a Bean

2-1) bean name 호출을 통한 transformation

@Component
public class MyTimerRouterBeanTransformationA extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")

            //// Bean Transformation ////
            .bean("getCurrentTimeBeanA") // bean 이름으로 bean 호출
            .log("${body}")

            .to("log:first-timer");
    }
}
@Component
public class GetCurrentTimeBeanA {

    public String getCurrentTime1() {
        return "Time now is " + LocalDateTime.now();
    }
}

 

2-2) Autowired 어노테이션으로 생성한 객체를 호출하여 transformation

. 아래와 같이 bean에 메소드가 2개 이상일 경우, 반드시 메소드명까지 명시해야 함

@Component
public class MyTimerRouterBeanTransformationB extends RouteBuilder {

    @Autowired
    private GetCurrentTimeBeanB getCurrentTimeBeanB;

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")            

            //// Bean Transformation ////
            .bean(getCurrentTimeBeanB, "getCurrentTime1") // 메소드가 2개이상이면 메소드이름도 명시해야함
            .log("${body}")

            .to("log:first-timer");
    }
}
@Component
public class GetCurrentTimeBeanB {

    public String getCurrentTime1() {
        return "1. Time now is " + LocalDateTime.now();
    }

    public String getCurrentTime2() {
        return "2. Time now is " + LocalDateTime.now();
    }
}


Message Processing

 

- By Using a Bean

. (3) getCurrentTimeBean을 통한 message transformation 이후에 message body가 바뀌었음을 알 수 있다
. (5) bean processing 이후에는 body가 변하지 않는다.
. bean method의 return type이 String이면 body 내용이 변하고(Transformation), void이면 message body가 변하지 않는다(Processing).

 

- By Creating Processor

Route 정의

- RouteBuilder를 상속받아 Route 정의

. timer 컴포넌트는 message exchange를 발생시키는 컴포넌트이며, 생성된 메시지는 오로지 소비(consume)만 가능

. log 컴포넌트는 message exchange에 대한 log를 기록

. 두 Endpoint 사이에서 log() 메소드로 message 내용을 확인할 수 있음 

@Component
public class MyFirstTimerRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}") // null
            .transform().constant("My Constant Message")
            .log("${body}") // My Constant Message
            .to("log:first-timer"); // Exchange[ExchangePattern: InOnly, BodyType: String, Body: My Constant Message]
    }
}

  • pom.xml 에서 camel version 설정

+ Recent posts