1 /////////////////////////////////////////////////////////////////////////////
2 // Name: samples/sockbase/client.cpp
3 // Purpose: Sockets sample for wxBase
4 // Author: Lukasz Michalski
7 // Copyright: (c) 2005 Lukasz Michalski <lmichalski@sf.net>
8 // Licence: wxWindows licence
9 /////////////////////////////////////////////////////////////////////////////
11 // ============================================================================
13 // ============================================================================
15 // ----------------------------------------------------------------------------
17 // ----------------------------------------------------------------------------
20 #include "wx/socket.h"
23 #include "wx/cmdline.h"
25 #include "wx/datetime.h"
27 #include "wx/thread.h"
29 const wxEventType wxEVT_WORKER
= wxNewEventType();
30 #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
32 const int timeout_val
= 1000;
34 class WorkerEvent
: public wxEvent
{
43 WorkerEvent(void* pSender
, evt_type type
)
46 SetEventType(wxEVT_WORKER
);
52 void setFailed() { m_isFailed
= true; }
53 bool isFailed() const { return m_isFailed
; }
55 virtual wxEvent
* Clone() const
57 return new WorkerEvent(*this);
61 wxString m_workerIdent
;
65 typedef void (wxEvtHandler::*WorkerEventFunction
)(WorkerEvent
&);
70 WX_DECLARE_LIST(ThreadWorker
, TList
);
71 WX_DECLARE_LIST(EventWorker
, EList
);
73 class Client
: public wxApp
{
76 void RemoveEventWorker(EventWorker
* p_worker
);
97 virtual bool OnInit();
100 void OnInitCmdLine(wxCmdLineParser
& pParser
);
101 bool OnCmdLineParsed(wxCmdLineParser
& pParser
);
102 void OnWorkerEvent(WorkerEvent
& pEvent
);
103 void OnTimerEvent(wxTimerEvent
& pEvent
);
105 void StartWorker(workMode pMode
, const wxString
& pMessage
);
106 void StartWorker(workMode pMode
);
107 char* CreateBuffer(int *msgsize
);
109 void dumpStatistics();
111 TList m_threadWorkers
;
112 EList m_eventWorkers
;
114 unsigned m_statConnecting
;
115 unsigned m_statSending
;
116 unsigned m_statReceiving
;
117 unsigned m_statDisconnecting
;
119 unsigned m_statFailed
;
126 class ThreadWorker
: public wxThread
129 ThreadWorker(const wxString
& p_host
, char* p_buf
, int p_size
);
130 virtual ExitCode
Entry();
133 wxSocketClient
* m_clientSocket
;
138 wxString m_workerIdent
;
141 class EventWorker
: public wxEvtHandler
143 DECLARE_EVENT_TABLE()
145 EventWorker(const wxString
& p_host
, char* p_buf
, int p_size
);
147 virtual ~EventWorker();
150 wxSocketClient
* m_clientSocket
;
158 WorkerEvent::evt_type m_currentType
;
160 wxIPV4address m_localaddr
;
162 void OnSocketEvent(wxSocketEvent
& pEvent
);
163 void SendEvent(bool failed
);
166 /******************* Implementation ******************/
167 IMPLEMENT_APP_CONSOLE(Client
);
169 #include <wx/listimpl.cpp>
170 WX_DEFINE_LIST(TList
);
171 WX_DEFINE_LIST(EList
);
174 CreateIdent(const wxIPV4address
& addr
)
176 return wxString::Format(wxT("%s:%d"),addr
.IPAddress().c_str(),addr
.Service());
180 Client::OnInitCmdLine(wxCmdLineParser
& pParser
)
182 wxApp::OnInitCmdLine(pParser
);
183 pParser
.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL
);
184 pParser
.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL
);
185 pParser
.AddSwitch(wxT("r"),wxT("random"),_("Send radnom data (default)"),wxCMD_LINE_PARAM_OPTIONAL
);
186 pParser
.AddOption(wxT("m"),wxT("message"),_("Send message from <str>"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
);
187 pParser
.AddOption(wxT("f"),wxT("file"),_("Send contents of <file>"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
);
188 pParser
.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING
,wxCMD_LINE_PARAM_OPTIONAL
);
189 pParser
.AddOption(wxT("s"),wxT("stress"),_("stress test with <num> concurrent connections"),wxCMD_LINE_VAL_NUMBER
,wxCMD_LINE_PARAM_OPTIONAL
);
194 Client::OnCmdLineParsed(wxCmdLineParser
& pParser
)
198 m_stressWorkers
= 50;
200 if (pParser
.Found(_("verbose")))
202 wxLog::AddTraceMask(wxT("wxSocket"));
203 wxLog::AddTraceMask(wxT("epolldispatcher"));
204 wxLog::AddTraceMask(wxT("selectdispatcher"));
205 wxLog::AddTraceMask(wxT("thread"));
206 wxLog::AddTraceMask(wxT("events"));
209 if (pParser
.Found(wxT("t")))
210 m_workMode
= THREADS
;
211 m_sendType
= SEND_RANDOM
;
213 if (pParser
.Found(wxT("m"),&m_message
))
214 m_sendType
= SEND_MESSAGE
;
215 else if (pParser
.Found(wxT("f"),&fname
))
218 if (!file
.IsOpened()) {
219 wxLogError(wxT("Cannot open file %s"),fname
.c_str());
222 if (!file
.ReadAll(&m_message
)) {
223 wxLogError(wxT("Cannot read conten of file %s"),fname
.c_str());
226 m_sendType
= SEND_MESSAGE
;
229 if (pParser
.Found(wxT("s"),&m_stressWorkers
))
230 m_sendType
= STRESS_TEST
;
232 m_host
= wxT("127.0.0.1");
233 pParser
.Found(wxT("H"),&m_host
);
234 return wxApp::OnCmdLineParsed(pParser
);
240 if (!wxApp::OnInit())
242 srand(wxDateTime::Now().GetTicks());
243 mTimer
.SetOwner(this);
244 m_statConnecting
= 0;
247 m_statDisconnecting
= 0;
263 for (i
= 0; i
< m_stressWorkers
; i
++) {
264 if (m_message
.empty())
265 StartWorker(THREADS
);
267 StartWorker(THREADS
, m_message
);
271 for (i
= 0; i
< m_stressWorkers
; i
++) {
272 if (m_message
.empty())
275 StartWorker(EVENTS
, m_message
);
279 for (i
= 0; i
< m_stressWorkers
; i
++) {
280 if (m_message
.empty())
281 StartWorker(i
% 5 == 0 ? THREADS
: EVENTS
);
283 StartWorker(i
% 5 == 0 ? THREADS
: EVENTS
, m_message
);
289 StartWorker(m_workMode
,m_message
);
292 StartWorker(m_workMode
);
295 mTimer
.Start(timeout_val
,true);
296 return wxApp::OnRun();
302 for(EList::compatibility_iterator it
= m_eventWorkers
.GetFirst(); it
; it
->GetNext()) {
303 delete it
->GetData();
308 // Create buffer to be sent by client. Buffer contains test indicator
309 // message size and place for data
310 // msgsize parameter contains size of data in bytes and
311 // if input value does not fit into 250 bytes then
312 // on exit is updated to new value that is multiply of 1024 bytes
314 Client::CreateBuffer(int* msgsize
)
318 //if message should have more than 256 bytes then set it as
319 //test3 for compatibility with GUI server sample
320 if ((*msgsize
) > 250)
322 //send at least one kb of data
323 int size
= (*msgsize
)/1024 + 1;
324 //returned buffer will contain test indicator, message size in kb and data
325 bufsize
= size
*1024+2;
326 buf
= new char[bufsize
];
327 buf
[0] = (unsigned char)0xDE; //second byte contains size in kilobytes
328 buf
[1] = (char)(size
);
329 *msgsize
= size
*1024;
333 //returned buffer will contain test indicator, message size in kb and data
334 bufsize
= (*msgsize
)+2;
335 buf
= new char[bufsize
];
336 buf
[0] = (unsigned char)0xBE; //second byte contains size in bytes
337 buf
[1] = (char)(*msgsize
);
343 Client::StartWorker(workMode pMode
) {
344 int msgsize
= 1 + (int) (250000.0 * (rand() / (RAND_MAX
+ 1.0)));
345 char* buf
= CreateBuffer(&msgsize
);
347 //fill data part of buffer with random bytes
348 for (int i
= 2; i
< (msgsize
); i
++) {
352 if (pMode
== THREADS
) {
353 ThreadWorker
* c
= new ThreadWorker(m_host
,buf
,msgsize
+2);
354 if (c
->Create() != wxTHREAD_NO_ERROR
) {
355 wxLogError(wxT("Cannot create more threads"));
358 m_threadWorkers
.Append(c
);
361 EventWorker
* e
= new EventWorker(m_host
,buf
,msgsize
+2);
363 m_eventWorkers
.Append(e
);
369 Client::StartWorker(workMode pMode
, const wxString
& pMessage
) {
370 char* tmpbuf
= wxStrdup(pMessage
.mb_str());
371 int msgsize
= strlen(tmpbuf
);
372 char* buf
= CreateBuffer(&msgsize
);
373 memset(buf
+2,0x0,msgsize
);
374 memcpy(buf
+2,tmpbuf
,msgsize
);
377 if (pMode
== THREADS
) {
378 ThreadWorker
* c
= new ThreadWorker(m_host
,buf
,msgsize
+2);
379 if (c
->Create() != wxTHREAD_NO_ERROR
) {
380 wxLogError(wxT("Cannot create more threads"));
383 m_threadWorkers
.Append(c
);
386 EventWorker
* e
= new EventWorker(m_host
,buf
,msgsize
+2);
388 m_eventWorkers
.Append(e
);
394 Client::OnWorkerEvent(WorkerEvent
& pEvent
) {
395 switch (pEvent
.m_eventType
) {
396 case WorkerEvent::CONNECTING
:
397 if (pEvent
.isFailed())
403 case WorkerEvent::SENDING
:
404 if (pEvent
.isFailed())
415 case WorkerEvent::RECEIVING
:
416 if (pEvent
.isFailed())
427 case WorkerEvent::DISCONNECTING
:
428 if (pEvent
.isFailed())
430 m_statDisconnecting
--;
436 m_statDisconnecting
++;
439 case WorkerEvent::DONE
:
441 m_statDisconnecting
--;
445 if (pEvent
.isFailed() || pEvent
.m_eventType
== WorkerEvent::DONE
)
447 for(TList::compatibility_iterator it
= m_threadWorkers
.GetFirst(); it
; it
= it
->GetNext()) {
448 if (it
->GetData() == pEvent
.m_sender
) {
449 m_threadWorkers
.DeleteNode(it
);
453 for(EList::compatibility_iterator it2
= m_eventWorkers
.GetFirst(); it2
; it2
= it2
->GetNext())
455 if (it2
->GetData() == pEvent
.m_sender
) {
456 delete it2
->GetData();
457 m_eventWorkers
.DeleteNode(it2
);
461 if ((m_threadWorkers
.GetCount() == 0) && (m_eventWorkers
.GetCount() == 0))
470 mTimer
.Start(timeout_val
,true);
476 Client::RemoveEventWorker(EventWorker
* p_worker
) {
477 for(EList::compatibility_iterator it
= m_eventWorkers
.GetFirst(); it
; it
= it
->GetNext()) {
478 if (it
->GetData() == p_worker
) {
479 //wxLogDebug(wxT("Deleting event worker"));
480 delete it
->GetData();
481 m_eventWorkers
.DeleteNode(it
);
488 Client::dumpStatistics() {
490 wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"),
499 wxLogMessage(wxT("Current status:\n%s\n"),msg
.c_str());
503 Client::OnTimerEvent(wxTimerEvent
&) {
507 BEGIN_EVENT_TABLE(Client
,wxEvtHandler
)
508 EVT_WORKER(Client::OnWorkerEvent
)
509 EVT_TIMER(wxID_ANY
,Client::OnTimerEvent
)
514 EventWorker::EventWorker(const wxString
& p_host
, char* p_buf
, int p_size
)
521 m_clientSocket
= new wxSocketClient(wxSOCKET_NOWAIT
);
522 m_clientSocket
->SetEventHandler(*this);
523 m_insize
= m_outsize
- 2;
524 m_inbuf
= new char[m_insize
];
532 m_clientSocket
->SetNotify(wxSOCKET_CONNECTION_FLAG
|wxSOCKET_LOST_FLAG
|wxSOCKET_OUTPUT_FLAG
|wxSOCKET_INPUT_FLAG
);
533 m_clientSocket
->Notify(true);
534 m_currentType
= WorkerEvent::CONNECTING
;
536 //wxLogMessage(wxT("EventWorker: Connecting....."));
537 m_clientSocket
->Connect(ca
,false);
541 EventWorker::OnSocketEvent(wxSocketEvent
& pEvent
) {
542 switch(pEvent
.GetSocketEvent()) {
544 //wxLogDebug(wxT("EventWorker: INPUT"));
546 if (m_readed
== m_insize
)
547 return; //event already posted
548 m_clientSocket
->Read(m_inbuf
+ m_readed
, m_insize
- m_readed
);
549 if (m_clientSocket
->Error())
551 if (m_clientSocket
->LastError() != wxSOCKET_WOULDBLOCK
)
553 wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr
).c_str());
558 m_readed
+= m_clientSocket
->LastCount();
559 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed);
560 if (m_readed
== m_insize
)
562 if (!memcmp(m_inbuf
,m_outbuf
,m_insize
)) {
563 wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr
).c_str());
566 m_currentType
= WorkerEvent::DISCONNECTING
;
567 wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr
).c_str());
570 //wxLogDebug(wxT("EventWorker %p closing"),this);
571 m_clientSocket
->Close();
573 m_currentType
= WorkerEvent::DONE
;
574 wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr
).c_str());
577 } while (!m_clientSocket
->Error());
579 case wxSOCKET_OUTPUT
:
580 //wxLogDebug(wxT("EventWorker: OUTPUT"));
582 if (m_written
== m_outsize
)
586 m_currentType
= WorkerEvent::SENDING
;
587 wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr
).c_str());
589 m_clientSocket
->Write(m_outbuf
+ m_written
, m_outsize
- m_written
);
590 if (m_clientSocket
->Error())
592 if (m_clientSocket
->LastError() != wxSOCKET_WOULDBLOCK
) {
593 wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr
).c_str());
597 m_written
+= m_clientSocket
->LastCount();
598 if (m_written
!= m_outsize
)
600 //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written);
604 //wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this);
605 m_currentType
= WorkerEvent::RECEIVING
;
606 wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr
).c_str());
609 } while(!m_clientSocket
->Error());
611 case wxSOCKET_CONNECTION
:
613 //wxLogMessage(wxT("EventWorker: got connection"));
614 wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr
).c_str(),m_outsize
-2);
615 if (!m_clientSocket
->GetLocal(m_localaddr
))
617 wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket
);
619 m_currentType
= WorkerEvent::SENDING
;
620 wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr
).c_str());
626 wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr
).c_str());
634 EventWorker::SendEvent(bool failed
) {
637 WorkerEvent
e(this,m_currentType
);
638 if (failed
) e
.setFailed();
639 wxGetApp().AddPendingEvent(e
);
640 m_doneSent
= failed
|| m_currentType
== WorkerEvent::DONE
;
643 EventWorker::~EventWorker() {
644 m_clientSocket
->Destroy();
649 BEGIN_EVENT_TABLE(EventWorker
,wxEvtHandler
)
650 EVT_SOCKET(wxID_ANY
,EventWorker::OnSocketEvent
)
654 ThreadWorker::ThreadWorker(const wxString
& p_host
, char* p_buf
, int p_size
)
655 : wxThread(wxTHREAD_DETACHED
),
660 m_clientSocket
= new wxSocketClient(wxSOCKET_BLOCK
|wxSOCKET_WAITALL
);
661 m_insize
= m_outsize
- 2;
662 m_inbuf
= new char[m_insize
];
665 wxThread::ExitCode
ThreadWorker::Entry()
670 //wxLogDebug(wxT("ThreadWorker: Connecting....."));
671 m_clientSocket
->SetTimeout(60);
673 WorkerEvent::evt_type etype
= WorkerEvent::CONNECTING
;
674 if (!m_clientSocket
->Connect(ca
)) {
675 wxLogError(wxT("Cannot connect to %s:%d"),ca
.IPAddress().c_str(), ca
.Service());
678 //wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize);
679 etype
= WorkerEvent::SENDING
;
680 WorkerEvent
e(this,etype
);
681 wxGetApp().AddPendingEvent(e
);
682 int to_process
= m_outsize
;
684 m_clientSocket
->Write(m_outbuf
,m_outsize
);
685 if (m_clientSocket
->Error()) {
686 wxLogError(wxT("ThreadWorker: Write error"));
689 to_process
-= m_clientSocket
->LastCount();
690 //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
691 } while(!m_clientSocket
->Error() && to_process
!= 0);
694 etype
= WorkerEvent::RECEIVING
;
695 WorkerEvent
e(this,etype
);
696 wxGetApp().AddPendingEvent(e
);
697 to_process
= m_insize
;
699 m_clientSocket
->Read(m_inbuf
,m_insize
);
700 if (m_clientSocket
->Error()) {
701 wxLogError(wxT("ThreadWorker: Read error"));
705 to_process
-= m_clientSocket
->LastCount();
706 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
707 } while(!m_clientSocket
->Error() && to_process
!= 0);
710 char* outdat
= (char*)m_outbuf
+2;
711 if (!failed
&& (memcmp(m_inbuf
,outdat
,m_insize
) != 0))
713 wxLogError(wxT("Data mismatch"));
717 //wxLogDebug(wxT("ThreadWorker: Finished"));
719 etype
= WorkerEvent::DISCONNECTING
;
720 WorkerEvent
e(this,etype
);
721 wxGetApp().AddPendingEvent(e
);
723 m_clientSocket
->Close();
724 m_clientSocket
->Destroy();
725 m_clientSocket
= NULL
;
729 etype
= WorkerEvent::DONE
;
730 WorkerEvent
e(this,etype
);
731 if (failed
) e
.setFailed();
732 wxGetApp().AddPendingEvent(e
);