RxJava一直是我长久以来的救星。它提供了丰富的功能,让我在Android编程中更加注重响应式思维。我的代码中到处都是Single、Subject和Completable。

而现在,协程成为了备受赞誉和推崇的选择,许多演讲和会议都推荐使用。于是我开始学习它。

为了展示我目前的学习成果,我将尝试比较RxJava和协程在解决一些常见问题时的差异。我使用的库和设计模式有:

MVVM与Android Architecture ComponentRepository模式Retrofit

加载并显示

打开菜单片段并显示从服务器获取的咖啡列表

RxJava

让我们从ViewModel的角度开始。在MenuViewModel中,我调用menuRepository.getMenu()以获取Single的实例。

class MenuViewModel @Inject constructor(

private val menuRepository: MenuRepository)

: ViewModel() {

val coffeeList: LiveData>

get() = _coffeeList

private val _coffeeList = MutableLiveData>()

private val disposeBag = CompositeDisposable()

override fun onCleared() {

super.onCleared()

disposeBag.dispose()

}

fun loadMenu() {

val disposable = menuRepository.getMenu()

.subscribe { list: List ->

_coffeeList.value = list

}

disposeBag.add(disposable)

}

}

请注意,忽略了错误处理。

为了避免内存泄漏,在onCleared()期间我们必须记得处理可释放的资源。

repository代码如下:

class MenuRepository @Inject constructor(

private val menuApi: MenuApi

) {

fun getMenu(): Single> {

return menuApi.getMenu()

.observeOn(AndroidSchedulers.mainThread())

.subscribeOn(Schedulers.io())

}

}

interface MenuApi {

@GET("menu")

fun getMenu(): Single>

}

我们正在调用subscribeOn和observeOn来确保网络调用不在主线程上执行。

Coroutines

class MenuViewModel @Inject constructor(

private val menuRepository: MenuRepository)

: ViewModel() {

val coffeeList: LiveData> = liveData(context = viewModelScope.coroutineContext + Dispatchers.IO) {

emit(menuRepository.getMenu())

}

}

class MenuRepository @Inject constructor(

private val menuApi: MenuApi

) {

suspend fun getMenu(): List {

return menuApi.getMenu()

}

}

interface MenuApi {

@GET("menu")

suspend fun getMenu(): List

}

正如您在这里所看到的,借助协程和KTX扩展,代码变得简单得多。生命周期和线程管理只需向liveData块传递一个参数即可:

context = viewModelScope.coroutineContext + Dispatchers.IO

将视图A从视图B更新

当用户在菜单类型Fragment中选择菜单类型时,更新咖啡菜单。

更具体地说,菜单类型Fragment放置在菜单Fragment上方。当用户在此处更改菜单类型时,我们通过所选的菜单类型更新下方的菜单。

为了分离关注点并解耦组件,这两个片段将不直接相互通信。我们将利用MenuRepository作为中间人,并让两个片段共享相同的实例。这可以通过依赖注入框架如Dagger来实现。这里将忽略其实际实现。

RxJava

class MenuViewModel {

fun loadMenu() {

menuRepository.menu

.subscribe { list: List ->

_coffeeList.value = list

}

menuRepository.refreshMenu(MenuType.DEFAULT).subscribe()

}

}

class MenuRepository {

private val menuSubject = BehaviorSubject.create>()

val menu: Observable>

get() = menuSubject

fun refreshMenu(menuType: MenuType): Completable {

return menuApi.getMenu(menuType)

.observeOn(AndroidSchedulers.mainThread())

.subscribeOn(Schedulers.io())

.doOnSuccess { menuSubject.onNext(it) }

.ignoreElement()

}

}

在MenuRepository中,getMenu()被替换为一个菜单属性和一个refreshMenu()方法。我使用副作用运算符doOnSuccess()将接收到的数据传递给Subject,它是一个BehaviorSubject,用于保存最新的菜单数据。在MenuFragment中,getMethod().subscribe {}被拆分为两个调用,menu.subscribe {}和refreshMenu().subscribe()。

通过这个变化,我们可以在MenuTypeFragment中简单地这样做。

class MenuTypeViewModel @Inject constructor(

private val menuRepo: MenuRepository

): ViewModel() {

fun onMenuTypeChanged() {

menuRepo.refreshMenu(MenuType.TODAY_SPECIAL).subscribe()

}

}

