1 /////////////////////////////////////////////////////////////////////////////
2 // Name: samples/sockbase/client.cpp
3 // Purpose: Sockets sample for wxBase
4 // Author: Lukasz Michalski
8 // Copyright: (c) 2005 Lukasz Michalski <lmichalski@user.sourceforge.net>
9 // Licence: wxWindows license
10 /////////////////////////////////////////////////////////////////////////////
12 // ============================================================================
14 // ============================================================================
16 // ----------------------------------------------------------------------------
18 // ----------------------------------------------------------------------------
21 #include "wx/socket.h"
24 #include "wx/cmdline.h"
25 #include "wx/datetime.h"
27 #include "wx/thread.h"
29 wxChar
* GetSocketErrorMsg(int pSockError
)
33 case wxSOCKET_NOERROR
:
34 return wxT("wxSOCKET_NOERROR");
37 return wxT("wxSOCKET_INVOP");
40 return wxT("wxSOCKET_IOERR");
42 case wxSOCKET_INVADDR
:
43 return wxT("wxSOCKET_INVADDR");
46 return wxT("wxSOCKET_NOHOST");
48 case wxSOCKET_INVPORT
:
49 return wxT("wxSOCKET_INVPORT");
51 case wxSOCKET_WOULDBLOCK
:
52 return wxT("wxSOCKET_WOULDBLOCK");
54 case wxSOCKET_TIMEDOUT
:
55 return wxT("wxSOCKET_TIMEDOUT");
58 return wxT("wxSOCKET_MEMERR");
61 return wxT("Unknown");
66 //log output types for LogWorker helper function
74 //outputs log message with IP and TCP port number prepended
76 LogWorker(const wxIPV4address
& pAddr
, const wxString
& pMessage
, logWorker_t pType
= LOG_VERBOSE
)
78 wxString
msg(wxString::Format(wxT("%s:%d "),pAddr
.IPAddress().c_str(),pAddr
.Service()));
94 //event sent by workers to server class
95 //after client is served
96 const wxEventType wxEVT_WORKER
= wxNewEventType();
97 #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
99 class WorkerEvent
: public wxEvent
{
101 WorkerEvent(void* pSender
)
104 SetEventType(wxEVT_WORKER
);
107 m_workerFailed
= false;
110 virtual wxEvent
* Clone() const
112 return new WorkerEvent(*this);
120 typedef void (wxEvtHandler::*WorkerEventFunction
)(WorkerEvent
&);
125 WX_DECLARE_LIST(ThreadWorker
, TList
);
126 WX_DECLARE_LIST(EventWorker
, EList
);
//main server class: owns the listening socket
//and the lists of the two kinds of worker objects that serve clients
130 class Server
: public wxApp
132 DECLARE_EVENT_TABLE();
134 Server() : m_maxConnections(-1) {}
144 virtual bool OnInit();
145 virtual int OnExit();
147 void OnInitCmdLine(wxCmdLineParser
& pParser
);
148 bool OnCmdLineParsed(wxCmdLineParser
& pParser
);
150 void OnSocketEvent(wxSocketEvent
& pEvent
);
151 void OnWorkerEvent(WorkerEvent
& pEvent
);
152 void OnTimerEvent(wxTimerEvent
& pEvent
);
153 void DumpStatistics();
155 TList m_threadWorkers
;
156 EList m_eventWorkers
;
158 wxSocketServer
* m_listeningSocket
;
161 unsigned m_threadWorkersCreated
;
162 unsigned m_threadWorkersDone
;
163 unsigned m_threadWorkersFailed
;
164 unsigned m_maxThreadWorkers
;
166 unsigned m_eventWorkersCreated
;
167 unsigned m_eventWorkersDone
;
168 unsigned m_eventWorkersFailed
;
169 unsigned m_maxEventWorkers
;
171 long int m_maxConnections
;
//thread based worker: first reads the signature and all the data from the
//connected client, then sends the data back to the client
182 class ThreadWorker
: public wxThread
185 ThreadWorker(wxSocketBase
* pSocket
);
186 virtual ExitCode
Entry();
188 wxSocketBase
* m_socket
;
189 wxIPV4address m_peer
;
192 //event based worker reads signature and creates buffer for incoming data.
193 //When part of data arrives this worker resends it as soon as possible.
194 class EventWorker
: public wxEvtHandler
196 DECLARE_EVENT_TABLE();
198 EventWorker(wxSocketBase
* pSock
);
201 wxSocketBase
* m_socket
;
202 wxIPV4address m_peer
;
204 unsigned char m_signature
[2];
212 void OnSocketEvent(wxSocketEvent
& pEvent
);
217 /******************* Implementation ******************/
218 IMPLEMENT_APP_CONSOLE(Server
)
220 #include <wx/listimpl.cpp>
221 WX_DEFINE_LIST(TList
);
222 WX_DEFINE_LIST(EList
);
226 Server::OnInitCmdLine(wxCmdLineParser
& pParser
)
228 wxApp::OnInitCmdLine(pParser
);
229 pParser
.AddSwitch(wxT("t"),wxT("threads"),_("Use thread based workers only"));
230 pParser
.AddSwitch(wxT("e"),wxT("events"),_("Use event based workers only"));
231 pParser
.AddOption(wxT("m"),wxT("max"),_("Exit after <n> connections"),wxCMD_LINE_VAL_NUMBER
,wxCMD_LINE_PARAM_OPTIONAL
);
232 pParser
.AddOption(wxT("p"),wxT("port"),_("listen on given port (default 3000)"),wxCMD_LINE_VAL_NUMBER
,wxCMD_LINE_PARAM_OPTIONAL
);
236 Server::DumpStatistics()
242 mode
= _("Event based workers");
245 mode
= _("Thread based workers");
248 mode
= _("Event and thread based workers");
251 wxLogMessage(wxString::Format(wxT("Server mode: %s"),mode
.c_str()));
252 wxLogMessage(wxString::Format(wxT("\t\t\t\tThreads\tEvents\tTotal")));
253 wxLogMessage(wxString::Format(wxT("Workers created:\t\t%d\t%d\t%d"),m_threadWorkersCreated
,m_eventWorkersCreated
,m_threadWorkersCreated
+m_eventWorkersCreated
));
254 wxLogMessage(wxString::Format(wxT("Max concurrent workers:\t%d\t%d\t%d"),m_maxThreadWorkers
,m_maxEventWorkers
,m_maxThreadWorkers
+m_maxEventWorkers
));
255 wxLogMessage(wxString::Format(wxT("Workers failed:\t\t%d\t%d\t%d"),m_threadWorkersFailed
,m_eventWorkersFailed
,m_threadWorkersFailed
+m_eventWorkersFailed
));
256 wxLogMessage(wxString::Format(wxT("Workers done:\t\t%d\t%d\t%d"),m_threadWorkersDone
,m_eventWorkersDone
,m_threadWorkersDone
+m_eventWorkersDone
));
258 if ((int)(m_threadWorkersDone
+m_eventWorkersDone
) == m_maxConnections
)
260 wxLogMessage(wxT("%d connection(s) served, exiting"),m_maxConnections
);
267 Server::OnCmdLineParsed(wxCmdLineParser
& pParser
)
269 if (pParser
.Found(_("verbose")))
271 wxLog::AddTraceMask(wxT("wxSocket"));
272 wxLog::AddTraceMask(wxT("epolldispatcher"));
273 wxLog::AddTraceMask(wxT("selectdispatcher"));
274 wxLog::AddTraceMask(wxT("thread"));
275 wxLog::AddTraceMask(wxT("events"));
276 wxLog::AddTraceMask(wxT("timer"));
279 if (pParser
.Found(wxT("m"),&m_maxConnections
))
281 wxLogMessage(wxT("%d connection(s) to exit"),m_maxConnections
);
284 if (pParser
.Found(wxT("p"),&m_port
))
286 wxLogMessage(wxT("%d connection(s) to exit"),m_maxConnections
);
289 if (pParser
.Found(wxT("t")))
290 m_workMode
= THREADS
;
291 else if (pParser
.Found(wxT("e")))
296 return wxApp::OnCmdLineParsed(pParser
);
299 bool Server::OnInit()
301 wxLog
* logger
= new wxLogStderr();
302 wxLog::SetActiveTarget(logger
);
306 //send interesting things to console
307 if (!wxApp::OnInit())
310 //setup listening socket
313 m_listeningSocket
= new wxSocketServer(la
,wxSOCKET_NOWAIT
|wxSOCKET_REUSEADDR
);
314 m_listeningSocket
->SetEventHandler(*this);
315 m_listeningSocket
->SetNotify(wxSOCKET_CONNECTION_FLAG
);
316 m_listeningSocket
->Notify(true);
317 if (!m_listeningSocket
->Ok())
319 wxLogError(wxT("Cannot bind listening socket"));
323 m_threadWorkersCreated
= 0;
324 m_threadWorkersDone
= 0;
325 m_threadWorkersFailed
= 0;
326 m_maxThreadWorkers
= 0;
328 m_eventWorkersCreated
= 0;
329 m_eventWorkersDone
= 0;
330 m_eventWorkersFailed
= 0;
331 m_maxEventWorkers
= 0;
333 wxLogMessage(wxT("Server listening at port %d, waiting for connections"), m_port
);
340 for(TList::compatibility_iterator it
= m_threadWorkers
.GetFirst(); it
; it
= it
->GetNext()) {
341 it
->GetData()->Wait();
342 delete it
->GetData();
345 for(EList::compatibility_iterator it2
= m_eventWorkers
.GetFirst(); it2
; it2
->GetNext()) {
346 delete it2
->GetData();
349 m_threadWorkers
.Clear();
350 m_eventWorkers
.Clear();
351 m_listeningSocket
->Destroy();
355 void Server::OnSocketEvent(wxSocketEvent
& pEvent
)
357 switch(pEvent
.GetSocketEvent())
360 wxLogError(wxT("Unexpected wxSOCKET_INPUT in wxSocketServer"));
362 case wxSOCKET_OUTPUT
:
363 wxLogError(wxT("Unexpected wxSOCKET_OUTPUT in wxSocketServer"));
365 case wxSOCKET_CONNECTION
:
367 wxSocketBase
* sock
= m_listeningSocket
->Accept();
369 if (!sock
->GetPeer(addr
))
371 wxLogError(wxT("Server: cannot get peer info"));
373 wxLogMessage(wxT("Got connection from %s:%d"),addr
.IPAddress().c_str(), addr
.Service());
377 if (m_workMode
!= MIXED
)
378 createThread
= m_workMode
== THREADS
;
380 createThread
= (wxDateTime::Now().GetSecond())%2
== 0;
384 ThreadWorker
* c
= new ThreadWorker(sock
);
385 if (c
->Create() == wxTHREAD_NO_ERROR
)
387 m_threadWorkers
.Append(c
);
388 if (m_threadWorkers
.GetCount() > m_maxThreadWorkers
)
389 m_maxThreadWorkers
++;
390 m_threadWorkersCreated
++;
395 wxLogError(wxT("Server: cannot create next thread (current threads: %d"), m_threadWorkers
.size());
400 EventWorker
* w
= new EventWorker(sock
);
401 m_eventWorkers
.Append(w
);
402 if (m_eventWorkers
.GetCount() > m_maxEventWorkers
)
404 m_eventWorkersCreated
++;
409 wxLogError(wxT("Unexpected wxSOCKET_LOST in wxSocketServer"));
414 void Server::OnWorkerEvent(WorkerEvent
& pEvent
)
416 //wxLogMessage(wxT("Got worker event"));
417 for(TList::compatibility_iterator it
= m_threadWorkers
.GetFirst(); it
; it
= it
->GetNext())
419 if (it
->GetData() == pEvent
.m_sender
)
421 wxLogVerbose(wxT("Deleting thread worker (%d left)"),m_threadWorkers
.GetCount());
422 it
->GetData()->Wait();
423 delete it
->GetData();
424 m_threadWorkers
.DeleteNode(it
);
425 if (!pEvent
.m_workerFailed
)
426 m_threadWorkersDone
++;
428 m_threadWorkersFailed
++;
432 for(EList::compatibility_iterator it2
= m_eventWorkers
.GetFirst(); it2
; it2
= it2
->GetNext())
434 if (it2
->GetData() == pEvent
.m_sender
)
436 wxLogVerbose(wxT("Deleting event worker (%d left)"),m_eventWorkers
.GetCount());
437 delete it2
->GetData();
438 m_eventWorkers
.DeleteNode(it2
);
439 if (!pEvent
.m_workerFailed
)
440 m_eventWorkersDone
++;
442 m_eventWorkersFailed
++;
447 if (m_eventWorkers
.GetCount() == 0 && m_threadWorkers
.GetCount() == 0)
449 mTimer
.Start(1000,true);
453 void Server::OnTimerEvent(wxTimerEvent
&)
459 BEGIN_EVENT_TABLE(Server
,wxEvtHandler
)
460 EVT_SOCKET(wxID_ANY
,Server::OnSocketEvent
)
461 EVT_WORKER(Server::OnWorkerEvent
)
462 EVT_TIMER(wxID_ANY
,Server::OnTimerEvent
)
466 ThreadWorker::ThreadWorker(wxSocketBase
* pSocket
) : wxThread(wxTHREAD_JOINABLE
)
469 //Notify() cannot be called in thread context. We have to detach from main loop
470 //before switching thread contexts.
471 m_socket
->Notify(false);
472 m_socket
->SetFlags(wxSOCKET_WAITALL
|wxSOCKET_BLOCK
);
473 pSocket
->GetPeer(m_peer
);
476 wxThread::ExitCode
ThreadWorker::Entry()
479 if (!m_socket
->IsConnected())
481 LogWorker(m_peer
,wxT("ThreadWorker: not connected"),LOG_ERROR
);
485 if (m_socket
->IsConnected())
487 unsigned char signature
[2];
488 LogWorker(m_peer
,wxT("ThreadWorker: reading for data"));
492 m_socket
->Read(&signature
,to_process
);
493 if (m_socket
->Error())
495 LogWorker(m_peer
,wxT("ThreadWorker: Read error"),LOG_ERROR
);
496 wxGetApp().AddPendingEvent(e
);
499 to_process
-= m_socket
->LastCount();
500 LogWorker(m_peer
,wxString::Format(wxT("to_process: %d"),to_process
));
503 while (!m_socket
->Error() && to_process
!= 0);
505 if (signature
[0] == 0)
511 if (signature
[0] == 0xCE)
513 LogWorker(m_peer
,_("This server does not support test2 from GUI client"),LOG_ERROR
);
514 e
.m_workerFailed
= true;
518 int size
= signature
[1] * (signature
[0] == 0xBE ? 1 : 1024);
519 char* buf
= new char[size
];
520 LogWorker(m_peer
,wxString::Format(wxT("Message signature: chunks: %d, kilobytes: %d, size: %d (bytes)"),signature
[0],signature
[1],size
));
523 LogWorker(m_peer
,wxString::Format(wxT("ThreadWorker: reading %d bytes of data"),to_process
));
527 m_socket
->Read(buf
,to_process
);
528 if (m_socket
->Error())
530 LogWorker(m_peer
,wxT("ThreadWorker: Read error"),LOG_ERROR
);
531 wxGetApp().AddPendingEvent(e
);
534 to_process
-= m_socket
->LastCount();
535 LogWorker(m_peer
,wxString::Format(wxT("ThreadWorker: %d bytes readed, %d todo"),m_socket
->LastCount(),to_process
));
538 while(!m_socket
->Error() && to_process
!= 0);
544 m_socket
->Write(buf
,to_process
);
545 if (m_socket
->Error()) {
546 LogWorker(m_peer
,wxT("ThreadWorker: Write error"),LOG_ERROR
);
549 to_process
-= m_socket
->LastCount();
550 LogWorker(m_peer
,wxString::Format(wxT("ThreadWorker: %d bytes written, %d todo"),m_socket
->LastCount(),to_process
));
552 while(!m_socket
->Error() && to_process
!= 0);
555 LogWorker(m_peer
,wxT("ThreadWorker: done"));
556 e
.m_workerFailed
= to_process
!= 0;
558 wxGetApp().AddPendingEvent(e
);
562 EventWorker::EventWorker(wxSocketBase
* pSock
)
569 m_socket
->SetNotify(wxSOCKET_LOST_FLAG
|wxSOCKET_INPUT_FLAG
|wxSOCKET_OUTPUT_FLAG
);
570 m_socket
->Notify(true);
571 m_socket
->SetEventHandler(*this);
572 m_socket
->SetFlags(wxSOCKET_NOWAIT
);
573 m_socket
->GetPeer(m_peer
);
576 EventWorker::~EventWorker() {
583 EventWorker::DoRead()
587 //read message header
590 m_socket
->Read(m_signature
,2 - m_infill
);
591 if (m_socket
->Error()) {
592 if (m_socket
->LastError() != wxSOCKET_WOULDBLOCK
)
594 LogWorker(m_peer
,wxString::Format(wxT("Read error (%d): %s"),m_socket
->LastError(),GetSocketErrorMsg(m_socket
->LastError())),LOG_ERROR
);
600 m_infill
+= m_socket
->LastCount();
602 unsigned char chunks
= m_signature
[1];
603 unsigned char type
= m_signature
[0];
606 LogWorker(m_peer
,_("This server does not support test2 from GUI client"),LOG_ERROR
);
607 m_written
= -1; //wxSOCKET_LOST will interpret this as failure
610 else if (type
== 0xBE || type
== 0xDE)
612 m_size
= chunks
* (type
== 0xBE ? 1 : 1024);
613 m_inbuf
= new char[m_size
];
614 m_outbuf
= new char[m_size
];
618 LogWorker(m_peer
,wxString::Format(wxT("Message signature: len: %d, type: %s, size: %d (bytes)"),chunks
,type
== 0xBE ? wxT("b") : wxT("kB"),m_size
));
622 LogWorker(m_peer
,wxString::Format(wxT("Unknown test type %x"),type
));
628 while(!m_socket
->Error() && (2 - m_infill
!= 0));
635 if (m_size
== m_infill
) {
636 m_signature
[0] = m_signature
[1] = 0x0;
642 m_socket
->Read(m_inbuf
+ m_infill
,m_size
- m_infill
);
643 if (m_socket
->Error()) {
644 if (m_socket
->LastError() != wxSOCKET_WOULDBLOCK
)
648 wxString::Format(wxT("Read error (%d): %s"),
649 m_socket
->LastError(),
650 GetSocketErrorMsg(m_socket
->LastError())
659 memcpy(m_outbuf
+m_outfill
,m_inbuf
+m_infill
,m_socket
->LastCount());
660 m_infill
+= m_socket
->LastCount();
661 m_outfill
+= m_socket
->LastCount();
665 while(!m_socket
->Error());
668 void EventWorker::OnSocketEvent(wxSocketEvent
& pEvent
)
670 switch(pEvent
.GetSocketEvent())
675 case wxSOCKET_OUTPUT
:
679 case wxSOCKET_CONNECTION
:
680 LogWorker(m_peer
,wxString::Format(wxT("Unexpected wxSOCKET_CONNECTION in EventWorker")),LOG_ERROR
);
684 LogWorker(m_peer
,wxString::Format(wxT("Connection lost")));
686 e
.m_workerFailed
= m_written
!= m_size
;
687 wxGetApp().AddPendingEvent(e
);
693 void EventWorker::DoWrite() {
695 if (m_written
== m_size
)
700 LogWorker(m_peer
,wxString::Format(wxT("All data written")));
703 if (m_outfill
- m_written
== 0)
707 m_socket
->Write(m_outbuf
+ m_written
,m_outfill
- m_written
);
708 if (m_socket
->Error())
710 if (m_socket
->LastError() != wxSOCKET_WOULDBLOCK
)
713 wxString::Format(wxT("Write error (%d): %s"),
714 m_socket
->LastError(),
715 GetSocketErrorMsg(m_socket
->LastError())
723 LogWorker(m_peer
,wxString::Format(wxT("Write would block, waiting for OUTPUT event")));
728 memmove(m_outbuf
,m_outbuf
+m_socket
->LastCount(),m_outfill
-m_socket
->LastCount());
729 m_written
+= m_socket
->LastCount();
731 LogWorker(m_peer
,wxString::Format(wxT("Written %d of %d bytes, todo %d"),m_socket
->LastCount(),m_size
,m_size
- m_written
));
733 while (!m_socket
->Error());
736 BEGIN_EVENT_TABLE(EventWorker
,wxEvtHandler
)
737 EVT_SOCKET(wxID_ANY
,EventWorker::OnSocketEvent
)