1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
37 #include <sys/types.h>
46 // Worker::Worker - Constructor for Queue startup /*{{{*/
47 // ---------------------------------------------------------------------
49 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
50 pkgAcquireStatus
*log
) : Log(log
)
62 // Worker::Worker - Constructor for method config startup /*{{{*/
63 // ---------------------------------------------------------------------
65 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
)
77 // Worker::Construct - Constructor helper /*{{{*/
78 // ---------------------------------------------------------------------
80 void pkgAcquire::Worker::Construct()
89 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
92 // Worker::~Worker - Destructor /*{{{*/
93 // ---------------------------------------------------------------------
95 pkgAcquire::Worker::~Worker()
102 /* Closing of stdin is the signal to exit and die when the process
103 indicates it needs cleanup */
104 if (Config
->NeedsCleanup
== false)
105 kill(Process
,SIGINT
);
106 ExecWait(Process
,Access
.c_str(),true);
110 // Worker::Start - Start the worker process /*{{{*/
111 // ---------------------------------------------------------------------
112 /* This forks the method and inits the communication channel */
113 bool pkgAcquire::Worker::Start()
115 // Get the method path
116 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
117 if (FileExists(Method
) == false)
119 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
120 if (Access
== "https")
121 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
126 clog
<< "Starting method '" << Method
<< '\'' << endl
;
129 int Pipes
[4] = {-1,-1,-1,-1};
130 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
132 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
133 for (int I
= 0; I
!= 4; I
++)
137 for (int I
= 0; I
!= 4; I
++)
138 SetCloseExec(Pipes
[I
],true);
140 // Fork off the process
141 Process
= ExecFork();
145 dup2(Pipes
[1],STDOUT_FILENO
);
146 dup2(Pipes
[2],STDIN_FILENO
);
147 SetCloseExec(STDOUT_FILENO
,false);
148 SetCloseExec(STDIN_FILENO
,false);
149 SetCloseExec(STDERR_FILENO
,false);
152 Args
[0] = Method
.c_str();
154 execv(Args
[0],(char **)Args
);
155 cerr
<< "Failed to exec method " << Args
[0] << endl
;
162 SetNonBlock(Pipes
[0],true);
163 SetNonBlock(Pipes
[3],true);
169 // Read the configuration data
170 if (WaitFd(InFd
) == false ||
171 ReadMessages() == false)
172 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
181 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
182 // ---------------------------------------------------------------------
184 bool pkgAcquire::Worker::ReadMessages()
186 if (::ReadMessages(InFd
,MessageQueue
) == false)
187 return MethodFailure();
191 // Worker::RunMessage - Empty the message queue /*{{{*/
192 // ---------------------------------------------------------------------
193 /* This takes the messages from the message queue and runs them through
194 the parsers in order. */
195 bool pkgAcquire::Worker::RunMessages()
197 while (MessageQueue
.empty() == false)
199 string Message
= MessageQueue
.front();
200 MessageQueue
.erase(MessageQueue
.begin());
203 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
205 // Fetch the message number
207 int Number
= strtol(Message
.c_str(),&End
,10);
208 if (End
== Message
.c_str())
209 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
211 string URI
= LookupTag(Message
,"URI");
212 pkgAcquire::Queue::QItem
*Itm
= 0;
213 if (URI
.empty() == false)
214 Itm
= OwnerQ
->FindItem(URI
,this);
216 // update used mirror
217 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
218 if (!UsedMirror
.empty() &&
220 Itm
->Description
.find(" ") != string::npos
)
222 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
223 // FIXME: will we need this as well?
224 //Itm->ShortDesc = UsedMirror;
227 // Determine the message number and dispatch
232 if (Capabilities(Message
) == false)
233 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
239 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
244 Status
= LookupTag(Message
,"Message");
252 _error
->Error("Method gave invalid 103 Redirect message");
256 string NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
261 // Change the status so that it can be dequeued
262 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
264 pkgAcquire::Item
*Owner
= *O
;
265 Owner
->Status
= pkgAcquire::Item::StatIdle
;
267 // Mark the item as done (taking care of all queues)
268 // and then put it in the main queue again
269 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
270 OwnerQ
->ItemDone(Itm
);
272 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
274 pkgAcquire::Item
*Owner
= *O
;
275 pkgAcquire::ItemDesc desc
= Owner
->GetItemDesc();
277 OwnerQ
->Owner
->Enqueue(desc
);
280 Log
->Done(Owner
->GetItemDesc());
290 _error
->Error("Method gave invalid 200 URI Start message");
296 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
297 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
298 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
300 (*O
)->Start(Message
, TotalSize
);
302 // Display update before completion
305 if (Log
->MorePulses
== true)
306 Log
->Pulse((*O
)->GetOwner());
307 Log
->Fetch((*O
)->GetItemDesc());
319 _error
->Error("Method gave invalid 201 URI Done message");
323 PrepareFiles("201::URIDone", Itm
);
325 // Display update before completion
326 if (Log
!= 0 && Log
->MorePulses
== true)
327 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
328 Log
->Pulse((*O
)->GetOwner());
330 std::string
const filename
= LookupTag(Message
, "Filename", Itm
->Owner
->DestFile
.c_str());
331 HashStringList ReceivedHashes
;
333 // see if we got hashes to verify
334 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
336 std::string
const tagname
= std::string(*type
) + "-Hash";
337 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
338 if (hashsum
.empty() == false)
339 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
341 // not all methods always sent Hashes our way
342 if (ReceivedHashes
.usable() == false)
344 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
345 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
347 Hashes
calc(ExpectedHashes
);
348 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
350 ReceivedHashes
= calc
.GetHashStringList();
355 // only local files can refer other filenames and counting them as fetched would be unfair
356 if (Log
!= NULL
&& filename
!= Itm
->Owner
->DestFile
)
357 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
359 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
360 OwnerQ
->ItemDone(Itm
);
363 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
364 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
365 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
367 pkgAcquire::Item
* const Owner
= *O
;
368 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
369 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
371 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
372 << "ReceivedHash:" << endl
;
373 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
374 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
375 std::clog
<< "ExpectedHash:" << endl
;
376 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
377 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
381 // decide if what we got is what we expected
382 bool consideredOkay
= false;
383 if (ExpectedHashes
.usable())
385 if (ReceivedHashes
.usable() == false)
387 /* IMS-Hits can't be checked here as we will have uncompressed file,
388 but the hashes for the compressed file. What we have was good through
389 so all we have to ensure later is that we are not stalled. */
390 consideredOkay
= isIMSHit
;
392 else if (ReceivedHashes
== ExpectedHashes
)
393 consideredOkay
= true;
395 consideredOkay
= false;
398 else if (Owner
->HashesRequired() == true)
399 consideredOkay
= false;
401 consideredOkay
= true;
403 if (consideredOkay
== true)
405 Owner
->Done(Message
, ReceivedHashes
, Config
);
407 // Log that we are done
411 Log
->IMSHit(Owner
->GetItemDesc());
413 Log
->Done(Owner
->GetItemDesc());
418 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
419 Owner
->Failed(Message
,Config
);
422 Log
->Fail(Owner
->GetItemDesc());
434 std::string
const msg
= LookupTag(Message
,"Message");
435 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
439 PrepareFiles("400::URIFailure", Itm
);
441 // Display update before completion
442 if (Log
!= 0 && Log
->MorePulses
== true)
443 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
444 Log
->Pulse((*O
)->GetOwner());
446 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
447 OwnerQ
->ItemDone(Itm
);
450 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
453 if(LookupTag(Message
,"FailReason") == "Timeout" ||
454 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
455 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
456 LookupTag(Message
,"FailReason") == "ConnectionRefused")
457 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
459 (*O
)->Failed(Message
,Config
);
462 Log
->Fail((*O
)->GetItemDesc());
469 // 401 General Failure
471 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
476 MediaChange(Message
);
483 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
484 // ---------------------------------------------------------------------
485 /* This parses the capabilities message and dumps it into the configuration
487 bool pkgAcquire::Worker::Capabilities(string Message
)
492 Config
->Version
= LookupTag(Message
,"Version");
493 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
494 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
495 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
496 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
497 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
498 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
503 clog
<< "Configured access method " << Config
->Access
<< endl
;
504 clog
<< "Version:" << Config
->Version
<<
505 " SingleInstance:" << Config
->SingleInstance
<<
506 " Pipeline:" << Config
->Pipeline
<<
507 " SendConfig:" << Config
->SendConfig
<<
508 " LocalOnly: " << Config
->LocalOnly
<<
509 " NeedsCleanup: " << Config
->NeedsCleanup
<<
510 " Removable: " << Config
->Removable
<< endl
;
516 // Worker::MediaChange - Request a media change /*{{{*/
517 // ---------------------------------------------------------------------
519 bool pkgAcquire::Worker::MediaChange(string Message
)
521 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
524 string Media
= LookupTag(Message
,"Media");
525 string Drive
= LookupTag(Message
,"Drive");
526 ostringstream msg
,status
;
527 ioprintf(msg
,_("Please insert the disc labeled: "
529 "in the drive '%s' and press enter."),
530 Media
.c_str(),Drive
.c_str());
531 status
<< "media-change: " // message
532 << Media
<< ":" // media
533 << Drive
<< ":" // drive
534 << msg
.str() // l10n message
537 std::string
const dlstatus
= status
.str();
538 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
541 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
542 LookupTag(Message
,"Drive")) == false)
545 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
547 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
554 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
556 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
562 // Worker::SendConfiguration - Send the config to the method /*{{{*/
563 // ---------------------------------------------------------------------
565 bool pkgAcquire::Worker::SendConfiguration()
567 if (Config
->SendConfig
== false)
573 /* Write out all of the configuration directives by walking the
574 configuration tree */
575 std::ostringstream Message
;
576 Message
<< "601 Configuration\n";
577 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
581 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
582 OutQueue
+= Message
.str();
588 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
589 // ---------------------------------------------------------------------
590 /* Send a URI Acquire message to the method */
591 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
596 string Message
= "600 URI Acquire\n";
597 Message
.reserve(300);
598 Message
+= "URI: " + Item
->URI
;
599 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
601 HashStringList
const hsl
= Item
->GetExpectedHashes();
602 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
603 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
605 if (hsl
.FileSize() == 0)
607 unsigned long long FileSize
= Item
->GetMaximumSize();
611 strprintf(MaximumSize
, "%llu", FileSize
);
612 Message
+= "\nMaximum-Size: " + MaximumSize
;
616 Item
->SyncDestinationFiles();
617 Message
+= Item
->Custom600Headers();
620 if (RealFileExists(Item
->Owner
->DestFile
))
622 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
623 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
624 SandboxUser
.c_str(), "root", 0600);
628 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
635 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
636 // ---------------------------------------------------------------------
638 bool pkgAcquire::Worker::OutFdReady()
643 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
645 while (Res
< 0 && errno
== EINTR
);
648 return MethodFailure();
650 OutQueue
.erase(0,Res
);
651 if (OutQueue
.empty() == true)
657 // Worker::InFdRead - In bound FD is ready /*{{{*/
658 // ---------------------------------------------------------------------
660 bool pkgAcquire::Worker::InFdReady()
662 if (ReadMessages() == false)
668 // Worker::MethodFailure - Called when the method fails /*{{{*/
669 // ---------------------------------------------------------------------
670 /* This is called when the method is believed to have failed, probably because
672 bool pkgAcquire::Worker::MethodFailure()
674 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
676 // do not reap the child here to show meaningfull error to the user
677 ExecWait(Process
,Access
.c_str(),false);
686 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
691 // Worker::Pulse - Called periodically /*{{{*/
692 // ---------------------------------------------------------------------
694 void pkgAcquire::Worker::Pulse()
696 if (CurrentItem
== 0)
700 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
702 CurrentSize
= Buf
.st_size
;
705 // Worker::ItemDone - Called when the current item is finished /*{{{*/
706 // ---------------------------------------------------------------------
708 void pkgAcquire::Worker::ItemDone()
716 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
718 if (RealFileExists(Itm
->Owner
->DestFile
))
720 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
721 std::string
const filename
= Itm
->Owner
->DestFile
;
722 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
724 pkgAcquire::Item
const * const Owner
= *O
;
725 if (Owner
->DestFile
== filename
)
727 unlink(Owner
->DestFile
.c_str());
728 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
730 // diferent mounts can't happen for us as we download to lists/ by default,
731 // but if the system is reused by others the locations can potentially be on
732 // different disks, so use symlink as poor-men replacement.
733 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
734 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
735 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
741 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
742 unlink((*O
)->DestFile
.c_str());