WTF are Synchronization Primitives

Introduction

When writing concurrent code, there’s bound to be some data that is shared across multiple threads. In an ideal world, every thread has a reliable and consistent view of that data, and the order in which the application requests operations is maintained across threads. BUT, we live in a world that leans towards chaos, and if proper care is not taken when writing a multi-threaded application, we might be on an expressway to undefined behavior, memory corruption and unpredictable results. This is where synchronization primitives come in.

All the bad quirks of concurrent code boil down to one single thing: “Writing to a memory location that’s accessible across multiple threads when we are not supposed to”.
If we think about it, if the data never changes, any number of threads can read the shared data with no consequence. We don’t need any synchronization mechanism in this case, because a data race requires at least one of the conflicting accesses to be a write.
Try running the code below and see what abomination you get!!

#include <iostream>
#include <thread>
#include <string>

std::string printableStr;

void AddToString(std::string str)
{
    for(size_t i = 0; i < str.length(); i++)
    {
        printableStr.append(1, str.at(i));
    }
}

int main()
{
    std::thread thread1(&AddToString, "Hello ");
    std::thread thread2(&AddToString, "Concurrent World. ");
    std::thread thread3(&AddToString, "It's all the rage now!!!");

    thread1.join();
    thread2.join();
    thread3.join();
    std::cout << "Result: " << printableStr << "\n";
}

In the code above, three threads append to “printableStr” at the same time with no synchronization. This is a data race, so there’s no guarantee about the order of the words or letters, or even that the data stays valid; the program may also crash.
Therefore, care needs to be taken when modifying shared data so that the write doesn’t conflict with other threads that are reading or writing the same data. In either case, we need some mechanism to make the other threads wait until the thread that’s modifying the shared data completes its work.
Let’s study 3 such low-level primitives that help us achieve synchronization.

Mutex:

It is aptly named, as it provides mutually exclusive access to a shared resource across multiple threads. The concept of a mutex is quite simple. The first thread to reach a shared resource locks the mutex, signaling that it is using the resource; any other thread that tries to lock the same mutex must wait until the owning thread completes its processing and unlocks the mutex.

In the example below, using a mutex ensures that the order of letters within each supplied string is maintained and that the data stays valid, i.e., no data corruption takes place. BUT, there’s no guarantee about the order of words!! There’s no guarantee that “thread1” runs first, “thread2” next and “thread3” last. The OS might decide that “thread2” or “thread3” should run first!!! To ensure a proper sentence is formed, we need some way to make the main thread wait until the first thread has done its work before starting the second, and likewise for the third. We can use conditional variables to achieve this.
For now, try running this code and see what happens!

#include <iostream>
#include <thread>
#include <string>
#include <mutex>

std::string printableStr;
std::mutex globalMtx;

void AddToStringMtx(std::string str)
{
    globalMtx.lock();
    for(size_t i = 0; i < str.length(); i++)
    {
        printableStr.append(1, str.at(i));
    }
    globalMtx.unlock();
}

int main()
{   
    std::thread thread1(&AddToStringMtx, "Hello ");
    std::thread thread2(&AddToStringMtx, "Concurrent World. ");
    std::thread thread3(&AddToStringMtx, "It's all the rage now!!!");

    thread1.join();
    thread2.join();
    thread3.join();
    
    std::cout << "Result: " << printableStr << "\n";
}
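
The manual lock()/unlock() pair above works, but the unlock is skipped if anything between the two calls throws or returns early. A common alternative is “std::lock_guard”, which unlocks the mutex automatically when it goes out of scope. Here’s a minimal sketch of the same writer using it; the names “guardedStr”, “guardedMtx”, “AddToStringGuarded” and “RunGuardedWriters” are made up for this illustration and are not part of the example above.

```cpp
#include <mutex>
#include <string>
#include <thread>

std::string guardedStr;  // hypothetical name, plays the role of printableStr
std::mutex guardedMtx;   // hypothetical name, plays the role of globalMtx

void AddToStringGuarded(std::string str)
{
    // Locks guardedMtx here; the destructor of 'lock' unlocks it when the
    // function exits, even if append() or at() throws.
    std::lock_guard<std::mutex> lock(guardedMtx);
    for (size_t i = 0; i < str.length(); i++)
    {
        guardedStr.append(1, str.at(i));
    }
}

// Runs the three writers and returns the combined string.
std::string RunGuardedWriters()
{
    guardedStr.clear();

    std::thread thread1(&AddToStringGuarded, "Hello ");
    std::thread thread2(&AddToStringGuarded, "Concurrent World. ");
    std::thread thread3(&AddToStringGuarded, "It's all the rage now!!!");

    thread1.join();
    thread2.join();
    thread3.join();
    return guardedStr;
}
```

Calling “RunGuardedWriters()” from main and printing the result should behave like the example above: every string arrives intact, but the order of the three strings can still change from run to run.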

Conditional:

Conditional variables (usually called condition variables, as in “std::condition_variable”) are composite types that work with the help of a mutex and a queue/stack that keeps track of the waiting threads. A conditional is a signaling mechanism that lets one thread resume the execution of another, whereas a mutex must be unlocked by the same thread that locked it.
Waiting on a conditional variable requires a locked mutex to be passed in; the conditional unlocks it and suspends the thread. When the thread is signaled, the mutex is locked again and the thread resumes its execution.

In the example below, each worker thread waits on the conditional “c” (passing “globalMtx”) at the end of its lifetime. When all threads are resumed using “notifyAll”, the waiting threads compete to execute the instructions that follow. If the conditional variable didn’t lock the mutex on resumption, those instructions would run unsynchronized, which could result in undefined behavior. Since the conditional variable locks “globalMtx” when a thread resumes, the first thread that acquires it executes the instructions. Any other thread that tries to lock the mutex waits, as “globalMtx” is already locked, and proceeds only when the current owner unlocks “globalMtx”.
Here’s a sample implementation of a conditional variable built from a mutex, a stack and Windows native functions, used with the same example as before to print the output in the expected order.

#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <Windows.h>
#include <stack>

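class Conditional
{
public:
    Conditional(){}
    void wait(std::mutex& mx)
    {
        //Register the calling thread as a waiter
        m_mutex.lock();
        HANDLE handle = OpenThread(THREAD_ALL_ACCESS, FALSE, GetCurrentThreadId());
        m_threadStack.push(handle);
        m_mutex.unlock();

        //Release the mutex, go into waiting state and then reacquire the lock.
        //Caveat: if signal() runs between unlock() and SuspendThread(), the
        //wakeup is lost. Acceptable for a demo, not for production code.
        mx.unlock();
        SuspendThread(handle);
        mx.lock();
        CloseHandle(handle); //Release the handle opened above
    }
    void signal()
    {
        m_mutex.lock();
        if (!m_threadStack.empty()) //Nothing to do if no thread is waiting
        {
            HANDLE handle = m_threadStack.top();
            ResumeThread(handle);
            m_threadStack.pop();
        }
        m_mutex.unlock();
    }
    void notifyAll()
    {
        //Resume every waiting thread, most recent waiter first
        m_mutex.lock();
        while (!m_threadStack.empty())
        {
            ResumeThread(m_threadStack.top());
            m_threadStack.pop();
        }
        m_mutex.unlock();
    }
private:
    std::mutex m_mutex; //Protects m_threadStack
    std::stack<HANDLE> m_threadStack;
};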

std::string printableStr;
std::mutex globalMtx;
Conditional c;

void AddToString(std::string str)
{
    Sleep(10);
    for(size_t i = 0; i < str.length(); i++)
    {
        printableStr.append(1, str.at(i));
    }
    c.signal(); //Signals main thread

    globalMtx.lock();
    c.wait(globalMtx);   //This thread waits at the very end until signaled
    std::cout << "Thread " << GetCurrentThreadId() << " Finished\n";
    globalMtx.unlock();
}
int main()
{
    std::mutex mtx;

    mtx.lock();

    std::thread thread1(&AddToString, "Hello ");
    c.wait(mtx);
    std::thread thread2(&AddToString, "Concurrent World. ");
    c.wait(mtx);
    std::thread thread3(&AddToString, "It's all the rage now!!!");
    c.wait(mtx);
    
    mtx.unlock();

    std::cout << "Result: " << printableStr << "\n";

    mtx.lock();
    c.notifyAll();
    mtx.unlock();
    
    thread1.join();
    thread2.join();
    thread3.join();
}
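
For comparison, here’s a rough, portable sketch of the same “print the sentence in order” idea using the standard “std::condition_variable” instead of the hand-rolled class. It is not a port of the code above: it assumes a shared turn counter, and the names “orderedStr”, “orderedMtx”, “orderedCv”, “nextTurn”, “AddToStringInOrder” and “BuildSentenceInOrder” are invented for this illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

std::string orderedStr;             // hypothetical, plays the role of printableStr
std::mutex orderedMtx;              // protects orderedStr and nextTurn
std::condition_variable orderedCv;
int nextTurn = 0;                   // index of the thread allowed to write next

void AddToStringInOrder(int myTurn, std::string str)
{
    std::unique_lock<std::mutex> lock(orderedMtx);
    // wait() unlocks the mutex while the thread sleeps and locks it again
    // before returning. The predicate is re-checked on every wakeup, so
    // spurious wakeups and early notifications are harmless.
    orderedCv.wait(lock, [myTurn] { return nextTurn == myTurn; });

    orderedStr += str;
    ++nextTurn;

    lock.unlock();
    orderedCv.notify_all();  // let the remaining threads re-check their turn
}

// Starts the threads in reverse order on purpose and returns the sentence.
std::string BuildSentenceInOrder()
{
    {
        std::lock_guard<std::mutex> lock(orderedMtx);
        orderedStr.clear();
        nextTurn = 0;
    }

    std::thread thread3(&AddToStringInOrder, 2, "It's all the rage now!!!");
    std::thread thread2(&AddToStringInOrder, 1, "Concurrent World. ");
    std::thread thread1(&AddToStringInOrder, 0, "Hello ");

    thread1.join();
    thread2.join();
    thread3.join();
    return orderedStr;
}
```

Because each thread waits on a predicate rather than on a bare signal, the result does not depend on which thread the OS happens to schedule first.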

Semaphore:

A semaphore is a specialized conditional that has an additional “count” parameter, which keeps track of how many more threads can access a resource simultaneously. Like conditional variables, a semaphore is also a signaling mechanism, which means that blocked/waiting threads are resumed/signaled by other threads.
From C++20, the standard library comes with “std::binary_semaphore” and “std::counting_semaphore” in the “<semaphore>” header.

A binary semaphore only allows a single thread to access a resource at a time, while a counting semaphore allows a user-specified number of threads to access it simultaneously.
For some of the important differences between a semaphore and a conditional variable, see: https://www.geeksforgeeks.org/difference-between-semaphore-and-condition-variable/
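
To make the “conditional plus a count” idea concrete, here’s a minimal sketch of a counting semaphore built from a mutex and a “std::condition_variable”. The class “SimpleSemaphore” and the helper “MaxConcurrentWorkers” are hypothetical names made up for this illustration; with a C++20 compiler, “std::counting_semaphore” would likely be the better choice.

```cpp
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical class for illustration only.
class SimpleSemaphore
{
public:
    explicit SimpleSemaphore(int count) : m_count(count) {}

    // Blocks until a slot is free, then takes it.
    void acquire()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_count > 0; });
        --m_count;
    }

    // Gives a slot back and wakes one waiting thread, if any.
    void release()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_count;
        }
        m_cv.notify_one();
    }

private:
    std::mutex m_mutex;            // protects m_count
    std::condition_variable m_cv;
    int m_count;                   // number of free slots
};

// Starts 'threads' workers sharing a semaphore with 'slots' slots and returns
// the highest number of workers seen inside the guarded region at once.
int MaxConcurrentWorkers(int threads, int slots)
{
    SimpleSemaphore sem(slots);
    std::mutex statsMtx;           // protects inside and maxInside
    int inside = 0;
    int maxInside = 0;

    std::vector<std::thread> pool;
    for (int i = 0; i < threads; i++)
    {
        pool.emplace_back([&] {
            sem.acquire();
            {
                std::lock_guard<std::mutex> lock(statsMtx);
                ++inside;
                maxInside = std::max(maxInside, inside);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            {
                std::lock_guard<std::mutex> lock(statsMtx);
                --inside;
            }
            sem.release();
        });
    }
    for (auto& t : pool)
    {
        t.join();
    }
    return maxInside;
}
```

With one slot the class behaves like a binary semaphore, so “MaxConcurrentWorkers(4, 1)” never sees more than one worker inside the guarded region; with more slots it behaves like a counting semaphore.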
