--- title: "Kotlin Flow响应式编程与背压机制" date: "2026-04-24" readTime: "32分钟" tags: ["Kotlin", "Flow", "协程", "响应式编程", "背压", "Android"] ---

Kotlin Flow响应式编程与背压机制 - i开源架构

引言

Kotlin Flow 是 Kotlin 协程库中用于处理异步数据流的 API,它诞生于对 RxJava 的反思和对 Kotlin 语言特性的原生适配。相比 RxJava,Flow 与协程深度整合,支持结构化并发、取消传播和上下文保留,同时提供了更直觉的背压策略。在 Android 开发中,Flow 已经基本取代 RxJava 成为响应式数据绑定的首选方案。本文从设计哲学对比开始,深度剖析 Flow 的背压机制、状态流与通道的选型、以及生产环境中的异常处理模式。

一、Flow vs LiveData vs RxJava 对比

1.1 三者的设计哲学差异

理解 Flow、LiveData 和 RxJava 的本质区别,有助于在正确场景选择正确的工具:

三种方案的对比矩阵
// LiveData:生命周期感知的数据容器
// 核心语义:只要 LifecycleOwner 处于 STARTED 以上状态就发送数据
// 适合场景:UI 状态持有(Android 专用)

// RxJava 3:通用的响应式扩展库(JVM 平台)
// 核心语义:推模型 + 丰富的操作符 + 背压策略
// 适合场景:复杂的事件流变换、跨平台服务端流处理

// Kotlin Flow:协程原生的冷流
// 核心语义:冷数据流 + 结构化并发 + 取消传播
// 适合场景:协程范围内的异步数据流、Android UI 状态流

// ===== LiveData vs Flow 在 Android 中的选型 =====

// ❌ LiveData 的局限:生命周期绑死,无法在协程作用域中使用
class MyViewModel : ViewModel() {
    private val _data = MutableLiveData()
    val data: LiveData = _data
    
    // 无法在 suspend 函数中直接发射
    fun loadData() {
        viewModelScope.launch {
            // ❌ 编译错误:emit 不存在于 LiveData
            _data.emit("loaded")  
        }
    }
}

// ✅ Flow 的优势:天然支持协程生命周期 + 取消传播
class MyViewModel2 : ViewModel() {
    private val _data = MutableStateFlow("")
    val data: StateFlow = _data.asStateFlow()
    
    fun loadData() {
        viewModelScope.launch {
            _data.value = "loaded"  // 直接赋值,协程安全
        }
    }
}

1.2 Flow 的冷流特性

Flow 是"冷流"(Cold Stream)——上游数据源只在有下游消费者时才产生数据,且每个收集者都拥有独立的执行实例:

冷流 vs 热流
// Flow 是冷流:每次 collect 都重新执行
fun coldFlow(): Flow<Int> = flow {
    println("🔵 [Cold] Flow 开始发射数据")
    repeat(3) { i ->
        delay(100)
        emit(i)
    }
    println("🔵 [Cold] Flow 发射完毕")
}

// 每个 collect 都会重新触发 flow {} 内的代码
val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

scope.launch {
    println("收集者 A 开始")
    coldFlow().collect { value -> println("A 收到: $value") }
    println("收集者 A 结束")
}

scope.launch {
    delay(50)  // 延迟 50ms 启动
    println("收集者 B 开始")
    coldFlow().collect { value -> println("B 收到: $value") }
    println("收集者 B 结束")
}

// 输出:
// 🔵 [Cold] Flow 开始发射数据  (A)
// A 收到: 0
// 🔵 [Cold] Flow 开始发射数据  (B) ← B 的 collect 重新触发了 flow
// A 收到: 1
// B 收到: 0
// A 收到: 2
// 🔵 [Cold] Flow 发射完毕  (A)
// B 收到: 1
// B 收到: 2
// 🔵 [Cold] Flow 发射完毕  (B)
// 关键:两个收集者各自触发了一次完整的数据流执行

这与 RxJava 的 connect() 操作符形成鲜明对比——Flow 的冷流特性使测试变得极其简单,不需要手动管理订阅生命周期。

二、背压策略深度解析

2.1 背压的本质

