CamelContext

  . Camel의 핵심 런타임 API로 컴포넌트, 프로세서, Endpoint, 데이터타입, 라우팅 등을 관리

  . CamelContext에 의해서 다양한 모듈이 로딩되고 관리된다.

  . RouteBuilder가 Route를 객체화하여 Context에 binding

** Registry: JNDI registry로, Spring으로 camel을 사용하면 ApplicationContext가 되고 OSGI 컨테이너에 camel을 탑재해 사용하면 OSGI registry가 됨

 

Route

. 컴포넌트, 프로세서들의 연속적인 메시지 연결 정의

. 송신 Component → Proceessor → 수신 Comonent로 하나의 Route가 정의됨

. 1:1 혹은 1:N, N:1 등 다양하게 정의될 수 있다

. https://camel.apache.org/manual/routes.html

Component 

. 일종의 아답터 개념

. Endpoint URL을 가지는 Camel이 메시지를 라우팅 할 수 있는 프로그램 단위

. 통신 프로토콜을 구현한 컴포넌트, 파일시스템 연동을 구현한 컴포넌트 등 자주 사용되는 거의 대부분의 기술에 대해 이미 제작되어있는 Component를 지원함

. 맞춤 Component 작성 기능도 제공

. 동일한 접근법과 구문을 사용하여 서로 다른 기술에 대한 인터페이스 제공 → 어떤 종류의 Transport가 사용되는지에 관계없이 동일한 API로 작업할 수 있게 해줌

. https://camel.apache.org/components/next/

Processor

. Camel 내부에서 메시지를 생성, 형식 변환, 수정, 검증, 필터링 등의 작업을 하여 다른 컴포넌트로 라우팅하는 모듈

. Camel은 EIP 패턴에 기반한 메시지 프로세싱을 지원한다

. Java 인터페이스를 구현해 메시지에 대한 거의 모든 처리를 구현할 수 있음

. https://camel.apache.org/manual/processor.html

 

CamelContext 생성 예시 

https://www.tutorialspoint.com/apache_camel/apache_camel_camelcontext.htm

1) with simple route/simple filter

CamelContext context = new DefaultCamelContext(); // CarnmelContext 생성
try {    
   context.addRoutes(new RouteBuilder() {
      @Override
      public void configure() throws Exception {  // 하나의 configure 매소드에 여러 route 설정 가능
         from("direct:DistributeOrderDSL")        // .xml을 사용한 Spring DSL이나 Scala DSL 등도 사용 가능
         .split(xpath("//order[@product = 'soaps']/items")) // 상품이 "비누"인 경우만 filter
         .to("stream:out"); }  }
} );

2) with simple route/multiple-predicates filter

from("direct:DistributeOrderDSL")
   .choice()
      .when(header("order").isEqualTo("oil"))
         .to("direct:oil")
      .when(header("order").isEqualTo("milk"))
         .to("direct:milk")
      .otherwise()
         .to("direct:d");

3) with simple route/simple filter/custom processor

Processor myCustomProcessor = new Processor() {
   public void process(Exchange exchange) {
      // implement your custom processing
   }
};
CamelContext context = new DefaultCamelContext(); 
try {    
   context.addRoutes(new RouteBuilder() {
      @Override
      public void configure() throws Exception {  
         from("direct:DistributeOrderDSL")        
         .split(xpath("//order[@product = 'soaps']/items")) 
         .process(myProcessor);}
} );

 

DSL(Domain Specific Language)

  . 컴포넌트와 프로세서를 통해 라우트 구성을 정의하기 위해 사용하는 언어

  . DSL을 통해 다양한 동적 라우팅 및 메시지 변환 등 프로그래밍 요소를 삽입 가능

  . DSL은 Route의 Processor 부분을 정의하는데 주로 사용되는데, 송/수신 Component만 정의되면 사실상 시스템 연계에서 구현해야 되는 부분은 거의 Processor이며

    Processor의 로직 대부분은 메세지 처리/변환/라우팅에 해당하는 내용이기 때문에 특수 목적의 DSL을 사용할 수 있으며 DSL 사용을 통해서 개발 생산성이나 코드양을 획기적으로 줄일 수 있음

  . http://camel.apache.org/dsl.html

EndPoint

  . Camel에서 라우팅을 위해 URI 형태로 기술한 컴포넌트 주소

  . 웹 서비스 URI, 대기열(queue) URI, 파일, 전자 메일 주소 등을 참조 가능

  . DSL에서 from() 안에 사용되는 엔드포인트를 Consumer 엔드포인트, to() 안에 사용되는 것을 Producer 엔드포인트라고 함

  . https://camel.apache.org/manual/endpoint.html

Producer

  . Endpoint에 메시지를 생성, 전달하는 개체(송신측)

Consumer

  . Producer에 의해 생성된 메시지를 수신하는 개체(수신측)

  . 수신 후 Exchange를 생성하여 라우터에 던져준다

'~2022 > Apache Camel' 카테고리의 다른 글

[Apache Camel] Feature  (0) 2022.03.12
[Apache Camel] Overview  (0) 2022.03.12

Camel Feature

 

통합 연계 라우팅 엔진

  . Camel은 메시지 라우팅 연계 엔진

  . 다양한 프로토콜, 메시지를 통합하여 일관된 인터페이스로 처리 가능

  . 기본적으로 Sender로부터 메시지를 받아 Receiver에 전달하는 역할

    + 위와 같이 복잡한 환경을 통합하기 위한 여러 기능을 제공

        1) 메시지 포맷 변환 (Format Translation) : ftp → http / jms → json

        2) 메시지 콘텐츠에 따른 필터링 (Filtering, User-Defined)

        3) 라우팅 규칙 정의 (Routing Rules) : Sender A → Receiver B,C / Sender B → Receiver A, C

  . Maintainalbe & Scalable

        1) 송/수신측 메시지 형식에 관계없이 동일한 API를 사용하기 때문에 Sender/Receiver의 변화에 유연함

        2) 새로운 Sender/Receiver 추가가 용이

 

