본문 바로가기

Programming/Kafka

Apache Kafka 의 기본 아키텍쳐

반응형

Apache Kafka

아파치 카프카(Apache Kafka)는 분산 메시지 스트리밍 플랫폼이며 데이터 파이프 라인을 구성할 때 많이 사용된다. 카프카는 대용량 실시간 로그처리에 특화되어 개발된 오픈소스인 만큼, 분산 및 복제 구성과 더불어 Fault-Tolerant 한 안정적인 아키텍처와 빠른 성능으로 데이터를 처리할 수 있다.

 

서비스에서 사용하고 있던 리소스 공간은 서비스의 로직과는 별개로, 상황에 따라 필요가 없어질 수도 있고 확장이 필요할 수도 있다. 또한 최근 등장하고 있는 다양한 서버 디바이스 특성에 따라 비동기 메시징 프레임워크를 기반으로 통신할 필요가 있을 수 있다. (예를 들면 디바이스가 가진 네트워크 커넥션이 탄탄하지 못하기 때문에 네트워크 지연 가능성을 고려해야하는 경우가 있겠다.) 결국 이 같은 필요성은 느슨한 결합으로 구성된 컴퓨터 아키텍쳐를 선호하게끔 하였다. 이를 기반으로 등장한 비동기 메시징 프레임워크 중 하나가 아파치 카프카이다.

 

사실 아파치 카프카는 링크드인에서, 자사에서 발생하는 이슈를 해결하고자 개발된 기술이다. 링크드인에서 사용하던 기존 아키텍쳐에서는 1) 통합된 데이터 전송 영역의 부재로 데이터 처리의 복잡도가 증가했고, 2) 데이터 파이프라인의 관리가 어려웠다는 문제점이 있었다. 때문에 특정 부분을 수정해야할 때도 연관된 부분을 모조리 찾아 수정해야할 수도 있었다.

 

이 문제점을 해결하기 위해, 링크드인에서는 Kafka 라는 새로운 시스템을 구축하였으며, 다음 4가지 컨셉을 목표로 하였다.

 

  1. Publish 모델과 Subscribe 모델의 분리
  2. 영구성을 가지고 있는 데이터를 여러 Subscribe 모델이 사용할 수 있도록
  3. 높은 처리량을 위한 메시지 최적화
  4. 데이터의 증가에 따른 scale out

아파치 카프카가 개발된 배경을 간단하게 설명해보았는데, 이런 배경과 목적을 갖고 탄생한 카프카는 아키텍쳐의 관점에서 몇가지 특징을 가지고 있다. 먼저 카프카는 본래 메시지 스트리밍 플랫폼이기 때문에 레코드 스트림이 발생하면 -> 처리한다 는 컨셉을 지닌다. 따라서 실시간 애플리케이션 구축에 유용한데, 이 스트리밍을 consuming / producing 프로세스를 통해 실현시키도록 하였다. 한마디로, publish / subscribe 패턴을 지향하는 아키텍쳐로 개발되었는데, 이제 이 아키텍쳐와 그 구성 요소들에 대해 자세히 다뤄보겠다.

Apache Kafka : Pub / Sub 모델

Apache kafka 아키텍쳐

아파치 카프카는 중앙 집중형 메시지 관리 방식으로, 메시지의 생성과 소비와 관리를 완전히 독립시킨 구조이다. 메시지를 관리하는 데이터 큐를 중앙에 두고, 독립적인 데이터의 발행과 소비가 이루어진다. 이 같은 패턴을 publish / subscribe 모델 패턴이라 칭하며 이런 느슨한 결합을 통해 publisher 나 subscriber 에 장애가 발생할 때에도 의존성이 없으므로 안정적인 데이터 처리가 가능해진다.

 

카프카에서 publisher 와 subscriber 의 역할을 하고 있는 클라이언트는 각각 ProducerConsumer 이다. Producer 는 데이터를 push 하여 발행하고, Consumer 는 pull 하여 데이터를 처리할 수 있다. 이 때 카프카는 애초에 Dumb Broker, Smart producer/consumer 컨셉을 가지고 프로듀서를 중심적으로 설계된 플랫폼이기 때문에, Producer 와 Consumer 클라이언트의 설정값에 따라 서비스에 최적화된 성능을 가지고 데이터를 발행 및 처리할 수 있다.

