}
}
- ~wxMutexInternal() { if ( m_mutex ) CloseHandle(m_mutex); }
+ ~wxMutexInternal() { if ( m_mutex ) ::CloseHandle(m_mutex); }
public:
HANDLE m_mutex;
return wxMUTEX_NO_ERROR;
}
-// ----------------------------------------------------------------------------
-// wxCondition implementation
-// ----------------------------------------------------------------------------
+// ==========================================================================
+// wxSemaphore
+// ==========================================================================
-class wxConditionInternal
+// --------------------------------------------------------------------------
+// wxSemaphoreInternal
+// --------------------------------------------------------------------------
+
+class wxSemaphoreInternal
{
public:
- wxConditionInternal()
+ wxSemaphoreInternal( int initialcount = 0, int maxcount = 0 );
+ ~wxSemaphoreInternal();
+
+ void Wait();
+ bool TryWait();
+
+ bool Wait( unsigned long timeout_millis );
+
+ void Post();
+
+private:
+ HANDLE m_semaphore;
+};
+
+wxSemaphoreInternal::wxSemaphoreInternal( int initialcount, int maxcount )
+{
+ if ( maxcount == 0 )
{
- m_hEvent = ::CreateEvent(
- NULL, // default secutiry
- FALSE, // not manual reset
- FALSE, // nonsignaled initially
- NULL // nameless event
- );
- if ( !m_hEvent )
- {
- wxLogSysError(_("Can not create event object."));
- }
+ // make it practically infinite
+ maxcount = INT_MAX;
+ }
- // nobody waits for us yet
- m_nWaiters = 0;
+ m_semaphore = ::CreateSemaphore( NULL, initialcount, maxcount, NULL );
+ if ( !m_semaphore )
+ {
+ wxLogLastError(_T("CreateSemaphore()"));
}
+}
+
+// Release the Win32 semaphore handle owned by this object.
+//
+// m_semaphore is NULL when ::CreateSemaphore() failed in the constructor
+// (which only logs the error), so there is nothing to close in that case.
+// A failure to close a valid handle is logged instead of being ignored.
+wxSemaphoreInternal::~wxSemaphoreInternal()
+{
+    if ( m_semaphore )
+    {
+        if ( !::CloseHandle( m_semaphore ) )
+        {
+            wxLogLastError(_T("CloseHandle(semaphore)"));
+        }
+    }
+}
- bool Wait(DWORD timeout)
+void wxSemaphoreInternal::Wait()
+{
+ if ( ::WaitForSingleObject( m_semaphore, INFINITE ) != WAIT_OBJECT_0 )
{
- // as m_nWaiters variable is accessed from multiple waiting threads
- // (and possibly from the broadcasting thread), we need to change its
- // value atomically
- ::InterlockedIncrement(&m_nWaiters);
+ wxLogLastError(_T("WaitForSingleObject"));
+ }
+}
+
+// Try to acquire the semaphore without blocking: forwards to the timed
+// Wait() with a timeout of 0 and returns its result.
+bool wxSemaphoreInternal::TryWait()
+{
+ return Wait(0);
+}
+
+bool wxSemaphoreInternal::Wait( unsigned long timeout_millis )
+{
+ DWORD result = ::WaitForSingleObject( m_semaphore, timeout_millis );
- // FIXME this should be MsgWaitForMultipleObjects() as we want to keep
- // processing Windows messages while waiting (or don't we?)
- DWORD rc = ::WaitForSingleObject(m_hEvent, timeout);
+ switch ( result )
+ {
+ case WAIT_OBJECT_0:
+ return TRUE;
- ::InterlockedDecrement(&m_nWaiters);
+ case WAIT_TIMEOUT:
+ break;
- return rc != WAIT_TIMEOUT;
+ default:
+ wxLogLastError(_T("WaitForSingleObject()"));
}
- void Signal()
+ return FALSE;
+}
+
+void wxSemaphoreInternal::Post()
+{
+ if ( !::ReleaseSemaphore( m_semaphore, 1, NULL ) )
{
- // set the event to signaled: if a thread is already waiting on it, it
- // will be woken up, otherwise the event will remain in the signaled
- // state until someone waits on it. In any case, the system will return
- // it to a non signalled state afterwards. If multiple threads are
- // waiting, only one will be woken up.
- if ( !::SetEvent(m_hEvent) )
- {
- wxLogLastError(wxT("SetEvent"));
- }
+ wxLogLastError(_T("ReleaseSemaphore"));
}
+}
+
+// --------------------------------------------------------------------------
+// wxSemaphore
+// --------------------------------------------------------------------------
+
+// Create the semaphore by allocating the wxSemaphoreInternal object that
+// implements it; both counts are passed through unchanged.
+wxSemaphore::wxSemaphore( int initialcount, int maxcount )
+{
+ m_internal = new wxSemaphoreInternal( initialcount, maxcount );
+}
+
+// Destroy the implementation object held in m_internal.
+wxSemaphore::~wxSemaphore()
+{
+ delete m_internal;
+}
+
+// Wait for the semaphore with no timeout; forwards to
+// wxSemaphoreInternal::Wait().
+void wxSemaphore::Wait()
+{
+ m_internal->Wait();
+}
+
+// Non-blocking acquire attempt; forwards to wxSemaphoreInternal::TryWait()
+// and returns its result.
+bool wxSemaphore::TryWait()
+{
+ return m_internal->TryWait();
+}
+
+// Wait for the semaphore for at most timeout_millis milliseconds; forwards
+// to the timed wxSemaphoreInternal::Wait() and returns its result (TRUE if
+// the semaphore was acquired, FALSE on timeout or error).
+bool wxSemaphore::Wait( unsigned long timeout_millis )
+{
+ return m_internal->Wait( timeout_millis );
+}
+
+// Increment the semaphore count by one; forwards to
+// wxSemaphoreInternal::Post().
+void wxSemaphore::Post()
+{
+ m_internal->Post();
+}
+
+
+// ==========================================================================
+// wxCondition
+// ==========================================================================
- void Broadcast()
+// --------------------------------------------------------------------------
+// wxConditionInternal
+// --------------------------------------------------------------------------
+
+// Implementation of a condition variable built from a counting semaphore
+// and a waiter count, associated with a mutex supplied by the caller.
+class wxConditionInternal
+{
+public:
+ // associate the condition with the given mutex (stored by reference)
+ wxConditionInternal(wxMutex& mutex);
+
+ // unlock the associated mutex, block until woken up, then relock it
+ void Wait();
+
+ // same as Wait() but gives up after timeout_millis milliseconds;
+ // returns FALSE only if no wake-up was received
+ bool Wait( unsigned long timeout_millis );
+
+ // wake up one waiting thread, if there is any
+ void Signal();
+
+ // wake up all the threads currently counted as waiting
+ void Broadcast();
+
+private:
+ // number of threads that registered as waiters and have not yet been
+ // signalled (or timed out); protected by m_mutexNumWaiters
+ int m_numWaiters;
+ wxMutex m_mutexNumWaiters;
+
+ // the mutex passed to the constructor, released while waiting
+ wxMutex& m_mutex;
+
+ // posted once for each waiter being woken up
+ wxSemaphore m_semaphore;
+
+ DECLARE_NO_COPY_CLASS(wxConditionInternal)
+};
+
+// Bind the condition to the caller's mutex; no thread is waiting on it yet.
+wxConditionInternal::wxConditionInternal(wxMutex& mutex)
+    : m_numWaiters(0),
+      m_mutex(mutex)
+{
+}
+
+// Block the calling thread until the condition is signalled.
+//
+// The thread registers itself as a waiter, unlocks the associated mutex,
+// waits on the semaphore and relocks the mutex before returning.
+void wxConditionInternal::Wait()
+{
+ // increment the number of waiters
+ m_mutexNumWaiters.Lock();
+ m_numWaiters++;
+ m_mutexNumWaiters.Unlock();
+
+ m_mutex.Unlock();
+
+ // a potential race condition exists here:
+ //
+ // after this thread has incremented m_numWaiters and unlocked the mutex,
+ // but before it calls m_semaphore.Wait(), another thread may generate a
+ // signal
+ //
+ // this is handled by using a semaphore which is only incremented if
+ // m_numWaiters is greater than zero: since the semaphore "remembers"
+ // the signal, the wait below returns at once and the wake-up is not lost
+
+ // wait ( if necessary ) and decrement semaphore
+ m_semaphore.Wait();
+
+ m_mutex.Lock();
+}
+
+bool wxConditionInternal::Wait( unsigned long timeout_millis )
+{
+ m_mutexNumWaiters.Lock();
+ m_numWaiters++;
+ m_mutexNumWaiters.Unlock();
+
+ m_mutex.Unlock();
+
+ // a race condition can occur at this point in the code
+ //
+ // please see the comments in Wait(), for details
+
+ bool success = TRUE;
+
+ bool result = m_semaphore.Wait( timeout_millis );
+
+ if ( !result )
{
- // we need to save the original value as m_nWaiters is goign to be
- // decreased by the signalled thread resulting in the loop being
- // executed less times than needed
- LONG nWaiters = m_nWaiters;
-
- // this works because all these threads are already waiting and so each
- // SetEvent() inside Signal() is really a PulseEvent() because the
- // event state is immediately returned to non-signaled
- for ( LONG n = 0; n < nWaiters; n++ )
+ // another potential race condition exists here: it occurs when a
+ // 'waiting' thread times out and returns from WaitForSingleObject() but
+ // has not yet decremented m_numWaiters.
+ //
+ // if another thread calls Signal() at this point, the semaphore will be
+ // incremented but the waiting thread would miss it.
+ //
+ // to handle this particular case, the waiting thread waits on the
+ // semaphore again with a timeout of 0 after locking m_mutexNumWaiters.
+ // this call does not block because of the zero timeout, but it allows
+ // the waiting thread to catch the signal it would otherwise have missed.
+ m_mutexNumWaiters.Lock();
+ result = m_semaphore.Wait( 0 );
+
+ if ( !result )
{
- Signal();
+ m_numWaiters--;
+ success = FALSE;
}
+
+ m_mutexNumWaiters.Unlock();
+ }
+
+ m_mutex.Lock();
+
+ return success;
+}
+
+void wxConditionInternal::Signal()
+{
+ m_mutexNumWaiters.Lock();
+
+ if ( m_numWaiters > 0 )
+ {
+ // increment the semaphore by 1
+ m_semaphore.Post();
+
+ m_numWaiters--;
}
- ~wxConditionInternal()
+ m_mutexNumWaiters.Unlock();
+}
+
+void wxConditionInternal::Broadcast()
+{
+ m_mutexNumWaiters.Lock();
+
+ while ( m_numWaiters > 0 )
{
- if ( m_hEvent )
- {
- if ( !::CloseHandle(m_hEvent) )
- {
- wxLogLastError(wxT("CloseHandle(event)"));
- }
- }
+ m_semaphore.Post();
+ m_numWaiters--;
}
-private:
- // the Win32 synchronization object corresponding to this event
- HANDLE m_hEvent;
+ m_mutexNumWaiters.Unlock();
+}
- // number of threads waiting for this condition
- LONG m_nWaiters;
-};
+// ----------------------------------------------------------------------------
+// wxCondition implementation
+// ----------------------------------------------------------------------------
-wxCondition::wxCondition()
+wxCondition::wxCondition(wxMutex& mutex)
{
- m_internal = new wxConditionInternal;
+ m_internal = new wxConditionInternal( mutex );
}
wxCondition::~wxCondition()
void wxCondition::Wait()
{
- (void)m_internal->Wait(INFINITE);
+ m_internal->Wait();
}
-bool wxCondition::Wait(unsigned long sec,
- unsigned long nsec)
+bool wxCondition::Wait( unsigned long timeout_millis )
{
- return m_internal->Wait(sec*1000 + nsec/1000000);
+ return m_internal->Wait(timeout_millis);
}
void wxCondition::Signal()
{
#ifdef __WXDEBUG__
// Done this way to stop warnings during compilation about statement
- // always being false
- int csSize = sizeof(CRITICAL_SECTION);
- int bSize = sizeof(m_buffer);
+ // always being FALSE
+ int csSize = sizeof(CRITICAL_SECTION);
+ int bSize = sizeof(m_buffer);
wxASSERT_MSG( csSize <= bSize,
_T("must increase buffer size in wx/thread.h") );
#endif
}
// create a new (suspended) thread (for the given thread object)
- bool Create(wxThread *thread);
+ bool Create(wxThread *thread, unsigned int stackSize);
// suspend/resume/terminate
bool Suspend();
DWORD m_tid; // thread id
};
-THREAD_RETVAL wxThreadInternal::WinThreadStart(void *param)
+THREAD_RETVAL THREAD_CALLCONV wxThreadInternal::WinThreadStart(void *param)
{
THREAD_RETVAL rc;
bool wasCancelled;
}
}
-bool wxThreadInternal::Create(wxThread *thread)
+bool wxThreadInternal::Create(wxThread *thread, unsigned int stackSize)
{
// for compilers which have it, we should use C RTL function for thread
// creation instead of Win32 API one because otherwise we will have memory
// leaks if the thread uses C RTL (and most threads do)
#ifdef wxUSE_BEGIN_THREAD
+
+ // Watcom is reported to not like 0 stack size (which means "use default"
+ // for the other compilers and is also the default value for stackSize)
+#ifdef __WATCOMC__
+ if ( !stackSize )
+ stackSize = 10240;
+#endif // __WATCOMC__
+
m_hThread = (HANDLE)_beginthreadex
(
- NULL, // default security
-#ifdef __WATCOMC__
- 10240, // stack size can't be NULL in Watcom
-#else
- 0, // default stack size
-#endif
- wxThreadInternal::WinThreadStart, // entry point
- thread,
- CREATE_SUSPENDED,
- (unsigned int *)&m_tid
+ NULL, // default security
+ stackSize,
+ wxThreadInternal::WinThreadStart, // entry point
+ thread,
+ CREATE_SUSPENDED,
+ (unsigned int *)&m_tid
);
#else // compiler doesn't have _beginthreadex
m_hThread = ::CreateThread
(
NULL, // default security
- 0, // default stack size
+ stackSize, // stack size
wxThreadInternal::WinThreadStart, // thread entry point
(LPVOID)thread, // parameter
CREATE_SUSPENDED, // flags
return si.dwNumberOfProcessors;
}
+// Return the identifier of the calling thread as reported by
+// ::GetCurrentThreadId(), converted to unsigned long.
+unsigned long wxThread::GetCurrentId()
+{
+ return (unsigned long)::GetCurrentThreadId();
+}
+
bool wxThread::SetConcurrency(size_t level)
{
wxASSERT_MSG( IsMain(), _T("should only be called from the main thread") );
// create/start thread
// -------------------
-wxThreadError wxThread::Create()
+wxThreadError wxThread::Create(unsigned int stackSize)
{
wxCriticalSectionLocker lock(m_critsect);
- if ( !m_internal->Create(this) )
+ if ( !m_internal->Create(this, stackSize) )
return wxTHREAD_NO_RESOURCE;
return wxTHREAD_NO_ERROR;
}
}
- if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) )
+ // although the thread might be already in the EXITED state it might not
+ // have terminated yet and so we are not sure that it has actually
+ // terminated if the "if" above hadn't been taken
+ do
{
- wxLogLastError(wxT("GetExitCodeThread"));
+ if ( !::GetExitCodeThread(hThread, (LPDWORD)&rc) )
+ {
+ wxLogLastError(wxT("GetExitCodeThread"));
- rc = (ExitCode)-1;
- }
+ rc = (ExitCode)-1;
+ }
+ } while ( (DWORD)rc == STILL_ACTIVE );
if ( IsDetached() )
{
delete this;
}
- wxASSERT_MSG( (DWORD)rc != STILL_ACTIVE,
- wxT("thread must be already terminated.") );
-
if ( pRc )
*pRc = rc;
}
#endif // wxUSE_THREADS
+
+// vi:sts=4:sw=4:et