ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] 프로듀서 애플리케이션 생성 및 실행
    Apache Kafka 2024. 4. 20. 15:40
    반응형

     

        

     

    아파치 카프카 애플리케이션 프로그래밍 With Java 책을 학습한 내용을 공유합니다.

     

    ✍️ Kafka Producer Application을 생성하고 실행까지 진행해보겠습니다.


     실습환경

    • Inteliij
    • Java 17
    • Gradle

     

     

    kafka-client Dependency 추가

    프로듀서를 구현하기 위해 카프카 클라이언트 라이브러리를 추가합니다.

    dependencies {
        testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
        testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
        implementation 'org.apache.kafka:kafka-clients:2.5.0' // add
        implementation 'org.slf4j:slf4j-simple:1.7.30'        // add
    }

     

    SimpleProducer.class 추가

    프로듀서 애플리케이션을 개발하기 위해 프로듀서 클래스를 추가합니다.

    public class SimpleProducer {
        private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
        private final static String TOPIC_NAME = "test2";
        private final static String BOOTSTRAP_SERVERS = "{Your Public Host IP}";
        public static void main(String[] args) {
            Properties configs = new Properties();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            
            KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
            String messageValue = "testMessage";
            
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
            producer.send(record);
            logger.info("{}", record);
            
            producer.flush();
            producer.close();
        }
    }

     

     

     

    코드 설명은 아래와 같습니다.

    1. PropertiesKafkaProducer의 생성 파라미터로 추가하여 인스턴스를 생성합니다.
    2. 카프카 브로커로 데이터를 보내기 위해 ProducerRecord를 생성합니다. 예제에서는 토픽 이름과 메시지 값만 선언했기 때문에 메시지 키는 null로 설정되어 전송됩니다.
    3. 생성한 ProducerRecord를 전송하기 위해 record를 파라미터로 가지는 send() 메서드를 호출합니다. 메서드를 호출한다고 즉각 전송되는 것이 아니라, record를 프로듀서 내부에 가지고 있다가 flush()를 통해 프로듀서 내부 버퍼에 가지고 있던 레코드 배치를 브로커로 전송합니다. 이를 ‘배치 전송’ 이라고 부르기도 합니다.
    4. 애플리케이션을 종료하기 전에 close() 메서드를 호출하여 producer 인스턴스의 리소스들을 안전하게 종료합니다.

    토픽 생성

    카프카 프로듀서 애플리케이션을 실행하기 전에 전송될 토픽을 생성합니다.

    $ bin/kafka-topics.sh --bootstrap-server {Your Public Host IP}:9092 --create \
      --topic test2 \
      --partitions 3

     

     

    토픽 생성이 완료되었고 프로듀서는 데이터를 전송할 준비가 완료되었습니다. main 함수를 실행하면 아래와 같은 로그가 출력 됩니다. Properties에 설정한 옵션들이 정상적으로 적용된 것을 확인할 수 있습니다.

    [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    	acks = 1
    	batch.size = 16384
    ... 생략
    
    [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
    [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
    [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1705990504361
    [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BTsvMCdmToWRUbJocIgOXg
    [main] INFO com.example.SimpleProducer - ProducerRecord(topic=test2, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=testMessage, timestamp=null)
    [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

     

     

     

    프로듀서 애플리케이션이 정상적으로 토픽에 데이터를 전송했는지 확인하기 위해 kafka-console-consumer 명령으로
    확인할 수 있습니다. --from-beginning 옵션을 추가로 넣어서 확인하면 토픽에 있는 모든 레코드를 확인할 수 있습니다.

    $ bin/kafka-console-consumer.sh --bootstrap-server {Your Public Host IP}:9092 --topic test2 \
      --from-beginning
    - 메시지 정상 출력
    testMessage

     


    프로듀서 중요 개념

    프로듀서는 카프카 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거칩니다. 전송하고자 하는 데이터를 ProducerRecord 클래스를 통해 필수 파라미터인 토픽메시지 값만 설정하여 인스턴스를 생성했습니다. 하지만 때에 따라서 파티션 번호를 직접 지정하거나 타임스탬프 설정, 메시지 키를 설정할 수도 있습니다.

    KafkaProducer 인스턴스가 send() 메서드를 호출하면 ProducerRecord는 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 정해집니다. 파티셔너를 따로 지정하지 않고 KafkaProducer 인스턴스를 생성하게 되면 기본값인 DefaultPartitioner로 설정 되어 파티션이 정해집니다. 파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 어큐뮬레이터(accumulator)에 데이터를 버퍼로 쌓아놓고 발송합니다. 버퍼로 쌓인 데이터는 배치로 묶어서 전송함으로써 카프카 프로듀서 처리량을 향상시키는 데에 상당한 도움을 줍니다.

    프로듀서 API를 사용하면 ‘UniformStickyPartitioner’와 ‘RoundRobinPartitioner’ 2개 파티션을 제공합니다. 카프카 클라이언트 2.4.0 이전에는 RoundRobinPartitioner가 기본 파티셔너로 설정 되어 있었는데, 2.5.0 버전에선 RoundRobinPartitioner의 단점을 개선한 UniformStickyPartitioner이 기본 파티셔너로 설정 되어 있습니다. UniformStickyPartitioner는 어큐뮬레이터에서 데이터가 배치로 모두 묶일 때까지 기다렸다가 배치로 묶인 데이터는 모두 동일한 파티션에 전송함으로써 RoundRobinPartitioner에 비해 향상된 성능을 가지게 되었습니다.

     

    프로듀서 주요 옵션

    프로듀서 애플리케이션을 실행할 때 설정해야 하는 필수 옵션 선택 옵션이 있습니다.

     

    필수 옵션

    • bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성합니다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정도 가능합니다.
    • key or value.serializer: 레코드의 메시지 키, 값을 직렬화하는 클래스를 지정합니다.

    선택 옵션

    • acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는 데에 사용하는 옵션입니다. 0, 1, -1(all) 중 하나로 설정할 수 있습니다. 설정값에 따라 데이터의 유실 가능성이 달라집니다.
    • buffer.memory: 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리양을 지정합니다. 기본값은 32MB입니다.
    • retries: 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정합니다.
    • batch.size: 배치로 전송할 레코드 최대 용량을 지정합니다. 너무 작게 설정하면 프로듀서가 브로커로 더 자주 보내기 때문에 네트워크 부담이 있고, 너무 크게 설정하면 메모리를 더 많이 사용하게 되는 점을 주의해야 합니다. 기본값은 16384입니다.
    • linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간입니다. 기본값은 0 입니다.
    • partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정합니다.
    • enable.idempotence: 멱등성 프로듀서로 동작할지 여부를 결정합니다. 기본값은 false 입니다.
    • transactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정합니다. 프로듀서의 고유한 트랜잭션 아이디를 설정할 수 있습니다. 이 값을 설정하면 트랜잭션 프로듀서로 동작합니다. 기본값은 null 입니다.

    커스텀 파티셔너를 가지는 프로듀서

    프로듀서 사용환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야 할 때가 있습니다. 예를 들어 Pangyo라는 값을 가진 메시지 키가 0번 파티션으로 들어가야한다고 가정합시다. 기본 설정 파티셔너를 사용할 경우 메시지 키의 해시값을 파티션에 매칭하여 데이터를 전송하므로 어느 파티션에 들어갈지 알 수 없습니다. 이 때 Partitioner 인터페이스를 사용하여 사용자 정의 파티셔너를 생성하면 해당 키에 대해서 무조건 파티션 0번으로 지정하도록 설정할 수 있습니다. 토픽의 파티션 개수가 변경되더라도 고정됩니다.

     

     

    CustomPartitioner.class

    public class CustomPartitioner implements Partitioner {
    
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            if (keyBytes == null) {
                throw new InvalidRecordException("Need message key");
            }
    
            if (((String)key).equals("Pangyo")) {
                return 0;
            }
    
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }

     

    코드 설명은 아래와 같습니다.

    1. partition 메서드에는 레코드를 기본으로 파티션을 정하는 로직이 포함됩니다. 리턴값은 주어진 레코드가 들어갈 파티션 번호입니다.
    2. 레코드에 메시지 키를 지정하지 않은 경우에는 비정상적인 데이터로 간주하고 InvalidRecordException을 발생시킵니다.
    3. 메시지 키가 Pangyo일 경우 파티션 0번으로 지정되도록 0을 리턴합니다.
    4. Pangyo가 아닌 메시지 키를 가진 레코드는 해시값을 지정하여 특정 파티션에 매칭되도록 설정합니다.

     

     

    커스텀 파티셔너를 지정한 경우 ProducerConfigPARTITIONER_CLASS_CONFIG 옵션을 사용자 생성 파티셔너로 설정하여 KafkaProducer 인스턴스를 생성해야 합니다. SimpleProducer.class 설정에 추가합니다.

    Properties configs = new Properties();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
    KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

     


    브로커 정상 전송 여부를 확인하는 프로듀서

    KafkaProducer의 send() 메서드는 Future 객체를 반환합니다. 이 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 ProducerRecord가 카프카 브로커에 정상적으로 적재 되었는지에 대한 데이터가 포함되어 있습니다. 다음 코드와 같이 get() 메서드를 사용하면 데이터의 결과를 동기적으로 가져올 수 있습니다.

    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
    RecordMetadata metadata = producer.send(record).get();
    logger.info("{}", metadata.toString());

     

     

    main 함수를 실행한 결과, 정상적으로 RecordMetadata 인스턴스를 반환하는 것을 확인할 수 있습니다. 또한
    test2 토픽의 0번째 파티션에 적재 되었으며 해당 레코드에 부여된 오프셋 번호는 1번인 것을 확인할 수 있습니다.

    [main] INFO com.example.SimpleProducer - test2-0@1

     

     

    비동기로 결과를 조회

    그러나 동기방식으로 프로듀서의 전송 결과를 확인하는 것은 빠른 전송에 허들이 될 수 있습니다. 따라서 이를 원하지 않는 경우를 위해 비동기로 결과를 확인할 수 있도록 Callback 인터페이스를 제공하고 있습니다. 해당 인터페이스를 이용하여
    사용자 정의 Callback 클래스를 생성한 뒤, 레코드의 전송 결과에 대응하는 로직을 구현해보겠습니다.

     

    ProducerCallback.class

    public class ProducerCallback implements Callback {
    
        private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
    
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                logger.error(e.getMessage(), e);
            } else {
                logger.info(metadata.toString());
            }
        }
    }


    onCompletion 메서드는 레코드의 비동기 결과를 받기 위해 사용합니다. 위 코드에서는 만약 브로커 적재에 이슈가 생겼을 경우 Exception에 어떤 에러가 발생하였는지 담겨서 메서드가 실행됩니다. 에러가 발생하지 않았을 경우에는 RecordMetadata를 통해 해당 레코드가 적재된 토픽 이름과 파티션 번호, 오프셋을 알 수 있습니다.

     

    SimpleProducer.class

    // async
    KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
    producer.send(record, new ProducerCallback()); // this

     

    SimpleProducer에서 위 코드로 수정이 필요합니다. 레코드 전송 후 비동기로 결과를 받기 위해서는 send() 메서드 호출 시 ProducerRecord 객체와 함께 사용자 정의 Callback 클래스를 넣으면 됩니다. 비동기로 결과를 받을 경우 동기로 결과를 받는 경우보다 더 빠른 속도로 데이터를 추가 처리할 수 있지만 전송하는 데이터의 순서가 중요한 경우 사용하면 안됩니다. 비동기로 결과를 기다리는동안 다음으로 보낼 데이터의 전송이 성공하고 앞서 보낸 데이터의 결과가 실패할 경우 재전송으로 인해 데이터 순서가 역전될 수 있기 때문입니다. 그러므로 데이터의 순서가 중요하다면 동기로 결과를 받아야 합니다.

     


    컨슈머 API

    프로듀서가 전송한 데이터는 카프카 브로커에 적재됩니다. 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 합니다. 예를 들어, 마케팅 문자를 고객에게 보내는 기능이 있다면 컨슈머는 토픽으로부터 고객 데이터를 가져와서 문자 발송 처리를 하게 됩니다.

     

    카프카 컨슈머 프로젝트 생성

    기본 설정으로 생성할 수 있는 오토 커밋(auto commit) 카프카 컨슈머 애플리케이션을 만들어보겠습니다. 프로젝트 설정은 별도 추가할 것은 없습니다.

    public class SimpleConsumer {
    
        private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
        private final static String TOPIC_NAME = "test2";
        private final static String BOOTSTRAP_SERVERS = "{Your Public Host IP}:9092";
        private final static String GROUP_ID = "test-group";
    
        public static void main(String[] args) {
            Properties configs = new Properties();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
            consumer.subscribe(Arrays.asList(TOPIC_NAME));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> {
                    logger.info("{}", record);
                });
            }
        }
    }

     

    위 코드에서 컨슈머 그룹 이름을 선언한 이유는 컨슈머 그룹을 통해 컨슈머의 목적을 구분할 수 있기 때문입니다. 예를 들어, email 발송 처리를 하는 애플리케이션이라면 email-application-group으로 저장하여 동일한 역할을 하는 컨슈머를 묶어 관리할 수 있습니다.

     

     

    $ bin/kafka-console-producer.sh --bootstrap-server {Your Public Host IP}:9092 \
      --topic test2
    > test
    > testmessss
    > message2

    테스트를 위해 test2 토픽에 kafka-console-producer 명령으로 데이터를 넣겠습니다.

     

     

    위 로그를 확인했을때 정상적으로 test / rtestmessss / message2 Record를 Poll 하는것을 확인할 수 있습니다.

     

     

    이처럼 컨슈머 그룹을 나누는 것은 중요한 포인트입니다. 그 예시로 서버의 주요 리소스인 CPU, 메모리 정보를 수집하는 데이터 파이프라

    을 구축한다고 가정해봅시다. 실시간 리소스를 시간순으로 확인하기 위해서 데이터를 ElasticSearch에 저장하고 Hadoop에 적재합니다.

    이렇게 동기적으로 실행이 되는 ElasticSearch 혹은 Hadoop에 장애가 발생한다면 더는 적재가 불가능할 수도 있습니다. 반면, 카프카는

    최종 적재되는 저장소의 장애에 유연하게 대응할 수 있도록 각기 다른 저장소에 저장하는 컨슈머를 다른 컨슈머 그룹으로 묶음으로써 각 저

    장소의 장애에 격리되어 운영할 수 있습니다.

    만약 컨슈머 그룹의 컨슈머에 장애가 발생하면 어떻게 될까요? 이런 경우엔 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어가게 됩니다. 이러한 과정을 ‘리밸런싱(rebalancing)’이라고 부릅니다. 리밸런싱은 크게 두 가지 상황에서 일어나는데, 첫 번째는 컨슈머가 추가되는 상황이고 두 번째는 컨슈머가 제외되는 상황입니다. 이를 위해 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해놔야 합니다.

     


    오프셋 커밋

    컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 커밋(commit)을 통해 기록합니다. 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽(__consumer_offsets)에 기록됩니다.
    컨슈머 동작 이슈가 발생하여 __consumer_offsets 토픽에 어느 레코드까지 읽어갔는지 오프셋 커밋이 기록되지 못했다면 데이터 처리의 중복이 발생할 수 있습니다. 그러므로 오프셋 커밋을 정상적으로 처리했는지 검증해야만 합니다.

    오프셋 커밋은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행할 수 있습니다. 기본 옵션은 poll() 메서드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 설정되어 있습니다. 이렇게 일정 간격마다 자동으로 커밋되는 것을 ‘비명시 오프셋 커밋’ 이라고 합니다. poll() 메서드가 auto.commit.interval.ms에 설정된 값 이상이 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋합니다. 편리하지만 poll() 메서드 호출 이후에 리밸런싱 또는 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 가능성이 있는 취약한 구조를 가지고 있습니다.

    명시적으로 오프셋을 커밋하려면 poll() 메서드 호출 이후에 반환받은 데이터의 처리가 완료 되고 commitSync() 메서드를 호출하면 됩니다. commitSync() 메서드는 poll() 메서드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행합니다. 다만 브로커에 커밋 요청을 하고 정상적으로 처리되었는지 응답하기까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼칩니다. 이를 해결하기 위해 commitAsync() 메서드를 사용하여 비동기 커밋 요청을 할 수 있습니다. 하지만 커밋 요청이 실패한 경우 현재 처리 중인 데이터의 순서를 보장하지 않고 데이터의 중복 처리가 발생할 수 있습니다.

     

    동기 오프셋 커밋

    poll() 메서드가 호출된 이후에 commitSync() 메서드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있습니다.


    정리

    이번 챕터에서는 실습용 프로듀서 애플리케이션을 설치하고, 토픽 생성 및 데이터를 전송, 수신하는 기능 실습을 진행해 봤습니다.
    또한 동기, 비동기적으로 커밋을 하는 방법과 차이점을 살펴봤습니다.

     

    https://github.com/bjpublic/apache-kafka-with-java

    https://product.kyobobook.co.kr/detail/S000001842177

    반응형

    댓글

Designed by Tistory.