Interview QuestionPractical QuestionFollow-up Questions

Flow Context Preservation and SafeCollector

skydovesJaewoong Eum (skydoves)||11 min read

Flow Context Preservation and SafeCollector

Kotlin Flow enforces a strict context preservation rule: every emission must occur in the same coroutine context where the flow is collected. This invariant is fundamental to Flow's design, ensuring thread safety without synchronization, sequential processing through operator chains, and correct cancellation propagation. The SafeCollector class is the runtime enforcer of this rule, wrapping every FlowCollector to intercept and validate each emit() call before it reaches downstream operators. By the end of this lesson, you will be able to:

  • Explain why Flow requires context preservation for thread safety, sequential execution, and cancellation.
  • Describe how SafeCollector intercepts emissions and validates coroutine context at runtime.
  • Trace the context validation algorithm, including identity checks for non-Job elements and transitive parent logic for Job elements.
  • Identify why flowOn is the correct operator for changing upstream context, and why withContext around emit() is prohibited.
  • Apply channelFlow and callbackFlow when legitimate use cases require emitting from different contexts.

Why Context Preservation Exists

Flow's context preservation rule addresses three correctness concerns. First, FlowCollector is not thread safe. The interface declares a single suspend fun emit(value: T) with no synchronization primitives. Operators like scan, fold, and onEach maintain mutable state across emissions, and concurrent access from multiple threads would introduce race conditions:

public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

Second, Flow operators assume sequential execution. A chain like .map { it * 2 }.filter { it > 1 }.collect {} depends on values arriving one at a time in order. If emissions were concurrent, outputs could interleave non-deterministically, breaking transformation chains. Sequential access allows operators to maintain internal state without synchronization.

Third, cancellation propagation is tied to the collection context. If emissions occurred in a different context with a separate Job, cancelling the collection scope would not cancel the emission side, leading to resource leaks:

val job = launch {
    longRunningFlow().collect { process(it) }
}
job.cancel() // Must cancel both collector AND emissions

SafeCollector Implementation

Every flow built with the flow { } builder wraps the downstream FlowCollector in a SafeCollector before passing it to the block. This wrapper intercepts every emit() call to validate that the current coroutine context matches the context captured at collection time.

The SafeCollector class stores the collection context and precomputes its element count for later validation:

internal class SafeCollector<T>(
    internal val collector: FlowCollector<T>,
    internal val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) {

    internal val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    private var lastEmissionContext: CoroutineContext? = null

The emit() override extracts the current coroutine context from the continuation, checks cancellation, and validates context consistency:

    override suspend fun emit(value: T) {
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            val currentContext = uCont.context
            currentContext.ensureActive() // Check cancellation

            val previousContext = lastEmissionContext
            if (previousContext !== currentContext) {
                checkContext(currentContext)
                lastEmissionContext = currentContext
            }

            // Delegate to actual collector
            emitFun(collector, value, this as Continuation<Unit>)
        }
    }
}

The collectContext field captures the context at collect() time, establishing the validation baseline. The collectContextSize precomputes the element count so validation can detect additions or removals without iterating twice. The lastEmissionContext caches the previous emission's context for a fast reference-equality shortcut: if unchanged, the full validation is skipped. The use of suspendCoroutineUninterceptedOrReturn avoids continuation allocations in the hot path.

This interview continues for subscribers

Subscribe to Dove Letter for full access to exclusive interviews about Android and Kotlin development.

Become a Sponsor