Programming/Kafka

Kafka Rebalancing 의 이해와 동작 과정

forky.kim 2020. 6. 3. 19:12
반응형

 

카프카 컨슈머 그룹을 구성하여 데이터를 처리하게 된다면, 운영 상황에서 다양한 경우의 수를 마주할 수 있다. 그룹에 참여하는 컨슈머 클라이언트 구성에 변화가 생길 경우 이 변화를 반영하기 위해 카프카에서는 일련의 과정이 필요한데, 이걸 rebalancing (리벨런싱) 이라고 한다. 이번 포스팅에서는 카프카 컨슈머 그룹과 리벨런싱에 대해 이해해보고, 리벨런싱이 어떻게 동작하는지 정리해보겠다.

 

그 전에, 이 내용들은 카프카 아키텍쳐를 먼저 알고 있어야 할 것 같은데, 필요하다면 이 링크 글을 참고하길 바란다.

-> https://joooootopia.tistory.com/29

 

Apache Kafka 의 기본 아키텍쳐

Apache Kafka 아파치 카프카(Apache Kafka)는 분산 메시지 스트리밍 플랫폼이며 데이터 파이프 라인을 구성할 때 많이 사용된다. 카프카는 대용량 실시간 로그처리에 특화되어 개발된 오픈소스인 만큼,

joooootopia.tistory.com

kafka Consumer Group

카프카는 consumer group 이라는 논리적인 멤버십을 제공함으로서 특정 토픽에 발행된 메시지를 각 그룹에서 목적에 맞게 읽고 처리할 수 있도록 한다. 하지만 컨슈머 그룹과 그 멤버 인스턴스, 그리고 토픽의 파티션과의 관계는 단순한 데이터처리 이상이다.

 

컨슈머 인스턴스가 가진 제약사항에는 카프카에서 1개 파티션은 컨슈머 그룹 내의 최대 1개 인스턴스까지만 접근이 가능하다 라는 게 있다. 이 제약사항은 곧, 1개 컨슈머 그룹 인스턴스 수가 토픽이 가진 파티션 갯수보다 많을 수는 없다는 특징을 보장한다. 정확히는 파티션 수가 컨슈머 그룹 내에서 실제 컨슈밍을 진행하고 있는, 최대 active consumer 갯수를 제한한다고도 말할 수 있다. 카프카 컨슈머는 컨슈머 그룹과 파티션의 관계를 통해 분산 처리를 실현하고 있는데, 이런 맥락에 기반하여 로드 밸런싱이 이루어지며 분산 처리 해가는 구조이다. 

 

좀 더 자세한 상황을 들어보겠다. 1개 컨슈머 그룹이 있고, 여기 속해있는 3개 컨슈머 인스턴스와 토픽에 생성된 3개의 파티션이 존재한다고 가정해보겠다. 여기서 컨슈머-1 이 셧다운 되었을 때 원래 이 컨슈머에 할당되어 있었던 파티션 (리소스) 은 재할당이 필요하게 된다. 이 때 이 컨슈머 그룹 구성을 정리하고 (컨슈머-1 은 어쨌거나 그룹에서 제외된 상황이므로) 파티션 재할당 작업을 진행하는 것에 대해, 컨슈머 그룹 안에서 리벨런싱이 발생했다고 말할 수 있고 이건 카프카 컨슈머의 로드밸런싱 방법이다. 

 

따라서 리벨런싱을 정의해보자면, 컨슈머 클라이언트를 사용하여 필요한 논리적인 그룹을 형성하고, 그 그룹 멤버들끼리 리소스 (파티션) 을 적절히 분배하는 프로세스 라고 할 수 있다.

 

다른 관점으로 바라보자면, 리벨런싱을 통해서는 새로 참여하게 되거나 제외시키고자 하는 클라이언트를 dynamically 하게 반영할 뿐 아니라 클라이언트의 최신 상황에 맞추어 데이터를 지속적으로 처리해갈 수 있도록 한다. 따라서 카프카의 특징 중 확장성가용성을 보장해주는 수단이라고 할 수 있을 것 같다. 

