GCD Primitives in Depth: Semaphore and Group
In this article, we will implement simplified versions of some GCD classes using low-level POSIX primitives to understand how GCD actually works.
Semaphore
We’re going to begin with DispatchSemaphore, although what we’ll be implementing is a simplified version: the real DispatchSemaphore has a much richer set of functionality. To refresh your knowledge of DispatchSemaphore, you can check the article about synchronization.
Now we can start implementing the Semaphore. The code snippet below shows the initialization:
final class Semaphore {
    private var mutex = pthread_mutex_t()
    private var condition = pthread_cond_t()
    private var counter = 0
    private var maxCount = 0
    private let queue: Queue = Queue()

    // maxCount - the maximum number of threads allowed to pass `wait` without blocking
    init(maxCount: Int = 0) {
        pthread_mutex_init(&mutex, nil)
        pthread_cond_init(&condition, nil)
        self.maxCount = maxCount
    }

    deinit {
        pthread_mutex_destroy(&mutex)
        pthread_cond_destroy(&condition)
    }
}
In this section, we use pthread_mutex_t and pthread_cond_t, types from the POSIX threads library, which is written in C. pthread_mutex_t serves as a lock, allowing us to ensure thread-safe access to resources such as counter. pthread_cond_t can block the calling thread via pthread_cond_wait and resume it via pthread_cond_signal.

To initialize these types, we use pthread_mutex_init and pthread_cond_init respectively. It's important to note the deinitialization: these primitives aren't self-destructing, so you must manually destroy them using pthread_mutex_destroy and pthread_cond_destroy.
It's important to know that pthread_cond_signal unblocks at least one of the threads blocked by pthread_cond_wait, and the scheduling policy determines the order of unblocking. On macOS the order is determined by thread priority by default, which doesn't match semaphore semantics. To bypass this limitation we will use pthread_cond_broadcast, which unblocks all threads waiting on the condition. To provide sequential order, we introduce a FIFO Queue, which we will use later in the wait method. The implementation of Queue is in the code snippet below:
fileprivate final class Queue {
    private var elements: [Int] = []

    func enqueue(element: Int) {
        elements.append(element)
    }

    func dequeue() -> Int? {
        guard !elements.isEmpty else { return nil }
        return elements.removeFirst()
    }

    func peek() -> Int? {
        guard !elements.isEmpty else { return nil }
        return elements.first
    }

    var isEmpty: Bool {
        return elements.isEmpty
    }
}
Let's start with a simple implementation of the wait method. It blocks the calling thread on the condition until a signal arrives.
func wait() {
    pthread_mutex_lock(&self.mutex)
    pthread_cond_wait(&self.condition, &self.mutex)
    pthread_mutex_unlock(&self.mutex)
}
That works for a single thread, but we need to make it work with multiple threads. Let's improve wait using the maxCount variable. Whenever counter is bigger than maxCount, the method calls pthread_cond_wait. So maxCount regulates the throughput of the Semaphore: the larger maxCount, the more threads can pass wait simultaneously.
func wait() {
    pthread_mutex_lock(&self.mutex)
    counter += 1
    while self.counter > self.maxCount {
        pthread_cond_wait(&self.condition, &self.mutex)
    }
    pthread_mutex_unlock(&self.mutex)
}
Recall that pthread_cond_signal does not guarantee the order in which threads are awakened. To resolve this, we use pthread_cond_broadcast, which wakes all threads blocked by pthread_cond_wait. However, we want only the earliest waiting thread to proceed, and the others to go back to waiting. To accomplish this, we must keep track of the order in which the wait calls are made.

