Spring Cloud Stream 도입하기

Spring Cloud Stream 도입하기

요약: 카카오페이 정보플랫폼팀이 다양한 데이터를 조합하고 처리하기 위해 Spring Cloud Stream을 활용한 경험을 공유합니다. Spring Cloud Stream의 구조적 이점과 실제 구현 사례를 통해 활용 방안을 구체적으로 전달하고자 합니다.

💡 리뷰어 한줄평

bri.ghten Spring Cloud Stream 진입장벽을 낮춰주는 좋은 글입니다. 이 글을 읽고 간단하게 Spring Cloud Stream을 연습 삼아 써 보고 도입할 만한 부분이 있는지 생각해 보는 계기가 되었습니다.

dory.m Spring Cloud Stream에 대해 들어보셨나요? KafkaListener의 대안이자 그 너머의 기능까지, 웨이드의 글이 풍부한 가이드가 되어줄 것입니다!

wonny.p Spring Cloud Stream을 도입하려는 분에게 좋은 가이드가 되어 줄 문서라 생각합니다.

시작하며

안녕하세요. 카카오페이 정보플랫폼팀 웨이드입니다.

정보플랫폼팀에서 카카오페이 내 다양한 서비스 데이터를 조합하여 외부 기관에 제출하는 업무를 맡고 있습니다.

다양한 데이터를 쉽게 조합하기 위해서 Spring Cloud Stream, Kotlin을 사용하고 있습니다. 그중에서 Spring Cloud Stream을 활용하여 쉽게 데이터를 조합하고 추출하고 파이프라인으로 데이터를 연결하여 처리하는 방법에 대해서 공유하려고 합니다.

Spring Cloud Stream 살펴보기

기본 컨셉

Spring Cloud Stream은 메시지 기반 마이크로서비스 애플리케이션을 구축하기 위한 프레임워크입니다. Spring Boot 기반으로 Spring 애플리케이션을 만들고 Spring Integration을 사용하여 메시지 브로커에 대한 연결을 제공합니다.

사용 방법은 spring-cloud-stream 라이브러리 디펜던시를 추가하면 준비가 완료됩니다. java.util.function.Function를 사용하여 기능을 구현합니다. Function 외에 Supplier, Consumer도 가능합니다.

다음은 간단한 예입니다.

@SpringBootApplication
class SampleApplication{
    @Bean
    fun uppercase(): Function<String, String> {
        return Function { value: String -> value.uppercase(Locale.getDefault()) }
    }
}

fun main(vararg args: String) {
    runApplication<SampleApplication>(*args)
}

조금 더 살펴보기

Spring Cloud Stream 구성 요소를 살펴보겠습니다.

Spring Cloud Stream 구성 요소
Spring Cloud Stream 구성 요소

우리가 개발해야 하는 부분은 Application Core(function)이고 나머지 부분은 설정으로 이루어집니다. Binder를 통해서 Kafka, RabbitMQ 같은 메시지 브로커와 애플리케이션을 연결합니다. Binder는 추상화되어 있어서 쉽게 다른 Binder로 전환이 가능합니다.

실제 도입하고 사용하는 방법은 다음 챕터에서 이야기하겠습니다.

Spring Cloud Stream 도입하기

왜 도입했는가?

다양한 데이터를 조합하기 위해서 spring-kafka 대신 Spring Cloud Stream을 선택했습니다.

그렇다면 왜 Spring Cloud Stream을 선택하고 활용하게 되었을까요? 일단 요구 사항을 살펴보도록 하겠습니다.

  1. 카카오페이 내 다양한 어드민 api를 활용하여 데이터 추출
  2. 하둡을 비롯하여 다양한 데이터 소스에서 데이터 추출
  3. 추출한 데이터를 바탕으로 데이터를 조합하고 필요하면 다른 어드민 api 호출

익숙한 스트리밍 기반 기술 중 하나는 spring-kafka입니다. 아래 예제는 카프카로 메시지를 받아서 다시 메시지로 전송하는 예제입니다.

@KafkaListener(
    topics = ["topic"],
    groupId = "group-id",
)
fun extract(request:Request) {
   val response:Response = extractPayment(request) // 데이터 추출하기
   kafkaTemplate.sendDefault(request.userId, response) // 다음 데이터 추출하기 (카프카로 전송)
}

spring-kafka는 low level로 개발하는 거라서 코딩 스타일 강제가 불가능합니다. 데이터 흐름을 코드 베이스로 파악해야 하는 어려움이 있습니다. 또한 잘못 코딩하게 되면 결합도가 높아집니다.

