git.saurik.com Git - wxWidgets.git/blobdiff - src/common/sckipc.cpp
Large image-loading speedup and small attribute-loading speedup
[wxWidgets.git] / src / common / sckipc.cpp
index 1d43462ec2427ecda5e5b458ac8232fd5c96b71d..0f1173baa6d6db09af192b1ea800b38443b69a29 100644 (file)
@@ -6,6 +6,7 @@
 //              Guillermo Rodriguez (updated for wxSocket v2) Jan 2000
 //                                  (callbacks deprecated)    Mar 2000
 //              Vadim Zeitlin (added support for Unix sockets) Apr 2002
 //              Guillermo Rodriguez (updated for wxSocket v2) Jan 2000
 //                                  (callbacks deprecated)    Mar 2000
 //              Vadim Zeitlin (added support for Unix sockets) Apr 2002
+//                            (use buffering, many fixes/cleanup) Oct 2008
 // Created:     1993
 // RCS-ID:      $Id$
 // Copyright:   (c) Julian Smart 1993
 // Created:     1993
 // RCS-ID:      $Id$
 // Copyright:   (c) Julian Smart 1993
 // macros and constants
 // --------------------------------------------------------------------------
 
 // macros and constants
 // --------------------------------------------------------------------------
 
-// Message codes
-enum
+namespace
+{
+
+// Message codes (don't change them to avoid breaking the existing code using
+// wxIPC protocol!)
+enum IPCCode
 {
 {
-    IPC_EXECUTE = 1,
-    IPC_REQUEST,
-    IPC_POKE,
-    IPC_ADVISE_START,
-    IPC_ADVISE_REQUEST,
-    IPC_ADVISE,
-    IPC_ADVISE_STOP,
-    IPC_REQUEST_REPLY,
-    IPC_FAIL,
-    IPC_CONNECT,
-    IPC_DISCONNECT
+    IPC_EXECUTE         = 1,
+    IPC_REQUEST         = 2,
+    IPC_POKE            = 3,
+    IPC_ADVISE_START    = 4,
+    IPC_ADVISE_REQUEST  = 5,
+    IPC_ADVISE          = 6,
+    IPC_ADVISE_STOP     = 7,
+    IPC_REQUEST_REPLY   = 8,
+    IPC_FAIL            = 9,
+    IPC_CONNECT         = 10,
+    IPC_DISCONNECT      = 11,
+    IPC_MAX
 };
 
 };
 
-// All sockets will be created with the following flags
-#define SCKIPC_FLAGS (wxSOCKET_WAITALL|wxSOCKET_REUSEADDR)
+} // anonymous namespace
 
 // headers needed for umask()
 #ifdef __UNIX_LIKE__
 
 // headers needed for umask()
 #ifdef __UNIX_LIKE__
