futures-rs: асинхронщина на Rust

оригинал: Alex Crichton • перевод: Арсен Галимов • обучение • поддержите на Patreon

Замечание: проект futures-rs был реорганизован и многие вещи были переименованы. Где возможно, ссылки были обновлены.

Начинаем работу с futures

Этот документ поможет вам изучить контейнер для языка программирования Rust — futures, который обеспечивает реализацию futures и потоков с нулевой стоимостью. Futures доступны во многих других языках программирования, таких как C++, Java, и Scala, и контейнер futures черпает вдохновение из библиотек этих языков. Однако он отличается эргономичностью, а также придерживается философии абстракций с нулевой стоимостью, присущей Rust, а именно: для создания и композиции futures не требуется выделений памяти, а для Task, управляющего ими, нужна только одна аллокация. Futures должны стать основой асинхронного компонуемого высокопроизводительного ввода/вывода в Rust, и ранние замеры производительности показывают, что простой HTTP сервер, построенный на futures, действительно быстр.

Эта документация разделена на несколько разделов:

Здравствуй, мир!

Контейнер futures требует Rust версии 1.10.0 или выше, который может быть легко установлен с помощью rustup. Контейнер проверен и точно работает на Windows, macOS и Linux, но PR’ы для других платформ всегда приветствуются. Вы можете добавить futures в Cargo.toml своего проекта следующим образом:

1
2
3
4
[dependencies]
futures = { git = "https://github.com/alexcrichton/futures-rs" }
tokio-core = { git = "https://github.com/tokio-rs/tokio-core" }
tokio-tls = { git = "https://github.com/tokio-rs/tokio-tls" }

Примечание: эта библиотека в активной разработке и требует получения исходников с git напрямую, но позже контейнер будет опубликован на crates.io.

Здесь мы добавляем в зависимости три контейнера:

Контейнер futures является низкоуровневой реализацией futures, которая не несёт в себе какой-либо среды выполнения или слоя ввода/вывода. Для примеров ниже воспользуемся конкретными реализациями, доступными в tokio-core, чтобы показать, как futures и потоки могут быть использованы для выполнения сложных операций ввода/вывода с нулевыми накладными расходами.

Теперь, когда у нас есть всё необходимое, напишем первую программу. В качестве hello-world примера скачаем домашнюю страницу Rust:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
extern crate futures;
extern crate tokio_core;
extern crate tokio_tls;

use std::net::ToSocketAddrs;

use futures::Future;
use tokio_core::reactor::Core;
use tokio_core::net::TcpStream;
use tokio_tls::ClientContext;

fn main() {
    let mut core = Core::new().unwrap();
    let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();

    let socket = TcpStream::connect(&addr, &core.handle());

    let tls_handshake = socket.and_then(|socket| {
        let cx = ClientContext::new().unwrap();
        cx.handshake("www.rust-lang.org", socket)
    });
    let request = tls_handshake.and_then(|socket| {
        tokio_core::io::write_all(socket, "\
            GET / HTTP/1.0\r\n\
            Host: www.rust-lang.org\r\n\
            \r\n\
        ".as_bytes())
    });
    let response = request.and_then(|(socket, _)| {
        tokio_core::io::read_to_end(socket, Vec::new())
    });

    let (_, data) = core.run(response).unwrap();
    println!("{}", String::from_utf8_lossy(&data));
}

Если создать файл с таким содержанием по пути src/main.rs и запустить команду cargo run, то отобразится HTML главной страницы Rust.

Примечание: rustc 1.10 компилирует этот пример медленно. С 1.11 компиляция происходит быстрее.

Этот код слишком большой, чтобы разобраться в нём сходу, так что пройдёмся построчно. Взглянем на функцию main():

1
2
let mut core = Core::new().unwrap();
let addr = "www.rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();

Здесь создаётся цикл событий, в котором будет выполняться весь ввод/вывод. После преобразуем имя хоста «www.rust-lang.org» с использованием метода to_socket_addrs из стандартной библиотеки.

Далее:

1
let socket = TcpStream::connect(&addr, &core.handle());

Получаем хэндл цикла событий и соединяемся с хостом при помощи TcpStream: connect. Примечательно, что TcpStream: connect возвращает future. В действительности, сокет не подключён, но подключение произойдёт позже.

После того, как сокет станет доступным, нам необходимо выполнить три шага для загрузки домашней страницы rust-lang.org:

  1. Выполнить TLS хендшейк. Работать с этой домашней страницей можно только по HTTPS, поэтому мы должны подключиться к порту 443 и следовать протоколу TLS.

  2. Отправить HTTP GET запрос. В рамках этого руководства мы напишем запрос вручную, тем не менее, в боевых программах следует использовать HTTP клиент, построенный на futures.

  3. В заключение, скачать ответ посредством чтения всех данных из сокета.

Рассмотрим каждый из этих шагов подробно. Первый шаг:

1
2
3
4
let tls_handshake = socket.and_then(|socket| {
    let cx = ClientContext::new().unwrap();
    cx.handshake("www.rust-lang.org", socket)
});

Здесь используется метод and_then типажа future, вызывая его у результата выполнения метода TcpStream: connect. Метод and_then принимает замыкание, которое получает значение предыдущего future. В этом случае socket будет иметь тип TcpStream. Стоит отметить, что замыкание, переданное в and_then, не будет выполнено в случае если TcpStream: connect вернёт ошибку.

Как только получен socket, мы создаём клиентский TLS контекст с помощью ClientContext: new. Этот тип из контейнера tokio-tls представляет клиентскую часть TLS соединения. Далее вызываем метод handshake, чтобы выполнить TLS хендшейк. Первый аргумент — доменное имя, к которому мы подключаемся, второй — объект ввода/вывода (в данном случае объект socket).

Как и TcpStream: connect раннее, метод handshake возвращает future. TLS хендшейк может занять некоторое время, потому что клиенту и серверу необходимо выполнить некоторый ввод/вывод, подтверждение сертификатов и т. д. После выполнения future вернёт TlsStream, похожий на рассмотренный выше TcpStream.

Комбинатор and_then выполняет много скрытой работы, обеспечивая выполнение futures в правильном порядке и отслеживая их на лету. При этом значение, возвращаемое and_then, реализует типаж Future, поэтому мы можем составлять цепочки вычислений.

Далее отправляем HTTP запрос:

1
2
3
4
5
6
7
let request = tls_handshake.and_then(|socket| {
    tokio_core::io::write_all(socket, "\
        GET / HTTP/1.0\r\n\
        Host: www.rust-lang.org\r\n\
        \r\n\
    ".as_bytes())
});

Здесь мы получили future из предыдущего шага (tls_handshake) и использовали and_then снова, чтобы продолжить вычисление. Комбинатор write_all полностью записывает HTTP запрос, производя многократные записи по необходимости.

Future, возвращаемый методом write_all, будет выполнен, как только все данные будут записаны в сокет. Примечательно, что TlsStream скрыто шифрует все данные, которые мы записывали, перед тем как отправить в сокет.

Третья и последняя часть запроса выглядит так:

1
2
3
let response = request.and_then(|(socket, _)| {
    tokio_core::io::read_to_end(socket, Vec::new())
});

Предыдущий future request снова связан, на этот раз с результатом выполнения комбинатора read_to_end. Этот future будет читать все данные из сокета и помещать их в предоставленный буфер и вернёт буфер, когда обрабатываемое соединение передаст EOF.

Как и ранее, чтение из сокета на самом деле скрыто расшифровывает данные, полученные от сервера, так что мы читаем расшифрованную версию.

Если исполнение прервётся на этом месте, вы удивитесь, так как ничего не произойдёт. Это потому что всё, что мы сделали, основано на future вычислениях, и мы на самом деле не запустили их. До этого момента мы не делали никакого ввода/вывода и не выполняли HTTP запросов и т. д.

Чтобы по-настоящему запустить futures и управлять ими до завершения, необходимо запустить цикл событий:

1
2
let (_, data) = core.run(response).unwrap();
println!("{}", String::from_utf8_lossy(&data));

Здесь future response помещается в цикл событий, запрашивая у него выполнение future. Цикл событий будет выполняться, пока не будет получен результат.

Примечательно, что вызов core.run(..) блокирует вызывающий поток, пока future не сможет быть возвращён. Это означает, что data имеет тип Vec<u8>. Тогда мы можем напечатать это в stdout как обычно.

Фух! Мы рассмотрели futures, инициализирующие TCP соединение, создающие цепочки вычислений и читающие данные из сокета. Но это только пример возможностей futures, далее рассмотрим нюансы.

Типаж Future

Типаж future является ядром контейнера futures. Этот типаж представляет асинхронные вычисления и их результат. Взглянем на следующий код:

1
2
3
4
5
6
7
8
trait Future {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

    // ...
}

Я уверен, что определение содержит ряд пунктов, вызывающих вопросы:

Разберём их детально.

Item и Error

1
2
type Item;
type Error;

Первая особенность типажа future, как вы, вероятно, заметили, это то, что он содержит два ассоциированных типа. Они представляют собой типы значений, которые future может получить. Каждый экземпляр Future можно обработать как Result<Self::Item, Self::Error>.

Эти два типа будут применяться очень часто в условиях where при передаче futures и в сигнатурах типа, когда futures будут возвращаться. Для примера, при возвращении future можно написать:

1
2
3
fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
    // ...
}

Или, когда принимаем future:

1
2
3
4
5
6
fn foo<F>(future: F)
    where F: Future<Error = io::Error>,
          F::Item: Clone,
{
    // ...
}

poll

1
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

Работа типажа Future построена на этом методе. Метод poll — это единственная точка входа для извлечения вычисленного в future значения. Как пользователю future вам редко понадобится вызывать этот метод напрямую. Скорее всего, вы будете взаимодействовать с futures через комбинаторы, которые создают высокоуровневые абстракции вокруг futures. Однако знание того, как futures работают под капотом, будет полезным.

Подробнее рассмотрим метод poll. Обратим внимание на аргумент &mut self, который вызывает ряд ограничений и свойств:

На самом деле тип Poll является псевдонимом:

1
type Poll<T, E> = Result<Async<T>, E>;

Так же взглянем, что из себя представляет перечисление Async:

1
2
3
4
pub enum Async<T> {
    Ready(T),
    NotReady,
}

Посредством этого перечисления futures могут взаимодействовать, когда значение future готово к использованию. Если произошла ошибка, тогда будет сразу возвращено Err. В противном случае, перечисление Async отображает, когда значение Future полностью получено или ещё не готово.

Типаж Future, как и Iterator, не определяет, что происходит после вызова метода poll, если future уже обработан. Это означает, что тем, кто реализует типаж Future, не нужно поддерживать состояние, чтобы проверить, успешно ли вернул результат метод poll.

Если вызов poll возвращает NotReady, future всё ещё требуется знать, когда необходимо выполниться снова. Для достижения этой цели future должен обеспечить следующий механизм: при получении NotReady текущая задача должна иметь возможность получить уведомление, когда значение станет доступным.

Метод park является основной точкой входа доставки уведомлений. Эта функция возвращает Task, который реализует типажи Send и 'static, и имеет основной метод — unpark. Вызов метода unpark указывает, что future может производить вычисления и возвращать значение.

Более детальную документацию можно найти здесь.

Комбинаторы future

Теперь кажется, что метод poll может внести немного боли в ваш рабочий процесс. Что если у вас есть future, который должен вернуть String, а вы хотите конвертировать его в future, возвращающий u32? Для получения такого рода композиций типаж future обеспечивает большое число комбинаторов.

Эти комбинаторы аналогичны комбинаторам из типажа Iterator, и все они принимают future и возвращают новый future. Для примера, мы могли бы написать:

1
2
3
4
5
6
7
fn parse<F>(future: F) -> Box<Future<Item=u32, Error=F::Error>>
    where F: Future<Item=String> + 'static,
{
    Box::new(future.map(|string| {
        string.parse::<u32>().unwrap()
    }))
}

Здесь для преобразования future, возвращающий тип String, во future, возвращающий u32, используется map. Упаковывание в Box не всегда необходимо и более подробно будет рассмотрено в разделе возвращений futures.

Комбинаторы позволяют выражать следующие понятия:

Использование комбинаторов похоже на использование типажа Iterator в Rust или futures в Scala. Большинство манипуляций с futures заканчивается использованием этих комбинаторов. Все комбинаторы имеют нулевую стоимость, что означает отсутствие выделений памяти, и что реализация будет оптимизирована таким образом, как будто вы писали это вручную.

Типаж Stream

Предварительно мы рассмотрели типаж Future, который полезен в случае вычисления всего лишь одного значения в течение всего времени. Но иногда вычисления лучше представить в виде потока значений. Для примера, TCP слушатель производит множество TCP соединений в течение своего времени жизни. Посмотрим, какие сущности из стандартной библиотеки эквиваленты Future и Stream:

# items Sync Async Common operations
1 [Result] [Future] [map], [and_then]
[Iterator] [Stream] [map][stream-map], [fold], [collect]

Взглянем на типаж Stream:

1
2
3
4
5
6
trait Stream {
    type Item;
    type Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

Вы могли заметить, что типаж Stream очень похож на типаж Future. Основным отличием является то, что метод poll возвращает Option<Self::Item>, а не Self::Item.

Stream со временем производит множество опциональных значений, сигнализируя о завершении потока возвратом Poll::Ok(None). По своей сути Stream представляет собой асинхронный поток, который производит значения в определённом порядке.

На самом деле, Stream — это специальный экземпляр типажа Future, и он может быть конвертирован в future при помощи метода into_future. Возвращённый future получает следующее значение из потока плюс сам поток, позволяющий получить больше значений позже. Это также позволяет составлять потоки и остальные произвольные futures с помощью базовых комбинаторов future.

Как и типаж Future, типаж Stream обеспечивает большое количество комбинаторов. Помимо future-подобных комбинаторов (например, then) поддерживаются потоко-специфичные комбинаторы, такие как fold.

Пример использования типажа Stream

Пример использования futures рассматривался в начале этого руководства, а сейчас посмотрим на пример использования потоков, применив реализацию метода incoming. Этот простой сервер, который принимает соединения, пишет слово «Hello!» и закрывает сокет:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
extern crate futures;
extern crate tokio_core;

use futures::stream::Stream;
use tokio_core::reactor::Core;
use tokio_core::net::TcpListener;

fn main() {
    let mut core = Core::new().unwrap();
    let address = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&address, &core.handle()).unwrap();

    let addr = listener.local_addr().unwrap();
    println!("Listening for connections on {}", addr);

    let clients = listener.incoming();
    let welcomes = clients.and_then(|(socket, _peer_addr)| {
        tokio_core::io::write_all(socket, b"Hello!\n")
    });
    let server = welcomes.for_each(|(_socket, _welcome)| {
        Ok(())
    });

    core.run(server).unwrap();
}

Как и ранее, пройдёмся по строкам:

1
2
3
let mut core = Core::new().unwrap();
let address = "127.0.0.1:8080".parse().unwrap();
let listener = TcpListener::bind(&address, &core.handle()).unwrap();

Здесь мы инициализировали цикл событий, вызвав метод TcpListener: bind у LoopHandle для создания TCP слушателя, который будет принимать сокеты.

Далее взглянем на следующий код:

1
2
3
let server = listener.and_then(|listener| {
    // ...
});

Здесь видно, что TcpListener: bind, как и TcpStream: connect, не возвращает TcpListener, скорее, future его вычисляет. Затем мы используем метод and_then у Future, чтобы определить, что случится, когда TCP слушатель станет доступным.

Мы получили TCP слушатель и можем определить его состояние:

1
2
let addr = listener.local_addr().unwrap();
println!("Listening for connections on {}", addr);

Вызываем метод local_addr для печати адреса, с которым связали слушатель. С этого момента порт успешно связан, так что клиенты могут подключиться.

Далее создадим Stream.

1
let clients = listener.incoming();

Здесь метод incoming возвращает Stream пары TcpListener и SocketAddr. Это похоже на TcpListener из стандартной библиотеки и метод accept, только в данном случае мы, скорее, получаем все события в виде потока, а не принимаем сокеты вручную.

Поток clients производит сокеты постоянно. Это отражает работу серверов — они принимают клиентов в цикле и направляют их в остальную часть системы для обработки.

Теперь, имея поток клиентских соединений, мы можем манипулировать им при помощи стандартных методов типажа Stream:

1
2
3
let welcomes = clients.and_then(|(socket, _peer_addr)| {
    tokio_core::io::write_all(socket, b"Hello!\n")
});

Здесь мы используем метод and_then типажа Stream, чтобы выполнить действие над каждым элементом потока. В данном случае мы формируем цепочку вычислений для каждого элемента потока (TcpStream). Мы видели метод write_all ранее, он записывает переданный буфер данных в переданный сокет.

Этот блок означает, что welcomes теперь является потоком сокетов, в которые записана последовательность символов «Hello!». В рамках этого руководства мы завершаем работу с соединением, так что преобразуем весь поток welcomes в future с помощью метода for_each:

1
2
3
welcomes.for_each(|(_socket, _welcome)| {
    Ok(())
})

Здесь мы принимаем результаты предыдущего future, write_all, и отбрасываем их, в результате чего сокет закрывается.

Следует отметить, что важным ограничением этого сервера является отсутствие параллельности. Потоки представляют собой упорядоченную обработку данных, и в данном случае порядок исходного потока — это порядок, в котором сокеты были получены, а методы and_then и for_each этот порядок сохраняют. Таким образом, сцепление (chaining) создаёт эффект, когда берётся каждый сокет из потока и обрабатываются все связанные операции на нём перед переходом к следующем сокету.

Если, вместо этого, мы хотим управлять всеми клиентами параллельно, мы можем использовать метод spawn:

1
2
3
4
5
6
7
8
9
let clients = listener.incoming();
let welcomes = clients.map(|(socket, _peer_addr)| {
    tokio_core::io::write_all(socket, b"hello!\n")
});
let handle = core.handle();
let server = welcomes.for_each(|future| {
    handle.spawn(future.then(|_| Ok(())));
    Ok(())
});

Вместо метода and_then используется метод map, который преобразует поток клиентов в поток futures. Затем мы изменяем замыкание переданное в for_each используя метод spawn, что позволяет future быть запущенным параллельно в цикле событий. Обратите внимание, что spawn требует future c item/error имеющими тип ().

Конкретные реализации futures и потоков

На данном этапе имеется ясное понимание типажей [Future] и [Stream], того, как они реализованы и как их совмещать. Но откуда все эти futures изначально пришли? Взглянем на несколько конкретных реализаций futures и потоков.

Первым делом, любое доступное значение future находится в состоянии «готового». Для этого достаточно функций done, failed и finished. Функция done принимает Result<T,E> и возвращает Future<Item=I, Error=E>. Для функций failed и finished можно указать T или E и оставить другой ассоциированный тип в качестве шаблона (wildcard).

Для потоков эквивалентным понятием «готового» значения потока является функция iter, которая создаёт поток, отдающий элементы полученного итератора. В ситуациях, когда значение не находится в состоянии «готового», также имеется много общих реализаций Future и Stream, первая из которых — функция oneshot:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
extern crate futures;

use std::thread;
use futures::Future;

fn expensive_computation() -> u32 {
    // ...
    200
}

fn main() {
    let (tx, rx) = futures::oneshot();

    thread::spawn(move || {
        tx.complete(expensive_computation());
    });

    let rx = rx.map(|x| x + 3);
}