Kafka Rebalance Protocol

이렇게 컨슈머 인스턴스가 어떤 그룹과 잘 cooperating 할 수 있도록, 카프카에서는 Kafka Rebalance Protocol 이라는 걸 제공한다. 컨슈머 클라이언트 입장에서 보자면, 토픽을 컨슈밍할 논리적인 그룹을 형성하고 리더 역할을 할 클라이언트를 선출하며 파티션을 분배하는 과정이 결국 Cooperating 하는 방법이라고 할 수 있다. (컨슈머 그룹에서 리더 노드는 멤버들에게 자원을 분배하는 데에 역할을 가지고 있는데, 아래에서 자세히 설명하겠다.)

 

즉, 카프카 클라이언트는 Group Management 라는 방법을 사용해서 클라이언트들을 논리적인 그룹에 참여시킴으로서 cooperating 하려고 한다. Cooperating 과정에서, 그룹 코디네이터(Group Coordinator)는 GroupCoordinator 인스턴스를 백그라운드 프로세스로 실행하면서 컨슈머 그룹을 관리하는 역할을 가진 카프카 브로커이다. 컨슈머 그룹 멤버십에 변화가 발생했을 때, 리벨런싱 과정에서 컨슈머 클라이언트와 그룹 코디네이터 간의 요청/응답 메세지 전달이 이루어진다.

 

하지만 Kafka Rebalance Protocol 에서는 그룹 코디네이터의 개입을 여기까지만으로 제한시키면서 역할을 축소시키는 특징이 있다. 사실상 실질적인 리소스 재분배 작업은 컨슈머 클라이언트끼리만 하는 일이고, 코디네이터는 그룹의 형성 자체만을 담당한다고 봐도 무방하다. 

 

결과적으로 카프카 컨슈머의 Load Distribution 은 그룹 멤버십의 범주 안에서 리더를 선출하고, 멤버 클라이언트가 직접 프로토콜을 정의 (부하 분배에 대해 어떤 strategy 를 따를건지 등에 대한 정의가 있을 수 있다. ) 하는 작업으로 구성된다. 다시 말해 Group Management Protocol 의 영향을 받아 Distribution 을 수행한다고 할 수 있다. 

 

이처럼 카프카의 Rebalancing Protocol 에서 코디네이터의 역할 대신 클라이언트 단의 역할이 크게 작용하게 되면서, 카프카 리벨런싱은 몇가지 장점을 가지게 되었다. 

 

먼저, 그룹 코디네이터 역할을 하는 브로커 도움 없이 클라이언트가 로드벨런싱 알고리즘을 수행할 수 있게 되었는데, 이걸 공식 문서로는 autonomy 라고 표현하고 있다. 따라서 리벨런싱의 과정에서 카프카 브로커는 제네릭한 그룹 멤버십 API 를 제공하고, 클라이언트는 로드벨런싱 자체의 디테일 작업에 집중하며 결과적으로는 브로커의 코드를 단순화시킬 수 있게 되었다. 

 

Kafka Connect 같은 클라이언트는 자원(테스크)을 서로 다른 유저에게 분배한다. Kafka Rebalance Protocol 에서는 이 Distribution 정보 또한 클라이언트가 관할하기 때문에, 발생하는 멀티 테넌시를 클라이언트 레벨에서 쉽게 다룰 수 있게 한다는 이점이 있다. 

카프카 리벨런싱의 동작 : Rebalancing Round

Kafka Rebalance Protocol 에서는 리소스가 그룹 멤버들 (컨슈머 클라이언트) 사이에서 적절하게 분리되어야 할 때마다 리벨런싱의 새로운 라운드가 시작되어야 한다는 원칙이 지켜져야 한다. 카프카에서 1번의 리벨런싱 발생은 곧 어떤 리벨런싱 라운드가 1번 완료되었음을 뜻한다. 위에서 언급했듯이 이 과정에서 그룹 멤버십 프로토콜의 조건을 충족시켜가면서 컨슈머 그룹 내에서 파티션을 분배하는 작업을 수행하게 된다.

 

