1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and has no queue to run.
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
44 // Worker::Worker - Constructor for Queue startup /*{{{*/
45 pkgAcquire::Worker::Worker(Queue
*Q
, MethodConfig
*Cnf
, pkgAcquireStatus
*log
) :
46 d(NULL
), OwnerQ(Q
), Log(log
), Config(Cnf
), Access(Cnf
->Access
),
47 CurrentItem(nullptr), CurrentSize(0), TotalSize(0)
52 // Worker::Worker - Constructor for method config startup /*{{{*/
53 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : Worker(nullptr, Cnf
, nullptr)
57 // Worker::Construct - Constructor helper /*{{{*/
58 // ---------------------------------------------------------------------
60 void pkgAcquire::Worker::Construct()
69 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
72 // Worker::~Worker - Destructor /*{{{*/
73 // ---------------------------------------------------------------------
75 pkgAcquire::Worker::~Worker()
82 /* Closing of stdin is the signal to exit and die when the process
83 indicates it needs cleanup */
84 if (Config
->NeedsCleanup
== false)
86 ExecWait(Process
,Access
.c_str(),true);
90 // Worker::Start - Start the worker process /*{{{*/
91 // ---------------------------------------------------------------------
92 /* This forks the method and inits the communication channel */
93 bool pkgAcquire::Worker::Start()
95 // Get the method path
96 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
97 if (FileExists(Method
) == false)
99 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
100 if (Access
== "https")
101 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
106 clog
<< "Starting method '" << Method
<< '\'' << endl
;
109 int Pipes
[4] = {-1,-1,-1,-1};
110 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
112 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
113 for (int I
= 0; I
!= 4; I
++)
117 for (int I
= 0; I
!= 4; I
++)
118 SetCloseExec(Pipes
[I
],true);
120 // Fork off the process
121 Process
= ExecFork();
125 dup2(Pipes
[1],STDOUT_FILENO
);
126 dup2(Pipes
[2],STDIN_FILENO
);
127 SetCloseExec(STDOUT_FILENO
,false);
128 SetCloseExec(STDIN_FILENO
,false);
129 SetCloseExec(STDERR_FILENO
,false);
132 Args
[0] = Method
.c_str();
134 execv(Args
[0],(char **)Args
);
135 cerr
<< "Failed to exec method " << Args
[0] << endl
;
142 SetNonBlock(Pipes
[0],true);
143 SetNonBlock(Pipes
[3],true);
149 // Read the configuration data
150 if (WaitFd(InFd
) == false ||
151 ReadMessages() == false)
152 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
161 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
162 // ---------------------------------------------------------------------
164 bool pkgAcquire::Worker::ReadMessages()
166 if (::ReadMessages(InFd
,MessageQueue
) == false)
167 return MethodFailure();
171 // Worker::RunMessages - Empty the message queue /*{{{*/
172 // ---------------------------------------------------------------------
173 /* This takes the messages from the message queue and runs them through
174 the parsers in order. */
175 bool pkgAcquire::Worker::RunMessages()
177 while (MessageQueue
.empty() == false)
179 string Message
= MessageQueue
.front();
180 MessageQueue
.erase(MessageQueue
.begin());
183 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
185 // Fetch the message number
187 int Number
= strtol(Message
.c_str(),&End
,10);
188 if (End
== Message
.c_str())
189 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
191 string URI
= LookupTag(Message
,"URI");
192 pkgAcquire::Queue::QItem
*Itm
= NULL
;
193 if (URI
.empty() == false)
194 Itm
= OwnerQ
->FindItem(URI
,this);
198 // update used mirror
199 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
200 if (UsedMirror
.empty() == false)
202 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
203 (*O
)->UsedMirror
= UsedMirror
;
205 if (Itm
->Description
.find(" ") != string::npos
)
206 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
210 // Determine the message number and dispatch
215 if (Capabilities(Message
) == false)
216 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
222 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
227 Status
= LookupTag(Message
,"Message");
235 _error
->Error("Method gave invalid 103 Redirect message");
239 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
244 // Change the status so that it can be dequeued
245 for (auto const &O
: Itm
->Owners
)
246 O
->Status
= pkgAcquire::Item::StatIdle
;
247 // Mark the item as done (taking care of all queues)
248 // and then put it in the main queue again
249 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
250 OwnerQ
->ItemDone(Itm
);
252 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
254 pkgAcquire::Item
*Owner
= *O
;
255 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
256 // if we change site, treat it as a mirror change
257 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
259 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
260 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
262 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
263 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
265 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
266 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
267 if (desc
.Description
.find(" ") != string::npos
)
268 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
273 OwnerQ
->Owner
->Enqueue(desc
);
286 _error
->Error("Method gave invalid 200 URI Start message");
292 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
293 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
294 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
296 (*O
)->Start(Message
, TotalSize
);
298 // Display update before completion
301 if (Log
->MorePulses
== true)
302 Log
->Pulse((*O
)->GetOwner());
303 Log
->Fetch((*O
)->GetItemDesc());
315 _error
->Error("Method gave invalid 201 URI Done message");
319 PrepareFiles("201::URIDone", Itm
);
321 // Display update before completion
322 if (Log
!= 0 && Log
->MorePulses
== true)
323 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
324 Log
->Pulse((*O
)->GetOwner());
326 HashStringList ReceivedHashes
;
328 std::string
const givenfilename
= LookupTag(Message
, "Filename");
329 std::string
const filename
= givenfilename
.empty() ? Itm
->Owner
->DestFile
: givenfilename
;
330 // see if we got hashes to verify
331 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
333 std::string
const tagname
= std::string(*type
) + "-Hash";
334 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
335 if (hashsum
.empty() == false)
336 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
338 // not all methods always send hashes our way
339 if (ReceivedHashes
.usable() == false)
341 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
342 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
344 Hashes
calc(ExpectedHashes
);
345 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
347 ReceivedHashes
= calc
.GetHashStringList();
351 // only local files can refer to other filenames, and counting them as fetched would be unfair
352 if (Log
!= NULL
&& Itm
->Owner
->Complete
== false && Itm
->Owner
->Local
== false && givenfilename
== filename
)
353 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
356 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
357 OwnerQ
->ItemDone(Itm
);
360 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
361 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
362 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
364 pkgAcquire::Item
* const Owner
= *O
;
365 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
366 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
368 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
369 << "ReceivedHash:" << endl
;
370 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
371 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
372 std::clog
<< "ExpectedHash:" << endl
;
373 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
374 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
378 // decide if what we got is what we expected
379 bool consideredOkay
= false;
380 if (ExpectedHashes
.usable())
382 if (ReceivedHashes
.usable() == false)
384 /* IMS-Hits can't be checked here as we will have the uncompressed file,
385 but the hashes for the compressed file. What we have was good though,
386 so all we have to ensure later is that we are not stalled. */
387 consideredOkay
= isIMSHit
;
389 else if (ReceivedHashes
== ExpectedHashes
)
390 consideredOkay
= true;
392 consideredOkay
= false;
395 else if (Owner
->HashesRequired() == true)
396 consideredOkay
= false;
399 consideredOkay
= true;
400 // even if the hashes aren't usable to declare something secure
401 // we can at least use them to declare it an integrity failure
402 if (ExpectedHashes
.empty() == false && ReceivedHashes
!= ExpectedHashes
&& _config
->Find("Acquire::ForceHash").empty())
403 consideredOkay
= false;
406 if (consideredOkay
== true)
407 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
408 else // hashsum mismatch
409 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
411 if (consideredOkay
== true)
413 Owner
->Done(Message
, ReceivedHashes
, Config
);
417 Log
->IMSHit(Owner
->GetItemDesc());
419 Log
->Done(Owner
->GetItemDesc());
424 Owner
->Failed(Message
,Config
);
426 Log
->Fail(Owner
->GetItemDesc());
438 std::string
const msg
= LookupTag(Message
,"Message");
439 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
443 PrepareFiles("400::URIFailure", Itm
);
445 // Display update before completion
446 if (Log
!= 0 && Log
->MorePulses
== true)
447 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
448 Log
->Pulse((*O
)->GetOwner());
450 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
451 OwnerQ
->ItemDone(Itm
);
456 std::string
const failReason
= LookupTag(Message
, "FailReason");
457 std::string
const reasons
[] = { "Timeout", "ConnectionRefused",
458 "ConnectionTimedOut", "ResolveFailure", "TmpResolveFailure" };
459 errTransient
= std::find(std::begin(reasons
), std::end(reasons
), failReason
) != std::end(reasons
);
462 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
465 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
466 (*O
)->Failed(Message
,Config
);
469 Log
->Fail((*O
)->GetItemDesc());
476 // 401 General Failure
478 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
483 MediaChange(Message
);
490 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
491 // ---------------------------------------------------------------------
492 /* This parses the capabilities message and dumps it into the method's configuration (Config). */
494 bool pkgAcquire::Worker::Capabilities(string Message
)
499 Config
->Version
= LookupTag(Message
,"Version");
500 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
501 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
502 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
503 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
504 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
505 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
510 clog
<< "Configured access method " << Config
->Access
<< endl
;
511 clog
<< "Version:" << Config
->Version
<<
512 " SingleInstance:" << Config
->SingleInstance
<<
513 " Pipeline:" << Config
->Pipeline
<<
514 " SendConfig:" << Config
->SendConfig
<<
515 " LocalOnly: " << Config
->LocalOnly
<<
516 " NeedsCleanup: " << Config
->NeedsCleanup
<<
517 " Removable: " << Config
->Removable
<< endl
;
523 // Worker::MediaChange - Request a media change /*{{{*/
524 // ---------------------------------------------------------------------
526 bool pkgAcquire::Worker::MediaChange(string Message
)
528 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
531 string Media
= LookupTag(Message
,"Media");
532 string Drive
= LookupTag(Message
,"Drive");
533 ostringstream msg
,status
;
534 ioprintf(msg
,_("Please insert the disc labeled: "
536 "in the drive '%s' and press [Enter]."),
537 Media
.c_str(),Drive
.c_str());
538 status
<< "media-change: " // message
539 << Media
<< ":" // media
540 << Drive
<< ":" // drive
541 << msg
.str() // l10n message
544 std::string
const dlstatus
= status
.str();
545 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
548 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
549 LookupTag(Message
,"Drive")) == false)
552 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
554 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
561 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
563 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
569 // Worker::SendConfiguration - Send the config to the method /*{{{*/
570 // ---------------------------------------------------------------------
572 bool pkgAcquire::Worker::SendConfiguration()
574 if (Config
->SendConfig
== false)
580 /* Write out all of the configuration directives by walking the
581 configuration tree */
582 std::ostringstream Message
;
583 Message
<< "601 Configuration\n";
584 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
588 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
589 OutQueue
+= Message
.str();
595 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
596 // ---------------------------------------------------------------------
597 /* Send a URI Acquire message to the method */
598 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
603 string Message
= "600 URI Acquire\n";
604 Message
.reserve(300);
605 Message
+= "URI: " + Item
->URI
;
606 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
608 HashStringList
const hsl
= Item
->GetExpectedHashes();
609 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
610 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
612 if (hsl
.FileSize() == 0)
614 unsigned long long FileSize
= Item
->GetMaximumSize();
618 strprintf(MaximumSize
, "%llu", FileSize
);
619 Message
+= "\nMaximum-Size: " + MaximumSize
;
623 Item
->SyncDestinationFiles();
624 Message
+= Item
->Custom600Headers();
627 if (RealFileExists(Item
->Owner
->DestFile
))
629 std::string
const SandboxUser
= _config
->Find("APT::Sandbox::User");
630 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
631 SandboxUser
.c_str(), "root", 0600);
635 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
642 // Worker::OutFdReady - Outbound FD is ready /*{{{*/
643 // ---------------------------------------------------------------------
645 bool pkgAcquire::Worker::OutFdReady()
650 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
652 while (Res
< 0 && errno
== EINTR
);
655 return MethodFailure();
657 OutQueue
.erase(0,Res
);
658 if (OutQueue
.empty() == true)
664 // Worker::InFdReady - Inbound FD is ready /*{{{*/
665 // ---------------------------------------------------------------------
667 bool pkgAcquire::Worker::InFdReady()
669 if (ReadMessages() == false)
675 // Worker::MethodFailure - Called when the method fails /*{{{*/
676 // ---------------------------------------------------------------------
677 /* This is called when the method is believed to have failed. */
679 bool pkgAcquire::Worker::MethodFailure()
681 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
683 // do not reap the child here, so that a meaningful error can be shown to the user
684 ExecWait(Process
,Access
.c_str(),false);
693 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
698 // Worker::Pulse - Called periodically /*{{{*/
699 // ---------------------------------------------------------------------
701 void pkgAcquire::Worker::Pulse()
703 if (CurrentItem
== 0)
707 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
709 CurrentSize
= Buf
.st_size
;
712 // Worker::ItemDone - Called when the current item is finished /*{{{*/
713 // ---------------------------------------------------------------------
715 void pkgAcquire::Worker::ItemDone()
723 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
725 if (RealFileExists(Itm
->Owner
->DestFile
))
727 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
728 std::string
const filename
= Itm
->Owner
->DestFile
;
729 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
731 pkgAcquire::Item
const * const Owner
= *O
;
732 if (Owner
->DestFile
== filename
|| filename
== "/dev/null")
734 RemoveFile("PrepareFiles", Owner
->DestFile
);
735 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
737 // different mounts can't happen for us as we download to lists/ by default,
738 // but if the system is reused by others the locations can potentially be on
739 // different disks, so use a symlink as a poor man's replacement.
740 // FIXME: Real copying as last fallback, but that is costly, so preferably offload it to a method
741 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
742 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
748 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
749 RemoveFile("PrepareFiles", (*O
)->DestFile
);