背压(Backpressure)是生产速率 > 消费速率时的流量控制机制。在实际场景中:用户快速滚动 RecyclerView(生产者每秒发射 1000 个 item),但渲染线程(消费者)每秒只能处理 60 帧。如果不处理背压,应用要么 OOM,要么掉帧。

背压场景示意
// ❌ 没有背压处理的危险模式
fun dangerousFlow(): Flow<UserAction> = flow {
    // 高频事件源:传感器、用户快速操作、网络推送
    while (true) {
        val event = eventSource.nextEvent()
        emit(event)  // 无视下游消费能力,盲目发射
    }
}

// ✅ 使用带背压的 buffer 操作符
fun safeFlow(): Flow<UserAction> = flow {
    while (true) {
        val event = eventSource.nextEvent()
        emit(event)
    }
}.buffer(capacity = 64)  // 上游继续发射,下游异步消费,缓冲区大小 64

2.2 六大背压策略详解

buffer / conflate / latest 对比
// 策略 1:buffer() - 带缓冲的异步处理
// 语义:上游在缓冲区满之前可继续发射,满了则挂起(等同于 RxJava 的 BUFFER)
flow {
    repeat(10) { i ->
        println("发送: $i")
        emit(i)
    }
}.buffer(capacity = 3)  // 缓冲区大小为 3
    .collect { value ->
        println("开始处理: $value")
        delay(100)  // 模拟耗时处理
        println("处理完毕: $value")
    }

// 输出分析:
// 发送: 0 → 缓冲区: [0]
// 发送: 1 → 缓冲区: [0,1]
// 发送: 2 → 缓冲区: [0,1,2]
// 发送: 3 → 缓冲区满,上游挂起等待
// 开始处理: 0
// 处理完毕: 0
// 缓冲区取走 0,上游恢复
// 发送: 3 → 缓冲区: [1,2,3]
// 发送: 4 → 缓冲区满,上游再次挂起
// ... 交替进行

// 策略 2:conflate() - 丢弃旧值,只保留最新值
// 语义:消费者来不及处理时,只保留最新值(等同于 RxJava 的 CONFLATE)
flow {
    repeat(10) { i ->
        println("发送: $i")
        emit(i)
    }
}.conflate()  // 等同于 buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    .collect { value ->
        println("开始处理: $value")
        delay(100)
        println("处理完毕: $value")
    }

// 输出:
// 发送: 0 → 被处理
// 发送: 1, 2, 3 → 在 conflate 缓冲区中被丢弃(只保留最新的)
// 处理期间 4, 5, 6 → 继续丢弃,只保留 6
// 处理完毕后,直接处理最新的:7, 8, 9

// 策略 3:latest / collectLatest - 丢弃旧值,切换到最新值
// 语义:消费者正在处理时,如果上游发射了新值,取消当前处理,切换到最新值
flow {
    repeat(10) { i ->
        println("发送: $i")
        emit(i)
    }
}.collectLatest { value ->
    println("开始处理: $value")
    delay(100)
    println("处理完毕: $value")
}

// 输出:
// 发送: 0 → 开始处理: 0 (处理中)
// 发送: 1 → 处理被取消(collectLatest 取消了前一个协程)
// 发送: 2 → 处理被取消
// ... 最终只处理了 9
drop 操作符族
// 策略 4:dropLatest - 生产者端丢弃最新值
flow {
    repeat(10) { i ->
        emit(i)
    }
}.dropLatest()  // 缓冲区满时丢弃正在等待的(最新的)值

// 策略 5:dropWhile - 丢弃满足条件的初始值
flow {
    emitAll(dataSource)
}.dropWhile { it.value < threshold }  // 跳过未达到阈值的数据

// 策略 6:自定义背压:onBufferOverflow
flow {
    // 自定义溢出行为
}.buffer(
    capacity = 64,
    onBufferOverflow = BufferOverflow.SUSPEND  // 默认:满时挂起
    // onBufferOverflow = BufferOverflow.DROP_OLDEST  // 丢弃最旧的
    // onBufferOverflow = BufferOverflow.DROP_LATEST   // 丢弃最新的
)

// 在 Android 中的实战推荐:
@Composable
fun UserActionList(actions: Flow<UserAction>) {
    // 使用 collectAsState 默认已处理背压(State 只能存储一个最新值)
    val currentAction by actions.collectAsState(initial = UserAction.Empty)
    
    // 如果需要更多控制:
    LaunchedEffect(Unit) {
        actions
            .conflate()  // 快速操作场景(滑动位置、输入事件)
            .collect { action ->
                handleAction(action)
            }
    }
}

