原文档:crossbeam_channel - Rust (docs.rs)
Rust 标准库提供了通道std::sync::mpsc
,其中mpsc
是 multiple producer, single consumer 的缩写,代表了该通道支持多个发送者,但是只支持唯一的接收者。而 crossbeam_channel 是一个 mpmc (多发送者,多接收者) 的 Rust 库,与 Go 语言的 channel 有异曲同工之妙。
Hello, world!#
use crossbeam_channel::unbounded;
// Create a channel of unbounded capacity.
let (s, r) = unbounded();
// Send a message into the channel.
s.send("Hello, world!").unwrap();
// Receive the message from the channel.
assert_eq!(r.recv(), Ok("Hello, world!"));
通道类型#
创建通道有两种方式:
-
bounded:无限缓冲通道
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)
-
unbounded 有限缓冲通道
pub fn unbounded<T>() -> (Sender<T>, Receiver<T>)
两个函数均返回一个 Sender
和 一个 Receiver
, 用于发送和接收消息。
bounded channel:
use crossbeam_channel::bounded;
// 创建一个通道最多同时容纳5个消息
let (s, r) = bounded(5);
// Can send only 5 messages without blocking.
for i in 0..5 {
s.send(i).unwrap();
}
// 超过缓冲大小区继续发送将导致阻塞
// s.send(5).unwrap();
unbounded channel:
use crossbeam_channel::unbounded;
// 创建一个无限缓冲通道
let (s, r) = unbounded();
// 可以发送任意数量的消息直到撑爆你的内存
for i in 0..1000 {
s.send(i).unwrap();
}
当有限缓冲通道的缓冲值设置为 0 时,就变成了同步通道 (无缓冲通道),这意味着只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。
use std::thread;
use crossbeam_channel::bounded;
// 创建一个无缓冲通道
let (s, r) = bounded(0);
// 发送将阻塞直到被接收
thread::spawn(move || s.send("Hi!").unwrap());
// 接收将阻塞直到有人发送
assert_eq!(r.recv(), Ok("Hi!"));
多线程之间共享通道#
Senders 和 receivers 可以通过 clone 被多个线程持有。
use std::thread;
use crossbeam_channel::bounded;
let (s1, r1) = bounded(0);
let (s2, r2) = (s1.clone(), r1.clone());
// 开启一个线程,接收一个消息,然后发送一个消息
thread::spawn(move || {
r2.recv().unwrap();
s2.send(2).unwrap();
});
// 发送一个消息,然后接收一个消息
s1.send(1).unwrap();
r1.recv().unwrap();
请注意 clone 产生的新 Senders 和 receivers 并不会创建新的消息流,而是与原来的 channel 共享同一个消息流,通道中的消息遵循先进先出的原则。
use crossbeam_channel::unbounded;
let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());
s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();
assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));
Senders 和 receivers 也可以通过引用在多线程间共享。
use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;
let (s, r) = bounded(0);
// thread::scope 传入一个闭包,在这个范围内做spawn操作最后统一做join,
// 如果所有线程都join成功则返回Ok,否则返回Err。
scope(|scope| {
// 开启一个线程,接收一个消息然后发送一个消息
scope.spawn(|_| {
r.recv().unwrap();
s.send(2).unwrap();
});
// Send a message and then receive one.
s.send(1).unwrap();
r.recv().unwrap();
}).unwrap();
关闭通道 (Disconnection)#
当与通道关联的 所有 senders 或 所有 receivers 被drop
后,该通道将关闭 (断开连接),不能再发送消息,但仍然可以接收任何剩余的消息。在断开连接的通道上发送和接收操作永远不会阻塞。
use crossbeam_channel::{unbounded, RecvError};
let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// 唯一的发送者被丢弃,通道关闭。
drop(s);
// 剩余的消息仍然可以被接收。
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));
// 通道中没有更多的消息了。
assert!(r.is_empty());
// 注意,调用`r.recv()`不会阻塞。
// 消息接收完后,`Err(RecvError)`会立即返回。
assert_eq!(r.recv(), Err(RecvError));
阻塞的通道#
发送和接收操作有三种情况:
- 非阻塞(立即返回成功或失败)
- 阻塞(等待直到操作成功或通道断开连接)
- 带有超时的阻塞(仅阻塞一段时间)
下面是一个简单的例子,展示非阻塞和阻塞操作的区别:
use crossbeam_channel::{bounded, RecvError, TryRecvError};
let (s, r) = bounded(1);
// 发送消息到通道
s.send("foo").unwrap();
// 这个调用会阻塞,因为通道已满
// s.send("bar").unwrap();
// 接收消息
assert_eq!(r.recv(), Ok("foo"));
// 这个接收操作会阻塞,因为通道为空
// r.recv();
// try_recv()方法尝试接收消息,不会阻塞,如果通道为空,会返回错误
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
// 关闭通道
drop(s);
// 这个接收操作不会阻塞,因为通道已经关闭
assert_eq!(r.recv(), Err(RecvError));
迭代器#
接收消息可以用作迭代器。例如,iter
方法创建一个迭代器,接收消息,直到通道为空并且关闭。注意,如果通道为空,迭代将会阻塞,直到通道关闭或下一条消息到达。
use std::thread;
use crossbeam_channel::unbounded;
let (s, r) = unbounded();
thread::spawn(move || {
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
drop(s); // 关闭通道。
});
// 从通道中接收所有消息。
// 注意,调用`collect`会阻塞,直到通道被关闭。
let v: Vec<_> = r.iter().collect();
assert_eq!(v, [1, 2, 3]);
你也可以使用 try_iter 创建非阻塞迭代器,接收所有可用的消息,当通道为空时,迭代结束:
use crossbeam_channel::unbounded;
let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// 不关闭通道
// 接收通道中当前的所有消息
let v: Vec<_> = r.try_iter().collect();
assert_eq!(v, [1, 2, 3]);
Selection#
select!
宏允许你定义一组通道操作,等待其中任何一个操作准备就绪,然后执行它。如果多个操作同时准备就绪,将随机选择一个。
也可以定义一个默认的操作,如果没有操作准备就绪,则立即执行或者在一段时间后执行默认操作。
如果操作不阻塞,我们就认为它已经准备就绪了,即使通道已经关闭并返回一个了错误。
下面是从两个通道接收消息的例子:
use std::thread;
use std::time::Duration;
use crossbeam_channel::{select, unbounded};
let (s1, r1) = unbounded();
let (s2, r2) = unbounded();
thread::spawn(move || s1.send(10).unwrap());
thread::spawn(move || s2.send(20).unwrap());
// 最多只有一个接收操作会被执行。
select! {
recv(r1) -> msg => assert_eq!(msg, Ok(10)),
recv(r2) -> msg => assert_eq!(msg, Ok(20)),
default(Duration::from_secs(1)) => println!("timed out"),
}
如果需要选择动态创建的通道操作列表,请使用Select
。select! 宏只是 Select 的一个便利的封装。
特殊的 channel#
三个函数可以创建特殊类型的通道,它们都只返回一个 Receiver:
-
after
创建一个通道,在一段时间后,传递一条消息
-
tick
创建一个通道,定期传递消息
-
never
创建一个通道,永远不会传递消息
这些通道非常高效,因为消息仅在接收操作时才会被生成。
下面是一个例子,每 50 毫秒打印一次时间,持续 1 秒:
use std::time::{Duration, Instant};
use crossbeam_channel::{after, select, tick};
let ticker = tick(Duration::from_millis(50));
let timeout = after(Duration::from_secs(1));
let start = Instant::now();
loop {
select! {
recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()),
recv(timeout) -> _ => break,
}
}