Lightwight Core 라이브러리 모듈

  . camel core 모듈은 4M 정도의 라이브러리 형태로 가벼움

  . 다양한 컨테이너에 포함되어 구동될 수 있고 마이크로 서비스 형태의 서비스가 가능함

 

EIP (Enterprise Integration Patterns) 구현

  . Gregor Hohpe와 Bobby Woolf가 시스템 통합의 문제 해결을 위한 유사한 해법들을 패턴으로 정리한 것

  . 기업 연계 패턴을 분석하여 패턴화 시킨 것 → 메시지 변환, 라우팅, 로그 추적을 위한 글로벌 트랜잭션, 장애시 재처리 등 시스템 연동 등

  . Camel을 통해 EIP 패턴에 기반한 다양한 연계 유형을 조립하여 구성할 수 있음

https://camel.apache.org/components/3.15.x/eips/enterprise-integration-patterns.html

 

DSL (Domain Specific Language) 지원

  . 메시지 라우팅 및 메시지 프로세싱을 다양한 언어로 기술할 수 있도록 함

  . Java, XML, Groovy, Scalar, Kotlin 등 다양한 언어 지원

 

다양한 연계 컴포넌트(150+), 메시지 변환 프로세서 제공

  . 다양한 연계 컴포넌트, 메시지 변환 프로세스가 제작되어 포함되어 있음

  . 필요 시 사용자 컴포넌트를 제작하여 함께 구동시킬 수 있음

'~2022 > Apache Camel' 카테고리의 다른 글

[Apache Camel] Architecture  (0) 2022.03.12
[Apache Camel] Overview  (0) 2022.03.12

Camel Overview

 

Open-source integration framework based on EIP(Enterprise Integration Patterns)

  . 시스템 통합(System Integration)을 위한 오픈 소스 자바 프레임워크

  . Camel: Concise Application Message Exchange Language

  . 대부분의 EIP(Enterprise Integration Patterns) 연계 패턴 구현

  . 라우트 구성을 위해 다양한 언어로 구현된 DSL 지원(Java, XML, Scala, Groovy 등)

  . 다양한 연계 컴포넌트가 구현되어 있고 사용 가능(150+)

  . 특정 컨테이너나 프레임워크에 의존성 없음 → OSGI 컨테이너나 WAS, Spring 등에 탑재 가능하며, 마이크로 서비스 형태로 서비스 가능

'~2022 > Apache Camel' 카테고리의 다른 글

[Apache Camel] Architecture  (0) 2022.03.12
[Apache Camel] Feature  (0) 2022.03.12

[우선  프로듀서의 idempotence에 대해 알아보자]

 

** 멱등성(idempotence)
  . 멱등성이란 연산을 여러번 적용하더라도 결과가 달라지지 않는 것.
  . REST API를 예로 들자면 get, put, delete는 멱등성을 가지고 있지만, post는 상태를 변경하기 때문에 멱등성이 없다.


** 카프카에서 멱등성 프로듀서란?
프로듀서가 동일한 데이터를 여러번 보내더라도 브로커 단위에서 1번만 적재하는 것.
  . 프로듀서는 retry 로직에 의해서 동일 ProducerRecord가 브로커에 여러 번 적재될 가능성이 있음

  . 프로듀서는 ProducerRecord를 전송할 때 데이터에 숫자(Sequence)와
    PID(프로듀서 번호)를 보냄으로서 브로커가 중복 처리

 

 

[멱등성 프로듀서]

 

