You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

15 KiB

创建异步任务

同志们,抓稳了,我们即将换挡提速,通向 mini-redis 服务端的高速之路已经开启。

不过在开始之前,先来做点收尾工作:上一章节中,我们实现了一个简易的 mini-redis 客户端并支持了 SET/GET 操作, 现在将该代码移动到 example 文件夹下,因为我们这个章节要实现的是服务器,后面可以用之前客户端示例对我们的服务器端进行测试:

$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs

然后再重新创建一个空的 src/main.rs 文件,至此换挡已经完成,提速正式开始。

接收 sockets

作为服务器端,最基础的工作无疑是接收外部进来的 TCP 连接,可以通过 tokio::net::TcpListener 来完成。

Tokio 中大多数类型的名称都和标准库中对应的同步类型名称相同而且如果没有特殊原因Tokio 的 API 名称也和标准库保持一致,只不过用 async fn 取代 fn 来声明函数。

TcpListener 监听 6379 端口,然后通过循环来接收外部进来的连接,每个连接在处理完后会被关闭。对于目前来说,我们的任务很简单:读取命令、打印到标准输出 stdout,最后回复给客户端一个错误。

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // Bind the listener to the address
    // 监听指定地址,等待 TCP 连接进来
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // 第二个被忽略的项中包含有新连接的 `IP` 和端口信息
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // `Connection` 对于 redis 的读写进行了抽象封装因此我们读到的是一个一个数据帧frame(数据帧 = redis命令 + 数据),而不是字节流
    // `Connection` 是在 mini-redis 中定义
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // 回复一个错误
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

现在运行我们的简单服务器 :

cargo run

此时服务器会处于循环等待以接收连接的状态,接下来在一个新的终端窗口中启动上一章节中的 redis 客户端,由于相关代码已经放入 examples 文件夹下,因此我们可以使用 -- example 来指定运行该客户端示例:

$ cargo run --example hello-redis

此时,客户端的输出是: Error: "unimplemented", 同时服务器端打印出了客户端发来的由 redis 命令和数据 组成的数据帧: GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

生成任务

上面的服务器,如果你仔细看,它其实一次只能接受和处理一条 TCP 连接,只有等当前的处理完并结束后,才能开始接收下一条连接。原因在于 loop 循环中的 await 会导致当前任务进入阻塞等待,也就是 loop 循环会被阻塞。

而这显然不是我们想要的,服务器能并发地处理多条连接的请求,才是正确的打开姿势,下面来看看如何实现真正的并发。

关于并发和并行,在多线程章节中有详细的解释

为了并发的处理连接,需要为每一条进来的连接都生成( spawn )一个新的任务, 然后在该任务中处理连接:

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // 为每一条连接都生成一个新的任务,
        // `socket` 的所有权将被移动到新的任务中,并在那里进行处理
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

任务

一个 Tokio 任务是一个异步的绿色线程,它们通过 tokio::spawn 进行创建,该函数会返回一个 JoinHandle 类型的句柄,调用者可以使用该句柄跟创建的任务进行交互。

spawn 函数的参数是一个 async 语句块,该语句块甚至可以返回一个值,然后调用者可以通过 JoinHandle 句柄获取该值:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
       10086
    });

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}

以上代码会打印出GOT 10086。实际上,上面代码中.await 会返回一个 Result ,若 spawn 创建的任务正常运行结束,则返回一个 Ok(T)的值,否则会返回一个错误 Err:例如任务内部发生了 panic 或任务因为运行时关闭被强制取消时。

任务是调度器管理的执行单元。spawn生成的任务会首先提交给调度器,然后由它负责调度执行。需要注意的是,执行任务的线程未必是创建任务的线程,任务完全有可能运行在另一个不同的线程上,而且任务在生成后,它还可能会在线程间被移动。

任务在 Tokio 中远比看上去要更轻量例如创建一个任务仅仅需要一次64字节大小的内存分配。因此应用程序在生成任务上完全不应该有任何心理负担除非你在一台没那么好的机器上疯狂生成了几百万个任务。。。

'static 约束

当使用 Tokio 创建一个任务时,该任务类型的生命周期必须是 'static。意味着,在任务中不能使用外部数据的引用:

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

上面代码中,spawn 出的任务引用了外部环境中的变量 v ,导致以下报错:

error[E0373]: async block may outlive the current function, but
              it borrows `v`, which is owned by the current function
 --> src/main.rs:7:23
  |
7 |       task::spawn(async {
  |  _______________________^
8 | |         println!("Here's a vec: {:?}", v);
  | |                                        - `v` is borrowed here
9 | |     });
  | |_____^ may outlive borrowed value `v`
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:7:17
  |
7 |       task::spawn(async {
  |  _________________^
8 | |         println!("Here's a vector: {:?}", v);
9 | |     });
  | |_____^
help: to force the async block to take ownership of `v` (and any other
      referenced variables), use the `move` keyword
  |
7 |     task::spawn(async move {
8 |         println!("Here's a vec: {:?}", v);
9 |     });
  |

原因在于:默认情况下,变量并不是通过 move 的方式转移进 async 语句块的, v 变量的所有权依然属于 main 函数,因为任务内部的 println! 是通过借用的方式使用了 v,但是这种借用并不能满足 'static 生命周期的要求。

在报错的同时Rust编译器还给出了相当有帮助的提示async 语句块使用 move 关键字,这样就能将 v 的所有权从 main 函数转移到新创建的任务中。

