(Naive) Concurrent Queue
A queue is a must-have data structure for concurrent applications:
- a queue of requests can be used to route traffic to multiple worker threads
- a queue of tests can be used by a test framework to route them to worker threads
- a queue of background jobs can feed the worker threads that execute them
First, let's build a simple, I would even say a "naive" version of the queue that is simply wrapped with a Mutex.
Oh, and let's make it have a fixed maximum size. If it's used to route requests in a multi-threaded server we don't want to open the door for DDoSing, right?
Here's a fixed-size queue that is not thread-safe:
use std::{collections::VecDeque, ffi::c_ulong};

struct UnsafeQueue {
    queue: VecDeque<c_ulong>,
    cap: usize,
}

impl UnsafeQueue {
    // Equivalent of `.allocate` method
    fn alloc() -> Self {
        Self {
            queue: VecDeque::new(),
            cap: 0,
        }
    }

    // Equivalent of a constructor
    fn init(&mut self, cap: usize) {
        self.cap = cap;
    }

    // A method to push a value to the queue
    // THIS CAN FAIL if the queue is full, and so it must return a boolean value
    fn try_push(&mut self, value: c_ulong) -> bool {
        if self.queue.len() < self.cap {
            self.queue.push_back(value);
            true
        } else {
            false
        }
    }

    // A method to pop a value from the queue
    // THIS CAN FAIL if the queue is empty
    fn try_pop(&mut self) -> Option<c_ulong> {
        self.queue.pop_front()
    }

    // A convenient helper for GC marking
    fn for_each(&self, f: extern "C" fn(c_ulong)) {
        for item in self.queue.iter() {
            f(*item);
        }
    }
}
Here we use Rust's built-in VecDeque type, which provides the push_back and pop_front methods. On top of it, our wrapper handles two failure cases:
- when we push to a full queue (then false is returned from try_push)
- when we pop from an empty queue (then None is returned from try_pop)
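To see both failure cases in action, here is a minimal, self-contained sketch. `BoundedQueue` and its `new` constructor are hypothetical stand-ins for `UnsafeQueue`: they merge `alloc` and `init` into one step and store `u64` instead of `c_ulong`, so this is an illustration only, not the code that gets exposed to Ruby.

```rust
use std::collections::VecDeque;

// Hypothetical stand-in for `UnsafeQueue` (name and constructor are assumptions).
struct BoundedQueue {
    queue: VecDeque<u64>,
    cap: usize,
}

impl BoundedQueue {
    fn new(cap: usize) -> Self {
        Self {
            queue: VecDeque::with_capacity(cap),
            cap,
        }
    }

    // Rejects the value instead of growing past `cap`.
    fn try_push(&mut self, value: u64) -> bool {
        if self.queue.len() < self.cap {
            self.queue.push_back(value);
            true
        } else {
            false
        }
    }

    // `VecDeque::pop_front` already returns `None` for an empty deque.
    fn try_pop(&mut self) -> Option<u64> {
        self.queue.pop_front()
    }
}

fn main() {
    let mut q = BoundedQueue::new(2);
    assert!(q.try_push(1));
    assert!(q.try_push(2));
    assert!(!q.try_push(3)); // full: the value is rejected
    assert_eq!(q.try_pop(), Some(1)); // values come out in FIFO order
    assert_eq!(q.try_pop(), Some(2));
    assert_eq!(q.try_pop(), None); // empty
}
```

Note that a rejected value is not stored anywhere: the caller still owns it and decides whether to retry or drop it.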
Now we wrap it with a Mutex:
// Assumption: `Mutex` here is `parking_lot::Mutex`, whose `lock()` returns the guard
// directly and whose `try_lock()` returns an `Option`. `std::sync::Mutex` returns
// `Result` values from both and would need different handling.
use parking_lot::Mutex;

// Exposed as `QueueWithMutex` class in Ruby
pub struct QueueWithMutex {
    inner: Mutex<UnsafeQueue>,
}

impl QueueWithMutex {
    // Exposed as `QueueWithMutex.allocate` method in Ruby
    fn alloc() -> Self {
        Self {
            inner: Mutex::new(UnsafeQueue::alloc()),
        }
    }

    // Exposed as `QueueWithMutex#initialize` method in Ruby
    fn init(&mut self, cap: usize) {
        let mut inner = self.inner.lock();
        inner.init(cap);
    }

    // GC marking logic
    fn mark(&self, f: extern "C" fn(c_ulong)) {
        let inner = self.inner.lock();
        inner.for_each(f);
    }

    // Exposed as `QueueWithMutex#try_push` method in Ruby
    // Fails if the lock is held by another thread OR the queue is full
    fn try_push(&self, value: c_ulong) -> bool {
        if let Some(mut inner) = self.inner.try_lock() {
            if inner.try_push(value) {
                return true;
            }
        }
        false
    }

    // Exposed as `QueueWithMutex#try_pop` method in Ruby
    // Fails if the lock is held by another thread OR the queue is empty
    fn try_pop(&self) -> Option<c_ulong> {
        if let Some(mut inner) = self.inner.try_lock() {
            if let Some(value) = inner.try_pop() {
                return Some(value);
            }
        }
        None
    }
}
As you can see, it's a semi-transparent wrapper around UnsafeQueue, except that try_push and try_pop first try to acquire the lock on the Mutex. If another thread holds the lock, they give up immediately and return false or None. So both methods can now fail for a second reason: not only because the queue is full or empty, but also because of lock contention.
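The contention case can be reproduced in isolation. The sketch below is an assumption-laden illustration, not the wrapper itself: it uses `std::sync::Mutex` (whose `try_lock` returns a `Result` rather than an `Option`), and the `Queue` type and the `try_pop_while_locked` helper are hypothetical names made up for the demo.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical miniature of the mutex-wrapped queue, built on `std::sync::Mutex`.
struct Queue {
    inner: Mutex<VecDeque<u64>>,
}

impl Queue {
    fn new(items: Vec<u64>) -> Self {
        Self {
            inner: Mutex::new(VecDeque::from(items)),
        }
    }

    // Returns `None` both when the queue is empty and when the lock is busy.
    fn try_pop(&self) -> Option<u64> {
        match self.inner.try_lock() {
            Ok(mut guard) => guard.pop_front(),
            Err(_) => None,
        }
    }
}

// Holds the lock on the calling thread and calls `try_pop` from another thread.
fn try_pop_while_locked(q: &Arc<Queue>) -> Option<u64> {
    let _guard = q.inner.lock().unwrap();
    let q2 = Arc::clone(q);
    thread::spawn(move || q2.try_pop()).join().unwrap()
}

fn main() {
    let q = Arc::new(Queue::new(vec![1]));
    // The queue is NOT empty, yet `try_pop` fails because the lock is taken.
    assert_eq!(try_pop_while_locked(&q), None);
    // Once the lock is released the same call succeeds.
    assert_eq!(q.try_pop(), Some(1));
}
```

The caller cannot tell the two kinds of `None` apart, which is acceptable here because the only reasonable reaction to either one is to retry later.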
To escape the Rust-specific Option<T> abstraction at the FFI boundary, we can simply make the wrapping function take an additional fallback argument that is returned if the Option is None:
#[no_mangle]
pub extern "C" fn queue_with_mutex_try_pop(queue: *mut QueueWithMutex, fallback: c_ulong) -> c_ulong {
    let queue = unsafe { queue.as_mut().unwrap() };
    queue.try_pop().unwrap_or(fallback)
}
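The fallback pattern can be tried without any Ruby involved. The following is a sketch under stated assumptions: `Queue` and `queue_try_pop_or` are hypothetical names, the queue is built on `std::sync::Mutex`, the export attribute is left out so the snippet compiles as a standalone program, and, unlike the wrapper above, a null pointer yields the fallback instead of a panic.

```rust
use std::collections::VecDeque;
use std::ffi::c_ulong;
use std::sync::Mutex;

// Hypothetical miniature of the mutex-wrapped queue.
pub struct Queue {
    inner: Mutex<VecDeque<c_ulong>>,
}

impl Queue {
    fn try_pop(&self) -> Option<c_ulong> {
        // `None` if the lock is busy or the queue is empty.
        self.inner.try_lock().ok().and_then(|mut q| q.pop_front())
    }
}

// Hypothetical null-tolerant variant: `Option` never crosses the C boundary,
// the caller gets `fallback` back whenever there is nothing to return.
pub extern "C" fn queue_try_pop_or(queue: *const Queue, fallback: c_ulong) -> c_ulong {
    // SAFETY: the caller must pass either null or a pointer to a live `Queue`.
    match unsafe { queue.as_ref() } {
        Some(queue) => queue.try_pop().unwrap_or(fallback),
        None => fallback,
    }
}

fn main() {
    const FALLBACK: c_ulong = c_ulong::MAX;
    let q = Queue {
        inner: Mutex::new(VecDeque::from([42])),
    };
    assert_eq!(queue_try_pop_or(&q, FALLBACK), 42);
    assert_eq!(queue_try_pop_or(&q, FALLBACK), FALLBACK); // queue is empty now
    assert_eq!(queue_try_pop_or(std::ptr::null(), FALLBACK), FALLBACK);
}
```

The pattern only works if the fallback can never be a legitimate queue element; otherwise the caller cannot distinguish "popped the fallback value" from "nothing was popped".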
How can we safely push and pop in a blocking manner? Well, for simplicity let's just add methods that retry try_push and try_pop in a loop, sleeping briefly after each failed attempt.
class QueueWithMutex
  class Undefined
    def inspect
      "#<Undefined>"
    end
  end
  UNDEFINED = Ractor.make_shareable(Undefined.new)
  def pop
    loop do
      value = try_pop(UNDEFINED)
      if value.equal?(UNDEFINED)
        # queue is empty (or locked by another thread), keep looping
      else
        return value
      end
      sleep 0.001
    end
  end
  def push(value)
    loop do
      pushed = try_push(value)
      return if pushed
      sleep 0.001
    end
  end
end
Here a special unique UNDEFINED object takes the place of the fallback value, and we use it to detect that no value was popped. This implementation is naive, but for now that's the goal (later, we'll implement a more advanced queue that doesn't rely on polling).
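The same retry-and-sleep idea can be sketched directly in Rust, which makes it easy to check that a blocked producer and a polling consumer make progress together. Everything here is an assumption made for illustration: the `Queue` type, the `POLL` interval, the `run` helper, and the use of `std::sync::Mutex` and threads instead of Ractors.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Assumed polling interval, mirroring `sleep 0.001` in the Ruby code.
const POLL: Duration = Duration::from_millis(1);

// Hypothetical miniature of the bounded, mutex-wrapped queue.
struct Queue {
    inner: Mutex<VecDeque<u64>>,
    cap: usize,
}

impl Queue {
    fn new(cap: usize) -> Self {
        Self { inner: Mutex::new(VecDeque::new()), cap }
    }

    // Fails if the lock is busy or the queue is full.
    fn try_push(&self, value: u64) -> bool {
        if let Ok(mut q) = self.inner.try_lock() {
            if q.len() < self.cap {
                q.push_back(value);
                return true;
            }
        }
        false
    }

    // Fails if the lock is busy or the queue is empty.
    fn try_pop(&self) -> Option<u64> {
        self.inner.try_lock().ok().and_then(|mut q| q.pop_front())
    }

    // Blocking push: retry until `try_push` succeeds.
    fn push(&self, value: u64) {
        while !self.try_push(value) {
            thread::sleep(POLL);
        }
    }

    // Blocking pop: retry until `try_pop` returns a value.
    fn pop(&self) -> u64 {
        loop {
            if let Some(value) = self.try_pop() {
                return value;
            }
            thread::sleep(POLL);
        }
    }
}

// One producer pushes 1..=n, the calling thread pops n values.
fn run(cap: usize, n: u64) -> Vec<u64> {
    let q = Arc::new(Queue::new(cap));
    let producer = {
        let q = Arc::clone(&q);
        thread::spawn(move || {
            for v in 1..=n {
                q.push(v);
            }
        })
    };
    let received: Vec<u64> = (0..n).map(|_| q.pop()).collect();
    producer.join().unwrap();
    received
}

fn main() {
    // Capacity 2 forces the producer to wait for the consumer several times.
    let received = run(2, 10);
    assert_eq!(received, (1..=10).collect::<Vec<u64>>());
}
```

With a single producer and a single consumer the values arrive in FIFO order; with several consumers, as in the Ractor test, each value is still delivered once, but which worker receives it is not deterministic.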
Time to test it:
QUEUE = CAtomics::QueueWithMutex.new(10)
1.upto(5).map do |i|
  puts "Starting worker..."
  Ractor.new(name: "worker-#{i}") do
    puts "[#{Ractor.current.name}] Starting polling..."
    while (popped = QUEUE.pop) do
      puts "[#{Ractor.current.name}] #{popped}"
      sleep 3
    end
  end
end
value_to_push = 1
loop do
  QUEUE.push(value_to_push)
  sleep 0.5 # push twice a second to make workers "starve" and enter the polling loop
  value_to_push += 1
end
The output is the following (which means that it works!):
Starting worker...
Starting worker...
[worker-1] Starting polling...
Starting worker...
[worker-2] Starting polling...
Starting worker...
[worker-3] Starting polling...
Starting worker...
[worker-4] Starting polling...
[worker-5] Starting polling...
[worker-5] 1
[worker-2] 2
[worker-4] 3
[worker-1] 4
[worker-3] 5
[worker-5] 6
[worker-2] 7
[worker-4] 8
[worker-1] 9
// ...
Interestingly, this queue implementation is good enough for use cases where the somewhat bad latency of starving workers is insignificant (if the queue has items, .pop succeeds immediately in most cases). One example I can see is a test framework, provided that your individual tests are not trivial (i.e. take more than a microsecond).