]> git.saurik.com Git - wxWidgets.git/blob - samples/sockets/baseserver.cpp
use WXDLLIMPEXP_CORE, not WXDLLIMPEXP_BASE for wxThreadEvent
[wxWidgets.git] / samples / sockets / baseserver.cpp
1 /////////////////////////////////////////////////////////////////////////////
2 // Name: samples/sockbase/client.cpp
3 // Purpose: Sockets sample for wxBase
4 // Author: Lukasz Michalski
5 // Modified by:
6 // Created: 27.06.2005
7 // RCS-ID: $Id$
8 // Copyright: (c) 2005 Lukasz Michalski <lmichalski@user.sourceforge.net>
9 // Licence: wxWindows license
10 /////////////////////////////////////////////////////////////////////////////
11
12 // ============================================================================
13 // declarations
14 // ============================================================================
15
16 // ----------------------------------------------------------------------------
17 // headers
18 // ----------------------------------------------------------------------------
19
20 #include "wx/wx.h"
21 #include "wx/socket.h"
22 #include "wx/event.h"
23 #include "wx/list.h"
24 #include "wx/cmdline.h"
25 #include "wx/datetime.h"
26 #include "wx/timer.h"
27 #include "wx/thread.h"
28
29 const char *GetSocketErrorMsg(int pSockError)
30 {
31 switch(pSockError)
32 {
33 case wxSOCKET_NOERROR:
34 return "wxSOCKET_NOERROR";
35
36 case wxSOCKET_INVOP:
37 return "wxSOCKET_INVOP";
38
39 case wxSOCKET_IOERR:
40 return "wxSOCKET_IOERR";
41
42 case wxSOCKET_INVADDR:
43 return "wxSOCKET_INVADDR";
44
45 case wxSOCKET_NOHOST:
46 return "wxSOCKET_NOHOST";
47
48 case wxSOCKET_INVPORT:
49 return "wxSOCKET_INVPORT";
50
51 case wxSOCKET_WOULDBLOCK:
52 return "wxSOCKET_WOULDBLOCK";
53
54 case wxSOCKET_TIMEDOUT:
55 return "wxSOCKET_TIMEDOUT";
56
57 case wxSOCKET_MEMERR:
58 return "wxSOCKET_MEMERR";
59
60 default:
61 return "Unknown";
62 }
63 }
64
65 //event sent by workers to server class
66 //after client is served
67 const wxEventType wxEVT_WORKER = wxNewEventType();
68 #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
69
70 class WorkerEvent : public wxEvent
71 {
72 public:
73 WorkerEvent(void* pSender)
74 {
75 SetId(-1);
76 SetEventType(wxEVT_WORKER);
77 m_sender = pSender;
78 m_exit = false;
79 m_workerFailed = false;
80 }
81
82 virtual wxEvent* Clone() const
83 {
84 return new WorkerEvent(*this);
85 }
86
87 void* m_sender;
88 bool m_exit;
89 bool m_workerFailed;
90 };
91
92 typedef void (wxEvtHandler::*WorkerEventFunction)(WorkerEvent&);
93
94 class ThreadWorker;
95 class EventWorker;
96
97 WX_DECLARE_LIST(ThreadWorker, TList);
98 WX_DECLARE_LIST(EventWorker, EList);
99
100 //main server class contains listening socket
101 //and list of two type worker classes that serve clients
102 class Server : public wxApp
103 {
104 DECLARE_EVENT_TABLE()
105 public:
106 Server() : m_maxConnections(-1) {}
107 ~Server() {}
108 private:
109 enum WorkMode
110 {
111 MIXED,
112 THREADS,
113 EVENTS
114 };
115
116 virtual bool OnInit();
117 virtual int OnExit();
118
119 void OnInitCmdLine(wxCmdLineParser& pParser);
120 bool OnCmdLineParsed(wxCmdLineParser& pParser);
121
122 void OnSocketEvent(wxSocketEvent& pEvent);
123 void OnWorkerEvent(WorkerEvent& pEvent);
124 void OnTimerEvent(wxTimerEvent& pEvent);
125 void DumpStatistics();
126
127 TList m_threadWorkers;
128 EList m_eventWorkers;
129 WorkMode m_workMode;
130 wxSocketServer* m_listeningSocket;
131
132 // statistics
133 unsigned m_threadWorkersCreated;
134 unsigned m_threadWorkersDone;
135 unsigned m_threadWorkersFailed;
136 unsigned m_maxThreadWorkers;
137
138 unsigned m_eventWorkersCreated;
139 unsigned m_eventWorkersDone;
140 unsigned m_eventWorkersFailed;
141 unsigned m_maxEventWorkers;
142
143 long int m_maxConnections;
144
145 long m_port;
146
147 wxTimer mTimer;
148 };
149
150 DECLARE_APP(Server);
151
152 // just some common things shared between ThreadWorker and EventWorker
153 class WorkerBase
154 {
155 protected:
156 // outputs log message with IP and TCP port number prepended
157 void LogWorker(const wxString& msg, wxLogLevel level = wxLOG_Info)
158 {
159 wxLogGeneric(level,
160 "%s:%d %s", m_peer.IPAddress(), m_peer.Service(), msg);
161 }
162
163 wxIPV4address m_peer;
164 };
165
166 //thread based worker reads signature and all data first from connected client
167 //and resends data to client after reading
168 class ThreadWorker : public wxThread, private WorkerBase
169 {
170 public:
171 ThreadWorker(wxSocketBase* pSocket);
172 virtual ExitCode Entry();
173
174 private:
175 wxSocketBase* m_socket;
176 };
177
178 //event based worker reads signature and creates buffer for incoming data.
179 //When part of data arrives this worker resends it as soon as possible.
180 class EventWorker : public wxEvtHandler, private WorkerBase
181 {
182 public:
183 EventWorker(wxSocketBase* pSock);
184 virtual ~EventWorker();
185
186 private:
187 wxSocketBase* m_socket;
188
189 unsigned char m_signature[2];
190 char* m_inbuf;
191 int m_infill;
192 int m_size;
193 char* m_outbuf;
194 int m_outfill;
195 int m_written;
196
197 void OnSocketEvent(wxSocketEvent& pEvent);
198 void DoWrite();
199 void DoRead();
200
201 DECLARE_EVENT_TABLE()
202 };
203
204 /******************* Implementation ******************/
205 IMPLEMENT_APP_CONSOLE(Server)
206
207 #include <wx/listimpl.cpp>
208 WX_DEFINE_LIST(TList);
209 WX_DEFINE_LIST(EList);
210
211
212 void
213 Server::OnInitCmdLine(wxCmdLineParser& pParser)
214 {
215 wxApp::OnInitCmdLine(pParser);
216 pParser.AddSwitch("t","threads","Use thread based workers only");
217 pParser.AddSwitch("e","events","Use event based workers only");
218 pParser.AddOption("m","max","Exit after <n> connections",
219 wxCMD_LINE_VAL_NUMBER);
220 pParser.AddOption("p","port","listen on given port (default 3000)",
221 wxCMD_LINE_VAL_NUMBER);
222 }
223
224 void
225 Server::DumpStatistics()
226 {
227 wxString mode;
228 switch(m_workMode)
229 {
230 case EVENTS:
231 mode = "Event based workers";
232 break;
233 case THREADS:
234 mode = "Thread based workers";
235 break;
236 case MIXED:
237 mode = "Event and thread based workers";
238 break;
239 }
240 wxLogMessage("Server mode: %s",mode);
241 wxLogMessage("\t\t\t\tThreads\tEvents\tTotal");
242 wxLogMessage("Workers created:\t\t%d\t%d\t%d",
243 m_threadWorkersCreated,
244 m_eventWorkersCreated,
245 m_threadWorkersCreated + m_eventWorkersCreated);
246 wxLogMessage("Max concurrent workers:\t%d\t%d\t%d",
247 m_maxThreadWorkers,
248 m_maxEventWorkers,
249 m_maxThreadWorkers + m_maxEventWorkers);
250 wxLogMessage("Workers failed:\t\t%d\t%d\t%d",
251 m_threadWorkersFailed,
252 m_eventWorkersFailed,
253 m_threadWorkersFailed + m_eventWorkersFailed);
254 wxLogMessage("Workers done:\t\t%d\t%d\t%d",
255 m_threadWorkersDone,
256 m_eventWorkersDone,
257 m_threadWorkersDone + m_eventWorkersDone);
258
259 if ((int)(m_threadWorkersDone+m_eventWorkersDone) == m_maxConnections)
260 {
261 wxLogMessage("%d connection(s) served, exiting",m_maxConnections);
262 ExitMainLoop();
263 }
264 }
265
266
267 bool
268 Server::OnCmdLineParsed(wxCmdLineParser& pParser)
269 {
270 if (pParser.Found("verbose"))
271 {
272 wxLog::AddTraceMask("wxSocket");
273 wxLog::AddTraceMask("epolldispatcher");
274 wxLog::AddTraceMask("selectdispatcher");
275 wxLog::AddTraceMask("thread");
276 wxLog::AddTraceMask("events");
277 wxLog::AddTraceMask("timer");
278 }
279
280 if (pParser.Found("m",&m_maxConnections))
281 {
282 wxLogMessage("%d connection(s) to exit",m_maxConnections);
283 }
284
285 if (pParser.Found("p",&m_port))
286 {
287 wxLogMessage("%d connection(s) to exit",m_maxConnections);
288 }
289
290 if (pParser.Found("t"))
291 m_workMode = THREADS;
292 else if (pParser.Found("e"))
293 m_workMode = EVENTS;
294 else
295 m_workMode = MIXED;
296
297 return wxApp::OnCmdLineParsed(pParser);
298 }
299
300 bool Server::OnInit()
301 {
302 wxLog* logger = new wxLogStderr();
303 wxLog::SetActiveTarget(logger);
304
305 m_port = 3000;
306
307 //send interesting things to console
308 if (!wxApp::OnInit())
309 return false;
310
311 //setup listening socket
312 wxIPV4address la;
313 la.Service(m_port);
314 m_listeningSocket = new wxSocketServer(la,wxSOCKET_NOWAIT|wxSOCKET_REUSEADDR);
315 m_listeningSocket->SetEventHandler(*this);
316 m_listeningSocket->SetNotify(wxSOCKET_CONNECTION_FLAG);
317 m_listeningSocket->Notify(true);
318 if (!m_listeningSocket->Ok())
319 {
320 wxLogError("Cannot bind listening socket");
321 return false;
322 }
323
324 m_threadWorkersCreated = 0;
325 m_threadWorkersDone = 0;
326 m_threadWorkersFailed = 0;
327 m_maxThreadWorkers = 0;
328
329 m_eventWorkersCreated = 0;
330 m_eventWorkersDone = 0;
331 m_eventWorkersFailed = 0;
332 m_maxEventWorkers = 0;
333
334 wxLogMessage("Server listening at port %d, waiting for connections", m_port);
335 return true;
336 }
337
338
339 int Server::OnExit()
340 {
341 for ( TList::compatibility_iterator it = m_threadWorkers.GetFirst();
342 it;
343 it = it->GetNext() )
344 {
345 it->GetData()->Wait();
346 delete it->GetData();
347 }
348
349 for ( EList::compatibility_iterator it2 = m_eventWorkers.GetFirst();
350 it2;
351 it2->GetNext() )
352 {
353 delete it2->GetData();
354 }
355
356 m_threadWorkers.Clear();
357 m_eventWorkers.Clear();
358 m_listeningSocket->Destroy();
359 return 0;
360 }
361
362 void Server::OnSocketEvent(wxSocketEvent& pEvent)
363 {
364 switch(pEvent.GetSocketEvent())
365 {
366 case wxSOCKET_INPUT:
367 wxLogError("Unexpected wxSOCKET_INPUT in wxSocketServer");
368 break;
369 case wxSOCKET_OUTPUT:
370 wxLogError("Unexpected wxSOCKET_OUTPUT in wxSocketServer");
371 break;
372 case wxSOCKET_CONNECTION:
373 {
374 wxSocketBase* sock = m_listeningSocket->Accept();
375 wxIPV4address addr;
376 if (!sock->GetPeer(addr))
377 {
378 wxLogError("Server: cannot get peer info");
379 } else {
380 wxLogMessage("Got connection from %s:%d",addr.IPAddress().c_str(), addr.Service());
381 }
382 bool createThread;
383
384 if (m_workMode != MIXED)
385 createThread = m_workMode == THREADS;
386 else
387 createThread = (wxDateTime::Now().GetSecond())%2 == 0;
388
389 if (createThread)
390 {
391 ThreadWorker* c = new ThreadWorker(sock);
392 if (c->Create() == wxTHREAD_NO_ERROR)
393 {
394 m_threadWorkers.Append(c);
395 if (m_threadWorkers.GetCount() > m_maxThreadWorkers)
396 m_maxThreadWorkers++;
397 m_threadWorkersCreated++;
398 c->Run();
399 }
400 else
401 {
402 wxLogError("Server: cannot create next thread (current threads: %d", m_threadWorkers.size());
403 };
404 }
405 else
406 {
407 EventWorker* w = new EventWorker(sock);
408 m_eventWorkers.Append(w);
409 if (m_eventWorkers.GetCount() > m_maxEventWorkers)
410 m_maxEventWorkers++;
411 m_eventWorkersCreated++;
412 }
413 }
414 break;
415 case wxSOCKET_LOST:
416 wxLogError("Unexpected wxSOCKET_LOST in wxSocketServer");
417 break;
418 }
419 }
420
421 void Server::OnWorkerEvent(WorkerEvent& pEvent)
422 {
423 //wxLogMessage("Got worker event");
424 for(TList::compatibility_iterator it = m_threadWorkers.GetFirst(); it ; it = it->GetNext())
425 {
426 if (it->GetData() == pEvent.m_sender)
427 {
428 wxLogVerbose("Deleting thread worker (%d left)",
429 m_threadWorkers.GetCount());
430 it->GetData()->Wait();
431 delete it->GetData();
432 m_threadWorkers.DeleteNode(it);
433 if (!pEvent.m_workerFailed)
434 m_threadWorkersDone++;
435 else
436 m_threadWorkersFailed++;
437 break;
438 }
439 }
440 for(EList::compatibility_iterator it2 = m_eventWorkers.GetFirst(); it2 ; it2 = it2->GetNext())
441 {
442 if (it2->GetData() == pEvent.m_sender)
443 {
444 wxLogVerbose("Deleting event worker (%d left)",
445 m_eventWorkers.GetCount());
446 delete it2->GetData();
447 m_eventWorkers.DeleteNode(it2);
448 if (!pEvent.m_workerFailed)
449 m_eventWorkersDone++;
450 else
451 m_eventWorkersFailed++;
452 break;
453 }
454 }
455
456 if (m_eventWorkers.GetCount() == 0 && m_threadWorkers.GetCount() == 0)
457 {
458 mTimer.Start(1000,true);
459 }
460 }
461
462 void Server::OnTimerEvent(wxTimerEvent&)
463 {
464 DumpStatistics();
465 }
466
467
468 BEGIN_EVENT_TABLE(Server,wxEvtHandler)
469 EVT_SOCKET(wxID_ANY,Server::OnSocketEvent)
470 EVT_WORKER(Server::OnWorkerEvent)
471 EVT_TIMER(wxID_ANY,Server::OnTimerEvent)
472 END_EVENT_TABLE()
473
474
475 ThreadWorker::ThreadWorker(wxSocketBase* pSocket) : wxThread(wxTHREAD_JOINABLE)
476 {
477 m_socket = pSocket;
478 //Notify() cannot be called in thread context. We have to detach from main loop
479 //before switching thread contexts.
480 m_socket->Notify(false);
481 m_socket->SetFlags(wxSOCKET_WAITALL|wxSOCKET_BLOCK);
482 pSocket->GetPeer(m_peer);
483 }
484
485 wxThread::ExitCode ThreadWorker::Entry()
486 {
487 WorkerEvent e(this);
488 if (!m_socket->IsConnected())
489 {
490 LogWorker("ThreadWorker: not connected",wxLOG_Error);
491 return 0;
492 }
493 int to_process = -1;
494 if (m_socket->IsConnected())
495 {
496 unsigned char signature[2];
497 LogWorker("ThreadWorker: reading for data");
498 to_process = 2;
499 do
500 {
501 m_socket->Read(&signature,to_process);
502 if (m_socket->Error())
503 {
504 LogWorker("ThreadWorker: Read error",wxLOG_Error);
505 wxGetApp().AddPendingEvent(e);
506 return 0;
507 }
508 to_process -= m_socket->LastCount();
509 LogWorker(wxString::Format("to_process: %d",to_process));
510
511 }
512 while (!m_socket->Error() && to_process != 0);
513
514 if (signature[0] == 0)
515 {
516 e.m_exit = true;
517 return 0;
518 }
519
520 if (signature[0] == 0xCE)
521 {
522 LogWorker("This server does not support test2 from GUI client",wxLOG_Error);
523 e.m_workerFailed = true;
524 e.m_exit = true;
525 return 0;
526 }
527 int size = signature[1] * (signature[0] == 0xBE ? 1 : 1024);
528 char* buf = new char[size];
529 LogWorker(wxString::Format("Message signature: chunks: %d, kilobytes: %d, size: %d (bytes)",signature[0],signature[1],size));
530
531 to_process = size;
532 LogWorker(wxString::Format("ThreadWorker: reading %d bytes of data",to_process));
533
534 do
535 {
536 m_socket->Read(buf,to_process);
537 if (m_socket->Error())
538 {
539 LogWorker("ThreadWorker: Read error",wxLOG_Error);
540 wxGetApp().AddPendingEvent(e);
541 return 0;
542 }
543 to_process -= m_socket->LastCount();
544 LogWorker(wxString::Format("ThreadWorker: %d bytes readed, %d todo",m_socket->LastCount(),to_process));
545
546 }
547 while(!m_socket->Error() && to_process != 0);
548
549 to_process = size;
550
551 do
552 {
553 m_socket->Write(buf,to_process);
554 if (m_socket->Error())
555 {
556 LogWorker("ThreadWorker: Write error",wxLOG_Error);
557 break;
558 }
559 to_process -= m_socket->LastCount();
560 LogWorker(wxString::Format("ThreadWorker: %d bytes written, %d todo",m_socket->LastCount(),to_process));
561 }
562 while(!m_socket->Error() && to_process != 0);
563 }
564
565 LogWorker("ThreadWorker: done");
566 e.m_workerFailed = to_process != 0;
567 m_socket->Destroy();
568 wxGetApp().AddPendingEvent(e);
569 return 0;
570 }
571
572 EventWorker::EventWorker(wxSocketBase* pSock)
573 : m_socket(pSock),
574 m_inbuf(NULL),
575 m_infill(0),
576 m_outbuf(NULL),
577 m_outfill(0)
578 {
579 m_socket->SetNotify(wxSOCKET_LOST_FLAG|wxSOCKET_INPUT_FLAG|wxSOCKET_OUTPUT_FLAG);
580 m_socket->Notify(true);
581 m_socket->SetEventHandler(*this);
582 m_socket->SetFlags(wxSOCKET_NOWAIT);
583 m_socket->GetPeer(m_peer);
584 }
585
586 EventWorker::~EventWorker()
587 {
588 m_socket->Destroy();
589 delete [] m_inbuf;
590 delete [] m_outbuf;
591 }
592
593 void
594 EventWorker::DoRead()
595 {
596 if (m_inbuf == NULL)
597 {
598 //read message header
599 do
600 {
601 m_socket->Read(m_signature + m_infill, 2 - m_infill);
602 if (m_socket->Error())
603 {
604 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
605 {
606 LogWorker(wxString::Format("Read error (%d): %s",m_socket->LastError(),GetSocketErrorMsg(m_socket->LastError())),wxLOG_Error);
607 m_socket->Close();
608 }
609 }
610 else
611 {
612 m_infill += m_socket->LastCount();
613 if (m_infill == 2)
614 {
615 unsigned char chunks = m_signature[1];
616 unsigned char type = m_signature[0];
617 if (type == 0xCE)
618 {
619 LogWorker("This server does not support test2 from GUI client",wxLOG_Error);
620 m_written = -1; //wxSOCKET_LOST will interpret this as failure
621 m_socket->Close();
622 }
623 else if (type == 0xBE || type == 0xDE)
624 {
625 m_size = chunks * (type == 0xBE ? 1 : 1024);
626 m_inbuf = new char[m_size];
627 m_outbuf = new char[m_size];
628 m_infill = 0;
629 m_outfill = 0;
630 m_written = 0;
631 LogWorker(wxString::Format("Message signature: len: %d, type: %s, size: %d (bytes)",chunks,type == 0xBE ? "b" : "kB",m_size));
632 break;
633 }
634 else
635 {
636 LogWorker(wxString::Format("Unknown test type %x",type));
637 m_socket->Close();
638 }
639 }
640 }
641 }
642 while(!m_socket->Error() && (2 - m_infill != 0));
643 }
644
645 if (m_inbuf == NULL)
646 return;
647 //read message data
648 do
649 {
650 if (m_size == m_infill)
651 {
652 m_signature[0] = m_signature[1] = 0x0;
653 delete [] m_inbuf;
654 m_inbuf = NULL;
655 m_infill = 0;
656 return;
657 }
658 m_socket->Read(m_inbuf + m_infill,m_size - m_infill);
659 if (m_socket->Error())
660 {
661 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
662 {
663 LogWorker(wxString::Format("Read error (%d): %s",
664 m_socket->LastError(),
665 GetSocketErrorMsg(m_socket->LastError())),
666 wxLOG_Error);
667
668 m_socket->Close();
669 }
670 }
671 else
672 {
673 memcpy(m_outbuf+m_outfill,m_inbuf+m_infill,m_socket->LastCount());
674 m_infill += m_socket->LastCount();
675 m_outfill += m_socket->LastCount();
676 DoWrite();
677 }
678 }
679 while(!m_socket->Error());
680 };
681
682 void EventWorker::OnSocketEvent(wxSocketEvent& pEvent)
683 {
684 switch(pEvent.GetSocketEvent())
685 {
686 case wxSOCKET_INPUT:
687 DoRead();
688 break;
689
690 case wxSOCKET_OUTPUT:
691 if ( m_outbuf )
692 DoWrite();
693 break;
694
695 case wxSOCKET_CONNECTION:
696 LogWorker("Unexpected wxSOCKET_CONNECTION in EventWorker", wxLOG_Error);
697 break;
698
699 case wxSOCKET_LOST:
700 {
701 LogWorker("Connection lost");
702 WorkerEvent e(this);
703 e.m_workerFailed = m_written != m_size;
704 wxGetApp().AddPendingEvent(e);
705 }
706 break;
707 }
708 }
709
710 void EventWorker::DoWrite()
711 {
712 do
713 {
714 if (m_written == m_size)
715 {
716 delete [] m_outbuf;
717 m_outbuf = NULL;
718 m_outfill = 0;
719 LogWorker( "All data written");
720 return;
721 }
722 if (m_outfill - m_written == 0)
723 {
724 return;
725 }
726 m_socket->Write(m_outbuf + m_written,m_outfill - m_written);
727 if (m_socket->Error())
728 {
729 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
730 {
731 LogWorker(
732 wxString::Format("Write error (%d): %s",
733 m_socket->LastError(),
734 GetSocketErrorMsg(m_socket->LastError())
735 )
736 ,wxLOG_Error
737 );
738 m_socket->Close();
739 }
740 else
741 {
742 LogWorker("Write would block, waiting for OUTPUT event");
743 }
744 }
745 else
746 {
747 memmove(m_outbuf,m_outbuf+m_socket->LastCount(),m_outfill-m_socket->LastCount());
748 m_written += m_socket->LastCount();
749 }
750 LogWorker(wxString::Format("Written %d of %d bytes, todo %d",
751 m_socket->LastCount(),m_size,m_size - m_written));
752 }
753 while (!m_socket->Error());
754 }
755
756 BEGIN_EVENT_TABLE(EventWorker,wxEvtHandler)
757 EVT_SOCKET(wxID_ANY,EventWorker::OnSocketEvent)
758 END_EVENT_TABLE()