Многопоточное разделяемое состояние

Передача сообщений — прекрасный способ обработки параллелизма, но не единственный. Другим методом может быть доступ нескольких потоков к одним и тем же общим данным. Рассмотрим ещё раз часть слогана из документации по языку Go: «Не стоит передавать информацию с помощью разделяемой памяти».

Как бы выглядело общение, используя разделяемую память? Кроме того, почему энтузиасты передачи сообщений предостерегают от его использования?

В каком-то смысле каналы в любом языке программирования похожи на единоличное владение, потому что после передачи значения по каналу вам больше не следует использовать отправленное значение. Многопоточная, совместно используемая память подобна множественному владению: несколько потоков могут одновременно обращаться к одной и той же области памяти. Как вы видели в главе 15, где умные указатели сделали возможным множественное владение, множественное владение может добавить сложность, потому что нужно управлять этими разными владельцами. Система типов Rust и правила владения очень помогают в их правильном управлении. Для примера давайте рассмотрим мьютексы, один из наиболее распространённых многопоточных примитивов для разделяемой памяти.

Мьютексы предоставляют доступ к данным из одного потока (за раз)

Mutex - это сокращение от взаимное исключение (mutual exclusion), так как мьютекс позволяет только одному потоку получать доступ к некоторым данным в любой момент времени. Для того, чтобы получить доступ к данным в мьютексе, поток должен сначала подать сигнал, что он хочет получить доступ запрашивая блокировку (lock) мьютекса. Блокировка - это структура данных, являющаяся частью мьютекса, которая отслеживает кто в настоящее время имеет эксклюзивный доступ к данным. Поэтому мьютекс описывается как объект защищающий данные, которые он хранит через систему блокировки.

Мьютексы имеют репутацию трудных в использовании, потому что вы должны помнить два правила:

  • Перед тем как попытаться получить доступ к данным необходимо получить блокировку.
  • Когда вы закончили работу с данными, которые защищает мьютекс, вы должны разблокировать данные, чтобы другие потоки могли получить блокировку.

Для понимания мьютекса, представьте пример из жизни как групповое обсуждение на конференции с одним микрофоном. Прежде чем участник дискуссии сможет говорить, он должен спросить или дать сигнал, что он хочет использовать микрофон. Когда он получает микрофон, то может говорить столько, сколько хочет, а затем передаёт микрофон следующему участнику, который попросит дать ему выступить. Если участник дискуссии забудет освободить микрофон, когда закончит с ним, то никто больше не сможет говорить. Если управление общим микрофоном идёт не правильно, то конференция не будет работать как было запланировано!

Правильное управление мьютексами может быть невероятно сложным и именно поэтому многие люди с энтузиазмом относятся к каналам. Однако, благодаря системе типов и правилам владения в Rust, вы не можете использовать блокировку и разблокировку неправильным образом.

Mutex<T> API

Давайте рассмотрим пример использования мьютекса в листинге 16-12 без использования нескольких потоков:

Файл: src/main.rs

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

Листинг 16-12: Изучение API Mutex<T> для простоты в однопоточном контексте

Как и во многих других типах, мы создаём Mutex<T> с помощью сопутствующей функции new. Чтобы получить доступ к данным внутри мьютекса, мы используем метод lock для получения блокировки. Этот вызов блокирует выполнение текущего потока, так что он не сможет выполнять никакие действия, до тех пор пока не наступит наша очередь получить блокировку.

Вызов lock потерпит неудачу, если другой поток, удерживающий блокировку, запаникует. В таком случае никто не сможет получить блокировку, поэтому мы предпочли использовать unwrap и заставить этот поток паниковать, если мы окажемся в такой ситуации.

После получения блокировки мы можем воспринимать возвращённое значение, названное в данном случае num, как изменяемую ссылку на содержащиеся внутри данные. Система типов гарантирует, что мы получим блокировку перед использованием значения в m. Тип m - Mutex<i32>, а не i32, поэтому мы должны вызвать lock, чтобы иметь возможность использовать значение i32. Мы не должны об этом забывать, тем более что в иных случаях система типов и не даст нам доступ к внутреннему значению i32.

Как вы наверное подозреваете, Mutex<T> является умным указателем. Точнее, вызов lock возвращает умный указатель, называемый MutexGuard, обёрнутый в LockResult, который мы обработали с помощью вызова unwrap. Умный указатель типа MutexGuard реализует типаж Deref для указания на внутренние данные; умный указатель также имеет реализацию типажа Drop, автоматически снимающего блокировку, когда MutexGuard выходит из области видимости, что происходит в конце внутренней области видимости. В результате у нас нет риска забыть снять блокировку и оставить мьютекс в заблокированном состоянии, препятствуя его использованию другими потоками (снятие блокировки происходит автоматически).

После снятия блокировки можно напечатать значение мьютекса и увидеть, что мы смогли изменить внутреннее i32 на 6.

Разделение Mutex<T> между множеством потоков

Теперь давайте попробуем с помощью Mutex<T> совместно использовать значение между несколькими потоками. Мы стартуем 10 потоков и каждый из них увеличивает значение счётчика на 1, поэтому счётчик изменяется от 0 до 10. Обратите внимание, что в следующих нескольких примерах будут ошибки компилятора и мы будем использовать эти ошибки, чтобы узнать больше об использовании типа Mutex<T> и как Rust помогает нам правильно его использовать. Листинг 16-13 содержит наш начальный пример:

Файл: src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Листинг 16-13. Десять потоков, увеличивающих счётчик, защищённый Mutex<T>

Мы создаём переменную-счётчик counter для хранения i32 значения внутри Mutex<T>, как мы это делали в листинге 16-12. Затем мы создаём 10 потоков, перебирая диапазон чисел. Мы используем thread::spawn и передаём всем этим потокам одинаковое замыкание, которое перемещает счётчик в поток, запрашивает блокировку на Mutex<T>, вызывая метод lock, а затем добавляет 1 к значению в мьютексе. Когда поток завершит выполнение своего замыкания, num выйдет из области видимости и освободит блокировку, чтобы её мог получить другой поток.

В основном потоке мы собираем все дескрипторы в переменную handles. Затем, как мы это делали в листинге 16-2, вызываем join для каждого дескриптора, чтобы убедиться в завершении всех потоков. В этот момент основной поток получит доступ к блокировке и тоже напечатает результат программы.

Компилятор намекнул, что этот пример не компилируется. Давайте выясним почему!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:21:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here, in previous iteration of loop
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

Сообщение об ошибке указывает, что значение counter было перемещёно в замыкание на предыдущей итерации цикла. Rust говорит нам, что мы не можем передать counter во владение нескольким потокам. Давайте исправим ошибку компилятора с помощью метода множественного владения, который мы обсуждали в главе 15.

Множественное владение между множеством потоков

В главе 15 мы давали значение нескольким владельцам, используя умный указатель Rc<T> для создания значения подсчитанных ссылок. Давайте сделаем то же самое здесь и посмотрим, что произойдёт. Мы завернём Mutex<T> в Rc<T> в листинге 16-14 и клонируем Rc<T> перед передачей владения в поток. Теперь, когда мы увидели ошибки, мы также вернёмся к использованию цикла for и сохраним ключевое слово move у замыкания.

Файл: src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Листинг 16-14: Попытка использования Rc<T>, чтобы позволить нескольким потокам владеть Mutex<T>

Ещё раз, мы компилируем и получаем ... другие ошибки! Компилятор учит нас.

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
  --> src/main.rs:11:36
   |
11 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
   | |                      |
   | |                      required by a bound introduced by this call
12 | |             let mut num = counter.lock().unwrap();
13 | |
14 | |             *num += 1;
15 | |         });
   | |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
   |
   = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
note: required because it's used within this closure
  --> src/main.rs:11:36
   |
11 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
  --> /rustc/07dca489ac2d933c78d3c5158e3f43beefeb02ce/library/std/src/thread/mod.rs:678:1

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

Ничего себе, это сообщение об ошибке очень многословно! Вот важная часть, на которой следует сосредоточиться: ``Rc<Mutex> cannot be sent between threads safely. Компилятор также сообщает нам причину: the trait Sendis not implemented forRc<Mutex> . Мы поговорим о Send в следующем разделе: это один из типажей, который гарантирует, что типы которые мы используем с потоками, предназначены для использования в многопоточном коде.