kafka rebalacing 의 동작 과정

group id = A 인 컨슈머 그룹에 C-3 이라는 새로운 컨슈머 멤버가 추가되었다고 가정해보겠다. 이 상황에서 리벨런싱은 위 그림과 같은 과정으로 동작하게 될 것이다. 

 

C-3 이 스타트되면, 그룹 멤버십 구성에 변화가 생겼으므로 변화를 반영시켜주기 위해 리벨런싱 라운드를 시작해야한다. 따라서 group.id = A 에 참여할 컨슈머들은 특정 카프카 브로커를 Group Coordinator 자격으로 얻기 위해 FindCoordinator request 를 브로커에게 보낸다.

JoinGroup

그 뒤로 컨슈머 클라이언트들은 JoinGroup request 를 전송하며 Rebalance Protocol 을 초기화한다. 여기서 JoinGroup request 메시지에선 다음 필드를 포함한다. 

(JoinGroup request 는 자바 애플리케이션에서 클라이언트가 poll() 메소드를 호출하며 이루어진다.)

 

  • group id
  • session timeout
  • rebalance timeout
  • protocol type
  • group protocols
    • protocol name
    • protocol metadata

session timeout 과 rebalance timeout 속성으로 하여금 Coordinator 는 특정 컨슈머가 유효한지 판단할 수 있다. 이 중 session timeout 속성에 대해 다루어보겠다.

 

Coordinator 는 리벨런싱 라운드에서 어떤 컨슈머를 그룹에 참여시킬지 말지 (컨슈머가 유효한지 여부) 를 결정하게 되는데, 그걸 판단하는 방법 중 하나는 하트비트이다. 그룹 멤버가 될 컨슈머들은 heartbeat.interval.ms 속성으로 가지고 있는 시간마다 한번씩 코디네이터 브로커에게 하트비트 요청 메시지를 전송하며 Session Alive 신호를 보낸다. 리벨런스 프로세스에 들어가게 되면 코디네이터는 그 하트비트를 기준으로 컨슈머를 그룹에 (re)join 시킬지 판단한다. 

 

클라이언트 단에서 JoinGroup 요청을 전송할 때 메시지에 session.timeout.ms 속성을 포함하는데, 등록된 시간만큼 하트비트가 도착하지 않는다면 그룹 코디네이터는 해당하는 컨슈머 클라이언트가 죽었다고 판단한다. 예를 들어, 그룹 내 1개 컨슈머가 의도적으로 Stop 하기 위해 LeaveGroup Request 를 코디네이터에게 전송했다고 가정해보겠다. 이 때 나머지 컨슈머들은 다음 하트비트 때에 리벨런싱이 수행될 것을 알 수 있고, 새로운 리벨런싱 라운드로 파티션 재분배가 발생할 수 있다. 

 

group protocols 에서는 protocol name 과 protocol metadata 필드를 포함한다. protocol name 에서는 파티션을 할당받을, 유효한 컨슈머 목록이 등록된다. 이 속성은 클라이언트 단에서 partition.assignment.strategy 프로퍼티로 제어할 수 있고, 자바에서는 아파치 카프카 클라이언트가 제공하는 ConsumerPartitionAssignor 인터페이스를 구현하면서 커스터마이징이 가능하다.

 

이렇게 Group Management 에 참여하는 모든 컨슈머들이 코디네이터에게 JoinGroup 요청을 전송하고 나면, 코디네이터 브로커는 이에 대한 Response 를 전송한다. 이 과정에서 컨슈머 그룹 내 리더 노드가 선출되며, 리더 컨슈머는 그룹의 partition assignment 를 담당한다.

 

JoinGroup Response 에서 브로커가 전송하는 메시지는 컨슈머의 역할에 따라 다르다. 특정 컨슈머의 경우 active member 목록과 resource assignment 전략을 받고 그룹 리더로 선정된다. 이 때 나머지 컨슈머들은 empty response 를 받게 된다. 

