// heapless/mpmc.rs
//! A fixed capacity Multiple-Producer Multiple-Consumer (MPMC) lock-free queue
//!
//! NOTE: This module requires atomic CAS operations. On targets where they're not natively available,
//! they are emulated by the [`portable-atomic`](https://crates.io/crates/portable-atomic) crate.
//!
//! # Example
//!
//! This queue can be constructed in "const context". Placing it in a `static` variable lets *all*
//! contexts (interrupts / threads / `main`) safely enqueue and dequeue items from it.
//!
//! ``` ignore
//! #![no_main]
//! #![no_std]
//!
//! use panic_semihosting as _;
//!
//! use cortex_m::{asm, peripheral::syst::SystClkSource};
//! use cortex_m_rt::{entry, exception};
//! use cortex_m_semihosting::hprintln;
//! use heapless::mpmc::Q2;
//!
//! static Q: Q2<u8> = Q2::new();
//!
//! #[entry]
//! fn main() -> ! {
//! if let Some(p) = cortex_m::Peripherals::take() {
//! let mut syst = p.SYST;
//!
//! // configures the system timer to trigger a SysTick exception every second
//! syst.set_clock_source(SystClkSource::Core);
//! syst.set_reload(12_000_000);
//! syst.enable_counter();
//! syst.enable_interrupt();
//! }
//!
//! loop {
//! if let Some(x) = Q.dequeue() {
//! hprintln!("{}", x).ok();
//! } else {
//! asm::wfi();
//! }
//! }
//! }
//!
//! #[exception]
//! fn SysTick() {
//! static mut COUNT: u8 = 0;
//!
//! Q.enqueue(*COUNT).ok();
//! *COUNT += 1;
//! }
//! ```
//!
//! # Benchmark
//!
//! Measured on an ARM Cortex-M3 core running at 8 MHz and with zero Flash wait cycles
//!
//! N| `Q8::<u8>::enqueue().ok()` (`z`) | `Q8::<u8>::dequeue()` (`z`) |
//! -|----------------------------------|-----------------------------|
//! 0|34 |35 |
//! 1|52 |53 |
//! 2|69 |71 |
//!
//! - `N` denotes the number of *interruptions*. On Cortex-M, an interruption consists of an
//! interrupt handler preempting the would-be atomic section of the `enqueue` / `dequeue`
//! operation. Note that it does *not* matter if the higher priority handler uses the queue or
//! not.
//! - All execution times are in clock cycles. 1 clock cycle = 125 ns.
//! - Execution time is *dependent* on `mem::size_of::<T>()`. Both operations include one
//! `memcpy(T)` in their successful path.
//! - The optimization level is indicated in parentheses.
//! - The numbers reported correspond to the successful path (i.e. `Some` is returned by `dequeue`
//! and `Ok` is returned by `enqueue`).
//!
//! # Portability
//!
//! This module requires CAS atomic instructions which are not available on all architectures
//! (e.g. ARMv6-M (`thumbv6m-none-eabi`) and MSP430 (`msp430-none-elf`)). These atomics can be
//! emulated however with [`portable-atomic`](https://crates.io/crates/portable-atomic), which is
//! enabled with the `cas` feature and is enabled by default for `thumbv6m-none-eabi` and `riscv32`
//! targets.
//!
//! # References
//!
//! This is an implementation of Dmitry Vyukov's ["Bounded MPMC queue"][0] minus the cache padding.
//!
//! [0]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
use core::{cell::UnsafeCell, mem::MaybeUninit};
#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;
use atomic::Ordering;
// Atomic integer type used for the queue's position counters and for each slot's sequence
// number: pointer-sized when the `mpmc_large` feature is enabled, one byte wide otherwise.
#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(not(feature = "mpmc_large"))]
type AtomicTargetSize = atomic::AtomicU8;
// Plain (non-atomic) integer of the same width as `AtomicTargetSize`; used for the index mask
// and for the values loaded from / stored into the atomics.
#[cfg(feature = "mpmc_large")]
type IntSize = usize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = u8;
/// MPMC queue with a capacity for 2 elements.
pub type Q2<T> = MpMcQueue<T, 2>;
/// MPMC queue with a capacity for 4 elements.
pub type Q4<T> = MpMcQueue<T, 4>;
/// MPMC queue with a capacity for 8 elements.
pub type Q8<T> = MpMcQueue<T, 8>;
/// MPMC queue with a capacity for 16 elements.
pub type Q16<T> = MpMcQueue<T, 16>;
/// MPMC queue with a capacity for 32 elements.
pub type Q32<T> = MpMcQueue<T, 32>;
/// MPMC queue with a capacity for 64 elements.
pub type Q64<T> = MpMcQueue<T, 64>;
/// MPMC queue with a capacity for `N` elements.
///
/// `N` must be a power of 2.
///
/// Unless the `mpmc_large` feature is enabled, the internal counters are 8 bits wide and `new`
/// requires `N < u8::MAX`; together with the power-of-2 requirement this caps `N` at 128.
pub struct MpMcQueue<T, const N: usize> {
// Slot storage. It sits in an `UnsafeCell` because `enqueue` / `dequeue` write to the slots
// through a shared reference.
buffer: UnsafeCell<[Cell<T>; N]>,
// Position counter advanced by `dequeue`.
dequeue_pos: AtomicTargetSize,
// Position counter advanced by `enqueue`.
enqueue_pos: AtomicTargetSize,
}
impl<T, const N: usize> MpMcQueue<T, N> {
    /// Mask that maps a position counter onto a slot index (`N` is a power of 2).
    const MASK: IntSize = (N - 1) as IntSize;
    /// Placeholder slot used to fill the array before every slot gets its own sequence number.
    const EMPTY_CELL: Cell<T> = Cell::new(0);
    /// One-element array used as a const assertion: indexing it with `1` is out of bounds.
    const ASSERT: [(); 1] = [()];

