协程基础:轻量级并发编程

Kotlin协程(Coroutines)是Android异步编程的革命性解决方案。相比传统的回调地狱和RxJava的复杂链式调用,协程提供了以同步方式编写异步代码的能力,让代码逻辑更加清晰直观。协程的本质是轻量级线程,可以在单线程上高效调度成千上万个协程,而不会造成系统资源枯竭。

协程核心优势

  • 轻量级:协程的创建成本远低于线程,内存占用仅几百字节
  • 结构化并发:协程具有明确的父子关系,父协程取消时自动取消子协程
  • 挂起而非阻塞:协程挂起时不会阻塞线程,线程可执行其他任务
  • 异常传播透明:异常自动向上传播,便于统一处理

协程基础使用

在Android中使用协程需要添加依赖并理解几个核心概念:CoroutineScope、Job、Deferred和Dispatchers。

// build.gradle 依赖配置
dependencies {
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.3"
    implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.6.2"
    implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.6.2"
}

// ViewModel中使用协程
class UserViewModel : ViewModel() {
    
    /**
     * viewModelScope 是Lifecycle提供的协程作用域
     * 当ViewModel被清除时,所有协程自动取消
     */
    fun fetchUserData(userId: String) {
        viewModelScope.launch {
            try {
                // 在主线程启动协程
                val user = withContext(Dispatchers.IO) {
                    // 切换到IO线程执行网络请求
                    userRepository.getUser(userId)
                }
                // 自动回到主线程更新UI
                _userLiveData.value = user
            } catch (e: Exception) {
                _errorLiveData.value = e.message
            }
        }
    }
}

// Activity中使用lifecycleScope
class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        
        lifecycleScope.launch {
            // 在STARTED状态时执行
            val data = loadData()
            updateUI(data)
        }
        
        // 当生命周期进入DESTROYED时自动取消
        lifecycleScope.launchWhenResumed {
            // 仅在RESUMED状态时执行
            startAnimation()
        }
    }
}

协程构建器与调度器

Kotlin提供了多种协程构建器来满足不同的并发需求,每种构建器都有其特定的使用场景和返回值类型。

核心构建器对比

构建器 返回值 使用场景 异常处理
launch Job 不需要返回值的并发任务 异常自动传播
async Deferred<T> 需要返回值的异步计算 异常包装在Deferred中
runBlocking T 桥接阻塞与非阻塞代码(测试/主函数) 直接抛出异常
withContext T 切换上下文执行代码块 直接抛出异常
// async/await 并行执行示例
class Repository {
    
    suspend fun fetchDashboardData(): DashboardData = coroutineScope {
        // 同时启动两个并行请求
        val userDeferred = async { userApi.getCurrentUser() }
        val ordersDeferred = async { orderApi.getRecentOrders() }
        
        // 等待两个请求都完成
        val user = userDeferred.await()
        val orders = ordersDeferred.await()
        
        DashboardData(user, orders)
    }
    
    // 超时处理
    suspend fun fetchWithTimeout(): Data = withTimeout(5000) {
        api.fetchData()
    }
    
    // 可取消的超时
    suspend fun fetchWithTimeoutOrNull(): Data? = withTimeoutOrNull(3000) {
        api.fetchData()
    }
}

// 自定义CoroutineScope
class MyPresenter : CoroutineScope by MainScope() {
    
    fun loadData() {
        launch {
            val result = withContext(Dispatchers.Default) {
                // CPU密集型计算
                heavyComputation()
            }
            updateView(result)
        }
    }
    
    fun destroy() {
        // 取消所有协程
        cancel()
    }
}

调度器选择建议

  • Dispatchers.Main:UI更新、轻量级操作(避免耗时操作)
  • Dispatchers.IO:网络请求、文件读写、数据库操作
  • Dispatchers.Default:CPU密集型计算、复杂数据处理
  • Dispatchers.Unconfined:不指定线程,从调用线程开始执行

Flow:Kotlin响应式流

Flow是Kotlin提供的冷流(Cold Stream)实现,专为异步数据流设计。与LiveData相比,Flow更加灵活,支持复杂的操作符链式调用,并且可以无缝集成到MVVM架构中。Flow是协程的扩展,天然支持协程的所有特性。

Flow核心特性

  • 冷流特性:每次收集都会重新执行上游代码
  • 背压处理:自动处理生产者和消费者速度不匹配
  • 操作符丰富:map、filter、debounce、combine等
  • 协程集成:完全基于协程构建
// Flow基本使用
class UserRepository {
    
    // 创建Flow
    fun getUsersFlow(): Flow<List<User>> = flow {
        // 模拟网络请求
        val users = api.fetchUsers()
        emit(users) // 发射数据
    }.flowOn(Dispatchers.IO) // 在IO线程执行
    
