GCD Primitives in Depth: Semaphore and Group

Alex Shchukin
6 min read · Oct 19, 2022


In this article, we will implement some of the GCD classes using low-level primitives to understand how GCD actually works.

Semaphore

We’re going to begin with DispatchSemaphore, although what we’ll be implementing is a simplified version. The real DispatchSemaphore has a much richer set of functionality. To refresh your knowledge about DispatchSemaphore, you can check the article about synchronization.

Now we can start implementing the Semaphore. The code snippet below shows the initialization part:

import Foundation

final class Semaphore {
    private var mutex = pthread_mutex_t()
    private var condition = pthread_cond_t()
    private var counter = 0
    private var maxCount = 0
    private let queue: Queue = Queue()

    // maxCount - number of wait() calls that can proceed without a matching signal()
    init(maxCount: Int = 0) {
        pthread_mutex_init(&mutex, nil)
        pthread_cond_init(&condition, nil)
        self.maxCount = maxCount
    }

    deinit {
        pthread_mutex_destroy(&mutex)
        pthread_cond_destroy(&condition)
    }
}

In this section, we use pthread_mutex_t and pthread_cond_t, types from the POSIX thread library, written in C. pthread_mutex_t serves as a lock, allowing us to ensure thread-safe access to resources, such as counter. pthread_cond_t can block the calling thread via pthread_cond_wait and resume it using pthread_cond_signal.

To initialize these types, we employ pthread_mutex_init and pthread_cond_init respectively. Deinitialization is just as important: these primitives aren’t self-destructing. You must destroy them manually using pthread_mutex_destroy and pthread_cond_destroy.
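One caveat about the snippet above: as far as I know, Swift only guarantees that a pointer produced by `&mutex` is valid for the duration of that single call, so a common precaution is to keep pthread objects in manually allocated memory. Here is a minimal sketch of that idea. The names `PThreadMutex`, `SafeCounter` and `countConcurrently` are hypothetical and are not part of the implementation in this article.

```swift
import Foundation

// Hypothetical wrapper (not from the article): the mutex lives at a fixed heap
// address, so every pthread call receives the same pointer.
final class PThreadMutex: @unchecked Sendable {
    private let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)

    init() {
        mutex.initialize(to: pthread_mutex_t())
        let status = pthread_mutex_init(mutex, nil)
        precondition(status == 0, "pthread_mutex_init failed")
    }

    deinit {
        pthread_mutex_destroy(mutex)
        mutex.deinitialize(count: 1)
        mutex.deallocate()
    }

    // Runs `body` while holding the lock and always unlocks afterwards.
    func withLock<T>(_ body: () throws -> T) rethrows -> T {
        pthread_mutex_lock(mutex)
        defer { pthread_mutex_unlock(mutex) }
        return try body()
    }
}

// Hypothetical counter guarded by the wrapper above.
final class SafeCounter: @unchecked Sendable {
    private let lock = PThreadMutex()
    private var value = 0

    func increment() { lock.withLock { value += 1 } }
    var current: Int { lock.withLock { value } }
}

func countConcurrently() -> Int {
    let counter = SafeCounter()
    // Four iterations that may run in parallel, 1,000 increments each.
    // concurrentPerform returns only after all iterations have finished.
    DispatchQueue.concurrentPerform(iterations: 4) { _ in
        for _ in 0..<1_000 { counter.increment() }
    }
    return counter.current
}

print(countConcurrently()) // 4000
```

The rest of the article keeps the simpler stored-property form, which is easier to read.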

It is important to know that pthread_cond_signal unblocks at least one of the threads blocked by pthread_cond_wait, and when several threads are blocked the scheduling policy determines the order of unblocking. In macOS by default, the order is determined by thread priority, which doesn’t correspond with the semaphore logic. To bypass this limitation we will employ pthread_cond_broadcast, which unblocks all threads currently waiting on the condition. And to provide sequential order we introduce a FIFO Queue, which we will use later on in the wait method. The code snippet below contains an implementation of Queue:

fileprivate final class Queue {
    private var elements: [Int] = []

    func enqueue(element: Int) {
        elements.append(element)
    }

    func dequeue() -> Int? {
        guard !elements.isEmpty else { return nil }

        return elements.removeFirst()
    }

    func peek() -> Int? {
        guard !elements.isEmpty else { return nil }

        return elements.first
    }

    var isEmpty: Bool {
        return elements.isEmpty
    }
}
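The FIFO contract of the Queue can be checked in isolation. The sketch below uses a standalone copy of the class (without fileprivate, so that it runs on its own). The variable name `tickets` is only illustrative.

```swift
// Standalone copy of the Queue above so that the snippet is self-contained.
final class Queue {
    private var elements: [Int] = []

    func enqueue(element: Int) { elements.append(element) }
    func dequeue() -> Int? { elements.isEmpty ? nil : elements.removeFirst() }
    func peek() -> Int? { elements.first }
    var isEmpty: Bool { elements.isEmpty }
}

let tickets = Queue()
tickets.enqueue(element: 1)
tickets.enqueue(element: 2)

print(tickets.peek() == 1)      // true: peek returns the oldest element without removing it
print(tickets.dequeue() == 1)   // true: dequeue removes the oldest element
print(tickets.peek() == 2)      // true: the next element is now at the head
print(tickets.dequeue() == 2)   // true
print(tickets.isEmpty)          // true
print(tickets.dequeue() == nil) // true: an empty queue yields nil
```

Array.removeFirst shifts the remaining elements, so dequeue is linear in the queue length. That is fine for a handful of waiting threads.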

Let’s start with the simple implementation of the method wait. It blocks the calling thread using condition until it gets the signal.

func wait() {
    pthread_mutex_lock(&self.mutex)
    pthread_cond_wait(&self.condition, &self.mutex)
    pthread_mutex_unlock(&self.mutex)
}

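That works when only one thread waits, but we need to make it work with multiple threads. Let’s improve wait using the counter and maxCount variables. Every call increases counter, and whenever counter is bigger than maxCount the call blocks in pthread_cond_wait. So maxCount regulates the throughput of Semaphore: the larger maxCount is, the more threads can get past wait simultaneously. The check sits in a while loop rather than an if, so a thread that wakes up re-checks the condition before it continues.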

func wait() {
    pthread_mutex_lock(&self.mutex)
    counter += 1
    while self.counter > self.maxCount {
        pthread_cond_wait(&self.condition, &self.mutex)
    }
    pthread_mutex_unlock(&self.mutex)
}

Recall that pthread_cond_signal does not guarantee the order in which threads are awakened. To resolve this, we will use pthread_cond_broadcast. This function wakes all threads that have been blocked by pthread_cond_wait. However, we want only the earliest thread to continue and the others to return to the waiting state. To accomplish this, we must keep track of the order in which the wait calls are made.

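Here, we introduce a local variable tmpCounter to keep track of the order. Being local, tmpCounter belongs to a single wait call. Then we enqueue this value into the Queue. While counter exceeds maxCount and the value at the head of the queue doesn’t match tmpCounter, we put the thread on wait again, so the earliest blocked call is the one that continues after a signal. Note that this simplified version never dequeues and enqueues again on every wake-up, so the head of the queue stays the same once the first waiter has been enqueued. That is enough for the examples in this article; a complete implementation would also remove served entries and hand out values that never repeat.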

func wait() {
    pthread_mutex_lock(&self.mutex)

    counter += 1
    let tmpCounter = counter
    while self.counter > self.maxCount && self.queue.peek() != tmpCounter {
        queue.enqueue(element: tmpCounter)
        pthread_cond_wait(&self.condition, &self.mutex)
    }

    pthread_mutex_unlock(&self.mutex)
}

Now we need to implement the signal function. It decreases the counter that keeps waiting threads inside the while loop and broadcasts to all of them.

func signal() {
    pthread_mutex_lock(&mutex)

    counter -= 1
    pthread_cond_broadcast(&condition)
    pthread_mutex_unlock(&mutex)
}

Let’s test the semaphore to check how it works. In the example below the calling thread gets blocked by the method wait for 5 seconds until the global queue calls the signal method:

let semaphore = Semaphore(maxCount: 0)
print("Start")
DispatchQueue.global().async {
    sleep(5)
    semaphore.signal()
}
semaphore.wait()
print("Finish")

Result:

Start
← 5 seconds →
Finish

Another example shows how the semaphore works with multiple threads. Whenever we call the method wait, it increases counter by 1. The first block to call wait prints test1 or test2 (depending on which block starts running first) and then sleeps for 5 seconds. Meanwhile, the other block makes the second call of the method wait. It increases the counter to 2 and, since maxCount is 1, the calling thread is put on hold because the condition self.counter > self.maxCount evaluates to true. It continues only after the first block calls signal.

let semaphore = Semaphore(maxCount: 1)
DispatchQueue.global().async {
    semaphore.wait()
    print("test1")
    sleep(5)
    semaphore.signal()
}

DispatchQueue.global().async {
    semaphore.wait()
    print("test2")
    sleep(5)
    semaphore.signal()
}

Result:

test1
← 5 seconds →
test2
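The FIFO bookkeeping can also be expressed with explicit tickets that never repeat. The sketch below is not the implementation from this article but an alternative under my own assumptions. `FIFOSemaphore`, `permits`, `nextTicket`, `nowServing`, `waitingCount`, `WakeLog` and `runFIFODemo` are hypothetical names, and the pthread objects are heap-allocated so that their addresses stay fixed.

```swift
import Foundation

// Hypothetical ticket-based semaphore (illustrative names, not from the article).
final class FIFOSemaphore: @unchecked Sendable {
    private let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    private let condition = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
    private var permits: Int
    private var nextTicket = 0   // ticket handed to the next wait() call
    private var nowServing = 0   // ticket that is allowed to proceed next

    init(permits: Int = 0) {
        self.permits = permits
        mutex.initialize(to: pthread_mutex_t())
        condition.initialize(to: pthread_cond_t())
        pthread_mutex_init(mutex, nil)
        pthread_cond_init(condition, nil)
    }

    deinit {
        pthread_mutex_destroy(mutex)
        pthread_cond_destroy(condition)
        mutex.deallocate()
        condition.deallocate()
    }

    func wait() {
        pthread_mutex_lock(mutex)
        let ticket = nextTicket
        nextTicket += 1
        // Proceed only when it is this ticket's turn and a permit is available.
        // The loop re-checks after every wake-up, including spurious ones.
        while ticket != nowServing || permits == 0 {
            pthread_cond_wait(condition, mutex)
        }
        permits -= 1
        nowServing += 1
        // Let the next ticket holder re-check in case permits remain.
        pthread_cond_broadcast(condition)
        pthread_mutex_unlock(mutex)
    }

    func signal() {
        pthread_mutex_lock(mutex)
        permits += 1
        pthread_cond_broadcast(condition)
        pthread_mutex_unlock(mutex)
    }

    // Number of wait() calls that are currently blocked.
    var waitingCount: Int {
        pthread_mutex_lock(mutex)
        defer { pthread_mutex_unlock(mutex) }
        return nextTicket - nowServing
    }
}

// Hypothetical thread-safe log used only by the demo.
final class WakeLog: @unchecked Sendable {
    private let lock = NSLock()
    private var ids: [Int] = []

    func append(_ id: Int) { lock.lock(); ids.append(id); lock.unlock() }
    var snapshot: [Int] {
        lock.lock()
        defer { lock.unlock() }
        return ids
    }
}

func runFIFODemo() -> [Int] {
    let semaphore = FIFOSemaphore()
    let log = WakeLog()
    for id in 0..<3 {
        Thread.detachNewThread {
            semaphore.wait()
            log.append(id)
        }
        // Start the next thread only once this one is blocked inside wait().
        while semaphore.waitingCount <= id { usleep(1_000) }
    }
    for released in 1...3 {
        semaphore.signal()
        // Wait until the released thread has logged itself before signalling again.
        while log.snapshot.count < released { usleep(1_000) }
    }
    return log.snapshot
}

print(runFIFODemo()) // [0, 1, 2]
```

Unlike the version above, this sketch counts available permits instead of outstanding wait calls, so `FIFOSemaphore(permits: 1)` plays the role of `Semaphore(maxCount: 1)`.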

Group

In this part, we will implement the class Group with functionality similar to DispatchGroup. To refresh your knowledge you can check the article about groups. Let’s define the private variables: pthread_mutex_t and pthread_cond_t, known from the Semaphore section, and counter, which tracks how many enter calls have not yet been balanced by leave.

final class Group {
    private var mutex = pthread_mutex_t()
    private var condition = pthread_cond_t()
    private var counter = 0

    init() {
        pthread_mutex_init(&mutex, nil)
        pthread_cond_init(&condition, nil)
    }

    deinit {
        pthread_mutex_destroy(&mutex)
        pthread_cond_destroy(&condition)
    }
}

DispatchGroup has two primary methods, enter and leave, to enter and leave the group accordingly. In our Group, enter thread-safely increases the counter, while leave thread-safely decreases the counter and calls pthread_cond_broadcast to unblock the threads blocked by pthread_cond_wait. With a group there is no order of wait calls to respect; instead, we need to unblock all the waiting threads at once through pthread_cond_broadcast.

In the code below there are implementations of these functions:

func enter() {
    pthread_mutex_lock(&mutex)
    counter += 1
    pthread_mutex_unlock(&mutex)
}

func leave() {
    pthread_mutex_lock(&mutex)

    counter -= 1
    pthread_cond_broadcast(&self.condition)
    pthread_mutex_unlock(&mutex)
}

The method wait blocks the calling thread until counter is 0. Each broadcast from the method leave wakes it up inside pthread_cond_wait so that it can re-check counter:

func wait() {
    pthread_mutex_lock(&self.mutex)
    while self.counter != 0 {
        pthread_cond_wait(&self.condition, &self.mutex)
    }
    pthread_mutex_unlock(&self.mutex)
}

pthread_cond_wait takes two parameters: condition and mutex. It’s pretty clear why it requires a condition, but the mutex parameter needs more consideration. pthread_cond_wait atomically unlocks the mutex and blocks the calling thread, and it locks the mutex again before it returns. It is not obvious why the condition function operates with a mutex. If you notice, the methods working with the condition are guarded with locks. That’s needed to prevent unsynchronized modification of the variable that regulates the condition (in our case counter). If pthread_cond_wait did not release the mutex while it waits, leave could never acquire the mutex to change counter and call pthread_cond_broadcast. For a better understanding here is the scheme:

Group call stack and lock state
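The lock hand-off can also be observed in code. In the sketch below the waiting thread locks the mutex and never calls unlock before it blocks, yet the opener thread still manages to lock the same mutex, which is only possible because pthread_cond_wait released it. `Gate`, `pass`, `openIfSomeoneWaits`, `snapshot` and `runGateDemo` are hypothetical names used for illustration only.

```swift
import Foundation

// Hypothetical demo type (not from the article).
final class Gate: @unchecked Sendable {
    private let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    private let condition = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
    private var isOpen = false
    private var waiters = 0
    private var events: [String] = []   // only touched while the mutex is held

    init() {
        mutex.initialize(to: pthread_mutex_t())
        condition.initialize(to: pthread_cond_t())
        pthread_mutex_init(mutex, nil)
        pthread_cond_init(condition, nil)
    }

    deinit {
        pthread_mutex_destroy(mutex)
        pthread_cond_destroy(condition)
        mutex.deallocate()
        condition.deallocate()
    }

    // Blocks the calling thread until the gate has been opened.
    func pass() {
        pthread_mutex_lock(mutex)
        waiters += 1
        events.append("waiter: waiting")
        while !isOpen {
            // Atomically unlocks the mutex and sleeps; relocks before returning.
            pthread_cond_wait(condition, mutex)
        }
        events.append("waiter: resumed")
        pthread_mutex_unlock(mutex)
    }

    // Opens the gate and returns true, or returns false if nobody waits yet.
    func openIfSomeoneWaits() -> Bool {
        // Succeeds even while the waiter is blocked, because the waiter does
        // not hold the mutex during pthread_cond_wait.
        pthread_mutex_lock(mutex)
        defer { pthread_mutex_unlock(mutex) }
        guard waiters > 0 else { return false }
        isOpen = true
        events.append("opener: opened")
        pthread_cond_signal(condition)
        return true
    }

    func snapshot() -> [String] {
        pthread_mutex_lock(mutex)
        defer { pthread_mutex_unlock(mutex) }
        return events
    }
}

func runGateDemo() -> [String] {
    let gate = Gate()
    let finished = DispatchSemaphore(value: 0)
    Thread.detachNewThread {
        gate.pass()
        finished.signal()
    }
    // Retry until the waiter is parked inside pthread_cond_wait.
    while !gate.openIfSomeoneWaits() { usleep(1_000) }
    finished.wait()
    return gate.snapshot()
}

print(runGateDemo()) // ["waiter: waiting", "opener: opened", "waiter: resumed"]
```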

Now we can implement the example from the article using the Group class. Two tasks are executed on a concurrent queue within the same Group, and the method wait blocks the calling thread until both tasks are done. As you can see, the result is the same as if we had used DispatchGroup.

let group = Group()
let concurrentQueue = DispatchQueue(label: "com.test.testGroup", attributes: .concurrent)

group.enter()
concurrentQueue.async {
    sleep(1)
    print("test1")
    group.leave()
}

group.enter()
concurrentQueue.async {
    print("test2")
    group.leave()
}
group.wait()

print("All tasks were executed")

Result:

test2
← 1 second →
test1
All tasks were executed
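For readers who want to run the group example as a single file, here is a self-contained sketch. It follows the same enter, leave and wait logic, but `SimpleGroup`, `NameLog` and `runGroupDemo` are hypothetical names, the pthread objects are heap-allocated, and the precondition in leave is an extra check of mine that is not part of the Group above.

```swift
import Foundation

// Hypothetical standalone variant of the Group class (illustrative names).
final class SimpleGroup: @unchecked Sendable {
    private let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
    private let condition = UnsafeMutablePointer<pthread_cond_t>.allocate(capacity: 1)
    private var counter = 0

    init() {
        mutex.initialize(to: pthread_mutex_t())
        condition.initialize(to: pthread_cond_t())
        pthread_mutex_init(mutex, nil)
        pthread_cond_init(condition, nil)
    }

    deinit {
        pthread_mutex_destroy(mutex)
        pthread_cond_destroy(condition)
        mutex.deallocate()
        condition.deallocate()
    }

    func enter() {
        pthread_mutex_lock(mutex)
        counter += 1
        pthread_mutex_unlock(mutex)
    }

    func leave() {
        pthread_mutex_lock(mutex)
        counter -= 1
        // Extra check (assumption): treat an unbalanced leave() as a programmer error.
        precondition(counter >= 0, "leave() called more often than enter()")
        pthread_cond_broadcast(condition)
        pthread_mutex_unlock(mutex)
    }

    func wait() {
        pthread_mutex_lock(mutex)
        while counter != 0 {
            pthread_cond_wait(condition, mutex)
        }
        pthread_mutex_unlock(mutex)
    }
}

// Hypothetical thread-safe list used only by the demo.
final class NameLog: @unchecked Sendable {
    private let lock = NSLock()
    private var names: [String] = []

    func append(_ name: String) { lock.lock(); names.append(name); lock.unlock() }
    var sorted: [String] {
        lock.lock()
        defer { lock.unlock() }
        return names.sorted()
    }
}

func runGroupDemo() -> [String] {
    let group = SimpleGroup()
    let finished = NameLog()
    for name in ["test1", "test2"] {
        group.enter()
        DispatchQueue.global().async {
            finished.append(name)
            group.leave()
        }
    }
    group.wait() // returns only after both leave() calls
    return finished.sorted
}

print(runGroupDemo()) // ["test1", "test2"]
```

The names are sorted before printing because the two blocks may finish in either order.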

Today we discussed how to implement some GCD classes: Semaphore and Group. Of course, these classes are simplified versions of the real DispatchSemaphore and DispatchGroup, but creating them ourselves is a good exercise for educational purposes.

Update: There was a mistake in the article which I found after publishing. It was about the incorrect usage of pthread_cond_signal (which doesn’t guarantee the wake order) instead of pthread_cond_broadcast. Now, everything is fixed and explained.

You can find all the implementations by following this link.

Check my Twitter to get the newest updates.
