]> git.saurik.com Git - wxWidgets.git/commitdiff
* wxThread fixes
authorGuilhem Lavaux <lavaux@easynet.fr>
Tue, 25 May 1999 17:14:56 +0000 (17:14 +0000)
committerGuilhem Lavaux <lavaux@easynet.fr>
Tue, 25 May 1999 17:14:56 +0000 (17:14 +0000)
* wxStream fix (Read(wxStreamBase))
* wxEvent: GTK idle loop waking up was actually good, reenabled.
* wxSocket: all features working on Linux/RH6 (including high-level protocols)
       Needs testing on other platforms now.

git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@2563 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775

include/wx/sckint.h
include/wx/socket.h
samples/wxsocket/client.cpp
samples/wxsocket/server.cpp
src/common/event.cpp
src/common/ftp.cpp
src/common/http.cpp
src/common/sckint.cpp
src/common/socket.cpp
src/common/stream.cpp
src/unix/threadpsx.cpp

index 0a3ca5b5e576d1432085b790693ec92f90ea839c..3fec426a7a1efe1c5e57dde1174da75128f461d5 100644 (file)
@@ -82,7 +82,11 @@ class SocketWaiter: public wxThread {
   int m_fd;
 };
 
-class SocketRequester: public wxThread {
+class SocketRequester
+#if wxUSE_THREADS
+  : public wxThread 
+#endif
+         {
  public:
   SocketRequester(wxSocketBase *socket, wxSocketInternal *internal);
   ~SocketRequester();
@@ -136,8 +140,8 @@ class wxSocketInternal {
   wxMutex m_socket_locker, m_fd_locker, m_request_locker, m_end_requester;
   wxCondition m_socket_cond;
   SocketWaiter *m_thread_waiter;
-  SocketRequester *m_thread_requester;
 #endif
+  SocketRequester *m_thread_requester;
   wxList m_requests;
   int m_fd;
   bool m_invalid_requester;
index a8161406c8f31b97c476b01964c16673dd4c6912..ef410dbc3050eeb6b92aecceaff335af2b463de3 100644 (file)
@@ -95,6 +95,8 @@ public:
   wxSocketBase& Read(char* buffer, size_t nbytes);
   wxSocketBase& Write(const char *buffer, size_t nbytes);
   wxSocketBase& Unread(const char *buffer, size_t nbytes);
+  wxSocketBase& ReadMsg(char *buffer, size_t nbytes);
+  wxSocketBase& WriteMsg(const char *buffer, size_t nbytes);
   void Discard();
 
   // Try not to use this two methods (they sould be protected)
index 6a163e47f7c18150e30c18cb6e497189fe6d0f59..646aabfe8ca3976e9e0054de585f54ffaa3e0382 100644 (file)
@@ -3,7 +3,7 @@
  * Purpose:    wxSocket: client demo
  * Author:     LAVAUX Guilhem
  * Created:    June 1997
- * Updated:    
+ * CVS ID:     $Id$
  * Copyright:  (c) 1997, LAVAUX Guilhem
  */
 
@@ -163,11 +163,8 @@ void MyFrame::OnExecOpenConnection(wxCommandEvent& WXUNUSED(evt))
   if (sock->IsConnected())
     sock->Close();
 
-/*
   wxString hname = wxGetTextFromUser("Enter the address of the wxSocket Sample Server", 
                                   "Connect ...", "localhost");
-*/
-  wxString hname = "localhost";
   addr.Hostname(hname);
   addr.Service(3000);
   sock->SetNotify(0);
@@ -282,13 +279,15 @@ void MyFrame::OnExecUrlTest(wxCommandEvent& WXUNUSED(evt))
   wxURL url(urlname);
   wxInputStream *datas = url.GetInputStream();
 
-  if (!datas)
-    wxMessageBox("Error in getting data from the URL.", "Alert !");
-  else {
+  if (!datas) {
+    wxString error;
+    error.Printf(_T("Error in getting data from the URL. (error = %d)"), url.GetError());
+    wxMessageBox(error, "Alert !");
+  } else {
     wxFileOutputStream *str_out = new wxFileOutputStream("test.url");
     str_out->Write(*datas);
 
-    wxMessageBox("Success !! Click on OK to see the text.", "OK");
+    wxMessageBox(_T("Success !! Click on OK to see the text."), "OK");
     delete datas;
     delete str_out;
   }
index 5a380483ac7a77b17d097a7711c9bf0ea07226d2..56f2dfc30317e7ccb04b8bbccfe346561a51dbec 100644 (file)
@@ -3,7 +3,7 @@
  * Purpose:    wxSocket: server demo
  * Author:     LAVAUX Guilhem
  * Created:    June 1997
- * Updated:    
+ * CVS Id:     $Id$
  * Copyright: (C) 1997, LAVAUX Guilhem
  */
 
@@ -102,6 +102,9 @@ void MyFrame::OnSockRequest(wxSocketEvent& evt)
      waiting socket thread, i.e. here we are
      not in the main GUI thread and thus we
      must not call any GUI function here. */
+  /* Wrong ! This routine is called by the main GUI thread
+     because the main GUI thread received a signal from the other
+     thread using wxEvent::ProcessThreadEvent */
 
   wxSocketBase *sock = evt.Socket();
 
@@ -119,6 +122,7 @@ void MyFrame::OnSockRequest(wxSocketEvent& evt)
   case wxSocketBase::EVT_LOST:
     printf("Destroying socket\n");
     wxPendingDelete.Append(sock);
+    UpdateStatus(-1);
     return;
     break;
   }
@@ -132,17 +136,21 @@ void MyFrame::OnSockRequestServer(wxSocketEvent& evt)
      waiting socket thread, i.e. here we are
      not in the main GUI thread and thus we
      must not call any GUI function here. */
+  /* Wrong ! This routine is called by the main GUI thread
+     because the main GUI thread received a signal from the other
+     thread using wxEvent::ProcessThreadEvent */
 
   wxSocketBase *sock2;
   wxSocketServer *server = (wxSocketServer *) evt.Socket();
 
   printf("OnSockRequestServer OK\n");
-  printf("OnSockRequest (event = %d)\n",evt.SocketEvent());
+  printf("OnSockRequest (Main = %d) (event = %d)\n",wxThread::IsMain(), evt.SocketEvent());
 
   sock2 = server->Accept();
   if (sock2 == NULL)
     return;
 
+  UpdateStatus(1);
   sock2->SetFlags(wxSocketBase::SPEED);
   sock2->Notify(TRUE);
   sock2->SetEventHandler(*this, SKDEMO_SOCKET);
index 0849fdb6a0617c6bcce878a46cc29a18f523aa54..593d1fc13132f0b00c1bfe80260cd0987b27c791 100644 (file)
@@ -563,8 +563,8 @@ bool wxEvtHandler::ProcessThreadEvent(wxEvent& event)
     wxPendingEventsLocker->Leave();
 
 #ifdef __WXGTK__
-//    if (g_isIdle) 
-//        wxapp_install_idle_handler();
+    if (g_isIdle) 
+        wxapp_install_idle_handler();
 #endif
 
     return TRUE;
index fddfb1d1851c35fa39be367fb3973e34d5cccf36..090138b5d7a1cb472402f93fb175f19a65c6ef18 100644 (file)
@@ -355,6 +355,7 @@ wxInputStream *wxFTP::GetInputStream(const wxString& path)
 
     in_stream->m_ftpsize = wxAtoi(WXSTRINGCAST str_size);
   }
+  sock->SetFlags(WAITALL);
 
   return in_stream;
 }
@@ -401,13 +402,6 @@ wxList *wxFTP::GetList(const wxString& wildcard)
     return NULL;
   }
 
-  // Ininterresting ?!
-  /*
-  sock->SetEventHandler(*GetNextHandler(), m_id);
-  sock->Notify(m_notifyme);
-  sock->SetNotify(m_neededreq);
-  */
-
   return file_list;
 }
 #endif
index 38cb120b729085f2e0134bdd60bbaadc2f78c06e..19b1edf023052df280bf08c481c4304284d89d40 100644 (file)
@@ -195,8 +195,8 @@ bool wxHTTP::BuildRequest(const wxString& path, wxHTTP_Req req)
   }
 
   SaveState();
+  SetFlags(NONE);
   Notify(FALSE);
-  SetFlags(WAITALL);
 
   sprintf(buf, "%s %s HTTP/1.0\n\r", tmp_buf, (const char*)pathbuf);
   Write(buf, strlen(buf));
@@ -275,6 +275,7 @@ wxInputStream *wxHTTP::GetInputStream(const wxString& path)
   if (!GetHeader(_T("Content-Length")).IsEmpty())
     inp_stream->m_httpsize = wxAtoi(WXSTRINGCAST GetHeader(_T("Content-Length")));
 
+  SetFlags(WAITALL);
   return inp_stream;
 }
 
index dd17b9bdf6d55cd239b424236c84cdf5c34a39fd..8531c9b3a929b054c038023f924010e2a5816d2f 100644 (file)
@@ -100,7 +100,9 @@ void SocketWaiter::ProcessReadEvent()
   int ret;
   char c;
 
