引言
最近打算基于 Rust async-std 造轮子,自然是要熟悉下这个库。以下内容是根据 Rust async-std 官方文档翻译整理,当然也加入了部分自己的理解,有不到位的地方还请指点。
async-std 旨在简化异步编程,由于是模拟 Rust 标准库接口,所以熟悉标准库的话,使用起来也会非常舒服。目前 async-std
给我们提供了很多接口:文件系统、网络、计时器等等;它还提供了一个 task
模型,有点类似 Rust 标准库中的 thread
模块。此外,还有 async/await
风格的 Mutex
原语。
Rust 中有两种类型的 Future
:
- 源自标准库的
std::future::Future
- 源自 futures-rs crate 的
futures::future::Future
背景是这样的,futures-rs crate 中定义的 future 是 Future 类型最初的实现。Rust 为了支持 async/await
语法,就把核心的 trait Future
转移到了标准库中,也就是现在的 std::future::Future
。所以,我们可以把 std::future::Future
看作是 futures::future::Future
的最小子集。
我们需要严格区分 std::future::Future
和 futures::future::Future
,以及 async-std
是如何使用它们的。总的来说,普通的用户一般是不会和 std::future::Future
打交道的(除了使用 .await
调用)。通常只有实现 Future
的开发者才会关注 std::future::Future
的内部工作机制。过去很多在 Future
中定义的功能都被移动到了 FuturesExt
扩展 trait 中。所以,可以将 futures
库当作是核心 Rust 异步功能的扩展。
那么,上面一直提到的 Futures 究竟是何方神圣?又是怎么被执行的呢?简单来说,Futures 就是对于代码是如何运行的一种抽象,它们其实什么也不做。那么怎么推进执行并获得结果呢?答案是依靠执行器 executor
,它会决定何时及如何执行你的 Futures。async-std::task
模块就为我们提供了这样的执行器接口。
Futures
Rust 有个非常亮眼的特性叫「无谓并发」,也就是说我们在编写多线程应用时,可以在获得并发特性的同时,不用担心数据竞争的问题(编译器会在编译时就能检查到数据竞争的问题)。
Futures 是对计算的抽象,它们描述了「做什么(what)」,但是与「在哪儿(where)」、「什么时候(when)」执行是分开的。因此,我们的目标是将代码打散成小段的、可组合的行为,这样可以作为系统的一部分执行。接下来我们来看看什么是计算(compute things),并找到可以抽象的地方。
Send 和 Sync
Rust 安全并发中有两个重要的抽象(Markers):Send
和 Sync
。下面来看看简单的介绍:
Send
标记的数据类型是可以安全地从一个计算(computation)转移到(moved)另外一个并发计算(所有权也同样被转移了,发送方将不能再访问它)。Sync
是指我们可以在并发环境中共享(sharing)数据。
以上没有使用 thread
即线程这样的字眼,而是使用了抽象的 computation
。Send
和 Sync
最强大的特点在于它为我们减轻了知道共享什么的负担。在实现时,我们只需要知道对于相应的数据类型采用什么分享方式即可。
关于 Send
和 Sync
的组合还有更多有趣的特性,可以参考 Rust Book 了解更多。
什么是计算(computation)
所谓的计算(computation)是指一个可组合的操作序列,可以基于决策分支,可以一直推进运行到结束,最终返回结果或者错误。
延迟计算
如上所述,Send
和 Sync
都是描述数据的;但应用程序可不止数据,我们还要知道如何计算出数据,由此引入了 Futures
。我们可以看看 Futures
是怎么允许我们用自然语言表达要做的事情的,Futures
从这样的计划:
- Do X
- If X succeeded, do Y
变成了:
- Start doing X
- Once X succeeds, start doing Y
相比于告诉计算机要做什么,并且根据当前结果决定下一步做什么;延迟计算就是让我们告诉计算机要开始做什么,并响应未来可能发生的事件。
从简单的例子开始
下面的例子是从指定的文件中读取内容:
1 | fn read_file(path: &str) -> io::Result<String> { |
我们可以在任意时刻调用上面的函数,也很直观。问题是,一旦调用上述函数,控制权就会转移至被调用的函数直到其返回。需要说明的是,上述返回值描述的是过去。而过去存在的问题是:所有的决策都已经确定了。但也不是没有优点:返回的结果很明显,我们可以直接将程序过去计算的结果解包(unwrap)出来,然后决定如何使用它。
但我们还是想要抽象计算,并让别人选择如何运行它。所以我们需要一种类型,可以描述计算过程,但是又不执行它。👆 上面的函数只能让我们在调用前或者调用后执行操作。这其实并非预期的,我们希望能够在在运行的时候做点别的事情(实际上就是能够打断执行流)。当在并行编程时,这也剥夺了我们在第一个任务运行时启动另外一个并行任务的能力(这时控制权已经转移给正在执行的函数了)。
此处虽然可以引入线程,但是线程本身是非常特定的并发原语,而我们需要寻找的是一种抽象。具体来说,这样的抽象就是用来表示一个进行中的工作,会在未来产生结果。下面看看非完整定义的 Future
trait:
1 | trait Future { |
如此,我们可以通过 poll()
方法检查 Future
是否完成,如果已经完成,则返回对应的结果。显然,最简单的机制就是在循环中不断轮询;不过我们通常使用成熟的 runtime 来执行。需要注意的是,在 poll()
返回 Poll::Ready
后,继续调用可能会有令人困惑的行为产生,具体可以参见 futures-docs。
Async
实际上 Future
在 Rust 中已经存在一段时间了,只是直接构建和描述它们比较麻烦。因此,async
关键词就是我们的救星,下面的例子中演示了 async-std
搭配 async/await
重构上面的函数:1
2
3
4
5
6
7
8
9// async 标记函数体是一个延迟计算对象,当函数调用时,会产生一个 `Future<Output = io::Result<string>>`,
// 该值并非立即返回的结果 `io::Result<String>`,
//(准确地说,是产生了实现了 `Future<Output = io::Result<String>>` 的类型)。
async fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path).await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
Ok(contents)
}
.await
干了什么
顾名思义,.await
表示等待请求行为(requested action)直到完成,然后才会继续后续的执行。准确的来说,.await
是一个标记,表示此处的代码将会等待直到 Future
产生结果。至于 Future 是怎么结束的,我们不用关心。.await
会让运行时掌控这段代码的执行,至于在计算结束时要执行哪些操作都由运行时来操心。当你编写的操作在后台完成时,它会回到标记点,继续后续的操作。所以这种模式也称为 事件驱动编程(evented programming),因为我们在等待某些事件发生(如打开文件),并据此作出响应(比如开始读取内容)。
当有 2 个及以上这样的函数在同时运行时,我们的运行时系统就能够处理当前所有进行的事件了,避免了傻傻地等待。
Tasks
我们已经知道什么是 Futures 了,那具体怎么运行它们呢?这就是接下来要介绍的 tasks
模块要做的事情。下面看个简单的例子:
1 | use async_std::{fs::File, io, prelude::*, task}; |
Rust 的 Futures 有时也叫「冷(cold)」Futures,你需要运行它们的东西。为了运行一个 Future,可能还需要一些额外的记账工具:比如当前 Future 是否在运行中还是已经结束,在内存中的位置和当前的状态。这种记账逻辑就被抽象到了 Task
中。
Task
类似于 Thread
,但也有些许不同:它是由应用程序(用户空间)被调度,操作系统内核不会感知;一旦到了某个点需要等待,应用程序自己需要负责唤醒任务。async_std
的任务可以拥有名称和 ID。
当通过 spawn
生成任务后,会一直在后台运行。返回的 JoinHandle
本身是一个 Future,它会在 Task
运行结束后(run to conclusion)也会结束。类似 threads
和 join
函数,我们可以调用对 JoinHandle
调用 block_on
函数,从而阻塞程序(准确来说是调用线程被阻塞)直到其运行结束。
Task 中是如何推进 Future 完成计算的呢?
通过简单阅读源码,可以在 src/task/block_on.rs 看到执行的逻辑,当 Task 被调度执行时,对应的 run
函数也会被执行,进而推进 Future 中的计算逻辑直到完成:
1 | /// Blocks the current thread on a future's result. |
JoinHandle 是如何将任务结果返回的呢?
翻阅 src/task/join_handle.rs 源码得知,它其实也是一个实现了 Future trait
的对象,具体逻辑如下:
1 | pub struct JoinHandle<T>(async_task::JoinHandle<T, Task>); |
可以看到,async_std
中的 JoinHandle
实际是对 async_task::JoinHandle
的包装,我们可以深入看下具体是怎么从 Task 中取到结果的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49// async-task-1.1.0/src/join_handle.rs
/// A handle that awaits the result of a task.
///
/// * `None` 表示任务被取消或者 panic 了
/// * `Some(result)` 表示任务结束,返回的结果 `result` 类型为 `R`
pub struct JoinHandle<R, T> {
/// A raw task pointer.
pub(crate) raw_task: NonNull<()>,
/// A marker capturing generic types `R` and `T`.
pub(crate) _marker: PhantomData<(R, T)>,
}
impl<R, T> Future for JoinHandle<R, T> {
type Output = Option<R>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
unsafe {
let mut state = (*header).state.load(Ordering::Acquire);
loop {
// If the task has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
// ...
return Poll::Ready(None);
}
// If the task is not completed, register the current task.
if state & COMPLETED == 0 {
// ...
}
// 任务完成,提取结果
match (*header).state.compare_exchange(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// 从这里把任务结果获取出来(挺 Hack 的),就是下面的 output.read()
let output = ((*header).vtable.get_output)(ptr) as *mut R;
return Poll::Ready(Some(output.read()));
}
Err(s) => state = s,
}
}
}
}
}
async_std 中的 Task
Task 是 async_std
中最核心的抽象之一,和 Rust 中的 thread
类似。Tasks
和运行时虽然也有关联,但它们是分开的。async_std
Task 有如下几个特性:
- 所有任务是单次分配的(in one single allocation);
- 所有任务都有一个
backchannel
,这样可以通过JoinHandle
将结果和错误传递到对应的任务(spawning task); - 它们携带用于调试的元信息;
- 它们支持 任务级别的 local storage。
async_std
任务 API 会处理背后的 runtime 初始化(setup)和清理(teardown)工作,作为用户,我们不需要显式地启动运行时(这个和 Python 3 有丢丢区别)。
阻塞(Blocking)
一般我们认为 Tasks 都是并发执行的,可能它们会共享同一个执行线程(在该线程上切换任务执行)。这同时也意味着如果在某个执行线程上调用了阻塞 API(比如 std::thread::sleep
或者标准库的 IO 函数),会导致对应执行线程阻塞,从而导致该执行线程上的所有任务都停止运行了。其它的一些库(如 db drivers)也会有类似的行为。
需要注意的是,阻塞当前线程本身并非坏事情,只是不要和 async_std
并发执行的模型搞混淆即可。总之,不要这样做:
1 | fn main() { |
如果需要混合阻塞 API 调用,建议另起一个线程执行这样的阻塞操作。
关于错误和 Panic
常规模式下,如果任务可能出错,那么任务的输出 Output
应该是 Result<T, E>
类型。但是在 panic
的时候,具体的表现则取决于有没有合理处理 panic 的地方,如果没有的话,则会退出。
实践中,意味着 block_on
会传递 panic 到阻塞调用的地方:1
2
3
4
5
6
7
8fn main() {
task::block_on(async {
panic!("test");
});
}
thread 'async-task-driver' panicked at 'test', examples/panic.rs:8:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
而对于 spwan 出去的任务如果 panic,则会导致退出(abort):1
2
3
4
5
6
7
8
9
10
11task::spawn(async {
panic!("test");
});
task::block_on(async {
task::sleep(Duration::from_millis(10000)).await;
})
thread 'async-task-driver' panicked at 'test', examples/panic.rs:8:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
Aborted (core dumped)
上面的行为最初看上去有点奇怪,但另外一种选项是对于 spawn 出去的任务如果发生 panic,则简单忽略掉。当前的行为可以被修改为捕获 spawn 出去的任务中发生的 panic,并且根据不同的情况做出不同的响应,这样我们就可以根据需要采取不同的 panic 处理策略。
更多示例
在 这里 有不少示例代码可以参考,下面是一个简单的 UDP 客户端例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20use async_std::io;
use async_std::net::UdpSocket;
use async_std::task;
fn main() -> io::Result<()> {
task::block_on(async {
let socket = UdpSocket::bind("127.0.0.1:8081").await?;
println!("Listening on {}", socket.local_addr()?);
let msg = "hello world";
println!("<- {}", msg);
socket.send_to(msg.as_bytes(), "127.0.0.1:8080").await?;
let mut buf = vec![0u8; 1024];
let (n, _) = socket.recv_from(&mut buf).await?;
println!("-> {}\n", String::from_utf8_lossy(&buf[..n]));
Ok(())
})
}