1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
43 // Worker::Worker - Constructor for Queue startup /*{{{*/
44 // ---------------------------------------------------------------------
// Builds a worker from a queue (Q), a method configuration (Cnf) and a
// status reporter (log). The initializer list sets d to NULL and stores
// log in Log.
// NOTE(review): the constructor body is not part of this listing, so how
// Q and Cnf are stored has to be confirmed in the full file.
46 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
47 pkgAcquireStatus
*log
) : d(NULL
), Log(log
)
59 // Worker::Worker - Constructor for method config startup /*{{{*/
60 // ---------------------------------------------------------------------
// Builds a worker from a method configuration only. The initializer list
// leaves the worker without a queue (OwnerQ is NULL) and without a current
// item (CurrentItem is NULL), keeps Cnf in Config, copies Cnf->Access into
// Access and zeroes CurrentSize and TotalSize.
// NOTE(review): Log does not appear in the visible initializer list and the
// body is not part of this listing.
62 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
),
63 Access(Cnf
->Access
), CurrentItem(NULL
),
64 CurrentSize(0), TotalSize(0)
69 // Worker::Construct - Constructor helper /*{{{*/
70 // ---------------------------------------------------------------------
// Shared initialisation helper. The only statement visible in this listing
// loads the Debug flag from the configuration key
// Debug::pkgAcquire::Worker, with false passed as the second argument
// (presumably the default - confirm against FindB).
// NOTE(review): original lines 73-80 are absent from this listing.
72 void pkgAcquire::Worker::Construct()
81 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
84 // Worker::~Worker - Destructor /*{{{*/
85 // ---------------------------------------------------------------------
// Shuts the worker down. The visible code tests Config->NeedsCleanup and
// calls ExecWait on Process, passing the access name and true.
// NOTE(review): original line 97 (between the test and the ExecWait call)
// is absent from this listing, so it cannot be determined here whether
// ExecWait is governed by the NeedsCleanup test or runs unconditionally.
87 pkgAcquire::Worker::~Worker()
94 /* Closing of stdin is the signal to exit and die when the process
95 indicates it needs cleanup */
96 if (Config
->NeedsCleanup
== false)
98 ExecWait(Process
,Access
.c_str(),true);
102 // Worker::Start - Start the worker process /*{{{*/
103 // ---------------------------------------------------------------------
104 /* This forks the method and inits the communication channel */
// Visible steps: resolve the method binary, create two pipes, fork via
// ExecFork, wire pipe ends to stdout and stdin and exec the method, then
// wait for and read the first messages from the method.
// Returns the result of _error->Error when the method does not start
// correctly; other return statements are not visible in this listing.
105 bool pkgAcquire::Worker::Start()
107 // Get the method path
// The path is the Dir::Bin::Methods directory followed by the access name.
108 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
// A missing driver is reported as an error; for the https access name an
// additional notice points at the apt-transport-https package.
109 if (FileExists(Method
) == false)
111 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
112 if (Access
== "https")
113 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
// Trace output (presumably only when Debug is set - guard not visible here).
118 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Two pipes are created in one array: Pipes[0..1] and Pipes[2..3].
// All four descriptors start at -1 so a partial failure is recognisable.
121 int Pipes
[4] = {-1,-1,-1,-1};
122 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
124 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
// NOTE(review): the body of the following loop (original lines 126-128)
// is not visible in this listing.
125 for (int I
= 0; I
!= 4; I
++)
// Mark every pipe descriptor close-on-exec.
129 for (int I
= 0; I
!= 4; I
++)
130 SetCloseExec(Pipes
[I
],true);
132 // Fork off the process
133 Process
= ExecFork();
// Presumably child side (the branch on Process is not visible here):
// Pipes[1] becomes stdout and Pipes[2] becomes stdin, and close-on-exec is
// cleared on the three standard descriptors so they survive execv.
137 dup2(Pipes
[1],STDOUT_FILENO
);
138 dup2(Pipes
[2],STDIN_FILENO
);
139 SetCloseExec(STDOUT_FILENO
,false);
140 SetCloseExec(STDIN_FILENO
,false);
141 SetCloseExec(STDERR_FILENO
,false);
// The method path doubles as argv[0]; the message on cerr is only reached
// if execv returns, i.e. the exec failed.
144 Args
[0] = Method
.c_str();
146 execv(Args
[0],(char **)Args
);
147 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// Pipes[0] and Pipes[3] are switched to non-blocking mode (presumably the
// ends kept by this process - the assignments to InFd/OutFd are not
// visible in this listing).
154 SetNonBlock(Pipes
[0],true);
155 SetNonBlock(Pipes
[3],true);
161 // Read the configuration data
162 if (WaitFd(InFd
) == false ||
163 ReadMessages() == false)
164 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
173 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
174 // ---------------------------------------------------------------------
// Calls the global ::ReadMessages helper with InFd and MessageQueue. When
// that helper reports failure the result of MethodFailure() is returned.
// NOTE(review): the success return is not visible in this listing.
176 bool pkgAcquire::Worker::ReadMessages()
178 if (::ReadMessages(InFd
,MessageQueue
) == false)
179 return MethodFailure();
183 // Worker::RunMessage - Empty the message queue /*{{{*/
184 // ---------------------------------------------------------------------
185 /* This takes the messages from the message queue and runs them through
186 the parsers in order. */
// Drains MessageQueue. Each message is removed from the front, its leading
// number is parsed with strtol, and the queue item for its URI tag (if any)
// is looked up in OwnerQ. The handlers below are identified by their error
// strings and comments: Capabilities, log, status, 103 Redirect,
// 200 URI Start, 201 URI Done, 400 URI Failure, 401 General Failure and
// media change.
// NOTE(review): the dispatch labels, braces and several guards between the
// handlers are not visible in this listing; the exact branch structure must
// be confirmed against the full file before relying on these notes.
187 bool pkgAcquire::Worker::RunMessages()
189 while (MessageQueue
.empty() == false)
// Take a copy of the oldest message and remove it from the queue.
191 string Message
= MessageQueue
.front();
192 MessageQueue
.erase(MessageQueue
.begin());
// Trace of the incoming message (presumably only when Debug is set - the
// guard is not visible here).
195 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
197 // Fetch the message number
199 int Number
= strtol(Message
.c_str(),&End
,10);
// strtol consumed no characters, so the message has no leading number.
200 if (End
== Message
.c_str())
201 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Resolve the queue item through the URI tag; Itm stays NULL when the tag
// value is empty.
203 string URI
= LookupTag(Message
,"URI");
204 pkgAcquire::Queue::QItem
*Itm
= NULL
;
205 if (URI
.empty() == false)
206 Itm
= OwnerQ
->FindItem(URI
,this);
210 // update used mirror
// The UsedMirror tag is copied to every owner of the item, and the part of
// the item description before the first space is replaced by it.
// NOTE(review): Itm is dereferenced here; a guard against NULL is not
// visible in this listing (original lines 207-209 are absent) - confirm.
211 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
212 if (UsedMirror
.empty() == false)
214 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
215 (*O
)->UsedMirror
= UsedMirror
;
217 if (Itm
->Description
.find(" ") != string::npos
)
218 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
222 // Determine the message number and dispatch
// Capabilities message: handed to Capabilities(); a false result aborts
// message processing with an error.
227 if (Capabilities(Message
) == false)
228 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
// Log message from the method: its Message tag is echoed to clog.
234 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
// Status message: its Message tag is remembered in Status.
239 Status
= LookupTag(Message
,"Message");
// 103 Redirect handling. The error below is presumably raised when no queue
// item matched the URI - the guard is not visible here.
247 _error
->Error("Method gave invalid 103 Redirect message");
// The old URI is passed as third argument to LookupTag (presumably the
// fallback when New-URI is absent - confirm against LookupTag).
251 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
256 // Change the status so that it can be dequeued
257 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
258 (*O
)->Status
= pkgAcquire::Item::StatIdle
;
259 // Mark the item as done (taking care of all queues)
260 // and then put it in the main queue again
// The owner list is copied before ItemDone(Itm) because the loop below
// only uses the copy (presumably Itm is no longer valid afterwards).
261 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
262 OwnerQ
->ItemDone(Itm
);
264 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
266 pkgAcquire::Item
*Owner
= *O
;
267 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
268 // if we change site, treat it as a mirror change
269 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
// OldSite is the first word of the description; OldExtra is what follows
// OldSite (plus one separator character) in the old URI. If the new URI
// ends with the same OldExtra, the remainder in front of it is NewSite.
271 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
272 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
274 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
275 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
277 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
278 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
279 if (desc
.Description
.find(" ") != string::npos
)
280 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
// Hand the description back to the pkgAcquire owning the queue.
// NOTE(review): original line 284 is absent from this listing; whether
// desc.URI is updated to NewURI before this call must be confirmed.
285 OwnerQ
->Owner
->Enqueue(desc
);
// 200 URI Start handling (guard for the error below not visible here).
298 _error
->Error("Method gave invalid 200 URI Start message");
// Size and resume offset announced by the method, parsed with strtoull;
// "0" is passed to LookupTag as third argument for both tags.
304 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
305 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
// Every owner is told the transfer started and the status reporter is
// informed (the null check on Log at original lines 311-312 is not visible
// in this listing).
306 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
308 (*O
)->Start(Message
, TotalSize
);
310 // Display update before completion
313 if (Log
->MorePulses
== true)
314 Log
->Pulse((*O
)->GetOwner());
315 Log
->Fetch((*O
)->GetItemDesc());
// 201 URI Done handling (guard for the error below not visible here).
327 _error
->Error("Method gave invalid 201 URI Done message");
331 PrepareFiles("201::URIDone", Itm
);
333 // Display update before completion
334 if (Log
!= 0 && Log
->MorePulses
== true)
335 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
336 Log
->Pulse((*O
)->GetOwner());
// File name reported by the method; the destination file of the item owner
// is passed as third argument to LookupTag.
338 std::string
const filename
= LookupTag(Message
, "Filename", Itm
->Owner
->DestFile
.c_str());
339 HashStringList ReceivedHashes
;
341 // see if we got hashes to verify
// One tag named <type>-Hash is looked up per supported hash type; non-empty
// values are collected in ReceivedHashes.
342 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
344 std::string
const tagname
= std::string(*type
) + "-Hash";
345 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
346 if (hashsum
.empty() == false)
347 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
349 // not all methods always sent Hashes our way
// Fallback: when expected hashes are usable and the file exists, a Hashes
// object built from the expected list supplies ReceivedHashes.
// NOTE(review): original line 357 (presumably feeding the file into calc)
// is absent from this listing.
350 if (ReceivedHashes
.usable() == false)
352 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
353 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
355 Hashes
calc(ExpectedHashes
);
356 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
358 ReceivedHashes
= calc
.GetHashStringList();
363 // only local files can refer other filenames and counting them as fetched would be unfair
// NOTE(review): Resume-Point is parsed with atoi here but with strtoull in
// the 200 URI Start handling above; offsets beyond the int range would be
// mis-parsed on this path - confirm whether that is intended.
364 if (Log
!= NULL
&& filename
!= Itm
->Owner
->DestFile
)
365 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
// Owners are copied before ItemDone(Itm); the loop below uses the copy only.
367 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
368 OwnerQ
->ItemDone(Itm
);
// Either tag being true marks the message as an IMS hit.
371 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
372 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
373 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
375 pkgAcquire::Item
* const Owner
= *O
;
376 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
// Optional dump of received and expected hashes, enabled through the
// Debug::pkgAcquire::Auth configuration key.
377 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
379 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
380 << "ReceivedHash:" << endl
;
381 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
382 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
383 std::clog
<< "ExpectedHash:" << endl
;
384 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
385 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
389 // decide if what we got is what we expected
// Inputs to the decision: whether expected hashes are usable, whether
// received hashes are usable, whether both lists compare equal, whether
// the message is an IMS hit and whether the owner requires hashes.
// NOTE(review): the else lines joining these assignments are not visible
// in this listing.
390 bool consideredOkay
= false;
391 if (ExpectedHashes
.usable())
393 if (ReceivedHashes
.usable() == false)
395 /* IMS-Hits can't be checked here as we will have uncompressed file,
396 but the hashes for the compressed file. What we have was good through
397 so all we have to ensure later is that we are not stalled. */
398 consideredOkay
= isIMSHit
;
400 else if (ReceivedHashes
== ExpectedHashes
)
401 consideredOkay
= true;
403 consideredOkay
= false;
406 else if (Owner
->HashesRequired() == true)
407 consideredOkay
= false;
409 consideredOkay
= true;
// An acceptable result still has to pass the owner's VerifyDone check;
// otherwise the owner is flagged with StatAuthError.
411 if (consideredOkay
== true)
412 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
413 else // hashsum mismatch
414 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
// Final outcome: Done plus an IMSHit or Done report, or Failed plus a Fail
// report (the conditions selecting between the Log calls are not visible
// in this listing).
416 if (consideredOkay
== true)
418 Owner
->Done(Message
, ReceivedHashes
, Config
);
422 Log
->IMSHit(Owner
->GetItemDesc());
424 Log
->Done(Owner
->GetItemDesc());
429 Owner
->Failed(Message
,Config
);
431 Log
->Fail(Owner
->GetItemDesc());
// 400 URI Failure handling (guard for the error below not visible here).
443 std::string
const msg
= LookupTag(Message
,"Message");
444 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
448 PrepareFiles("400::URIFailure", Itm
);
450 // Display update before completion
451 if (Log
!= 0 && Log
->MorePulses
== true)
452 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
453 Log
->Pulse((*O
)->GetOwner());
// Owners are copied before ItemDone(Itm); the loop below uses the copy only.
455 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
456 OwnerQ
->ItemDone(Itm
);
459 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
// These four FailReason values are classified as transient network errors.
462 if(LookupTag(Message
,"FailReason") == "Timeout" ||
463 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
464 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
465 LookupTag(Message
,"FailReason") == "ConnectionRefused")
466 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
468 (*O
)->Failed(Message
,Config
);
471 Log
->Fail((*O
)->GetItemDesc());
478 // 401 General Failure
480 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
// Media change request: delegated to MediaChange().
485 MediaChange(Message
);
492 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
493 // ---------------------------------------------------------------------
// Copies the fields of a capabilities message into Config: Version is taken
// verbatim from the Version tag; SingleInstance, Pipeline, SendConfig,
// LocalOnly, NeedsCleanup and Removable are converted with StringToBool
// from the tags Single-Instance, Pipeline, Send-Config, Local-Only,
// Needs-Cleanup and Removable, each with false as second argument.
// The resulting settings are then written to clog (presumably only when
// Debug is set - the guard is not visible in this listing).
// Message is taken by value.
// NOTE(review): the closing line of the block comment below (original line
// 495) and the return statements are absent from this listing.
494 /* This parses the capabilities message and dumps it into the configuration
496 bool pkgAcquire::Worker::Capabilities(string Message
)
501 Config
->Version
= LookupTag(Message
,"Version");
502 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
503 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
504 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
505 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
506 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
507 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
512 clog
<< "Configured access method " << Config
->Access
<< endl
;
513 clog
<< "Version:" << Config
->Version
<<
514 " SingleInstance:" << Config
->SingleInstance
<<
515 " Pipeline:" << Config
->Pipeline
<<
516 " SendConfig:" << Config
->SendConfig
<<
517 " LocalOnly: " << Config
->LocalOnly
<<
518 " NeedsCleanup: " << Config
->NeedsCleanup
<<
519 " Removable: " << Config
->Removable
<< endl
;
525 // Worker::MediaChange - Request a media change /*{{{*/
526 // ---------------------------------------------------------------------
// Handles a media change request from the method. Visible steps:
//  1. build a status line from the Media and Drive tags plus a localized
//     prompt and write it to the descriptor configured as APT::Status-Fd;
//  2. ask the status reporter (Log->MediaChange) to perform the change;
//  3. format a "603 Media Changed" reply in S, with an extra
//     "Failed: true" field in one of the two variants.
// NOTE(review): the guard on status_fd, the declaration of S, the code
// queueing the reply and the return statements are not visible in this
// listing.
528 bool pkgAcquire::Worker::MediaChange(string Message
)
// -1 is passed as second argument (presumably the value used when no
// status descriptor is configured - confirm against FindI).
530 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
533 string Media
= LookupTag(Message
,"Media");
534 string Drive
= LookupTag(Message
,"Drive");
535 ostringstream msg
,status
;
// NOTE(review): original line 537, the middle part of this format string,
// is absent from this listing.
536 ioprintf(msg
,_("Please insert the disc labeled: "
538 "in the drive '%s' and press [Enter]."),
539 Media
.c_str(),Drive
.c_str());
// Status line layout: media-change: <media>:<drive>:<localized prompt>
540 status
<< "media-change: " // message
541 << Media
<< ":" // media
542 << Drive
<< ":" // drive
543 << msg
.str() // l10n message
546 std::string
const dlstatus
= status
.str();
547 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// Without a status reporter, or when the reporter returns false, the
// failure variant of the reply is presumably the one used (branch
// structure not visible in this listing).
550 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
551 LookupTag(Message
,"Drive")) == false)
554 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
556 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
563 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
565 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
571 // Worker::SendConfiguration - Send the config to the method /*{{{*/
572 // ---------------------------------------------------------------------
// Builds a "601 Configuration" message from the configuration tree and
// appends it to OutQueue. Config->SendConfig is tested first (presumably
// to skip methods that did not ask for the configuration - the statement
// controlled by the test is not visible in this listing).
// NOTE(review): the message terminator, the trace guard and the return
// statements are not visible here.
574 bool pkgAcquire::Worker::SendConfiguration()
576 if (Config
->SendConfig
== false)
582 /* Write out all of the configuration directives by walking the
583 configuration tree */
584 std::ostringstream Message
;
585 Message
<< "601 Configuration\n";
// Dump writes one line per configuration entry using the given format.
586 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
// Trace of the outgoing message, then queue it for sending.
590 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
591 OutQueue
+= Message
.str();
597 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
598 // ---------------------------------------------------------------------
599 /* Send a URI Acquire message to the method */
// Assembles a "600 URI Acquire" message for Item. Fields visible in this
// listing, in order: URI, Filename (destination file of the item owner),
// one Expected-<type> line per expected hash, Maximum-Size, and whatever
// Item->Custom600Headers() returns.
// NOTE(review): the code that appends the message to the outbound queue and
// the return statements are not visible in this listing.
600 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
605 string Message
= "600 URI Acquire\n";
// Pre-size the buffer so the appends below rarely reallocate.
606 Message
.reserve(300);
607 Message
+= "URI: " + Item
->URI
;
608 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
610 HashStringList
const hsl
= Item
->GetExpectedHashes();
611 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
612 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
// When the expected hashes carry no file size, the maximum size of the item
// is formatted for a Maximum-Size field (the condition on FileSize and the
// declaration of MaximumSize are not visible in this listing).
614 if (hsl
.FileSize() == 0)
616 unsigned long long FileSize
= Item
->GetMaximumSize();
620 strprintf(MaximumSize
, "%llu", FileSize
);
621 Message
+= "\nMaximum-Size: " + MaximumSize
;
625 Item
->SyncDestinationFiles();
626 Message
+= Item
->Custom600Headers();
// An already existing destination file is handed to the user configured as
// APT::Sandbox::User (group root, mode 0600).
629 if (RealFileExists(Item
->Owner
->DestFile
))
631 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
632 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
633 SandboxUser
.c_str(), "root", 0600);
// Trace of the outgoing message (guard not visible in this listing).
637 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
644 // Worker::OutFdReady - Outbound FD is ready /*{{{*/
645 // ---------------------------------------------------------------------
// Writes the pending contents of OutQueue to OutFd. The write is repeated
// while it fails with errno equal to EINTR. The bytes reported as written
// (Res) are removed from the front of OutQueue, so a short write leaves the
// remainder queued.
// NOTE(review): the condition guarding the MethodFailure() return, the
// statement run when OutQueue becomes empty, and the final return are not
// visible in this listing.
647 bool pkgAcquire::Worker::OutFdReady()
652 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
654 while (Res
< 0 && errno
== EINTR
);
657 return MethodFailure();
659 OutQueue
.erase(0,Res
);
660 if (OutQueue
.empty() == true)
666 // Worker::InFdReady - Inbound FD is ready /*{{{*/
667 // ---------------------------------------------------------------------
// Reads the pending messages through ReadMessages() and tests for failure.
// NOTE(review): the statements following the test (original lines 672-675)
// are not visible in this listing.
669 bool pkgAcquire::Worker::InFdReady()
671 if (ReadMessages() == false)
677 // Worker::MethodFailure - Called when the method fails /*{{{*/
678 // ---------------------------------------------------------------------
// Visible steps: record the error "Method %s has died unexpectedly!" for
// this access name, call ExecWait on Process with false as third argument,
// and discard every message still held in MessageQueue.
// NOTE(review): the end of the block comment below (original line 680),
// original lines 687-694 and the return statement are absent from this
// listing.
679 /* This is called when the method is believed to have failed, probably because
681 bool pkgAcquire::Worker::MethodFailure()
683 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
685 // do not reap the child here to show meaningful error to the user
686 ExecWait(Process
,Access
.c_str(),false);
695 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
700 // Worker::Pulse - Called periodically /*{{{*/
701 // ---------------------------------------------------------------------
// Refreshes CurrentSize from the on-disk size (st_size) of the destination
// file of the current item owner. CurrentItem is tested against 0 first
// and the stat result is checked (presumably both lead to an early return -
// the statements controlled by the tests are not visible in this listing).
703 void pkgAcquire::Worker::Pulse()
705 if (CurrentItem
== 0)
709 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
711 CurrentSize
= Buf
.st_size
;
714 // Worker::ItemDone - Called when the current item is finished /*{{{*/
715 // ---------------------------------------------------------------------
// NOTE(review): only the signature appears in this listing; the body has to
// be checked in the full file before documenting its effects.
717 void pkgAcquire::Worker::ItemDone()
// Prepares the destination files of all owners of Itm after a transfer.
//   caller - label passed through as first argument of
//            ChangeOwnerAndPermissionOfFile
//   Itm    - queue item whose owners are processed
// Visible behaviour when the destination file of Itm->Owner is a real file:
// it is changed to owner root, group root, mode 0644, and for the other
// owners their own destination is unlinked and re-created as a hard link to
// that file, with a symlink as fallback and an error if both fail.
// The last loop unlinks the destination file of every owner (presumably the
// branch taken when the file does not exist - the else line is not visible
// in this listing).
725 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
727 if (RealFileExists(Itm
->Owner
->DestFile
))
729 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
730 std::string
const filename
= Itm
->Owner
->DestFile
;
731 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
733 pkgAcquire::Item
const * const Owner
= *O
;
// NOTE(review): the statement controlled by this comparison (original line
// 735, presumably skipping owners that already use filename) is absent
// from this listing.
734 if (Owner
->DestFile
== filename
)
// Remove any stale destination, then try a hard link first.
736 unlink(Owner
->DestFile
.c_str());
737 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
739 // different mounts can't happen for us as we download to lists/ by default,
740 // but if the system is reused by others the locations can potentially be on
741 // different disks, so use symlink as poor-men replacement.
742 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
743 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
744 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
750 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
751 unlink((*O
)->DestFile
.c_str());