2.3 生产环境背压策略选型指南

场景化背压策略
// 场景 1:搜索自动补全(用户输入 → API 查询)
// 推荐策略:debounce + latest
searchQuery
    .debounce(300)                    // 等待用户停止输入 300ms
    .distinctUntilChanged()           // 相同内容不重复查询
    .filter { it.length >= 2 }        // 至少 2 个字符
    .flatMapLatest { query ->        // 新查询自动取消旧查询(等同于 collectLatest)
        api.search(query)
    }

// 场景 2:实时位置更新(GPS → UI 渲染)
// 推荐策略:conflate(只关心最新位置,不关心历史)
locationProvider.locations
    .conflate()                       // 快速移动时丢弃中间位置
    .collect { location ->
        mapView.updatePosition(location)
    }

// 场景 3:聊天消息流(消息队列 → 列表渲染)
// 推荐策略:buffer(capacity) 保留顺序
messageQueue.messages
    .buffer(capacity = 100)          // 保留最多 100 条有序消息
    .collect { messageList ->
        adapter.submitList(messageList)
    }

// 场景 4:文件下载进度(上传多个文件 → 进度条)
// 推荐策略:带并行数的 buffer
downloadManager.downloads
    .buffer(capacity = 5, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    .collect { progress ->
        progressBar.progress = progress
    }

三、StateFlow vs SharedFlow vs Channel

3.1 核心区别与选型决策树

这是 Kotlin Flow 生态中最容易混淆的三个概念。它们的本质区别在于"热"的程度:

StateFlow / SharedFlow / Channel 对比
// ===== StateFlow = 热流 + 状态语义 =====
// 特性:始终持有当前值(初始值必须),新订阅者立即收到最新值
// 类似:LiveData(但无生命周期感知)
// 用途:UI 状态持有

private val _uiState = MutableStateFlow(UiState())
val uiState: StateFlow<UiState> = _uiState

// 特点:永远有值,订阅无延迟,自动去重(值相同时不触发重放)
_uiState.update { it.copy(loading = true) }  // 结构性更新

// ===== SharedFlow = 热流 + 事件语义 =====
/*
 * 特性:无初始值(可配置 replay),每个订阅者独立接收发射后的值
 * 类似:RxJava Subject(BehaviorSubject + PublishSubject)
 * 用途:一次性事件、跨协程通信
 */
private val _events = MutableSharedFlow<UiEvent>(
    replay = 0,           // 新订阅者不补发历史事件
    extraBufferCapacity = 64,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val events: SharedFlow<UiEvent> = _events.asSharedFlow()

// 发射事件(协程安全)
scope.launch {
    _events.emit(UiEvent.ShowSnackbar("保存成功"))
}

// ===== Channel = 带缓冲的管道 =====
/*
 * 特性:近似于阻塞队列(FIFO),协程间的一对一通信
 * 类似:Go 的 Channel,Kotlin 的 Rendezvous(无缓冲)
 * 用途:协程间点对点通信、Stream 的原始构造块
 */
private val channel = Channel<Command>(capacity = Channel.UNLIMITED)
val commands: ReceiveChannel<Command> = channel

// ===== 选型决策树 =====
//
// 是否需要持有当前状态?
//   ├── 是 → 订阅者需要立即获取最新状态?
//   │        └── 是 → StateFlow(UI 状态)
//   │        └── 否 → 想在协程间点对点传递?
//   │                 └── 是 → Channel(一对一)
//   │                 └── 否 → SharedFlow with replay=1(跨协程广播)
//   └── 否 → 一次性事件?
//            └── 是 → SharedFlow replay=0(UI 一次性事件)
//            └── 否 → SharedFlow with replay=N(带重放的广播)

3.2 StateFlow 在 Android 中的标准用法

StateFlow + Compose 状态绑定
// ViewModel:状态持有
@HiltViewModel
class ArticleViewModel @Inject constructor(
    private val getArticles: GetArticlesUseCase,
    private val saveArticle: SaveArticleUseCase
) : ViewModel() {

    // UI State:完整的状态快照(推荐使用 data class)
    private val _uiState = MutableStateFlow(ArticleUiState())
    val uiState: StateFlow<ArticleUiState> = _uiState.asStateFlow()

    // 一次性事件(使用 Channel + asSharedFlow)
    private val _events = MutableSharedFlow<ArticleEvent>()
    val events: SharedFlow<ArticleEvent> = _events.asSharedFlow()

    fun onIntent(intent: ArticleIntent) {
        when (intent) {
            is ArticleIntent.Load -> loadArticles()
            is ArticleIntent.Save -> saveArticle(intent.article)
            is ArticleIntent.Select -> selectArticle(intent.id)
        }
    }

    private fun loadArticles() {
        viewModelScope.launch {
            _uiState.update { it.copy(isLoading = true, error = null) }
            
            getArticles()
                .onSuccess { articles ->
                    _uiState.update {
                        it.copy(isLoading = false, articles = articles)
                    }
                }
                .onFailure { error ->
                    _uiState.update {
                        it.copy(isLoading = false, error = error.message)
                    }
                    _events.emit(ArticleEvent.ShowError(error.message ?: "加载失败"))
                }
        }
    }

    private fun selectArticle(id: String) {
        viewModelScope.launch {
            val selected = _uiState.value.articles.find { it.id == id }
            _events.emit(ArticleEvent.NavigateToDetail(selected))
        }
    }
}

// Composable:消费 StateFlow
@Composable
fun ArticleScreen(viewModel: ArticleViewModel = hiltViewModel()) {
    val uiState by viewModel.uiState.collectAsState()
    val snackbarHostState = remember { SnackbarHostState() }

    // 收集一次性事件
    LaunchedEffect(Unit) {
        viewModel.events.collect { event ->
            when (event) {
                is ArticleEvent.ShowError -> snackbarHostState.showSnackbar(event.message)
                is ArticleEvent.NavigateToDetail -> { /* 导航 */ }
            }
        }
    }

    Scaffold(
        snackbarHost = { SnackbarHost(snackbarHostState) }
    ) { padding ->
        when {
            uiState.isLoading -> LoadingIndicator()
            uiState.error != null -> ErrorView(
                message = uiState.error!!,
                onRetry = { viewModel.onIntent(ArticleIntent.Load) }
            )
            else -> ArticleList(
                articles = uiState.articles,
                onArticleClick = { viewModel.onIntent(ArticleIntent.Select(it)) }
            )
        }
    }
}

3.3 SharedFlow 的"冷订阅陷阱"

SharedFlow 默认 replay = 0,这意味着:如果订阅发生在事件发射之后,订阅者将错过该事件。这是一个常见 Bug 来源。

SharedFlow 订阅时机陷阱
class OrderViewModel : ViewModel() {
    private val _orderPlaced = MutableSharedFlow<Order>()
    val orderPlaced: SharedFlow<Order> = _orderPlaced.asSharedFlow()

    fun placeOrder(order: Order) {
        viewModelScope.launch {
            orderRepository.save(order)
            _orderPlaced.emit(order)  // 事件在 Composable 订阅前发射
        }
    }
}

// ❌ 陷阱:CollectionScreen 在 orderPlaced 发射后才订阅
@Composable
fun CollectionScreen(viewModel: OrderViewModel) {
    LaunchedEffect(Unit) {
        // 此时订阅,但 orderPlaced 已经 emit 了,replay=0 导致事件丢失
        viewModel.orderPlaced.collect { order ->
            showNotification(order)  // 永远不会被调用
        }
    }
}

// ✅ 修复 1:使用 MutableStateFlow 替代(状态永远保留)
private val _lastOrder = MutableStateFlow<Order?>(null)
val lastOrder: StateFlow<Order?> = _lastOrder.asStateFlow()

// ✅ 修复 2:若必须用 SharedFlow,设置 replay = 1
private val _orderPlaced = MutableSharedFlow<Order>(replay = 1)

// ✅ 修复 3:使用 Event wrapper(防止冷订阅问题)
sealed class UiEvent<out T> {
    data class Success<T>(val data: T) : UiEvent<T>()
    data class Error(val message: String) : UiEvent<Nothing>()
}

// 在 SharedFlow 上使用.first() 消费单次事件
viewModel.events.first { it is UiEvent.Success }

四、协程上下文与 Flow 构建器

4.1 flowOn 与 context 保留

flowOn 是 Flow 中控制发射线程的核心 API。但它有一个反直觉的行为:它只改变上游的发射上下文,不影响下游收集的上下文:

flowOn 线程切换
fun complexFlow(): Flow<Result<Data>> = flow {
    // 发射阶段:在 IO 线程执行
    println("发射线程: ${Thread.currentThread().name}")
    val data = withContext(Dispatchers.IO) { fetchFromNetwork() }
    emit(data)  // ⚠️ 仍在 IO 线程
}.flowOn(Dispatchers.IO)  // 改变上游线程,下游保持不变
    .map { result ->  // 变换阶段:在收集线程(主线程)执行
        println("map 线程: ${Thread.currentThread().name}")
        result.map { it.processed() }
    }
    .catch { error ->  // 异常处理:也在主线程执行
        println("catch 线程: ${Thread.currentThread().name}")
        emit(Result.failure(error))
    }

CoroutineScope(Dispatchers.Main).launch {
    complexFlow().collect { result ->
        // 收集阶段:在主线程执行
        println("collect 线程: ${Thread.currentThread().name}")
        handleResult(result)
    }
}

// 输出:
// 发射线程: DefaultDispatcher-worker-1 (IO 线程)
// map 线程: main (主线程)
// catch 线程: main (主线程)
// collect 线程: main (主线程)

注意:flowOn 会在底层创建一个带缓冲的 Channel(默认容量 64),这意味着它隐式包含了 buffer() 行为。如果希望精确控制缓冲大小和背压策略,应该显式组合:

flowOn 与 buffer 组合
// ❌ 不推荐:flowOn 的隐式 buffer 可能不符合预期
.flowOn(Dispatchers.IO)

// ✅ 推荐:显式声明 buffer 策略
.flow {
    // 发射在 IO 线程
    emitAll(networkDataSource.fetch())
}.buffer(
    capacity = 64,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
).flowOn(Dispatchers.IO)
    .map { it.processed() }  // 变换在主线程
    .catch { emit(Result.failure(it)) }

4.2 Flow 构建器家族

Flow 构建器对比
// flow{} - 最通用,支持完整发射语义(suspend emit)
flow {
    // 可以调用 suspend 函数
    val result = api.getData()
    emit(result)
}.map { it.value }  // 转换

// asFlow() - 集合/序列转 Flow
listOf(1, 2, 3).asFlow()           // 同步发射
sequenceOf(1, 2, 3).asFlow()      // 序列转 Flow
flowOf(1, 2, 3)                    // 直接发射固定值(已优化)

// channelFlow{} - 基于 Channel 的 Flow,支持多生产者
channelFlow {
    // 可在协程中并发发射(不像 flow{} 必须顺序发射)
    launch { send(item1) }
    launch { send(item2) }
    awaitAll()  // 等待所有生产者完成
}

// callbackFlow{} - 回调转 Flow(替代 RxJava 的 Observable.fromCallback)
fun getSensorUpdates(): Flow<SensorEvent> = callbackFlow {
    val listener = object : SensorEventListener {
        override fun onEvent(event: SensorEvent) {
            trySend(event)  // 从回调线程发送到 Flow
        }
    }
    sensorManager.register(listener)
    awaitClose { sensorManager.unregister(listener) }  // 自动取消订阅时注销
}

// tickerFlow - 定时发射(用于轮询场景)
tickerFlow(delayMillis = 1000)  // 每秒发射一次 Unit

callbackFlow 是 Android 中将原生回调(Sensor、Location、Bluetooth)转换为 Flow 的标准方案。关键的 awaitClose 块保证了协程取消时资源被正确释放——这比 RxJava 的 addDisposable() 更加结构化。

4.3 transform 操作符的陷阱

每个 Flow 操作符都有自己的上下文语义,混用时容易出错:

操作符上下文行为
// ❌ map / filter / take 等简单变换:
// 不改变上下文,保持上游发射时的线程

// ❌ transform:可以发射任意次(不同于 map 的 1:1)
flow {
    emit(1)
    emit(2)
    emit(3)
}.transform { value ->
    // transform 内可以多次 emit,突破了 map 的 1:1 限制
    if (value > 1) {
        emit("大于1: $value")
    }
    emit("原始值: $value")
}

// ❌ flatMapMerge vs flatMapConcat vs flatMapLatest
// flatMapMerge(capacity = 16):并发执行内部 Flow,最终按发射顺序合并
// flatMapConcat:顺序执行,上一个完成才执行下一个
// flatMapLatest:新值到达时取消前一个内部 Flow(等同于 switchMap)

// Android 中的正确用法(数据库 + 网络的双数据源)
val articles: Flow<List<Article>> = callbackFlow {
    // 数据库流(Room)
    val dbObserver = articleDao.observeAll().collect { dbData ->
        trySend(dbData)  // 优先发送本地数据
    }
    // 同时拉取网络数据
    launch {
        val networkData = api.getArticles()
        trySend(networkData)  // 网络数据覆盖本地
    }
    awaitClose { dbObserver.cancel() }
}

五、异常处理与结构化并发

5.1 Flow 异常处理层级

Flow 的异常处理分为三个层级,每层覆盖不同的错误场景:

三级异常处理
// 第一层:上游 try-catch(在 flow {} 内捕获发射异常)
flow {
    try {
        emit(fetchData())
    } catch (e: IOException) {
        // 只捕获 emit() 本身的异常
        emit(Result.failure(e))
    }
}

// 第二层:catch 操作符(捕获转换异常,不会拦截发射方的 suspend 异常)
flow {
    emit(dataSource.getData())
}.catch { error ->
    // 捕获 map、filter 等变换过程中的异常
    emit(Result.failure(error))  // 转换为错误值而非抛出
    // ⚠️ 注意:catch 不拦截 flow {} 内发射代码的异常
}.retry(3) { error ->
    // retry:自动重试失败的流
    error is NetworkException && error.code != 401  // 401 不重试
}

// 第三层:collection 层 try-finally(最外层保障)
CoroutineScope(Dispatchers.Main).launch {
    try {
        dataFlow.collect { data ->
            process(data)
        }
    } finally {
        // 协程取消时执行(清理资源)
        closeResources()
    }
}

在 Android ViewModel 中,推荐的异常处理模式是"防御性包装":将所有可能失败的 Flow 操作包裹在 try-catch 内,并通过 StateFlow 传播错误状态,而不是让异常向上穿透导致协程崩溃:

ViewModel 中的结构化异常处理
// 使用 sealed class 统一表示 UI 状态
sealed class ArticleUiState {
    data object Loading : ArticleUiState()
    data class Success(val articles: List<Article>) : ArticleUiState()
    data class Error(val message: String, val retry: ()->Unit) : ArticleUiState()
}

class ArticleViewModel(private val repository: ArticleRepository) : ViewModel() {
    private val _uiState = MutableStateFlow<ArticleUiState>(ArticleUiState.Loading)
    
    // 对外暴露不可变的 UI State(单一数据源)
    val uiState: StateFlow<ArticleUiState> = _uiState.asStateFlow()

    fun load() {
        viewModelScope.launch {
            _uiState.value = ArticleUiState.Loading
            repository.getArticles()
                .onSuccess { articles ->
                    _uiState.value = ArticleUiState.Success(articles)
                }
                .onFailure { error ->
                    _uiState.value = ArticleUiState.Error(
                        message = error.message ?: "未知错误",
                        retry = { load() }  // 携带闭包,无需额外状态
                    )
                }
        }
    }
}

5.2 结构化并发与作用域管理

Flow 与结构化并发的结合点是协程作用域。Flow 的生命周期自动绑定到启动它的协程作用域——当作用域取消时,Flow 的收集自动终止,所有未处理的发射被丢弃:

结构化并发保证
class SensorRepository(
    private val scope: CoroutineScope  // 由 ViewModel 注入
) {
    fun observeSensor(): Flow<SensorData> = callbackFlow {
        val callback = SensorCallback { data ->
            trySend(data)
        }
        sensorManager.register(callback)
        
        // awaitClose 确保取消时注销传感器
        awaitClose {
            sensorManager.unregister(callback)
        }
    }.shareIn(
        scope = scope,
        started = SharingStarted.WhileSubscribed(5000),  // 订阅后立即开始,停止订阅后 5 秒关闭
        replay = 1  // 新订阅者收到最近的传感器数据
    )
}

// ViewModel 使用 shareIn 实现热流复用
class MyViewModel(
    sensorRepository: SensorRepository
) : ViewModel() {
    // 多个 Composable 共享同一个传感器订阅
    val sensorData: Flow<SensorData> = sensorRepository.observeSensor()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = SensorData.empty
        )
    
    // viewModelScope 取消时,所有 Flow 自动清理
    // 无需手动 cancel,无内存泄漏风险
}

shareInstateIn 是将冷流转化为热流的标准 API。它们的 started 参数控制订阅策略:

  • WhileSubscribed(5000):有订阅者时启动,最后订阅者消失后 5 秒停止(适合耗资源的流,如传感器、数据库查询)
  • Eagerly:立即启动,永不停止(适合全局单例的共享资源)
  • Lazy:有订阅者时才启动,第一个订阅者消失后停止

六、生产环境最佳实践

6.1 Flow 与 Compose 的集成模式

Flow-State-Compose 完整链路
// 在 Compose 中使用 Flow 的最佳实践
@Composable
fun ArticleScreen(viewModel: ArticleViewModel = hiltViewModel()) {
    // collectAsStateWithLifecycle:遵守生命周期,只有在 STARTED+ 时才收集
    // (需要依赖 lifecycle-runtime-compose 或 activity-compose)
    val uiState by viewModel.uiState.collectAsStateWithLifecycle()
    
    // 对于 StateFlow,alwaysAssignableFrom 意味着永远不会失败
    // collectAsState 是简单场景的首选
    // collectAsStateWithLifecycle 是生产环境(电池友好)的首选
    // 推荐依赖:androidx.lifecycle:lifecycle-runtime-compose:2.7.0
    
    val lifecycle = LocalLifecycleOwner.current.lifecycle
    
    LaunchedEffect(lifecycle) {
        lifecycle.repeatOnLifecycle(STARTED) {
            viewModel.events.collect { event ->
                // 生命周期感知的事件处理
                handleEvent(event)
            }
        }
    }
}

// StateFlow 监听器的生命周期边界
// STARTED: Compose 在屏幕上可见时开始收集
// RESUMED: Compose 可见且可交互时开始收集(最节能)
repeatOnLifecycle(STARTED) {
    viewModel.uiState.collect { state ->
        updateUi(state)  // UI 更新
    }
}

6.2 测试 Flow 的可靠性

Flow 单元测试
class ArticleViewModelTest {
    @RelaxedMockK
    private lateinit var repository: ArticleRepository
    
    @Test
    fun `loadArticles success updates UI state`() = runTest {
        // given
        val articles = listOf(Article("1", "Test Article"))
        coEvery { repository.getArticles() } returns Flow.just(articles)
        
        // when
        val viewModel = ArticleViewModel(repository)
        viewModel.load()
        
        // then(虚拟时间控制:Flow 自动在 virtualTestDispatcher 执行)
        viewModel.uiState.test {
            val state = awaitItem()
            assertTrue(state is ArticleUiState.Loading)
            
            val success = awaitItem()
            assertTrue(success is ArticleUiState.Success)
            assertEquals(1, (success as ArticleUiState.Success).articles.size)
            
            cancelAndIgnoreRemainingEvents()
        }
    }
    
    @Test
    fun `loadArticles error emits error state`() = runTest {
        coEvery { repository.getArticles() } returns Flow.error(IOException("Network error"))
        
        val viewModel = ArticleViewModel(repository)
        viewModel.load()
        
        viewModel.uiState.test {
            skipItems(1)  // 跳过 Loading 状态
            val errorState = awaitItem()
            assertTrue(errorState is ArticleUiState.Error)
        }
    }
}

总结

Kotlin Flow 的核心优势在于与协程的无缝整合——冷流语义使测试简单、结构化并发使内存安全、背压策略使高吞吐场景可控。在 Android 中,StateFlow 是 UI 状态持有的最佳选择,SharedFlow 用于一次性事件,Channel 则用于协程间的点对点通信。熟练掌握 buffer/conflate/collectLatest 三大背压策略、shareIn/stateIn 热流转化,以及 catch/retry 的异常处理组合,是用好 Flow 的关键。

下一篇我们将探讨 Android 应用启动优化与 App Startup 库,从系统底层到用户可见首帧的全链路优化实践。