Using Semaphores: Multithreaded Producer/Consumer

Home
Back To Tips Page

As usual, this essay was prompted by seeing the same question several times in one week on the newsgroup. I've been using semaphores since 1969, shortly after Edsgar Dijkstra first proposed them in his classic paper [E. W. Dijkstra. Cooperating Sequential Processes. Programming Languages (F. Genuys, ed.), Academic Press, New York, 1968]. I used them a lot in the C.mmp/Hydra system we did at CMU in the mid-1970s, and have used them extensively in Windows. They are actually easy to use. 

I have discovered that far too many people use an Event instead of a Semaphore to do synchronization. They are intended to solve different problems, and one does not address the problem domain of the other. You will see in the example below that I use both, for different purposes.

You may also want to read my essay on worker threads.

The solution here is fully general; it can work with multiple producers and multiple consumers. In the example code that accompanies it, I show one producer and two consumers, but the solution will generalize to multiple producers.

What I did was create a Queue class in C++. Rather than do a specific class or do a template class (template classes have some problems in MFC), I just created one that takes void * pointers and enqueues them. Thus you should be able to use this code directly in many projects.

You may ask, why am I using the API calls for synchronization and not the MFC synchronization classes like CMutex, CSemaphore, and CEvent. There are two reasons. One is pedagogical: I want you to see the primitives in action. The other is pragmatic. In an earlier release of MFC, the synchronization primitives were done so badly that they were actually wrong, and totally unusable. For example, I was using CEvent::SetEvent to synchronize two separate processes, but some overly-clever programmer at Microsoft decided it was cool to test a Boolean flag to see if the Event had already been set and not "waste time" calling the kernel if it was set. Stupid programmer; the code to call CEvent::ResetEvent was in a totally separate process, which is exactly one of the things that you want to do with an Event! And just to add to the incompetence, the programmer actually did an unsynchronized test of a Boolean flag to see if the Event should be set. Code this badly done is beneath contempt. The programmer was truly utterly clueless about the basic operations. Perhaps Microsoft has fixed this code. I've never bothered to look. I don't trust it. Nor do a lot of other people in the MFC newsgroup.

download.gif (1234 bytes)The code and a demo program are all available in a source download.

 

The following section describes the user interface to the class.


BOOL Queue::AddTail(LPVOID p)

This adds an item to the head of the queue and notifies the waiting thread(s) that the queue is nonempty. Returns TRUE if it successfully enqueued the request; FALSE if the semaphore limit was hit.

Note: The test program ignores the Boolean value in terms of recovery and simply discards the item it was enqueuing. You may wish to do otherwise.

LPVOID Queue::RemoveHead()

This removes an item from the queue. If the queue is empty, this call blocks. A call that is blocked on a RemoveTail operation will cause the thread to terminate if the shutdown method is called.

void Queue::shutdown()

Causes all threads that are blocked on RemoveTail to terminate. Note that there is no effort to dequeue any pending items from the queue; it is the responsibility of the caller to not call this method if there is anything in the queue.

Note: I do not stop threads from putting things into the queue during a shutdown. This would involve a Boolean flag that would be set to indicate subsequent AddTail should return FALSE. Note that in such a case the AddTail should call ::SetLastError to set a flag that says that a shutdown is in progress, so the caller can use ::GetLastError to determine the reason FALSE was returned. There is already an error code, ERROR_SHUTDOWN_IN_PROGRESS, that could be co-opted for this purpose.


The code for Queue.h contains the entire queue class. There is no .cpp file. Here it is in its entirety:

class Queue {
public:
   //----------------
   // Queue
   //----------------
   Queue(UINT limit)
     {
      handles[SemaphoreIndex] = ::CreateSemaphore(NULL,  // no security attributes
                                                  0,     // initial count
                                                  limit, // max count
                                                  NULL); // anonymous

      handles[StopperIndex] = ::CreateEvent(NULL,  // no security attributes
                                            TRUE,  // manual reset
                                            FALSE, // initially non-signaled
                                            NULL); // anonymous

      ::InitializeCriticalSection(&lock);
     } // Queue