Здесь видно, что функция oneshot возвращает кортеж из двух элементов, как, например, mpsc: channel. Первая часть tx («transmitter») имеет тип Complete и используется для завершения oneshot, обеспечивая значение future на другом конце. Метод Complete: complete передаст значение принимающей стороне.

Вторая часть кортежа, это rx («receiver»), имеет тип Oneshot, для которого реализован типаж Future. Item имеет тип T, это тип Oneshot. Error имеет тип Canceled, что происходит, когда часть Complete отбрасывается не завершая выполнения вычислений.

Эта конкретная реализация future может быть использована (как здесь показано) для передачи значений между потоками. Каждая часть реализует типаж Send и по отдельности является владельцем сущности. Часто использовать эту реализацию, как правило, не рекомендуется, лучше использовать базовые future и комбинаторы, там где это возможно.

Для типажа Stream доступен аналогичный примитив channel. Этот тип также имеет две части, одна из которых используется для отправки сообщений, а другая, реализующая Stream, для их приёма.

Канальный тип Sender имеет важное отличие от стандартной библиотеки: когда значение отправляется в канал, он потребляет отправителя, возвращая future, который, в свою очередь, возвращает исходного отправителя только когда посланное значение будет потреблено. Это создаёт противодействие, чтобы производитель не смог совершить прогресс пока потребитель от него отстаёт.

Возвращение futures

Самое необходимое действие в работе с futures — это возвращение Future. Однако как и с типажом Iterator, это пока что не так уж легко. Рассмотрим имеющиеся варианты:

Типажи-объекты

Первое, что можно сделать, это вернуть упакованный типаж-объект:

1
2
3
fn foo() -> Box<Future<Item = u32, Error = io::Error>> {
    // ...
}

Достоинством этого подхода является простая запись и создание. Этот подход максимально гибок с точки зрения изменений future, так как любой тип future может быть возвращён в непрозрачном, упакованном виде.

Обратите внимание, что метод boxed возвращает BoxFuture, который на самом деле является всего лишь псевдонимом для Box<Future + Send>:

1
2
3
fn foo() -> BoxFuture<u32, u32> {
    finished(1).boxed()
}

Недостатком такого подхода является выделение памяти в ходе исполнения, когда future создаётся. Box будет выделен в куче, а future будет помещён внутрь. Однако, стоит заметить, что это единственное выделение памяти, и в ходе выполнения future выделений более не будет. Более того, стоимость этой операции в конечном счёте не всегда высокая, так как внутри нет упакованных future (т.е цепочка комбинаторов, как правило, не требует выделения памяти), и данный минус относится только к внешнему Box.

Пользовательские типы

Если вы не хотите возвращать Box, можете обернуть future в свой тип и возвращать его.

Пример:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct MyFuture {
    inner: Oneshot<i32>,
}

fn foo() -> MyFuture {
    let (tx, rx) = oneshot();
    // ...
    MyFuture { inner: tx }
}

impl Future for MyFuture {
    // ...
}

В этом примере возвращается пользовательский тип MyFuture и для него реализуется типаж Future. Эта реализация использует future Oneshot<i32>, но можно использовать любой другой future из контейнера.

Достоинством такого подхода является, то, что он не требует выделения памяти для Box и по-прежнему максимально гибок. Детали реализации MyFuture скрыты, так что он может меняться не ломая остального.

Недостаток такого подхода в том, что он не всегда может быть эргономичным. Объявление новых типов становится слишком громоздким через некоторое время, и при частом возвращении futures это может стать проблемой.

Именованные типы

Следующая возможная альтернатива — именование возвращаемого типа напрямую:

1
2
3
4
5
6
fn add_10<F>(f: F) -> Map<F, fn(i32) -> i32>
    where F: Future<Item = i32>,
{
    fn do_map(i: i32) -> i32 { i + 10 }
    f.map(do_map)
}

Здесь возвращаемый тип именуется так, как компилятор видит его. Функция map возвращает структуру map, которая содержит внутри future и функцию, которая вычисляет значения для map.