Spring Cloud Stream은 데이터 흐름 제어나 코딩 스타일을 어느 정도 규격화 하는 것이 가능하고 코드 결합도도 낮출 수 있습니다. 실제로 어떻게 데이터 흐름 제어나 코딩 스타일을 규격화 했는지 살펴보겠습니다.

어떻게 적용했는가?

어드민 api를 사용해서 데이터를 추출하는 예시를 들어 보겠습니다.

  1. 결제 어드민 api에서 데이터 추출
  2. 포인트 어드민 api에서 데이터 추출
  3. 하둡에서 정보 추출

기존의 추출 로직을 작성하면 이렇습니다.

  • api 내에서 순차적으로 호출하여 데이터 추출
  • 배치로 잡을 만들어서 파이프라인으로 이어서 데이터 추출
// 기존 데이터 추출하는 코드
fun extract(request:Request): Response {
  val paymentResponse = extractPayment(request) // 결제 어드민 api 호출
  val pointResponse = extractPoint(request) // 포인트 어드민 api 호출
  val hadoopResponse = extractHadoop(request) // 하둡 호출
  return Response(
      paymentResponse = paymentResponse,
      pointResponse = pointResponse,
      hadoopResponse = hadoopResponse,
    )
}

fun extractPayment(request:Request): PaymentResponse  {
    return paymentClient.getApi(request.userId)
}

fun extractPoint(request:Request): PointResponse  {
    return pointClient.getApi(request.userId)
}


fun extractHadoop(request:Request): HadoopResponse {
  return hadoopTemplate.query(request.userId)
}

이러한 경우에는 단점이 존재합니다.

  • 신규 어드민이 추가되거나 어드민에서 데이터가 변경되면, 로직을 수정하거나 데이터를 변경하는 코드를 작성해야 합니다.
  • 특정 어드민이나 하둡의 경우, 많은 리소스를 필요로 할 수 있어서 분리도 불가능하고 스케일업도 역시 불가능합니다.
  • 추출 로직은 순차적으로 실행되기 때문에 많은 시간도 필요로 합니다.

마지막으로 언급한 순차적 실행 케이스는 코루틴이나 스레드를 사용하면 여러 로직을 한 번에 실행시키는 것도 가능하지만 여기서 이 부분까지는 다루지 않겠습니다.

위 로직을 Spring Cloud Stream에서 function으로 구현해 보면 아래와 같습니다.

@Bean
fun extract(): Function<Request, Response> {
    return Function {
        val paymentResponse = extractPayment().apply(it) // 결제 어드민 api 호출
        val pointResponse = extractPoint().apply(it) // 포인트 어드민 api 호출
        val hadoopResponse = extractHadoop().apply(it) // 하둡 호출
        Response(
            paymentResponse = paymentResponse,
            pointResponse = pointResponse,
            accountResponse = accountResponse,
            hadoopResponse = hadoopResponse,
        )
    }
}
@Bean
fun extractPayment(): Function<Request, PaymentResponse> {
    return Function { request: Request -> paymentClient.getApi(request.userId) }
}

@Bean
fun extractPoint(): Function<Request, PointResponse> {
    return Function { request: Request -> pointClient.getApi(request.userId) }
}

@Bean
fun extractHadoop(): Function<Request, HadoopResponse> {
    return Function { request: Request -> hadoopTemplate.query(request.userId) }
}

아래와 같은 설정도 필요합니다.

  • bean으로 설정한 function을 definition에 넣기
  • definition 설정한 function에 in, out 정의하기

destination은 카프카에 토픽처럼 설정 가능합니다.

spring:
  cloud:
    function:
      definition: extractPayment;extractPoint;extractAccount;extractHadoop;
    stream:
      bindings:
        extractPayment-in-0:
          destination: extract-payment.v1
          group: payment-group.v1
        extractPoint-in-0:
          destination: extract-point.v1
          group: point-group.v1
        extractHadoop-in-0:
          destination: extract-hadoop.v1
          group: hadoop-group.v1

위아래 코드를 비교하면 Function으로 구현했다는 부분 외에는 큰 차이가 없습니다.

이제 한 번에 여러 로직을 호출하는 걸로 변경해 보겠습니다. 설정을 추가해야 합니다. 또한 호출하는 로직도 변경해야 합니다.

