Rust async-std 入门

引言

最近打算基于 Rust async-std 造轮子,自然是要熟悉下这个库。以下内容是根据 Rust async-std 官方文档翻译整理,当然也加入了部分自己的理解,有不到位的地方还请指点。

async-std 旨在简化异步编程,由于是模拟 Rust 标准库接口,所以熟悉标准库的话,使用起来也会非常舒服。目前 async-std 给我们提供了很多接口:文件系统、网络、计时器等等;它还提供了一个 task 模型,有点类似 Rust 标准库中的 thread 模块。此外,还有 async/await 风格的 Mutex 原语。

Rust 中有两种类型的 Future

  1. 源自标准库std::future::Future
  2. 源自 futures-rs cratefutures::future::Future

背景是这样的,futures-rs crate 中定义的 future 是 Future 类型最初的实现。Rust 为了支持 async/await 语法,就把核心的 trait Future 转移到了标准库中,也就是现在的 std::future::Future。所以,我们可以把 std::future::Future 看作是 futures::future::Future 的最小子集。

我们需要严格区分 std::future::Futurefutures::future::Future,以及 async-std 是如何使用它们的。总的来说,普通的用户一般是不会和 std::future::Future 打交道的(除了使用 .await 调用)。通常只有实现 Future 的开发者才会关注 std::future::Future 的内部工作机制。过去很多在 Future 中定义的功能都被移动到了 FuturesExt 扩展 trait 中。所以,可以将 futures 库当作是核心 Rust 异步功能的扩展。

那么,上面一直提到的 Futures 究竟是何方神圣?又是怎么被执行的呢?简单来说,Futures 就是对于代码是如何运行的一种抽象,它们其实什么也不做。那么怎么推进执行并获得结果呢?答案是依靠执行器 executor,它会决定何时如何执行你的 Futures。async-std::task 模块就为我们提供了这样的执行器接口。

Futures

Rust 有个非常亮眼的特性叫「无谓并发」,也就是说我们在编写多线程应用时,可以在获得并发特性的同时,不用担心数据竞争的问题(编译器会在编译时就能检查到数据竞争的问题)。

Futures 是对计算的抽象,它们描述了「做什么(what)」,但是与「在哪儿(where)」、「什么时候(when)」执行是分开的。因此,我们的目标是将代码打散成小段的、可组合的行为,这样可以作为系统的一部分执行。接下来我们来看看什么是计算(compute things),并找到可以抽象的地方。

Send 和 Sync

Rust 安全并发中有两个重要的抽象(Markers):SendSync。下面来看看简单的介绍:

  • Send 标记的数据类型是可以安全地从一个计算(computation)转移到(moved)另外一个并发计算(所有权也同样被转移了,发送方将不能再访问它)。
  • Sync 是指我们可以在并发环境中共享(sharing)数据。

以上没有使用 thread 即线程这样的字眼,而是使用了抽象的 computationSendSync 最强大的特点在于它为我们减轻了知道共享什么的负担。在实现时,我们只需要知道对于相应的数据类型采用什么分享方式即可。

关于 SendSync 的组合还有更多有趣的特性,可以参考 Rust Book 了解更多。

什么是计算(computation)

所谓的计算(computation)是指一个可组合的操作序列,可以基于决策分支,可以一直推进运行到结束,最终返回结果或者错误。

延迟计算

如上所述,SendSync 都是描述数据的;但应用程序可不止数据,我们还要知道如何计算出数据,由此引入了 Futures。我们可以看看 Futures 是怎么允许我们用自然语言表达要做的事情的,Futures 从这样的计划:

  • Do X
  • If X succeeded, do Y

变成了:

  • Start doing X
  • Once X succeeds, start doing Y

相比于告诉计算机要做什么,并且根据当前结果决定下一步做什么;延迟计算就是让我们告诉计算机要开始做什么,并响应未来可能发生的事件。

从简单的例子开始

下面的例子是从指定的文件中读取内容:

1
2
3
4
5
6
fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
Ok(contents)
}

我们可以在任意时刻调用上面的函数,也很直观。问题是,一旦调用上述函数,控制权就会转移至被调用的函数直到其返回。需要说明的是,上述返回值描述的是过去。而过去存在的问题是:所有的决策都已经确定了。但也不是没有优点:返回的结果很明显,我们可以直接将程序过去计算的结果解包(unwrap)出来,然后决定如何使用它。

但我们还是想要抽象计算,并让别人选择如何运行它。所以我们需要一种类型,可以描述计算过程,但是又不执行它。👆 上面的函数只能让我们在调用前或者调用后执行操作。这其实并非预期的,我们希望能够在在运行的时候做点别的事情(实际上就是能够打断执行流)。当在并行编程时,这也剥夺了我们在第一个任务运行时启动另外一个并行任务的能力(这时控制权已经转移给正在执行的函数了)。

此处虽然可以引入线程,但是线程本身是非常特定的并发原语,而我们需要寻找的是一种抽象。具体来说,这样的抽象就是用来表示一个进行中的工作,会在未来产生结果。下面看看非完整定义的 Future trait:

1
2
3
4
5
6
7
8
trait Future {
// Ouput 是一个泛型,表示输出结果的泛型
type Output;
// poll 方法会返回当前计算的状态,poll 方法会返回两种结果:
// 1. `Poll::Ready`,表示计算结束
// 2. `Poll::Pending`,表示计算尚未结束
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}

如此,我们可以通过 poll() 方法检查 Future 是否完成,如果已经完成,则返回对应的结果。显然,最简单的机制就是在循环中不断轮询;不过我们通常使用成熟的 runtime 来执行。需要注意的是,在 poll() 返回 Poll::Ready 后,继续调用可能会有令人困惑的行为产生,具体可以参见 futures-docs

Async

实际上 Future 在 Rust 中已经存在一段时间了,只是直接构建和描述它们比较麻烦。因此,async 关键词就是我们的救星,下面的例子中演示了 async-std 搭配 async/await 重构上面的函数:

1
2
3
4
5
6
7
8
9
// async 标记函数体是一个延迟计算对象,当函数调用时,会产生一个 `Future<Output = io::Result<string>>`,
// 该值并非立即返回的结果 `io::Result<String>`,
//(准确地说,是产生了实现了 `Future<Output = io::Result<String>>` 的类型)。
async fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path).await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
Ok(contents)
}

.await 干了什么

顾名思义,.await 表示等待请求行为(requested action)直到完成,然后才会继续后续的执行。准确的来说,.await 是一个标记,表示此处的代码将会等待直到 Future 产生结果。至于 Future 是怎么结束的,我们不用关心。.await 会让运行时掌控这段代码的执行,至于在计算结束时要执行哪些操作都由运行时来操心。当你编写的操作在后台完成时,它会回到标记点,继续后续的操作。所以这种模式也称为 事件驱动编程(evented programming),因为我们在等待某些事件发生(如打开文件),并据此作出响应(比如开始读取内容)。

当有 2 个及以上这样的函数在同时运行时,我们的运行时系统就能够处理当前所有进行的事件了,避免了傻傻地等待。

Tasks

我们已经知道什么是 Futures 了,那具体怎么运行它们呢?这就是接下来要介绍的 tasks 模块要做的事情。下面看个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use async_std::{fs::File, io, prelude::*, task};
async fn read_file(path: &str) -> io::Result<String> {
let mut file: File = File::open(path).await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
Ok(contents)
}

fn main() {
// `spawn` 方法接收一个 `Future`,并且在一个任务中执行它。
// 该函数会返回 `JoinHanle`。
let reader_task = task::spawn(async {
// 这里是一个异步块,必须要使用异步块才能调用异步函数,同时
// 也会引导编译器将所有相关的指令包含进来。Rust 中所有的块
// 都会返回一个值,而 `async` 块则返回的是类型为
// `Future` 的值
let result = read_file("data.csv").await;
match result {
Ok(s) => println!("{}", s),
Err(e) => println!("Error reading {:?}", e)
}
});

println!("Started task!");
task::block_on(reader_task);
println!("Stopped task!");
}

Rust 的 Futures 有时也叫「冷(cold)」Futures,你需要运行它们的东西。为了运行一个 Future,可能还需要一些额外的记账工具:比如当前 Future 是否在运行中还是已经结束,在内存中的位置和当前的状态。这种记账逻辑就被抽象到了 Task 中。

Task 类似于 Thread,但也有些许不同:它是由应用程序(用户空间)被调度,操作系统内核不会感知;一旦到了某个点需要等待,应用程序自己需要负责唤醒任务。async_std 的任务可以拥有名称和 ID。

当通过 spawn 生成任务后,会一直在后台运行。返回的 JoinHandle 本身是一个 Future,它会在 Task 运行结束后(run to conclusion)也会结束。类似 threadsjoin 函数,我们可以调用对 JoinHandle 调用 block_on 函数,从而阻塞程序(准确来说是调用线程被阻塞)直到其运行结束。

Task 中是如何推进 Future 完成计算的呢?

通过简单阅读源码,可以在 src/task/block_on.rs 看到执行的逻辑,当 Task 被调度执行时,对应的 run 函数也会被执行,进而推进 Future 中的计算逻辑直到完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/// Blocks the current thread on a future's result.
fn run<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
CACHE.with(|cache| {
// ...
loop {
if let Poll::Ready(t) = future.as_mut().poll(cx) {
// Save the parker for the next invocation of `block`.
cache.set(Some(arc_parker));
return t;
}

// Yield a few times or park the current thread.
if step < 3 {
thread::yield_now();
step += 1;
} else {
arc_parker.park();
step = 0;
}
}
})
}

JoinHandle 是如何将任务结果返回的呢?

