Prawdopodobnie problem producentów i konsumentów nie jest Ci obcy. Jest to coś bardzo podobnego do tego z czym stykamy się na co dzień – czyli hamujemy podaż kiedy brak popytu i vice versa, a kiedy popyt jest duży produkcje puszczamy pełną parą. Aby rozwiązać owo zagadnienie potrzebujemy stworzyć model takiego kontenera, który będzie w stanie przyblokować producenta lub konsumenta stosownie od warunków. Tymi warunkami są: brak danych i nadmiar danych. Przy nadmiarze będziemy hamować producenta, żeby nie tracił danych, a przy braku danych będziemy z kolei hamować konsumenta. Miałem 2 pomysły, ale dużo bezpieczniejszym pod względem współpracy w środowisku wielowątkowym okazał się ten drugi, którego przedstawiam poniżej. Jego dodatkową cechą jest to, że jego operacje są przerywalne – wykonując operację terminate(), oraz można podłączyć do niej wielu producentów i wielu konsumentów jednocześnie.
Zacznijmy od klasy narzędziowej zakładającej lock’a scope’owego. Aby go utworzyć wymagany jest semafor, a tworząc go powinniśmy mu od razu nadać stan SIGNALED, czyli początkowa wartość licznika 1 oraz max wartość licznika tez 1 np.: CreateSemaphore(NULL, 1, 1, NULL).
class ScopedLock
{
HANDLE hSemaphore;
public:
ScopedLock(HANDLE hSemaphore)
: hSemaphore(hSemaphore)
{
DWORD waitResult = WaitForSingleObject(hSemaphore, INFINITE);
if (waitResult != WAIT_OBJECT_0)
throw std::runtime_error("Lock acquireing error.");
}
~ScopedLock()
{
ReleaseSemaphore(hSemaphore, 1, NULL);
}
};
template <class T>
class BoundedBlockingQueue
{
T* items;
int batchSize;
int size;
int headPosition;
HANDLE hBufferAccessSemaphore;
HANDLE hTerminateEvent;
HANDLE hBufferNotFullSemaphore;
HANDLE hBufferNotEmptySemaphore;
BOOL waitWhileBufferFull(DWORD miliSecs)
{
HANDLE objects[2] = {hBufferNotFullSemaphore, hTerminateEvent};
DWORD waitResult = WaitForMultipleObjects(2, objects, FALSE, miliSecs);
return waitResult == WAIT_OBJECT_0;
}
BOOL waitWhileBufferEmpty(DWORD miliSecs)
{
HANDLE objects[2] = {hBufferNotEmptySemaphore, hTerminateEvent};
DWORD waitResult = WaitForMultipleObjects(2, objects, FALSE, miliSecs);
return waitResult == WAIT_OBJECT_0;
}
public:
BoundedBlockingQueue(int _batchSize)
: items(new T[_batchSize])
, batchSize(_batchSize)
, size(0)
, headPosition(0)
, hBufferAccessSemaphore(CreateSemaphore(NULL, 1, 1, NULL))
, hTerminateEvent(CreateEvent(NULL, TRUE, FALSE, NULL))
, hBufferNotFullSemaphore(CreateSemaphore(NULL, _batchSize, _batchSize, NULL))
, hBufferNotEmptySemaphore(CreateSemaphore(NULL, 0, _batchSize, NULL))
{ }
BOOL enqueue(T item, DWORD miliSecs)
{
if (!waitWhileBufferFull(miliSecs))
return FALSE;
{
ScopedLock sl(hBufferAccessSemaphore);
items[headPosition] = item;
++size;
++headPosition;
if (headPosition >= batchSize)
headPosition -= batchSize;
ReleaseSemaphore(hBufferNotEmptySemaphore, 1, NULL);
return TRUE;
}
}
BOOL dequeue(T& outValue, DWORD miliSecs)
{
if (!waitWhileBufferEmpty(miliSecs))
return FALSE;
{
ScopedLock sl(hBufferAccessSemaphore);
int tmpId;
if (headPosition >= size)
tmpId = headPosition - size;
else
tmpId = batchSize - size + headPosition;
T tmpItem = items[tmpId];
--size;
ReleaseSemaphore(hBufferNotFullSemaphore, 1, NULL);
outValue = tmpItem;
return TRUE;
}
}
void terminate()
{
SetEvent(hTerminateEvent);
}
virtual ~BoundedBlockingQueue()
{
ScopedLock sl(hBufferAccessSemaphore);
CloseHandle(hBufferNotEmptySemaphore);
CloseHandle(hBufferNotFullSemaphore);
CloseHandle(hTerminateEvent);
CloseHandle(hBufferAccessSemaphore);
delete [] items;
}
};
A teraz zrobimy testy z użyciem biblioteki, o której pisałem wcześniej – THERON. Nie jest to najlepszy wybór do tego celu, ponieważ biblioteka ta jest biblioteką do komunikacji synchronicznej gwarantując w obrębie procesu kolejność dostarczania komunikatów, i ich wykonanie. My będziemy potrzebowali przetestować przerywanie tych procesów, ale zamiast zrobić dedykowany komunikat zatrzymujący jakiś task, użyjemy przerywacza w postaci zdarzenia.
struct ProducerStartMessage
{
};
class Producer : public Theron::Actor
{
public:
struct Parameters
{
BoundedBlockingQueue<unsigned long>& bbq;
HANDLE hTermEvent;
Parameters(BoundedBlockingQueue<unsigned long>& bbq)
: bbq(bbq)
, hTermEvent(CreateEvent(NULL, FALSE, FALSE, NULL))
{ }
void terminate()
{
SetEvent(hTermEvent);
}
~Parameters()
{
CloseHandle(hTermEvent);
}
};
inline explicit Producer(const Parameters& params)
: bbq(params.bbq)
, hTermEvent(params.hTermEvent)
{
RegisterHandler(this, &Producer::StartHandler);
printf("Producer with address %ld has been created.n", GetAddress().AsInteger());
}
private:
BoundedBlockingQueue<unsigned long>& bbq;
HANDLE hTermEvent;
inline void StartHandler(const ProducerStartMessage& msg, const Theron::Address from)
{
printf("Producer with address %ld has been started at THID %ld.n", GetAddress().AsInteger(), GetCurrentThreadId());
static unsigned long i = 0;
while (WaitForSingleObject(hTermEvent, 10) != WAIT_OBJECT_0)
{
if (bbq.enqueue(i, 10))
{
printf(" ->[%ld] enqueued: %ldn", GetCurrentThreadId(), i);
++i;
}
}
printf("Producer with address %ld has been stopped.n", GetAddress().AsInteger());
}
};
struct ConsumerStartMessage
{
};
class Consumer : public Theron::Actor
{
public:
struct Parameters
{
BoundedBlockingQueue<unsigned long>& bbq;
HANDLE hTermEvent;
Parameters(BoundedBlockingQueue<unsigned long>& bbq)
: bbq(bbq)
, hTermEvent(CreateEvent(NULL, FALSE, FALSE, NULL))
{ }
void terminate()
{
SetEvent(hTermEvent);
}
~Parameters()
{
CloseHandle(hTermEvent);
}
};
inline explicit Consumer(const Parameters& params)
: bbq(params.bbq)
, hTermEvent(params.hTermEvent)
{
RegisterHandler(this, &Consumer::StartHandler);
printf("Consumer with address %ld has been created.n", GetAddress().AsInteger());
}
private:
BoundedBlockingQueue<unsigned long>& bbq;
HANDLE hTermEvent;
inline void StartHandler(const ConsumerStartMessage& msg, const Theron::Address from)
{
printf("Consumer with address %ld has been started at THID %ld.n", GetAddress().AsInteger(), GetCurrentThreadId());
unsigned long i = 0;
while (WaitForSingleObject(hTermEvent, 40) != WAIT_OBJECT_0)
{
unsigned long i;
if (bbq.dequeue(i, 40))
printf(" <-[%ld] dequeued: %ldn", GetCurrentThreadId(), i);
}
printf("Consumer with address %ld has been stopped.n", GetAddress().AsInteger());
}
};
int _tmain(int argc, _TCHAR* argv[])
{
Theron::Framework framework(10);
Theron::Receiver receiver;
BoundedBlockingQueue<unsigned long> bbq(10);
Producer::Parameters producerParams1(bbq);
Consumer::Parameters consumerParams1(bbq);
Producer::Parameters producerParams2(bbq);
Consumer::Parameters consumerParams2(bbq);
Theron::ActorRef producer1(framework.CreateActor<Producer>(producerParams1));
Theron::ActorRef consumer1(framework.CreateActor<Consumer>(consumerParams1));
Theron::ActorRef producer2(framework.CreateActor<Producer>(producerParams2));
Theron::ActorRef consumer2(framework.CreateActor<Consumer>(consumerParams2));
Sleep(500);
framework.Send(ProducerStartMessage(), receiver.GetAddress(), producer1.GetAddress());
Sleep(500);
framework.Send(ConsumerStartMessage(), receiver.GetAddress(), consumer1.GetAddress());
Sleep(500);
framework.Send(ProducerStartMessage(), receiver.GetAddress(), producer2.GetAddress());
Sleep(500);
framework.Send(ConsumerStartMessage(), receiver.GetAddress(), consumer2.GetAddress());
Sleep(5000);
producerParams1.terminate();
Sleep(50);
consumerParams1.terminate();
Sleep(50);
producerParams2.terminate();
Sleep(50);
consumerParams2.terminate();
system("PAUSE");
return 0;
}
Pobierz artykuł jako plik PDF