   //----------------
   // ~Queue
   //----------------
   ~Queue()
     {
      ::CloseHandle(handles[SemaphoreIndex]);
      ::CloseHandle(handles[StopperIndex]);
      ::DeleteCriticalSection(&lock);
     } // ~Queue

   //----------------
   // AddTail
   //----------------
   BOOL AddTail(LPVOID p)
     { 
      BOOL result;
      ::EnterCriticalSection(&lock);
      queue.AddTail(p);
      result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
      if(!result)
        { /* failed */
         // caller can use ::GetLastError to determine what went wrong
         queue.RemoveTail();
        } /* failed */
      ::LeaveCriticalSection(&lock);
      return result;
     } // AddTail

   //----------------
   // RemoveHead
   //----------------
   LPVOID RemoveHead()
     { 
      LPVOID result;

      switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
         { /* decode */
          case StopperIndex: // shut down thread
              AfxEndThread(0); // kill thread; see Note
          return NULL; // return keeps C compiler happy

          case SemaphoreIndex: // semaphore
              ::EnterCriticalSection(&lock);
              result = queue.RemoveHead();
              ::LeaveCriticalSection(&lock);
              return result;

          case WAIT_TIMEOUT: // not implemented
          default:
              ASSERT(FALSE); // impossible condition
              return NULL;
         } /* decode */
      } // RemoveHead

   //----------------
   // shutdown
   //----------------
   void shutdown()
      {
       ::SetEvent(handles[StopperIndex]);
      } // shutdown

 protected:
      enum {StopperIndex, SemaphoreIndex};
      HANDLE handles[2];
      CRITICAL_SECTION lock;
      CList<LPVOID, LPVOID> queue;
};

The basic idea is that the CList called queue is manipulated only from the two methods supplied, AddTail and RemoveHead. Since it is a protected member (very important!) there is no other access to it, and thus all accesses can be protected.

A Note on thread termination

Note: I do not believe in using an explicit thread-exit call.  In an earlier draft, for some reason I had an ::ExitThread(0) call, which was clearly erroneous.  I have no idea where my head was when I wrote that, since I checked back in the original code and something completely different was there.  This leads to an issue of the right way to terminate a thread.  I actually believe the right way to terminate a thread, the only right way, is to exit via the top-level thread function.  When I checked my original code, what I found was something I had not planned to discuss here: the use of exceptions to terminate a thread. 

What was in my original code was the following

class CTerminateThreadException : public CException {
     public:
        CTerminateThreadException() : CException() { }
};

      case StopperIndex:
           throw new CTerminateThreadException;

where the top-level thread loop in my caller looked like this:

UINT CMyThread::thread(LPVOID me)
    {
     (CMyThread *)self = (CMyThread *)me;
     self->run();
     return 0;
    }

void CMyThread::run()
   {
    ... initialize data structures...
    try { /* try */
      while(TRUE)
        { /* thread loop */
         ...
         LPVOID item = queue.RemoveHead();
         ... process queue item
        } /* thread loop */
       } /* try */
    catch(CTerminateThreadException * e)
       { /* CTerminateThreadException */
        ...cleanup if appropriate
        e->Delete();
       } /* CTerminateThreadException */

    ... clean up effects of initialization...  
   }

The Implementation

Now let's look at the code in more detail:


Constructor

   Queue(UINT limit)
     {
      handles[SemaphoreIndex] = ::CreateSemaphore(NULL,  // no security attributes
                                                  0,     // initial count
                                                  limit, // max count
                                                  NULL); // anonymous

      handles[StopperIndex] = ::CreateEvent(NULL,  // no security attributes
                                            TRUE,  // manual reset
                                            FALSE, // initially non-signaled
                                            NULL); // anonymous

      ::InitializeCriticalSection(&lock);
     } // Queue

