Threads and Pipes in console apps

Like many of my projects, this one started out due to a question in microsoft.public.vc.mfc.  Someone had written a program that was supposed to transform the output of a program and convert the stderr and stdout streams of a child process to different representations, and write them out to the stdout handler of the filter process.

Unfortunately, it was a really bad example of a program that would work for this purpose.  There is a problem with anonymous pipes; they can't be asynchronous.  So the problem was to grab data from stderr and stdout, but blocking on one pipe means that output that came via the other pipe would not be seen.  So the solution was to use ::PeekNamedPipe to see if there was any data to read.  If there was, then a ReadFile was issued to read it.  The problem with this was that it was in an infinite loop, and it was polling each time.  There was no way it could block waiting for input, so as a result it would run continuously, ultimately using close to 100% of the CPU time, accomplishing nothing with most of it.

So the problem was: how to create a program that would block, but would be able to receive data from stderr or stdout as the data became available.

The answer was obvious: threads.

The problem with threads is that far too many people suffer from klostiphobia, the morbid fear of threads, so never think to use threads when they are appropriate, or necessary.  In this case, they were necessary.

So I decided to write a program to demonstrate how threads could be used for this purpose.

Most of my essays on threading use GUI apps, so this shows how threading can be used in a console app.

The test program

I wrote a little test program, called tester, which simply generates a random number and, based on that random number, writes a line to either stdout (using an integer value and the imaginative contents "output") or stderr (using an integer value and the equally imaginative contents "error"). 

int _tmain(int argc, _TCHAR* argv[])
{
 for(int i = 0; i < 100; i++)
     { /* test */
      int r = rand();
      if(r & 0x100)
         { /* stderr */
          _ftprintf(stderr, _T("%4d: error\n"), i);
          fflush(stderr);
         } /* stderr */
      else
         { /* stdout */
          _ftprintf(stdout, _T("%4d: output\n"), i);
          fflush(stdout);
         } /* stdout */

      int w = rand() % 500;
      Sleep(200 + w);
     } /* test */
 return 0;
} 

The output from running this program by itself is shown below

Generating decorated console output

The goal of the poster was to wrap these lines in HTML markup that, given appropriate style definitions around them, would display the data in some suitable form.

I generalized this to support writing directly to a console window.  So when run under my piper program, the lines to stdout and stderr display differently, as shown below:

Generating HTML output

However, if I add the -html option to the command line, I get HTML commands wrapped around them, such as

To make this display work, I had to add (by hand) to this HTML file the following declarations in the <head>...</head> region:

<style type="text/css">
    p.stdout {margin-top:0; margin-bottom:0; color:green}
    p.stderr {margin-top:0; margin-bottom:0; font-weight:bold; color:red}
</style>

0: output

1: output

2: output

3: output

4: error

5: output

6: output

7: output

8: error

9: output

10: output

11: error

12: error

13: output

14: error

15: error

16: error

17: error

18: output

19: output

20: output

21: output

22: error

23: error

24: output

25: error

26: error

27: output

28: output

29: error

30: output

31: output

32: error

33: error

34: error

35: error

36: output

37: error

 

The program

The structure of the program was to create two threads: one thread to handle stdout and one thread to handle stderr.  The main thread would gather the data from the two worker threads and display it.  When both threads terminated, the main thread would then exit.

The question was what mechanisms to implement to do this.  I naturally fall back on my favorite interthread queuing mechanism, the I/O Completion Port.  I/O Completion Ports are cool, and I describe several techniques in my accompanying essay on them.  This is just another generalization of that mechanism, with some interesting twists and features to illustrate their generality.

I used VS.NET 2003 to generate this program; to generate it, I asked for a console application, with ATL support, so I could use the CString data type.  Note that this is not really possible in VS6.  In VS.NET, several important classes were moved out of MFC and moved into the ATL domain, making them usable in console apps and other non-MFC contexts.

_tmain(): Command Line Handling

The command line handling is fairly straightforward, but to give some elegance to the solution, I created a CommandLine class that holds all the parameters:

class CommandLine {
    public:
       CommandLine() { HTML = FALSE; IsUnicode = FALSE; program = NULL; }
       BOOL HTML;
       BOOL IsUnicode;
       LPTSTR program;
    }; // class CommandLine 

I wanted to be able to support Unicode pipes, so I added an option to treat the incoming data as Unicode data.  The HTML member tells whether to use console decoration or generate the HTML I illustrated above.

int _tmain(int argc, _TCHAR* argv[])
   {
    //****************************************************************
    // Argument processing
    //****************************************************************
    if(argc == 1)
       { /* usage */
        CString module;
        LPTSTR p = module.GetBuffer(MAX_PATH);
        ::GetModuleFileName(NULL, p, MAX_PATH);
        module.ReleaseBuffer();
        int n = module.ReverseFind(_T('\\'));
        // n is -1 if there is no backslash, so Mid(n + 1) keeps the whole name
        module = module.Mid(n + 1);
        n = module.ReverseFind(_T('.'));
        if(n < 0)
           n = module.GetLength();
        module = module.Left(n);
        _ftprintf(stderr, _T("Usage:\n%s [-u] [-html] command\n"), module);
        return Result::INVALID_ARGUMENT;
       } /* usage */

The above code may look a little odd, but I learned years ago in writing console apps that users will rename them.  So if I hardwire the name of the executable into the program, the "usage" message will show the wrong program name.  This code merely extracts the file name and displays it properly as part of the usage message. 
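The name extraction can be sketched in portable C++ for readers who want to try it outside of ATL.  This is only an illustration: program_name is a hypothetical helper, std::string stands in for CString, and the path is passed in rather than obtained from ::GetModuleFileName.

```cpp
#include <string>

// Hypothetical helper (not part of piper): strips the directory and the
// extension from a Windows-style path, e.g. "C:\\tools\\piper.exe" -> "piper".
std::string program_name(const std::string& path)
{
    // Keep everything after the last backslash, or the whole string if
    // there is no backslash at all.
    std::string::size_type slash = path.find_last_of('\\');
    std::string name = (slash == std::string::npos) ? path
                                                    : path.substr(slash + 1);

    // Drop the extension, if there is one.
    std::string::size_type dot = name.find_last_of('.');
    if (dot != std::string::npos)
        name.erase(dot);
    return name;
}
```

Note that the extension is searched for only after the directory has been removed, so a dot in a directory name does not truncate the result.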

To deal with the return values, since this is a console app, I wanted to have unique codes for many of the error returns, but I didn't want to have to worry about assigning values to them, so I created a class to represent them.

    CommandLine cmd;

    for(int i = 1; i < argc; i++)
       { /* scan args */
        CString arg = argv[i];
        if(arg[0] == _T('-'))
           { /* option */
            if(arg == _T("-u"))
               { /* unicode */
                cmd.IsUnicode = TRUE;
                continue;
               } /* unicode */
            if(arg == _T("-html"))
               { /* html */
                cmd.HTML = TRUE;
                continue;
               } /* html */
            _ftprintf(stderr, _T("Unrecognized option \"%s\"\n"), arg);
            return Result::INVALID_ARGUMENT;
           } /* option */

        if(cmd.program != NULL)
           { /* two files */
            _ftprintf(stderr, _T("Two command directives given:\n  [1] \"%s\"\n  [2] \"%s\"\n"),
                      cmd.program,   // the command already seen, not the CommandLine object itself
                      arg);
            return Result::TWO_COMMANDS;
           } /* two files */
        cmd.program = argv[i];
       } /* scan args */

    if(cmd.program == NULL)
       { /* no args */
        _ftprintf(stderr, _T("need program to run\n"));
        return Result::NO_PROGRAM;
       } /* no args */ 

class Result

The result types are defined by the class

/****************************************************************************
*                                class Result
****************************************************************************/

class Result {
    public:
       typedef enum { SUCCESS = 0,
                      NO_PROGRAM,
                      INVALID_ARGUMENT,
                      TWO_COMMANDS,
                      IOCP_FAILED,
                      IOCP_ERROR,
                      THREAD_FAILURE,
                      STDOUT_CREATION_FAILED,
                      STDERR_CREATION_FAILED,
                      CREATEPROCESS_FAILED
       } Type;
}; 

class SmartHandle

One of the problems in many resource allocation schemes, such as opening handles, is that you end up having to make sure everything is closed properly when you terminate a subroutine.  There are some smart classes available, but I wanted to illustrate how easy it is to write one.  So I wrote the class SmartHandle, which closes the handle when the variable goes out of scope.

/****************************************************************************
*                              class SmartHandle
****************************************************************************/

class SmartHandle {
    public:
       SmartHandle() { handle = NULL; }
       SmartHandle(HANDLE h) { handle = h; }
       virtual ~SmartHandle() { if(handle != NULL) ::CloseHandle(handle); }
    public:
       operator HANDLE() { return handle; }
       operator LPHANDLE() { return & handle; }
       bool operator==(HANDLE h) { return handle == h; }
       SmartHandle & operator=(HANDLE h) { handle = h; return *this; }
    public:
       void Close() { if(handle != NULL) ::CloseHandle(handle); handle = NULL; }
    protected:
       HANDLE handle;
}; 

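There is one limitation on this class: you must not call ::CloseHandle explicitly on a handle it owns.  The SmartHandle::Close method must be used instead; otherwise the destructor will close the handle a second time, and closing an invalid handle can raise an exception (for example, when running under a debugger).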
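Why Close is safe where a raw ::CloseHandle is not can be shown with a small portable sketch.  Everything in it is a stand-in invented for illustration: FakeHandle replaces HANDLE, and a CloseRecord counter replaces the real ::CloseHandle call so the number of closes can be observed.

```cpp
// Hypothetical stand-ins so the sketch runs without <windows.h>.
typedef int FakeHandle;
const FakeHandle NO_HANDLE = 0;

struct CloseRecord { int closes = 0; };   // counts simulated CloseHandle calls

class ScopedHandle {
public:
    ScopedHandle(FakeHandle h, CloseRecord& r) : handle(h), record(r) {}
    ~ScopedHandle() { Close(); }

    // Copying would give two owners of one handle, so it is disabled.
    ScopedHandle(const ScopedHandle&) = delete;
    ScopedHandle& operator=(const ScopedHandle&) = delete;

    void Close() {
        if (handle != NO_HANDLE) {
            ++record.closes;       // stands in for ::CloseHandle(handle)
            handle = NO_HANDLE;    // so the destructor will not close it again
        }
    }
private:
    FakeHandle handle;
    CloseRecord& record;
};

// Closes early, then lets the destructor run; returns the number of closes.
int closes_after_early_close()
{
    CloseRecord record;
    {
        ScopedHandle h(42, record);
        h.Close();   // explicit early release
    }                // destructor runs here and finds nothing left to close
    return record.closes;
}
```

Because Close resets the stored value, the destructor sees nothing to do.  A direct call to the operating system's close function would leave the stored value intact, and the destructor would close it again.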

_tmain(): Create I/O Completion Port

The following code creates the I/O Completion Port:

    //****************************************************************
    // Create the I/O Completion Port queue
    //****************************************************************
    SmartHandle iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

    if(iocp == NULL)
       { /* failed iocp */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("CreateIoCompletionPort failed, error %s\n"), ErrorString(err));
        return Result::IOCP_FAILED;
       } /* failed iocp */ 

This creates an I/O Completion Port that is not associated with any file handle.

ErrorString

The ErrorString function is very simple, and is a stripped-down version of my more general ErrorString function, suitable for this simple program.

/****************************************************************************
*                                 ErrorString
* Inputs:
*       UINT err: Error code
* Result: CString
*       Result
****************************************************************************/

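CString ErrorString(UINT err)
   {
    LPTSTR msg = NULL;
    if(::FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
                       NULL,
                       err,
                       0,
                       (LPTSTR) &msg,
                       0,
                       NULL) == 0)
       { /* no message */
        CString unknown;
        unknown.Format(_T("%u"), err);  // fall back to the numeric code
        return unknown;
       } /* no message */
    CString result(msg);
    ::LocalFree(msg);   // FORMAT_MESSAGE_ALLOCATE_BUFFER allocates with LocalAlloc
    return result;
   } 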

_tmain(): Create pipes

    //****************************************************************
    // Create the pipes to route to the child process
    //****************************************************************
    SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), NULL, TRUE};
    SmartHandle stdout_read;
    SmartHandle stdout_write;
    SmartHandle stderr_read;
    SmartHandle stderr_write;


    static const UINT PIPE_BUFFER_SIZE = 32;

    if(!::CreatePipe((LPHANDLE)stdout_read, (LPHANDLE)stdout_write, &sa, PIPE_BUFFER_SIZE))
       { /* failed stdout */
        DWORD err = ::GetLastError();
        _tprintf(_T("stdout pipe failure: %s\n"), ErrorString(err));
        return Result::STDOUT_CREATION_FAILED;
       } /* failed stdout */

    if(!::CreatePipe((LPHANDLE)stderr_read, (LPHANDLE)stderr_write, &sa, PIPE_BUFFER_SIZE))
       { /* failed stderr */
        DWORD err = ::GetLastError();
        _tprintf(_T("stderr pipe failure: %s\n"), ErrorString(err));
        return Result::STDERR_CREATION_FAILED;
       } /* failed stderr */ 

This creates four handles, representing the read-side and write-side of the stdout and stderr pipes.

_tmain(): Create process

    //****************************************************************
    // Create the child process
    //****************************************************************
    STARTUPINFO startup = {sizeof(STARTUPINFO)};
    startup.dwFlags = STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW;
    startup.wShowWindow = SW_HIDE;
    startup.hStdOutput = stdout_write;
    startup.hStdError = stderr_write;

    PROCESS_INFORMATION procinfo;

    if(!::CreateProcess(NULL, cmd.program, NULL, NULL, TRUE, CREATE_NEW_CONSOLE, NULL, NULL, &startup, &procinfo))
       { /* failed */
        DWORD err = ::GetLastError();
        _tprintf(_T("CreateProcess failed for \"%s\": %s"), cmd.program, ErrorString(err));
        return Result::CREATEPROCESS_FAILED;
       } /* failed */

    ::CloseHandle(procinfo.hProcess);   // handle will never be needed
    ::CloseHandle(procinfo.hThread);    // handle will never be needed

    stdout_write.Close();               // Close our end of the pipe
    stderr_write.Close();               // Close our end of the pipe

 

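After successful completion, there will be no further need of the process and thread handles returned by CreateProcess, so they are closed.  Since these handles are not in a SmartHandle structure, they are closed explicitly with ::CloseHandle.  This process's own handles to the write side of the stdout and stderr pipes are also closed.  This matters: the child holds its own inherited copies, and only when every write handle to a pipe has been closed will a ReadFile on the read side fail with a broken-pipe error, which is how the reader threads learn that the child has finished.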

_tmain(): Thread creation

To create the thread, I will use _beginthreadex.  But there's a problem I have to solve first.  By default, a console app assumes that it is not going to be multithreaded, so it is configured with the "single threaded" C runtime library.  To create threads, I have to reconfigure the build to use the "multithreaded" C runtime library.  Select the project in the Solution tab, right click on it, and ask for Properties.  The screen below will be displayed.

Select the configuration you want (in this case, the Debug configuration), select the C/C++ property, select Code Generation, go to the Runtime Library option, and drop it down.  Select the appropriate Multi-threaded library.  For the Release configuration, this would be the Multi-threaded runtime library; for the Debug configuration, it would be the Multi-threaded Debug library.  Alternatively, you may choose to use the DLL versions of the C runtime library.

    //****************************************************************
    // Create the threads to handle the pipes
    //****************************************************************
    unsigned id;

    //----------------
    // stdout
    //----------------
    SmartHandle stdoutThread = (HANDLE)_beginthreadex(NULL, 0, reader, new ThreadParms(stdout_read, SourceFlags::StdOut, iocp, cmd.IsUnicode), 0, &id);
    if(stdoutThread == NULL)
       { /* thread create failed */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("Thread creation for stdout failed, error %s\n"), ErrorString(err));
        return Result::THREAD_FAILURE;
       } /* thread create failed */        

    stdoutThread.Close(); // handle will never be used

    //----------------
    // stderr
    //----------------
    SmartHandle stderrThread = (HANDLE)_beginthreadex(NULL, 0, reader, new ThreadParms(stderr_read, SourceFlags::StdErr, iocp, cmd.IsUnicode), 0, &id);
    if(stderrThread == NULL)
       { /* thread create failed */
        DWORD err = ::GetLastError();
        _ftprintf(stderr, _T("Thread creation for stderr failed, error %s\n"), ErrorString(err));
        return Result::THREAD_FAILURE;
       } /* thread create failed */        

    stderrThread.Close(); // handle will never be used 

To pass parameters to the thread, I use the ThreadParms class.  It packages up the HANDLE of the stream to read, a flag that we will use to distinguish which stream is notifying us about an event, the handle of the I/O Completion Port, and the Boolean flag to indicate if the child pipe is Unicode or not.  If either thread creation fails, _tmain returns.  Note that all currently-open handles are SmartHandle objects, so they will be implicitly closed by the destructor SmartHandle::~SmartHandle; we don't need to keep track of the handles on our own.

class ThreadParms

/****************************************************************************
*                              class ThreadParms
****************************************************************************/

class ThreadParms {
    public:
       ThreadParms(HANDLE h, SourceFlags::FlagType f, HANDLE io, BOOL uni) {
          stream = h;
          flags = f;
          iocp = io;
          IsUnicode = uni;
       }
    public:
       HANDLE stream;
       SourceFlags::FlagType flags;
       HANDLE iocp;
       BOOL IsUnicode;
}; 

The I/O Completion Port protocol

I am going to use the I/O Completion Port for interthread communication from the worker threads to the main thread.  To do this, I will use ::PostQueuedCompletionStatus which allows me to pass three parameters: a DWORD, a ULONG_PTR, and a pointer, which is nominally an LPOVERLAPPED pointer, but in fact can be any pointer of our choosing for ::PostQueuedCompletionStatus.

I could have chosen a variety of techniques to handle this, and I chose the following encoding.  There is no reason to use this design in preference to other designs, such as packaging everything up in an object pointed to by the pointer parameter.  Any combination not shown below would be an error.

NumberOfBytesTransferred   CompletionKey         Overlapped                  Meaning
(DWORD)                    (ULONG_PTR)           (LPOVERLAPPED)
-------------------------  --------------------  --------------------------  ----------------------
SourceFlags::StdOut        0                     (LPOVERLAPPED)(CString *)   stdout line to display
SourceFlags::StdErr        0                     (LPOVERLAPPED)(CString *)   stderr line to display
0                          SourceFlags::StdOut   NULL                        stdout has terminated
0                          SourceFlags::StdErr   NULL                        stderr has terminated
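
The encoding above can be written down as a small decoder.  This is a portable sketch, not code from piper: Packet, Kind and classify are names invented here, fixed-width integers stand in for DWORD and ULONG_PTR, and std::string stands in for CString.

```cpp
#include <cstdint>
#include <string>

enum SourceFlag : std::uint32_t { None = 0, StdOut = 1, StdErr = 2 };

// Portable stand-in for the three values carried by one completion packet.
struct Packet {
    std::uint32_t  bytes;   // plays the role of NumberOfBytesTransferred
    std::uintptr_t key;     // plays the role of CompletionKey
    std::string*   text;    // plays the role of the LPOVERLAPPED slot
};

enum class Kind { Data, Terminated, Invalid };

// Applies the four legal rows of the table; anything else is an error.
Kind classify(const Packet& p)
{
    bool flagInBytes = (p.bytes == StdOut || p.bytes == StdErr);
    bool flagInKey   = (p.key == StdOut || p.key == StdErr);

    if (flagInBytes && p.key == 0 && p.text != nullptr)
        return Kind::Data;          // <flag, 0, string>: a line to display
    if (p.bytes == 0 && flagInKey && p.text == nullptr)
        return Kind::Terminated;    // <0, flag, NULL>: that stream has ended
    return Kind::Invalid;
}
```

The receiving loop shown later does not validate packets this strictly; it simply tests whether the key is nonzero.  The sketch only makes the "any other combination is an error" rule explicit.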

class SourceFlags

The flags are specified in the SourceFlags class

/****************************************************************************
*                              class SourceFlags
****************************************************************************/

class SourceFlags {
    public:
       typedef enum { None = 0, StdOut=1, StdErr=2 } FlagType;
    }; // class SourceFlags 

_tmain(): Receive thread messages

    //****************************************************************
    // Run the loop until both stdout and stderr are broken
    //****************************************************************

    SourceFlags::FlagType broken = SourceFlags::None;

    Result::Type result = Result::SUCCESS;
    
    while(broken != (SourceFlags::StdOut | SourceFlags::StdErr))
       { /* watch pipes */
        OVERLAPPED * ovl;
        DWORD bytesRead;
        ULONG_PTR key;

        //----------------------------------------------------------------
        // bytesRead: the item flag
        //          SourceFlags::StdOut for stdout data
        //          SourceFlags::StdErr for stderr data
        // key: the termination flag
        //          SourceFlags::StdOut when stdout breaks
        //          SourceFlags::StdErr when stderr breaks
        // Note: the <bytesRead, key> pair will either be of the form
        //              <flag_*, 0>   for data notification
        //              <0, flag_*>   for termination notification
        // ovl: (LPOVERLAPPED)(CString *)
        //----------------------------------------------------------------
        BOOL ok = ::GetQueuedCompletionStatus(iocp, &bytesRead, &key, &ovl, INFINITE);

        if(!ok)
           { /* failed */
            DWORD err = ::GetLastError();
            result = Result::IOCP_ERROR;
            _ftprintf(stderr, _T("GetQueuedCompletionStatus failed, error %s\n"), ErrorString(err));
            break;
           } /* failed */

        broken = (SourceFlags::FlagType)(broken | (int)key);
        if(key != 0)
           continue;  // termination notifications contain no data

        CString * s = (CString *)ovl;

        WriteToOutput(*s, (SourceFlags::FlagType)bytesRead, cmd);

        delete s;
       } /* watch pipes */

The trick here is that as each thread finishes, it sends a termination notification.  When both threads have sent their termination notification, the main thread will exit the loop, and then the program will terminate.
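
For readers without a Windows machine at hand, the same shape can be tried in portable C++.  The sketch below is an analogy, not the piper code: MessageQueue is a hypothetical mutex-and-condition-variable queue standing in for the I/O Completion Port, and the workers invent their lines instead of reading pipes.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

enum SourceFlag { None = 0, StdOut = 1, StdErr = 2 };

struct Message {
    SourceFlag   source;      // which stream sent it
    bool         terminated;  // true for the final "stream closed" notice
    std::string* text;        // heap-allocated line; the receiver deletes it
};

// Minimal blocking queue: a stand-in for the completion port.
class MessageQueue {
public:
    void Post(const Message& m) {
        { std::lock_guard<std::mutex> lock(guard); items.push(m); }
        ready.notify_one();
    }
    Message Wait() {
        std::unique_lock<std::mutex> lock(guard);
        ready.wait(lock, [this] { return !items.empty(); });  // blocks, no polling
        Message m = items.front();
        items.pop();
        return m;
    }
private:
    std::mutex guard;
    std::condition_variable ready;
    std::queue<Message> items;
};

void worker(MessageQueue& q, SourceFlag source, int lines) {
    for (int i = 0; i < lines; i++)
        q.Post(Message{source, false, new std::string("line " + std::to_string(i))});
    q.Post(Message{source, true, nullptr});   // termination notification
}

// Runs both workers; returns how many data lines the receiver saw.
int run_demo(int outLines, int errLines) {
    MessageQueue q;
    std::thread out(worker, std::ref(q), StdOut, outLines);
    std::thread err(worker, std::ref(q), StdErr, errLines);

    int broken = None;
    int received = 0;
    while (broken != (StdOut | StdErr)) {
        Message m = q.Wait();
        if (m.terminated) { broken |= m.source; continue; }
        received++;
        delete m.text;   // the receiver owns the string
    }
    out.join();
    err.join();
    return received;
}
```

Each worker posts its termination notice only after its last line, and the queue is first-in, first-out, so by the time both notices have been seen every line has already been received.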

_tmain(): Cleanup

    //****************************************************************
    // Cleanup
    //****************************************************************
    stdout_read.Close();
    stderr_read.Close();

    return result;
   }  // _tmain 

WriteToOutput

This function takes a reference to the string, the flags that indicate the source, and a reference to the command line options structure (which is used to determine the format of the output, for example). Its responsibility is to "wrap" the string it is given in whatever context is required to make it "display" correctly.  If the output is direct to a console, I do this by setting the text attributes of the console buffer to display in the correct colors; if it is going to HTML (using the -html flag in the command line) I put the right kind of environment around it.  There isn't anything really deep going on here.

/****************************************************************************
*                                WriteToOutput
* Inputs:
*       const CString & s: String to write
*       SourceFlags::FlagType flag: Flag to indicate source, StdOut or StdErr
*       CommandLine & cmd: Command line options
* Result: void
*       
* Effect: 
*       Writes the string to the output stream
* Notes:
*       
****************************************************************************/

void WriteToOutput(const CString & s, SourceFlags::FlagType flag, CommandLine & cmd)
    {
     if(cmd.HTML)
        { /* HTML */
         CString classname;
         switch(flag)
            { /* decode */
             case SourceFlags::StdOut:
                classname = _T("stdout");
                break;
             case SourceFlags::StdErr:
                classname = _T("stderr");
                break;
            } /* decode */
         _ftprintf(stdout, _T("<p class=\"%s\">%s</p>\n"), classname, ToHTML(s)); // matches the p.stdout / p.stderr style rules
        } /* HTML */
     else
        { /* console */
         HANDLE console = ::GetStdHandle(STD_OUTPUT_HANDLE);

         CONSOLE_SCREEN_BUFFER_INFO info;
         ::GetConsoleScreenBufferInfo(console, &info);

         switch(flag)
            { /* decode */
             case SourceFlags::StdOut:
                ::SetConsoleTextAttribute(console, FOREGROUND_INTENSITY | FOREGROUND_GREEN);
                break;
             case SourceFlags::StdErr:
                ::SetConsoleTextAttribute(console, FOREGROUND_INTENSITY | FOREGROUND_RED);
                break;
            } /* decode */
         _fputts(s, stdout);
         _fputts(_T("\r\n"), stdout);
         ::SetConsoleTextAttribute(console, info.wAttributes);
        } /* console */
    } // WriteToOutput

ToHTML: HTML conversion

Because the output might contain characters such as '<', '>' or '&', which have meaning in HTML as metacharacters for formatting, it is necessary to convert such characters so they are not going to cause a conflict with the HTML rendering engine.  The translations are

Character   Translation
&           &amp;
<           &lt;
>           &gt;

/****************************************************************************
*                                   ToHTML
* Inputs:
*       const CString & s:
* Result: CString
*       Modified string with <, > and & replaced with HTML escapes
****************************************************************************/

CString ToHTML(const CString & s)
    {
     CString t = s;
     t.Replace(_T("&"), _T("&amp;")); // this must be the first one
     t.Replace(_T("<"), _T("&lt;"));
     t.Replace(_T(">"), _T("&gt;"));
     return t;
    } // ToHTML
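
Why the ampersand has to be handled first is easiest to see by trying the other order.  The following is a portable sketch using std::string rather than CString; replace_all, to_html and to_html_wrong_order are hypothetical names used only here.

```cpp
#include <string>

// Replaces every occurrence of 'from' in 's' with 'to'.
void replace_all(std::string& s, const std::string& from, const std::string& to)
{
    std::string::size_type pos = 0;
    while ((pos = s.find(from, pos)) != std::string::npos) {
        s.replace(pos, from.size(), to);
        pos += to.size();   // skip past the text just inserted
    }
}

std::string to_html(std::string s)
{
    replace_all(s, "&", "&amp;");   // first, so later escapes are left alone
    replace_all(s, "<", "&lt;");
    replace_all(s, ">", "&gt;");
    return s;
}

// Deliberately wrong ordering, for comparison only.
std::string to_html_wrong_order(std::string s)
{
    replace_all(s, "<", "&lt;");
    replace_all(s, "&", "&amp;");   // also rewrites the '&' of "&lt;"
    replace_all(s, ">", "&gt;");
    return s;
}
```

With the wrong ordering a single '<' becomes "&amp;lt;", which a browser would display as the literal text "&lt;" instead of a less-than sign.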
 

reader: Top-level thread function

This is the top-level thread function.  As such, it is coded very simply: while there is something in the pipe, send a notification to the main thread for each line in the data.  Then flush any pending partial line, and finally, send a notification that the thread has terminated.

/****************************************************************************
*                                   reader
* Inputs:
*       LPVOID p: (LPVOID)(ThreadParms *) Thread parameters
* Result: UINT
*       irrelevant, 0, always
* Effect: 
*       Parses the data from the stream and emits it as a sequence of lines
****************************************************************************/

UINT __stdcall reader(LPVOID p)
    {
     ThreadParms * parms = (ThreadParms *)p;

     PipeReader pipe(parms->stream, parms->IsUnicode);
     
     CString Prefix;

     while(TRUE)
        { /* processing loop */      
         if(!pipe.Read())
            { /* failed stream */
             break;
            } /* failed stream */

         FormatAndOutput(pipe.GetString(), Prefix, parms);
        } /* processing loop */

     if(!Prefix.IsEmpty())
        { /* write out last line */
         CString text(_T("\r\n"));
         FormatAndOutput(text, Prefix, parms);
        } /* write out last line */

     ::PostQueuedCompletionStatus(parms->iocp, 0, parms->flags, NULL);
     delete parms;   // allocated with new in _tmain; this thread is its last user
     return 0;
    } // reader

FormatAndOutput: line splitter

This function simply splits up the packet that comes back (which can contain several lines), and sends each complete line to the main thread for subsequent formatting and display.  If there is any partial line left over, that is placed in the Prefix variable and will be concatenated to the front of the next line that comes in on the next iteration.  This means that the last few characters of the pipe stream will be held in the Prefix if they do not end with a line terminator sequence, so it is necessary to "flush" them by calling the function one last time with a newline sequence, but only if there is a remaining partial line.  That is the code shown in the reader thread.

/****************************************************************************
*                               FormatAndOutput
* Inputs:
*       CString text: data to show
*       CString & prefix: Prefix for data--leftover partial line from last call
*       ThreadParms * parms: Parameters for the thread
* Result: void
*       
* Effect: 
*       Parses the data into lines.  Retains any partial line (not terminated
*       by a newline sequence) in the Prefix for the next call
* Notes:
*       To force the last partial line out, it must be called with a newline
*       string as text.
*
*       The text is allocated in this thread, on the heap, and must be
*       disposed of by the recipient.
****************************************************************************/

void FormatAndOutput(CString text, CString & prefix, ThreadParms * parms)
    {
     text = prefix + text;

     while(TRUE)
        { /* break into lines */
         int n = text.Find(_T("\r\n"));
         if(n < 0)
            { /* done */
             prefix = text;
             return;
            } /* done */
         CString * s = new CString(text.Left(n));
         
         ::PostQueuedCompletionStatus(parms->iocp, (DWORD)parms->flags, 0, (LPOVERLAPPED)s);
         text = text.Mid(n+2);
        } /* break into lines */
    } // FormatAndOutput

The important feature here is that the string which is passed across the thread boundary is allocated from the heap, and disposed of by the recipient.
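
The prefix-carrying behavior can be exercised on its own in portable C++.  This is a sketch of the splitting logic only: split_lines is a hypothetical function that returns the complete lines in a std::vector instead of posting them to the completion port.

```cpp
#include <string>
#include <vector>

// Joins 'prefix' and 'chunk', returns every complete "\r\n"-terminated line
// (without the terminator), and leaves any unterminated tail in 'prefix'.
std::vector<std::string> split_lines(const std::string& chunk, std::string& prefix)
{
    std::vector<std::string> lines;
    std::string text = prefix + chunk;

    std::string::size_type start = 0;
    for (;;) {
        std::string::size_type n = text.find("\r\n", start);
        if (n == std::string::npos)
            break;                               // no more complete lines
        lines.push_back(text.substr(start, n - start));
        start = n + 2;                           // skip the "\r\n"
    }
    prefix = text.substr(start);                 // partial line, possibly empty
    return lines;
}
```

Because the leftover text is rejoined with the next chunk before searching, a terminator that happens to be split across two reads (the "\r" at the end of one packet, the "\n" at the start of the next) is still recognized.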

class PipeReader

This is a class I adapted from another application I did.  What it does is read from the pipe and handle all the details of what happens if an odd number of bytes is read from a pipe which is expected to be sending Unicode data.  Otherwise, there's nothing really deep going on.

/****************************************************************************
*                              class PipeReader
****************************************************************************/

class PipeReader {
    protected:
       static const UINT MAX_BUFFER = 1024;

    public:
       PipeReader(HANDLE str, BOOL uni) { Init(); stream = str; IsUnicode = uni; }
       //****************************************************************
       //                       PipeReader::GetString
       // Result: CString
       //       The most recent string read by the Read method
       // Notes:
       //       This must be done before the next call on Read or the
       //       buffer will be overwritten
       //       Unicode reception is fully supported only in Unicode builds
       //****************************************************************
       CString GetString() { 
           if(IsUnicode) 
               return CString((LPCWSTR)buffer); 
           else 
               return CString((LPCSTR)buffer); }
       //****************************************************************
       //                         PipeReader::Read
       // Result: BOOL
       //       TRUE if there is data in the buffer
       //       FALSE if the pipe has broken
       // Effect:
       //       Reads data from the pipe, and makes it available for
       //       the next GetString call
       // Notes:
       //       If the pipe is being treated as Unicode and an odd number
       //       of bytes has been read, retain the last byte and prepend it
       //       to the next ReadFile buffer
       //****************************************************************
       BOOL Read() {
          if(Offset == 1)
             buffer[0] = reread; // restore the byte held over from the previous read

          if(!ReadFile(stream, &buffer[Offset], MAX_BUFFER - (IsUnicode ? sizeof(WCHAR) : sizeof(char)), &bytesRead, NULL))
             return FALSE;

          // Valid bytes now in the buffer, including any held-over byte.
          // Computed before Offset is changed for the next read.
          DWORD total = Offset + bytesRead;

          if(IsUnicode)
             { /* unicode pipe */
              if(total & 1)
                 { /* odd bytes read */
                  reread = buffer[total - 1]; // hold the odd byte for the next read
                  total--;                    // pretend we didn't see it
                  Offset = 1;                 // offset for next read
                 } /* odd bytes read */
              else
                 { /* even bytes read */
                  Offset = 0; // offset for next read
                 } /* even bytes read */

              buffer[total] = 0;
              buffer[total + 1] = 0; // create Unicode NUL
             } /* unicode pipe */
          else
             { /* ANSI pipe */
              buffer[total] = '\0'; // Offset is always 0 for an ANSI pipe
             } /* ANSI pipe */

          bytesRead = total;
          return TRUE;
       } // PipeReader::Read

    protected:
       void Init() { stream = NULL; Offset = 0; IsUnicode = FALSE; }
       BOOL IsUnicode;
       HANDLE stream;
    protected:
       BYTE buffer[MAX_BUFFER];
       DWORD Offset;
       BYTE reread;
       DWORD bytesRead;
}; // class PipeReader
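
The odd-byte handling is the only subtle part, so here it is in isolation as a portable sketch.  Utf16Assembler is a hypothetical class written for this illustration; it assumes little-endian UTF-16 (as on Windows) and takes each read as a std::vector of bytes instead of calling ReadFile.

```cpp
#include <cstddef>
#include <string>
#include <vector>

class Utf16Assembler {
public:
    // Accepts the bytes from one read and returns the complete 16-bit
    // code units now available; a trailing odd byte is held for next time.
    std::u16string Feed(const std::vector<unsigned char>& bytes) {
        std::vector<unsigned char> all;
        if (haveCarry)
            all.push_back(carry);                 // byte held over from last read
        all.insert(all.end(), bytes.begin(), bytes.end());

        haveCarry = (all.size() % 2) != 0;
        if (haveCarry) {                          // odd count: hold the last byte
            carry = all.back();
            all.pop_back();
        }

        std::u16string out;
        for (std::size_t i = 0; i < all.size(); i += 2)   // low byte comes first
            out.push_back(static_cast<char16_t>(all[i] | (all[i + 1] << 8)));
        return out;
    }
private:
    bool haveCarry = false;
    unsigned char carry = 0;
};
```

Feeding it the three bytes 0x41 0x00 0x42 yields only "A"; the 0x42 is held until the next read supplies its second half.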

Summary: Use Threads

Whenever you get into a situation where you end up polling, because you can't block the thread that is polling, consider instead using secondary threads to perform the computations.  If there is nothing to do, this application consumes zero CPU time.

As far as programming style, note that there is not a single global variable in any of this code.  None are needed, none are used.  There is no explicit synchronization; all synchronization is implicit in the use of ::GetQueuedCompletionStatus and the corresponding ::PostQueuedCompletionStatus calls.  The best synchronization is no synchronization.  This code is a combination of the "positive handoff" model (wherein responsibility for an object is handed off from one thread to another) and the "central manager" model (the output stream, whether console or HTML stream, is managed by a single thread).

The views expressed in these essays are those of the author, and in no way represent, nor are they endorsed by, Microsoft.

Send mail to newcomer@flounder.com with questions or comments about this web site.
Copyright © 2007 FlounderCraft Ltd., All Rights Reserved
Last modified: May 14, 2011