Futures нулевой стоимости в Rust
Замечание: проект futures-rs был реорганизован и многие вещи были переименованы. Где возможно, ссылки были обновлены.
Одним из основных пробелов в экосистеме Rust был быстрый и эффективный асинхронный ввод/вывод. У нас есть прочный фундамент из библиотеки mio, но она очень низкоуровневая: приходится вручную создавать конечные автоматы и жонглировать обратными вызовами.
Нам бы хотелось чего-нибудь более высокоуровневого, с лучшей эргономикой, но чтобы оно обладало хорошей компонуемостью, поддерживая экосистему асинхронных абстракций, работающих вместе. Звучит очень знакомо: ту же цель преследовало внедрение futures (или promises) во многие языки, поддерживающие синтаксический сахар в виде async/await на вершине.
Основным принципом Rust является возможность строить абстракции с нулевой стоимостью, что приводит нас к дополнительной цели нашего рассказа о async I/O: в идеале абстракции как futures должны компилироваться в что-то эквивалентное коду в виде конечных-автоматов-и-жонглирования-обратными-вызовам, который мы сегодня пишем (без дополнительных накладных расходов во времени исполнения).
Последние несколько месяцев, Alex Crichton и я разрабатывали библиотеку futures нулевой стоимости для Rust, ту, которая, мы считаем, позволит достичь этих целей. (Спасибо Carl Lerche, Yehuda Katz, и Nicholas Matsakis за понимание на все пути.)
Сегодня мы рады начать серию статей о новой библиотеке. В этом посте рассказываются самые яркие моменты, ключевые идеи и несколько предварительных тестов. Дальнейшие посты покажут, как возможности Rust используются в проектировании этих абстракций с нулевой стоимостью. Также вас уже ждёт tutorial.
Почему async I/O?
Прежде, чем копать futures, полезно будет рассказать немного о прошлом.
Начнём с маленького кусочка I/O, который вы хотели бы выполнить: чтение определённого количества байт из сокета. Rust предоставляет функцию read_exact для этого:
1 2 | // reads 256 bytes into `my_vec` socket.read_exact(&mut my_vec[..256]); |
Быстрый вопрос: что происходит, если у нас ещё недостаточно байт получено от сокета?
Для сегодняшнего Rust ответ такой: текущий поток блокируется, засыпая пока не будут получены ещё байты. Но так было не всегда.
Давным-давно в Rust была реализована модель зелёных потоков, похожая на Go. Вы могли завести огромное количество легковесных заданий, которые потом были распределены по реальным потокам ОС (иногда такая система называется M: N многопоточностью). В модели зелёных потоков функция read_exact заблокирует текущее задание, но не поток ОС; вместо этого, планировщик заданий переключится на другое задание. Это великолепно, можно использовать огромное количество заданий, большинство из которых блокировано, используя небольшое количество потоков ОС.
Проблема в том, что модель зелёных потоков шла в разрез амбициям Rust по полной замене Си, без какой-либо дополнительно навязанной системы выполнения или увеличения цены FFI: мы так и не нашли стратегию их реализации, которая бы не накладывала дополнительных серьёзных глобальных расходов. Вы можете почитать больше в RFC, в котором были удалены зелёные потоки.
Итак, если мы хотим держать большое число одновременных подключений, многие из которых ждут I/O, но при этом держать число потоков ОС на минимуме, что ещё мы можем сделать?
Асинхронный I/O — вот ответ. На самом деле он также используется и для реализации зелёных потоков.
В двух словах, благодаря async I/O вы можете попытаться выполнить операцию I/O без блокировки. Если она не может мгновенно выполниться, можно попробовать через какое-то время. Для того, чтобы это работало, ОС предоставляет различные инструменты, как epoll, позволяющие запросить, какие объекты из огромного списка I/O объектов готовы к чтению или записи — по существу это API, которое предоставляет mio.
Проблема в том, что надо выполнить много болезненной работы по слежению за списком интересных вам событий ввода-вывода, и передать эти события правильным обратным вызовам (не говоря уже о программировании чисто callback-driven способом). Это одна из ключевых проблем, которую решают futures.
Futures
Итак, что такое future?
По существу, future представляет собой значение, которое может быть ещё не готово. Обычно, future становится законченным (значение готово) после какого- то произошедшего события где-то в другом месте. Мы рассматривали их со стороны базового I/O, вы можете использовать future для представления огромного числа различных событий, например:
- Запрос к БД, который выполняется в пуле потоков. Если запрос выполнился, future станет законченным, а в его значении будет результат запроса.
- Выполнение RPC на сервере. Если сервер ответил, future станет законченным, а в его значении будет ответ сервера.
- Таймаут. Если время вышло, future станет законченным, а его значением будет () (единичное значение в Rust).
- Долго выполняющееся CPU-затратное задание, выполняющееся в пуле потоков. Когда задание заканчивается, future станет законченным, а его значением будет значение задания.
- Чтение байт из сокета. Если байты готовы, future станет законченным — и в зависимости от стратегии буферизации, байты могут быть получены напрямую или записаны в дополнительный уже существующий буфер.
И так далее. Фишка futures в том, что их можно применять к асинхронным событиям любой формы и размера. Асинхронность отражается в факте правильного получения future, без блокировки, при том, что значение, которое представляет future, будет готовым только в какой-то момент в будущем (future).
В Rust мы представляем futures в виде типажа (например, интерфейса), грубо говоря:
1 2 3 4 | trait Future { type Item; // ... все остальное опущено ... } |
Item говорит, какой тип значения future вернёт после выполнения.
Возвращаясь к нашему ранее приведённому списку примеров, напишем несколько функций, возвращающих различные futures (используя синтаксис impl):
1 2 3 4 5 6 7 8 | // Получение строки из таблицы по id, возвращает строку после выполнения fn get_row(id: i32) -> impl Future<Item = Row>; // Вызов RPC, который возвращает i32 после выполнения fn id_rpc(server: &RpcServer) -> impl Future<Item = i32>; // Запись строки в TcpStream, возвращает stream после выполнения fn write_string(socket: TcpStream, data: String) -> impl Future<Item = TcpStream>; |
Все эти функции немедленно вернут future, случилось или нет событие, которое future представляет; функции неблокирующие.
Все становится ещё интереснее с futures, когда вы сочетаете их. Существует бесчисленное количество вариантов их сочетания, например:
-
Sequential composition: f.and_then(|val| some_new_future(val)). Возвращает вам future, который выполняет future
f
, беретval
, который он создаёт, и строит ещё один futuresome_new_future(val)
, затем выполняет его. -
Mapping:
f.map(|val| some_new_value(val))
. Возвращает вам future, который выполняет futuref
и применяет его к результатуsome_new_value(val)
. -
Joining:
f.join(g)
. Возвращает вам future, который выполняет futuresf
иg
в параллель, и заканчивается, когда оба из них закончатся, возвращая оба их значения. -
Selecting:
f.select(g)
. Возвращает вам future, который выполняет futuresf
иg
в параллель, и заканчивается, когда один из них закончится, возвращая его значения и другой future. (Хотите добавить таймаут к любому future? Просто выполнитеselect
этого future и future таймаута!)
В качестве простого примера использования futures выше, можем написать что-то вроде такого:
1 2 3 4 5 6 7 | id_rpc(&my_server).and_then(|id| { get_row(id) }).map(|row| { json::encode(row) }).and_then(|encoded| { write_string(my_socket, encoded) }) |
Смотри этот код с более конкретными примерами.
Это неблокирующий код, который проходит через несколько состояний: сначала мы делаем вызов RPC для получения ID; затем смотрим соответствующую строку; кодируем её в json; записываем её в сокет. Под капотом этого кода находится конечный автомат, меняющий своё состояние с помощью обратных вызовов (без дополнительных накладных расходов), хотя по стилю этот код очень похож на блокирующий код. (Rustaceans заметят, что эта история очень похожа на историю с Iterator в стандартной библиотеке.) Эргономичный, высокоуровневый код, компилирующийся в конечный-автомат-с-обратными-вызовами: вот к чему мы пришли наконец!
Кроме того, стоит учесть, что каждый из future, используемых здесь, может прийти из другой библиотеки. Абстракции future позволяют их легко объединять вместе.
Streams
Но погодите, есть ещё кое-что! Когда вы начнёте использовать комбинаторы для future, вы не только добьётесь равенства с простым блокирующим кодом, но сможете делать вещи гораздо более хитрые или те, что было бы очень трудно написать по- другому. Для примера нам нужен ещё один концепт: streams.
Futures предназначены для получения одного значения после того, как произойдёт какое-то событие, но часто источники событий периодически создают поток значений. Например, входящее TCP соединение или входящие запросы по сокету являются сами по себе потоками.
Библиотека futures также включает в себя типаж Stream, который очень похож на futures, но настраивается на создание последовательности значений с течением времени. Внутри есть набор комбинаторов, некоторые из которых работают с futures.
Например, если s
это поток, то можно написать:
1 | s.and_then(|val| some_future(val)) |
Этот код даст вам новый поток, который работает так: сначала достаётся первое
значение val
из s
, вычисляется some_future(val)
из него, затем выполняется
этот future и возвращается его значение — затем все опять по кругу для
вычисления следующего значения из потока.
Рассмотрим реальный пример:
1 2 3 4 5 6 7 8 9 | // Обладая объектом I/O `input`, создаём поток запросов let requests = ParseStream::new(input); // Для каждого запроса запускаем нашу функцию сервиса `process` для обработки // запроса и создаём ответ let responses = requests.and_then(|req| service.process(req)); // Создаём новый future, который запишет каждый наш ответ в объект I/O `output` StreamWriter::new(responses, output) |
Здесь мы написали ядро простого сервера работающего на потоках. Это, конечно, не
космонавтика, но все равно довольно круто манипулировать значениями responses
,
которые представляют весь сервер целиком.
Сделаем вещи ещё интереснее. Предположим, что протокол обмена конвейерный, т. е., что клиент может послать дополнительные запросы в сокет до того, как выполнятся ранее посланные. На самом деле мы хотим, чтобы обработка запросов шла последовательно, но тут имеет место и использование параллелизма: мы могли бы прочесть и обработать несколько запросов вперёд, пока обрабатывается текущий запрос. Сделать это также просто как добавить ещё один комбинатор в нужное место:
1 2 3 | let requests = ParseStream::new(input); let responses = requests.map(|req| service.process(req)).buffered(32); // <-- StreamWriter::new(responsesm, output) |
Комбинатор buffered берет поток futures
и
буферизирует их до какого-то конечного числа. Буферизация потока означает, что
будет с жадностью выдёргиваться большее, чем запрошено количество объектов, а
полученные futures будут прятаться в буфер для дальнейшей обработки. В данном
случае это означает, что будет читаться и обрабатываться до 32 дополнительных
запросов в параллель, пока происходит обработка текущего запроса.
Это относительно простые примеры использования futures и streams, но мы надеемся, что они объяснят, как комбинаторы могут сильно помочь вам в высокоуровневом асинхронном программировании.
Нулевая стоимость?
Я несколько раз утверждал, что наша библиотека futures предоставляет абстракции с нулевой стоимостью, в которых компиляция происходит в что-то очень близкое к коду конечного автомата, который вы бы написали от руки. Чтобы быть более конкретным:
-
Ни один из комбинаторов future не производит никакого выделения памяти. Когда мы создаём цепочки, используя
and_then
, мы не только не аллоцируем ничего, мы на самом деле строим большойenum
, представляющий конечный автомат. (Одна аллокация нужна для задания, что обычно означает одну аллокацию на соединение.) -
Когда происходит событие, нужен только один динамический вызов.
-
Практически отсутствуют затраты, налагаемые синхронизацией; если вы хотите связать данные, находящиеся в вашем цикле событий, и иметь доступ к ним в однопоточном виде из futures, мы даём вам инструменты для этого.
И так далее. Будущие блог-посты погрузят вас глубже в детали этих утверждений и покажут, как мы использовали преимущества Rust для получения нулевой стоимости.
Но лучше один раз увидеть, чем сто раз услышать. Мы написали простой фреймворк HTTP-серверов minihttp, который поддерживает конвейерную обработку и TLS. Этот сервер использует futures на каждом уровне своей реализации, начиная с чтения байт из сокета и заканчивая обработкой потоков запросов. Помимо того, что писать сервер таким образом — приятно, это дало возможность провести сильные стресс тесты накладных расходов абстракций futures.
Чтобы оценить эти накладные расходы, мы реализовали бенчмарк TechEmpower «plaintext».
Этот микротест проверяет «hello world» HTTP сервер, кидая большое число параллельных и конвейерных запросов. Из-за того, что работа, которую выполняет этот сервер по обработке запроса, тривиальна, производительность в большой степени зависит от базовых накладных расходов серверного фреймворка (в нашем случае, futures framework).
TechEmpower используется для сравнения большого количества веб фреймворков на разных языках. Мы сравнили minihttp с несколькими лучшими представителями:
- rapidoid, фреймворка на Java, который был первым в последнем раунде официальных тестов.
- Go, реализация, использующая поддержку HTTP из стандартной библиотеки Go.
- fasthttp, конкурент стандартной библиотеки Go.
- node.js.
Вот результаты, в количестве обслуженных «Hello world!» в секунду на 8 ядрах Linux машины:
Кажется, можно уверенно сказать, что futures не вносят существенных дополнительных расходов.
Дополнительно: для предоставления дополнительных свидетельств, мы добавили сравнение minihttp с версией с вручную созданным конечным автоматом на Rust (см. «raw mio» по ссылке). Они находятся в 0,3% друг от друга.
Будущее
Итак, завершим наше ураганное введение в futures с нулевой стоимостью в Rust. Рассмотрим больше деталей в дальнейших постах.
На текущий момент библиотека достаточно готова к использованию, и довольно тщательно документирована; в неё входит tutorial и несколько примеров, включающих:
- простой TCP echo server
- эффективный SOCKSv5 proxy server
minihttp
, высокоэффективный HTTP server, поддерживающий TLS и использующий Hyper’s parser- пример использования minihttp для TLS соединений,
а также разные интеграции, т. е. основанные на futures интерфейсы к curl. Мы активно работаем с несколькими людьми из сообщества Rust для интеграции их работы; если вам интересно, пожалуйста, связывайтесь с Alex или со мной!
Если вам нужно низкоуровневое I/O программирование с futures, можете использовать futures-mio поверх mio. Мы считаем, что это перспективное направление для асинхронного программирования I/O в общем в Rust, последующие посты будут детально рассказывать об этом.
Кроме того, если вы просто хотите обрабатывать HTTP, вы можете работать с
minihttp,
предоставляя сервис: функцию, которая принимает HTTP запрос, и
возвращает future
HTTP ответа. Этот вид абстракций RPC/сервисов открывает двери
написанию огромного числа повторного использования «middleware» для серверов, и
нашёл отражение в библиотеке Twitter
Finagle на Scala; он также используется в
библиотеке Facebook Wangle. В мире Rust
уже в разработке библиотека Tokio, которая строит абстракцию общего сервиса с помощью
нашей библиотеки futures, и может играть роль, похожую на Finagle.
Предстоит ещё очень много работы:
- Для начала, мы бы очень хотели получить отзыв об абстракциях future и stream, также есть некоторые особые детали для некоторых комбинаторов, в которых мы не уверены.
- Во-вторых, хотя мы создали несколько абстракций future вокруг базовых принципов I/O, есть ещё много мест, где их можно использовать, и мы будем рады получить помощь в этом.
- В более широком смысле, надо написать ещё бесконечное число «привязок» futures к разным библиотекам (как на Си так и на Rust); если у вас есть библиотека, к которой вы хотите привязать futures, мы готовы помочь!
- В глобальной перспективе следующим очевидным шагом будет исследование нотации async/await поверх futures, возможно тем же образом, что предлагается в Javascript. Но для начала мы хотим получить побольше опыта, используя futures напрямую как библиотеку, перед тем, как решимся на такой шаг.
Если вас что-то заинтересовало, мы готовы ответить на любые вопросы — мы это acrichto и aturon на каналах IRC по Rust. Присоединяйтесь!
Также заходите в чат русскоязычного сообщества — обсудим!