+  m_internal->AcquireFD();
   ret = recv(m_fd, &c, 1, MSG_PEEK);
+  m_internal->ReleaseFD();
   
   // We are a server => emit a EVT_ACCEPT event.
   if (ret == -1 && m_socket->GetType() == wxSocketBase::SOCK_SERVER) {
@@ -167,14 +169,12 @@ void *SocketWaiter::Entry()
 #endif
 #endif
 
-    if (ret == 0)
-      // If nothing happened, we wait for 100 ms.
-      wxUsleep(10);
+   // We wait for 100 ms to prevent the CPU from burning.
+   wxUsleep(100);
 
     // Check whether we should exit.
-    if (TestDestroy()) {
+    if (TestDestroy())
       return NULL;
-    }
   }
   return NULL;
 }
@@ -185,7 +185,10 @@ void *SocketWaiter::Entry()
 
 SocketRequester::SocketRequester(wxSocketBase *socket,
                                 wxSocketInternal *internal)
-  : wxThread(),
+  :
+#if wxUSE_THREADS
+    wxThread(),
+#endif
     m_socket(socket), m_internal(internal), m_fd(internal->GetFD())
 {
 }
@@ -204,12 +207,12 @@ bool SocketRequester::WaitFor(wxSocketBase::wxRequestNotify req, int millisec)
   tv.tv_sec = millisec / 1000;
   tv.tv_usec = (millisec % 1000) * 1000;
 
-  if ((req & READ_MASK) != 0)  
-    FD_ZERO(&sockrd_set);
+  FD_ZERO(&sockrd_set);
   FD_ZERO(&sockwr_set);
-
-  FD_SET(m_fd, &sockrd_set);
-  FD_SET(m_fd, &sockwr_set);
+  if ((req & READ_MASK) != 0)  
+    FD_SET(m_fd, &sockrd_set);
+  if ((req & WRITE_MASK) != 0)
+    FD_SET(m_fd, &sockwr_set);
   
   m_internal->AcquireFD();
   ret = select(m_fd+1, &sockrd_set, &sockwr_set, NULL, &tv);
@@ -247,6 +250,9 @@ void SocketRequester::ProcessReadEvent(SockRequest *req)
     req->size -= len;
     req->io_nbytes += len;
     req->buffer += len;
+
+    if (len == 0)
+      m_internal->EndRequest(req); 
     return;
   }
   // The End.
@@ -337,21 +343,17 @@ void *SocketRequester::Entry()
 wxSocketInternal::wxSocketInternal(wxSocketBase *socket)
 {
   m_socket = socket;
-#if wxUSE_THREADS
   m_thread_requester = NULL;
   m_thread_waiter = NULL;
   m_invalid_requester = TRUE;
-#endif
 }
 
 wxSocketInternal::~wxSocketInternal()
 {
-#if wxUSE_THREADS
   StopRequester();
   wxASSERT(m_thread_requester == NULL);
   StopWaiter();
   wxASSERT(m_thread_waiter == NULL);
-#endif
 }
 
 // ----------------------------------------------------------------------
@@ -363,10 +365,12 @@ SockRequest *wxSocketInternal::WaitForReq()
 #if wxUSE_THREADS
   wxNode *node;
 