@@ -87,7 +92,7 @@ GetAddressFromName(const wxString& serverName,
 #if defined(__UNIX__) && !defined(__WINDOWS__) && !defined(__WINE__)
     // under Unix, if the server name looks like a path, create a AF_UNIX
     // socket instead of AF_INET one
 #if defined(__UNIX__) && !defined(__WINDOWS__) && !defined(__WINE__)
     // under Unix, if the server name looks like a path, create a AF_UNIX
     // socket instead of AF_INET one
-    if ( serverName.Find(_T('/')) != wxNOT_FOUND )
+    if ( serverName.Find(wxT('/')) != wxNOT_FOUND )
     {
         wxUNIXaddress *addr = new wxUNIXaddress;
         addr->Filename(serverName);
     {
         wxUNIXaddress *addr = new wxUNIXaddress;
         addr->Filename(serverName);
@@ -114,13 +119,16 @@ GetAddressFromName(const wxString& serverName,
 class wxTCPEventHandler : public wxEvtHandler
 {
 public:
 class wxTCPEventHandler : public wxEvtHandler
 {
 public:
-    wxTCPEventHandler() : wxEvtHandler() {}
+    wxTCPEventHandler() : wxEvtHandler() { }
 
     void Client_OnRequest(wxSocketEvent& event);
     void Server_OnRequest(wxSocketEvent& event);
 
 
     void Client_OnRequest(wxSocketEvent& event);
     void Server_OnRequest(wxSocketEvent& event);
 
+private:
+    void HandleDisconnect(wxTCPConnection *connection);
+
     DECLARE_EVENT_TABLE()
     DECLARE_EVENT_TABLE()
-    DECLARE_NO_COPY_CLASS(wxTCPEventHandler)
+    wxDECLARE_NO_COPY_CLASS(wxTCPEventHandler);
 };
 
 enum
 };
 
 enum
@@ -129,7 +137,214 @@ enum
     _SERVER_ONREQUEST_ID
 };
 
     _SERVER_ONREQUEST_ID
 };
 
-static wxTCPEventHandler *gs_handler = NULL;
+// --------------------------------------------------------------------------
+// wxTCPEventHandlerModule (private class): owns the shared wxTCPEventHandler
+// --------------------------------------------------------------------------
+
+class wxTCPEventHandlerModule : public wxModule
+{
+public:
+    wxTCPEventHandlerModule() : wxModule() { }
+
+    // return the single global wxTCPEventHandler, allocating it on first use
+    static wxTCPEventHandler& GetHandler()
+    {
+        if ( !ms_handler )
+            ms_handler = new wxTCPEventHandler;
+
+        return *ms_handler;
+    }
+
+    // OnInit() has nothing to do because ms_handler is only created on demand
+    virtual bool OnInit() { return true; }
+    virtual void OnExit() { wxDELETE(ms_handler); }  // release handler, if any
+
+private:
+    static wxTCPEventHandler *ms_handler;  // NULL until first GetHandler() call
+
+    DECLARE_DYNAMIC_CLASS(wxTCPEventHandlerModule)
+    wxDECLARE_NO_COPY_CLASS(wxTCPEventHandlerModule);
+};
+
+IMPLEMENT_DYNAMIC_CLASS(wxTCPEventHandlerModule, wxModule)
+
+wxTCPEventHandler *wxTCPEventHandlerModule::ms_handler = NULL;
+
+// --------------------------------------------------------------------------
+// wxIPCSocketStreams
+// --------------------------------------------------------------------------
+
+#define USE_BUFFER
+
+// bundles the socket stream with the buffered and typed data streams layered
+// on top of it, and exposes the read operations needed by wxTCPConnection
+//
+// writing is not done through this class directly: use IPCOutput (see below)
+class wxIPCSocketStreams
+{
+public:
+    // ctor layers all the streams on top of the given socket
+    //
+    // when USE_BUFFER is defined the output buffer is 1448 bytes, chosen to
+    // match the typical Ethernet MTU (minus TCP header overhead)
+    wxIPCSocketStreams(wxSocketBase& sock)
+        : m_socketStream(sock),
+#ifdef USE_BUFFER
+          m_bufferedOut(m_socketStream, 1448),
+#else
+          m_bufferedOut(m_socketStream),
+#endif
+          m_dataIn(m_socketStream),
+          m_dataOut(m_bufferedOut)
+    {
+    }
+
+    // IO methods needed by the IPC code follow (note that writing is only
+    // ever done via IPCOutput)
+
+    // flush buffered output (does nothing if USE_BUFFER is not defined)
+    void Flush()
+    {
+#ifdef USE_BUFFER
+        m_bufferedOut.Sync();
+#endif
+    }
+
+    // thin wrappers around the wxDataInputStream functions of the same name,
+    // each one flushes any pending output before reading
+    wxUint8 Read8()
+    {
+        Flush();
+        return m_dataIn.Read8();
+    }
+
+    wxUint32 Read32()
+    {
+        Flush();
+        return m_dataIn.Read32();
+    }
+
+    wxString ReadString()
+    {
+        Flush();
+        return m_dataIn.ReadString();
+    }
+
+    // read a 32 bit size followed by that many raw bytes, returns the buffer
+    // holding them (with its length in *size) or NULL if a check fails
+    // conn is only used to obtain the buffer via its GetBufferAtLeast() method
+    void *ReadData(wxConnectionBase *conn, size_t *size)
+    {
+        Flush();
+
+        wxCHECK_MSG( conn, NULL, "NULL connection parameter" );
+        wxCHECK_MSG( size, NULL, "NULL size parameter" );
+
+        *size = Read32();
+
+        void * const data = conn->GetBufferAtLeast(*size);
+        wxCHECK_MSG( data, NULL, "IPC buffer allocation failed" );
+
+        m_socketStream.Read(data, *size);   // NOTE(review): result not checked
+
+        return data;
+    }
+
+    // same as ReadData() but first reads the one byte format into *format
+    void *
+    ReadFormatData(wxConnectionBase *conn, wxIPCFormat *format, size_t *size)
+    {
+        wxCHECK_MSG( format, NULL, "NULL format parameter" );
+
+        *format = static_cast<wxIPCFormat>(Read8());
+
+        return ReadData(conn, size);
+    }
+
+
+    // output stream accessors, meant to be used by IPCOutput only
+    wxDataOutputStream& GetDataOut() { return m_dataOut; }
+    wxOutputStream& GetUnformattedOut() { return m_bufferedOut; }
+
+private:
+    // the low-level stream working directly on top of the connection socket
+    wxSocketStream m_socketStream;
+
+    // buffering lets us send all pieces of an IPC request to the socket at
+    // once when Flush() is called instead of writing them one by one; without
+    // USE_BUFFER this is a plain reference to m_socketStream
+#ifdef USE_BUFFER
+    wxBufferedOutputStream m_bufferedOut;
+#else
+    wxOutputStream& m_bufferedOut;
+#endif
+
+    // finally the data streams read typed values from m_socketStream and
+    // write them to m_bufferedOut respectively
+    wxDataInputStream  m_dataIn;
+    wxDataOutputStream m_dataOut;
+
+    wxDECLARE_NO_COPY_CLASS(wxIPCSocketStreams);
+};
+
+namespace
+{
+
+// helper for writing an IPC message: create an object of this class on the
+// stack, write through it and let its dtor flush the data
+//
+// it is intentionally kept separate from wxIPCSocketStreams so that Flush()
+// is always called once the writing is done
+class IPCOutput
+{
+public:
+    // associate this object with the given streams, which must be non-NULL
+    // and outlive us as only a reference to them is stored
+    IPCOutput(wxIPCSocketStreams *streams)
+        : m_streams(*streams)   // NOTE(review): dereferenced before the assert
+    {
+        wxASSERT_MSG( streams, "NULL streams pointer" );
+    }
+
+    // dtor flushes the streams so that the data written is really sent out
+    ~IPCOutput() { m_streams.Flush(); }
+
+
+    // write a single byte
+    void Write8(wxUint8 i)
+    {
+        m_streams.GetDataOut().Write8(i);
+    }
+
+    // write the message code followed by a string
+    void Write(IPCCode code, const wxString& str)
+    {
+        Write8(code);
+        m_streams.GetDataOut().WriteString(str);
+    }
+
+    // write the message code, a string and a format byte, in this order
+    void Write(IPCCode code, const wxString& str, wxIPCFormat format)
+    {
+        Write(code, str);
+        Write8(format);
+    }
+
+    // write the size as a 32 bit value followed by the raw data bytes
+    void WriteData(const void *data, size_t size)
+    {
+        m_streams.GetDataOut().Write32(size);   // NOTE(review): size_t -> 32 bit
+        m_streams.GetUnformattedOut().Write(data, size);
+    }
+
+
+private:
+    wxIPCSocketStreams& m_streams;
+
+    wxDECLARE_NO_COPY_CLASS(IPCOutput);
+};
+
+} // anonymous namespace
 
 // ==========================================================================
 // implementation
 
 // ==========================================================================
 // implementation
@@ -155,31 +370,26 @@ bool wxTCPClient::ValidHost(const wxString& host)
     return addr.Hostname(host);
 }
 
     return addr.Hostname(host);
 }
 
-wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host,
-                                               const wxString& serverName,
-                                               const wxString& topic)
+wxConnectionBase *wxTCPClient::MakeConnection(const wxString& host,
+                                              const wxString& serverName,
+                                              const wxString& topic)
 {
     wxSockAddress *addr = GetAddressFromName(serverName, host);
     if ( !addr )
         return NULL;
 
 {
     wxSockAddress *addr = GetAddressFromName(serverName, host);
     if ( !addr )
         return NULL;
 
-    wxSocketClient *client = new wxSocketClient(SCKIPC_FLAGS);
-    wxSocketStream *stream = new wxSocketStream(*client);
-    wxDataInputStream *data_is = new wxDataInputStream(*stream);
-    wxDataOutputStream *data_os = new wxDataOutputStream(*stream);
+    wxSocketClient * const client = new wxSocketClient(wxSOCKET_WAITALL);
+    wxIPCSocketStreams * const streams = new wxIPCSocketStreams(*client);
 
     bool ok = client->Connect(*addr);
     delete addr;
 
     if ( ok )
     {
 
     bool ok = client->Connect(*addr);
     delete addr;
 
     if ( ok )
     {
-        unsigned char msg;
-
         // Send topic name, and enquire whether this has succeeded
         // Send topic name, and enquire whether this has succeeded
-        data_os->Write8(IPC_CONNECT);
-        data_os->WriteString(topic);
+        IPCOutput(streams).Write(IPC_CONNECT, topic);
 
 
-        msg = data_is->Read8();
+        unsigned char msg = streams->Read8();
 
         // OK! Confirmation.
         if (msg == IPC_CONNECT)
 
         // OK! Confirmation.
         if (msg == IPC_CONNECT)
@@ -193,10 +403,9 @@ wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host,
                 {
                     connection->m_topic = topic;
                     connection->m_sock  = client;
                 {
                     connection->m_topic = topic;
                     connection->m_sock  = client;
-                    connection->m_sockstrm = stream;
-                    connection->m_codeci = data_is;
-                    connection->m_codeco = data_os;
-                    client->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID);
+                    connection->m_streams = streams;
+                    client->SetEventHandler(wxTCPEventHandlerModule::GetHandler(),
+                                            _CLIENT_ONREQUEST_ID);
                     client->SetClientData(connection);
                     client->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG);
                     client->Notify(true);
                     client->SetClientData(connection);
                     client->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG);
                     client->Notify(true);
@@ -212,9 +421,7 @@ wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host,
     }
 
     // Something went wrong, delete everything
     }
 
     // Something went wrong, delete everything
-    delete data_is;
-    delete data_os;
-    delete stream;
+    delete streams;
     client->Destroy();
 
     return NULL;
     client->Destroy();
 
     return NULL;
@@ -273,8 +480,10 @@ bool wxTCPServer::Create(const wxString& serverName)
     }
 #endif // __UNIX_LIKE__
 
     }
 #endif // __UNIX_LIKE__
 
