🐌 학습노트/낙서장

[Kafka #2] 카프카 프로듀서

mini_world 2025. 9. 16. 23:46
목차 접기

 

프로듀서란?

프로듀서는 어플리케이션에서 Kafka로 메세지를 발행(produce)하는 역할을 수행한다.
프로듀서는 특정 토픽에 전송하고, 카프카 브로커가 이를 파티션단위로 저장한다.
이벤트 스트리밍(사용자행동기록)/ 성능 매트릭 기록/ 로그수집/ 실시간 데이터 파이프라인에 사용된다.

 

https://www.confluent.io/blog/kafka-producer-internals-preparing-event-data/

프로듀서에서 생성하는 메세지는 키-밸류 형식이다.
어플리케이션 코드에서 이벤트/데이터를 메세지 객체로 생성하고, 메세지가 전송될 토픽을 지정한다.
이때, 메세지의 키는 어떤 파티션으로 전송될지 지정된다. (키가 없으면 라운드로빈)

메세지를 브로커에 전송할때는 크게 동기/비동기 방식이 있지만, 보통 비동기 전송 방식을 사용한다.

  • Fire and forget: 메세지 전송만하고 확인 안함
  • Synchronous send: 메세지 동기전송 ( send() 호출 후 Future객체 대기)
  • Asynchronous send : 메세지 비동기전송 ( send() 호출 후 콜백)

프로듀서는 세가지 필수 속성값을 갖는다.

  • bootstrap.servers: 프로듀서가 사용할 호스트 목록 지정
  • key.serializer: 키값으로 쓰일 객체를 직렬화 하기 위한 시리얼라이저 클래스 이름
  • value.serializer: 밸류값으로 쓰일 객체를 직렬화 하기 위한 시리얼라이저 클래스 이름

 

프로듀서 주요 파라메터

https://docs.confluent.io/platform/current/clients/producer.html

 

Kafka Producer for Confluent Platform | Confluent Documentation

The full list of configuration settings are available in Kafka Producer Configurations. The key configuration settings and how they affect the producer’s behavior are highlighted below. Core Configuration These settings are the same for Java, C/C++, Pyth

docs.confluent.io

프로듀서의 주요 파라메터는 위 링크에서 확인 할 수 있다. 
전체가 아닌 중요한 부분만 짧게 확인해보자.

메시지 지속성 acks 이 설정을 통해 Kafka에 작성된 메시지의 내구성을 제어할 수 있다.
- 기본값 all: 파티션 리더가 쓰기를 수락할 뿐만 아니라 모든 동기화된 복제본에 성공적으로 복제됨을 보장 (신뢰성 높지만 레코드 전송 느림)
- 1: 1파티션 리더로부터 쓰기가 성공했다는 명시적인 확인이 필요함
- 0: 처리량을 극대화하지만 브로커가 응답을 보내지 않으므로 메시지가 브로커 로그에 성공적으로 기록되었다는 보장은 없음 (신뢰성 낮지만 레코드 전송 빠름)

다만, 컨슈머가 값을 읽을 수 있는 시간까지의 의미의 종단간 지연의 경우 세값이 모두 같다.
카프카는 일관성을 위해 모든 인-싱크 레플리카에 복제가 완료된 후에 컨슈머가 레코드를 읽을 수 있기 때문이다.
메시지 순서 retries 일반적으로 메시지는 프로듀서 클라이언트가 수신한 순서대로 브로커에 기록되지만,이 설정에 따라 순서가 변경될 수 있다.

- 기본값 0, 메시지 재시도를 활성화 수
메시지 순서 max.in.flight.requests.per.connection 순서를 변경하지 않고 재시도를 활성화하려면 이 속성을 1로 설정한다.
프로듀서가 서버로부터응답을 받지 못한 상태에서 전송할 수 있는 최대 메세지의 수를 결정하는 옵션이다. 

 

시리얼라이저

시리얼라이저란, 데이터를 한 형태에서 다른 형태로 변환하는 도구를 말한다.

브로커는 바이트 단위로 처리한다.

카프카 프로듀서가 메세지를 브로커에 보내려면 메세지의 키와 값을 바이트 배열로 변환하야 하며,
이 작업을 하는것이 시리얼라이저 이다. (객체 -> 바이트 변환)

카프카에서 기본으로 제공하는 시리얼라이저가 존재하지만, 스키마 기반의 시리얼라이저(Apache Avro, Protobuf, Thrift)를 사용하는것이 좋다.

https://medium.com/@affanhasan88/how-to-publish-and-consume-avro-encoded-apache-kafka-messages-using-java-44ed42890637

 

Json/String 직렬화는 쉽지만 명확한 데이터 구조(스키마)가 없기때문에 필드를 변경하려고 할때 문제가 발생한다.
이때, 스키마 기반의 시리얼라이저를 사용하면 데이터 구조가 바뀌어도 호환성을 유지할 수있다.

별도의 스키마 레지스트리가 존재하며, 메세지를 직렬화 할때 데이터와 스키마 정보를 함께 관리한다.

 

파티션

토픽은 여러개의 파티션으로 나뉘어 저장되며, 각 파티션은 순서가 보장된 로그 구조이다.

프로듀서에서 메세지를 보낼 때 토픽/키/값/파티션 정보를 ProducerRecord에 담는다. 
(메세지 작성 과정에서 파티션 지정 가능한 구조)

# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?_gl=1*1jdp8yp*_gcl_aw*R0NMLjE3NTgwMzE5MTkuRUFJYUlRb2JDaE1JMklmRW5MdmRqd01WNkRKN0J4M250Q24zRUFBWUFTQUFFZ0tBaFBEX0J3RQ..*_gcl_au*MjA3MjYxMjE4OC4xNzU4MDMxMTI4*_ga*ODYxODc0NTAwLjE3NTgwMzExMjg.*_ga_D2D3EGKSGD*czE3NTgwMzExMjgkbzEkZzEkdDE3NTgwMzMzNDAkajYwJGwwJGgw&_ga=2.269062833.1731004784.1758031128-861874500.1758031128#confluent_kafka.Producer.produce
produce(topic[, value][, key][, partition][, on_delivery][, timestamp][, headers])

프로듀서에서 파티션 번호를 지정한 경우에는 지정한 파티션으로 바로 전송된다. (직접 제어)
키가 있는경우에는 키를 해시하여 파티션을 정한다.
키가 없는경우 라운드로빈 또는 스티키 파티셔너 방식으로 파티션을 고른다.

운영시 주의할점 

  • 핫파티션: 특정 파티션만 과부하 (병목)
  • 파티션 수 변경 문제: hash(key) / numPartitions 이기때문에, 파티션 개수가 달라지면 기존 키의 파티션 위치가 달라져 순서가 깨짐

 

헤더

카프카 메세지는 키/ 값/ 토픽/ 파티션/ 오프셋으로 구성되지만 헤더를 추가할 수 있다.
메세지의 값과는 별도로 추가적인 메타데이터를 헤더로 담을 수 있다.

헤더는 메세지 본문을 건드리지 않고 부가 정보를 전달할때 유용하게 쓰일 수 있다.
예를들어 트레이싱/로깅/라우팅정보전달/ 메세지 포멧 버전/ 우선순위/ 보안&인증 등에 쓰일 수 있다.

주의할점은 헤더는 메세지 본문과 독립적으로 구성되어 별도 처리가 가능하고,
브로커는 헤더에 관심이 없다.

 

인터셉터

인터셉터(Interceptor)는 프로듀서와 컨슈머 양쪽에 다 있는 개념이며,
메세지를 브로커에 보내거나 읽기 전에 가로채서 추가 동작을 할 수 있게 해주는 훅(hook) 이다.

프로듀서 인터셉터로 프로듀서가 메세지를 브로커에 전송하기 직전 또는 전송 결과를 받은 직후에 동작하는 콜백 로직을 추가할 수 있다. 

일반적으로 운영 편의성과 모니터링 목적으로 쓰이며, 코드를 변경하지 않고 작동을 변경해야 하는 경우일때 유용하게 쓰일 수있다.

 

728x90