    /// Creates an empty queue
    pub const fn new() -> Self {
        // Const assert
        crate::sealed::greater_than_1::<N>();
        crate::sealed::power_of_two::<N>();

        // Const assert on size: the index is `1` (out of bounds) unless `N < IntSize::MAX`.
        Self::ASSERT[!(N < (IntSize::MAX as usize)) as usize];

        // Slot `i` starts out with sequence number `i`.
        let mut cell_count = 0;
        let mut result_cells: [Cell<T>; N] = [Self::EMPTY_CELL; N];
        while cell_count != N {
            result_cells[cell_count] = Cell::new(cell_count);
            cell_count += 1;
        }

        Self {
            buffer: UnsafeCell::new(result_cells),
            dequeue_pos: AtomicTargetSize::new(0),
            enqueue_pos: AtomicTargetSize::new(0),
        }
    }

    /// Returns the item in the front of the queue, or `None` if the queue is empty
    pub fn dequeue(&self) -> Option<T> {
        // SAFETY: `buffer` holds `N` slots created by `new`, `MASK` is `N - 1`, and the slots are
        // only ever accessed through the free `dequeue` / `enqueue` functions.
        unsafe { dequeue(self.buffer.get() as *mut _, &self.dequeue_pos, Self::MASK) }
    }

    /// Adds an `item` to the end of the queue
    ///
    /// Returns back the `item` if the queue is full
    pub fn enqueue(&self, item: T) -> Result<(), T> {
        // SAFETY: same argument as in `dequeue` above.
        unsafe {
            enqueue(
                self.buffer.get() as *mut _,
                &self.enqueue_pos,
                Self::MASK,
                item,
            )
        }
    }
}

