Futures нулевой стоимости в Rust

оригинал: Aaron Turon • перевод: Сергей Ефремов • обучение • поддержите на Patreon

Замечание: проект futures-rs был реорганизован и многие вещи были переименованы. Где возможно, ссылки были обновлены.

Одним из основных пробелов в экосистеме Rust был быстрый и эффективный асинхронный ввод/вывод. У нас есть прочный фундамент из библиотеки mio, но она очень низкоуровневая: приходится вручную создавать конечные автоматы и жонглировать обратными вызовами.

Нам бы хотелось чего-нибудь более высокоуровневого, с лучшей эргономикой, но чтобы оно обладало хорошей компонуемостью, поддерживая экосистему асинхронных абстракций, работающих вместе. Звучит очень знакомо: ту же цель преследовало внедрение futures (или promises) во многие языки, поддерживающие синтаксический сахар в виде async/await на вершине.

Основным принципом Rust является возможность строить абстракции с нулевой стоимостью, что приводит нас к дополнительной цели нашего рассказа о async I/O: в идеале абстракции как futures должны компилироваться в что-то эквивалентное коду в виде конечных-автоматов-и-жонглирования-обратными-вызовам, который мы сегодня пишем (без дополнительных накладных расходов во времени исполнения).

Последние несколько месяцев, Alex Crichton и я разрабатывали библиотеку futures нулевой стоимости для Rust, ту, которая, мы считаем, позволит достичь этих целей. (Спасибо Carl Lerche, Yehuda Katz, и Nicholas Matsakis за понимание на все пути.)

Сегодня мы рады начать серию статей о новой библиотеке. В этом посте рассказываются самые яркие моменты, ключевые идеи и несколько предварительных тестов. Дальнейшие посты покажут, как возможности Rust используются в проектировании этих абстракций с нулевой стоимостью. Также вас уже ждёт tutorial.

Почему async I/O?

Прежде, чем копать futures, полезно будет рассказать немного о прошлом.

Начнём с маленького кусочка I/O, который вы хотели бы выполнить: чтение определённого количества байт из сокета. Rust предоставляет функцию read_exact для этого:

1
2
// reads 256 bytes into `my_vec`
socket.read_exact(&mut my_vec[..256]);

Быстрый вопрос: что происходит, если у нас ещё недостаточно байт получено от сокета?

Для сегодняшнего Rust ответ такой: текущий поток блокируется, засыпая пока не будут получены ещё байты. Но так было не всегда.

Давным-давно в Rust была реализована модель зелёных потоков, похожая на Go. Вы могли завести огромное количество легковесных заданий, которые потом были распределены по реальным потокам ОС (иногда такая система называется M: N многопоточностью). В модели зелёных потоков функция read_exact заблокирует текущее задание, но не поток ОС; вместо этого, планировщик заданий переключится на другое задание. Это великолепно, можно использовать огромное количество заданий, большинство из которых блокировано, используя небольшое количество потоков ОС.

Проблема в том, что модель зелёных потоков шла в разрез амбициям Rust по полной замене Си, без какой-либо дополнительно навязанной системы выполнения или увеличения цены FFI: мы так и не нашли стратегию их реализации, которая бы не накладывала дополнительных серьёзных глобальных расходов. Вы можете почитать больше в RFC, в котором были удалены зелёные потоки.

Итак, если мы хотим держать большое число одновременных подключений, многие из которых ждут I/O, но при этом держать число потоков ОС на минимуме, что ещё мы можем сделать?

Асинхронный I/O — вот ответ. На самом деле он также используется и для реализации зелёных потоков.

В двух словах, благодаря async I/O вы можете попытаться выполнить операцию I/O без блокировки. Если она не может мгновенно выполниться, можно попробовать через какое-то время. Для того, чтобы это работало, ОС предоставляет различные инструменты, как epoll, позволяющие запросить, какие объекты из огромного списка I/O объектов готовы к чтению или записи — по существу это API, которое предоставляет mio.

Проблема в том, что надо выполнить много болезненной работы по слежению за списком интересных вам событий ввода-вывода, и передать эти события правильным обратным вызовам (не говоря уже о программировании чисто callback-driven способом). Это одна из ключевых проблем, которую решают futures.

Futures

Итак, что такое future?

По существу, future представляет собой значение, которое может быть ещё не готово. Обычно, future становится законченным (значение готово) после какого- то произошедшего события где-то в другом месте. Мы рассматривали их со стороны базового I/O, вы можете использовать future для представления огромного числа различных событий, например:

И так далее. Фишка futures в том, что их можно применять к асинхронным событиям любой формы и размера. Асинхронность отражается в факте правильного получения future, без блокировки, при том, что значение, которое представляет future, будет готовым только в какой-то момент в будущем (future).

В Rust мы представляем futures в виде типажа (например, интерфейса), грубо говоря:

1
2
3
4
trait Future {
    type Item;
    // ... все остальное опущено ...
}

Item говорит, какой тип значения future вернёт после выполнения.

Возвращаясь к нашему ранее приведённому списку примеров, напишем несколько функций, возвращающих различные futures (используя синтаксис impl):

1
2
3
4
5
6
7
8
// Получение строки из таблицы по id, возвращает строку после выполнения
fn get_row(id: i32) -> impl Future<Item = Row>;

// Вызов RPC, который возвращает i32 после выполнения
fn id_rpc(server: &RpcServer) -> impl Future<Item = i32>;

// Запись строки в TcpStream, возвращает stream после выполнения
fn write_string(socket: TcpStream, data: String) -> impl Future<Item = TcpStream>;

Все эти функции немедленно вернут future, случилось или нет событие, которое future представляет; функции неблокирующие.

Все становится ещё интереснее с futures, когда вы сочетаете их. Существует бесчисленное количество вариантов их сочетания, например:

В качестве простого примера использования futures выше, можем написать что-то вроде такого:

1
2
3
4
5
6
7
id_rpc(&my_server).and_then(|id| {
    get_row(id)
}).map(|row| {
    json::encode(row)
}).and_then(|encoded| {
    write_string(my_socket, encoded)
})

Смотри этот код с более конкретными примерами.

Это неблокирующий код, который проходит через несколько состояний: сначала мы делаем вызов RPC для получения ID; затем смотрим соответствующую строку; кодируем её в json; записываем её в сокет. Под капотом этого кода находится конечный автомат, меняющий своё состояние с помощью обратных вызовов (без дополнительных накладных расходов), хотя по стилю этот код очень похож на блокирующий код. (Rustaceans заметят, что эта история очень похожа на историю с Iterator в стандартной библиотеке.) Эргономичный, высокоуровневый код, компилирующийся в конечный-автомат-с-обратными-вызовами: вот к чему мы пришли наконец!

Кроме того, стоит учесть, что каждый из future, используемых здесь, может прийти из другой библиотеки. Абстракции future позволяют их легко объединять вместе.

Streams

Но погодите, есть ещё кое-что! Когда вы начнёте использовать комбинаторы для future, вы не только добьётесь равенства с простым блокирующим кодом, но сможете делать вещи гораздо более хитрые или те, что было бы очень трудно написать по- другому. Для примера нам нужен ещё один концепт: streams.

Futures предназначены для получения одного значения после того, как произойдёт какое-то событие, но часто источники событий периодически создают поток значений. Например, входящее TCP соединение или входящие запросы по сокету являются сами по себе потоками.

Библиотека futures также включает в себя типаж Stream, который очень похож на futures, но настраивается на создание последовательности значений с течением времени. Внутри есть набор комбинаторов, некоторые из которых работают с futures.

Например, если s это поток, то можно написать:

1
s.and_then(|val| some_future(val))

Этот код даст вам новый поток, который работает так: сначала достаётся первое значение val из s, вычисляется some_future(val) из него, затем выполняется этот future и возвращается его значение — затем все опять по кругу для вычисления следующего значения из потока.

Рассмотрим реальный пример:

1
2
3
4
5
6
7
8
9
// Обладая объектом I/O `input`, создаём поток запросов
let requests = ParseStream::new(input);

// Для каждого запроса запускаем нашу функцию сервиса `process` для обработки
// запроса и создаём ответ
let responses = requests.and_then(|req| service.process(req));

// Создаём новый future, который запишет каждый наш ответ в объект I/O `output`
StreamWriter::new(responses, output)

Здесь мы написали ядро простого сервера работающего на потоках. Это, конечно, не космонавтика, но все равно довольно круто манипулировать значениями responses, которые представляют весь сервер целиком.

