]> git.saurik.com Git - wxWidgets.git/blob - samples/sockets/baseclient.cpp
Better handling for asserts in non-main threads.
[wxWidgets.git] / samples / sockets / baseclient.cpp
1 /////////////////////////////////////////////////////////////////////////////
2 // Name: samples/sockbase/client.cpp
3 // Purpose: Sockets sample for wxBase
4 // Author: Lukasz Michalski
5 // Modified by:
6 // Created: 27.06.2005
7 // RCS-ID: $Id$
8 // Copyright: (c) 2005 Lukasz Michalski <lmichalski@sf.net>
9 // Licence: wxWindows license
10 /////////////////////////////////////////////////////////////////////////////
11
12 // ============================================================================
13 // declarations
14 // ============================================================================
15
16 // ----------------------------------------------------------------------------
17 // headers
18 // ----------------------------------------------------------------------------
19
20 #include "wx/wx.h"
21 #include "wx/socket.h"
22 #include "wx/event.h"
23 #include "wx/list.h"
24 #include "wx/cmdline.h"
25 #include "wx/ffile.h"
26 #include "wx/datetime.h"
27 #include "wx/timer.h"
28 #include "wx/thread.h"
29
30 const wxEventType wxEVT_WORKER = wxNewEventType();
31 #define EVT_WORKER(func) DECLARE_EVENT_TABLE_ENTRY( wxEVT_WORKER, -1, -1, (wxObjectEventFunction) (wxEventFunction) (WorkerEventFunction) & func, (wxObject *) NULL ),
32
33 const int timeout_val = 1000;
34
35 class WorkerEvent : public wxEvent {
36 public:
37 typedef enum {
38 CONNECTING,
39 SENDING,
40 RECEIVING,
41 DISCONNECTING,
42 DONE
43 } evt_type;
44 WorkerEvent(void* pSender, evt_type type)
45 {
46 SetId(-1);
47 SetEventType(wxEVT_WORKER);
48 m_sender = pSender;
49 m_eventType = type;
50 m_isFailed = false;
51 }
52
53 void setFailed() { m_isFailed = true; }
54 bool isFailed() const { return m_isFailed; }
55
56 virtual wxEvent* Clone() const
57 {
58 return new WorkerEvent(*this);
59 }
60 void* m_sender;
61 bool m_isFailed;
62 wxString m_workerIdent;
63 evt_type m_eventType;
64 };
65
66 typedef void (wxEvtHandler::*WorkerEventFunction)(WorkerEvent&);
67
68 class ThreadWorker;
69 class EventWorker;
70
71 WX_DECLARE_LIST(ThreadWorker, TList);
72 WX_DECLARE_LIST(EventWorker, EList);
73
74 class Client : public wxApp {
75 DECLARE_EVENT_TABLE()
76 public:
77 void RemoveEventWorker(EventWorker* p_worker);
78 private:
79 typedef enum
80 {
81 THREADS,
82 EVENTS
83 } workMode;
84
85 typedef enum
86 {
87 SEND_RANDOM,
88 SEND_MESSAGE,
89 STRESS_TEST
90 } sendType;
91
92 workMode m_workMode;
93 sendType m_sendType;
94 wxString m_message;
95 wxString m_host;
96 long m_stressWorkers;
97
98 virtual bool OnInit();
99 virtual int OnRun();
100 virtual int OnExit();
101 void OnInitCmdLine(wxCmdLineParser& pParser);
102 bool OnCmdLineParsed(wxCmdLineParser& pParser);
103 void OnWorkerEvent(WorkerEvent& pEvent);
104 void OnTimerEvent(wxTimerEvent& pEvent);
105
106 void StartWorker(workMode pMode, const wxString& pMessage);
107 void StartWorker(workMode pMode);
108 char* CreateBuffer(int *msgsize);
109
110 void dumpStatistics();
111
112 TList m_threadWorkers;
113 EList m_eventWorkers;
114
115 unsigned m_statConnecting;
116 unsigned m_statSending;
117 unsigned m_statReceiving;
118 unsigned m_statDisconnecting;
119 unsigned m_statDone;
120 unsigned m_statFailed;
121
122 wxTimer mTimer;
123 };
124
125 DECLARE_APP(Client);
126
127 class ThreadWorker : public wxThread
128 {
129 public:
130 ThreadWorker(const wxString& p_host, char* p_buf, int p_size);
131 virtual ExitCode Entry();
132 private:
133 wxString m_host;
134 wxSocketClient* m_clientSocket;
135 char* m_inbuf;
136 char* m_outbuf;
137 int m_outsize;
138 int m_insize;
139 wxString m_workerIdent;
140 };
141
142 class EventWorker : public wxEvtHandler
143 {
144 DECLARE_EVENT_TABLE()
145 public:
146 EventWorker(const wxString& p_host, char* p_buf, int p_size);
147 void Run();
148 virtual ~EventWorker();
149 private:
150 wxString m_host;
151 wxSocketClient* m_clientSocket;
152 char* m_inbuf;
153 char* m_outbuf;
154 int m_outsize;
155 int m_written;
156 int m_insize;
157 int m_readed;
158
159 WorkerEvent::evt_type m_currentType;
160 bool m_doneSent;
161 wxIPV4address m_localaddr;
162
163 void OnSocketEvent(wxSocketEvent& pEvent);
164 void SendEvent(bool failed);
165 };
166
167 /******************* Implementation ******************/
168 IMPLEMENT_APP_CONSOLE(Client);
169
170 #include <wx/listimpl.cpp>
171 WX_DEFINE_LIST(TList);
172 WX_DEFINE_LIST(EList);
173
174 wxString
175 CreateIdent(const wxIPV4address& addr)
176 {
177 return wxString::Format(wxT("%s:%d"),addr.IPAddress().c_str(),addr.Service());
178 }
179
180 void
181 Client::OnInitCmdLine(wxCmdLineParser& pParser)
182 {
183 wxApp::OnInitCmdLine(pParser);
184 pParser.AddSwitch(wxT("e"),wxT("event"),_("Use event based worker (default)"),wxCMD_LINE_PARAM_OPTIONAL);
185 pParser.AddSwitch(wxT("t"),wxT("thread"),_("Use thread based worker"),wxCMD_LINE_PARAM_OPTIONAL);
186 pParser.AddSwitch(wxT("r"),wxT("random"),_("Send radnom data (default)"),wxCMD_LINE_PARAM_OPTIONAL);
187 pParser.AddOption(wxT("m"),wxT("message"),_("Send message from <str>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
188 pParser.AddOption(wxT("f"),wxT("file"),_("Send contents of <file>"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
189 pParser.AddOption(wxT("H"),wxT("hostname"),_("IP or name of host to connect to"),wxCMD_LINE_VAL_STRING,wxCMD_LINE_PARAM_OPTIONAL);
190 pParser.AddOption(wxT("s"),wxT("stress"),_("stress test with <num> concurrent connections"),wxCMD_LINE_VAL_NUMBER,wxCMD_LINE_PARAM_OPTIONAL);
191 }
192
193
194 bool
195 Client::OnCmdLineParsed(wxCmdLineParser& pParser)
196 {
197 wxString fname;
198 m_workMode = EVENTS;
199 m_stressWorkers = 50;
200
201 if (pParser.Found(_("verbose")))
202 {
203 wxLog::AddTraceMask(wxT("wxSocket"));
204 wxLog::AddTraceMask(wxT("epolldispatcher"));
205 wxLog::AddTraceMask(wxT("selectdispatcher"));
206 wxLog::AddTraceMask(wxT("thread"));
207 wxLog::AddTraceMask(wxT("events"));
208 }
209
210 if (pParser.Found(wxT("t")))
211 m_workMode = THREADS;
212 m_sendType = SEND_RANDOM;
213
214 if (pParser.Found(wxT("m"),&m_message))
215 m_sendType = SEND_MESSAGE;
216 else if (pParser.Found(wxT("f"),&fname))
217 {
218 wxFFile file(fname);
219 if (!file.IsOpened()) {
220 wxLogError(wxT("Cannot open file %s"),fname.c_str());
221 return false;
222 };
223 if (!file.ReadAll(&m_message)) {
224 wxLogError(wxT("Cannot read conten of file %s"),fname.c_str());
225 return false;
226 };
227 m_sendType = SEND_MESSAGE;
228 };
229
230 if (pParser.Found(wxT("s"),&m_stressWorkers))
231 m_sendType = STRESS_TEST;
232
233 m_host = wxT("127.0.0.1");
234 pParser.Found(wxT("H"),&m_host);
235 return wxApp::OnCmdLineParsed(pParser);
236 };
237
238 bool
239 Client::OnInit()
240 {
241 if (!wxApp::OnInit())
242 return false;
243 srand(wxDateTime::Now().GetTicks());
244 mTimer.SetOwner(this);
245 m_statConnecting = 0;
246 m_statSending = 0;
247 m_statReceiving = 0;
248 m_statDisconnecting = 0;
249 m_statDone = 0;
250 m_statFailed = 0;
251 return true;
252 }
253
254 int
255 Client::OnRun()
256 {
257 int i;
258 switch(m_sendType)
259 {
260 case STRESS_TEST:
261 switch(m_workMode)
262 {
263 case THREADS:
264 for (i = 0; i < m_stressWorkers; i++) {
265 if (m_message.empty())
266 StartWorker(THREADS);
267 else
268 StartWorker(THREADS, m_message);
269 }
270 break;
271 case EVENTS:
272 for (i = 0; i < m_stressWorkers; i++) {
273 if (m_message.empty())
274 StartWorker(EVENTS);
275 else
276 StartWorker(EVENTS, m_message);
277 }
278 break;
279 default:
280 for (i = 0; i < m_stressWorkers; i++) {
281 if (m_message.empty())
282 StartWorker(i % 5 == 0 ? THREADS : EVENTS);
283 else
284 StartWorker(i % 5 == 0 ? THREADS : EVENTS, m_message);
285 }
286 break;
287 }
288 break;
289 case SEND_MESSAGE:
290 StartWorker(m_workMode,m_message);
291 break;
292 case SEND_RANDOM:
293 StartWorker(m_workMode);
294 break;
295 }
296 mTimer.Start(timeout_val,true);
297 return wxApp::OnRun();
298 }
299
300 int
301 Client::OnExit()
302 {
303 for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it->GetNext()) {
304 delete it->GetData();
305 }
306 return 0;
307 }
308
309 // Create buffer to be sent by client. Buffer contains test indicator
310 // message size and place for data
311 // msgsize parameter contains size of data in bytes and
312 // if input value does not fit into 250 bytes then
313 // on exit is updated to new value that is multiply of 1024 bytes
314 char*
315 Client::CreateBuffer(int* msgsize)
316 {
317 int bufsize = 0;
318 char* buf;
319 //if message should have more than 256 bytes then set it as
320 //test3 for compatibility with GUI server sample
321 if ((*msgsize) > 250)
322 {
323 //send at least one kb of data
324 int size = (*msgsize)/1024 + 1;
325 //returned buffer will contain test indicator, message size in kb and data
326 bufsize = size*1024+2;
327 buf = new char[bufsize];
328 buf[0] = (unsigned char)0xDE; //second byte contains size in kilobytes
329 buf[1] = (char)(size);
330 *msgsize = size*1024;
331 }
332 else
333 {
334 //returned buffer will contain test indicator, message size in kb and data
335 bufsize = (*msgsize)+2;
336 buf = new char[bufsize];
337 buf[0] = (unsigned char)0xBE; //second byte contains size in bytes
338 buf[1] = (char)(*msgsize);
339 }
340 return buf;
341 }
342
343 void
344 Client::StartWorker(workMode pMode) {
345 int msgsize = 1 + (int) (250000.0 * (rand() / (RAND_MAX + 1.0)));
346 char* buf = CreateBuffer(&msgsize);
347
348 //fill data part of buffer with random bytes
349 for (int i = 2; i < (msgsize); i++) {
350 buf[i] = i % 256;
351 }
352
353 if (pMode == THREADS) {
354 ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2);
355 if (c->Create() != wxTHREAD_NO_ERROR) {
356 wxLogError(wxT("Cannot create more threads"));
357 } else {
358 c->Run();
359 m_threadWorkers.Append(c);
360 }
361 } else {
362 EventWorker* e = new EventWorker(m_host,buf,msgsize+2);
363 e->Run();
364 m_eventWorkers.Append(e);
365 }
366 m_statConnecting++;
367 }
368
369 void
370 Client::StartWorker(workMode pMode, const wxString& pMessage) {
371 char* tmpbuf = wxStrdup(pMessage.mb_str());
372 int msgsize = strlen(tmpbuf);
373 char* buf = CreateBuffer(&msgsize);
374 memset(buf+2,0x0,msgsize);
375 memcpy(buf+2,tmpbuf,msgsize);
376 free(tmpbuf);
377
378 if (pMode == THREADS) {
379 ThreadWorker* c = new ThreadWorker(m_host,buf,msgsize+2);
380 if (c->Create() != wxTHREAD_NO_ERROR) {
381 wxLogError(wxT("Cannot create more threads"));
382 } else {
383 c->Run();
384 m_threadWorkers.Append(c);
385 }
386 } else {
387 EventWorker* e = new EventWorker(m_host,buf,msgsize+2);
388 e->Run();
389 m_eventWorkers.Append(e);
390 }
391 m_statConnecting++;
392 }
393
394 void
395 Client::OnWorkerEvent(WorkerEvent& pEvent) {
396 switch (pEvent.m_eventType) {
397 case WorkerEvent::CONNECTING:
398 if (pEvent.isFailed())
399 {
400 m_statConnecting--;
401 m_statFailed++;
402 }
403 break;
404 case WorkerEvent::SENDING:
405 if (pEvent.isFailed())
406 {
407 m_statFailed++;
408 m_statSending--;
409 }
410 else
411 {
412 m_statConnecting--;
413 m_statSending++;
414 }
415 break;
416 case WorkerEvent::RECEIVING:
417 if (pEvent.isFailed())
418 {
419 m_statReceiving--;
420 m_statFailed++;
421 }
422 else
423 {
424 m_statSending--;
425 m_statReceiving++;
426 }
427 break;
428 case WorkerEvent::DISCONNECTING:
429 if (pEvent.isFailed())
430 {
431 m_statDisconnecting--;
432 m_statFailed++;
433 }
434 else
435 {
436 m_statReceiving--;
437 m_statDisconnecting++;
438 }
439 break;
440 case WorkerEvent::DONE:
441 m_statDone++;
442 m_statDisconnecting--;
443 break;
444 };
445
446 if (pEvent.isFailed() || pEvent.m_eventType == WorkerEvent::DONE)
447 {
448 for(TList::compatibility_iterator it = m_threadWorkers.GetFirst(); it ; it = it->GetNext()) {
449 if (it->GetData() == pEvent.m_sender) {
450 m_threadWorkers.DeleteNode(it);
451 break;
452 }
453 }
454 for(EList::compatibility_iterator it2 = m_eventWorkers.GetFirst(); it2 ; it2 = it2->GetNext())
455 {
456 if (it2->GetData() == pEvent.m_sender) {
457 delete it2->GetData();
458 m_eventWorkers.DeleteNode(it2);
459 break;
460 }
461 }
462 if ((m_threadWorkers.GetCount() == 0) && (m_eventWorkers.GetCount() == 0))
463 {
464 mTimer.Stop();
465 dumpStatistics();
466 wxSleep(2);
467 ExitMainLoop();
468 }
469 else
470 {
471 mTimer.Start(timeout_val,true);
472 }
473 }
474 }
475
476 void
477 Client::RemoveEventWorker(EventWorker* p_worker) {
478 for(EList::compatibility_iterator it = m_eventWorkers.GetFirst(); it ; it = it->GetNext()) {
479 if (it->GetData() == p_worker) {
480 //wxLogDebug(wxT("Deleting event worker"));
481 delete it->GetData();
482 m_eventWorkers.DeleteNode(it);
483 return;
484 }
485 }
486 }
487
488 void
489 Client::dumpStatistics() {
490 wxString msg(
491 wxString::Format(_("Connecting:\t%d\nSending\t\t%d\nReceiving\t%d\nDisconnecting:\t%d\nDone:\t\t%d\nFailed:\t\t%d\n"),
492 m_statConnecting,
493 m_statSending,
494 m_statReceiving,
495 m_statDisconnecting,
496 m_statDone,
497 m_statFailed
498 ));
499
500 wxLogMessage(wxT("Current status:\n%s\n"),msg.c_str());
501 }
502
503 void
504 Client::OnTimerEvent(wxTimerEvent&) {
505 dumpStatistics();
506 }
507
508 BEGIN_EVENT_TABLE(Client,wxEvtHandler)
509 EVT_WORKER(Client::OnWorkerEvent)
510 EVT_TIMER(wxID_ANY,Client::OnTimerEvent)
511 END_EVENT_TABLE()
512
513
514
515 EventWorker::EventWorker(const wxString& p_host, char* p_buf, int p_size)
516 : m_host(p_host),
517 m_outbuf(p_buf),
518 m_outsize(p_size),
519 m_written(0),
520 m_readed(0)
521 {
522 m_clientSocket = new wxSocketClient(wxSOCKET_NOWAIT);
523 m_clientSocket->SetEventHandler(*this);
524 m_insize = m_outsize - 2;
525 m_inbuf = new char[m_insize];
526 }
527
528 void
529 EventWorker::Run() {
530 wxIPV4address ca;
531 ca.Hostname(m_host);
532 ca.Service(3000);
533 m_clientSocket->SetNotify(wxSOCKET_CONNECTION_FLAG|wxSOCKET_LOST_FLAG|wxSOCKET_OUTPUT_FLAG|wxSOCKET_INPUT_FLAG);
534 m_clientSocket->Notify(true);
535 m_currentType = WorkerEvent::CONNECTING;
536 m_doneSent = false;
537 //wxLogMessage(wxT("EventWorker: Connecting....."));
538 m_clientSocket->Connect(ca,false);
539 }
540
541 void
542 EventWorker::OnSocketEvent(wxSocketEvent& pEvent) {
543 switch(pEvent.GetSocketEvent()) {
544 case wxSOCKET_INPUT:
545 //wxLogDebug(wxT("EventWorker: INPUT"));
546 do {
547 if (m_readed == m_insize)
548 return; //event already posted
549 m_clientSocket->Read(m_inbuf + m_readed, m_insize - m_readed);
550 if (m_clientSocket->Error())
551 {
552 if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK)
553 {
554 wxLogError(wxT("%s: read error"),CreateIdent(m_localaddr).c_str());
555 SendEvent(true);
556 }
557 }
558
559 m_readed += m_clientSocket->LastCount();
560 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(), m_insize - m_readed);
561 if (m_readed == m_insize)
562 {
563 if (!memcmp(m_inbuf,m_outbuf,m_insize)) {
564 wxLogError(wxT("%s: data mismatch"),CreateIdent(m_localaddr).c_str());
565 SendEvent(true);
566 }
567 m_currentType = WorkerEvent::DISCONNECTING;
568 wxLogDebug(wxT("%s: DISCONNECTING"),CreateIdent(m_localaddr).c_str());
569 SendEvent(false);
570
571 //wxLogDebug(wxT("EventWorker %p closing"),this);
572 m_clientSocket->Close();
573
574 m_currentType = WorkerEvent::DONE;
575 wxLogDebug(wxT("%s: DONE"),CreateIdent(m_localaddr).c_str());
576 SendEvent(false);
577 }
578 } while (!m_clientSocket->Error());
579 break;
580 case wxSOCKET_OUTPUT:
581 //wxLogDebug(wxT("EventWorker: OUTPUT"));
582 do {
583 if (m_written == m_outsize)
584 return;
585 if (m_written == 0)
586 {
587 m_currentType = WorkerEvent::SENDING;
588 wxLogDebug(wxT("%s: SENDING"),CreateIdent(m_localaddr).c_str());
589 }
590 m_clientSocket->Write(m_outbuf + m_written, m_outsize - m_written);
591 if (m_clientSocket->Error())
592 {
593 if (m_clientSocket->LastError() != wxSOCKET_WOULDBLOCK) {
594 wxLogError(wxT("%s: Write error"),CreateIdent(m_localaddr).c_str());
595 SendEvent(true);
596 }
597 }
598 m_written += m_clientSocket->LastCount();
599 if (m_written != m_outsize)
600 {
601 //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),m_outsize - m_written);
602 }
603 else
604 {
605 //wxLogDebug(wxT("EventWorker %p SENDING->RECEIVING"),this);
606 m_currentType = WorkerEvent::RECEIVING;
607 wxLogDebug(wxT("%s: RECEIVING"),CreateIdent(m_localaddr).c_str());
608 SendEvent(false);
609 }
610 } while(!m_clientSocket->Error());
611 break;
612 case wxSOCKET_CONNECTION:
613 {
614 //wxLogMessage(wxT("EventWorker: got connection"));
615 wxLogMessage(wxT("%s: starting writing message (2 bytes for signature and %d bytes of data to write)"),CreateIdent(m_localaddr).c_str(),m_outsize-2);
616 if (!m_clientSocket->GetLocal(m_localaddr))
617 {
618 wxLogError(_("Cannot get peer data for socket %p"),m_clientSocket);
619 }
620 m_currentType = WorkerEvent::SENDING;
621 wxLogDebug(wxT("%s: CONNECTING"),CreateIdent(m_localaddr).c_str());
622 SendEvent(false);
623 }
624 break;
625 case wxSOCKET_LOST:
626 {
627 wxLogError(_("%s: connection lost"),CreateIdent(m_localaddr).c_str());
628 SendEvent(true);
629 }
630 break;
631 }
632 }
633
634 void
635 EventWorker::SendEvent(bool failed) {
636 if (m_doneSent)
637 return;
638 WorkerEvent e(this,m_currentType);
639 if (failed) e.setFailed();
640 wxGetApp().AddPendingEvent(e);
641 m_doneSent = failed || m_currentType == WorkerEvent::DONE;
642 };
643
644 EventWorker::~EventWorker() {
645 m_clientSocket->Destroy();
646 delete [] m_outbuf;
647 delete [] m_inbuf;
648 }
649
650 BEGIN_EVENT_TABLE(EventWorker,wxEvtHandler)
651 EVT_SOCKET(wxID_ANY,EventWorker::OnSocketEvent)
652 END_EVENT_TABLE()
653
654
655 ThreadWorker::ThreadWorker(const wxString& p_host, char* p_buf, int p_size)
656 : wxThread(wxTHREAD_DETACHED),
657 m_host(p_host),
658 m_outbuf(p_buf),
659 m_outsize(p_size)
660 {
661 m_clientSocket = new wxSocketClient(wxSOCKET_BLOCK|wxSOCKET_WAITALL);
662 m_insize = m_outsize - 2;
663 m_inbuf = new char[m_insize];
664 }
665
666 wxThread::ExitCode ThreadWorker::Entry()
667 {
668 wxIPV4address ca;
669 ca.Hostname(m_host);
670 ca.Service(5678);
671 //wxLogDebug(wxT("ThreadWorker: Connecting....."));
672 m_clientSocket->SetTimeout(60);
673 bool failed = false;
674 WorkerEvent::evt_type etype = WorkerEvent::CONNECTING;
675 if (!m_clientSocket->Connect(ca)) {
676 wxLogError(wxT("Cannot connect to %s:%d"),ca.IPAddress().c_str(), ca.Service());
677 failed = true;
678 } else {
679 //wxLogMessage(wxT("ThreadWorker: Connected. Sending %d bytes of data"),m_outsize);
680 etype = WorkerEvent::SENDING;
681 WorkerEvent e(this,etype);
682 wxGetApp().AddPendingEvent(e);
683 int to_process = m_outsize;
684 do {
685 m_clientSocket->Write(m_outbuf,m_outsize);
686 if (m_clientSocket->Error()) {
687 wxLogError(wxT("ThreadWorker: Write error"));
688 failed = true;
689 }
690 to_process -= m_clientSocket->LastCount();
691 //wxLogDebug(wxT("EventWorker: written %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
692 } while(!m_clientSocket->Error() && to_process != 0);
693
694 if (!failed) {
695 etype = WorkerEvent::RECEIVING;
696 WorkerEvent e(this,etype);
697 wxGetApp().AddPendingEvent(e);
698 to_process = m_insize;
699 do {
700 m_clientSocket->Read(m_inbuf,m_insize);
701 if (m_clientSocket->Error()) {
702 wxLogError(wxT("ThreadWorker: Read error"));
703 failed = true;
704 break;
705 }
706 to_process -= m_clientSocket->LastCount();
707 //wxLogDebug(wxT("EventWorker: readed %d bytes, %d bytes to do"),m_clientSocket->LastCount(),to_process);
708 } while(!m_clientSocket->Error() && to_process != 0);
709 }
710
711 char* outdat = (char*)m_outbuf+2;
712 if (!failed && (memcmp(m_inbuf,outdat,m_insize) != 0))
713 {
714 wxLogError(wxT("Data mismatch"));
715 failed = true;
716 }
717 }
718 //wxLogDebug(wxT("ThreadWorker: Finished"));
719 if (!failed) {
720 etype = WorkerEvent::DISCONNECTING;
721 WorkerEvent e(this,etype);
722 wxGetApp().AddPendingEvent(e);
723 };
724 m_clientSocket->Close();
725 m_clientSocket->Destroy();
726 m_clientSocket = NULL;
727 delete [] m_outbuf;
728 delete [] m_inbuf;
729 if (!failed)
730 etype = WorkerEvent::DONE;
731 WorkerEvent e(this,etype);
732 if (failed) e.setFailed();
733 wxGetApp().AddPendingEvent(e);
734 return 0;
735 }
736