一、Kotlin协程

在Android开发中,Kotlin最终编译为Java的字节码。众所周知,Java中只有进程和线程的概念,并没有协程的概念。那么什么是协程?为什么我们需要协程?

协程,又称微线程。协程不像线程和进程那样,需要进行系统内核的上下文切换,协程的上下文切换由开发人员来决定。

概念过于抽象,我们举一个例子。

我们都知道多线程,当需要执行多项任务的时候,会采用多线程并发执行。拿Android开发中的网络请求来说,假如每个网络请求彼此独立不相干,那我们可以每个网络请求可以单独开启一个线程来执行。

当有1个网络请求,我们开启1个线程。

当有10个网络请求,我们开启10个线程。

当有100个网络请求,我们开启100个线程。

当有1000个网络请求,我们开启1000个线程。

隐隐感觉有哪里不对。 虽然理论上多线程并发执行会带来效率上的提高,但是系统内核就那么多,太多的线程并发执行只会拥堵在那里,并不会带来效率上的提升,并且会带来额外的线程切换开销。不仅如此,每个线程都会带来额外的内存开销。所以我们不能无限制的创建线程。

协程的出现刚好可以解决上述2个问题。协程运行在线程之上,当一个协程执行完成后,可以选择主动让出,让另一个协程运行在当前线程之上。协程并没有增加线程数量,只是在线程的基础上通过分时复用的方式运行多个协程。

有人会提出异议,协程听起来和线程池没有啥区别啊?

不错,笔者认为,Kotlin中的协程本质上与线程池并无区别,Kotlin中协程本质上是一套易用的线程池API,带来的是可读性的提升。

但是这种可读性的提升帮助是巨大的。协程有效解决了回调地狱问题。

假设有下面的场景,要先进行登录,然后才能获取位置,然后才能获取用户信息。每一次都必须异步执行,那我们不使用协程的写法如下:

fun main() {

login {

getLocation {

getUserInfo {

//通过一层层回调拿到真正结果

}

}

}

}

fun login(result: (Boolean) -> Unit) {

result.invoke(true)

}

fun getLocation(result: (Any) -> Unit) {

result.invoke(true)

}

fun getUserInfo(result: (Any) -> Unit) {

result.invoke(true)

}

使用协程之后,可以发现原来使用回调来实现的逻辑,现在变成了顺序执行,代码可读性好了很多。

fun main() {

GlobalScope.launch {

val loginResult = login()

val location = getLocation()

val user = getUserInfo()

}

}

suspend fun login(): Boolean {

//Dispatchers.IO表示使用IO调度器

return GlobalScope.async(Dispatchers.IO) { true }.await()

}

suspend fun getLocation(): Any {

//Dispatchers.IO表示使用Main调度器

return GlobalScope.async(Dispatchers.Main) { Any() }.await()

}

suspend fun getUserInfo():Any {

return GlobalScope.async(Dispatchers.Default) { Any() }.await()

}

二、Kotlin预置的调度器

在创建协程时候,需要指定调度器。协程的调度器用于确定执行协程的目标载体,即运行在哪个线程,包含在一个或者多个线程中。协程调度器可以将协程的执行操作限制在特定线程上,也可以将其分派在线程池中,或者让它无线执行。系统预制的调度器有三种:

Dispatchers.Default:默认的调度器,当不指定调度器时,使用该调度器。此调度程序经过专门优化,适合在主线程之外执行占用大量CPU资源的工作,如对列表排序,对JSON解析。Dispatchers.Main:此调度可以在Android主线程上运行协程。此调度只能用于与界面交互和执行快速工作。如Android界面框架操作,更新LiveData对象等。Dispatchers.IO:此调度适合在主线程之外执行磁盘或者网络IO。如使用ROOM组件、从文件中读取数据或向文件中写入数据,以及运行任何网络操作。Dispatchers.Unconfined:对执行协程的线程不做限制,可以直接在当前调度器所在的线程上执行。

篇幅原因,这里我们选用Dispatchers.Default看一下源码实现: 核心部分有三个位置,第一个是参数的定义

public open class ExperimentalCoroutineDispatcher(

private val corePoolSize: Int,

private val maxPoolSize: Int,

private val idleWorkerKeepAliveNs: Long,

private val schedulerName: String = "CoroutineScheduler"

) : ExecutorCoroutineDispatcher() {

public constructor(

corePoolSize: Int = CORE_POOL_SIZE, //Default调度器会向线程池一样,定义核心线程,默认是2个

maxPoolSize: Int = MAX_POOL_SIZE, //定义做大线程

schedulerName: String = DEFAULT_SCHEDULER_NAME

) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

xxxxxx

}

第二个位置是CoroutineScheduler类的dispatch部分

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {

trackTask() // this is needed for virtual time support

//将我们要执行的任务 block 封装为一个Task

val task = createTask(block, taskContext)

// try to submit the task to the local queue and act depending on the result

//找到要执行这个任务的worker,Worker可以被理解为Thread

val currentWorker = currentWorker()

//将任务提交到队列中执行

val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)

//xxxxx

}

第三个位置是Worker的runWorker方法

private fun runWorker() {

var rescanned = false

while (!isTerminated && state != WorkerState.TERMINATED) {

val task = findTask(mayHaveLocalTasks)

// Task found. Execute and repeat

if (task != null) {

rescanned = false

minDelayUntilStealableTaskNs = 0L

//这里会依次从队首获取到当前要执行的任务, 进行执行

executeTask(task)

continue

} else {

mayHaveLocalTasks = false

}

}

三、Kotlin自定义调度器

从上述例子中,我们可以看到系统预置的Default调度器是怎么执行的,我们可以仿照Default调度器来实现自己的调度器。

比如说有以下场景,我希望我的任务都在子线程中顺序执行,那我们可以定义一个线程数为1的调度器,如下。

我们定义一个线程池,实现调度器中的dispatch方法,直接在单线程池中执行即可。

object SingleDispatcher : ExecutorCoroutineDispatcher() {

private val myExecutor: Executor by lazy {

Executors.newSingleThreadExecutor()

}

override val executor: Executor

get() = myExecutor

override fun close() {

}

override fun dispatch(context: CoroutineContext, block: Runnable) {

executor.execute(block)

}

}

或者我们参考Default实现,规定最大线程数即可。

@OptIn(InternalCoroutinesApi::class)

object NewSingleDispatcher

: ExperimentalCoroutineDispatcher(

corePoolSize = 1,

maxPoolSize = 1,

idleWorkerKeepAliveNs = 60 * 1000 * 1000,

"NewSingleDisapthcer"

)

使用办法如下

suspend fun login(): Boolean {

//Dispatchers.IO表示使用IO调度器

return GlobalScope.async(SingleDispatcher) { true }.await()

}

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: