1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
44 // Worker::Worker - Constructor for Queue startup /*{{{*/
45 pkgAcquire::Worker::Worker(Queue
*Q
, MethodConfig
*Cnf
, pkgAcquireStatus
*log
) :
46 d(NULL
), OwnerQ(Q
), Log(log
), Config(Cnf
), Access(Cnf
->Access
),
47 CurrentItem(nullptr), CurrentSize(0), TotalSize(0)
52 // Worker::Worker - Constructor for method config startup /*{{{*/
53 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : Worker(nullptr, Cnf
, nullptr)
57 // Worker::Construct - Constructor helper /*{{{*/
58 // ---------------------------------------------------------------------
60 void pkgAcquire::Worker::Construct()
69 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
72 // Worker::~Worker - Destructor /*{{{*/
73 // ---------------------------------------------------------------------
75 pkgAcquire::Worker::~Worker()
82 /* Closing of stdin is the signal to exit and die when the process
83 indicates it needs cleanup */
84 if (Config
->NeedsCleanup
== false)
86 ExecWait(Process
,Access
.c_str(),true);
90 // Worker::Start - Start the worker process /*{{{*/
91 // ---------------------------------------------------------------------
92 /* This forks the method and inits the communication channel */
93 bool pkgAcquire::Worker::Start()
95 // Get the method path
96 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
97 if (FileExists(Method
) == false)
99 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
100 if (Access
== "https")
101 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
106 clog
<< "Starting method '" << Method
<< '\'' << endl
;
109 int Pipes
[4] = {-1,-1,-1,-1};
110 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
112 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
113 for (int I
= 0; I
!= 4; I
++)
117 for (int I
= 0; I
!= 4; I
++)
118 SetCloseExec(Pipes
[I
],true);
120 // Fork off the process
121 Process
= ExecFork();
125 dup2(Pipes
[1],STDOUT_FILENO
);
126 dup2(Pipes
[2],STDIN_FILENO
);
127 SetCloseExec(STDOUT_FILENO
,false);
128 SetCloseExec(STDIN_FILENO
,false);
129 SetCloseExec(STDERR_FILENO
,false);
132 Args
[0] = Method
.c_str();
134 execv(Args
[0],(char **)Args
);
135 cerr
<< "Failed to exec method " << Args
[0] << endl
;
142 SetNonBlock(Pipes
[0],true);
143 SetNonBlock(Pipes
[3],true);
149 // Read the configuration data
150 if (WaitFd(InFd
) == false ||
151 ReadMessages() == false)
152 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
161 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
162 // ---------------------------------------------------------------------
164 bool pkgAcquire::Worker::ReadMessages()
166 if (::ReadMessages(InFd
,MessageQueue
) == false)
167 return MethodFailure();
171 // Worker::RunMessages - Empty the message queue /*{{{*/
172 // ---------------------------------------------------------------------
173 /* This takes the messages from the message queue and runs them through
174 the parsers in order. */
175 bool pkgAcquire::Worker::RunMessages()
177 while (MessageQueue
.empty() == false)
179 string Message
= MessageQueue
.front();
180 MessageQueue
.erase(MessageQueue
.begin());
183 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
185 // Fetch the message number
187 int Number
= strtol(Message
.c_str(),&End
,10);
188 if (End
== Message
.c_str())
189 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
191 string URI
= LookupTag(Message
,"URI");
192 pkgAcquire::Queue::QItem
*Itm
= NULL
;
193 if (URI
.empty() == false)
194 Itm
= OwnerQ
->FindItem(URI
,this);
198 // update used mirror
199 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
200 if (UsedMirror
.empty() == false)
202 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
203 (*O
)->UsedMirror
= UsedMirror
;
205 if (Itm
->Description
.find(" ") != string::npos
)
206 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
210 // Determine the message number and dispatch
215 if (Capabilities(Message
) == false)
216 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
222 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
227 Status
= LookupTag(Message
,"Message");
235 _error
->Error("Method gave invalid 103 Redirect message");
239 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
244 // Change the status so that it can be dequeued
245 for (auto const &O
: Itm
->Owners
)
246 O
->Status
= pkgAcquire::Item::StatIdle
;
247 // Mark the item as done (taking care of all queues)
248 // and then put it in the main queue again
249 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
250 OwnerQ
->ItemDone(Itm
);
252 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
254 pkgAcquire::Item
*Owner
= *O
;
255 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
256 // if we change site, treat it as a mirror change
257 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
259 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
260 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
262 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
263 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
265 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
266 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
267 if (desc
.Description
.find(" ") != string::npos
)
268 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
273 OwnerQ
->Owner
->Enqueue(desc
);
282 _error
->Warning("%s: %s", Itm
->Owner
->DescURI().c_str(), LookupTag(Message
,"Message").c_str());
290 _error
->Error("Method gave invalid 200 URI Start message");
296 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
297 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
298 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
300 (*O
)->Start(Message
, TotalSize
);
302 // Display update before completion
305 if (Log
->MorePulses
== true)
306 Log
->Pulse((*O
)->GetOwner());
307 Log
->Fetch((*O
)->GetItemDesc());
319 _error
->Error("Method gave invalid 201 URI Done message");
323 PrepareFiles("201::URIDone", Itm
);
325 // Display update before completion
326 if (Log
!= 0 && Log
->MorePulses
== true)
327 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
328 Log
->Pulse((*O
)->GetOwner());
330 HashStringList ReceivedHashes
;
332 std::string
const givenfilename
= LookupTag(Message
, "Filename");
333 std::string
const filename
= givenfilename
.empty() ? Itm
->Owner
->DestFile
: givenfilename
;
334 // see if we got hashes to verify
335 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
337 std::string
const tagname
= std::string(*type
) + "-Hash";
338 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
339 if (hashsum
.empty() == false)
340 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
342 // not all methods always send Hashes our way
343 if (ReceivedHashes
.usable() == false)
345 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
346 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
348 Hashes
calc(ExpectedHashes
);
349 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
351 ReceivedHashes
= calc
.GetHashStringList();
355 // only local files can refer other filenames and counting them as fetched would be unfair
356 if (Log
!= NULL
&& Itm
->Owner
->Complete
== false && Itm
->Owner
->Local
== false && givenfilename
== filename
)
357 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
360 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
361 OwnerQ
->ItemDone(Itm
);
364 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
365 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
366 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
368 pkgAcquire::Item
* const Owner
= *O
;
369 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
370 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
372 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
373 << "ReceivedHash:" << endl
;
374 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
375 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
376 std::clog
<< "ExpectedHash:" << endl
;
377 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
378 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
382 // decide if what we got is what we expected
383 bool consideredOkay
= false;
384 if (ExpectedHashes
.usable())
386 if (ReceivedHashes
.usable() == false)
388 /* IMS-Hits can't be checked here as we will have the uncompressed file,
389 but the hashes for the compressed file. What we have was good though,
390 so all we have to ensure later is that we are not stalled. */
391 consideredOkay
= isIMSHit
;
393 else if (ReceivedHashes
== ExpectedHashes
)
394 consideredOkay
= true;
396 consideredOkay
= false;
399 else if (Owner
->HashesRequired() == true)
400 consideredOkay
= false;
403 consideredOkay
= true;
404 // even if the hashes aren't usable to declare something secure
405 // we can at least use them to declare it an integrity failure
406 if (ExpectedHashes
.empty() == false && ReceivedHashes
!= ExpectedHashes
&& _config
->Find("Acquire::ForceHash").empty())
407 consideredOkay
= false;
410 if (consideredOkay
== true)
411 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
412 else // hashsum mismatch
413 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
415 if (consideredOkay
== true)
417 Owner
->Done(Message
, ReceivedHashes
, Config
);
421 Log
->IMSHit(Owner
->GetItemDesc());
423 Log
->Done(Owner
->GetItemDesc());
428 Owner
->Failed(Message
,Config
);
430 Log
->Fail(Owner
->GetItemDesc());
442 std::string
const msg
= LookupTag(Message
,"Message");
443 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
447 PrepareFiles("400::URIFailure", Itm
);
449 // Display update before completion
450 if (Log
!= 0 && Log
->MorePulses
== true)
451 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
452 Log
->Pulse((*O
)->GetOwner());
454 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
455 OwnerQ
->ItemDone(Itm
);
460 std::string
const failReason
= LookupTag(Message
, "FailReason");
461 std::string
const reasons
[] = { "Timeout", "ConnectionRefused",
462 "ConnectionTimedOut", "ResolveFailure", "TmpResolveFailure" };
463 errTransient
= std::find(std::begin(reasons
), std::end(reasons
), failReason
) != std::end(reasons
);
466 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
469 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
470 (*O
)->Failed(Message
,Config
);
473 Log
->Fail((*O
)->GetItemDesc());
480 // 401 General Failure
482 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
487 MediaChange(Message
);
494 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
495 // ---------------------------------------------------------------------
496 /* This parses the capabilities message and dumps it into the configuration
498 bool pkgAcquire::Worker::Capabilities(string Message
)
503 Config
->Version
= LookupTag(Message
,"Version");
504 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
505 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
506 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
507 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
508 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
509 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
514 clog
<< "Configured access method " << Config
->Access
<< endl
;
515 clog
<< "Version:" << Config
->Version
<<
516 " SingleInstance:" << Config
->SingleInstance
<<
517 " Pipeline:" << Config
->Pipeline
<<
518 " SendConfig:" << Config
->SendConfig
<<
519 " LocalOnly: " << Config
->LocalOnly
<<
520 " NeedsCleanup: " << Config
->NeedsCleanup
<<
521 " Removable: " << Config
->Removable
<< endl
;
527 // Worker::MediaChange - Request a media change /*{{{*/
528 // ---------------------------------------------------------------------
530 bool pkgAcquire::Worker::MediaChange(string Message
)
532 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
535 string Media
= LookupTag(Message
,"Media");
536 string Drive
= LookupTag(Message
,"Drive");
537 ostringstream msg
,status
;
538 ioprintf(msg
,_("Please insert the disc labeled: "
540 "in the drive '%s' and press [Enter]."),
541 Media
.c_str(),Drive
.c_str());
542 status
<< "media-change: " // message
543 << Media
<< ":" // media
544 << Drive
<< ":" // drive
545 << msg
.str() // l10n message
548 std::string
const dlstatus
= status
.str();
549 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
552 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
553 LookupTag(Message
,"Drive")) == false)
556 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
558 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
565 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
567 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
573 // Worker::SendConfiguration - Send the config to the method /*{{{*/
574 // ---------------------------------------------------------------------
576 bool pkgAcquire::Worker::SendConfiguration()
578 if (Config
->SendConfig
== false)
584 /* Write out all of the configuration directives by walking the
585 configuration tree */
586 std::ostringstream Message
;
587 Message
<< "601 Configuration\n";
588 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
592 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
593 OutQueue
+= Message
.str();
599 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
600 // ---------------------------------------------------------------------
601 /* Send a URI Acquire message to the method */
602 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
607 string Message
= "600 URI Acquire\n";
608 Message
.reserve(300);
609 Message
+= "URI: " + Item
->URI
;
610 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
612 HashStringList
const hsl
= Item
->GetExpectedHashes();
613 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
614 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
616 if (hsl
.FileSize() == 0)
618 unsigned long long FileSize
= Item
->GetMaximumSize();
622 strprintf(MaximumSize
, "%llu", FileSize
);
623 Message
+= "\nMaximum-Size: " + MaximumSize
;
627 Item
->SyncDestinationFiles();
628 Message
+= Item
->Custom600Headers();
631 if (RealFileExists(Item
->Owner
->DestFile
))
633 std::string
const SandboxUser
= _config
->Find("APT::Sandbox::User");
634 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
635 SandboxUser
.c_str(), "root", 0600);
639 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
646 // Worker::OutFdReady - Outbound FD is ready /*{{{*/
647 // ---------------------------------------------------------------------
649 bool pkgAcquire::Worker::OutFdReady()
654 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
656 while (Res
< 0 && errno
== EINTR
);
659 return MethodFailure();
661 OutQueue
.erase(0,Res
);
662 if (OutQueue
.empty() == true)
668 // Worker::InFdReady - Inbound FD is ready /*{{{*/
669 // ---------------------------------------------------------------------
671 bool pkgAcquire::Worker::InFdReady()
673 if (ReadMessages() == false)
679 // Worker::MethodFailure - Called when the method fails /*{{{*/
680 // ---------------------------------------------------------------------
681 /* This is called when the method is believed to have failed, probably because
683 bool pkgAcquire::Worker::MethodFailure()
685 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
687 // do not reap the child here to show a meaningful error to the user
688 ExecWait(Process
,Access
.c_str(),false);
697 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
702 // Worker::Pulse - Called periodically /*{{{*/
703 // ---------------------------------------------------------------------
705 void pkgAcquire::Worker::Pulse()
707 if (CurrentItem
== 0)
711 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
713 CurrentSize
= Buf
.st_size
;
716 // Worker::ItemDone - Called when the current item is finished /*{{{*/
717 // ---------------------------------------------------------------------
719 void pkgAcquire::Worker::ItemDone()
727 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
729 if (RealFileExists(Itm
->Owner
->DestFile
))
731 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
732 std::string
const filename
= Itm
->Owner
->DestFile
;
733 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
735 pkgAcquire::Item
const * const Owner
= *O
;
736 if (Owner
->DestFile
== filename
|| filename
== "/dev/null")
738 RemoveFile("PrepareFiles", Owner
->DestFile
);
739 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
741 // different mounts can't happen for us as we download to lists/ by default,
742 // but if the system is reused by others the locations can potentially be on
743 // different disks, so use a symlink as a poor man's replacement.
744 // FIXME: Real copying as last fallback, but that is costly, so preferably offload it to a method
745 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
746 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
752 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
753 RemoveFile("PrepareFiles", (*O
)->DestFile
);