From 8aea37a9df3a2f5ca949ff466bbf698a4b2e3062 Mon Sep 17 00:00:00 2001 From: Vadim Zeitlin Date: Wed, 29 Oct 2008 16:45:55 +0000 Subject: [PATCH] use buffered streams to reduce the number of TCP packets used per IPC command from up to 7 to 1 for reasonably sized payloads, this dramatically (by 150 times for the IPC benchmark on a LAN) increases performance; also centralize all the streams used in a single wxIPCSocketStreams class and allocate only it on the heap instead of doing it for all of the streams git-svn-id: https://svn.wxwidgets.org/svn/wx/wxWidgets/trunk@56584 c3d73ce0-8a6f-49c7-b76d-6d57e0e08775 --- docs/changes.txt | 1 + include/wx/sckipc.h | 20 +- src/common/sckipc.cpp | 518 ++++++++++++++++++++++++++---------------- 3 files changed, 337 insertions(+), 202 deletions(-) diff --git a/docs/changes.txt b/docs/changes.txt index a70db487d6..973417ae5b 100644 --- a/docs/changes.txt +++ b/docs/changes.txt @@ -316,6 +316,7 @@ All: All (Unix): - Added wx-config --optional-libs command line option (John Labenski). +- Noticeably (by a factor of ~150) improve wxIPC classes performance. All (GUI): diff --git a/include/wx/sckipc.h b/include/wx/sckipc.h index 205366d40b..11e612c227 100644 --- a/include/wx/sckipc.h +++ b/include/wx/sckipc.h @@ -52,6 +52,8 @@ class WXDLLIMPEXP_FWD_NET wxTCPServer; class WXDLLIMPEXP_FWD_NET wxTCPClient; +class wxIPCSocketStreams; + class WXDLLIMPEXP_NET wxTCPConnection : public wxConnectionBase { public: @@ -85,11 +87,19 @@ protected: wxIPCFormat format); - wxSocketBase *m_sock; - wxSocketStream *m_sockstrm; - wxDataInputStream *m_codeci; - wxDataOutputStream *m_codeco; - wxString m_topic; + // notice that all the members below are only initialized once the + // connection is made, i.e. in MakeConnection() for the client objects and + // after OnAcceptConnection() in the server ones + + // the underlying socket (wxSocketClient for IPC client and wxSocketServer + // for IPC server) + wxSocketBase *m_sock; + + // various streams that we use + wxIPCSocketStreams *m_streams; + + // the topic of this connection + wxString m_topic; private: // common part of both ctors diff --git a/src/common/sckipc.cpp b/src/common/sckipc.cpp index 1d43462ec2..ff7ebbcc2a 100644 --- a/src/common/sckipc.cpp +++ b/src/common/sckipc.cpp @@ -6,6 +6,7 @@ // Guillermo Rodriguez (updated for wxSocket v2) Jan 2000 // (callbacks deprecated) Mar 2000 // Vadim Zeitlin (added support for Unix sockets) Apr 2002 +// (use buffering, many fixes/cleanup) Oct 2008 // Created: 1993 // RCS-ID: $Id$ // Copyright: (c) Julian Smart 1993 @@ -49,8 +50,11 @@ // macros and constants // -------------------------------------------------------------------------- +namespace +{ + // Message codes -enum +enum IPCCode { IPC_EXECUTE = 1, IPC_REQUEST, @@ -65,8 +69,7 @@ enum IPC_DISCONNECT }; -// All sockets will be created with the following flags -#define SCKIPC_FLAGS (wxSOCKET_WAITALL|wxSOCKET_REUSEADDR) +} // anonymous namespace // headers needed for umask() #ifdef __UNIX_LIKE__ @@ -131,6 +134,182 @@ enum static wxTCPEventHandler *gs_handler = NULL; +// -------------------------------------------------------------------------- +// wxIPCSocketStreams +// -------------------------------------------------------------------------- + +#define USE_BUFFER + +// this class contains the various (related) streams used by wxTCPConnection +// and also provides a way to read from the socket stream directly +// +// for writing to the stream use the IPCOutput class below +class wxIPCSocketStreams +{ +public: + // ctor initializes all the streams on top of the given socket + // + // note that we use a bigger than default buffer size which matches the + // typical Ethernet MTU + wxIPCSocketStreams(wxSocketBase& sock) + : m_socketStream(sock), +#ifdef USE_BUFFER + m_bufferedOut(m_socketStream, 1500), +#else + m_bufferedOut(m_socketStream), +#endif + m_dataIn(m_socketStream), + m_dataOut(m_bufferedOut) + { + } + + // expose the IO methods needed by IPC code (notice that writing is only + // done via IPCOutput) + + // flush output + void Flush() + { +#ifdef USE_BUFFER + m_bufferedOut.Sync(); +#endif + } + + // simple wrappers around the functions with the same name in + // wxDataInputStream + wxUint8 Read8() + { + Flush(); + return m_dataIn.Read8(); + } + + wxUint32 Read32() + { + Flush(); + return m_dataIn.Read32(); + } + + wxString ReadString() + { + Flush(); + return m_dataIn.ReadString(); + } + + // read arbitrary (size-prepended) data + // + // connection parameter is needed to call its GetBufferAtLeast() method + void *ReadData(wxConnectionBase *conn, size_t *size) + { + Flush(); + + wxCHECK_MSG( conn, NULL, "NULL connection parameter" ); + wxCHECK_MSG( size, NULL, "NULL size parameter" ); + + *size = Read32(); + + void * const data = conn->GetBufferAtLeast(*size); + wxCHECK_MSG( data, NULL, "IPC buffer allocation failed" ); + + m_socketStream.Read(data, *size); + + return data; + } + + // same as above but for data preceded by the format + void * + ReadFormatData(wxConnectionBase *conn, wxIPCFormat *format, size_t *size) + { + wxCHECK_MSG( format, NULL, "NULL format parameter" ); + + *format = static_cast(Read8()); + + return ReadData(conn, size); + } + + + // these methods are only used by IPCOutput and not directly + wxDataOutputStream& GetDataOut() { return m_dataOut; } + wxOutputStream& GetUnformattedOut() { return m_bufferedOut; } + +private: + // this is the low-level underlying stream using the connection socket + wxSocketStream m_socketStream; + + // the buffered stream is used to avoid writing all pieces of an IPC + // request to the socket one by one but to instead do it all at once when + // we're done with it +#ifdef USE_BUFFER + wxBufferedOutputStream m_bufferedOut; +#else + wxOutputStream& m_bufferedOut; +#endif + + // finally the data streams are used to be able to write typed data into + // the above streams easily + wxDataInputStream m_dataIn; + wxDataOutputStream m_dataOut; + + DECLARE_NO_COPY_CLASS(wxIPCSocketStreams) +}; + +namespace +{ + +// an object of this class should be instantiated on the stack to write to the +// underlying socket stream +// +// this class is intentionally separated from wxIPCSocketStreams to ensure that +// Flush() is always called +class IPCOutput +{ +public: + // construct an object associated with the given streams (which must have + // life time greater than ours as we keep a reference to it) + IPCOutput(wxIPCSocketStreams *streams) + : m_streams(*streams) + { + wxASSERT_MSG( streams, "NULL streams pointer" ); + } + + // dtor calls Flush() really sending the IPC data to the network + ~IPCOutput() { m_streams.Flush(); } + + + // write a byte + void Write8(wxUint8 i) + { + m_streams.GetDataOut().Write8(i); + } + + // write the reply code and a string + void Write(IPCCode code, const wxString& str) + { + Write8(code); + m_streams.GetDataOut().WriteString(str); + } + + // write the reply code, a string and a format in this order + void Write(IPCCode code, const wxString& str, wxIPCFormat format) + { + Write(code, str); + Write8(format); + } + + // write arbitrary data + void WriteData(const void *data, size_t size) + { + m_streams.GetDataOut().Write32(size); + m_streams.GetUnformattedOut().Write(data, size); + } + + +private: + wxIPCSocketStreams& m_streams; + + DECLARE_NO_COPY_CLASS(IPCOutput) +}; + +} // anonymous namespace + // ========================================================================== // implementation // ========================================================================== @@ -155,31 +334,26 @@ bool wxTCPClient::ValidHost(const wxString& host) return addr.Hostname(host); } -wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host, - const wxString& serverName, - const wxString& topic) +wxConnectionBase *wxTCPClient::MakeConnection(const wxString& host, + const wxString& serverName, + const wxString& topic) { wxSockAddress *addr = GetAddressFromName(serverName, host); if ( !addr ) return NULL; - wxSocketClient *client = new wxSocketClient(SCKIPC_FLAGS); - wxSocketStream *stream = new wxSocketStream(*client); - wxDataInputStream *data_is = new wxDataInputStream(*stream); - wxDataOutputStream *data_os = new wxDataOutputStream(*stream); + wxSocketClient * const client = new wxSocketClient(wxSOCKET_WAITALL); + wxIPCSocketStreams * const streams = new wxIPCSocketStreams(*client); bool ok = client->Connect(*addr); delete addr; if ( ok ) { - unsigned char msg; - // Send topic name, and enquire whether this has succeeded - data_os->Write8(IPC_CONNECT); - data_os->WriteString(topic); + IPCOutput(streams).Write(IPC_CONNECT, topic); - msg = data_is->Read8(); + unsigned char msg = streams->Read8(); // OK! Confirmation. if (msg == IPC_CONNECT) @@ -193,9 +367,7 @@ wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host, { connection->m_topic = topic; connection->m_sock = client; - connection->m_sockstrm = stream; - connection->m_codeci = data_is; - connection->m_codeco = data_os; + connection->m_streams = streams; client->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID); client->SetClientData(connection); client->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG); @@ -212,9 +384,7 @@ wxConnectionBase *wxTCPClient::MakeConnection (const wxString& host, } // Something went wrong, delete everything - delete data_is; - delete data_os; - delete stream; + delete streams; client->Destroy(); return NULL; @@ -273,8 +443,10 @@ bool wxTCPServer::Create(const wxString& serverName) } #endif // __UNIX_LIKE__ - // Create a socket listening on the specified port - m_server = new wxSocketServer(*addr, SCKIPC_FLAGS); + // Create a socket listening on the specified port (reusing it to allow + // restarting the server listening on the same port as was used by the + // previous instance of this server) + m_server = new wxSocketServer(*addr, wxSOCKET_WAITALL | wxSOCKET_REUSEADDR); #ifdef __UNIX_LIKE__ if ( addr->Type() == wxSockAddress::UNIX ) @@ -336,10 +508,8 @@ wxTCPServer::OnAcceptConnection(const wxString& WXUNUSED(topic)) void wxTCPConnection::Init() { - m_sock = NULL; - m_sockstrm = NULL; - m_codeci = NULL; - m_codeco = NULL; + m_sock = NULL; + m_streams = NULL; } wxTCPConnection::~wxTCPConnection() @@ -352,10 +522,7 @@ wxTCPConnection::~wxTCPConnection() m_sock->Destroy(); } - /* Delete after destroy */ - wxDELETE(m_codeci); - wxDELETE(m_codeco); - wxDELETE(m_sockstrm); + delete m_streams; } void wxTCPConnection::Compress(bool WXUNUSED(on)) @@ -370,7 +537,7 @@ bool wxTCPConnection::Disconnect() return true; // Send the disconnect message to the peer. - m_codeco->Write8(IPC_DISCONNECT); + IPCOutput(m_streams).Write8(IPC_DISCONNECT); if ( m_sock ) { @@ -391,11 +558,11 @@ bool wxTCPConnection::DoExecute(const void *data, return false; // Prepare EXECUTE message - m_codeco->Write8(IPC_EXECUTE); - m_codeco->Write8(format); + IPCOutput out(m_streams); + out.Write8(IPC_EXECUTE); + out.Write8(format); - m_codeco->Write32(size); - m_sockstrm->Write(data, size); + out.WriteData(data, size); return true; } @@ -407,24 +574,13 @@ const void *wxTCPConnection::Request(const wxString& item, if ( !m_sock->IsConnected() ) return NULL; - m_codeco->Write8(IPC_REQUEST); - m_codeco->WriteString(item); - m_codeco->Write8(format); + IPCOutput(m_streams).Write(IPC_REQUEST, item, format); - int ret = m_codeci->Read8(); + int ret = m_streams->Read8(); if ( ret == IPC_FAIL ) return NULL; - size_t s = m_codeci->Read32(); - - void *data = GetBufferAtLeast( s ); - wxASSERT_MSG(data != NULL, - _T("Buffer too small in wxTCPConnection::Request") ); - m_sockstrm->Read(data, s); - - if (size) - *size = s; - return data; + return m_streams->ReadData(this, size); } bool wxTCPConnection::DoPoke(const wxString& item, @@ -435,25 +591,21 @@ bool wxTCPConnection::DoPoke(const wxString& item, if ( !m_sock->IsConnected() ) return false; - m_codeco->Write8(IPC_POKE); - m_codeco->WriteString(item); - m_codeco->Write8(format); - - m_codeco->Write32(size); - m_sockstrm->Write(data, size); + IPCOutput out(m_streams); + out.Write(IPC_POKE, item, format); + out.WriteData(data, size); return true; } -bool wxTCPConnection::StartAdvise (const wxString& item) +bool wxTCPConnection::StartAdvise(const wxString& item) { if ( !m_sock->IsConnected() ) return false; - m_codeco->Write8(IPC_ADVISE_START); - m_codeco->WriteString(item); + IPCOutput(m_streams).Write(IPC_ADVISE_START, item); - int ret = m_codeci->Read8(); + int ret = m_streams->Read8(); if (ret != IPC_FAIL) return true; else @@ -465,10 +617,9 @@ bool wxTCPConnection::StopAdvise (const wxString& item) if ( !m_sock->IsConnected() ) return false; - m_codeco->Write8(IPC_ADVISE_STOP); - m_codeco->WriteString(item); + IPCOutput(m_streams).Write(IPC_ADVISE_STOP, item); - int ret = m_codeci->Read8(); + int ret = m_streams->Read8(); if (ret != IPC_FAIL) return true; @@ -485,12 +636,9 @@ bool wxTCPConnection::DoAdvise(const wxString& item, if ( !m_sock->IsConnected() ) return false; - m_codeco->Write8(IPC_ADVISE); - m_codeco->WriteString(item); - m_codeco->Write8(format); - - m_codeco->Write32(size); - m_sockstrm->Write(data, size); + IPCOutput out(m_streams); + out.Write(IPC_ADVISE, item, format); + out.WriteData(data, size); return true; } @@ -517,12 +665,6 @@ void wxTCPEventHandler::Client_OnRequest(wxSocketEvent &event) if (!connection) return; - wxDataInputStream *codeci; - wxDataOutputStream *codeco; - wxSocketStream *sockstrm; - wxString topic_name = connection->m_topic; - wxString item; - // We lost the connection: destroy everything if (evt == wxSOCKET_LOST) { @@ -533,134 +675,120 @@ void wxTCPEventHandler::Client_OnRequest(wxSocketEvent &event) } // Receive message number. - codeci = connection->m_codeci; - codeco = connection->m_codeco; - sockstrm = connection->m_sockstrm; - int msg = codeci->Read8(); + wxIPCSocketStreams * const streams = connection->m_streams; - switch (msg) + const wxString topic = connection->m_topic; + wxString item; + + switch ( const int msg = streams->Read8() ) { case IPC_EXECUTE: { - void *data; - size_t size; wxIPCFormat format; + size_t size; + void * const + data = streams->ReadFormatData(connection, &format, &size); - format = (wxIPCFormat)codeci->Read8(); - size = codeci->Read32(); - - data = connection->GetBufferAtLeast( size ); - wxASSERT_MSG(data != NULL, - "Buffer too small in wxTCPEventHandler::Client_OnRequest" ); - sockstrm->Read(data, size); - - connection->OnExecute (topic_name, data, size, format); - - break; + connection->OnExecute(topic, data, size, format); } + break; + case IPC_ADVISE: { - item = codeci->ReadString(); - wxIPCFormat format = (wxIPCFormat)codeci->Read8(); - size_t size = codeci->Read32(); - void *data = connection->GetBufferAtLeast( size ); - wxASSERT_MSG(data != NULL, - "Buffer too small in wxTCPEventHandler::Client_OnRequest" ); - sockstrm->Read(data, size); + item = streams->ReadString(); - connection->OnAdvise (topic_name, item, data, size, format); + wxIPCFormat format; + size_t size; + void * const + data = streams->ReadFormatData(connection, &format, &size); - break; + connection->OnAdvise(topic, item, data, size, format); } + break; + case IPC_ADVISE_START: { - item = codeci->ReadString(); + item = streams->ReadString(); - bool ok = connection->OnStartAdvise (topic_name, item); - if (ok) - codeco->Write8(IPC_ADVISE_START); - else - codeco->Write8(IPC_FAIL); - - break; + IPCOutput(streams).Write8(connection->OnStartAdvise(topic, item) + ? IPC_ADVISE_START + : IPC_FAIL); } + break; + case IPC_ADVISE_STOP: { - item = codeci->ReadString(); + item = streams->ReadString(); - bool ok = connection->OnStopAdvise (topic_name, item); - if (ok) - codeco->Write8(IPC_ADVISE_STOP); - else - codeco->Write8(IPC_FAIL); - - break; + IPCOutput(streams).Write8(connection->OnStopAdvise(topic, item) + ? IPC_ADVISE_STOP + : IPC_FAIL); } + break; + case IPC_POKE: { - item = codeci->ReadString(); - wxIPCFormat format = (wxIPCFormat)codeci->Read8(); - size_t size = codeci->Read32(); - void *data = connection->GetBufferAtLeast( size ); - wxASSERT_MSG(data != NULL, - "Buffer too small in wxTCPEventHandler::Client_OnRequest" ); - sockstrm->Read(data, size); + item = streams->ReadString(); + wxIPCFormat format = (wxIPCFormat)streams->Read8(); - connection->OnPoke (topic_name, item, data, size, format); + size_t size; + void * const data = streams->ReadData(connection, &size); - break; + connection->OnPoke(topic, item, data, size, format); } + break; + case IPC_REQUEST: { - wxIPCFormat format; + item = streams->ReadString(); - item = codeci->ReadString(); - format = (wxIPCFormat)codeci->Read8(); + wxIPCFormat format = (wxIPCFormat)streams->Read8(); size_t user_size = wxNO_LEN; - const void *user_data = connection->OnRequest(topic_name, + const void *user_data = connection->OnRequest(topic, item, &user_size, format); - if (user_data) + if ( !user_data ) { - codeco->Write8(IPC_REQUEST_REPLY); + IPCOutput(streams).Write8(IPC_FAIL); + break; + } + + IPCOutput out(streams); + out.Write8(IPC_REQUEST_REPLY); - if (user_size == wxNO_LEN) + if ( user_size == wxNO_LEN ) + { + switch ( format ) { - switch (format) - { - case wxIPC_TEXT: - case wxIPC_UTF8TEXT: - user_size = strlen((const char *)user_data) + 1; // includes final NUL - break; - case wxIPC_UNICODETEXT: - user_size = (wcslen((const wchar_t *)user_data) + 1) * sizeof(wchar_t); // includes final NUL - break; - default: - user_size = 0; - } + case wxIPC_TEXT: + case wxIPC_UTF8TEXT: + user_size = strlen((const char *)user_data) + 1; // includes final NUL + break; + case wxIPC_UNICODETEXT: + user_size = (wcslen((const wchar_t *)user_data) + 1) * sizeof(wchar_t); // includes final NUL + break; + default: + user_size = 0; } - - codeco->Write32(user_size); - sockstrm->Write(user_data, user_size); } - else - codeco->Write8(IPC_FAIL); - break; + out.WriteData(user_data, user_size); } + break; + case IPC_DISCONNECT: - { - sock->Notify(false); - sock->Close(); - connection->SetConnected(false); - connection->OnDisconnect(); - break; - } + sock->Notify(false); + sock->Close(); + connection->SetConnected(false); + connection->OnDisconnect(); + break; + default: - codeco->Write8(IPC_FAIL); + wxLogDebug("Unknown message code %d received.", msg); + IPCOutput(streams).Write8(IPC_FAIL); break; } } @@ -689,52 +817,48 @@ void wxTCPEventHandler::Server_OnRequest(wxSocketEvent &event) return; } - wxSocketStream *stream = new wxSocketStream(*sock); - wxDataInputStream *codeci = new wxDataInputStream(*stream); - wxDataOutputStream *codeco = new wxDataOutputStream(*stream); - - int msg; - msg = codeci->Read8(); + wxIPCSocketStreams *streams = new wxIPCSocketStreams(*sock); - if (msg == IPC_CONNECT) { - wxString topic_name; - topic_name = codeci->ReadString(); + IPCOutput out(streams); - wxTCPConnection *new_connection = - (wxTCPConnection *)ipcserv->OnAcceptConnection (topic_name); - - if (new_connection) + const int msg = streams->Read8(); + if ( msg == IPC_CONNECT ) { - if (new_connection->IsKindOf(CLASSINFO(wxTCPConnection))) - { - // Acknowledge success - codeco->Write8(IPC_CONNECT); - new_connection->m_topic = topic_name; - new_connection->m_sock = sock; - new_connection->m_sockstrm = stream; - new_connection->m_codeci = codeci; - new_connection->m_codeco = codeco; - sock->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID); - sock->SetClientData(new_connection); - sock->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG); - sock->Notify(true); - return; - } - else + const wxString topic = streams->ReadString(); + + wxTCPConnection *new_connection = + (wxTCPConnection *)ipcserv->OnAcceptConnection (topic); + + if (new_connection) { - delete new_connection; - // and fall through to delete everything else + if (new_connection->IsKindOf(CLASSINFO(wxTCPConnection))) + { + // Acknowledge success + out.Write8(IPC_CONNECT); + + new_connection->m_sock = sock; + new_connection->m_streams = streams; + new_connection->m_topic = topic; + sock->SetEventHandler(*gs_handler, _CLIENT_ONREQUEST_ID); + sock->SetClientData(new_connection); + sock->SetNotify(wxSOCKET_INPUT_FLAG | wxSOCKET_LOST_FLAG); + sock->Notify(true); + return; + } + else + { + delete new_connection; + // and fall through to delete everything else + } } } - } - // Something went wrong, send failure message and delete everything - codeco->Write8(IPC_FAIL); + // Something went wrong, send failure message and delete everything + out.Write8(IPC_FAIL); + } // IPCOutput object is destroyed here, before destroying stream - delete codeco; - delete codeci; - delete stream; + delete streams; sock->Destroy(); } -- 2.45.2