К сожалению, Rc<T> небезопасен для совместного использования между потоками. Когда Rc<T> управляет счётчиком ссылок, он добавляется значение к счётчику для каждого вызова clone и вычитается значение из счётчика, когда каждое клонированное значение удаляется при выходе из области видимости. Но он не использует примитивы многопоточности, чтобы гарантировать, что изменения в подсчёте не могут быть прерваны другим потоком. Это может привести к неправильным подсчётам - незначительным ошибкам, которые в свою очередь, могут привести к утечкам памяти или удалению значения до того, как мы отработали с ним. Нам нужен тип точно такой же как Rc<T>, но который позволяет изменять счётчик ссылок безопасно из разных потоков.

Атомарный счётчик ссылок Arc<T>

К счастью, Arc<T> является типом аналогичным типу Rc<T>, который безопасен для использования в ситуациях многопоточности. Буква А означает атомарное, что означает тип ссылка подсчитываемая атомарно. Atomics - это дополнительный вид примитивов для многопоточности, который мы не будем здесь подробно описывать: дополнительную информацию смотрите в документации стандартной библиотеки для std::sync::atomic. На данный момент вам просто нужно знать, что atomics работают как примитивные типы, но безопасны для совместного использования между потоками.

Вы можете спросить, почему все примитивные типы не являются атомарными и почему стандартные типы библиотек не реализованы для использования вместе с типом Arc<T> по умолчанию. Причина в том, что безопасность потоков сопровождается снижением производительности, которое вы хотите платить только тогда, когда вам это действительно нужно. Если вы просто выполняете операции со значениями в одном потоке, то ваш код может работать быстрее, если он не должен обеспечивать гарантии предоставляемые atomics.

Давайте вернёмся к нашему примеру: типы Arc<T> и Rc<T> имеют одинаковый API, поэтому мы исправляем нашу программу, заменяя тип в строках use, вызове new и вызове clone. Код в листинге 16-15, наконец скомпилируется и запустится:

Файл: src/main.rs

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Листинг 16-15: Использование типа Arc<T> для обёртывания Mutex<T>, теперь несколько потоков могут совместно владеть мьютексом

Код напечатает следующее:

Result: 10

Мы сделали это! Мы посчитали от 0 до 10, что может показаться не очень впечатляющим, но это позволило больше узнать про Mutex<T> и безопасность потоков. Вы также можете использовать структуру этой программы для выполнения более сложных операций, чем просто увеличение счётчика. Используя эту стратегию, вы можете разделить вычисления на независимые части, разделить эти части на потоки, а затем использовать Mutex<T>, чтобы каждый поток обновлял конечный результат своей частью кода.

Обратите внимание, что если вы выполняете простые числовые операции, то существуют типы более простые, чем Mutex<T>, которые предоставляет модуль std::sync::atomic стандартной библиотеки. Эти типы обеспечивают безопасный, параллельный, атомарный доступ к простым типам. Мы решили использовать Mutex<T> с простым типом в этом примере, чтобы подробнее рассмотреть, как работает Mutex<T>.

Сходства RefCell<T> / Rc<T> и Mutex<T> / Arc<T>

Вы могли заметить, что counter сам по себе не изменяемый (immutable), но мы можем получить изменяемую ссылку на значение внутри него; это означает, что Mutex<T> обеспечивает внутреннюю изменяемость, также как и семейство Cell типов. Мы использовали RefCell<T> в главе 15, чтобы получить возможность изменять содержимое внутри Rc<T>, теперь аналогичным образом мы используем Mutex<T> для изменения содержимого внутри Arc<T> .

Ещё одна деталь, на которую стоит обратить внимание: Rust не может защитить вас от всевозможных логических ошибок при использовании Mutex<T>. Вспомните в главе 15, что использование Rc<T> сопряжено с риском создания ссылочной зацикленности, где два значения Rc<T> ссылаются друг на друга, что приводит к утечкам памяти. Аналогичным образом, Mutex<T> сопряжён с риском создания взаимных блокировок (deadlocks). Это происходит, когда операции необходимо заблокировать два ресурса и каждый из двух потоков получил одну из блокировок, заставляя оба потока ждать друг друга вечно. Если вам интересна тема взаимных блокировок, попробуйте создать программу Rust, которая её содержит; затем исследуйте стратегии устранения взаимных блокировок для мьютексов на любом языке и попробуйте реализовать их в Rust. Документация стандартной библиотеки для Mutex<T> и MutexGuard предлагает полезную информацию.

Мы завершим эту главу, рассказав о типажах Send и Sync и о том, как мы можем использовать их с пользовательскими типами.