본문 바로가기

프로그래밍 언어/Kotlin

Flow 중간 연산자

흐름 제어

1. debounce

flowmarbles

debound 연산자는 데이터 발행 후 설정한 ms 내에 추가적인 데이터가 발행되는 경우, 기존의 데이터는 무시되고 새로 발행된 데이터를 기준으로 다시 ms를 측정하여 추가 데이터가 발행되지 않는 경우 해당 데이터를 수집한다.

  • 데이터 1번 발행 후 250ms 내에 데이터 2번 발행
  • 1번 데이터는 무시, 2번 데이터 발행 후 250ms 내에 3번 데이터 발행
  • 2번 데이터 또한 무시
  • 3번 데이터 발행 이후 250ms 내에 추가적인 데이터 발행이 없었으므로 3번 데이터 수집

 

2. sample

flowmarbles

sample 연산자는 설정 시간 간격 내에서 발행 데이터의 최신 값을 수집한다.


변환

 

1. filter

filter 조건에 부합하는 데이터만 추출한다.

val flowData: Flow<Number> = flow {
        repeat(10){
            emit(it)
        }
    }.filter {
        it % 2 == 0 // 짝수만 발행
    }

 

2. map

발행 된 데이터를 입맛에 맞게 변환한다.

val flowData: Flow<Number> = flow {
        repeat(10){
            emit(it)
        }
    }.map {
        Number(it) // Int >> Number 변환
    }

 

 

3. onEach

데이터 수집 전 특정 작업을 수행하는 경우에 사용한다.


병합

1. flatMapConcat

상위 Flow 데이터 흐름에 맞춰 하위 Flow의 데이터가 방출된다.

만일 2번 데이터 발행 시 1번 데이터 발행(하위 데이터 A, B)가 아직 발행 중이라면 대기 후 발행된다.

fun main() {
    val scope = CoroutineScope(Dispatchers.IO)
    val flowData2: Flow<String> = flowOf("A", "B", "C")
        .onEach {
            println("flow2 delay")
            delay(2000)
        }

    val flowData: Flow<String> = flow {
        repeat(3){
            emit(it)
        }
    }.onEach {
        println("flow1 delay")
        delay(1000)
    }.flatMapConcat { flowData2 }

    scope.launch {
        flowData.collect{
            println(it)
        }
    }

    mainTreadRun()
}
/**
상위 Flow Delay
하위 Flow Delay
A
하위 Flow Delay
B
하위 Flow Delay
C
상위 Flow Delay
하위 Flow Delay
A
하위 Flow Delay
B
하위 Flow Delay
C
상위 Flow Delay
하위 Flow Delay
A
하위 Flow Delay
B
하위 Flow Delay
C
**/

 

2. flatMapMerge

상위 데이터 흐름에 맞춰 하위 데이터가 발행된다.

1번 데이터 발행(하위 A, B)과 상관없이 2번 데이터 발행 시점에 대기 없이 하위 데이터 A, B를 발행한다.

이때, 1번 데이터에서의 누락은 없다. (1번에서 B가 발행되기 전에 2번이 발행되어도 1번 B는 누락 없이 발행)

// 위의 그림을 코드로 구현
fun main() {
    val scope = CoroutineScope(Dispatchers.IO)

    fun flowData2(n: Int): Flow<String> = flow{
        emit("${n}번: A")
        delay(2000)
        emit("${n}번: B")
    }

    val flowData: Flow<String> = flow {
        emit(1)
        delay(1000)
        emit(2)
        delay(3000)
        emit(3)

    }.flatMapMerge{ flowData2(it) }

    scope.launch {
        flowData.collect{
            println(it)
        }
    }

    mainTreadRun()
}
/**
1번: A
2번: A
1번: B
2번: B
3번: A
3번: B
**/

 

3. flatMapLatest

상위 데이터 흐름에 맞춰 하위 데이터가 발행된다.

1번 데이터 발행(하위 A, B)이 끝나지 않았다면 나머지 하위 데이터는 누락되고, 2번 데이터 발행이 시작된다.

  • 1번 데이터 시점에서 하위 데이터 A 발행
  • A와 B 사이의 딜레이 동안 2번 데이터 시점에서 하위 데이터 A 발행
  • 1번 데이터에서 발행되지 않은 B는 누락

이외의 연산자들은 아래 링크를 통해 확인하실 수 있습니다.

https://flowmarbles.com/#merge

 

FlowMarbles

 

flowmarbles.com

https://reactivex.io/documentation/operators.html

 

ReactiveX - Operators

Introduction Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementa

reactivex.io

 

'프로그래밍 언어 > Kotlin' 카테고리의 다른 글

Flow Builder  (0) 2025.05.30
Coroutine Flow  (0) 2025.05.30
Channel  (0) 2025.05.29
[Kotlin] Scope Function  (0) 2025.05.10
runCatching 예외 처리  (0) 2025.03.08