+  // First try.
   node = m_requests.First();
   if (node == NULL) {
     m_socket_cond.Wait(m_request_locker, 10, 0);
 
+    // Second try, if it is unsuccessul we give up.
     node = m_requests.First();
     if (node == NULL)
       return NULL;
@@ -425,14 +429,17 @@ void wxSocketInternal::ResumeRequester()
 #if wxUSE_THREADS
   wxThreadError err;
 
-  wxASSERT(m_thread_requester == NULL || m_invalid_requester);
+  wxASSERT(m_invalid_requester);
 
   m_end_requester.Lock();
-  if (m_invalid_requester) {
-    if (m_thread_requester != NULL)
-      delete m_thread_requester;
-    m_invalid_requester = FALSE;
+
+  if (m_thread_requester != NULL) {
+    m_thread_requester->Delete(); // We must join it.
+    delete m_thread_requester;
   }
+
+  m_invalid_requester = FALSE;
+
   m_end_requester.Unlock();
 
   m_thread_requester = new SocketRequester(m_socket, this);
@@ -442,7 +449,11 @@ void wxSocketInternal::ResumeRequester()
 
   err = m_thread_requester->Run();
   wxASSERT(err == wxTHREAD_NO_ERROR);
-
+#else
+  if (!m_invalid_requester) 
+    return;
+  m_thread_requester = new SocketRequester(m_socket, this);
+  m_invalid_requester = FALSE;
 #endif
 }
 
@@ -453,6 +464,7 @@ void wxSocketInternal::StopRequester()
   if (m_invalid_requester) {
     m_end_requester.Unlock();
     if (m_thread_requester) {
+      m_thread_requester->Delete();
       delete m_thread_requester;
       m_thread_requester = NULL;
     }
@@ -475,6 +487,11 @@ void wxSocketInternal::StopRequester()
 
   delete m_thread_requester;
   m_thread_requester = NULL;
+  m_invalid_requester = TRUE;
+#else
+  delete m_thread_requester;
+  m_thread_requester = NULL;
+  m_invalid_requester = TRUE;
 #endif
 }
 
@@ -488,6 +505,8 @@ void wxSocketInternal::ResumeWaiter()
 
   m_thread_waiter = new SocketWaiter(m_socket, this);
 
+  m_thread_waiter->SetPriority(WXTHREAD_MIN_PRIORITY);
+
   err = m_thread_waiter->Create();
   wxASSERT(err == wxTHREAD_NO_ERROR);
 
@@ -514,11 +533,10 @@ void wxSocketInternal::StopWaiter()
 // ----------------------------------------------------------------------
 void wxSocketInternal::QueueRequest(SockRequest *request, bool async)
 {
-#if wxUSE_THREADS
   if (m_invalid_requester)
     ResumeRequester();
 
-  async = FALSE; 
+#if wxUSE_THREADS
   if (async) {
 
     m_request_locker.Lock();
@@ -541,6 +559,7 @@ void wxSocketInternal::QueueRequest(SockRequest *request, bool async)
     }
   } else {
     m_request_locker.Lock();
+#endif
 
     if ((request->type & wxSocketBase::REQ_WAIT) != 0) {
       m_thread_requester->ProcessWaitEvent(request);
@@ -559,6 +578,7 @@ void wxSocketInternal::QueueRequest(SockRequest *request, bool async)
       }
     }
     request->done = TRUE;
+#if wxUSE_THREADS
     m_request_locker.Unlock();
   }
 #endif
index c568508af135f840b873484b0a4f25db722f6d55..883ffb82eb39de2d1139072c4f3f43303d1094ef 100644 (file)
@@ -260,6 +260,65 @@ wxSocketBase& wxSocketBase::Read(char* buffer, size_t nbytes)
   return *this;
 }
 
