Route 정의

. file 컴포넌트 다른 Camel 컴포넌트에서 파일을 처리하거나, 다른 컴포넌트로부터 온 메시지를 디스크에 저장할 수 있게 해준다.

. log를 찍어보면 파일의 contents를 확인할 수 있다.

Message Transformation

 

- By Using transform() method

@Component
public class MyTimerRouterTransformation extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")

            //// Transformation by Processing Method  ////
            .transform().constant("Time now is " + LocalDateTime.now())
            .log("${body}")

            .to("log:first-timer"); // Exchange[ExchangePattern: InOnly, BodyType: String, Body: My Constant Message]
    }
}

 

- By Using a Bean

2-1) bean name 호출을 통한 transformation

@Component
public class MyTimerRouterBeanTransformationA extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")

            //// Bean Transformation ////
            .bean("getCurrentTimeBeanA") // bean 이름으로 bean 호출
            .log("${body}")

            .to("log:first-timer");
    }
}
@Component
public class GetCurrentTimeBeanA {

    public String getCurrentTime1() {
        return "Time now is " + LocalDateTime.now();
    }
}

 

2-2) Autowired 어노테이션으로 생성한 객체를 호출하여 transformation

. 아래와 같이 bean에 메소드가 2개 이상일 경우, 반드시 메소드명까지 명시해야 함

@Component
public class MyTimerRouterBeanTransformationB extends RouteBuilder {

    @Autowired
    private GetCurrentTimeBeanB getCurrentTimeBeanB;

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}")            

            //// Bean Transformation ////
            .bean(getCurrentTimeBeanB, "getCurrentTime1") // 메소드가 2개이상이면 메소드이름도 명시해야함
            .log("${body}")

            .to("log:first-timer");
    }
}
@Component
public class GetCurrentTimeBeanB {

    public String getCurrentTime1() {
        return "1. Time now is " + LocalDateTime.now();
    }

    public String getCurrentTime2() {
        return "2. Time now is " + LocalDateTime.now();
    }
}


Message Processing

 

- By Using a Bean

. (3) getCurrentTimeBean을 통한 message transformation 이후에 message body가 바뀌었음을 알 수 있다
. (5) bean processing 이후에는 body가 변하지 않는다.
. bean method의 return type이 String이면 body 내용이 변하고(Transformation), void이면 message body가 변하지 않는다(Processing).

 

- By Creating Processor

Route 정의

- RouteBuilder를 상속받아 Route 정의

. timer 컴포넌트는 message exchange를 발생시키는 컴포넌트이며, 생성된 메시지는 오로지 소비(consume)만 가능

. log 컴포넌트는 message exchange에 대한 log를 기록

. 두 Endpoint 사이에서 log() 메소드로 message 내용을 확인할 수 있음 

@Component
public class MyFirstTimerRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:first-timer")
            .log("${body}") // null
            .transform().constant("My Constant Message")
            .log("${body}") // My Constant Message
            .to("log:first-timer"); // Exchange[ExchangePattern: InOnly, BodyType: String, Body: My Constant Message]
    }
}

  • pom.xml 에서 camel version 설정

CamelContext

  . Camel의 핵심 런타임 API로 컴포넌트, 프로세서, Endpoint, 데이터타입, 라우팅 등을 관리

  . CamelContext에 의해서 다양한 모듈이 로딩되고 관리된다.

  . RouteBuilder가 Route를 객체화하여 Context에 binding

** Registry: JNDI registry로, Spring으로 camel을 사용하면 ApplicationContext가 되고 OSGI 컨테이너에 camel을 탑재해 사용하면 OSGI registry가 됨

 

Route

. 컴포넌트, 프로세서들의 연속적인 메시지 연결 정의

. 송신 Component → Proceessor → 수신 Comonent로 하나의 Route가 정의됨

. 1:1 혹은 1:N, N:1 등 다양하게 정의될 수 있다

. https://camel.apache.org/manual/routes.html

Component 

. 일종의 아답터 개념

. Endpoint URL을 가지는 Camel이 메시지를 라우팅 할 수 있는 프로그램 단위

. 통신 프로토콜을 구현한 컴포넌트, 파일시스템 연동을 구현한 컴포넌트 등 자주 사용되는 거의 대부분의 기술에 대해 이미 제작되어있는 Component를 지원함

. 맞춤 Component 작성 기능도 제공