    // 带状态的Flow
    fun searchUsers(query: String): Flow<Result<List<User>>> = flow {
        emit(Result.Loading)
        try {
            val users = api.searchUsers(query)
            emit(Result.Success(users))
        } catch (e: Exception) {
            emit(Result.Error(e))
        }
    }.flowOn(Dispatchers.IO)
}

// ViewModel中使用Flow
class SearchViewModel : ViewModel() {
    
    private val _searchQuery = MutableStateFlow("")
    val searchQuery: StateFlow<String> = _searchQuery.asStateFlow()
    
    // 搜索结果的Flow,自动响应查询变化
    val searchResults: StateFlow<Result<List<User>>> = _searchQuery
        .debounce(300) // 防抖300ms
        .filter { it.length >= 2 } // 至少2个字符才搜索
        .distinctUntilChanged() // 重复值不重新搜索
        .flatMapLatest { query ->
            // 新查询到来时取消上一个请求
            repository.searchUsers(query)
        }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = Result.Success(emptyList())
        )
    
    fun setSearchQuery(query: String) {
        _searchQuery.value = query
    }
}

// Activity/Fragment中收集Flow
class SearchFragment : Fragment() {
    
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        
        // 使用lifecycleScope收集Flow
        viewLifecycleOwner.lifecycleScope.launch {
            viewModel.searchResults.collect { result ->
                when (result) {
                    is Result.Loading -> showLoading()
                    is Result.Success -> showUsers(result.data)
                    is Result.Error -> showError(result.exception)
                }
            }
        }
        
        // 使用repeatOnLifecycle在特定状态收集
        viewLifecycleOwner.lifecycleScope.launch {
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.searchResults.collect { updateUI(it) }
            }
        }
    }
}

Flow操作符实战

Flow提供了丰富的操作符来处理数据流,掌握这些操作符对于编写高效的响应式代码至关重要。

常用操作符分类

// 变换操作符
val transformedFlow = originalFlow
    .map { it * 2 }                    // 转换每个元素
    .filter { it > 10 }                // 过滤元素
    .take(5)                           // 只取前5个
    .drop(2)                           // 跳过前2个
    .distinctUntilChanged()            // 去重连续重复值

// 组合操作符
val combinedFlow = combine(flow1, flow2) { a, b ->
    a + b  // 合并两个流的最新值
}

val zippedFlow = flow1.zip(flow2) { a, b ->
    Pair(a, b)  // 按对组合
}

// 展平操作符
val flatMapped = flowOf(1, 2, 3)
    .flatMapConcat { value ->
        // 顺序执行每个内部Flow
        flow { emit(value * 10); delay(100); emit(value * 100) }
    }

val flatMappedLatest = searchQueryFlow
    .flatMapLatest { query ->
        // 新值到来时取消之前的内部Flow
        repository.search(query)
    }

// 缓冲与背压
val bufferedFlow = dataFlow
    .buffer(100)           // 缓冲100个元素
    .conflate()            // 只保留最新值(跳过中间值)
    .collectLatest { value ->
        // 新值到来时取消当前处理
        processValue(value)
    }

// 错误处理
val safeFlow = riskyFlow
    .catch { e ->
        // 捕获异常,可以发射默认值或重新抛出
        emit(defaultValue)
        logError(e)
    }
    .retry(3) { cause ->
        // 重试3次
        cause is IOException
    }
    .retryWhen { cause, attempt ->
        // 自定义重试逻辑
        attempt < 3 && cause is NetworkException
    }

Flow收集注意事项

  • Flow是冷流,每次collect都会重新执行上游代码
  • 在UI层收集Flow时,务必使用repeatOnLifecycle避免内存泄漏
  • StateFlow和SharedFlow是热流,适合在ViewModel中作为状态管理
  • 避免在Flow中使用阻塞操作,应该使用挂起函数

StateFlow与SharedFlow

StateFlow和SharedFlow是Flow的热流(Hot Stream)实现,适合在ViewModel中管理UI状态。StateFlow是有状态的,总是保存最新值;SharedFlow更通用,支持配置重播策略。

// StateFlow - 状态管理
class ProfileViewModel : ViewModel() {
    
    private val _uiState = MutableStateFlow(ProfileUiState())
    val uiState: StateFlow<ProfileUiState> = _uiState.asStateFlow()
    
    fun loadProfile(userId: String) {
        viewModelScope.launch {
            _uiState.update { it.copy(isLoading = true) }
            
            try {
                val profile = repository.getProfile(userId)
                _uiState.update { 
                    it.copy(
                        isLoading = false,
                        profile = profile,
                        error = null
                    )
                }
            } catch (e: Exception) {
                _uiState.update {
                    it.copy(
                        isLoading = false,
                        error = e.message
                    )
                }
            }
        }
    }
    
    data class ProfileUiState(
        val isLoading: Boolean = false,
        val profile: UserProfile? = null,
        val error: String? = null
    )
}

