본문 바로가기

Kafka

[kafka] consumer 동작 원리 및 tuning configuration(logstash)

 

kafka consumer의 동작 원리 와 tuning configuration을 logstash kafka input plugin을 통해 알아본다.

우선, kafka의 목적을 정확히 이해해야 한다.

Kafka는 버퍼의 기능이 있는데, push 방식이 아닌 consumer의 역량에 맞게 데이터를 가져가는 pull 방식을 취하고 있다.

무슨 뜻이냐면, request가 kafka broker에서 시작되는게 아니고 consumer에서 시작된다는 뜻이다.

 

데이터를 가지고오는 request를 poll이라고 하고, 보통 for 루프 안에 이 poll을 넣어 consumer의 로직을 구현한다.

이를 수도코드로 표현하면 이해하기가 더 쉬운데, 

for {

  data := consumer.poll() // data는 배열, 배열의 원소 하나에 offset 하나가 들어있음. (1 offset = 1 message)

  for i in data {

     storeDataInDatabase(i) // data를 처리하는 로직

  }

}

이런 식으로 구현한다.

 

그렇기 때문에, 보통 kafka consumer의 poll method를 호출하는 주기에 대해 검색하는 사람들이 많은데(kafka default poll interval 이런 식으로) 이 값은 찾을래야 찾을 수가 없다. (data를 처리하는 로직의 시간에 따라 달라지는 거니까)

 

카프카 브로커는 이 poll 요청을 받으면 특정 조건에 따라 데이터(offset 집합)을 반환하고, consumer는 이 데이터를 받아 처리하며 처리하는 중간 혹은 처리가 모두 끝났을때 본인이 가져온 데이터를 무사히 받았다는 메세지를 commit을 통해 전달한다. 이것이 카프카 consumer에서 데이터가 처리되는 간단한 라이프 사이클이다. (poll -> response -> 처리 -> commit)

 

 

kafka consumer의 주요 옵션을 logstash input plugin을 통해 간단히 알아보자.

1. auto_offset_reset 

consumer group으로 처음 broker에 진입했을때, 데이터를 가지고 오는 시작점을 지정할 수 있다. 보통 default가 latest로 되어있는데, broker에 데이터가 10시부터 17시까지 쌓이고 있고 consumer를 17시에 붙였다면, 10시부터 17시까지의 데이터는 가져오지 않고 17시 이후의 데이터만 가지고 온다. 특히 kafka cluster를 이전하거나 할 때 이 조건에 따라 데이터 누락이 발생할 수 있다.

개인적으로 이 기능은 무조건 earliest로 기입하는 것을 추천한다.(default는 latest로 되어있음.)

earliest 설정은 kafka broker에 consumer group이 설정되어 있지 않은 경우에만 적용되므로, 추후 consumer를 재시작하더라도 데이터 중복의 이슈는 걱정하지 않아도 된다. (재시작의 경우 consumer group에 이미 offset 정보 및 metadata가 남아있을 것이기 때문에 ealiest 설정은 적용되지 않고, 그 offset 정보를 시작점으로 해서 데이터를 가지고 온다. )

 

2. max_poll_interval_ms + max_poll_records

consumer에서 heartbeat를 주기적으로 kafka broker에 보낸다 할 지라도 poll 요청이 max_poll_interval_ms 설정된 시간만큼 일어나지 않으면 kafka broker에서는 해당 consumer에 이상이 있는 것으로 판단하고 rebalancing을 해버린다. 혹시 consumer에서 데이터를 가지고 온뒤 다음 poll까지 로직처리가 오래 걸릴 경우 이 값을 반드시 높여주어야 한다. max_poll_records 조절을 통해 처음부터 가지고 와서 처리해야하는 데이터를 줄이고, poll 주기를 앞당길 수 있다.

 

3. fetch_max_bytes + fetch_min_bytes + fetch_max_wait_ms

poll 요청을 보냈을 때 kafka broker에다가 위 세 가지 정보를 전달하면, kafka broker는 응답을 바로 주는게 아니고 위 세가지 정보를 모두 만족시키는 양 만큼의 데이터만 응답으로 전달한다. fetch_max_bytes와 fetch_min_bytes에서 한 가지 주의 할 것은 이 두 수치가 모두 절대적인 수치가 아니라는 점이다. 무슨 뜻이냐면 consumer는 fetch_max_bytes보다 큰 데이터를 받을 가능성이 있고 마찬가지로 fetc_min_bytes보다 작은 데이터를 받을 가능성도 있다.

offset(message) 하나의 크기가 fetct_max_bytes보다 클 경우 consumer는 fetch_max_bytes보다 큰 데이터를 받게 된다. fetch_max_wait_ms는 fetch_min_bytes로 인한 무한 대기 때문에 설정해 둔 것인데, producing되는 데이터가 너무 적어서 fetch_min_bytes를 채우는 것이 너무 오래걸리면 일부 데이터는 무한으로 consuming이 되지 않을 수 있다. 그래서 fetch_min_bytes의 크기가 채워지지 않더라도 fetch_max_wait_ms 시간이 되면 kafka broker는 그 동안에 쌓인 데이터만 모아서 응답한다.

 

Kafka에 poll 요청이 너무 자주 일어난다고 생각한다면, 두 가지 방법으로 이를 튜닝할 수 있다.

1. kafka broker에서 조절하는 방법

fetch_max_bytes, fetch_min_bytes, fetch_max_wait_ms를 통해 poll요청과 response 사이의 기간을 늘려 다음 poll 요청이 오래걸리도록 조절한다.

2. consumer에서 조절하는 방법

데이터 처리 로직을 늘려 poll 요청 자체를 늦추는 방법이 있다. 그 예로 logstash에서 filter에 sleep plugin을 사용하면 event(kafka에서는 offset이라고 표현하는 것을 logstash 용어로 변경하면 event임)마다 sleep을 걸 수 있는데, 실제로 이 값을 1초로 걸고 테스트해보면 poll 요청이 더 늦게 가는 것을 알 수 있다. (poll 요청이 늦게 가는 것은 kibana에 찍힌 event의 timestamp을 보면 알 수 있음. timestamp가 logstash에서 찍어주는 값이기 대문에.)

내가 테스트해봤을때는 원래 sleep을 걸지 않았을때는 timestamp가 0.3초 간격으로 발생이 되었는데, sleep을 걸고 난 뒤에는 timestamp 간격이 30초로 증가하였다.