]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire-worker.cc
9254e20a3614f4e4df0c02b41987bd49299e6cbc
[apt.git] / apt-pkg / acquire-worker.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
5
6 Acquire Worker
7
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
11
12 ##################################################################### */
13 /*}}}*/
14 // Include Files /*{{{*/
15 #include <config.h>
16
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30
31 #include <sys/stat.h>
32 #include <stdlib.h>
33 #include <unistd.h>
34 #include <signal.h>
35 #include <stdio.h>
36 #include <errno.h>
37 #include <sys/types.h>
38 #include <pwd.h>
39 #include <grp.h>
40
41 #include <apti18n.h>
42 /*}}}*/
43
44 using namespace std;
45
46 // Worker::Worker - Constructor for Queue startup /*{{{*/
47 // ---------------------------------------------------------------------
48 /* */
49 pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
50 pkgAcquireStatus *Log) : Log(Log)
51 {
52 OwnerQ = Q;
53 Config = Cnf;
54 Access = Cnf->Access;
55 CurrentItem = 0;
56 TotalSize = 0;
57 CurrentSize = 0;
58
59 Construct();
60 }
61 /*}}}*/
62 // Worker::Worker - Constructor for method config startup /*{{{*/
63 // ---------------------------------------------------------------------
64 /* */
65 pkgAcquire::Worker::Worker(MethodConfig *Cnf)
66 {
67 OwnerQ = 0;
68 Config = Cnf;
69 Access = Cnf->Access;
70 CurrentItem = 0;
71 TotalSize = 0;
72 CurrentSize = 0;
73
74 Construct();
75 }
76 /*}}}*/
77 // Worker::Construct - Constructor helper /*{{{*/
78 // ---------------------------------------------------------------------
79 /* */
80 void pkgAcquire::Worker::Construct()
81 {
82 NextQueue = 0;
83 NextAcquire = 0;
84 Process = -1;
85 InFd = -1;
86 OutFd = -1;
87 OutReady = false;
88 InReady = false;
89 Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
90 }
91 /*}}}*/
92 // Worker::~Worker - Destructor /*{{{*/
93 // ---------------------------------------------------------------------
94 /* */
95 pkgAcquire::Worker::~Worker()
96 {
97 close(InFd);
98 close(OutFd);
99
100 if (Process > 0)
101 {
102 /* Closing of stdin is the signal to exit and die when the process
103 indicates it needs cleanup */
104 if (Config->NeedsCleanup == false)
105 kill(Process,SIGINT);
106 ExecWait(Process,Access.c_str(),true);
107 }
108 }
109 /*}}}*/
110 // Worker::Start - Start the worker process /*{{{*/
111 // ---------------------------------------------------------------------
112 /* This forks the method and inits the communication channel */
113 bool pkgAcquire::Worker::Start()
114 {
115 // Get the method path
116 string Method = _config->FindDir("Dir::Bin::Methods") + Access;
117 if (FileExists(Method) == false)
118 {
119 _error->Error(_("The method driver %s could not be found."),Method.c_str());
120 if (Access == "https")
121 _error->Notice(_("Is the package %s installed?"), "apt-transport-https");
122 return false;
123 }
124
125 if (Debug == true)
126 clog << "Starting method '" << Method << '\'' << endl;
127
128 // Create the pipes
129 int Pipes[4] = {-1,-1,-1,-1};
130 if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
131 {
132 _error->Errno("pipe","Failed to create IPC pipe to subprocess");
133 for (int I = 0; I != 4; I++)
134 close(Pipes[I]);
135 return false;
136 }
137 for (int I = 0; I != 4; I++)
138 SetCloseExec(Pipes[I],true);
139
140 // Fork off the process
141 Process = ExecFork();
142 if (Process == 0)
143 {
144 // Setup the FDs
145 dup2(Pipes[1],STDOUT_FILENO);
146 dup2(Pipes[2],STDIN_FILENO);
147 SetCloseExec(STDOUT_FILENO,false);
148 SetCloseExec(STDIN_FILENO,false);
149 SetCloseExec(STDERR_FILENO,false);
150
151 const char *Args[2];
152 Args[0] = Method.c_str();
153 Args[1] = 0;
154 execv(Args[0],(char **)Args);
155 cerr << "Failed to exec method " << Args[0] << endl;
156 _exit(100);
157 }
158
159 // Fix up our FDs
160 InFd = Pipes[0];
161 OutFd = Pipes[3];
162 SetNonBlock(Pipes[0],true);
163 SetNonBlock(Pipes[3],true);
164 close(Pipes[1]);
165 close(Pipes[2]);
166 OutReady = false;
167 InReady = true;
168
169 // Read the configuration data
170 if (WaitFd(InFd) == false ||
171 ReadMessages() == false)
172 return _error->Error(_("Method %s did not start correctly"),Method.c_str());
173
174 RunMessages();
175 if (OwnerQ != 0)
176 SendConfiguration();
177
178 return true;
179 }
180 /*}}}*/
181 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
182 // ---------------------------------------------------------------------
183 /* */
184 bool pkgAcquire::Worker::ReadMessages()
185 {
186 if (::ReadMessages(InFd,MessageQueue) == false)
187 return MethodFailure();
188 return true;
189 }
190 /*}}}*/
191 // Worker::RunMessage - Empty the message queue /*{{{*/
192 // ---------------------------------------------------------------------
193 /* This takes the messages from the message queue and runs them through
194 the parsers in order. */
195 bool pkgAcquire::Worker::RunMessages()
196 {
197 while (MessageQueue.empty() == false)
198 {
199 string Message = MessageQueue.front();
200 MessageQueue.erase(MessageQueue.begin());
201
202 if (Debug == true)
203 clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
204
205 // Fetch the message number
206 char *End;
207 int Number = strtol(Message.c_str(),&End,10);
208 if (End == Message.c_str())
209 return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
210
211 string URI = LookupTag(Message,"URI");
212 pkgAcquire::Queue::QItem *Itm = 0;
213 if (URI.empty() == false)
214 Itm = OwnerQ->FindItem(URI,this);
215
216 // update used mirror
217 string UsedMirror = LookupTag(Message,"UsedMirror", "");
218 if (!UsedMirror.empty() &&
219 Itm &&
220 Itm->Description.find(" ") != string::npos)
221 {
222 Itm->Description.replace(0, Itm->Description.find(" "), UsedMirror);
223 // FIXME: will we need this as well?
224 //Itm->ShortDesc = UsedMirror;
225 }
226
227 // Determine the message number and dispatch
228 switch (Number)
229 {
230 // 100 Capabilities
231 case 100:
232 if (Capabilities(Message) == false)
233 return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
234 break;
235
236 // 101 Log
237 case 101:
238 if (Debug == true)
239 clog << " <- (log) " << LookupTag(Message,"Message") << endl;
240 break;
241
242 // 102 Status
243 case 102:
244 Status = LookupTag(Message,"Message");
245 break;
246
247 // 103 Redirect
248 case 103:
249 {
250 if (Itm == 0)
251 {
252 _error->Error("Method gave invalid 103 Redirect message");
253 break;
254 }
255
256 string NewURI = LookupTag(Message,"New-URI",URI.c_str());
257 Itm->URI = NewURI;
258
259 ItemDone();
260
261 pkgAcquire::Item *Owner = Itm->Owner;
262 pkgAcquire::ItemDesc Desc = *Itm;
263
264 // Change the status so that it can be dequeued
265 Owner->Status = pkgAcquire::Item::StatIdle;
266 // Mark the item as done (taking care of all queues)
267 // and then put it in the main queue again
268 OwnerQ->ItemDone(Itm);
269 OwnerQ->Owner->Enqueue(Desc);
270
271 if (Log != 0)
272 Log->Done(Desc);
273 break;
274 }
275
276 // 200 URI Start
277 case 200:
278 {
279 if (Itm == 0)
280 {
281 _error->Error("Method gave invalid 200 URI Start message");
282 break;
283 }
284
285 CurrentItem = Itm;
286 CurrentSize = 0;
287 TotalSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10);
288 ResumePoint = strtoull(LookupTag(Message,"Resume-Point","0").c_str(), NULL, 10);
289 Itm->Owner->Start(Message,strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10));
290
291 // Display update before completion
292 if (Log != 0 && Log->MorePulses == true)
293 Log->Pulse(Itm->Owner->GetOwner());
294
295 if (Log != 0)
296 Log->Fetch(*Itm);
297
298 break;
299 }
300
301 // 201 URI Done
302 case 201:
303 {
304 if (Itm == 0)
305 {
306 _error->Error("Method gave invalid 201 URI Done message");
307 break;
308 }
309
310 pkgAcquire::Item *Owner = Itm->Owner;
311 pkgAcquire::ItemDesc Desc = *Itm;
312
313 if (RealFileExists(Owner->DestFile))
314 ChangeOwnerAndPermissionOfFile("201::URIDone", Owner->DestFile.c_str(), "root", "root", 0644);
315
316 // Display update before completion
317 if (Log != 0 && Log->MorePulses == true)
318 Log->Pulse(Owner->GetOwner());
319
320 OwnerQ->ItemDone(Itm);
321 unsigned long long const ServerSize = strtoull(LookupTag(Message,"Size","0").c_str(), NULL, 10);
322 bool isHit = StringToBool(LookupTag(Message,"IMS-Hit"),false) ||
323 StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false);
324 // Using the https method the server might return 200, but the
325 // If-Modified-Since condition is not satsified, libcurl will
326 // discard the download. In this case, however, TotalSize will be
327 // set to the actual size of the file, while ServerSize will be set
328 // to 0. Therefore, if the item is marked as a hit and the
329 // downloaded size (ServerSize) is 0, we ignore TotalSize.
330 if (TotalSize != 0 && (!isHit || ServerSize != 0) && ServerSize != TotalSize)
331 _error->Warning("Size of file %s is not what the server reported %s %llu",
332 Owner->DestFile.c_str(), LookupTag(Message,"Size","0").c_str(),TotalSize);
333
334 // see if there is a hash to verify
335 HashStringList ReceivedHashes;
336 for (char const * const * type = HashString::SupportedHashes(); *type != NULL; ++type)
337 {
338 std::string const tagname = std::string(*type) + "-Hash";
339 std::string const hashsum = LookupTag(Message, tagname.c_str());
340 if (hashsum.empty() == false)
341 ReceivedHashes.push_back(HashString(*type, hashsum));
342 }
343
344 if(_config->FindB("Debug::pkgAcquire::Auth", false) == true)
345 {
346 std::clog << "201 URI Done: " << Owner->DescURI() << endl
347 << "ReceivedHash:" << endl;
348 for (HashStringList::const_iterator hs = ReceivedHashes.begin(); hs != ReceivedHashes.end(); ++hs)
349 std::clog << "\t- " << hs->toStr() << std::endl;
350 std::clog << "ExpectedHash:" << endl;
351 HashStringList expectedHashes = Owner->HashSums();
352 for (HashStringList::const_iterator hs = expectedHashes.begin(); hs != expectedHashes.end(); ++hs)
353 std::clog << "\t- " << hs->toStr() << std::endl;
354 std::clog << endl;
355 }
356 Owner->Done(Message, ServerSize, ReceivedHashes, Config);
357 ItemDone();
358
359 // Log that we are done
360 if (Log != 0)
361 {
362 if (isHit)
363 {
364 /* Hide 'hits' for local only sources - we also manage to
365 hide gets */
366 if (Config->LocalOnly == false)
367 Log->IMSHit(Desc);
368 }
369 else
370 Log->Done(Desc);
371 }
372 break;
373 }
374
375 // 400 URI Failure
376 case 400:
377 {
378 if (Itm == 0)
379 {
380 std::string const msg = LookupTag(Message,"Message");
381 _error->Error("Method gave invalid 400 URI Failure message: %s", msg.c_str());
382 break;
383 }
384
385 // Display update before completion
386 if (Log != 0 && Log->MorePulses == true)
387 Log->Pulse(Itm->Owner->GetOwner());
388
389 pkgAcquire::Item *Owner = Itm->Owner;
390 pkgAcquire::ItemDesc Desc = *Itm;
391
392 if (RealFileExists(Owner->DestFile))
393 ChangeOwnerAndPermissionOfFile("400::URIFailure", Owner->DestFile.c_str(), "root", "root", 0644);
394
395 OwnerQ->ItemDone(Itm);
396
397 // set some status
398 if(LookupTag(Message,"FailReason") == "Timeout" ||
399 LookupTag(Message,"FailReason") == "TmpResolveFailure" ||
400 LookupTag(Message,"FailReason") == "ResolveFailure" ||
401 LookupTag(Message,"FailReason") == "ConnectionRefused")
402 Owner->Status = pkgAcquire::Item::StatTransientNetworkError;
403
404 Owner->Failed(Message,Config);
405 ItemDone();
406
407 if (Log != 0)
408 Log->Fail(Desc);
409
410 break;
411 }
412
413 // 401 General Failure
414 case 401:
415 _error->Error("Method %s General failure: %s",Access.c_str(),LookupTag(Message,"Message").c_str());
416 break;
417
418 // 403 Media Change
419 case 403:
420 MediaChange(Message);
421 break;
422 }
423 }
424 return true;
425 }
426 /*}}}*/
427 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
428 // ---------------------------------------------------------------------
429 /* This parses the capabilities message and dumps it into the configuration
430 structure. */
431 bool pkgAcquire::Worker::Capabilities(string Message)
432 {
433 if (Config == 0)
434 return true;
435
436 Config->Version = LookupTag(Message,"Version");
437 Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
438 Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
439 Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
440 Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
441 Config->NeedsCleanup = StringToBool(LookupTag(Message,"Needs-Cleanup"),false);
442 Config->Removable = StringToBool(LookupTag(Message,"Removable"),false);
443
444 // Some debug text
445 if (Debug == true)
446 {
447 clog << "Configured access method " << Config->Access << endl;
448 clog << "Version:" << Config->Version <<
449 " SingleInstance:" << Config->SingleInstance <<
450 " Pipeline:" << Config->Pipeline <<
451 " SendConfig:" << Config->SendConfig <<
452 " LocalOnly: " << Config->LocalOnly <<
453 " NeedsCleanup: " << Config->NeedsCleanup <<
454 " Removable: " << Config->Removable << endl;
455 }
456
457 return true;
458 }
459 /*}}}*/
460 // Worker::MediaChange - Request a media change /*{{{*/
461 // ---------------------------------------------------------------------
462 /* */
463 bool pkgAcquire::Worker::MediaChange(string Message)
464 {
465 int status_fd = _config->FindI("APT::Status-Fd",-1);
466 if(status_fd > 0)
467 {
468 string Media = LookupTag(Message,"Media");
469 string Drive = LookupTag(Message,"Drive");
470 ostringstream msg,status;
471 ioprintf(msg,_("Please insert the disc labeled: "
472 "'%s' "
473 "in the drive '%s' and press enter."),
474 Media.c_str(),Drive.c_str());
475 status << "media-change: " // message
476 << Media << ":" // media
477 << Drive << ":" // drive
478 << msg.str() // l10n message
479 << endl;
480
481 std::string const dlstatus = status.str();
482 FileFd::Write(status_fd, dlstatus.c_str(), dlstatus.size());
483 }
484
485 if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
486 LookupTag(Message,"Drive")) == false)
487 {
488 char S[300];
489 snprintf(S,sizeof(S),"603 Media Changed\nFailed: true\n\n");
490 if (Debug == true)
491 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
492 OutQueue += S;
493 OutReady = true;
494 return true;
495 }
496
497 char S[300];
498 snprintf(S,sizeof(S),"603 Media Changed\n\n");
499 if (Debug == true)
500 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
501 OutQueue += S;
502 OutReady = true;
503 return true;
504 }
505 /*}}}*/
506 // Worker::SendConfiguration - Send the config to the method /*{{{*/
507 // ---------------------------------------------------------------------
508 /* */
509 bool pkgAcquire::Worker::SendConfiguration()
510 {
511 if (Config->SendConfig == false)
512 return true;
513
514 if (OutFd == -1)
515 return false;
516
517 /* Write out all of the configuration directives by walking the
518 configuration tree */
519 std::ostringstream Message;
520 Message << "601 Configuration\n";
521 _config->Dump(Message, NULL, "Config-Item: %F=%V\n", false);
522 Message << '\n';
523
524 if (Debug == true)
525 clog << " -> " << Access << ':' << QuoteString(Message.str(),"\n") << endl;
526 OutQueue += Message.str();
527 OutReady = true;
528
529 return true;
530 }
531 /*}}}*/
532 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
533 // ---------------------------------------------------------------------
534 /* Send a URI Acquire message to the method */
535 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
536 {
537 if (OutFd == -1)
538 return false;
539
540 string Message = "600 URI Acquire\n";
541 Message.reserve(300);
542 Message += "URI: " + Item->URI;
543 Message += "\nFilename: " + Item->Owner->DestFile;
544 HashStringList const hsl = Item->Owner->HashSums();
545 for (HashStringList::const_iterator hs = hsl.begin(); hs != hsl.end(); ++hs)
546 Message += "\nExpected-" + hs->HashType() + ": " + hs->HashValue();
547 if(Item->Owner->FileSize > 0)
548 {
549 string MaximumSize;
550 strprintf(MaximumSize, "%llu", Item->Owner->FileSize);
551 Message += "\nMaximum-Size: " + MaximumSize;
552 }
553 Message += Item->Owner->Custom600Headers();
554 Message += "\n\n";
555
556 if (RealFileExists(Item->Owner->DestFile))
557 {
558 std::string SandboxUser = _config->Find("APT::Sandbox::User");
559 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item->Owner->DestFile.c_str(),
560 SandboxUser.c_str(), "root", 0600);
561 }
562
563 if (Debug == true)
564 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
565 OutQueue += Message;
566 OutReady = true;
567
568 return true;
569 }
570 /*}}}*/
571 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
572 // ---------------------------------------------------------------------
573 /* */
574 bool pkgAcquire::Worker::OutFdReady()
575 {
576 int Res;
577 do
578 {
579 Res = write(OutFd,OutQueue.c_str(),OutQueue.length());
580 }
581 while (Res < 0 && errno == EINTR);
582
583 if (Res <= 0)
584 return MethodFailure();
585
586 OutQueue.erase(0,Res);
587 if (OutQueue.empty() == true)
588 OutReady = false;
589
590 return true;
591 }
592 /*}}}*/
593 // Worker::InFdRead - In bound FD is ready /*{{{*/
594 // ---------------------------------------------------------------------
595 /* */
596 bool pkgAcquire::Worker::InFdReady()
597 {
598 if (ReadMessages() == false)
599 return false;
600 RunMessages();
601 return true;
602 }
603 /*}}}*/
604 // Worker::MethodFailure - Called when the method fails /*{{{*/
605 // ---------------------------------------------------------------------
606 /* This is called when the method is believed to have failed, probably because
607 read returned -1. */
608 bool pkgAcquire::Worker::MethodFailure()
609 {
610 _error->Error("Method %s has died unexpectedly!",Access.c_str());
611
612 // do not reap the child here to show meaningfull error to the user
613 ExecWait(Process,Access.c_str(),false);
614 Process = -1;
615 close(InFd);
616 close(OutFd);
617 InFd = -1;
618 OutFd = -1;
619 OutReady = false;
620 InReady = false;
621 OutQueue = string();
622 MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
623
624 return false;
625 }
626 /*}}}*/
627 // Worker::Pulse - Called periodically /*{{{*/
628 // ---------------------------------------------------------------------
629 /* */
630 void pkgAcquire::Worker::Pulse()
631 {
632 if (CurrentItem == 0)
633 return;
634
635 struct stat Buf;
636 if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
637 return;
638 CurrentSize = Buf.st_size;
639
640 // Hmm? Should not happen...
641 if (CurrentSize > TotalSize && TotalSize != 0)
642 TotalSize = CurrentSize;
643 }
644 /*}}}*/
645 // Worker::ItemDone - Called when the current item is finished /*{{{*/
646 // ---------------------------------------------------------------------
647 /* */
648 void pkgAcquire::Worker::ItemDone()
649 {
650 CurrentItem = 0;
651 CurrentSize = 0;
652 TotalSize = 0;
653 Status = string();
654 }
655 /*}}}*/