翻阅 src/task/join_handle.rs 源码得知,它其实也是一个实现了 Future trait 的对象,具体逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pub struct JoinHandle<T>(async_task::JoinHandle<T, Task>);
// ...
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
// 如果是 None,表示任务 panic 或者被取消了
Poll::Ready(None) => panic!("cannot await the result of a panicked task"),
// 当任务结束,自然会拿到具体结果
Poll::Ready(Some(val)) => Poll::Ready(val),
}
}
}

可以看到,async_std 中的 JoinHandle 实际是对 async_task::JoinHandle 的包装,我们可以深入看下具体是怎么从 Task 中取到结果的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// async-task-1.1.0/src/join_handle.rs

/// A handle that awaits the result of a task.
///
/// * `None` 表示任务被取消或者 panic 了
/// * `Some(result)` 表示任务结束,返回的结果 `result` 类型为 `R`
pub struct JoinHandle<R, T> {
/// A raw task pointer.
pub(crate) raw_task: NonNull<()>,
/// A marker capturing generic types `R` and `T`.
pub(crate) _marker: PhantomData<(R, T)>,
}

impl<R, T> Future for JoinHandle<R, T> {
type Output = Option<R>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
unsafe {
let mut state = (*header).state.load(Ordering::Acquire);
loop {
// If the task has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
// ...
return Poll::Ready(None);
}
// If the task is not completed, register the current task.
if state & COMPLETED == 0 {
// ...
}

// 任务完成,提取结果
match (*header).state.compare_exchange(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// 从这里把任务结果获取出来(挺 Hack 的),就是下面的 output.read()
let output = ((*header).vtable.get_output)(ptr) as *mut R;
return Poll::Ready(Some(output.read()));
}
Err(s) => state = s,
}
}
}
}
}

async_std 中的 Task

Task 是 async_std 中最核心的抽象之一,和 Rust 中的 thread 类似。Tasks 和运行时虽然也有关联,但它们是分开的。async_std Task 有如下几个特性:

  • 所有任务是单次分配的(in one single allocation);
  • 所有任务都有一个 backchannel,这样可以通过 JoinHandle 将结果和错误传递到对应的任务(spawning task);
  • 它们携带用于调试的元信息;
  • 它们支持 任务级别的 local storage。

async_std 任务 API 会处理背后的 runtime 初始化(setup)和清理(teardown)工作,作为用户,我们不需要显式地启动运行时(这个和 Python 3 有丢丢区别)。

阻塞(Blocking)

一般我们认为 Tasks 都是并发执行的,可能它们会共享同一个执行线程(在该线程上切换任务执行)。这同时也意味着如果在某个执行线程上调用了阻塞 API(比如 std::thread::sleep 或者标准库的 IO 函数),会导致对应执行线程阻塞,从而导致该执行线程上的所有任务都停止运行了。其它的一些库(如 db drivers)也会有类似的行为。

需要注意的是,阻塞当前线程本身并非坏事情,只是不要和 async_std 并发执行的模型搞混淆即可。总之,不要这样做:

1
2
3
4
5
6
fn main() {
task::block_on(async {
// 标准库 std::fs,会导致阻塞
std::fs::read_to_string("test_file");
})
}

如果需要混合阻塞 API 调用,建议另起一个线程执行这样的阻塞操作。

关于错误和 Panic

常规模式下,如果任务可能出错,那么任务的输出 Output 应该是 Result<T, E> 类型。但是在 panic 的时候,具体的表现则取决于有没有合理处理 panic 的地方,如果没有的话,则会退出。

实践中,意味着 block_on 会传递 panic 到阻塞调用的地方:

1
2
3
4
5
6
7
8
fn main() {
task::block_on(async {
panic!("test");
});
}

thread 'async-task-driver' panicked at 'test', examples/panic.rs:8:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.

而对于 spwan 出去的任务如果 panic,则会导致退出(abort):

1
2
3
4
5
6
7
8
9
10
11
task::spawn(async {
panic!("test");
});

task::block_on(async {
task::sleep(Duration::from_millis(10000)).await;
})

thread 'async-task-driver' panicked at 'test', examples/panic.rs:8:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
Aborted (core dumped)

上面的行为最初看上去有点奇怪,但另外一种选项是对于 spawn 出去的任务如果发生 panic,则简单忽略掉。当前的行为可以被修改为捕获 spawn 出去的任务中发生的 panic,并且根据不同的情况做出不同的响应,这样我们就可以根据需要采取不同的 panic 处理策略。

更多示例

这里 有不少示例代码可以参考,下面是一个简单的 UDP 客户端例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use async_std::io;
use async_std::net::UdpSocket;
use async_std::task;

fn main() -> io::Result<()> {
task::block_on(async {
let socket = UdpSocket::bind("127.0.0.1:8081").await?;
println!("Listening on {}", socket.local_addr()?);

let msg = "hello world";
println!("<- {}", msg);
socket.send_to(msg.as_bytes(), "127.0.0.1:8080").await?;

let mut buf = vec![0u8; 1024];
let (n, _) = socket.recv_from(&mut buf).await?;
println!("-> {}\n", String::from_utf8_lossy(&buf[..n]));

Ok(())
})
}

参考

0%