멱등성 프로듀서로 동작하지 않는 기본 프로듀서는 send() 이후에 데이터에 대한 적재 응답을 제대로 못받았을 경우에는 데이터를 다시 보내게 됩니다. 이것은 retry 로직의 자연스러운 데이터 처리 방법인데, 카프카는 기본 옵션으로 사용했을 때 "at least once" 즉 적어도 한 번 이상 데이터를 보내는 로직으로 되어 있기 때문입니다. 이 방식은 데이터를 유실 없이 안정적으로 처리하지만, 중복해서 데이터를 처리할 가능성이 있습니다.


동일한 레코드가 여러 번 들어가는 것을 원치 않을때는 멱등성 프로듀서를 사용하면 됩니다.
브로커는 PID와 Sequence 번호를 토대로 데이터가 중복되어 전송되지 않았는지 확인합니다.

 

 

[멱등성 프로듀서의 설정과 장/단점]

 

** 설정 방법
  . ENABLE_IDEMPOTENCE_CONFIG = true


** 장점
  . 프로듀서와 브로커 사이의 데이터 전송에 있어 레코드가 딱 한번만 적재됨을 보장
  . 딱 한번만 적재(Exactly-once)되므로 데이터 유실이나 중복이 발생하면 안되는 곳에서는 반드시 이 옵션을 켜야 함


** 단점
  . 브로커는 PID(프로듀스 아이디)와 레코드 시퀀스넘버를 가지고 있으면서 계속 모니터링하므로 브로커에 부하가 발생하므로(= 데이터의 처리속도가 느려질 수 있음, 데이터의 처리량이 낮아질 수 있음) 브로커의 CPU나 메모리 사용량을 고려해야 함
  . 프로듀서의 장애애 의해서 프로듀서가 재시작 되는 경우에는 PID가 달라지므로 이에 대한 대응책이 필요함

 

 

[트랜잭션이란?]

 

** 데이터베이스의 상태를 변환시키는 하나의 논리적인 기능을 수행하는 작업의 단위 또는 일련의 연산
  . 커밋, 롤백을 통해 논리적인 수행 단위를 완료 또는 복구 가능

 

** 카프카에 트랜잭션이란?
  . 여러 파티션에 데이터를 atomic(원자성)하게 보내는 것
  . 여러 작업 단위를 여러 토픽에 보낼 경우 논리적으로 수행한 단위가 완료되었음을 begin과 commit으로 지정할 수 있다.

 

카프카에서는 스트림 데이터 처리에 있어 트랜잭션 개념을 추가하였습니다.
즉 카프카에서 트랜잭션이란 스트림 트랜잭션에 가깝습니다.

 

트랜잭션 프로듀서는 beginTransaction()을 호출해서 원자 단위로 묶을 레코드들이 이제 전송된다는 것을 선언합니다. 그리고 트랜잭션 컨슈머는 트랜잭션이 시작된 데이터에 대해서 즉시 가져가지는 않습니다. 왜냐하면 커밋이 되어서 원자 단위로 트랜잭션이 끝나지 않았기 때문입니다.

트랜잭션 프로듀서는 하나의 트랜잭션으로 묶일 레코드를 모두 보내고 난 이후에 commitTransaction()을 호출해서 트랜잭션이 끝났음을 알립니다. 이후 트랜잭션 컨슈머는 커밋이 완료된 데이터를 가져가서 처리하게 됩니다.

스트림데이터에서 트랜잭션이란 여러 레코드를 하나의 단위로 묶어서 처리하는 것이라고 볼 수 있습니다.

 

[트랜잭션 프로듀서, 컨슈머 설정 방법]

 

** 트랜잭션 프로듀서
  . 설정
     - TRANSACTION_ID_CONFIG = UUID.randomUUID().toString() -> 유일한 값으로 설정해야 함
  . 트랜잭션 프로듀서를 설정하면 자동으로 멱등성 프로듀서가 설정된다.


** 트랜잭션 컨슈머
  . 설정
     - ISOLATION_LEVEL_CONFIG = "read_committed"
  . 프로듀서가 커밋하기 이전 레코드는 가져가지 않는다
  . 만약 기본 설정값(read_uncommitted)일 경우에는 커밋되지 않은 레코드도 모두 가져가서 처리한다.

 

 

[언제 트랜잭션 프로듀서와 트랜잭션 컨슈머를 사용해아 할까?]

 

** 여러 이벤트가 반드시 동시에 처리되어야 하는 경우

'~2022 > Apache Kafka' 카테고리의 다른 글

[Kafka] server.properties  (0) 2022.04.06
[Kafka] 기본 개념  (0) 2022.03.16
[Apache Kafka] 9. 컨슈머와 컨슈머 그룹  (0) 2021.11.11
[Apache Kafka] 8. 복제  (0) 2021.11.11
[Apache Kafka] 7. 프로듀서  (0) 2021.11.11

[ 컨슈머 ]

컨슈머는 파티션에서 레코드를 가져와(Subscribe) 데이터를 처리할 수 있습니다.

컨슈머를 역할에 따라 묶은 것을 컨슈머 그룹이라고 부릅니다. 컨슈머 그룹에는 1개 이상의 컨슈머가 있어야 합니다.

컨슈머 그룹은 아래와 같이 어떤 특정 토픽에 대해 데이터를 병렬 처리할 수 있습니다.

단, 아래와 같이 컨슈머가 파티션의 개수보다 많다고 해도 파티션은 최대 하나의 컨슈머에 할당 가능합니다.

[ 목적에 따른 컨슈머 그룹 ]

서로 다른 역할에 따라 컨슈머 그룹을 나누고 데이터를 처리하면, 커플링 약하게 동일한 데이터를 여러 번 처리할 수 있습니다. 둘중 하나의 컨슈머 그룹의 장애가 생기더라도 나머지 컨슈머 그룹이 영향 없이 데이터를 처리할 수 있는 것입니다.
** 엘라스틱 서치는 시간 순서대로 데이터를 검색하던가, 문자열 데이터로 데이터를 검색할 때 사용
** 하둡 적내는 데이터를 오랜 기간 안전하게 적재하기 위한 용도로 사용

 

[ 컨슈머의 오프셋 커밋 ]

 

컨슈머는 오프셋 커밋을 통해서 데이터를 어디까지 처리했는지 저장합니다. 예를들어 컨슈머가 파티션#0의 1번 오프셋 레코드를 처리 완료하면, 컨슈머는 "커밋"이라는 명령어를 통해 1번까지 처리했고, 다음 처리할 레코드는 2번이라는 것을 알 수 있습니다. 그리고 2번 오프셋 레코드를 처리하는 도중 이슈가 발생하면, 마지막 커밋이 된 오프셋이 1번임을 보고 2번 레코드부터 다시 처리를 시작합니다.

그러므로 컨슈머를 구현할 때에는 정말로 데이터 처리가 완료되었을 때에 오프셋 커밋을 해야 데이터의 유실 없이 안전하게 처리할 수 있습니다.

 

[ 어느 오프셋부터 가져갈까? ]


** 기본 설정은 auto.offset.reset = latest
  . 가장 최신 오프셋의 레코드부터 가져감
  . 컨슈머는 파티션#0의 5번 오프셋 레코드, 파티션#1의 3번 오프셋 레코드 부터 가져감

** auto.offset.reset = earlist 로 설정하면
  . 각 파티션의 오프셋 중 가장 낮은 숫자부터 가져감
  . 컨슈머는 파티션#0의 0번 오프셋 레코드, 파티션#1의 0번 오프셋 레코드 부터 가져감

이미 데이터들이 토픽에 들어와 있는 상태에서, 새로운 컨슈머를 실행시킨다면 auto.offset.reset = earlist 로 설정해야 모든 데이터를 다 처리할 수 있을 것입니다 ^^;

'~2022 > Apache Kafka' 카테고리의 다른 글

[Kafka] 기본 개념  (0) 2022.03.16
[Apache Kafka] 10. 프로듀서와 컨슈머의 트랜잭션  (0) 2021.11.11
[Apache Kafka] 8. 복제  (0) 2021.11.11
[Apache Kafka] 7. 프로듀서  (0) 2021.11.11
[Apache Kafka] 6. 카프카 클러스터  (0) 2021.11.11

[카프카 브로커 간의 데이터 복제]


카프카에서는 서버 단위로 데이터를 복제해서 데이터를 안전하게 보관합니다.
리더 파티션은 프로듀서나 컨슈머와 직접 데이터 통신을 하는 단위이고, 나머지 팔로워 파티션이 복제본입니다.

만약 리더 파티션이 있는 브로커 0번에 장애가 났다고 가정하면, 브로커 1번 혹은 2번에 있던 팔로워 파티션이 리더 파티션으로 승급됩니다.

 

 

[메타데이터로 리더 파티션을 구별한다]


프로듀서나 컨슈머와 같은 카프카 클라이언트는 직접 데이터를 주고받기 전에 메타데이터를 요청합니다. 이 작업을 통해 브로커가 가지고 있는 메타데이터 캐시를 응답값으로 돌려받습니다. 그럼 클라이언트는 어느 브로커로 데이터를 보내야 할지 알게 되고 통신을 수행하게 됩니다.

그런데 이런 메타데이터가 일부 상황에서 적절히 update되지 않는 오류가 발생할 때도 있습니다. 프로듀서나 컨슈머가 가지고 있는 메타데이터가 적절하게 update되지 않으면 리더 파티션이 존재하지 않는 브로커와 통신을 할 때도 있습니다. 이런 경우에는 NotLeaderForPartitionException이 발생합니다.

 

 