Kafka Cluster

카프카가 지향하는 pub / sub 모델 패턴에서 메시지 관리를 담당한다. 카프카 클러스터는 임의 갯수의 노드로 구성되고 여기서 노드를 Kafka broker host 로 바꾸어 말할 수 있다. 보통 클러스터에는 1개 이상의 Kafka broker host (서버) 를 두어 특정 노드의 물리적 장애로 인한 데이터 유실을 대비하는데, 이와 관련된 특징에는 topic 과 partition 이 있다.

 

카프카 클러스터는 Topic 이라는 데이터 유닛을 임의 갯수만큼 호스팅하며 데이터를 카테고라이징하는 방식으로 관리해내고 있다. 데이터의 처리량을 높이기 위해 1개 Topic 은 1개 이상의 파티션(Partition)을 가지고 데이터를 분산시켜 저장하고 있다. 카프카가 가진 분산 처리의 특징은 여기에서 드러나는데, 카프카는 1개 토픽에 발행될 데이터를 특정 파티션으로 분산시켜 저장하고 있을 뿐만 아니라, 파티션 각각도 클러스터에 포함된 서버들에 분산된 형태로 나누어 처리될 수 있다.

 

클러스터 내에서 파티션 각각이 서버에 분산된 형태로 나누어 처리될 수 있다고 표현한 이유는 위 그림이 담고 있다. 기본적으로 토픽 내의 파티션이 추가 생성되면 카프카는 등록된 브로커 서버에 분산된 형태로 새 파티션을 배치(?) 시켜주는 개념이다.

 

1개 파티션은 리더팔로워가 존재한다. 카프카에서는 파티션마다 replica 를 설정해서 이를 실현하며, 사실 이 특징은 브로커 노드의 장애나 fail 에 대비하는 것에 목표를 두었다. replication factor 가 3이라면, 클러스터 내에 최소 3대의 브로커 서버 등록이 보장되어야 하며 1개의 리더 노드와 2개의 팔로워 노드로 구성될 수 있다. replication factor 가 2 이상으로 설정된 파티션은 반드시 팔로워 노드가 존재하는 데, 리더 노드는 클라이언트(Producer, Consumer) 를 통한 데이터 write (쓰기 연산) / read (읽기 연산) 를, 팔로워 노드는 데이터 복제만을 담당한다. 즉, 리더 노드에 발행된 데이터를 팔로워 노드가 복제만 해가는 구조이다. 이걸 부하 분산의 관점에서 다시 바라보면, 결국 각 파티션의 리더가 클러스터 내의 브로커들에 균등하게 분배되도록 설계되었다는걸 알 수 있다.

 

여기서 설정하게 되는 replication factor 는 파티션 각각에 대해 카프카 클러스터 내의 모든 브로커에 동일한 값으로 설정되어야 하는 값이다. 이 값으로 클러스터 내의 몇 개 브로커에 데이터를 저장 / 복제해둘지를 결정한다. 

 

토픽 내의 파티션은 이렇게 리더 - 팔로워 구조를 유지하고 있다가, 리더 노드로 등록된 브로커가 죽었을 때 팔로워들 중 하나를 다시 리더로 선출하여 데이터의 유실을 방지하고 복구를 진행할 수 있도록 한다. 따라서 N+1 의 replication factor 를 가진 TopicPartition 이라면 N 번의 장애까지 견딜 수 있다. 개인적으로는 카프카가 가용성을 확보하기 위한 수단 중 하나라고 생각한다.

 

이 같은 특징은 reassign-partitions.sh 를 통해 CLI 모드에서 커스터마이징할 수도 있다. 위 그림에서 설정된 아키텍쳐는 다음과 같이 출력될 수 있다. 여기서는 구체적으로, 생성된 토픽 파티션이 어떤 구조로 클러스터에 배치될 지를 결정할 수 있다.

 

 

클러스터 내의 브로커는 각각 id = 1, 2, 3 으로 설정되었다는 가정하에, 위 같은 출력이 나올 수 있다. replicas 프로퍼티가 담고 있는 리스트의 첫번째 요소는 리더 브로커의 id 이며, 그 다음부터 데이터를 복제해 갈 팔로워 노드 id 를 저장하고 있다. 뿐만 아니라 replicas 프로퍼티의 array size 는 replication factor 값을 말하고 있다. 따라서, 이 Topic Partition 각각은 모두 replication-factor 값이 2이다.