Here we introduce a local variable tmpCounter, which acts as the thread's ticket. Being local, tmpCounter holds a distinct value for each wait call, and we enqueue it into the Queue. If the ticket at the front of the queue doesn't match tmpCounter, the thread goes back to waiting. This ensures that the threads proceed in FIFO order.
func wait() {
    pthread_mutex_lock(&self.mutex)
    counter += 1
    let tmpCounter = counter
    queue.enqueue(element: tmpCounter)
    // Wait while the semaphore is over capacity or it isn't our turn yet.
    while self.counter > self.maxCount || self.queue.peek() != tmpCounter {
        pthread_cond_wait(&self.condition, &self.mutex)
    }
    // Our turn: remove our ticket and wake the remaining waiters
    // so the next thread in line can re-check the condition.
    queue.dequeue()
    pthread_cond_broadcast(&self.condition)
    pthread_mutex_unlock(&self.mutex)
}
Now we need to implement the signal function. It decreases the counter that keeps the semaphore waiting in the while loop and broadcasts to all waiting threads.
func signal() {
    pthread_mutex_lock(&mutex)
    counter -= 1
    pthread_cond_broadcast(&condition)
    pthread_mutex_unlock(&mutex)
}
Let's test the semaphore to check how it works. In the example below, the calling thread is blocked by the wait method for 5 seconds, until the task on the global queue calls the signal method:
let semaphore = Semaphore(maxCount: 0)
print("Start")
DispatchQueue.global().async {
    sleep(5)
    semaphore.signal()
}
semaphore.wait()
print("Finish")
Result:
Start
← 5 seconds →
Finish
Another example shows how the semaphore works with multiple threads. Whenever we call the wait method, it increases counter by 1. The first task prints test1 or test2 (depending on which task is scheduled first) and then sleeps for 5 seconds. Then comes the second call of wait on another queue. It increases counter to 2, and since maxCount is 1, it puts that thread on hold because the condition self.counter > self.maxCount evaluates to true.
let semaphore = Semaphore(maxCount: 1)
DispatchQueue.global().async {
    semaphore.wait()
    print("test1")
    sleep(5)
    semaphore.signal()
}
DispatchQueue.global().async {
    semaphore.wait()
    print("test2")
    sleep(5)
    semaphore.signal()
}
Result:
test1
← 5 seconds →
test2
Group
In this part, we will implement a Group class with functionality similar to DispatchGroup. To refresh your knowledge, you can check the article about groups. Let's define the private variables. There are pthread_mutex_t and pthread_cond_t, known from the Semaphore section, and counter, which counts how many times enter was called.
final class Group {
    private var mutex = pthread_mutex_t()
    private var condition = pthread_cond_t()
    private var counter = 0

    init() {
        pthread_mutex_init(&mutex, nil)
        pthread_cond_init(&condition, nil)
    }

    deinit {
        pthread_mutex_destroy(&mutex)
        pthread_cond_destroy(&condition)
    }
}
DispatchGroup has two primary methods, enter and leave, to enter and leave the group respectively. enter thread-safely increases the counter; leave thread-safely decreases it and calls pthread_cond_broadcast to unlock the threads blocked by pthread_cond_wait. With a group there is no ordering of wait calls, so we simply unblock all the waiters via pthread_cond_broadcast. The code below implements these functions:
func enter() {
    pthread_mutex_lock(&mutex)
    counter += 1
    pthread_mutex_unlock(&mutex)
}

func leave() {
    pthread_mutex_lock(&mutex)
    counter -= 1
    pthread_cond_broadcast(&self.condition)
    pthread_mutex_unlock(&mutex)
}
The wait method blocks the calling thread until pthread_cond_wait receives a signal from the leave method and counter is 0:
func wait() {
    pthread_mutex_lock(&self.mutex)
    while self.counter != 0 {
        pthread_cond_wait(&self.condition, &self.mutex)
    }
    pthread_mutex_unlock(&self.mutex)
}
pthread_cond_wait takes two parameters: a condition and a mutex. It's pretty clear why it requires a condition, but the mutex parameter needs more consideration. pthread_cond_wait atomically unlocks the mutex while it waits and locks it again before returning. Why does a condition function need a mutex at all? Notice that all the methods working with the condition are guarded by locks: that's needed to prevent concurrent modification of the variable that regulates the condition (in our case, counter). If pthread_cond_wait didn't release the mutex while waiting, we would never be able to call pthread_cond_signal, since it is guarded by the same mutex. For a better understanding, here is the scheme:
Now we can implement the example from the article using the Group class. Two tasks execute on a concurrent queue in the same Group, and the wait method blocks the calling thread until both tasks are done. As you can see, the result is the same as if we had used DispatchGroup.
let group = Group()
let concurrentQueue = DispatchQueue(label: "com.test.testGroup", attributes: .concurrent)
group.enter()
concurrentQueue.async {
    sleep(1)
    print("test1")
    group.leave()
}
group.enter()
concurrentQueue.async {
    print("test2")
    group.leave()
}
group.wait()
print("All tasks were executed")
Result:
test2
← 1 second →
test1
All tasks were executed
Today we discussed how to implement some GCD classes: Semaphore and Group. Of course, these are simplified versions of the real DispatchSemaphore and DispatchGroup, but recreating them ourselves is a good educational exercise.
Update: There was a mistake in the article which I found after publishing: I incorrectly used pthread_cond_signal (which doesn't guarantee the wake-up order) instead of pthread_cond_broadcast. Everything is now fixed, with explanations.