Tworzymy własny BoundedBlockingQueue

Prawdopodobnie problem producentów i konsumentów nie jest Ci obcy. Jest to coś bardzo podobnego do tego z czym stykamy się na co dzień – czyli hamujemy podaż kiedy brak popytu i vice versa, a kiedy popyt jest duży produkcje puszczamy pełną parą. Aby rozwiązać owo zagadnienie potrzebujemy stworzyć model takiego kontenera, który będzie w stanie przyblokować producenta lub konsumenta stosownie od warunków. Tymi warunkami są: brak danych i nadmiar danych. Przy nadmiarze będziemy hamować producenta, żeby nie tracił danych, a przy braku danych będziemy z kolei hamować konsumenta. Miałem 2 pomysły, ale dużo bezpieczniejszym pod względem współpracy w środowisku wielowątkowym okazał się ten drugi, którego przedstawiam poniżej. Jego dodatkową cechą jest to, że jego operacje są przerywalne – wykonując operację terminate(), oraz można podłączyć do niej wielu producentów i wielu konsumentów jednocześnie.

Zacznijmy od klasy narzędziowej zakładającej lock’a scope’owego. Aby go utworzyć wymagany jest semafor, a tworząc go powinniśmy mu od razu nadać stan SIGNALED, czyli początkowa wartość licznika 1 oraz max wartość licznika tez 1 np.: CreateSemaphore(NULL, 1, 1, NULL).

 

class ScopedLock
{
	HANDLE hSemaphore;
public:
	ScopedLock(HANDLE hSemaphore)
		: hSemaphore(hSemaphore)
	{
		DWORD waitResult = WaitForSingleObject(hSemaphore, INFINITE);

		if (waitResult != WAIT_OBJECT_0)
			throw std::runtime_error("Lock acquireing error.");
	}

	~ScopedLock()
	{
		ReleaseSemaphore(hSemaphore, 1, NULL);
	}
};
template <class T>
class BoundedBlockingQueue
{
	T* items;
	int batchSize;
	int size;
	int headPosition;
	HANDLE hBufferAccessSemaphore;
	HANDLE hTerminateEvent;
	HANDLE hBufferNotFullSemaphore;
	HANDLE hBufferNotEmptySemaphore;

	BOOL waitWhileBufferFull(DWORD miliSecs)
	{
		HANDLE objects[2] = {hBufferNotFullSemaphore, hTerminateEvent};
		DWORD waitResult = WaitForMultipleObjects(2, objects, FALSE, miliSecs);
		return waitResult == WAIT_OBJECT_0;
	}

	BOOL waitWhileBufferEmpty(DWORD miliSecs)
	{
		HANDLE objects[2] = {hBufferNotEmptySemaphore, hTerminateEvent};
		DWORD waitResult = WaitForMultipleObjects(2, objects, FALSE, miliSecs);
		return waitResult == WAIT_OBJECT_0;
	}

public:
	BoundedBlockingQueue(int _batchSize)
		: items(new T[_batchSize])
		, batchSize(_batchSize)
		, size(0)
		, headPosition(0)
		, hBufferAccessSemaphore(CreateSemaphore(NULL, 1, 1, NULL))
		, hTerminateEvent(CreateEvent(NULL, TRUE, FALSE, NULL))
		, hBufferNotFullSemaphore(CreateSemaphore(NULL, _batchSize, _batchSize, NULL))
		, hBufferNotEmptySemaphore(CreateSemaphore(NULL, 0, _batchSize, NULL))
	{ }

	BOOL enqueue(T item, DWORD miliSecs)
	{
		if (!waitWhileBufferFull(miliSecs))
			return FALSE;
		{
			ScopedLock sl(hBufferAccessSemaphore);
			items[headPosition] = item;
			++size;
			++headPosition;

			if (headPosition >= batchSize)
				headPosition -= batchSize;

			ReleaseSemaphore(hBufferNotEmptySemaphore, 1, NULL);
			return TRUE;
		}
	}

	BOOL dequeue(T& outValue, DWORD miliSecs)
	{
		if (!waitWhileBufferEmpty(miliSecs))
			return FALSE;
		{
			ScopedLock sl(hBufferAccessSemaphore);
			int tmpId;

			if (headPosition >= size)
				tmpId = headPosition - size;
			else
				tmpId = batchSize - size + headPosition;

			T tmpItem = items[tmpId];
			--size;
			ReleaseSemaphore(hBufferNotFullSemaphore, 1, NULL);
			outValue = tmpItem;

			return TRUE;
		}
	}

	void terminate()
	{
		SetEvent(hTerminateEvent);
	}

	virtual ~BoundedBlockingQueue()
	{
		ScopedLock sl(hBufferAccessSemaphore);
		CloseHandle(hBufferNotEmptySemaphore);
		CloseHandle(hBufferNotFullSemaphore);
		CloseHandle(hTerminateEvent);
		CloseHandle(hBufferAccessSemaphore);
		delete [] items;
	}

};