Достоинством данного подхода является его эргономичность в отличие от пользовательских типов future, а также отсутствие накладных расходов во время выполнения связанных с Box, как это было ранее.

Недостатком данного подхода можно назвать сложность именования возвращаемых типов. Иногда типы могут быть довольно-таки большими. Здесь используется указатель на функцию (fn(i32) -> i32), но в идеале мы должны использовать замыкание. К сожалению, на данный момент в типе возвращаемого значения не может присутствовать замыкание.

impl Trait

Благодаря новой возможности в Rust, называемой impl Trait, возможен ещё один вариант возвращения future.

Пример:

1
2
3
4
5
fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error>
    where F: Future<Item = i32>,
{
    f.map(|i| i + 10)
}

Здесь мы указываем, что возвращаемый тип — это «нечто, реализующее типаж Future» с учётом указанных ассоциированных типов. При этом использовать комбинаторы future можно как обычно.

Достоинством данного подхода является нулевая стоимость: нет необходимости упаковки в Box, он максимально гибок, так как реализации future скрывают возвращаемый тип и эргономичность написания настолько же хороша, как и в первом примере с Box.

Недостатком можно назвать, то что возможность impl Trait пока не входит в стабильную версию Rust. Хорошие новости в том, что как только она войдёт в стабильную сборку, все контейнеры, использующие futures, смогут немедленно ею воспользоваться. Они должны быть обратно-совместимыми, чтобы сменить типы возвращаемых значений с Box на impl Trait.

Task и Future

До сих пор мы говорили о том, как строить вычисления посредством создания futures, но мы едва ли коснулись того, как их запускать. Ранее, когда разговор шёл о методе poll, было отмечено, что если poll возвращает NotReady, он обеспечивает отправку уведомления задаче, но откуда эта задача вообще взялась? Кроме того, где poll был вызван впервые?

Рассмотрим Task.

Структура Task управляет вычислениями, представленными futures. Любой конкретный экземпляр future может иметь короткий цикл жизни, являясь частью большого вычисления. В примере «Здравствуй, мир!» имелось некоторое количество future, но только один выполнялся в момент времени. Для всей программы был один Task, который следовал логическому «потоку исполнения» по мере того, как обрабатывался каждый future и общее вычисление прогрессировало.

Когда future порождается она сливается с задачей и тогда эта структура может быть опрошена для завершения. Как и когда именно происходит опрос (poll), остаётся во власти функции, которая запустила future. Обычно вы не будете вызывать spawn, а скорее CpuPool: spawn с пулом потоков или Handle: spawn с циклом событий. Внутри они используют spawn и обрабатывают управляющие вызовы poll за вас.

В продуманной реализации типажа Task кроется эффективность контейнера futures: когда Task создан, все Future в цепочке вычислений объединяются в машину состояний и переносятся из стека в кучу. Это действие является единственным, которое требует выделение памяти в контейнере futures. В результате Task ведёт себя таким образом, как если бы вы написали машину состояний вручную, в качестве последовательности прямолинейных вычислений.

Локальные данные задачи

В предыдущем разделе мы увидели, что каждый отдельный future является частью большого асинхронного вычисления. Это означает, что futures приходят и уходят, но может возникнуть необходимость, чтобы у них был доступ к данным, которые живут на протяжении всего времени выполнения программы.

Futures требуют 'static, так что у нас есть два варианта для обмена данными между futures:

Оба эти решения относительно тяжеловесны, поэтому посмотрим, сможем ли мы сделать лучше.

В разделе Task и Future мы увидели, что асинхронные вычисления имеют доступ к Task на всём протяжении его жизни, и из сигнатуры метода poll было видно, что это изменяемый доступ. API Task использует эти особенности и позволяет хранить данные внутри Task. Данные ассоциированные с Task могут быть созданы с помощью двух методов:

Примечательно, что оба эти метода объединяют данные с текущей запущенной задачей, что не всегда может быть желательно, поэтому их следует использовать осторожно.