1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
44 // Worker::Worker - Constructor for Queue startup /*{{{*/
45 // ---------------------------------------------------------------------
47 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
48 pkgAcquireStatus
*log
) : d(NULL
), Log(log
)
60 // Worker::Worker - Constructor for method config startup /*{{{*/
61 // ---------------------------------------------------------------------
63 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
),
64 Access(Cnf
->Access
), CurrentItem(NULL
),
65 CurrentSize(0), TotalSize(0)
70 // Worker::Construct - Constructor helper /*{{{*/
71 // ---------------------------------------------------------------------
73 void pkgAcquire::Worker::Construct()
82 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
85 // Worker::~Worker - Destructor /*{{{*/
86 // ---------------------------------------------------------------------
88 pkgAcquire::Worker::~Worker()
95 /* Closing of stdin is the signal to exit and die when the process
96 indicates it needs cleanup */
97 if (Config
->NeedsCleanup
== false)
99 ExecWait(Process
,Access
.c_str(),true);
103 // Worker::Start - Start the worker process /*{{{*/
104 // ---------------------------------------------------------------------
105 /* This forks the method and inits the communication channel */
106 bool pkgAcquire::Worker::Start()
108 // Get the method path
109 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
110 if (FileExists(Method
) == false)
112 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
113 if (Access
== "https")
114 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
119 clog
<< "Starting method '" << Method
<< '\'' << endl
;
122 int Pipes
[4] = {-1,-1,-1,-1};
123 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
125 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
126 for (int I
= 0; I
!= 4; I
++)
130 for (int I
= 0; I
!= 4; I
++)
131 SetCloseExec(Pipes
[I
],true);
133 // Fork off the process
134 Process
= ExecFork();
138 dup2(Pipes
[1],STDOUT_FILENO
);
139 dup2(Pipes
[2],STDIN_FILENO
);
140 SetCloseExec(STDOUT_FILENO
,false);
141 SetCloseExec(STDIN_FILENO
,false);
142 SetCloseExec(STDERR_FILENO
,false);
145 Args
[0] = Method
.c_str();
147 execv(Args
[0],(char **)Args
);
148 cerr
<< "Failed to exec method " << Args
[0] << endl
;
155 SetNonBlock(Pipes
[0],true);
156 SetNonBlock(Pipes
[3],true);
162 // Read the configuration data
163 if (WaitFd(InFd
) == false ||
164 ReadMessages() == false)
165 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
174 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
175 // ---------------------------------------------------------------------
177 bool pkgAcquire::Worker::ReadMessages()
179 if (::ReadMessages(InFd
,MessageQueue
) == false)
180 return MethodFailure();
184 // Worker::RunMessage - Empty the message queue /*{{{*/
185 // ---------------------------------------------------------------------
186 /* This takes the messages from the message queue and runs them through
187 the parsers in order. */
188 bool pkgAcquire::Worker::RunMessages()
190 while (MessageQueue
.empty() == false)
192 string Message
= MessageQueue
.front();
193 MessageQueue
.erase(MessageQueue
.begin());
196 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
198 // Fetch the message number
200 int Number
= strtol(Message
.c_str(),&End
,10);
201 if (End
== Message
.c_str())
202 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
204 string URI
= LookupTag(Message
,"URI");
205 pkgAcquire::Queue::QItem
*Itm
= NULL
;
206 if (URI
.empty() == false)
207 Itm
= OwnerQ
->FindItem(URI
,this);
211 // update used mirror
212 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
213 if (UsedMirror
.empty() == false)
215 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
216 (*O
)->UsedMirror
= UsedMirror
;
218 if (Itm
->Description
.find(" ") != string::npos
)
219 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
223 // Determine the message number and dispatch
228 if (Capabilities(Message
) == false)
229 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
235 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
240 Status
= LookupTag(Message
,"Message");
248 _error
->Error("Method gave invalid 103 Redirect message");
252 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
257 // Change the status so that it can be dequeued
258 for (auto const &O
: Itm
->Owners
)
259 O
->Status
= pkgAcquire::Item::StatIdle
;
260 // Mark the item as done (taking care of all queues)
261 // and then put it in the main queue again
262 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
263 OwnerQ
->ItemDone(Itm
);
265 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
267 pkgAcquire::Item
*Owner
= *O
;
268 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
269 // if we change site, treat it as a mirror change
270 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
272 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
273 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
275 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
276 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
278 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
279 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
280 if (desc
.Description
.find(" ") != string::npos
)
281 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
286 OwnerQ
->Owner
->Enqueue(desc
);
299 _error
->Error("Method gave invalid 200 URI Start message");
305 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
306 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
307 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
309 (*O
)->Start(Message
, TotalSize
);
311 // Display update before completion
314 if (Log
->MorePulses
== true)
315 Log
->Pulse((*O
)->GetOwner());
316 Log
->Fetch((*O
)->GetItemDesc());
328 _error
->Error("Method gave invalid 201 URI Done message");
332 PrepareFiles("201::URIDone", Itm
);
334 // Display update before completion
335 if (Log
!= 0 && Log
->MorePulses
== true)
336 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
337 Log
->Pulse((*O
)->GetOwner());
339 HashStringList ReceivedHashes
;
341 std::string
const givenfilename
= LookupTag(Message
, "Filename");
342 std::string
const filename
= givenfilename
.empty() ? Itm
->Owner
->DestFile
: givenfilename
;
343 // see if we got hashes to verify
344 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
346 std::string
const tagname
= std::string(*type
) + "-Hash";
347 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
348 if (hashsum
.empty() == false)
349 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
351 // not all methods always sent Hashes our way
352 if (ReceivedHashes
.usable() == false)
354 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
355 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
357 Hashes
calc(ExpectedHashes
);
358 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
360 ReceivedHashes
= calc
.GetHashStringList();
364 // only local files can refer other filenames and counting them as fetched would be unfair
365 if (Log
!= NULL
&& Itm
->Owner
->Complete
== false && Itm
->Owner
->Local
== false && givenfilename
== filename
)
366 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
369 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
370 OwnerQ
->ItemDone(Itm
);
373 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
374 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
375 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
377 pkgAcquire::Item
* const Owner
= *O
;
378 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
379 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
381 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
382 << "ReceivedHash:" << endl
;
383 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
384 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
385 std::clog
<< "ExpectedHash:" << endl
;
386 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
387 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
391 // decide if what we got is what we expected
392 bool consideredOkay
= false;
393 if (ExpectedHashes
.usable())
395 if (ReceivedHashes
.usable() == false)
397 /* IMS-Hits can't be checked here as we will have uncompressed file,
398 but the hashes for the compressed file. What we have was good through
399 so all we have to ensure later is that we are not stalled. */
400 consideredOkay
= isIMSHit
;
402 else if (ReceivedHashes
== ExpectedHashes
)
403 consideredOkay
= true;
405 consideredOkay
= false;
408 else if (Owner
->HashesRequired() == true)
409 consideredOkay
= false;
412 consideredOkay
= true;
413 // even if the hashes aren't usable to declare something secure
414 // we can at least use them to declare it an integrity failure
415 if (ExpectedHashes
.empty() == false && ReceivedHashes
!= ExpectedHashes
&& _config
->Find("Acquire::ForceHash").empty())
416 consideredOkay
= false;
419 if (consideredOkay
== true)
420 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
421 else // hashsum mismatch
422 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
424 if (consideredOkay
== true)
426 Owner
->Done(Message
, ReceivedHashes
, Config
);
430 Log
->IMSHit(Owner
->GetItemDesc());
432 Log
->Done(Owner
->GetItemDesc());
437 Owner
->Failed(Message
,Config
);
439 Log
->Fail(Owner
->GetItemDesc());
451 std::string
const msg
= LookupTag(Message
,"Message");
452 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
456 PrepareFiles("400::URIFailure", Itm
);
458 // Display update before completion
459 if (Log
!= 0 && Log
->MorePulses
== true)
460 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
461 Log
->Pulse((*O
)->GetOwner());
463 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
464 OwnerQ
->ItemDone(Itm
);
469 std::string
const failReason
= LookupTag(Message
, "FailReason");
470 std::string
const reasons
[] = { "Timeout", "ConnectionRefused",
471 "ConnectionTimedOut", "ResolveFailure", "TmpResolveFailure" };
472 errTransient
= std::find(std::begin(reasons
), std::end(reasons
), failReason
) != std::end(reasons
);
475 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
478 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
479 (*O
)->Failed(Message
,Config
);
482 Log
->Fail((*O
)->GetItemDesc());
489 // 401 General Failure
491 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
496 MediaChange(Message
);
503 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
504 // ---------------------------------------------------------------------
505 /* This parses the capabilities message and dumps it into the configuration
507 bool pkgAcquire::Worker::Capabilities(string Message
)
512 Config
->Version
= LookupTag(Message
,"Version");
513 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
514 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
515 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
516 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
517 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
518 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
523 clog
<< "Configured access method " << Config
->Access
<< endl
;
524 clog
<< "Version:" << Config
->Version
<<
525 " SingleInstance:" << Config
->SingleInstance
<<
526 " Pipeline:" << Config
->Pipeline
<<
527 " SendConfig:" << Config
->SendConfig
<<
528 " LocalOnly: " << Config
->LocalOnly
<<
529 " NeedsCleanup: " << Config
->NeedsCleanup
<<
530 " Removable: " << Config
->Removable
<< endl
;
536 // Worker::MediaChange - Request a media change /*{{{*/
537 // ---------------------------------------------------------------------
539 bool pkgAcquire::Worker::MediaChange(string Message
)
541 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
544 string Media
= LookupTag(Message
,"Media");
545 string Drive
= LookupTag(Message
,"Drive");
546 ostringstream msg
,status
;
547 ioprintf(msg
,_("Please insert the disc labeled: "
549 "in the drive '%s' and press [Enter]."),
550 Media
.c_str(),Drive
.c_str());
551 status
<< "media-change: " // message
552 << Media
<< ":" // media
553 << Drive
<< ":" // drive
554 << msg
.str() // l10n message
557 std::string
const dlstatus
= status
.str();
558 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
561 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
562 LookupTag(Message
,"Drive")) == false)
565 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
567 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
574 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
576 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
582 // Worker::SendConfiguration - Send the config to the method /*{{{*/
583 // ---------------------------------------------------------------------
585 bool pkgAcquire::Worker::SendConfiguration()
587 if (Config
->SendConfig
== false)
593 /* Write out all of the configuration directives by walking the
594 configuration tree */
595 std::ostringstream Message
;
596 Message
<< "601 Configuration\n";
597 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
601 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
602 OutQueue
+= Message
.str();
608 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
609 // ---------------------------------------------------------------------
610 /* Send a URI Acquire message to the method */
611 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
616 string Message
= "600 URI Acquire\n";
617 Message
.reserve(300);
618 Message
+= "URI: " + Item
->URI
;
619 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
621 HashStringList
const hsl
= Item
->GetExpectedHashes();
622 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
623 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
625 if (hsl
.FileSize() == 0)
627 unsigned long long FileSize
= Item
->GetMaximumSize();
631 strprintf(MaximumSize
, "%llu", FileSize
);
632 Message
+= "\nMaximum-Size: " + MaximumSize
;
636 Item
->SyncDestinationFiles();
637 Message
+= Item
->Custom600Headers();
640 if (RealFileExists(Item
->Owner
->DestFile
))
642 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
643 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
644 SandboxUser
.c_str(), "root", 0600);
648 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
655 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
656 // ---------------------------------------------------------------------
658 bool pkgAcquire::Worker::OutFdReady()
663 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
665 while (Res
< 0 && errno
== EINTR
);
668 return MethodFailure();
670 OutQueue
.erase(0,Res
);
671 if (OutQueue
.empty() == true)
677 // Worker::InFdRead - In bound FD is ready /*{{{*/
678 // ---------------------------------------------------------------------
680 bool pkgAcquire::Worker::InFdReady()
682 if (ReadMessages() == false)
688 // Worker::MethodFailure - Called when the method fails /*{{{*/
689 // ---------------------------------------------------------------------
690 /* This is called when the method is believed to have failed, probably because
692 bool pkgAcquire::Worker::MethodFailure()
694 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
696 // do not reap the child here to show meaningfull error to the user
697 ExecWait(Process
,Access
.c_str(),false);
706 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
711 // Worker::Pulse - Called periodically /*{{{*/
712 // ---------------------------------------------------------------------
714 void pkgAcquire::Worker::Pulse()
716 if (CurrentItem
== 0)
720 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
722 CurrentSize
= Buf
.st_size
;
725 // Worker::ItemDone - Called when the current item is finished /*{{{*/
726 // ---------------------------------------------------------------------
728 void pkgAcquire::Worker::ItemDone()
736 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
738 if (RealFileExists(Itm
->Owner
->DestFile
))
740 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
741 std::string
const filename
= Itm
->Owner
->DestFile
;
742 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
744 pkgAcquire::Item
const * const Owner
= *O
;
745 if (Owner
->DestFile
== filename
)
747 unlink(Owner
->DestFile
.c_str());
748 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
750 // different mounts can't happen for us as we download to lists/ by default,
751 // but if the system is reused by others the locations can potentially be on
752 // different disks, so use symlink as poor-men replacement.
753 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
754 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
755 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
761 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
762 unlink((*O
)->DestFile
.c_str());