1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
44 // Worker::Worker - Constructor for Queue startup /*{{{*/
45 pkgAcquire::Worker::Worker(Queue
*Q
, MethodConfig
*Cnf
, pkgAcquireStatus
*log
) :
46 d(NULL
), OwnerQ(Q
), Log(log
), Config(Cnf
), Access(Cnf
->Access
),
47 CurrentItem(nullptr), CurrentSize(0), TotalSize(0)
52 // Worker::Worker - Constructor for method config startup /*{{{*/
53 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : Worker(nullptr, Cnf
, nullptr)
57 // Worker::Construct - Constructor helper /*{{{*/
58 // ---------------------------------------------------------------------
60 void pkgAcquire::Worker::Construct()
69 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
72 // Worker::~Worker - Destructor /*{{{*/
73 // ---------------------------------------------------------------------
75 pkgAcquire::Worker::~Worker()
82 /* Closing of stdin is the signal to exit and die when the process
83 indicates it needs cleanup */
84 if (Config
->NeedsCleanup
== false)
86 ExecWait(Process
,Access
.c_str(),true);
90 // Worker::Start - Start the worker process /*{{{*/
91 // ---------------------------------------------------------------------
92 /* This forks the method and inits the communication channel */
93 bool pkgAcquire::Worker::Start()
95 // Get the method path
96 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
97 if (FileExists(Method
) == false)
99 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
100 if (Access
== "https")
101 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
106 clog
<< "Starting method '" << Method
<< '\'' << endl
;
109 int Pipes
[4] = {-1,-1,-1,-1};
110 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
112 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
113 for (int I
= 0; I
!= 4; I
++)
117 for (int I
= 0; I
!= 4; I
++)
118 SetCloseExec(Pipes
[I
],true);
120 // Fork off the process
121 Process
= ExecFork();
125 dup2(Pipes
[1],STDOUT_FILENO
);
126 dup2(Pipes
[2],STDIN_FILENO
);
127 SetCloseExec(STDOUT_FILENO
,false);
128 SetCloseExec(STDIN_FILENO
,false);
129 SetCloseExec(STDERR_FILENO
,false);
132 Args
[0] = Method
.c_str();
134 execv(Args
[0],(char **)Args
);
135 cerr
<< "Failed to exec method " << Args
[0] << endl
;
142 SetNonBlock(Pipes
[0],true);
143 SetNonBlock(Pipes
[3],true);
149 // Read the configuration data
150 if (WaitFd(InFd
) == false ||
151 ReadMessages() == false)
152 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
161 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
162 // ---------------------------------------------------------------------
164 bool pkgAcquire::Worker::ReadMessages()
166 if (::ReadMessages(InFd
,MessageQueue
) == false)
167 return MethodFailure();
171 // Worker::RunMessage - Empty the message queue /*{{{*/
172 // ---------------------------------------------------------------------
173 /* This takes the messages from the message queue and runs them through
174 the parsers in order. */
175 enum class APT_HIDDEN MessageType
{
184 GENERAL_FAILURE
= 401,
187 static bool isDoomedItem(pkgAcquire::Item
const * const Itm
)
189 auto const TransItm
= dynamic_cast<pkgAcqTransactionItem
const * const>(Itm
);
190 if (TransItm
== nullptr)
192 return TransItm
->TransactionManager
->State
!= pkgAcqTransactionItem::TransactionStarted
;
194 bool pkgAcquire::Worker::RunMessages()
196 while (MessageQueue
.empty() == false)
198 string Message
= MessageQueue
.front();
199 MessageQueue
.erase(MessageQueue
.begin());
202 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
204 // Fetch the message number
206 MessageType
const Number
= static_cast<MessageType
>(strtoul(Message
.c_str(),&End
,10));
207 if (End
== Message
.c_str())
208 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
210 string URI
= LookupTag(Message
,"URI");
211 pkgAcquire::Queue::QItem
*Itm
= NULL
;
212 if (URI
.empty() == false)
213 Itm
= OwnerQ
->FindItem(URI
,this);
217 // update used mirror
218 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
219 if (UsedMirror
.empty() == false)
221 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
222 (*O
)->UsedMirror
= UsedMirror
;
224 if (Itm
->Description
.find(" ") != string::npos
)
225 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
229 // Determine the message number and dispatch
232 case MessageType::CAPABILITIES
:
233 if (Capabilities(Message
) == false)
234 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
237 case MessageType::LOG
:
239 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
242 case MessageType::STATUS
:
243 Status
= LookupTag(Message
,"Message");
246 case MessageType::REDIRECT
:
250 _error
->Error("Method gave invalid 103 Redirect message");
254 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
259 // Change the status so that it can be dequeued
260 for (auto const &O
: Itm
->Owners
)
261 O
->Status
= pkgAcquire::Item::StatIdle
;
262 // Mark the item as done (taking care of all queues)
263 // and then put it in the main queue again
264 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
265 OwnerQ
->ItemDone(Itm
);
267 for (auto const &Owner
: ItmOwners
)
269 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
273 // if we change site, treat it as a mirror change
274 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
276 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
277 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
279 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
280 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
282 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
283 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
284 if (desc
.Description
.find(" ") != string::npos
)
285 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
290 if (isDoomedItem(Owner
) == false)
291 OwnerQ
->Owner
->Enqueue(desc
);
296 case MessageType::WARNING
:
297 _error
->Warning("%s: %s", Itm
->Owner
->DescURI().c_str(), LookupTag(Message
,"Message").c_str());
300 case MessageType::URI_START
:
304 _error
->Error("Method gave invalid 200 URI Start message");
310 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
311 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
312 for (auto const Owner
: Itm
->Owners
)
314 Owner
->Start(Message
, TotalSize
);
315 // Display update before completion
318 if (Log
->MorePulses
== true)
319 Log
->Pulse(Owner
->GetOwner());
320 Log
->Fetch(Owner
->GetItemDesc());
327 case MessageType::URI_DONE
:
331 _error
->Error("Method gave invalid 201 URI Done message");
335 PrepareFiles("201::URIDone", Itm
);
337 // Display update before completion
338 if (Log
!= 0 && Log
->MorePulses
== true)
339 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
340 Log
->Pulse((*O
)->GetOwner());
342 HashStringList ReceivedHashes
;
344 std::string
const givenfilename
= LookupTag(Message
, "Filename");
345 std::string
const filename
= givenfilename
.empty() ? Itm
->Owner
->DestFile
: givenfilename
;
346 // see if we got hashes to verify
347 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
349 std::string
const tagname
= std::string(*type
) + "-Hash";
350 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
351 if (hashsum
.empty() == false)
352 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
354 // not all methods always sent Hashes our way
355 if (ReceivedHashes
.usable() == false)
357 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
358 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
360 Hashes
calc(ExpectedHashes
);
361 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
363 ReceivedHashes
= calc
.GetHashStringList();
367 // only local files can refer other filenames and counting them as fetched would be unfair
368 if (Log
!= NULL
&& Itm
->Owner
->Complete
== false && Itm
->Owner
->Local
== false && givenfilename
== filename
)
369 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
372 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
373 OwnerQ
->ItemDone(Itm
);
376 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
377 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
378 for (auto const Owner
: ItmOwners
)
380 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
381 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
383 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
384 << "ReceivedHash:" << endl
;
385 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
386 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
387 std::clog
<< "ExpectedHash:" << endl
;
388 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
389 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
393 // decide if what we got is what we expected
394 bool consideredOkay
= false;
395 if (ExpectedHashes
.usable())
397 if (ReceivedHashes
.usable() == false)
399 /* IMS-Hits can't be checked here as we will have uncompressed file,
400 but the hashes for the compressed file. What we have was good through
401 so all we have to ensure later is that we are not stalled. */
402 consideredOkay
= isIMSHit
;
404 else if (ReceivedHashes
== ExpectedHashes
)
405 consideredOkay
= true;
407 consideredOkay
= false;
410 else if (Owner
->HashesRequired() == true)
411 consideredOkay
= false;
414 consideredOkay
= true;
415 // even if the hashes aren't usable to declare something secure
416 // we can at least use them to declare it an integrity failure
417 if (ExpectedHashes
.empty() == false && ReceivedHashes
!= ExpectedHashes
&& _config
->Find("Acquire::ForceHash").empty())
418 consideredOkay
= false;
421 if (consideredOkay
== true)
422 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
423 else // hashsum mismatch
424 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
427 if (consideredOkay
== true)
429 if (isDoomedItem(Owner
) == false)
430 Owner
->Done(Message
, ReceivedHashes
, Config
);
434 Log
->IMSHit(Owner
->GetItemDesc());
436 Log
->Done(Owner
->GetItemDesc());
441 if (isDoomedItem(Owner
) == false)
442 Owner
->Failed(Message
,Config
);
444 Log
->Fail(Owner
->GetItemDesc());
451 case MessageType::URI_FAILURE
:
455 std::string
const msg
= LookupTag(Message
,"Message");
456 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
460 PrepareFiles("400::URIFailure", Itm
);
462 // Display update before completion
463 if (Log
!= nullptr && Log
->MorePulses
== true)
464 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
465 Log
->Pulse((*O
)->GetOwner());
467 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
468 OwnerQ
->ItemDone(Itm
);
473 std::string
const failReason
= LookupTag(Message
, "FailReason");
474 std::string
const reasons
[] = { "Timeout", "ConnectionRefused",
475 "ConnectionTimedOut", "ResolveFailure", "TmpResolveFailure" };
476 errTransient
= std::find(std::begin(reasons
), std::end(reasons
), failReason
) != std::end(reasons
);
479 for (auto const Owner
: ItmOwners
)
482 Owner
->Status
= pkgAcquire::Item::StatTransientNetworkError
;
483 if (isDoomedItem(Owner
) == false)
484 Owner
->Failed(Message
,Config
);
486 Log
->Fail(Owner
->GetItemDesc());
493 case MessageType::GENERAL_FAILURE
:
494 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
497 case MessageType::MEDIA_CHANGE
:
498 MediaChange(Message
);
505 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
506 // ---------------------------------------------------------------------
507 /* This parses the capabilities message and dumps it into the configuration
509 bool pkgAcquire::Worker::Capabilities(string Message
)
514 Config
->Version
= LookupTag(Message
,"Version");
515 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
516 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
517 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
518 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
519 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
520 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
525 clog
<< "Configured access method " << Config
->Access
<< endl
;
526 clog
<< "Version:" << Config
->Version
<<
527 " SingleInstance:" << Config
->SingleInstance
<<
528 " Pipeline:" << Config
->Pipeline
<<
529 " SendConfig:" << Config
->SendConfig
<<
530 " LocalOnly: " << Config
->LocalOnly
<<
531 " NeedsCleanup: " << Config
->NeedsCleanup
<<
532 " Removable: " << Config
->Removable
<< endl
;
538 // Worker::MediaChange - Request a media change /*{{{*/
539 // ---------------------------------------------------------------------
541 bool pkgAcquire::Worker::MediaChange(string Message
)
543 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
546 string Media
= LookupTag(Message
,"Media");
547 string Drive
= LookupTag(Message
,"Drive");
548 ostringstream msg
,status
;
549 ioprintf(msg
,_("Please insert the disc labeled: "
551 "in the drive '%s' and press [Enter]."),
552 Media
.c_str(),Drive
.c_str());
553 status
<< "media-change: " // message
554 << Media
<< ":" // media
555 << Drive
<< ":" // drive
556 << msg
.str() // l10n message
559 std::string
const dlstatus
= status
.str();
560 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
563 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
564 LookupTag(Message
,"Drive")) == false)
567 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
569 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
576 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
578 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
584 // Worker::SendConfiguration - Send the config to the method /*{{{*/
585 // ---------------------------------------------------------------------
587 bool pkgAcquire::Worker::SendConfiguration()
589 if (Config
->SendConfig
== false)
595 /* Write out all of the configuration directives by walking the
596 configuration tree */
597 std::ostringstream Message
;
598 Message
<< "601 Configuration\n";
599 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
603 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
604 OutQueue
+= Message
.str();
610 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
611 // ---------------------------------------------------------------------
612 /* Send a URI Acquire message to the method */
613 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
618 string Message
= "600 URI Acquire\n";
619 Message
.reserve(300);
620 Message
+= "URI: " + Item
->URI
;
621 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
623 HashStringList
const hsl
= Item
->GetExpectedHashes();
624 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
625 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
627 if (hsl
.FileSize() == 0)
629 unsigned long long FileSize
= Item
->GetMaximumSize();
633 strprintf(MaximumSize
, "%llu", FileSize
);
634 Message
+= "\nMaximum-Size: " + MaximumSize
;
638 Item
->SyncDestinationFiles();
639 Message
+= Item
->Custom600Headers();
642 if (RealFileExists(Item
->Owner
->DestFile
))
644 std::string
const SandboxUser
= _config
->Find("APT::Sandbox::User");
645 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
646 SandboxUser
.c_str(), "root", 0600);
650 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
657 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
658 // ---------------------------------------------------------------------
660 bool pkgAcquire::Worker::OutFdReady()
665 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
667 while (Res
< 0 && errno
== EINTR
);
670 return MethodFailure();
672 OutQueue
.erase(0,Res
);
673 if (OutQueue
.empty() == true)
679 // Worker::InFdRead - In bound FD is ready /*{{{*/
680 // ---------------------------------------------------------------------
682 bool pkgAcquire::Worker::InFdReady()
684 if (ReadMessages() == false)
690 // Worker::MethodFailure - Called when the method fails /*{{{*/
691 // ---------------------------------------------------------------------
692 /* This is called when the method is believed to have failed, probably because
694 bool pkgAcquire::Worker::MethodFailure()
696 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
698 // do not reap the child here to show meaningfull error to the user
699 ExecWait(Process
,Access
.c_str(),false);
708 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
713 // Worker::Pulse - Called periodically /*{{{*/
714 // ---------------------------------------------------------------------
716 void pkgAcquire::Worker::Pulse()
718 if (CurrentItem
== 0)
722 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
724 CurrentSize
= Buf
.st_size
;
727 // Worker::ItemDone - Called when the current item is finished /*{{{*/
728 // ---------------------------------------------------------------------
730 void pkgAcquire::Worker::ItemDone()
738 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
740 if (RealFileExists(Itm
->Owner
->DestFile
))
742 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
743 std::string
const filename
= Itm
->Owner
->DestFile
;
744 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
746 pkgAcquire::Item
const * const Owner
= *O
;
747 if (Owner
->DestFile
== filename
|| filename
== "/dev/null")
749 RemoveFile("PrepareFiles", Owner
->DestFile
);
750 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
752 // different mounts can't happen for us as we download to lists/ by default,
753 // but if the system is reused by others the locations can potentially be on
754 // different disks, so use symlink as poor-men replacement.
755 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
756 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
757 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
763 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
764 RemoveFile("PrepareFiles", (*O
)->DestFile
);