Zookeeper

카프카 아키텍쳐 내에서 카프카 브로커들을 하나의 클러스터로 코디네이팅 하기 위해서는 별도의 분산 코디네이팅 시스템이 필요하다. 따라서 카프카에서는 주키퍼를 두어, 카프카 클러스터의 설정 정보를 최신으로 유지하며 클러스터 내에서 브로커 서버가 추가되거나 삭제될 때 그 정보를 클러스터에 속한 서버들끼리 공유할 수 있도록 한다. 둘의 관계에서 클러스터 내의 브로커 서버는 주키퍼에게 토픽 메타데이터를 공유하고, 주키퍼는 카프카 클러스터의 리더를 발탁하는 방식을 제공하고 있다.

 

뿐만 아니라 이전 카프카 버전 (old consumer 라고 칭한다) 에서는 주키퍼가 직접적인 데이터 관리에도 참여했다. 컨슈머 측에서 읽은 파티션의 offset 정보를 주키퍼에게 공유했었는데, 현재 new consumer (아마 사용하시는 버전에 해당되고 있을 것이다) 에서는 이 offset 정보를 컨슈머 그룹에서 관리한다.

 

운영에 있어서는, 주키퍼가 과반수 방식으로 운영되고 있기 때문에 홀수개의 주키퍼 서버 운영을 추천하고 있다. 따라서 3개의 서버 중 1개만 죽으면 2개가 살아남기 때문에 정상 운영이 가능하지만, 4개 서버 중 2개가 죽으면 운영이 불가능하다. 

Producer

Producer 는 카프카 클러스터에서 호스팅하는 토픽에 데이터 입력을 요청할 수 있는 클라이언트이다. 정확히는, 토픽 파티션의 리더 노드에게 쓰기 연산에 대한 요청을 보내서 지정된 토픽 파티션에 메시지를 발행하는 주체이다. 이 때 메시지에는 header (String key, byte[] value), key, value 가 포함된다. 만약 key 를 별도로 지정하지 않는다면 카프카는 라운드로빈 방식으로 파티션에 메시지를 분배하여 발행하게 된다.

 

Producer 클라이언트는 서비스의 특성이나 상황에 따라 적합한 설정 정보를 구성하여 생성할 수 있다. 클라이언트 생성 시 프로퍼티로 셋팅하게 되는 configuration 값들이 그 역할을 하게 되는데, 카프카에서 설정된 디폴트 값들이 셋팅된 새로운 값들로 오버라이딩되는 개념이다. 대표적으로는 메시지 key / value 에 대한 serialization type config 를 지정하여 serializing 을 위한 타입을 지정할 수 있다. 

 

