Flow 컨텍스트 보존(Context Preservation)과 SafeCollector
Flow 컨텍스트 보존(Context Preservation)과 SafeCollector
Kotlin Flow는 엄격한 컨텍스트 보존 규칙을 적용합니다. 모든 방출(emission)은 Flow가 수집(collect)되는 코루틴 컨텍스트와 동일한 컨텍스트에서 이루어져야 합니다. 이 불변 조건(invariant)은 Flow 설계의 근간이 되는 원칙으로, 별도의 동기화 없이도 스레드 안전성을 보장하고, 연산자 체인 전반에 걸쳐 순차적 처리를 유지하며, 취소(cancellation) 전파가 올바르게 이루어지도록 합니다. SafeCollector 클래스는 이 규칙을 런타임에서 실제로 강제하는 역할을 담당하며, 모든 FlowCollector를 래핑하여 각 emit() 호출을 가로채고 검증한 뒤에야 다운스트림 연산자로 값을 전달합니다. 이번 면접 질문을 통하여 아래 내용들을 학습하실 수 있습니다.
- Flow가 스레드 안전성, 순차 실행, 취소 처리를 위해 왜 컨텍스트 보존을 요구하는지 설명할 수 있습니다.
SafeCollector가 방출을 가로채고 코루틴 컨텍스트를 런타임에서 어떻게 검증하는지 이해할 수 있습니다.- 비-
Job요소에 대한 참조 동등성(identity) 검사와Job요소에 대한 전이적 부모(transitive parent) 로직을 포함한 컨텍스트 검증 알고리즘을 추적할 수 있습니다. flowOn이 업스트림 컨텍스트를 변경하는 올바른 연산자인 이유, 그리고emit()주변에서withContext를 사용하면 안 되는 이유를 파악할 수 있습니다.- 서로 다른 컨텍스트에서 방출이 필요한 정당한 사용 사례에
channelFlow와callbackFlow를 적용할 수 있습니다.
컨텍스트 보존이 존재하는 이유
Flow의 컨텍스트 보존 규칙은 세 가지 정확성(correctness) 문제를 해결하기 위해 존재합니다. 첫째, FlowCollector는 스레드에 안전하지 않습니다. 이 인터페이스는 동기화 프리미티브(synchronization primitive) 없이 단일 suspend fun emit(value: T) 메서드만 선언하고 있습니다. scan, fold, onEach 같은 연산자들은 방출 간에 가변 상태를 유지하는데, 여러 스레드에서 동시에 접근하면 경쟁 조건(race condition)이 발생할 수 있습니다.
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
둘째, Flow 연산자들은 순차 실행을 전제로 동작합니다. .map { it * 2 }.filter { it > 1 }.collect {} 같은 체인은 값이 순서대로 하나씩 도착해야 올바르게 처리됩니다. 만약 방출이 동시에 이루어진다면 출력이 비결정적으로 뒤섞여 변환 체인이 깨질 수 있습니다. 순차적 접근 방식 덕분에 연산자들은 별도의 동기화 없이도 내부 상태를 안전하게 유지할 수 있습니다.
셋째, 취소 전파는 수집 컨텍스트에 연결되어 있습니다. 방출이 별도의 Job을 가진 다른 컨텍스트에서 이루어지면, 수집 스코프를 취소하더라도 방출 측은 취소되지 않아 리소스 누수가 발생할 수 있습니다.
val job = launch {
longRunningFlow().collect { process(it) }
}
job.cancel() // 수집과 방출 양쪽 모두 취소되어야 합니다
면접에서 이 질문에 답변하실 때는, 단순히 "컨텍스트를 보존해야 합니다"라는 결론만 말하기보다 이 세 가지 이유(스레드 안전성, 순차 실행, 취소 전파)를 함께 설명하시면 깊이 있는 답변이 됩니다.
SafeCollector 구현
flow { } 빌더로 생성된 모든 Flow는 블록에 전달하기 전에 다운스트림 FlowCollector를 SafeCollector로 래핑합니다. 이 래퍼는 모든 emit() 호출을 가로채어, 현재 코루틴 컨텍스트가 수집 시점에 캡처된 컨텍스트와 일치하는지 검증합니다.
SafeCollector 클래스는 수집 컨텍스트를 저장하고, 이후 검증에 사용할 요소 개수를 미리 계산해 둡니다.
internal class SafeCollector<T>(
internal val collector: FlowCollector<T>,
internal val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {
// 수집 컨텍스트의 요소 개수를 미리 계산
internal val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
private var lastEmissionContext: CoroutineContext? = null
emit() 오버라이드는 continuation에서 현재 코루틴 컨텍스트를 추출한 뒤, 취소 여부를 확인하고 컨텍스트 일관성을 검증합니다.
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
val currentContext = uCont.context
currentContext.ensureActive() // 취소 여부 확인
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext)
lastEmissionContext = currentContext
}
// 실제 collector로 위임
emitFun(collector, value, this as Continuation<Unit>)
}
}
}