Что такое Tokio и Async IO и зачем это нужно?

оригинал: Manish Goregaokar • перевод: bmusin • обучение • поддержите на Patreon

Сообщество Rust в последнее время сконцентрировало много своих усилий на асинхронном вводе/выводе, реализованном в виде библиотеки Tokio. И это замечательно.

Многим из участников сообщества, тем, которые не работали с веб-серверами и связанными с этим вещами, не ясно, чего же мы хотим добиться. Когда эти вещи обсуждались во времена версии 1.0, я тоже имел смутное представление об этом, никогда прежде не работав с этим раньше.


Какую проблему мы пытаемся решить?

Одной из ключевых особенностей Rust является «безбоязненная конкурентность» (fearless concurrency). Однако тот вид конкурентности, который нужен для обработки большого количества задач, зависящих от производительности ввода/вывода, и который имеется в Go, Elixir, Erlang — отсутствует в Rust.

Давайте предположим, что вы хотите собрать что-то наподобие веб-сервера. Он будет обрабатывать тысячи запросов в каждый момент времени (проблема c10k). Говоря общими словами, рассматриваемая нами проблема состоит из многих задач, выполняющих в основном операции ввода/вывода (особенно связанных с сетевым взаимодействием).

«Одновременная обработка N задач» — такая задача лучше всего решается использованием потоков. Однако… Тысячи потоков? Наверное, это слишком много. Работа с потоками может быть довольно ресурсозатратной: каждый поток должен выделить большой стек, настроить поток, используя набор системных вызовов. Ко всему прочему переключение контекста тоже затратно.

Конечно, тысячи одновременно работающих потоков не будет: вы имеете ограниченное число ядер, и в любой момент времени только один поток будет исполняться на этом ядре.

Но в примерах подобных данному веб-серверу большинство этих потоков не будут делать никакой работы. Они будут ждать или поступления запроса, или отправки ответа.

C обычными потоками, когда вы производите блокирующую операцию ввода/вывода, системный вызов возвращает управление ядру, которое не возвратит потоку управление обратно, потому что, вероятно, операция ввода/вывода ещё не завершилась. Вместо этого ядро будет использовать данный момент как возможность «подгрузить» (swap in) другой поток и продолжить выполнение исходного потока (начавшего операцию ввода/вывода) когда операция ввода/вывода будет завершена, то есть когда исходный поток будет разблокирован. Вот так вы решаете такие задачи в Rust, когда не используете Tokio и подобные ей библиотеки — запускаете миллион потоков и позволяете ОС самостоятельно планировать запуск и завершение потоков в зависимости от ввода/вывода.

Но, как мы уже выяснили, использование потоков не очень хорошо масштабируется для задач, подобных данной. (1)

Нам нужны более «лёгкие» потоки.


Модель многопоточности основанная на легковесных потоках

Думаю, для того чтобы лучше понять данную модель, нужно на некоторое время отвлечься от Rust и посмотреть на язык, который справляется с этим хорошо, Go.

Так, Go имеет легковесные потоки, называемые горутинами. Вы запускаете их посредством ключевого слова go. Веб-сервер может выполнять код, подобный следующему:

1
2
3
4
5
6
7
8
9
listener, err = net.Listen(...)
// обработать ошибку
for {
    conn, err := listener.Accept()
    // обработать ошибку

    // запустить горутину
    go handler(conn)
}

Это цикл, который ожидает поступления новых TCP-запросов и запускает новую горутину с функцией handler. Каждое сетевое соединение будет обрабатываться отдельной горутиной, которая будет уничтожаться при завершении выполнения функции handler. В то же время главный цикл будет выполняться, потому что он работает в отдельной горутине.

Если это не настоящие (поддерживаемые ОС) потоки, то что тогда происходит?

Горутины — это легковесные потоки, о которых ОС не имеет никакой информации. ОС видит только N потоков, принадлежащих системе выполнения Go, и Go распределяет между этими потоками M горутин, выгружая и подгружая их подобно планировщику ОС. Это возможно, потому что Go допускает прерывания своего кода для сборки мусора, так что планировщик всегда может «попросить» горутину остановиться. Горутины также возвращают выполнение планировщику на время ожидания операций ввода/вывода.

