Передача данных с помощью сообщений между потоками

Всё большую популярность для обеспечения безопасной многопоточности набирает способ, называемый передача сообщений. В этом случае потоки или акторы взаимодействуют друг с другом путём отправки сообщений с данными. Идея этого подхода выражена в слогане из документации языка Go таким образом: «Не стоит передавать информацию с помощью разделяемой памяти; лучше делитесь памятью, передавая информацию».

Для обеспечения отправки многопоточных сообщений в стандартной библиотеке языка Rust реализованы каналы. Канал в программировании - это общепринятый механизм, с помощью которого данные из одного потока отправляются другому потоку.

Вы можете представить канал в программировании как направленное движение воды, например как ручей или реку. Если вы поместите какую-нибудь вещь на воду, например резиновую уточку, она будет плыть вниз по течению до тех пор, пока это течение не кончится.

Канал состоит из двух половин: передатчика и приёмника. Передатчик — это место вверх по течению, где вы опускаете резиновых уточек в реку, а приёмник — это место, где резиновые уточки оказываются в конце пути. Одна часть вашего кода вызывает методы передатчика с данными, которые вы хотите отправить, а другая часть проверяет принимающую сторону на наличие поступающих сообщений. Канал считается закрытым , если либо передающая, либо принимающая его половина уничтожена.

Давайте создадим программу, в которой один поток будет генерировать значения и отправлять их в канал, а другой поток будет получать значения и распечатывать их. Мы будем отправлять между потоками простые значения, используя канал, чтобы проиллюстрировать эту функцию. После того, как вы ознакомитесь с этим методом, вы сможете использовать каналы с любыми потоками, которым необходимо взаимодействовать друг с другом. Это может быть например система чата или система, в которой несколько вычислительных потоков выполняют свою часть расчёта, а затем отправляют эту часть в отдельный поток, который уже агрегирует полученные результаты.

Сначала в листинге 16-6 мы создадим канал, но не будем ничего с ним делать. Обратите внимание, что этот код ещё не компилируется, потому что Rust не может сказать, какой тип значений мы хотим отправить через канал.

Файл: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Листинг 16-6: Создание канала и присваивание двух значений переменным tx и rx

Мы создаём новый канал, используя функцию mpsc::channel; mpsc означает несколько производителей, один потребитель (multiple producer, single consumer). Коротко, способ которым стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих источников генерирующих значения, но только одну принимающую сторону, которая потребляет эти значения. Представьте, что несколько ручьёв втекают в одну большую реку: всё, что плывёт вниз по любому из ручьёв, в конце концов окажется в одной реке. Сейчас мы пока начнём с одного производителя, а когда пример заработает, добавим ещё несколько.

Функция mpsc::channel возвращает кортеж, первый элемент которого является отправляющей стороной (передатчиком), а вторым элементом является принимающая сторона (получатель). Аббревиатуры tx и rx традиционно используются во многих полях для передатчика и приёмника соответственно, поэтому мы называем соответствующие переменные именно так. Мы используем инструкцию let с шаблоном, который деструктурирует кортежи; мы обсудим использование шаблонов в инструкциях let и деструктуризацию в главе 18. А пока знайте, что описанное использование инструкции let является удобным способом извлечения частей кортежа, возвращаемых mpsc::channel .

Давайте переместим передающую часть в порождённый поток так, чтобы он отправлял одну строку и чтобы таким образом, порождённый поток связывался с основным потоком, как показано в листинге 16-7. Это похоже на то, как если бы вы поместили резиновую утку в реку вверх по течению или отправили сообщение чата из одного потока в другой.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

Листинг 16-7: Перемещение tx в созданный поток и отправка сообщения «привет»

Опять же, мы используем thread::spawn для создания нового потока, а затем используем move для перемещения tx в замыкание, чтобы порождённый поток владел tx . Порождённый поток должен владеть передатчиком, чтобы иметь возможность отправлять сообщения через канал. Передатчик имеет метод send , который принимает значение, которое мы хотим отправить. Метод send возвращает тип Result<T, E> , поэтому, если получатель уже удалён и отправить значение некуда, операция отправки вернёт ошибку. В этом примере мы вызываем unwrap для паники в случае ошибки. В реальном приложении мы обработали бы эту ситуацию более корректно: вернитесь к главе 9, если хотите ещё раз разобрать стратегии правильной обработки ошибок.

В листинге 16-8 мы получим значение от приёмника в основном потоке. Это похоже на извлечение резиновой уточки из воды в конце реки или получение сообщения в чате.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Листинг 16-8: В основном потоке получаем сообщение "hi" и печатаем его

Получатель имеет два важных метода: recv и try_recv. Мы используем recv, что является сокращением от receive, который блокирует выполнение основного потока и ждёт, пока данные не будут переданы по каналу. Как только значение будет получено, recv вернёт его в виде Result<T, E>. Когда канал закроется, recv вернёт ошибку, чтобы дать понять, что больше никаких сообщений не поступит.

