stateIn, shareIn, and SharingStarted.WhileSubscribed
stateIn, shareIn, and SharingStarted.WhileSubscribed
코틀린 Flow에는 콜드 Flow(cold flow)를 코루틴 스코프 내에서 공유되는 핫 Flow(hot flow)로 전환하는 두 가지 연산자가 있습니다. shareIn은 SharedFlow를, stateIn은 StateFlow를 만들어 냅니다. 두 연산자 모두 SharingStarted 인자를 받는데, 이 인자가 상위 스트림(upstream)을 언제 수집할지, 그리고 언제 수집을 멈추도록 허용할지를 결정합니다. 안드로이드에서 가장 흔히 쓰이는 설정인 SharingStarted.WhileSubscribed(5_000\)은 언뜻 보면 의미를 알 수 없는 매직 넘버(magic number)처럼 보이지만, 이 5초라는 값은 구성 변경(configuration change) 및 ViewModel의 생명주기와 맞물려 구체적인 역할을 수행합니다. 면접에서도 "WhileSubscribed\(5_000)의 5초는 왜 하필 5초인가요"라는 질문이 자주 등장하므로, 이 값이 단순한 관습이 아니라 명확한 근거를 가진 설정임을 설명하실 수 있어야 합니다.
이번 면접 질문을 통하여 아래 내용들을 학습하실 수 있습니다.
shareIn과stateIn이 상위 스트림 Flow에 실제로 어떤 작업을 수행하는지SharingStarted전략이 방출할 수 있는 세 가지 공유 명령(sharing command)WhileSubscribed가subscriptionCount와transformLatest위에서 어떻게 구성되는지stopTimeoutMillis와replayExpirationMillis가 왜 별개의 두 매개변수로 분리되어 있는지- 주어진 스코프에 맞춰
Eagerly,Lazily,WhileSubscribed중 무엇을 선택해야 하는지
stateIn과 shareIn이 실제로 하는 일
두 연산자는 모두 콜드 상위 스트림 Flow를 받아서, 전달받은 스코프에 단 하나의 코루틴을 실행하여 이를 수집하고, 그 값을 하위 스트림(downstream) 구독자가 관찰하는 핫 MutableSharedFlow 또는 MutableStateFlow로 전달합니다. 여러 구독자가 각각 독립적인 수집을 유발하는 것이 아니라, 동일한 상위 스트림 수집을 함께 공유하는 구조입니다.
Share.kt에 있는 shareIn의 구현을 살펴보겠습니다.
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
val config = configureSharing(replay)
val shared = MutableSharedFlow<T>(
replay = replay,
extraBufferCapacity = config.extraBufferCapacity,
onBufferOverflow = config.onBufferOverflow
)
val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
return ReadonlySharedFlow(shared, job)
}
여기서는 세 가지 일이 일어납니다. 먼저 핫 릴레이(relay) 역할을 하는 MutableSharedFlow가 생성됩니다. 그리고 상위 스트림의 수집을 구동하기 위한 공유 코루틴이 scope에서 실행됩니다. 마지막으로 반환되는 ReadonlySharedFlow는 이 릴레이를 감싸고, 공유 Job에 대한 강한 참조(strong reference)를 보유합니다. 덕분에 공유가 활성화되어 있는 동안에는 릴레이가 가비지 컬렉션(garbage collection)으로 회수되지 않습니다.
stateIn은 구조적으로 거의 동일하지만, MutableStateFlow(initialValue)를 생성하고 launchSharing에 NO_VALUE 대신 initialValue를 전달한다는 점이 다릅니다. 이 초기값은 최초 읽기 시점뿐만 아니라, 뒤에서 다룰 리셋(reset) 동작에서도 중요한 역할을 합니다.
공유 코루틴 자체는 started 전략에 따라 분기하여 동작합니다. launchSharing에서 우리가 주목할 부분은 started가 Eagerly도 Lazily도 아닐 때 실행되는 블록입니다.
started.command(shared.subscriptionCount)
.distinctUntilChanged()
.collectLatest {
when (it) {
SharingCommand.START -> upstream.collect(shared)
SharingCommand.STOP -> { /* 취소만 하고 그 외에는 아무것도 하지 않음 */ }
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
if (initialValue === NO_VALUE) {
shared.resetReplayCache()
} else {
shared.tryEmit(initialValue)
}
}
}
}
이 전략은 릴레이의 실시간 subscriptionCount를 SharingCommand 값의 Flow로 변환합니다. 여기서 핵심은 collectLatest입니다. 새로운 명령이 도착하면, 이전 명령으로 시작된 작업이 취소됩니다. 바로 이 메커니즘 덕분에 STOP이 진행 중이던 upstream.collect(shared)를 취소할 수 있고, 빠른 재구독이 대기 중이던 중단 동작을 취소할 수 있는 것입니다.
리셋 분기를 눈여겨보시기 바랍니다. initialValue가 센티넬(sentinel) 값인 NO_VALUE일 때(shareIn의 경우)에는, 리셋이 replay 버퍼를 비웁니다. 반면 initialValue가 실제 값일 때(stateIn의 경우)에는, 리셋이 initialValue를 다시 방출하여 상태 Flow를 시작 상태로 되돌립니다.
SharingStarted 인터페이스의 계약
SharingStarted는 단 하나의 메서드만 가진 함수형 인터페이스(functional interface)입니다.
public fun interface SharingStarted {
public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}
이 메서드는 공유 릴레이의 실시간 구독자 수를 받아서 명령의 Flow를 반환합니다. SharingCommand 열거형(enum)에는 다음 세 가지 명령이 정의되어 있습니다.
- START: 상위 스트림 수집을 시작합니다.
- STOP: 상위 스트림 수집을 취소하되, 이미 replay 캐시에 들어 있는 값은 그대로 유지합니다.
- STOP_AND_RESET_REPLAY_CACHE: 수집을 취소하고 캐시된 상태를 리셋합니다(
shareIn에서는 버퍼를 비우고,stateIn에서는initialValue로 되돌립니다).
내장된 두 전략인 Eagerly와 Lazily는 단순한 명령 Flow로 구현되어 있습니다.
private class StartedEagerly : SharingStarted {
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
flowOf(SharingCommand.START)
}
private class StartedLazily : SharingStarted {
override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = unsafeFlow {
var started = false
subscriptionCount.collect { count ->
if (count > 0 && !started) {
started = true
emit(SharingCommand.START)
}
}
}
}