请注意,我忽略了一次性处理。为了避免任何内存泄漏,您仍需执行前一个用例中展示的相同处理。此外,如果API调用失败,每次调用menuRepo.refreshMenu()都必须进行错误处理。

Coroutines

在这种情况下,协程并没有像示例3中那样简化。接下来您将看到原因所在。

class MenuViewModel {

val coffeeList: LiveData> = liveData(context = viewModelScope.coroutineContext + Dispatchers.IO) {

// if the data needs to be processed before displaying,

// this is where I usually do it

menuRepo.menu.collect(::emit)

}

}

class MenuRepository {

private val menuChannel = BroadcastChannel>(CONFLATED)

val menu: Flow>

get() = channel.asFlow()

suspend fun refreshMenu(menuType: MenuType) {

menuChannel.send(menuApi.getMenu(menuType))

}

}

interface MenuApi {

@GET("menu")

suspend fun getMenu(@Query("type") menuType: MenuType): List

}

在MenuRepository中,我们仍然需要创建与BehaviorSubject类似的东西。在协程世界中,它被称为BroadcastChannel(CONFLATED)。这是一个工厂方法,它创建了ConflatedBroadcastChannel的实例。就像在RxJava中有其他类型的Subject一样,这里也有其他类型的BroadcastChannel。

为了能够观察到热数据流,我们使用asFlow将BroadcastChannel转换为Flow。然后我们可以在ViewModel中调用collect()来持续接收新数据。

这个概念更接近于RxJava世界而不是协程世界。collect()类似于subscribe(),Flow类似于Observable。如果你看一下Flow,会发现有很多内置的操作符,比如retryWhen或debounce,这些在RxJava中已经存在并且我们都喜欢。

然后在MenuTypeFragment中代码如下:

class MenuTypeViewModel @Inject constructor(

private val menuRepo: MenuRepository

): ViewModel() {

fun onMenuTypeChanged() {

viewModelScope.launch {

// try-catch-finally is how we usually handle

// error in Coroutines world

try {

menuRepo.refreshMenu(MenuType.TODAY_SPECIAL)

} catch (e: Exception) {

// e.g.

// notify user

// record exception to your exception monitoring service

} finally {

// e.g.

// dismiss loading indicator

// enable views

}

}

}

}

由于我们使用了ConflatedBroadcastChannel,因此refreshMenu()内部的send()将永远不会挂起。这意味着如果有很多地方使用refreshMenu(),将不会有很多挂起的协程持有您的资源。

随着Kotlin协程1.4.0的发布,ConflatedBroadcastChannel已被弃用。StateFlow和ShareFlow被引入。您可以在官方公告中了解更多信息。

使用哪个调度器?

在代码片段6中,我们直接在collect中调用emit。

menuRepo.menu.collect(::emit)

但是要设置值给LiveData使用了哪个调度器?为了找到答案,我打印了一些日志。

menuRepo.menu.collect {

Log.d("MenuViewModel", Thread.currentThread().name)

emit(it)

}

//打印结果: D/MenuViewModel: DefaultDispatcher-worker-1 这看起来不对劲,在我没有在主线程中设置LiveData值的情况下,为什么这不会导致应用崩溃?

原来,KTX库已经为我们解决了这个问题。在liveData { }的emit()内部,它执行了以下操作:

override suspend fun emit(value: T) = withContext(coroutineContext) {

// target: LiveData

target.clearSource()

target.value = value

}

而你可能猜到了,这就是coroutineContext:

private val coroutineContext = context + Dispatchers.Main.immediate

这里的Dispatchers.Main表示我们Android的主线程。

订阅的取消处理?

那么menuRepo.menu.collect()的取消处理呢?当LiveData变为非活跃状态时,订阅会自动取消吗?还是我们仍然需要在RxJava中执行所有的步骤?

答案是是的,它会自动取消。在KTX库内部,liveData { }生成的LiveData在LiveData变为非活跃状态时会取消正在运行的协程。

结论

当您有一个需要等待其结果的阻塞耗时任务时,协程本身可能是一个很不错的选择。如果您使用Android KTX库,您将发现自己编写了最高效的代码。

然而,当您想使用协程来实现观察者设计模式时,与RxJava相比,它并没有提升多少生产力。最终,协程本身并不是专门为此而构建的。

RxJava仍然是我工具箱中最强大的工具。我不建议在所有情况下都迁移到协程。协程中仍然存在很多实验性API,包括我示例中的BroadcastChannel。协程真正发挥作用还需要时间。

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: