在讲tokio之之前我们先讲一下基本概念

Async

异步编程,诀窍就是当CPU等待外部事件或动作时,异步运行时会安排其他可继续执行的任务在CPU上执行,而当从磁盘或者I/O子系统中断到达的时候,异步运行时会知道识别这事,并安排原来的任务继续执行。 一般来说,I/O受限(I/O Bound)的程序(程序执行的速度依赖于I/O子系统的速度)比起CPU受限(CPU Bound)的任务(程序执行的速度依赖于CPU的速度)可能更适合于异步任务的执行。 async,await关键字事rust标准库用于异步编程的内置核心原语集的代表,就是语法糖。下面是代码示例,注释掉的async1,async2是async语法糖部分,等价于未注释掉的async1,async2方法,使用async编译器会识别成未注释掉的那部分。

use tokio::{select, time};

use std::future::Future;

#[tokio::main]

async fn main() {

println!("hello asnyc");

let h1 = tokio::spawn(async {

let result = async1();

println!("asnyc1 resut is {}",result.await);

});

let h2 = tokio::spawn(async {

let result = async2().await;

println!("asnyc2 resut is {}",result);

});

tokio::join!(h1,h2);

}

// async fn async1() ->String {

// time::sleep(time::Duration::from_secs(2)).await;

// String::from("asnyc1")

// }

// async fn async2() ->String {

// time::sleep(time::Duration::from_secs(4)).await;

// String::from("asnyc2")

// }

fn async1() -> impl Future {

async {

time::sleep(time::Duration::from_secs(2)).await;

String::from("asnyc1")

}

}

fn async2() -> impl Future {

async {

time::sleep(time::Duration::from_secs(4)).await;

String::from("asnyc2")

}

}

Future

Rust异步的核心就是Future,Future是由异步计算或函数产生的单一终止值,Rust的异步函数都会返回Future,Future基本上就是代表着延迟的计算。

Output : 代表Future成功之后返回的类型。 poll方法: 返回一个枚举Poll,Poll有两个变体,分别是Ready(val)和Pending。 Ready(val): 表示Future已经完成,val是返回的具体的值。 Pending: 表示Future没有完成。

那么谁来调用poll方法呢?

是异步执行器,它是异步运行时的一部分。异步执行器会管理一个Future的集合,并通过调用Future上的poll方法来驱动他们完成。所以函数或代码块在前面加上async关键字之后,就相当于告诉异步执行器他会返回Future,这个Future需要被驱动直到完成。 但是异步执行器怎么知道异步已经准备好取得进展(可以产生值)了呢?他会持续不断的调用poll方法吗?带着这个疑问,继续往下看。

利用Tokio库来 尝试理解Future

tokio运行时就是管理异步任务并安排他们在CPU上执行的组件。 如图,一个程序可能生成多个异步任务,每个异步任务可能包含一个或多个Future

Tokio执行器如何知道何时再次poll第一个Future呢?

它是一直不断的对他进行poll吗?肯定不会一直poll。 Tokio(rust的异步设计)是使用一个Waker组件来处理这件事的。 当被异步执行器poll过的任务还没有准备好产生结果的时候,这个任务就被注册到一个Waker,Waker会有一个处理程序(handle),它会被存储在任务关联的Context对象中。 Waker有一个wake()方法,可以用来告诉异步执行器关联的任务应该被唤醒了。当wake()方法被调用了,Tokio执行器就会被通知 是时候再次poll这个异步的任务了,具体方法就是调用任务上的poll()函数。

Tokio的组件组成

Tokio运行时需要理解操作系统(内核)的方法来开启I/O操作(读取网络数据,读写文件等)。Tokio运行时会注册异步的处理程序,以便在事件发生时作为I/O操作的一部分进行调用。而在Tokio运行时里面,从内核监听这些事件并与Tokio其他部分通信的组件就是反应器(reactor)。Tokio执行器,他会把一个Future,当其可以取得更多进展时,通过调用Future的poll()方法来驱动其完成。

那么Future是如何告诉执行器他们准备好取得进展了呢? 就是Future调用Waker组件上的wake()方法。Waker组件就会通知执行器,然后再把Future放回队列,并再次调用poll()方法,直到Future完成。

Tokio组件工具流程代码示例

流程说明:

main函数在Tokio运行时上生成任务任务1有一个Future,会从一个大文件读取内容从文件读取内容的请求交给到系统内核的文件子系统与此同时,任务2也被Tokio运行时安排进行处理当任务1的文件操作结束时,文件子系统会出发一个系统中断,他会被编译成Tokio响应器可识别的一个事件Tokio响应器会通知任务1:文件操作的数据已经准备好任务1通知它注册的Waker组件:说明它可以产生一个值了Waker组件通知Tokio执行器来调用任务1关联的poll()方法Tokio执行器安排任务1进行处理,并调用poll()方法 10.任务1产生一个值返回

use tokio::{select, time};

use std::future::Future;

use std::pin::Pin;

use std::task::{Context,Poll};

struct TestFutrue{}

impl Future for TestFutrue {

type Output = String;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {

println!("Tokio ! stop polling me");

cx.waker().wake_by_ref();

//Poll::Pending

Poll::Ready(String::from("poll Ready"))

}

}

#[tokio::main]

async fn main() {

println!("hello asnyc");

let h1 = tokio::spawn(async {

let result: TestFutrue = TestFutrue{};

println!("TestFutrue resut is {}",result.await);

});

let h2 = tokio::spawn(async {

let result = async2().await;

println!("asnyc2 resut is {}",result);

});

tokio::join!(h1,h2);

}

fn async1() -> impl Future {

async {

time::sleep(time::Duration::from_secs(2)).await;

String::from("asnyc1")

}

}

fn async2() -> impl Future {

async {

time::sleep(time::Duration::from_secs(4)).await;

String::from("asnyc2")

}

}

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: