doraemon

doraemon

let's go write some rusty code and revolutionize the world!

Rust Crate: crossbeam_channel

原文档:crossbeam_channel - Rust (docs.rs)

Rust 标准库提供了通道std::sync::mpsc,其中mpsc是 multiple producer, single consumer 的缩写,代表了该通道支持多个发送者,但是只支持唯一的接收者。而 crossbeam_channel 是一个 mpmc (多发送者,多接收者) 的 Rust 库,与 Go 语言的 channel 有异曲同工之妙。

Hello, world!#

use crossbeam_channel::unbounded;

// Create a channel of unbounded capacity.
let (s, r) = unbounded();

// Send a message into the channel.
s.send("Hello, world!").unwrap();

// Receive the message from the channel.
assert_eq!(r.recv(), Ok("Hello, world!"));

通道类型#

创建通道有两种方式:

  • bounded:无限缓冲通道

    pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)
    
  • unbounded 有限缓冲通道

    pub fn unbounded<T>() -> (Sender<T>, Receiver<T>)
    

两个函数均返回一个 Sender 和 一个 Receiver, 用于发送和接收消息。

bounded channel

use crossbeam_channel::bounded;

// 创建一个通道最多同时容纳5个消息
let (s, r) = bounded(5);

// Can send only 5 messages without blocking.
for i in 0..5 {
    s.send(i).unwrap();
}

// 超过缓冲大小区继续发送将导致阻塞
// s.send(5).unwrap();

unbounded channel:

use crossbeam_channel::unbounded;

// 创建一个无限缓冲通道
let (s, r) = unbounded();

// 可以发送任意数量的消息直到撑爆你的内存
for i in 0..1000 {
    s.send(i).unwrap();
}

当有限缓冲通道的缓冲值设置为 0 时,就变成了同步通道 (无缓冲通道),这意味着只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。

use std::thread;
use crossbeam_channel::bounded;

// 创建一个无缓冲通道
let (s, r) = bounded(0);

// 发送将阻塞直到被接收
thread::spawn(move || s.send("Hi!").unwrap());

// 接收将阻塞直到有人发送
assert_eq!(r.recv(), Ok("Hi!"));

多线程之间共享通道#

Senders 和 receivers 可以通过 clone 被多个线程持有。

use std::thread;
use crossbeam_channel::bounded;

let (s1, r1) = bounded(0);
let (s2, r2) = (s1.clone(), r1.clone());

// 开启一个线程,接收一个消息,然后发送一个消息
thread::spawn(move || {
    r2.recv().unwrap();
    s2.send(2).unwrap();
});

// 发送一个消息,然后接收一个消息
s1.send(1).unwrap();
r1.recv().unwrap();

请注意 clone 产生的新 Senders 和 receivers 并不会创建新的消息流,而是与原来的 channel 共享同一个消息流,通道中的消息遵循先进先出的原则。

use crossbeam_channel::unbounded;

let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());

s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();

assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));

Senders 和 receivers 也可以通过引用在多线程间共享。

use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;

let (s, r) = bounded(0);

// thread::scope 传入一个闭包,在这个范围内做spawn操作最后统一做join,
// 如果所有线程都join成功则返回Ok,否则返回Err。
scope(|scope| {
    // 开启一个线程,接收一个消息然后发送一个消息
    scope.spawn(|_| {
        r.recv().unwrap();
        s.send(2).unwrap();
    });

    // Send a message and then receive one.
    s.send(1).unwrap();
    r.recv().unwrap();
}).unwrap();

关闭通道 (Disconnection)#

当与通道关联的 所有 senders 或 所有 receivers 被drop后,该通道将关闭 (断开连接),不能再发送消息,但仍然可以接收任何剩余的消息。在断开连接的通道上发送和接收操作永远不会阻塞。

use crossbeam_channel::{unbounded, RecvError};

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();

// 唯一的发送者被丢弃,通道关闭。
drop(s);

// 剩余的消息仍然可以被接收。
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));

// 通道中没有更多的消息了。
assert!(r.is_empty());

