Embassy
embassy-sync

Crates

git

Versions

default

Flavors

Struct embassy_sync::pubsub::PubSubChannel

source ·
pub struct PubSubChannel<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> { /* private fields */ }
Expand description

A broadcast channel implementation where multiple publishers can send messages to multiple subscribers

Any published message can be read by all subscribers. A publisher can choose how it sends its message.

  • With Pub::publish() the publisher has to wait until there is space in the internal message queue.
  • With Pub::publish_immediate() the publisher doesn’t await and instead lets the oldest message in the queue drop if necessary. This will cause any Subscriber that missed the message to receive an error to indicate that it has lagged.

§Example

// Create the channel. This can be static as well
let channel = PubSubChannel::<NoopRawMutex, u32, 4, 4, 4>::new();

// This is a generic subscriber with a direct reference to the channel
let mut sub0 = channel.subscriber().unwrap();
// This is a dynamic subscriber with a dynamic (trait object) reference to the channel
let mut sub1 = channel.dyn_subscriber().unwrap();

let pub0 = channel.publisher().unwrap();

// Publish a message, but wait if the queue is full
pub0.publish(42).await;

// Publish a message, but if the queue is full, just kick out the oldest message.
// This may cause some subscribers to miss a message
pub0.publish_immediate(43);

// Wait for a new message. If the subscriber missed a message, the WaitResult will be a Lag result
assert_eq!(sub0.next_message().await, WaitResult::Message(42));
assert_eq!(sub1.next_message().await, WaitResult::Message(42));

// Wait again, but this time ignore any Lag results
assert_eq!(sub0.next_message_pure().await, 43);
assert_eq!(sub1.next_message_pure().await, 43);

// There's also a polling interface
assert_eq!(sub0.try_next_message(), None);
assert_eq!(sub1.try_next_message(), None);

Implementations§

source§

impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubChannel<M, T, CAP, SUBS, PUBS>

source

pub const fn new() -> Self

Create a new channel

source

pub fn subscriber(&self) -> Result<Subscriber<'_, M, T, CAP, SUBS, PUBS>, Error>

Create a new subscriber. It will only receive messages that are published after its creation.

If there are no subscriber slots left, an error will be returned.

source

pub fn dyn_subscriber(&self) -> Result<DynSubscriber<'_, T>, Error>

Create a new subscriber. It will only receive messages that are published after its creation.

If there are no subscriber slots left, an error will be returned.

source

pub fn publisher(&self) -> Result<Publisher<'_, M, T, CAP, SUBS, PUBS>, Error>

Create a new publisher

If there are no publisher slots left, an error will be returned.

source

pub fn dyn_publisher(&self) -> Result<DynPublisher<'_, T>, Error>

Create a new publisher

If there are no publisher slots left, an error will be returned.

source

pub fn immediate_publisher( &self ) -> ImmediatePublisher<'_, M, T, CAP, SUBS, PUBS>

Create a new publisher that can only send immediate messages. This kind of publisher does not take up a publisher slot.

source

pub fn dyn_immediate_publisher(&self) -> DynImmediatePublisher<'_, T>

Create a new publisher that can only send immediate messages. This kind of publisher does not take up a publisher slot.

Trait Implementations§

source§

impl<M: RawMutex, T: Clone, const CAP: usize, const SUBS: usize, const PUBS: usize> PubSubBehavior<T> for PubSubChannel<M, T, CAP, SUBS, PUBS>

source§

fn get_message_with_context( &self, next_message_id: &mut u64, cx: Option<&mut Context<'_>> ) -> Poll<WaitResult<T>>

Try to get a message from the queue with the given message id. Read more
source§

fn available(&self, next_message_id: u64) -> u64

Get the number of messages that are between the given next_message_id and the most recent message. This is not necessarily the number of messages a subscriber can still receive, as it may have lagged.
source§

fn publish_with_context( &self, message: T, cx: Option<&mut Context<'_>> ) -> Result<(), T>

Try to publish a message to the queue. Read more
source§

fn publish_immediate(&self, message: T)

Publish a message immediately
source§

fn space(&self) -> usize

The number of messages that can still be published without having to wait or without having to lag the subscribers.
source§

fn unregister_subscriber(&self, subscriber_next_message_id: u64)

Let the channel know that a subscriber has dropped
source§

fn unregister_publisher(&self)

Let the channel know that a publisher has dropped

Auto Trait Implementations§

§

impl<M, T, const CAP: usize, const SUBS: usize, const PUBS: usize> !Freeze for PubSubChannel<M, T, CAP, SUBS, PUBS>

§

impl<M, T, const CAP: usize, const SUBS: usize, const PUBS: usize> !RefUnwindSafe for PubSubChannel<M, T, CAP, SUBS, PUBS>

§

impl<M, T, const CAP: usize, const SUBS: usize, const PUBS: usize> Send for PubSubChannel<M, T, CAP, SUBS, PUBS>
where M: Send, T: Send,

§

impl<M, T, const CAP: usize, const SUBS: usize, const PUBS: usize> Sync for PubSubChannel<M, T, CAP, SUBS, PUBS>
where M: Sync, T: Send,

§

impl<M, T, const CAP: usize, const SUBS: usize, const PUBS: usize> Unpin for PubSubChannel<M, T, CAP, SUBS, PUBS>
where M: Unpin, T: Unpin,

§

impl<M, T, const CAP: usize, const SUBS: usize, const PUBS: usize> UnwindSafe for PubSubChannel<M, T, CAP, SUBS, PUBS>
where M: UnwindSafe, T: UnwindSafe,

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.