@Service
class ExtractService(
    private val streamBridge: StreamBridge,
) {
  fun extract() {
    streamBridge.send("extract-out-0", Request()) // 추출 요청하기
  }
}
spring:
  cloud:
    function:
      definition: extractPayment;extractPoint;extractAccount;extractHadoop;
    stream:
      bindings:
        extract-out-0:
          destination: extract.v1
        extractPayment-in-0:
          destination: extract.v1
          group: payment-group.v1
        extractPoint-in-0:
          destination: extract.v1
          group: point-group.v1
        extractHadoop-in-0:
          destination: extract.v1
          group: hadoop-group.v1

이런 식으로 구성하면 쉽게 비동기 메시지 방식으로 로직을 호출할 수가 있습니다. 다만 메시지로 데이터를 호출했기 때문에 결과 처리는 다르게 해야 합니다.

임시로 데이터를 보관할 장소가 필요한데 저희 팀에서는 redis를 선택하여 이곳에 데이터를 보관하였습니다. 해당 방식을 따로 다루진 않겠습니다.

이제 다음 문제를 살펴보시죠.

계정 정보는 결제 포인트에 id로만 존재합니다. 따라서 계정 api를 사용해서 사용자 정보를 결합해야 해요.

Spring Cloud Stream에서는 Function 끼리 연결할 수 있게 | 연산을 지원합니다. 아래처럼 계정을 호출할 수 있는 Function을 만들고 설정에서 이어줍니다.

@Bean
fun accountResultCovert(): Function<Response, Response> {
    return Function { response: Response -> accountClinet.getApi(response.userId) } // 계정 어드민 호출하기
}
spring:
  cloud:
    function:
      definition: extractPayment|accountResultCovert;extractPoint|accountResultCovert;extractHadoop|accountResultCovert;
    stream:
      bindings:
        extract-out-0:
          destination: extract.v1
        extractPayment|accountResultCovert-in-0:
          destination: extract.v1
          group: payment-group.v1
        extractPoint|accountResultCovert-in-0:
          destination: extract.v1
          group: point-group.v1
        extractHadoop|accountResultCovert-in-0:
          destination: extract.v1
          group: hadoop-group.v1

메시지로 전달해서 이어주는 방법도 제공됩니다. 역시 설정으로 가능합니다.

spring:
  cloud:
    function:
      definition: extractPayment;extractPoint;extractHadoop;accountResultCovert;
    stream:
      bindings:
        extract-out-0:
          destination: extract.v1
        extractPayment-in-0:
          destination: extract.v1
          group: payment-group.v1
        extractPoint-in-0:
          destination: extract.v1
          group: point-group.v1
        extractHadoop-in-0:
          destination: extract.v1
          group: hadoop-group.v1

        extractPayment-out-0:
          destination: account.v1
        extractPoint-out-0:
          destination: account.v1
        extractHadoop-out-0:
          destination: account.v1

        accountResultCovert-in-0:
          destination: account.v1

위 두 개의 차이점은 function을 바로 실행시키는가 아니면 메시지를 통해서 실행시키는가에 차이가 있습니다. 이렇듯 다양한 방법으로 데이터를 파이프라인으로 처리할 수 있게 도와줍니다.

한걸음 더 나아가기

위에서는 기본적인 Spring Cloud Stream 도입 방법을 알아보았습니다. 지금부터는 멀티 모듈과 Function을 규격화하여 확장성과 변화에 유연하게 대응하는 아키텍처로 변경해 보도록 하겠습니다.

멀티 모듈 도입

데이터 추출을 위해서 외부 어드민 api를 호출하는 상황입니다. 데이터가 표준화되어 있지 않고 응답 포맷, 응답 코드 등 다 다르게 되어 있습니다. 외부 어드민 api로 인한 코드 영향을 최소화하기 위해서 멀티 모듈로 분리하도록 하겠습니다.

├── extract-function
│   ├── build.gradle.kts
│   ├── extract-account-function
│   ├── extract-hadoop-function
│   ├── extract-payment-function
│   ├── extract-point-function
└── └── src

extract-function은 하위에 있는 function을 디펜던시로 가지게 설정합니다. spring boot가 실행할 수 있는 main 함수와 bindings 설정만 존재하게 설정합니다.

[extract-function]

@SpringBootApplication
class ExtractFunctionApplication

fun main(vararg args: String) {
    runApplication<ExtractFunctionApplication>(*args)
}

[extract-payment-function 패키지 구조]

├── client
│   ├── PaymentApiClient.kt
├── response
│   └── PaymentApiResponseDto.kt
└── function
    └── ExtractPaymentFunction.kt

이렇게 분리하면 언제든지 별도 애플리케이션으로 분리가 가능합니다. 분리하기 위해서는 각 모듈에 main 함수와 binding 설정은 따로 해야 합니다.