[ISR]

 

프로듀서가 데이터를 리더 파티션으로 보내고 나면 나머지 팔로워 파티션에 데이터가 복제되는데 이 때 모두 데이터가 복제되어 동기화된 상태를 ISR(In-Sync-Replicas)라고 부릅니다. 즉, 모든 리더 파티션의 데이터가 팔로워 파티션에 복제 완료 되었다는 뜻입니다.

 

데이터를 복제하는 데에는 지연이 발생하게 됩니다. 프로듀서는 리더 파티션에 데이터를 보내고 데이터를 저장하는 동안에 그리고 저장이 완료되는 시점에 팔로워 파티션들에는 그 데이터가 없습니다. 왜냐하면, 리더 파티션에 데이터가 안전하게 저장된 이후에 데이터를 팔로워 파티션에 복제하기 때문입니다.

 

즉, 어떤 시점에는 리더와 팔로워 간에 데이터가 동기화되지 않은 시점이 존재하고 그럴 때 리더 파티션에 장애가 발생하게 되면 팔로워 파티션이 리더로 승급할 때 일부 데이터가 유실될 수 있습니다.

 

[min.inssync.replicas]

 

토픽 단위로 설정 가능한 min.inssync.replicas 옵션은 프로듀서가 데이터를 보낼 때 얼마나 안정적으로 데이터를 보낼지 설정하는 값으로 사용됩니다. 참고로 이 값은 acks라는 옵션을 all로 설정했을 때만 유효합니다.

 

min.inssync.replicas 값이 2일 때는 데이터가 최소 리더 파티션과 팔로워 파티션 1개에 안전하게 데이터를 저장하는 것을 보장합니다. 때문에 리더 파티션에 데이터가 저장된 직후 데이터가 복제되지 않은 상태에서 브로커 0번에 장애가 발생하면, 프로듀서는 데이터가 정상적으로 보내지지 않았다는 것을 감지하고 다시 데이터를 보내게 됩니다.

 

min.inssync.replicas 값이 3일때는 리더 파티션과 팔로워 파티션 2개에 모두 데이터가 복제 완료되었을 때 데이터가 정상적으로 전송되었음을 보장합니다. 그리고 브로커 개수만큼 min.inssync.replicas 값을 설정하면 브로커가 한 대라도 죽으면 프로듀서는 더 이상 데이터를 보내지 않습니다. 때문에 min.inssync.replicas 값은 브로커 개수보다 작게 설정해야 합니다.

 

[unclean.leader.election.enable]

 

** unclean.leader.election.enable = true: 데이터 유실을 감안해도 지속 운영하겠다
** unclean.leader.election.enable = false: 데이터 유실을 절대 허용하지 않겠다

 

unclean.leader.election.enable은 만에하나 팔로워 파티션들이 ISR로 묶이지 못한 경우에 설정할 수 있는 값입니다.

브로커급 장애가 발생했을 때, 데이터의 유실을 감수하고서라도 데이터를 지속 처리할지 결정하는 옵션입니다.

 

unclean.leader.election.enable 값이 false인 경우 리더 파티션이 있는 브로커에 장애가 발생하면,

리더 파티션이 있는 브로커가 다시 정상화될때까지 영원히 기다리게 됩니다.

[프로듀서 내부 동작]

** ProducerRecord로 메시지 키, 값 초기화.
** Partitioner는 메시지 키 유무에 따라 어느 토픽으로 보낼 것인지 지정
** Partitioner가 파티션을 지정한 후 Accumulator는 최대한 배치로 데이터를 모으고 나서 Sender는 데이터를 클러스터로 발송

프로듀서는 데이터를 저장해야하는 토픽의 리더 파티션이 위치한 프로커와 직접 통신합니다

카프카 클라이언트들은 리더 파티션이 위치한 브로커와 직접 통신하기 전에 이미 연결되어 있는 브로커와 통신해서, 리더 파티션이 위치한 브로커의 정보를 가져오는 작업을 합니다.

 

 

[메시지 키가 없는 레코드를 발송하는 프로듀서]

 

** 프로듀서 = 특정 토픽 + 특정 파티션에 데이터를 전송하는 주체
** 프로듀서가 브로커로 데이터 전송을 요청하면 브로커는 파일시스템에 데이터를 저장
** 레코드가 어느 파티션에 저장되었는지 프로듀서는 브로커로부터 응답 받음


레코드를 정의할 때, 실제로 사용하는 데이터를 메시지 값에 저장하고 데이터를 보내면 토픽의 파티션들 중 하나에 데이터가 저장되는데, 메시지 키가 없는 레코드를 보내면 파티션들에 Round-Robin으로 데이터가 돌아가면서 저장됩니다.

