]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire-worker.cc
warning: unused parameter ‘foo’ [-Wunused-parameter]
[apt.git] / apt-pkg / acquire-worker.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
5
6 Acquire Worker
7
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
11
12 ##################################################################### */
13 /*}}}*/
14 // Include Files /*{{{*/
15 #include <config.h>
16
17 #include <apt-pkg/acquire-worker.h>
18 #include <apt-pkg/acquire-item.h>
19 #include <apt-pkg/configuration.h>
20 #include <apt-pkg/error.h>
21 #include <apt-pkg/fileutl.h>
22 #include <apt-pkg/strutl.h>
23
24 #include <iostream>
25 #include <sstream>
26 #include <fstream>
27
28 #include <sys/stat.h>
29 #include <unistd.h>
30 #include <fcntl.h>
31 #include <signal.h>
32 #include <stdio.h>
33 #include <errno.h>
34
35 #include <apti18n.h>
36 /*}}}*/
37
38 using namespace std;
39
40 // Worker::Worker - Constructor for Queue startup /*{{{*/
41 // ---------------------------------------------------------------------
42 /* */
43 pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
44 pkgAcquireStatus *Log) : Log(Log)
45 {
46 OwnerQ = Q;
47 Config = Cnf;
48 Access = Cnf->Access;
49 CurrentItem = 0;
50 TotalSize = 0;
51 CurrentSize = 0;
52
53 Construct();
54 }
55 /*}}}*/
56 // Worker::Worker - Constructor for method config startup /*{{{*/
57 // ---------------------------------------------------------------------
58 /* */
59 pkgAcquire::Worker::Worker(MethodConfig *Cnf)
60 {
61 OwnerQ = 0;
62 Config = Cnf;
63 Access = Cnf->Access;
64 CurrentItem = 0;
65 TotalSize = 0;
66 CurrentSize = 0;
67
68 Construct();
69 }
70 /*}}}*/
71 // Worker::Construct - Constructor helper /*{{{*/
72 // ---------------------------------------------------------------------
73 /* */
74 void pkgAcquire::Worker::Construct()
75 {
76 NextQueue = 0;
77 NextAcquire = 0;
78 Process = -1;
79 InFd = -1;
80 OutFd = -1;
81 OutReady = false;
82 InReady = false;
83 Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
84 }
85 /*}}}*/
86 // Worker::~Worker - Destructor /*{{{*/
87 // ---------------------------------------------------------------------
88 /* */
89 pkgAcquire::Worker::~Worker()
90 {
91 close(InFd);
92 close(OutFd);
93
94 if (Process > 0)
95 {
96 /* Closing of stdin is the signal to exit and die when the process
97 indicates it needs cleanup */
98 if (Config->NeedsCleanup == false)
99 kill(Process,SIGINT);
100 ExecWait(Process,Access.c_str(),true);
101 }
102 }
103 /*}}}*/
104 // Worker::Start - Start the worker process /*{{{*/
105 // ---------------------------------------------------------------------
106 /* This forks the method and inits the communication channel */
107 bool pkgAcquire::Worker::Start()
108 {
109 // Get the method path
110 string Method = _config->FindDir("Dir::Bin::Methods") + Access;
111 if (FileExists(Method) == false)
112 {
113 _error->Error(_("The method driver %s could not be found."),Method.c_str());
114 if (Access == "https")
115 _error->Notice(_("Is the package %s installed?"), "apt-transport-https");
116 return false;
117 }
118
119 if (Debug == true)
120 clog << "Starting method '" << Method << '\'' << endl;
121
122 // Create the pipes
123 int Pipes[4] = {-1,-1,-1,-1};
124 if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
125 {
126 _error->Errno("pipe","Failed to create IPC pipe to subprocess");
127 for (int I = 0; I != 4; I++)
128 close(Pipes[I]);
129 return false;
130 }
131 for (int I = 0; I != 4; I++)
132 SetCloseExec(Pipes[I],true);
133
134 // Fork off the process
135 Process = ExecFork();
136 if (Process == 0)
137 {
138 // Setup the FDs
139 dup2(Pipes[1],STDOUT_FILENO);
140 dup2(Pipes[2],STDIN_FILENO);
141 SetCloseExec(STDOUT_FILENO,false);
142 SetCloseExec(STDIN_FILENO,false);
143 SetCloseExec(STDERR_FILENO,false);
144
145 const char *Args[2];
146 Args[0] = Method.c_str();
147 Args[1] = 0;
148 execv(Args[0],(char **)Args);
149 cerr << "Failed to exec method " << Args[0] << endl;
150 _exit(100);
151 }
152
153 // Fix up our FDs
154 InFd = Pipes[0];
155 OutFd = Pipes[3];
156 SetNonBlock(Pipes[0],true);
157 SetNonBlock(Pipes[3],true);
158 close(Pipes[1]);
159 close(Pipes[2]);
160 OutReady = false;
161 InReady = true;
162
163 // Read the configuration data
164 if (WaitFd(InFd) == false ||
165 ReadMessages() == false)
166 return _error->Error(_("Method %s did not start correctly"),Method.c_str());
167
168 RunMessages();
169 if (OwnerQ != 0)
170 SendConfiguration();
171
172 return true;
173 }
174 /*}}}*/
175 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
176 // ---------------------------------------------------------------------
177 /* */
178 bool pkgAcquire::Worker::ReadMessages()
179 {
180 if (::ReadMessages(InFd,MessageQueue) == false)
181 return MethodFailure();
182 return true;
183 }
184 /*}}}*/
185 // Worker::RunMessage - Empty the message queue /*{{{*/
186 // ---------------------------------------------------------------------
187 /* This takes the messages from the message queue and runs them through
188 the parsers in order. */
189 bool pkgAcquire::Worker::RunMessages()
190 {
191 while (MessageQueue.empty() == false)
192 {
193 string Message = MessageQueue.front();
194 MessageQueue.erase(MessageQueue.begin());
195
196 if (Debug == true)
197 clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
198
199 // Fetch the message number
200 char *End;
201 int Number = strtol(Message.c_str(),&End,10);
202 if (End == Message.c_str())
203 return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
204
205 string URI = LookupTag(Message,"URI");
206 pkgAcquire::Queue::QItem *Itm = 0;
207 if (URI.empty() == false)
208 Itm = OwnerQ->FindItem(URI,this);
209
210 // update used mirror
211 string UsedMirror = LookupTag(Message,"UsedMirror", "");
212 if (!UsedMirror.empty() &&
213 Itm &&
214 Itm->Description.find(" ") != string::npos)
215 {
216 Itm->Description.replace(0, Itm->Description.find(" "), UsedMirror);
217 // FIXME: will we need this as well?
218 //Itm->ShortDesc = UsedMirror;
219 }
220
221 // Determine the message number and dispatch
222 switch (Number)
223 {
224 // 100 Capabilities
225 case 100:
226 if (Capabilities(Message) == false)
227 return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
228 break;
229
230 // 101 Log
231 case 101:
232 if (Debug == true)
233 clog << " <- (log) " << LookupTag(Message,"Message") << endl;
234 break;
235
236 // 102 Status
237 case 102:
238 Status = LookupTag(Message,"Message");
239 break;
240
241 // 103 Redirect
242 case 103:
243 {
244 if (Itm == 0)
245 {
246 _error->Error("Method gave invalid 103 Redirect message");
247 break;
248 }
249
250 string NewURI = LookupTag(Message,"New-URI",URI.c_str());
251 Itm->URI = NewURI;
252
253 ItemDone();
254
255 pkgAcquire::Item *Owner = Itm->Owner;
256 pkgAcquire::ItemDesc Desc = *Itm;
257
258 // Change the status so that it can be dequeued
259 Owner->Status = pkgAcquire::Item::StatIdle;
260 // Mark the item as done (taking care of all queues)
261 // and then put it in the main queue again
262 OwnerQ->ItemDone(Itm);
263 OwnerQ->Owner->Enqueue(Desc);
264
265 if (Log != 0)
266 Log->Done(Desc);
267 break;
268 }
269
270 // 200 URI Start
271 case 200:
272 {
273 if (Itm == 0)
274 {
275 _error->Error("Method gave invalid 200 URI Start message");
276 break;
277 }
278
279 CurrentItem = Itm;
280 CurrentSize = 0;
281 TotalSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10);
282 ResumePoint = strtoull(LookupTag(Message,"Resume-Point","0").c_str(), NULL, 10);
283 Itm->Owner->Start(Message,strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10));
284
285 // Display update before completion
286 if (Log != 0 && Log->MorePulses == true)
287 Log->Pulse(Itm->Owner->GetOwner());
288
289 if (Log != 0)
290 Log->Fetch(*Itm);
291
292 break;
293 }
294
295 // 201 URI Done
296 case 201:
297 {
298 if (Itm == 0)
299 {
300 _error->Error("Method gave invalid 201 URI Done message");
301 break;
302 }
303
304 pkgAcquire::Item *Owner = Itm->Owner;
305 pkgAcquire::ItemDesc Desc = *Itm;
306
307 // Display update before completion
308 if (Log != 0 && Log->MorePulses == true)
309 Log->Pulse(Owner->GetOwner());
310
311 OwnerQ->ItemDone(Itm);
312 unsigned long long const ServerSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10);
313 bool isHit = StringToBool(LookupTag(Message,"IMS-Hit"),false) ||
314 StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false);
315 // Using the https method the server might return 200, but the
316 // If-Modified-Since condition is not satsified, libcurl will
317 // discard the download. In this case, however, TotalSize will be
318 // set to the actual size of the file, while ServerSize will be set
319 // to 0. Therefore, if the item is marked as a hit and the
320 // downloaded size (ServerSize) is 0, we ignore TotalSize.
321 if (TotalSize != 0 && (!isHit || ServerSize != 0) && ServerSize != TotalSize)
322 _error->Warning("Size of file %s is not what the server reported %s %llu",
323 Owner->DestFile.c_str(), LookupTag(Message,"Size","0").c_str(),TotalSize);
324
325 // see if there is a hash to verify
326 string RecivedHash;
327 HashString expectedHash(Owner->HashSum());
328 if(!expectedHash.empty())
329 {
330 string hashTag = expectedHash.HashType()+"-Hash";
331 string hashSum = LookupTag(Message, hashTag.c_str());
332 if(!hashSum.empty())
333 RecivedHash = expectedHash.HashType() + ":" + hashSum;
334 if(_config->FindB("Debug::pkgAcquire::Auth", false) == true)
335 {
336 clog << "201 URI Done: " << Owner->DescURI() << endl
337 << "RecivedHash: " << RecivedHash << endl
338 << "ExpectedHash: " << expectedHash.toStr()
339 << endl << endl;
340 }
341 }
342 Owner->Done(Message, ServerSize, RecivedHash.c_str(), Config);
343 ItemDone();
344
345 // Log that we are done
346 if (Log != 0)
347 {
348 if (isHit)
349 {
350 /* Hide 'hits' for local only sources - we also manage to
351 hide gets */
352 if (Config->LocalOnly == false)
353 Log->IMSHit(Desc);
354 }
355 else
356 Log->Done(Desc);
357 }
358 break;
359 }
360
361 // 400 URI Failure
362 case 400:
363 {
364 if (Itm == 0)
365 {
366 _error->Error("Method gave invalid 400 URI Failure message");
367 break;
368 }
369
370 // Display update before completion
371 if (Log != 0 && Log->MorePulses == true)
372 Log->Pulse(Itm->Owner->GetOwner());
373
374 pkgAcquire::Item *Owner = Itm->Owner;
375 pkgAcquire::ItemDesc Desc = *Itm;
376 OwnerQ->ItemDone(Itm);
377
378 // set some status
379 if(LookupTag(Message,"FailReason") == "Timeout" ||
380 LookupTag(Message,"FailReason") == "TmpResolveFailure" ||
381 LookupTag(Message,"FailReason") == "ResolveFailure" ||
382 LookupTag(Message,"FailReason") == "ConnectionRefused")
383 Owner->Status = pkgAcquire::Item::StatTransientNetworkError;
384
385 Owner->Failed(Message,Config);
386 ItemDone();
387
388 if (Log != 0)
389 Log->Fail(Desc);
390
391 break;
392 }
393
394 // 401 General Failure
395 case 401:
396 _error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str());
397 break;
398
399 // 403 Media Change
400 case 403:
401 MediaChange(Message);
402 break;
403 }
404 }
405 return true;
406 }
407 /*}}}*/
408 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
409 // ---------------------------------------------------------------------
410 /* This parses the capabilities message and dumps it into the configuration
411 structure. */
412 bool pkgAcquire::Worker::Capabilities(string Message)
413 {
414 if (Config == 0)
415 return true;
416
417 Config->Version = LookupTag(Message,"Version");
418 Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
419 Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
420 Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
421 Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
422 Config->NeedsCleanup = StringToBool(LookupTag(Message,"Needs-Cleanup"),false);
423 Config->Removable = StringToBool(LookupTag(Message,"Removable"),false);
424
425 // Some debug text
426 if (Debug == true)
427 {
428 clog << "Configured access method " << Config->Access << endl;
429 clog << "Version:" << Config->Version <<
430 " SingleInstance:" << Config->SingleInstance <<
431 " Pipeline:" << Config->Pipeline <<
432 " SendConfig:" << Config->SendConfig <<
433 " LocalOnly: " << Config->LocalOnly <<
434 " NeedsCleanup: " << Config->NeedsCleanup <<
435 " Removable: " << Config->Removable << endl;
436 }
437
438 return true;
439 }
440 /*}}}*/
441 // Worker::MediaChange - Request a media change /*{{{*/
442 // ---------------------------------------------------------------------
443 /* */
444 bool pkgAcquire::Worker::MediaChange(string Message)
445 {
446 int status_fd = _config->FindI("APT::Status-Fd",-1);
447 if(status_fd > 0)
448 {
449 string Media = LookupTag(Message,"Media");
450 string Drive = LookupTag(Message,"Drive");
451 ostringstream msg,status;
452 ioprintf(msg,_("Please insert the disc labeled: "
453 "'%s' "
454 "in the drive '%s' and press enter."),
455 Media.c_str(),Drive.c_str());
456 status << "media-change: " // message
457 << Media << ":" // media
458 << Drive << ":" // drive
459 << msg.str() // l10n message
460 << endl;
461
462 std::string const dlstatus = status.str();
463 FileFd::Write(status_fd, dlstatus.c_str(), dlstatus.size());
464 }
465
466 if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
467 LookupTag(Message,"Drive")) == false)
468 {
469 char S[300];
470 snprintf(S,sizeof(S),"603 Media Changed\nFailed: true\n\n");
471 if (Debug == true)
472 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
473 OutQueue += S;
474 OutReady = true;
475 return true;
476 }
477
478 char S[300];
479 snprintf(S,sizeof(S),"603 Media Changed\n\n");
480 if (Debug == true)
481 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
482 OutQueue += S;
483 OutReady = true;
484 return true;
485 }
486 /*}}}*/
487 // Worker::SendConfiguration - Send the config to the method /*{{{*/
488 // ---------------------------------------------------------------------
489 /* */
490 bool pkgAcquire::Worker::SendConfiguration()
491 {
492 if (Config->SendConfig == false)
493 return true;
494
495 if (OutFd == -1)
496 return false;
497
498 /* Write out all of the configuration directives by walking the
499 configuration tree */
500 std::ostringstream Message;
501 Message << "601 Configuration\n";
502 _config->Dump(Message, NULL, "Config-Item: %F=%V\n", false);
503 Message << '\n';
504
505 if (Debug == true)
506 clog << " -> " << Access << ':' << QuoteString(Message.str(),"\n") << endl;
507 OutQueue += Message.str();
508 OutReady = true;
509
510 return true;
511 }
512 /*}}}*/
513 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
514 // ---------------------------------------------------------------------
515 /* Send a URI Acquire message to the method */
516 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
517 {
518 if (OutFd == -1)
519 return false;
520
521 string Message = "600 URI Acquire\n";
522 Message.reserve(300);
523 Message += "URI: " + Item->URI;
524 Message += "\nFilename: " + Item->Owner->DestFile;
525 Message += Item->Owner->Custom600Headers();
526 Message += "\n\n";
527
528 if (Debug == true)
529 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
530 OutQueue += Message;
531 OutReady = true;
532
533 return true;
534 }
535 /*}}}*/
536 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
537 // ---------------------------------------------------------------------
538 /* */
539 bool pkgAcquire::Worker::OutFdReady()
540 {
541 int Res;
542 do
543 {
544 Res = write(OutFd,OutQueue.c_str(),OutQueue.length());
545 }
546 while (Res < 0 && errno == EINTR);
547
548 if (Res <= 0)
549 return MethodFailure();
550
551 OutQueue.erase(0,Res);
552 if (OutQueue.empty() == true)
553 OutReady = false;
554
555 return true;
556 }
557 /*}}}*/
558 // Worker::InFdRead - In bound FD is ready /*{{{*/
559 // ---------------------------------------------------------------------
560 /* */
561 bool pkgAcquire::Worker::InFdReady()
562 {
563 if (ReadMessages() == false)
564 return false;
565 RunMessages();
566 return true;
567 }
568 /*}}}*/
569 // Worker::MethodFailure - Called when the method fails /*{{{*/
570 // ---------------------------------------------------------------------
571 /* This is called when the method is believed to have failed, probably because
572 read returned -1. */
573 bool pkgAcquire::Worker::MethodFailure()
574 {
575 _error->Error("Method %s has died unexpectedly!",Access.c_str());
576
577 // do not reap the child here to show meaningfull error to the user
578 ExecWait(Process,Access.c_str(),false);
579 Process = -1;
580 close(InFd);
581 close(OutFd);
582 InFd = -1;
583 OutFd = -1;
584 OutReady = false;
585 InReady = false;
586 OutQueue = string();
587 MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
588
589 return false;
590 }
591 /*}}}*/
592 // Worker::Pulse - Called periodically /*{{{*/
593 // ---------------------------------------------------------------------
594 /* */
595 void pkgAcquire::Worker::Pulse()
596 {
597 if (CurrentItem == 0)
598 return;
599
600 struct stat Buf;
601 if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
602 return;
603 CurrentSize = Buf.st_size;
604
605 // Hmm? Should not happen...
606 if (CurrentSize > TotalSize && TotalSize != 0)
607 TotalSize = CurrentSize;
608 }
609 /*}}}*/
610 // Worker::ItemDone - Called when the current item is finished /*{{{*/
611 // ---------------------------------------------------------------------
612 /* */
613 void pkgAcquire::Worker::ItemDone()
614 {
615 CurrentItem = 0;
616 CurrentSize = 0;
617 TotalSize = 0;
618 Status = string();
619 }
620 /*}}}*/