1 ///////////////////////////////////////////////////////////////////////////// 
   2 // Name:        samples/sockbase/client.cpp 
   3 // Purpose:     Sockets sample for wxBase 
   4 // Author:      Lukasz Michalski 
   8 // Copyright:   (c) 2005 Lukasz Michalski <lmichalski@sf.net> 
   9 // Licence:     wxWindows licence 
  10 ///////////////////////////////////////////////////////////////////////////// 
  12 // ============================================================================ 
  14 // ============================================================================ 
  16 // ---------------------------------------------------------------------------- 
  18 // ---------------------------------------------------------------------------- 
  21 #include "wx/socket.h" 
  24 #include "wx/cmdline.h" 
  26 #include "wx/datetime.h" 
  28 #include "wx/thread.h" 
  // Event type carried by WorkerEvent objects; its value is obtained from
  // wxNewEventType() when this translation unit is initialised.
  30 const wxEventType wxEVT_WORKER 
= wxNewEventType(); 
  // Event-table entry macro: registers `func`, cast to WorkerEventFunction,
  // as the handler for wxEVT_WORKER with both id arguments set to -1.
  31 #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ), 
  // Interval passed to mTimer.Start(timeout_val, true) by the Client.
  // NOTE(review): presumably milliseconds -- confirm against wxTimer::Start.
  33 const int timeout_val 
= 1000; 
  35 class WorkerEvent 
: public wxEvent 
{ 
  44     WorkerEvent(void* pSender
, evt_type type
) 
  47         SetEventType(wxEVT_WORKER
); 
  53     void setFailed() { m_isFailed 
= true; } 
  54     bool isFailed() const { return m_isFailed
; } 
  56     virtual wxEvent
* Clone() const 
  58         return new WorkerEvent(*this); 
  62     wxString m_workerIdent
; 
  // Pointer-to-member type for handlers that take a WorkerEvent&; this is
  // the type the EVT_WORKER macro casts its handler argument to.
  66 typedef void (wxEvtHandler::*WorkerEventFunction
)(WorkerEvent
&); 
  // List types used by Client to keep track of its workers
  // (TList for m_threadWorkers, EList for m_eventWorkers).
  71 WX_DECLARE_LIST(ThreadWorker
, TList
); 
  72 WX_DECLARE_LIST(EventWorker
, EList
); 
  74 class Client 
: public wxApp 
{ 
  77     void RemoveEventWorker(EventWorker
* p_worker
); 
  98     virtual bool OnInit(); 
 100     virtual int OnExit(); 
 101     void OnInitCmdLine(wxCmdLineParser
& pParser
); 
 102     bool OnCmdLineParsed(wxCmdLineParser
& pParser
); 
 103     void OnWorkerEvent(WorkerEvent
& pEvent
); 
 104     void OnTimerEvent(wxTimerEvent
& pEvent
); 
 106     void StartWorker(workMode pMode
, const wxString
& pMessage
); 
 107     void StartWorker(workMode pMode
); 
 108     char* CreateBuffer(int *msgsize
); 
 110     void dumpStatistics(); 
 112     TList m_threadWorkers
; 
 113     EList m_eventWorkers
; 
 115     unsigned m_statConnecting
; 
 116     unsigned m_statSending
; 
 117     unsigned m_statReceiving
; 
 118     unsigned m_statDisconnecting
; 
 120     unsigned m_statFailed
; 
 127 class ThreadWorker 
: public wxThread
 
 130     ThreadWorker(const wxString
& p_host
, char* p_buf
, int p_size
); 
 131     virtual ExitCode 
Entry(); 
 134     wxSocketClient
* m_clientSocket
; 
 139     wxString m_workerIdent
; 
 142 class EventWorker 
: public wxEvtHandler
 
 144     DECLARE_EVENT_TABLE() 
 146     EventWorker(const wxString
& p_host
, char* p_buf
, int p_size
); 
 148     virtual ~EventWorker(); 
 151     wxSocketClient
* m_clientSocket
; 
 159     WorkerEvent::evt_type m_currentType
; 
 161     wxIPV4address m_localaddr
; 
 163     void OnSocketEvent(wxSocketEvent
& pEvent
); 
 164     void SendEvent(bool failed
); 
 167 /******************* Implementation ******************/ 
 168 IMPLEMENT_APP_CONSOLE(Client
); 
 170 #include <wx/listimpl.cpp> 
 171 WX_DEFINE_LIST(TList
); 
 172 WX_DEFINE_LIST(EList
); 
 175 CreateIdent(const wxIPV4address
& addr
) 
 177     return wxString::Format(wxT("%s:%d"),addr
.IPAddress().c_str(),addr
.Service()); 
 181 Client::OnInitCmdLine(wxCmdLineParser
& pParser
) 
 183     wxApp::OnInitCmdLine(pParser
); 
 184     pParser
.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL
); 
 185     pParser
.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL
); 
 186     pParser
.AddSwitch(wxT("r"),wxT("random"),_("Send radnom data (default)"),wxCMD_LINE_PARAM_OPTIONAL
); 
 187     pParser
.AddOption(wxT("m"),wxT("message"),_("Send message from <str>"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
); 
 188     pParser
.AddOption(wxT("f"),wxT("file"),_("Send contents of <file>"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
); 
 189     pParser
.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
); 
 190     pParser
.AddOption(wxT("s"),wxT("stress"),_("stress test with <num> concurrent connections"),wxCMD_LINE_VAL_NUMBER
,wxCMD_LINE_PARAM_OPTIONAL
); 
 195 Client::OnCmdLineParsed(wxCmdLineParser
& pParser
) 
 199     m_stressWorkers 
= 50; 
 201     if (pParser
.Found(_("verbose"))) 
 203         wxLog::AddTraceMask(wxT("wxSocket")); 
 204         wxLog::AddTraceMask(wxT("epolldispatcher")); 
 205         wxLog::AddTraceMask(wxT("selectdispatcher")); 
 206         wxLog::AddTraceMask(wxT("thread")); 
 207         wxLog::AddTraceMask(wxT("events")); 
 210     if (pParser
.Found(wxT("t"))) 
 211         m_workMode 
= THREADS
; 
 212     m_sendType 
= SEND_RANDOM
; 
 214     if (pParser
.Found(wxT("m"),&m_message
)) 
 215         m_sendType 
= SEND_MESSAGE
; 
 216     else if (pParser
.Found(wxT("f"),&fname
)) 
 219         if (!file
.IsOpened()) { 
 220             wxLogError(wxT("Cannot open file %s"),fname
.c_str()); 
 223         if (!file
.ReadAll(&m_message
)) { 
 224             wxLogError(wxT("Cannot read conten of file %s"),fname
.c_str()); 
 227         m_sendType 
= SEND_MESSAGE
; 
 230     if (pParser
.Found(wxT("s"),&m_stressWorkers
)) 
 231         m_sendType 
= STRESS_TEST
; 
 233     m_host 
= wxT("127.0.0.1"); 
 234     pParser
.Found(wxT("H"),&m_host
); 
 235     return wxApp::OnCmdLineParsed(pParser
); 
 241     if (!wxApp::OnInit()) 
 243     srand(wxDateTime::Now().GetTicks()); 
 244     mTimer
.SetOwner(this); 
 245     m_statConnecting 
= 0; 
 248     m_statDisconnecting 
= 0; 
 264                     for (i 
= 0; i 
< m_stressWorkers
; i
++) { 
 265                         if (m_message
.empty()) 
 266                             StartWorker(THREADS
); 
 268                             StartWorker(THREADS
, m_message
); 
 272                     for (i 
= 0; i 
< m_stressWorkers
; i
++) { 
 273                         if (m_message
.empty()) 
 276                             StartWorker(EVENTS
, m_message
); 
 280                     for (i 
= 0; i 
< m_stressWorkers
; i
++) { 
 281                         if (m_message
.empty()) 
 282                             StartWorker(i 
% 5 == 0 ? THREADS 
: EVENTS
); 
 284                             StartWorker(i 
% 5 == 0 ? THREADS 
: EVENTS
, m_message
); 
 290             StartWorker(m_workMode
,m_message
); 
 293             StartWorker(m_workMode
); 
 296     mTimer
.Start(timeout_val
,true); 
 297     return wxApp::OnRun(); 
 303     for(EList::compatibility_iterator it 
= m_eventWorkers
.GetFirst(); it 
; it
->GetNext()) { 
 304         delete it
->GetData(); 
 309 // Create buffer to be sent by client. Buffer contains test indicator  
 310 // message size and place for data 
 311 // msgsize parameter contains size of data in bytes and  
 312 // if input value does not fit into 250 bytes then 
 313 // on exit is updated to new value that is multiply of 1024 bytes 
 315 Client::CreateBuffer(int* msgsize
) 
 319     //if message should have more than 256 bytes then set it as 
 320     //test3 for compatibility with GUI server sample 
 321     if ((*msgsize
) > 250)  
 323         //send at least one kb of data 
 324         int size 
= (*msgsize
)/1024 + 1; 
 325         //returned buffer will contain test indicator, message size in kb and data 
 326         bufsize 
= size
*1024+2; 
 327         buf 
= new char[bufsize
]; 
 328         buf
[0] = (unsigned char)0xDE; //second byte contains size in kilobytes 
 329         buf
[1] = (char)(size
); 
 330         *msgsize 
= size
*1024; 
 334         //returned buffer will contain test indicator, message size in kb and data 
 335         bufsize 
= (*msgsize
)+2; 
 336         buf 
= new char[bufsize
]; 
 337         buf
[0] = (unsigned char)0xBE; //second byte contains size in bytes 
 338         buf
[1] = (char)(*msgsize
); 
 344 Client::StartWorker(workMode pMode
) { 
 345     int msgsize 
= 1 + (int) (250000.0 * (rand() / (RAND_MAX 
+ 1.0))); 
 346     char* buf 
= CreateBuffer(&msgsize
); 
 348     //fill data part of buffer with random bytes 
 349     for (int i 
= 2; i 
< (msgsize
); i
++) { 
 353     if (pMode 
== THREADS
) { 
 354         ThreadWorker
* c 
= new ThreadWorker(m_host
,buf
,msgsize
+2); 
 355         if (c
->Create() != wxTHREAD_NO_ERROR
) { 
 356             wxLogError(wxT("Cannot create more threads")); 
 359             m_threadWorkers
.Append(c
); 
 362         EventWorker
* e 
= new EventWorker(m_host
,buf
,msgsize
+2); 
 364         m_eventWorkers
.Append(e
); 
 370 Client::StartWorker(workMode pMode
, const wxString
& pMessage
) { 
 371     char* tmpbuf 
= wxStrdup(pMessage
.mb_str()); 
 372     int msgsize 
= strlen(tmpbuf
); 
 373     char* buf 
= CreateBuffer(&msgsize
); 
 374     memset(buf
+2,0x0,msgsize
); 
 375     memcpy(buf
+2,tmpbuf
,msgsize
); 
 378     if (pMode 
== THREADS
) { 
 379         ThreadWorker
* c 
= new ThreadWorker(m_host
,buf
,msgsize
+2); 
 380         if (c
->Create() != wxTHREAD_NO_ERROR
) { 
 381             wxLogError(wxT("Cannot create more threads")); 
 384             m_threadWorkers
.Append(c
); 
 387         EventWorker
* e 
= new EventWorker(m_host
,buf
,msgsize
+2); 
 389         m_eventWorkers
.Append(e
); 
 395 Client::OnWorkerEvent(WorkerEvent
& pEvent
) { 
 396     switch (pEvent
.m_eventType
) { 
 397         case WorkerEvent::CONNECTING
: 
 398             if (pEvent
.isFailed()) 
 404         case WorkerEvent::SENDING
: 
 405             if (pEvent
.isFailed()) 
 416         case WorkerEvent::RECEIVING
: 
 417             if (pEvent
.isFailed()) 
 428         case WorkerEvent::DISCONNECTING
: 
 429             if (pEvent
.isFailed()) 
 431                 m_statDisconnecting
--; 
 437                 m_statDisconnecting
++; 
 440         case WorkerEvent::DONE
: 
 442             m_statDisconnecting
--; 
 446     if (pEvent
.isFailed() || pEvent
.m_eventType 
== WorkerEvent::DONE
) 
 448         for(TList::compatibility_iterator it 
= m_threadWorkers
.GetFirst(); it 
; it 
= it
->GetNext()) { 
 449             if (it
->GetData() == pEvent
.m_sender
) { 
 450                 m_threadWorkers
.DeleteNode(it
); 
 454         for(EList::compatibility_iterator it2 
= m_eventWorkers
.GetFirst(); it2 
; it2 
= it2
->GetNext()) 
 456             if (it2
->GetData() == pEvent
.m_sender
) { 
 457                 delete it2
->GetData(); 
 458                 m_eventWorkers
.DeleteNode(it2
); 
 462         if ((m_threadWorkers
.GetCount() == 0) && (m_eventWorkers
.GetCount() == 0)) 
 471             mTimer
.Start(timeout_val
,true); 
 477 Client::RemoveEventWorker(EventWorker
* p_worker
) { 
 478     for(EList::compatibility_iterator it 
= m_eventWorkers
.GetFirst(); it 
; it 
= it
->GetNext()) { 
 479         if (it
->GetData() == p_worker
) { 
 480             //wxLogDebug(wxT("Deleting event worker")); 
 481             delete it
->GetData(); 
 482             m_eventWorkers
.DeleteNode(it
); 
 489 Client::dumpStatistics() { 
 491         wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"), 
 500     wxLogMessage(wxT("Current status:\n%s\n"),msg
.c_str()); 
 504 Client::OnTimerEvent(wxTimerEvent
&) { 
 // Event table of the Client application: worker notifications are routed
 // to OnWorkerEvent() and timer events with any id to OnTimerEvent().
 // NOTE(review): Client derives from wxApp but the table names wxEvtHandler
 // as its base -- confirm this is intentional.
 508 BEGIN_EVENT_TABLE(Client
,wxEvtHandler
) 
 509     EVT_WORKER(Client::OnWorkerEvent
) 
 510     EVT_TIMER(wxID_ANY
,Client::OnTimerEvent
) 
 515 EventWorker::EventWorker(const wxString
& p_host
, char* p_buf
, int p_size
) 
 522     m_clientSocket 
= new wxSocketClient(wxSOCKET_NOWAIT
); 
 523     m_clientSocket
->SetEventHandler(*this); 
 524     m_insize 
= m_outsize 
- 2; 
 525     m_inbuf 
= new char[m_insize
]; 
 533     m_clientSocket
->SetNotify(wxSOCKET_CONNECTION_FLAG
|wxSOCKET_LOST_FLAG
|wxSOCKET_OUTPUT_FLAG
|wxSOCKET_INPUT_FLAG
); 
 534     m_clientSocket
->Notify(true); 
 535     m_currentType 
= WorkerEvent::CONNECTING
; 
 537     //wxLogMessage(wxT("EventWorker: Connecting.....")); 
 538     m_clientSocket
->Connect(ca
,false); 
 542 EventWorker::OnSocketEvent(wxSocketEvent
& pEvent
) { 
 543     switch(pEvent
.GetSocketEvent()) { 
 545             //wxLogDebug(wxT("EventWorker: INPUT")); 
 547                 if (m_readed 
== m_insize
) 
 548                     return; //event already posted 
 549                 m_clientSocket
->Read(m_inbuf 
+ m_readed
, m_insize 
- m_readed
); 
 550                 if (m_clientSocket
->Error()) 
 552                     if (m_clientSocket
->LastError() != wxSOCKET_WOULDBLOCK
) 
 554                         wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr
).c_str()); 
 559                 m_readed 
+= m_clientSocket
->LastCount(); 
 560                 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed); 
 561                 if (m_readed 
== m_insize
) 
 563                     if (!memcmp(m_inbuf
,m_outbuf
,m_insize
)) { 
 564                         wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr
).c_str()); 
 567                     m_currentType 
= WorkerEvent::DISCONNECTING
; 
 568                     wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr
).c_str()); 
 571                     //wxLogDebug(wxT("EventWorker %p closing"),this); 
 572                     m_clientSocket
->Close(); 
 574                     m_currentType 
= WorkerEvent::DONE
; 
 575                     wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr
).c_str()); 
 578             } while (!m_clientSocket
->Error()); 
 580         case wxSOCKET_OUTPUT
: 
 581             //wxLogDebug(wxT("EventWorker: OUTPUT")); 
 583                 if (m_written 
== m_outsize
) 
 587                     m_currentType 
= WorkerEvent::SENDING
; 
 588                     wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr
).c_str()); 
 590                 m_clientSocket
->Write(m_outbuf 
+ m_written
, m_outsize 
- m_written
); 
 591                 if (m_clientSocket
->Error()) 
 593                     if (m_clientSocket
->LastError() != wxSOCKET_WOULDBLOCK
) { 
 594                         wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr
).c_str()); 
 598                 m_written 
+= m_clientSocket
->LastCount(); 
 599                 if (m_written 
!= m_outsize
) 
 601                     //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written); 
 605                     //wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this); 
 606                     m_currentType 
= WorkerEvent::RECEIVING
; 
 607                     wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr
).c_str()); 
 610             } while(!m_clientSocket
->Error()); 
 612         case wxSOCKET_CONNECTION
: 
 614             //wxLogMessage(wxT("EventWorker: got connection")); 
 615             wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr
).c_str(),m_outsize
-2); 
 616             if (!m_clientSocket
->GetLocal(m_localaddr
)) 
 618                 wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket
); 
 620             m_currentType 
= WorkerEvent::SENDING
; 
 621             wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr
).c_str()); 
 627             wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr
).c_str()); 
 635 EventWorker::SendEvent(bool failed
) { 
 638     WorkerEvent 
e(this,m_currentType
); 
 639     if (failed
) e
.setFailed(); 
 640     wxGetApp().AddPendingEvent(e
); 
 641     m_doneSent 
= failed 
|| m_currentType 
== WorkerEvent::DONE
; 
 644 EventWorker::~EventWorker() { 
 645     m_clientSocket
->Destroy(); 
 // Event table of EventWorker: socket events with any id are routed to
 // OnSocketEvent().
 650 BEGIN_EVENT_TABLE(EventWorker
,wxEvtHandler
) 
 651     EVT_SOCKET(wxID_ANY
,EventWorker::OnSocketEvent
) 
 655 ThreadWorker::ThreadWorker(const wxString
& p_host
, char* p_buf
, int p_size
) 
 656   : wxThread(wxTHREAD_DETACHED
), 
 661     m_clientSocket 
= new wxSocketClient(wxSOCKET_BLOCK
|wxSOCKET_WAITALL
); 
 662     m_insize 
= m_outsize 
- 2; 
 663     m_inbuf 
= new char[m_insize
]; 
 666 wxThread::ExitCode 
ThreadWorker::Entry() 
 671     //wxLogDebug(wxT("ThreadWorker: Connecting.....")); 
 672     m_clientSocket
->SetTimeout(60); 
 674     WorkerEvent::evt_type etype 
= WorkerEvent::CONNECTING
; 
 675     if (!m_clientSocket
->Connect(ca
)) { 
 676         wxLogError(wxT("Cannot connect to %s:%d"),ca
.IPAddress().c_str(), ca
.Service()); 
 679         //wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize); 
 680         etype 
= WorkerEvent::SENDING
; 
 681         WorkerEvent 
e(this,etype
); 
 682         wxGetApp().AddPendingEvent(e
); 
 683         int to_process 
= m_outsize
; 
 685             m_clientSocket
->Write(m_outbuf
,m_outsize
); 
 686             if (m_clientSocket
->Error()) { 
 687                 wxLogError(wxT("ThreadWorker: Write error")); 
 690             to_process 
-= m_clientSocket
->LastCount(); 
 691             //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process); 
 692         } while(!m_clientSocket
->Error() && to_process 
!= 0); 
 695             etype 
= WorkerEvent::RECEIVING
; 
 696             WorkerEvent 
e(this,etype
); 
 697             wxGetApp().AddPendingEvent(e
); 
 698             to_process 
= m_insize
; 
 700                 m_clientSocket
->Read(m_inbuf
,m_insize
); 
 701                 if (m_clientSocket
->Error()) { 
 702                     wxLogError(wxT("ThreadWorker: Read error")); 
 706                 to_process 
-= m_clientSocket
->LastCount(); 
 707                 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process); 
 708             } while(!m_clientSocket
->Error() && to_process 
!= 0); 
 711         char* outdat 
= (char*)m_outbuf
+2; 
 712         if (!failed 
&& (memcmp(m_inbuf
,outdat
,m_insize
) != 0)) 
 714             wxLogError(wxT("Data mismatch")); 
 718     //wxLogDebug(wxT("ThreadWorker: Finished")); 
 720         etype 
= WorkerEvent::DISCONNECTING
; 
 721         WorkerEvent 
e(this,etype
); 
 722         wxGetApp().AddPendingEvent(e
); 
 724     m_clientSocket
->Close(); 
 725     m_clientSocket
->Destroy(); 
 726     m_clientSocket 
= NULL
; 
 730         etype 
= WorkerEvent::DONE
; 
 731     WorkerEvent 
e(this,etype
); 
 732     if (failed
) e
.setFailed(); 
 733     wxGetApp().AddPendingEvent(e
);