引言
Kotlin Flow 是 Kotlin 协程库中用于处理异步数据流的 API,它诞生于对 RxJava 的反思和对 Kotlin 语言特性的原生适配。相比 RxJava,Flow 与协程深度整合,支持结构化并发、取消传播和上下文保留,同时提供了更直觉的背压策略。在 Android 开发中,Flow 已经基本取代 RxJava 成为响应式数据绑定的首选方案。本文从设计哲学对比开始,深度剖析 Flow 的背压机制、状态流与通道的选型、以及生产环境中的异常处理模式。
一、Flow vs LiveData vs RxJava 对比
1.1 三者的设计哲学差异
理解 Flow、LiveData 和 RxJava 的本质区别,有助于在正确场景选择正确的工具:
// LiveData:生命周期感知的数据容器
// 核心语义:只要 LifecycleOwner 处于 STARTED 以上状态就发送数据
// 适合场景:UI 状态持有(Android 专用)
// RxJava 3:通用的响应式扩展库(JVM 平台)
// 核心语义:推模型 + 丰富的操作符 + 背压策略
// 适合场景:复杂的事件流变换、跨平台服务端流处理
// Kotlin Flow:协程原生的冷流
// 核心语义:冷数据流 + 结构化并发 + 取消传播
// 适合场景:协程范围内的异步数据流、Android UI 状态流
// ===== LiveData vs Flow 在 Android 中的选型 =====
// ❌ LiveData 的局限:生命周期绑死,无法在协程作用域中使用
class MyViewModel : ViewModel() {
private val _data = MutableLiveData()
val data: LiveData = _data
// 无法在 suspend 函数中直接发射
fun loadData() {
viewModelScope.launch {
// ❌ 编译错误:emit 不存在于 LiveData
_data.emit("loaded")
}
}
}
// ✅ Flow 的优势:天然支持协程生命周期 + 取消传播
class MyViewModel2 : ViewModel() {
private val _data = MutableStateFlow("")
val data: StateFlow = _data.asStateFlow()
fun loadData() {
viewModelScope.launch {
_data.value = "loaded" // 直接赋值,协程安全
}
}
}
1.2 Flow 的冷流特性
Flow 是"冷流"(Cold Stream)——上游数据源只在有下游消费者时才产生数据,且每个收集者都拥有独立的执行实例:
// Flow 是冷流:每次 collect 都重新执行
fun coldFlow(): Flow<Int> = flow {
println("🔵 [Cold] Flow 开始发射数据")
repeat(3) { i ->
delay(100)
emit(i)
}
println("🔵 [Cold] Flow 发射完毕")
}
// 每个 collect 都会重新触发 flow {} 内的代码
val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
scope.launch {
println("收集者 A 开始")
coldFlow().collect { value -> println("A 收到: $value") }
println("收集者 A 结束")
}
scope.launch {
delay(50) // 延迟 50ms 启动
println("收集者 B 开始")
coldFlow().collect { value -> println("B 收到: $value") }
println("收集者 B 结束")
}
// 输出:
// 🔵 [Cold] Flow 开始发射数据 (A)
// A 收到: 0
// 🔵 [Cold] Flow 开始发射数据 (B) ← B 的 collect 重新触发了 flow
// A 收到: 1
// B 收到: 0
// A 收到: 2
// 🔵 [Cold] Flow 发射完毕 (A)
// B 收到: 1
// B 收到: 2
// 🔵 [Cold] Flow 发射完毕 (B)
// 关键:两个收集者各自触发了一次完整的数据流执行
这与 RxJava 的 connect() 操作符形成鲜明对比——Flow 的冷流特性使测试变得极其简单,不需要手动管理订阅生命周期。
二、背压策略深度解析
2.1 背压的本质
背压(Backpressure)是生产速率 > 消费速率时的流量控制机制。在实际场景中:用户快速滚动 RecyclerView(生产者每秒发射 1000 个 item),但渲染线程(消费者)每秒只能处理 60 帧。如果不处理背压,应用要么 OOM,要么掉帧。
// ❌ 没有背压处理的危险模式
fun dangerousFlow(): Flow<UserAction> = flow {
// 高频事件源:传感器、用户快速操作、网络推送
while (true) {
val event = eventSource.nextEvent()
emit(event) // 无视下游消费能力,盲目发射
}
}
// ✅ 使用带背压的 buffer 操作符
fun safeFlow(): Flow<UserAction> = flow {
while (true) {
val event = eventSource.nextEvent()
emit(event)
}
}.buffer(capacity = 64) // 上游继续发射,下游异步消费,缓冲区大小 64
2.2 六大背压策略详解
// 策略 1:buffer() - 带缓冲的异步处理
// 语义:上游在缓冲区满之前可继续发射,满了则挂起(等同于 RxJava 的 BUFFER)
flow {
repeat(10) { i ->
println("发送: $i")
emit(i)
}
}.buffer(capacity = 3) // 缓冲区大小为 3
.collect { value ->
println("开始处理: $value")
delay(100) // 模拟耗时处理
println("处理完毕: $value")
}
// 输出分析:
// 发送: 0 → 缓冲区: [0]
// 发送: 1 → 缓冲区: [0,1]
// 发送: 2 → 缓冲区: [0,1,2]
// 发送: 3 → 缓冲区满,上游挂起等待
// 开始处理: 0
// 处理完毕: 0
// 缓冲区取走 0,上游恢复
// 发送: 3 → 缓冲区: [1,2,3]
// 发送: 4 → 缓冲区满,上游再次挂起
// ... 交替进行
// 策略 2:conflate() - 丢弃旧值,只保留最新值
// 语义:消费者来不及处理时,只保留最新值(等同于 RxJava 的 CONFLATE)
flow {
repeat(10) { i ->
println("发送: $i")
emit(i)
}
}.conflate() // 等同于 buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.collect { value ->
println("开始处理: $value")
delay(100)
println("处理完毕: $value")
}
// 输出:
// 发送: 0 → 被处理
// 发送: 1, 2, 3 → 在 conflate 缓冲区中被丢弃(只保留最新的)
// 处理期间 4, 5, 6 → 继续丢弃,只保留 6
// 处理完毕后,直接处理最新的:7, 8, 9
// 策略 3:latest / collectLatest - 丢弃旧值,切换到最新值
// 语义:消费者正在处理时,如果上游发射了新值,取消当前处理,切换到最新值
flow {
repeat(10) { i ->
println("发送: $i")
emit(i)
}
}.collectLatest { value ->
println("开始处理: $value")
delay(100)
println("处理完毕: $value")
}
// 输出:
// 发送: 0 → 开始处理: 0 (处理中)
// 发送: 1 → 处理被取消(collectLatest 取消了前一个协程)
// 发送: 2 → 处理被取消
// ... 最终只处理了 9
// 策略 4:dropLatest - 生产者端丢弃最新值
flow {
repeat(10) { i ->
emit(i)
}
}.dropLatest() // 缓冲区满时丢弃正在等待的(最新的)值
// 策略 5:dropWhile - 丢弃满足条件的初始值
flow {
emitAll(dataSource)
}.dropWhile { it.value < threshold } // 跳过未达到阈值的数据
// 策略 6:自定义背压:onBufferOverflow
flow {
// 自定义溢出行为
}.buffer(
capacity = 64,
onBufferOverflow = BufferOverflow.SUSPEND // 默认:满时挂起
// onBufferOverflow = BufferOverflow.DROP_OLDEST // 丢弃最旧的
// onBufferOverflow = BufferOverflow.DROP_LATEST // 丢弃最新的
)
// 在 Android 中的实战推荐:
@Composable
fun UserActionList(actions: Flow<UserAction>) {
// 使用 collectAsState 默认已处理背压(State 只能存储一个最新值)
val currentAction by actions.collectAsState(initial = UserAction.Empty)
// 如果需要更多控制:
LaunchedEffect(Unit) {
actions
.conflate() // 快速操作场景(滑动位置、输入事件)
.collect { action ->
handleAction(action)
}
}
}
2.3 生产环境背压策略选型指南
// 场景 1:搜索自动补全(用户输入 → API 查询)
// 推荐策略:debounce + latest
searchQuery
.debounce(300) // 等待用户停止输入 300ms
.distinctUntilChanged() // 相同内容不重复查询
.filter { it.length >= 2 } // 至少 2 个字符
.flatMapLatest { query -> // 新查询自动取消旧查询(等同于 collectLatest)
api.search(query)
}
// 场景 2:实时位置更新(GPS → UI 渲染)
// 推荐策略:conflate(只关心最新位置,不关心历史)
locationProvider.locations
.conflate() // 快速移动时丢弃中间位置
.collect { location ->
mapView.updatePosition(location)
}
// 场景 3:聊天消息流(消息队列 → 列表渲染)
// 推荐策略:buffer(capacity) 保留顺序
messageQueue.messages
.buffer(capacity = 100) // 保留最多 100 条有序消息
.collect { messageList ->
adapter.submitList(messageList)
}
// 场景 4:文件下载进度(上传多个文件 → 进度条)
// 推荐策略:带并行数的 buffer
downloadManager.downloads
.buffer(capacity = 5, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.collect { progress ->
progressBar.progress = progress
}
三、StateFlow vs SharedFlow vs Channel
3.1 核心区别与选型决策树
这是 Kotlin Flow 生态中最容易混淆的三个概念。它们的本质区别在于"热"的程度:
// ===== StateFlow = 热流 + 状态语义 =====
// 特性:始终持有当前值(初始值必须),新订阅者立即收到最新值
// 类似:LiveData(但无生命周期感知)
// 用途:UI 状态持有
private val _uiState = MutableStateFlow(UiState())
val uiState: StateFlow<UiState> = _uiState
// 特点:永远有值,订阅无延迟,自动去重(值相同时不触发重放)
_uiState.update { it.copy(loading = true) } // 结构性更新
// ===== SharedFlow = 热流 + 事件语义 =====
/*
* 特性:无初始值(可配置 replay),每个订阅者独立接收发射后的值
* 类似:RxJava Subject(BehaviorSubject + PublishSubject)
* 用途:一次性事件、跨协程通信
*/
private val _events = MutableSharedFlow<UiEvent>(
replay = 0, // 新订阅者不补发历史事件
extraBufferCapacity = 64,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val events: SharedFlow<UiEvent> = _events.asSharedFlow()
// 发射事件(协程安全)
scope.launch {
_events.emit(UiEvent.ShowSnackbar("保存成功"))
}
// ===== Channel = 带缓冲的管道 =====
/*
* 特性:近似于阻塞队列(FIFO),协程间的一对一通信
* 类似:Go 的 Channel,Kotlin 的 Rendezvous(无缓冲)
* 用途:协程间点对点通信、Stream 的原始构造块
*/
private val channel = Channel<Command>(capacity = Channel.UNLIMITED)
val commands: ReceiveChannel<Command> = channel
// ===== 选型决策树 =====
//
// 是否需要持有当前状态?
// ├── 是 → 订阅者需要立即获取最新状态?
// │ └── 是 → StateFlow(UI 状态)
// │ └── 否 → 想在协程间点对点传递?
// │ └── 是 → Channel(一对一)
// │ └── 否 → SharedFlow with replay=1(跨协程广播)
// └── 否 → 一次性事件?
// └── 是 → SharedFlow replay=0(UI 一次性事件)
// └── 否 → SharedFlow with replay=N(带重放的广播)
3.2 StateFlow 在 Android 中的标准用法
// ViewModel:状态持有
@HiltViewModel
class ArticleViewModel @Inject constructor(
private val getArticles: GetArticlesUseCase,
private val saveArticle: SaveArticleUseCase
) : ViewModel() {
// UI State:完整的状态快照(推荐使用 data class)
private val _uiState = MutableStateFlow(ArticleUiState())
val uiState: StateFlow<ArticleUiState> = _uiState.asStateFlow()
// 一次性事件(使用 Channel + asSharedFlow)
private val _events = MutableSharedFlow<ArticleEvent>()
val events: SharedFlow<ArticleEvent> = _events.asSharedFlow()
fun onIntent(intent: ArticleIntent) {
when (intent) {
is ArticleIntent.Load -> loadArticles()
is ArticleIntent.Save -> saveArticle(intent.article)
is ArticleIntent.Select -> selectArticle(intent.id)
}
}
private fun loadArticles() {
viewModelScope.launch {
_uiState.update { it.copy(isLoading = true, error = null) }
getArticles()
.onSuccess { articles ->
_uiState.update {
it.copy(isLoading = false, articles = articles)
}
}
.onFailure { error ->
_uiState.update {
it.copy(isLoading = false, error = error.message)
}
_events.emit(ArticleEvent.ShowError(error.message ?: "加载失败"))
}
}
}
private fun selectArticle(id: String) {
viewModelScope.launch {
val selected = _uiState.value.articles.find { it.id == id }
_events.emit(ArticleEvent.NavigateToDetail(selected))
}
}
}
// Composable:消费 StateFlow
@Composable
fun ArticleScreen(viewModel: ArticleViewModel = hiltViewModel()) {
val uiState by viewModel.uiState.collectAsState()
val snackbarHostState = remember { SnackbarHostState() }
// 收集一次性事件
LaunchedEffect(Unit) {
viewModel.events.collect { event ->
when (event) {
is ArticleEvent.ShowError -> snackbarHostState.showSnackbar(event.message)
is ArticleEvent.NavigateToDetail -> { /* 导航 */ }
}
}
}
Scaffold(
snackbarHost = { SnackbarHost(snackbarHostState) }
) { padding ->
when {
uiState.isLoading -> LoadingIndicator()
uiState.error != null -> ErrorView(
message = uiState.error!!,
onRetry = { viewModel.onIntent(ArticleIntent.Load) }
)
else -> ArticleList(
articles = uiState.articles,
onArticleClick = { viewModel.onIntent(ArticleIntent.Select(it)) }
)
}
}
}
3.3 SharedFlow 的"冷订阅陷阱"
SharedFlow 默认 replay = 0,这意味着:如果订阅发生在事件发射之后,订阅者将错过该事件。这是一个常见 Bug 来源。
class OrderViewModel : ViewModel() {
private val _orderPlaced = MutableSharedFlow<Order>()
val orderPlaced: SharedFlow<Order> = _orderPlaced.asSharedFlow()
fun placeOrder(order: Order) {
viewModelScope.launch {
orderRepository.save(order)
_orderPlaced.emit(order) // 事件在 Composable 订阅前发射
}
}
}
// ❌ 陷阱:CollectionScreen 在 orderPlaced 发射后才订阅
@Composable
fun CollectionScreen(viewModel: OrderViewModel) {
LaunchedEffect(Unit) {
// 此时订阅,但 orderPlaced 已经 emit 了,replay=0 导致事件丢失
viewModel.orderPlaced.collect { order ->
showNotification(order) // 永远不会被调用
}
}
}
// ✅ 修复 1:使用 MutableStateFlow 替代(状态永远保留)
private val _lastOrder = MutableStateFlow<Order?>(null)
val lastOrder: StateFlow<Order?> = _lastOrder.asStateFlow()
// ✅ 修复 2:若必须用 SharedFlow,设置 replay = 1
private val _orderPlaced = MutableSharedFlow<Order>(replay = 1)
// ✅ 修复 3:使用 Event wrapper(防止冷订阅问题)
sealed class UiEvent<out T> {
data class Success<T>(val data: T) : UiEvent<T>()
data class Error(val message: String) : UiEvent<Nothing>()
}
// 在 SharedFlow 上使用.first() 消费单次事件
viewModel.events.first { it is UiEvent.Success }
四、协程上下文与 Flow 构建器
4.1 flowOn 与 context 保留
flowOn 是 Flow 中控制发射线程的核心 API。但它有一个反直觉的行为:它只改变上游的发射上下文,不影响下游收集的上下文:
fun complexFlow(): Flow<Result<Data>> = flow {
// 发射阶段:在 IO 线程执行
println("发射线程: ${Thread.currentThread().name}")
val data = withContext(Dispatchers.IO) { fetchFromNetwork() }
emit(data) // ⚠️ 仍在 IO 线程
}.flowOn(Dispatchers.IO) // 改变上游线程,下游保持不变
.map { result -> // 变换阶段:在收集线程(主线程)执行
println("map 线程: ${Thread.currentThread().name}")
result.map { it.processed() }
}
.catch { error -> // 异常处理:也在主线程执行
println("catch 线程: ${Thread.currentThread().name}")
emit(Result.failure(error))
}
CoroutineScope(Dispatchers.Main).launch {
complexFlow().collect { result ->
// 收集阶段:在主线程执行
println("collect 线程: ${Thread.currentThread().name}")
handleResult(result)
}
}
// 输出:
// 发射线程: DefaultDispatcher-worker-1 (IO 线程)
// map 线程: main (主线程)
// catch 线程: main (主线程)
// collect 线程: main (主线程)
注意:flowOn 会在底层创建一个带缓冲的 Channel(默认容量 64),这意味着它隐式包含了 buffer() 行为。如果希望精确控制缓冲大小和背压策略,应该显式组合:
// ❌ 不推荐:flowOn 的隐式 buffer 可能不符合预期
.flowOn(Dispatchers.IO)
// ✅ 推荐:显式声明 buffer 策略
.flow {
// 发射在 IO 线程
emitAll(networkDataSource.fetch())
}.buffer(
capacity = 64,
onBufferOverflow = BufferOverflow.DROP_OLDEST
).flowOn(Dispatchers.IO)
.map { it.processed() } // 变换在主线程
.catch { emit(Result.failure(it)) }
4.2 Flow 构建器家族
// flow{} - 最通用,支持完整发射语义(suspend emit)
flow {
// 可以调用 suspend 函数
val result = api.getData()
emit(result)
}.map { it.value } // 转换
// asFlow() - 集合/序列转 Flow
listOf(1, 2, 3).asFlow() // 同步发射
sequenceOf(1, 2, 3).asFlow() // 序列转 Flow
flowOf(1, 2, 3) // 直接发射固定值(已优化)
// channelFlow{} - 基于 Channel 的 Flow,支持多生产者
channelFlow {
// 可在协程中并发发射(不像 flow{} 必须顺序发射)
launch { send(item1) }
launch { send(item2) }
awaitAll() // 等待所有生产者完成
}
// callbackFlow{} - 回调转 Flow(替代 RxJava 的 Observable.fromCallback)
fun getSensorUpdates(): Flow<SensorEvent> = callbackFlow {
val listener = object : SensorEventListener {
override fun onEvent(event: SensorEvent) {
trySend(event) // 从回调线程发送到 Flow
}
}
sensorManager.register(listener)
awaitClose { sensorManager.unregister(listener) } // 自动取消订阅时注销
}
// tickerFlow - 定时发射(用于轮询场景)
tickerFlow(delayMillis = 1000) // 每秒发射一次 Unit
callbackFlow 是 Android 中将原生回调(Sensor、Location、Bluetooth)转换为 Flow 的标准方案。关键的 awaitClose 块保证了协程取消时资源被正确释放——这比 RxJava 的 addDisposable() 更加结构化。
4.3 transform 操作符的陷阱
每个 Flow 操作符都有自己的上下文语义,混用时容易出错:
// ❌ map / filter / take 等简单变换:
// 不改变上下文,保持上游发射时的线程
// ❌ transform:可以发射任意次(不同于 map 的 1:1)
flow {
emit(1)
emit(2)
emit(3)
}.transform { value ->
// transform 内可以多次 emit,突破了 map 的 1:1 限制
if (value > 1) {
emit("大于1: $value")
}
emit("原始值: $value")
}
// ❌ flatMapMerge vs flatMapConcat vs flatMapLatest
// flatMapMerge(capacity = 16):并发执行内部 Flow,最终按发射顺序合并
// flatMapConcat:顺序执行,上一个完成才执行下一个
// flatMapLatest:新值到达时取消前一个内部 Flow(等同于 switchMap)
// Android 中的正确用法(数据库 + 网络的双数据源)
val articles: Flow<List<Article>> = callbackFlow {
// 数据库流(Room)
val dbObserver = articleDao.observeAll().collect { dbData ->
trySend(dbData) // 优先发送本地数据
}
// 同时拉取网络数据
launch {
val networkData = api.getArticles()
trySend(networkData) // 网络数据覆盖本地
}
awaitClose { dbObserver.cancel() }
}
五、异常处理与结构化并发
5.1 Flow 异常处理层级
Flow 的异常处理分为三个层级,每层覆盖不同的错误场景:
// 第一层:上游 try-catch(在 flow {} 内捕获发射异常)
flow {
try {
emit(fetchData())
} catch (e: IOException) {
// 只捕获 emit() 本身的异常
emit(Result.failure(e))
}
}
// 第二层:catch 操作符(捕获转换异常,不会拦截发射方的 suspend 异常)
flow {
emit(dataSource.getData())
}.catch { error ->
// 捕获 map、filter 等变换过程中的异常
emit(Result.failure(error)) // 转换为错误值而非抛出
// ⚠️ 注意:catch 不拦截 flow {} 内发射代码的异常
}.retry(3) { error ->
// retry:自动重试失败的流
error is NetworkException && error.code != 401 // 401 不重试
}
// 第三层:collection 层 try-finally(最外层保障)
CoroutineScope(Dispatchers.Main).launch {
try {
dataFlow.collect { data ->
process(data)
}
} finally {
// 协程取消时执行(清理资源)
closeResources()
}
}
在 Android ViewModel 中,推荐的异常处理模式是"防御性包装":将所有可能失败的 Flow 操作包裹在 try-catch 内,并通过 StateFlow 传播错误状态,而不是让异常向上穿透导致协程崩溃:
// 使用 sealed class 统一表示 UI 状态
sealed class ArticleUiState {
data object Loading : ArticleUiState()
data class Success(val articles: List<Article>) : ArticleUiState()
data class Error(val message: String, val retry: ()->Unit) : ArticleUiState()
}
class ArticleViewModel(private val repository: ArticleRepository) : ViewModel() {
private val _uiState = MutableStateFlow<ArticleUiState>(ArticleUiState.Loading)
// 对外暴露不可变的 UI State(单一数据源)
val uiState: StateFlow<ArticleUiState> = _uiState.asStateFlow()
fun load() {
viewModelScope.launch {
_uiState.value = ArticleUiState.Loading
repository.getArticles()
.onSuccess { articles ->
_uiState.value = ArticleUiState.Success(articles)
}
.onFailure { error ->
_uiState.value = ArticleUiState.Error(
message = error.message ?: "未知错误",
retry = { load() } // 携带闭包,无需额外状态
)
}
}
}
}
5.2 结构化并发与作用域管理
Flow 与结构化并发的结合点是协程作用域。Flow 的生命周期自动绑定到启动它的协程作用域——当作用域取消时,Flow 的收集自动终止,所有未处理的发射被丢弃:
class SensorRepository(
private val scope: CoroutineScope // 由 ViewModel 注入
) {
fun observeSensor(): Flow<SensorData> = callbackFlow {
val callback = SensorCallback { data ->
trySend(data)
}
sensorManager.register(callback)
// awaitClose 确保取消时注销传感器
awaitClose {
sensorManager.unregister(callback)
}
}.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(5000), // 订阅后立即开始,停止订阅后 5 秒关闭
replay = 1 // 新订阅者收到最近的传感器数据
)
}
// ViewModel 使用 shareIn 实现热流复用
class MyViewModel(
sensorRepository: SensorRepository
) : ViewModel() {
// 多个 Composable 共享同一个传感器订阅
val sensorData: Flow<SensorData> = sensorRepository.observeSensor()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = SensorData.empty
)
// viewModelScope 取消时,所有 Flow 自动清理
// 无需手动 cancel,无内存泄漏风险
}
shareIn 和 stateIn 是将冷流转化为热流的标准 API。它们的 started 参数控制订阅策略:
WhileSubscribed(5000):有订阅者时启动,最后订阅者消失后 5 秒停止(适合耗资源的流,如传感器、数据库查询)Eagerly:立即启动,永不停止(适合全局单例的共享资源)Lazy:有订阅者时才启动,第一个订阅者消失后停止
六、生产环境最佳实践
6.1 Flow 与 Compose 的集成模式
// 在 Compose 中使用 Flow 的最佳实践
@Composable
fun ArticleScreen(viewModel: ArticleViewModel = hiltViewModel()) {
// collectAsStateWithLifecycle:遵守生命周期,只有在 STARTED+ 时才收集
// (需要依赖 lifecycle-runtime-compose 或 activity-compose)
val uiState by viewModel.uiState.collectAsStateWithLifecycle()
// 对于 StateFlow,alwaysAssignableFrom 意味着永远不会失败
// collectAsState 是简单场景的首选
// collectAsStateWithLifecycle 是生产环境(电池友好)的首选
// 推荐依赖:androidx.lifecycle:lifecycle-runtime-compose:2.7.0
val lifecycle = LocalLifecycleOwner.current.lifecycle
LaunchedEffect(lifecycle) {
lifecycle.repeatOnLifecycle(STARTED) {
viewModel.events.collect { event ->
// 生命周期感知的事件处理
handleEvent(event)
}
}
}
}
// StateFlow 监听器的生命周期边界
// STARTED: Compose 在屏幕上可见时开始收集
// RESUMED: Compose 可见且可交互时开始收集(最节能)
repeatOnLifecycle(STARTED) {
viewModel.uiState.collect { state ->
updateUi(state) // UI 更新
}
}
6.2 测试 Flow 的可靠性
class ArticleViewModelTest {
@RelaxedMockK
private lateinit var repository: ArticleRepository
@Test
fun `loadArticles success updates UI state`() = runTest {
// given
val articles = listOf(Article("1", "Test Article"))
coEvery { repository.getArticles() } returns Flow.just(articles)
// when
val viewModel = ArticleViewModel(repository)
viewModel.load()
// then(虚拟时间控制:Flow 自动在 virtualTestDispatcher 执行)
viewModel.uiState.test {
val state = awaitItem()
assertTrue(state is ArticleUiState.Loading)
val success = awaitItem()
assertTrue(success is ArticleUiState.Success)
assertEquals(1, (success as ArticleUiState.Success).articles.size)
cancelAndIgnoreRemainingEvents()
}
}
@Test
fun `loadArticles error emits error state`() = runTest {
coEvery { repository.getArticles() } returns Flow.error(IOException("Network error"))
val viewModel = ArticleViewModel(repository)
viewModel.load()
viewModel.uiState.test {
skipItems(1) // 跳过 Loading 状态
val errorState = awaitItem()
assertTrue(errorState is ArticleUiState.Error)
}
}
}
总结
Kotlin Flow 的核心优势在于与协程的无缝整合——冷流语义使测试简单、结构化并发使内存安全、背压策略使高吞吐场景可控。在 Android 中,StateFlow 是 UI 状态持有的最佳选择,SharedFlow 用于一次性事件,Channel 则用于协程间的点对点通信。熟练掌握 buffer/conflate/collectLatest 三大背压策略、shareIn/stateIn 热流转化,以及 catch/retry 的异常处理组合,是用好 Flow 的关键。
下一篇我们将探讨 Android 应用启动优化与 App Startup 库,从系统底层到用户可见首帧的全链路优化实践。