doraemon

doraemon

let's go write some rusty code and revolutionize the world!

Rust Crate: crossbeam_channel

原文檔:crossbeam_channel - Rust (docs.rs)

Rust 標準庫提供了通道std::sync::mpsc,其中mpsc是 multiple producer, single consumer 的縮寫,代表了該通道支持多個發送者,但是只支持唯一的接收者。而 crossbeam_channel 是一個 mpmc (多發送者,多接收者) 的 Rust 庫,與 Go 語言的 channel 有異曲同工之妙。

Hello, world!#

use crossbeam_channel::unbounded;

// Create a channel of unbounded capacity.
let (s, r) = unbounded();

// Send a message into the channel.
s.send("Hello, world!").unwrap();

// Receive the message from the channel.
assert_eq!(r.recv(), Ok("Hello, world!"));

通道類型#

創建通道有兩種方式:

  • bounded:有限緩衝通道

    pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)
    
  • unbounded:無限緩衝通道

    pub fn unbounded<T>() -> (Sender<T>, Receiver<T>)
    

兩個函數均返回一個 Sender 和一個 Receiver,用於發送和接收消息。

bounded channel

use crossbeam_channel::bounded;

// 創建一個通道最多同時容納5個消息
let (s, r) = bounded(5);

// Can send only 5 messages without blocking.
for i in 0..5 {
    s.send(i).unwrap();
}

// 超過緩衝大小區繼續發送將導致阻塞
// s.send(5).unwrap();

unbounded channel:

use crossbeam_channel::unbounded;

// 創建一個無限緩衝通道
let (s, r) = unbounded();

// 可以發送任意數量的消息直到撐爆你的內存
for i in 0..1000 {
    s.send(i).unwrap();
}

當有限緩衝通道的緩衝值設置為 0 時,就變成了同步通道 (無緩衝通道),這意味著只有在有接收方能夠接收值的時候才能發送成功,否則會一直處於等待發送的階段。

use std::thread;
use crossbeam_channel::bounded;

// 創建一個無緩衝通道
let (s, r) = bounded(0);

// 發送將阻塞直到被接收
thread::spawn(move || s.send("Hi!").unwrap());

// 接收將阻塞直到有人發送
assert_eq!(r.recv(), Ok("Hi!"));

多線程之間共享通道#

Senders 和 receivers 可以通過 clone 被多個線程持有。

use std::thread;
use crossbeam_channel::bounded;

let (s1, r1) = bounded(0);
let (s2, r2) = (s1.clone(), r1.clone());

// 開啟一個線程,接收一個消息,然後發送一個消息
thread::spawn(move || {
    r2.recv().unwrap();
    s2.send(2).unwrap();
});

// 發送一個消息,然後接收一個消息
s1.send(1).unwrap();
r1.recv().unwrap();

請注意 clone 產生的新 Senders 和 receivers 並不會創建新的消息流,而是與原來的 channel 共享同一個消息流,通道中的消息遵循先進先出的原則。

use crossbeam_channel::unbounded;

let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());

s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();

assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));

Senders 和 receivers 也可以通過引用在多線程間共享。

use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;

let (s, r) = bounded(0);

// thread::scope傳入一個閉包,在這個範圍內做spawn操作最後統一做join,
// 如果所有線程都join成功則返回Ok,否則返回Err。
scope(|scope| {
    // 開啟一個線程,接收一個消息然後發送一個消息
    scope.spawn(|_| {
        r.recv().unwrap();
        s.send(2).unwrap();
    });

    // Send a message and then receive one.
    s.send(1).unwrap();
    r.recv().unwrap();
}).unwrap();

關閉通道 (Disconnection)#

當與通道關聯的所有 senders 或所有 receivers 被drop後,該通道將關閉 (斷開連接),不能再發送消息,但仍然可以接收任何剩餘的消息。在斷開連接的通道上發送和接收操作永遠不會阻塞。

use crossbeam_channel::{unbounded, RecvError};

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();

// 唯一的發送者被丟棄,通道關閉。
drop(s);

// 剩餘的消息仍然可以被接收。
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));

