原文檔:crossbeam_channel - Rust (docs.rs)
Rust 標準庫提供了通道std::sync::mpsc
,其中mpsc
是 multiple producer, single consumer 的縮寫,代表了該通道支持多個發送者,但是只支持唯一的接收者。而 crossbeam_channel 是一個 mpmc (多發送者,多接收者) 的 Rust 庫,與 Go 語言的 channel 有異曲同工之妙。
Hello, world!#
use crossbeam_channel::unbounded;
// Create a channel of unbounded capacity.
let (s, r) = unbounded();
// Send a message into the channel.
s.send("Hello, world!").unwrap();
// Receive the message from the channel.
assert_eq!(r.recv(), Ok("Hello, world!"));
通道類型#
創建通道有兩種方式:
-
bounded:有限緩衝通道
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)
-
unbounded:無限緩衝通道
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>)
兩個函數均返回一個 Sender
和一個 Receiver
,用於發送和接收消息。
bounded channel:
use crossbeam_channel::bounded;
// 創建一個通道最多同時容納5個消息
let (s, r) = bounded(5);
// Can send only 5 messages without blocking.
for i in 0..5 {
s.send(i).unwrap();
}
// 超過緩衝大小區繼續發送將導致阻塞
// s.send(5).unwrap();
unbounded channel:
use crossbeam_channel::unbounded;
// 創建一個無限緩衝通道
let (s, r) = unbounded();
// 可以發送任意數量的消息直到撐爆你的內存
for i in 0..1000 {
s.send(i).unwrap();
}
當有限緩衝通道的緩衝值設置為 0 時,就變成了同步通道 (無緩衝通道),這意味著只有在有接收方能夠接收值的時候才能發送成功,否則會一直處於等待發送的階段。
use std::thread;
use crossbeam_channel::bounded;
// 創建一個無緩衝通道
let (s, r) = bounded(0);
// 發送將阻塞直到被接收
thread::spawn(move || s.send("Hi!").unwrap());
// 接收將阻塞直到有人發送
assert_eq!(r.recv(), Ok("Hi!"));
多線程之間共享通道#
Senders 和 receivers 可以通過 clone 被多個線程持有。
use std::thread;
use crossbeam_channel::bounded;
let (s1, r1) = bounded(0);
let (s2, r2) = (s1.clone(), r1.clone());
// 開啟一個線程,接收一個消息,然後發送一個消息
thread::spawn(move || {
r2.recv().unwrap();
s2.send(2).unwrap();
});
// 發送一個消息,然後接收一個消息
s1.send(1).unwrap();
r1.recv().unwrap();
請注意 clone 產生的新 Senders 和 receivers 並不會創建新的消息流,而是與原來的 channel 共享同一個消息流,通道中的消息遵循先進先出的原則。
use crossbeam_channel::unbounded;
let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());
s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();
assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));
Senders 和 receivers 也可以通過引用在多線程間共享。
use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;
let (s, r) = bounded(0);
// thread::scope傳入一個閉包,在這個範圍內做spawn操作最後統一做join,
// 如果所有線程都join成功則返回Ok,否則返回Err。
scope(|scope| {
// 開啟一個線程,接收一個消息然後發送一個消息
scope.spawn(|_| {
r.recv().unwrap();
s.send(2).unwrap();
});
// Send a message and then receive one.
s.send(1).unwrap();
r.recv().unwrap();
}).unwrap();
關閉通道 (Disconnection)#
當與通道關聯的所有 senders 或所有 receivers 被drop
後,該通道將關閉 (斷開連接),不能再發送消息,但仍然可以接收任何剩餘的消息。在斷開連接的通道上發送和接收操作永遠不會阻塞。
use crossbeam_channel::{unbounded, RecvError};
let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// 唯一的發送者被丟棄,通道關閉。
drop(s);
// 剩餘的消息仍然可以被接收。
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));
// 通道中沒有更多的消息了。
assert!(r.is_empty());
// 注意,調用`r.recv()`不會阻塞。
// 消息接收完後,`Err(RecvError)`會立即返回。
assert_eq!(r.recv(), Err(RecvError));
阻塞的通道#
發送和接收操作有三種情況:
- 非阻塞(立即返回成功或失敗)
- 阻塞(等待直到操作成功或通道斷開連接)
- 帶有超時的阻塞(僅阻塞一段時間)
下面是一個簡單的例子,展示非阻塞和阻塞操作的區別:
use crossbeam_channel::{bounded, RecvError, TryRecvError};
let (s, r) = bounded(1);
// 發送消息到通道
s.send("foo").unwrap();
// 這個調用會阻塞,因為通道已滿
// s.send("bar").unwrap();
// 接收消息
assert_eq!(r.recv(), Ok("foo"));
// 這個接收操作會阻塞,因為通道為空
// r.recv();
// try_recv()方法嘗試接收消息,不會阻塞,如果通道為空,會返回錯誤
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
// 關閉通道
drop(s);
// 這個接收操作不會阻塞,因為通道已經關閉
assert_eq!(r.recv(), Err(RecvError));
迭代器#
接收消息可以用作迭代器。例如,iter
方法創建一個迭代器,接收消息,直到通道為空並且關閉。注意,如果通道為空,迭代將會阻塞,直到通道關閉或下一條消息到達。
use std::thread;
use crossbeam_channel::unbounded;
let (s, r) = unbounded();
thread::spawn(move || {
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
drop(s); // 關閉通道。
});
// 從通道中接收所有消息。
// 注意,調用`collect`會阻塞,直到通道被關閉。
let v: Vec<_> = r.iter().collect();
assert_eq!(v, [1, 2, 3]);
你也可以使用 try_iter 創建非阻塞迭代器,接收所有可用的消息,當通道為空時,迭代結束:
use crossbeam_channel::unbounded;
let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// 不關閉通道
// 接收通道中當前的所有消息
let v: Vec<_> = r.try_iter().collect();
assert_eq!(v, [1, 2, 3]);
Selection#
select!
宏允許你定義一組通道操作,等待其中任何一個操作準備就緒,然後執行它。如果多個操作同時準備就緒,將隨機選擇一個。
也可以定義一個默認的操作,如果沒有操作準備就緒,則立即執行或者在一段時間後執行默認操作。
如果操作不阻塞,我們就認為它已經準備就緒了,即使通道已經關閉並返回一個了錯誤。
下面是從兩個通道接收消息的例子:
use std::thread;
use std::time::Duration;
use crossbeam_channel::{select, unbounded};
let (s1, r1) = unbounded();
let (s2, r2) = unbounded();
thread::spawn(move || s1.send(10).unwrap());
thread::spawn(move || s2.send(20).unwrap());
// 最多只有一個接收操作會被執行。
select! {
recv(r1) -> msg => assert_eq!(msg, Ok(10)),
recv(r2) -> msg => assert_eq!(msg, Ok(20)),
default(Duration::from_secs(1)) => println!("timed out"),
}
如果需要選擇動態創建的通道操作列表,請使用Select
。select! 宏只是 Select 的一個便利的封裝。
特殊的 channel#
三個函數可以創建特殊類型的通道,它們都只返回一個 Receiver:
-
after
創建一個通道,在一段時間後,傳遞一條消息
-
tick
創建一個通道,定期傳遞消息
-
never
創建一個通道,永遠不會傳遞消息
這些通道非常高效,因為消息僅在接收操作時才會被生成。
下面是一個例子,每 50 毫秒打印一次時間,持續 1 秒:
use std::time::{Duration, Instant};
use crossbeam_channel::{after, select, tick};
let ticker = tick(Duration::from_millis(50));
let timeout = after(Duration::from_secs(1));
let start = Instant::now();
loop {
select! {
recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()),
recv(timeout) -> _ => break,
}
}