We will need an array of two handles for a later ::WaitForMultipleObjects, so I just create the handles directly in the array. To give mnemonic names for the two indices, I use an enum. The choice of the enumeration constants is very important. When ::WaitForMultipleObjects leaves a wait, it reports the index of the lowest numbered array item that became signaled. Therefore, the order in which you select these indices can determine if preference is given to shutdown or queue element processing. In this example, I have given preference to shutting down, so I chose the index of the Event that is used for shutdown to be 0, and the index of the semaphore to be 1. This is encoded in the protected declaration

      enum {StopperIndex, SemaphoreIndex};

Note that an enum by definition starts assigning at index 0 unless otherwise specified.

While the semaphore gives us protection against trying to execute on an empty queue, and blocks the waiting thread, it does not provide for thread safety in manipulating the queue. In this case, I also have to provide a mutual-exclusion primitive to keep multiple threads from concurrently trying to enqueue or dequeue data. To do this, I create a CRITICAL_SECTION object, which must be initialized by ::InitializeCriticalSection.

The Event is a manual-reset event, initially non-signaled. When it comes time to shut the threads down (all the threads), all that is necessary is that I call ::SetEvent to set the Event to a signaled state. Note that the Event provides no synchronization protection; it provides only notification.


Destructor

Like any good destructor, this one deallocates resources allocated by the constructor:

   ~Queue()
     {
      ::CloseHandle(handles[SemaphoreIndex]);
      ::CloseHandle(handles[StopperIndex]);
      ::DeleteCriticalSection(&lock);
     } // ~Queue

AddTail

The only trick in adding an element is that a semaphore always has an upper bound; attempting to add an item above that will always fail. We deal with this by returning FALSE if the item cannot be enqueued.

Otherwise, we must make sure that no more than one thread at a time is manipulating the queue. We do this by doing an ::EnterCriticalSection to lock out other threads. Note that you must not do a return in the scope of a critical section; you must always do a ::LeaveCriticalSection and a return would bypass this. This would mean that the next thread that tried to access the critical section would block and nothing would make it possible to release this critical section.

What we do is add the item to the queue and then attempt to release the semaphore. If we are within the limits of the semaphore, the ::ReleaseSemaphore will return TRUE. However, if we overflowed the semaphore, we now have one more item in the queue than the semaphore could account for, so we have to roll back the transaction, which we do by doing a RemoveTail operation on the queue. Thus, when we finally return FALSE, there has been no net change in the queue.

This technique of doing a return outside the scope of a synchronization primitive is fundamental. I also find it easier to write code of this nature where the wait/release operations are balanced, that is, there is exactly one of each.

   BOOL AddTail(LPVOID p)
     { 
      BOOL result;
      ::EnterCriticalSection(&lock);
      queue.AddTail(p);
      result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
      if(!result)
        { /* failed */
         // caller can use ::GetLastError to determine what went wrong
         queue.RemoveTail();
        } /* failed */
      ::LeaveCriticalSection(&lock);
      return result;
     } // AddTail

RemoveHead

This function is perhaps overkill; I chose to actually shut down the thread rather than return FALSE, although you might choose to do otherwise. 

What makes this work is the ::WaitForMultipleObjects, which allows us to block waiting for either a queue item or for a shutdown event. An Event created in the constructor is used to signal the shutdown. As discussed above, the indices of the items were chosen to favor shutdown over processing, but should you switch the two events, this code still works since it uses the index values directly.

