Intro

This story is about concurrent data structures in the context of Ruby. The goal is to demonstrate how true parallelism can be achieved with shared global mutable state (which, at the time of writing, is not supported by built-in Ruby primitives).

Familiarity with Ruby, Rust, C, (and a bit of other tooling) is nice to have, but hopefully not mandatory.

The repository with code examples can be found on GitHub. To run it you need a relatively new version of Ruby (the master branch is probably the best option if you can compile it locally), plus Rust and C compilers.

Ractors, what and why

A bit of history first (I promise, there will be code, a lot, soon). Ruby has had threads for a really long time, but they are ... not quite parallel. That's right, the Thread class in Ruby is a native thread under the hood (a POSIX thread on Linux to be more specific, using pthread_* and friends), but in order to evaluate any instructions a thread needs to acquire what's called the Global Interpreter Lock (or GIL). The consequence is obvious: only one thread can evaluate code at any point in time.

There has always been one good exception: I/O-related work, and only if it's cooked "right". There are C APIs in Ruby that let you call a C function (let's say something like sql_adapter_execute_query) with the lock released; once the function returns, the lock is acquired again. If this API is used you can do I/O in parallel.

To sum up, in the world of Threads:

  1. you can do I/O in parallel (like reading files)
  2. you can't do CPU-bound computations in parallel (like calculating Fibonacci numbers); there's a small sketch of both cases right below
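To make the difference concrete, here's a tiny sketch with plain threads (not from the repository, timings approximate): the CPU-bound block takes roughly as long as running the calls one after another, while the sleeping block finishes in about one second because sleep releases the lock, just like well-behaved I/O does.

require 'benchmark'

def fib(n)
  n < 2 ? n : fib(n - 1) + fib(n - 2)
end

# CPU-bound: the threads take turns holding the GIL, so there's no speedup
puts Benchmark.realtime { 4.times.map { Thread.new { fib(30) } }.each(&:join) }

# "I/O-like": sleep releases the GIL, so the threads overlap
puts Benchmark.realtime { 4.times.map { Thread.new { sleep 1 } }.each(&:join) }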

But things changed after Ruby 3.0 was released in 2020: now we have a new building block called Ractor. Ractors are also implemented using native threads internally, but each Ractor has its own GIL. It was a very promising moment of "hey, we can have true multi-threaded parallel apps now!". As always, there was a catch.

Ruby objects have no internal synchronization logic, so if Ractor A pushes to an array and so does Ractor B then... nobody knows what's going to happen; it's a race condition. At best it crashes, at worst one push overwrites the other and something weird starts happening. Fixing it requires either wrapping every single object with a mutex or forbidding access to the same object from multiple threads. The solution was somewhere in the middle: you can share objects, but only if they are deeply frozen (there's a special Ractor.make_shareable API specifically for that). And don't get me wrong, I think it's a good compromise.
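A tiny sketch of that rule (not from the repository): a constant pointing to a deeply frozen object can be read from any Ractor, while one pointing to a mutable object cannot.

# A deeply frozen hash can be read from any Ractor
FROZEN = Ractor.make_shareable({ "retries" => 3 })
p Ractor.new { FROZEN["retries"] }.take # => 3

# A plain mutable hash cannot: the access inside the non-main Ractor
# fails with Ractor::IsolationError
CONFIG = { "retries" => 3 }
p Ractor.new { CONFIG["retries"] }.take # => error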

So now you can do computations in parallel if they don't share any mutable data, which sounds like a HUGE limitation for real-world apps. Just off the top of my head, things that I'd like to have:

  1. a global queue of requests (main thread accepts incoming connections and pushes them to the queue. Worker threads poll the queue and process requests.)
  2. a global pool of objects (to store database connections)
  3. a global data structure to store metrics
  4. a global in-memory cache for things that change rarely but are needed everywhere (e.g. dynamic app configuration)

Calling require in a non-main Ractor wasn't possible before the latest version of Ruby (because it mutates the shared global variable $LOADED_FEATURES). Now it's doable: the non-main Ractor sends a special message to the main Ractor, which performs the require (remember, the main Ractor can mutate anything; otherwise it would be the biggest breaking change in the history of programming languages) and then responds back, so that the requesting Ractor can continue its execution loop.

What's wrong with forking

Without truly parallel threads a common option was (and de facto still is) to use fork. It works, but it comes with its own set of problems:

  1. child processes share memory with their parent, but only as long as that memory hasn't been modified. Any attempt to modify it from the child makes the OS copy the page that is about to change and apply the change to the copy (copy-on-write). In practice it means that if your app does a lot of lazy initialization then most probably you won't share much memory. With threads nothing has to be copied
  2. you can't have any shared global state unless you use the shared memory object API, which is not easy to get right. If you absolutely must track some global progress then you have to introduce some IPC (e.g. via socketpair), which is not trivial. With threads everything can be shared and no additional abstraction is needed

Not long ago there was a series of interesting articles that mentioned Ractors in multiple places. One significant thing that I learned from it is that when you fork you can't share many internal data structures that Ruby fills under the hood, for example the inline method caches that are used to speed up method lookup. These caches depend on your runtime behaviour, and since each child process has its own flow they end up with caches that are filled differently and in a different order. This makes the OS copy all pages that contain them.

Side note: do you remember a thing called "REE" (Ruby Enterprise Edition)? It was an "optimized" version of Ruby in the pre-2.0 era. One of its key features was a "copy-on-write friendly GC" that stored the mark bits of objects not in the objects themselves but in a separate, centralized place. Then, when GC runs, it only changes those "externally" stored bits instead of modifying the objects, so each process only has to copy this table of flags instead of the entire heap. By the way, from what I know these patches made it into Ruby 2.0.

Ruby heap

After reading the previous section you might be under the impression that it's easier to think of the Ruby heap as a single "shared" heap full of frozen objects plus a bunch of per-Ractor heaps that are mutable only from the Ractor that owns them. Maybe that is easier, but in reality it's still a single heap, and every object is accessible by every Ractor.

o = Object.new
Ractor.make_shareable(o)
ID = o.object_id
puts "[MAIN] #{ID} #{o}"

r = Ractor.new do
  o2 = ObjectSpace._id2ref(ID)
  puts "[NON-MAIN] #{ID} #{o2}"
  Ractor.yield :done
end

r.take

This code prints

[MAIN] 5016 #<Object:0x00007f97f72523a8>
[NON-MAIN] 5016 #<Object:0x00007f97f72523a8>

which proves the statement above. However, removing the line Ractor.make_shareable(o) breaks the code with an error "5016" is id of the unshareable object on multi-ractor (RangeError) (By the way, why is it a RangeError?).

How can we make an object shareable (i.e. deeply frozen) but still mutable? Well, we can attach data on the C level to this object and make it mutable.

Side note: concurrent access is still possible

The snippet above requires calling Ractor.make_shareable because we use built-in Ruby methods, but what if we define our own functions?

// Converts given `obj` to its address
VALUE rb_obj_to_address(VALUE self, VALUE obj) { return LONG2NUM(obj); }
// Converts given address back to the object
VALUE rb_address_to_obj(VALUE self, VALUE obj) { return NUM2LONG(obj); }

// and then somewhere in the initialization logic
rb_define_global_function("obj_to_address", rb_obj_to_address, 1);
rb_define_global_function("address_to_obj", rb_address_to_obj, 1);

We defined two functions:

irb> o = "foo"
=> "foo"
irb> obj_to_address(o)
=> 140180443876200
irb> obj_to_address(o)
=> 140180443876200
irb> address_to_obj(obj_to_address(o))
=> "foo"

Let's see if the hack works:

require_relative './helper'

o = Object.new
ADDRESS = obj_to_address(o)
puts "[MAIN] #{ADDRESS} #{o}"

r = Ractor.new do
  o2 = address_to_obj(ADDRESS)
  puts "[NON-MAIN] #{ADDRESS} #{o2}"
  Ractor.yield :done
end

r.take

prints

[MAIN] 140194730661200 #<Object:0x00007f81a11ed550>
[NON-MAIN] 140194730661200 #<Object:0x00007f81a11ed550>

Of course doing this without adding any thread-safe wrappers is simply wrong. For example, the following snippet causes a segfault:

require_relative './helper'

array = []
ADDRESS = obj_to_address(array)

ractors = 2.times.map do
  # 2 threads
  Ractor.new do
    obj = address_to_obj(ADDRESS)
    # each mutates a shared non-thread-safe array
    1_000_000.times do
      obj.push(42)
      obj.pop
    end
    Ractor.yield :done
  end
end

p ractors.map(&:take)

Counter, the wrong way

I'm going to write all data structures here in Rust, and then wrap them with C.

Here's the wrong, non-thread-safe counter struct:

#[derive(Debug)]
pub struct PlainCounter {
    value: u64,
}

impl PlainCounter {
    // exposed as `PlainCounter.new` in Ruby
    pub fn new(n: u64) -> Self {
        Self { value: n }
    }

    // exposed as `PlainCounter#increment` in Ruby
    pub fn increment(&mut self) {
        self.value += 1;
    }

    // exposed as `PlainCounter#read` in Ruby
    pub fn read(&self) -> u64 {
        self.value
    }
}

There's no synchronization internally, and so calling increment from multiple threads is simply wrong.

By the way, you can't mutate it from multiple threads in Rust too. It simply won't compile.

Then, we need some glue code to expose these methods to C.

#[no_mangle]
pub extern "C" fn plain_counter_init(counter: *mut PlainCounter, n: u64) {
    unsafe { counter.write(PlainCounter::new(n)) }
}

#[no_mangle]
pub extern "C" fn plain_counter_increment(counter: *mut PlainCounter) {
    let counter = unsafe { counter.as_mut().unwrap() };
    counter.increment();
}

#[no_mangle]
pub extern "C" fn plain_counter_read(counter: *const PlainCounter) -> u64 {
    let counter = unsafe { counter.as_ref().unwrap() };
    counter.read()
}

pub const PLAIN_COUNTER_SIZE: usize = 8;

Why do we need the size? It's part of the C API that we'll use in a moment: Ruby will own our struct, and so it must know its size (but for some reason it doesn't care about alignment; I guess because it always places the data at an address that is a multiple of 16 bytes?).

Then we use cbindgen to generate a C header with 3 functions and one constant.

// rust-atomics.h

// THIS CODE IS AUTO-GENERATED
#define PLAIN_COUNTER_SIZE 8

typedef struct plain_counter_t plain_counter_t;

void plain_counter_init(plain_counter_t *counter, uint64_t n);
void plain_counter_increment(plain_counter_t *counter);
uint64_t plain_counter_read(const plain_counter_t *counter);

As you can see, we don't even expose the internal structure of plain_counter_t, only its size.

Then we can finally write the C extension:

// c_atomics.c
#include <ruby.h>
#include "plain-counter.h"

RUBY_FUNC_EXPORTED void Init_c_atomics(void) {
  rb_ext_ractor_safe(true);

  VALUE rb_mCAtomics = rb_define_module("CAtomics");

  init_plain_counter(rb_mCAtomics);
}

c_atomics.c is the main file of our extension:

  1. first, it calls rb_ext_ractor_safe which is absolutely required if we want to call functions defined by our C extension from non-main Ractors
  2. then, it declares (or re-opens if it's already defined) a module called CAtomics
  3. and finally it calls init_plain_counter, which is defined in plain-counter.h (see below)

// plain-counter.h
#include "rust-atomics.h"
#include <ruby.h>

const rb_data_type_t plain_counter_data = {
    .function = {
        .dfree = RUBY_DEFAULT_FREE
    },
    .flags = RUBY_TYPED_FROZEN_SHAREABLE
};

VALUE rb_plain_counter_alloc(VALUE klass) {
  plain_counter_t *counter;
  TypedData_Make_Struct0(obj, klass, plain_counter_t, PLAIN_COUNTER_SIZE, &plain_counter_data, counter);
  plain_counter_init(counter, 0);
  VALUE rb_cRactor = rb_const_get(rb_cObject, rb_intern("Ractor"));
  rb_funcall(rb_cRactor, rb_intern("make_shareable"), 1, obj);
  return obj;
}

VALUE rb_plain_counter_increment(VALUE self) {
  plain_counter_t *counter;
  TypedData_Get_Struct(self, plain_counter_t, &plain_counter_data, counter);
  plain_counter_increment(counter);
  return Qnil;
}

VALUE rb_plain_counter_read(VALUE self) {
  plain_counter_t *counter;
  TypedData_Get_Struct(self, plain_counter_t, &plain_counter_data, counter);
  return LONG2FIX(plain_counter_read(counter));
}

static void init_plain_counter(VALUE rb_mCAtomics) {
  VALUE rb_cPlainCounter = rb_define_class_under(rb_mCAtomics, "PlainCounter", rb_cObject);
  rb_define_alloc_func(rb_cPlainCounter, rb_plain_counter_alloc);
  rb_define_method(rb_cPlainCounter, "increment", rb_plain_counter_increment, 0);
  rb_define_method(rb_cPlainCounter, "read", rb_plain_counter_read, 0);
}

Here we:

  1. Declare metadata of the native data type that will be attached to instances of our PlainCounter Ruby class
    1. It has default deallocation logic that does nothing (because we don't allocate anything on creation)
    2. It's marked as RUBY_TYPED_FROZEN_SHAREABLE, this is required or otherwise we'll get an error if we call Ractor.make_shareable on it
  2. Then there's an allocating function (which basically is what's called when you do YourClass.allocate):
    1. It calls the TypedData_Make_Struct0 macro, which defines an obj variable (the first argument) as an instance of klass (the second argument) with attached data of type plain_counter_t that has size PLAIN_COUNTER_SIZE (the one we generated with cbindgen) and metadata plain_counter_data. A pointer to the memory that is allocated and attached to obj is stored in the counter variable.
    2. Then we call plain_counter_init which goes to Rust and properly initializes our struct with value = 0
    3. Then it makes the object Ractor-shareable literally by calling Ractor.make_shareable(obj) but in C.
    4. And finally it returns obj
  3. rb_plain_counter_increment and rb_plain_counter_read are just wrappers around Rust functions on the native attached data.
  4. Finally init_plain_counter function defines a PlainCounter Ruby class, attaches an allocating function and defines methods increment and read.

Does this work?

First, single-threaded mode to verify correctness:

require 'c_atomics'

counter = CAtomics::PlainCounter.new
1_000.times do
  counter.increment
end
p counter.read
# => 1000

Of course it works. Let's try multi-Ractor mode:

require 'c_atomics'

COUNTER = CAtomics::PlainCounter.new
ractors = 5.times.map do
  Ractor.new do
    1_000.times { COUNTER.increment }
    Ractor.yield :completed
  end
end
p ractors.map(&:take)
# => [:completed, :completed, :completed, :completed, :completed]
p COUNTER.read
# => 2357

That's a race condition, GREAT! Now we understand that it's possible to have objects that are shareable on the surface but mutable inside. All we need is to guarantee that the internal data structure is synchronized, and the key tools here are mutexes, atomic variables and lock-free data structures.

If you have some experience with Rust and have heard about lock-free data structures, this might sound familiar. Lock-free data structures have the same kind of interface in Rust: they allow mutation through a shared reference to an object, like this:

struct LockFreeQueue<T> {
    // ...
}

impl<T> LockFreeQueue<T> {
    fn push(&self, item: T) {
        // ...
    }

    fn pop(&self) -> Option<T> {
        // ...
    }
}

Atomics

SPOILER: I'm not an expert in this area, and if you are really interested in learning how atomics work you'd better read something else.

If you are comfortable with Rust I would personally recommend "Rust Atomics and Locks" by Mara Bos

I'll do my best to explain what I know but please take it with a grain of salt.

When I say "atomics" I mean atomic variables. In Rust there's a set of data types representing atomic variables, e.g. std::sync::atomic::AtomicU64. They can be modified using atomic operations like fetch_add and compare_exchange, and the change that happens is always atomic.

Internally they rely on a set of special CPU instructions (or rather a special lock instruction prefix):

#[no_mangle]
pub fn add_relaxed(n: std::sync::atomic::AtomicU64) -> u64 {
    n.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}

becomes

add_relaxed:
        mov     qword ptr [rsp - 8], rdi
        mov     eax, 1
        lock            xadd    qword ptr [rsp - 8], rax
        ret

Of course it's possible to load and store them as well. However, you might've noticed that there's a special argument called "memory ordering" that needs to be passed. Rust follows the C++ memory model, which is not the only one, but I think it's the most popular model as of now.

The problem with both modern compilers and CPUs (well, in fact, it's a feature) is that they can re-order instructions if they think it makes the code run faster, which can also produce race conditions.

The idea is that for each atomic operation that you perform you need to additionally pass a special enum flag that is one of:

relaxed

That's the "weakest" requirement for the CPU. This mode requires no synchronization and allows any kind of re-ordering. It's the fastest type of atomic operation, and it's very suitable for things like counters, or for reads/writes where order doesn't matter and you only care about the final result. This is what we are going to use in the next chapter to implement a correct atomic counter.

acquire/release

I'm going to quote C++ documentation here:

A load operation with acquire memory order performs the acquire operation on the affected memory location: no reads or writes in the current thread can be reordered before this load. All writes in other threads that release the same atomic variable are visible in the current thread.

A store operation with release memory order performs the release operation: no reads or writes in the current thread can be reordered after this store. All writes in the current thread are visible in other threads that acquire the same atomic variable and writes that carry a dependency into the atomic variable become visible in other threads that consume the same atomic.

If it sounds complicated you are not alone. Here's a nice example from C++:

std::atomic<std::string*> ptr;
int data;

void producer()
{
    std::string* p = new std::string("Hello");
    data = 42;
    ptr.store(p, std::memory_order_release);
}

void consumer()
{
    std::string* p2;
    while (!(p2 = ptr.load(std::memory_order_acquire)))
        ;
    assert(*p2 == "Hello"); // never fires
    assert(data == 42); // never fires
}

int main()
{
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join(); t2.join();
}

Here, when we call store(release) in the producer, it's guaranteed that any other thread that loads the value using load(acquire) will see the change to the underlying value (a string) together with the other changes made by the writing thread (int data).

This synchronization primitive might look unusual if you have never seen it before, but the idea is simple: this memory ordering level guarantees that all of the changes made in one thread become visible to the other thread in one go.

seq_cst

Stands for "Sequentially Consistent" ordering.

A load operation with seq_cst memory order performs an acquire operation, a store performs a release operation, and read-modify-write performs both an acquire operation and a release operation, plus a single total order exists in which all threads observe all modifications in the same order.

That's the strongest level of "consistency" and also the slowest.

It all looks similar to transactional databases, right?

Kind of, there's something in common:

| Memory Ordering | Database Isolation Level |
| --- | --- |
| Relaxed | Read Uncommitted |
| Acquire/Release | Repeatable Read |
| Sequential | Serializable |

But in my opinion it's better NOT to think of atomics in terms of databases. Levels of memory ordering aim to represent how instructions can/cannot be reordered and what happens-before or happens-after what.

Counter, the right way

Okay, at this point we know that instead of a plain integer we need to use an atomic int and we should use Ordering::Relaxed to fetch_add and load it.

Starting from this section I'll omit the C glue functions; instead there will only be comments like "Exposed as XXX in Ruby".

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug)]
pub struct AtomicCounter {
    value: AtomicU64,
}

impl AtomicCounter {
    // Exposed as `AtomicCounter.new` in Ruby
    pub fn new(n: u64) -> Self {
        Self {
            value: AtomicU64::new(n),
        }
    }

    // Exposed as `AtomicCounter#increment` in Ruby
    pub fn increment(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    pub fn read(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}

The main question is "does it actually work?". First, single-threaded code

require 'c_atomics'

counter = CAtomics::AtomicCounter.new
1_000.times do
  counter.increment
end
p counter.read
# => 1000

Great, it works. Time for multi-threaded code:

require 'c_atomics'

COUNTER = CAtomics::AtomicCounter.new
ractors = 5.times.map do
  Ractor.new do
    1_000.times { COUNTER.increment }
    Ractor.yield :completed
  end
end
p ractors.map(&:take)
# => [:completed, :completed, :completed, :completed, :completed]
p COUNTER.read
# => 5000

Isn't it great?

Containers, Ractors, and GC

Remember: we are here to build concurrent data structures, not just plain counters. What are containers in high-level programming languages with managed memory? They are still "normal" containers that hold references to other objects; in other words, an array of data objects is not a blob of memory with the objects located one after another, it's a blob of pointers to those objects.

Objects in Ruby are represented using the VALUE type, which is just the unsigned long C type, i.e. a 64-bit unsigned integer. In fact it's a tagged pointer: the low bits define what kind of VALUE it is, and the remaining bits hold the actual value (or pointer).

Just like in other interpreted languages, small integers, true, false, nil and some other values are represented with a special pattern of bits that can be checked using special macros like FIXNUM_P, NIL_P and others. It also means that "not every object in Ruby is passed by reference", but that's a separate topic.
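You can poke at this from Ruby directly (a small sketch, not from the repository): immediate values are frozen and Ractor-shareable out of the box, while a regular String is neither until it's frozen.

p 42.frozen?                  # => true, immediates are frozen
p Ractor.shareable?(42)       # => true
p Ractor.shareable?(nil)      # => true

s = "foo"
p Ractor.shareable?(s)        # => false, a mutable heap object
p Ractor.shareable?(s.freeze) # => true, frozen and with no unfrozen references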

So an object that we want to store in our containers is a number, std::ffi::c_ulong to be more specific. Ok, sounds good so far, but two questions immediately pop into my head.

1. Can we have containers that allow us to temporarily get a reference to stored objects?

Here's an example:

COLLECTION = SomeThreadSafeStruct.new

r1 = Ractor.new { COLLECTION.get(key).update(value) }
r2 = Ractor.new { COLLECTION.get(key).update(value) }

This is basically a race condition. I see two options:

  1. we can definitely have data structures that DON'T allow "borrowing" of any value from the inside. An example of such data structure would be a queue, .push(value) "moves" the value to the queue and nobody else in this thread can access it anymore. .pop "moves" the value from the queue back to the user code. This way we can guarantee that only one thread accesses each element at any point in time. Unfortunately there's no way to enforce it but it could be done safely on the level of a single library that uses this queue internally.
  2. we can definitely have data structures that only store other concurrent values, then we can safely "borrow" them

For 1 here's a rough equivalent of the code:

QUEUE = SafeQueue.new

N.times do
  Ractor.new do
    process(QUEUE.pop)
  end
end

DATA.each do |value|
  QUEUE.push(value)
end

# However you can't get nth element of the queue, e.g.
# QUEUE[3] or QUEUE.peek or QUEUE.last is not allowed

For 2 I think something like this is very doable:

# All keys are Ractor-shareable
KEYS = Ractor.make_shareable(["key-1", "key-2", "key-3"])

METRICS = SafeHashMap.new

KEYS.each do |key|
  METRICS[key] = SafeCounter.new
end

N.times do
  Ractor.new do
    METRICS[KEYS.sample].increment
  end
end

This code is safe because keys are frozen and values are thread-safe objects that have a static lifetime (i.e. they live through the whole lifetime of the program)

IMO anything else is not really possible unless you write code in a certain way that guarantees the lack of race conditions (which is possible but definitely fragile).

2. How does it work when GC runs in parallel?

This is a tricky question, and I should start from scratch. When GC starts, it iterates over Ractors and acquires the Interpreter Lock of each of them. We can demonstrate it with simple code:

use std::{ffi::c_ulong, time::Duration};

pub struct SlowObject {
    n: u64,
}

impl SlowObject {
    fn alloc() -> Self {
        Self { n: 0 }
    }

    fn init(&mut self, n: u64) {
        self.n = n;
    }

    fn mark(&self, _: extern "C" fn(c_ulong)) {
        eprintln!("[mark] started");
        std::thread::sleep(Duration::from_secs(2));
        eprintln!("[mark] finished");
    }

    fn slow_op(&self) {
        eprintln!("[slow_op] started");
        for i in 1..=10 {
            eprintln!("tick {i}");
            std::thread::sleep(Duration::from_millis(100));
        }
        eprintln!("[slow_op] finished");
    }
}

I'm not sure if an integer field here is required, but as far as I remember C doesn't support zero-sized structs, so that's just a way to guarantee that things are going to work.

This struct has:

  1. a mark callback that will be called by Ruby GC to mark its internals and it takes 2 seconds to run, so basically if we have N objects of this class on the heap GC will take at least 2*N seconds to run
  2. a slow_op method that prints tick <N> 10 times with a 100ms delay (so it takes a second to run)

Then we'll define these 2 methods in the C extension:

VALUE rb_slow_object_slow_op(VALUE self) {
  slow_object_t *slow;
  TypedData_Get_Struct(self, slow_object_t, &slow_object_data, slow);
  slow_object_slow_op(slow);
  return Qnil;
}

VALUE rb_slow_object_slow_op_no_gvl_lock(VALUE self) {
  slow_object_t *slow;
  TypedData_Get_Struct(self, slow_object_t, &slow_object_data, slow);
  rb_thread_call_without_gvl(slow_object_slow_op, slow, NULL, NULL);
  return Qnil;
}

static void init_slow_object(VALUE rb_mCAtomics) {
  VALUE rb_cSlowObject = rb_define_class_under(rb_mCAtomics, "SlowObject", rb_cObject);
  // ...
  rb_define_method(rb_cSlowObject, "slow_op", rb_slow_object_slow_op, 0);
  rb_define_method(rb_cSlowObject, "slow_op_no_gvl_lock", rb_slow_object_slow_op_no_gvl_lock, 0);
}

When we run the following code first (note that it calls slow_op, which holds the Interpreter Lock), GC waits for our Rust method to return control to Ruby:

slow = CAtomics::SlowObject.new(42)
Ractor.new(slow) do |slow|
  5.times { slow.slow_op }
  Ractor.yield :done
end
5.times { GC.start; sleep 0.1 }

With this code we see the following repeating pattern:

[mark] started
[mark] finished
[slow_op] started
tick 1
tick 2
tick 3
tick 4
tick 5
tick 6
tick 7
tick 8
tick 9
tick 10
[slow_op] finished
[mark] started
[mark] finished

Which means that GC waits for our slow_op method to finish its looping, so normally Ruby DOES NOT run your code in parallel to GC. But what if we call slow_op_no_gvl_lock?

slow = CAtomics::SlowObject.new(42)
Ractor.new(slow) do |slow|
  5.times { slow.slow_op_no_gvl_lock }
  Ractor.yield :done
end
5.times { GC.start; sleep 0.1 }

Now our slow_op function runs in parallel:

[mark] started
[mark] finished
[slow_op] started
tick 1
tick 2
[mark] started
tick 3
tick 4
tick 5
tick 6
tick 7
tick 8
tick 9
tick 10
[slow_op] finished
[mark] finished
[slow_op] started
tick 1
tick 2
[mark] started
tick 3

bonus question: what about GC compaction?

Starting from Ruby 3.0 there's a new step of GC called "compaction". It's a process of moving Ruby objects from one place to another (similar to "file system defragmentation"). How can we keep Ruby object addresses in our structure AND at the same time support their potential moving?

Turns out there's an API for that: rb_gc_location. During the compaction step (in the dcompact callback of our native type) we can call it with the "old" address of any given object and it returns the "new" one, so we can simply iterate over our data structure and do element = rb_gc_location(element).

Concurrent HashMap

We are already using Rust at this point, so can we just take a popular Rust package that implements it? Of course, I'm going to use dashmap. Internally it locks individual buckets (or shards if you prefer) when we access certain parts of the hashmap.

use std::ffi::c_ulong;

struct ConcurrentHashMap {
    map: dashmap::DashMap<c_ulong, c_ulong>,
}

impl ConcurrentHashMap {
    // Exposed as `ConcurrentHashMap.new` in Ruby
    fn new() -> Self {
        Self {
            map: dashmap::DashMap::new(),
        }
    }

    // Exposed as `ConcurrentHashMap#get` in Ruby
    fn get(&self, key: c_ulong) -> Option<c_ulong> {
        self.map.get(&key).map(|v| *v)
    }

    // Exposed as `ConcurrentHashMap#set` in Ruby
    fn set(&self, key: c_ulong, value: c_ulong) {
        self.map.insert(key, value);
    }

    // Exposed as `ConcurrentHashMap#clear` in Ruby
    fn clear(&self) {
        self.map.clear()
    }

    // Exposed as `ConcurrentHashMap#fetch_and_modify` in Ruby
    fn fetch_and_modify(&self, key: c_ulong, f: extern "C" fn(c_ulong) -> c_ulong) {
        self.map.alter(&key, |_, v| f(v));
    }

    // Callback for marking an object
    // Exposed as `concurrent_hash_map_mark` in C
    fn mark(&self, f: extern "C" fn(c_ulong)) {
        for pair in self.map.iter() {
            f(*pair.key());
            f(*pair.value());
        }
    }
}

The mark function is used as the .dmark field in our native type configuration:

void rb_concurrent_hash_map_mark(void *ptr) {
  concurrent_hash_map_t *hashmap = ptr;
  concurrent_hash_map_mark(hashmap, rb_gc_mark);
}

const rb_data_type_t concurrent_hash_map_data = {
    .function = {
        .dmark = rb_concurrent_hash_map_mark,
        // ...
    },
    // ...
};

The trick for fetch_and_modify is to pass the rb_yield function, which calls the block of the current scope with the given value and returns whatever the block returns:

VALUE rb_concurrent_hash_map_fetch_and_modify(VALUE self, VALUE key) {
  rb_need_block();
  concurrent_hash_map_t *hashmap;
  TypedData_Get_Struct(self, concurrent_hash_map_t, &concurrent_hash_map_data, hashmap);
  concurrent_hash_map_fetch_and_modify(hashmap, key, rb_yield);
  return Qnil;
}
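From the Ruby side the whole thing then reads like this (a usage sketch; the key is a Symbol here, so identity-based hashing is fine):

require 'c_atomics'

map = CAtomics::ConcurrentHashMap.new
map.set(:hits, 0)
map.fetch_and_modify(:hits) { |v| v + 1 } # the block runs while the shard is locked
p map.get(:hits)
# => 1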

Then we can add a few helper functions in Ruby:

class CAtomics::ConcurrentHashMap
  def self.with_keys(known_keys)
    map = new
    known_keys.each { |key| map.set(key, 0) }
    map
  end

  def increment_random_value(known_keys)
    fetch_and_modify(known_keys.sample) { |v| v + 1 }
  end

  def sum(known_keys)
    known_keys.map { |k| get(k) }.sum
  end
end

It's definitely not the best interface, but it works for testing.

KEYS = 1.upto(5).map { |i| "key-#{i}" }
# => ["key-1", "key-2", "key-3", "key-4", "key-5"]
Ractor.make_shareable(KEYS)

MAP = CAtomics::ConcurrentHashMap.with_keys(KEYS)

ractors = 5.times.map do
  Ractor.new do
    1_000.times { MAP.increment_random_value(KEYS) }
    Ractor.yield :completed
  end
end
p ractors.map(&:take)
# => [:completed, :completed, :completed, :completed, :completed]

MAP.sum(KEYS)
# => 5000

Wait, why do the values increment correctly? Shouldn't the values inside the hashmap be atomic as well? No, this is actually fine, the code is correct: DashMap locks the relevant shard of the hashmap every time we call fetch_and_modify, and so no two threads can update the same key/value pair in parallel.

There are two problems with our API though

it's unsafe

Anyone can get a reference to any object from .get, or keep what they pass to .set for future use. I see no solution other than keeping it private with a HUGE note saying "this is actually internal, WE know how to use it, you don't", or simply not introducing such an API at all.

does it work with non-static Ruby values?

It doesn't respect Ruby's .hash and .eql? methods and works only if you pass literally the same object again (one of the frozen static KEYS), so in some sense it behaves as if we called compare_by_identity on it.
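Here's the kind of surprise the identity-based behaviour produces (a sketch of the problem; what exactly a missed lookup returns depends on the C glue that I omitted):

map = CAtomics::ConcurrentHashMap.new

key = "metrics"
map.set(key, 1)

map.get(key)       # hit: literally the same object, i.e. the same VALUE
map.get("metrics") # miss: an equal but different String, i.e. a different VALUE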

Let's fix it! First, there are two C functions that we need to call from our Rust code:

unsafe extern "C" {
    fn rb_hash(obj: c_ulong) -> c_ulong;
    fn rb_eql(lhs: c_ulong, rhs: c_ulong) -> c_int;
}

The first one returns a hash of the given object as a Ruby number (i.e. a Ruby Integer, not a C int); we don't care about the exact value, any stable number is fine. The second one calls lhs.eql?(rhs) using Ruby method dispatch and returns non-zero if the objects are equal. For DashMap we need to implement a few Rust traits that call them properly:

// This is our wrapper type that uses Ruby functions for `.hash` and `.eql?`
#[derive(Debug)]
struct RubyHashEql(c_ulong);

// Called by `dashmap` to compare objects
impl PartialEq for RubyHashEql {
    fn eq(&self, other: &Self) -> bool {
        unsafe { rb_eql(self.0, other.0) != 0 }
    }
}
impl Eq for RubyHashEql {}

// Called to compute hash
impl std::hash::Hash for RubyHashEql {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let ruby_hash = unsafe { rb_hash(self.0) };
        ruby_hash.hash(state);
    }
}

struct ConcurrentHashMap {
    // And here is the change, so now the keys are hashed and compared using Ruby semantics
    map: dashmap::DashMap<RubyHashEql, c_ulong>,
}

And finally it works as expected:

Point = Struct.new(:x, :y)

map = CAtomics::ConcurrentHashMap.new

map.set(Point.new("one-point-two", "seven"), "BAR")
map.get(Point.new("one-point-two", "seven"))
# => "BAR"

Concurrent ObjectPool

That's a pretty common pattern in multi-threaded apps in case you need:

  1. something like a connection pool for your database or any kind of external storage service
  2. maybe a pool of persistent connections for an external service if it has rate limits
  3. a pool of worker threads
  4. or maybe even a pool of pre-allocated memory buffers that are reused for heavy data loading

So let's think about the interface first, how about this?

size = 5
timeout_in_ms = 3_000
def make_object
  # connect to the DB and return connection
end
pool = OurObjectPool.new(size, timeout_in_ms) { make_object }

pool.with do |connection|
  # do something with `connection`
end
# the object is automatically returned to the pool once the block exits

Having a timeout is a must for a database connection pool in real-world apps, but for a pool of workers it sometimes doesn't make sense, so it could be made optional: if no timeout is passed, no timeout error should ever occur. I'm going with a non-flexible approach here: the timeout configuration will be a required parameter.

Another data structure, another Rust dependency

After all, that was the reason I chose Rust here.

crossbeam_channel is a Rust library for multi-producer multi-consumer queues, with timeout support. Why do we need it here? Good question.

We can store the pool as a plain array of objects and keep track of all "unused" indexes in the queue (in any order), so that when you call .checkout it pops an index from the queue and returns a tuple of [array[idx], idx]; then you do something with the object, and at the end you call .checkin(idx) to push the index back to the queue. Of course, initially the queue should be filled with all available indexes from 0 to POOL_SIZE - 1.

Internally it can be visualized as this:

object pool

  • green items are safe for direct usage by multiple threads
  • blue items are not safe, but access to them is protected by green items

Here's how it looks when 2 threads temporarily pop the value (orange values are still in the pool, but no thread can take them because their indices are not in the queue):

object pool 2 popped

And finally this is what happens when the second thread returns the value back to the pool:

object pool 1 returned

This way each object is either in the "unused" queue (implicitly, via its index) or in use by exactly one thread, and no synchronization of the underlying array is needed. That's the beauty of reusing the existing ecosystem of great libraries.
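If it helps, here's the same index-queue idea sketched with plain Ruby threads and the built-in Thread::Queue (just to show the pattern; it's not the implementation we are about to write and it has no timeout handling):

POOL = Array.new(5) { |i| "connection-#{i}" } # never mutated, only read by index
FREE = Thread::Queue.new
POOL.each_index { |i| FREE << i }

def with_pooled_object
  idx = FREE.pop # blocks until some index becomes available
  begin
    yield POOL[idx]
  ensure
    FREE << idx # return the index, the object becomes available again
  end
end

threads = 10.times.map do
  Thread.new { with_pooled_object { |conn| conn.length } }
end
threads.each(&:join)

And now the actual Rust implementation: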

use crossbeam_channel::{Receiver, Sender};
use std::{ffi::c_ulong, time::Duration};

// This is the pool itself
pub struct FixedSizeObjectPool {
    // a fixed-size array, as a Vec because we know its size only at runtime
    // however, it never resizes
    pool: Vec<c_ulong>,
    // "sending" part of the queue (that we "push" to)
    tx: Sender<usize>,
    // "receiving" part of the queue (that we "pop" from)
    rx: Receiver<usize>,
    timeout: Duration,
}

// We need to return a combination of `idx` and `object` from `.checkout` method,
// so this struct simply represents this tuple
#[repr(C)]
pub struct PooledItem {
    pub idx: usize,
    pub rbobj: c_ulong,
}

impl FixedSizeObjectPool {
    // Exposed as `FixedSizeObjectPool.allocate` in Ruby
    fn new() -> Self {
        let (tx, rx) = crossbeam_channel::unbounded();

        Self {
            pool: vec![],
            tx,
            rx,
            timeout: Duration::MAX,
        }
    }

    // Exposed as `FixedSizeObjectPool#initialize` in Ruby
    fn init(
        &mut self,
        size: usize,
        timeout_in_ms: u64,
        rb_make_obj: extern "C" fn(c_ulong) -> c_ulong,
    ) {
        self.timeout = Duration::from_millis(timeout_in_ms);

        self.pool = Vec::with_capacity(size);
        for idx in 0..size {
            self.pool.push((rb_make_obj)(0));
            self.tx.send(idx).unwrap();
        }
    }

    // Our standard "marking" routine, similar to the one we had for DashMap
    fn mark(&self, f: extern "C" fn(c_ulong)) {
        for item in self.pool.iter() {
            f(*item);
        }
    }

    // Exposed as `FixedSizeObjectPool#checkout` in Ruby
    fn checkout(&mut self) -> Option<PooledItem> {
        let idx = self.rx.recv_timeout(self.timeout).ok()?;
        Some(PooledItem {
            idx,
            rbobj: self.pool[idx],
        })
    }

    // Exposed as `FixedSizeObjectPool#checkin` in Ruby
    fn checkin(&mut self, idx: usize) {
        self.tx.send(idx).expect("bug: receiver has been dropped");
    }
}

Then the only unusual part is error handling around checkout method:

#[no_mangle]
pub unsafe extern "C" fn fixed_size_object_pool_checkout(pool: *mut FixedSizeObjectPool) -> PooledItem {
    let pool = unsafe { pool.as_mut().unwrap() };
    pool.checkout().unwrap_or(PooledItem { idx: 0, rbobj: 0 })
}

So if we get a timeout error we return [0, 0] pair as [obj, idx]. And then in C we can do:

VALUE rb_fixed_size_object_pool_checkout(VALUE self) {
  fixed_size_object_pool_t *pool;
  TypedData_Get_Struct(self, fixed_size_object_pool_t, &fixed_size_object_pool_data, pool);
  PooledItem pooled = fixed_size_object_pool_checkout(pool);
  if (pooled.idx == 0 && pooled.rbobj == 0) {
    return Qnil;
  }
  VALUE ary = rb_ary_new_capa(2);
  rb_ary_push(ary, pooled.rbobj);
  rb_ary_push(ary, LONG2FIX(pooled.idx));
  return ary;
}

Which either returns [obj, idx] literally as an array of two elements or returns nil otherwise.

With these .checkout and .checkin methods we can build a wrapper:

module CAtomics
  class FixedSizeObjectPool
    def with
      obj_and_idx = checkout
      if obj_and_idx.nil?
        raise 'timeout error'
      else
        yield obj_and_idx[0]
      end
    ensure
      unless obj_and_idx.nil?
        checkin(obj_and_idx[1])
      end
    end
  end
end

Does this work?

POOL_SIZE = 5
objects = 1.upto(POOL_SIZE).map { |i| ["pool-object-#{i}"] }
POOL = CAtomics::FixedSizeObjectPool.new(POOL_SIZE, 1_000) { objects.shift }

ractors = 1.upto(POOL_SIZE).map do |i|
  Ractor.new(i) do |i|
    10.times do |j|
      POOL.with do |v|
        v.push([i, j])
      end
    end

    Ractor.yield :done
  end
end

p ractors.map(&:take)
# => [:done, :done, :done, :done, :done]

POOL_SIZE.times do
  p POOL.checkout
end
# => [["pool-object-1", [1, 0], [2, 3], [1, 5], [1, 7], [3, 0], [3, 5], [4, 0], [4, 5], [5, 0], [5, 5]], 0]
# => [["pool-object-2", [2, 0], [1, 2], [2, 5], [2, 8], [3, 1], [3, 6], [4, 1], [4, 6], [5, 1], [5, 6]], 1]
# => [["pool-object-3", [1, 1], [2, 4], [2, 6], [1, 8], [3, 2], [3, 7], [4, 2], [4, 7], [5, 2], [5, 7]], 2]
# => [["pool-object-4", [2, 1], [1, 3], [1, 6], [2, 9], [3, 3], [3, 8], [4, 3], [4, 8], [5, 3], [5, 8]], 3]
# => [["pool-object-5", [2, 2], [1, 4], [2, 7], [1, 9], [3, 4], [3, 9], [4, 4], [4, 9], [5, 4], [5, 9]], 4]

POOL.with { |obj| }
# => c_atomics/lib/c_atomics.rb:24:in 'CAtomics::FixedSizeObjectPool#with': timeout error (RuntimeError)
# =>    from tests/fixed-size-object-pool.rb:23:in '<main>'

As you can see each object in our pool (which is an array that accumulates values from different Ractors) has been used by 5 different threads, and when we take all items from the pool at the end (using .checkout) and call .with again on an empty pool, then it throws an error after 1 second.

(Naive) Concurrent Queue

A queue is an absolute must-have structure for concurrent applications:

  1. a queue of requests can be used to route traffic to multiple worker threads
  2. a queue of tests can be used by a test framework to route them to worker threads
  3. a queue of background jobs that are executed by worker threads

First, let's build a simple, I would even say a "naive" version of the queue that is simply wrapped with a Mutex.

Oh, and let's make it have a fixed maximum size. If it's used to route requests in a multi-threaded server we don't want to open the door for DDoSing, right?

Here's a fixed-size queue that is not thread-safe:

use std::{collections::VecDeque, ffi::c_ulong};

struct UnsafeQueue {
    queue: VecDeque<c_ulong>,
    cap: usize,
}

impl UnsafeQueue {
    // Equivalent of `.allocate` method
    fn alloc() -> Self {
        Self {
            queue: VecDeque::new(),
            cap: 0,
        }
    }

    // Equivalent of a constructor
    fn init(&mut self, cap: usize) {
        self.cap = cap;
    }

    // A method to push a value to the queue
    // THIS CAN FAIL if the queue is full, and so it must return a boolean value
    fn try_push(&mut self, value: c_ulong) -> bool {
        if self.queue.len() < self.cap {
            self.queue.push_back(value);
            true
        } else {
            false
        }
    }

    // A method to pop a value from the queue
    // THIS CAN FAIL if the queue is empty
    fn try_pop(&mut self) -> Option<c_ulong> {
        self.queue.pop_front()
    }

    // A convenient helper for GC marking
    fn for_each(&self, f: extern "C" fn(c_ulong)) {
        for item in self.queue.iter() {
            f(*item);
        }
    }
}

Here we use Rust's built-in VecDeque type, which has push_back and pop_front methods; on top of it our wrapper handles:

  1. the case when we push to a full queue (then false is returned from try_push)
  2. the case when we pop from an empty queue (then None is returned from try_pop)

Now we wrap it with a Mutex:

use std::ffi::c_ulong;
// NOTE: this assumes a `parking_lot`-style Mutex, where `lock` returns the guard
// directly and `try_lock` returns an `Option`, matching the calls below.
use parking_lot::Mutex;
// Exposed as `QueueWithMutex` class in Ruby
pub struct QueueWithMutex {
    inner: Mutex<UnsafeQueue>,
}

impl QueueWithMutex {
    // Exposed as `QueueWithMutex.allocate` class in Ruby
    fn alloc() -> Self {
        Self {
            inner: Mutex::new(UnsafeQueue::alloc()),
        }
    }

    // Exposed as `QueueWithMutex#initialize` class in Ruby
    fn init(&mut self, cap: usize) {
        let mut inner = self.inner.lock();
        inner.init(cap);
    }

    // GC marking logic
    fn mark(&self, f: extern "C" fn(c_ulong)) {
        let inner = self.inner.lock();
        inner.for_each(f);
    }

    // Exposed as `QueueWithMutex#try_push` class in Ruby
    fn try_push(&self, value: c_ulong) -> bool {
        if let Some(mut inner) = self.inner.try_lock() {
            if inner.try_push(value) {
                return true;
            }
        }
        false
    }

    // Exposed as `QueueWithMutex#try_pop` class in Ruby
    fn try_pop(&self) -> Option<c_ulong> {
        if let Some(mut inner) = self.inner.try_lock() {
            if let Some(value) = inner.try_pop() {
                return Some(value);
            }
        }

        None
    }
}

As you can see it's a semi-transparent wrapper around UnsafeQueue, except that each operation first tries to acquire the lock on the Mutex, and if that fails it also returns false or None. So our try_push and try_pop methods can now also fail simply because another thread holds the lock.

To escape the Rust-specific Option<T> abstraction we can simply make the wrapping function take an additional fallback argument that is returned if the Option is None:

#[no_mangle]
pub extern "C" fn queue_with_mutex_try_pop(queue: *mut QueueWithMutex, fallback: c_ulong) -> c_ulong {
    let queue = unsafe { queue.as_mut().unwrap() };
    queue.try_pop().unwrap_or(fallback)
}
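In Ruby the try_* pair then behaves like this (a usage sketch; single-threaded, so the only failures come from the capacity and emptiness checks):

require 'c_atomics'

QUEUE = CAtomics::QueueWithMutex.new(2)

QUEUE.try_push(1)     # => true
QUEUE.try_push(2)     # => true
QUEUE.try_push(3)     # => false, the queue is full

QUEUE.try_pop(:empty) # => 1
QUEUE.try_pop(:empty) # => 2
QUEUE.try_pop(:empty) # => :empty, i.e. the fallback we passed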

How can we safely push and pop in a blocking manner? Well, here for simplicity let's just add methods that retry try_push and try_pop in a loop, with a short sleep if they fail.

class CAtomics::QueueWithMutex
  class Undefined
    def inspect
      "#<Undefined>"
    end
  end
  UNDEFINED = Ractor.make_shareable(Undefined.new)

  def pop
    loop do
      value = try_pop(UNDEFINED)
      if value.equal?(UNDEFINED)
        # queue is empty, keep looping
      else
        return value
      end
      sleep 0.001
    end
  end

  def push(value)
    loop do
      pushed = try_push(value)
      return if pushed
      sleep 0.001
    end
  end
end

Here a special unique UNDEFINED object takes the place of the fallback value, and we use it to detect the absence of a value. This implementation is naive, but for now that's the goal (later, we'll implement a more advanced queue that doesn't rely on polling).

Time to test it:

QUEUE = CAtomics::QueueWithMutex.new(10)

1.upto(5).map do |i|
  puts "Starting worker..."

  Ractor.new(name: "worker-#{i}") do
    puts "[#{Ractor.current.name}] Starting polling..."
    while (popped = QUEUE.pop) do
      puts "[#{Ractor.current.name}] #{popped}"
      sleep 3
    end
  end
end

value_to_push = 1
loop do
  QUEUE.push(value_to_push)
  sleep 0.5 # push twice a second to make workers "starve" and enter the polling loop
  value_to_push += 1
end

The output is the following (which means that it works!):

Starting worker...
Starting worker...
[worker-1] Starting polling...
Starting worker...
[worker-2] Starting polling...
Starting worker...
[worker-3] Starting polling...
Starting worker...
[worker-4] Starting polling...
[worker-5] Starting polling...
[worker-5] 1
[worker-2] 2
[worker-4] 3
[worker-1] 4
[worker-3] 5
[worker-5] 6
[worker-2] 7
[worker-4] 8
[worker-1] 9
// ...

What's interesting is that this queue implementation is enough for use-cases where the somewhat poor latency of starving workers is insignificant (because if the queue has items then .pop will immediately succeed in most cases). An example that I see is a test framework, IF your individual tests are not trivial (i.e. take more than a microsecond).

Parallel Test Framework

Its interface is inspired by minitest but I'm not going to implement all features, so let's call it microtest.

First, we need a TestCase class with at least one assertion helper:

class Microtest::TestCase
  def assert_eq(lhs, rhs, message = 'assertion failed')
    if lhs != rhs
      raise "#{message}: #{lhs} != #{rhs}"
    end
  end
end

Then, there should be a hook that keeps track of all subclasses of our Microtest::TestCase class:

class Microtest::TestCase
  class << self
    def inherited(subclass)
      subclasses << subclass
    end

    def subclasses
      @subclasses ||= []
    end
  end
end

And finally we can write a helper to run an individual test method, measure time taken, record an error and track it on some imaginary report object:

class Microtest::TestCase
  class << self
    def now
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end

    def measure
      start = now
      yield
      now - start
    end

    def run(method_name, report)
      instance = new
      time = measure { instance.send(method_name) }
      print "."
      report.passed!(self, method_name, time)
    rescue => err
      print "F"
      report.failed!(self, method_name, err)
    end
  end
end

No support for custom formatters, no setup/teardown hooks. We are building a micro-framework.

Time to build a Report class. I'll paste it as a single snippet because it's completely unrelated to parallel execution:

class Microtest::Report
  attr_reader :passed, :failed

  def initialize
    @passed = []
    @failed = []
  end

  def passed!(klass, method_name, time)
    @passed << [klass, method_name, time]
  end

  def failed!(klass, method_name, err)
    @failed << [klass, method_name, err]
  end

  # Why do we need this? Because we'll merge the reports produced by multiple Ractors.
  def merge!(other)
    @passed += other.passed
    @failed += other.failed
  end

  def print
    puts "Passed: #{passed.count}"
    passed.each do |klass, method_name, time|
      puts "  - #{klass}##{method_name} (in #{time}ms)"
    end
    puts "Failed: #{failed.count}"
    failed.each do |klass, method_name, err|
      puts "  - #{klass}##{method_name}: #{err}"
    end
  end
end

The last part is spawning Ractors and pushing all test methods to a shared queue:

class Microtest::TestCase
  class << self
    def test_methods
      instance_methods.grep(/\Atest_/)
    end
  end
end

module Microtest
  QUEUE = CAtomics::QueueWithMutex.new(100)

  # yes, this is not portable, but it works on my machine
  CPU_COUNT = `cat /proc/cpuinfo | grep processor | wc -l`.to_i
  puts "CPU count: #{CPU_COUNT}"

  def self.run!
    # First, spawn worker per core
    workers = 1.upto(CPU_COUNT).map do |i|
      Ractor.new(name: "worker-#{i}") do
        # inside allocate a per-Ractor report
        report = Report.new

        # and just run every `pop`-ed [class, method_name] combination
        while (item = QUEUE.pop) do
          klass, method_name = item
          klass.run(method_name, report)
        end

        # at the end send back the report that we've accumulated
        Ractor.yield report
      end
    end

    # back to the main thread. push all tests to the queue
    Microtest::TestCase.subclasses.each do |klass|
      klass.test_methods.each do |method_name|
        QUEUE.push([klass, method_name])
      end
    end
    # push our stop-the-worker flag so that every workers that `pop`s it exits the loop
    CPU_COUNT.times { QUEUE.push(nil) }

    report = Report.new
    # merge reports
    workers.map(&:take).each do |subreport|
      report.merge!(subreport)
    end
    puts
    # and print it
    report.print
  end
end

This code is not very different from the one we used to test the correctness of our queue implementation. One important change here is that nil is used as a special flag that stops the worker from looping. If we ever need to support passing nil through the queue, we can introduce another unique object called EXIT, similar to the UNDEFINED object we used to indicate the absence of a value.
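If nil ever needs to be a legitimate payload, the stop flag could look something like this (a tiny sketch on top of the built-in Thread::Queue, just to show the sentinel pattern):

EXIT = Ractor.make_shareable(Object.new)

queue = Thread::Queue.new
queue << nil  # nil is a regular payload now
queue << EXIT # ...and this object is the stop flag

until (item = queue.pop).equal?(EXIT)
  p item # => nil, handled as a normal value
end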

How can we use this code?

require_relative './microtest'

def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def heavy_computation(ms)
  finish_at = now + ms / 1000.0
  counter = 0
  while now < finish_at
    1000.times { counter += 1 }
  end
end

class TestClassOne < Microtest::TestCase
  1.upto(20) do |i|
    class_eval <<~RUBY
      def test_#{i}
        heavy_computation(rand(1000) + 1000)
        assert_eq 1, 1
      end
    RUBY
  end
end

class TestClassTwo < Microtest::TestCase
  def test_that_fails
    heavy_computation(rand(1000) + 1000)
    assert_eq 1, 2
  end
end

Microtest.run!

This code defines two classes:

  1. TestClassOne that has 20 methods, each takes time between 1 and 2 seconds to pass.
  2. TestClassTwo that has a single method that also runs for up to 2 seconds and then fails

Here’s the output I get:

$ time ruby tests/parallel-tests.rb
CPU count: 12
.................F...
Passed: 20
  - TestClassOne#test_2 (in 1.8681494970005588s)
  - TestClassOne#test_14 (in 1.326054810999267s)
  - TestClassOne#test_20 (in 1.608019522000177s)
  - TestClassOne#test_7 (in 1.2940692579995812s)
  - TestClassOne#test_11 (in 1.1290194040002461s)
  - TestClassOne#test_15 (in 1.9610371879998638s)
  - TestClassOne#test_1 (in 1.0031792079998922s)
  - TestClassOne#test_8 (in 1.6210197430000335s)
  - TestClassOne#test_17 (in 1.5390436239995324s)
  - TestClassOne#test_4 (in 1.5251295820007726s)
  - TestClassOne#test_13 (in 1.5610484249991714s)
  - TestClassOne#test_19 (in 1.5790689580007893s)
  - TestClassOne#test_6 (in 1.0661311869998826s)
  - TestClassOne#test_9 (in 1.5110340849996646s)
  - TestClassOne#test_16 (in 1.21403959700001s)
  - TestClassOne#test_5 (in 1.421094257999357s)
  - TestClassOne#test_12 (in 1.7910449749997497s)
  - TestClassOne#test_3 (in 1.1941248209996047s)
  - TestClassOne#test_10 (in 1.7080213600002025s)
  - TestClassOne#test_18 (in 1.9290160210002796s)
Failed: 1
  - TestClassTwo#test_that_fails: assertion failed: 1 != 2

real    0m4.978s
user    0m31.265s
sys     0m0.026s

So as you can see, it took only 5 seconds to run what would take 31 seconds in single-threaded mode, and during its execution multiple (but not all) cores were utilized.

SPOILER

In the next chapter we'll build a more advanced queue that doesn't acquire the Interpreter Lock and with it I get all cores used at 100%.

If I remove randomness from tests and change each test to take 2 seconds, I get these numbers:

QueueWithMutex:

real  0m6.171s
user  0m42.128s
sys   0m0.036s

vs ToBeDescribedSoonQueue:

real  0m4.173s
user  0m42.020s
sys   0m0.020s

Which is close to a 10x speedup on my 8 cores + 4 threads. There might be a hard parallelism limit that is somehow impacted by the GIL, but I can't verify it. Note that our queue is large enough to hold all 21 tests + 12 nils, so workers don't starve in this case. Also the tests take long enough that there is no contention at all, and so no looping-and-sleeping happens internally. It should utilize all cores, but for some reason it doesn't.

A Better Queue

To implement a "better" version of the queue we need to:

  1. get rid of the loop-until-succeeds logic in Ruby, in theory we can move it to Rust, but that would block the GC while we are looping in the pop method.
  2. to avoid it we must call our function with rb_thread_call_without_gvl and on top of that our pop method can't exclusively lock the data
  3. but it means that we'll have parallel access to our data structure by threads that push/pop and by the thread that runs GC (which is the main thread).

The latter sounds like something that can't be achieved because it's clearly a race condition. We want to have a data structure that:

  1. supports parallel non-blocking modification
  2. AND iteration by other thread in parallel (to mark each item in the queue)

And IF, just IF, we make a mistake and fail to mark a single object that is still in use, the whole VM crashes.

Here starts the fun part about lock-free data structures.

Lock-free data structures provide a guarantee that at least one thread is able to make progress at any point in time.

There's also the term "wait-free data structures", which means that all threads can make progress without blocking each other, and that every operation completes in a bounded (potentially large, but bounded) number of steps. In practice it's a rare beast, and from what I know, most of the time wait-free implementations are slower than lock-free alternatives (because they require threads to run cooperatively and "help each other").

A famous example of a lock-free data structure is a spinlock mutex:

use std::sync::atomic::{AtomicBool, Ordering};

struct Spinlock<T> {
    data: T,
    in_use: AtomicBool
}

impl<T> Spinlock<T> {
    fn new(data: T) -> Self {
        Self {
            data,
            in_use: AtomicBool::new(false)
        }
    }

    fn try_lock(&self) -> bool {
        self.in_use.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_ok()
    }

    fn lock(&self) {
        loop {
            if self.try_lock() {
                // swapped from "not in use" to "in use"
                return;
            }
        }
    }

    fn unlock(&self) {
        self.in_use
            .compare_exchange(true, false, Ordering::Release, Ordering::Relaxed)
            .expect("bug: unlock called without holding the lock");
    }
}

The try_lock method is lock-free. It tries to compare-and-exchange the value of in_use from false (not in use) to true (in use by the current thread). If it succeeds, true is returned.

To lock an object in a blocking manner we spin and keep trying to lock it. Once it succeeds we know that we own the object until we call unlock.

Unlocking is done by the thread that owns the lock, and so the flag is guaranteed to still be true; no looping is needed. Once we compare-exchange it from true to false we lose ownership, and some other thread spinning in parallel can get access.

This kind of locking is totally acceptable if you don't have high contention and if you lock for a short period of time. No syscall is needed and if you spin only a few times on average it should be faster than a syscall-based approach.
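
To make this concrete, here's a small self-contained sketch (mine, not from the article's repository) of the same idea guarding a plain, non-atomic counter. The SpinCounter name and the store-based unlock are my simplifications; without the lock this would be a data race:

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

struct SpinCounter {
    in_use: AtomicBool,
    value: UnsafeCell<u64>,
}

// SAFETY: `value` is only accessed while `in_use` is held (see `increment`)
unsafe impl Sync for SpinCounter {}

impl SpinCounter {
    fn lock(&self) {
        // spin until we swap `in_use` from "not in use" to "in use"
        while self
            .in_use
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {}
    }

    fn unlock(&self) {
        self.in_use.store(false, Ordering::Release);
    }

    fn increment(&self) {
        self.lock();
        // SAFETY: we own the lock, no other thread touches `value` right now
        unsafe { *self.value.get() += 1 };
        self.unlock();
    }
}

fn main() {
    let counter = Arc::new(SpinCounter {
        in_use: AtomicBool::new(false),
        value: UnsafeCell::new(0),
    });

    let handles: Vec<_> = (0..8)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..100_000 {
                    counter.increment();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // every single increment is visible, nothing is lost
    assert_eq!(unsafe { *counter.value.get() }, 8 * 100_000);
    println!("ok");
}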

In the next chapter we'll use a lock-free, multi-producer, multi-consumer queue and then we'll wrap it with a somewhat efficient blocking interface.

Lock-Free MPMC Queue

There's one important rule about lock-free data structures: don't write them yourself unless you absolutely know how to do that.

Lock-free data structures are very complex, and if you make a mistake you may only find it on different hardware, or when the structure is used with a different access pattern.

Just use existing libraries; there are plenty of them in the C/C++/Rust worlds.

Here I'm porting this C++ implementation to Rust, mostly to show what happens inside. If I had a chance to use an existing Rust package I'd do that without a second thought.

#![allow(unused)]
fn main() {
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};

use libc::c_ulong;

// This is a wrapper of a single element of the queue
struct QueueElement {
    sequence: AtomicUsize,
    data: Cell<c_ulong>,
}
unsafe impl Send for QueueElement {}
unsafe impl Sync for QueueElement {}

struct MpmcQueue {
    buffer: Vec<QueueElement>,
    buffer_mask: usize,
    enqueue_pos: AtomicUsize,
    dequeue_pos: AtomicUsize,
}

impl MpmcQueue {
    fn alloc() -> Self {
        Self {
            buffer: vec![],
            buffer_mask: 0,
            enqueue_pos: AtomicUsize::new(0),
            dequeue_pos: AtomicUsize::new(0),
        }
    }

    fn init(&mut self, buffer_size: usize, default: c_ulong) {
        assert!(buffer_size >= 2);
        assert_eq!(buffer_size & (buffer_size - 1), 0);

        let mut buffer = Vec::with_capacity(buffer_size);
        for i in 0..buffer_size {
            buffer.push(QueueElement {
                sequence: AtomicUsize::new(i),
                data: Cell::new(default),
            });
        }

        self.buffer_mask = buffer_size - 1;
        self.buffer = buffer;
        self.enqueue_pos.store(0, Ordering::Relaxed);
        self.dequeue_pos.store(0, Ordering::Relaxed);
    }

    fn try_push(&self, data: c_ulong) -> bool {
        let mut cell;
        let mut pos = self.enqueue_pos.load(Ordering::Relaxed);
        loop {
            cell = &self.buffer[pos & self.buffer_mask];
            let seq = cell.sequence.load(Ordering::Acquire);
            let diff = seq as isize - pos as isize;
            if diff == 0 {
                if self
                    .enqueue_pos
                    .compare_exchange_weak(pos, pos + 1, Ordering::Relaxed, Ordering::Relaxed)
                    .is_ok()
                {
                    break;
                }
            } else if diff < 0 {
                return false;
            } else {
                pos = self.enqueue_pos.load(Ordering::Relaxed);
            }
        }
        cell.data.set(data);
        cell.sequence.store(pos + 1, Ordering::Release);
        true
    }

    fn try_pop(&self) -> Option<c_ulong> {
        let mut cell;
        let mut pos = self.dequeue_pos.load(Ordering::Relaxed);
        loop {
            cell = &self.buffer[pos & self.buffer_mask];
            let seq = cell.sequence.load(Ordering::Acquire);
            let diff = seq as isize - (pos + 1) as isize;
            if diff == 0 {
                if self
                    .dequeue_pos
                    .compare_exchange_weak(pos, pos + 1, Ordering::Relaxed, Ordering::Relaxed)
                    .is_ok()
                {
                    break;
                }
            } else if diff < 0 {
                return None;
            } else {
                pos = self.dequeue_pos.load(Ordering::Relaxed);
            }
        }

        let data = cell.data.get();
        cell.sequence
            .store(pos + self.buffer_mask + 1, Ordering::Release);

        Some(data)
    }
}
}

Here we have a struct that contains N elements and two atomic indexes: enqueue_pos for writing and dequeue_pos for reading. Basically it's an atomic version of a ring buffer. When we push we shift the "write" index to the right, when we pop we shift the "read" index to the right, and when an index runs past the end of the buffer we wrap around and continue from the beginning.

On top of that, each cell of the queue has a field called sequence that keeps the write of the data itself in sync with the bump of the "write" index (and the same for pop-ing).

Additionally, there's an assertion at the beginning of init that only accepts a buffer_size that is a power of two. Why is it needed? The buffer_mask that is derived from it is the answer.

Let's say our buffer_size is set to 8 (0b1000), then buffer_mask becomes 7 (0b111). If we use bit-and on a monotonically increasing number with this mask we'll get a sequence of numbers in 0-7 range that wraps on overflow. You can try it yourself in REPL by running 0.upto(50).map { |n| n & 0b111 } - this returns a cycling sequence from 0 to 7.

That's a clever trick to avoid checking for read/write pointer overflows.

Could I write this code from scratch just by myself? Definitely no. Use existing implementations.
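
Still, a quick single-threaded sanity check (hypothetical usage, not from the repository) shows how the port above behaves:

fn main() {
    let mut queue = MpmcQueue::alloc();
    queue.init(8, 0); // capacity must be a power of two, `0` is the placeholder value

    assert!(queue.try_push(42));
    assert!(queue.try_push(43));

    // elements come out in FIFO order
    assert_eq!(queue.try_pop(), Some(42));
    assert_eq!(queue.try_pop(), Some(43));

    // the queue is empty again, so try_pop fails without blocking
    assert_eq!(queue.try_pop(), None);
}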

Adding a Blocking Interface

Now we need to write a function that tries to push (and pop) in a loop until it succeeds.

#![allow(unused)]
fn main() {
impl MpmcQueue {
    pub fn push(&self, data: c_ulong) {
        loop {
            if self.try_push(data) {
                return;
            }
        }
    }

    pub fn pop(&self) -> c_ulong {
        loop {
            if let Some(data) = self.try_pop() {
                return data;
            }
        }
    }
}
}

There's only one problem: busy-looping. We can't use the same spinning approach that we had in the Spinlock.

"Busy-looping" means that our loop burns CPU while spinning. It's fine in some cases but if we write a queue for a web server then we definitely don't want it to burn all CPU cores if no requests are coming.

There are different ways to avoid it (like the FUTEX_WAIT syscall on Linux), but here we'll use POSIX semaphores. I haven't compared them to other solutions, so there's a chance that they're terribly slow. I have an excuse though: semaphores are relatively easy to understand.

Right now we need 4 functions:

  1. sem_init - initializes a semaphore object; in our case it must be called as sem_init(ptr_to_sem_object, 0, initial_value), where 0 means "shared between threads of the current process, but not with other processes" (yes, that's also supported, but then the semaphore must be located in shared memory).
  2. sem_post - increments the value of the semaphore by 1, wakes up threads that are waiting for this semaphore
  3. sem_wait - waits for a semaphore value to be greater than zero and atomically decrements its value. Goes to sleep if the value is zero.
  4. sem_destroy - self-explanatory

Here's a Rust wrapper for these APIs:

#![allow(unused)]
fn main() {
use libc::{sem_destroy, sem_init, sem_post, sem_t, sem_wait};

pub(crate) struct Semaphore {
    inner: *mut sem_t,
}

impl Semaphore {
    pub(crate) fn alloc() -> Self {
        unsafe { std::mem::zeroed() }
    }

    pub(crate) fn init(&mut self, initial: u32) {
        // sem_t is not movable, so it has to have a fixed address on the heap
        let ptr = Box::into_raw(Box::new(unsafe { std::mem::zeroed() }));

        let res = unsafe { sem_init(ptr, 0, initial) };
        if res != 0 {
            panic!(
                "bug: failed to create semaphore: {:?}",
                std::io::Error::last_os_error()
            )
        }

        self.inner = ptr;
    }

    pub(crate) fn post(&self) {
        let res = unsafe { sem_post(self.inner) };
        if res != 0 {
            panic!(
                "bug: failed to post to semaphore: {:?}",
                std::io::Error::last_os_error()
            )
        }
    }

    pub(crate) fn wait(&self) {
        let res = unsafe { sem_wait(self.inner) };
        if res != 0 {
            panic!(
                "bug: failed to wait for semaphore: {:?}",
                std::io::Error::last_os_error()
            )
        }
    }
}

impl Drop for Semaphore {
    fn drop(&mut self) {
        unsafe {
            sem_destroy(self.inner);
            drop(Box::from_raw(self.inner));
        }
    }
}

unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}
}
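
A quick single-threaded sanity check of the wrapper (again, an illustration rather than code from the repository): with an initial value of 2, the first two wait calls return immediately, and post bumps the value back up:

fn main() {
    let mut sem = Semaphore::alloc();
    sem.init(2);

    sem.wait(); // 2 -> 1, returns immediately
    sem.wait(); // 1 -> 0, returns immediately

    sem.post(); // 0 -> 1, a sleeping thread (if there was one) would be woken up
    sem.wait(); // 1 -> 0, still no blocking

    println!("done");
    // `Drop` calls sem_destroy and frees the heap allocation
}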

Now we can add two semaphores to our struct:

#![allow(unused)]
fn main() {
struct MpmcQueue {
    // ...

    // Semaphore for readers, equal to the number of elements that can be pop-ed
    read_sem: Semaphore,

    // Semaphore for writers, equal to the number of elements that can be push-ed
    // (i.e. a number of free slots in the queue)
    write_sem: Semaphore,
}

impl MpmcQueue {
    fn alloc() -> Self {
        MpmcQueue {
            // ...
            read_sem: Semaphore::alloc(),
            write_sem: Semaphore::alloc(),
        }
    }

    fn init(&mut self, buffer_size: usize, default: c_ulong) {
        // ...

        // Initially 0 elements can be pop-ed
        self.read_sem.init(0);

        // And `buffer_size` elements can be pushed
        self.write_sem.init(buffer_size as u32);
    }

    fn try_push(&self, data: c_ulong) -> bool {
        // ...

        // Wake up one waiting reader, there's at least one element in the queue
        self.read_sem.post();
        true
    }

    fn try_pop(&self) -> Option<c_ulong> {
        // ...

        // Wake up one waiting writer, there's at least one empty slot
        self.write_sem.post();
        Some(data)
    }
}
}

And finally we can add .push and .pop methods that go to sleep if they can't proceed:

#![allow(unused)]
fn main() {
impl MpmcQueue {
    pub fn push(&self, data: c_ulong) {
        loop {
            if self.try_push(data) {
                return;
            }
            // the queue looks full, wait on the writers' semaphore before retrying
            self.write_sem.wait();
        }
    }

    pub fn pop(&self) -> c_ulong {
        loop {
            if let Some(data) = self.try_pop() {
                return data;
            }
            // the queue looks empty, wait on the readers' semaphore before retrying
            self.read_sem.wait();
        }
    }
}
}

Now if you call .push on a full queue it doesn't burn CPU, and the same goes for calling .pop on an empty queue.
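
Here's a tiny two-thread illustration (hypothetical usage of the queue above, not from the repository): the consumer blocks inside pop() on an empty queue without burning a core and wakes up as soon as the producer pushes something:

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let mut queue = MpmcQueue::alloc();
    queue.init(4, 0);
    let queue = Arc::new(queue);

    let consumer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || {
            // the queue is empty, so this sleeps in sem_wait instead of spinning
            let value = queue.pop();
            println!("popped {value}");
        })
    };

    thread::sleep(Duration::from_secs(1)); // keep the queue empty for a while
    queue.push(42); // posts to read_sem and wakes the consumer up
    consumer.join().unwrap();
}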

Marking

Here comes the tricky part. We do want to call our push and pop functions using rb_thread_call_without_gvl that doesn't acquire an Interpreter Lock and lets GC run in parallel.

What if one thread pushes to the queue the moment GC has finished iterating over it? Well, then the pushed object never gets marked, it gets collected, and the Ruby VM will crash really soon once we pop this item from the queue and do something with it (that would be the equivalent of a "use-after-free" in languages with manual memory management).

I'm going to go with a non-standard approach here that will probably work with other kinds of containers as well. It looks similar to what's called "quiescent state tracking" (at least in some sources). Briefly:

  1. every time we try to .pop we register ourselves as a "consumer". It will be an atomic counter that is incremented before the modification of the queue and decremented after.
  2. before starting to .pop each consumer must make sure that a special atomic boolean flag is not set, and if it's set it must wait, busy-looping is fine here.
  3. when marking starts we
    1. enable this flag in order to put other consumers (that are about to start) on "pause"
    2. wait for "consumers" counter to reach 0.
  4. at this point we know that no other threads try to mutate our container (existing consumers have finished and no new consumers can start because of the boolean flag), so it's safe to iterate it and call mark on each element
  5. finally, we set the flag back to false and unlock other threads

Here's the struct that holds the flag and the counter:

#![allow(unused)]
fn main() {
struct GcGuard {
    // boolean flag
    locked: AtomicBool,
    // number of active consumers
    count: AtomicUsize,
}
}

Initialization is simple: the flag is false and the counter is 0.

#![allow(unused)]
fn main() {
impl GcGuard {
    pub(crate) fn alloc() -> Self {
        GcGuard {
            locked: AtomicBool::new(false),
            count: AtomicUsize::new(0),
        }
    }

    pub(crate) fn init(&mut self) {
        self.locked.store(false, Ordering::Relaxed);
        self.count.store(0, Ordering::Relaxed);
    }
}
}

Then we need helpers to track and modify the counter:

#![allow(unused)]
fn main() {
impl GcGuard {
    // must be called by every consumer before accessing the data
    fn add_consumer(&self) {
        self.count.fetch_add(1, Ordering::SeqCst);
    }
    // must be called by every consumer after accessing the data
    fn remove_consumer(&self) {
        self.count.fetch_sub(1, Ordering::SeqCst);
    }
    // a method that will be used by "mark" function to wait
    // for the counter to reach zero
    fn wait_for_no_consumers(&self) {
        loop {
            let count = self.count.load(Ordering::SeqCst);
            if count == 0 {
                eprintln!("[producer] 0 running consumers");
                break;
            } else {
                // spin until they are done
                eprintln!("[producer] waiting for {count} consumers to finish");
            }
        }
    }
}
}

The code in this section uses SeqCst everywhere, but I'm pretty sure Acquire/Release and Relaxed would be enough in all cases. I'm intentionally not fine-tuning the orderings here for the sake of simplicity.

We can also add helpers for the flag:

#![allow(unused)]
fn main() {
impl GcGuard {
    // must be invoked at the beginning of the "mark" function
    fn lock(&self) {
        self.locked.store(true, Ordering::SeqCst);
    }
    // must be invoked at the end of the "mark" function
    fn unlock(&self) {
        self.locked.store(false, Ordering::SeqCst)
    }
    fn is_locked(&self) -> bool {
        self.locked.load(Ordering::SeqCst)
    }
    // must be invoked by consumers if they see that it's locked
    fn wait_until_unlocked(&self) {
        while self.is_locked() {
            // spin
        }
    }
}
}

And finally we can write some high-level functions that are called by consumers and the "mark" function:

#![allow(unused)]
fn main() {
impl GcGuard {
    pub(crate) fn acquire_as_gc<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        eprintln!("Locking consumers");
        self.lock();
        eprintln!("Waiting for consumers to finish");
        self.wait_for_no_consumers();
        eprintln!("All consumers have finished");
        let out = f();
        eprintln!("Unlocking consumers");
        self.unlock();
        out
    }

    pub(crate) fn acquire_as_consumer<F, T>(&self, f: F) -> T
    where
        F: FnOnce() -> T,
    {
        if self.is_locked() {
            self.wait_until_unlocked();
        }
        self.add_consumer();
        let out = f();
        self.remove_consumer();
        out
    }
}
}

Both take a callback and invoke it once it's safe to do so.

This pattern could also be implemented by returning GuardAsGc and GuardAsConsumer objects that do the unlocking in their destructors, the way it's usually done in languages with RAII.
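
For example, the consumer side of such a guard could look roughly like this (a sketch; only the GuardAsConsumer name comes from the paragraph above, the rest is mine):

// The destructor decrements the counter, so early returns and panics inside
// the critical section can't leave a consumer registered forever.
struct GuardAsConsumer<'a> {
    gc_guard: &'a GcGuard,
}

impl GcGuard {
    fn consumer_guard(&self) -> GuardAsConsumer<'_> {
        if self.is_locked() {
            self.wait_until_unlocked();
        }
        self.add_consumer();
        GuardAsConsumer { gc_guard: self }
    }
}

impl Drop for GuardAsConsumer<'_> {
    fn drop(&mut self) {
        self.gc_guard.remove_consumer();
    }
}

// usage: the guard is held for the duration of the mutation
// let _guard = gc_guard.consumer_guard();
// queue.try_pop();
// // `remove_consumer` runs here, when `_guard` goes out of scope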

Now we can change our MpmcQueue to embed and utilize this code:

#![allow(unused)]
fn main() {
struct MpmcQueue {
    // ...
    gc_guard: GcGuard
}

impl MpmcQueue {
    fn alloc() -> Self {
        Self {
            // ...
            gc_guard: GcGuard::alloc(),
        }
    }

    fn init(&mut self, buffer_size: usize, default: c_ulong) {
        // ...
        self.gc_guard.init();
    }

    pub fn pop(&self) -> c_ulong {
        loop {
            // Here's the difference, we wrap `try_pop` with the consumer's lock
            if let Some(data) = self.gc_guard.acquire_as_consumer(|| self.try_pop()) {
                return data;
            }
            self.read_sem.wait();
        }
    }

    // And to mark an object...
    fn mark(&self, mark: extern "C" fn(c_ulong)) {
        // ... we first lock it to prevent concurrent modification
        self.gc_guard.acquire_as_gc(|| {
            // ... and once it's not in use we simply iterate and mark each element
            for item in self.buffer.iter() {
                let value = item.data.get();
                mark(value);
            }
        });
    }
}
}

We can even write a relatively simple Rust program to see how it works (a condensed sketch follows the list).

  1. The code in GcGuard prints with eprintln, which writes to unbuffered stderr, so the output should be readable.
  2. The program spawns 10 threads that try to .pop from the queue
  3. The main thread spins in a loop that
    1. pushes monotonically increasing numbers to the queue for 1 second
    2. acquires a GC lock
    3. sleeps for 1 second
    4. releases a GC lock
  4. At the end we collect all values that have been popped, merge them into a single array and sort it. In this array each pair of consecutive elements must look like N -> N + 1 and the last element must be equal to the last value that we pushed (i.e. it's the series from 1 to last_pushed_value)
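
A condensed sketch of that program (the full version lives in the repository; collecting the popped values and the final series check are omitted here, and accessing queue.gc_guard directly assumes we're inside the same module):

use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let mut queue = MpmcQueue::alloc();
    queue.init(16, 0);
    let queue = Arc::new(queue);

    // 10 consumers popping in a loop (never joined in this sketch)
    let _consumers: Vec<_> = (0..10)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || loop {
                let value = queue.pop();
                eprintln!("[{:?}] popped {}", thread::current().id(), value);
            })
        })
        .collect();

    let mut next = 1; // monotonically increasing payload
    loop {
        // 1. push for ~1 second
        let started = Instant::now();
        while started.elapsed() < Duration::from_secs(1) {
            queue.push(next);
            next += 1;
        }

        // 2-4. pretend to be GC: block new consumers, wait for the active ones,
        // "mark" for a second, then unlock everyone again
        queue.gc_guard.acquire_as_gc(|| {
            eprintln!("===== GC START ======");
            thread::sleep(Duration::from_secs(1));
            eprintln!("===== GC END ========");
        });
    }
}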

In other words, it's a simplified emulation of how GC works. Its output shows that it does what we planned:

[ThreadId(9)] popped 509
[ThreadId(3)] popped 513
[ThreadId(7)] popped 515
Locking consumers
[ThreadId(5)] popped 517
[ThreadId(8)] popped 516
Waiting for consumers to finish
[producer] waiting for 8 consumers to finish
[producer] waiting for 7 consumers to finish
[producer] waiting for 6 consumers to finish
[ThreadId(10)] popped 519
[ThreadId(4)] popped 520
[producer] waiting for 6 consumers to finish
[producer] waiting for 5 consumers to finish
[ThreadId(11)] popped 518
[producer] waiting for 5 consumers to finish
[producer] waiting for 4 consumers to finish
[ThreadId(6)] popped 522
[producer] waiting for 3 consumers to finish
[ThreadId(9)] popped 523
[ThreadId(3)] popped 524
[producer] waiting for 2 consumers to finish
[producer] waiting for 1 consumers to finish
[ThreadId(2)] popped 521
[producer] waiting for 1 consumers to finish
[ThreadId(7)] popped 525
[producer] 0 running consumers
All consumers have finished
===== GC START ======
===== GC END ========
Unlocking consumers
[ThreadId(7)] popped 528
[ThreadId(4)] popped 534
[ThreadId(3)] popped 532
[ThreadId(11)] popped 529

That's exactly what we wanted:

  1. first, we lock to prevent new consumers
  2. existing consumers however must finish their job
  3. the total number of active consumers goes down and once it reaches 0 we mark the queue
  4. then we unlock it and let all consumer threads continue

Writing a Web Server

This is our destination point. We'll try to make a server that:

  1. spawns a Ractor per core
  2. starts a TCP server loop in the main thread
  3. uses a shared queue to send incoming requests from the main thread to workers
  4. parses a request in the worker, does some trivial routing and calls a request handler
  5. uses a pool of "dummy" DB connections
  6. writes a dynamic response back

First, we need a queue and a connection pool:

QUEUE = CAtomics::MpmcQueue.new(16)

class DummyConnection
  def initialize(conn_id)
    @conn_id = conn_id
  end

  def read_data(id)
    {
      loaded_using_conn_id: @conn_id,
      id: id,
      name: "Record #{id}"
    }
  end
end

connections = 1.upto(16).map { |conn_id| DummyConnection.new(conn_id) }
DB_CONNECTION_POOL = CAtomics::FixedSizeObjectPool.new(16, 1_000) { connections.shift }

  1. Queue's capacity is 16
  2. Connection pool also consists of 16 dummy objects that simulate what a connection would do under the hood. You give it an input (id in our case) and it returns dynamic data based on it. Plus, it embeds an ID of the connection.
  3. Connection pool has a 1s timeout, so if the pool is empty for more than 1 second it'll throw a timeout error.

Then we can start our workers:

def log(s)
  $stderr.puts "[#{Ractor.current.name}] #{s}"
end

workers = 1.upto(CPU_COUNT).map do |i|
  puts "Starting worker-#{i}..."

  Ractor.new(name: "worker-#{i}") do
    while (conn = QUEUE.pop) do
      process_request(conn)
    end
    log "exiting..."
    Ractor.yield :done
  rescue Exception => e
    log e.class.name + " " + e.message + " " + e.backtrace.join("\n    ")
    Ractor.yield :crashed
  end
end

We'll use nil as a special terminating object that stops a Ractor from polling the queue.

Then we can add a signal handler for graceful shutdown:

trap("SIGINT") do
  puts "Exiting..."
  CPU_COUNT.times { QUEUE.push(nil) }
  p workers.map(&:take)
  exit(0)
end

This handler pushes nil for each running Ractor which lets them process what's already in the queue but after that they'll stop.

And finally we can start our TCP server:

server = Socket.tcp_server_loop(8080) do |conn, addr|
  # Got incoming connection, forwarding it to a worker...
  QUEUE.push(conn)
end

The only missing part is the process_request(conn) method:

def process_request(conn)
  body = read_body(conn)
  http_method, path, protocol, headers, body = parse_body(body)

  log "#{http_method} #{path}"

  case [http_method, path]
  in ["GET", "/slow"]
    heavy_computation(100)
    reply(conn, 200, {}, "the endpoint is slow (100ms)")
  in ["GET", "/fast"]
    reply(conn, 200, {}, "yes, it's fast")
  in ["GET", /^\/dynamic\/(?<id>\d+)$/]
    id = Regexp.last_match[:id].to_i
    data = DB_CONNECTION_POOL.with { |db| db.read_data(id) }
    reply(conn, 200, {}, data.to_json)
  else
    reply(conn, 404, {}, "Unknown path #{path}")
  end
rescue Exception => e
  log e.class.name + " " + e.message + " " + e.backtrace.join("\n    ")

  reply(conn, 500, {}, "Internal server error")
ensure
  conn.close
end

It doesn't really matter how we read and parse the request body, but if you are curious feel free to take a look at the full example. In short, I'm reading from the socket with read_nonblock until there's nothing left to read, and then a dummy parser handles the text-based HTTP/1 format.

We could reuse an existing library like webrick, but I'm not sure it can be called from non-main Ractors.

This server has 3 endpoints:

  1. /slow - takes 100ms to execute, during all this time it does CPU-only work
  2. /fast - replies immediately with a static payload
  3. /dynamic/:id - "loads" the data from our fake database and returns dynamic response

It's absolutely OK if it looks ugly; I made it simple to take as little space as possible. Things like the database connection that we get from the pool can easily be placed in Ractor.current[:database] to make them globally accessible within the scope of a request (so User.find(<id>) from ActiveRecord can still exist in this world).

When we run our script we get the following output:

$ ruby tests/web-server.rb
CPU count: 12
Starting worker-1...
Starting worker-2...
Starting worker-3...
Starting worker-4...
Starting worker-5...
Starting worker-6...
Starting worker-7...
Starting worker-8...
Starting worker-9...
Starting worker-10...
Starting worker-11...
Starting worker-12...
Starting server...

And each endpoint also works fine.

fast

$ curl http://localhost:8080/fast
yes, it's fast

# => [worker-6] GET /fast

slow

$ curl http://localhost:8080/slow
the endpoint is slow (100ms)

# => [worker-1] GET /slow

dynamic

$ curl http://localhost:8080/dynamic/42
{"loaded_using_conn_id":1,"id":42,"name":"Record 42"}
$ curl http://localhost:8080/dynamic/17
{"loaded_using_conn_id":2,"id":17,"name":"Record 17"}

# => [worker-4] GET /dynamic/42
# => [worker-7] GET /dynamic/17

a bit of stress testing

Let's see if it survives if we send more requests (each takes 100ms of pure computations):

$ ab -c12 -n1000 http://localhost:8080/slow
// ...
Completed 900 requests
Completed 1000 requests

Concurrency Level:      12
Time taken for tests:   8.536 seconds
Complete requests:      1000
Failed requests:        0
Total transferred:      67000 bytes
HTML transferred:       28000 bytes
Requests per second:    117.16 [#/sec] (mean)
Time per request:       102.427 [ms] (mean)
Time per request:       8.536 [ms] (mean, across all concurrent requests)
Transfer rate:          7.67 [Kbytes/sec] received

and meanwhile we get a nice picture in htop:

[htop screenshot: all cores busy]

Once ab is done the process goes back to idle:

[htop screenshot: back to idle]

Conclusion

  1. It's definitely possible to write true multi-threaded apps in Ruby with global mutable state
  2. Existing primitives are not enough but nothing stops you from bringing your own
  3. This kind of code can be "kindly borrowed" from other languages and packaged as independent Ruby libraries
  4. Writing concurrent data structures is hard, but the work doesn't have to be repeated in every app; we can reuse code
  5. I hope that much of what's been explained in this article is applicable not only to Ruby, but also to other languages with limited concurrency primitives (e.g. Python/JavaScript)