X-Git-Url: https://git.saurik.com/wxWidgets.git/blobdiff_plain/08d7397c9a177bc73c28017933cfdc4c57369202..e7300ec6d9ebbd1cfa2fcf12c0ce6e5aee85a152:/src/msw/thread.cpp diff --git a/src/msw/thread.cpp b/src/msw/thread.cpp index ab8030e584..e2c76f9389 100644 --- a/src/msw/thread.cpp +++ b/src/msw/thread.cpp @@ -146,7 +146,7 @@ public: } } - ~wxMutexInternal() { if ( m_mutex ) CloseHandle(m_mutex); } + ~wxMutexInternal() { if ( m_mutex ) ::CloseHandle(m_mutex); } public: HANDLE m_mutex; @@ -223,97 +223,272 @@ wxMutexError wxMutex::Unlock() return wxMUTEX_NO_ERROR; } -// ---------------------------------------------------------------------------- -// wxCondition implementation -// ---------------------------------------------------------------------------- +// ========================================================================== +// wxSemaphore +// ========================================================================== -class wxConditionInternal +// -------------------------------------------------------------------------- +// wxSemaphoreInternal +// -------------------------------------------------------------------------- + +class wxSemaphoreInternal { public: - wxConditionInternal() + wxSemaphoreInternal( int initialcount = 0, int maxcount = 0 ); + ~wxSemaphoreInternal(); + + void Wait(); + bool TryWait(); + + bool Wait( unsigned long timeout_millis ); + + void Post(); + +private: + HANDLE m_semaphore; +}; + +wxSemaphoreInternal::wxSemaphoreInternal( int initialcount, int maxcount ) +{ + if ( maxcount == 0 ) { - m_hEvent = ::CreateEvent( - NULL, // default secutiry - FALSE, // not manual reset - FALSE, // nonsignaled initially - NULL // nameless event - ); - if ( !m_hEvent ) - { - wxLogSysError(_("Can not create event object.")); - } + // make it practically infinite + maxcount = INT_MAX; + } - // nobody waits for us yet - m_nWaiters = 0; + m_semaphore = ::CreateSemaphore( NULL, initialcount, maxcount, NULL ); + if ( !m_semaphore ) + { + wxLogLastError(_T("CreateSemaphore()")); } +} + +wxSemaphoreInternal::~wxSemaphoreInternal() +{ + CloseHandle( m_semaphore ); +} - bool Wait(DWORD timeout) +void wxSemaphoreInternal::Wait() +{ + if ( ::WaitForSingleObject( m_semaphore, INFINITE ) != WAIT_OBJECT_0 ) { - // as m_nWaiters variable is accessed from multiple waiting threads - // (and possibly from the broadcasting thread), we need to change its - // value atomically - ::InterlockedIncrement(&m_nWaiters); + wxLogLastError(_T("WaitForSingleObject")); + } +} + +bool wxSemaphoreInternal::TryWait() +{ + return Wait(0); +} + +bool wxSemaphoreInternal::Wait( unsigned long timeout_millis ) +{ + DWORD result = ::WaitForSingleObject( m_semaphore, timeout_millis ); - // FIXME this should be MsgWaitForMultipleObjects() as we want to keep - // processing Windows messages while waiting (or don't we?) - DWORD rc = ::WaitForSingleObject(m_hEvent, timeout); + switch ( result ) + { + case WAIT_OBJECT_0: + return TRUE; - ::InterlockedDecrement(&m_nWaiters); + case WAIT_TIMEOUT: + break; - return rc != WAIT_TIMEOUT; + default: + wxLogLastError(_T("WaitForSingleObject()")); } - void Signal() + return FALSE; +} + +void wxSemaphoreInternal::Post() +{ + if ( !::ReleaseSemaphore( m_semaphore, 1, NULL ) ) { - // set the event to signaled: if a thread is already waiting on it, it - // will be woken up, otherwise the event will remain in the signaled - // state until someone waits on it. In any case, the system will return - // it to a non signalled state afterwards. If multiple threads are - // waiting, only one will be woken up. - if ( !::SetEvent(m_hEvent) ) - { - wxLogLastError(wxT("SetEvent")); - } + wxLogLastError(_T("ReleaseSemaphore")); } +} + +// -------------------------------------------------------------------------- +// wxSemaphore +// -------------------------------------------------------------------------- + +wxSemaphore::wxSemaphore( int initialcount, int maxcount ) +{ + m_internal = new wxSemaphoreInternal( initialcount, maxcount ); +} + +wxSemaphore::~wxSemaphore() +{ + delete m_internal; +} + +void wxSemaphore::Wait() +{ + m_internal->Wait(); +} + +bool wxSemaphore::TryWait() +{ + return m_internal->TryWait(); +} + +bool wxSemaphore::Wait( unsigned long timeout_millis ) +{ + return m_internal->Wait( timeout_millis ); +} + +void wxSemaphore::Post() +{ + m_internal->Post(); +} + + +// ========================================================================== +// wxCondition +// ========================================================================== - void Broadcast() +// -------------------------------------------------------------------------- +// wxConditionInternal +// -------------------------------------------------------------------------- + +class wxConditionInternal +{ +public: + wxConditionInternal(wxMutex& mutex); + + void Wait(); + + bool Wait( unsigned long timeout_millis ); + + void Signal(); + + void Broadcast(); + +private: + int m_numWaiters; + wxMutex m_mutexNumWaiters; + + wxMutex& m_mutex; + + wxSemaphore m_semaphore; + + DECLARE_NO_COPY_CLASS(wxConditionInternal) +}; + +wxConditionInternal::wxConditionInternal(wxMutex& mutex) + : m_mutex(mutex) +{ + + m_numWaiters = 0; +} + +void wxConditionInternal::Wait() +{ + // increment the number of waiters + m_mutexNumWaiters.Lock(); + m_numWaiters++; + m_mutexNumWaiters.Unlock(); + + m_mutex.Unlock(); + + // a potential race condition can occur here + // + // after a thread increments nwaiters, and unlocks the mutex and before the + // semaphore.Wait() is called, if another thread can cause a signal to be + // generated + // + // this race condition is handled by using a semaphore and incrementing the + // semaphore only if 'nwaiters' is greater that zero since the semaphore, + // can 'remember' signals the race condition will not occur + + // wait ( if necessary ) and decrement semaphore + m_semaphore.Wait(); + + m_mutex.Lock(); +} + +bool wxConditionInternal::Wait( unsigned long timeout_millis ) +{ + m_mutexNumWaiters.Lock(); + m_numWaiters++; + m_mutexNumWaiters.Unlock(); + + m_mutex.Unlock(); + + // a race condition can occur at this point in the code + // + // please see the comments in Wait(), for details + + bool success = TRUE; + + bool result = m_semaphore.Wait( timeout_millis ); + + if ( !result ) { - // we need to save the original value as m_nWaiters is goign to be - // decreased by the signalled thread resulting in the loop being - // executed less times than needed - LONG nWaiters = m_nWaiters; - - // this works because all these threads are already waiting and so each - // SetEvent() inside Signal() is really a PulseEvent() because the - // event state is immediately returned to non-signaled - for ( LONG n = 0; n < nWaiters; n++ ) + // another potential race condition exists here it is caused when a + // 'waiting' thread timesout, and returns from WaitForSingleObject, but + // has not yet decremented 'nwaiters'. + // + // at this point if another thread calls signal() then the semaphore + // will be incremented, but the waiting thread will miss it. + // + // to handle this particular case, the waiting thread calls + // WaitForSingleObject again with a timeout of 0, after locking + // 'nwaiters_mutex'. this call does not block because of the zero + // timeout, but will allow the waiting thread to catch the missed + // signals. + m_mutexNumWaiters.Lock(); + result = m_semaphore.Wait( 0 ); + + if ( !result ) { - Signal(); + m_numWaiters--; + success = FALSE; } + + m_mutexNumWaiters.Unlock(); + } + + m_mutex.Lock(); + + return success; +} + +void wxConditionInternal::Signal() +{ + m_mutexNumWaiters.Lock(); + + if ( m_numWaiters > 0 ) + { + // increment the semaphore by 1 + m_semaphore.Post(); + + m_numWaiters--; } - ~wxConditionInternal() + m_mutexNumWaiters.Unlock(); +} + +void wxConditionInternal::Broadcast() +{ + m_mutexNumWaiters.Lock(); + + while ( m_numWaiters > 0 ) { - if ( m_hEvent ) - { - if ( !::CloseHandle(m_hEvent) ) - { - wxLogLastError(wxT("CloseHandle(event)")); - } - } + m_semaphore.Post(); + m_numWaiters--; } -private: - // the Win32 synchronization object corresponding to this event - HANDLE m_hEvent; + m_mutexNumWaiters.Unlock(); +} - // number of threads waiting for this condition - LONG m_nWaiters; -}; +// ---------------------------------------------------------------------------- +// wxCondition implementation +// ---------------------------------------------------------------------------- -wxCondition::wxCondition() +wxCondition::wxCondition(wxMutex& mutex) { - m_internal = new wxConditionInternal; + m_internal = new wxConditionInternal( mutex ); } wxCondition::~wxCondition() @@ -323,13 +498,12 @@ wxCondition::~wxCondition() void wxCondition::Wait() { - (void)m_internal->Wait(INFINITE); + m_internal->Wait(); } -bool wxCondition::Wait(unsigned long sec, - unsigned long nsec) +bool wxCondition::Wait( unsigned long timeout_millis ) { - return m_internal->Wait(sec*1000 + nsec/1000000); + return m_internal->Wait(timeout_millis); } void wxCondition::Signal() @@ -350,9 +524,9 @@ wxCriticalSection::wxCriticalSection() { #ifdef __WXDEBUG__ // Done this way to stop warnings during compilation about statement - // always being false - int csSize = sizeof(CRITICAL_SECTION); - int bSize = sizeof(m_buffer); + // always being FALSE + int csSize = sizeof(CRITICAL_SECTION); + int bSize = sizeof(m_buffer); wxASSERT_MSG( csSize <= bSize, _T("must increase buffer size in wx/thread.h") ); #endif @@ -411,7 +585,7 @@ public: } // create a new (suspended) thread (for the given thread object) - bool Create(wxThread *thread); + bool Create(wxThread *thread, unsigned int stackSize); // suspend/resume/terminate bool Suspend(); @@ -514,30 +688,34 @@ void wxThreadInternal::SetPriority(unsigned int priority) } } -bool wxThreadInternal::Create(wxThread *thread) +bool wxThreadInternal::Create(wxThread *thread, unsigned int stackSize) { // for compilers which have it, we should use C RTL function for thread // creation instead of Win32 API one because otherwise we will have memory // leaks if the thread uses C RTL (and most threads do) #ifdef wxUSE_BEGIN_THREAD + + // Watcom is reported to not like 0 stack size (which means "use default" + // for the other compilers and is also the default value for stackSize) +#ifdef __WATCOMC__ + if ( !stackSize ) + stackSize = 10240; +#endif // __WATCOMC__ + m_hThread = (HANDLE)_beginthreadex ( - NULL, // default security -#ifdef __WATCOMC__ - 10240, // stack size can't be NULL in Watcom -#else - 0, // default stack size -#endif - wxThreadInternal::WinThreadStart, // entry point - thread, - CREATE_SUSPENDED, - (unsigned int *)&m_tid + NULL, // default security + stackSize, + wxThreadInternal::WinThreadStart, // entry point + thread, + CREATE_SUSPENDED, + (unsigned int *)&m_tid ); #else // compiler doesn't have _beginthreadex m_hThread = ::CreateThread ( NULL, // default security - 0, // default stack size + stackSize, // stack size wxThreadInternal::WinThreadStart, // thread entry point (LPVOID)thread, // parameter CREATE_SUSPENDED, // flags @@ -643,6 +821,11 @@ int wxThread::GetCPUCount() return si.dwNumberOfProcessors; } +unsigned long wxThread::GetCurrentId() +{ + return (unsigned long)::GetCurrentThreadId(); +} + bool wxThread::SetConcurrency(size_t level) { wxASSERT_MSG( IsMain(), _T("should only be called from the main thread") ); @@ -756,11 +939,11 @@ wxThread::~wxThread() // create/start thread // ------------------- -wxThreadError wxThread::Create() +wxThreadError wxThread::Create(unsigned int stackSize) { wxCriticalSectionLocker lock(m_critsect); - if ( !m_internal->Create(this) ) + if ( !m_internal->Create(this, stackSize) ) return wxTHREAD_NO_RESOURCE; return wxTHREAD_NO_ERROR; @@ -954,12 +1137,18 @@ wxThreadError wxThread::Delete(ExitCode *pRc) } } - if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) ) + // although the thread might be already in the EXITED state it might not + // have terminated yet and so we are not sure that it has actually + // terminated if the "if" above hadn't been taken + do { - wxLogLastError(wxT("GetExitCodeThread")); + if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) ) + { + wxLogLastError(wxT("GetExitCodeThread")); - rc = (ExitCode)-1; - } + rc = (ExitCode)-1; + } + } while ( (DWORD)rc == STILL_ACTIVE ); if ( IsDetached() ) { @@ -970,9 +1159,6 @@ wxThreadError wxThread::Delete(ExitCode *pRc) delete this; } - wxASSERT_MSG( (DWORD)rc != STILL_ACTIVE, - wxT("thread must be already terminated.") ); - if ( pRc ) *pRc = rc; @@ -1244,3 +1430,5 @@ bool WXDLLEXPORT wxIsWaitingForThread() } #endif // wxUSE_THREADS + +// vi:sts=4:sw=4:et