-    // Create a socket listening on the specified port
-    m_server = new wxSocketServer(*addr, SCKIPC_FLAGS);
+    // Create a socket listening on the specified port (reusing it to allow
+    // restarting the server listening on the same port as was used by the
+    // previous instance of this server)
+    m_server = new wxSocketServer(*addr, wxSOCKET_WAITALL | wxSOCKET_REUSEADDR);
 
 #ifdef __UNIX_LIKE__
     if ( addr->Type() == wxSockAddress::UNIX )
 
 #ifdef __UNIX_LIKE__
     if ( addr->Type() == wxSockAddress::UNIX )
@@ -297,7 +506,8 @@ bool wxTCPServer::Create(const wxString& serverName)
         return false;
     }
 
         return false;
     }
 
-    m_server->SetEventHandler(*gs_handler, _SERVER_ONREQUEST_ID);
+    m_server->SetEventHandler(wxTCPEventHandlerModule::GetHandler(),
+                              _SERVER_ONREQUEST_ID);
     m_server->SetClientData(this);
     m_server->SetNotify(wxSOCKET_CONNECTION_FLAG);
     m_server->Notify(true);
     m_server->SetClientData(this);
     m_server->SetNotify(wxSOCKET_CONNECTION_FLAG);
     m_server->Notify(true);
@@ -318,7 +528,7 @@ wxTCPServer::~wxTCPServer()
     {
         if ( remove(m_filename.fn_str()) != 0 )
         {
     {
         if ( remove(m_filename.fn_str()) != 0 )
         {
-            wxLogDebug(_T("Stale AF_UNIX file '%s' left."), m_filename.c_str());
+            wxLogDebug(wxT("Stale AF_UNIX file '%s' left."), m_filename.c_str());
         }
     }
 #endif // __UNIX_LIKE__
         }
     }
 #endif // __UNIX_LIKE__
@@ -336,10 +546,8 @@ wxTCPServer::OnAcceptConnection(const wxString& WXUNUSED(topic))
 
 void wxTCPConnection::Init()
 {
 
 void wxTCPConnection::Init()
 {
-    m_sock     = NULL;
-    m_sockstrm = NULL;
-    m_codeci   = NULL;
-    m_codeco   = NULL;
+    m_sock = NULL;
+    m_streams = NULL;
 }
 
 wxTCPConnection::~wxTCPConnection()
 }
 
 wxTCPConnection::~wxTCPConnection()
@@ -352,10 +560,7 @@ wxTCPConnection::~wxTCPConnection()
         m_sock->Destroy();
     }
 
         m_sock->Destroy();
     }
 
-    /* Delete after destroy */
-    wxDELETE(m_codeci);
-    wxDELETE(m_codeco);
-    wxDELETE(m_sockstrm);
+    delete m_streams;
 }
 
 void wxTCPConnection::Compress(bool WXUNUSED(on))
 }
 
 void wxTCPConnection::Compress(bool WXUNUSED(on))
@@ -370,7 +575,7 @@ bool wxTCPConnection::Disconnect()
         return true;
 
     // Send the disconnect message to the peer.
         return true;
 
     // Send the disconnect message to the peer.
