correcting text alignment flag
[wxWidgets.git] / samples / sockets / baseserver.cpp
0 / 765 (  0%)
CommitLineData
1/////////////////////////////////////////////////////////////////////////////
2// Name: samples/sockets/baseserver.cpp
3// Purpose: Sockets sample for wxBase
4// Author: Lukasz Michalski
5// Modified by:
6// Created: 27.06.2005
7// RCS-ID: $Id$
8// Copyright: (c) 2005 Lukasz Michalski <lmichalski@user.sourceforge.net>
9// Licence: wxWindows licence
10/////////////////////////////////////////////////////////////////////////////
11
12// ============================================================================
13// declarations
14// ============================================================================
15
16// ----------------------------------------------------------------------------
17// headers
18// ----------------------------------------------------------------------------
19
20#include "wx/wx.h"
21#include "wx/socket.h"
22#include "wx/event.h"
23#include "wx/list.h"
24#include "wx/cmdline.h"
25#include "wx/datetime.h"
26#include "wx/timer.h"
27#include "wx/thread.h"
28
29const char *GetSocketErrorMsg(int pSockError)
30{
31 switch(pSockError)
32 {
33 case wxSOCKET_NOERROR:
34 return "wxSOCKET_NOERROR";
35
36 case wxSOCKET_INVOP:
37 return "wxSOCKET_INVOP";
38
39 case wxSOCKET_IOERR:
40 return "wxSOCKET_IOERR";
41
42 case wxSOCKET_INVADDR:
43 return "wxSOCKET_INVADDR";
44
45 case wxSOCKET_NOHOST:
46 return "wxSOCKET_NOHOST";
47
48 case wxSOCKET_INVPORT:
49 return "wxSOCKET_INVPORT";
50
51 case wxSOCKET_WOULDBLOCK:
52 return "wxSOCKET_WOULDBLOCK";
53
54 case wxSOCKET_TIMEDOUT:
55 return "wxSOCKET_TIMEDOUT";
56
57 case wxSOCKET_MEMERR:
58 return "wxSOCKET_MEMERR";
59
60 default:
61 return "Unknown";
62 }
63}
64
65//event sent by workers to server class
66//after client is served
67const wxEventType wxEVT_WORKER = wxNewEventType();
68#define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
69
70class WorkerEvent : public wxEvent
71{
72public:
73 WorkerEvent(void* pSender)
74 {
75 SetId(-1);
76 SetEventType(wxEVT_WORKER);
77 m_sender = pSender;
78 m_exit = false;
79 m_workerFailed = false;
80 }
81
82 virtual wxEvent* Clone() const
83 {
84 return new WorkerEvent(*this);
85 }
86
87 void* m_sender;
88 bool m_exit;
89 bool m_workerFailed;
90};
91
92typedef void (wxEvtHandler::*WorkerEventFunction)(WorkerEvent&);
93
94class ThreadWorker;
95class EventWorker;
96
97WX_DECLARE_LIST(ThreadWorker, TList);
98WX_DECLARE_LIST(EventWorker, EList);
99
100//main server class contains listening socket
101//and list of two type worker classes that serve clients
102class Server : public wxApp
103{
104 DECLARE_EVENT_TABLE()
105public:
106 Server() : m_maxConnections(-1) {}
107 ~Server() {}
108private:
109 enum WorkMode
110 {
111 MIXED,
112 THREADS,
113 EVENTS
114 };
115
116 virtual bool OnInit();
117 virtual int OnExit();
118
119 void OnInitCmdLine(wxCmdLineParser& pParser);
120 bool OnCmdLineParsed(wxCmdLineParser& pParser);
121
122 void OnSocketEvent(wxSocketEvent& pEvent);
123 void OnWorkerEvent(WorkerEvent& pEvent);
124 void OnTimerEvent(wxTimerEvent& pEvent);
125 void DumpStatistics();
126
127 TList m_threadWorkers;
128 EList m_eventWorkers;
129 WorkMode m_workMode;
130 wxSocketServer* m_listeningSocket;
131
132 // statistics
133 unsigned m_threadWorkersCreated;
134 unsigned m_threadWorkersDone;
135 unsigned m_threadWorkersFailed;
136 unsigned m_maxThreadWorkers;
137
138 unsigned m_eventWorkersCreated;
139 unsigned m_eventWorkersDone;
140 unsigned m_eventWorkersFailed;
141 unsigned m_maxEventWorkers;
142
143 long int m_maxConnections;
144
145 unsigned short m_port;
146
147 wxTimer mTimer;
148};
149
150DECLARE_APP(Server);
151
152// just some common things shared between ThreadWorker and EventWorker
153class WorkerBase
154{
155protected:
156 // outputs log message with IP and TCP port number prepended
157 void LogWorker(const wxString& msg, wxLogLevel level = wxLOG_Info)
158 {
159 wxLogGeneric(level,
160 "%s:%d %s", m_peer.IPAddress(), m_peer.Service(), msg);
161 }
162
163 wxIPV4address m_peer;
164};
165
166//thread based worker reads signature and all data first from connected client
167//and resends data to client after reading
168class ThreadWorker : public wxThread, private WorkerBase
169{
170public:
171 ThreadWorker(wxSocketBase* pSocket);
172 virtual ExitCode Entry();
173
174private:
175 wxSocketBase* m_socket;
176};
177
178//event based worker reads signature and creates buffer for incoming data.
179//When part of data arrives this worker resends it as soon as possible.
180class EventWorker : public wxEvtHandler, private WorkerBase
181{
182public:
183 EventWorker(wxSocketBase* pSock);
184 virtual ~EventWorker();
185
186private:
187 wxSocketBase* m_socket;
188
189 unsigned char m_signature[2];
190 char* m_inbuf;
191 int m_infill;
192 int m_size;
193 char* m_outbuf;
194 int m_outfill;
195 int m_written;
196
197 void OnSocketEvent(wxSocketEvent& pEvent);
198 void DoWrite();
199 void DoRead();
200
201 DECLARE_EVENT_TABLE()
202};
203
204/******************* Implementation ******************/
205IMPLEMENT_APP_CONSOLE(Server)
206
207#include <wx/listimpl.cpp>
208WX_DEFINE_LIST(TList);
209WX_DEFINE_LIST(EList);
210
211
212void
213Server::OnInitCmdLine(wxCmdLineParser& pParser)
214{
215 wxApp::OnInitCmdLine(pParser);
216 pParser.AddSwitch("t","threads","Use thread based workers only");
217 pParser.AddSwitch("e","events","Use event based workers only");
218 pParser.AddOption("m","max","Exit after <n> connections",
219 wxCMD_LINE_VAL_NUMBER);
220 pParser.AddOption("p","port","listen on given port (default 3000)",
221 wxCMD_LINE_VAL_NUMBER);
222}
223
224void
225Server::DumpStatistics()
226{
227 wxString mode;
228 switch(m_workMode)
229 {
230 case EVENTS:
231 mode = "Event based workers";
232 break;
233 case THREADS:
234 mode = "Thread based workers";
235 break;
236 case MIXED:
237 mode = "Event and thread based workers";
238 break;
239 }
240 wxLogMessage("Server mode: %s",mode);
241 wxLogMessage("\t\t\t\tThreads\tEvents\tTotal");
242 wxLogMessage("Workers created:\t\t%d\t%d\t%d",
243 m_threadWorkersCreated,
244 m_eventWorkersCreated,
245 m_threadWorkersCreated + m_eventWorkersCreated);
246 wxLogMessage("Max concurrent workers:\t%d\t%d\t%d",
247 m_maxThreadWorkers,
248 m_maxEventWorkers,
249 m_maxThreadWorkers + m_maxEventWorkers);
250 wxLogMessage("Workers failed:\t\t%d\t%d\t%d",
251 m_threadWorkersFailed,
252 m_eventWorkersFailed,
253 m_threadWorkersFailed + m_eventWorkersFailed);
254 wxLogMessage("Workers done:\t\t%d\t%d\t%d",
255 m_threadWorkersDone,
256 m_eventWorkersDone,
257 m_threadWorkersDone + m_eventWorkersDone);
258
259 if ((int)(m_threadWorkersDone+m_eventWorkersDone) == m_maxConnections)
260 {
261 wxLogMessage("%ld connection(s) served, exiting",m_maxConnections);
262 ExitMainLoop();
263 }
264}
265
266
267bool
268Server::OnCmdLineParsed(wxCmdLineParser& pParser)
269{
270 if (pParser.Found("verbose"))
271 {
272 wxLog::AddTraceMask("wxSocket");
273 wxLog::AddTraceMask("epolldispatcher");
274 wxLog::AddTraceMask("selectdispatcher");
275 wxLog::AddTraceMask("thread");
276 wxLog::AddTraceMask("events");
277 wxLog::AddTraceMask("timer");
278 }
279
280 if (pParser.Found("m",&m_maxConnections))
281 {
282 wxLogMessage("%ld connection(s) to exit",m_maxConnections);
283 }
284
285 long port;
286 if (pParser.Found("p", &port))
287 {
288 if ( port <= 0 || port > USHRT_MAX )
289 {
290 wxLogError("Invalid port number %ld, must be in 0..%u range.",
291 port, USHRT_MAX);
292 return false;
293 }
294
295 m_port = static_cast<unsigned short>(port);
296 wxLogMessage("Will listen on port %u", m_port);
297 }
298
299 if (pParser.Found("t"))
300 m_workMode = THREADS;
301 else if (pParser.Found("e"))
302 m_workMode = EVENTS;
303 else
304 m_workMode = MIXED;
305
306 return wxApp::OnCmdLineParsed(pParser);
307}
308
309bool Server::OnInit()
310{
311 wxLog* logger = new wxLogStderr();
312 wxLog::SetActiveTarget(logger);
313
314 m_port = 3000;
315
316 //send interesting things to console
317 if (!wxApp::OnInit())
318 return false;
319
320 //setup listening socket
321 wxIPV4address la;
322 la.Service(m_port);
323 m_listeningSocket = new wxSocketServer(la,wxSOCKET_NOWAIT|wxSOCKET_REUSEADDR);
324 m_listeningSocket->SetEventHandler(*this);
325 m_listeningSocket->SetNotify(wxSOCKET_CONNECTION_FLAG);
326 m_listeningSocket->Notify(true);
327 if (!m_listeningSocket->IsOk())
328 {
329 wxLogError("Cannot bind listening socket");
330 return false;
331 }
332
333 m_threadWorkersCreated = 0;
334 m_threadWorkersDone = 0;
335 m_threadWorkersFailed = 0;
336 m_maxThreadWorkers = 0;
337
338 m_eventWorkersCreated = 0;
339 m_eventWorkersDone = 0;
340 m_eventWorkersFailed = 0;
341 m_maxEventWorkers = 0;
342
343 wxLogMessage("Server listening at port %u, waiting for connections", m_port);
344 return true;
345}
346
347
348int Server::OnExit()
349{
350 for ( TList::compatibility_iterator it = m_threadWorkers.GetFirst();
351 it;
352 it = it->GetNext() )
353 {
354 it->GetData()->Wait();
355 delete it->GetData();
356 }
357
358 for ( EList::compatibility_iterator it2 = m_eventWorkers.GetFirst();
359 it2;
360 it2->GetNext() )
361 {
362 delete it2->GetData();
363 }
364
365 m_threadWorkers.Clear();
366 m_eventWorkers.Clear();
367 m_listeningSocket->Destroy();
368 return 0;
369}
370
371void Server::OnSocketEvent(wxSocketEvent& pEvent)
372{
373 switch(pEvent.GetSocketEvent())
374 {
375 case wxSOCKET_INPUT:
376 wxLogError("Unexpected wxSOCKET_INPUT in wxSocketServer");
377 break;
378 case wxSOCKET_OUTPUT:
379 wxLogError("Unexpected wxSOCKET_OUTPUT in wxSocketServer");
380 break;
381 case wxSOCKET_CONNECTION:
382 {
383 wxSocketBase* sock = m_listeningSocket->Accept();
384 wxIPV4address addr;
385 if (!sock->GetPeer(addr))
386 {
387 wxLogError("Server: cannot get peer info");
388 } else {
389 wxLogMessage("Got connection from %s:%d",addr.IPAddress().c_str(), addr.Service());
390 }
391 bool createThread;
392
393 if (m_workMode != MIXED)
394 createThread = m_workMode == THREADS;
395 else
396 createThread = (wxDateTime::Now().GetSecond())%2 == 0;
397
398 if (createThread)
399 {
400 ThreadWorker* c = new ThreadWorker(sock);
401 if (c->Create() == wxTHREAD_NO_ERROR)
402 {
403 m_threadWorkers.Append(c);
404 if (m_threadWorkers.GetCount() > m_maxThreadWorkers)
405 m_maxThreadWorkers++;
406 m_threadWorkersCreated++;
407 c->Run();
408 }
409 else
410 {
411 wxLogError("Server: cannot create next thread (current threads: %d", m_threadWorkers.size());
412 };
413 }
414 else
415 {
416 EventWorker* w = new EventWorker(sock);
417 m_eventWorkers.Append(w);
418 if (m_eventWorkers.GetCount() > m_maxEventWorkers)
419 m_maxEventWorkers++;
420 m_eventWorkersCreated++;
421 }
422 }
423 break;
424 case wxSOCKET_LOST:
425 wxLogError("Unexpected wxSOCKET_LOST in wxSocketServer");
426 break;
427 }
428}
429
430void Server::OnWorkerEvent(WorkerEvent& pEvent)
431{
432 //wxLogMessage("Got worker event");
433 for(TList::compatibility_iterator it = m_threadWorkers.GetFirst(); it ; it = it->GetNext())
434 {
435 if (it->GetData() == pEvent.m_sender)
436 {
437 wxLogVerbose("Deleting thread worker (%lu left)",
438 static_cast<unsigned long>( m_threadWorkers.GetCount() ));
439 it->GetData()->Wait();
440 delete it->GetData();
441 m_threadWorkers.DeleteNode(it);
442 if (!pEvent.m_workerFailed)
443 m_threadWorkersDone++;
444 else
445 m_threadWorkersFailed++;
446 break;
447 }
448 }
449 for(EList::compatibility_iterator it2 = m_eventWorkers.GetFirst(); it2 ; it2 = it2->GetNext())
450 {
451 if (it2->GetData() == pEvent.m_sender)
452 {
453 wxLogVerbose("Deleting event worker (%lu left)",
454 static_cast<unsigned long>( m_eventWorkers.GetCount() ));
455 delete it2->GetData();
456 m_eventWorkers.DeleteNode(it2);
457 if (!pEvent.m_workerFailed)
458 m_eventWorkersDone++;
459 else
460 m_eventWorkersFailed++;
461 break;
462 }
463 }
464
465 if (m_eventWorkers.GetCount() == 0 && m_threadWorkers.GetCount() == 0)
466 {
467 mTimer.Start(1000,true);
468 }
469}
470
471void Server::OnTimerEvent(wxTimerEvent&)
472{
473 DumpStatistics();
474}
475
476
477BEGIN_EVENT_TABLE(Server,wxEvtHandler)
478 EVT_SOCKET(wxID_ANY,Server::OnSocketEvent)
479 EVT_WORKER(Server::OnWorkerEvent)
480 EVT_TIMER(wxID_ANY,Server::OnTimerEvent)
481END_EVENT_TABLE()
482
483
484ThreadWorker::ThreadWorker(wxSocketBase* pSocket) : wxThread(wxTHREAD_JOINABLE)
485{
486 m_socket = pSocket;
487 //Notify() cannot be called in thread context. We have to detach from main loop
488 //before switching thread contexts.
489 m_socket->Notify(false);
490 m_socket->SetFlags(wxSOCKET_WAITALL|wxSOCKET_BLOCK);
491 pSocket->GetPeer(m_peer);
492}
493
494wxThread::ExitCode ThreadWorker::Entry()
495{
496 WorkerEvent e(this);
497 if (!m_socket->IsConnected())
498 {
499 LogWorker("ThreadWorker: not connected",wxLOG_Error);
500 return 0;
501 }
502 int to_process = -1;
503 if (m_socket->IsConnected())
504 {
505 unsigned char signature[2];
506 LogWorker("ThreadWorker: reading for data");
507 to_process = 2;
508 do
509 {
510 m_socket->Read(&signature,to_process);
511 if (m_socket->Error())
512 {
513 LogWorker("ThreadWorker: Read error",wxLOG_Error);
514 wxGetApp().AddPendingEvent(e);
515 return 0;
516 }
517 to_process -= m_socket->LastCount();
518 LogWorker(wxString::Format("to_process: %d",to_process));
519
520 }
521 while (!m_socket->Error() && to_process != 0);
522
523 if (signature[0] == 0)
524 {
525 e.m_exit = true;
526 return 0;
527 }
528
529 if (signature[0] == 0xCE)
530 {
531 LogWorker("This server does not support test2 from GUI client",wxLOG_Error);
532 e.m_workerFailed = true;
533 e.m_exit = true;
534 return 0;
535 }
536 int size = signature[1] * (signature[0] == 0xBE ? 1 : 1024);
537 char* buf = new char[size];
538 LogWorker(wxString::Format("Message signature: chunks: %d, kilobytes: %d, size: %d (bytes)",signature[0],signature[1],size));
539
540 to_process = size;
541 LogWorker(wxString::Format("ThreadWorker: reading %d bytes of data",to_process));
542
543 do
544 {
545 m_socket->Read(buf,to_process);
546 if (m_socket->Error())
547 {
548 LogWorker("ThreadWorker: Read error",wxLOG_Error);
549 wxGetApp().AddPendingEvent(e);
550 return 0;
551 }
552 to_process -= m_socket->LastCount();
553 LogWorker(wxString::Format("ThreadWorker: %d bytes readed, %d todo",m_socket->LastCount(),to_process));
554
555 }
556 while(!m_socket->Error() && to_process != 0);
557
558 to_process = size;
559
560 do
561 {
562 m_socket->Write(buf,to_process);
563 if (m_socket->Error())
564 {
565 LogWorker("ThreadWorker: Write error",wxLOG_Error);
566 break;
567 }
568 to_process -= m_socket->LastCount();
569 LogWorker(wxString::Format("ThreadWorker: %d bytes written, %d todo",m_socket->LastCount(),to_process));
570 }
571 while(!m_socket->Error() && to_process != 0);
572 }
573
574 LogWorker("ThreadWorker: done");
575 e.m_workerFailed = to_process != 0;
576 m_socket->Destroy();
577 wxGetApp().AddPendingEvent(e);
578 return 0;
579}
580
581EventWorker::EventWorker(wxSocketBase* pSock)
582 : m_socket(pSock),
583 m_inbuf(NULL),
584 m_infill(0),
585 m_outbuf(NULL),
586 m_outfill(0)
587{
588 m_socket->SetNotify(wxSOCKET_LOST_FLAG|wxSOCKET_INPUT_FLAG|wxSOCKET_OUTPUT_FLAG);
589 m_socket->Notify(true);
590 m_socket->SetEventHandler(*this);
591 m_socket->SetFlags(wxSOCKET_NOWAIT);
592 m_socket->GetPeer(m_peer);
593}
594
595EventWorker::~EventWorker()
596{
597 m_socket->Destroy();
598 delete [] m_inbuf;
599 delete [] m_outbuf;
600}
601
602void
603EventWorker::DoRead()
604{
605 if (m_inbuf == NULL)
606 {
607 //read message header
608 do
609 {
610 m_socket->Read(m_signature + m_infill, 2 - m_infill);
611 if (m_socket->Error())
612 {
613 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
614 {
615 LogWorker(wxString::Format("Read error (%d): %s",m_socket->LastError(),GetSocketErrorMsg(m_socket->LastError())),wxLOG_Error);
616 m_socket->Close();
617 }
618 }
619 else
620 {
621 m_infill += m_socket->LastCount();
622 if (m_infill == 2)
623 {
624 unsigned char chunks = m_signature[1];
625 unsigned char type = m_signature[0];
626 if (type == 0xCE)
627 {
628 LogWorker("This server does not support test2 from GUI client",wxLOG_Error);
629 m_written = -1; //wxSOCKET_LOST will interpret this as failure
630 m_socket->Close();
631 }
632 else if (type == 0xBE || type == 0xDE)
633 {
634 m_size = chunks * (type == 0xBE ? 1 : 1024);
635 m_inbuf = new char[m_size];
636 m_outbuf = new char[m_size];
637 m_infill = 0;
638 m_outfill = 0;
639 m_written = 0;
640 LogWorker(wxString::Format("Message signature: len: %d, type: %s, size: %d (bytes)",chunks,type == 0xBE ? "b" : "kB",m_size));
641 break;
642 }
643 else
644 {
645 LogWorker(wxString::Format("Unknown test type %x",type));
646 m_socket->Close();
647 }
648 }
649 }
650 }
651 while(!m_socket->Error() && (2 - m_infill != 0));
652 }
653
654 if (m_inbuf == NULL)
655 return;
656 //read message data
657 do
658 {
659 if (m_size == m_infill)
660 {
661 m_signature[0] = m_signature[1] = 0x0;
662 wxDELETEA(m_inbuf);
663 m_infill = 0;
664 return;
665 }
666 m_socket->Read(m_inbuf + m_infill,m_size - m_infill);
667 if (m_socket->Error())
668 {
669 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
670 {
671 LogWorker(wxString::Format("Read error (%d): %s",
672 m_socket->LastError(),
673 GetSocketErrorMsg(m_socket->LastError())),
674 wxLOG_Error);
675
676 m_socket->Close();
677 }
678 }
679 else
680 {
681 memcpy(m_outbuf+m_outfill,m_inbuf+m_infill,m_socket->LastCount());
682 m_infill += m_socket->LastCount();
683 m_outfill += m_socket->LastCount();
684 DoWrite();
685 }
686 }
687 while(!m_socket->Error());
688};
689
690void EventWorker::OnSocketEvent(wxSocketEvent& pEvent)
691{
692 switch(pEvent.GetSocketEvent())
693 {
694 case wxSOCKET_INPUT:
695 DoRead();
696 break;
697
698 case wxSOCKET_OUTPUT:
699 if ( m_outbuf )
700 DoWrite();
701 break;
702
703 case wxSOCKET_CONNECTION:
704 LogWorker("Unexpected wxSOCKET_CONNECTION in EventWorker", wxLOG_Error);
705 break;
706
707 case wxSOCKET_LOST:
708 {
709 LogWorker("Connection lost");
710 WorkerEvent e(this);
711 e.m_workerFailed = m_written != m_size;
712 wxGetApp().AddPendingEvent(e);
713 }
714 break;
715 }
716}
717
718void EventWorker::DoWrite()
719{
720 do
721 {
722 if (m_written == m_size)
723 {
724 wxDELETEA(m_outbuf);
725 m_outfill = 0;
726 LogWorker( "All data written");
727 return;
728 }
729 if (m_outfill - m_written == 0)
730 {
731 return;
732 }
733 m_socket->Write(m_outbuf + m_written,m_outfill - m_written);
734 if (m_socket->Error())
735 {
736 if (m_socket->LastError() != wxSOCKET_WOULDBLOCK)
737 {
738 LogWorker(
739 wxString::Format("Write error (%d): %s",
740 m_socket->LastError(),
741 GetSocketErrorMsg(m_socket->LastError())
742 )
743 ,wxLOG_Error
744 );
745 m_socket->Close();
746 }
747 else
748 {
749 LogWorker("Write would block, waiting for OUTPUT event");
750 }
751 }
752 else
753 {
754 memmove(m_outbuf,m_outbuf+m_socket->LastCount(),m_outfill-m_socket->LastCount());
755 m_written += m_socket->LastCount();
756 }
757 LogWorker(wxString::Format("Written %d of %d bytes, todo %d",
758 m_socket->LastCount(),m_size,m_size - m_written));
759 }
760 while (!m_socket->Error());
761}
762
763BEGIN_EVENT_TABLE(EventWorker,wxEvtHandler)
764 EVT_SOCKET(wxID_ANY,EventWorker::OnSocketEvent)
765END_EVENT_TABLE()