doraemon

doraemon

let's go write some rusty code and revolutionize the world!

Rust Crate: crossbeam_channel

Original Document: crossbeam_channel - Rust (docs.rs)

The Rust standard library provides the channel std::sync::mpsc, where mpsc is an abbreviation for multiple producer, single consumer, which means that the channel supports multiple senders but only one receiver. On the other hand, crossbeam_channel is a Rust library for mpmc (multiple producer, multiple consumer) channels, similar to the channel in the Go language.

Hello, world!#

use crossbeam_channel::unbounded;

// Create a channel of unbounded capacity.
let (s, r) = unbounded();

// Send a message into the channel.
s.send("Hello, world!").unwrap();

// Receive the message from the channel.
assert_eq!(r.recv(), Ok("Hello, world!"));

Channel Types#

There are two ways to create channels:

  • bounded: bounded buffer channel

    pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>)
    
  • unbounded: unbounded buffer channel

    pub fn unbounded<T>() -> (Sender<T>, Receiver<T>)
    

Both functions return a Sender and a Receiver for sending and receiving messages.

bounded channel:

use crossbeam_channel::bounded;

// Create a channel with a maximum capacity of 5 messages.
let (s, r) = bounded(5);

// Can send only 5 messages without blocking.
for i in 0..5 {
    s.send(i).unwrap();
}

// Sending more than the buffer size will block.
// s.send(5).unwrap();

unbounded channel:

use crossbeam_channel::unbounded;

// Create an unbounded buffer channel.
let (s, r) = unbounded();

// Can send any number of messages until you run out of memory.
for i in 0..1000 {
    s.send(i).unwrap();
}

When the buffer value of a bounded buffer channel is set to 0, it becomes a synchronous channel (unbuffered channel), which means that sending can only succeed when there is a receiver ready to receive the value, otherwise it will be in a waiting state.

use std::thread;
use crossbeam_channel::bounded;

// Create an unbuffered channel.
let (s, r) = bounded(0);

// Sending will block until it is received.
thread::spawn(move || s.send("Hi!").unwrap());

// Receiving will block until someone sends.
assert_eq!(r.recv(), Ok("Hi!"));

Sharing Channels Between Threads#

Senders and receivers can be held by multiple threads through cloning.

use std::thread;
use crossbeam_channel::bounded;

let (s1, r1) = bounded(0);
let (s2, r2) = (s1.clone(), r1.clone());

// Spawn a thread to receive a message and then send a message.
thread::spawn(move || {
    r2.recv().unwrap();
    s2.send(2).unwrap();
});

// Send a message and then receive a message.
s1.send(1).unwrap();
r1.recv().unwrap();

Please note that the new Senders and receivers produced by cloning do not create new message flows, but share the same message flow as the original channel. Messages in the channel follow the first-in-first-out principle.

use crossbeam_channel::unbounded;

let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());

s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();

assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));

Senders and receivers can also be shared between threads through references.

use crossbeam_channel::bounded;
use crossbeam_utils::thread::scope;

let (s, r) = bounded(0);

// The thread::scope takes a closure and does spawn operations within this scope,
// and finally does join. If all threads join successfully, it returns Ok, otherwise it returns Err.
scope(|scope| {
    // Spawn a thread to receive a message and then send a message.
    scope.spawn(|_| {
        r.recv().unwrap();
        s.send(2).unwrap();
    });

    // Send a message and then receive a message.
    s.send(1).unwrap();
    r.recv().unwrap();
}).unwrap();

Closing Channels (Disconnection)#

When all senders or all receivers associated with a channel are drop, the channel will be closed (disconnected), and no more messages can be sent, but any remaining messages can still be received. Sending and receiving operations on a disconnected channel will never block.

use crossbeam_channel::{unbounded, RecvError};

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();

// The only sender is dropped, and the channel is closed.
drop(s);

// The remaining messages can still be received.
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));

// There are no more messages in the channel.
assert!(r.is_empty());

// Note that calling `r.recv()` will not block.
// After receiving all the messages, `Err(RecvError)` will be returned immediately.
assert_eq!(r.recv(), Err(RecvError));

Blocking Channels#

There are three cases for sending and receiving operations:

  • Non-blocking (returns immediately with success or failure)
  • Blocking (waits until the operation succeeds or the channel is disconnected)
  • Blocking with timeout (blocks for a certain period of time)

Here is a simple example that demonstrates the difference between non-blocking and blocking operations:

use crossbeam_channel::{bounded, RecvError, TryRecvError};

let (s, r) = bounded(1);

// Send a message to the channel
s.send("foo").unwrap();

// This call will block because the channel is full
// s.send("bar").unwrap();

// Receive a message
assert_eq!(r.recv(), Ok("foo"));

// This receive operation will block because the channel is empty
// r.recv();

 // The try_recv() method attempts to receive a message without blocking, and returns an error if the channel is empty
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

 // Close the channel
drop(s);

// This receive operation will not block because the channel is already closed
assert_eq!(r.recv(), Err(RecvError));

Iterators#

Receiving messages can be used as iterators. For example, the iter method creates an iterator that receives messages until the channel is empty and closed. Note that if the channel is empty, the iteration will block until the channel is closed or the next message arrives.

use std::thread;
use crossbeam_channel::unbounded;

let (s, r) = unbounded();

thread::spawn(move || {
    s.send(1).unwrap();
    s.send(2).unwrap();
    s.send(3).unwrap();
    drop(s); // Close the channel.
});

// Receive all messages from the channel.
// Note that calling `collect` will block until the channel is closed.
let v: Vec<_> = r.iter().collect();

assert_eq!(v, [1, 2, 3]);

You can also use try_iter to create a non-blocking iterator that receives all available messages until the channel is empty:

use crossbeam_channel::unbounded;

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// Don't close the channel

// Receive all current messages in the channel
let v: Vec<_> = r.try_iter().collect();

assert_eq!(v, [1, 2, 3]);

Selection#

The select! macro allows you to define a set of channel operations, wait for any of them to be ready, and then execute it. If multiple operations are ready at the same time, one of them will be randomly selected.

You can also define a default operation that will be executed immediately if no operation is ready, or after a certain period of time.

If an operation is not blocking, we consider it ready even if the channel is closed and an error is returned.

Here is an example of receiving messages from two channels:

use std::thread;
use std::time::Duration;
use crossbeam_channel::{select, unbounded};

let (s1, r1) = unbounded();
let (s2, r2) = unbounded();

thread::spawn(move || s1.send(10).unwrap());
thread::spawn(move || s2.send(20).unwrap());

// Only one receive operation will be executed at most.
select! {
    recv(r1) -> msg => assert_eq!(msg, Ok(10)),
    recv(r2) -> msg => assert_eq!(msg, Ok(20)),
    default(Duration::from_secs(1)) => println!("timed out"),
}

If you need to select a dynamically created list of channel operations, use Select. The select! macro is just a convenient wrapper for Select.

Special Channels#

Three functions can create channels of special types, all of which only return a Receiver:

  • after

    Creates a channel that passes a message after a certain period of time

  • tick

    Creates a channel that passes messages at regular intervals

  • never

    Creates a channel that never passes messages

These channels are very efficient because messages are only generated when receiving operations are performed.

Here is an example that prints the time every 50 milliseconds for 1 second:

use std::time::{Duration, Instant};
use crossbeam_channel::{after, select, tick};

let ticker = tick(Duration::from_millis(50));
let timeout = after(Duration::from_secs(1));
let start = Instant::now();

loop {
    select! {
        recv(ticker) -> _ => println!("elapsed: {:?}", start.elapsed()),
        recv(timeout) -> _ => break,
    }
}
Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.