** 메시지 키가 없는 레코드
  . 기본적으로 라운드 로빈 형태로 파티션에 데이터를 전송
** 메시지 키가 있는 레코드
  . 해쉬 형태로 변환하여 특정 파티션에 지정되어 저장됨

 

 

[기본 파티셔너]

 

** 기본 파티셔너는 2개를 제공함
  . UniformStickyPartitioner, RoundRobinPartitioner
  . 2.4.0부터는 파티셔너를 지정하지 않을 경우 UniformStickyPartitioner가 기본 설정됨

** UniformStickyPartitioner
  . 배치로 묶어 전송되어야 하는 데이터를 Accumulator에 최대한 묶어서 발송함(메시지 키가 없는 경우)
  . 이전의 RoundRobinPartitioner는 Accumulator의 배치와 무관하게 ProduceRecoder가 들어오는대로 모두 순회하면서 전송했기 때문에 성능이 느렸음(네트워크 사용량이 높음)
  . 메시지 키가 있는 경우에는? -> 메시지 키를 해시 값으로 변환하여 파티션을 지정

 

[언제 메시지 키를 넣어야 할까?]

 

** 기본 파티셔너를 사용하는 경우에 "동일 메시지 키는 동일 파티션으로 들어간다"
  . 처리해야 하는 데이터가 컨슈머에 분산되면 안되는 경우 (ex. 상태가 있는 처리를 할 때; Stateful)
  . 처리해야 하는 데이터가 ordering을 만족해야 하는 경우 (ex. 은행 데이터, 결제 데이터)

 

[메시지 키를 넣으면 안되는 경우]

 

** 충분히 분산될 만한 메시지 키가 아닌 경우
  . Ex) 레코드의 메시지 값(json)의 스키마 버전 -> 스키마 버전은 빈번히 바뀌는 값이 아니고 분산되지 않기 때문에 일부 컨슈머에는 데이터처리를 하지 않고 계속해서 쉴 수도 있습니다.

다양한 종류가 아닌 데이터에 메시지 키를 사용하면 데이터가 일부 파티션에만 몰릴 수 있습니다. 데이터가 일부 파티션에 몰리게 되면 일부 컨슈머만 데이터가 처리되기 때문에, 컨슈머 측이 데이터를 최대의 성능으로 처리할 수 없습니다.
 
** 파티션 개수가 빈번히 변할 경우
  . 파티션의 개수가 변하면서 메시지 키와 파티션의 할당이 변경됩니다.
  . 이로 인해서 메시지 키를 넣어서 예상하는 동작과 다르게 동작할 수도 있습니다.

 

[커스텀 파티셔너]

 

** 메시지 키 뿐만 아니라 메시지 값, 토픽 이름 등을 이용하여 파티션 지정 가능
** 카프카 클라이언트 라이브러리에서 제공하는 Partitioner 인터페이스 사용

기본 파티셔너 2개 외에 다른 파티셔너를 활용하고 싶다면, 카프카 클라이언트 라이브러리의 partitioner 인터페이스를 사용해서 커스텀 파티셔너를 개발, 프로듀서 옵션을 추가하면 됩니다. 커스텀 파티셔너를 사용하면 메시지 키 뿐만 아니라 메시지 값, 토픽 이름, 클러스터 이름을 토대로 파티셔너를 지정할 수 있습니다.

 

커스텀 파티셔너를 이용한 우선순위

커스텀 파티셔너를 이용하면 여타 메시징 브로커에서 사용하는 것 처럼 우선순위 큐를 만들 수도 있습니다.

위 그림처럼 일반 고객은 파티션 2개에 할당하고, VIP 고객은 파티션 4개에 할당하는 방식을 예로 들 수 있습니다.

1개의 파티션은 최대 1개의 컨슈머에 할당되는 특징을 사용하여 일반 고객 대비 VIP 고객의 데이타 처리를 2배 빠르게 할 수 있다는 장점이 있습니다.

[카프카 클러스터]

프로듀서, 컨슈머는 리더 파티션과 직접 통신해서 데이터를 처리하게 됩니다.

팔로워 파티션은 리더 파티션에 있는 데이터를 확인해서 자신에게 없는 데이터를 복제해갑니다.

파티션 = 3, 복제개수 = 3인 토픽 생성 시

리더 파티션의 위치가 Round-Robin 방식으로 적절히 분배되기 때문에 프로듀서, 컨슈머가 클러스터와 통신을 할 때 네트워크 비용이 각 서버(브로커)에 분산된다는 장점이 있습니다. 앞단에 로드 밸런서를 놓는 방식이 아니라 컨슈머, 프로듀서가 직접 분산해서 통신하는 것입니다. 

 