Producer config 에 대한 프로퍼티 옵션들과 그 디폴트 값은 공식문서나 많은 블로그에서 소개되고 있으므로, 리서치 중 특이하다고 느꼈던 것들 위주로 정리해보겠다. (공식문서 : https://kafka.apache.org/documentation/#producerconfigs)

 

  • acks (default = 1)
    • 개인적으로는 카프카에서 QoS 를 보장하는 수단이 될 수 있는 옵션이라 생각한다.
    • Producer 가 토픽 파티션의 리더 노드에게 메시지 발행을 요청한 후, 해당 요청을 완료하는 것에 대한 기준이 되는 설정이다.
    • acks 의 값이 클수록 퍼포먼스는 느려지고 안정성은 증가할 수 있다.
    • acks = 0 이면 프로듀서가 어떤 acks 응답도 기다리지 않는다. 즉, 카프카 서버가 데이터를 받았는지를 보장하지 않았으므로 전송 (요청) 실패에 따른 재요청도 없다.
    • acks = 1 이면 리더 노드가 메시지 발행 요청을 받은 건 확인하지만, 팔로워 노드가 그걸 복제해갔는지에 대해선 확인하지 않는다. 속도와 안정성 측면에서 가장 많이 쓰인다고 한다.
    • acks = all (-1) 이면 리더 노드 뿐만 아니라 모든 팔로워 노드들의 응답을 기다리므로 데이터가 손실되지 않는다.
  • max.in.flight.requests.per.connection (default = 5)
    • 공식문서 정의 : The maximum number of unacknowledged requests the client will send on a single connection before blocking. 
    • 프로듀서가 입력하는 데이터의 최소 단위는 batch.size 에 지정한 값이 되는데, 이 옵션으로는 발행될 이벤트에 대한 batch 단위의 순서를 보장할 수 있다. (물론 1개 파티션 내에서의 이야기가 되겠다.)
    • 예를 들어, 2개의 batch 레코드 셋이 1개 파티션으로 발행 요청된 상황이 있다고 가정해보자. 이 상황에서 첫번째에 발행된 레코드 셋은 요청에 실패하여 retry 를 시도하고 있지만, 두번째 발행된 레코드 셋은 곧바로 성공하였다. 
    • 주어진 상황에서 max.in.flight.requests.per.connection 이 1 보다 큰 값으로 설정되어 있다면, 두번째 batch 레코드셋이 먼저 발행에 성공할 수 있게 된다.
    • max.in.flight.requests.per.connection 을 1 로 설정한다면, 두번째 레코드셋은 첫번째에서 retry 를 성공할 때까지 발행 요청을 진행하지 않으므로, 파티션 내에서 프로듀서가 이벤트 발행을 요청한 순서를 보장한다. 다만 개인적으로는, 이렇게 되면 flexible 한 가용성을 충족시켜주진 못할 것 같다.

Consumer

Consumer 는 카프카 클러스터에서 호스팅하는 토픽에 발행된 데이터를 pull 방식으로 읽어들일 수 있는 클라이언트이다. 이런 행위를 보통 컨슈밍이라 칭한다. 위에서 설명했듯이 컨슈밍할 때에는 해당 파티션의 리더노드에게 그걸 요청한다. 

 

 

Consumer 를 설명할 때에 빠질 수 없는 건 consumer group 이다. 1개 토픽에 대해서 여러개 컨슈머 그룹이 각각 다른 목적으로 존재할 수 있으며, 1개 토픽에 발행된 데이터는 그 발행 횟수에 관계 없이 여러 컨슈머 그룹이 각자의 목적에 맞게 처리하기 위해 여러번 읽어갈 수 있다. 덧붙이자면, offset 을 지정한다면 1개 컨슈머 그룹이 같은 토픽 내의 데이터를 n 번 읽어갈 수 있으며 이 또한 발행 횟수와는 관계 없다. 

 

이는 카프카가 가진 스토리지 특성과도 관련이 있다. 카프카는 컨슈머에 의해 처리된 메시지를 곧바로 삭제하지 않고 그대로 저장했다가 수명이 지나면 삭제처리를 하는 시스템이다. 따라서 메시지 처리 도중에 문제가 생겼거나, 로직에 변경이 생겼을 경우 컨슈머가 처음부터 데이터를 다시 처리할 수도 있다. 

 

다시 offset 과 컨슈머 그룹에 대한 이야기로 돌아와보겠다. 각 컨슈머 그룹은 토픽 파티션에 대한 offset 정보를 관리하는데, 컨슈머 클라이언트 각각은 config 옵션 중 auto.offset.reset 을 통해 해당 offset 정보를 참조하여 데이터를 읽고 처리할 수 있다. 옵션 값으로 none 이 지정되면 컨슈머 그룹이 저장하고 있는 offset 정보를 읽어와서 해당 offset 부터 데이터를 읽어올 수 있게 된다. 

 

이 외에도 컨슈머 클라이언트 또한 서비스의 특성이나 상황에 따라 적합한 프로퍼티를 구성하여 디폴트 값에 오버라이딩시킬 수 있는데, 이 또한 공식문서에 잘 나와있으므로 몇 개만 살펴보겠다. (공식문서 : https://kafka.apache.org/documentation/#consumerconfigs)

 

  • max.poll.interval.ms
    • 컨슈머는 실제 데이터 polling 을 진행하고 있지 않을 때에도 컨슈머 그룹 멤버로서 지속적으로 존재하기 위해 카프카 클러스터 (coordinator) 에게 heartbeat request 를 전송한다.
    • 하지만 컨슈머 클라이언트가 클러스터에 heartbeat 는 보내지만 실제 데이터는 읽어가지 않는 상황이 길어질 경우, 파티션이 무한정으로 점유될 수 있다.
    • 이 같은 상황을 방지하기 위해 이 설정을 추가하여, 해당 값 (시간) 이 만료될 때 까지 message polling 을 시도하지 않는다면 컨슈머 그룹 구성에 변화가 생겼음 (해당 컨슈머가 장애라고 판단하고 그룹에서 제외시킬 수 있다) 을 인지할 수 있도록 한다.
  • max.poll.size
    • 컨슈머 클라이언트가 특정 토픽을 구독한 뒤부터 poll 작업이 이루어질 수 있는데, 이 때 poll() 한 번에 따라 가져올 수 있는 최대 레코드 수를 지정한다.
    • 이 때 카프카를 사용하는 애플리케이션에서는 poll 메소드를 호출해서 레코드를 가져오고, 특정 스레드에서 그걸 전부 처리해준 뒤 다시 poll 메소드를 호출해 새로운 레코드를 가져오게 된다.
    • 이 속성의 기본값은 500이므로, 기본 값을 사용하는 경우 poll 메소드로 한번에 최대 500개 레코드까지 가져올 수 있다.

max.poll.size 속성은 애플리케이션 특성에 따라 중요도가 클 수 있다.

 

일반적인 경우 서비스 내에서 1개 레코드를 처리하는 데에 걸리는 시간이 있고, poll 메소드를 통해 가져온 레코드를 전부 처리한 후에 다시 poll 을 호출하는 구조를 채택하고 있다. 이런 상황에서 서비스 내에서 기본 값인 500개의 레코드를 처리하는 시간은 500 x 1개 레코드를 처리하는 데 걸리는 시간 이 된다. 즉, poll() 메소드의 호출 간격은 500 x 1개 레코드를 처리하는 데 걸리는 시간이 된다.

 

한마디로 max.poll.size 속성을 통해 컨슈머의 poll 메소드 호출 간격을 결정할 수 있다. 특히 이 호출 간격이 길어진다는 것은 리벨런싱과 직접적으로 연관이 되기 때문에 배포되는 서비스의 경우 중요성을 인지할 필요가 있다. 

 

리벨런싱 과정은, 컨슈머 그룹 내의 모든 컨슈머들이 poll 메소드를 통해 그룹 join request 를 coordinator 에게 전송해야 완료될 수 있다. 리벨런싱에 참여하는 그룹의 컨슈머 A, B, C 가 존재한다고 가정해보겠다. 이 때 B 가 이전 poll 을 통해 가져온 레코드들을 전부 처리하지 못했기 때문에 다음 poll() 을 호출하지 못하고 있다면 어떤 일이 발생할까? Coordinator 는 컨슈머 B 로부터 조인 요청을 아직 받지 못했기 때문에 리더를 선출하지 못하며 A, C 는 이를 기다릴 수 밖에 없다.

 

예시로 든 상황은 poll 메소드의 호출 간격이 길어진다면 충분히 발생할 수 있다. 이런 경우 poll() 의 호출 간격이 길어졌기 때문에 리소스 리벨런싱이 완전히 끝날 때까지 걸리는 시간은 그만큼 길어질 수 밖에 없다. 

 

거의 실시간에 가까운 서비스를 제공해야 할 경우, 리벨런싱 시간이 길어진다는 건 데이터를 처리하지 못하는 시간이 지연된다는 의미와 같다. 따라서 리벨런싱 시간에 대한 조절이 필요할 텐데, 이 시간은 곧 poll() 을 통해 읽어온 레코드의 수 x 1개 레코드를 처리하는 데에 걸리는 시간 으로 결정된다. 때문에 해결안은, poll() 을 통해 읽어올 레코드 수를 줄이거나 1개 레코드를 처리하는 데에 걸리는 시간을 절약하는 것으로 좁혀질 수 있다.

 

이 같은 관점에서 max.poll.size 속성은 컨슈머의 처리 능력 뿐 아니라 애플리케이션의 동작 자체에도 영향을 주고 있다. 사실 리벨런싱은 예시로 든 상황보다 훨씬 다양하게 애플리케이션에 영향을 줄 수 있으며, 현재 소개되고 있는 카프카의 버전에서는 리벨런싱을 유연하게 조절하게 될 수도 있다. 카프카 컨슈머와 리벨런싱은 카프카를 사용하는 애플리케이션을 개발하는 중이라면 반드시 고려해야할 사항이므로, 다음 포스트에서 조금 더 자세히 다루어보겠다.

반응형