SyncGroup

이제 Group Management 에 참여하고 있는 모든 컨슈머 멤버들은 코디네이터에게 SyncGroup Request 를 전송한다. 여기서 리더 컨슈머는 메시지에 다음 속성을 담아 보낸다.

 

  • group id
  • generation id
  • member id
  • group assignment
    • member id
    • member assignment

리더 컨슈머의 경우, group assignment 속성을 통해 각 컨슈머 인스턴스의 파티션 할당 정보를 코디네이터에게 알린다. member id 에 따른 member assignment, 즉 파티션 정보를 메시지에 담는 것이다. 

 

코디네이터가 모든 멤버들에게 SyncGroup 요청을 받게 되면, 그 응답으로 컨슈머들에게 각자 자기가 할당받은 파티션 정보를 member_assignment 필드에 담아 전송한다. 클라이언트 단에서 이 메시지를 받게 되면, 연계된 자바 애플리케이션 내에서는 ConsumerRebalanceListener onPartitionAssigned() 메소드가 트리거된다.

 

JoinGroup 과 SyncGroup 요청 / 응답 과정으로 알 수 있듯, 코디네이터의 역할은 컨슈머 그룹 내의 리더를 선출하고, 리더가 전송한 새로운 파티션 할당 정보를 다른 멤버들에게 분배해주는 것으로 그친다. 

Fetching

이제 컨슈머 그룹에선 Rebalancing Round 를 완료하고 카프카가 관리하는 데이터를 가져올 수 있다. 이 때 Fetcher 클래스를 활용하게 된다.

 

Fetching 과정에서 poll() 메소드가 호출되면 Fetcher 는 자기가 갖고 있는 레코드를 max.poll.records 속성값만큼 반환해준다. 이 때 Fetcher 가 레코드를 가지고 있지 않아서 빈 Map 이 반환된 경우에만 브로커에서 호스팅하는, 컨슈머가 할당받은 토픽의 파티션 리더에게 요청이 전송된다. 즉, 컨슈머의 poll() 호출은 보통 Fetcher 가 가지고 있는 레코드를 반환하지만, Fetcher 가 레코드를 가지고 있지 않는 경우에만 브로커로 요청이 전송되는 구조이다. 

Rebalancing Round

이처럼 카프카에서는 1) 컨슈머 그룹 멤버 구성에 변화가 생겼을 때 2) 리소스의 재분배가 필요할 때마다 새로운 Rebalancing Round 가 발생한다. 그리고 여기에서 Rebalancing Round 자체에 해당하는 부분은 JoinGroup Request 에서 SyncGroup Response 까지를 말한다.

 

이 단계가 끝나면 다음과 같은 일이 완료된다.

 

  1. Logical 그룹 멤버십 재정의
  2. 리더 선출
  3. 리소스 재할당

하지만 이렇게 리벨런스 라운드가 발생하는 동안, 그룹에 참여하는 그 어떤 컨슈머들도 정상적인 데이터 처리를 하지 못한다는 문제를 가진다. 카프카에서는 이를 Stop The World 라는 용어로 부른다. 이는 천 개의 Connect Task 가 그룹에 존재했을 때 그 천 개의 프로세스가 전부 정상 동작하지 못하게 되는 상황을 맞이한다. 또한 이렇게 리벨런싱이 초래한 Stop The World 는 일반적인 하드웨어나 네트워크 손실 문제로 발생한 일시적인 client fail 과 더불어, scale up / down 의 상황이나 계획적인 클라이언트 start / stop / restart 의 상황에서 전부 발생할 수 있다. 

 

카프카를 사용하는 애플리케이션을 운영할 때 위 상황은 충분히 마주하게 된다. 따라서 Rebalancing 과 Stop The World 는 그 파급력을 줄일 필요가 있었는데, 다음 포스팅에서 이 Stop The World 와, 카프카에서 이 문제를 조금 완화해보고자 소개했던 프로토콜에 대해 정리해보겠다.

 

 

 

반응형