정리
이번 챕터에서는 실습용 프로듀서 애플리케이션을 설치하고, 토픽 생성 및 데이터를 전송, 수신하는 기능 실습을 진행해 봤습니다.
또한 동기, 비동기적으로 커밋을 하는 방법과 차이점을 살펴봤습니다.
아파치 카프카 애플리케이션 프로그래밍 With Java 책을 학습한 내용을 공유합니다.
✍️ Kafka Producer Application을 생성하고 실행까지 진행해보겠습니다.
실습환경
프로듀서를 구현하기 위해 카프카 클라이언트 라이브러리를 추가합니다.
dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
implementation 'org.apache.kafka:kafka-clients:2.5.0' // add
implementation 'org.slf4j:slf4j-simple:1.7.30' // add
}
프로듀서 애플리케이션을 개발하기 위해 프로듀서 클래스를 추가합니다.
public class SimpleProducer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test2";
private final static String BOOTSTRAP_SERVERS = "{Your Public Host IP}";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
코드 설명은 아래와 같습니다.
카프카 프로듀서 애플리케이션을 실행하기 전에 전송될 토픽을 생성합니다.
$ bin/kafka-topics.sh --bootstrap-server {Your Public Host IP}:9092 --create \
--topic test2 \
--partitions 3
토픽 생성이 완료되었고 프로듀서는 데이터를 전송할 준비가 완료되었습니다. main 함수를 실행하면 아래와 같은 로그가 출력 됩니다. Properties에 설정한 옵션들이 정상적으로 적용된 것을 확인할 수 있습니다.
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
... 생략
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1705990504361
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BTsvMCdmToWRUbJocIgOXg
[main] INFO com.example.SimpleProducer - ProducerRecord(topic=test2, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=testMessage, timestamp=null)
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
프로듀서 애플리케이션이 정상적으로 토픽에 데이터를 전송했는지 확인하기 위해 kafka-console-consumer 명령으로
확인할 수 있습니다. --from-beginning 옵션을 추가로 넣어서 확인하면 토픽에 있는 모든 레코드를 확인할 수 있습니다.
$ bin/kafka-console-consumer.sh --bootstrap-server {Your Public Host IP}:9092 --topic test2 \
--from-beginning
- 메시지 정상 출력
testMessage
프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거칩니다. 전송하고자 하는 데이터를 ProducerRecord 클래스를 통해 필수 파라미터인 토픽과 메시지 값만 설정하여 인스턴스를 생성했습니다. 하지만 때에 따라서 파티션 번호를 직접 지정하거나 타임스탬프 설정, 메시지 키를 설정할 수도 있습니다.
KafkaProducer 인스턴스가 send() 메서드를 호출하면 ProducerRecord는 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 정해집니다. 파티셔너를 따로 지정하지 않고 KafkaProducer 인스턴스를 생성하게 되면 기본값인 DefaultPartitioner로 설정 되어 파티션이 정해집니다. 파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐뮬레이터(accumulator)에 데이터를 버퍼로 쌓아놓고 발송합니다. 버퍼로 쌓인 데이터는 배치로 묶어서 전송함으로써 카프카 프로듀서 처리량을 향상시키는 데에 상당한 도움을 줍니다.
프로듀서 API를 사용하면 ‘UniformStickyPartitioner’와 ‘RoundRobinPartitioner’ 2개 파티션을 제공합니다. 카프카 클라이언트 2.4.0 이전에는 RoundRobinPartitioner가 기본 파티셔너로 설정 되어 있었는데, 2.5.0 버전에선 RoundRobinPartitioner의 단점을 개선한 UniformStickyPartitioner이 기본 파티셔너로 설정 되어 있습니다. UniformStickyPartitioner는 어큐뮬레이터에서 데이터가 배치로 모두 묶일 때까지 기다렸다가 배치로 묶인 데이터는 모두 동일한 파티션에 전송함으로써 RoundRobinPartitioner에 비해 향상된 성능을 가지게 되었습니다.
프로듀서 애플리케이션을 실행할 때 설정해야 하는 필수 옵션과 선택 옵션이 있습니다.
프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야 할 때가 있습니다. 예를 들어 Pangyo라는 값을 가진 메시지 키가 0번 파티션으로 들어가야한다고 가정합시다. 기본 설정 파티셔너를 사용할 경우 메시지 키의 해시값을 파티션에 매칭하여 데이터를 전송하므로 어느 파티션에 들어갈지 알 수 없습니다. 이 때 Partitioner 인터페이스를 사용하여 사용자 정의 파티셔너를 생성하면 해당 키에 대해서 무조건 파티션 0번으로 지정하도록 설정할 수 있습니다. 토픽의 파티션 개수가 변경되더라도 고정됩니다.
CustomPartitioner.class
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
throw new InvalidRecordException("Need message key");
}
if (((String)key).equals("Pangyo")) {
return 0;
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
코드 설명은 아래와 같습니다.
커스텀 파티셔너를 지정한 경우 ProducerConfig의 PARTITIONER_CLASS_CONFIG 옵션을 사용자 생성 파티셔너로 설정하여 KafkaProducer 인스턴스를 생성해야 합니다. SimpleProducer.class 설정에 추가합니다.
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
KafkaProducer의 send() 메서드는 Future 객체를 반환합니다. 이 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 ProducerRecord가 카프카 브로커에 정상적으로 적재 되었는지에 대한 데이터가 포함되어 있습니다. 다음 코드와 같이 get() 메서드를 사용하면 데이터의 결과를 동기적으로 가져올 수 있습니다.
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
RecordMetadata metadata = producer.send(record).get();
logger.info("{}", metadata.toString());
main 함수를 실행한 결과, 정상적으로 RecordMetadata 인스턴스를 반환하는 것을 확인할 수 있습니다. 또한
test2 토픽의 0번째 파티션에 적재 되었으며 해당 레코드에 부여된 오프셋 번호는 1번인 것을 확인할 수 있습니다.
[main] INFO com.example.SimpleProducer - test2-0@1
그러나 동기방식으로 프로듀서의 전송 결과를 확인하는 것은 빠른 전송에 허들이 될 수 있습니다. 따라서 이를 원하지 않는 경우를 위해 비동기로 결과를 확인할 수 있도록 Callback 인터페이스를 제공하고 있습니다. 해당 인터페이스를 이용하여
사용자 정의 Callback 클래스를 생성한 뒤, 레코드의 전송 결과에 대응하는 로직을 구현해보겠습니다.
public class ProducerCallback implements Callback {
private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
logger.error(e.getMessage(), e);
} else {
logger.info(metadata.toString());
}
}
}
onCompletion 메서드는 레코드의 비동기 결과를 받기 위해 사용합니다. 위 코드에서는 만약 브로커 적재에 이슈가 생겼을 경우 Exception에 어떤 에러가 발생하였는지 담겨서 메서드가 실행됩니다. 에러가 발생하지 않았을 경우에는 RecordMetadata를 통해 해당 레코드가 적재된 토픽 이름과 파티션 번호, 오프셋을 알 수 있습니다.
// async
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record, new ProducerCallback()); // this
SimpleProducer에서 위 코드로 수정이 필요합니다. 레코드 전송 후 비동기로 결과를 받기 위해서는 send() 메서드 호출 시 ProducerRecord 객체와 함께 사용자 정의 Callback 클래스를 넣으면 됩니다. 비동기로 결과를 받을 경우 동기로 결과를 받는 경우보다 더 빠른 속도로 데이터를 추가 처리할 수 있지만 전송하는 데이터의 순서가 중요한 경우 사용하면 안됩니다. 비동기로 결과를 기다리는동안 다음으로 보낼 데이터의 전송이 성공하고 앞서 보낸 데이터의 결과가 실패할 경우 재전송으로 인해 데이터 순서가 역전될 수 있기 때문입니다. 그러므로 데이터의 순서가 중요하다면 동기로 결과를 받아야 합니다.
프로듀서가 전송한 데이터는 카프카 브로커에 적재됩니다. 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 합니다. 예를 들어, 마케팅 문자를 고객에게 보내는 기능이 있다면 컨슈머는 토픽으로부터 고객 데이터를 가져와서 문자 발송 처리를 하게 됩니다.
기본 설정으로 생성할 수 있는 오토 커밋(auto commit) 카프카 컨슈머 애플리케이션을 만들어보겠습니다. 프로젝트 설정은 별도 추가할 것은 없습니다.
public class SimpleConsumer {
private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
private final static String TOPIC_NAME = "test2";
private final static String BOOTSTRAP_SERVERS = "{Your Public Host IP}:9092";
private final static String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
logger.info("{}", record);
});
}
}
}
위 코드에서 컨슈머 그룹 이름을 선언한 이유는 컨슈머 그룹을 통해 컨슈머의 목적을 구분할 수 있기 때문입니다. 예를 들어, email 발송 처리를 하는 애플리케이션이라면 email-application-group으로 저장하여 동일한 역할을 하는 컨슈머를 묶어 관리할 수 있습니다.
$ bin/kafka-console-producer.sh --bootstrap-server {Your Public Host IP}:9092 \
--topic test2
> test
> testmessss
> message2
테스트를 위해 test2 토픽에 kafka-console-producer 명령으로 데이터를 넣겠습니다.
위 로그를 확인했을때 정상적으로 test / rtestmessss / message2 Record를 Poll 하는것을 확인할 수 있습니다.
이처럼 컨슈머 그룹을 나누는 것은 중요한 포인트입니다. 그 예시로 서버의 주요 리소스인 CPU, 메모리 정보를 수집하는 데이터 파이프라
을 구축한다고 가정해봅시다. 실시간 리소스를 시간순으로 확인하기 위해서 데이터를 ElasticSearch에 저장하고 Hadoop에 적재합니다.
이렇게 동기적으로 실행이 되는 ElasticSearch 혹은 Hadoop에 장애가 발생한다면 더는 적재가 불가능할 수도 있습니다. 반면, 카프카는
최종 적재되는 저장소의 장애에 유연하게 대응할 수 있도록 각기 다른 저장소에 저장하는 컨슈머를 다른 컨슈머 그룹으로 묶음으로써 각 저
장소의 장애에 격리되어 운영할 수 있습니다.
만약 컨슈머 그룹의 컨슈머에 장애가 발생하면 어떻게 될까요? 이런 경우엔 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어가게 됩니다. 이러한 과정을 ‘리밸런싱(rebalancing)’이라고 부릅니다. 리밸런싱은 크게 두 가지 상황에서 일어나는데, 첫 번째는 컨슈머가 추가되는 상황이고 두 번째는 컨슈머가 제외되는 상황입니다. 이를 위해 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해놔야 합니다.
컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋(commit)을 통해 기록합니다. 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽(__consumer_offsets)에 기록됩니다.
컨슈머 동작 이슈가 발생하여 __consumer_offsets 토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있습니다. 그러므로 오프셋 커밋을 정상적으로 처리했는지 검증해야만 합니다.
오프셋 커밋은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행할 수 있습니다. 기본 옵션은 poll() 메서드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 설정되어 있습니다. 이렇게 일정 간격마다 자동으로 커밋되는 것을 ‘비명시 오프셋 커밋’ 이라고 합니다. poll() 메서드가 auto.commit.interval.ms에 설정된 값 이상이 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋합니다. 편리하지만 poll() 메서드 호출 이후에 리밸런싱 또는 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 가능성이 있는 취약한 구조를 가지고 있습니다.
명시적으로 오프셋을 커밋하려면 poll() 메서드 호출 이후에 반환받은 데이터의 처리가 완료 되고 commitSync() 메서드를 호출하면 됩니다. commitSync() 메서드는 poll() 메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행합니다. 다만 브로커에 커밋 요청을 하고 정상적으로 처리되었는지 응답하기까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼칩니다. 이를 해결하기 위해 commitAsync() 메서드를 사용하여 비동기 커밋 요청을 할 수 있습니다. 하지만 커밋 요청이 실패한 경우 현재 처리 중인 데이터의 순서를 보장하지 않고 데이터의 중복 처리가 발생할 수 있습니다.
poll() 메서드가 호출된 이후에 commitSync() 메서드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있습니다.
이번 챕터에서는 실습용 프로듀서 애플리케이션을 설치하고, 토픽 생성 및 데이터를 전송, 수신하는 기능 실습을 진행해 봤습니다.
또한 동기, 비동기적으로 커밋을 하는 방법과 차이점을 살펴봤습니다.
[Kafka] Topics, Producer, Consumer 설치 및 실행 (0) | 2024.02.14 |
---|---|
[Kafka] EC2에 카프카 브로커, 주키퍼 설치 및 실행 (2) | 2024.02.14 |