Producer / Consumer problem with Queues, Mutexes, Semaphores

Bahadir Balban

Buzz Founder

@learningalgorithms

Given multiple threads enqueueing and dequeueing data from a shared queue, ensure that reads and writes arriving in arbitrary order are properly synchronized, and that every thread eventually makes progress. A C++ solution is below:

#include <semaphore.h>
#include <cerrno>
#include <memory>
#include <mutex>
#include <system_error>
/**
 * Fixed-capacity FIFO queue of ints that may be shared by any number of
 * producer and consumer threads.
 *
 * Synchronization uses the classic counting-semaphore scheme:
 *   - waitOnWrite counts free slots (starts at `capacity`); a producer takes
 *     one unit before writing, so it blocks while the queue is full.
 *   - waitOnRead counts stored elements (starts at 0); a consumer takes one
 *     unit before reading, so it blocks while the queue is empty.
 *   - m protects the ring-buffer indices and the element count.
 *
 * Because every semaphore unit corresponds to exactly one slot or element,
 * the semaphore counts never exceed `capacity` and a blocked thread sleeps
 * instead of repeatedly re-checking the queue state.
 *
 * The queue is neither copyable nor movable. It must not be destroyed while
 * any thread is still blocked in, or executing, one of its methods.
 */
class BoundedBlockingQueue {
private:
    int capacity;                 // maximum number of stored elements
    int queued;                   // number of stored elements (guarded by m)
    int read;                     // index of the next element to dequeue (guarded by m)
    int write;                    // index of the next free slot (guarded by m)
    std::mutex m;
    sem_t waitOnWrite;            // free slots; producers wait on this
    sem_t waitOnRead;             // stored elements; consumers wait on this
    std::unique_ptr<int[]> buf;   // ring buffer holding `capacity` ints

    // Takes one unit from `sem`, retrying when the wait is interrupted by a
    // signal so that a stray EINTR can never let a caller through early.
    static void acquire(sem_t *sem) {
        while (sem_wait(sem) != 0) {
            if (errno != EINTR) {
                throw std::system_error(errno, std::generic_category(), "sem_wait");
            }
        }
    }

public:

    /** Returns true when the queue currently holds `capacity` elements. */
    bool isFull(void) {
        std::lock_guard<std::mutex> guard(this->m);
        return this->queued == this->capacity;
    }

    /** Returns true when the queue currently holds no elements. */
    bool isEmpty(void) {
        std::lock_guard<std::mutex> guard(this->m);
        return this->queued == 0;
    }

    /**
     * Creates an empty queue able to hold `capacity` elements.
     *
     * With a capacity of 0 every enqueue() and dequeue() call blocks forever.
     *
     * @throws std::bad_array_new_length if `capacity` is negative.
     * @throws std::system_error if a semaphore cannot be initialized.
     */
    BoundedBlockingQueue(int capacity)
        : capacity(capacity), queued(0), read(0), write(0),
          // Plain new[] (not make_unique) keeps the negative-size check.
          buf(new int[capacity]) {
        if (sem_init(&this->waitOnWrite, 0, static_cast<unsigned int>(capacity)) != 0) {
            throw std::system_error(errno, std::generic_category(), "sem_init");
        }
        if (sem_init(&this->waitOnRead, 0, 0) != 0) {
            const int err = errno;
            // The destructor does not run when the constructor throws.
            sem_destroy(&this->waitOnWrite);
            throw std::system_error(err, std::generic_category(), "sem_init");
        }
    }

    /** Releases both semaphores; the buffer is freed by its owning pointer. */
    ~BoundedBlockingQueue() {
        sem_destroy(&this->waitOnRead);
        sem_destroy(&this->waitOnWrite);
    }

    BoundedBlockingQueue(const BoundedBlockingQueue &) = delete;
    BoundedBlockingQueue &operator=(const BoundedBlockingQueue &) = delete;

    /**
     * Appends `element` to the back of the queue, blocking while it is full.
     *
     * @throws std::system_error if waiting on the semaphore fails for a
     *         reason other than signal interruption.
     */
    void enqueue(int element) {
        // Reserve a free slot first; this is where a producer sleeps.
        acquire(&this->waitOnWrite);
        {
            std::lock_guard<std::mutex> guard(this->m);
            this->buf[this->write] = element;
            this->write++;
            if (this->write == this->capacity) {
                this->write = 0;
            }
            this->queued++;
        }
        // Publish the new element after dropping the lock so the woken
        // consumer does not immediately contend on the mutex.
        sem_post(&this->waitOnRead);
    }

    /**
     * Removes and returns the element at the front of the queue, blocking
     * while it is empty.
     *
     * @throws std::system_error if waiting on the semaphore fails for a
     *         reason other than signal interruption.
     */
    int dequeue() {
        // Claim a stored element first; this is where a consumer sleeps.
        acquire(&this->waitOnRead);
        int val = 0;
        {
            std::lock_guard<std::mutex> guard(this->m);
            val = this->buf[this->read];
            this->read++;
            if (this->read == this->capacity) {
                this->read = 0;
            }
            this->queued--;
        }
        // Hand the freed slot back to the producers.
        sem_post(&this->waitOnWrite);
        return val;
    }

    /** Returns the number of elements stored at the moment of the call. */
    int size() {
        std::lock_guard<std::mutex> guard(this->m);
        return this->queued;
    }
};



Join The Discussion