]> git.saurik.com Git - wxWidgets.git/blobdiff - src/msw/thread.cpp
typo fixed: should be lpthread, not -lpthread in THREAD_OPTS
[wxWidgets.git] / src / msw / thread.cpp
index a70d1389c6cdc35f20ff76f50d19ab2cdbeadf5b..e2c76f9389403c4964138f1eb591905d9e6a4837 100644 (file)
@@ -146,7 +146,7 @@ public:
         }
     }
 
-    ~wxMutexInternal() { if ( m_mutex ) CloseHandle(m_mutex); }
+    ~wxMutexInternal() { if ( m_mutex ) ::CloseHandle(m_mutex); }
 
 public:
     HANDLE m_mutex;
@@ -223,97 +223,272 @@ wxMutexError wxMutex::Unlock()
     return wxMUTEX_NO_ERROR;
 }
 
-// ----------------------------------------------------------------------------
-// wxCondition implementation
-// ----------------------------------------------------------------------------
+// ==========================================================================
+// wxSemaphore
+// ==========================================================================
 
-class wxConditionInternal
+// --------------------------------------------------------------------------
+// wxSemaphoreInternal
+// --------------------------------------------------------------------------
+
+class wxSemaphoreInternal
 {
 public:
-    wxConditionInternal()
+    wxSemaphoreInternal( int initialcount = 0, int maxcount = 0 );
+    ~wxSemaphoreInternal();
+
+    void Wait();
+    bool TryWait();
+
+    bool Wait( unsigned long timeout_millis );
+
+    void Post();
+
+private:
+    HANDLE m_semaphore;
+};
+
+wxSemaphoreInternal::wxSemaphoreInternal( int initialcount, int maxcount )
+{
+    if ( maxcount == 0 )
     {
-        m_hEvent = ::CreateEvent(
-                                 NULL,   // default secutiry
-                                 FALSE,  // not manual reset
-                                 FALSE,  // nonsignaled initially
-                                 NULL    // nameless event
-                                );
-        if ( !m_hEvent )
-        {
-            wxLogSysError(_("Can not create event object."));
-        }
+        // make it practically infinite
+        maxcount = INT_MAX;
+    }
 
-        // nobody waits for us yet
-        m_nWaiters = 0;
+    m_semaphore = ::CreateSemaphore( NULL, initialcount, maxcount, NULL );
+    if ( !m_semaphore )
+    {
+        wxLogLastError(_T("CreateSemaphore()"));
     }
+}
+
+wxSemaphoreInternal::~wxSemaphoreInternal()
+{
+    CloseHandle( m_semaphore );
+}
 
-    bool Wait(DWORD timeout)
+void wxSemaphoreInternal::Wait()
+{
+    if ( ::WaitForSingleObject( m_semaphore, INFINITE ) != WAIT_OBJECT_0 )
     {
-        // as m_nWaiters variable is accessed from multiple waiting threads
-        // (and possibly from the broadcasting thread), we need to change its
-        // value atomically
-        ::InterlockedIncrement(&m_nWaiters);
+        wxLogLastError(_T("WaitForSingleObject"));
+    }
+}
+
+bool wxSemaphoreInternal::TryWait()
+{
+    return Wait(0);
+}
+
+bool wxSemaphoreInternal::Wait( unsigned long timeout_millis )
+{
+    DWORD result = ::WaitForSingleObject( m_semaphore, timeout_millis );
 
-        // FIXME this should be MsgWaitForMultipleObjects() as we want to keep
-        //       processing Windows messages while waiting (or don't we?)
-        DWORD rc = ::WaitForSingleObject(m_hEvent, timeout);
+    switch ( result )
+    {
+        case WAIT_OBJECT_0:
+           return TRUE;
 
-        ::InterlockedDecrement(&m_nWaiters);
+        case WAIT_TIMEOUT:
+           break;
 
-        return rc != WAIT_TIMEOUT;
+        default:
+            wxLogLastError(_T("WaitForSingleObject()"));
     }
 
-    void Signal()
+    return FALSE;
+}
+
+void wxSemaphoreInternal::Post()
+{
+    if ( !::ReleaseSemaphore( m_semaphore, 1, NULL ) )
     {
-        // set the event to signaled: if a thread is already waiting on it, it
-        // will be woken up, otherwise the event will remain in the signaled
-        // state until someone waits on it. In any case, the system will return
-        // it to a non signalled state afterwards. If multiple threads are
-        // waiting, only one will be woken up.
-        if ( !::SetEvent(m_hEvent) )
-        {
-            wxLogLastError(wxT("SetEvent"));
-        }
+        wxLogLastError(_T("ReleaseSemaphore"));
     }
+}
+
+// --------------------------------------------------------------------------
+// wxSemaphore
+// --------------------------------------------------------------------------
+
+wxSemaphore::wxSemaphore( int initialcount, int maxcount )
+{
+    m_internal = new wxSemaphoreInternal( initialcount, maxcount );
+}
+
+wxSemaphore::~wxSemaphore()
+{
+    delete m_internal;
+}
+
+void wxSemaphore::Wait()
+{
+    m_internal->Wait();
+}
+
+bool wxSemaphore::TryWait()
+{
+    return m_internal->TryWait();
+}
+
+bool wxSemaphore::Wait( unsigned long timeout_millis )
+{
+    return m_internal->Wait( timeout_millis );
+}
+
+void wxSemaphore::Post()
+{
+    m_internal->Post();
+}
+
+
+// ==========================================================================
+// wxCondition
+// ==========================================================================
 
-    void Broadcast()
+// --------------------------------------------------------------------------
+// wxConditionInternal
+// --------------------------------------------------------------------------
+
+class wxConditionInternal
+{
+public:
+    wxConditionInternal(wxMutex& mutex);
+
+    void Wait();
+
+    bool Wait( unsigned long timeout_millis );
+
+    void Signal();
+
+    void Broadcast();
+
+private:
+    int m_numWaiters;
+    wxMutex m_mutexNumWaiters;
+
+    wxMutex& m_mutex;
+
+    wxSemaphore m_semaphore;
+
+    DECLARE_NO_COPY_CLASS(wxConditionInternal)
+};
+
+wxConditionInternal::wxConditionInternal(wxMutex& mutex)
+                   : m_mutex(mutex)
+{
+
+    m_numWaiters = 0;
+}
+
+void wxConditionInternal::Wait()
+{
+    // increment the number of waiters
+    m_mutexNumWaiters.Lock();
+    m_numWaiters++;
+    m_mutexNumWaiters.Unlock();
+
+    m_mutex.Unlock();
+
+    // a potential race condition can occur here
+    //
+    // after a thread increments nwaiters, and unlocks the mutex and before the
+    // semaphore.Wait() is called, if another thread can cause a signal to be
+    // generated
+    //
+    // this race condition is handled by using a semaphore and incrementing the
+    // semaphore only if 'nwaiters' is greater that zero since the semaphore,
+    // can 'remember' signals the race condition will not occur
+
+    // wait ( if necessary ) and decrement semaphore
+    m_semaphore.Wait();
+
+    m_mutex.Lock();
+}
+
+bool wxConditionInternal::Wait( unsigned long timeout_millis )
+{
+    m_mutexNumWaiters.Lock();
+    m_numWaiters++;
+    m_mutexNumWaiters.Unlock();
+
+    m_mutex.Unlock();
+
+    // a race condition can occur at this point in the code
+    //
+    // please see the comments in Wait(), for details
+
+    bool success = TRUE;
+
+    bool result = m_semaphore.Wait( timeout_millis );
+
+    if ( !result )
     {
-        // we need to save the original value as m_nWaiters is goign to be
-        // decreased by the signalled thread resulting in the loop being
-        // executed less times than needed
-        LONG nWaiters = m_nWaiters;
-
-        // this works because all these threads are already waiting and so each
-        // SetEvent() inside Signal() is really a PulseEvent() because the
-        // event state is immediately returned to non-signaled
-        for ( LONG n = 0; n < nWaiters; n++ )
+        // another potential race condition exists here it is caused when a
+        // 'waiting' thread timesout, and returns from WaitForSingleObject, but
+        // has not yet decremented 'nwaiters'.
+        //
+        // at this point if another thread calls signal() then the semaphore
+        // will be incremented, but the waiting thread will miss it.
+        //
+        // to handle this particular case, the waiting thread calls
+        // WaitForSingleObject again with a timeout of 0, after locking
+        // 'nwaiters_mutex'. this call does not block because of the zero
+        // timeout, but will allow the waiting thread to catch the missed
+        // signals.
+        m_mutexNumWaiters.Lock();
+        result = m_semaphore.Wait( 0 );
+
+        if ( !result )
         {
-            Signal();
+            m_numWaiters--;
+            success = FALSE;
         }
+
+        m_mutexNumWaiters.Unlock();
+    }
+
+    m_mutex.Lock();
+
+    return success;
+}
+
+void wxConditionInternal::Signal()
+{
+    m_mutexNumWaiters.Lock();
+
+    if ( m_numWaiters > 0 )
+    {
+        // increment the semaphore by 1
+        m_semaphore.Post();
+
+        m_numWaiters--;
     }
 
-    ~wxConditionInternal()
+    m_mutexNumWaiters.Unlock();
+}
+
+void wxConditionInternal::Broadcast()
+{
+    m_mutexNumWaiters.Lock();
+
+    while ( m_numWaiters > 0 )
     {
-        if ( m_hEvent )
-        {
-            if ( !::CloseHandle(m_hEvent) )
-            {
-                wxLogLastError(wxT("CloseHandle(event)"));
-            }
-        }
+        m_semaphore.Post();
+        m_numWaiters--;
     }
 
-private:
-    // the Win32 synchronization object corresponding to this event
-    HANDLE m_hEvent;
+    m_mutexNumWaiters.Unlock();
+}
 
-    // number of threads waiting for this condition
-    LONG m_nWaiters;
-};
+// ----------------------------------------------------------------------------
+// wxCondition implementation
+// ----------------------------------------------------------------------------
 
-wxCondition::wxCondition()
+wxCondition::wxCondition(wxMutex& mutex)
 {
-    m_internal = new wxConditionInternal;
+    m_internal = new wxConditionInternal( mutex );
 }
 
 wxCondition::~wxCondition()
@@ -323,13 +498,12 @@ wxCondition::~wxCondition()
 
 void wxCondition::Wait()
 {
-    (void)m_internal->Wait(INFINITE);
+    m_internal->Wait();
 }
 
-bool wxCondition::Wait(unsigned long sec,
-                       unsigned long nsec)
+bool wxCondition::Wait( unsigned long timeout_millis )
 {
-    return m_internal->Wait(sec*1000 + nsec/1000000);
+    return m_internal->Wait(timeout_millis);
 }
 
 void wxCondition::Signal()
@@ -350,9 +524,9 @@ wxCriticalSection::wxCriticalSection()
 {
 #ifdef __WXDEBUG__
     // Done this way to stop warnings during compilation about statement
-    // always being false
-        int csSize = sizeof(CRITICAL_SECTION);
-        int bSize  = sizeof(m_buffer);
+    // always being FALSE
+    int csSize = sizeof(CRITICAL_SECTION);
+    int bSize  = sizeof(m_buffer);
     wxASSERT_MSG( csSize <= bSize,
                   _T("must increase buffer size in wx/thread.h") );
 #endif
@@ -411,7 +585,7 @@ public:
     }
 
     // create a new (suspended) thread (for the given thread object)
-    bool Create(wxThread *thread);
+    bool Create(wxThread *thread, unsigned int stackSize);
 
     // suspend/resume/terminate
     bool Suspend();
@@ -440,7 +614,7 @@ private:
     DWORD         m_tid;        // thread id
 };
 
