X-Git-Url: https://git.saurik.com/wxWidgets.git/blobdiff_plain/4958ea8f7b27a4213e092234a13d151b7b08fbea..e3dbf5934e997729c6f4f5154b1b7f29d5d22149:/src/msw/thread.cpp diff --git a/src/msw/thread.cpp b/src/msw/thread.cpp index b654353bfc..e2c76f9389 100644 --- a/src/msw/thread.cpp +++ b/src/msw/thread.cpp @@ -146,7 +146,7 @@ public: } } - ~wxMutexInternal() { if ( m_mutex ) CloseHandle(m_mutex); } + ~wxMutexInternal() { if ( m_mutex ) ::CloseHandle(m_mutex); } public: HANDLE m_mutex; @@ -223,97 +223,272 @@ wxMutexError wxMutex::Unlock() return wxMUTEX_NO_ERROR; } -// ---------------------------------------------------------------------------- -// wxCondition implementation -// ---------------------------------------------------------------------------- +// ========================================================================== +// wxSemaphore +// ========================================================================== -class wxConditionInternal +// -------------------------------------------------------------------------- +// wxSemaphoreInternal +// -------------------------------------------------------------------------- + +class wxSemaphoreInternal { public: - wxConditionInternal() + wxSemaphoreInternal( int initialcount = 0, int maxcount = 0 ); + ~wxSemaphoreInternal(); + + void Wait(); + bool TryWait(); + + bool Wait( unsigned long timeout_millis ); + + void Post(); + +private: + HANDLE m_semaphore; +}; + +wxSemaphoreInternal::wxSemaphoreInternal( int initialcount, int maxcount ) +{ + if ( maxcount == 0 ) { - m_hEvent = ::CreateEvent( - NULL, // default secutiry - FALSE, // not manual reset - FALSE, // nonsignaled initially - NULL // nameless event - ); - if ( !m_hEvent ) - { - wxLogSysError(_("Can not create event object.")); - } + // make it practically infinite + maxcount = INT_MAX; + } - // nobody waits for us yet - m_nWaiters = 0; + m_semaphore = ::CreateSemaphore( NULL, initialcount, maxcount, NULL ); + if ( !m_semaphore ) + { + wxLogLastError(_T("CreateSemaphore()")); } +} + +wxSemaphoreInternal::~wxSemaphoreInternal() +{ + CloseHandle( m_semaphore ); +} - bool Wait(DWORD timeout) +void wxSemaphoreInternal::Wait() +{ + if ( ::WaitForSingleObject( m_semaphore, INFINITE ) != WAIT_OBJECT_0 ) { - // as m_nWaiters variable is accessed from multiple waiting threads - // (and possibly from the broadcasting thread), we need to change its - // value atomically - ::InterlockedIncrement(&m_nWaiters); + wxLogLastError(_T("WaitForSingleObject")); + } +} + +bool wxSemaphoreInternal::TryWait() +{ + return Wait(0); +} - // FIXME this should be MsgWaitForMultipleObjects() as we want to keep - // processing Windows messages while waiting (or don't we?) - DWORD rc = ::WaitForSingleObject(m_hEvent, timeout); +bool wxSemaphoreInternal::Wait( unsigned long timeout_millis ) +{ + DWORD result = ::WaitForSingleObject( m_semaphore, timeout_millis ); + + switch ( result ) + { + case WAIT_OBJECT_0: + return TRUE; - ::InterlockedDecrement(&m_nWaiters); + case WAIT_TIMEOUT: + break; - return rc != WAIT_TIMEOUT; + default: + wxLogLastError(_T("WaitForSingleObject()")); } - void Signal() + return FALSE; +} + +void wxSemaphoreInternal::Post() +{ + if ( !::ReleaseSemaphore( m_semaphore, 1, NULL ) ) { - // set the event to signaled: if a thread is already waiting on it, it - // will be woken up, otherwise the event will remain in the signaled - // state until someone waits on it. In any case, the system will return - // it to a non signalled state afterwards. If multiple threads are - // waiting, only one will be woken up. - if ( !::SetEvent(m_hEvent) ) - { - wxLogLastError(wxT("SetEvent")); - } + wxLogLastError(_T("ReleaseSemaphore")); } +} + +// -------------------------------------------------------------------------- +// wxSemaphore +// -------------------------------------------------------------------------- + +wxSemaphore::wxSemaphore( int initialcount, int maxcount ) +{ + m_internal = new wxSemaphoreInternal( initialcount, maxcount ); +} + +wxSemaphore::~wxSemaphore() +{ + delete m_internal; +} + +void wxSemaphore::Wait() +{ + m_internal->Wait(); +} + +bool wxSemaphore::TryWait() +{ + return m_internal->TryWait(); +} + +bool wxSemaphore::Wait( unsigned long timeout_millis ) +{ + return m_internal->Wait( timeout_millis ); +} + +void wxSemaphore::Post() +{ + m_internal->Post(); +} + - void Broadcast() +// ========================================================================== +// wxCondition +// ========================================================================== + +// -------------------------------------------------------------------------- +// wxConditionInternal +// -------------------------------------------------------------------------- + +class wxConditionInternal +{ +public: + wxConditionInternal(wxMutex& mutex); + + void Wait(); + + bool Wait( unsigned long timeout_millis ); + + void Signal(); + + void Broadcast(); + +private: + int m_numWaiters; + wxMutex m_mutexNumWaiters; + + wxMutex& m_mutex; + + wxSemaphore m_semaphore; + + DECLARE_NO_COPY_CLASS(wxConditionInternal) +}; + +wxConditionInternal::wxConditionInternal(wxMutex& mutex) + : m_mutex(mutex) +{ + + m_numWaiters = 0; +} + +void wxConditionInternal::Wait() +{ + // increment the number of waiters + m_mutexNumWaiters.Lock(); + m_numWaiters++; + m_mutexNumWaiters.Unlock(); + + m_mutex.Unlock(); + + // a potential race condition can occur here + // + // after a thread increments nwaiters, and unlocks the mutex and before the + // semaphore.Wait() is called, if another thread can cause a signal to be + // generated + // + // this race condition is handled by using a semaphore and incrementing the + // semaphore only if 'nwaiters' is greater that zero since the semaphore, + // can 'remember' signals the race condition will not occur + + // wait ( if necessary ) and decrement semaphore + m_semaphore.Wait(); + + m_mutex.Lock(); +} + +bool wxConditionInternal::Wait( unsigned long timeout_millis ) +{ + m_mutexNumWaiters.Lock(); + m_numWaiters++; + m_mutexNumWaiters.Unlock(); + + m_mutex.Unlock(); + + // a race condition can occur at this point in the code + // + // please see the comments in Wait(), for details + + bool success = TRUE; + + bool result = m_semaphore.Wait( timeout_millis ); + + if ( !result ) { - // we need to save the original value as m_nWaiters is goign to be - // decreased by the signalled thread resulting in the loop being - // executed less times than needed - LONG nWaiters = m_nWaiters; - - // this works because all these threads are already waiting and so each - // SetEvent() inside Signal() is really a PulseEvent() because the - // event state is immediately returned to non-signaled - for ( LONG n = 0; n < nWaiters; n++ ) + // another potential race condition exists here it is caused when a + // 'waiting' thread timesout, and returns from WaitForSingleObject, but + // has not yet decremented 'nwaiters'. + // + // at this point if another thread calls signal() then the semaphore + // will be incremented, but the waiting thread will miss it. + // + // to handle this particular case, the waiting thread calls + // WaitForSingleObject again with a timeout of 0, after locking + // 'nwaiters_mutex'. this call does not block because of the zero + // timeout, but will allow the waiting thread to catch the missed + // signals. + m_mutexNumWaiters.Lock(); + result = m_semaphore.Wait( 0 ); + + if ( !result ) { - Signal(); + m_numWaiters--; + success = FALSE; } + + m_mutexNumWaiters.Unlock(); } - ~wxConditionInternal() + m_mutex.Lock(); + + return success; +} + +void wxConditionInternal::Signal() +{ + m_mutexNumWaiters.Lock(); + + if ( m_numWaiters > 0 ) { - if ( m_hEvent ) - { - if ( !::CloseHandle(m_hEvent) ) - { - wxLogLastError(wxT("CloseHandle(event)")); - } - } + // increment the semaphore by 1 + m_semaphore.Post(); + + m_numWaiters--; } -private: - // the Win32 synchronization object corresponding to this event - HANDLE m_hEvent; + m_mutexNumWaiters.Unlock(); +} - // number of threads waiting for this condition - LONG m_nWaiters; -}; +void wxConditionInternal::Broadcast() +{ + m_mutexNumWaiters.Lock(); + + while ( m_numWaiters > 0 ) + { + m_semaphore.Post(); + m_numWaiters--; + } + + m_mutexNumWaiters.Unlock(); +} + +// ---------------------------------------------------------------------------- +// wxCondition implementation +// ---------------------------------------------------------------------------- -wxCondition::wxCondition() +wxCondition::wxCondition(wxMutex& mutex) { - m_internal = new wxConditionInternal; + m_internal = new wxConditionInternal( mutex ); } wxCondition::~wxCondition() @@ -323,13 +498,12 @@ wxCondition::~wxCondition() void wxCondition::Wait() { - (void)m_internal->Wait(INFINITE); + m_internal->Wait(); } -bool wxCondition::Wait(unsigned long sec, - unsigned long nsec) +bool wxCondition::Wait( unsigned long timeout_millis ) { - return m_internal->Wait(sec*1000 + nsec/1000000); + return m_internal->Wait(timeout_millis); } void wxCondition::Signal() @@ -350,7 +524,7 @@ wxCriticalSection::wxCriticalSection() { #ifdef __WXDEBUG__ // Done this way to stop warnings during compilation about statement - // always being false + // always being FALSE int csSize = sizeof(CRITICAL_SECTION); int bSize = sizeof(m_buffer); wxASSERT_MSG( csSize <= bSize, @@ -963,12 +1137,18 @@ wxThreadError wxThread::Delete(ExitCode *pRc) } } - if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) ) + // although the thread might be already in the EXITED state it might not + // have terminated yet and so we are not sure that it has actually + // terminated if the "if" above hadn't been taken + do { - wxLogLastError(wxT("GetExitCodeThread")); + if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) ) + { + wxLogLastError(wxT("GetExitCodeThread")); - rc = (ExitCode)-1; - } + rc = (ExitCode)-1; + } + } while ( (DWORD)rc == STILL_ACTIVE ); if ( IsDetached() ) { @@ -979,9 +1159,6 @@ wxThreadError wxThread::Delete(ExitCode *pRc) delete this; } - wxASSERT_MSG( (DWORD)rc != STILL_ACTIVE, - wxT("thread must be already terminated.") ); - if ( pRc ) *pRc = rc;