1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message sent by the method.
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
44 // Worker::Worker - Constructor for Queue startup /*{{{*/
// Queue-runner constructor: remembers the owning queue (Q), the status
// logger (log) and the method configuration (Cnf), copies the access-method
// name out of Cnf->Access, and starts with no current item and zeroed size
// counters. Cnf is dereferenced in the initializer list, so it must not be
// null.
45 pkgAcquire::Worker::Worker(Queue
*Q
, MethodConfig
*Cnf
, pkgAcquireStatus
*log
) :
46 d(NULL
), OwnerQ(Q
), Log(log
), Config(Cnf
), Access(Cnf
->Access
),
47 CurrentItem(nullptr), CurrentSize(0), TotalSize(0)
52 // Worker::Worker - Constructor for method config startup /*{{{*/
// Config-probe constructor: delegates to the three-argument constructor with
// neither an owning queue nor a status logger (both nullptr).
53 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : Worker(nullptr, Cnf
, nullptr)
57 // Worker::Construct - Constructor helper /*{{{*/
58 // ---------------------------------------------------------------------
// Shared constructor helper. The statement below caches the
// Debug::pkgAcquire::Worker configuration flag (default false) in Debug.
60 void pkgAcquire::Worker::Construct()
69 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
72 // Worker::~Worker - Destructor /*{{{*/
73 // ---------------------------------------------------------------------
// Destructor: shuts the method subprocess down and waits for it.
75 pkgAcquire::Worker::~Worker()
82 /* Closing of stdin is the signal to exit and die when the process
83 indicates it needs cleanup */
// Methods that did not announce Needs-Cleanup take this branch.
84 if (Config
->NeedsCleanup
== false)
// Wait on the method process, identified by its access-method name.
// NOTE(review): the third argument is true here but false in
// MethodFailure() -- confirm the meaning against ExecWait's declaration.
86 ExecWait(Process
,Access
.c_str(),true);
90 // Worker::Start - Start the worker process /*{{{*/
91 // ---------------------------------------------------------------------
92 /* This forks the method and inits the communication channel */
// Starts the method: resolves the method binary below Dir::Bin::Methods,
// creates two pipes for communication, forks, execs the binary and finally
// waits for and reads the method's first messages. An error is recorded via
// _error when the method does not start correctly.
93 bool pkgAcquire::Worker::Start()
95 // Get the method path
96 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
97 if (FileExists(Method
) == false)
99 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
// Build a package hint from the access name: everything before the first
// '+' is appended to "apt-transport-".
100 std::string
const A(Access
.cbegin(), std::find(Access
.cbegin(), Access
.cend(), '+'));
102 strprintf(pkg
, "apt-transport-%s", A
.c_str());
103 _error
->Notice(_("Is the package %s installed?"), pkg
.c_str());
108 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Two pipe pairs: Pipes[0]/Pipes[1] and Pipes[2]/Pipes[3].
111 int Pipes
[4] = {-1,-1,-1,-1};
112 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
114 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
115 for (int I
= 0; I
!= 4; I
++)
// Mark all four pipe descriptors close-on-exec.
119 for (int I
= 0; I
!= 4; I
++)
120 SetCloseExec(Pipes
[I
],true);
122 // Fork off the process
123 Process
= ExecFork();
// Wire the pipes to the standard streams: Pipes[1] becomes stdout and
// Pipes[2] becomes stdin. NOTE(review): presumably this runs in the forked
// child only -- confirm the surrounding branch.
127 dup2(Pipes
[1],STDOUT_FILENO
);
128 dup2(Pipes
[2],STDIN_FILENO
);
// The standard descriptors must survive the execv below.
129 SetCloseExec(STDOUT_FILENO
,false);
130 SetCloseExec(STDIN_FILENO
,false);
131 SetCloseExec(STDERR_FILENO
,false);
134 Args
[0] = Method
.c_str();
136 execv(Args
[0],(char **)Args
);
// execv only returns on failure.
137 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// The two remaining pipe ends, Pipes[0] and Pipes[3], are used non-blocking.
144 SetNonBlock(Pipes
[0],true);
145 SetNonBlock(Pipes
[3],true);
151 // Read the configuration data
152 if (WaitFd(InFd
) == false ||
153 ReadMessages() == false)
154 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
163 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
164 // ---------------------------------------------------------------------
// Pulls the pending messages from InFd into MessageQueue using the free
// function ::ReadMessages; a failed read is escalated via MethodFailure().
166 bool pkgAcquire::Worker::ReadMessages()
168 if (::ReadMessages(InFd
,MessageQueue
) == false)
169 return MethodFailure();
173 // Worker::RunMessages - Empty the message queue /*{{{*/
174 // ---------------------------------------------------------------------
175 /* This takes the messages from the message queue and runs them through
176 the parsers in order. */
// Numeric message codes used by RunMessages() for dispatch; the value is
// obtained there by parsing the number at the start of each message
// (e.g. 401 for a general failure).
177 enum class APT_HIDDEN MessageType
{
186 GENERAL_FAILURE
= 401,
// Tells whether Itm belongs to a transaction that is no longer in the
// TransactionStarted state. Only pkgAcqTransactionItem instances carry a
// transaction manager; other items take the nullptr branch below.
189 static bool isDoomedItem(pkgAcquire::Item
const * const Itm
)
191 auto const TransItm
= dynamic_cast<pkgAcqTransactionItem
const * const>(Itm
);
192 if (TransItm
== nullptr)
// Doomed means the transaction manager has left the "started" state.
194 return TransItm
->TransactionManager
->State
!= pkgAcqTransactionItem::TransactionStarted
;
// Drains MessageQueue. Each message is removed from the front, its leading
// number is parsed into a MessageType, the queue item it refers to is looked
// up through the message's URI tag, and the message is then dispatched by
// type. Protocol violations are reported through _error.
196 bool pkgAcquire::Worker::RunMessages()
198 while (MessageQueue
.empty() == false)
// Pop the oldest message.
200 string Message
= MessageQueue
.front();
201 MessageQueue
.erase(MessageQueue
.begin());
204 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
206 // Fetch the message number
208 MessageType
const Number
= static_cast<MessageType
>(strtoul(Message
.c_str(),&End
,10));
// strtoul consumed nothing: the message does not start with a number.
209 if (End
== Message
.c_str())
210 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Find the queue item the message talks about; Itm stays NULL when the
// message carries no URI tag.
212 string URI
= LookupTag(Message
,"URI");
213 pkgAcquire::Queue::QItem
*Itm
= NULL
;
214 if (URI
.empty() == false)
215 Itm
= OwnerQ
->FindItem(URI
,this);
219 // update used mirror
220 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
221 if (UsedMirror
.empty() == false)
// Propagate the mirror name to every owner of the item.
223 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
224 (*O
)->UsedMirror
= UsedMirror
;
// Replace the first word of the description (up to the first space) with
// the mirror name.
226 if (Itm
->Description
.find(" ") != string::npos
)
227 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
231 // Determine the message number and dispatch
234 case MessageType::CAPABILITIES
:
235 if (Capabilities(Message
) == false)
236 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
239 case MessageType::LOG
:
241 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
244 case MessageType::STATUS
:
// Remember the most recent status text reported by the method.
245 Status
= LookupTag(Message
,"Message");
248 case MessageType::REDIRECT
:
252 _error
->Error("Method gave invalid 103 Redirect message");
// NOTE(review): the third LookupTag argument looks like the fallback value,
// i.e. the original URI is kept when no New-URI tag is present -- confirm.
256 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
261 // Change the status so that it can be dequeued
262 for (auto const &O
: Itm
->Owners
)
263 O
->Status
= pkgAcquire::Item::StatIdle
;
264 // Mark the item as done (taking care of all queues)
265 // and then put it in the main queue again
// The owner list is copied first because it is still iterated after
// ItemDone(Itm) has been called.
266 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
267 OwnerQ
->ItemDone(Itm
);
269 for (auto const &Owner
: ItmOwners
)
271 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
275 // if we change site, treat it as a mirror change
276 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
278 auto const firstSpace
= desc
.Description
.find(" ");
279 if (firstSpace
!= std::string::npos
)
// The first word of the description is taken as the old site prefix.
281 std::string
const OldSite
= desc
.Description
.substr(0, firstSpace
);
282 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
// Remainder of the old URI after the site prefix and one separator character.
284 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
285 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
// Strip that remainder from the new URI to obtain the new site, then record
// it as the used mirror and in the description.
287 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
288 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
289 desc
.Description
.replace(0, firstSpace
, Owner
->UsedMirror
);
// Only owners that are not doomed (see isDoomedItem) are queued again.
295 if (isDoomedItem(Owner
) == false)
296 OwnerQ
->Owner
->Enqueue(desc
);
301 case MessageType::WARNING
:
// NOTE(review): Itm is dereferenced here without a visible NULL check, yet
// it is only assigned above when the message carries a URI tag -- confirm a
// warning without URI cannot reach this line.
302 _error
->Warning("%s: %s", Itm
->Owner
->DescURI().c_str(), LookupTag(Message
,"Message").c_str());
305 case MessageType::URI_START
:
309 _error
->Error("Method gave invalid 200 URI Start message");
// Total size and resume offset as announced by the method; both tags are
// looked up with "0" as third argument.
315 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
316 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
317 for (auto const Owner
: Itm
->Owners
)
319 Owner
->Start(Message
, TotalSize
);
320 // Display update before completion
323 if (Log
->MorePulses
== true)
324 Log
->Pulse(Owner
->GetOwner());
325 Log
->Fetch(Owner
->GetItemDesc());
332 case MessageType::URI_DONE
:
336 _error
->Error("Method gave invalid 201 URI Done message");
340 PrepareFiles("201::URIDone", Itm
);
342 // Display update before completion
343 if (Log
!= 0 && Log
->MorePulses
== true)
344 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
345 Log
->Pulse((*O
)->GetOwner());
347 HashStringList ReceivedHashes
;
// Use the Filename tag from the message when present, otherwise the
// destination file of Itm->Owner.
349 std::string
const givenfilename
= LookupTag(Message
, "Filename");
350 std::string
const filename
= givenfilename
.empty() ? Itm
->Owner
->DestFile
: givenfilename
;
351 // see if we got hashes to verify
352 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
// Hashes arrive in tags named "<type>-Hash".
354 std::string
const tagname
= std::string(*type
) + "-Hash";
355 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
356 if (hashsum
.empty() == false)
357 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
359 // not all methods always sent Hashes our way
360 if (ReceivedHashes
.usable() == false)
// Fall back to a Hashes calculator set up from the expected hash list and
// the file on disk.
362 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
363 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
365 Hashes
calc(ExpectedHashes
);
366 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
368 ReceivedHashes
= calc
.GetHashStringList();
372 // only local files can refer other filenames and counting them as fetched would be unfair
// NOTE(review): Resume-Point is parsed with atoi here but with strtoull in
// the 200 URI Start handler -- offsets beyond the int range would differ.
373 if (Log
!= NULL
&& Itm
->Owner
->Complete
== false && Itm
->Owner
->Local
== false && givenfilename
== filename
)
374 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
// Snapshot the owners: they are still needed after ItemDone(Itm).
377 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
378 OwnerQ
->ItemDone(Itm
);
// Either tag marks the reply as an IMS hit.
381 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
382 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
383 auto const forcedHash
= _config
->Find("Acquire::ForceHash");
384 for (auto const Owner
: ItmOwners
)
386 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
// Optional trace of received versus expected hashes.
387 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
389 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
390 << "ReceivedHash:" << endl
;
391 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
392 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
393 std::clog
<< "ExpectedHash:" << endl
;
394 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
395 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
399 // decide if what we got is what we expected
400 bool consideredOkay
= false;
// Compare hashes when either no hash is forced and something is expected,
// or a hash is forced and the expected list is usable.
401 if ((forcedHash
.empty() && ExpectedHashes
.empty() == false) ||
402 (forcedHash
.empty() == false && ExpectedHashes
.usable()))
404 if (ReceivedHashes
.empty())
406 /* IMS-Hits can't be checked here as we will have uncompressed file,
407 but the hashes for the compressed file. What we have was good through
408 so all we have to ensure later is that we are not stalled. */
409 consideredOkay
= isIMSHit
;
411 else if (ReceivedHashes
== ExpectedHashes
)
412 consideredOkay
= true;
414 consideredOkay
= false;
// Acceptable only if the owner does not require hashes.
418 consideredOkay
= !Owner
->HashesRequired();
// Items that passed so far get the owner's own verification as final check.
420 if (consideredOkay
== true)
421 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
422 else // hashsum mismatch
423 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
426 if (consideredOkay
== true)
428 if (isDoomedItem(Owner
) == false)
429 Owner
->Done(Message
, ReceivedHashes
, Config
);
433 Log
->IMSHit(Owner
->GetItemDesc());
435 Log
->Done(Owner
->GetItemDesc());
440 if (isDoomedItem(Owner
) == false)
// Add a FailReason only when the method did not supply one itself.
442 if (Message
.find("\nFailReason:") == std::string::npos
)
444 if (ReceivedHashes
!= ExpectedHashes
)
445 Message
.append("\nFailReason: HashSumMismatch");
447 Message
.append("\nFailReason: WeakHashSums");
449 Owner
->Failed(Message
,Config
);
452 Log
->Fail(Owner
->GetItemDesc());
459 case MessageType::URI_FAILURE
:
463 std::string
const msg
= LookupTag(Message
,"Message");
464 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
468 PrepareFiles("400::URIFailure", Itm
);
470 // Display update before completion
471 if (Log
!= nullptr && Log
->MorePulses
== true)
472 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
473 Log
->Pulse((*O
)->GetOwner());
// Snapshot the owners: they are still needed after ItemDone(Itm).
475 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
476 OwnerQ
->ItemDone(Itm
);
// Classify the FailReason tag: first against the transient network reasons,
// and only if none matches against the authentication-type reasons.
479 bool errTransient
= false, errAuthErr
= false;
481 std::string
const failReason
= LookupTag(Message
, "FailReason");
483 auto const reasons
= { "Timeout", "ConnectionRefused",
484 "ConnectionTimedOut", "ResolveFailure", "TmpResolveFailure" };
485 errTransient
= std::find(std::begin(reasons
), std::end(reasons
), failReason
) != std::end(reasons
);
487 if (errTransient
== false)
489 auto const reasons
= { "HashSumMismatch", "WeakHashSums", "MaximumSizeExceeded" };
490 errAuthErr
= std::find(std::begin(reasons
), std::end(reasons
), failReason
) != std::end(reasons
);
494 for (auto const Owner
: ItmOwners
)
// An auth error is only recorded for owners that expect hashes at all.
496 if (errAuthErr
&& Owner
->GetExpectedHashes().empty() == false)
497 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
498 else if (errTransient
)
499 Owner
->Status
= pkgAcquire::Item::StatTransientNetworkError
;
501 if (isDoomedItem(Owner
) == false)
502 Owner
->Failed(Message
,Config
);
504 Log
->Fail(Owner
->GetItemDesc());
511 case MessageType::GENERAL_FAILURE
:
512 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
515 case MessageType::MEDIA_CHANGE
:
516 MediaChange(Message
);
523 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
524 // ---------------------------------------------------------------------
525 /* This parses the capabilities message and dumps it into the method configuration. */
// Handles the capabilities message: copies the Version tag and the boolean
// capability tags (each parsed with false as default) into Config and prints
// the resulting settings to clog.
527 bool pkgAcquire::Worker::Capabilities(string Message
)
532 Config
->Version
= LookupTag(Message
,"Version");
533 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
534 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
535 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
536 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
537 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
538 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
// Trace of the configuration that was just stored.
543 clog
<< "Configured access method " << Config
->Access
<< endl
;
544 clog
<< "Version:" << Config
->Version
<<
545 " SingleInstance:" << Config
->SingleInstance
<<
546 " Pipeline:" << Config
->Pipeline
<<
547 " SendConfig:" << Config
->SendConfig
<<
548 " LocalOnly: " << Config
->LocalOnly
<<
549 " NeedsCleanup: " << Config
->NeedsCleanup
<<
550 " Removable: " << Config
->Removable
<< endl
;
556 // Worker::MediaChange - Request a media change /*{{{*/
557 // ---------------------------------------------------------------------
// Handles a media change request: builds a "media-change:" status line from
// the Media and Drive tags for the APT::Status-Fd descriptor, asks Log to
// perform the change and prepares a "603 Media Changed" reply for the method.
559 bool pkgAcquire::Worker::MediaChange(string Message
)
// Status descriptor from the configuration; -1 when not configured.
561 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
564 string Media
= LookupTag(Message
,"Media");
565 string Drive
= LookupTag(Message
,"Drive");
566 ostringstream msg
,status
;
567 ioprintf(msg
,_("Please insert the disc labeled: "
569 "in the drive '%s' and press [Enter]."),
570 Media
.c_str(),Drive
.c_str());
// Colon-separated status record: media, drive, localized prompt.
571 status
<< "media-change: " // message
572 << Media
<< ":" // media
573 << Drive
<< ":" // drive
574 << msg
.str() // l10n message
577 std::string
const dlstatus
= status
.str();
578 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// Without a logger, or when the logger's MediaChange returns false, the
// reply carries "Failed: true".
581 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
582 LookupTag(Message
,"Drive")) == false)
585 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
587 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
// Plain reply without a Failed field.
594 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
596 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
602 // Worker::SendConfiguration - Send the config to the method /*{{{*/
603 // ---------------------------------------------------------------------
// Appends a "601 Configuration" message to OutQueue. The message body is the
// configuration tree dumped with the "Config-Item: %F=%V" line format.
605 bool pkgAcquire::Worker::SendConfiguration()
// Checks whether the method asked for the configuration (Send-Config).
607 if (Config
->SendConfig
== false)
613 /* Write out all of the configuration directives by walking the
614 configuration tree */
615 std::ostringstream Message
;
616 Message
<< "601 Configuration\n";
617 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
621 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
// Queue the message for the next write to the method.
622 OutQueue
+= Message
.str();
628 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
629 // ---------------------------------------------------------------------
630 /* Send a URI Acquire message to the method */
// Builds the "600 URI Acquire" request for Item: URI, Filename, one
// "Expected-<type>" line per expected hash, an optional Maximum-Size and the
// item's custom headers. Items without usable hashes whose owner requires
// hashes are failed right away with FailReason WeakHashSums, unless
// Acquire::ForceHash exists in the configuration.
631 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
636 HashStringList
const hsl
= Item
->GetExpectedHashes();
// Special-cases items whose transaction is no longer running (see
// isDoomedItem).
638 if (isDoomedItem(Item
->Owner
))
641 if (hsl
.usable() == false && Item
->Owner
->HashesRequired() &&
642 _config
->Exists("Acquire::ForceHash") == false)
// Synthesize the failure message a method would have sent.
644 std::string
const Message
= "400 URI Failure"
645 "\nURI: " + Item
->URI
+
646 "\nFilename: " + Item
->Owner
->DestFile
+
647 "\nFailReason: WeakHashSums";
// Iterate over a copy of the owner list.
649 auto const ItmOwners
= Item
->Owners
;
650 for (auto &O
: ItmOwners
)
652 O
->Status
= pkgAcquire::Item::StatAuthError
;
653 O
->Failed(Message
, Config
);
655 Log
->Fail(O
->GetItemDesc());
657 // "queued" successfully, the item just instantly failed
661 string Message
= "600 URI Acquire\n";
662 Message
.reserve(300);
663 Message
+= "URI: " + Item
->URI
;
664 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
666 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
667 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
// When the hash list carries no file size, consult the item's maximum size.
669 if (hsl
.FileSize() == 0)
671 unsigned long long FileSize
= Item
->GetMaximumSize();
675 strprintf(MaximumSize
, "%llu", FileSize
);
676 Message
+= "\nMaximum-Size: " + MaximumSize
;
680 Item
->SyncDestinationFiles();
681 Message
+= Item
->Custom600Headers();
// An already existing destination file is handed to the configured sandbox
// user. NOTE(review): the trailing arguments look like group "root" and mode
// 0600 -- confirm against ChangeOwnerAndPermissionOfFile.
684 if (RealFileExists(Item
->Owner
->DestFile
))
686 std::string
const SandboxUser
= _config
->Find("APT::Sandbox::User");
687 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
688 SandboxUser
.c_str(), "root", 0600);
692 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
699 // Worker::OutFdReady - Out bound FD is ready /*{{{*/
700 // ---------------------------------------------------------------------
// Writes the pending OutQueue data to OutFd, repeating the write while it is
// interrupted (EINTR), and removes the bytes that were written from OutQueue.
702 bool pkgAcquire::Worker::OutFdReady()
707 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
// Retry only for interrupted system calls.
709 while (Res
< 0 && errno
== EINTR
);
712 return MethodFailure();
// Drop the prefix that was written; a partial write leaves the rest queued.
714 OutQueue
.erase(0,Res
);
715 if (OutQueue
.empty() == true)
721 // Worker::InFdReady - In bound FD is ready /*{{{*/
722 // ---------------------------------------------------------------------
// Reads the pending messages from the method via ReadMessages() and reacts
// to a failed read.
724 bool pkgAcquire::Worker::InFdReady()
726 if (ReadMessages() == false)
732 // Worker::MethodFailure - Called when the method fails /*{{{*/
733 // ---------------------------------------------------------------------
734 /* This is called when the method is believed to have failed. */
// Records that the method died unexpectedly, waits on the method process and
// discards all messages still sitting in MessageQueue.
736 bool pkgAcquire::Worker::MethodFailure()
738 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
740 // do not reap the child here to show meaningful error to the user
741 ExecWait(Process
,Access
.c_str(),false);
// Unprocessed messages of the dead method are dropped.
750 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
755 // Worker::Pulse - Called periodically /*{{{*/
756 // ---------------------------------------------------------------------
// Refreshes CurrentSize from the on-disk size of the current item's
// destination file.
758 void pkgAcquire::Worker::Pulse()
// Checks for the absence of a current item.
760 if (CurrentItem
== 0)
// Checks for a failed stat() of the destination file.
764 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
766 CurrentSize
= Buf
.st_size
;
769 // Worker::ItemDone - Called when the current item is finished /*{{{*/
770 // ---------------------------------------------------------------------
// Hook for the end of the current item (per the section header).
// NOTE(review): confirm which per-item state it resets.
772 void pkgAcquire::Worker::ItemDone()
// Prepares the destination files of all owners of Itm. When the destination
// file of Itm->Owner exists, its ownership and permissions are reset and the
// owners with a different DestFile get a hard link (or, failing that, a
// symlink) to it. caller is passed on as the first argument of the
// ownership/permission change.
780 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
782 if (RealFileExists(Itm
->Owner
->DestFile
))
// NOTE(review): the arguments look like owner "root", group "root", mode
// 0644 -- confirm against ChangeOwnerAndPermissionOfFile.
784 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
785 std::string
const filename
= Itm
->Owner
->DestFile
;
786 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
788 pkgAcquire::Item
const * const Owner
= *O
;
// Owners that already use this very file, and the /dev/null case, are
// singled out here.
789 if (Owner
->DestFile
== filename
|| filename
== "/dev/null")
// Remove whatever is at the owner's destination before linking.
791 RemoveFile("PrepareFiles", Owner
->DestFile
);
792 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
794 // different mounts can't happen for us as we download to lists/ by default,
795 // but if the system is reused by others the locations can potentially be on
796 // different disks, so use symlink as poor-men replacement.
797 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
798 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
799 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
// Removes the destination file of every owner. NOTE(review): presumably the
// branch taken when the primary destination file does not exist -- confirm.
805 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
806 RemoveFile("PrepareFiles", (*O
)->DestFile
);