A teraz zrobimy testy z użyciem biblioteki, o której pisałem wcześniej – THERON. Nie jest to najlepszy wybór do tego celu, ponieważ biblioteka ta jest biblioteką do komunikacji synchronicznej gwarantując w obrębie procesu kolejność dostarczania komunikatów, i ich wykonanie. My będziemy potrzebowali przetestować przerywanie tych procesów, ale zamiast zrobić dedykowany komunikat zatrzymujący jakiś task, użyjemy przerywacza w postaci zdarzenia.

struct ProducerStartMessage
{
};

class Producer : public Theron::Actor
{
public:

	struct Parameters
	{
		BoundedBlockingQueue<unsigned long>& bbq;
		HANDLE hTermEvent;

		Parameters(BoundedBlockingQueue<unsigned long>& bbq)
			: bbq(bbq)
			, hTermEvent(CreateEvent(NULL, FALSE, FALSE, NULL))
		{ }

		void terminate()
		{
			SetEvent(hTermEvent);
		}

		~Parameters()
		{
			CloseHandle(hTermEvent);
		}
	};

	inline explicit Producer(const Parameters& params)
		: bbq(params.bbq)
		, hTermEvent(params.hTermEvent)
	{
		RegisterHandler(this, &Producer::StartHandler);
		printf("Producer with address %ld has been created.n", GetAddress().AsInteger());
	}

private:
	BoundedBlockingQueue<unsigned long>& bbq;
	HANDLE hTermEvent;

	inline void StartHandler(const ProducerStartMessage& msg, const Theron::Address from)
	{
		printf("Producer with address %ld has been started at THID %ld.n", GetAddress().AsInteger(), GetCurrentThreadId());
		static unsigned long i = 0;

		while (WaitForSingleObject(hTermEvent, 10) != WAIT_OBJECT_0)
		{
			if (bbq.enqueue(i, 10))
			{
				printf(" ->[%ld] enqueued: %ldn", GetCurrentThreadId(), i);
				++i;
			}
		}

		printf("Producer with address %ld has been stopped.n", GetAddress().AsInteger());
	}
};

struct ConsumerStartMessage
{
};

class Consumer : public Theron::Actor
{
public:

	struct Parameters
	{
		BoundedBlockingQueue<unsigned long>& bbq;
		HANDLE hTermEvent;

		Parameters(BoundedBlockingQueue<unsigned long>& bbq)
			: bbq(bbq)
			, hTermEvent(CreateEvent(NULL, FALSE, FALSE, NULL))
		{ }

		void terminate()
		{
			SetEvent(hTermEvent);
		}

		~Parameters()
		{
			CloseHandle(hTermEvent);
		}
	};

	inline explicit Consumer(const Parameters& params)
		: bbq(params.bbq)
		, hTermEvent(params.hTermEvent)
	{
		RegisterHandler(this, &Consumer::StartHandler);
		printf("Consumer with address %ld has been created.n", GetAddress().AsInteger());
	}

private:
	BoundedBlockingQueue<unsigned long>& bbq;
	HANDLE hTermEvent;

	inline void StartHandler(const ConsumerStartMessage& msg, const Theron::Address from)
	{
		printf("Consumer with address %ld has been started at THID %ld.n", GetAddress().AsInteger(), GetCurrentThreadId());
		unsigned long i = 0;

		while (WaitForSingleObject(hTermEvent, 40) != WAIT_OBJECT_0)
		{
			unsigned long i;

			if (bbq.dequeue(i, 40))
				printf(" <-[%ld] dequeued: %ldn", GetCurrentThreadId(), i);
		}
		printf("Consumer with address %ld has been stopped.n", GetAddress().AsInteger());
	}
};

int _tmain(int argc, _TCHAR* argv[])
{
	Theron::Framework framework(10);
	Theron::Receiver receiver;

	BoundedBlockingQueue<unsigned long> bbq(10);

	Producer::Parameters producerParams1(bbq);
	Consumer::Parameters consumerParams1(bbq);

	Producer::Parameters producerParams2(bbq);
	Consumer::Parameters consumerParams2(bbq);

	Theron::ActorRef producer1(framework.CreateActor<Producer>(producerParams1));
	Theron::ActorRef consumer1(framework.CreateActor<Consumer>(consumerParams1));

	Theron::ActorRef producer2(framework.CreateActor<Producer>(producerParams2));
	Theron::ActorRef consumer2(framework.CreateActor<Consumer>(consumerParams2));

	Sleep(500);
	framework.Send(ProducerStartMessage(), receiver.GetAddress(), producer1.GetAddress());
	Sleep(500);
	framework.Send(ConsumerStartMessage(), receiver.GetAddress(), consumer1.GetAddress());
	Sleep(500);
	framework.Send(ProducerStartMessage(), receiver.GetAddress(), producer2.GetAddress());
	Sleep(500);
	framework.Send(ConsumerStartMessage(), receiver.GetAddress(), consumer2.GetAddress());

	Sleep(5000);

	producerParams1.terminate();
	Sleep(50);
	consumerParams1.terminate();

	Sleep(50);
	producerParams2.terminate();
	Sleep(50);
	consumerParams2.terminate();

	system("PAUSE");
	return 0;
}

A oto jak poszły testy Test BoundedBlockingQueue’

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *