GCD Primitives in Depth: Serial Queue

Alex Shchukin
May 5, 2023

In the previous article, we implemented DispatchSemaphore and DispatchGroup ourselves. Today, we will develop a simplified version of DispatchQueue called SerialQueue. The solution focuses on two basic methods: sync and async. The sync method executes a task on the calling thread and waits for its completion, whereas the async method runs the task on a background thread without blocking the calling thread (this is not always the case in GCD, but we will simplify this aspect). For a more comprehensive understanding of queues, please check the following link.
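For reference, the behavior we are about to imitate can be observed with GCD itself. The snippet below is a minimal sketch that uses the real DispatchQueue rather than our SerialQueue; the Log class, the runSyncAsyncDemo function, and the queue label are hypothetical names chosen for this illustration.

```swift
import Dispatch

// Hypothetical log holder; every access is serialized by the queue below.
final class Log: @unchecked Sendable {
    var entries: [String] = []
}

func runSyncAsyncDemo() -> [String] {
    // The label is an arbitrary name used only for this example.
    let queue = DispatchQueue(label: "com.example.serial")
    let log = Log()

    // async returns immediately; the closure runs later on a thread chosen by GCD.
    queue.async {
        log.entries.append("async task")
    }

    // sync blocks the caller until all previously submitted tasks
    // and this closure have finished.
    queue.sync {
        log.entries.append("sync task")
    }

    return log.entries
}

print(runSyncAsyncDemo())  // prints ["async task", "sync task"]
```

Because the queue is serial, the sync closure cannot start before the earlier async closure has finished, which is exactly the ordering guarantee SerialQueue has to reproduce.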

Unlike GCD’s DispatchQueue, we will make a separate class called SerialQueue, which will only contain the serial logic, excluding any concurrent components. In the code snippet provided below, we delineate the essential variables:

  • thread: Provides the execution environment where the async task will be performed.
  • condition: Organizes communication between the sync and async methods.
  • mutex: Ensures thread safety by preventing simultaneous access to shared resources.
  • stop: Terminates the while loop in the background thread, concluding the thread's execution.
import Foundation

final class SerialQueue {
    private var thread: pthread_t?
    private var condition = pthread_cond_t()
    private var mutex = pthread_mutex_t()

    // Used to stop executing background thread
    private var stop = false

    init() {
        // Initialize the primitives first so the new thread never sees them uninitialized
        pthread_cond_init(&condition, nil)
        pthread_mutex_init(&mutex, nil)

        let weakSelfPointer = Unmanaged.passUnretained(self).toOpaque()

        _ = pthread_create(&thread, nil, { (pointer: UnsafeMutableRawPointer) in
            let weakSelf = Unmanaged<SerialQueue>.fromOpaque(pointer).takeRetainedValue()
            weakSelf.runThread()
            pthread_exit(nil)
        }, weakSelfPointer)
    }
}

The initialization section contains standard setup functions such as pthread_cond_init and pthread_mutex_init. It also creates a background thread through pthread_create. However, we cannot directly call self.runThread() from the closure passed to pthread_create: the closure is converted to a C function pointer, which cannot capture context such as self, so the compiler would report an error. To address this issue, we create an Unmanaged pointer to self and pass it as the argument parameter of pthread_create.

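Within the closure, we use Unmanaged<SerialQueue> to obtain a typed reference to self. The passUnretained and takeRetainedValue methods are used to manage the retain counter so that the thread holds what is effectively a weak reference to self. In other words, we do not want the thread to increase the reference counter, because a strong reference held by a long-running thread would keep deinit from ever running. Keep in mind that this pairing is unbalanced by design: passUnretained adds no retain, while takeRetainedValue is documented as consuming one, so the trick is fragile. Balancing the retain counter can be challenging, so checking CFGetRetainCount during debugging is an effective way to make adjustments. Finally, we call pthread_exit to terminate the thread; it is executed when runThread completes its operation, which in our case happens during deinit.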
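To make the pointer round trip easier to follow in isolation, here is a minimal sketch of passing an object through a C-style callback. It uses the balanced pair passUnretained and takeUnretainedValue rather than the pairing used in SerialQueue, and the Worker class, the callback constant, and the nameLength function are hypothetical names for this illustration only.

```swift
import Foundation

// Hypothetical class standing in for SerialQueue.
final class Worker {
    let name: String
    init(name: String) { self.name = name }
}

// A C-style callback cannot capture context, so the object arrives as a raw pointer.
let callback: @convention(c) (UnsafeMutableRawPointer) -> Int32 = { pointer in
    // takeUnretainedValue pairs with passUnretained: neither one changes
    // the ownership balance of the object.
    let worker = Unmanaged<Worker>.fromOpaque(pointer).takeUnretainedValue()
    return Int32(worker.name.utf8.count)
}

func nameLength(of worker: Worker) -> Int32 {
    // Convert the reference to an opaque pointer without retaining it.
    let opaque = Unmanaged.passUnretained(worker).toOpaque()
    return callback(opaque)
}

print(nameLength(of: Worker(name: "background")))  // prints 10
```

The caller keeps the object alive for the duration of the call, which is the responsibility that an unretained pointer always leaves with whoever created it.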

The underlying principle of the serial queue implementation is that all async tasks will be executed on the background thread (the one we just created), while sync tasks will be carried out on the calling thread. The mechanism for scheduling these tasks is the condition variable.

The subsequent component to develop is the Task, which represents the actual task to be executed. It comprises the async or sync type, a closure containing the logic to be executed within the queue, and a threadId. The meaning of the threadId will be elaborated later in the discussion.

fileprivate final class Task {

    fileprivate enum TaskType: String {
        case sync
        case async
    }

    let type: TaskType
    let execute: (() -> ())
    let threadId: UInt32

    init(type: TaskType, threadId: UInt32, execute: @escaping (() -> ())) {
        self.type = type
        self.execute = execute
        self.threadId = threadId
    }
}

To maintain the order of tasks, we introduce a queue that follows the FIFO principle. Another critical aspect of this queue is its thread safety, as it will be extensively used in a multithreaded environment. We will not delve too deeply into the implementation of the queue itself, as it is pretty straightforward.

fileprivate final class Queue {
    private var elements: [Task] = []
    private var mutex = pthread_mutex_t()

    init() {
        pthread_mutex_init(&mutex, nil)
    }

    func enqueue(element: Task) {
        pthread_mutex_lock(&mutex)
        elements.append(element)
        pthread_mutex_unlock(&mutex)
    }

    func dequeue() -> Task? {
        pthread_mutex_lock(&mutex)
        defer {
            pthread_mutex_unlock(&mutex)
        }

        guard !elements.isEmpty else { return nil }

        return elements.removeFirst()
    }

    var isEmpty: Bool {
        pthread_mutex_lock(&mutex)
        defer {
            pthread_mutex_unlock(&mutex)
        }
        return elements.isEmpty
    }
}

So we will add the queue as a private variable to our SerialQueue implementation:

// Used to make an order for sync and async tasks
private let queue = Queue()

Now we have all the primitives we need, so we can start designing the methods. Since async is much easier to implement, we will start with it. It gets the threadId of the calling thread, adds a task of the async type to the task queue, and calls pthread_cond_broadcast to wake the background thread. Again, I want to emphasize that threadId will be used later, when we implement the sync method.

func async(task: @escaping () -> ()) {
    let threadId = pthread_mach_thread_np(pthread_self())
    queue.enqueue(element: Task(type: .async, threadId: threadId, execute: task))
    pthread_cond_broadcast(&condition)
}

There is a valid reason for not using pthread_cond_signal in this context. The main issue with pthread_cond_signal is its lack of guarantee as to which pthread_cond_wait it will awaken. The scheduling policy ultimately determines which thread will be unblocked first. By calling pthread_getschedparam, we can obtain the scheduling policy for the thread (which is often SCHED_OTHER by default in many systems), with the order being based on a priority determined by a parameter called the nice value. Instead of going deep into this direction, we will provide a more universal solution that remains independent of the calling order. This is achieved through the usage of pthread_cond_broadcast, which unblocks all wait methods using the same condition. Also, we introduce a while loop surrounding pthread_cond_wait to prevent the continuation of execution for all waiting tasks, except for the one that satisfies the condition within the loop.
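The combination of pthread_cond_broadcast and a while loop around pthread_cond_wait can be sketched in isolation. The example below is not part of SerialQueue: TicketBoard and runBroadcastDemo are hypothetical names, the threads are created with Foundation's Thread instead of pthread_create, and a DispatchSemaphore is used only to pace the demo.

```swift
import Foundation
import Dispatch

// Hypothetical helper: each waiter blocks until its own ticket is called.
final class TicketBoard: @unchecked Sendable {
    private let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    private let condition = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
    private var current = -1
    private var served: [Int] = []

    init() {
        mutex.initialize(to: pthread_mutex_t())
        condition.initialize(to: pthread_cond_t())
        pthread_mutex_init(mutex, nil)
        pthread_cond_init(condition, nil)
    }

    deinit {
        pthread_cond_destroy(condition)
        pthread_mutex_destroy(mutex)
        condition.deallocate()
        mutex.deallocate()
    }

    func wait(for ticket: Int) {
        pthread_mutex_lock(mutex)
        // Every broadcast wakes all waiters; the loop sends the wrong ones back to sleep.
        while current != ticket {
            pthread_cond_wait(condition, mutex)
        }
        served.append(ticket)
        pthread_mutex_unlock(mutex)
    }

    func call(_ ticket: Int) {
        pthread_mutex_lock(mutex)
        current = ticket
        pthread_cond_broadcast(condition)
        pthread_mutex_unlock(mutex)
    }

    func servedOrder() -> [Int] {
        pthread_mutex_lock(mutex)
        defer { pthread_mutex_unlock(mutex) }
        return served
    }
}

func runBroadcastDemo() -> [Int] {
    let board = TicketBoard()
    let done = DispatchSemaphore(value: 0)

    for ticket in 0..<3 {
        Thread.detachNewThread {
            board.wait(for: ticket)
            done.signal()
        }
    }

    // Call the tickets out of creation order, waiting for each waiter before the next call.
    for ticket in [2, 0, 1] {
        board.call(ticket)
        done.wait()
    }
    return board.servedOrder()
}

print(runBroadcastDemo())  // prints [2, 0, 1]
```

No matter which waiter the scheduler wakes first, only the one whose predicate holds leaves the loop, so the result does not depend on the scheduling policy.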

Once the async task has been added to the task queue, we need to execute tasks from the queue on the background thread. To achieve this, we create a while loop that runs indefinitely until the stop flag is set. Within this infinite loop, there is another loop that retrieves tasks from the task queue until it gets empty.

This inner task loop contains a condition that verifies the type of task; if it’s an async task, it will be executed on the background thread. We will discuss the sync component in more detail shortly. Following the task’s execution, another while loop keeps calling pthread_cond_wait as long as the task queue is empty and the stop flag is unset. This is the loop we discussed earlier: because we use pthread_cond_broadcast throughout our implementation, the thread re-checks its condition after every wake-up and goes back to waiting when there is nothing to do.

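private func runThread() {
    while !stop {
        while let task = queue.dequeue() {
            if task.type == .sync {
                // TODO Implement
            } else {
                task.execute()
            }
        }

        // pthread_cond_wait must be called with the mutex locked by the calling thread
        pthread_mutex_lock(&mutex)
        // While the task queue is empty, keep the thread waiting
        while queue.isEmpty && !stop {
            pthread_cond_wait(&condition, &mutex)
        }
        pthread_mutex_unlock(&mutex)
    }
}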

To get a better grasp of the async method’s algorithm, here is a schema that depicts how it works on both the calling thread and the background thread, assuming no tasks are executing at the moment:

Async method

The implementation is not overly complex when dealing with only async tasks in the serial queue. However, integrating both sync and async tasks introduces additional complexity to the overall logic. Let us now proceed with the sync method. Prior to diving into its implementation, we must first introduce another container that will be used within the sync method. This container, called SynchronizedDictionary, is a simple thread-safe wrapper around a Dictionary. It offers two primary operations: set, which adds a value to the dictionary, and value, which returns a boolean value based on a UInt32 key.

fileprivate final class SynchronizedDictionary {
    private var storage: [UInt32: Bool] = [:]
    private var mutex = pthread_mutex_t()

    init() {
        pthread_mutex_init(&mutex, nil)
    }

    // Named set to match the call sites in SerialQueue
    func set(value: Bool, key: UInt32) {
        pthread_mutex_lock(&mutex)
        storage[key] = value
        pthread_mutex_unlock(&mutex)
    }

    func value(key: UInt32) -> Bool? {
        pthread_mutex_lock(&mutex)
        defer {
            pthread_mutex_unlock(&mutex)
        }
        return storage[key]
    }
}

Now we can add SynchronizedDictionary as a field to SerialQueue:

// Used to store sync execution status per thread
private var syncExecutionStates = SynchronizedDictionary()

With all the components prepared, we are now ready to implement the sync method. First, we get the calling thread's ID (which will be used later on in the background thread) and set its value to false in the SynchronizedDictionary, meaning that the sync task is not executing yet. Only then do we create the sync task and add it to the task queue, so the background thread can never pick up the task before its state has been initialized. This process is a crucial component of the scheduling mechanism, as it ensures the proper serial execution of both sync and async tasks.

After that, we lock the execution and notify the background thread in case it was put on wait. Next, we place the calling thread on hold until the background thread has finished executing the other tasks in the task queue and reaches our task. We then execute the task on the calling thread. Once the task execution is complete, we mark the calling thread as non-executing using syncExecutionStates. Then we notify the background thread that the sync execution has finished and unlock the calling thread.

func sync(task: @escaping () -> ()) {
    let threadId = pthread_mach_thread_np(pthread_self())

    // Mark the task as NOT executing yet for the calling thread (threadId).
    // This is done before enqueueing so the background thread cannot set the
    // flag to true first and then have it overwritten
    syncExecutionStates.set(value: false, key: threadId)

    // Store the sync task with its thread id in the thread-safe queue for later execution
    queue.enqueue(element: Task(type: .sync, threadId: threadId, execute: task))

    pthread_mutex_lock(&mutex)
    // Unblock the queue thread if it isn't executing any task
    pthread_cond_broadcast(&condition)

    // Keep the current thread waiting until the queue thread reaches this task
    while syncExecutionStates.value(key: threadId) == false {
        pthread_cond_wait(&condition, &mutex)
    }
    // Execute the task on the calling thread
    task()

    // Mark the task as no longer executing for the calling thread (threadId)
    syncExecutionStates.set(value: false, key: threadId)
    pthread_cond_broadcast(&condition)
    pthread_mutex_unlock(&mutex)
}

We intentionally place the first part of the sync method outside of the lock for a reason. To better understand this, let's consider an example: multiple calls of sync and async are made from different threads. If we were to place pthread_mutex_lock at the beginning of the sync method, it would lock the method, causing other sync calls to wait until it gets unlocked. At the same time, an async call may occur. Since the async method does not use locks, it would immediately add the task to the task queue before the sync methods that were called earlier. This would break the order of execution, so we must be sure that the task is added to the task queue immediately after the method gets called.

The same reason applies to syncExecutionStates, which needs to be placed outside the lock because it may block the calling thread. If the background thread attempts to execute the associated sync task while the calling thread is blocked, the calling thread will miss the broadcast call and the order of execution will be compromised.

The first part of the sync logic is done, and now we will switch to the remaining part, which takes place within the background thread at the spot marked with TODO. If the task type is sync, we need to notify the associated sync method that the task is ready to begin execution. This is why we pass the threadId as a parameter when adding the task to the task queue. By setting the value for that threadId to true in syncExecutionStates, we mark the task of the specified thread as currently executing.

Next, we call pthread_cond_broadcast to wake the sync method, allowing it to execute the task on the associated calling thread. Finally, we have to wait for the task execution to complete using pthread_cond_wait.

private func runThread() {
    while !stop {
        while let task = queue.dequeue() {
            if task.type == .sync {
                // pthread_cond_wait must be called with the mutex locked by the calling thread
                pthread_mutex_lock(&mutex)

                // Mark the task as executing for the thread id
                syncExecutionStates.set(value: true, key: task.threadId)
                pthread_cond_broadcast(&condition)

                // Hold the queue thread while the sync task is executing on the calling thread
                while syncExecutionStates.value(key: task.threadId) == true {
                    pthread_cond_wait(&condition, &mutex)
                }
                pthread_mutex_unlock(&mutex)
                continue
            } else {
                task.execute()
            }
        }

        pthread_mutex_lock(&mutex)
        // While the task queue is empty, keep the thread waiting
        while queue.isEmpty && !stop {
            pthread_cond_wait(&condition, &mutex)
        }
        pthread_mutex_unlock(&mutex)
    }
}

The SynchronizedDictionary identifies which calling thread is allowed to execute its task inside the sync method. For instance, imagine multiple threads calling the sync method of the same SerialQueue instance. When the background thread retrieves a sync task, the task contains the thread's ID, which allows the background thread to identify the specific thread that submitted it.

Using the SynchronizedDictionary, we can target the appropriate thread (by setting its threadId value to true). When the target thread wakes up after calling pthread_cond_broadcast, execution will only proceed for the marked thread. Meanwhile, the other threads will go through the while loop and return to a waiting state.

The following diagram illustrates the algorithm of the sync method, depicting the shared logic between the calling thread and the background thread under the assumption that the tasks queue is empty:

Sync method

Indeed, the sync and async methods are designed to work together as a cohesive mechanism, with one method affecting the other. This interconnection is precisely why we extensively use the condition variable in our implementation. The condition variable enables the proper coordination and synchronization between the calling and background threads, ensuring that tasks are executed in a serialized manner.
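The handshake between the two threads can be reduced to a small sketch. This is an illustration under simplifying assumptions, not the SerialQueue code: it uses Foundation's NSCondition in place of the raw pthread calls, a single Bool instead of the per-thread dictionary, and the hypothetical names Handshake, LockedLog, and runHandshakeDemo.

```swift
import Foundation
import Dispatch

// Hypothetical thread-safe log used only to observe the order of events.
final class LockedLog: @unchecked Sendable {
    private let lock = NSLock()
    private var storage: [String] = []

    func append(_ entry: String) {
        lock.lock()
        storage.append(entry)
        lock.unlock()
    }

    var entries: [String] {
        lock.lock()
        defer { lock.unlock() }
        return storage
    }
}

// Hypothetical two-way handshake: the worker grants a turn, the caller runs its task.
final class Handshake: @unchecked Sendable {
    private let condition = NSCondition()
    private var granted = false

    // Calling-thread side: wait for the turn, run the task here, then hand control back.
    func runWhenGranted(_ task: () -> Void) {
        condition.lock()
        while !granted { condition.wait() }
        task()
        granted = false
        condition.broadcast()
        condition.unlock()
    }

    // Worker side: grant the turn and stay parked until the caller has finished.
    func grantAndWaitUntilDone() {
        condition.lock()
        granted = true
        condition.broadcast()
        while granted { condition.wait() }
        condition.unlock()
    }
}

func runHandshakeDemo() -> [String] {
    let handshake = Handshake()
    let log = LockedLog()
    let workerDone = DispatchSemaphore(value: 0)

    Thread.detachNewThread {
        handshake.grantAndWaitUntilDone()
        log.append("worker resumed")
        workerDone.signal()
    }

    handshake.runWhenGranted {
        log.append("task on calling thread")
    }
    workerDone.wait()
    return log.entries
}

print(runHandshakeDemo())  // prints ["task on calling thread", "worker resumed"]
```

The worker cannot move on until the caller clears the flag, which is the same property that keeps the background thread from dequeuing the next task while a sync task is still running.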

Now we need to implement deallocation logic for SerialQueue which is not trivial either. First, we set the stop flag to true, which will terminate the main while loop in the background thread within the runThread method. Next, if the background thread is not currently executing any tasks, we wake it using pthread_cond_broadcast. That ensures that the background thread can properly exit the while loop. Then we call pthread_join to make the calling thread wait until the background thread completes its execution. This step is crucial in avoiding crashes that may result from accessing deallocated memory. Once the background thread has finished executing, we proceed to deallocate the condition and mutex variables.

deinit {
    // Set stop flag to finish while loop in the background thread
    stop = true

    // Wake background thread if it was put on wait
    pthread_cond_broadcast(&condition)

    // Wait until background thread finishes its execution
    if let thread = thread {
        pthread_join(thread, nil)
    }

    pthread_cond_destroy(&condition)
    pthread_mutex_destroy(&mutex)
}
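The shutdown sequence (set the flag, wake the thread, wait for it to exit) can also be sketched on its own. The example below makes several assumptions that differ from SerialQueue: it uses NSCondition instead of the pthread primitives, a DispatchSemaphore stands in for pthread_join, shutdown is an explicit method rather than deinit, and StoppableWorker and runShutdownDemo are hypothetical names.

```swift
import Foundation
import Dispatch

// Hypothetical worker that idles on a condition until it is asked to stop.
final class StoppableWorker: @unchecked Sendable {
    private let condition = NSCondition()               // stands in for the mutex and condition pair
    private let finished = DispatchSemaphore(value: 0)  // stands in for pthread_join
    private var stop = false
    private(set) var didExitLoop = false

    func start() {
        Thread.detachNewThread { [self] in
            condition.lock()
            // Sleep until the stop flag is raised; the loop guards against early wake-ups.
            while !stop {
                condition.wait()
            }
            didExitLoop = true
            condition.unlock()
            finished.signal()
        }
    }

    func shutdown() {
        // Change the flag and broadcast under the lock so the wake-up cannot be missed.
        condition.lock()
        stop = true
        condition.broadcast()
        condition.unlock()

        // Block until the worker thread has left its loop.
        finished.wait()
    }
}

func runShutdownDemo() -> Bool {
    let worker = StoppableWorker()
    worker.start()
    worker.shutdown()
    return worker.didExitLoop
}

print(runShutdownDemo())  // prints true
```

Only after the wait returns is it safe to tear down whatever the worker thread was using, which mirrors why deinit joins the thread before destroying the condition and the mutex.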

Here is a schema representing how the deinit logic works:

# Tests

We have now implemented all the pieces of the complete solution. It may seem complex, especially given the numerous corner cases, and catching all of them sometimes becomes almost impossible. Fortunately, there are techniques and tools to help tackle these challenges, and unit testing is one of them. Unit tests play a key role in software development best practices because they help ensure the code’s correctness and stability. While there are various approaches to writing tests, this article will focus on applying unit tests specifically to SerialQueue.

When working with asynchronous code, we often encounter intermittent errors that do not occur during every execution. To help catch them, Xcode offers a special feature for running a test repeatedly. This feature allows you to run the test multiple times in a row, increasing the likelihood of encountering and identifying any sporadic issues. To initiate a repeated run, simply right-click on the test and select this option from the context menu:

After that, we can specify the number of repetitions, termination conditions, and other parameters:

In my efforts to cover most of the behavioral scenarios for SerialQueue, I wrote various unit tests. While we will only examine one such test in this article, you can explore the other tests by following the link. I encourage you to share any potential corner cases you discover in the comments below.

In the test below, we use expectations several times. An expectation is another tool that aids in testing asynchronous code, and it operates similarly to semaphores and conditions. By calling wait, we block the calling thread until all expectations signal completion through fulfill. As you can see, some methods are called from the main thread, while others are called from the global queue. The result array is expected to contain the ordered sequence [1, 2, 3, 4, 5]: since we are working with a serial queue, the order is guaranteed by the call sequence. We also choose not to lock result, since only one thread can access it at a time.

func testFromMainAndBackgroundThreads() {
    let serialQueue = SerialQueue()
    var result: [Int] = []

    let expectation1 = expectation(description: "test1")
    serialQueue.async {
        sleep(1)
        result.append(1)
        expectation1.fulfill()
    }

    let expectation2 = expectation(description: "test2")
    serialQueue.async {
        sleep(1)
        result.append(2)
        expectation2.fulfill()
    }

    let expectation3 = expectation(description: "test3")
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.3) {
        serialQueue.sync {
            sleep(1)
            result.append(3)
            expectation3.fulfill()
        }
    }

    let expectation4 = expectation(description: "test4")
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.4) {
        serialQueue.async {
            sleep(1)
            result.append(4)
            expectation4.fulfill()
        }
    }

    let expectation5 = expectation(description: "test5")
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.5) {
        serialQueue.sync {
            sleep(1)
            result.append(5)
            expectation5.fulfill()
        }
    }

    wait(for: [expectation1, expectation2, expectation3, expectation4, expectation5], timeout: 10.0)
    XCTAssertEqual([1, 2, 3, 4, 5], result)
}
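The expectation mechanism used in the test can be approximated outside XCTest to show the idea. The sketch below is not how XCTest is implemented; it is an analogy built on DispatchGroup, where enter plays the role of creating an expectation, leave plays the role of fulfill, and the timed wait plays the role of wait(for:timeout:). The function name runExpectationDemo and the delays are hypothetical.

```swift
import Dispatch

func runExpectationDemo() -> Bool {
    let group = DispatchGroup()

    for delay in [0.05, 0.1] {
        // Comparable to creating an expectation.
        group.enter()
        DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
            // Comparable to calling fulfill().
            group.leave()
        }
    }

    // Comparable to wait(for:timeout:): blocks until every enter() has a
    // matching leave(), or until the timeout expires.
    return group.wait(timeout: .now() + 5) == .success
}

print(runExpectationDemo())  // prints true
```

As with expectations, the waiting thread does not care in which order the completions arrive; it only resumes once all of them have been reported or the timeout has passed.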

In this article, we’ve implemented a simplified version of the serial queue, which is widely used in iOS applications, to get a deeper understanding of its functionality. It is important to know that the original queue implementation uses lock-free primitives, which may provide performance advantages. However, we chose not to use them here to avoid overcomplicating the article, as even with locks the explanation is far from simple. Additionally, please remember not to use this implementation in production, as it was created solely for educational purposes and may be unstable in certain corner cases compared to the GCD version.

To see the full implementation feel free to visit the GitHub repository.

Check my Twitter to get the newest updates.