В свою очередь, метод try_recv не блокирует, а сразу возвращает результат Result<T, E>: значение Ok, содержащее сообщение, если оно доступно или значение Err, если никаких сообщений не поступило. Использование try_recv полезно, если у этого потока есть и другая работа в то время, пока происходит ожидание сообщений: так, мы можем написать цикл, который вызывает try_recv время от времени, обрабатывает сообщение, если оно доступно, а в промежутке выполняет другую работу до того момента, как вновь будет произведена проверка.

Мы использовали recv в этом примере для простоты; у нас нет никакой другой работы для основного потока, кроме как ждать сообщений, поэтому блокировка основного потока уместна.

При запуске кода листинга 16-8, мы увидим значение, напечатанное из основного потока:

Got: hi

Отлично!

Каналы и передача владения

Правила владения играют жизненно важную роль в отправке сообщений, потому что они помогают писать безопасный многопоточный код. Предотвращение ошибок в многопоточном программировании является преимуществом для размышлений о владении во всех ваших Rust программах. Давайте проведём эксперимент, чтобы показать как каналы и владение действуют совместно для предотвращения проблем: мы попытаемся использовать значение val в порождённом потоке после того как отправим его в канал. Попробуйте скомпилировать код в листинге 16-9, чтобы понять, почему этот код не разрешён:

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

Листинг 16-9: Попытка использовать val после того, как мы отправили его по каналу

Здесь мы пытаемся напечатать значение val после того, как отправили его в канал вызвав tx.send. Разрешить это было бы плохой идеей: после того, как значение было отправлено в другой поток, текущий поток мог бы изменить или удалить значение, прежде чем мы попытались бы использовать значение снова. Потенциально изменения в другом потоке могут привести к ошибкам или не ожидаемым результатам из-за противоречивых или несуществующих данных. Однако Rust выдаёт нам ошибку, если мы пытаемся скомпилировать код в листинге 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider cloning the value if the performance cost is acceptable
   |
9  |         tx.send(val.clone()).unwrap();
   |                    ++++++++

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

Наша ошибка для многопоточности привела к ошибке компиляции. Функция send вступает во владение своим параметром и когда значение перемещается, получатель становится владельцем этого параметра. Это останавливает нас от случайного использования значения снова после его отправки; анализатор заимствования проверяет, что все в порядке.

Отправка нескольких значений и ожидание получателем

Код в листинге 16-8 компилируется и выполняется, но в нем неясно показано то, что два отдельных потока общаются друг с другом через канал. В листинге 16-10 мы внесли некоторые изменения, которые докажут, что код в листинге 16-8 работает одновременно: порождённый поток теперь будет отправлять несколько сообщений и делать паузу на секунду между каждым сообщением.

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

Листинг 16-10: Отправка нескольких сообщений и пауза между ними

На этот раз порождённый поток имеет вектор строк, которые мы хотим отправить основному потоку. Мы перебираем их, отправляя каждую строку по отдельности и делаем паузу между ними, вызывая функцию thread::sleep со значением Duration равным 1 секунде.

В основном потоке мы больше не вызываем функцию recv явно: вместо этого мы используем rx как итератор. Для каждого полученного значения мы печатаем его. Когда канал будет закрыт, итерация закончится.

При выполнении кода в листинге 16-10 вы должны увидеть следующий вывод с паузой в 1 секунду между каждой строкой:

Got: hi
Got: from
Got: the
Got: thread

Поскольку у нас нет кода, который приостанавливает или задерживает цикл for в основном потоке, мы можем сказать, что основной поток ожидает получения значений из порождённого потока.

Создание нескольких отправителей путём клонирования передатчика

Ранее мы упоминали, что mpsc — это аббревиатура от множество поставщиков, один потребитель . Давайте используем mpsc в полной мере и расширим код в листинге 16.10, создав несколько потоков, которые отправляют значения одному и тому же получателю. Мы можем сделать это, клонировав передатчик, как показано в листинге 16.11:

Файл: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--
}

Листинг 16-11: Отправка нескольких сообщений от нескольких производителей

На этот раз, прежде чем мы создадим первый порождённый поток, мы вызовем функцию clone на передатчике. В результате мы получим новый передатчик, который мы сможем передать первому порождённому потоку. Исходный передатчик мы передадим второму порождённому потоку. Это даст нам два потока, каждый из которых отправляет разные сообщения одному получателю.

Когда вы запустите код, вывод должен выглядеть примерно так:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Вы можете увидеть значения в другом порядке, в зависимости от вашей системы. Именно такое поведение делает параллелизм как интересным, так и сложным. Если вы поэкспериментируете с thread::sleep , задавая различные значения аргумента в разных потоках, каждый запуск будет более недетерминированным и каждый раз будут выводиться разные данные.

Теперь, когда мы посмотрели, как работают каналы, давайте рассмотрим другой метод многопоточности.