Note that even if I call ::ExitThread (and don't ever call ::TerminateThread in a situation like this!), the compiler can often complain that there is no return result. For sanity, I added a return NULL statement which keeps me from getting warnings from the compiler.

Note the only manipulation of the queue variable is done in the scope of a CRITICAL_SECTION so that no other thread can be trying to add to the queue, or remove from the queue, concurrently. Again, note how the value must be stored and then returned after the ::LeaveCriticalSection has released the lock.

I do not support timeout but added the case; it falls through to the default case and does an ASSERT(FALSE) to trap anything bad that might happen during development.

   LPVOID RemoveHead()
     { 
      LPVOID result;

      switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
         { /* decode */
          case StopperIndex:   // shut down thread
              AfxEndThread(0); // kill thread
              return NULL;     // return keeps C compiler happy

          case SemaphoreIndex: // semaphore
              ::EnterCriticalSection(&lock);
              result = queue.RemoveHead();
              ::LeaveCriticalSection(&lock);
              return result;

          case WAIT_TIMEOUT: // not implemented
          default:
              ASSERT(FALSE); // impossible condition
              return NULL;
         } /* decode */
      } // RemoveHead

shutdown

This is actually a trivial routine. All it does is a ::SetEvent on the "stopper" event, which releases the ::WaitForMultipleObjects with the result StopperIndex, which will terminate all the threads that get back to the wait.

This does not handle the case where there might be a long compute cycle you wish to abort. In that case, you should also set a Boolean variable as I have described in my essay on worker threads.

   void shutdown()
      {
       ::SetEvent(handles[StopperIndex]);
      } // shutdown


That's it! You now have a thread-safe queue that supports multiple threads.

To test this, I wrote a little test program that you get when you download the source (or you can simply copy-and-paste from this page, but you don't get the test program). To make this more realistic, I added in an artificial delay of 200-700ms, randomly selected, so as you push the button to queue up entries you actually have a chance of getting ahead of the dequeuing.

Here's the thread routine from the test program:

void CQueuetestDlg::run()
    {
     CString * s = new CString;
     s->Format(_T("Thread %08x started"), ::GetCurrentThreadId());
     PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)s);

     while(TRUE)
	{ /* thread loop */
	 LPVOID p = q.RemoveHead();
	 long n = InterlockedDecrement(&count);
	 showCount(n);
	 PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)p);
	 ::Sleep(200 + rand() % 500); // make simulated delay
	} /* thread loop */
    } // CQueuetestDlg::run

I post a message to the main GUI thread (this is executing in the context of the main CDialog, so PostMessage is posting to that object) telling that the thread has started. The list box into which this is placed is determined by the thread ID, which I obtain by ::GetCurrentThreadId()

Each time I add something to the queue, I call InterlockedIncrement to add one to the count of items in the queue, and on removal I call InterlockedDecement. This value is displayed by the showCount routine.

The main loop retrieves an item from the queue and posts it back to the main thread. The items in the queue are all CString * objects, and will be deleted by the recipient of the PostMessage, as described in my essay on worker threads.

I then introduce the random ::Sleep to slow this down to human interaction speeds. If the shutdown is called, the thread executing this code will terminate (see the RemoveHead code).

Here's an example of the program running:

Note that the items do not strictly alternate; otherwise we would see all of the even-numbered messages in the thread 1 window and all the odd-numbered messages in the thread 2 window.

What do those phrases mean? Well, I found this table of buzzwords and wrote a little buzzword generator. I carry it around from project to project when I need a way of generating sentences for various input contexts. Note that after you are done with this example you can use them in your business plan.

Comparison to other mechanisms

I have seen a number of really  bad implementations of synchronization that do not use semaphores. I will discuss them here and show why they are not sufficient.

Polling

I've seen a number of attempts to work with polling. Sometimes it is something of the form

while(counter <= 0) /* do nothing */ ;

Then someplace else the programmer does

counter++; 

and to remove something, the programmer does

counter--;

Now, the first thing to understand is that counter++, where counter is a variable accessible from multiple threads, does not, cannot, and most likely will not, work correctly in a multithreaded environment. Period. If you think it will work correctly, you're wrong. Added volatile as a declaration will also not help. All this does is tell the compiler that it must not cache a copy of the value in a register; it doesn't protect the value itself from being incorrectly used. It will particularly fail miserably if you run on a multiprocessor.

Aha, you say. You should have used ::InterlockedIncrement and ::InterlockedDecrement. You're right. These would guarantee that the value was incremented and decremented safely, and in fact would work correctly. But we're now left with the polling problem. A polling loop eats up a whole 200ms (more or less, on most NT/2000/XP boxes) timeslice doing absolutely nothing. I have a wonderful example I use in a class where there is a control panel with about 40 controls on it. The program goes into a polling loop. When the control panel pops up while the loop is running, you get to see every... single... control... being... redrawn... one... at... a... time... very... very... slowly... because that polling loop is eating up CPU cycles. In the next lab we show how to add interrupts and get rid of the polling.

