no changes, just reformat and avoid unnecessary uses of wxString::Format()
[wxWidgets.git] / samples / sockets / baseserver.cpp
1 /////////////////////////////////////////////////////////////////////////////
2 // Name: samples/sockets/baseserver.cpp
3 // Purpose: Sockets sample for wxBase
4 // Author: Lukasz Michalski
5 // Modified by:
6 // Created: 27.06.2005
7 // RCS-ID: $Id$
8 // Copyright: (c) 2005 Lukasz Michalski <lmichalski@user.sourceforge.net>
9 // Licence: wxWindows license
10 /////////////////////////////////////////////////////////////////////////////
11
12 // ============================================================================
13 // declarations
14 // ============================================================================
15
16 // ----------------------------------------------------------------------------
17 // headers
18 // ----------------------------------------------------------------------------
19
20 #include "wx/wx.h"
21 #include "wx/socket.h"
22 #include "wx/event.h"
23 #include "wx/list.h"
24 #include "wx/cmdline.h"
25 #include "wx/datetime.h"
26 #include "wx/timer.h"
27 #include "wx/thread.h"
28
29 const char *GetSocketErrorMsg(int pSockError)
30 {
31 switch(pSockError)
32 {
33 case wxSOCKET_NOERROR:
34 return "wxSOCKET_NOERROR";
35
36 case wxSOCKET_INVOP:
37 return "wxSOCKET_INVOP";
38
39 case wxSOCKET_IOERR:
40 return "wxSOCKET_IOERR";
41
42 case wxSOCKET_INVADDR:
43 return "wxSOCKET_INVADDR";
44
45 case wxSOCKET_NOHOST:
46 return "wxSOCKET_NOHOST";
47
48 case wxSOCKET_INVPORT:
49 return "wxSOCKET_INVPORT";
50
51 case wxSOCKET_WOULDBLOCK:
52 return "wxSOCKET_WOULDBLOCK";
53
54 case wxSOCKET_TIMEDOUT:
55 return "wxSOCKET_TIMEDOUT";
56
57 case wxSOCKET_MEMERR:
58 return "wxSOCKET_MEMERR";
59
60 default:
61 return "Unknown";
62 }
63 }
64
65 // outputs log message with IP and TCP port number prepended
66 void
67 LogWorker(const wxIPV4address& addr,
68 const wxString& msg,
69 wxLogLevel level = wxLOG_Info)
70 {
71 wxLogGeneric(level, "%s:%d %s", addr.IPAddress(), addr.Service(), msg);
72 }
73
74 //event sent by workers to server class
75 //after client is served
76 const wxEventType wxEVT_WORKER = wxNewEventType();
77 #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
78
79 class WorkerEvent : public wxEvent
80 {
81 public:
82 WorkerEvent(void* pSender)
83 {
84 SetId(-1);
85 SetEventType(wxEVT_WORKER);
86 m_sender = pSender;
87 m_exit = false;
88 m_workerFailed = false;
89 }
90
91 virtual wxEvent* Clone() const
92 {
93 return new WorkerEvent(*this);
94 }
95
96 void* m_sender;
97 bool m_exit;
98 bool m_workerFailed;
99 };
100
101 typedef void (wxEvtHandler::*WorkerEventFunction)(WorkerEvent&);
102
103 class ThreadWorker;
104 class EventWorker;
105
106 WX_DECLARE_LIST(ThreadWorker, TList);
107 WX_DECLARE_LIST(EventWorker, EList);
108
109 //main server class contains listening socket
110 //and list of two type worker classes that serve clients
111 class Server : public wxApp
112 {
113 DECLARE_EVENT_TABLE();
114 public:
115 Server() : m_maxConnections(-1) {}
116 ~Server() {}
117 private:
118 enum WorkMode
119 {
120 MIXED,
121 THREADS,
122 EVENTS
123 };
124
125 virtual bool OnInit();
126 virtual int OnExit();
127
128 void OnInitCmdLine(wxCmdLineParser& pParser);
129 bool OnCmdLineParsed(wxCmdLineParser& pParser);
130
131 void OnSocketEvent(wxSocketEvent& pEvent);
132 void OnWorkerEvent(WorkerEvent& pEvent);
133 void OnTimerEvent(wxTimerEvent& pEvent);
134 void DumpStatistics();
135
136 TList m_threadWorkers;
137 EList m_eventWorkers;
138 WorkMode m_workMode;
139 wxSocketServer* m_listeningSocket;
140
141 // statistics
142 unsigned m_threadWorkersCreated;
143 unsigned m_threadWorkersDone;
144 unsigned m_threadWorkersFailed;
145 unsigned m_maxThreadWorkers;
146
147 unsigned m_eventWorkersCreated;
148 unsigned m_eventWorkersDone;
149 unsigned m_eventWorkersFailed;
150 unsigned m_maxEventWorkers;
151
152 long int m_maxConnections;
153
154 long m_port;
155
156 wxTimer mTimer;
157 };
158
159 DECLARE_APP(Server);
160
161 //thread based worker reads signature and all data first from connected client
162 //and resends data to client after reading
163 class ThreadWorker : public wxThread
164 {
165 public:
166 ThreadWorker(wxSocketBase* pSocket);
167 virtual ExitCode Entry();
168 private:
169 wxSocketBase* m_socket;
170 wxIPV4address m_peer;
171 };
172
173 //event based worker reads signature and creates buffer for incoming data.
174 //When part of data arrives this worker resends it as soon as possible.
175 class EventWorker : public wxEvtHandler
176 {
177 DECLARE_EVENT_TABLE();
178 public:
179 EventWorker(wxSocketBase* pSock);
180 ~EventWorker();
181 private:
182 wxSocketBase* m_socket;
183 wxIPV4address m_peer;
184
185 unsigned char m_signature[2];
186 char* m_inbuf;
187 int m_infill;
188 int m_size;
189 char* m_outbuf;
190 int m_outfill;
191 int m_written;
192
193 void OnSocketEvent(wxSocketEvent& pEvent);
194 void DoWrite();
195 void DoRead();
196 };
197
198 /******************* Implementation ******************/
199 IMPLEMENT_APP_CONSOLE(Server)
200
201 #include <wx/listimpl.cpp>
202 WX_DEFINE_LIST(TList);
203 WX_DEFINE_LIST(EList);
204
205
206 void
207 Server::OnInitCmdLine(wxCmdLineParser& pParser)
208 {
209 wxApp::OnInitCmdLine(pParser);
210 pParser.AddSwitch("t","threads",_("Use thread based workers only"));
211 pParser.AddSwitch("e","events",_("Use event based workers only"));
212 pParser.AddOption("m","max",_("Exit after <n> connections"),wxCMD_LINE_VAL_NUMBER,wxCMD_LINE_PARAM_OPTIONAL);
213 pParser.AddOption("p","port",_("listen on given port (default 3000)"),wxCMD_LINE_VAL_NUMBER,wxCMD_LINE_PARAM_OPTIONAL);
214 }
215
216 void
217 Server::DumpStatistics()
218 {
219 wxString mode;
220 switch(m_workMode)
221 {
222 case EVENTS:
223 mode = _("Event based workers");
224 break;
225 case THREADS:
226 mode = _("Thread based workers");
227 break;
228 case MIXED:
229 mode = _("Event and thread based workers");
230 break;
231 }
232 wxLogMessage("Server mode: %s",mode);
233 wxLogMessage("\t\t\t\tThreads\tEvents\tTotal");
234 wxLogMessage("Workers created:\t\t%d\t%d\t%d",
235 m_threadWorkersCreated,
236 m_eventWorkersCreated,
237 m_threadWorkersCreated + m_eventWorkersCreated);
238 wxLogMessage("Max concurrent workers:\t%d\t%d\t%d",
239 m_maxThreadWorkers,
240 m_maxEventWorkers,
241 m_maxThreadWorkers + m_maxEventWorkers);
242 wxLogMessage("Workers failed:\t\t%d\t%d\t%d",
243 m_threadWorkersFailed,
244 m_eventWorkersFailed,
245 m_threadWorkersFailed + m_eventWorkersFailed);
246 wxLogMessage("Workers done:\t\t%d\t%d\t%d",
247 m_threadWorkersDone,
248 m_eventWorkersDone,
249 m_threadWorkersDone + m_eventWorkersDone);
250
251 if ((int)(m_threadWorkersDone+m_eventWorkersDone) == m_maxConnections)
252 {
253 wxLogMessage("%d connection(s) served, exiting",m_maxConnections);
254 ExitMainLoop();
255 }
256 }
257
258
259 bool
260 Server::OnCmdLineParsed(wxCmdLineParser& pParser)
261 {
262 if (pParser.Found(_("verbose")))
263 {
264 wxLog::AddTraceMask("wxSocket");
265 wxLog::AddTraceMask("epolldispatcher");
266 wxLog::AddTraceMask("selectdispatcher");
267 wxLog::AddTraceMask("thread");
268 wxLog::AddTraceMask("events");
269 wxLog::AddTraceMask("timer");
270 }
271
272 if (pParser.Found("m",&m_maxConnections))
273 {
274 wxLogMessage("%d connection(s) to exit",m_maxConnections);
275 }
276
277 if (pParser.Found("p",&m_port))
278 {
279 wxLogMessage("%d connection(s) to exit",m_maxConnections);
280 }
281
282 if (pParser.Found("t"))
283 m_workMode = THREADS;
284 else if (pParser.Found("e"))
285 m_workMode = EVENTS;
286 else
287 m_workMode = MIXED;
288
289 return wxApp::OnCmdLineParsed(pParser);
290 }
291
292 bool Server::OnInit()
293 {
294 wxLog* logger = new wxLogStderr();
295 wxLog::SetActiveTarget(logger);
296
297 m_port = 3000;
298
299 //send interesting things to console
300 if (!wxApp::OnInit())
301 return false;
302
303 //setup listening socket
304 wxIPV4address la;
305 la.Service(m_port);
306 m_listeningSocket = new wxSocketServer(la,wxSOCKET_NOWAIT|wxSOCKET_REUSEADDR);
307 m_listeningSocket->SetEventHandler(*this);
308 m_listeningSocket->SetNotify(wxSOCKET_CONNECTION_FLAG);
309 m_listeningSocket->Notify(true);
310 if (!m_listeningSocket->Ok())
311 {
312 wxLogError("Cannot bind listening socket");
313 return false;
314 }
315
316 m_threadWorkersCreated = 0;
317 m_threadWorkersDone = 0;
318 m_threadWorkersFailed = 0;
319 m_maxThreadWorkers = 0;
320
321 m_eventWorkersCreated = 0;
322 m_eventWorkersDone = 0;
323 m_eventWorkersFailed = 0;
324 m_maxEventWorkers = 0;
325
326 wxLogMessage("Server listening at port %d, waiting for connections", m_port);
327 return true;
328 }
329
330
331 int Server::OnExit()
332 {
333 for ( TList::compatibility_iterator it = m_threadWorkers.GetFirst();
334 it;
335 it = it->GetNext() )
336 {
337 it->GetData()->Wait();
338 delete it->GetData();
339 }
340
341 for ( EList::compatibility_iterator it2 = m_eventWorkers.GetFirst();
342 it2;
343 it2->GetNext() )
344 {
345 delete it2->GetData();
346 }
347
348 m_threadWorkers.Clear();
349 m_eventWorkers.Clear();
350 m_listeningSocket->Destroy();
351 return 0;
352 }
353
354 void Server::OnSocketEvent(wxSocketEvent& pEvent)
355 {
356 switch(pEvent.GetSocketEvent())
357 {
358 case wxSOCKET_INPUT:
359 wxLogError("Unexpected wxSOCKET_INPUT in wxSocketServer");
360 break;
361 case wxSOCKET_OUTPUT:
362 wxLogError("Unexpected wxSOCKET_OUTPUT in wxSocketServer");
363 break;
364 case wxSOCKET_CONNECTION:
365 {
366 wxSocketBase* sock = m_listeningSocket->Accept();
367 wxIPV4address addr;
368 if (!sock->GetPeer(addr))
369 {
370 wxLogError("Server: cannot get peer info");
371 } else {
372 wxLogMessage("Got connection from %s:%d",addr.IPAddress().c_str(), addr.Service());
373 }
374 bool createThread;
375
376 if (m_workMode != MIXED)
377 createThread = m_workMode == THREADS;
378 else
379 createThread = (wxDateTime::Now().GetSecond())%2 == 0;
380
381 if (createThread)
382 {
383 ThreadWorker* c = new ThreadWorker(sock);
384 if (c->Create() == wxTHREAD_NO_ERROR)
385 {
386 m_threadWorkers.Append(c);
387 if (m_threadWorkers.GetCount() > m_maxThreadWorkers)
388 m_maxThreadWorkers++;
389 m_threadWorkersCreated++;
390 c->Run();
391 }
392 else
393 {
394 wxLogError("Server: cannot create next thread (current threads: %d", m_threadWorkers.size());
395 };
396 }
397 else
398 {
399 EventWorker* w = new EventWorker(sock);
400 m_eventWorkers.Append(w);
401 if (m_eventWorkers.GetCount() > m_maxEventWorkers)
402 m_maxEventWorkers++;
403 m_eventWorkersCreated++;
404 }
405 }
406 break;
407 case wxSOCKET_LOST:
408 wxLogError("Unexpected wxSOCKET_LOST in wxSocketServer");
409 break;
410 }
411 }
412
413 void Server::OnWorkerEvent(WorkerEvent& pEvent)
414 {
415 //wxLogMessage("Got worker event");
416 for(TList::compatibility_iterator it = m_threadWorkers.GetFirst(); it ; it = it->GetNext())
417 {
418 if (it->GetData() == pEvent.m_sender)
419 {
420 wxLogVerbose("Deleting thread worker (%d left)",
421 m_threadWorkers.GetCount());
422 it->GetData()->Wait();
423 delete it->GetData();
424 m_threadWorkers.DeleteNode(it);
425 if (!pEvent.m_workerFailed)
426 m_threadWorkersDone++;
427 else
428 m_threadWorkersFailed++;
429 break;
430 }
431 }
432 for(EList::compatibility_iterator it2 = m_eventWorkers.GetFirst(); it2 ; it2 = it2->GetNext())
433 {
434 if (it2->GetData() == pEvent.m_sender)
435 {
436 wxLogVerbose("Deleting event worker (%d left)",
437 m_eventWorkers.GetCount());
438 delete it2->GetData();
439 m_eventWorkers.DeleteNode(it2);
440 if (!pEvent.m_workerFailed)
441 m_eventWorkersDone++;
442 else
443 m_eventWorkersFailed++;
444 break;
445 }
446 }
447
448 if (m_eventWorkers.GetCount() == 0 && m_threadWorkers.GetCount() == 0)
449 {
450 mTimer.Start(1000,true);
451 }
452 }
453
454 void Server::OnTimerEvent(wxTimerEvent&)
455 {
456 DumpStatistics();
457 }
458
459
460 BEGIN_EVENT_TABLE(Server,wxEvtHandler)
461 EVT_SOCKET(wxID_ANY,Server::OnSocketEvent)
462 EVT_WORKER(Server::OnWorkerEvent)
463 EVT_TIMER(wxID_ANY,Server::OnTimerEvent)
464 END_EVENT_TABLE()
465
466
467 ThreadWorker::ThreadWorker(wxSocketBase* pSocket) : wxThread(wxTHREAD_JOINABLE)
468 {
469 m_socket = pSocket;
470 //Notify() cannot be called in thread context. We have to detach from main loop
471 //before switching thread contexts.
472 m_socket->Notify(false);
473 m_socket->SetFlags(wxSOCKET_WAITALL|wxSOCKET_BLOCK);
474 pSocket->GetPeer(m_peer);
475 }
476
477 wxThread::ExitCode ThreadWorker::Entry()
478 {
479 WorkerEvent e(this);
480 if (!m_socket->IsConnected())
481 {
482 LogWorker(m_peer,"ThreadWorker: not connected",wxLOG_Error);
483 return 0;
484 }
485 int to_process = -1;
486 if (m_socket->IsConnected())
487 {
488 unsigned char signature[2];
489 LogWorker(m_peer,"ThreadWorker: reading for data");
490 to_process = 2;
491 do
492 {
493 m_socket->Read(&signature,to_process);
494 if (m_socket->Error())
495 {
496 LogWorker(m_peer,"ThreadWorker: Read error",wxLOG_Error);
497 wxGetApp().AddPendingEvent(e);
498 return 0;
499 }
500 to_process -= m_socket->LastCount();
501 LogWorker(m_peer,wxString::Format("to_process: %d",to_process));
502
503 }
504 while (!m_socket->Error() && to_process != 0);
505
506 if (signature[0] == 0)
507 {
508 e.m_exit = true;
509 return 0;
510 }
511
512 if (signature[0] == 0xCE)
513 {
514 LogWorker(m_peer,_("This server does not support test2 from GUI client"),wxLOG_Error);
515 e.m_workerFailed = true;
516 e.m_exit = true;
517 return 0;
518 }
519 int size = signature[1] * (signature[0] == 0xBE ? 1 : 1024);
520 char* buf = new char[size];
521 LogWorker(m_peer,wxString::Format("Message signature: chunks: %d, kilobytes: %d, size: %d (bytes)",signature[0],signature[1],size));
522
523 to_process = size;
524 LogWorker(m_peer,wxString::Format("ThreadWorker: reading %d bytes of data",to_process));
525
526 do
527 {
528 m_socket->Read(buf,to_process);
529 if (m_socket->Error())
530 {
531 LogWorker(m_peer,"ThreadWorker: Read error",wxLOG_Error);
532 wxGetApp().AddPendingEvent(e);
533 return 0;
534 }
535 to_process -= m_socket->LastCount();
536 LogWorker(m_peer,wxString::Format("ThreadWorker: %d bytes readed, %d todo",m_socket->LastCount(),to_process));
537
538 }
539 while(!m_socket->Error() && to_process != 0);
540
541 to_process = size;
542
543 do
544 {
545 m_socket->Write(buf,to_process);
546 if (m_socket->Error())
547 {
548 LogWorker(m_peer,"ThreadWorker: Write error",wxLOG_Error);
549 break;
550 }
551 to_process -= m_socket->LastCount();
552 LogWorker(m_peer,wxString::Format("ThreadWorker: %d bytes written, %d todo",m_socket->LastCount(),to_process));
553 }
554 while(!m_socket->Error() && to_process != 0);
555 }
556
557 LogWorker(m_peer,"ThreadWorker: done");
558 e.m_workerFailed = to_process != 0;
559 m_socket->Destroy();
560 wxGetApp().AddPendingEvent(e);
561 return 0;
562 }
563
564 EventWorker::EventWorker(wxSocketBase* pSock)
565 : m_socket(pSock),
566 m_inbuf(NULL),
567 m_infill(0),
568 m_outbuf(NULL),
569 m_outfill(0)
570 {
571 m_socket->SetNotify(wxSOCKET_LOST_FLAG|wxSOCKET_INPUT_FLAG|wxSOCKET_OUTPUT_FLAG);
572 m_socket->Notify(true);
573 m_socket->SetEventHandler(*this);
574 m_socket->SetFlags(wxSOCKET_NOWAIT);
575 m_socket->GetPeer(m_peer);
576 }
577
578 EventWorker::~EventWorker()
579 {
580 m_socket->Destroy();
581 delete [] m_inbuf;
582 delete [] m_outbuf;
583 }
584
585 void
586 EventWorker::DoRead()
587 {
588 if (m_inbuf == NULL)
589 {
590 //read message header
591 do
592 {
593 m_socket->Read(m_signature,2 - m_infill);
594 if (m_socket->Error())
595 {
596 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
597 {
598 LogWorker(m_peer,wxString::Format("Read error (%d): %s",m_socket->LastError(),GetSocketErrorMsg(m_socket->LastError())),wxLOG_Error);
599 m_socket->Close();
600 }
601 }
602 else
603 {
604 m_infill += m_socket->LastCount();
605 if (m_infill == 2)
606 {
607 unsigned char chunks = m_signature[1];
608 unsigned char type = m_signature[0];
609 if (type == 0xCE)
610 {
611 LogWorker(m_peer,_("This server does not support test2 from GUI client"),wxLOG_Error);
612 m_written = -1; //wxSOCKET_LOST will interpret this as failure
613 m_socket->Close();
614 }
615 else if (type == 0xBE || type == 0xDE)
616 {
617 m_size = chunks * (type == 0xBE ? 1 : 1024);
618 m_inbuf = new char[m_size];
619 m_outbuf = new char[m_size];
620 m_infill = 0;
621 m_outfill = 0;
622 m_written = 0;
623 LogWorker(m_peer,wxString::Format("Message signature: len: %d, type: %s, size: %d (bytes)",chunks,type == 0xBE ? "b" : "kB",m_size));
624 break;
625 } else
626 {
627 LogWorker(m_peer,wxString::Format("Unknown test type %x",type));
628 m_socket->Close();
629 }
630 }
631 }
632 }
633 while(!m_socket->Error() && (2 - m_infill != 0));
634 }
635
636 if (m_inbuf == NULL)
637 return;
638 //read message data
639 do
640 {
641 if (m_size == m_infill)
642 {
643 m_signature[0] = m_signature[1] = 0x0;
644 delete [] m_inbuf;
645 m_inbuf = NULL;
646 m_infill = 0;
647 return;
648 }
649 m_socket->Read(m_inbuf + m_infill,m_size - m_infill);
650 if (m_socket->Error())
651 {
652 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
653 {
654 LogWorker(
655 m_peer,
656 wxString::Format("Read error (%d): %s",
657 m_socket->LastError(),
658 GetSocketErrorMsg(m_socket->LastError())
659 ),
660 wxLOG_Error);
661
662 m_socket->Close();
663 }
664 }
665 else
666 {
667 memcpy(m_outbuf+m_outfill,m_inbuf+m_infill,m_socket->LastCount());
668 m_infill += m_socket->LastCount();
669 m_outfill += m_socket->LastCount();
670 DoWrite();
671 }
672 }
673 while(!m_socket->Error());
674 };
675
676 void EventWorker::OnSocketEvent(wxSocketEvent& pEvent)
677 {
678 switch(pEvent.GetSocketEvent())
679 {
680 case wxSOCKET_INPUT:
681 DoRead();
682 break;
683
684 case wxSOCKET_OUTPUT:
685 if (m_inbuf != NULL)
686 DoWrite();
687 break;
688
689 case wxSOCKET_CONNECTION:
690 LogWorker(m_peer,wxString::Format("Unexpected wxSOCKET_CONNECTION in EventWorker"),wxLOG_Error);
691 break;
692
693 case wxSOCKET_LOST:
694 {
695 LogWorker(m_peer,wxString::Format("Connection lost"));
696 WorkerEvent e(this);
697 e.m_workerFailed = m_written != m_size;
698 wxGetApp().AddPendingEvent(e);
699 }
700 break;
701 }
702 }
703
704 void EventWorker::DoWrite()
705 {
706 do
707 {
708 if (m_written == m_size)
709 {
710 delete [] m_outbuf;
711 m_outbuf = NULL;
712 m_outfill = 0;
713 LogWorker(m_peer, "All data written");
714 return;
715 }
716 if (m_outfill - m_written == 0)
717 {
718 return;
719 }
720 m_socket->Write(m_outbuf + m_written,m_outfill - m_written);
721 if (m_socket->Error())
722 {
723 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
724 {
725 LogWorker(m_peer,
726 wxString::Format("Write error (%d): %s",
727 m_socket->LastError(),
728 GetSocketErrorMsg(m_socket->LastError())
729 )
730 ,wxLOG_Error
731 );
732 m_socket->Close();
733 }
734 else
735 {
736 LogWorker(m_peer,"Write would block, waiting for OUTPUT event");
737 }
738 }
739 else
740 {
741 memmove(m_outbuf,m_outbuf+m_socket->LastCount(),m_outfill-m_socket->LastCount());
742 m_written += m_socket->LastCount();
743 }
744 LogWorker(m_peer,wxString::Format("Written %d of %d bytes, todo %d",m_socket->LastCount(),m_size,m_size - m_written));
745 }
746 while (!m_socket->Error());
747 }
748
749 BEGIN_EVENT_TABLE(EventWorker,wxEvtHandler)
750 EVT_SOCKET(wxID_ANY,EventWorker::OnSocketEvent)
751 END_EVENT_TABLE()