Given multiple threads enqueuing and dequeueing data from a queue, make sure a synchronized operation of random reads and writes, also ensuring that all threads make progress. C++ solution is below:
#include <semaphore.h>
#include <mutex>
class BoundedBlockingQueue {
private:
int capacity;
int queued;
int read;
int write;
std::mutex m;
sem_t waitOnWrite;
sem_t waitOnRead;
int *buf;
public:
bool isFull(void) {
return this->queued == this->capacity;
}
bool isEmpty(void) {
return this->queued == 0;
}
BoundedBlockingQueue(int capacity) {
this->capacity = capacity;
this->queued = 0;
this->read = 0;
this->write = 0;
sem_init(&this->waitOnWrite, 0, capacity);
sem_init(&this->waitOnRead, 0, capacity);
this->buf = new int[capacity];
}
void enqueue(int element) {
while (1) {
this->m.lock();
if (isFull()) {
this->m.unlock();
sem_wait(&this->waitOnWrite);
} else if (!isFull()) {
this->buf[this->write] = element;
this->write++;
if (this->write == this->capacity) {
this->write = 0;
}
this->queued++;
this->m.unlock();
sem_post(&this->waitOnRead);
break;
}
}
}
int dequeue() {
int val = 0;
while(1) {
this->m.lock();
if (isEmpty()) {
this->m.unlock();
sem_wait(&this->waitOnRead);
} else {
val = this->buf[this->read];
this->read++;
if (this->read == this->capacity) {
this->read = 0;
}
this->queued--;
this->m.unlock();
sem_post(&this->waitOnWrite);
break;
}
}
return val;
}
int size() {
return this->queued;
}
};