]> git.saurik.com Git - wxWidgets.git/blobdiff - src/msw/thread.cpp
Committing in .
[wxWidgets.git] / src / msw / thread.cpp
index b654353bfce9748f5248201b11732b31b2737a8e..e2c76f9389403c4964138f1eb591905d9e6a4837 100644 (file)
@@ -146,7 +146,7 @@ public:
         }
     }
 
-    ~wxMutexInternal() { if ( m_mutex ) CloseHandle(m_mutex); }
+    ~wxMutexInternal() { if ( m_mutex ) ::CloseHandle(m_mutex); }
 
 public:
     HANDLE m_mutex;
@@ -223,97 +223,272 @@ wxMutexError wxMutex::Unlock()
     return wxMUTEX_NO_ERROR;
 }
 
-// ----------------------------------------------------------------------------
-// wxCondition implementation
-// ----------------------------------------------------------------------------
+// ==========================================================================
+// wxSemaphore
+// ==========================================================================
 
-class wxConditionInternal
+// --------------------------------------------------------------------------
+// wxSemaphoreInternal
+// --------------------------------------------------------------------------
+
+class wxSemaphoreInternal
 {
 public:
-    wxConditionInternal()
+    wxSemaphoreInternal( int initialcount = 0, int maxcount = 0 );
+    ~wxSemaphoreInternal();
+
+    void Wait();
+    bool TryWait();
+
+    bool Wait( unsigned long timeout_millis );
+
+    void Post();
+
+private:
+    HANDLE m_semaphore;
+};
+
+wxSemaphoreInternal::wxSemaphoreInternal( int initialcount, int maxcount )
+{
+    if ( maxcount == 0 )
     {
-        m_hEvent = ::CreateEvent(
-                                 NULL,   // default secutiry
-                                 FALSE,  // not manual reset
-                                 FALSE,  // nonsignaled initially
-                                 NULL    // nameless event
-                                );
-        if ( !m_hEvent )
-        {
-            wxLogSysError(_("Can not create event object."));
-        }
+        // make it practically infinite
+        maxcount = INT_MAX;
+    }
 
-        // nobody waits for us yet
-        m_nWaiters = 0;
+    m_semaphore = ::CreateSemaphore( NULL, initialcount, maxcount, NULL );
+    if ( !m_semaphore )
+    {
+        wxLogLastError(_T("CreateSemaphore()"));
     }
+}
+
+wxSemaphoreInternal::~wxSemaphoreInternal()
+{
+    CloseHandle( m_semaphore );
+}
 
-    bool Wait(DWORD timeout)
+void wxSemaphoreInternal::Wait()
+{
+    if ( ::WaitForSingleObject( m_semaphore, INFINITE ) != WAIT_OBJECT_0 )
     {
-        // as m_nWaiters variable is accessed from multiple waiting threads
-        // (and possibly from the broadcasting thread), we need to change its
-        // value atomically
-        ::InterlockedIncrement(&m_nWaiters);
+        wxLogLastError(_T("WaitForSingleObject"));
+    }
+}
+
+bool wxSemaphoreInternal::TryWait()
+{
+    return Wait(0);
+}
 
-        // FIXME this should be MsgWaitForMultipleObjects() as we want to keep
-        //       processing Windows messages while waiting (or don't we?)
-        DWORD rc = ::WaitForSingleObject(m_hEvent, timeout);
+bool wxSemaphoreInternal::Wait( unsigned long timeout_millis )
+{
+    DWORD result = ::WaitForSingleObject( m_semaphore, timeout_millis );
+
+    switch ( result )
+    {
+        case WAIT_OBJECT_0:
+           return TRUE;
 
-        ::InterlockedDecrement(&m_nWaiters);
+        case WAIT_TIMEOUT:
+           break;
 
-        return rc != WAIT_TIMEOUT;
+        default:
+            wxLogLastError(_T("WaitForSingleObject()"));
     }
 
-    void Signal()
+    return FALSE;
+}
+
+void wxSemaphoreInternal::Post()
+{
+    if ( !::ReleaseSemaphore( m_semaphore, 1, NULL ) )
     {
-        // set the event to signaled: if a thread is already waiting on it, it
-        // will be woken up, otherwise the event will remain in the signaled
-        // state until someone waits on it. In any case, the system will return
-        // it to a non signalled state afterwards. If multiple threads are
-        // waiting, only one will be woken up.
-        if ( !::SetEvent(m_hEvent) )
-        {
-            wxLogLastError(wxT("SetEvent"));
-        }
+        wxLogLastError(_T("ReleaseSemaphore"));
     }
+}
+
+// --------------------------------------------------------------------------
+// wxSemaphore
+// --------------------------------------------------------------------------
+
+wxSemaphore::wxSemaphore( int initialcount, int maxcount )
+{
+    m_internal = new wxSemaphoreInternal( initialcount, maxcount );
+}
+
+wxSemaphore::~wxSemaphore()
+{
+    delete m_internal;
+}
+
+void wxSemaphore::Wait()
+{
+    m_internal->Wait();
+}
+
+bool wxSemaphore::TryWait()
+{
+    return m_internal->TryWait();
+}
+
+bool wxSemaphore::Wait( unsigned long timeout_millis )
+{
+    return m_internal->Wait( timeout_millis );
+}
+
+void wxSemaphore::Post()
+{
+    m_internal->Post();
+}
+
 
-    void Broadcast()
+// ==========================================================================
+// wxCondition
+// ==========================================================================
+
+// --------------------------------------------------------------------------
+// wxConditionInternal
+// --------------------------------------------------------------------------
+
+class wxConditionInternal
+{
+public:
+    wxConditionInternal(wxMutex& mutex);
+
+    void Wait();
+
+    bool Wait( unsigned long timeout_millis );
+
+    void Signal();
+
+    void Broadcast();
+
+private:
+    int m_numWaiters;
+    wxMutex m_mutexNumWaiters;
+
+    wxMutex& m_mutex;
+
+    wxSemaphore m_semaphore;
+
+    DECLARE_NO_COPY_CLASS(wxConditionInternal)
+};
+
+wxConditionInternal::wxConditionInternal(wxMutex& mutex)
+                   : m_mutex(mutex)
+{
+
+    m_numWaiters = 0;
+}
+
+void wxConditionInternal::Wait()
+{
+    // increment the number of waiters
+    m_mutexNumWaiters.Lock();
+    m_numWaiters++;
+    m_mutexNumWaiters.Unlock();
+
+    m_mutex.Unlock();
+
+    // a potential race condition can occur here
+    //
+    // after a thread increments nwaiters, and unlocks the mutex and before the
+    // semaphore.Wait() is called, if another thread can cause a signal to be
+    // generated
+    //
+    // this race condition is handled by using a semaphore and incrementing the
+    // semaphore only if 'nwaiters' is greater that zero since the semaphore,
+    // can 'remember' signals the race condition will not occur
+
+    // wait ( if necessary ) and decrement semaphore
+    m_semaphore.Wait();
+
+    m_mutex.Lock();
+}
+
+bool wxConditionInternal::Wait( unsigned long timeout_millis )
+{
+    m_mutexNumWaiters.Lock();
+    m_numWaiters++;
+    m_mutexNumWaiters.Unlock();
+
+    m_mutex.Unlock();
+
+    // a race condition can occur at this point in the code
+    //
+    // please see the comments in Wait(), for details
+
+    bool success = TRUE;
+
+    bool result = m_semaphore.Wait( timeout_millis );
+
+    if ( !result )
     {
-        // we need to save the original value as m_nWaiters is goign to be
-        // decreased by the signalled thread resulting in the loop being
-        // executed less times than needed
-        LONG nWaiters = m_nWaiters;
-
-        // this works because all these threads are already waiting and so each
-        // SetEvent() inside Signal() is really a PulseEvent() because the
-        // event state is immediately returned to non-signaled
-        for ( LONG n = 0; n < nWaiters; n++ )
+        // another potential race condition exists here it is caused when a
+        // 'waiting' thread timesout, and returns from WaitForSingleObject, but
+        // has not yet decremented 'nwaiters'.
+        //
+        // at this point if another thread calls signal() then the semaphore
+        // will be incremented, but the waiting thread will miss it.
+        //
+        // to handle this particular case, the waiting thread calls
+        // WaitForSingleObject again with a timeout of 0, after locking
+        // 'nwaiters_mutex'. this call does not block because of the zero
+        // timeout, but will allow the waiting thread to catch the missed
+        // signals.
+        m_mutexNumWaiters.Lock();
+        result = m_semaphore.Wait( 0 );
+
+        if ( !result )
         {
-            Signal();
+            m_numWaiters--;
+            success = FALSE;
         }
+
+        m_mutexNumWaiters.Unlock();
     }
 
-    ~wxConditionInternal()
+    m_mutex.Lock();
+
+    return success;
+}
+
+void wxConditionInternal::Signal()
+{
+    m_mutexNumWaiters.Lock();
+
+    if ( m_numWaiters > 0 )
     {
-        if ( m_hEvent )
-        {
-            if ( !::CloseHandle(m_hEvent) )
-            {
-                wxLogLastError(wxT("CloseHandle(event)"));
-            }
-        }
+        // increment the semaphore by 1
+        m_semaphore.Post();
+
+        m_numWaiters--;
     }
 
-private:
-    // the Win32 synchronization object corresponding to this event
-    HANDLE m_hEvent;
+    m_mutexNumWaiters.Unlock();
+}
 
-    // number of threads waiting for this condition
-    LONG m_nWaiters;
-};
+void wxConditionInternal::Broadcast()
+{
+    m_mutexNumWaiters.Lock();
+
+    while ( m_numWaiters > 0 )
+    {
+        m_semaphore.Post();
+        m_numWaiters--;
+    }
+
+    m_mutexNumWaiters.Unlock();
+}
+
+// ----------------------------------------------------------------------------
+// wxCondition implementation
+// ----------------------------------------------------------------------------
 
-wxCondition::wxCondition()
+wxCondition::wxCondition(wxMutex& mutex)
 {
-    m_internal = new wxConditionInternal;
+    m_internal = new wxConditionInternal( mutex );
 }
 
 wxCondition::~wxCondition()
@@ -323,13 +498,12 @@ wxCondition::~wxCondition()
 
 void wxCondition::Wait()
 {
-    (void)m_internal->Wait(INFINITE);
+    m_internal->Wait();
 }
 
-bool wxCondition::Wait(unsigned long sec,
-                       unsigned long nsec)
+bool wxCondition::Wait( unsigned long timeout_millis )
 {
-    return m_internal->Wait(sec*1000 + nsec/1000000);
+    return m_internal->Wait(timeout_millis);
 }
 
 void wxCondition::Signal()
@@ -350,7 +524,7 @@ wxCriticalSection::wxCriticalSection()
 {
 #ifdef __WXDEBUG__
     // Done this way to stop warnings during compilation about statement
-    // always being false
+    // always being FALSE
     int csSize = sizeof(CRITICAL_SECTION);
     int bSize  = sizeof(m_buffer);
     wxASSERT_MSG( csSize <= bSize,
@@ -963,12 +1137,18 @@ wxThreadError wxThread::Delete(ExitCode *pRc)
         }
     }
 
-    if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) )
+    // although the thread might be already in the EXITED state it might not
+    // have terminated yet and so we are not sure that it has actually
+    // terminated if the "if" above hadn't been taken
+    do
     {
-        wxLogLastError(wxT("GetExitCodeThread"));
+        if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) )
+        {
+            wxLogLastError(wxT("GetExitCodeThread"));
 
-        rc = (ExitCode)-1;
-    }
+            rc = (ExitCode)-1;
+        }
+    } while ( (DWORD)rc == STILL_ACTIVE );
 
     if ( IsDetached() )
     {
@@ -979,9 +1159,6 @@ wxThreadError wxThread::Delete(ExitCode *pRc)
         delete this;
     }
 
-    wxASSERT_MSG( (DWORD)rc != STILL_ACTIVE,
-                  wxT("thread must be already terminated.") );
-
     if ( pRc )
         *pRc = rc;