1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message.
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
42 // Worker::Worker - Constructor for Queue startup /*{{{*/
43 // ---------------------------------------------------------------------
// Constructor used when a worker is started for a queue. It receives the
// queue Q, the method configuration Cnf and the status/progress object log.
// In this excerpt only two initializers are visible: d is set to NULL and
// Log stores the log pointer. The remaining initializers and the body are
// not shown here (presumably Q and Cnf are stored as OwnerQ and Config as
// in the other constructor - TODO confirm against the full file).
45 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
46 pkgAcquireStatus
*log
) : d(NULL
), Log(log
)
58 // Worker::Worker - Constructor for method config startup /*{{{*/
59 // ---------------------------------------------------------------------
// Constructor used when the worker is started only for a method
// configuration Cnf. There is no owning queue in this mode: OwnerQ and
// CurrentItem start out as NULL, Access is copied from Cnf->Access and the
// size counters CurrentSize and TotalSize start at 0.
// The constructor body is not visible in this excerpt.
61 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
),
62 Access(Cnf
->Access
), CurrentItem(NULL
),
63 CurrentSize(0), TotalSize(0)
68 // Worker::Construct - Constructor helper /*{{{*/
69 // ---------------------------------------------------------------------
// Shared initialisation helper. The only statement visible in this excerpt
// reads the boolean option "Debug::pkgAcquire::Worker" (default false) from
// the global configuration into Debug; other initialisation done here, if
// any, is not shown.
71 void pkgAcquire::Worker::Construct()
80 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
83 // Worker::~Worker - Destructor /*{{{*/
84 // ---------------------------------------------------------------------
// Tears the worker down. The visible code tests Config->NeedsCleanup and
// calls ExecWait() on the child process with true as the last argument.
// What is executed under the NeedsCleanup == false branch, and any closing
// of file descriptors, is not visible in this excerpt.
86 pkgAcquire::Worker::~Worker()
93 /* Closing of stdin is the signal to exit and die when the process
94 indicates it needs cleanup */
95 if (Config
->NeedsCleanup
== false)
97 ExecWait(Process
,Access
.c_str(),true);
101 // Worker::Start - Start the worker process /*{{{*/
102 // ---------------------------------------------------------------------
103 /* This forks the method and inits the communication channel */
// Visible steps, in order:
//  1. Build the method binary path from "Dir::Bin::Methods" plus Access and
//     report an error if that file does not exist (with an extra hint about
//     apt-transport-https when Access is "https").
//  2. Create two pipes (four descriptors) and mark all of them close-on-exec.
//  3. Fork via ExecFork(); the child side wires Pipes[1] to stdout and
//     Pipes[2] to stdin, clears close-on-exec on stdin/stdout/stderr and
//     execv()s the method binary.
//  4. The parent side makes Pipes[0] and Pipes[3] non-blocking, then waits on
//     InFd and reads the first messages from the method.
// Returns false via _error->Error() when the method does not start
// correctly. Several lines (braces, the Process == 0 test, descriptor
// bookkeeping, return statements) are missing from this excerpt.
104 bool pkgAcquire::Worker::Start()
106 // Get the method path
107 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
108 if (FileExists(Method
) == false)
110 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
// https is singled out to point the user at the package providing it.
111 if (Access
== "https")
112 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
// Diagnostic output; presumably guarded by Debug - guard not visible here.
117 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Pipes[0..1] form the first pipe, Pipes[2..3] the second. All entries start
// at -1 so that a partial failure can be told apart from a valid descriptor.
120 int Pipes
[4] = {-1,-1,-1,-1};
121 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
123 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
// Loop over all four descriptors on failure; the loop body (presumably a
// close) is not visible in this excerpt.
124 for (int I
= 0; I
!= 4; I
++)
// Mark every pipe end close-on-exec.
128 for (int I
= 0; I
!= 4; I
++)
129 SetCloseExec(Pipes
[I
],true);
131 // Fork off the process
132 Process
= ExecFork();
// The method writes its messages to stdout (Pipes[1]) and reads requests
// from stdin (Pipes[2]); the standard descriptors must survive the exec, so
// close-on-exec is cleared on them.
136 dup2(Pipes
[1],STDOUT_FILENO
);
137 dup2(Pipes
[2],STDIN_FILENO
);
138 SetCloseExec(STDOUT_FILENO
,false);
139 SetCloseExec(STDIN_FILENO
,false);
140 SetCloseExec(STDERR_FILENO
,false);
143 Args
[0] = Method
.c_str();
145 execv(Args
[0],(char **)Args
);
// Only reached when execv() itself failed.
146 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// The two pipe ends not handed to the method are switched to non-blocking.
// Presumably these become InFd and OutFd - assignment not visible here.
153 SetNonBlock(Pipes
[0],true);
154 SetNonBlock(Pipes
[3],true);
160 // Read the configuration data
161 if (WaitFd(InFd
) == false ||
162 ReadMessages() == false)
163 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
172 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
173 // ---------------------------------------------------------------------
// Calls the global ::ReadMessages() helper to read from InFd into
// MessageQueue. When that helper reports failure the result of
// MethodFailure() is returned. The success path is not visible in this
// excerpt.
175 bool pkgAcquire::Worker::ReadMessages()
177 if (::ReadMessages(InFd
,MessageQueue
) == false)
178 return MethodFailure();
182 // Worker::RunMessages - Empty the message queue /*{{{*/
183 // ---------------------------------------------------------------------
184 /* This takes the messages from the message queue and runs them through
185 the parsers in order. */
// For every queued message the visible code: removes it from the front of
// MessageQueue, parses the leading message number with strtol(), looks up
// the queue item named by the "URI" tag, applies an optional "UsedMirror"
// tag and then handles the message according to its number. The handlers
// recognisable in this excerpt are Capabilities, a log message, a status
// message, 103 Redirect, 200 URI Start, 201 URI Done, 400 URI Failure,
// 401 General Failure and a media change request.
// Returns false through _error->Error() on malformed or failed messages.
// The switch statement, its case labels, braces and several guards are
// missing from this excerpt, so control flow between the fragments below
// cannot be read off directly.
186 bool pkgAcquire::Worker::RunMessages()
188 while (MessageQueue
.empty() == false)
// Take the oldest message and drop it from the queue.
190 string Message
= MessageQueue
.front();
191 MessageQueue
.erase(MessageQueue
.begin());
194 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
196 // Fetch the message number
// End is declared on a line not visible here; End == start of string means
// strtol() consumed no digits, i.e. the message has no leading number.
198 int Number
= strtol(Message
.c_str(),&End
,10);
199 if (End
== Message
.c_str())
200 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Itm stays NULL when the message carries no URI tag.
202 string URI
= LookupTag(Message
,"URI");
203 pkgAcquire::Queue::QItem
*Itm
= NULL
;
204 if (URI
.empty() == false)
205 Itm
= OwnerQ
->FindItem(URI
,this);
209 // update used mirror
// NOTE(review): Itm is dereferenced below; a guard against Itm == NULL is
// not visible in this excerpt (original lines 206-208 are missing) - confirm.
210 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
211 if (UsedMirror
.empty() == false)
213 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
214 (*O
)->UsedMirror
= UsedMirror
;
// Replace everything before the first space of the description with the
// mirror actually used.
216 if (Itm
->Description
.find(" ") != string::npos
)
217 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
221 // Determine the message number and dispatch
// Capabilities message: parsed by Capabilities().
226 if (Capabilities(Message
) == false)
227 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
// Log message from the method: echoed to clog.
233 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
// Status message from the method: remembered in Status.
238 Status
= LookupTag(Message
,"Message");
// 103 Redirect. The error below is presumably raised when no queue item
// matched the message - the condition is not visible in this excerpt.
246 _error
->Error("Method gave invalid 103 Redirect message");
// Falls back to the original URI when the method sent no New-URI tag.
250 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
255 // Change the status so that it can be dequeued
256 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
257 (*O
)->Status
= pkgAcquire::Item::StatIdle
;
258 // Mark the item as done (taking care of all queues)
259 // and then put it in the main queue again
// The owner list is copied before ItemDone() is called on Itm, and only the
// copy is used afterwards.
260 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
261 OwnerQ
->ItemDone(Itm
);
263 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
265 pkgAcquire::Item
*Owner
= *O
;
266 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
267 // if we change site, treat it as a mirror change
268 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
// OldSite is the part of the description before its first space; OldExtra
// is what follows that site prefix (plus one separator character) in the
// old URI. When the new URI ends with the same OldExtra, the part in front
// of it is taken as the new site.
270 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
271 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
273 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
274 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
276 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
277 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
278 if (desc
.Description
.find(" ") != string::npos
)
279 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
// Hand the item description back to the pkgAcquire owner for queueing.
// Presumably desc.URI was set to NewURI on a line not visible here - confirm.
284 OwnerQ
->Owner
->Enqueue(desc
);
// 200 URI Start.
297 _error
->Error("Method gave invalid 200 URI Start message");
// Size and resume offset are parsed as unsigned 64-bit values; a missing
// tag is treated as "0".
303 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
304 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
305 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
307 (*O
)->Start(Message
, TotalSize
);
309 // Display update before completion
// NOTE(review): unlike the 201 and 400 handlers, no "Log != 0" test is
// visible before Log is dereferenced here (original lines 310-311 are
// missing) - confirm.
312 if (Log
->MorePulses
== true)
313 Log
->Pulse((*O
)->GetOwner());
314 Log
->Fetch((*O
)->GetItemDesc());
// 201 URI Done.
326 _error
->Error("Method gave invalid 201 URI Done message");
330 PrepareFiles("201::URIDone", Itm
);
332 // Display update before completion
333 if (Log
!= 0 && Log
->MorePulses
== true)
334 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
335 Log
->Pulse((*O
)->GetOwner());
// The method may name a different file than the one requested; default to
// the destination file of the item's first owner.
337 std::string
const filename
= LookupTag(Message
, "Filename", Itm
->Owner
->DestFile
.c_str());
338 HashStringList ReceivedHashes
;
340 // see if we got hashes to verify
// For every supported hash type T the tag "T-Hash" is looked up.
341 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
343 std::string
const tagname
= std::string(*type
) + "-Hash";
344 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
345 if (hashsum
.empty() == false)
346 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
348 // not all methods always sent Hashes our way
// Fallback: when expected hashes exist and the file is present, a Hashes
// object is set up for the expected types and its result becomes
// ReceivedHashes. The line feeding the file into calc is not visible here.
349 if (ReceivedHashes
.usable() == false)
351 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
352 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
354 Hashes
calc(ExpectedHashes
);
355 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
357 ReceivedHashes
= calc
.GetHashStringList();
362 // only local files can refer other filenames and counting them as fetched would be unfair
// NOTE(review): the condition below calls Fetched() exactly when the
// reported filename differs from DestFile, which looks inverted relative to
// the comment above - confirm the intended comparison.
// NOTE(review): Resume-Point is parsed with atoi() here but with strtoull()
// in the 200 URI Start handler; offsets beyond the int range would not
// survive atoi() - confirm.
363 if (Log
!= NULL
&& filename
!= Itm
->Owner
->DestFile
)
364 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
// The owner list is copied before ItemDone() is called on Itm, and only the
// copy is used afterwards.
366 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
367 OwnerQ
->ItemDone(Itm
);
// Either tag marks the reply as an IMS hit.
370 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
371 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
372 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
374 pkgAcquire::Item
* const Owner
= *O
;
375 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
// Optional dump of received versus expected hashes.
376 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
378 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
379 << "ReceivedHash:" << endl
;
380 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
381 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
382 std::clog
<< "ExpectedHash:" << endl
;
383 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
384 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
388 // decide if what we got is what we expected
// Visible decision inputs: whether expected hashes are usable, whether
// received hashes are usable, whether both lists compare equal, whether the
// reply was an IMS hit and whether the owner requires hashes. The "else"
// lines tying the assignments below together are missing from this excerpt.
389 bool consideredOkay
= false;
390 if (ExpectedHashes
.usable())
392 if (ReceivedHashes
.usable() == false)
394 /* IMS-Hits can't be checked here as we will have uncompressed file,
395 but the hashes for the compressed file. What we have was good through
396 so all we have to ensure later is that we are not stalled. */
397 consideredOkay
= isIMSHit
;
399 else if (ReceivedHashes
== ExpectedHashes
)
400 consideredOkay
= true;
402 consideredOkay
= false;
405 else if (Owner
->HashesRequired() == true)
406 consideredOkay
= false;
408 consideredOkay
= true;
// A hash-level pass still has to be confirmed by the owner's VerifyDone().
410 if (consideredOkay
== true)
411 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
412 else // hashsum mismatch
413 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
// Success path: notify the owner, then the log (IMSHit or Done).
415 if (consideredOkay
== true)
417 Owner
->Done(Message
, ReceivedHashes
, Config
);
421 Log
->IMSHit(Owner
->GetItemDesc());
423 Log
->Done(Owner
->GetItemDesc());
// Failure path: notify the owner, then the log.
428 Owner
->Failed(Message
,Config
);
430 Log
->Fail(Owner
->GetItemDesc());
// 400 URI Failure.
442 std::string
const msg
= LookupTag(Message
,"Message");
443 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
447 PrepareFiles("400::URIFailure", Itm
);
449 // Display update before completion
450 if (Log
!= 0 && Log
->MorePulses
== true)
451 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
452 Log
->Pulse((*O
)->GetOwner());
// The owner list is copied before ItemDone() is called on Itm, and only the
// copy is used afterwards.
454 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
455 OwnerQ
->ItemDone(Itm
);
458 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
// These four FailReason values are classified as transient network errors.
461 if(LookupTag(Message
,"FailReason") == "Timeout" ||
462 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
463 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
464 LookupTag(Message
,"FailReason") == "ConnectionRefused")
465 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
467 (*O
)->Failed(Message
,Config
);
470 Log
->Fail((*O
)->GetItemDesc());
477 // 401 General Failure
479 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
// Media change request: delegated to MediaChange().
484 MediaChange(Message
);
491 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
492 // ---------------------------------------------------------------------
// Copies the fields of a capabilities message into Config: Version is taken
// verbatim, while Single-Instance, Pipeline, Send-Config, Local-Only,
// Needs-Cleanup and Removable are parsed with StringToBool() and default to
// false when absent. The values are then printed to clog (presumably only
// in debug mode - the guard is not visible in this excerpt). The return
// statement is not visible either.
// NOTE(review): the block comment on the next line has lost its terminator
// in this excerpt; it is left untouched here.
493 /* This parses the capabilities message and dumps it into the configuration
495 bool pkgAcquire::Worker::Capabilities(string Message
)
500 Config
->Version
= LookupTag(Message
,"Version");
501 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
502 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
503 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
504 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
505 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
506 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
511 clog
<< "Configured access method " << Config
->Access
<< endl
;
512 clog
<< "Version:" << Config
->Version
<<
513 " SingleInstance:" << Config
->SingleInstance
<<
514 " Pipeline:" << Config
->Pipeline
<<
515 " SendConfig:" << Config
->SendConfig
<<
516 " LocalOnly: " << Config
->LocalOnly
<<
517 " NeedsCleanup: " << Config
->NeedsCleanup
<<
518 " Removable: " << Config
->Removable
<< endl
;
524 // Worker::MediaChange - Request a media change /*{{{*/
525 // ---------------------------------------------------------------------
// Handles a media change request from the method. Visible behaviour:
//  - reads "APT::Status-Fd" (default -1) and, using the Media and Drive tags
//    of the message, composes a "media-change: <media>:<drive>:<text>"
//    status line that is written to that descriptor (presumably only when
//    the descriptor is valid - the guard is not visible in this excerpt);
//  - asks Log->MediaChange(Media, Drive); when there is no Log or it returns
//    false, the reply "603 Media Changed" with "Failed: true" is prepared,
//    otherwise a plain "603 Media Changed" reply is prepared.
// How the reply buffer S is declared and queued for sending, and the return
// values, are not visible here.
527 bool pkgAcquire::Worker::MediaChange(string Message
)
529 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
532 string Media
= LookupTag(Message
,"Media");
533 string Drive
= LookupTag(Message
,"Drive");
534 ostringstream msg
,status
;
// Localised prompt; one piece of the format string (original line 536) is
// missing from this excerpt.
535 ioprintf(msg
,_("Please insert the disc labeled: "
537 "in the drive '%s' and press [Enter]."),
538 Media
.c_str(),Drive
.c_str());
539 status
<< "media-change: " // message
540 << Media
<< ":" // media
541 << Drive
<< ":" // drive
542 << msg
.str() // l10n message
545 std::string
const dlstatus
= status
.str();
546 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// No log object, or the log refusing the change, both count as failure.
549 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
550 LookupTag(Message
,"Drive")) == false)
553 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
555 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
// Success reply.
562 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
564 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
570 // Worker::SendConfiguration - Send the config to the method /*{{{*/
571 // ---------------------------------------------------------------------
// Builds a "601 Configuration" message containing one
// "Config-Item: <name>=<value>" line per entry dumped from the global
// configuration and appends it to OutQueue. The function first tests
// Config->SendConfig; what it does when that flag is false (presumably an
// early return) is not visible in this excerpt, nor are the message
// terminator and the final return.
573 bool pkgAcquire::Worker::SendConfiguration()
575 if (Config
->SendConfig
== false)
581 /* Write out all of the configuration directives by walking the
582 configuration tree */
583 std::ostringstream Message
;
584 Message
<< "601 Configuration\n";
585 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
589 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
590 OutQueue
+= Message
.str();
596 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
597 // ---------------------------------------------------------------------
598 /* Send a URI Acquire message to the method */
// Composes a "600 URI Acquire" request for Item containing, in order: the
// URI, the destination Filename of the item's owner, one
// "Expected-<type>: <value>" line per expected hash, an optional
// "Maximum-Size" line and any custom headers supplied by the item.
// Before sending, an existing destination file is handed to the sandbox
// user with mode 0600. Appending the message to the outbound queue and the
// return value are not visible in this excerpt.
599 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
604 string Message
= "600 URI Acquire\n";
// Pre-size the buffer to avoid repeated reallocation while appending.
605 Message
.reserve(300);
606 Message
+= "URI: " + Item
->URI
;
607 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
609 HashStringList
const hsl
= Item
->GetExpectedHashes();
610 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
611 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
// Only when the expected hashes carry no file size is a maximum size
// considered. Presumably the header is added only for a non-zero
// FileSize - that test is not visible here.
613 if (hsl
.FileSize() == 0)
615 unsigned long long FileSize
= Item
->GetMaximumSize();
619 strprintf(MaximumSize
, "%llu", FileSize
);
620 Message
+= "\nMaximum-Size: " + MaximumSize
;
624 Item
->SyncDestinationFiles();
625 Message
+= Item
->Custom600Headers();
// Give an already existing destination file to the user named by
// "APT::Sandbox::User", group root, mode 0600.
628 if (RealFileExists(Item
->Owner
->DestFile
))
630 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
631 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
632 SandboxUser
.c_str(), "root", 0600);
636 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
643 // Worker::OutFdReady - Out bound FD is ready /*{{{*/
644 // ---------------------------------------------------------------------
// Writes the pending contents of OutQueue to OutFd. The write is retried
// while it fails with EINTR. MethodFailure() is returned on a failed write
// (the exact condition is not visible in this excerpt); otherwise the Res
// bytes that were written are removed from the front of OutQueue. What
// happens once the queue is empty is not visible here.
646 bool pkgAcquire::Worker::OutFdReady()
651 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
// Retry when the write was interrupted by a signal.
653 while (Res
< 0 && errno
== EINTR
);
656 return MethodFailure();
// A short write leaves the unsent tail in the queue.
658 OutQueue
.erase(0,Res
);
659 if (OutQueue
.empty() == true)
665 // Worker::InFdReady - In bound FD is ready /*{{{*/
666 // ---------------------------------------------------------------------
// Reads the pending messages via ReadMessages(). The handling of a failed
// read and the rest of the body are not visible in this excerpt.
668 bool pkgAcquire::Worker::InFdReady()
670 if (ReadMessages() == false)
676 // Worker::MethodFailure - Called when the method fails /*{{{*/
677 // ---------------------------------------------------------------------
// Records the error "Method <Access> has died unexpectedly!", calls
// ExecWait() on the child with false as the last argument and discards all
// queued messages. Descriptor cleanup and the return value are not visible
// in this excerpt.
// NOTE(review): the block comment on the next line has lost its terminator
// in this excerpt; it is left untouched here.
678 /* This is called when the method is believed to have failed, probably because
680 bool pkgAcquire::Worker::MethodFailure()
682 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
684 // do not reap the child here to show meaningful error to the user
685 ExecWait(Process
,Access
.c_str(),false);
// Drop every message still waiting to be processed.
694 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
699 // Worker::Pulse - Called periodically /*{{{*/
700 // ---------------------------------------------------------------------
// Refreshes CurrentSize from the on-disk size (st_size) of the current
// item's destination file. The function tests CurrentItem against 0 and the
// stat() result against 0; what it does in those cases (presumably an early
// return) is not visible in this excerpt.
702 void pkgAcquire::Worker::Pulse()
704 if (CurrentItem
== 0)
708 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
710 CurrentSize
= Buf
.st_size
;
713 // Worker::ItemDone - Called when the current item is finished /*{{{*/
714 // ---------------------------------------------------------------------
// Only the signature is visible in this excerpt; the body is not shown.
716 void pkgAcquire::Worker::ItemDone()
// Prepares the destination files of all owners of Itm after a transfer.
// caller is a label passed through to ChangeOwnerAndPermissionOfFile().
// When the first owner's destination file exists it is given to root:root
// with mode 0644 and every other owner's destination is replaced by a hard
// link to it, falling back to a symlink and finally to an error. The second
// loop unlinks every owner's destination file; presumably it is the "else"
// branch for a missing file - the "else" line is not visible in this
// excerpt.
724 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
726 if (RealFileExists(Itm
->Owner
->DestFile
))
728 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
729 std::string
const filename
= Itm
->Owner
->DestFile
;
730 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
732 pkgAcquire::Item
const * const Owner
= *O
;
// Owners that already use the downloaded file need no link. Presumably
// followed by a "continue" - that line is not visible here.
733 if (Owner
->DestFile
== filename
)
// Remove any stale file first so that link() can create the name.
735 unlink(Owner
->DestFile
.c_str());
736 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
738 // different mounts can't happen for us as we download to lists/ by default,
739 // but if the system is reused by others the locations can potentially be on
740 // different disks, so use symlink as poor-men replacement.
741 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
742 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
743 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
// Remove the destination file of every owner.
749 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
750 unlink((*O
)->DestFile
.c_str());