-    m_codeco->Write8(IPC_DISCONNECT);
+    IPCOutput(m_streams).Write8(IPC_DISCONNECT);
 
     if ( m_sock )
     {
 
     if ( m_sock )
     {
@@ -391,11 +596,11 @@ bool wxTCPConnection::DoExecute(const void *data,
         return false;
 
     // Prepare EXECUTE message
         return false;
 
     // Prepare EXECUTE message
-    m_codeco->Write8(IPC_EXECUTE);
-    m_codeco->Write8(format);
+    IPCOutput out(m_streams);
+    out.Write8(IPC_EXECUTE);
+    out.Write8(format);
 
 
-    m_codeco->Write32(size);
-    m_sockstrm->Write(data, size);
+    out.WriteData(data, size);
 
     return true;
 }
 
     return true;
 }
@@ -407,24 +612,17 @@ const void *wxTCPConnection::Request(const wxString& item,
     if ( !m_sock->IsConnected() )
         return NULL;
 
     if ( !m_sock->IsConnected() )
         return NULL;
 
-    m_codeco->Write8(IPC_REQUEST);
-    m_codeco->WriteString(item);
-    m_codeco->Write8(format);
+    IPCOutput(m_streams).Write(IPC_REQUEST, item, format);
 
 
-    int ret = m_codeci->Read8();
-    if ( ret == IPC_FAIL )
+    const int ret = m_streams->Read8();
+    if ( ret != IPC_REQUEST_REPLY )
         return NULL;
 
         return NULL;
 
-    size_t s = m_codeci->Read32();
-
-    void *data = GetBufferAtLeast( s );
-    wxASSERT_MSG(data != NULL,
-            _T("Buffer too small in wxTCPConnection::Request") );
-    m_sockstrm->Read(data, s);
-
-    if (size)
-        *size = s;
-    return data;
+    // ReadData() needs a non-NULL size pointer but the client code can call us
+    // with NULL pointer (this makes sense if it knows that it always works
+    // with NUL-terminated strings)
+    size_t sizeFallback;
+    return m_streams->ReadData(this, size ? size : &sizeFallback);
 }
 
 bool wxTCPConnection::DoPoke(const wxString& item,
 }
 
 bool wxTCPConnection::DoPoke(const wxString& item,
@@ -435,29 +633,23 @@ bool wxTCPConnection::DoPoke(const wxString& item,
     if ( !m_sock->IsConnected() )
         return false;
 
     if ( !m_sock->IsConnected() )
         return false;
 
-    m_codeco->Write8(IPC_POKE);
-    m_codeco->WriteString(item);
-    m_codeco->Write8(format);
-
-    m_codeco->Write32(size);
-    m_sockstrm->Write(data, size);
+    IPCOutput out(m_streams);
+    out.Write(IPC_POKE, item, format);
+    out.WriteData(data, size);
 
     return true;
 }
 
 
     return true;
 }
 
-bool wxTCPConnection::StartAdvise (const wxString& item)
+bool wxTCPConnection::StartAdvise(const wxString& item)
 {
     if ( !m_sock->IsConnected() )
         return false;
 
 {
     if ( !m_sock->IsConnected() )
         return false;
 
-    m_codeco->Write8(IPC_ADVISE_START);
-    m_codeco->WriteString(item);
+    IPCOutput(m_streams).Write(IPC_ADVISE_START, item);
 
 
-    int ret = m_codeci->Read8();
-    if (ret != IPC_FAIL)
-        return true;
-    else
-        return false;
+    const int ret = m_streams->Read8();
+
+    return ret == IPC_ADVISE_START;
 }
 
 bool wxTCPConnection::StopAdvise (const wxString& item)
 }
 
 bool wxTCPConnection::StopAdvise (const wxString& item)
@@ -465,15 +657,11 @@ bool wxTCPConnection::StopAdvise (const wxString& item)
     if ( !m_sock->IsConnected() )
         return false;
 
     if ( !m_sock->IsConnected() )
         return false;
 
-    m_codeco->Write8(IPC_ADVISE_STOP);
-    m_codeco->WriteString(item);
+    IPCOutput(m_streams).Write(IPC_ADVISE_STOP, item);
 
 
-    int ret = m_codeci->Read8();
+    const int ret = m_streams->Read8();
 
 
-    if (ret != IPC_FAIL)
-        return true;
-    else
-        return false;
+    return ret == IPC_ADVISE_STOP;
 }
 
 // Calls that SERVER can make
 }
 
 // Calls that SERVER can make
@@ -485,12 +673,9 @@ bool wxTCPConnection::DoAdvise(const wxString& item,
     if ( !m_sock->IsConnected() )
         return false;
 
     if ( !m_sock->IsConnected() )
         return false;
 
-    m_codeco->Write8(IPC_ADVISE);
-    m_codeco->WriteString(item);
-    m_codeco->Write8(format);
-
-    m_codeco->Write32(size);
-    m_sockstrm->Write(data, size);
+    IPCOutput out(m_streams);
+    out.Write(IPC_ADVISE, item, format);
+    out.WriteData(data, size);
 
     return true;
 }
 
     return true;
 }
@@ -504,172 +689,181 @@ BEGIN_EVENT_TABLE(wxTCPEventHandler, wxEvtHandler)
     EVT_SOCKET(_SERVER_ONREQUEST_ID, wxTCPEventHandler::Server_OnRequest)
 END_EVENT_TABLE()
 
     EVT_SOCKET(_SERVER_ONREQUEST_ID, wxTCPEventHandler::Server_OnRequest)
 END_EVENT_TABLE()
 
