1 /////////////////////////////////////////////////////////////////////////////
2 // Name: samples/sockbase/client.cpp
3 // Purpose: Sockets sample for wxBase
4 // Author: Lukasz Michalski
8 // Copyright: (c) 2005 Lukasz Michalski <lmichalski@sf.net>
9 // Licence: wxWindows licence
10 /////////////////////////////////////////////////////////////////////////////
12 // ============================================================================
14 // ============================================================================
16 // ----------------------------------------------------------------------------
18 // ----------------------------------------------------------------------------
21 #include "wx/socket.h"
24 #include "wx/cmdline.h"
26 #include "wx/datetime.h"
28 #include "wx/thread.h"
30 const wxEventType wxEVT_WORKER
= wxNewEventType();
31 #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
33 const int timeout_val
= 1000;
35 class WorkerEvent
: public wxEvent
{
44 WorkerEvent(void* pSender
, evt_type type
)
47 SetEventType(wxEVT_WORKER
);
53 void setFailed() { m_isFailed
= true; }
54 bool isFailed() const { return m_isFailed
; }
56 virtual wxEvent
* Clone() const
58 return new WorkerEvent(*this);
62 wxString m_workerIdent
;
66 typedef void (wxEvtHandler::*WorkerEventFunction
)(WorkerEvent
&);
71 WX_DECLARE_LIST(ThreadWorker
, TList
);
72 WX_DECLARE_LIST(EventWorker
, EList
);
74 class Client
: public wxApp
{
77 void RemoveEventWorker(EventWorker
* p_worker
);
98 virtual bool OnInit();
100 virtual int OnExit();
101 void OnInitCmdLine(wxCmdLineParser
& pParser
);
102 bool OnCmdLineParsed(wxCmdLineParser
& pParser
);
103 void OnWorkerEvent(WorkerEvent
& pEvent
);
104 void OnTimerEvent(wxTimerEvent
& pEvent
);
106 void StartWorker(workMode pMode
, const wxString
& pMessage
);
107 void StartWorker(workMode pMode
);
108 char* CreateBuffer(int *msgsize
);
110 void dumpStatistics();
112 TList m_threadWorkers
;
113 EList m_eventWorkers
;
115 unsigned m_statConnecting
;
116 unsigned m_statSending
;
117 unsigned m_statReceiving
;
118 unsigned m_statDisconnecting
;
120 unsigned m_statFailed
;
127 class ThreadWorker
: public wxThread
130 ThreadWorker(const wxString
& p_host
, char* p_buf
, int p_size
);
131 virtual ExitCode
Entry();
134 wxSocketClient
* m_clientSocket
;
139 wxString m_workerIdent
;
142 class EventWorker
: public wxEvtHandler
144 DECLARE_EVENT_TABLE()
146 EventWorker(const wxString
& p_host
, char* p_buf
, int p_size
);
148 virtual ~EventWorker();
151 wxSocketClient
* m_clientSocket
;
159 WorkerEvent::evt_type m_currentType
;
161 wxIPV4address m_localaddr
;
163 void OnSocketEvent(wxSocketEvent
& pEvent
);
164 void SendEvent(bool failed
);
167 /******************* Implementation ******************/
168 IMPLEMENT_APP_CONSOLE(Client
);
170 #include <wx/listimpl.cpp>
171 WX_DEFINE_LIST(TList
);
172 WX_DEFINE_LIST(EList
);
175 CreateIdent(const wxIPV4address
& addr
)
177 return wxString::Format(wxT("%s:%d"),addr
.IPAddress().c_str(),addr
.Service());
181 Client::OnInitCmdLine(wxCmdLineParser
& pParser
)
183 wxApp::OnInitCmdLine(pParser
);
184 pParser
.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL
);
185 pParser
.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL
);
186 pParser
.AddSwitch(wxT("r"),wxT("random"),_("Send radnom data (default)"),wxCMD_LINE_PARAM_OPTIONAL
);
187 pParser
.AddOption(wxT("m"),wxT("message"),_("Send message from <str>"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
);
188 pParser
.AddOption(wxT("f"),wxT("file"),_("Send contents of <file>"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
);
189 pParser
.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
);
190 pParser
.AddOption(wxT("s"),wxT("stress"),_("stress test with <num> concurrent connections"),wxCMD_LINE_VAL_NUMBER
,wxCMD_LINE_PARAM_OPTIONAL
);
195 Client::OnCmdLineParsed(wxCmdLineParser
& pParser
)
199 m_stressWorkers
= 50;
201 if (pParser
.Found(_("verbose")))
203 wxLog::AddTraceMask(wxT("wxSocket"));
204 wxLog::AddTraceMask(wxT("epolldispatcher"));
205 wxLog::AddTraceMask(wxT("selectdispatcher"));
206 wxLog::AddTraceMask(wxT("thread"));
207 wxLog::AddTraceMask(wxT("events"));
210 if (pParser
.Found(wxT("t")))
211 m_workMode
= THREADS
;
212 m_sendType
= SEND_RANDOM
;
214 if (pParser
.Found(wxT("m"),&m_message
))
215 m_sendType
= SEND_MESSAGE
;
216 else if (pParser
.Found(wxT("f"),&fname
))
219 if (!file
.IsOpened()) {
220 wxLogError(wxT("Cannot open file %s"),fname
.c_str());
223 if (!file
.ReadAll(&m_message
)) {
224 wxLogError(wxT("Cannot read conten of file %s"),fname
.c_str());
227 m_sendType
= SEND_MESSAGE
;
230 if (pParser
.Found(wxT("s"),&m_stressWorkers
))
231 m_sendType
= STRESS_TEST
;
233 m_host
= wxT("127.0.0.1");
234 pParser
.Found(wxT("H"),&m_host
);
235 return wxApp::OnCmdLineParsed(pParser
);
241 if (!wxApp::OnInit())
243 srand(wxDateTime::Now().GetTicks());
244 mTimer
.SetOwner(this);
245 m_statConnecting
= 0;
248 m_statDisconnecting
= 0;
264 for (i
= 0; i
< m_stressWorkers
; i
++) {
265 if (m_message
.empty())
266 StartWorker(THREADS
);
268 StartWorker(THREADS
, m_message
);
272 for (i
= 0; i
< m_stressWorkers
; i
++) {
273 if (m_message
.empty())
276 StartWorker(EVENTS
, m_message
);
280 for (i
= 0; i
< m_stressWorkers
; i
++) {
281 if (m_message
.empty())
282 StartWorker(i
% 5 == 0 ? THREADS
: EVENTS
);
284 StartWorker(i
% 5 == 0 ? THREADS
: EVENTS
, m_message
);
290 StartWorker(m_workMode
,m_message
);
293 StartWorker(m_workMode
);
296 mTimer
.Start(timeout_val
,true);
297 return wxApp::OnRun();
303 for(EList::compatibility_iterator it
= m_eventWorkers
.GetFirst(); it
; it
->GetNext()) {
304 delete it
->GetData();
309 // Create buffer to be sent by client. Buffer contains test indicator
310 // message size and place for data
311 // msgsize parameter contains size of data in bytes and
312 // if input value does not fit into 250 bytes then
313 // on exit is updated to new value that is multiply of 1024 bytes
315 Client::CreateBuffer(int* msgsize
)
319 //if message should have more than 256 bytes then set it as
320 //test3 for compatibility with GUI server sample
321 if ((*msgsize
) > 250)
323 //send at least one kb of data
324 int size
= (*msgsize
)/1024 + 1;
325 //returned buffer will contain test indicator, message size in kb and data
326 bufsize
= size
*1024+2;
327 buf
= new char[bufsize
];
328 buf
[0] = (unsigned char)0xDE; //second byte contains size in kilobytes
329 buf
[1] = (char)(size
);
330 *msgsize
= size
*1024;
334 //returned buffer will contain test indicator, message size in kb and data
335 bufsize
= (*msgsize
)+2;
336 buf
= new char[bufsize
];
337 buf
[0] = (unsigned char)0xBE; //second byte contains size in bytes
338 buf
[1] = (char)(*msgsize
);
344 Client::StartWorker(workMode pMode
) {
345 int msgsize
= 1 + (int) (250000.0 * (rand() / (RAND_MAX
+ 1.0)));
346 char* buf
= CreateBuffer(&msgsize
);
348 //fill data part of buffer with random bytes
349 for (int i
= 2; i
< (msgsize
); i
++) {
353 if (pMode
== THREADS
) {
354 ThreadWorker
* c
= new ThreadWorker(m_host
,buf
,msgsize
+2);
355 if (c
->Create() != wxTHREAD_NO_ERROR
) {
356 wxLogError(wxT("Cannot create more threads"));
359 m_threadWorkers
.Append(c
);
362 EventWorker
* e
= new EventWorker(m_host
,buf
,msgsize
+2);
364 m_eventWorkers
.Append(e
);
370 Client::StartWorker(workMode pMode
, const wxString
& pMessage
) {
371 char* tmpbuf
= wxStrdup(pMessage
.mb_str());
372 int msgsize
= strlen(tmpbuf
);
373 char* buf
= CreateBuffer(&msgsize
);
374 memset(buf
+2,0x0,msgsize
);
375 memcpy(buf
+2,tmpbuf
,msgsize
);
378 if (pMode
== THREADS
) {
379 ThreadWorker
* c
= new ThreadWorker(m_host
,buf
,msgsize
+2);
380 if (c
->Create() != wxTHREAD_NO_ERROR
) {
381 wxLogError(wxT("Cannot create more threads"));
384 m_threadWorkers
.Append(c
);
387 EventWorker
* e
= new EventWorker(m_host
,buf
,msgsize
+2);
389 m_eventWorkers
.Append(e
);
395 Client::OnWorkerEvent(WorkerEvent
& pEvent
) {
396 switch (pEvent
.m_eventType
) {
397 case WorkerEvent::CONNECTING
:
398 if (pEvent
.isFailed())
404 case WorkerEvent::SENDING
:
405 if (pEvent
.isFailed())
416 case WorkerEvent::RECEIVING
:
417 if (pEvent
.isFailed())
428 case WorkerEvent::DISCONNECTING
:
429 if (pEvent
.isFailed())
431 m_statDisconnecting
--;
437 m_statDisconnecting
++;
440 case WorkerEvent::DONE
:
442 m_statDisconnecting
--;
446 if (pEvent
.isFailed() || pEvent
.m_eventType
== WorkerEvent::DONE
)
448 for(TList::compatibility_iterator it
= m_threadWorkers
.GetFirst(); it
; it
= it
->GetNext()) {
449 if (it
->GetData() == pEvent
.m_sender
) {
450 m_threadWorkers
.DeleteNode(it
);
454 for(EList::compatibility_iterator it2
= m_eventWorkers
.GetFirst(); it2
; it2
= it2
->GetNext())
456 if (it2
->GetData() == pEvent
.m_sender
) {
457 delete it2
->GetData();
458 m_eventWorkers
.DeleteNode(it2
);
462 if ((m_threadWorkers
.GetCount() == 0) && (m_eventWorkers
.GetCount() == 0))
471 mTimer
.Start(timeout_val
,true);
477 Client::RemoveEventWorker(EventWorker
* p_worker
) {
478 for(EList::compatibility_iterator it
= m_eventWorkers
.GetFirst(); it
; it
= it
->GetNext()) {
479 if (it
->GetData() == p_worker
) {
480 //wxLogDebug(wxT("Deleting event worker"));
481 delete it
->GetData();
482 m_eventWorkers
.DeleteNode(it
);
489 Client::dumpStatistics() {
491 wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"),
500 wxLogMessage(wxT("Current status:\n%s\n"),msg
.c_str());
504 Client::OnTimerEvent(wxTimerEvent
&) {
508 BEGIN_EVENT_TABLE(Client
,wxEvtHandler
)
509 EVT_WORKER(Client::OnWorkerEvent
)
510 EVT_TIMER(wxID_ANY
,Client::OnTimerEvent
)
515 EventWorker::EventWorker(const wxString
& p_host
, char* p_buf
, int p_size
)
522 m_clientSocket
= new wxSocketClient(wxSOCKET_NOWAIT
);
523 m_clientSocket
->SetEventHandler(*this);
524 m_insize
= m_outsize
- 2;
525 m_inbuf
= new char[m_insize
];
533 m_clientSocket
->SetNotify(wxSOCKET_CONNECTION_FLAG
|wxSOCKET_LOST_FLAG
|wxSOCKET_OUTPUT_FLAG
|wxSOCKET_INPUT_FLAG
);
534 m_clientSocket
->Notify(true);
535 m_currentType
= WorkerEvent::CONNECTING
;
537 //wxLogMessage(wxT("EventWorker: Connecting....."));
538 m_clientSocket
->Connect(ca
,false);
542 EventWorker::OnSocketEvent(wxSocketEvent
& pEvent
) {
543 switch(pEvent
.GetSocketEvent()) {
545 //wxLogDebug(wxT("EventWorker: INPUT"));
547 if (m_readed
== m_insize
)
548 return; //event already posted
549 m_clientSocket
->Read(m_inbuf
+ m_readed
, m_insize
- m_readed
);
550 if (m_clientSocket
->Error())
552 if (m_clientSocket
->LastError() != wxSOCKET_WOULDBLOCK
)
554 wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr
).c_str());
559 m_readed
+= m_clientSocket
->LastCount();
560 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed);
561 if (m_readed
== m_insize
)
563 if (!memcmp(m_inbuf
,m_outbuf
,m_insize
)) {
564 wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr
).c_str());
567 m_currentType
= WorkerEvent::DISCONNECTING
;
568 wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr
).c_str());
571 //wxLogDebug(wxT("EventWorker %p closing"),this);
572 m_clientSocket
->Close();
574 m_currentType
= WorkerEvent::DONE
;
575 wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr
).c_str());
578 } while (!m_clientSocket
->Error());
580 case wxSOCKET_OUTPUT
:
581 //wxLogDebug(wxT("EventWorker: OUTPUT"));
583 if (m_written
== m_outsize
)
587 m_currentType
= WorkerEvent::SENDING
;
588 wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr
).c_str());
590 m_clientSocket
->Write(m_outbuf
+ m_written
, m_outsize
- m_written
);
591 if (m_clientSocket
->Error())
593 if (m_clientSocket
->LastError() != wxSOCKET_WOULDBLOCK
) {
594 wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr
).c_str());
598 m_written
+= m_clientSocket
->LastCount();
599 if (m_written
!= m_outsize
)
601 //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written);
605 //wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this);
606 m_currentType
= WorkerEvent::RECEIVING
;
607 wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr
).c_str());
610 } while(!m_clientSocket
->Error());
612 case wxSOCKET_CONNECTION
:
614 //wxLogMessage(wxT("EventWorker: got connection"));
615 wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr
).c_str(),m_outsize
-2);
616 if (!m_clientSocket
->GetLocal(m_localaddr
))
618 wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket
);
620 m_currentType
= WorkerEvent::SENDING
;
621 wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr
).c_str());
627 wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr
).c_str());
635 EventWorker::SendEvent(bool failed
) {
638 WorkerEvent
e(this,m_currentType
);
639 if (failed
) e
.setFailed();
640 wxGetApp().AddPendingEvent(e
);
641 m_doneSent
= failed
|| m_currentType
== WorkerEvent::DONE
;
644 EventWorker::~EventWorker() {
645 m_clientSocket
->Destroy();
650 BEGIN_EVENT_TABLE(EventWorker
,wxEvtHandler
)
651 EVT_SOCKET(wxID_ANY
,EventWorker::OnSocketEvent
)
655 ThreadWorker::ThreadWorker(const wxString
& p_host
, char* p_buf
, int p_size
)
656 : wxThread(wxTHREAD_DETACHED
),
661 m_clientSocket
= new wxSocketClient(wxSOCKET_BLOCK
|wxSOCKET_WAITALL
);
662 m_insize
= m_outsize
- 2;
663 m_inbuf
= new char[m_insize
];
666 wxThread::ExitCode
ThreadWorker::Entry()
671 //wxLogDebug(wxT("ThreadWorker: Connecting....."));
672 m_clientSocket
->SetTimeout(60);
674 WorkerEvent::evt_type etype
= WorkerEvent::CONNECTING
;
675 if (!m_clientSocket
->Connect(ca
)) {
676 wxLogError(wxT("Cannot connect to %s:%d"),ca
.IPAddress().c_str(), ca
.Service());
679 //wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize);
680 etype
= WorkerEvent::SENDING
;
681 WorkerEvent
e(this,etype
);
682 wxGetApp().AddPendingEvent(e
);
683 int to_process
= m_outsize
;
685 m_clientSocket
->Write(m_outbuf
,m_outsize
);
686 if (m_clientSocket
->Error()) {
687 wxLogError(wxT("ThreadWorker: Write error"));
690 to_process
-= m_clientSocket
->LastCount();
691 //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
692 } while(!m_clientSocket
->Error() && to_process
!= 0);
695 etype
= WorkerEvent::RECEIVING
;
696 WorkerEvent
e(this,etype
);
697 wxGetApp().AddPendingEvent(e
);
698 to_process
= m_insize
;
700 m_clientSocket
->Read(m_inbuf
,m_insize
);
701 if (m_clientSocket
->Error()) {
702 wxLogError(wxT("ThreadWorker: Read error"));
706 to_process
-= m_clientSocket
->LastCount();
707 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
708 } while(!m_clientSocket
->Error() && to_process
!= 0);
711 char* outdat
= (char*)m_outbuf
+2;
712 if (!failed
&& (memcmp(m_inbuf
,outdat
,m_insize
) != 0))
714 wxLogError(wxT("Data mismatch"));
718 //wxLogDebug(wxT("ThreadWorker: Finished"));
720 etype
= WorkerEvent::DISCONNECTING
;
721 WorkerEvent
e(this,etype
);
722 wxGetApp().AddPendingEvent(e
);
724 m_clientSocket
->Close();
725 m_clientSocket
->Destroy();
726 m_clientSocket
= NULL
;
730 etype
= WorkerEvent::DONE
;
731 WorkerEvent
e(this,etype
);
732 if (failed
) e
.setFailed();
733 wxGetApp().AddPendingEvent(e
);