Планировщик осведомлён о системе ввода/вывода, поэтому когда горутина ждёт завершения операции ввода/вывода, она возвращает право на исполнение планировщику обратно.

В сущности скомпилированная Go-функция будет иметь набор разбросанных по ней мест, где она говорит планировщику и GC: «Возьмите управление себе, если вы хотите» (и также «Я ожидаю то-то и то-то, пожалуйста, возьмите контроль себе).

Когда горутина подгружена в поток ОС, некоторые регистры будут сохранены и указатель на текущую инструкцию будет переведён на новую горутину.

Но что происходит со стеком? Потоки ОС имеют большой стек при себе, он нужен для того, чтобы функции могли работать.

Go использует сегментированные стеки. Каждый поток требует сравнительно большой стек, потому что большинство языков программирования (включая С) поддерживают только непрерывные стеки с постоянным размером. Это позволяет быть уверенным в том, что указатели на стек никогда не будут испорчены перевыделением памяти.

Так что мы благоразумно резервируем для себе весь стек, который по нашему мнению, нам может понадобиться (примерно 8 МБ). При этом мы ожидаем, что этого нам хватит.

Но ожидание того, что стеки будут непрерывными, строго говоря, не обязательно. В Go стеки состоят из небольших кусочков. Когда функция вызывается, она проверяет, достаточно ли места на стеке для её исполнения, и если нет, то выделяет новый кусочек стека и запускается на нём. Так что если вы захотите иметь тысячи потоков, выполняющих небольшой объём работы, они все получат тысячи небольших стеков, и всё будет хорошо.

На самом деле сегодня Go делает немножко другое: он копирует стеки. Я упомянул, что стеки не могут быть просто так перевыделены, Ожидается, что данные на стеке будут оставаться на том же месте. Но это не всегда так, потому что в Go есть GC, а поэтому он в любом случае знает про все указатели и может при необходимости перезаписывать те из них, что указывают на стек.

В любом случае система выполнения Go хорошо работает с подобными вещами. горутины очень дёшевы, и вы можете запустить их тысячами, без каких бы то ни было проблем.

Rust поддерживал легковесные/green потоки (думаю, что он использовал сегментированные стеки). Однако Rust тщательно следит за тем, чтобы вы не платили за те вещи, которые не используете, а это (использование легковесных потоков) накладывает ограничения на весь ваш код, даже если вы эти самые легковесные потоки не используете.

Легковесные потоки были удалены из Rust до выпуска версии 1.0.


Асинхронный ввод/вывод

Как было упомянуто в предыдущем разделе, с обычным блокирующим вводом/выводом, в момент, когда вы начинаете операцию ввода/вывода, вашему потоку не будет позволено продолжать выполнение (он станет заблокированным) до тех пор, пока операция не будет завершена. Это хорошо, когда вы работаете с потоками ОС (планировщик ОС делает за вас всю работу!), но если вы имеет легковесные потоки, вы захотите заменить один легковесный поток на другой.

Вместо этого вы используете неблокирующий ввод/вывод, где поток ставит в очередь запрос на выполнение ввода/вывода в ОС и продолжает выполнение. Запрос ввода/вывода будет выполнен ядром через некоторое время. После этого поток должен будет спросить у ОС: «Завершилась ли моя операция ввода/вывода?», перед тем как использовать результат ввода/вывода.

Разумеется, частое выяснение у ОС того, завершила ли она ввод/вывод, требует ресурсы. Вот почему имеются такие системные вызовы, как epoll. Здесь вы можете собрать вместе набор незавершённых операций ввода/вывода и затем попросить ОС разбудить ваш поток, когда когда какая-либо из этих операций будет завершена. Так что вы можете иметь поток планировщика (настоящий поток), который выгружает легковесные потоки, ожидающие завершения ввода/вывода. Когда же ничего не происходит, поток планировщика может уйти в сон, вызвав epoll, ожидая до тех пор, пока ОС не разбудит его (когда одна из операций ввода/вывода будет завершена).

Задействованный здесь внутренний механизм, вероятно, сложнее.

Вернёмся к Rust. Rust имеет библиотеку mio, которая является платформо-независимой обёрткой над неблокирующим вводом/выводом и такими инструментами, как epoll (GNU/Linux), kqueue (FreeBSD), и т. д. Это строительный блок, и хотя тем, которые привыкли использовать epoll в C напрямую, это может показаться удобным, это не предоставляет замечательной модели, как в Go. Однако мы можем этого достичь.


Футуры

Являются ещё одним строительным блоком. Future — обещание того, что рано или поздно будет получено значение (в JavaScript они называются Promise).

Например, вы можете ожидать приход запроса на сетевой сокет и получить Future обратно (на самом деле Stream, который подобен футуре, но используется для получения последовательности значений). Эта Future не будет содержать в себе ответа, но будет знать, когда он придёт. Вы можете вызвать wait() у Future, который будет заблокирован до тех пор, пока не будет получено результирующее значение. Также вы можете вызвать poll() у Future, спрашивая, не готов ли у Future ответ (она даст вам полученный результат, если он имеется).

Футуры могут быть связаны в цепочку, так что вы можете писать код наподобие future.then(|result| process(result)). Переданное then замыкание само может произвести ещё одну футуру, так что вы можете соединять в цепочку несколько сущностей, например, операции ввода/вывода. C футурами на цепочке poll() является способом постепенного исполнения программы; каждый раз, когда вы вызываете её, она будет переходить к следующей футуре, при условии, что текущая футура готова (содержит результат).

Это хорошее объяснение работы неблокирующего ввода/вывода.

Связывание футур в цепочку работает подобно связанным в цепочку итераторам. Каждый вызов and_then (или любой другой комбинатор) возвращает структуру, которая содержит в себе внутреннюю футуру, которая дополнительно может содержать в себе замыкание. Замыкания содержат внутри себя все нужные ссылки и данные.


🗼 Tokio 🗼

Tokio — это в сущности замечательная обёртка над mio, которая использует футуры. Tokio имеет главный цикл обработки событий, в который вы можете передавать замыкания, возвращающие футуры. Этот цикл будет выполнять все замыкания, которые вы ему передадите, используя mio для выяснения того, какие футуры, могут прогрессировать, и продвигать их далее (вызывая poll()).

На идейном уровне это уже довольно похоже на то, что делает Go. Вы должны вручную настроить главный событийный цикл, и как только вы сделаете это, вы сможете передавать циклу задачи, которые периодически делают ввод/вывод. Каждый раз когда одна из текущих задач будет заблокирована на операции ввода/вывода, цикл будет подгружать новую задачу. Ключевым отличием является то, что Tokio работает в одном потоке, в то время как планировщик Go может использовать несколько потоков ОС для исполнения. Однако вы можете переместить задачи, сильно зависящие от процессора, на другие потоки ОС и использовать каналы для их координирования, что не сложно.

Хотя на идейном уровне это похоже на то, что мы имеем в Go, код выглядит не очень приглядно. Следующий код на Go:

1
2
3
4
5
6
7
8
9
10
// обработка ошибок опущена для простоты

func foo(...) ReturnType {
    data := doIo()
    result := compute(data)
    moreData = doMoreIo(result)
    moreResult := moreCompute(data)
    // ...
    return someFinalResult
}

на Rust выглядит так:

1
2
3
4
5
6
7
// обработка ошибок опущена для простоты

fn foo(...) -> Future<ReturnType, ErrorType> {
    do_io().and_then(|data| do_more_io(compute(data)))
          .and_then(|more_data| do_even_more_io(more_compute(more_data)))
    // ......
}

Не красиво. Код становится хуже, если вы вводите ветвление и циклы. Проблемой является то, что в Go мы получали точки прерывания бесплатно, но в Rust мы должны вручную кодировать это связыванием комбинаторов в цепочку, получая некое подобие машины состояний (автомата).


Генераторы и async/await

Это то место, где появляются генераторы (ещё их называют корутинами). Генераторы являются экспериментальной возможностью в Rust. Вот пример:

1
2
3
4
5
6
7
8
9
10
let mut generator = || {
    let i = 0;
    loop {
        yield i;
        i += 1;
    }
};
assert_eq!(generator.resume(), GeneratorState::Yielded(0));
assert_eq!(generator.resume(), GeneratorState::Yielded(1));
assert_eq!(generator.resume(), GeneratorState::Yielded(2));