+wxSocketBase& wxSocketBase::ReadMsg(char* buffer, size_t nbytes)
+{
+  unsigned long len, len2, sig;
+  struct {
+    char sig[4];
+    char len[4];
+  } msg;
+
+  // sig should be an explicit 32-bit unsigned integer; I've seen
+  // compilers in which size_t was actually a 16-bit unsigned integer
+
+  Read((char *)&msg, sizeof(msg));
+  if (m_lcount != sizeof(msg))
+    return *this;
+
+  sig = msg.sig[0] & 0xff;
+  sig |= (size_t)(msg.sig[1] & 0xff) << 8;
+  sig |= (size_t)(msg.sig[2] & 0xff) << 16;
+  sig |= (size_t)(msg.sig[3] & 0xff) << 24;
+
+  if (sig != 0xfeeddead)
+    return *this;
+  len = msg.len[0] & 0xff;
+  len |= (size_t)(msg.len[1] & 0xff) << 8;
+  len |= (size_t)(msg.len[2] & 0xff) << 16;
+  len |= (size_t)(msg.len[3] & 0xff) << 24;
+
+  // len2 is incorrectly computed in the original; this sequence is
+  // the fix
+  if (len > nbytes) {
+    len2 = len - nbytes;
+    len = nbytes;
+  }
+  else
+    len2 = 0;
+
+  // the "len &&" in the following statement is necessary so that
+  // we don't attempt to read (and possibly hang the system)
+  // if the message was zero bytes long
+  if (len && Read(buffer, len).LastCount() != len)
+    return *this;
+  if (len2 && (Read(NULL, len2).LastCount() != len2))
+    return *this;
+  if (Read((char *)&msg, sizeof(msg)).LastCount() != sizeof(msg))
+    return *this;
+
+  sig = msg.sig[0] & 0xff;
+  sig |= (size_t)(msg.sig[1] & 0xff) << 8;
+  sig |= (size_t)(msg.sig[2] & 0xff) << 16;
+  sig |= (size_t)(msg.sig[3] & 0xff) << 24;
+// ERROR
+// we return *this either way, so a smart optimizer will
+// optimize the following sequence out; I'm leaving it in anyway
+  if (sig != 0xdeadfeed)
+    return *this;
+
+  return *this;
+}
+
 wxSocketBase& wxSocketBase::Peek(char* buffer, size_t nbytes)
 {
   m_lcount = GetPushback(buffer, nbytes, TRUE);
@@ -282,9 +341,54 @@ wxSocketBase& wxSocketBase::Write(const char *buffer, size_t nbytes)
   return *this;
 }
 
+wxSocketBase& wxSocketBase::WriteMsg(const char *buffer, size_t nbytes)
+{
+  struct {
+    char sig[4];
+    char len[4];
+  } msg;
+
+  // warning about 'cast truncates constant value'
+#ifdef __VISUALC__
+    #pragma warning(disable: 4310)
+#endif // __VISUALC__
+
+  msg.sig[0] = (char) 0xad;
+  msg.sig[1] = (char) 0xde;
+  msg.sig[2] = (char) 0xed;
+  msg.sig[3] = (char) 0xfe;
+
+    msg.len[0] = (char) nbytes & 0xff;
+  msg.len[1] = (char) (nbytes >> 8) & 0xff;
+  msg.len[2] = (char) (nbytes >> 16) & 0xff;
+  msg.len[3] = (char) (nbytes >> 24) & 0xff;
+
+  if (Write((char *)&msg, sizeof(msg)).LastCount() < sizeof(msg))
+    return *this;
+  if (Write(buffer, nbytes).LastCount() < nbytes)
+    return *this;
+
+  msg.sig[0] = (char) 0xed;
+  msg.sig[1] = (char) 0xfe;
+  msg.sig[2] = (char) 0xad;
+  msg.sig[3] = (char) 0xde;
+  msg.len[0] = msg.len[1] = msg.len[2] = msg.len[3] = (char) 0;
+  Write((char *)&msg, sizeof(msg));
+
+  return *this;
+
+#ifdef __VISUALC__
+    #pragma warning(default: 4310)
+#endif // __VISUALC__
+}
+
 wxSocketBase& wxSocketBase::Unread(const char *buffer, size_t nbytes)
 {
-  CreatePushbackAfter(buffer, nbytes);
+  m_lcount = 0;
+  if (nbytes != 0) {
+    CreatePushbackAfter(buffer, nbytes);
+    m_lcount = nbytes;
+  }
   return *this;
 }
 
