[Spring/Kafka] Spring에서 Kafka 토픽 동적으로 구독하기
in Backend on Backend, Spring, Jpa, Column-converter, Column-transformer
![[Spring/Kafka] Spring에서 Kafka 토픽 동적으로 구독하기](/backend/images/2023-01-03-kafka-topic-subscription-dynamically/title.jpg)
spring-kafka
는 편리하게 producer
와 consumer
를 구현하는 방법을 제공하는 라이브러리이다. 하지만 이 라이브러리로 런타임에 토픽을 구독하려면 어떻게 해야할까? 가끔은 원시적인 방법이 가장 쉬운 해결책이 될 수 있다.
spring-kafka
를 이용하면 Kafka 메세지를 어노테이션을 이용해 편리하게 수신 할 수 있다.
하지만 spring-kafak
라이브러리에도 한계가 있다.
데이터베이스에서 토픽 리스트를 가져와 런타임에 구독하고 싶은데, 어노테이션으로 토픽 구독을 관리하는 spring-kafka
로는 구현이 어려웠다.
spring-kafka
로 해결 해보려고 했지만, 결국 마땅한 방법을 찾지 못했다.
그래서 원시적인 방법으로 Kafka 기본 라이브러리인 kafka-client
를 활용하여 문제를 해결했다.
아래에 kafka-client
로 구현한 코드이다.
// file: 'DynamicMessageSubscribeHelper'
package com.skaiblue.message.v1.component
import com.skaiblue.common.logger.Log
import com.skaiblue.message.v1.repository.DynamicSubscriptionRepository
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy
/**
* 카프카 토픽을 동적으로 구독할 수 있도록 돕는 클래스
*
* 초기화 할 때 데이터베이스에 저장된 구독 정보를 불러와 토픽을 구독
*/
@Component
@Transactional(readOnly = true)
class DynamicMessageSubscribeHelper(
private val dynamicSubscriptionRepository: DynamicSubscriptionRepository,
private val dynamicMessageListener: DynamicMessageListener,
private val mailKafkaConsumer: KafkaConsumer<String, Any>,
) {
private lateinit var listener: KafkaListener
private val topics = ConcurrentHashMap.newKeySet<String>()
companion object : Log
/**
* 초기화 토픽 구독
*/
@PostConstruct
fun run() = dynamicSubscriptionRepository
.findAllTopics()
.let {
topics.addAll(it)
listener = KafkaListener(mailKafkaConsumer, dynamicMessageListener, topics)
Thread(listener).apply { start() }
}
/**
* 컴포넌트 파괴 전 쓰레드 정지
*/
@PreDestroy
fun destroy() {
logger.info("Destroy DynamicMessageSubscribeHelper")
listener.stop()
mailKafkaConsumer.close()
}
/**
* 토픽 구독
*
* @param topic 토픽
*/
fun subscribe(topic: String) {
topics.add(topic)
}
/**
* 카프카 메세지를 수신하는 리스너 쓰레드
*/
private class KafkaListener(
private val mailKafkaConsumer: KafkaConsumer<String, Any>,
private val dynamicMessageListener: DynamicMessageListener,
private val topics: Set<String>
) : Runnable {
companion object : Log
private var isRunning = false
fun stop() {
isRunning = true
}
override fun run() {
isRunning = true
logger.info("Start Kafka consumer listener")
while (isRunning) {
try {
if (topics.isEmpty()) {
// 구독 중인 토픽이 없으면 5초 대기 후 continue
Thread.sleep(1000 * 5)
continue
}
//
mailKafkaConsumer.subscribe(topics)
val records = mailKafkaConsumer.poll(Duration.ofMillis(1000 * 5))
records.forEach { record ->
logger.info("Received message [${record.topic()}]: ${record.value()}")
dynamicMessageListener.send(record.topic(), record.value().toString())
}
} catch (e: Exception) {
logger.error("Error while consuming message", e)
}
}
}
}
}
// file: 'DynamicMessageSubscribeHelper'
package com.skaiblue.message.v1.component
import com.skaiblue.common.logger.Log
import com.skaiblue.common.util.CommandExecutor
import com.skaiblue.common.util.RandomGenerator
import com.skaiblue.common.util.StringSubstitutor
import com.skaiblue.mail.v1.template.MailTemplate
import com.skaiblue.message.v1.dto.MailContact
import com.skaiblue.message.v1.repository.DynamicSubscriptionRepository
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional
/**
* 카프카 메세지를 처리하는 클래스
*
* 스프링 비동기 라이브러리로 비동기적으로 처리
*/
@Component
@Transactional(readOnly = true)
class DynamicMessageListener {
companion object : Log
@Async
fun listen(topic: String, message: String) {
// 카프카 메세지 처리
}
}
출처
타이틀 이미지: Unsplash의Wil Stewart