Функции — это сущности, который выполняют задачу и возвращают результат вызывающему коду один раз. А генераторы могут возвращать несколько раз: они останавливают выполнение для того, чтобы вернуть некоторые данные, и могут быть продолжены, при это они будут выполняться до следующего yield. Хотя мой пример не показывает этого, генераторы могут завершать выполнение так же, как и обычные функции.

Замыкания в Rust являются синтаксическим сахаром над структурой, которая содержит захваченные переменные + реализацию одного из Fn-типажей, для того, чтобы сделать структуру вызываемой.

Генераторы похожи на них, ко всему прочему они реализуют типаж Generator и обычно содержат в себе enum, который представляет разные состояния.

«Нестабильная» книга содержит несколько примеров того, как выглядит данная машина состояний.

Это гораздо ближе к тому, что мы искали! Теперь наш код выглядит так:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fn foo(...) -> Future<ReturnType, ErrorType> {
    let generator = || {
        let mut future = do_io();
        let data;
        loop {
            // опросить футуру, возвращая управление каждый раз
            // когда случается ошибка, но продолжая при успехе
            match future.poll() {
                Ok(Async::Ready(d)) => { data = d; break },
                Ok(Async::NotReady(d)) => (),
                Err(..) => ...
            };
            yield future.polling_info();
        }
        let result = compute(data);
        // делать то же самое для `doMoreIo()`, и т. д.
    }

    futurify(generator)
}

futurify — функция, принимающая генератор и возвращающая футуру, которая при каждом вызове poll будет делать resume() генератора и возвращать NotReady до тех пор, пока генератор не завершит выполнение.

Но постойте, это ещё более уродливо! В чём смысл преобразования нашей относительно чистой цепочки из callback’ов в это «месиво»?

Если вы посмотрите внимательно, то увидите, что код является последовательным. Мы преобразовали наш callback-код в тот же линейный поток, как и Go-код, однако он содержит странный yield-код в цикле, futurify тоже выглядит не приглядно.

Здесь на помощь приходит futures-await. futures-await — это процедурное макроопределение, которое убирает данный избыточный код. В сущности оно позволяет переписать функцию так:

1
2
3
4
5
6
#[async]
fn foo(...) -> Result<ReturnType, ErrorType> {
    let data = await!(do_io());
    let result = compute(data);
    let more_data = await!(do_more_io());
    // ....

Аккуратно и чисто. Почти так же чисто, как и код на Go, также мы непосредственно вызываем await!(). Данные await-вызовы предоставляют ту же функциональность, что и автоматически установленные точки прерывания в Go.

И, разумеется, так как код использует генераторы, вы можете использовать цикл, вводить ветвление и делать все, что вам хочется, а код все так же будет оставаться чистым.


Подводя итоги

Футуры в Rust могут быть соединены в цепочку для предоставления легковесной стеко-подобной системы. С async/await мы можете красиво писать цепочки футур, а await предоставляет явные точки прерывания на каждой операции ввода/вывода. Tokio предоставляет «планировщик» — главный событийный цикл, которому вы можете передавать асинхронные функции, «под капотом» используется mio для абстрагирования от низкоуровневых неблокирующих примитивов ввода/вывода.

Это компоненты, которые могут быть использованы по отдельности — вы можете использовать Tokio с футурами, без async/await. Вы можете использовать async/await без использования Tokio. Например, это может подходить сетевому стеку Servo. Ему не нужно делать очень много операций параллельного ввода/вывода (по крайней мере, не порядка тысячи потоков), так что он может использовать мультиплексированные потоки ОС. Однако мы по-прежнему хотим иметь пул потоков и последовательно обрабатывать данные, а async/await является здесь хорошим подспорьем.

Обобщим: все эти компоненты, несмотря на небольшое количество избыточного кода, дают почти то же самое, что и горутины в Go. А так как генераторы (а следовательно, и async/await) хорошо сосуществуют c анализатором заимствований, то инварианты безопасности, которые поддерживает Rust, по-прежнему в силе, и мы получаем «безбоязненную конкурентность» для программ, которые выполняют большой объем задач ввода/вывода.


Apache HTTP Server использует потоки ОС. Часто потоки ОС являются наилучшим инструментом для решения задачи.