// 注意,调用`r.recv()`不会阻塞。
// 消息接收完后,`Err(RecvError)`会立即返回。
assert_eq!(r.recv(), Err(RecvError));

阻塞的通道#

发送和接收操作有三种情况:

  • 非阻塞(立即返回成功或失败)
  • 阻塞(等待直到操作成功或通道断开连接)
  • 带有超时的阻塞(仅阻塞一段时间)

下面是一个简单的例子,展示非阻塞和阻塞操作的区别:

use crossbeam_channel::{bounded, RecvError, TryRecvError};

let (s, r) = bounded(1);

// 发送消息到通道
s.send("foo").unwrap();

// 这个调用会阻塞,因为通道已满
// s.send("bar").unwrap();

// 接收消息
assert_eq!(r.recv(), Ok("foo"));

// 这个接收操作会阻塞,因为通道为空
// r.recv();

 // try_recv()方法尝试接收消息,不会阻塞,如果通道为空,会返回错误
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

 // 关闭通道
drop(s);

// 这个接收操作不会阻塞,因为通道已经关闭
assert_eq!(r.recv(), Err(RecvError));

迭代器#

接收消息可以用作迭代器。例如,iter方法创建一个迭代器,接收消息,直到通道为空并且关闭。注意,如果通道为空,迭代将会阻塞,直到通道关闭或下一条消息到达。

use std::thread;
use crossbeam_channel::unbounded;

let (s, r) = unbounded();

thread::spawn(move || {
    s.send(1).unwrap();
    s.send(2).unwrap();
    s.send(3).unwrap();
    drop(s); // 关闭通道。
});

// 从通道中接收所有消息。
// 注意,调用`collect`会阻塞,直到通道被关闭。
let v: Vec<_> = r.iter().collect();

assert_eq!(v, [1, 2, 3]);

你也可以使用 try_iter 创建非阻塞迭代器,接收所有可用的消息,当通道为空时,迭代结束:

use crossbeam_channel::unbounded;

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// 不关闭通道

// 接收通道中当前的所有消息
let v: Vec<_> = r.try_iter().collect();

assert_eq!(v, [1, 2, 3]);

Selection#

select!宏允许你定义一组通道操作,等待其中任何一个操作准备就绪,然后执行它。如果多个操作同时准备就绪,将随机选择一个。

也可以定义一个默认的操作,如果没有操作准备就绪,则立即执行或者在一段时间后执行默认操作。

如果操作不阻塞,我们就认为它已经准备就绪了,即使通道已经关闭并返回一个了错误。

下面是从两个通道接收消息的例子:

use std::thread;
use std::time::Duration;
use crossbeam_channel::{select, unbounded};

let (s1, r1) = unbounded();
let (s2, r2) = unbounded();

thread::spawn(move || s1.send(10).unwrap());
thread::spawn(move || s2.send(20).unwrap());

// 最多只有一个接收操作会被执行。
select! {
    recv(r1) -> msg => assert_eq!(msg, Ok(10)),
    recv(r2) -> msg => assert_eq!(msg, Ok(20)),
    default(Duration::from_secs(1)) => println!("timed out"),
}

如果需要选择动态创建的通道操作列表,请使用Select。select! 宏只是 Select 的一个便利的封装。

特殊的 channel#

三个函数可以创建特殊类型的通道,它们都只返回一个 Receiver:

  • after

    创建一个通道,在一段时间后,传递一条消息

  • tick

    创建一个通道,定期传递消息

  • never

    创建一个通道,永远不会传递消息

这些通道非常高效,因为消息仅在接收操作时才会被生成。

下面是一个例子,每 50 毫秒打印一次时间,持续 1 秒:

use std::time::{Duration, Instant};
use crossbeam_channel::{after, select, tick};

let ticker = tick(Duration::from_millis(50));
let timeout = after(Duration::from_secs(1));
let start = Instant::now();

loop {
    select! {
        recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()),
        recv(timeout) -> _ => break,
    }
}
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。