Producer는 Leader만 Write하고 Consumer는 Leader로부터만 Read함 Follower는 Broker 장애시 안정성을 제공하기 위해서만 존재
Follower는 Leader의 Commit Log에서 데이터를 가져오기 위해 요청(Fetch Request)으로 복제
Leader 장애 -> 새로운 Leader를 선출
Kafka 클러스터는 Follower 중에서 새로운 Leader를 선출 Client(Producer/Consumer)는 자동으로 새 Leader로 전환
Partition Leader에 대한 자동 분산: Hot Spot 방지
auto.leader.rebalance.enable: 기본값 enable leader.imbalance.check.interval.seconds: 기본값 300 sec . 300초마다 leader 분산에 불균형 여부 check leader.imbalnce.per.broker.percentage: 기본값 10 . 다른 브로커보다 10% 이상 더 많이 가져가면 불균형으로 판단
Rack Awareness: Rack간 분산하여 Rack 장애를 대비
동일한 Rack 혹은 Available Zone상의 Broker들에 동일한 "rack name" 지정 복제본(Replica-Leader/Follower)은 최대한 Rack 간에 균형을 유지하여 Rack 장애 대비 Topic 생성시 또는 Auto Data Balancer/Self Balancing Cluster 동작 때만 실행
Kafka Broker는 Partition에 대한 Read 및 Write를 관리하는 소프트웨어 . Kafka Server라고 부르기도 함 . Topic 내의 Partition들을 분산, 유지 및 관리 . 각각의 Broker들은 ID로 식별됨(ID는 숫자) . Topic의 일부 Partition들을 포함 -> Topic 데이터의 일부분(Partition)을 갖을 뿐 데이터 전체를 갖고 있지 않음 . Kafka Cluster: 여러 대의 Broker들로 구성됨 . Client는 특정 Broker에 연결하면 전체 클러스터에 연결됨 . 최소 3대 이상의 Broker를 하나의 Cluster로 구성해야 함 -> 4대 이상을 권장함
Zookeeper: Broker를 관리
Zookeeper는 Broker를 관리 (Broker 들의 목록/설정을 관리)하는 소프트웨어
. 변경사항에 대해 Kafka에게 알림: 토픽 생성/제거, Broker 추가/제거 등
. Zookeeper는 홀수의 서버로 작동하게 설계되어 있음(최소 3, 권장 5)
. Zookeeper에는 Leader(writes)가 있고 나머지 서버는 Follower(reads)
Zookeeper 아키텍쳐: Leader/Follower 기반 Mater/Slave 아키텍쳐
Zookeeper는 분산형 Configuration 정보 유지, 분산 동기화 서비스를 제공하고 대용량 분산 시스템을 위한 네이밍 레지스트리를 제공하는 소프트웨어
분산 작업을 제어하기 위한 Tree 형태의 저장소 -> Zookeeper를 사용하여 멀티 Kafka Broker들 간의 정보(변경 사항 포함) 공유, 동기화 등을 수행
.변경사항에 대해 Kafka에게 알림: 토픽 생성/제거, Broker 추가/제거 등
. Zookeeper는 홀수의 서버로 작동하게 설계되어 있음(최소 3, 권장 5)
. Zookeeper에는 Leader(writes)가 있고 나머지 서버는 Follower(reads)
Zookeeper Failover: Quorum 알고리즘 기반
Quorum(쿼럼)은 "정족수"이며, 합의체가 의사를 진행시키거나 의결을 하는데 필요한 최소 인원을 뜻함
분산 코디네이션 환경에서 예상치 못한 장애가 발생해도 분산 시스템의 일관성을 유지시키기 위해서 사용
Producer: 메시지를 생산해서 Kafka의 Topic으로 메시지를 보내는 애플리케이션
Consumer: Topic의 메시지를 가져와서 소비하는 애플리케이션
ConsumerGroup: Topic의 메시지를 사용하기 위해 협력하는 Consumer들의 집합
. 하나의 Consumer는 하나의 Consumer Group에 포함되며, Consumer Group 내의 Consumer들은 협력하여 Topic의 메시지를 분산 병렬 처리함
Producer와 Consumer의 분리(Decoupling)
. Producer와 Consumer는 서로 알지 못하며, 각각 고유의 속도로 Commit Log에 Write 및 Read를 수행
. 다른 Consumer Group에 속한 Consumer들은 서로 관련이 없으며, Commit Log에 있는 Event를 동시에 다른 위치에서 Read할 수 있음
Kafka Commit Log
Commit Log
. 추가만 가능하고 변경 불가능한 데이터 스트럭쳐
Offset . Commit Log에서 Event의 위치
Kafka Offset: Commit Log에서 Event의 위치
Producer가 Write하는 LOG-END-OFFSET과 Consumer Group의 Consumer가 Read하고 처리한 후에 Commit한 CURRENT-OFFSET과의 차이(Consumer Lag)가 발생할 수 있음
Topic, Partition, Segment: Logical View
Topic: Kafka 안에서 메시지가 저장되는 장소, 논리적인 표현
Partition . Commit Log, 하나의 Topic은 하나 이상의 Partition으로 구성 . 병렬 처리(Throughput 향상)를 위해서 다수의 Partition을 사용
Segment . 메시지(데이터)가 저장되는 실제 물리 File
. Segment File이 지저된 크기보다 크거나 지정된 기간보다 오래되면 새 파일이 열리고 메시지는 새 파일에 추가됨
Topic, Partition, Segment: Physical View
Topic 생성 시 Partition 개수를 지정하고, 각 Partition은 Broker들에 분산되며 Segment File들로 구성됨 Rolling Strategy: 특정 용량이나 시간에 따라 Segment 파일을 분리 . log.segment.bytes(default 1GB) . log.roll.hours(default 168 hours)
파일을 Rolling하여 분리/생성
Active Segment: Partition당 하나의 Active Segment
Partition당 오직 하나의 Segment가 활성화(Active) 되어 있음 -> 데이터가 계속 쓰여지고 있는 중
Topic, Partition, Segment의 특징
Topic 생성 시 Partition 개수를 지정 - 개수 변경 가능하나 운영 시에는 변경 권장하지 않음
Partition 번호는 0부터 시작하고 오름차순
Topic 내의 Partition들은 서로 독립적임
Event(Message)의 위치를 나타내는 Offset 이 존재
Offset은 하나의 Partition에서만 의미를 가짐 - Partition 0 의 offset 1은 Partition 1의 offset 1과 연관 없음
Offset값은 계속 증가하고 0으로 돌아가지 않음
Event(Message)의 순서는 하나의 Partition 내에서만 보장
Partition에 저장된 데이터(Message)는 변경이 불가능(Immutable)
Partition에 Write되는 데이터는 맨 끝에 추가되어 저장됨
Partition은 Segment File들로 구성됨 - Rolling 정책: log.segment.bytes, log.roll.hours
Orderform 정보(주문자 Id, 주문한 Item Id, 주문 수량)를 가지고 Order를 생성하는 과정은 아래와 같다.
1) Member Id로 주문자 정보를 조회 → 주문자(회원) 정보가 없으면 Exception 2) Item Id로 상품 정보를 조회 → Item 정보가 없으면 Exeption 3) Member.address 정보로 Delivery(배송정보) 생성 4) 주문 수량과 Item.price 정보로 OrderItem(주문상품 정보) 생성 5) Order 생성 → 이 때 주문 수량만큼 재고를 줄이는데, 재고가 부족하면 Exception
위 프로세스에서 밑줄 친 부분은 예외 case에 해당한다.
아래 코드는, 위 과정에서 발생할 수 있는 Exception에 대한 처리가 되어있지 않은 상태이다.
Exception이 발생하면 Camel Route가 Exception이 발생한 시점에 동작을 멈추게 되고, 당연히 DB가 Update되지 않는다.
from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
.process(new Processor() {
@Overridepublic void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
System.out.println("[message header] -- " + message.getHeaders());
System.out.println("[message body] -- " + message.getBody());
Orderform orderform = message.getBody(Orderform.class);
// Message TransformationLong memberId = orderform.getMemberId();
Long itemId = orderform.getItemId();
Long orderCount = orderform.getOrderCount();
Map<String, Object> orderFormMap = Map.of(
"memberId", memberId,"itemId", itemId, "orderCount", orderCount
);
message.setHeaders(orderFormMap);
System.out.println("[message header(changed)] -- " + message.getHeaders());
System.out.println("[message body] -- " + message.getBody());
}
})
// Search Member Info. (주문 회원 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info. (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}" )
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품의 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.to("jpa://" + Order.class.getName() + "?usePersist=true")
.log("--- ${body} ---");
예를 들어 Make Order(주문 Entity 생성) 부분에서 재고가 없는 Exception이 발생할 경우, 위에 정의된 Camel Route를 따라 processing이 진행되는 중에 NotEnoughStockExeption이 발생한다.
//package org.acme.routes.beans;publicvoidmakeOrder(Message message){
Map<String, Object> orderFormMap = message.getHeaders();
Long orderCount = (Long) orderFormMap.get("orderCount");
OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
Order order = Order.createOrder(this.member, this.delivery, orderItem);
message.setBody(order);
}
//package org.acme.domain;publicstatic OrderItem createOrderItem(Item item, int orderPrice, int count){
OrderItem orderItem = new OrderItem();
orderItem.setItem(item);
orderItem.setOrderPrice(orderPrice);
orderItem.setCount(count);
item.removeStock(count);
return orderItem;
}
//package org.acme.domain.Item;publicvoidremoveStock(int quantity){
int restStock = this.stockQuantity - quantity;
if (restStock < 0) {
thrownew NotEnoughStockException("need more stock");
}
this.stockQuantity = restStock;
}
Camel onException() 메소드를 사용한 Exception Handling
아래 예시와 같이 onException() 메소드를 사용해서 간단히 Exception Handling이 가능하다.
단, onException() 메소드는 해당 라우트에서 onException 구문이 나오기 전까지 발생한 exception만 handling하므로,
특정 Exception이 발생할 것으로 예상되는 DSL문 다음에 사용해야 한다.
from("kafka:orders?brokers=localhost:9092&valueDeserializer=org.acme.json.OrderformDeserializer")
.process(new Processor() {
@Overridepublicvoidprocess(Exchange exchange)throws Exception {
Message message = exchange.getIn();
System.out.println("[message header] -- " + message.getHeaders());
System.out.println("[message body] -- " + message.getBody());
Orderform orderform = message.getBody(Orderform.class);
// Message Transformation
Long memberId = orderform.getMemberId();
Long itemId = orderform.getItemId();
Long orderCount = orderform.getOrderCount();
Map<String, Object> orderFormMap = Map.of(
"memberId", memberId, "itemId", itemId, "orderCount", orderCount
);
message.setHeaders(orderFormMap);
System.out.println("[message header(changed)] -- " + message.getHeaders());
System.out.println("[message body] -- " + message.getBody());
}
})
// Search Member Info. (주문자 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.to("jpa://" + Order.class.getName() + "?usePersist=true")
.onException(NotEnoughStockException.class)
.transform(simple("exception"))
.to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/xxx...");
Camel Branch 구문(choice ~ when)을 사용한 예외 처리
아래와 같이 camel branch를 활용하여 exception을 처리할 수도 있다.
일반적으로 message body나 header의 내용에 따라 분기를 나눌 것이다.
// Search Member Info. (주문자 정보를 DB에서 select)
.toD("jpa://" + Member.class.getName() + "?query=select b from " + Member.class.getName() + " b where b.id = ${header.memberId}")
.to("bean:orderProcessingBean?method=checkMember")
// Search Item Info (주문상품 정보를 DB에서 select)
.toD("jpa://" + Item.class.getName() + "?query=select b from " + Item.class.getName() + " b where b.id = ${header.itemId}")
.to("bean:orderProcessingBean?method=checkItem")
// Make Order (주문 회원, 배송정보, 주문상품 정보로 실제 주문 엔티티를 생성)
.to("bean:orderProcessingBean?method=makeOrder")
// JPA persist (주문 정보를 DB에 저장, 연관 table들이 같이 update됨)
.choice()
.when(simple("${body} is 'org.acme.domain.Order'"))
.to("jpa://" + Order.class.getName() + "?usePersist=true")
.otherwise() // Exception이 발생한 경우, message body가 Order type이 아니므로 slack으로 메시지 송신
.to("slack:@hslee09?webhookUrl=https://hooks.slack.com/services/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");
Camel Route에서 message header나 body의 내용을 보고 Exception 여부를 확인할 수 있도록, processing bean에서 exception 처리를 해준다
// package org.acme.routes.beans;publicvoidmakeOrder(Message message){
try {
Map<String, Object> orderFormMap = message.getHeaders();
Long orderCount = (Long) orderFormMap.get("orderCount");
OrderItem orderItem = OrderItem.createOrderItem(this.item, this.item.getPrice(), orderCount.intValue());
Order order = Order.createOrder(this.member, this.delivery, orderItem);
// 정상적으로 order을 생성한 경우, message body는 order 객체가 들어가게 된다
message.setBody(order);
} catch (RuntimeException e) {
// 예외가 발생한 경우, message body를 "exception" string으로 transform 한다
message.setBody("exception");
}
}
** quarkus.hibernate-orm.database.generation.create-schemas Find the class with @Entity annotation and create a DDL statement at the start of the server and apply it to the DB.
Quarkus has built-in support for JSON serialization and deserialization based on Jackson. It will alsogeneratethe serializer and deserializer for you, so you do not have to configure anything about 'mp.messaging.outgoing.orderform-out.value.serializer'
When you use Reactive Messaging, you send messages to a channel and receive them from another channel. These channels are mapped to the underlying messaging technology by configuration. In our application, we must indicate that our reception and publication channels will use themoviesKafka channel
#Kafka Server
kafka.bootstrap.servers=localhost:9092
#Kafka Topic Name
kafka.topic.name=employees
# Configuring the incoming channel (reading from Kafka)
mp.messaging.incoming.employees-in.connector=smallrye-kafka
mp.messaging.incoming.employees-in.topic=employees
mp.messaging.incoming.employees-in.key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.employees-in.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Configuring the outgoing channel (writing to Kafka)
mp.messaging.outgoing.employees-out.connector=smallrye-kafka
mp.messaging.outgoing.employees-out.topic=employees
mp.messaging.outgoing.employees-out.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.employees-out.value.serializer=org.apache.kafka.common.serialization.StringSerializer
No configuration required in case of String type(default configuration), but in Integer Type, data will be lost if the serial/deserializer is not set properly. In case of Object Type (JSON), the serial/deserializer built into Quarkus automatically operates, so there is no need to set the key/value type.
String Type의 경우 Default이기 때문에 설정이 필요 없으나, Integer Type으로 key나 value를 consume/produce 할 경우 serializer를 알맞게 설정하지 않으면 data가 소실된다. Object Type(JSON)의 경우 Quarkus에 built-in 된 serial/deserializer가 자동으로 동작하게 되어 key/value type 설정이 필요 없다.
Publishing to Kafka
Create theorg.acme.domain.Employeeclass with the following content:
In this class, we inject anEmitter, i.e., an object responsible for sending a message to a channel. This emitter is attached to themovies-outchannel (and so will send messages to Kafka).
So, the rest of our application can simply use thesendMovieToKafkamethod to send a Employee info. to our Kafka topic.
Confluent Kafka . Community License: 소프트웨어 수정, 재배포 가능하지만 Saas 형태로 서비스 제공하는 것은 금지됨 . Enterprise License: 연간 구독형
데이터 파이프라인(Data Pipeline)이란?
중간에 사람의 개입 없이 데이터를 오염, 중복, 유실과 같은 결함 없이 수집, 저장, ETL(Extract, Transform, Load)이 가능하도록 일련의 흐름을 만들어 주는 과정
Event는 비즈니스에서 일어나는 모든 일(데이터)를 의미
. 웹사이트에서 무언가를 클릭하는 것
. 청구서 발행
. 송금
. 배송 물건의 위치 정보
. 택시의 GPS 좌표
. 센서의 온도/압력 데이터
Event Stream은 연속적인 많은 이벤트들의 흐름
. BigData의 특징을 가짐
. 비즈니스의 모든 영역에서 광범위하게 발생
. 대용량의 데이터(Big Data 발생)
Apache Kafka의 3가지 주요 특징
. 이벤트 스트림을 안전하게 전송: Publish & Subscribe
. 이벤트 스트림을 디스크에 저장: Write to Disk
. 이벤트 스트림을 분석 및 처리: Processing & Analysis
Apache Kafka의 사용 사례: Event(메시지/데이터)가 사용되는 모든 곳
. Messaging Syetem . IOT 디바이스로부터 데이터 수집 . 애플리케이션에서 발생하는 로그 수집 . Realtime Event Stream Processing (Fraud Detection, 이상 감지 등) . DB 동기화(MSA 기반의 분리된 DB간 동기화) . 실시간 ETL(Extract-Transform-Loda) . Spark, Flink, Strom, Hadoop과 같은 빅데이터 기술과 같이 사용
요약
. Apache Kafka는 흐르는 데이터를 처리하기 위한 플랫폼(Event-Streaming Platform)