So if you believe that you shouldn't use a Semaphore because it is "inefficient", I can guarantee that no matter what, using a Semaphore will always be more efficient than polling. In fact, the polling usually happens at the worst possible time in your program, the time when it should be working hard creating the next object to be queued up. Yes, you can poll. But do not ever believe that it is "more efficient". It almost certainly is not.

And, I should point out, you still need a CRITICAL_SECTION or Mutex to protect your data object, so you probably haven't gained all that much by using polling.

CRITICAL_SECTION blocking 

I've seen at least one implementation that did something of the following:

if(count == 0)
    ::EnterCriticalSection(&wait);

This will not work. For one thing, between the time the test is made (count == 0) and the critical section is entered, the count could change. Since a CRITICAL_SECTION is not a counted object, doing a release before a wait will not work, so there is a race condition that will end up blocking forever.

Event blocking

This has exactly the same problem as CRITICAL_SECTION blocking. While you could argue that it is possible to set an Event successfully before actually trying to block, because an Event is not counted, you have to do a synchronized update of the count variable before blocking. Because one thread could have decremented the count to zero and done a ::ResetEvent at about the same time another thread was incrementing the count and doing a ::SetEvent, you have no guarantee that the ::SetEvent will not happen a few hundred nanoseconds before the ::ResetEvent, thus getting out of order and blocking when the Event should be passable. Work it out. Remember that the processor can be preempted between any two instructions, even while executing in the kernel, if it is in a preemptible context (which it nearly always is: the mantra of Windows developers was "Always preemptible, always interruptible"). On a multiprocessor with true concurrency this is even more dangerous, but it will usually fail on a uniprocessor as well. It just takes a while--as in, it has been out in the field for months and it happens to your most important customer.

Semaphores are not mutual exclusion!

I've seen code of the form

::WaitForSingleObject(semaphore, INFINITE);
queue.RemoveHead(p)

where the argument is that "the semaphore protects the queue manipulation". No, semaphores do not protect the queue unless the maximum semaphore value is 1. In that case you have a special case of the semaphore, the binary semaphore, which is also known as a "mutex". Note that a true Mutex object (as in ::CreateMutex) actually has slightly different semantics than a semaphore; a thread is said to own a Mutex, and if a thread attempts to reacquire a Mutex it already owns, it is permitted to pass without blocking. You must execute as many ::ReleaseMutex operations as you did ::WaitFor operations on it. Semaphores do not work this way; the second time a thread would attempt to acquire a binary semaphore it already "owned" it would block indefinitely. Also, any thread can release a Semaphore but only the thread that owns a Mutex is permitted to release it.. A semaphore merely provides flow control for countable resources (such as the number of items in a queue), and does not protect other objects from concurrent execution. Therefore, the ::RemoveHead operation above must be protected by either a Mutex or a CRITICAL_SECTION.

There's no substitute for a Semaphore

If you think you have invented a clever, faster, more efficient, easier, or whatever way of doing a semaphore without actually using a Semaphore, the chances approach unity that you have simply fooled yourself. Read Dijkstra's earlier papers where he was developing the notion of synchronization primitives that were preemptive-threading safe, and there was no InterlockedIncrement operation to help him. These are complex papers; the techniques are subtle. Only if you fully understand the issues of synchronization should you even consider trying something like this. The rest of the time, particularly if you are new to parallelism and synchronization, take this as a rule: you haven't a clue as to how to create a semaphore effect without using semaphores. I've been doing this professionally for a quarter century and I don't feel confident trying to fake out a semaphore's functionality with some other mechanism. Trust me in this: You Don't Want To Go There.

Change history

3-Mar-2003: Created
19-Mar-2006: Wrote note about the proper termination of the thread.

 [Dividing Line Image]

The views expressed in these essays are those of the author, and in no way represent, nor are they endorsed by, Microsoft.

Send mail to newcomer@flounder.com with questions or comments about this web site.
Copyright © 1999-2006 FlounderCraft Ltd./The Joseph M. Newcomer Co.
Last modified: May 14, 2011