From be8098689121d2352751c51005c133797d77ecea Mon Sep 17 00:00:00 2001 From: Vadim Zeitlin Date: Tue, 2 Apr 2002 13:15:16 +0000 Subject: [PATCH] added wxSemaphore (with docs), new version of wxCondition and bug fixes to wxThread (patch 538242 by K.S. Sreeram) git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@14907 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775 --- docs/latex/wx/category.tex | 1 + docs/latex/wx/classes.tex | 1 + docs/latex/wx/conditn.tex | 83 ++++-- docs/latex/wx/semaphor.tex | 84 ++++++ include/wx/thread.h | 96 +++++-- src/msw/thread.cpp | 333 +++++++++++++++++----- src/unix/threadpsx.cpp | 555 +++++++++++++++++++++++-------------- 7 files changed, 825 insertions(+), 328 deletions(-) create mode 100644 docs/latex/wx/semaphor.tex diff --git a/docs/latex/wx/category.tex b/docs/latex/wx/category.tex index a3d8afe9a2..a0c874f5cf 100644 --- a/docs/latex/wx/category.tex +++ b/docs/latex/wx/category.tex @@ -505,6 +505,7 @@ capabilities of the various platforms. \twocolitem{\helpref{wxCriticalSection}{wxcriticalsection}}{Critical section class} \twocolitem{\helpref{wxCriticalSectionLocker}{wxcriticalsectionlocker}}{Critical section locker utility class} \twocolitem{\helpref{wxCondition}{wxcondition}}{Condition class} +\twocolitem{\helpref{wxSemaphore}{wxsemaphore}}{Semaphore class} \end{twocollist} {\large {\bf HTML classes}} diff --git a/docs/latex/wx/classes.tex b/docs/latex/wx/classes.tex index bc7e82b958..9ec19fa776 100644 --- a/docs/latex/wx/classes.tex +++ b/docs/latex/wx/classes.tex @@ -270,6 +270,7 @@ \input scrolwin.tex \input scrlwevt.tex \input scrolevt.tex +\input semaphor.tex \input hprovsmp.tex \input sngchdlg.tex \input snglinst.tex diff --git a/docs/latex/wx/conditn.tex b/docs/latex/wx/conditn.tex index 88b04eeb28..bb71a7e7d7 100644 --- a/docs/latex/wx/conditn.tex +++ b/docs/latex/wx/conditn.tex @@ -12,28 +12,22 @@ perfect because in this particular case it would be much better to just worker threads it already makes much more sense). Note that a call to \helpref{Signal()}{wxconditionsignal} may happen before the -other thread calls \helpref{Wait()}{wxconditionwait} but, in marked contrast -with the pthread conditions, this will still work as the missed signals are -queued and \helpref{Wait()}{wxconditionwait} simply returns immediately if -there are ny pending signals. - -However, the calls to \helpref{Broadcast()}{wxconditionbroadcast} are {\bf not} -queued and so it will only wake up the threads already waiting on the -condition. Accordingly, you will probably want to use a mutex to ensure that -the thread(s) you want to be waken up have indeed started to wait before -calling \helpref{Broadcast()}{wxconditionbroadcast}. +other thread calls \helpref{Wait()}{wxconditionwait} and, just as with the +pthread conditions, the signal is then lost and so if you want to be sure to +get it you must use a mutex together with the condition variable. \wxheading{Example} -This example shows how a main thread may launch a worker thread and wait until -it starts running: +This example shows how a main thread may launch a worker thread which starts +running and then waits until the main thread signals it to continue: \begin{verbatim} class MyWaitingThread : public wxThread { public: - MyWaitingThread(wxCondition *condition) + MyWaitingThread(wxMutex *mutex, wxCondition *condition) { + m_mutex = mutex; m_condition = condition; Create(); @@ -41,8 +35,11 @@ public: virtual ExitCode Entry() { - // let the main thread know that we started running + // wait for the signal from the main thread: it is absolutely necessary + // to look the mutex before doing it! + m_mutex->Lock(); m_condition->Signal(); + m_mutex->Unlock(); ... do our job ... @@ -55,15 +52,23 @@ private: int main() { - wxCondition condition; - MyWaitingThread *thread - new MyWaitingThread(&condition); + wxMutex mutex; + wxCondition condition(&mutex); + + for ( int i = 0; i < 10; i++ ) + { + MyWaitingThread *thread = new MyWaitingThread(&mutex, &condition); - thread->Run(); + thread->Run(); + } - // wait until the thread really starts running - condition.Wait(); + // wake up one of the threads + condition.Signal(); - ... + // wake up all the other ones + condition.Broadcast(); + + ... wait until they terminate or do something else ... return 0; } @@ -85,37 +90,57 @@ None. \membersection{wxCondition::wxCondition}\label{wxconditionconstr} -\func{}{wxCondition}{\void} +\func{}{wxCondition}{\param{wxMutex }{*mutex}} -Default constructor. +Default and only constructor. The {\it mutex} must be non {\tt NULL}. \membersection{wxCondition::\destruct{wxCondition}} \func{}{\destruct{wxCondition}}{\void} -Destroys the wxCondition object. +Destroys the wxCondition object. The destructor is not virtual so this class +should not be used polymorphically. \membersection{wxCondition::Broadcast}\label{wxconditionbroadcast} \func{void}{Broadcast}{\void} -Broadcasts to all waiting objects. +Broadcasts to all waiting threads, waking all of them up. Note that this method +may be called whether the mutex associated with this condition is locked or +not. + +\wxheading{See also} + +\helpref{wxCondition::Signal}{wxconditionsignal} \membersection{wxCondition::Signal}\label{wxconditionsignal} \func{void}{Signal}{\void} -Signals the object. +Signals the object waking up at most one thread. If several threads are waiting +on the same condition, the exact thread which is woken up is undefined. If no +threads are waiting, the signal is lost and the condition would have to be +signalled again to wake up any thread which may start waiting on it later. + +Note that this method may be called whether the mutex associated with this +condition is locked or not. + +\wxheading{See also} + +\helpref{wxCondition::Broadcast}{wxconditionbroadcast} \membersection{wxCondition::Wait}\label{wxconditionwait} \func{void}{Wait}{\void} -Waits indefinitely. +Waits until the condition is signalled. \func{bool}{Wait}{\param{unsigned long}{ sec}, \param{unsigned long}{ nsec}} -Waits until a signal is raised or the timeout has elapsed. +Waits until the condition is signalled or the timeout has elapsed. + +Note that the mutex associated with this condition {\bf must} be acquired by +the thread before calling this method. \wxheading{Parameters} @@ -125,5 +150,7 @@ Waits until a signal is raised or the timeout has elapsed. \wxheading{Return value} -The second form returns if the signal was raised, or FALSE if there was a timeout. +The second form returns {\tt TRUE} if the condition has been signalled, or +{\tt FALSE} if it returned because the timeout has elapsed. + diff --git a/docs/latex/wx/semaphor.tex b/docs/latex/wx/semaphor.tex new file mode 100644 index 0000000000..31938286cc --- /dev/null +++ b/docs/latex/wx/semaphor.tex @@ -0,0 +1,84 @@ +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Name: semaphore.tex +%% Purpose: wxSemaphore documentation +%% Author: Vadim Zeitlin +%% Modified by: +%% Created: 02.04.02 +%% RCS-ID: $Id$ +%% Copyright: (c) 2002 Vadim Zeitlin +%% License: wxWindows license +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +\section{\class{wxSemaphore}}\label{wxsemaphore} + +wxSemaphore is a counter limiting the number of threads concurrently accessing +a shared resource. This counter is always between $0$ and the maximum value +specified during the semaphore creation. When the counter is strictly greater +than $0$, a call to \helpref{Wait}{wxsemaphorewait} returns immediately and +decrements the counter. As soon as it reaches $0$, any subsequent calls to +\helpref{Wait}{wxsemaphorewait} block and only return when the semaphore +counter becomes strictly positive again as the result of calling +\helpref{Post}{wxsemaphorepost} which increments the counter. + +In general, the semaphores are useful to restict access to a shared resource +which can only be accessed by some fixed number of clients at once. For +example, when modeling a hotel reservation system a semaphore with the counter +equal to the total number of available rooms could be created. Each time a room +is reserved, the semaphore should be acquired by calling +\helpref{Wait}{wxsemaphorewait} and each time a room is freed it should be +released by calling \helpref{Post}{wxsemaphorepost}. + +\wxheading{Derived from} + +No base class + +\wxheading{Include files} + + + +\latexignore{\rtfignore{\wxheading{Members}}} + +\membersection{wxSemaphore::wxSemaphore}\label{wxsemaphorewxsemaphore} + +\func{}{wxSemaphore}{\param{int }{initialcount = 0}, \param{int }{maxcount = 0}} + +Specifying a {\it maxcount} of $0$ actually makes wxSemaphore behave as if +there is no upper limit. If maxcount is $1$ the semaphore behaves exactly as a +mutex. + +{\it initialcount} is the initial value of the semaphore which must be between +$0$ and {\it maxcount} (if it is not set to $0$). + +\membersection{wxSemaphore::\destruct{wxSemaphore}}\label{wxsemaphoredtor} + +\func{}{\destruct{wxSemaphore}}{\void} + +Destructor is not virtual, don't use this class polymorphically. + +\membersection{wxSemaphore::Post}\label{wxsemaphorepost} + +\func{void}{Post}{\void} + +Increments the semaphore count and signals one of the waiting threads in an +atomic way. + +\membersection{wxSemaphore::TryWait}\label{wxsemaphoretrywait} + +\func{bool}{TryWait}{\void} + +Same as \helpref{Wait()}{wxsemaphorewait}, but does not block, returns +{\tt TRUE} if the semaphore was successfully acquired and {\tt FALSE} if the +count is zero and it couldn't be done. + +\membersection{wxSemaphore::Wait}\label{wxsemaphorewait} + +\func{void}{Wait}{\void} + +Wait indefinitely until the semaphore count becomes strictly positive +and then decrement it and return. + +\func{bool}{Wait}{\param{unsigned long }{timeout\_millis}} + +Same as the version above, but with a timeout limit: returns {\tt TRUE| if the +semaphore was acquired and {\tt FALSE} if the timeout has ellapsed + diff --git a/include/wx/thread.h b/include/wx/thread.h index 7dd5cfa621..ff78911271 100644 --- a/include/wx/thread.h +++ b/include/wx/thread.h @@ -80,6 +80,7 @@ enum // you should consider wxMutexLocker whenever possible instead of directly // working with wxMutex class - it is safer +class WXDLLEXPORT wxConditionInternal; class WXDLLEXPORT wxMutexInternal; class WXDLLEXPORT wxMutex { @@ -105,6 +106,8 @@ protected: int m_locked; wxMutexInternal *m_internal; + + friend class wxConditionInternal; }; // a helper class which locks the mutex in the ctor and unlocks it in the dtor: @@ -211,41 +214,97 @@ private: }; // ---------------------------------------------------------------------------- -// Condition variable: allows to block the thread execution until something -// happens (== condition is signaled) +// wxCondition models a POSIX condition variable which allows one (or more) +// thread(s) to wait until some condition is fulfilled // ---------------------------------------------------------------------------- -class wxConditionInternal; class WXDLLEXPORT wxCondition { public: - // constructor & destructor - wxCondition(); + // constructor and destructor + + // Each wxCondition object is associated with with a wxMutex object The + // mutex object MUST be locked before calling Wait() + wxCondition( wxMutex *mutex ); + + // dtor is not virtual, don't use this class polymorphically ~wxCondition(); - // wait until the condition is signaled - // waits indefinitely. + // NB: the associated mutex MUST be locked beforehand by the calling thread + // + // it atomically releases the lock on the associated mutex + // and starts waiting to be woken up by a Signal()/Broadcast() + // once its signaled, then it will wait until it can reacquire + // the lock on the associated mutex object, before returning. void Wait(); - // waits until a signal is raised or the timeout elapses - bool Wait(unsigned long sec, unsigned long nsec); - // signal the condition - // wakes up one (and only one) of the waiting threads + // exactly as Wait() except that it may also return if the specified + // timeout ellapses even if the condition hasn't been signalled: in this + // case, the return value is FALSE, otherwise (i.e. in case of a normal + // return) it is TRUE + // + // the timeeout parameter specifies a interval that needs to be waited in + // milliseconds + bool Wait( unsigned long timeout_millis ); + + // NB: the associated mutex may or may not be locked by the calling thread + // + // this method unblocks one thread if any are blocking on the condition. + // if no thread is blocking in Wait(), then the signal is NOT remembered + // The thread which was blocking on Wait(), will then reacquire the lock + // on the associated mutex object before returning void Signal(); - // wakes up all threads waiting on this condition - void Broadcast(); -#ifdef __WXDEBUG__ - // for debugging purposes only - void *GetId() const { return m_internal; } -#endif // __WXDEBUG__ + // NB: the associated mutex may or may not be locked by the calling thread + // + // this method unblocks all threads if any are blocking on the condition. + // if no thread is blocking in Wait(), then the signal is NOT remembered + // The threads which were blocking on Wait(), will then reacquire the lock + // on the associated mutex object before returning. + void Broadcast(); private: wxConditionInternal *m_internal; }; // ---------------------------------------------------------------------------- -// Thread class +// wxSemaphore: a counter limiting the number of threads concurrently accessing +// a shared resource +// ---------------------------------------------------------------------------- + +class WXDLLEXPORT wxSemaphoreInternal; +class WXDLLEXPORT wxSemaphore +{ +public: + // specifying a maxcount of 0 actually makes wxSemaphore behave as if there + // is no upper limit, if maxcount is 1 the semaphore behaves as a mutex + wxSemaphore( int initialcount = 0, int maxcount = 0 ); + + // dtor is not virtual, don't use this class polymorphically + ~wxSemaphore(); + + // wait indefinitely, until the semaphore count goes beyond 0 + // and then decrement it and return (this method might have been called + // Acquire()) + void Wait(); + + // same as Wait(), but does not block, returns TRUE if successful and + // FALSE if the count is zero + bool TryWait(); + + // same as Wait(), but as a timeout limit, returns TRUE if the semaphore + // was acquired and FALSE if the timeout has ellapsed + bool Wait( unsigned long timeout_millis ); + + // increments the semaphore count and signals one of the waiting threads + void Post(); + +private: + wxSemaphoreInternal *m_internal; +}; + +// ---------------------------------------------------------------------------- +// wxThread: class encpasulating a thread of execution // ---------------------------------------------------------------------------- // there are two different kinds of threads: joinable and detached (default) @@ -540,4 +599,3 @@ public: #endif // __THREADH__ -// vi:sts=4:sw=4:et diff --git a/src/msw/thread.cpp b/src/msw/thread.cpp index b654353bfc..ec70911da4 100644 --- a/src/msw/thread.cpp +++ b/src/msw/thread.cpp @@ -146,7 +146,7 @@ public: } } - ~wxMutexInternal() { if ( m_mutex ) CloseHandle(m_mutex); } + ~wxMutexInternal() { if ( m_mutex ) ::CloseHandle(m_mutex); } public: HANDLE m_mutex; @@ -223,97 +223,284 @@ wxMutexError wxMutex::Unlock() return wxMUTEX_NO_ERROR; } -// ---------------------------------------------------------------------------- -// wxCondition implementation -// ---------------------------------------------------------------------------- +// ========================================================================== +// wxSemaphore +// ========================================================================== -class wxConditionInternal +// -------------------------------------------------------------------------- +// wxSemaphoreInternal +// -------------------------------------------------------------------------- + +class wxSemaphoreInternal { public: - wxConditionInternal() + wxSemaphoreInternal( int initialcount = 0, int maxcount = 0 ); + ~wxSemaphoreInternal(); + + void Wait(); + bool TryWait(); + + bool Wait( unsigned long timeout_millis ); + + void Post(); + +private: + HANDLE m_semaphore; +}; + +wxSemaphoreInternal::wxSemaphoreInternal( int initialcount, int maxcount ) +{ + if ( maxcount == 0 ) { - m_hEvent = ::CreateEvent( - NULL, // default secutiry - FALSE, // not manual reset - FALSE, // nonsignaled initially - NULL // nameless event - ); - if ( !m_hEvent ) - { - wxLogSysError(_("Can not create event object.")); - } + // make it practically infinite + maxcount = INT_MAX; + } - // nobody waits for us yet - m_nWaiters = 0; + m_semaphore = ::CreateSemaphore( NULL, initialcount, maxcount, NULL ); + if ( !m_semaphore ) + { + wxLogLastError(_T("CreateSemaphore()")); } +} + +wxSemaphoreInternal::~wxSemaphoreInternal() +{ + CloseHandle( m_semaphore ); +} - bool Wait(DWORD timeout) +void wxSemaphoreInternal::Wait() +{ + if ( ::WaitForSingleObject( m_semaphore, INFINITE ) != WAIT_OBJECT_0 ) { - // as m_nWaiters variable is accessed from multiple waiting threads - // (and possibly from the broadcasting thread), we need to change its - // value atomically - ::InterlockedIncrement(&m_nWaiters); + wxLogLastError(_T("WaitForSingleObject")); + } +} - // FIXME this should be MsgWaitForMultipleObjects() as we want to keep - // processing Windows messages while waiting (or don't we?) - DWORD rc = ::WaitForSingleObject(m_hEvent, timeout); +bool wxSemaphoreInternal::TryWait() +{ + return Wait(0); +} + +bool wxSemaphoreInternal::Wait( unsigned long timeout_millis ) +{ + DWORD result = ::WaitForSingleObject( m_semaphore, timeout_millis ); + + switch ( result ) + { + case WAIT_OBJECT_0: + return TRUE; - ::InterlockedDecrement(&m_nWaiters); + case WAIT_TIMEOUT: + break; - return rc != WAIT_TIMEOUT; + default: + wxLogLastError(_T("WaitForSingleObject()")); } - void Signal() + return FALSE; +} + +void wxSemaphoreInternal::Post() +{ + if ( !::ReleaseSemaphore( m_semaphore, 1, NULL ) ) { - // set the event to signaled: if a thread is already waiting on it, it - // will be woken up, otherwise the event will remain in the signaled - // state until someone waits on it. In any case, the system will return - // it to a non signalled state afterwards. If multiple threads are - // waiting, only one will be woken up. - if ( !::SetEvent(m_hEvent) ) - { - wxLogLastError(wxT("SetEvent")); - } + wxLogLastError(_T("ReleaseSemaphore")); } +} + +// -------------------------------------------------------------------------- +// wxSemaphore +// -------------------------------------------------------------------------- + +wxSemaphore::wxSemaphore( int initialcount, int maxcount ) +{ + m_internal = new wxSemaphoreInternal( initialcount, maxcount ); +} + +wxSemaphore::~wxSemaphore() +{ + delete m_internal; +} + +void wxSemaphore::Wait() +{ + m_internal->Wait(); +} + +bool wxSemaphore::TryWait() +{ + return m_internal->TryWait(); +} + +bool wxSemaphore::Wait( unsigned long timeout_millis ) +{ + return m_internal->Wait( timeout_millis ); +} + +void wxSemaphore::Post() +{ + m_internal->Post(); +} + + +// ========================================================================== +// wxCondition +// ========================================================================== + +// -------------------------------------------------------------------------- +// wxConditionInternal +// -------------------------------------------------------------------------- - void Broadcast() +class wxConditionInternal +{ +public: + wxConditionInternal( wxMutex *mutex ); + ~wxConditionInternal(); + + void Wait(); + + bool Wait( unsigned long timeout_millis ); + + void Signal(); + + void Broadcast(); + +private: + int m_numWaiters; + wxMutex m_mutexNumWaiters; + + wxMutex *m_mutex; + + wxSemaphore m_semaphore; +}; + +wxConditionInternal::wxConditionInternal( wxMutex *mutex ) +{ + m_mutex = mutex; + + m_numWaiters = 0; +} + +wxConditionInternal::~wxConditionInternal() +{ +} + +void wxConditionInternal::Wait() +{ + // increment the number of waiters + m_mutexNumWaiters.Lock(); + m_numWaiters++; + m_mutexNumWaiters.Unlock(); + + m_mutex->Unlock(); + + // a potential race condition can occur here + // + // after a thread increments nwaiters, and unlocks the mutex and before the + // semaphore.Wait() is called, if another thread can cause a signal to be + // generated + // + // this race condition is handled by using a semaphore and incrementing the + // semaphore only if 'nwaiters' is greater that zero since the semaphore, + // can 'remember' signals the race condition will not occur + + // wait ( if necessary ) and decrement semaphore + m_semaphore.Wait(); + + m_mutex->Lock(); +} + +bool wxConditionInternal::Wait( unsigned long timeout_millis ) +{ + m_mutexNumWaiters.Lock(); + m_numWaiters++; + m_mutexNumWaiters.Unlock(); + + m_mutex->Unlock(); + + // a race condition can occur at this point in the code + // + // please see the comments in Wait(), for details + + bool success = TRUE; + + bool result = m_semaphore.Wait( timeout_millis ); + + if ( !result ) { - // we need to save the original value as m_nWaiters is goign to be - // decreased by the signalled thread resulting in the loop being - // executed less times than needed - LONG nWaiters = m_nWaiters; - - // this works because all these threads are already waiting and so each - // SetEvent() inside Signal() is really a PulseEvent() because the - // event state is immediately returned to non-signaled - for ( LONG n = 0; n < nWaiters; n++ ) + // another potential race condition exists here it is caused when a + // 'waiting' thread timesout, and returns from WaitForSingleObject, but + // has not yet decremented 'nwaiters'. + // + // at this point if another thread calls signal() then the semaphore + // will be incremented, but the waiting thread will miss it. + // + // to handle this particular case, the waiting thread calls + // WaitForSingleObject again with a timeout of 0, after locking + // 'nwaiters_mutex'. this call does not block because of the zero + // timeout, but will allow the waiting thread to catch the missed + // signals. + m_mutexNumWaiters.Lock(); + result = m_semaphore.Wait( 0 ); + + if ( !result ) { - Signal(); + m_numWaiters--; + success = FALSE; } + + m_mutexNumWaiters.Unlock(); } - ~wxConditionInternal() + m_mutex->Lock(); + + return success; +} + +void wxConditionInternal::Signal() +{ + m_mutexNumWaiters.Lock(); + + if ( m_numWaiters > 0 ) { - if ( m_hEvent ) - { - if ( !::CloseHandle(m_hEvent) ) - { - wxLogLastError(wxT("CloseHandle(event)")); - } - } + // increment the semaphore by 1 + m_semaphore.Post(); + + m_numWaiters--; } -private: - // the Win32 synchronization object corresponding to this event - HANDLE m_hEvent; + m_mutexNumWaiters.Unlock(); +} - // number of threads waiting for this condition - LONG m_nWaiters; -}; +void wxConditionInternal::Broadcast() +{ + m_mutexNumWaiters.Lock(); -wxCondition::wxCondition() + while ( m_numWaiters > 0 ) + { + m_semaphore.Post(); + m_numWaiters--; + } + + m_mutexNumWaiters.Unlock(); +} + +// ---------------------------------------------------------------------------- +// wxCondition implementation +// ---------------------------------------------------------------------------- + +wxCondition::wxCondition( wxMutex *mutex ) { - m_internal = new wxConditionInternal; + if ( !mutex ) + { + wxFAIL_MSG( _T("NULL mutex in wxCondition ctor") ); + + m_internal = NULL; + } + else + { + m_internal = new wxConditionInternal( mutex ); + } } wxCondition::~wxCondition() @@ -323,23 +510,25 @@ wxCondition::~wxCondition() void wxCondition::Wait() { - (void)m_internal->Wait(INFINITE); + if ( m_internal ) + m_internal->Wait(); } -bool wxCondition::Wait(unsigned long sec, - unsigned long nsec) +bool wxCondition::Wait( unsigned long timeout_millis ) { - return m_internal->Wait(sec*1000 + nsec/1000000); + return m_internal ? m_internal->Wait(timeout_millis) : FALSE; } void wxCondition::Signal() { - m_internal->Signal(); + if ( m_internal ) + m_internal->Signal(); } void wxCondition::Broadcast() { - m_internal->Broadcast(); + if ( m_internal ) + m_internal->Broadcast(); } // ---------------------------------------------------------------------------- @@ -350,7 +539,7 @@ wxCriticalSection::wxCriticalSection() { #ifdef __WXDEBUG__ // Done this way to stop warnings during compilation about statement - // always being false + // always being FALSE int csSize = sizeof(CRITICAL_SECTION); int bSize = sizeof(m_buffer); wxASSERT_MSG( csSize <= bSize, diff --git a/src/unix/threadpsx.cpp b/src/unix/threadpsx.cpp index 5e8ba13423..4524bae5b8 100644 --- a/src/unix/threadpsx.cpp +++ b/src/unix/threadpsx.cpp @@ -2,13 +2,14 @@ // Name: threadpsx.cpp // Purpose: wxThread (Posix) Implementation // Author: Original from Wolfram Gloger/Guilhem Lavaux -// Modified by: +// Modified by: K. S. Sreeram (2002): POSIXified wxCondition, added wxSemaphore // Created: 04/22/98 // RCS-ID: $Id$ // Copyright: (c) Wolfram Gloger (1996, 1997) // Guilhem Lavaux (1998) // Vadim Zeitlin (1999-2002) // Robert Roebling (1999) +// K. S. Sreeram (2002) // Licence: wxWindows licence ///////////////////////////////////////////////////////////////////////////// @@ -34,6 +35,7 @@ #include "wx/log.h" #include "wx/intl.h" #include "wx/dynarray.h" +#include "wx/timer.h" #include #include @@ -72,15 +74,6 @@ static const wxThread::ExitCode EXITCODE_CANCELLED = (wxThread::ExitCode)-1; // our trace mask #define TRACE_THREADS _T("thread") -// ---------------------------------------------------------------------------- -// pseudo template types -// ---------------------------------------------------------------------------- - -WX_DECLARE_LIST(pthread_mutex_t, wxMutexList); - -#include "wx/listimpl.cpp" -WX_DEFINE_LIST(wxMutexList); - // ---------------------------------------------------------------------------- // private functions // ---------------------------------------------------------------------------- @@ -88,35 +81,6 @@ WX_DEFINE_LIST(wxMutexList); static void ScheduleThreadForDeletion(); static void DeleteThread(wxThread *This); -// ---------------------------------------------------------------------------- -// private classes -// ---------------------------------------------------------------------------- - -// same as wxMutexLocker but for "native" mutex -class MutexLock -{ -public: - MutexLock(pthread_mutex_t& mutex) - { - m_mutex = &mutex; - if ( pthread_mutex_lock(m_mutex) != 0 ) - { - wxLogDebug(_T("pthread_mutex_lock() failed")); - } - } - - ~MutexLock() - { - if ( pthread_mutex_unlock(m_mutex) != 0 ) - { - wxLogDebug(_T("pthread_mutex_unlock() failed")); - } - } - -private: - pthread_mutex_t *m_mutex; -}; - // ---------------------------------------------------------------------------- // types // ---------------------------------------------------------------------------- @@ -143,7 +107,7 @@ static pthread_key_t gs_keySelf; static size_t gs_nThreadsBeingDeleted = 0; // a mutex to protect gs_nThreadsBeingDeleted -static pthread_mutex_t gs_mutexDeleteThread; +static wxMutex *gs_mutexDeleteThread = (wxMutex *)NULL; // and a condition variable which will be signaled when all // gs_nThreadsBeingDeleted will have been deleted @@ -174,6 +138,8 @@ public: private: pthread_mutex_t m_mutex; + + friend class wxConditionInternal; }; wxMutexInternal::wxMutexInternal() @@ -342,209 +308,379 @@ wxMutexError wxMutex::Unlock() return m_internal->Unlock(); } -// ============================================================================ +// =========================================================================== // wxCondition implementation -// ============================================================================ +// =========================================================================== -// ---------------------------------------------------------------------------- +// --------------------------------------------------------------------------- // wxConditionInternal -// ---------------------------------------------------------------------------- +// --------------------------------------------------------------------------- -// The native POSIX condition variables are dumb: if the condition is signaled -// before another thread starts to wait on it, the signal is lost and so this -// other thread will be never woken up. It's much more convenient to us to -// remember that the condition was signaled and to return from Wait() -// immediately in this case (this is more like Win32 automatic event objects) class wxConditionInternal { public: - wxConditionInternal(); + wxConditionInternal( wxMutex *mutex ); ~wxConditionInternal(); - // wait with the given timeout or indefinitely if NULL - bool Wait(const timespec* ts = NULL); + void Wait(); - void Signal(bool all = FALSE); + bool Wait( const timespec *ts ); -private: - // the number of Signal() calls we "missed", i.e. which were done while - // there were no threads to wait for them - size_t m_nQueuedSignals; + void Signal(); - // counts all pending waiters - size_t m_nWaiters; + void Broadcast(); - // the condition itself - pthread_cond_t m_condition; +private: - // the mutex used with the conditon: it also protects the counters above - pthread_mutex_t m_mutex; + wxMutex *m_mutex; + pthread_cond_t m_cond; }; -wxConditionInternal::wxConditionInternal() +wxConditionInternal::wxConditionInternal( wxMutex *mutex ) { - m_nQueuedSignals = - m_nWaiters = 0; - - if ( pthread_cond_init(&m_condition, (pthread_condattr_t *)NULL) != 0 ) - { - // this is supposed to never happen - wxFAIL_MSG( _T("pthread_cond_init() failed") ); - } - - if ( pthread_mutex_init(&m_mutex, NULL) != 0 ) + m_mutex = mutex; + if ( pthread_cond_init( &m_cond, NULL ) != 0 ) { - // neither this - wxFAIL_MSG( _T("wxCondition: pthread_mutex_init() failed") ); + wxLogDebug(_T("pthread_cond_init() failed")); } } wxConditionInternal::~wxConditionInternal() { - if ( pthread_cond_destroy( &m_condition ) != 0 ) + if ( pthread_cond_destroy( &m_cond ) != 0 ) { - wxLogDebug(_T("Failed to destroy condition variable (some " - "threads are probably still waiting on it?)")); + wxLogDebug(_T("pthread_cond_destroy() failed")); } +} - if ( pthread_mutex_destroy( &m_mutex ) != 0 ) +void wxConditionInternal::Wait() +{ + if ( pthread_cond_wait( &m_cond, &(m_mutex->m_internal->m_mutex) ) != 0 ) { - wxLogDebug(_T("Failed to destroy mutex (it is probably locked)")); + wxLogDebug(_T("pthread_cond_wait() failed")); } } -bool wxConditionInternal::Wait(const timespec* ts) +bool wxConditionInternal::Wait( const timespec *ts ) { - MutexLock lock(m_mutex); + int result = pthread_cond_timedwait( &m_cond, + &(m_mutex->m_internal->m_mutex), + ts ); + if ( result == ETIMEDOUT ) + return FALSE; - if ( m_nQueuedSignals ) - { - m_nQueuedSignals--; + wxASSERT_MSG( result == 0, _T("pthread_cond_timedwait() failed") ); - wxLogTrace(TRACE_THREADS, - _T("wxCondition(%08x)::Wait(): Has been signaled before"), - this); + return TRUE; +} - return TRUE; +void wxConditionInternal::Signal() +{ + int result = pthread_cond_signal( &m_cond ); + if ( result != 0 ) +{ + wxFAIL_MSG( _T("pthread_cond_signal() failed") ); } +} - // there are no queued signals, so start really waiting - m_nWaiters++; +void wxConditionInternal::Broadcast() +{ + int result = pthread_cond_broadcast( &m_cond ); + if ( result != 0 ) +{ + wxFAIL_MSG( _T("pthread_cond_broadcast() failed") ); + } +} - // calling wait function below unlocks the mutex and Signal() or - // Broadcast() will be able to continue to run now if they were - // blocking for it in the loop locking all mutexes) - wxLogTrace(TRACE_THREADS, - _T("wxCondition(%08x)::Wait(): starting to wait"), this); - int err = ts ? pthread_cond_timedwait(&m_condition, &m_mutex, ts) - : pthread_cond_wait(&m_condition, &m_mutex); - switch ( err ) +// --------------------------------------------------------------------------- +// wxCondition +// --------------------------------------------------------------------------- + +wxCondition::wxCondition( wxMutex *mutex ) +{ + if ( !mutex ) { - case 0: - // condition was signaled - wxLogTrace(TRACE_THREADS, - _T("wxCondition(%08x)::Wait(): ok"), this); - break; + wxFAIL_MSG( _T("NULL mutex in wxCondition ctor") ); - default: - wxLogDebug(_T("unexpected pthread_cond_[timed]wait() return")); - // fall through + m_internal = NULL; + } + else + { + m_internal = new wxConditionInternal( mutex ); + } +} - case ETIMEDOUT: - case EINTR: - // The condition has not been signaled, so we have to - // decrement the counter manually - --m_nWaiters; +wxCondition::~wxCondition() +{ + delete m_internal; +} - // wait interrupted or timeout elapsed - wxLogTrace(TRACE_THREADS, - _T("wxCondition(%08x)::Wait(): timeout/intr"), this); - } +void wxCondition::Wait() +{ + if ( m_internal ) + m_internal->Wait(); +} + +bool wxCondition::Wait( unsigned long timeout_millis ) +{ + wxCHECK_MSG( m_internal, FALSE, _T("can't wait on uninitalized condition") ); + + wxLongLong curtime = wxGetLocalTimeMillis(); + curtime += timeout_millis; + wxLongLong temp = curtime / 1000; + int sec = temp.GetLo(); + temp = temp * 1000; + temp = curtime - temp; + int millis = temp.GetLo(); + + timespec tspec; + + tspec.tv_sec = sec; + tspec.tv_nsec = millis * 1000L * 1000L; - return err == 0; + return m_internal->Wait(&tspec); } -void wxConditionInternal::Signal(bool all) +void wxCondition::Signal() { - // make sure that only one Signal() or Broadcast() is in progress - MutexLock lock(m_mutex); + if ( m_internal ) + m_internal->Signal(); +} - // Are there any waiters? - if ( m_nWaiters == 0 ) - { - // No, there are not, so don't signal but keep in mind for the next - // Wait() - m_nQueuedSignals++; +void wxCondition::Broadcast() +{ + if ( m_internal ) + m_internal->Broadcast(); +} + +// =========================================================================== +// wxSemaphore implementation +// =========================================================================== - return; +// --------------------------------------------------------------------------- +// wxSemaphoreInternal +// --------------------------------------------------------------------------- + +class wxSemaphoreInternal +{ +public: + wxSemaphoreInternal( int initialcount, int maxcount ); + + void Wait(); + bool TryWait(); + + bool Wait( unsigned long timeout_millis ); + + void Post(); + +private: + wxMutex m_mutex; + wxCondition m_cond; + + int count, + maxcount; +}; + +wxSemaphoreInternal::wxSemaphoreInternal( int initialcount, int maxcount ) + : m_cond(m_mutex) +{ + + if ( (initialcount < 0) || ((maxcount > 0) && (initialcount > maxcount)) ) + { + wxFAIL_MSG( _T("wxSemaphore: invalid initial count") ); } - // now we can finally signal it - wxLogTrace(TRACE_THREADS, _T("wxCondition(%08x)::Signal(): preparing to %s"), - this, all ? _T("broadcast") : _T("signal")); + maxcount = maxcount; + count = initialcount; +} - int err = all ? pthread_cond_broadcast(&m_condition) - : pthread_cond_signal(&m_condition); +void wxSemaphoreInternal::Wait() +{ + wxMutexLocker locker(*m_mutex); - if ( all ) + while ( count <= 0 ) { - m_nWaiters = 0; + m_cond->Wait(); } - else + + count--; +} + +bool wxSemaphoreInternal::TryWait() +{ + wxMutexLocker locker(*m_mutex); + + if ( count <= 0 ) + return FALSE; + + count--; + + return TRUE; +} + +bool wxSemaphoreInternal::Wait( unsigned long timeout_millis ) +{ + wxMutexLocker locker( *m_mutex ); + + wxLongLong startTime = wxGetLocalTimeMillis(); + + while ( count <= 0 ) { - --m_nWaiters; + wxLongLong elapsed = wxGetLocalTimeMillis() - startTime; + long remainingTime = (long)timeout_millis - (long)elapsed.GetLo(); + if ( remainingTime <= 0 ) + return FALSE; + + bool result = m_cond->Wait( remainingTime ); + if ( !result ) + return FALSE; } - if ( err ) + count--; + + return TRUE; +} + +void wxSemaphoreInternal::Post() +{ + wxMutexLocker locker(*m_mutex); + + if ( (maxcount > 0) && (count == maxcount) ) { - // shouldn't ever happen - wxFAIL_MSG(_T("pthread_cond_{broadcast|signal}() failed")); + wxFAIL_MSG( _T("wxSemaphore::Post() overflow") ); } + + count++; + + m_cond->Signal(); } -// ---------------------------------------------------------------------------- -// wxCondition -// ---------------------------------------------------------------------------- +// -------------------------------------------------------------------------- +// wxSemaphore +// -------------------------------------------------------------------------- -wxCondition::wxCondition() +wxSemaphore::wxSemaphore( int initialcount, int maxcount ) { - m_internal = new wxConditionInternal; + m_internal = new wxSemaphoreInternal( initialcount, maxcount ); } -wxCondition::~wxCondition() +wxSemaphore::~wxSemaphore() { delete m_internal; } -void wxCondition::Wait() +void wxSemaphore::Wait() { - (void)m_internal->Wait(); + m_internal->Wait(); } -bool wxCondition::Wait(unsigned long sec, unsigned long nsec) +bool wxSemaphore::TryWait() { - timespec tspec; - - tspec.tv_sec = time(0L) + sec; // FIXME is time(0) correct here? - tspec.tv_nsec = nsec; - - return m_internal->Wait(&tspec); + return m_internal->TryWait(); } -void wxCondition::Signal() +bool wxSemaphore::Wait( unsigned long timeout_millis ) { - m_internal->Signal(); + return m_internal->Wait( timeout_millis ); } -void wxCondition::Broadcast() +void wxSemaphore::Post() { - m_internal->Signal(TRUE /* all */); + m_internal->Post(); } -// ============================================================================ +// This class is used by wxThreadInternal to support Delete() on +// a detached thread +class wxRefCountedCondition +{ +public: + // start with a initial reference count of 1 + wxRefCountedCondition() + { + m_refCount = 1; + m_signaled = FALSE; + + m_mutex = new wxMutex(); + m_cond = new wxCondition( m_mutex ); + } + + // increment the reference count + void AddRef() + { + wxMutexLocker locker( *m_mutex ); + + m_refCount++; + } + + // decrement the reference count if reference count is zero then delete the + // object + void DeleteRef() + { + bool shouldDelete = FALSE; + + m_mutex->Lock(); + + if ( --m_refCount == 0 ) + { + shouldDelete = TRUE; + } + + m_mutex->Unlock(); + + if ( shouldDelete ) + { + delete this; + } + } + + + // sets the object to signaled this signal will be a persistent signal all + // further Wait()s on the object will return without blocking + void SetSignaled() + { + wxMutexLocker locker( *m_mutex ); + + m_signaled = TRUE; + + m_cond->Broadcast(); + } + + // wait till the object is signaled if the object was already signaled then + // return immediately + void Wait() + { + wxMutexLocker locker( *m_mutex ); + + if ( !m_signaled ) + { + m_cond->Wait(); + } + } + +private: + int m_refCount; + + wxMutex *m_mutex; + wxCondition *m_cond; + + bool m_signaled; + + // Cannot delete this object directly, call DeleteRef() instead + ~wxRefCountedCondition() + { + delete m_cond; + delete m_mutex; + } + + // suppress gcc warning about the class having private dtor and not having + // friend (so what??) + friend class wxDummyFriend; +}; + +// =========================================================================== // wxThread implementation -// ============================================================================ +// =========================================================================== // the thread callback functions must have the C linkage extern "C" @@ -580,7 +716,7 @@ public: // wake up threads waiting for our termination void SignalExit(); // wake up threads waiting for our start - void SignalRun() { m_condRun.Signal(); } + void SignalRun() { m_semRun.Post(); } // go to sleep until Resume() is called void Pause(); // resume the thread @@ -630,7 +766,7 @@ private: // this flag is set when the thread should terminate bool m_cancelled; - // this flag is set when the thread is blocking on m_condSuspend + // this flag is set when the thread is blocking on m_semSuspend bool m_isPaused; // the thread exit code - only used for joinable (!detached) threads and @@ -644,22 +780,18 @@ private: bool m_shouldBroadcast; bool m_isDetached; - // VZ: it's possible that we might do with less than three different - // condition objects - for example, m_condRun and m_condEnd a priori - // won't be used in the same time. But for now I prefer this may be a - // bit less efficient but safer solution of having distinct condition - // variables for each purpose. - - // this condition is signaled by Run() and the threads Entry() is not + // this semaphore is posted by Run() and the threads Entry() is not // called before it is done - wxCondition m_condRun; + wxSemaphore m_semRun; // this one is signaled when the thread should resume after having been // Pause()d - wxCondition m_condSuspend; + wxSemaphore m_semSuspend; // finally this one is signalled when the thread exits - wxCondition m_condEnd; + // we are using a reference counted condition to support + // Delete() for a detached thread + wxRefCountedCondition *m_condEnd; }; // ---------------------------------------------------------------------------- @@ -697,8 +829,8 @@ void *wxThreadInternal::PthreadStart(wxThread *thread) pthread_cleanup_push(wxPthreadCleanup, thread); #endif // HAVE_THREAD_CLEANUP_FUNCTIONS - // wait for the condition to be signaled from Run() - pthread->m_condRun.Wait(); + // wait for the semaphore to be posted from Run() + pthread->m_semRun.Wait(); // test whether we should run the run at all - may be it was deleted // before it started to Run()? @@ -792,17 +924,20 @@ wxThreadInternal::wxThreadInternal() m_threadId = 0; m_exitcode = 0; - // set to TRUE only when the thread starts waiting on m_condSuspend + // set to TRUE only when the thread starts waiting on m_semSuspend m_isPaused = FALSE; // defaults for joinable threads m_shouldBeJoined = TRUE; m_shouldBroadcast = TRUE; m_isDetached = FALSE; + + m_condEnd = new wxRefCountedCondition(); } wxThreadInternal::~wxThreadInternal() { + m_condEnd->DeleteRef(); } wxThreadError wxThreadInternal::Run() @@ -832,7 +967,15 @@ void wxThreadInternal::Wait() // wait until the thread terminates (we're blocking in _another_ thread, // of course) - m_condEnd.Wait(); + + // a reference counting condition is used to handle the + // case where a detached thread deletes itself + // before m_condEnd->Wait() returns + // in this case the deletion of the condition object is deferred until + // all Wait()ing threads have finished calling DeleteRef() + m_condEnd->AddRef(); + m_condEnd->Wait(); + m_condEnd->DeleteRef(); wxLogTrace(TRACE_THREADS, _T("Finished waiting for thread %ld."), id); @@ -877,7 +1020,7 @@ void wxThreadInternal::SignalExit() wxLogTrace(TRACE_THREADS, _T("Thread %ld signals end condition."), GetId()); - m_condEnd.Broadcast(); + m_condEnd->SetSignaled(); } } @@ -890,8 +1033,8 @@ void wxThreadInternal::Pause() wxLogTrace(TRACE_THREADS, _T("Thread %ld goes to sleep."), GetId()); - // wait until the condition is signaled from Resume() - m_condSuspend.Wait(); + // wait until the semaphore is Post()ed from Resume() + m_semSuspend.Wait(); } void wxThreadInternal::Resume() @@ -906,7 +1049,7 @@ void wxThreadInternal::Resume() wxLogTrace(TRACE_THREADS, _T("Waking up thread %ld"), GetId()); // wake up Pause() - m_condSuspend.Signal(); + m_semSuspend.Post(); // reset the flag SetReallyPaused(FALSE); @@ -1307,7 +1450,7 @@ wxThreadError wxThread::Delete(ExitCode *rc) { case STATE_NEW: // we need to wake up the thread so that PthreadStart() will - // terminate - right now it's blocking on m_condRun + // terminate - right now it's blocking on m_semRun m_internal->SignalRun(); // fall through @@ -1490,7 +1633,8 @@ wxThread::~wxThread() // is not called for the joinable threads, so do it here if ( !m_isDetached ) { - MutexLock lock(gs_mutexDeleteThread); + wxMutexLocker lock( *gs_mutexDeleteThread ); + gs_nThreadsBeingDeleted--; wxLogTrace(TRACE_THREADS, _T("%u scheduled for deletion threads left."), @@ -1566,9 +1710,8 @@ bool wxThreadModule::OnInit() gs_mutexGui->Lock(); #endif // wxUSE_GUI - // under Solaris we get a warning from CC when using - // PTHREAD_MUTEX_INITIALIZER, so do it dynamically - pthread_mutex_init(&gs_mutexDeleteThread, NULL); + gs_mutexDeleteThread = new wxMutex(); + gs_condAllDeleted = new wxCondition( gs_mutexDeleteThread ); return TRUE; } @@ -1579,18 +1722,19 @@ void wxThreadModule::OnExit() // are there any threads left which are being deleted right now? size_t nThreadsBeingDeleted; + { - MutexLock lock(gs_mutexDeleteThread); + wxMutexLocker lock( *gs_mutexDeleteThread ); nThreadsBeingDeleted = gs_nThreadsBeingDeleted; - } - if ( nThreadsBeingDeleted > 0 ) - { - wxLogTrace(TRACE_THREADS, _T("Waiting for %u threads to disappear"), - nThreadsBeingDeleted); + if ( nThreadsBeingDeleted > 0 ) +{ + wxLogTrace(TRACE_THREADS, _T("Waiting for %u threads to disappear"), + nThreadsBeingDeleted); - // have to wait until all of them disappear - gs_condAllDeleted->Wait(); + // have to wait until all of them disappear + gs_condAllDeleted->Wait(); + } } // terminate any threads left @@ -1617,6 +1761,9 @@ void wxThreadModule::OnExit() // and free TLD slot (void)pthread_key_delete(gs_keySelf); + + delete gs_condAllDeleted; + delete gs_mutexDeleteThread; } // ---------------------------------------------------------------------------- @@ -1625,12 +1772,7 @@ void wxThreadModule::OnExit() static void ScheduleThreadForDeletion() { - MutexLock lock(gs_mutexDeleteThread); - - if ( gs_nThreadsBeingDeleted == 0 ) - { - gs_condAllDeleted = new wxCondition; - } + wxMutexLocker lock( *gs_mutexDeleteThread ); gs_nThreadsBeingDeleted++; @@ -1643,17 +1785,15 @@ static void DeleteThread(wxThread *This) { // gs_mutexDeleteThread should be unlocked before signalling the condition // or wxThreadModule::OnExit() would deadlock - { - MutexLock lock(gs_mutexDeleteThread); + wxMutexLocker locker( *gs_mutexDeleteThread ); - wxLogTrace(TRACE_THREADS, _T("Thread %ld auto deletes."), This->GetId()); + wxLogTrace(TRACE_THREADS, _T("Thread %ld auto deletes."), This->GetId()); - delete This; + delete This; - wxCHECK_RET( gs_nThreadsBeingDeleted > 0, + wxCHECK_RET( gs_nThreadsBeingDeleted > 0, _T("no threads scheduled for deletion, yet we delete " "one?") ); - } wxLogTrace(TRACE_THREADS, _T("%u scheduled for deletion threads left."), gs_nThreadsBeingDeleted - 1); @@ -1662,9 +1802,6 @@ static void DeleteThread(wxThread *This) { // no more threads left, signal it gs_condAllDeleted->Signal(); - - delete gs_condAllDeleted; - gs_condAllDeleted = (wxCondition *)NULL; } } -- 2.45.2