1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
43 // Worker::Worker - Constructor for Queue startup /*{{{*/
44 // ---------------------------------------------------------------------
// Takes the queue Q, the method configuration Cnf and the status object log.
// The initializers visible here set d to NULL and store log in Log; the rest
// of the initializer list and the constructor body are not part of this view.
46 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
47 pkgAcquireStatus
*log
) : d(NULL
), Log(log
)
59 // Worker::Worker - Constructor for method config startup /*{{{*/
60 // ---------------------------------------------------------------------
// Variant that receives only the method configuration: there is no owning
// queue (OwnerQ is NULL), Config points at Cnf, Access is copied from
// Cnf->Access, and the per-item state starts out empty (CurrentItem NULL,
// CurrentSize and TotalSize 0). The constructor body is not visible here.
62 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
),
63 Access(Cnf
->Access
), CurrentItem(NULL
),
64 CurrentSize(0), TotalSize(0)
69 // Worker::Construct - Constructor helper /*{{{*/
70 // ---------------------------------------------------------------------
// Shared initialisation helper. The only statement visible in this view
// loads the boolean option Debug::pkgAcquire::Worker into Debug, passing
// false as the second argument to FindB.
72 void pkgAcquire::Worker::Construct()
81 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
84 // Worker::~Worker - Destructor /*{{{*/
85 // ---------------------------------------------------------------------
// Shuts the worker down. Only the NeedsCleanup test and the ExecWait call
// are visible here; the statements between them are not part of this view.
87 pkgAcquire::Worker::~Worker()
94 /* Closing of stdin is the signal to exit and die when the process
95 indicates it needs cleanup */
// Methods that did not announce Needs-Cleanup take a separate path; the
// statement guarded by this test is not visible here.
96 if (Config
->NeedsCleanup
== false)
// NOTE(review): the meaning of the third ExecWait argument (true here, false
// in MethodFailure) is declared outside this file - confirm before changing.
98 ExecWait(Process
,Access
.c_str(),true);
102 // Worker::Start - Start the worker process /*{{{*/
103 // ---------------------------------------------------------------------
104 /* This forks the method and inits the communication channel */
// Returns a bool; the visible failure path returns the result of
// _error->Error() when the method does not deliver its configuration.
105 bool pkgAcquire::Worker::Start()
107 // Get the method path
108 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
// A missing method binary is reported as an error; for the https method an
// extra notice names the package apt-transport-https.
109 if (FileExists(Method
) == false)
111 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
112 if (Access
== "https")
113 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
118 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Two pipes are created: Pipes[0]/Pipes[1] and Pipes[2]/Pipes[3], each pair
// being the read and write end as filled in by pipe().
121 int Pipes
[4] = {-1,-1,-1,-1};
122 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
124 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
// Loop over all four descriptors on the failure path; the loop body is not
// visible in this view.
125 for (int I
= 0; I
!= 4; I
++)
// Flag every pipe descriptor close-on-exec; the child below switches the
// flag off again only for stdin, stdout and stderr.
129 for (int I
= 0; I
!= 4; I
++)
130 SetCloseExec(Pipes
[I
],true);
132 // Fork off the process
133 Process
= ExecFork();
// Child side: the write end of the first pipe becomes stdout and the read
// end of the second pipe becomes stdin of the method.
137 dup2(Pipes
[1],STDOUT_FILENO
);
138 dup2(Pipes
[2],STDIN_FILENO
);
139 SetCloseExec(STDOUT_FILENO
,false);
140 SetCloseExec(STDIN_FILENO
,false);
141 SetCloseExec(STDERR_FILENO
,false);
// The method is executed with its own path as argv[0]. execv only returns
// on failure, which is why an error line on cerr follows it.
144 Args
[0] = Method
.c_str();
146 execv(Args
[0],(char **)Args
);
147 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// Parent side: the ends kept for talking to the method (Pipes[0] and
// Pipes[3]) are put into non-blocking mode.
154 SetNonBlock(Pipes
[0],true);
155 SetNonBlock(Pipes
[3],true);
161 // Read the configuration data
162 if (WaitFd(InFd
) == false ||
163 ReadMessages() == false)
164 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
173 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
174 // ---------------------------------------------------------------------
// Calls the free function ::ReadMessages on InFd to fill MessageQueue. When
// that call returns false the result of MethodFailure() is returned instead.
176 bool pkgAcquire::Worker::ReadMessages()
178 if (::ReadMessages(InFd
,MessageQueue
) == false)
179 return MethodFailure();
183 // Worker::RunMessages - Empty the message queue /*{{{*/
184 // ---------------------------------------------------------------------
185 /* This takes the messages from the message queue and runs them through
186 the parsers in order. */
// NOTE(review): the dispatch construct (case labels, braces, returns) is not
// part of this view; the section comments below are derived from the error
// strings and tags each visible fragment uses.
187 bool pkgAcquire::Worker::RunMessages()
// Messages are consumed one at a time from the front of MessageQueue.
189 while (MessageQueue
.empty() == false)
191 string Message
= MessageQueue
.front();
192 MessageQueue
.erase(MessageQueue
.begin());
195 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
197 // Fetch the message number
// If strtol consumed no characters the message has no leading number and is
// rejected as invalid.
199 int Number
= strtol(Message
.c_str(),&End
,10);
200 if (End
== Message
.c_str())
201 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Find the queue item the message refers to through its URI tag; Itm stays
// NULL when the message carries no URI.
203 string URI
= LookupTag(Message
,"URI");
204 pkgAcquire::Queue::QItem
*Itm
= NULL
;
205 if (URI
.empty() == false)
206 Itm
= OwnerQ
->FindItem(URI
,this);
210 // update used mirror
// The mirror name is copied to every owner and replaces the text before the
// first space of the item description.
// NOTE(review): Itm is dereferenced here; any NULL guard sits on lines that
// are not visible in this view - confirm.
211 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
212 if (UsedMirror
.empty() == false)
214 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
215 (*O
)->UsedMirror
= UsedMirror
;
217 if (Itm
->Description
.find(" ") != string::npos
)
218 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
222 // Determine the message number and dispatch
// Capabilities message: parsing is delegated to Capabilities().
227 if (Capabilities(Message
) == false)
228 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
// Log message from the method: its Message tag is echoed to clog.
234 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
// Status message: the Message tag is remembered in Status.
239 Status
= LookupTag(Message
,"Message");
// 103 Redirect handling starts here.
247 _error
->Error("Method gave invalid 103 Redirect message");
// URI.c_str() is passed as the third LookupTag argument, presumably the
// fallback used when the New-URI tag is absent - confirm against strutl.h.
251 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
256 // Change the status so that it can be dequeued
257 for (auto const &O
: Itm
->Owners
)
258 O
->Status
= pkgAcquire::Item::StatIdle
;
259 // Mark the item as done (taking care of all queues)
260 // and then put it in the main queue again
// The owner list is copied before ItemDone(Itm) so that the loop below works
// on the copy and no longer touches Itm.
261 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
262 OwnerQ
->ItemDone(Itm
);
264 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
266 pkgAcquire::Item
*Owner
= *O
;
267 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
268 // if we change site, treat it as a mirror change
269 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
// OldSite is the description text before its first space. If the old URI
// starts with it, the remainder of the old URI (OldExtra) is stripped from
// the end of NewURI to obtain the new site.
271 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
272 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
274 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
275 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
277 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
278 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
279 if (desc
.Description
.find(" ") != string::npos
)
280 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
// Hand the item description back to the acquire object owning the queue.
285 OwnerQ
->Owner
->Enqueue(desc
);
// 200 URI Start handling starts here.
298 _error
->Error("Method gave invalid 200 URI Start message");
// Size and Resume-Point are given "0" as third LookupTag argument and are
// parsed as base-10 unsigned long long values.
304 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
305 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
306 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
308 (*O
)->Start(Message
, TotalSize
);
310 // Display update before completion
// NOTE(review): Log is dereferenced without a visible NULL check here, while
// the 201 and 400 fragments below test Log != 0 first; the guard may be on a
// line missing from this view - confirm.
313 if (Log
->MorePulses
== true)
314 Log
->Pulse((*O
)->GetOwner());
315 Log
->Fetch((*O
)->GetItemDesc());
// 201 URI Done handling starts here.
327 _error
->Error("Method gave invalid 201 URI Done message");
331 PrepareFiles("201::URIDone", Itm
);
333 // Display update before completion
334 if (Log
!= 0 && Log
->MorePulses
== true)
335 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
336 Log
->Pulse((*O
)->GetOwner());
338 HashStringList ReceivedHashes
;
// The file the hashes refer to is the Filename tag of the message or, when
// that tag is empty, the DestFile of the item's owner.
340 std::string
const givenfilename
= LookupTag(Message
, "Filename");
341 std::string
const filename
= givenfilename
.empty() ? Itm
->Owner
->DestFile
: givenfilename
;
342 // see if we got hashes to verify
// For every supported hash type a "<type>-Hash" tag is looked up and, when
// non-empty, added to ReceivedHashes.
343 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
345 std::string
const tagname
= std::string(*type
) + "-Hash";
346 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
347 if (hashsum
.empty() == false)
348 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
350 // not all methods always send hashes our way
// Fallback: when the method sent nothing usable but usable expected hashes
// exist and the file is present, the hashes are taken from a Hashes object
// set up for the expected hash types. The statement feeding the file into
// calc is not visible in this view.
351 if (ReceivedHashes
.usable() == false)
353 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
354 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
356 Hashes
calc(ExpectedHashes
);
357 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
359 ReceivedHashes
= calc
.GetHashStringList();
363 // only local files can refer to other filenames and counting them as fetched would be unfair
// NOTE(review): Resume-Point is parsed with atoi here but with strtoull in
// the 200 URI Start fragment above; confirm that large resume offsets cannot
// be truncated on this path.
364 if (Log
!= NULL
&& Itm
->Owner
->Complete
== false && Itm
->Owner
->Local
== false && givenfilename
== filename
)
365 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
// Same pattern as in the redirect fragment: copy the owners, then mark the
// item as done, then iterate over the copy.
368 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
369 OwnerQ
->ItemDone(Itm
);
// isIMSHit is true when either the IMS-Hit or the Alt-IMS-Hit tag converts
// to true.
372 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
373 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
374 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
376 pkgAcquire::Item
* const Owner
= *O
;
377 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
// Optional debug dump of received versus expected hashes, enabled through
// Debug::pkgAcquire::Auth.
378 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
380 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
381 << "ReceivedHash:" << endl
;
382 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
383 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
384 std::clog
<< "ExpectedHash:" << endl
;
385 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
386 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
390 // decide if what we got is what we expected
391 bool consideredOkay
= false;
392 if (ExpectedHashes
.usable())
394 if (ReceivedHashes
.usable() == false)
396 /* IMS-Hits can't be checked here as we will have uncompressed file,
397 but the hashes for the compressed file. What we have was good though
398 so all we have to ensure later is that we are not stalled. */
399 consideredOkay
= isIMSHit
;
401 else if (ReceivedHashes
== ExpectedHashes
)
402 consideredOkay
= true;
404 consideredOkay
= false;
407 else if (Owner
->HashesRequired() == true)
408 consideredOkay
= false;
411 consideredOkay
= true;
412 // even if the hashes aren't usable to declare something secure
413 // we can at least use them to declare it an integrity failure
414 if (ExpectedHashes
.empty() == false && ReceivedHashes
!= ExpectedHashes
&& _config
->Find("Acquire::ForceHash").empty())
415 consideredOkay
= false;
// A positive hash verdict still has to pass the owner's VerifyDone(); a
// negative one marks the owner with StatAuthError.
418 if (consideredOkay
== true)
419 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
420 else // hashsum mismatch
421 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
// Success: the owner's Done() runs and the log gets IMSHit or Done. Failure:
// the owner's Failed() runs and the log gets Fail. The conditions choosing
// between the Log calls are not visible in this view.
423 if (consideredOkay
== true)
425 Owner
->Done(Message
, ReceivedHashes
, Config
);
429 Log
->IMSHit(Owner
->GetItemDesc());
431 Log
->Done(Owner
->GetItemDesc());
436 Owner
->Failed(Message
,Config
);
438 Log
->Fail(Owner
->GetItemDesc());
// 400 URI Failure handling starts here.
450 std::string
const msg
= LookupTag(Message
,"Message");
451 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
455 PrepareFiles("400::URIFailure", Itm
);
457 // Display update before completion
458 if (Log
!= 0 && Log
->MorePulses
== true)
459 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
460 Log
->Pulse((*O
)->GetOwner());
462 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
463 OwnerQ
->ItemDone(Itm
);
466 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
// The FailReason values Timeout, TmpResolveFailure, ResolveFailure and
// ConnectionRefused are classified as transient network errors.
469 if(LookupTag(Message
,"FailReason") == "Timeout" ||
470 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
471 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
472 LookupTag(Message
,"FailReason") == "ConnectionRefused")
473 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
475 (*O
)->Failed(Message
,Config
);
478 Log
->Fail((*O
)->GetItemDesc());
485 // 401 General Failure
487 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
// Media change request: handled by MediaChange().
492 MediaChange(Message
);
499 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
500 // ---------------------------------------------------------------------
501 /* This parses the capabilities message and dumps it into the configuration
// NOTE(review): the block comment opened on the line above is not closed in
// this view; its closing line appears to be missing from the extraction.
// Message is taken by value. Each tag of the message is copied into the
// matching field of Config; every boolean field is converted with
// StringToBool, with false passed as its second argument.
503 bool pkgAcquire::Worker::Capabilities(string Message
)
508 Config
->Version
= LookupTag(Message
,"Version");
509 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
510 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
511 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
512 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
513 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
514 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
// Trace output listing the parsed values; the condition guarding it is not
// visible in this view.
519 clog
<< "Configured access method " << Config
->Access
<< endl
;
520 clog
<< "Version:" << Config
->Version
<<
521 " SingleInstance:" << Config
->SingleInstance
<<
522 " Pipeline:" << Config
->Pipeline
<<
523 " SendConfig:" << Config
->SendConfig
<<
524 " LocalOnly: " << Config
->LocalOnly
<<
525 " NeedsCleanup: " << Config
->NeedsCleanup
<<
526 " Removable: " << Config
->Removable
<< endl
;
532 // Worker::MediaChange - Request a media change /*{{{*/
533 // ---------------------------------------------------------------------
// Handles a media change request from the method: the Media and Drive tags
// are read from Message, a status line is written to the descriptor named by
// APT::Status-Fd, Log->MediaChange() is asked, and a "603 Media Changed"
// reply is prepared. The statements that hand the reply to the method are
// not visible in this view.
535 bool pkgAcquire::Worker::MediaChange(string Message
)
// -1 is passed as the second FindI argument; the check that uses the value
// is not visible here.
537 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
540 string Media
= LookupTag(Message
,"Media");
541 string Drive
= LookupTag(Message
,"Drive");
542 ostringstream msg
,status
;
// msg receives the translated prompt; one line of the format string is
// missing from this view.
543 ioprintf(msg
,_("Please insert the disc labeled: "
545 "in the drive '%s' and press [Enter]."),
546 Media
.c_str(),Drive
.c_str());
// Status line layout: "media-change: " followed by media, drive and the
// translated prompt, separated by colons.
547 status
<< "media-change: " // message
548 << Media
<< ":" // media
549 << Drive
<< ":" // drive
550 << msg
.str() // l10n message
553 std::string
const dlstatus
= status
.str();
554 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// Without a Log object, or when Log->MediaChange() returns false, the reply
// carries "Failed: true".
557 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
558 LookupTag(Message
,"Drive")) == false)
561 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
563 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
// Otherwise a plain "603 Media Changed" reply is prepared.
570 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
572 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
578 // Worker::SendConfiguration - Send the config to the method /*{{{*/
579 // ---------------------------------------------------------------------
// Builds a "601 Configuration" message from the configuration tree and
// appends it to OutQueue.
581 bool pkgAcquire::Worker::SendConfiguration()
// Config->SendConfig is tested first; the statement guarded by this test is
// not visible in this view.
583 if (Config
->SendConfig
== false)
589 /* Write out all of the configuration directives by walking the
590 configuration tree */
591 std::ostringstream Message
;
592 Message
<< "601 Configuration\n";
// Dump is called with the format "Config-Item: %F=%V\n" and writes into the
// message stream.
593 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
597 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
598 OutQueue
+= Message
.str();
604 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
605 // ---------------------------------------------------------------------
606 /* Send a URI Acquire message to the method */
// Composes the "600 URI Acquire" request for Item. The statements that place
// the finished message on the outbound queue are not visible in this view.
607 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
// Header, URI and destination file name come first; 300 bytes are reserved
// up front for the message text.
612 string Message
= "600 URI Acquire\n";
613 Message
.reserve(300);
614 Message
+= "URI: " + Item
->URI
;
615 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
// One "Expected-<hash type>: <value>" line per expected hash.
617 HashStringList
const hsl
= Item
->GetExpectedHashes();
618 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
619 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
// When the expected hashes report a file size of 0, the item's maximum size
// is used for a Maximum-Size line. Any further condition on FileSize is not
// visible here.
621 if (hsl
.FileSize() == 0)
623 unsigned long long FileSize
= Item
->GetMaximumSize();
627 strprintf(MaximumSize
, "%llu", FileSize
);
628 Message
+= "\nMaximum-Size: " + MaximumSize
;
// SyncDestinationFiles() is called before the item's custom headers are
// appended.
632 Item
->SyncDestinationFiles();
633 Message
+= Item
->Custom600Headers();
// If the destination file already exists, its owner and permissions are
// changed using the user from APT::Sandbox::User, "root" and mode 0600.
636 if (RealFileExists(Item
->Owner
->DestFile
))
638 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
639 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
640 SandboxUser
.c_str(), "root", 0600);
644 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
651 // Worker::OutFdReady - Outbound FD is ready /*{{{*/
652 // ---------------------------------------------------------------------
// Writes the pending contents of OutQueue to OutFd and removes the bytes
// that were written from the front of the queue.
654 bool pkgAcquire::Worker::OutFdReady()
// The write is repeated for as long as it fails with EINTR.
659 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
661 while (Res
< 0 && errno
== EINTR
);
// The condition that leads to MethodFailure() is not visible in this view.
664 return MethodFailure();
666 OutQueue
.erase(0,Res
);
// The statement run once the queue has been emptied is not visible here.
667 if (OutQueue
.empty() == true)
673 // Worker::InFdReady - Inbound FD is ready /*{{{*/
674 // ---------------------------------------------------------------------
// Starts by reading the pending messages through ReadMessages(); what
// follows the failure test is not visible in this view.
676 bool pkgAcquire::Worker::InFdReady()
678 if (ReadMessages() == false)
684 // Worker::MethodFailure - Called when the method fails /*{{{*/
685 // ---------------------------------------------------------------------
686 /* This is called when the method is believed to have failed, probably because
// NOTE(review): the block comment opened on the line above is not closed in
// this view; its closing line appears to be missing from the extraction.
// Records an error naming the access method, waits on the method process
// and discards all messages still held in MessageQueue.
688 bool pkgAcquire::Worker::MethodFailure()
690 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
692 // do not reap the child here so that a meaningful error can be shown to the user
693 ExecWait(Process
,Access
.c_str(),false);
// Drop every queued message.
702 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
707 // Worker::Pulse - Called periodically /*{{{*/
708 // ---------------------------------------------------------------------
// Refreshes CurrentSize from the size on disk of the current item's
// destination file.
710 void pkgAcquire::Worker::Pulse()
// The statements guarded by the two tests below are not visible in this
// view; presumably both leave the function early - confirm.
712 if (CurrentItem
== 0)
716 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
718 CurrentSize
= Buf
.st_size
;
721 // Worker::ItemDone - Called when the current item is finished /*{{{*/
722 // ---------------------------------------------------------------------
// NOTE(review): only the signature is part of this view; the body is not
// visible, so nothing beyond the header comment above can be documented.
724 void pkgAcquire::Worker::ItemDone()
// Prepares the destination files of all owners of Itm. caller is passed on
// as the first argument of ChangeOwnerAndPermissionOfFile.
732 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
// When the DestFile of the item's primary owner exists, it gets owner and
// group "root" with mode 0644 and is then linked to the other owners'
// destination names.
734 if (RealFileExists(Itm
->Owner
->DestFile
))
736 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
737 std::string
const filename
= Itm
->Owner
->DestFile
;
738 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
740 pkgAcquire::Item
const * const Owner
= *O
;
// Owners whose DestFile equals filename are tested for separately; the
// statement guarded by this test is not visible in this view.
741 if (Owner
->DestFile
== filename
)
// Any existing file at the owner's destination is removed, then a hard link
// to filename is attempted.
743 unlink(Owner
->DestFile
.c_str());
744 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
746 // different mounts can't happen for us as we download to lists/ by default,
747 // but if the system is reused by others the locations can potentially be on
748 // different disks, so use symlink as a poor man's replacement.
749 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
750 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
751 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
// Second loop: every owner's DestFile is unlinked. Presumably this is the
// branch taken when the primary file does not exist, but the else keyword is
// not visible in this view - confirm.
757 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
758 unlink((*O
)->DestFile
.c_str());