协程基础:轻量级并发编程
Kotlin协程(Coroutines)是Android异步编程的革命性解决方案。相比传统的回调地狱和RxJava的复杂链式调用,协程提供了以同步方式编写异步代码的能力,让代码逻辑更加清晰直观。协程的本质是轻量级线程,可以在单线程上高效调度成千上万个协程,而不会造成系统资源枯竭。
协程核心优势
- 轻量级:协程的创建成本远低于线程,内存占用仅几百字节
- 结构化并发:协程具有明确的父子关系,父协程取消时自动取消子协程
- 挂起而非阻塞:协程挂起时不会阻塞线程,线程可执行其他任务
- 异常传播透明:异常自动向上传播,便于统一处理
协程基础使用
在Android中使用协程需要添加依赖并理解几个核心概念:CoroutineScope、Job、Deferred和Dispatchers。
// build.gradle 依赖配置
dependencies {
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.3"
implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.6.2"
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.6.2"
}
// ViewModel中使用协程
class UserViewModel : ViewModel() {
/**
* viewModelScope 是Lifecycle提供的协程作用域
* 当ViewModel被清除时,所有协程自动取消
*/
fun fetchUserData(userId: String) {
viewModelScope.launch {
try {
// 在主线程启动协程
val user = withContext(Dispatchers.IO) {
// 切换到IO线程执行网络请求
userRepository.getUser(userId)
}
// 自动回到主线程更新UI
_userLiveData.value = user
} catch (e: Exception) {
_errorLiveData.value = e.message
}
}
}
}
// Activity中使用lifecycleScope
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launch {
// 在STARTED状态时执行
val data = loadData()
updateUI(data)
}
// 当生命周期进入DESTROYED时自动取消
lifecycleScope.launchWhenResumed {
// 仅在RESUMED状态时执行
startAnimation()
}
}
}
协程构建器与调度器
Kotlin提供了多种协程构建器来满足不同的并发需求,每种构建器都有其特定的使用场景和返回值类型。
核心构建器对比
| 构建器 | 返回值 | 使用场景 | 异常处理 |
|---|---|---|---|
| launch | Job | 不需要返回值的并发任务 | 异常自动传播 |
| async | Deferred<T> | 需要返回值的异步计算 | 异常包装在Deferred中 |
| runBlocking | T | 桥接阻塞与非阻塞代码(测试/主函数) | 直接抛出异常 |
| withContext | T | 切换上下文执行代码块 | 直接抛出异常 |
// async/await 并行执行示例
class Repository {
suspend fun fetchDashboardData(): DashboardData = coroutineScope {
// 同时启动两个并行请求
val userDeferred = async { userApi.getCurrentUser() }
val ordersDeferred = async { orderApi.getRecentOrders() }
// 等待两个请求都完成
val user = userDeferred.await()
val orders = ordersDeferred.await()
DashboardData(user, orders)
}
// 超时处理
suspend fun fetchWithTimeout(): Data = withTimeout(5000) {
api.fetchData()
}
// 可取消的超时
suspend fun fetchWithTimeoutOrNull(): Data? = withTimeoutOrNull(3000) {
api.fetchData()
}
}
// 自定义CoroutineScope
class MyPresenter : CoroutineScope by MainScope() {
fun loadData() {
launch {
val result = withContext(Dispatchers.Default) {
// CPU密集型计算
heavyComputation()
}
updateView(result)
}
}
fun destroy() {
// 取消所有协程
cancel()
}
}
调度器选择建议
- Dispatchers.Main:UI更新、轻量级操作(避免耗时操作)
- Dispatchers.IO:网络请求、文件读写、数据库操作
- Dispatchers.Default:CPU密集型计算、复杂数据处理
- Dispatchers.Unconfined:不指定线程,从调用线程开始执行
Flow:Kotlin响应式流
Flow是Kotlin提供的冷流(Cold Stream)实现,专为异步数据流设计。与LiveData相比,Flow更加灵活,支持复杂的操作符链式调用,并且可以无缝集成到MVVM架构中。Flow是协程的扩展,天然支持协程的所有特性。
Flow核心特性
- 冷流特性:每次收集都会重新执行上游代码
- 背压处理:自动处理生产者和消费者速度不匹配
- 操作符丰富:map、filter、debounce、combine等
- 协程集成:完全基于协程构建
// Flow基本使用
class UserRepository {
// 创建Flow
fun getUsersFlow(): Flow<List<User>> = flow {
// 模拟网络请求
val users = api.fetchUsers()
emit(users) // 发射数据
}.flowOn(Dispatchers.IO) // 在IO线程执行
// 带状态的Flow
fun searchUsers(query: String): Flow<Result<List<User>>> = flow {
emit(Result.Loading)
try {
val users = api.searchUsers(query)
emit(Result.Success(users))
} catch (e: Exception) {
emit(Result.Error(e))
}
}.flowOn(Dispatchers.IO)
}
// ViewModel中使用Flow
class SearchViewModel : ViewModel() {
private val _searchQuery = MutableStateFlow("")
val searchQuery: StateFlow<String> = _searchQuery.asStateFlow()
// 搜索结果的Flow,自动响应查询变化
val searchResults: StateFlow<Result<List<User>>> = _searchQuery
.debounce(300) // 防抖300ms
.filter { it.length >= 2 } // 至少2个字符才搜索
.distinctUntilChanged() // 重复值不重新搜索
.flatMapLatest { query ->
// 新查询到来时取消上一个请求
repository.searchUsers(query)
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = Result.Success(emptyList())
)
fun setSearchQuery(query: String) {
_searchQuery.value = query
}
}
// Activity/Fragment中收集Flow
class SearchFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
// 使用lifecycleScope收集Flow
viewLifecycleOwner.lifecycleScope.launch {
viewModel.searchResults.collect { result ->
when (result) {
is Result.Loading -> showLoading()
is Result.Success -> showUsers(result.data)
is Result.Error -> showError(result.exception)
}
}
}
// 使用repeatOnLifecycle在特定状态收集
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.searchResults.collect { updateUI(it) }
}
}
}
}
Flow操作符实战
Flow提供了丰富的操作符来处理数据流,掌握这些操作符对于编写高效的响应式代码至关重要。
常用操作符分类
// 变换操作符
val transformedFlow = originalFlow
.map { it * 2 } // 转换每个元素
.filter { it > 10 } // 过滤元素
.take(5) // 只取前5个
.drop(2) // 跳过前2个
.distinctUntilChanged() // 去重连续重复值
// 组合操作符
val combinedFlow = combine(flow1, flow2) { a, b ->
a + b // 合并两个流的最新值
}
val zippedFlow = flow1.zip(flow2) { a, b ->
Pair(a, b) // 按对组合
}
// 展平操作符
val flatMapped = flowOf(1, 2, 3)
.flatMapConcat { value ->
// 顺序执行每个内部Flow
flow { emit(value * 10); delay(100); emit(value * 100) }
}
val flatMappedLatest = searchQueryFlow
.flatMapLatest { query ->
// 新值到来时取消之前的内部Flow
repository.search(query)
}
// 缓冲与背压
val bufferedFlow = dataFlow
.buffer(100) // 缓冲100个元素
.conflate() // 只保留最新值(跳过中间值)
.collectLatest { value ->
// 新值到来时取消当前处理
processValue(value)
}
// 错误处理
val safeFlow = riskyFlow
.catch { e ->
// 捕获异常,可以发射默认值或重新抛出
emit(defaultValue)
logError(e)
}
.retry(3) { cause ->
// 重试3次
cause is IOException
}
.retryWhen { cause, attempt ->
// 自定义重试逻辑
attempt < 3 && cause is NetworkException
}
Flow收集注意事项
- Flow是冷流,每次collect都会重新执行上游代码
- 在UI层收集Flow时,务必使用repeatOnLifecycle避免内存泄漏
- StateFlow和SharedFlow是热流,适合在ViewModel中作为状态管理
- 避免在Flow中使用阻塞操作,应该使用挂起函数
StateFlow与SharedFlow
StateFlow和SharedFlow是Flow的热流(Hot Stream)实现,适合在ViewModel中管理UI状态。StateFlow是有状态的,总是保存最新值;SharedFlow更通用,支持配置重播策略。
// StateFlow - 状态管理
class ProfileViewModel : ViewModel() {
private val _uiState = MutableStateFlow(ProfileUiState())
val uiState: StateFlow<ProfileUiState> = _uiState.asStateFlow()
fun loadProfile(userId: String) {
viewModelScope.launch {
_uiState.update { it.copy(isLoading = true) }
try {
val profile = repository.getProfile(userId)
_uiState.update {
it.copy(
isLoading = false,
profile = profile,
error = null
)
}
} catch (e: Exception) {
_uiState.update {
it.copy(
isLoading = false,
error = e.message
)
}
}
}
}
data class ProfileUiState(
val isLoading: Boolean = false,
val profile: UserProfile? = null,
val error: String? = null
)
}
// SharedFlow - 事件通知
class EventViewModel : ViewModel() {
// 配置重播最近1个事件,缓存100个额外事件
private val _events = MutableSharedFlow<Event>(
replay = 1,
extraBufferCapacity = 100,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val events: SharedFlow<Event> = _events.asSharedFlow()
fun showToast(message: String) {
_events.tryEmit(Event.ShowToast(message))
}
fun navigateToDetail(id: String) {
_events.tryEmit(Event.NavigateToDetail(id))
}
sealed class Event {
data class ShowToast(val message: String) : Event()
data class NavigateToDetail(val id: String) : Event()
}
}
// Fragment中处理事件
class MyFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
// 收集状态
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { state ->
updateUI(state)
}
}
}
// 收集一次性事件
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.events.collect { event ->
when (event) {
is Event.ShowToast -> {
Toast.makeText(context, event.message, Toast.LENGTH_SHORT).show()
}
is Event.NavigateToDetail -> {
findNavController().navigate(
R.id.action_to_detail,
bundleOf("id" to event.id)
)
}
}
}
}
}
}
}
实战:完整MVVM架构示例
下面展示一个完整的MVVM架构示例,集成协程、Flow、Retrofit和Room,实现一个功能完整的用户列表页面。
// 数据层 - Room数据库
@Dao
interface UserDao {
@Query("SELECT * FROM users")
fun getAllUsers(): Flow<List<UserEntity>>
@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun insertUsers(users: List<UserEntity>)
@Query("SELECT * FROM users WHERE name LIKE '%' || :query || '%'")
fun searchUsers(query: String): Flow<List<UserEntity>>
}
// 数据层 - Retrofit API
interface UserApi {
@GET("users")
suspend fun getUsers(): List<UserDto>
@GET("users/{id}")
suspend fun getUser(@Path("id") id: String): UserDto
}
// Repository层
class UserRepositoryImpl @Inject constructor(
private val userApi: UserApi,
private val userDao: UserDao,
private val connectivityObserver: ConnectivityObserver
) : UserRepository {
override fun getUsers(): Flow<Result<List<User>>> = flow {
emit(Result.Loading)
// 先加载本地缓存
val localUsers = userDao.getAllUsers().first().map { it.toDomain() }
if (localUsers.isNotEmpty()) {
emit(Result.Success(localUsers))
}
// 检查网络并刷新
if (connectivityObserver.isOnline) {
try {
val remoteUsers = userApi.getUsers().map { it.toDomain() }
userDao.insertUsers(remoteUsers.map { it.toEntity() })
emit(Result.Success(remoteUsers))
} catch (e: Exception) {
if (localUsers.isEmpty()) {
emit(Result.Error(e))
}
}
}
}.flowOn(Dispatchers.IO)
override fun searchUsers(query: String): Flow<List<User>> {
return userDao.searchUsers(query)
.map { entities -> entities.map { it.toDomain() } }
.flowOn(Dispatchers.IO)
}
}
// ViewModel层
@HiltViewModel
class UserListViewModel @Inject constructor(
private val repository: UserRepository
) : ViewModel() {
private val _searchQuery = MutableStateFlow("")
val uiState: StateFlow<UserListUiState> = combine(
repository.getUsers(),
_searchQuery
) { result, query ->
when (result) {
is Result.Loading -> UserListUiState(isLoading = true)
is Result.Success -> {
val filtered = if (query.isBlank()) {
result.data
} else {
result.data.filter { it.name.contains(query, ignoreCase = true) }
}
UserListUiState(users = filtered)
}
is Result.Error -> UserListUiState(error = result.exception.message)
}
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = UserListUiState(isLoading = true)
)
fun onSearchQueryChange(query: String) {
_searchQuery.value = query
}
data class UserListUiState(
val isLoading: Boolean = false,
val users: List<User> = emptyList(),
val error: String? = null
)
}
最佳实践总结
- 使用viewModelScope和lifecycleScope管理协程生命周期
- Repository返回Flow,在ViewModel中转换为StateFlow
- 使用stateIn操作符将冷流转换为热流,避免重复计算
- UI层使用repeatOnLifecycle收集Flow,避免内存泄漏
- 使用Result包装类统一处理加载、成功、错误状态