// SharedFlow - 事件通知
class EventViewModel : ViewModel() {
    
    // 配置重播最近1个事件,缓存100个额外事件
    private val _events = MutableSharedFlow<Event>(
        replay = 1,
        extraBufferCapacity = 100,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val events: SharedFlow<Event> = _events.asSharedFlow()
    
    fun showToast(message: String) {
        _events.tryEmit(Event.ShowToast(message))
    }
    
    fun navigateToDetail(id: String) {
        _events.tryEmit(Event.NavigateToDetail(id))
    }
    
    sealed class Event {
        data class ShowToast(val message: String) : Event()
        data class NavigateToDetail(val id: String) : Event()
    }
}

// Fragment中处理事件
class MyFragment : Fragment() {
    
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        
        // 收集状态
        viewLifecycleOwner.lifecycleScope.launch {
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.uiState.collect { state ->
                    updateUI(state)
                }
            }
        }
        
        // 收集一次性事件
        viewLifecycleOwner.lifecycleScope.launch {
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.events.collect { event ->
                    when (event) {
                        is Event.ShowToast -> {
                            Toast.makeText(context, event.message, Toast.LENGTH_SHORT).show()
                        }
                        is Event.NavigateToDetail -> {
                            findNavController().navigate(
                                R.id.action_to_detail,
                                bundleOf("id" to event.id)
                            )
                        }
                    }
                }
            }
        }
    }
}

实战:完整MVVM架构示例

下面展示一个完整的MVVM架构示例,集成协程、Flow、Retrofit和Room,实现一个功能完整的用户列表页面。

// 数据层 - Room数据库
@Dao
interface UserDao {
    @Query("SELECT * FROM users")
    fun getAllUsers(): Flow<List<UserEntity>>
    
    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insertUsers(users: List<UserEntity>)
    
    @Query("SELECT * FROM users WHERE name LIKE '%' || :query || '%'")
    fun searchUsers(query: String): Flow<List<UserEntity>>
}

// 数据层 - Retrofit API
interface UserApi {
    @GET("users")
    suspend fun getUsers(): List<UserDto>
    
    @GET("users/{id}")
    suspend fun getUser(@Path("id") id: String): UserDto
}

// Repository层
class UserRepositoryImpl @Inject constructor(
    private val userApi: UserApi,
    private val userDao: UserDao,
    private val connectivityObserver: ConnectivityObserver
) : UserRepository {
    
    override fun getUsers(): Flow<Result<List<User>>> = flow {
        emit(Result.Loading)
        
        // 先加载本地缓存
        val localUsers = userDao.getAllUsers().first().map { it.toDomain() }
        if (localUsers.isNotEmpty()) {
            emit(Result.Success(localUsers))
        }
        
        // 检查网络并刷新
        if (connectivityObserver.isOnline) {
            try {
                val remoteUsers = userApi.getUsers().map { it.toDomain() }
                userDao.insertUsers(remoteUsers.map { it.toEntity() })
                emit(Result.Success(remoteUsers))
            } catch (e: Exception) {
                if (localUsers.isEmpty()) {
                    emit(Result.Error(e))
                }
            }
        }
    }.flowOn(Dispatchers.IO)
    
    override fun searchUsers(query: String): Flow<List<User>> {
        return userDao.searchUsers(query)
            .map { entities -> entities.map { it.toDomain() } }
            .flowOn(Dispatchers.IO)
    }
}

// ViewModel层
@HiltViewModel
class UserListViewModel @Inject constructor(
    private val repository: UserRepository
) : ViewModel() {
    
    private val _searchQuery = MutableStateFlow("")
    
    val uiState: StateFlow<UserListUiState> = combine(
        repository.getUsers(),
        _searchQuery
    ) { result, query ->
        when (result) {
            is Result.Loading -> UserListUiState(isLoading = true)
            is Result.Success -> {
                val filtered = if (query.isBlank()) {
                    result.data
                } else {
                    result.data.filter { it.name.contains(query, ignoreCase = true) }
                }
                UserListUiState(users = filtered)
            }
            is Result.Error -> UserListUiState(error = result.exception.message)
        }
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = UserListUiState(isLoading = true)
    )
    
    fun onSearchQueryChange(query: String) {
        _searchQuery.value = query
    }
    
    data class UserListUiState(
        val isLoading: Boolean = false,
        val users: List<User> = emptyList(),
        val error: String? = null
    )
}

最佳实践总结

  • 使用viewModelScope和lifecycleScope管理协程生命周期
  • Repository返回Flow,在ViewModel中转换为StateFlow
  • 使用stateIn操作符将冷流转换为热流,避免重复计算
  • UI层使用repeatOnLifecycle收集Flow,避免内存泄漏
  • 使用Result包装类统一处理加载、成功、错误状态