但是 move 有一个问题,一个数据只能被一个任务使用,如果想要多个任务使用一个数据,就有些强人所难。不知道还有多少同学记得 Arc,它可以轻松解决该问题,还是线程安全的。

在上面的报错中,还有一句很奇怪的信息function requires argument type to outlive 'static 函数要求参数类型的生命周期必须比 'static 长,问题是 'static 已经活得跟整个程序一样久了,难道函数的参数还能活得更久?大家可能会觉得编译器秀逗了,毕竟其它语言编译器也有秀逗的时候:)

先别急着给它扣帽子,虽然我有时候也想这么做。。原因是它说的是类型必须活得比 'static 长,而不是值。当我们说一个值是 'static 时,意味着它将永远存活。这个很重要,因为编译器无法知道新创建的任务将存活多久,所以唯一的办法就是让任务永远存活。

实际上编译的这个报错在告诉我们函数的参数要满足以下约束:T: 'static

我们一般用 T 被 'static 所约束 来描述 T: 'static,当然你也可以说T 类型比 'static 活得更久T的值是 'static ,事实上这三个是一个意思,都是说只要 T: 'static, 那它将永远活下去。

反而是 &'static T 它拥有一个不同的含义:使用'static 进行标注,但这个标注并不能让 T 永远活下去,仅仅是一个标注。

也就是说,一个类型 T 必须活得比 'static 更长,意味着它满足以下约束 T: 'static

PS: 这一段内容Tokio 的原文有些逻辑混乱,我已经尽可能的理清了作者想表达的含义, 同时补充了一些自己的理解,以让内容阅读起来更流畅 /

一个关于 'static 生命周期的常见误解就是它将永远存活(跟整个程序活得一样久),实际上并不是这样的。同样的,一个变量是 'static 并不意味着它存在内存泄漏的可能性。

Send 约束

tokio::spawn 生成的任务必须实现 Send 特征,因为当这些任务在 .await 执行过程中发生阻塞时Tokio 调度器会将任务在线程间移动。

一个任务要实现 Send 特征,那它在 .await 调用的过程中所持有的全部数据都必须实现 Send 特征。当 .await 调用发生阻塞时,任务会让出当前线程所有权给调度器,然后当任务准备好后,调度器会从上一次暂停的位置继续执行该任务。该流程能正确的工作,任务必须将.await之后使用的所有状态保存起来,这样才能在中断后恢复现场并继续执行。若这些状态实现了 Send 特征(可以在线程间安全地移动),那任务自然也就可以在线程间安全地移动。

例如以下代码可以工作:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // 语句块的使用强制了 `rc` 会在 `.await` 被调用前就被释放,
        // 因此 `rc` 并不会影响 `.await`的安全性
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` 的作用范围已经失效,因此当任务让出所有权给当前线程时,它无需作为状态被保存起来
        yield_now().await;
    });
}

但是下面代码就不行:

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");


        // `rc` 在 `.await` 后还被继续使用,因此它必须被作为任务的状态保存起来
        yield_now().await;


        // 事实上,注释掉下面一行代码,依然会报错
        // 原因是:是否保存,不取决于 `rc` 是否被使用,而是取决于 `.await`在调用时是否仍然处于 `rc` 的作用域中
        println!("{}", rc);

        // rc 作用域在这里结束
    });
}

这里有一个很重要的点,代码注释里有讲到,但是我们再重复一次: rc 是否会保存到任务状态中,取决于 .await 的调用是否处于它的作用域中,上面代码中,就算你注释掉 println! 函数,该报错依然会报错,因为 rc 的作用域直到 async 的末尾才结束!

下面是相应的报错,在下一章节,我们还会继续深入讨论该错误:

error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: [..]spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

使用HashMap存储数据

现在,我们可以继续前进了,下面来实现 process 函数,它用于处理进入的命令。相应的值将被存储在 HashMap 中: 通过 SET 命令存值,通过 GET 命令来取值。

同时,我们将使用循环的方式在同一个客户端连接中处理多次连续的请求:

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream) {
    use mini_redis::Command::{self, Get, Set};
    use std::collections::HashMap;

    // 使用 hashmap 来存储 redis 的数据
    let mut db = HashMap::new();

    // `mini-redis` 提供的便利函数,使用返回的 `connection` 可以用于从 socket 中读取数据并解析为数据帧
    let mut connection = Connection::new(socket);

    // 使用 `read_frame` 方法从连接获取一个数据帧一条redis命令 + 相应的数据
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // 值被存储为 `Vec<u8>` 的形式
                db.insert(cmd.key().to_string(), cmd.value().to_vec());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                if let Some(value) = db.get(cmd.key()) {
                    // `Frame::Bulk` 期待数据的类型是 `Bytes` 该类型会在后面章节讲解,
                    // 此时,你只要知道 `&Vec<u8>` 可以使用 `into()` 方法转换成 `Bytes` 类型
                    Frame::Bulk(value.clone().into())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // 将请求响应返回给客户端
        connection.write_frame(&response).await.unwrap();
    }
}

// main 函数在之前已实现

使用 cargo run 运行服务器,然后再打开另一个终端窗口,运行 hello-redis 客户端示例: cargo run --example hello-redis

Bingo在看了这么多原理后我们终于迈出了小小的第一步并获取到了存在 HashMap 中的值: got value from the server; result=Some(b"world")

但是问题又来了:这些值无法在 TCP 连接中共享,如果另外一个用户连接上来并试图同时获取 hello 这个 key,他将一无所获。