-THREAD_RETVAL wxThreadInternal::WinThreadStart(void *param)
+THREAD_RETVAL THREAD_CALLCONV wxThreadInternal::WinThreadStart(void *param)
 {
     THREAD_RETVAL rc;
     bool wasCancelled;
@@ -514,30 +688,34 @@ void wxThreadInternal::SetPriority(unsigned int priority)
     }
 }
 
-bool wxThreadInternal::Create(wxThread *thread)
+bool wxThreadInternal::Create(wxThread *thread, unsigned int stackSize)
 {
     // for compilers which have it, we should use C RTL function for thread
     // creation instead of Win32 API one because otherwise we will have memory
     // leaks if the thread uses C RTL (and most threads do)
 #ifdef wxUSE_BEGIN_THREAD
+
+    // Watcom is reported to not like 0 stack size (which means "use default"
+    // for the other compilers and is also the default value for stackSize)
+#ifdef __WATCOMC__
+    if ( !stackSize )
+        stackSize = 10240;
+#endif // __WATCOMC__
+
     m_hThread = (HANDLE)_beginthreadex
                         (
-                            NULL,   // default security
-#ifdef __WATCOMC__
-                            10240,  // stack size can't be NULL in Watcom
-#else
-                            0,      // default stack size
-#endif
-                            wxThreadInternal::WinThreadStart, // entry point
-                            thread,
-                            CREATE_SUSPENDED,
-                            (unsigned int *)&m_tid
+                          NULL,                             // default security
+                          stackSize,
+                          wxThreadInternal::WinThreadStart, // entry point
+                          thread,
+                          CREATE_SUSPENDED,
+                          (unsigned int *)&m_tid
                         );
 #else // compiler doesn't have _beginthreadex
     m_hThread = ::CreateThread
                   (
                     NULL,                               // default security
-                    0,                                  // default stack size
+                    stackSize,                          // stack size
                     wxThreadInternal::WinThreadStart,   // thread entry point
                     (LPVOID)thread,                     // parameter
                     CREATE_SUSPENDED,                   // flags
@@ -643,6 +821,11 @@ int wxThread::GetCPUCount()
     return si.dwNumberOfProcessors;
 }
 
+unsigned long wxThread::GetCurrentId()
+{
+    return (unsigned long)::GetCurrentThreadId();
+}
+
 bool wxThread::SetConcurrency(size_t level)
 {
     wxASSERT_MSG( IsMain(), _T("should only be called from the main thread") );
@@ -756,11 +939,11 @@ wxThread::~wxThread()
 // create/start thread
 // -------------------
 
-wxThreadError wxThread::Create()
+wxThreadError wxThread::Create(unsigned int stackSize)
 {
     wxCriticalSectionLocker lock(m_critsect);
 
-    if ( !m_internal->Create(this) )
+    if ( !m_internal->Create(this, stackSize) )
         return wxTHREAD_NO_RESOURCE;
 
     return wxTHREAD_NO_ERROR;
@@ -954,12 +1137,18 @@ wxThreadError wxThread::Delete(ExitCode *pRc)
         }
     }
 
-    if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) )
+    // although the thread might be already in the EXITED state it might not
+    // have terminated yet and so we are not sure that it has actually
+    // terminated if the "if" above hadn't been taken
+    do
     {
-        wxLogLastError(wxT("GetExitCodeThread"));
+        if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) )
+        {
+            wxLogLastError(wxT("GetExitCodeThread"));
 
-        rc = (ExitCode)-1;
-    }
+            rc = (ExitCode)-1;
+        }
+    } while ( (DWORD)rc == STILL_ACTIVE );
 
     if ( IsDetached() )
     {
@@ -970,9 +1159,6 @@ wxThreadError wxThread::Delete(ExitCode *pRc)
         delete this;
     }
 
-    wxASSERT_MSG( (DWORD)rc != STILL_ACTIVE,
-                  wxT("thread must be already terminated.") );
-
     if ( pRc )
         *pRc = rc;
 
@@ -1244,3 +1430,5 @@ bool WXDLLEXPORT wxIsWaitingForThread()
 }
 
 #endif // wxUSE_THREADS
+
+// vi:sts=4:sw=4:et