복제 개수가 늘어나면 데이터를 안전하게 저장할 수 있지만, 팔로워 파티션이 지속적으로 리더 파티션에 있는 데이터를 복제해야하기 때문에 브로커들 간의 네트워크 통신이 많아진다는 단점이 있습니다. 때문에 일반적인 경우에는 복제 개수를 2로 운영하는 것이 브로커의 리소스 관리에 좋습니다.

 

리더 파티션이 특정 브로커에 몰려있는 경우, 파티션 밸런싱을 맞추는 명령어 (kafka-reassign-partisions.sh)를 적용할 수 있습니다. 때문에 카프카 클러스터를 운영하다가 서버 관련 리소스 사용 비율이 특정 브로커에 몰려있다고 판단되면, 리더 파티션이 밀집되어 있지 않은지 확인하는 것이 중요합니다.


[1주키퍼 + 다중 카프카 클러스터]

 

카프카 클러스터를 상황에 따라 분리해서 데이터를 격리할 수 있습니다. 한 개의 주키퍼에 주문 로그, 데이터 분석, 웹 로그 등과 같이 목적에 따라 나누는 것도 좋은 방법일 수 있습니다. 하지만 주키퍼가 데이터를 많이 저장하고 통신량이 많아지면 병목 지점이 될 수 있다는 부분을 유의해야 합니다.

[ 카프카 브로커 ]


카프카에서 브로커는 카프카를 저장하는 공간이고 가장 핵심이 되는 애플리케이션입니다.


카프카 브로커를 실행하기 위해서 반드시 주키퍼가 필요합니다.

주키퍼는 카프카에서 사용되는 다양한 메타데이터를 저장하고 사용되는데 토픽의 파티션 위치, 컨슈머 그룹 이름과 같은 메타데이터가 저장되고 사용됩니다. 이러한 메타데이터는 카프카 브로커 운영을 위해 내부에서 사용될 뿐 아니라 프로듀서와 컨슈머가 토픽을 클러스터와 통신을 하는데 중요한 역할을 합니다.

** 카프카 브로커의 역할
  . 데이터 복제, 삭제, 컨트롤러, 코디네이터 등
** 주키퍼의 역할
  . 카프카를 운영하는데 필요한 메타데이터 저장
  . 토픽의 파티션 위치, 컨슈머 그룹 이름 등

 

프로듀서가 보낸 데이터는 브로커에 저장되는데, 이 브로커라고 불리는 애플리케이션(or 프로세스)는 서버 1대에 1개의 브로커로 실행됩니다. 만약 여러 브로커로 분산 운영하고 싶다면 서버를 여러 대 발급받아서 각 서버당 브로커 프로세스를 실행시켜서 클러스터로 운영하는 구조입니다.

(1개의 서버에 여러 브로커를 띄울 수 있지만, 서버의 장애의 대비해서 클러스터로 서버를 이중화/삼중화하는 것입니다.)

 

프로듀서가 보낸 데이터는 브로커 애플리케이션이 있는 서버의 파일 시스템에 저장합니다. 파일 시스템에 저장함으로써 데이터를 안전하게 저장하고, 브로커가 중단되더라도 다시 브로커를 실행시켜서 데이터를 사용할 수 있는 구조라고 볼 수 있습니다.

 

[브로커:데이터 복제]

카프카에서는 데이터를 안정적으로 분산 저장하기 위해, 브로커를 여러 대 운영합니다.

데이터를 분산 저장하기위해 카프카에서는 파티션을 복제해서 여러 서버에 분산 저장합니다.

이렇게 복제한 파티션은 리더 파티션과 팔로워 파티션으로 구성되어 있습니다.

 

** 파티션은 리더 파티션과 팔로워 파티션으로 구성되어 있습니다.
  . 리더: 프로듀서, 컨슈머와 직접 통신하는 브로커에 위치
           브로커로 전송된 데이터는 가장 처음 리더 파티션에 저장된다
  . 팔로워: 리더가 있는 브로커와 다른 브로커에 저장됨
              (팔로워 파티션을 구성하기 위해서는 브로커가 2개 이상인 클러스터를 운영해야 합니다)
              리더 파티션을 계속 복제하여 데이터를 분산 저장하는 용도

 

 

[브로커:그룹 코디네이터]


여러 브로커 중 한대는 코디네이터 역할을 하게 됩니다.

** 컨슈머 그룹 상태를 체크
  . 컨슈머에 장애가 발생하여 더이상 파티션이 소비되지 않을 경우 정상 컨슈머에 파티션 할당을 변경함(리밸런스)
** 파티션 변경 상태를 체크
  . 파티션이 변경되면 리밸런스
이런 상태에 따라 컨슈머와 파티션의 할당을 변경해주는 리밸런싱을 발생시키는 역할을 합니다.

