1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message.
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
44 // Worker::Worker - Constructor for Queue startup /*{{{*/
// Binds the worker to queue Q, method configuration Cnf and status reporter
// log. The initializer list also copies the access name out of Cnf->Access,
// leaves d as NULL, and starts with no current item and zeroed size counters.
// The constructor body is not visible in this listing.
45 pkgAcquire::Worker::Worker(Queue
*Q
, MethodConfig
*Cnf
, pkgAcquireStatus
*log
) :
46 d(NULL
), OwnerQ(Q
), Log(log
), Config(Cnf
), Access(Cnf
->Access
),
47 CurrentItem(nullptr), CurrentSize(0), TotalSize(0)
52 // Worker::Worker - Constructor for method config startup /*{{{*/
// Delegates to the three-argument constructor, passing nullptr for both the
// queue and the status reporter so that only Cnf is attached.
53 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : Worker(nullptr, Cnf
, nullptr)
57 // Worker::Construct - Constructor helper /*{{{*/
58 // ---------------------------------------------------------------------
// Caches the boolean option Debug::pkgAcquire::Worker (default false) in
// Debug. Any other set-up done by this helper is not visible in this listing.
60 void pkgAcquire::Worker::Construct()
69 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
72 // Worker::~Worker - Destructor /*{{{*/
73 // ---------------------------------------------------------------------
// Tears the worker down. The visible part tests Config->NeedsCleanup and
// calls ExecWait on the method process, labelled with the access name.
// NOTE(review): the statement guarded by the NeedsCleanup test is not visible
// here - confirm how the method is terminated when no cleanup is needed.
75 pkgAcquire::Worker::~Worker()
82 /* Closing of stdin is the signal to exit and die when the process
83 indicates it needs cleanup */
84 if (Config
->NeedsCleanup
== false)
86 ExecWait(Process
,Access
.c_str(),true);
90 // Worker::Start - Start the worker process /*{{{*/
91 // ---------------------------------------------------------------------
92 /* This forks the method and inits the communication channel */
// Returns a bool; the only return visible in this listing is the error
// result produced when the method does not deliver its first messages.
93 bool pkgAcquire::Worker::Start()
95 // Get the method path
// The binary is looked up as the Dir::Bin::Methods directory followed by the
// access name.
96 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
97 if (FileExists(Method
) == false)
99 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
// For the https access name an extra notice points at the package that is
// expected to ship the driver.
100 if (Access
== "https")
101 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
106 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Two pipes are created back to back in one array: slots 0/1 and slots 2/3.
109 int Pipes
[4] = {-1,-1,-1,-1};
110 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
112 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
// NOTE(review): the body of this first loop is not visible; presumably it
// closes the descriptors on the failure path - confirm.
113 for (int I
= 0; I
!= 4; I
++)
// All four pipe ends are marked close-on-exec before forking.
117 for (int I
= 0; I
!= 4; I
++)
118 SetCloseExec(Pipes
[I
],true);
120 // Fork off the process
121 Process
= ExecFork();
// Presumably child side (the branch test is not visible): Pipes[1] becomes
// stdout and Pipes[2] becomes stdin, and close-on-exec is cleared on the three
// standard descriptors so that they survive the exec below.
125 dup2(Pipes
[1],STDOUT_FILENO
);
126 dup2(Pipes
[2],STDIN_FILENO
);
127 SetCloseExec(STDOUT_FILENO
,false);
128 SetCloseExec(STDIN_FILENO
,false);
129 SetCloseExec(STDERR_FILENO
,false);
132 Args
[0] = Method
.c_str();
// execv only returns on failure, which is why the next statement reports one.
134 execv(Args
[0],(char **)Args
);
135 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// The remaining ends, Pipes[0] and Pipes[3], are switched to non-blocking
// mode. NOTE(review): their assignment to InFd/OutFd is not visible - confirm.
142 SetNonBlock(Pipes
[0],true);
143 SetNonBlock(Pipes
[3],true);
149 // Read the configuration data
// Start fails if InFd never becomes ready or the first messages cannot be
// read.
150 if (WaitFd(InFd
) == false ||
151 ReadMessages() == false)
152 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
161 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
162 // ---------------------------------------------------------------------
// Pulls messages from InFd into MessageQueue using the global ReadMessages
// helper. When that helper reports failure the result of MethodFailure is
// returned; the success path is not visible in this listing.
164 bool pkgAcquire::Worker::ReadMessages()
166 if (::ReadMessages(InFd
,MessageQueue
) == false)
167 return MethodFailure();
171 // Worker::RunMessages - Empty the message queue /*{{{*/
172 // ---------------------------------------------------------------------
173 /* This takes the messages from the message queue and runs them through
174 the parsers in order. */
// Numeric codes that open each method message. RunMessages casts the parsed
// leading number of a message to this type. Only GENERAL_FAILURE (401) is
// visible in this listing; the other enumerators that RunMessages switches on
// are declared on lines not shown here.
175 enum class APT_HIDDEN MessageType
{
184 GENERAL_FAILURE
= 401,
// Tells whether Itm belongs to a transaction that is no longer in the
// TransactionStarted state. Itm is downcast to pkgAcqTransactionItem; for a
// successful cast the state of its TransactionManager is compared against
// TransactionStarted. The value returned when the cast yields nullptr is not
// visible in this listing.
187 static bool isDoomedItem(pkgAcquire::Item
const * const Itm
)
189 auto const TransItm
= dynamic_cast<pkgAcqTransactionItem
const * const>(Itm
);
190 if (TransItm
== nullptr)
192 return TransItm
->TransactionManager
->State
!= pkgAcqTransactionItem::TransactionStarted
;
// Drains MessageQueue front to back. For each message it parses the leading
// number into a MessageType, resolves the queue item named by the URI tag,
// applies a UsedMirror tag if present, and then dispatches on the message
// type. Returns an error result for malformed messages and for a failed
// Capabilities parse; other return paths are not visible in this listing.
194 bool pkgAcquire::Worker::RunMessages()
196 while (MessageQueue
.empty() == false)
// Take a copy of the oldest message and remove it from the queue.
198 string Message
= MessageQueue
.front();
199 MessageQueue
.erase(MessageQueue
.begin());
202 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
204 // Fetch the message number
// strtoul leaves End at the start of the text when no digits were consumed,
// which is treated as an invalid message.
206 MessageType
const Number
= static_cast<MessageType
>(strtoul(Message
.c_str(),&End
,10));
207 if (End
== Message
.c_str())
208 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Itm stays NULL when the message carries no URI tag.
210 string URI
= LookupTag(Message
,"URI");
211 pkgAcquire::Queue::QItem
*Itm
= NULL
;
212 if (URI
.empty() == false)
213 Itm
= OwnerQ
->FindItem(URI
,this);
217 // update used mirror
// Every owner records the mirror, and the leading word of the item
// description (up to the first space) is replaced by it.
218 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
219 if (UsedMirror
.empty() == false)
221 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
222 (*O
)->UsedMirror
= UsedMirror
;
224 if (Itm
->Description
.find(" ") != string::npos
)
225 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
229 // Determine the message number and dispatch
// CAPABILITIES: hand the message to Capabilities and abort on failure.
232 case MessageType::CAPABILITIES
:
233 if (Capabilities(Message
) == false)
234 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
// LOG: echo the Message tag to clog.
237 case MessageType::LOG
:
239 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
// STATUS: remember the Message tag as the current worker status.
242 case MessageType::STATUS
:
243 Status
= LookupTag(Message
,"Message");
// REDIRECT: re-enqueue every owner of the item under the new URI.
246 case MessageType::REDIRECT
:
250 _error
->Error("Method gave invalid 103 Redirect message");
// New-URI falls back to the original URI when the tag is absent.
254 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
259 // Change the status so that it can be dequeued
260 for (auto const &O
: Itm
->Owners
)
261 O
->Status
= pkgAcquire::Item::StatIdle
;
262 // Mark the item as done (taking care of all queues)
263 // and then put it in the main queue again
// The owner list is copied first and the copy is what gets iterated after
// ItemDone. NOTE(review): presumably Itm is no longer valid after ItemDone -
// confirm.
264 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
265 OwnerQ
->ItemDone(Itm
);
267 for (auto const &Owner
: ItmOwners
)
269 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
273 // if we change site, treat it as a mirror change
274 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
// The old site is taken to be the first word of the description.
276 auto const firstSpace
= desc
.Description
.find(" ");
277 if (firstSpace
!= std::string::npos
)
279 std::string
const OldSite
= desc
.Description
.substr(0, firstSpace
);
280 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
// NOTE(review): substr is called with OldSite.length() + 1; if desc.URI is
// exactly OldSite this position is past the end and substr would throw -
// confirm that case cannot occur.
282 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
283 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
// The new site is whatever precedes the unchanged tail in NewURI.
285 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
286 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
287 desc
.Description
.replace(0, firstSpace
, Owner
->UsedMirror
);
// Items whose transaction is no longer running are not queued again.
293 if (isDoomedItem(Owner
) == false)
294 OwnerQ
->Owner
->Enqueue(desc
);
// WARNING: forward the Message tag as a warning prefixed with the item URI.
// NOTE(review): Itm is dereferenced with no null check visible in this
// listing, although it stays NULL for messages without a URI tag - confirm.
299 case MessageType::WARNING
:
300 _error
->Warning("%s: %s", Itm
->Owner
->DescURI().c_str(), LookupTag(Message
,"Message").c_str());
// URI_START: record sizes and notify every owner that the fetch began.
303 case MessageType::URI_START
:
307 _error
->Error("Method gave invalid 200 URI Start message");
// Both tags default to 0 and are parsed as base-10 unsigned long long.
313 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
314 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
315 for (auto const Owner
: Itm
->Owners
)
317 Owner
->Start(Message
, TotalSize
);
318 // Display update before completion
321 if (Log
->MorePulses
== true)
322 Log
->Pulse(Owner
->GetOwner());
323 Log
->Fetch(Owner
->GetItemDesc());
// URI_DONE: verify hashes and report success or failure to every owner.
330 case MessageType::URI_DONE
:
334 _error
->Error("Method gave invalid 201 URI Done message");
338 PrepareFiles("201::URIDone", Itm
);
340 // Display update before completion
341 if (Log
!= 0 && Log
->MorePulses
== true)
342 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
343 Log
->Pulse((*O
)->GetOwner());
345 HashStringList ReceivedHashes
;
// A Filename tag overrides the destination file of the first owner.
347 std::string
const givenfilename
= LookupTag(Message
, "Filename");
348 std::string
const filename
= givenfilename
.empty() ? Itm
->Owner
->DestFile
: givenfilename
;
349 // see if we got hashes to verify
// Each supported hash type is looked up under the tag name type-Hash.
350 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
352 std::string
const tagname
= std::string(*type
) + "-Hash";
353 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
354 if (hashsum
.empty() == false)
355 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
357 // not all methods always sent Hashes our way
// Fallback: when expected hashes are usable and the file exists, the
// received hashes are taken from a Hashes calculator built for the expected
// hash types. The statement that feeds the file into it is not visible here.
358 if (ReceivedHashes
.usable() == false)
360 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
361 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
363 Hashes
calc(ExpectedHashes
);
364 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
366 ReceivedHashes
= calc
.GetHashStringList();
370 // only local files can refer other filenames and counting them as fetched would be unfair
// NOTE(review): Resume-Point is parsed with atoi here but with strtoull in
// the URI_START case; values beyond the int range would not survive - confirm.
371 if (Log
!= NULL
&& Itm
->Owner
->Complete
== false && Itm
->Owner
->Local
== false && givenfilename
== filename
)
372 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
// Copy the owners before ItemDone, then iterate the copy.
375 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
376 OwnerQ
->ItemDone(Itm
);
// Either IMS-Hit or Alt-IMS-Hit marks the message as an IMS hit.
379 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
380 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
381 for (auto const Owner
: ItmOwners
)
383 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
// Optional trace of both hash lists, enabled by Debug::pkgAcquire::Auth.
384 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
386 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
387 << "ReceivedHash:" << endl
;
388 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
389 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
390 std::clog
<< "ExpectedHash:" << endl
;
391 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
392 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
396 // decide if what we got is what we expected
397 bool consideredOkay
= false;
398 if (ExpectedHashes
.usable())
400 if (ReceivedHashes
.usable() == false)
402 /* IMS-Hits can't be checked here as we will have uncompressed file,
403 but the hashes for the compressed file. What we have was good through
404 so all we have to ensure later is that we are not stalled. */
405 consideredOkay
= isIMSHit
;
407 else if (ReceivedHashes
== ExpectedHashes
)
408 consideredOkay
= true;
410 consideredOkay
= false;
// Without usable expected hashes the item is rejected if it requires hashes.
413 else if (Owner
->HashesRequired() == true)
414 consideredOkay
= false;
417 consideredOkay
= true;
418 // even if the hashes aren't usable to declare something secure
419 // we can at least use them to declare it an integrity failure
420 if (ExpectedHashes
.empty() == false && ReceivedHashes
!= ExpectedHashes
&& _config
->Find("Acquire::ForceHash").empty())
421 consideredOkay
= false;
// Items that passed the hash check get a final say through VerifyDone.
424 if (consideredOkay
== true)
425 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
426 else // hashsum mismatch
427 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
// Success path: call Done unless the item is doomed, then tell the log.
430 if (consideredOkay
== true)
432 if (isDoomedItem(Owner
) == false)
433 Owner
->Done(Message
, ReceivedHashes
, Config
);
437 Log
->IMSHit(Owner
->GetItemDesc());
439 Log
->Done(Owner
->GetItemDesc());
// Failure path: add a FailReason to the message if it has none yet.
444 if (isDoomedItem(Owner
) == false)
446 if (Message
.find("\nFailReason:") == std::string::npos
)
448 if (ReceivedHashes
!= ExpectedHashes
)
449 Message
.append("\nFailReason: HashSumMismatch");
451 Message
.append("\nFailReason: WeakHashSums");
453 Owner
->Failed(Message
,Config
);
456 Log
->Fail(Owner
->GetItemDesc());
// URI_FAILURE: classify the failure reason and fail every owner.
463 case MessageType::URI_FAILURE
:
467 std::string
const msg
= LookupTag(Message
,"Message");
468 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
472 PrepareFiles("400::URIFailure", Itm
);
474 // Display update before completion
475 if (Log
!= nullptr && Log
->MorePulses
== true)
476 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
477 Log
->Pulse((*O
)->GetOwner());
479 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
480 OwnerQ
->ItemDone(Itm
);
483 bool errTransient
= false, errAuthErr
= false;
485 std::string
const failReason
= LookupTag(Message
, "FailReason");
// Transient means the reason is one of the network-level names listed below.
487 auto const reasons
= { "Timeout", "ConnectionRefused",
488 "ConnectionTimedOut", "ResolveFailure", "TmpResolveFailure" };
489 errTransient
= std::find(std::begin(reasons
), std::end(reasons
), failReason
) != std::end(reasons
);
// Only non-transient failures are checked against the integrity reasons.
491 if (errTransient
== false)
493 auto const reasons
= { "HashSumMismatch", "WeakHashSums", "MaximumSizeExceeded" };
494 errAuthErr
= std::find(std::begin(reasons
), std::end(reasons
), failReason
) != std::end(reasons
);
498 for (auto const Owner
: ItmOwners
)
// An integrity reason only counts as an auth error for owners that expect
// hashes.
500 if (errAuthErr
&& Owner
->GetExpectedHashes().empty() == false)
501 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
502 else if (errTransient
)
503 Owner
->Status
= pkgAcquire::Item::StatTransientNetworkError
;
505 if (isDoomedItem(Owner
) == false)
506 Owner
->Failed(Message
,Config
);
508 Log
->Fail(Owner
->GetItemDesc());
// GENERAL_FAILURE: report the Message tag as an error for this access method.
515 case MessageType::GENERAL_FAILURE
:
516 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
// MEDIA_CHANGE: delegate to MediaChange.
519 case MessageType::MEDIA_CHANGE
:
520 MediaChange(Message
);
527 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
528 // ---------------------------------------------------------------------
// Copies the tags of a capabilities message into Config: Version as text and
// Single-Instance, Pipeline, Send-Config, Local-Only, Needs-Cleanup and
// Removable as booleans defaulting to false. The configured values are then
// traced to clog. The return statements are not visible in this listing.
// NOTE(review): the block comment opened on the next line is not closed in
// this listing.
529 /* This parses the capabilities message and dumps it into the configuration
531 bool pkgAcquire::Worker::Capabilities(string Message
)
536 Config
->Version
= LookupTag(Message
,"Version");
537 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
538 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
539 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
540 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
541 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
542 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
547 clog
<< "Configured access method " << Config
->Access
<< endl
;
548 clog
<< "Version:" << Config
->Version
<<
549 " SingleInstance:" << Config
->SingleInstance
<<
550 " Pipeline:" << Config
->Pipeline
<<
551 " SendConfig:" << Config
->SendConfig
<<
552 " LocalOnly: " << Config
->LocalOnly
<<
553 " NeedsCleanup: " << Config
->NeedsCleanup
<<
554 " Removable: " << Config
->Removable
<< endl
;
560 // Worker::MediaChange - Request a media change /*{{{*/
561 // ---------------------------------------------------------------------
// Handles a media change request from the method. A machine readable
// media-change record is written to the APT::Status-Fd descriptor, the
// status reporter is asked to perform the change, and a 603 Media Changed
// reply is formatted, with a Failed: true field when there is no reporter or
// it declines. How the reply in S is sent is not visible in this listing.
563 bool pkgAcquire::Worker::MediaChange(string Message
)
// APT::Status-Fd defaults to -1. NOTE(review): the guard around the
// status write is not visible - confirm the write is skipped for -1.
565 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
568 string Media
= LookupTag(Message
,"Media");
569 string Drive
= LookupTag(Message
,"Drive");
570 ostringstream msg
,status
;
571 ioprintf(msg
,_("Please insert the disc labeled: "
573 "in the drive '%s' and press [Enter]."),
574 Media
.c_str(),Drive
.c_str());
// Record layout: the media-change prefix, then media, drive and the
// localized prompt separated by colons.
575 status
<< "media-change: " // message
576 << Media
<< ":" // media
577 << Drive
<< ":" // drive
578 << msg
.str() // l10n message
581 std::string
const dlstatus
= status
.str();
582 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// Failure reply: no status reporter, or the reporter returned false.
585 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
586 LookupTag(Message
,"Drive")) == false)
589 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
591 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
// Success reply.
598 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
600 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
606 // Worker::SendConfiguration - Send the config to the method /*{{{*/
607 // ---------------------------------------------------------------------
// Builds a 601 Configuration message that lists the configuration tree as
// Config-Item lines and appends it to OutQueue. The method configuration flag
// SendConfig is tested first; the statement taken when it is false and the
// return values are not visible in this listing.
609 bool pkgAcquire::Worker::SendConfiguration()
611 if (Config
->SendConfig
== false)
617 /* Write out all of the configuration directives by walking the
618 configuration tree */
619 std::ostringstream Message
;
620 Message
<< "601 Configuration\n";
621 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
625 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
626 OutQueue
+= Message
.str();
632 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
633 // ---------------------------------------------------------------------
634 /* Send a URI Acquire message to the method */
// Builds a 600 URI Acquire message for Item from its URI, destination file,
// expected hashes, an optional Maximum-Size field and the custom headers of
// the item. Items that require hashes but have no usable ones are failed
// immediately with a synthetic 400 URI Failure instead. Where the finished
// message is queued and what is returned is not visible in this listing.
635 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
640 HashStringList
const hsl
= Item
->GetExpectedHashes();
// NOTE(review): the statement taken for a doomed item is not visible here.
642 if (isDoomedItem(Item
->Owner
))
// Weak or missing hashes are only tolerated when Acquire::ForceHash exists.
645 if (hsl
.usable() == false && Item
->Owner
->HashesRequired() &&
646 _config
->Exists("Acquire::ForceHash") == false)
648 std::string
const Message
= "400 URI Failure"
649 "\nURI: " + Item
->URI
+
650 "\nFilename: " + Item
->Owner
->DestFile
+
651 "\nFailReason: WeakHashSums";
// Iterate over a copy of the owner list while failing each owner.
653 auto const ItmOwners
= Item
->Owners
;
654 for (auto &O
: ItmOwners
)
656 O
->Status
= pkgAcquire::Item::StatAuthError
;
657 O
->Failed(Message
, Config
);
659 Log
->Fail(O
->GetItemDesc());
661 // "queued" successfully, the item just instantly failed
// Regular path: assemble the acquire request.
665 string Message
= "600 URI Acquire\n";
666 Message
.reserve(300);
667 Message
+= "URI: " + Item
->URI
;
668 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
// One Expected- field per known hash of the item.
670 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
671 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
// When the hash list reports a file size of 0, the maximum size of the item
// is sent instead.
673 if (hsl
.FileSize() == 0)
675 unsigned long long FileSize
= Item
->GetMaximumSize();
679 strprintf(MaximumSize
, "%llu", FileSize
);
680 Message
+= "\nMaximum-Size: " + MaximumSize
;
684 Item
->SyncDestinationFiles();
685 Message
+= Item
->Custom600Headers();
// An already existing destination file is handed to the user named by
// APT::Sandbox::User, with group root and mode 0600.
688 if (RealFileExists(Item
->Owner
->DestFile
))
690 std::string
const SandboxUser
= _config
->Find("APT::Sandbox::User");
691 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
692 SandboxUser
.c_str(), "root", 0600);
696 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
703 // Worker::OutFdReady - Out bound FD is ready /*{{{*/
704 // ---------------------------------------------------------------------
// Writes the pending OutQueue text to OutFd, repeating the write while it
// fails with EINTR. MethodFailure is returned on a path whose guard is not
// visible here (presumably a failed write); otherwise the Res bytes that
// were written are removed from the front of OutQueue. The action taken once
// the queue is empty is not visible in this listing.
706 bool pkgAcquire::Worker::OutFdReady()
711 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
713 while (Res
< 0 && errno
== EINTR
);
716 return MethodFailure();
718 OutQueue
.erase(0,Res
);
719 if (OutQueue
.empty() == true)
725 // Worker::InFdReady - In bound FD is ready /*{{{*/
726 // ---------------------------------------------------------------------
// Reads the pending messages through the member ReadMessages. What is
// returned on failure and what follows on success is not visible in this
// listing.
728 bool pkgAcquire::Worker::InFdReady()
730 if (ReadMessages() == false)
736 // Worker::MethodFailure - Called when the method fails /*{{{*/
737 // ---------------------------------------------------------------------
// Records an error naming the access method, calls ExecWait on the method
// process and discards every queued incoming message. Other clean-up and the
// return value are not visible in this listing.
// NOTE(review): the block comment opened on the next line is not closed in
// this listing.
738 /* This is called when the method is believed to have failed, probably because
740 bool pkgAcquire::Worker::MethodFailure()
742 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
744 // do not reap the child here to show meaningful error to the user
745 ExecWait(Process
,Access
.c_str(),false);
754 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
759 // Worker::Pulse - Called periodically /*{{{*/
760 // ---------------------------------------------------------------------
// Refreshes CurrentSize from the size that stat reports for the destination
// file of the current item. CurrentItem is tested against 0 and the stat
// result against failure first; the statements taken on those tests are not
// visible in this listing.
762 void pkgAcquire::Worker::Pulse()
764 if (CurrentItem
== 0)
768 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
770 CurrentSize
= Buf
.st_size
;
773 // Worker::ItemDone - Called when the current item is finished /*{{{*/
774 // ---------------------------------------------------------------------
// NOTE(review): only the signature is visible in this listing; the body is
// not shown, so its effects cannot be documented from here.
776 void pkgAcquire::Worker::ItemDone()
// Makes the file fetched for the first owner of Itm available to all owners.
// If that destination file exists it is given to root:root with mode 0644 and
// every other owner gets a hard link to it, falling back to a symlink and
// finally to an error. The second loop removes the destination file of every
// owner; presumably it is the branch for a missing file - confirm, the else
// is not visible in this listing. caller is the tag passed on to
// ChangeOwnerAndPermissionOfFile.
784 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
786 if (RealFileExists(Itm
->Owner
->DestFile
))
788 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
789 std::string
const filename
= Itm
->Owner
->DestFile
;
790 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
792 pkgAcquire::Item
const * const Owner
= *O
;
// Owners that already use this very file, and the /dev/null case, are
// singled out; presumably they are skipped - the statement is not visible.
793 if (Owner
->DestFile
== filename
|| filename
== "/dev/null")
// Clear any stale file before linking the fetched one into place.
795 RemoveFile("PrepareFiles", Owner
->DestFile
);
796 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
798 // different mounts can't happen for us as we download to lists/ by default,
799 // but if the system is reused by others the locations can potentially be on
800 // different disks, so use symlink as poor-men replacement.
801 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
802 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
803 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
809 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
810 RemoveFile("PrepareFiles", (*O
)->DestFile
);