. 동일한 접근법과 구문을 사용하여 서로 다른 기술에 대한 인터페이스 제공 → 어떤 종류의 Transport가 사용되는지에 관계없이 동일한 API로 작업할 수 있게 해줌

. https://camel.apache.org/components/next/

Processor

. Camel 내부에서 메시지를 생성, 형식 변환, 수정, 검증, 필터링 등의 작업을 하여 다른 컴포넌트로 라우팅하는 모듈

. Camel은 EIP 패턴에 기반한 메시지 프로세싱을 지원한다

. Java 인터페이스를 구현해 메시지에 대한 거의 모든 처리를 구현할 수 있음

. https://camel.apache.org/manual/processor.html

 

CamelContext 생성 예시 

https://www.tutorialspoint.com/apache_camel/apache_camel_camelcontext.htm

1) with simple route/simple filter

CamelContext context = new DefaultCamelContext(); // CarnmelContext 생성
try {    
   context.addRoutes(new RouteBuilder() {
      @Override
      public void configure() throws Exception {  // 하나의 configure 매소드에 여러 route 설정 가능
         from("direct:DistributeOrderDSL")        // .xml을 사용한 Spring DSL이나 Scala DSL 등도 사용 가능
         .split(xpath("//order[@product = 'soaps']/items")) // 상품이 "비누"인 경우만 filter
         .to("stream:out"); }  }
} );

2) with simple route/multiple-predicates filter

from("direct:DistributeOrderDSL")
   .choice()
      .when(header("order").isEqualTo("oil"))
         .to("direct:oil")
      .when(header("order").isEqualTo("milk"))
         .to("direct:milk")
      .otherwise()
         .to("direct:d");

3) with simple route/simple filter/custom processor

Processor myCustomProcessor = new Processor() {
   public void process(Exchange exchange) {
      // implement your custom processing
   }
};
CamelContext context = new DefaultCamelContext(); 
try {    
   context.addRoutes(new RouteBuilder() {
      @Override
      public void configure() throws Exception {  
         from("direct:DistributeOrderDSL")        
         .split(xpath("//order[@product = 'soaps']/items")) 
         .process(myProcessor);}
} );

 

DSL(Domain Specific Language)

  . 컴포넌트와 프로세서를 통해 라우트 구성을 정의하기 위해 사용하는 언어

  . DSL을 통해 다양한 동적 라우팅 및 메시지 변환 등 프로그래밍 요소를 삽입 가능

  . DSL은 Route의 Processor 부분을 정의하는데 주로 사용되는데, 송/수신 Component만 정의되면 사실상 시스템 연계에서 구현해야 되는 부분은 거의 Processor이며

    Processor의 로직 대부분은 메세지 처리/변환/라우팅에 해당하는 내용이기 때문에 특수 목적의 DSL을 사용할 수 있으며 DSL 사용을 통해서 개발 생산성이나 코드양을 획기적으로 줄일 수 있음

  . http://camel.apache.org/dsl.html

EndPoint

  . Camel에서 라우팅을 위해 URI 형태로 기술한 컴포넌트 주소

  . 웹 서비스 URI, 대기열(queue) URI, 파일, 전자 메일 주소 등을 참조 가능

  . DSL에서 from() 안에 사용되는 엔드포인트를 Consumer 엔드포인트, to() 안에 사용되는 것을 Producer 엔드포인트라고 함

  . https://camel.apache.org/manual/endpoint.html

Producer

  . Endpoint에 메시지를 생성, 전달하는 개체(송신측)

Consumer

  . Producer에 의해 생성된 메시지를 수신하는 개체(수신측)

  . 수신 후 Exchange를 생성하여 라우터에 던져준다

'~2022 > Apache Camel' 카테고리의 다른 글

[Apache Camel] Feature  (0) 2022.03.12
[Apache Camel] Overview  (0) 2022.03.12

Camel Feature

 

통합 연계 라우팅 엔진

  . Camel은 메시지 라우팅 연계 엔진

  . 다양한 프로토콜, 메시지를 통합하여 일관된 인터페이스로 처리 가능

  . 기본적으로 Sender로부터 메시지를 받아 Receiver에 전달하는 역할

    + 위와 같이 복잡한 환경을 통합하기 위한 여러 기능을 제공

        1) 메시지 포맷 변환 (Format Translation) : ftp → http / jms → json

        2) 메시지 콘텐츠에 따른 필터링 (Filtering, User-Defined)

        3) 라우팅 규칙 정의 (Routing Rules) : Sender A → Receiver B,C / Sender B → Receiver A, C

  . Maintainalbe & Scalable

        1) 송/수신측 메시지 형식에 관계없이 동일한 API를 사용하기 때문에 Sender/Receiver의 변화에 유연함

        2) 새로운 Sender/Receiver 추가가 용이

 

Lightwight Core 라이브러리 모듈

  . camel core 모듈은 4M 정도의 라이브러리 형태로 가벼움

  . 다양한 컨테이너에 포함되어 구동될 수 있고 마이크로 서비스 형태의 서비스가 가능함

 

EIP (Enterprise Integration Patterns) 구현

  . Gregor Hohpe와 Bobby Woolf가 시스템 통합의 문제 해결을 위한 유사한 해법들을 패턴으로 정리한 것

  . 기업 연계 패턴을 분석하여 패턴화 시킨 것 → 메시지 변환, 라우팅, 로그 추적을 위한 글로벌 트랜잭션, 장애시 재처리 등 시스템 연동 등

  . Camel을 통해 EIP 패턴에 기반한 다양한 연계 유형을 조립하여 구성할 수 있음

https://camel.apache.org/components/3.15.x/eips/enterprise-integration-patterns.html

 

DSL (Domain Specific Language) 지원

  . 메시지 라우팅 및 메시지 프로세싱을 다양한 언어로 기술할 수 있도록 함

  . Java, XML, Groovy, Scalar, Kotlin 등 다양한 언어 지원

 

다양한 연계 컴포넌트(150+), 메시지 변환 프로세서 제공

  . 다양한 연계 컴포넌트, 메시지 변환 프로세스가 제작되어 포함되어 있음

  . 필요 시 사용자 컴포넌트를 제작하여 함께 구동시킬 수 있음

'~2022 > Apache Camel' 카테고리의 다른 글

[Apache Camel] Architecture  (0) 2022.03.12
[Apache Camel] Overview  (0) 2022.03.12

Camel Overview

 

Open-source integration framework based on EIP(Enterprise Integration Patterns)

  . 시스템 통합(System Integration)을 위한 오픈 소스 자바 프레임워크

  . Camel: Concise Application Message Exchange Language

  . 대부분의 EIP(Enterprise Integration Patterns) 연계 패턴 구현

  . 라우트 구성을 위해 다양한 언어로 구현된 DSL 지원(Java, XML, Scala, Groovy 등)

  . 다양한 연계 컴포넌트가 구현되어 있고 사용 가능(150+)

  . 특정 컨테이너나 프레임워크에 의존성 없음 → OSGI 컨테이너나 WAS, Spring 등에 탑재 가능하며, 마이크로 서비스 형태로 서비스 가능

'~2022 > Apache Camel' 카테고리의 다른 글

[Apache Camel] Architecture  (0) 2022.03.12
[Apache Camel] Feature  (0) 2022.03.12

[우선  프로듀서의 idempotence에 대해 알아보자]

 

** 멱등성(idempotence)
  . 멱등성이란 연산을 여러번 적용하더라도 결과가 달라지지 않는 것.
  . REST API를 예로 들자면 get, put, delete는 멱등성을 가지고 있지만, post는 상태를 변경하기 때문에 멱등성이 없다.


** 카프카에서 멱등성 프로듀서란?
프로듀서가 동일한 데이터를 여러번 보내더라도 브로커 단위에서 1번만 적재하는 것.
  . 프로듀서는 retry 로직에 의해서 동일 ProducerRecord가 브로커에 여러 번 적재될 가능성이 있음

  . 프로듀서는 ProducerRecord를 전송할 때 데이터에 숫자(Sequence)와
    PID(프로듀서 번호)를 보냄으로서 브로커가 중복 처리

 

 

[멱등성 프로듀서]

 

멱등성 프로듀서로 동작하지 않는 기본 프로듀서는 send() 이후에 데이터에 대한 적재 응답을 제대로 못받았을 경우에는 데이터를 다시 보내게 됩니다. 이것은 retry 로직의 자연스러운 데이터 처리 방법인데, 카프카는 기본 옵션으로 사용했을 때 "at least once" 즉 적어도 한 번 이상 데이터를 보내는 로직으로 되어 있기 때문입니다. 이 방식은 데이터를 유실 없이 안정적으로 처리하지만, 중복해서 데이터를 처리할 가능성이 있습니다.


동일한 레코드가 여러 번 들어가는 것을 원치 않을때는 멱등성 프로듀서를 사용하면 됩니다.
브로커는 PID와 Sequence 번호를 토대로 데이터가 중복되어 전송되지 않았는지 확인합니다.

 

 

[멱등성 프로듀서의 설정과 장/단점]

 

** 설정 방법
  . ENABLE_IDEMPOTENCE_CONFIG = true


** 장점
  . 프로듀서와 브로커 사이의 데이터 전송에 있어 레코드가 딱 한번만 적재됨을 보장
  . 딱 한번만 적재(Exactly-once)되므로 데이터 유실이나 중복이 발생하면 안되는 곳에서는 반드시 이 옵션을 켜야 함


** 단점
  . 브로커는 PID(프로듀스 아이디)와 레코드 시퀀스넘버를 가지고 있으면서 계속 모니터링하므로 브로커에 부하가 발생하므로(= 데이터의 처리속도가 느려질 수 있음, 데이터의 처리량이 낮아질 수 있음) 브로커의 CPU나 메모리 사용량을 고려해야 함
  . 프로듀서의 장애애 의해서 프로듀서가 재시작 되는 경우에는 PID가 달라지므로 이에 대한 대응책이 필요함

 

 

[트랜잭션이란?]

 

** 데이터베이스의 상태를 변환시키는 하나의 논리적인 기능을 수행하는 작업의 단위 또는 일련의 연산
  . 커밋, 롤백을 통해 논리적인 수행 단위를 완료 또는 복구 가능

 

** 카프카에 트랜잭션이란?
  . 여러 파티션에 데이터를 atomic(원자성)하게 보내는 것
  . 여러 작업 단위를 여러 토픽에 보낼 경우 논리적으로 수행한 단위가 완료되었음을 begin과 commit으로 지정할 수 있다.

 

카프카에서는 스트림 데이터 처리에 있어 트랜잭션 개념을 추가하였습니다.
즉 카프카에서 트랜잭션이란 스트림 트랜잭션에 가깝습니다.

 

트랜잭션 프로듀서는 beginTransaction()을 호출해서 원자 단위로 묶을 레코드들이 이제 전송된다는 것을 선언합니다. 그리고 트랜잭션 컨슈머는 트랜잭션이 시작된 데이터에 대해서 즉시 가져가지는 않습니다. 왜냐하면 커밋이 되어서 원자 단위로 트랜잭션이 끝나지 않았기 때문입니다.

트랜잭션 프로듀서는 하나의 트랜잭션으로 묶일 레코드를 모두 보내고 난 이후에 commitTransaction()을 호출해서 트랜잭션이 끝났음을 알립니다. 이후 트랜잭션 컨슈머는 커밋이 완료된 데이터를 가져가서 처리하게 됩니다.

스트림데이터에서 트랜잭션이란 여러 레코드를 하나의 단위로 묶어서 처리하는 것이라고 볼 수 있습니다.

 

[트랜잭션 프로듀서, 컨슈머 설정 방법]

 

** 트랜잭션 프로듀서
  . 설정
     - TRANSACTION_ID_CONFIG = UUID.randomUUID().toString() -> 유일한 값으로 설정해야 함
  . 트랜잭션 프로듀서를 설정하면 자동으로 멱등성 프로듀서가 설정된다.


** 트랜잭션 컨슈머
  . 설정
     - ISOLATION_LEVEL_CONFIG = "read_committed"
  . 프로듀서가 커밋하기 이전 레코드는 가져가지 않는다
  . 만약 기본 설정값(read_uncommitted)일 경우에는 커밋되지 않은 레코드도 모두 가져가서 처리한다.

 

 

[언제 트랜잭션 프로듀서와 트랜잭션 컨슈머를 사용해아 할까?]

 

** 여러 이벤트가 반드시 동시에 처리되어야 하는 경우

'~2022 > Apache Kafka' 카테고리의 다른 글

[Kafka] server.properties  (0) 2022.04.06
[Kafka] 기본 개념  (0) 2022.03.16
[Apache Kafka] 9. 컨슈머와 컨슈머 그룹  (0) 2021.11.11
[Apache Kafka] 8. 복제  (0) 2021.11.11
[Apache Kafka] 7. 프로듀서  (0) 2021.11.11

+ Recent posts