1 /////////////////////////////////////////////////////////////////////////////
2 // Name: samples/sockbase/client.cpp
3 // Purpose: Sockets sample for wxBase
4 // Author: Lukasz Michalski
8 // Copyright: (c) 2005 Lukasz Michalski <lmichalski@sf.net>
9 // Licence: wxWindows license
10 /////////////////////////////////////////////////////////////////////////////
12 // ============================================================================
14 // ============================================================================
16 // ----------------------------------------------------------------------------
18 // ----------------------------------------------------------------------------
21 #include "wx/socket.h"
24 #include "wx/cmdline.h"
26 #include "wx/datetime.h"
28 #include "wx/thread.h"
// Event type carried by WorkerEvent; workers post events of this type to the
// application object to report their progress.
30 const wxEventType wxEVT_WORKER
= wxNewEventType();
// Event-table entry that routes wxEVT_WORKER events (any id) to a handler
// with the WorkerEventFunction signature.
31 #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
// Interval handed to mTimer.Start() for the one-shot timer
// (presumably milliseconds, as wxTimer::Start expects -- confirm).
33 const int timeout_val
= 1000;
35 class WorkerEvent
: public wxEvent
{
44 WorkerEvent(void* pSender
, evt_type type
)
47 SetEventType(wxEVT_WORKER
);
53 void setFailed() { m_isFailed
= true; }
54 bool isFailed() const { return m_isFailed
; }
56 virtual wxEvent
* Clone() const
58 return new WorkerEvent(*this);
62 wxString m_workerIdent
;
66 typedef void (wxEvtHandler::*WorkerEventFunction
)(WorkerEvent
&);
71 WX_DECLARE_LIST(ThreadWorker
, TList
);
72 WX_DECLARE_LIST(EventWorker
, EList
);
74 class Client
: public wxApp
{
75 DECLARE_EVENT_TABLE();
77 void RemoveEventWorker(EventWorker
* p_worker
);
98 virtual bool OnInit();
100 virtual int OnExit();
101 void OnInitCmdLine(wxCmdLineParser
& pParser
);
102 bool OnCmdLineParsed(wxCmdLineParser
& pParser
);
103 void OnWorkerEvent(WorkerEvent
& pEvent
);
104 void OnTimerEvent(wxTimerEvent
& pEvent
);
106 void StartWorker(workMode pMode
, const wxString
& pMessage
);
107 void StartWorker(workMode pMode
);
108 char* CreateBuffer(int *msgsize
);
110 void dumpStatistics();
112 TList m_threadWorkers
;
113 EList m_eventWorkers
;
115 unsigned m_statConnecting
;
116 unsigned m_statSending
;
117 unsigned m_statReceiving
;
118 unsigned m_statDisconnecting
;
120 unsigned m_statFailed
;
127 class ThreadWorker
: public wxThread
130 ThreadWorker(const wxString
& p_host
, char* p_buf
, int p_size
);
131 virtual ExitCode
Entry();
134 wxSocketClient
* m_clientSocket
;
139 wxString m_workerIdent
;
142 class EventWorker
: public wxEvtHandler
144 DECLARE_EVENT_TABLE();
146 EventWorker(const wxString
& p_host
, char* p_buf
, int p_size
);
148 virtual ~EventWorker();
151 wxSocketClient
* m_clientSocket
;
159 WorkerEvent::evt_type m_currentType
;
161 wxIPV4address m_localaddr
;
163 void OnSocketEvent(wxSocketEvent
& pEvent
);
164 void SendEvent(bool failed
);
167 /******************* Implementation ******************/
168 IMPLEMENT_APP_CONSOLE(Client
);
170 #include <wx/listimpl.cpp>
171 WX_DEFINE_LIST(TList
);
172 WX_DEFINE_LIST(EList
);
175 CreateIdent(const wxIPV4address
& addr
)
177 return wxString::Format(wxT("%s:%d"),addr
.IPAddress().c_str(),addr
.Service());
181 Client::OnInitCmdLine(wxCmdLineParser
& pParser
)
// Let the base class register its own options first, then add the
// sample-specific switches (worker kind, payload source) and options
// (message text, input file, target host, stress-test connection count).
183 wxApp::OnInitCmdLine(pParser);
184 pParser.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL);
185 pParser.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL);
// Help text fixed: it previously read "radnom".
186 pParser.AddSwitch(wxT("r"),wxT("random"),_("Send random data (default)"),wxCMD_LINE_PARAM_OPTIONAL);
187 pParser.AddOption(wxT("m"),wxT("message"),_("Send message from <str>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
188 pParser.AddOption(wxT("f"),wxT("file"),_("Send contents of <file>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
189 pParser.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
190 pParser.AddOption(wxT("s"),wxT("stress"),_("stress test with <num> concurrent connections"),wxCMD_LINE_VAL_NUMBER,wxCMD_LINE_PARAM_OPTIONAL);
195 Client::OnCmdLineParsed(wxCmdLineParser
& pParser
)
199 m_stressWorkers
= 50;
// An option name is an identifier, not user-visible text: look it up by its
// literal name, as every other Found() call here does. Passing it through the
// translation macro could make the lookup fail under a non-English catalog.
201 if (pParser.Found(wxT("verbose")))
203 wxLog::AddTraceMask(wxT("wxSocket"));
204 wxLog::AddTraceMask(wxT("epolldispatcher"));
205 wxLog::AddTraceMask(wxT("selectdispatcher"));
206 wxLog::AddTraceMask(wxT("thread"));
207 wxLog::AddTraceMask(wxT("events"));
210 if (pParser
.Found(wxT("t")))
211 m_workMode
= THREADS
;
212 m_sendType
= SEND_RANDOM
;
214 if (pParser
.Found(wxT("m"),&m_message
))
215 m_sendType
= SEND_MESSAGE
;
216 else if (pParser
.Found(wxT("f"),&fname
))
219 if (!file
.IsOpened()) {
220 wxLogError(wxT("Cannot open file %s"),fname
.c_str());
// Reading the whole file into m_message failed: report which file it was.
223 if (!file.ReadAll(&m_message)) {
// Message text fixed: it previously read "conten".
224 wxLogError(wxT("Cannot read content of file %s"),fname.c_str());
227 m_sendType
= SEND_MESSAGE
;
230 if (pParser
.Found(wxT("s"),&m_stressWorkers
))
231 m_sendType
= STRESS_TEST
;
233 m_host
= wxT("127.0.0.1");
234 pParser
.Found(wxT("H"),&m_host
);
235 return wxApp::OnCmdLineParsed(pParser
);
241 if (!wxApp::OnInit())
243 srand(wxDateTime::Now().GetTicks());
244 mTimer
.SetOwner(this);
245 m_statConnecting
= 0;
248 m_statDisconnecting
= 0;
263 for (int i
= 0; i
< m_stressWorkers
; i
++) {
264 if (m_message
.empty())
265 StartWorker(THREADS
);
267 StartWorker(THREADS
, m_message
);
271 for (int i
= 0; i
< m_stressWorkers
; i
++) {
272 if (m_message
.empty())
275 StartWorker(EVENTS
, m_message
);
279 for (int i
= 0; i
< m_stressWorkers
; i
++) {
280 if (m_message
.empty())
281 StartWorker(i
% 5 == 0 ? THREADS
: EVENTS
);
283 StartWorker(i
% 5 == 0 ? THREADS
: EVENTS
, m_message
);
289 StartWorker(m_workMode
,m_message
);
292 StartWorker(m_workMode
);
295 mTimer
.Start(timeout_val
,true);
296 return wxApp::OnRun();
// Destroy every event worker that is still registered.
// The iterator must be reassigned from GetNext(); calling it->GetNext()
// without storing the result never advances the iterator, so the loop
// would spin forever on the first node.
302 for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it; it = it->GetNext()) {
303 delete it->GetData();
308 // Create the buffer to be sent by the client. The buffer contains a test
309 // indicator, the message size and room for the data.
310 // The msgsize parameter holds the size of the data in bytes;
311 // if the input value does not fit into 250 bytes then
312 // on exit it is updated to a new value that is a multiple of 1024 bytes.
314 Client::CreateBuffer(int* msgsize
)
318 //if the message has more than 250 bytes then send it as
319 //test3 for compatibility with GUI server sample
320 if ((*msgsize
) > 250)
322 //send at least one kb of data
323 int size
= (*msgsize
)/1024 + 1;
324 //returned buffer will contain test indicator, message size in kb and data
325 bufsize
= size
*1024+2;
326 buf
= new char[bufsize
];
327 buf
[0] = 0xDE; //second byte contains size in kilobytes
328 buf
[1] = (char)(size
);
329 *msgsize
= size
*1024;
333 //returned buffer will contain test indicator, message size in bytes and data
334 bufsize
= (*msgsize
)+2;
335 buf
= new char[bufsize
];
336 buf
[0] = 0xBE; //second byte contains size in bytes
337 buf
[1] = (char)(*msgsize
);
343 Client::StartWorker(workMode pMode
) {
344 int msgsize
= 1 + (int) (250000.0 * (rand() / (RAND_MAX
+ 1.0)));
345 char* buf
= CreateBuffer(&msgsize
);
347 //fill data part of buffer with random bytes
// CreateBuffer() returns a 2-byte header followed by msgsize bytes of data,
// so the data area is the index range [2, msgsize + 2). Stopping at msgsize
// would leave the last two bytes that get sent uninitialised.
348 for (int i = 2; i < msgsize + 2; i++) {
352 if (pMode
== THREADS
) {
353 ThreadWorker
* c
= new ThreadWorker(m_host
,buf
,msgsize
+2);
354 if (c
->Create() != wxTHREAD_NO_ERROR
) {
355 wxLogError(wxT("Cannot create more threads"));
358 m_threadWorkers
.Append(c
);
361 EventWorker
* e
= new EventWorker(m_host
,buf
,msgsize
+2);
363 m_eventWorkers
.Append(e
);
369 Client::StartWorker(workMode pMode
, const wxString
& pMessage
) {
// Take a narrow-character copy of the message; its length is the requested
// payload size.
370 char* tmpbuf = strdup(pMessage.mb_str());
371 int msgsize = strlen(tmpbuf);
// CreateBuffer() may round msgsize up to a multiple of 1024 bytes.
372 char* buf = CreateBuffer(&msgsize);
// Zero the whole (possibly enlarged) data area, then copy only the bytes
// that tmpbuf actually holds. Copying msgsize bytes would read past the
// end of tmpbuf whenever the size was rounded up.
373 memset(buf+2,0x0,msgsize);
374 memcpy(buf+2,tmpbuf,strlen(tmpbuf));
377 if (pMode
== THREADS
) {
378 ThreadWorker
* c
= new ThreadWorker(m_host
,buf
,msgsize
+2);
379 if (c
->Create() != wxTHREAD_NO_ERROR
) {
380 wxLogError(wxT("Cannot create more threads"));
383 m_threadWorkers
.Append(c
);
386 EventWorker
* e
= new EventWorker(m_host
,buf
,msgsize
+2);
388 m_eventWorkers
.Append(e
);
394 Client::OnWorkerEvent(WorkerEvent
& pEvent
) {
395 switch (pEvent
.m_eventType
) {
396 case WorkerEvent::CONNECTING
:
397 if (pEvent
.isFailed())
403 case WorkerEvent::SENDING
:
404 if (pEvent
.isFailed())
415 case WorkerEvent::RECEIVING
:
416 if (pEvent
.isFailed())
427 case WorkerEvent::DISCONNECTING
:
428 if (pEvent
.isFailed())
430 m_statDisconnecting
--;
436 m_statDisconnecting
++;
439 case WorkerEvent::DONE
:
441 m_statDisconnecting
--;
445 if (pEvent
.isFailed() || pEvent
.m_eventType
== WorkerEvent::DONE
)
447 for(TList::compatibility_iterator it
= m_threadWorkers
.GetFirst(); it
; it
= it
->GetNext()) {
448 if (it
->GetData() == pEvent
.m_sender
) {
449 m_threadWorkers
.DeleteNode(it
);
453 for(EList::compatibility_iterator it
= m_eventWorkers
.GetFirst(); it
; it
= it
->GetNext())
455 if (it
->GetData() == pEvent
.m_sender
) {
456 delete it
->GetData();
457 m_eventWorkers
.DeleteNode(it
);
461 if ((m_threadWorkers
.GetCount() == 0) && (m_eventWorkers
.GetCount() == 0))
470 mTimer
.Start(timeout_val
,true);
476 Client::RemoveEventWorker(EventWorker
* p_worker
) {
477 for(EList::compatibility_iterator it
= m_eventWorkers
.GetFirst(); it
; it
= it
->GetNext()) {
478 if (it
->GetData() == p_worker
) {
479 //wxLogDebug(wxT("Deleting event worker"));
480 delete it
->GetData();
481 m_eventWorkers
.DeleteNode(it
);
488 Client::dumpStatistics() {
490 wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"),
499 wxLogMessage(wxT("Current status:\n%s\n"),msg
.c_str());
503 Client::OnTimerEvent(wxTimerEvent
&) {
507 BEGIN_EVENT_TABLE(Client
,wxEvtHandler
)
508 EVT_WORKER(Client::OnWorkerEvent
)
509 EVT_TIMER(wxID_ANY
,Client::OnTimerEvent
)
514 EventWorker::EventWorker(const wxString
& p_host
, char* p_buf
, int p_size
)
521 m_clientSocket
= new wxSocketClient(wxSOCKET_NOWAIT
);
522 m_clientSocket
->SetEventHandler(*this);
523 m_insize
= m_outsize
- 2;
524 m_inbuf
= new char[m_insize
];
532 m_clientSocket
->SetNotify(wxSOCKET_CONNECTION_FLAG
|wxSOCKET_LOST_FLAG
|wxSOCKET_OUTPUT_FLAG
|wxSOCKET_INPUT_FLAG
);
533 m_clientSocket
->Notify(true);
534 m_currentType
= WorkerEvent::CONNECTING
;
536 //wxLogMessage(wxT("EventWorker: Connecting....."));
537 m_clientSocket
->Connect(ca
,false);
541 EventWorker::OnSocketEvent(wxSocketEvent
& pEvent
) {
542 switch(pEvent
.GetSocketEvent()) {
544 //wxLogDebug(wxT("EventWorker: INPUT"));
546 if (m_readed
== m_insize
)
547 return; //event already posted
548 m_clientSocket
->Read(m_inbuf
+ m_readed
, m_insize
- m_readed
);
549 if (m_clientSocket
->Error())
551 if (m_clientSocket
->LastError() != wxSOCKET_WOULDBLOCK
)
553 wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr
).c_str());
558 m_readed
+= m_clientSocket
->LastCount();
559 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed);
560 if (m_readed
== m_insize
)
// All expected bytes have arrived: check them against the payload that was
// sent. memcmp() returns 0 for equal buffers, so a mismatch is a non-zero
// result. The payload starts after the 2-byte header of m_outbuf
// (m_insize is m_outsize - 2), as in the ThreadWorker comparison.
562 if (memcmp(m_inbuf,m_outbuf + 2,m_insize) != 0) {
563 wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr).c_str());
566 m_currentType
= WorkerEvent::DISCONNECTING
;
567 wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr
).c_str());
570 //wxLogDebug(wxT("EventWorker %p closing"),this);
571 m_clientSocket
->Close();
573 m_currentType
= WorkerEvent::DONE
;
574 wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr
).c_str());
577 } while (!m_clientSocket
->Error());
579 case wxSOCKET_OUTPUT
:
580 //wxLogDebug(wxT("EventWorker: OUTPUT"));
582 if (m_written
== m_outsize
)
586 m_currentType
= WorkerEvent::SENDING
;
587 wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr
).c_str());
589 m_clientSocket
->Write(m_outbuf
+ m_written
, m_outsize
- m_written
);
590 if (m_clientSocket
->Error())
592 if (m_clientSocket
->LastError() != wxSOCKET_WOULDBLOCK
) {
593 wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr
).c_str());
597 m_written
+= m_clientSocket
->LastCount();
598 if (m_written
!= m_outsize
)
600 //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written);
604 //wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this);
605 m_currentType
= WorkerEvent::RECEIVING
;
606 wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr
).c_str());
609 } while(!m_clientSocket
->Error());
611 case wxSOCKET_CONNECTION
:
613 //wxLogMessage(wxT("EventWorker: got connection"));
// Fetch the local address before logging anything: the log lines below
// identify this worker through m_localaddr, which holds no real address
// until GetLocal() has filled it in.
614 if (!m_clientSocket->GetLocal(m_localaddr))
615 wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket);
616 wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr).c_str(),m_outsize-2);
// The connection is up, so the worker moves on to the sending phase.
617 m_currentType = WorkerEvent::SENDING;
618 wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr).c_str());
624 wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr
).c_str());
632 EventWorker::SendEvent(bool failed
) {
635 WorkerEvent
e(this,m_currentType
);
636 if (failed
) e
.setFailed();
637 wxGetApp().AddPendingEvent(e
);
638 m_doneSent
= failed
|| m_currentType
== WorkerEvent::DONE
;
641 EventWorker::~EventWorker() {
642 m_clientSocket
->Destroy();
647 BEGIN_EVENT_TABLE(EventWorker
,wxEvtHandler
)
648 EVT_SOCKET(wxID_ANY
,EventWorker::OnSocketEvent
)
652 ThreadWorker::ThreadWorker(const wxString
& p_host
, char* p_buf
, int p_size
)
653 : wxThread(wxTHREAD_DETACHED
),
658 m_clientSocket
= new wxSocketClient(wxSOCKET_BLOCK
|wxSOCKET_WAITALL
);
659 m_insize
= m_outsize
- 2;
660 m_inbuf
= new char[m_insize
];
663 wxThread::ExitCode
ThreadWorker::Entry()
668 //wxLogDebug(wxT("ThreadWorker: Connecting....."));
669 m_clientSocket
->SetTimeout(60);
671 WorkerEvent::evt_type etype
= WorkerEvent::CONNECTING
;
672 if (!m_clientSocket
->Connect(ca
)) {
673 wxLogError(wxT("Cannot connect to %s:%d"),ca
.IPAddress().c_str(), ca
.Service());
676 //wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize);
677 etype
= WorkerEvent::SENDING
;
678 WorkerEvent
e(this,etype
);
679 wxGetApp().AddPendingEvent(e
);
680 int to_process
= m_outsize
;
682 m_clientSocket
->Write(m_outbuf
,m_outsize
);
683 if (m_clientSocket
->Error()) {
684 wxLogError(wxT("ThreadWorker: Write error"));
687 to_process
-= m_clientSocket
->LastCount();
688 //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
689 } while(!m_clientSocket
->Error() && to_process
!= 0);
692 etype
= WorkerEvent::RECEIVING
;
693 WorkerEvent
e(this,etype
);
694 wxGetApp().AddPendingEvent(e
);
695 to_process
= m_insize
;
697 m_clientSocket
->Read(m_inbuf
,m_insize
);
698 if (m_clientSocket
->Error()) {
699 wxLogError(wxT("ThreadWorker: Read error"));
703 to_process
-= m_clientSocket
->LastCount();
704 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
705 } while(!m_clientSocket
->Error() && to_process
!= 0);
708 char* outdat
= (char*)m_outbuf
+2;
709 if (!failed
&& (memcmp(m_inbuf
,outdat
,m_insize
) != 0))
711 wxLogError(wxT("Data mismatch"));
715 //wxLogDebug(wxT("ThreadWorker: Finished"));
717 etype
= WorkerEvent::DISCONNECTING
;
718 WorkerEvent
e(this,etype
);
719 wxGetApp().AddPendingEvent(e
);
721 m_clientSocket
->Close();
722 m_clientSocket
->Destroy();
723 m_clientSocket
= NULL
;
727 etype
= WorkerEvent::DONE
;
728 WorkerEvent
e(this,etype
);
729 if (failed
) e
.setFailed();
730 wxGetApp().AddPendingEvent(e
);