Сделаем вещи ещё интереснее. Предположим, что протокол обмена конвейерный, т. е., что клиент может послать дополнительные запросы в сокет до того, как выполнятся ранее посланные. На самом деле мы хотим, чтобы обработка запросов шла последовательно, но тут имеет место и использование параллелизма: мы могли бы прочесть и обработать несколько запросов вперёд, пока обрабатывается текущий запрос. Сделать это также просто как добавить ещё один комбинатор в нужное место:

1
2
3
let requests = ParseStream::new(input);
let responses = requests.map(|req| service.process(req)).buffered(32); // <--
StreamWriter::new(responsesm, output)

Комбинатор buffered берет поток futures и буферизирует их до какого-то конечного числа. Буферизация потока означает, что будет с жадностью выдёргиваться большее, чем запрошено количество объектов, а полученные futures будут прятаться в буфер для дальнейшей обработки. В данном случае это означает, что будет читаться и обрабатываться до 32 дополнительных запросов в параллель, пока происходит обработка текущего запроса.

Это относительно простые примеры использования futures и streams, но мы надеемся, что они объяснят, как комбинаторы могут сильно помочь вам в высокоуровневом асинхронном программировании.

Нулевая стоимость?

Я несколько раз утверждал, что наша библиотека futures предоставляет абстракции с нулевой стоимостью, в которых компиляция происходит в что-то очень близкое к коду конечного автомата, который вы бы написали от руки. Чтобы быть более конкретным:

И так далее. Будущие блог-посты погрузят вас глубже в детали этих утверждений и покажут, как мы использовали преимущества Rust для получения нулевой стоимости.

Но лучше один раз увидеть, чем сто раз услышать. Мы написали простой фреймворк HTTP-серверов minihttp, который поддерживает конвейерную обработку и TLS. Этот сервер использует futures на каждом уровне своей реализации, начиная с чтения байт из сокета и заканчивая обработкой потоков запросов. Помимо того, что писать сервер таким образом — приятно, это дало возможность провести сильные стресс тесты накладных расходов абстракций futures.

Чтобы оценить эти накладные расходы, мы реализовали бенчмарк TechEmpower «plaintext».

Этот микротест проверяет «hello world» HTTP сервер, кидая большое число параллельных и конвейерных запросов. Из-за того, что работа, которую выполняет этот сервер по обработке запроса, тривиальна, производительность в большой степени зависит от базовых накладных расходов серверного фреймворка (в нашем случае, futures framework).

TechEmpower используется для сравнения большого количества веб фреймворков на разных языках. Мы сравнили minihttp с несколькими лучшими представителями:

Вот результаты, в количестве обслуженных «Hello world!» в секунду на 8 ядрах Linux машины:

alt

Кажется, можно уверенно сказать, что futures не вносят существенных дополнительных расходов.

Дополнительно: для предоставления дополнительных свидетельств, мы добавили сравнение minihttp с версией с вручную созданным конечным автоматом на Rust (см. «raw mio» по ссылке). Они находятся в 0,3% друг от друга.

Будущее

Итак, завершим наше ураганное введение в futures с нулевой стоимостью в Rust. Рассмотрим больше деталей в дальнейших постах.

На текущий момент библиотека достаточно готова к использованию, и довольно тщательно документирована; в неё входит tutorial и несколько примеров, включающих:

а также разные интеграции, т. е. основанные на futures интерфейсы к curl. Мы активно работаем с несколькими людьми из сообщества Rust для интеграции их работы; если вам интересно, пожалуйста, связывайтесь с Alex или со мной!

Если вам нужно низкоуровневое I/O программирование с futures, можете использовать futures-mio поверх mio. Мы считаем, что это перспективное направление для асинхронного программирования I/O в общем в Rust, последующие посты будут детально рассказывать об этом.

Кроме того, если вы просто хотите обрабатывать HTTP, вы можете работать с minihttp, предоставляя сервис: функцию, которая принимает HTTP запрос, и возвращает future HTTP ответа. Этот вид абстракций RPC/сервисов открывает двери написанию огромного числа повторного использования «middleware» для серверов, и нашёл отражение в библиотеке Twitter Finagle на Scala; он также используется в библиотеке Facebook Wangle. В мире Rust уже в разработке библиотека Tokio, которая строит абстракцию общего сервиса с помощью нашей библиотеки futures, и может играть роль, похожую на Finagle.

Предстоит ещё очень много работы:

Если вас что-то заинтересовало, мы готовы ответить на любые вопросы — мы это acrichto и aturon на каналах IRC по Rust. Присоединяйтесь!

Также заходите в чат русскоязычного сообщества — обсудим!