This example uses Spring Boot and the Camel SQL component to insert and retrieve records from MySQL.

The SQL component allows you to work with databases using JDBC queries. 

The difference between this component and the JDBC component is that with SQL the query is a property of the endpoint, and the message payload supplies the parameters passed to the query.

This makes parameterized queries much easier to use. The SQL component uses spring-jdbc behind the scenes for the actual SQL handling, while the JDBC component uses the standard JDBC API.

Because it builds on spring-jdbc, the Camel SQL component can also use features such as Spring transactions.


Creating the Gradle project, dependencies, and DB configuration (application properties)

 

- Add the Spring Web, Apache Camel, and MySQL Driver dependencies

 

- Add the camel-sql component dependency

. The SQL component is not included in camel core, so it must be added separately

<dependency>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-sql</artifactId>
 <version>${camel.version}</version>
</dependency>

- Configure the MySQL connection in application properties

. createDatabaseIfNotExist: creates the database automatically if it does not exist

. The username/password must match the ones configured on the local MySQL server

spring.datasource.url=jdbc:mysql://localhost:3306/bootdb?createDatabaseIfNotExist=true
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=0103
spring.datasource.platform=mysql
spring.datasource.initialization-mode=always

- Create the schema-mysql.sql file

. Spring Boot JDBC reads the resources/schema-mysql.sql script and creates the table automatically

DROP TABLE IF EXISTS employee;

CREATE TABLE employee (
  empId VARCHAR(10) NOT NULL,
  empName VARCHAR(100) NOT NULL
);

Setting up the Domain, Service, and Controller classes

 

- Create the Employee domain class

@Getter
@Setter
@ToString
public class Employee {

    private String empId;
    private String empName;

}

- Create the Service class

. This class extends Camel's RouteBuilder and defines the Camel routes

. It autowires the DataSource that Spring Boot JDBC creates for us

. HTTP requests received by the Spring Boot controller are handled in the Service class

@Service
public class EmployeeService extends RouteBuilder {

    @Autowired
    DataSource dataSource;

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // define the SQL Component bean which will be used as an endpoint in our route
    @Bean
    public SqlComponent sql(DataSource dataSource) {
        SqlComponent sql = new SqlComponent();
        sql.setDataSource(dataSource);
        return sql;
    }

    @Override
    public void configure() throws Exception {
        //Insert Route
        from("direct:insert").log("Processing message ").setHeader("message", body()).process(
            new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    //take the Employee object from the exchange and create the parameter map
                    Employee employee = exchange.getIn().getBody(Employee.class);
                    Map<String, Object> employeeMap = new HashMap<>();
                    employeeMap.put("EmpId", employee.getEmpId());
                    employeeMap.put("EmpName", employee.getEmpName());
                    exchange.getIn().setBody(employeeMap);
                }
            }).to("sql:INSERT INTO employee(EmpId, EmpName) VALUES (:#EmpId, :#EmpName)");
        //Select Route
        from("direct:select").to("sql:select * from employee").process(
            new org.apache.camel.Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    //the camel sql select query has been executed. We get the list of employees.
                    ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) exchange.getIn()
                        .getBody();
                    List<Employee> employees = new ArrayList<Employee>();
                    System.out.println(dataList);
                    for (Map<String, String> data : dataList) {
                        Employee employee = new Employee();
                        employee.setEmpId(data.get("empId"));
                        employee.setEmpName(data.get("empName"));
                        employees.add(employee);
                    }
                    exchange.getIn().setBody(employees);
                }
            });
    }
}

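. A POST (insert) request is routed to the direct component (direct:insert): the route takes the Employee object from the exchange, builds a parameter map, and sets that map as the exchange body

. A GET (select) request is routed to the direct component (direct:select), which sends the query to the DB without any preprocessing; after the query runs, the result rows are converted to Employee objects and placed in the exchange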
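The two conversions performed by the inline Processors can also be read in isolation. The following is a minimal plain-Java sketch, not the project's code: the `EmployeeMapping` class and its method names are hypothetical, `Employee` is re-declared without Lombok so the block compiles on its own, and the select rows are assumed to arrive as one `Map` per row keyed by column name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not in the original project): the conversions that the
// two inline Processors perform, written as plain static methods.
final class EmployeeMapping {

    // Minimal stand-in for the article's Employee class, without Lombok.
    static final class Employee {
        private final String empId;
        private final String empName;

        Employee(String empId, String empName) {
            this.empId = empId;
            this.empName = empName;
        }

        String getEmpId() { return empId; }
        String getEmpName() { return empName; }
    }

    // Insert side: the map keys must match the :#EmpId and :#EmpName placeholders.
    static Map<String, Object> toParameters(Employee employee) {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("EmpId", employee.getEmpId());
        parameters.put("EmpName", employee.getEmpName());
        return parameters;
    }

    // Select side: each row (assumed to be a Map keyed by column name) becomes an Employee.
    static List<Employee> fromRows(List<Map<String, Object>> rows) {
        List<Employee> employees = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            employees.add(new Employee((String) row.get("empId"), (String) row.get("empName")));
        }
        return employees;
    }

    public static void main(String[] args) {
        Map<String, Object> parameters = toParameters(new Employee("E01", "Kim"));
        System.out.println(parameters.get("EmpId") + " / " + parameters.get("EmpName"));

        Map<String, Object> row = new HashMap<>();
        row.put("empId", "E02");
        row.put("empName", "Lee");
        List<Employee> employees = fromRows(List.of(row));
        System.out.println(employees.get(0).getEmpName());
    }
}
```

Pulling the conversions out this way would let them be unit-tested without starting a Camel context; whether that is worth the extra class is a design choice.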

 

- Create the Controller class

. The controller uses ProducerTemplate to call the Camel routes

@RestController
public class EmployeeController {
    @Autowired
    ProducerTemplate producerTemplate;

    @RequestMapping(value = "/employees", method = RequestMethod.GET)
    public List<Employee> getAllEmployees() {
        List<Employee> employees = producerTemplate.requestBody("direct:select", null, List.class);
        return employees;
    }

    @RequestMapping(value = "/employees", consumes = "application/json", method = RequestMethod.POST)
    public boolean insertEmployee(@RequestBody Employee employee) {
        producerTemplate.requestBody("direct:insert", employee, List.class);
        return true;
    }
}

API TEST

 

. Send GET/POST requests with Postman and check the results

POST API CALL
GET API CALL


Installing ActiveMQ


Adding the Camel dependency

 

- Add the ActiveMQ component

. The ActiveMQ component is not part of Camel core, so its dependency must be added to pom.xml.


Sender(Producer)

 

- Define the message sender route

@Component
public class ActiveMqSenderRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        //timer
        from("timer:active-mq-timer?period=10000")
            .transform().constant("My message for Active MQ")
            .to("activemq:my-activemq-queue");
        //queue
    }
}

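. Starting the Camel application that defines the sender (producer) route creates the queue my-activemq-queue in ActiveMQ

. The message period is set to 10 seconds (the timer component option period=10000, in milliseconds)

. Because there is no receiver (consumer) yet, the messages accumulate as pending messages

- Define the message receiver route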

@Component
public class ActiveMqReceiverRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("activemq:my-activemq-queue")
            .to("log:received-message-from-active-mq");
    }

}

. Starting the Camel application that defines the receiver (consumer) route takes the messages from the queue and writes them to the log

. The queue status page shows that all the pending messages have been consumed and that the consumer application is running
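The pending-then-consumed behaviour described above can be modelled without a broker. This is only an in-memory sketch: a `BlockingQueue` stands in for the ActiveMQ queue, and the `PendingMessagesModel` class and its method names are hypothetical. It makes no claim about how ActiveMQ itself stores or dispatches messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory model only: a BlockingQueue stands in for the ActiveMQ queue.
final class PendingMessagesModel {

    // Sender: puts `count` messages on the queue and returns how many are now pending.
    static int send(BlockingQueue<String> queue, int count) {
        for (int i = 0; i < count; i++) {
            queue.add("My message for Active MQ");
        }
        return queue.size();
    }

    // Receiver: takes every pending message off the queue.
    static List<String> receiveAll(BlockingQueue<String> queue) {
        List<String> received = new ArrayList<>();
        queue.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println("pending after send: " + send(queue, 3));
        System.out.println("received: " + receiveAll(queue).size());
        System.out.println("pending after receive: " + queue.size());
    }
}
```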

Defining the route

. The file component lets other Camel components process files, and lets messages from other components be saved to disk.

. Logging the message shows the contents of the file.

Message Transformation

 

- By Using transform() method

@Component
public class MyTimerRouterTransformation extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")

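            //// Transformation with transform() ////
            // the argument of constant(...) is evaluated once, when configure() runs,
            // so every message carries the same timestamp
            .transform().constant("Time now is " + LocalDateTime.now())
            .log("${body}")

            .to("log:first-timer"); // Exchange[ExchangePattern: InOnly, BodyType: String, Body: Time now is ...]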
    }
}
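One detail of the route above is easy to miss: the expression `"Time now is " + LocalDateTime.now()` is an ordinary Java argument, so it is evaluated once when `configure()` runs, and `constant(...)` then hands the same string to every message. The plain-Java sketch below illustrates that difference between a value computed once and a value computed per call. The `ConstantVersusPerMessage` class is hypothetical, and a counter replaces the clock so the output is deterministic.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration: a value computed once versus a value computed per call.
final class ConstantVersusPerMessage {

    // Like constant("..." + now()): the string is built here, once, and then reused.
    static Supplier<String> constantBody(AtomicInteger clock) {
        String fixed = "Tick " + clock.incrementAndGet();
        return () -> fixed;
    }

    // Like a bean method: the string is rebuilt on every call.
    static Supplier<String> perMessageBody(AtomicInteger clock) {
        return () -> "Tick " + clock.incrementAndGet();
    }

    public static void main(String[] args) {
        AtomicInteger clock = new AtomicInteger(); // deterministic stand-in for LocalDateTime.now()

        Supplier<String> constant = constantBody(clock);
        System.out.println(constant.get()); // Tick 1
        System.out.println(constant.get()); // Tick 1

        Supplier<String> perMessage = perMessageBody(clock);
        System.out.println(perMessage.get()); // Tick 2
        System.out.println(perMessage.get()); // Tick 3
    }
}
```

This is one reason to compute the timestamp inside a bean method when each message needs its own time.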

 

- By Using a Bean

2-1) Transformation by calling the bean by name

@Component
public class MyTimerRouterBeanTransformationA extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")

            //// Bean Transformation ////
            .bean("getCurrentTimeBeanA") // look up the bean by its bean name
            .log("${body}")

            .to("log:first-timer");
    }
}
@Component
public class GetCurrentTimeBeanA {

    public String getCurrentTime1() {
        return "Time now is " + LocalDateTime.now();
    }
}

 

2-2) Transformation by calling an object injected with the @Autowired annotation

. When the bean has two or more methods, as below, the method name must be specified

@Component
public class MyTimerRouterBeanTransformationB extends RouteBuilder {

    @Autowired
    private GetCurrentTimeBeanB getCurrentTimeBeanB;

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")            

            //// Bean Transformation ////
            .bean(getCurrentTimeBeanB, "getCurrentTime1") // with two or more methods, the method name must be given as well
            .log("${body}")

            .to("log:first-timer");
    }
}
@Component
public class GetCurrentTimeBeanB {

    public String getCurrentTime1() {
        return "1. Time now is " + LocalDateTime.now();
    }

    public String getCurrentTime2() {
        return "2. Time now is " + LocalDateTime.now();
    }
}


Message Processing

 

- By Using a Bean

. (3) After the message transformation through getCurrentTimeBean, the message body has changed.
. (5) After bean processing, the body is unchanged.
. If the bean method returns a value such as a String, the body is replaced by it (Transformation); if it returns void, the message body is left unchanged (Processing).
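The return-type rule can be modelled in a few lines of plain Java. The sketch below is a simplified model, not Camel's implementation: `BeanReturnRule`, `TimeBean`, and `LoggingBean` are hypothetical names, and the real bean binding in Camel handles far more (parameter binding, annotations, type conversion).

```java
import java.lang.reflect.Method;

// Simplified model of the rule only; not how Camel itself is implemented.
final class BeanReturnRule {

    // Hypothetical bean whose method returns a value (transformation).
    static final class TimeBean {
        public String currentTime() {
            return "Time now is 12:00";
        }
    }

    // Hypothetical bean whose method returns void (processing).
    static final class LoggingBean {
        public void process(String body) {
            System.out.println("processing: " + body);
        }
    }

    // Calls the named method and returns the message body that results.
    static Object invoke(Object bean, String methodName, Object body) {
        for (Method method : bean.getClass().getMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            Object[] args = method.getParameterCount() == 0 ? new Object[0] : new Object[] {body};
            try {
                Object result = method.invoke(bean, args);
                // void method: body stays as it was; otherwise the return value becomes the body
                return method.getReturnType() == void.class ? body : result;
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }

    public static void main(String[] args) {
        Object body = "original";
        body = invoke(new LoggingBean(), "process", body);
        System.out.println(body);
        body = invoke(new TimeBean(), "currentTime", body);
        System.out.println(body);
    }
}
```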

 

- By Creating Processor

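Defining the route

- Define the route by extending RouteBuilder

. The timer component generates message exchanges; the messages it generates can only be consumed, so it is used only in from()

. The log component writes a log entry for each message exchange

. Between the two endpoints, the log() method can be used to inspect the message contents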

@Component
public class MyFirstTimerRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}") // null
            .transform().constant("My Constant Message")
            .log("${body}") // My Constant Message
            .to("log:first-timer"); // Exchange[ExchangePattern: InOnly, BodyType: String, Body: My Constant Message]
    }
}

  • Set the Camel version in pom.xml
