From 062c4861718a681a27384d6a2313fc26cda3b3e4 Mon Sep 17 00:00:00 2001 From: Guilhem Lavaux Date: Tue, 25 May 1999 17:14:56 +0000 Subject: [PATCH] * wxThread fixes * wxStream fix (Read(wxStreamBase)) * wxEvent: GTK idle loop waking up was actually good, reenabled. * wxSocket: all features working on Linux/RH6 (including high-level protocols) Needs testing on other platforms now. git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@2563 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775 --- include/wx/sckint.h | 8 ++- include/wx/socket.h | 2 + samples/wxsocket/client.cpp | 15 +++-- samples/wxsocket/server.cpp | 12 +++- src/common/event.cpp | 4 +- src/common/ftp.cpp | 8 +-- src/common/http.cpp | 3 +- src/common/sckint.cpp | 66 +++++++++++++-------- src/common/socket.cpp | 111 +++++++++++++++++++++++++++++++++++- src/common/stream.cpp | 2 +- src/unix/threadpsx.cpp | 26 +++++++-- 11 files changed, 204 insertions(+), 53 deletions(-) diff --git a/include/wx/sckint.h b/include/wx/sckint.h index 0a3ca5b5e5..3fec426a7a 100644 --- a/include/wx/sckint.h +++ b/include/wx/sckint.h @@ -82,7 +82,11 @@ class SocketWaiter: public wxThread { int m_fd; }; -class SocketRequester: public wxThread { +class SocketRequester +#if wxUSE_THREADS + : public wxThread +#endif + { public: SocketRequester(wxSocketBase *socket, wxSocketInternal *internal); ~SocketRequester(); @@ -136,8 +140,8 @@ class wxSocketInternal { wxMutex m_socket_locker, m_fd_locker, m_request_locker, m_end_requester; wxCondition m_socket_cond; SocketWaiter *m_thread_waiter; - SocketRequester *m_thread_requester; #endif + SocketRequester *m_thread_requester; wxList m_requests; int m_fd; bool m_invalid_requester; diff --git a/include/wx/socket.h b/include/wx/socket.h index a8161406c8..ef410dbc30 100644 --- a/include/wx/socket.h +++ b/include/wx/socket.h @@ -95,6 +95,8 @@ public: wxSocketBase& Read(char* buffer, size_t nbytes); wxSocketBase& Write(const char *buffer, size_t nbytes); wxSocketBase& Unread(const char *buffer, size_t nbytes); + wxSocketBase& ReadMsg(char *buffer, size_t nbytes); + wxSocketBase& WriteMsg(const char *buffer, size_t nbytes); void Discard(); // Try not to use this two methods (they sould be protected) diff --git a/samples/wxsocket/client.cpp b/samples/wxsocket/client.cpp index 6a163e47f7..646aabfe8c 100644 --- a/samples/wxsocket/client.cpp +++ b/samples/wxsocket/client.cpp @@ -3,7 +3,7 @@ * Purpose: wxSocket: client demo * Author: LAVAUX Guilhem * Created: June 1997 - * Updated: + * CVS ID: $Id$ * Copyright: (c) 1997, LAVAUX Guilhem */ @@ -163,11 +163,8 @@ void MyFrame::OnExecOpenConnection(wxCommandEvent& WXUNUSED(evt)) if (sock->IsConnected()) sock->Close(); -/* wxString hname = wxGetTextFromUser("Enter the address of the wxSocket Sample Server", "Connect ...", "localhost"); -*/ - wxString hname = "localhost"; addr.Hostname(hname); addr.Service(3000); sock->SetNotify(0); @@ -282,13 +279,15 @@ void MyFrame::OnExecUrlTest(wxCommandEvent& WXUNUSED(evt)) wxURL url(urlname); wxInputStream *datas = url.GetInputStream(); - if (!datas) - wxMessageBox("Error in getting data from the URL.", "Alert !"); - else { + if (!datas) { + wxString error; + error.Printf(_T("Error in getting data from the URL. (error = %d)"), url.GetError()); + wxMessageBox(error, "Alert !"); + } else { wxFileOutputStream *str_out = new wxFileOutputStream("test.url"); str_out->Write(*datas); - wxMessageBox("Success !! Click on OK to see the text.", "OK"); + wxMessageBox(_T("Success !! Click on OK to see the text."), "OK"); delete datas; delete str_out; } diff --git a/samples/wxsocket/server.cpp b/samples/wxsocket/server.cpp index 5a380483ac..56f2dfc303 100644 --- a/samples/wxsocket/server.cpp +++ b/samples/wxsocket/server.cpp @@ -3,7 +3,7 @@ * Purpose: wxSocket: server demo * Author: LAVAUX Guilhem * Created: June 1997 - * Updated: + * CVS Id: $Id$ * Copyright: (C) 1997, LAVAUX Guilhem */ @@ -102,6 +102,9 @@ void MyFrame::OnSockRequest(wxSocketEvent& evt) waiting socket thread, i.e. here we are not in the main GUI thread and thus we must not call any GUI function here. */ + /* Wrong ! This routine is called by the main GUI thread + because the main GUI thread received a signal from the other + thread using wxEvent::ProcessThreadEvent */ wxSocketBase *sock = evt.Socket(); @@ -119,6 +122,7 @@ void MyFrame::OnSockRequest(wxSocketEvent& evt) case wxSocketBase::EVT_LOST: printf("Destroying socket\n"); wxPendingDelete.Append(sock); + UpdateStatus(-1); return; break; } @@ -132,17 +136,21 @@ void MyFrame::OnSockRequestServer(wxSocketEvent& evt) waiting socket thread, i.e. here we are not in the main GUI thread and thus we must not call any GUI function here. */ + /* Wrong ! This routine is called by the main GUI thread + because the main GUI thread received a signal from the other + thread using wxEvent::ProcessThreadEvent */ wxSocketBase *sock2; wxSocketServer *server = (wxSocketServer *) evt.Socket(); printf("OnSockRequestServer OK\n"); - printf("OnSockRequest (event = %d)\n",evt.SocketEvent()); + printf("OnSockRequest (Main = %d) (event = %d)\n",wxThread::IsMain(), evt.SocketEvent()); sock2 = server->Accept(); if (sock2 == NULL) return; + UpdateStatus(1); sock2->SetFlags(wxSocketBase::SPEED); sock2->Notify(TRUE); sock2->SetEventHandler(*this, SKDEMO_SOCKET); diff --git a/src/common/event.cpp b/src/common/event.cpp index 0849fdb6a0..593d1fc131 100644 --- a/src/common/event.cpp +++ b/src/common/event.cpp @@ -563,8 +563,8 @@ bool wxEvtHandler::ProcessThreadEvent(wxEvent& event) wxPendingEventsLocker->Leave(); #ifdef __WXGTK__ -// if (g_isIdle) -// wxapp_install_idle_handler(); + if (g_isIdle) + wxapp_install_idle_handler(); #endif return TRUE; diff --git a/src/common/ftp.cpp b/src/common/ftp.cpp index fddfb1d185..090138b5d7 100644 --- a/src/common/ftp.cpp +++ b/src/common/ftp.cpp @@ -355,6 +355,7 @@ wxInputStream *wxFTP::GetInputStream(const wxString& path) in_stream->m_ftpsize = wxAtoi(WXSTRINGCAST str_size); } + sock->SetFlags(WAITALL); return in_stream; } @@ -401,13 +402,6 @@ wxList *wxFTP::GetList(const wxString& wildcard) return NULL; } - // Ininterresting ?! - /* - sock->SetEventHandler(*GetNextHandler(), m_id); - sock->Notify(m_notifyme); - sock->SetNotify(m_neededreq); - */ - return file_list; } #endif diff --git a/src/common/http.cpp b/src/common/http.cpp index 38cb120b72..19b1edf023 100644 --- a/src/common/http.cpp +++ b/src/common/http.cpp @@ -195,8 +195,8 @@ bool wxHTTP::BuildRequest(const wxString& path, wxHTTP_Req req) } SaveState(); + SetFlags(NONE); Notify(FALSE); - SetFlags(WAITALL); sprintf(buf, "%s %s HTTP/1.0\n\r", tmp_buf, (const char*)pathbuf); Write(buf, strlen(buf)); @@ -275,6 +275,7 @@ wxInputStream *wxHTTP::GetInputStream(const wxString& path) if (!GetHeader(_T("Content-Length")).IsEmpty()) inp_stream->m_httpsize = wxAtoi(WXSTRINGCAST GetHeader(_T("Content-Length"))); + SetFlags(WAITALL); return inp_stream; } diff --git a/src/common/sckint.cpp b/src/common/sckint.cpp index dd17b9bdf6..8531c9b3a9 100644 --- a/src/common/sckint.cpp +++ b/src/common/sckint.cpp @@ -100,7 +100,9 @@ void SocketWaiter::ProcessReadEvent() int ret; char c; + m_internal->AcquireFD(); ret = recv(m_fd, &c, 1, MSG_PEEK); + m_internal->ReleaseFD(); // We are a server => emit a EVT_ACCEPT event. if (ret == -1 && m_socket->GetType() == wxSocketBase::SOCK_SERVER) { @@ -167,14 +169,12 @@ void *SocketWaiter::Entry() #endif #endif - if (ret == 0) - // If nothing happened, we wait for 100 ms. - wxUsleep(10); + // We wait for 100 ms to prevent the CPU from burning. + wxUsleep(100); // Check whether we should exit. - if (TestDestroy()) { + if (TestDestroy()) return NULL; - } } return NULL; } @@ -185,7 +185,10 @@ void *SocketWaiter::Entry() SocketRequester::SocketRequester(wxSocketBase *socket, wxSocketInternal *internal) - : wxThread(), + : +#if wxUSE_THREADS + wxThread(), +#endif m_socket(socket), m_internal(internal), m_fd(internal->GetFD()) { } @@ -204,12 +207,12 @@ bool SocketRequester::WaitFor(wxSocketBase::wxRequestNotify req, int millisec) tv.tv_sec = millisec / 1000; tv.tv_usec = (millisec % 1000) * 1000; - if ((req & READ_MASK) != 0) - FD_ZERO(&sockrd_set); + FD_ZERO(&sockrd_set); FD_ZERO(&sockwr_set); - - FD_SET(m_fd, &sockrd_set); - FD_SET(m_fd, &sockwr_set); + if ((req & READ_MASK) != 0) + FD_SET(m_fd, &sockrd_set); + if ((req & WRITE_MASK) != 0) + FD_SET(m_fd, &sockwr_set); m_internal->AcquireFD(); ret = select(m_fd+1, &sockrd_set, &sockwr_set, NULL, &tv); @@ -247,6 +250,9 @@ void SocketRequester::ProcessReadEvent(SockRequest *req) req->size -= len; req->io_nbytes += len; req->buffer += len; + + if (len == 0) + m_internal->EndRequest(req); return; } // The End. @@ -337,21 +343,17 @@ void *SocketRequester::Entry() wxSocketInternal::wxSocketInternal(wxSocketBase *socket) { m_socket = socket; -#if wxUSE_THREADS m_thread_requester = NULL; m_thread_waiter = NULL; m_invalid_requester = TRUE; -#endif } wxSocketInternal::~wxSocketInternal() { -#if wxUSE_THREADS StopRequester(); wxASSERT(m_thread_requester == NULL); StopWaiter(); wxASSERT(m_thread_waiter == NULL); -#endif } // ---------------------------------------------------------------------- @@ -363,10 +365,12 @@ SockRequest *wxSocketInternal::WaitForReq() #if wxUSE_THREADS wxNode *node; + // First try. node = m_requests.First(); if (node == NULL) { m_socket_cond.Wait(m_request_locker, 10, 0); + // Second try, if it is unsuccessul we give up. node = m_requests.First(); if (node == NULL) return NULL; @@ -425,14 +429,17 @@ void wxSocketInternal::ResumeRequester() #if wxUSE_THREADS wxThreadError err; - wxASSERT(m_thread_requester == NULL || m_invalid_requester); + wxASSERT(m_invalid_requester); m_end_requester.Lock(); - if (m_invalid_requester) { - if (m_thread_requester != NULL) - delete m_thread_requester; - m_invalid_requester = FALSE; + + if (m_thread_requester != NULL) { + m_thread_requester->Delete(); // We must join it. + delete m_thread_requester; } + + m_invalid_requester = FALSE; + m_end_requester.Unlock(); m_thread_requester = new SocketRequester(m_socket, this); @@ -442,7 +449,11 @@ void wxSocketInternal::ResumeRequester() err = m_thread_requester->Run(); wxASSERT(err == wxTHREAD_NO_ERROR); - +#else + if (!m_invalid_requester) + return; + m_thread_requester = new SocketRequester(m_socket, this); + m_invalid_requester = FALSE; #endif } @@ -453,6 +464,7 @@ void wxSocketInternal::StopRequester() if (m_invalid_requester) { m_end_requester.Unlock(); if (m_thread_requester) { + m_thread_requester->Delete(); delete m_thread_requester; m_thread_requester = NULL; } @@ -475,6 +487,11 @@ void wxSocketInternal::StopRequester() delete m_thread_requester; m_thread_requester = NULL; + m_invalid_requester = TRUE; +#else + delete m_thread_requester; + m_thread_requester = NULL; + m_invalid_requester = TRUE; #endif } @@ -488,6 +505,8 @@ void wxSocketInternal::ResumeWaiter() m_thread_waiter = new SocketWaiter(m_socket, this); + m_thread_waiter->SetPriority(WXTHREAD_MIN_PRIORITY); + err = m_thread_waiter->Create(); wxASSERT(err == wxTHREAD_NO_ERROR); @@ -514,11 +533,10 @@ void wxSocketInternal::StopWaiter() // ---------------------------------------------------------------------- void wxSocketInternal::QueueRequest(SockRequest *request, bool async) { -#if wxUSE_THREADS if (m_invalid_requester) ResumeRequester(); - async = FALSE; +#if wxUSE_THREADS if (async) { m_request_locker.Lock(); @@ -541,6 +559,7 @@ void wxSocketInternal::QueueRequest(SockRequest *request, bool async) } } else { m_request_locker.Lock(); +#endif if ((request->type & wxSocketBase::REQ_WAIT) != 0) { m_thread_requester->ProcessWaitEvent(request); @@ -559,6 +578,7 @@ void wxSocketInternal::QueueRequest(SockRequest *request, bool async) } } request->done = TRUE; +#if wxUSE_THREADS m_request_locker.Unlock(); } #endif diff --git a/src/common/socket.cpp b/src/common/socket.cpp index c568508af1..883ffb82eb 100644 --- a/src/common/socket.cpp +++ b/src/common/socket.cpp @@ -260,6 +260,65 @@ wxSocketBase& wxSocketBase::Read(char* buffer, size_t nbytes) return *this; } +wxSocketBase& wxSocketBase::ReadMsg(char* buffer, size_t nbytes) +{ + unsigned long len, len2, sig; + struct { + char sig[4]; + char len[4]; + } msg; + + // sig should be an explicit 32-bit unsigned integer; I've seen + // compilers in which size_t was actually a 16-bit unsigned integer + + Read((char *)&msg, sizeof(msg)); + if (m_lcount != sizeof(msg)) + return *this; + + sig = msg.sig[0] & 0xff; + sig |= (size_t)(msg.sig[1] & 0xff) << 8; + sig |= (size_t)(msg.sig[2] & 0xff) << 16; + sig |= (size_t)(msg.sig[3] & 0xff) << 24; + + if (sig != 0xfeeddead) + return *this; + len = msg.len[0] & 0xff; + len |= (size_t)(msg.len[1] & 0xff) << 8; + len |= (size_t)(msg.len[2] & 0xff) << 16; + len |= (size_t)(msg.len[3] & 0xff) << 24; + + // len2 is incorrectly computed in the original; this sequence is + // the fix + if (len > nbytes) { + len2 = len - nbytes; + len = nbytes; + } + else + len2 = 0; + + // the "len &&" in the following statement is necessary so that + // we don't attempt to read (and possibly hang the system) + // if the message was zero bytes long + if (len && Read(buffer, len).LastCount() != len) + return *this; + if (len2 && (Read(NULL, len2).LastCount() != len2)) + return *this; + if (Read((char *)&msg, sizeof(msg)).LastCount() != sizeof(msg)) + return *this; + + sig = msg.sig[0] & 0xff; + sig |= (size_t)(msg.sig[1] & 0xff) << 8; + sig |= (size_t)(msg.sig[2] & 0xff) << 16; + sig |= (size_t)(msg.sig[3] & 0xff) << 24; +// ERROR +// we return *this either way, so a smart optimizer will +// optimize the following sequence out; I'm leaving it in anyway + if (sig != 0xdeadfeed) + return *this; + + return *this; +} + wxSocketBase& wxSocketBase::Peek(char* buffer, size_t nbytes) { m_lcount = GetPushback(buffer, nbytes, TRUE); @@ -282,9 +341,54 @@ wxSocketBase& wxSocketBase::Write(const char *buffer, size_t nbytes) return *this; } +wxSocketBase& wxSocketBase::WriteMsg(const char *buffer, size_t nbytes) +{ + struct { + char sig[4]; + char len[4]; + } msg; + + // warning about 'cast truncates constant value' +#ifdef __VISUALC__ + #pragma warning(disable: 4310) +#endif // __VISUALC__ + + msg.sig[0] = (char) 0xad; + msg.sig[1] = (char) 0xde; + msg.sig[2] = (char) 0xed; + msg.sig[3] = (char) 0xfe; + + msg.len[0] = (char) nbytes & 0xff; + msg.len[1] = (char) (nbytes >> 8) & 0xff; + msg.len[2] = (char) (nbytes >> 16) & 0xff; + msg.len[3] = (char) (nbytes >> 24) & 0xff; + + if (Write((char *)&msg, sizeof(msg)).LastCount() < sizeof(msg)) + return *this; + if (Write(buffer, nbytes).LastCount() < nbytes) + return *this; + + msg.sig[0] = (char) 0xed; + msg.sig[1] = (char) 0xfe; + msg.sig[2] = (char) 0xad; + msg.sig[3] = (char) 0xde; + msg.len[0] = msg.len[1] = msg.len[2] = msg.len[3] = (char) 0; + Write((char *)&msg, sizeof(msg)); + + return *this; + +#ifdef __VISUALC__ + #pragma warning(default: 4310) +#endif // __VISUALC__ +} + wxSocketBase& wxSocketBase::Unread(const char *buffer, size_t nbytes) { - CreatePushbackAfter(buffer, nbytes); + m_lcount = 0; + if (nbytes != 0) { + CreatePushbackAfter(buffer, nbytes); + m_lcount = nbytes; + } return *this; } @@ -304,7 +408,7 @@ bool wxSocketBase::IsData() const tv.tv_usec = 0; FD_ZERO(&sock_set); FD_SET(m_fd, &sock_set); - select(FD_SETSIZE, &sock_set, NULL, NULL, &tv); + select(m_fd+1, &sock_set, NULL, NULL, &tv); m_internal->ReleaseFD(); @@ -424,7 +528,9 @@ void wxSocketBase::RestoreState() state = (SocketState *)node->Data(); SetFlags(state->socket_flags); + m_internal->AcquireData(); m_neededreq = state->evt_notify_state; + m_internal->ReleaseData(); m_cbk = state->c_callback; m_cdata = state->c_callback_data; Notify(state->notify_state); @@ -682,7 +788,6 @@ void wxSocketBase::WantBuffer(char *buffer, size_t nbytes, SockRequest *buf = new SockRequest; SaveState(); - m_internal->StopWaiter(); buf->buffer = buffer; buf->size = nbytes; buf->done = FALSE; diff --git a/src/common/stream.cpp b/src/common/stream.cpp index 1c34deffff..d5954d71d8 100644 --- a/src/common/stream.cpp +++ b/src/common/stream.cpp @@ -340,7 +340,7 @@ size_t wxStreamBuffer::Read(wxStreamBuffer *s_buf) if (m_mode == write) return 0; - while (bytes_read == BUF_TEMP_SIZE) { + while (bytes_read != 0) { bytes_read = Read(buf, bytes_read); bytes_read = s_buf->Write(buf, bytes_read); s += bytes_read; diff --git a/src/unix/threadpsx.cpp b/src/unix/threadpsx.cpp index edd373cdc4..aad7082be4 100644 --- a/src/unix/threadpsx.cpp +++ b/src/unix/threadpsx.cpp @@ -271,6 +271,8 @@ private: // state // 2. The Delete() function blocks until the condition is signaled when the // thread exits. + // GL: On Linux, this may fail because we can have a deadlock in either + // SignalExit() or Wait(): so we add m_end_mutex for the finalization. wxMutex m_mutex, m_end_mutex; wxCondition m_cond; @@ -299,7 +301,7 @@ void *wxThreadInternal::PthreadStart(void *ptr) } #if HAVE_THREAD_CLEANUP_FUNCTIONS // Install the cleanup handler. -// pthread_cleanup_push(wxThreadInternal::PthreadCleanup, ptr); + pthread_cleanup_push(wxThreadInternal::PthreadCleanup, ptr); #endif // wait for the condition to be signaled from Run() @@ -311,7 +313,7 @@ void *wxThreadInternal::PthreadStart(void *ptr) status = thread->Entry(); #if HAVE_THREAD_CLEANUP_FUNCTIONS -// pthread_cleanup_pop(FALSE); + pthread_cleanup_pop(FALSE); #endif // terminate the thread @@ -371,8 +373,16 @@ wxThreadInternal::~wxThreadInternal() // note that m_mutex will be unlocked by the thread which waits for our // termination - // m_end_mutex can be unlocked here. - m_end_mutex.Unlock(); + // In the case, we didn't start the thread, all these mutex are locked: + // we must unlock them. + if (m_mutex.IsLocked()) + m_mutex.Unlock(); + + if (m_end_mutex.IsLocked()) + m_end_mutex.Unlock(); + + if (m_mutexSuspend.IsLocked()) + m_mutexSuspend.Unlock(); } wxThreadError wxThreadInternal::Run() @@ -748,6 +758,14 @@ bool wxThread::TestDestroy() wxThread::~wxThread() { + m_critsect.Enter(); + if (p_internal->GetState() != STATE_EXITED && + p_internal->GetState() != STATE_NEW) + wxLogDebug(_T("The thread is being destroyed althought it is still running ! The application may crash.")); + + m_critsect.Leave(); + + delete p_internal; // remove this thread from the global array gs_allThreads.Remove(this); } -- 2.45.2