// 通道中沒有更多的消息了。
assert!(r.is_empty());

// 注意,調用`r.recv()`不會阻塞。
// 消息接收完後,`Err(RecvError)`會立即返回。
assert_eq!(r.recv(), Err(RecvError));

阻塞的通道#

發送和接收操作有三種情況:

  • 非阻塞(立即返回成功或失敗)
  • 阻塞(等待直到操作成功或通道斷開連接)
  • 帶有超時的阻塞(僅阻塞一段時間)

下面是一個簡單的例子,展示非阻塞和阻塞操作的區別:

use crossbeam_channel::{bounded, RecvError, TryRecvError};

let (s, r) = bounded(1);

// 發送消息到通道
s.send("foo").unwrap();

// 這個調用會阻塞,因為通道已滿
// s.send("bar").unwrap();

// 接收消息
assert_eq!(r.recv(), Ok("foo"));

// 這個接收操作會阻塞,因為通道為空
// r.recv();

 // try_recv()方法嘗試接收消息,不會阻塞,如果通道為空,會返回錯誤
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

 // 關閉通道
drop(s);

// 這個接收操作不會阻塞,因為通道已經關閉
assert_eq!(r.recv(), Err(RecvError));

迭代器#

接收消息可以用作迭代器。例如,iter方法創建一個迭代器,接收消息,直到通道為空並且關閉。注意,如果通道為空,迭代將會阻塞,直到通道關閉或下一條消息到達。

use std::thread;
use crossbeam_channel::unbounded;

let (s, r) = unbounded();

thread::spawn(move || {
    s.send(1).unwrap();
    s.send(2).unwrap();
    s.send(3).unwrap();
    drop(s); // 關閉通道。
});

// 從通道中接收所有消息。
// 注意,調用`collect`會阻塞,直到通道被關閉。
let v: Vec<_> = r.iter().collect();

assert_eq!(v, [1, 2, 3]);

你也可以使用 try_iter 創建非阻塞迭代器,接收所有可用的消息,當通道為空時,迭代結束:

use crossbeam_channel::unbounded;

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// 不關閉通道

// 接收通道中當前的所有消息
let v: Vec<_> = r.try_iter().collect();

assert_eq!(v, [1, 2, 3]);

Selection#

select!宏允許你定義一組通道操作,等待其中任何一個操作準備就緒,然後執行它。如果多個操作同時準備就緒,將隨機選擇一個。

也可以定義一個默認的操作,如果沒有操作準備就緒,則立即執行或者在一段時間後執行默認操作。

如果操作不阻塞,我們就認為它已經準備就緒了,即使通道已經關閉並返回一個了錯誤。

下面是從兩個通道接收消息的例子:

use std::thread;
use std::time::Duration;
use crossbeam_channel::{select, unbounded};

let (s1, r1) = unbounded();
let (s2, r2) = unbounded();

thread::spawn(move || s1.send(10).unwrap());
thread::spawn(move || s2.send(20).unwrap());

// 最多只有一個接收操作會被執行。
select! {
    recv(r1) -> msg => assert_eq!(msg, Ok(10)),
    recv(r2) -> msg => assert_eq!(msg, Ok(20)),
    default(Duration::from_secs(1)) => println!("timed out"),
}

如果需要選擇動態創建的通道操作列表,請使用Select。select! 宏只是 Select 的一個便利的封裝。

特殊的 channel#

三個函數可以創建特殊類型的通道,它們都只返回一個 Receiver:

  • after

    創建一個通道,在一段時間後,傳遞一條消息

  • tick

    創建一個通道,定期傳遞消息

  • never

    創建一個通道,永遠不會傳遞消息

這些通道非常高效,因為消息僅在接收操作時才會被生成。

下面是一個例子,每 50 毫秒打印一次時間,持續 1 秒:

use std::time::{Duration, Instant};
use crossbeam_channel::{after, select, tick};

let ticker = tick(Duration::from_millis(50));
let timeout = after(Duration::from_secs(1));
let start = Instant::now();

loop {
    select! {
        recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()),
        recv(timeout) -> _ => break,
    }
}
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。