코디네이터 역할을 하는 브로커는 토픽과 연계된 파티션과 컨슈머의 상태에 따라 컨슈머와 파티션의 할당을 변경해주는 리밸런싱을 일으켜 데이터가 지속적으로 처리되도록 도와줍니다.

 

 

[브로커:컨트롤러]

클러스터를 구성하는 브로커 중 한대는 컨트롤러 역할을 합니다.


클러스터에서 시작하는 첫 번째 브로커가 컨트롤러가 되고, 나머지 브로커들에게 파티션들에 대한 정보를 나눠가질 수 있도록 전송해서 프로듀서나 컨슈머가 정상적으로 브로커와 통신할 수 있도록 도와줍니다.


또한 컨트롤러 브로커는 리더 파티션을 선출합니다. 이는 데이터가 분산 저장되어 있을 때, 안전하게 데이터를 지속적으로 처리할 수 있도록 하는 데 핵심적인 역할을 합니다.

 

 

[주키퍼가 가지고 있는 메타데이터들]

 

** 컨트롤러 정보: /controller

** 브로커 ID와 토픽 정보들: /brokers/ids, /brokers/topics

** 토픽 설정 정보: /config
  . 이런 데이터는 직접 주키퍼 명령어들을 통해서 노드를 조회하고 확인할 수 있습니다.

** 필요할 경우 하나의 주키퍼로 다른 용도의 카프카 클러스터(브로커)를 운영할 수 있습니다.
  . /data/...
  . /marketing/...
  . 와 같이 root node의 이름을 다르게 만들고 지정해서 여러 클러스터를 한꺼번에 운영할 수 있음

 

주키퍼와의 통신으 브로커, 클러스터를 운영하는 데 데이터 통신이 지속적으로 일어나 운영상의 허들이 되어왔습니다.
그래서 카크파 클러스터를 운영할 때 주키퍼 없이 운영할 수 있도록 개발중에 있습니다(3.0에서는 주키퍼 삭제 예정)

 

토픽 이름은 영대소문자와 언더바, 대시, 마침표로 구성할 수 있는데 

이외에 브로커의 버전에 따라 불가능한 이름이 있을 수 있습니다. (마침표 하나, 마침표 2개 등..)

때문에 제약사항을 정확히 알고 싶다면 깃허브에 올라가있는 아파치 카프카 오픈소스를 받아서, 토픽 생성 관련 코드를 확인해보는 것이 좋습니다.

 

[토픽 이름을 어떻게 지정해야 할까?]

 

1) 어떤 개발환경이고, 어떤 데이터 타입이 사용되는지 유추 가능한 토픽 이름이 좋습니다.
토픽에 넣을 수 있는 데이터는 직렬화되어 들어갑니다. 즉, 들어갈 수 있는 데이터 타입이 무한하다고 볼 수 있습니다.

토픽에 들어가는 데이터가 JSON인지 POJO인지에 따라 컨슈머에서 어떻게 역직렬화해서 데이터를 읽을지 정해야 하기 때문에, 토픽에 들어가는 데이터 타입을 명확히 표시하기 위해 토픽 이름에 데이터 타입이 들어가는 것이 좋은 방법일 수 있습니다.

 

2) 만약 회사에서 공용 카프카 클러스터를 사용한다면, 여러 팀이 동시에 토픽을 만들고 사용할 수 있습니다.
이런 경우에 특정 토픽이 어느 서비스에서, 어떤 팀에서 사용되는 것인지 파악이 힘들 수 있는데
그런 경우에는 토픽의 오너십(ownership)을 가진 이름을 토픽 이름에 추가하는 것도 고려할 수 있습니다.

 

3) 카프카 클러스터를 2개 이상 운영(개발 환경, 상용 환경)하는 경우 굳이 동일한 이름의 토픽 이름을 만들기보단, 클러스터의 환경에 따라 토픽 이름을 다르게 넣어서 토픽이 위치한 환경을 명확히 파악하는 것이 좋습니다.

 

[좋은 토픽 이름을 짓는 방법]

 

1) 대소문자를 섞어 쓰지 않는 것이 좋습니다.
  . 휴먼 에러로 실수할 가능성이 높음
  . kafka-console-consumer, kafka-console-producer로  테스트를 위해 cli를 작성할 때 번거로움

2) 마침표(.)와 언더바(_)를 섞어 쓰지 않는 것이 좋습니다.
  . 마침표와 언더바를 섞어쓰면 카프카에서 사용하는 메트릭 이름과 충돌이 일어날 수 있음

3) 카프카 클러스터 운영자는 규칙을 정하고 전파하는 것이 좋습니다.

** 좋은 토픽 이름 예시
  . advertise-sender-notification-prod
  . android-application-sms-prod

  . data-jira1234-dev

+ Recent posts