[Spring/Kafka] Spring에서 Kafka 토픽 동적으로 구독하기

[Spring/Kafka] Spring에서 Kafka 토픽 동적으로 구독하기

spring-kafka는 편리하게 producerconsumer를 구현하는 방법을 제공하는 라이브러리이다. 하지만 이 라이브러리로 런타임에 토픽을 구독하려면 어떻게 해야할까? 가끔은 원시적인 방법이 가장 쉬운 해결책이 될 수 있다.

spring-kafka를 이용하면 Kafka 메세지를 어노테이션을 이용해 편리하게 수신 할 수 있다.

하지만 spring-kafak 라이브러리에도 한계가 있다.

데이터베이스에서 토픽 리스트를 가져와 런타임에 구독하고 싶은데, 어노테이션으로 토픽 구독을 관리하는 spring-kafka로는 구현이 어려웠다.

spring-kafka로 해결 해보려고 했지만, 결국 마땅한 방법을 찾지 못했다.

그래서 원시적인 방법으로 Kafka 기본 라이브러리인 kafka-client를 활용하여 문제를 해결했다.

아래에 kafka-client로 구현한 코드이다.

// file: 'DynamicMessageSubscribeHelper'
package com.skaiblue.message.v1.component

import com.skaiblue.common.logger.Log
import com.skaiblue.message.v1.repository.DynamicSubscriptionRepository
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy

/**
 * 카프카 토픽을 동적으로 구독할 수 있도록 돕는 클래스
 *
 * 초기화 할 때 데이터베이스에 저장된 구독 정보를 불러와 토픽을 구독
 */
@Component
@Transactional(readOnly = true)
class DynamicMessageSubscribeHelper(
    private val dynamicSubscriptionRepository: DynamicSubscriptionRepository,
    private val dynamicMessageListener: DynamicMessageListener,
    private val mailKafkaConsumer: KafkaConsumer<String, Any>,
) {

    private lateinit var listener: KafkaListener
    private val topics = ConcurrentHashMap.newKeySet<String>()

    companion object : Log

    /**
     * 초기화 토픽 구독
     */
    @PostConstruct
    fun run() = dynamicSubscriptionRepository
        .findAllTopics()
        .let {
            topics.addAll(it)
            listener = KafkaListener(mailKafkaConsumer, dynamicMessageListener, topics)
            Thread(listener).apply { start() }
        }

    /**
     * 컴포넌트 파괴 전 쓰레드 정지
     */
    @PreDestroy
    fun destroy() {
        logger.info("Destroy DynamicMessageSubscribeHelper")
        listener.stop()
        mailKafkaConsumer.close()
    }

    /**
     * 토픽 구독
     *
     * @param topic 토픽
     */
    fun subscribe(topic: String) {
        topics.add(topic)
    }

    /**
     * 카프카 메세지를 수신하는 리스너 쓰레드
     */
    private class KafkaListener(
        private val mailKafkaConsumer: KafkaConsumer<String, Any>,
        private val dynamicMessageListener: DynamicMessageListener,
        private val topics: Set<String>
    ) : Runnable {
        companion object : Log

        private var isRunning = false

        fun stop() {
            isRunning = true
        }

        override fun run() {
            isRunning = true
            logger.info("Start Kafka consumer listener")
            while (isRunning) {
                try {
                    if (topics.isEmpty()) {
                        // 구독 중인 토픽이 없으면 5초 대기 후 continue
                        Thread.sleep(1000 * 5)
                        continue
                    }
                    //
                    mailKafkaConsumer.subscribe(topics)
                    val records = mailKafkaConsumer.poll(Duration.ofMillis(1000 * 5))

                    records.forEach { record ->
                        logger.info("Received message [${record.topic()}]: ${record.value()}")
                        dynamicMessageListener.send(record.topic(), record.value().toString())
                    }
                } catch (e: Exception) {
                    logger.error("Error while consuming message", e)
                }
            }
        }
    }
}
// file: 'DynamicMessageSubscribeHelper'
package com.skaiblue.message.v1.component

import com.skaiblue.common.logger.Log
import com.skaiblue.common.util.CommandExecutor
import com.skaiblue.common.util.RandomGenerator
import com.skaiblue.common.util.StringSubstitutor
import com.skaiblue.mail.v1.template.MailTemplate
import com.skaiblue.message.v1.dto.MailContact
import com.skaiblue.message.v1.repository.DynamicSubscriptionRepository
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

/**
 * 카프카 메세지를 처리하는 클래스
 * 
 * 스프링 비동기 라이브러리로 비동기적으로 처리
 */
@Component
@Transactional(readOnly = true)
class DynamicMessageListener {

    companion object : Log

    @Async
    fun listen(topic: String, message: String) {
        // 카프카 메세지 처리
    }
}

출처

타이틀 이미지: UnsplashWil Stewart