impl<T, const N: usize> Drop for MpMcQueue<T, N> {
    /// Drops every element that is still stored in the queue.
    ///
    /// The slots keep their values in `MaybeUninit`, which never runs destructors on its own, so
    /// without this drain any element left in the queue would be leaked.
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so this loop ends once the queue is empty.
        while self.dequeue().is_some() {}
    }
}
impl<T, const N: usize> Default for MpMcQueue<T, N> {
fn default() -> Self {
Self::new()
}
}
// SAFETY: `enqueue` and `dequeue` take `&self` and move `T` values in and out of the queue, so a
// shared queue can transfer a `T` between contexts -- hence the `T: Send` bound. Access to each
// slot is coordinated through the atomic position and sequence counters.
unsafe impl<T, const N: usize> Sync for MpMcQueue<T, N> where T: Send {}
/// One slot of the queue's buffer: a possibly-uninitialized value plus its sequence counter.
struct Cell<T> {
    /// Storage for the element; starts out uninitialized.
    data: MaybeUninit<T>,
    /// Counter that `enqueue` / `dequeue` compare against their position to decide whether the
    /// slot may be used.
    sequence: AtomicTargetSize,
}

impl<T> Cell<T> {
    /// Creates a slot without a value whose sequence counter is `seq` cast to the counter width.
    const fn new(seq: usize) -> Self {
        let sequence = AtomicTargetSize::new(seq as IntSize);
        let data = MaybeUninit::uninit();
        Self { data, sequence }
    }
}
unsafe fn dequeue<T>(
buffer: *mut Cell<T>,
dequeue_pos: &AtomicTargetSize,
mask: IntSize,
) -> Option<T> {
let mut pos = dequeue_pos.load(Ordering::Relaxed);
let mut cell;
loop {
cell = buffer.add(usize::from(pos & mask));
let seq = (*cell).sequence.load(Ordering::Acquire);
let dif = (seq as i8).wrapping_sub((pos.wrapping_add(1)) as i8);
if dif == 0 {
if dequeue_pos
.compare_exchange_weak(
pos,
pos.wrapping_add(1),
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
} else if dif < 0 {
return None;
} else {
pos = dequeue_pos.load(Ordering::Relaxed);
}
}
let data = (*cell).data.as_ptr().read();
(*cell)
.sequence
.store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
Some(data)
}
unsafe fn enqueue<T>(
buffer: *mut Cell<T>,
enqueue_pos: &AtomicTargetSize,
mask: IntSize,
item: T,
) -> Result<(), T> {
let mut pos = enqueue_pos.load(Ordering::Relaxed);
let mut cell;
loop {
cell = buffer.add(usize::from(pos & mask));
let seq = (*cell).sequence.load(Ordering::Acquire);
let dif = (seq as i8).wrapping_sub(pos as i8);
if dif == 0 {
if enqueue_pos
.compare_exchange_weak(
pos,
pos.wrapping_add(1),
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
} else if dif < 0 {
return Err(item);
} else {
pos = enqueue_pos.load(Ordering::Relaxed);
}
}
(*cell).data.as_mut_ptr().write(item);
(*cell)
.sequence
.store(pos.wrapping_add(1), Ordering::Release);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::Q2;

    /// Fills a two-slot queue, checks that a third item is rejected, then drains it in order.
    #[test]
    fn sanity() {
        let queue = Q2::new();

        queue.enqueue(0).unwrap();
        queue.enqueue(1).unwrap();
        assert!(queue.enqueue(2).is_err());

        assert_eq!(queue.dequeue(), Some(0));
        assert_eq!(queue.dequeue(), Some(1));
        assert_eq!(queue.dequeue(), None);
    }

    /// Cycles one item through the queue 255 times and then dequeues from the empty queue.
    #[test]
    fn drain_at_pos255() {
        let queue = Q2::new();

        let mut round = 0;
        while round < 255 {
            assert!(queue.enqueue(0).is_ok());
            assert_eq!(queue.dequeue(), Some(0));
            round += 1;
        }

        // this should not block forever
        assert_eq!(queue.dequeue(), None);
    }

    /// Cycles one item through the queue 254 times, fills both slots and then enqueues once more.
    #[test]
    fn full_at_wrapped_pos0() {
        let queue = Q2::new();

        let mut round = 0;
        while round < 254 {
            assert!(queue.enqueue(0).is_ok());
            assert_eq!(queue.dequeue(), Some(0));
            round += 1;
        }

        assert!(queue.enqueue(0).is_ok());
        assert!(queue.enqueue(0).is_ok());

        // this should not block forever
        assert!(queue.enqueue(0).is_err());
    }
}