+void wxTCPEventHandler::HandleDisconnect(wxTCPConnection *connection)
+{
+    // shared by wxSOCKET_LOST and IPC_DISCONNECT handling: shut the socket down
+    connection->m_sock->Notify(false);
+    connection->m_sock->Close();
+
+    // the socket may outlive the connection because its destruction is
+    // delayed while events are still pending for it, so make sure it doesn't
+    // keep a pointer to the soon-to-be-dangling connection
+    connection->m_sock->SetClientData(NULL);
+
+    connection->SetConnected(false);   // update state before OnDisconnect()
+    connection->OnDisconnect();
+}
+
 void wxTCPEventHandler::Client_OnRequest(wxSocketEvent &event)
 {
     wxSocketBase *sock = event.GetSocket();
     if (!sock)
 void wxTCPEventHandler::Client_OnRequest(wxSocketEvent &event)
 {
     wxSocketBase *sock = event.GetSocket();
     if (!sock)
-        return ;
+        return;
 
     wxSocketNotify evt = event.GetSocketEvent();
 
     wxSocketNotify evt = event.GetSocketEvent();
-    wxTCPConnection *connection = (wxTCPConnection *)(sock->GetClientData());
+    wxTCPConnection * const
+        connection = static_cast<wxTCPConnection *>(sock->GetClientData());
 
     // This socket is being deleted; skip this event
     if (!connection)
         return;
 
 
     // This socket is being deleted; skip this event
     if (!connection)
         return;
 
-    wxDataInputStream *codeci;
-    wxDataOutputStream *codeco;
-    wxSocketStream *sockstrm;
-    wxString topic_name = connection->m_topic;
-    wxString item;
-
-    // We lost the connection: destroy everything
-    if (evt == wxSOCKET_LOST)
+    if ( evt == wxSOCKET_LOST )
     {
     {
-        sock->Notify(false);
-        sock->Close();
-        connection->OnDisconnect();
+        HandleDisconnect(connection);
         return;
     }
 
     // Receive message number.
         return;
     }
 
     // Receive message number.
-    codeci = connection->m_codeci;
-    codeco = connection->m_codeco;
-    sockstrm = connection->m_sockstrm;
-    int msg = codeci->Read8();
+    wxIPCSocketStreams * const streams = connection->m_streams;
 
 
-    switch (msg)
+    const wxString topic = connection->m_topic;
+    wxString item;
+
+    bool error = false;
+
+    const int msg = streams->Read8();
+    switch ( msg )
     {
         case IPC_EXECUTE:
             {
     {
         case IPC_EXECUTE:
             {
-                void *data;
-                size_t size;
                 wxIPCFormat format;
                 wxIPCFormat format;
-
-                format = (wxIPCFormat)codeci->Read8();
-                size = codeci->Read32();
-
-                data = connection->GetBufferAtLeast( size );
-                wxASSERT_MSG(data != NULL,
-                    "Buffer too small in wxTCPEventHandler::Client_OnRequest" );
-                sockstrm->Read(data, size);
-
-                connection->OnExecute (topic_name, data, size, format);
-
-                break;
+                size_t size wxDUMMY_INITIALIZE(0);
+                void * const
+                    data = streams->ReadFormatData(connection, &format, &size);
+                if ( data )
+                    connection->OnExecute(topic, data, size, format);
+                else
+                    error = true;
             }
             }
+            break;
+
         case IPC_ADVISE:
             {
         case IPC_ADVISE:
             {
-                item = codeci->ReadString();
-                wxIPCFormat format = (wxIPCFormat)codeci->Read8();
-                size_t size = codeci->Read32();
-                void *data = connection->GetBufferAtLeast( size );
-                wxASSERT_MSG(data != NULL,
-                    "Buffer too small in wxTCPEventHandler::Client_OnRequest" );
-                sockstrm->Read(data, size);
+                item = streams->ReadString();
 
 
-                connection->OnAdvise (topic_name, item, data, size, format);
+                wxIPCFormat format;
+                size_t size wxDUMMY_INITIALIZE(0);
+                void * const
+                    data = streams->ReadFormatData(connection, &format, &size);
 
 
-                break;
+                if ( data )
+                    connection->OnAdvise(topic, item, data, size, format);
+                else
+                    error = true;
             }
             }
+            break;
+
         case IPC_ADVISE_START:
             {
         case IPC_ADVISE_START:
             {
-                item = codeci->ReadString();
+                item = streams->ReadString();
 
 
-                bool ok = connection->OnStartAdvise (topic_name, item);
-                if (ok)
-                    codeco->Write8(IPC_ADVISE_START);
-                else
-                    codeco->Write8(IPC_FAIL);
-
-                break;
+                IPCOutput(streams).Write8(connection->OnStartAdvise(topic, item)
+                                            ? IPC_ADVISE_START
+                                            : IPC_FAIL);
             }
             }
+            break;
+
         case IPC_ADVISE_STOP:
             {
         case IPC_ADVISE_STOP:
             {
-                item = codeci->ReadString();
-
-                bool ok = connection->OnStopAdvise (topic_name, item);
-                if (ok)
-                    codeco->Write8(IPC_ADVISE_STOP);
-                else
-                    codeco->Write8(IPC_FAIL);
+                item = streams->ReadString();
 
 
-                break;
+                IPCOutput(streams).Write8(connection->OnStopAdvise(topic, item)
+                                             ? IPC_ADVISE_STOP
+                                             : IPC_FAIL);
             }
             }
+            break;
+
         case IPC_POKE:
             {
         case IPC_POKE:
             {
-                item = codeci->ReadString();
-                wxIPCFormat format = (wxIPCFormat)codeci->Read8();
-                size_t size = codeci->Read32();
-                void *data = connection->GetBufferAtLeast( size );
-                wxASSERT_MSG(data != NULL,
-                    "Buffer too small in wxTCPEventHandler::Client_OnRequest" );
-                sockstrm->Read(data, size);
+                item = streams->ReadString();
+                wxIPCFormat format = (wxIPCFormat)streams->Read8();
 
 
-                connection->OnPoke (topic_name, item, data, size, format);
+                size_t size wxDUMMY_INITIALIZE(0);
+                void * const data = streams->ReadData(connection, &size);
 
 
-                break;
+                if ( data )
+                    connection->OnPoke(topic, item, data, size, format);
+                else
+                    error = true;
             }
             }
+            break;
+
         case IPC_REQUEST:
             {
         case IPC_REQUEST:
             {
-                wxIPCFormat format;
+                item = streams->ReadString();
 
 
-                item = codeci->ReadString();
-                format = (wxIPCFormat)codeci->Read8();
+                wxIPCFormat format = (wxIPCFormat)streams->Read8();
 
                 size_t user_size = wxNO_LEN;
 
                 size_t user_size = wxNO_LEN;
-                const void *user_data = connection->OnRequest(topic_name,
+                const void *user_data = connection->OnRequest(topic,
                                                               item,
                                                               &user_size,
                                                               format);
 
                                                               item,
                                                               &user_size,
                                                               format);
 
-                if (user_data)
+                if ( !user_data )
                 {
                 {
-                    codeco->Write8(IPC_REQUEST_REPLY);
+                    IPCOutput(streams).Write8(IPC_FAIL);
+                    break;
+                }
+
+                IPCOutput out(streams);
+                out.Write8(IPC_REQUEST_REPLY);
 
 
-                    if (user_size == wxNO_LEN)
+                if ( user_size == wxNO_LEN )
+                {
+                    switch ( format )
                     {
                     {
-                        switch (format)
-                        {
-                            case wxIPC_TEXT:
-                            case wxIPC_UTF8TEXT:
-                                user_size = strlen((const char *)user_data) + 1;  // includes final NUL
-                                break;
-                            case wxIPC_UNICODETEXT:
-                                user_size = (wcslen((const wchar_t *)user_data) + 1) * sizeof(wchar_t);  // includes final NUL
-                                break;
-                            default:
-                                user_size = 0;
-                        }
+                        case wxIPC_TEXT:
+                        case wxIPC_UTF8TEXT:
+                            user_size = strlen((const char *)user_data) + 1;  // includes final NUL
+                            break;
+                        case wxIPC_UNICODETEXT:
+                            user_size = (wcslen((const wchar_t *)user_data) + 1) * sizeof(wchar_t);  // includes final NUL
+                            break;
+                        default:
+                            user_size = 0;
                     }
                     }
-
-                    codeco->Write32(user_size);
-                    sockstrm->Write(user_data, user_size);
                 }
                 }
-                else
-                    codeco->Write8(IPC_FAIL);
 
 
-                break;
+                out.WriteData(user_data, user_size);
             }
             }
+            break;
+
         case IPC_DISCONNECT:
         case IPC_DISCONNECT:
-            {
-                sock->Notify(false);
-                sock->Close();
-                connection->SetConnected(false);
-                connection->OnDisconnect();
-                break;
-            }
+            HandleDisconnect(connection);
+            break;
+
+        case IPC_FAIL:
+            wxLogDebug("Unexpected IPC_FAIL received");
+            error = true;
+            break;
+
         default:
         default:
-            codeco->Write8(IPC_FAIL);
+            wxLogDebug("Unknown message code %d received.", msg);
+            error = true;
             break;
     }
             break;
     }
+
+    if ( error )
+        IPCOutput(streams).Write8(IPC_FAIL);
 }
 
 void wxTCPEventHandler::Server_OnRequest(wxSocketEvent &event)
 {
     wxSocketServer *server = (wxSocketServer *) event.GetSocket();
     if (!server)
 }
 
 void wxTCPEventHandler::Server_OnRequest(wxSocketEvent &event)
 {
     wxSocketServer *server = (wxSocketServer *) event.GetSocket();
     if (!server)
-        return ;
+        return;
     wxTCPServer *ipcserv = (wxTCPServer *) server->GetClientData();
 
     // This socket is being deleted; skip this event
     wxTCPServer *ipcserv = (wxTCPServer *) server->GetClientData();
 
     // This socket is being deleted; skip this event
@@ -682,75 +876,57 @@ void wxTCPEventHandler::Server_OnRequest(wxSocketEvent &event)
     // Accept the connection, getting a new socket
     wxSocketBase *sock = server->Accept();
     if (!sock)
     // Accept the connection, getting a new socket
     wxSocketBase *sock = server->Accept();
     if (!sock)
-        return ;
+        return;
     if (!sock->Ok())
     {
         sock->Destroy();
         return;
     }
 
     if (!sock->Ok())
     {
         sock->Destroy();
         return;
     }
 
-    wxSocketStream *stream     = new wxSocketStream(*sock);
-    wxDataInputStream *codeci  = new wxDataInputStream(*stream);
-    wxDataOutputStream *codeco = new wxDataOutputStream(*stream);
-
-    int msg;
-    msg = codeci->Read8();
+    wxIPCSocketStreams *streams = new wxIPCSocketStreams(*sock);
 
 
-    if (msg == IPC_CONNECT)
     {
     {
-        wxString topic_name;
-        topic_name = codeci->ReadString();
+        IPCOutput out(streams);
 
 
-        wxTCPConnection *new_connection =
-            (wxTCPConnection *)ipcserv->OnAcceptConnection (topic_name);
-
-        if (new_connection)
+        const int msg = streams->Read8();
+        if ( msg == IPC_CONNECT )
         {
         {
-            if (new_connection->IsKindOf(CLASSINFO(wxTCPConnection)))
-            {
-                // Acknowledge success
-                codeco->Write8(IPC_CONNECT);
-                new_connection->m_topic = topic_name;
-                new_connection->m_sock = sock;
-                new_connection->m_sockstrm = stream;
-                new_connection->m_codeci = codeci;
-                new_connection->m_codeco = codeco;
-                sock->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID);
-                sock->SetClientData(new_connection);
-                sock->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG);
-                sock->Notify(true);
-                return;
-            }
-            else
+            const wxString topic = streams->ReadString();
+
+            wxTCPConnection *new_connection =
+                (wxTCPConnection *)ipcserv->OnAcceptConnection (topic);
+
+            if (new_connection)
             {
             {
-                delete new_connection;
-                // and fall through to delete everything else
+                if (new_connection->IsKindOf(CLASSINFO(wxTCPConnection)))
+                {
+                    // Acknowledge success
+                    out.Write8(IPC_CONNECT);
+
+                    new_connection->m_sock = sock;
+                    new_connection->m_streams = streams;
+                    new_connection->m_topic = topic;
+                    sock->SetEventHandler(wxTCPEventHandlerModule::GetHandler(),
+                                          _CLIENT_ONREQUEST_ID);
+                    sock->SetClientData(new_connection);
+                    sock->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG);
+                    sock->Notify(true);
+                    return;
+                }
+                else
+                {
+                    delete new_connection;
+                    // and fall through to delete everything else
+                }
             }
         }
             }
         }
-    }
 
 
-    // Something went wrong, send failure message and delete everything
-    codeco->Write8(IPC_FAIL);
+        // Something went wrong, send failure message and delete everything
+        out.Write8(IPC_FAIL);
+    } // IPCOutput object is destroyed here, before destroying stream
 
 
-    delete codeco;
-    delete codeci;
-    delete stream;
+    delete streams;
     sock->Destroy();
 }
 
     sock->Destroy();
 }
 
-// --------------------------------------------------------------------------
-// wxTCPEventHandlerModule (private class)
-// --------------------------------------------------------------------------
-
-class wxTCPEventHandlerModule: public wxModule
-{
-public:
-    virtual bool OnInit() { gs_handler = new wxTCPEventHandler; return true; }
-    virtual void OnExit() { wxDELETE(gs_handler); }
-
-    DECLARE_DYNAMIC_CLASS(wxTCPEventHandlerModule)
-};
-
-IMPLEMENT_DYNAMIC_CLASS(wxTCPEventHandlerModule, wxModule)
-
 #endif // wxUSE_SOCKETS && wxUSE_IPC && wxUSE_STREAMS
 #endif // wxUSE_SOCKETS && wxUSE_IPC && wxUSE_STREAMS