예외 처리

로직을 처리하다 보면 예외는 항상 발생하게 됩니다. 여기서는 다른 서비스 api를 이용하다 보니 우리의 의지와 상관없이 예외가 발생하게 됩니다.

Spring Cloud Stream에서는 기본적으로 예외 발생 시 3회 재처리하도록 설계되어 있습니다. 역시 설정으로 재처리 횟수는 변경이 가능합니다.

예외를 처리할 수 있는 방법으로 아래 2가지가 있습니다.

  1. DLQ 이용
  2. 사용자 정의 오류 핸들러

저희는 두 번째 방법을 선택해 사용자 정의로 오류를 처리하고 있습니다. 각 어드민 서비스에서 응답 실패가 발생하면 저희 쪽에서 할 수 있는 일이 없습니다. 때문에 이를 실패로 기록하고 사용자에게 재처리할 수 있는 기능을 제공합니다.

사용자 정의 오류 핸들러 역시 Function으로 구현이 가능합니다.

@Bean
fun handleErrorConsumer(): Consumer<ErrorMessage> {
    return Consumer<ErrorMessage> { errorMessage ->
            log.error()
    }
}

아래와 같이 설정도 필요합니다.

spring:
  application.name: function-extract
  cloud:
    function:
      definition:
        "extractPaymentFunction;extractPointFunction;extractHadoopFunction;\
        accountResultConvertFunction;\
        redisSaveConsumer;"
    stream:
      default.error-handler-definition: handleErrorConsumer
      bindings:
        extractPaymentFunction-in-0:
          destination: extract.v1
          group: payment-group.v1
        extractPointFunction-in-0:
          destination: extract.v1
          group: point-group.v1
        extractHadoopFunction-in-0:
          destination: extract.v1
          group: hadoop-group.v1

        extractPaymentFunction-out-0:
          destination: account.v1
        extractPointFunction-out-0:
          destination: account.v1
        extractHadoopFunction-out-0:
          destination: account.v1

        accountResultConvertFunction-in-0:
          destination: account.v1

        accountResultConvertFunction-out-0:
          destination: redis.sink

        redisSaveConsumer-in-0:
          destination: redis.sink

Function 및 데이터 규격화 하기

Function과 데이터를 규격화하여 확장성과 변화에 유연하게 대응할 수 있습니다. 새로운 어드민 api가 추가되면 Function을 확장하거나 데이터만 추가하면 됩니다.

interface ExtractFunction : Function<ExtractRequestMessage, ExtractResponseMessage> {
    /*
     * 추출 Function을 수행하기 위한 선행조건(pre-condition)을 체크한다.
     * - Function을 수행할 충분한 선행조건이 충족되지 않을 경우 NOT_VALID로 전송
     */
    val requestValid: (ExtractRequestMessage) -> Boolean

    // 추출 로직 정의
    fun doAction(request: ExtractRequestMessage): ExtractResponseMessage

    // 추출 진행 및 예외 처리
    override fun apply(request: ExtractRequestMessage): ExtractResponseMessage {
        when {
            requestValid(request) -> {
                try {
                    doAction(request)
                } catch (e: Exception) {
                    log.error("error message: ${e.message}", e)
                    responseByRequestAndStatus(request, ExtractionStatus.FAIL)
                }
            }
            else -> responseByRequestAndStatus(request, ExtractionStatus.NOT_VALID)
        }
    }

    private fun responseByRequestAndStatus(request: ExtractRequestMessage, status: ExtractionStatus) =
        ExtractResponseMessage(
            extractRequestId = request.extractRequestId,
            extractionStatus = status,
        )


}

이렇게 상위 interface를 정의하고 구현체에서는 requestValid와 doAction만 재정의해서 사용하게 됩니다.

아래는 그 예입니다.

@Component
class ExtractPaymentFunction(
    private val paymentApiClient: PaymentApiClient
) : ExtractFunction {
    private val log: Logger by lazy { LoggerFactory.getLogger(javaClass) }

    // 유효성 검사하기
    override val requestValid: (ExtractRequestMessage) -> Boolean = { true }

    // 추출 로직 작성하기
    override fun doAction(request: ExtractRequestMessage): ExtractResponseMessage {
        log.info("payment extraction request: $request")
        val paymentApiContents = paymentApiClient.getApi(request.userId)

        // 응답 데이터에 맵핑
        return ExtractResponseMessage(
            extractRequestId = request.extractRequestId,
            extractionStatus = if (paymentApiContents.isNotEmpty()) ExtractionStatus.SUCCESS else ExtractionStatus.SUCCESS_WITHOUT_RESULT,
            extractionContent = paymentApiContents.map { it.toExtractionContent() }
        )
    }

    // 데이터 변환
    private fun PaymentApiResponseDto.toExtractionContent() = PaymentContent(
        userId = id,
        tractionType = paymentType,
        transactionAmount = paymentAmount.toBigDecimal()
    )

}

