✍️ Kafka Topics, Producer, Consumer를 학습하고 실습을 진행 해보겠습니다.
🚀 Apache Kafka의 토픽(Topic)이란 ?
토픽(Topic)은 데이터의 주제나 카테고리를 나타내는 개념입니다. 토픽은 Kafka에서 메시지를 구분하고 분류하는 데 사용됩니다. 간단히 말하면, 특정 주제에 관련된 데이터를 포함하는 카프카의 논리적인 채널이라고 생각할 수 있습니다. 여러 프로듀서(데이터를 생성 및 전송하는 역할)가 특정 토픽으로 데이터를 전송할 수 있고, 이를 통해 여러 컨슈머(데이터를 소비하는 역할)가 해당 토픽의 데이터를 읽을 수 있습니다.
토픽은 Kafka 클러스터 내에서 파티션으로 나뉠 수 있습니다. 각 파티션은 일련의 메시지를 저장하며, 특정 파티션의 데이터는 순서가 보장됩니다. 토픽은 데이터의 논리적인 그룹화를 제공하여 시스템을 쉽게 확장하고 유지보수할 수 있도록 돕습니다. 예를 들어, 특정 주제(토픽)에 관련된 데이터를 별도로 처리하거나 분석할 수 있습니다.
kafka-topics.sh
이 커맨드 라인 툴을 통해 토픽(topic)과 관련된 명령을 실행할 수 있습니다. 토픽은 카프카에서 데이터를 구분하는 가장 기본적인 개념입니다. RDBMS에서 사용하는 테이블(table)과 유사하다고 볼 수 있습니다. 카프카 클러스터에서 토픽은 여러개 존재할 수 있으며, 파티션(partition)이 존재합니다. 파티션의 개수는 최소 1개부터 시작되며, 파티션을 통해 한 번에 처리할 수 있는 데이터양을 늘릴 수 있고 토픽 내부에서도 파티션을 통해 데이터의 종류를 나누어 처리할 수 있습니다.
💡 토픽을 생성하는 2가지 방법
토픽을 생성하는 상황은 크게 2가지가 있습니다. 첫 번째는 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 데이터를 요청할 때, 그리고 두 번째는 커맨드 라인 툴로 명시적으로 토픽을 생성하는 것입니다. 토픽을 효과적으로 유지보수 하기 위해서는 명시적으로 생성하는 것을 추천합니다. 토픽마다 처리되어야 하는 데이터의 특성이 다르기 때문입니다.
예를 들어, 동시 데이터 처리량이 많아야 하는 토픽의 경우 파티션의 개수를 100으로 설정할 수 있습니다. 단기간 데이터 처리만 필요한 경우에는 토픽에 들어온 데이터의 보관기관 옵션을 짧게 설정할 수도 있습니다. 이런 이유로 토픽에 들어오는 데이터의 양과 병렬로 처리되어야 하는 용량을 잘 파악하여 생성하는 것이 중요합니다.
💡 해당 포스트에서는 EC2 인스턴스를 중지 시켜놓고 다시 실행해서 실습을 진행중이기 때문에 EC2 Public IP가 변경 됩니다. 참고 부탁드립니다.
토픽 생성
kafka-topics.sh를 통해 토픽 관련 명령어에 --create 옵션을 사용하여 hello.kafka라는 이름을 가진 토픽을 생성하겠습니다.
-- 주키퍼 실행
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ jps -m
-- 카프카 브로커 실행
$ bin/kafka-server-start.sh -daemon config/server.properties
$ tail -f logs/server.log
-- 토픽 생성
$ bin/kafka-topics.sh --create --bootstrap-server {Your EC2 Public IP}:9092 --topic hello.kafka
options - create: 토픽 생성하는 명령어라는 것을 명시합니다. - bootstrap-server: 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 Port를 명시. 여기서는 1개의 브로커와 통신하므로 ec2 public ip:9092만 입력합니다. - topic: 토픽 이름을 명시합니다. 카프카를 운영하다보면 다양한 데이터 처리를 위해 수많은 토픽이 만들어지기 때문에 유지보수를 위해 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는 것을 추천합니다.
hello.kafka 토픽이 브로커에 설정된 기본값으로 정상적으로 생성된 것을 확인할 수 있습니다.
토픽 생성시 옵션 지정
파티션 개수 / 복제 개수 / 토픽 데이터 유지 기간 옵션들을 지정하여 토픽을 생성하고 싶다면 다음과 같이 명령을 실행하면 됩니다.
--config를 통해 kafka-topics.sh 명령에 포함되지 않은 추가적인 설정을 할 수 있습니다. retention.ms는 토픽 데이터를 유지하는 기간을 뜻합니다. 172800000ms는 2일을 단위로 나타낸 것입니다. 이 기간이 지나면 데이터는 삭제됩니다.
hello.kafka.2 토픽이 정상적으로 생성된 것을 확인할 수 있습니다.
토픽 리스트 조회
아래 명령어로 토픽 리스트를 조회했을 때, 생성한 토픽 목록이 정상적으로 출력 되는 것을 확인할 수 있습니다.
$ bin/kafka-topics.sh --bootstrap-server {Your EC2 Public IP}:9092 --list
hello.kafka
hello.kafka.2
💡 토픽 생성 시 --zookeeper가 아니라 --bootstrap-server 옵션을 사용하는 이유
카프카 2.1 버전을 포함한 이전 버전에서는 일부 카프카 커맨드 라인툴이 주키퍼와 직접 통신하여 명령을 실행했지만 주키퍼와 직접 통신하여 명령을 처리하는 것은 아키텍처의 복잡도를 높이기 때문에 2.2 버전 이후로는 카프카와 직접 통신하여 토픽 관련 명령을 실행할 수 있습니다.
이미 생성된 토픽의 상태를 --describe 옵션을 사용하여 확인할 수 있습니다. 파티션 개수 / 복제된 파티션이 위치한 브로커의 번호 / 기타 토픽 구성 설정들을 출력합니다.
토픽 옵션 수정
토픽에 설정된 옵션을 변경하기 위해서는 kafka-topics.sh 또는 kafka-configs.sh 두 개를 사용해야 합니다. 파티션 개수 변경을 하려면 kafka-topics.sh를 사용해야 하고 토픽 삭제 정책인 리텐션 기간을 변경하려면 kafka-configs.sh를 사용해야 합니다. 해당 명령어를 통해 파티션 개수를 3개에서 4개로 늘리고, 리텐션 기간은 172800000ms 에서 86400000ms(1일)로 변경 해보겠습니다.
이렇게, 문자를 작성하고 엔터를 누르면 별다른 응답 없이 메시지 값이 전송 됩니다. 여기서 주의할 점은 kafka-console-producer.sh로 전송되는 레코드 값은 UTF-8 기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화 된다는 점입니다. 즉 String이 아닌 타입으로는 직렬화하여 전송할 수 없습니다. 만약 다른 타입으로 직렬화하여 데이터를 브로커로 전송하고 싶다면 카프카 프로듀서 애플리케이션을 직접 개발해야 합니다.
key.separator: 메시지 키와 메시지 값을 구분하는 구분자를 선언합니다. default 설정은 Tab delimiter(\t) 입니다.
이처럼 메시지 키와 메시지 값을 함께 전송한 레코드는 토픽의 파티션에 저장됩니다. 메시지 키가null인 경우에는 프로듀서가 파티션으로 전송할 때 레코드 배치 단위(레코드 전송 묶음)로 라운드로빈으로 전송합니다. 메시지 키가 존재하는 경우에는 키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당됩니다.
이로 인해 메시지 키가 동일한 경우 동일한 파티션으로 전송됩니다. 이와 같은 역할은 프로듀서에 설정된 파티셔너가 결정합니다.
kafka-console-consumer.sh
hello.kafka 토픽으로 전송한 데이터는 kafka-console-consumer.sh 명령어로 확일할 수 있습니다. 추가적으로, --from-beginning 옵션을 주면 토픽에 저장된 가장 처음 데이터부터 출력할 수 있습니다.
여기서 --group 옵션을 통해 신규 컨슈머 그룹(consumer group)을 생성 했습니다. 1개 이상의 컨슈머로 이루어져 있고, 이 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져간 메시지에 대해 커밋(commit) 합니다. 커밋이란 컨슈머가 특정 레코드까지 처리를 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것입니다. 커밋 정보는 __consumer_offsets 이름의 내부에 저장됩니다.
또한 위 출력 내용을 봤을 때, 프로듀서 명령어로 전송했던 데이터의 순서가 현재 출력되는 순서와 다르다는 것입니다. 이는 카프카의 핵심인 파티션 개념 때문에 생기는 현상입니다. kafka-console-consumer.sh 명령어를 통해 토픽의 데이터를 가져가게 되면 토픽의 모든 파티션으로부터 동일한 중요도로 데이터를 가져갑니다. 만약 데이터의 순서를 보장하고 싶다면 가장 좋은 방법은 파티션 1개로 구성된 토픽을 만드는 것입니다.
kafka-consumer-groups.sh
hello-group 이름의 컨슈머 그룹으로 생성된 컨슈머로 hello.kafka 토픽의 데이터를 가져갔습니다. 컨슈머 그룹은 따로 생성하는 명령을 날리지 않고 컨슈머를 동작할 때 컨슈머 그룹 이름을 지정하면 새로 생성 됩니다. 생성된 컨슈머 그룹의 리스트는 kafka-consumer-groups.sh 명령어로 확인할 수 있습니다.
리스트 조회
$ bin/kafka-consumer-groups.sh --bootstrap-server {Your EC2 Public IP}:9092 --list
hello-group
이처럼 컨슈머 그룹의 상세 정보를 확인하는 것은 컨슈머를 개발할 때 중요하게 활용됩니다. 대표적으로 아래 4가지입니다. (1) 컨슈머 그룹이 중복되는지 확인 (2) 컨슈머의 랙(LAG)이 증가하고 있는지 확인 (증가시, 프로듀서가 데이터를 토픽으로 전달하는 속도에 비해 컨슈머의 처리량이 느리다는 증거) (3) 카프카에 연결된 컨슈머의 호스트명 또는 IP 확인 (4) 접근 중인 컨슈머의 정보를 토대로 카프카가 인가된 사람에게만 사용중인지 확인
kafka-verifiable-producer.sh
kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고 받을 수 있습니다. 카프카 클러스터 설치가 완료된 이후에 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트를 할 때 유용합니다.
max-messages는 kafka-verifiable-producer.sh로 보내는 데이터 개수를 지정합니다. 만약 -1을 옵션값으로 입력하면 kafka-verifiable-producer.sh가 종료될 때까지 계속 데이터를 토픽으로 보냅니다. 전송한 데이터는 kafka-verifiable-consumer.sh로 확인할 수 있습니다.
이미 적재된 토픽의 데이터를 지우는 방법으로 kafka-delete-records.sh를 사용할 수 있습니다. kafka-delete-records.sh는 이미 적재된 토픽의 데이터 중 가장 오래된 데이터(가장 낮은 숫자의 오프셋)부터 특정 시점의 오프셋까지 삭제할 수 있습니다.
예를 들어, test 토픽의 0번 파티션에 0부터 100까지 데이터가 들어 있다고 가정했을때, test 토픽의 0번 파티션에 저장된데이터 중 0부터 30오프셋 데이터까지 지우고 싶다면 다음과 같이 입력할 수 있습니다.
$ vi delete-topic.json
{"partitions": [{"topic": "test", "partition": 0, "offset": 50}], "version":1 }
$ bin/kafka-delete-records.sh --bootstrap-server {Your EC2 Public IP}:9092 \
--offset-json-file delete-topic.json
Executing records delete operation
Records delete operation completed:
partition: test-0 low_watermark: 50
여기서 주의해야 할 점은 토픽의 특정 레코드 하나만 삭제되는 것이 아니라 파티션에 존재하는 가장 오래된 오프셋부터 지정한 오프셋까지 삭제된다는 점입니다. 카프카에서는 토픽의 파티션에 저장된 특정 데이터만 삭제할 수는 없다는 점을 명시해야 합니다.
정리
이번 챕터에서는 실습용 카프카를 설치하고 관련 명령어를 실행하여 토픽을 생성, 수정하고 데이터를 전송(프로듀서)하고 받는(컨슈머) 실습을 진행해 봤습니다. 카프카에서 제공하는 다양한 명령어로 카프카의 핵심 기능을 사용할 수 있다는 것을 알게 되었습니다. 이 명령어들은 카프카 운영 시 자주 사용되므로 손에 익히는 것이 좋습니다.