@@ -304,7 +408,7 @@ bool wxSocketBase::IsData() const
   tv.tv_usec = 0;
   FD_ZERO(&sock_set);
   FD_SET(m_fd, &sock_set);
-  select(FD_SETSIZE, &sock_set, NULL, NULL, &tv);
+  select(m_fd+1, &sock_set, NULL, NULL, &tv);
 
   m_internal->ReleaseFD();
 
@@ -424,7 +528,9 @@ void wxSocketBase::RestoreState()
   state = (SocketState *)node->Data();
 
   SetFlags(state->socket_flags);
+  m_internal->AcquireData();
   m_neededreq = state->evt_notify_state;
+  m_internal->ReleaseData();
   m_cbk       = state->c_callback;
   m_cdata     = state->c_callback_data;
   Notify(state->notify_state);
@@ -682,7 +788,6 @@ void wxSocketBase::WantBuffer(char *buffer, size_t nbytes,
   SockRequest *buf = new SockRequest;
 
   SaveState();
-  m_internal->StopWaiter(); 
   buf->buffer = buffer;
   buf->size = nbytes;
   buf->done = FALSE;
index 1c34deffff0687d552063a9e1d6141689ebb3aec..d5954d71d8c2340fd3383968df78354c960b6793 100644 (file)
@@ -340,7 +340,7 @@ size_t wxStreamBuffer::Read(wxStreamBuffer *s_buf)
   if (m_mode == write)
     return 0;
 
-  while (bytes_read == BUF_TEMP_SIZE) {
+  while (bytes_read != 0) {
     bytes_read = Read(buf, bytes_read);
     bytes_read = s_buf->Write(buf, bytes_read);
     s += bytes_read;
index edd373cdc467c86356acf5a00ba3c8628b69f153..aad7082be4d7ab0aa5f6342a5b2c28d388b977b8 100644 (file)
@@ -271,6 +271,8 @@ private:
     //     state
     //  2. The Delete() function blocks until the condition is signaled when the
     //     thread exits.
+    // GL: On Linux, this may fail because we can have a deadlock in either
+    //     SignalExit() or Wait(): so we add m_end_mutex for the finalization.
     wxMutex     m_mutex, m_end_mutex;
     wxCondition m_cond;
 
@@ -299,7 +301,7 @@ void *wxThreadInternal::PthreadStart(void *ptr)
     }
 #if HAVE_THREAD_CLEANUP_FUNCTIONS
     // Install the cleanup handler.
-//    pthread_cleanup_push(wxThreadInternal::PthreadCleanup, ptr);
+    pthread_cleanup_push(wxThreadInternal::PthreadCleanup, ptr);
 #endif
 
     // wait for the condition to be signaled from Run()
@@ -311,7 +313,7 @@ void *wxThreadInternal::PthreadStart(void *ptr)
     status = thread->Entry();
 
 #if HAVE_THREAD_CLEANUP_FUNCTIONS
-//    pthread_cleanup_pop(FALSE);
+    pthread_cleanup_pop(FALSE);
 #endif
 
     // terminate the thread
@@ -371,8 +373,16 @@ wxThreadInternal::~wxThreadInternal()
     // note that m_mutex will be unlocked by the thread which waits for our
     // termination
 
-    // m_end_mutex can be unlocked here.
-    m_end_mutex.Unlock();
+    // In the case, we didn't start the thread, all these mutex are locked:
+    // we must unlock them.
+    if (m_mutex.IsLocked())
+      m_mutex.Unlock();
+
+    if (m_end_mutex.IsLocked())
+      m_end_mutex.Unlock();
+
+    if (m_mutexSuspend.IsLocked())
+      m_mutexSuspend.Unlock();
 }
 
 wxThreadError wxThreadInternal::Run()
@@ -748,6 +758,14 @@ bool wxThread::TestDestroy()
 
 wxThread::~wxThread()
 {
+    m_critsect.Enter();
+    if (p_internal->GetState() != STATE_EXITED &&
+        p_internal->GetState() != STATE_NEW)
+      wxLogDebug(_T("The thread is being destroyed althought it is still running ! The application may crash."));
+
+    m_critsect.Leave();
+
+    delete p_internal;
     // remove this thread from the global array
     gs_allThreads.Remove(this);
 }