Использование потоков для одновременного выполнения кода

В большинстве современных операционных систем программный код выполняется в виде процесса, причём операционная система способна управлять несколькими процессами сразу. Программа, в свою очередь, может состоять из нескольких независимых частей, выполняемых одновременно. Конструкция, благодаря которой эти независимые части выполняются, называется потоком. Например, веб-сервер может иметь несколько потоков для того, чтобы он мог обрабатывать больше одного запроса за раз.

Разбиение вычислений на несколько потоков может повысить производительность программы, поскольку программа выполняет несколько задач одновременно, но такое разбиение также добавляет сложности. Поскольку потоки могут работать одновременно, нет чёткой гарантии, определяющей порядок выполнения частей вашего кода в разных потоках. Это может привести к таким проблемам, как:

  • Состояния гонки, когда потоки обращаются к данным, либо ресурсам, несогласованно.
  • Взаимные блокировки, когда два потока ожидают друг друга, не позволяя тем самым продолжить работу каждому из потоков.
  • Ошибки, которые случаются только в определённых ситуациях, которые трудно воспроизвести и, соответственно, трудно надёжно исправить.

Rust пытается смягчить негативные последствия использования потоков, но программирование в многопоточном контексте все ещё требует тщательного обдумывания структуры кода, которая отличается от структуры кода программ, работающих в одном потоке.

Языки программирования реализуют потоки несколькими различными способами, и многие операционные системы предоставляют API, который язык может вызывать для создания новых потоков. Стандартная библиотека Rust использует модель реализации потоков 1:1, при которой одному потоку операционной системы соответствует ровно один "языковой" поток. Существуют крейты, в которых реализованы другие модели многопоточности, отличающиеся от модели 1:1.

Создание нового потока с помощью spawn

Чтобы создать новый поток, мы вызываем функцию thread::spawn и передаём ей замыкание (мы говорили о замыканиях в главе 13), содержащее код, который мы хотим запустить в новом потоке. Пример в листинге 16-1 печатает некоторый текст из основного потока, а также другой текст из нового потока:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

Листинг 16-1: Создание нового потока для печати определённого текста, в то время как основной поток печатает что-то другое

Обратите внимание, что когда основной поток программы на Rust завершается, все порождённые потоки закрываются, независимо от того, завершили они работу или нет. Вывод этой программы может каждый раз немного отличаться, но он будет выглядеть примерно так:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

Вызовы thread::sleep заставляют поток на короткое время останавливать своё выполнение, позволяя выполняться другим потокам. Очерёдность выполнения потоков вероятно будет меняться, но это не гарантировано: это зависит от того, как ваша операционная система планирует потоки. В этом цикле основной поток печатает первым, несмотря на то, что инструкция печати из порождённого потока появляется раньше в коде. И даже несмотря на то, что мы проинструктировали порождённый поток печатать до тех пор, пока значение i не достигнет числа 9, оно успело дойти только до 5, когда основной поток завершился.

Если вы запустите этот код и увидите вывод только из основного потока или не увидите печати из других потоков, попробуйте увеличить числа в диапазонах, чтобы дать операционной системе больше возможностей для переключения между потоками.

Ожидание завершения работы всех потоков используя join

Код в листинге 16-1 преждевременно останавливает порождённый поток в большинстве случаев, из-за завершения основного потока. Более того, так как порядок выполнения потоков чётко не определён, этот код не даёт гарантии, что порождённый поток вообще начнёт исполняться!

Мы можем исправить проблему, когда созданный поток не запускается или завершается преждевременно, сохранив возвращаемое значение thread::spawn в какой-либо переменной. Тип возвращаемого значения thread::spawnJoinHandle . JoinHandle — это владеющее значение, которое, при вызове метода join , будет ждать завершения своего потока. Листинг 16-2 демонстрирует, как использовать JoinHandle потока, созданного в листинге 16-1, и вызывать функцию join , для того, чтобы убедиться, что порождённый поток завершится раньше, чем поток main:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

Листинг 16-2. Сохранение значения JoinHandle потока thread::spawn , гарантирующее, что поток выполнит всю необходимую работу, перед тем, как завершится

Вызов join у дескриптора блокирует текущий поток, пока поток, представленный дескриптором не завершится. Блокировка потока означает, что потоку запрещено выполнять работу или выходить из него. Поскольку мы поместили вызов join после цикла for основного потока, выполнение листинга 16-2 должно привести к выводу, подобному следующему:

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

Два потока продолжают чередоваться, но основной поток находится в ожидании из-за вызова handle.join() и не завершается до тех пор, пока не завершится запущенный поток.

Но давайте посмотрим, что произойдёт, если мы вместо этого переместим handle.join() перед циклом for в main, например так:

Файл: src/main.rs

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

Основной поток будет ждать завершения порождённого потока, а затем запустит свой цикл for , поэтому выходные данные больше не будут чередоваться, как показано ниже:

hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!

Небольшие детали, такие как место вызова join, могут повлиять на то, выполняются ли ваши потоки одновременно.

Использование move-замыканий в потоках

Мы часто используем ключевое слово move с замыканиями, переданными в thread::spawn, потому что в этом случае замыкание получает из окружения права владения на используемые им значения, таким образом передавая права владения этими значениями от одного потока к другому. В разделе "Захват ссылок или перемещение прав владения" главы 13 мы обсудили move в контексте замыканий. Теперь мы сосредоточимся на взаимодействии между move и thread::spawn.

Обратите внимание, что в листинге 16-1 замыкание, которое мы передаём в thread::spawn не принимает аргументов: мы не используем никаких данных из основного потока в коде порождённого потока. Чтобы использовать данные из основного потока в порождённом потоке, замыкание порождённого потока должно захватывать значения, которые ему необходимы. Листинг 16-3 показывает попытку создать вектор в главном потоке и использовать его в порождённом потоке. Тем не менее, это не будет работать, как вы увидите через мгновение.

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Листинг 16-3: Попытка использовать вектор, созданный основным потоком, в другом потоке

Замыкание использует переменную v, поэтому оно захватит v и сделает его частью окружения замыкания. Поскольку thread::spawn запускает это замыкание в новом потоке, мы должны иметь доступ к v внутри этого нового потока. Но при компиляции этого примера, мы получаем следующую ошибку:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:6:18
  |
6 |       let handle = thread::spawn(|| {
  |  __________________^
7 | |         println!("Here's a vector: {:?}", v);
8 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

For more information about this error, try `rustc --explain E0373`.
error: could not compile `threads` (bin "threads") due to 1 previous error

Rust выводит как захватить v и так как в println! нужна только ссылка на v, то замыкание пытается заимствовать v. Однако есть проблема: Rust не может определить, как долго будет работать порождённый поток, поэтому он не знает, будет ли всегда действительной ссылка на v.

В листинге 16-4 приведён сценарий, который с большей вероятностью будет иметь ссылку на v, что будет недопустимо:

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    drop(v); // oh no!

    handle.join().unwrap();
}

Листинг 16-4. Поток с замыканием, который пытается захватить ссылку на v из основного потока, удаляющего v

Если бы Rust позволил нам запустить этот код, есть вероятность, что порождённый поток был бы немедленно переведён в фоновый режим, не выполнив ничего. Порождённый поток имеет ссылку на v, но основной поток немедленно удаляет v , используя функцию drop , которую мы обсуждали в главе 15. Затем, когда порождённый поток начинает выполняться, v уже не существует, поэтому ссылка на него также будет недействительной. О, нет!

Чтобы исправить ошибку компилятора в листинге 16-3, мы можем использовать совет из сообщения об ошибке:

help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

Добавляя ключевое слово move перед замыканием, мы заставляем замыкание забирать используемые значения во владение, вместо того, чтобы позволить Rust вывести необходимость заимствования значения. Модификация Листинга 16-3, показанная в Листинге 16-5, будет скомпилирована и запущена так, как мы ожидаем:

Файл: src/main.rs

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

Листинг 16-5. Использование ключевого слова move , чтобы замыкание стало владельцем используемых им значений.

У нас может возникнуть соблазн попробовать то же самое, чтобы исправить код в листинге 16.4, где основной поток вызывал drop с помощью замыкания move . Однако это исправление не сработает, потому что то, что пытается сделать листинг 16.4, запрещено по другой причине. Если мы добавим move к замыканию, мы переместим v в окружение замыкания и больше не сможем вызывать для него drop в основном потоке. Вместо этого мы получим эту ошибку компилятора:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0382]: use of moved value: `v`
  --> src/main.rs:10:10
   |
4  |     let v = vec![1, 2, 3];
   |         - move occurs because `v` has type `Vec<i32>`, which does not implement the `Copy` trait
5  |
6  |     let handle = thread::spawn(move || {
   |                                ------- value moved into closure here
7  |         println!("Here's a vector: {:?}", v);
   |                                           - variable moved due to use in closure
...
10 |     drop(v); // oh no!
   |          ^ value used here after move

For more information about this error, try `rustc --explain E0382`.
error: could not compile `threads` (bin "threads") due to 1 previous error

Правила владения Rust снова нас спасли! Мы получили ошибку кода из листинга 16-3, потому что Rust был консервативен и заимствовал v только для потока, что означало, что основной поток теоретически может сделать недействительной ссылку на порождённый поток. Сообщив Rust о передаче владения v в порождаемый поток, мы гарантируем Rust, что основной поток больше не будет использовать v. Если мы изменим Листинг 16-4 таким же образом, то мы нарушаем правила владения при попытке использовать v в главном потоке. Ключевое слово move отменяет основное консервативное поведение Rust по заимствованию, что не позволяет нам нарушать правила владения.

Имея базовое понимание потоков и API потоков, давайте посмотрим, что мы можем делать с помощью потоков.