이제 function을 규격화했으니 data도 같이 규격화해줍니다.

data class ExtractRequestMessage(
    val userId: Long,
    val extractRequestId: Long,
)

data class ExtractResponseMessage(
    val extractRequestId: Long,
    val extractionStatus: ExtractionStatus = ExtractionStatus.NONE,
    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
    val extractionContent: List<ExtractionContent> = listOf()
)

enum class ExtractionStatus {
    NONE, SUCCESS, SUCCESS_WITHOUT_RESULT, FAIL, NOT_VALID
}

sealed interface ExtractionContent {
    val userId: Long
    val userName: String?
}

// 결제 데이터 정의
data class PaymentContent(
    override val userId: Long,
    override val userName: String? = null,
    val tractionType: String,
    val transactionAmount: BigDecimal,
) : ExtractionContent

// 포인트 데이터 정의
data class PointContent(
    override val userId: Long,
    override val userName: String? = null,
    val pointAmount: BigDecimal,
) : ExtractionContent

// 하둡 데이터 정의
data class HadoopContent(
    override val userId: Long,
    override val userName: String? = null,
    val etcInfo: String,
) : ExtractionContent

sealed를 사용한 이유는 redis로 저장할 때 extractionContents 변환 후 암호화 하여 저장합니다.

아래 예시를 보겠습니다.

@Service
class ExtractResultHashService(private val resultHashRepository: ExtractResultHashRepository) {

    fun save(response: ExtractResponseMessage) {
        val extractionResults = response.extractionContent.map {
            it.toContentHashList() // 데이터 변환하기
        }.toList()

        val extractResultHash = ExtractResultHash(
            extractRequestId = response.extractRequestId,
            extractionResult = extractionResults
        )
        resultHashRepository.save(extractResultHash)
    }
}

private fun ExtractionContent.toContentHashList(): ExtractionContentHash {
    return when (this) { // 데이터 변환 로직
        is HadoopContent ->
            Transformer(
                inClass = HadoopContent::class,
                outclass = HadoopContentHash::class
            ).transform(this)

        is PaymentContent -> Transformer(
            inClass = PaymentContent::class,
            outclass = PaymentContentHash::class
        ).transform(this)

        is PointContent -> Transformer(
            inClass = PointContent::class,
            outclass = PointContentHash::class
        ).transform(this)
    }
}

// 데이터 복사 유틸
open class Transformer<in T : Any, out R : Any>(
    inClass: KClass<T>, outclass: KClass<R>
) {
    private val outConstructor = outclass.primaryConstructor!!
    private val inPropertiesByName: Map<String, KProperty1<T, *>> by lazy {
        inClass.memberProperties.associateBy { it.name }
    }

    fun transform(data: T): R = with(outConstructor) {
        callBy(parameters.associateWith { parameter -> argFor(parameter, data) })
    }

    open fun argFor(parameter: KParameter, data: T): Any? {
        return inPropertiesByName[parameter.name]?.get(data)
    }
}

ExtractionContent를 사용하는 when에서 나머지 조건문을 채우지 않으면 컴파일 오류가 발생합니다. 따라서 ExtractionContent를 상속하여 구현하는 클래스가 추가 되어도 when 조건문에 빠지는 실수를 방지해 줍니다.

마치며

Spring Cloud Stream을 사용하면 쉽게 데이터 파이프라인을 구성하고 조합하는 게 가능합니다. 멀티모듈, Function 규격화로 확장성과 안정성을 높일 수 있습니다.

하지만 처음부터 다 분리하고 규격화하면서 개발하는 건 너무 힘든 일입니다. 개발하면서 공통 부분과 분리할 부분을 잘 찾은 다음에 분리해도 늦지 않고, 언제든 되돌아갈 준비가 되어 있는 게 좋습니다. Spring Cloud Stream은 그런 관점에서 좋은 대안이 될 수 있습니다. 쉽게 분리 및 결합이 가능하고 제거하기도 편합니다.

전체 코드는 Github을 참고하세요.

참고 자료

wade.hong
wade.hong

카카오페이 정보플랫폼에서 개발하는 웨이드입니다. 끊임없이 성장하는 개발자가 되기 위해서 노력하고 있습니다.