1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
43 // Worker::Worker - Constructor for Queue startup /*{{{*/
44 // ---------------------------------------------------------------------
46 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
47 pkgAcquireStatus
*log
) : d(NULL
), Log(log
)
59 // Worker::Worker - Constructor for method config startup /*{{{*/
60 // ---------------------------------------------------------------------
62 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
),
63 Access(Cnf
->Access
), CurrentItem(NULL
),
64 CurrentSize(0), TotalSize(0)
69 // Worker::Construct - Constructor helper /*{{{*/
70 // ---------------------------------------------------------------------
72 void pkgAcquire::Worker::Construct()
81 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
84 // Worker::~Worker - Destructor /*{{{*/
85 // ---------------------------------------------------------------------
87 pkgAcquire::Worker::~Worker()
94 /* Closing of stdin is the signal to exit and die when the process
95 indicates it needs cleanup */
96 if (Config
->NeedsCleanup
== false)
98 ExecWait(Process
,Access
.c_str(),true);
102 // Worker::Start - Start the worker process /*{{{*/
103 // ---------------------------------------------------------------------
104 /* This forks the method and inits the communication channel */
105 bool pkgAcquire::Worker::Start()
107 // Get the method path
108 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
109 if (FileExists(Method
) == false)
111 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
112 if (Access
== "https")
113 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
118 clog
<< "Starting method '" << Method
<< '\'' << endl
;
121 int Pipes
[4] = {-1,-1,-1,-1};
122 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
124 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
125 for (int I
= 0; I
!= 4; I
++)
129 for (int I
= 0; I
!= 4; I
++)
130 SetCloseExec(Pipes
[I
],true);
132 // Fork off the process
133 Process
= ExecFork();
137 dup2(Pipes
[1],STDOUT_FILENO
);
138 dup2(Pipes
[2],STDIN_FILENO
);
139 SetCloseExec(STDOUT_FILENO
,false);
140 SetCloseExec(STDIN_FILENO
,false);
141 SetCloseExec(STDERR_FILENO
,false);
144 Args
[0] = Method
.c_str();
146 execv(Args
[0],(char **)Args
);
147 cerr
<< "Failed to exec method " << Args
[0] << endl
;
154 SetNonBlock(Pipes
[0],true);
155 SetNonBlock(Pipes
[3],true);
161 // Read the configuration data
162 if (WaitFd(InFd
) == false ||
163 ReadMessages() == false)
164 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
173 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
174 // ---------------------------------------------------------------------
176 bool pkgAcquire::Worker::ReadMessages()
178 if (::ReadMessages(InFd
,MessageQueue
) == false)
179 return MethodFailure();
183 // Worker::RunMessage - Empty the message queue /*{{{*/
184 // ---------------------------------------------------------------------
185 /* This takes the messages from the message queue and runs them through
186 the parsers in order. */
187 bool pkgAcquire::Worker::RunMessages()
189 while (MessageQueue
.empty() == false)
191 string Message
= MessageQueue
.front();
192 MessageQueue
.erase(MessageQueue
.begin());
195 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
197 // Fetch the message number
199 int Number
= strtol(Message
.c_str(),&End
,10);
200 if (End
== Message
.c_str())
201 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
203 string URI
= LookupTag(Message
,"URI");
204 pkgAcquire::Queue::QItem
*Itm
= NULL
;
205 if (URI
.empty() == false)
206 Itm
= OwnerQ
->FindItem(URI
,this);
210 // update used mirror
211 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
212 if (UsedMirror
.empty() == false)
214 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
215 (*O
)->UsedMirror
= UsedMirror
;
217 if (Itm
->Description
.find(" ") != string::npos
)
218 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
222 // Determine the message number and dispatch
227 if (Capabilities(Message
) == false)
228 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
234 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
239 Status
= LookupTag(Message
,"Message");
247 _error
->Error("Method gave invalid 103 Redirect message");
251 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
256 // Change the status so that it can be dequeued
257 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
258 (*O
)->Status
= pkgAcquire::Item::StatIdle
;
259 // Mark the item as done (taking care of all queues)
260 // and then put it in the main queue again
261 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
262 OwnerQ
->ItemDone(Itm
);
264 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
266 pkgAcquire::Item
*Owner
= *O
;
267 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
268 // if we change site, treat it as a mirror change
269 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
271 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
272 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
274 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
275 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
277 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
278 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
279 if (desc
.Description
.find(" ") != string::npos
)
280 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
285 OwnerQ
->Owner
->Enqueue(desc
);
298 _error
->Error("Method gave invalid 200 URI Start message");
304 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
305 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
306 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
308 (*O
)->Start(Message
, TotalSize
);
310 // Display update before completion
313 if (Log
->MorePulses
== true)
314 Log
->Pulse((*O
)->GetOwner());
315 Log
->Fetch((*O
)->GetItemDesc());
327 _error
->Error("Method gave invalid 201 URI Done message");
331 PrepareFiles("201::URIDone", Itm
);
333 // Display update before completion
334 if (Log
!= 0 && Log
->MorePulses
== true)
335 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
336 Log
->Pulse((*O
)->GetOwner());
338 HashStringList ReceivedHashes
;
340 std::string
const givenfilename
= LookupTag(Message
, "Filename");
341 std::string
const filename
= givenfilename
.empty() ? Itm
->Owner
->DestFile
: givenfilename
;
342 // see if we got hashes to verify
343 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
345 std::string
const tagname
= std::string(*type
) + "-Hash";
346 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
347 if (hashsum
.empty() == false)
348 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
350 // not all methods always sent Hashes our way
351 if (ReceivedHashes
.usable() == false)
353 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
354 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
356 Hashes
calc(ExpectedHashes
);
357 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
359 ReceivedHashes
= calc
.GetHashStringList();
363 // only local files can refer other filenames and counting them as fetched would be unfair
364 if (Log
!= NULL
&& Itm
->Owner
->Complete
== false && Itm
->Owner
->Local
== false && givenfilename
== filename
)
365 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
368 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
369 OwnerQ
->ItemDone(Itm
);
372 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
373 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
374 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
376 pkgAcquire::Item
* const Owner
= *O
;
377 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
378 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
380 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
381 << "ReceivedHash:" << endl
;
382 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
383 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
384 std::clog
<< "ExpectedHash:" << endl
;
385 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
386 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
390 // decide if what we got is what we expected
391 bool consideredOkay
= false;
392 if (ExpectedHashes
.usable())
394 if (ReceivedHashes
.usable() == false)
396 /* IMS-Hits can't be checked here as we will have uncompressed file,
397 but the hashes for the compressed file. What we have was good through
398 so all we have to ensure later is that we are not stalled. */
399 consideredOkay
= isIMSHit
;
401 else if (ReceivedHashes
== ExpectedHashes
)
402 consideredOkay
= true;
404 consideredOkay
= false;
407 else if (Owner
->HashesRequired() == true)
408 consideredOkay
= false;
410 consideredOkay
= true;
412 if (consideredOkay
== true)
413 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
414 else // hashsum mismatch
415 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
417 if (consideredOkay
== true)
419 Owner
->Done(Message
, ReceivedHashes
, Config
);
423 Log
->IMSHit(Owner
->GetItemDesc());
425 Log
->Done(Owner
->GetItemDesc());
430 Owner
->Failed(Message
,Config
);
432 Log
->Fail(Owner
->GetItemDesc());
444 std::string
const msg
= LookupTag(Message
,"Message");
445 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
449 PrepareFiles("400::URIFailure", Itm
);
451 // Display update before completion
452 if (Log
!= 0 && Log
->MorePulses
== true)
453 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
454 Log
->Pulse((*O
)->GetOwner());
456 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
457 OwnerQ
->ItemDone(Itm
);
460 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
463 if(LookupTag(Message
,"FailReason") == "Timeout" ||
464 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
465 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
466 LookupTag(Message
,"FailReason") == "ConnectionRefused")
467 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
469 (*O
)->Failed(Message
,Config
);
472 Log
->Fail((*O
)->GetItemDesc());
479 // 401 General Failure
481 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
486 MediaChange(Message
);
493 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
494 // ---------------------------------------------------------------------
495 /* This parses the capabilities message and dumps it into the configuration
497 bool pkgAcquire::Worker::Capabilities(string Message
)
502 Config
->Version
= LookupTag(Message
,"Version");
503 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
504 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
505 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
506 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
507 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
508 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
513 clog
<< "Configured access method " << Config
->Access
<< endl
;
514 clog
<< "Version:" << Config
->Version
<<
515 " SingleInstance:" << Config
->SingleInstance
<<
516 " Pipeline:" << Config
->Pipeline
<<
517 " SendConfig:" << Config
->SendConfig
<<
518 " LocalOnly: " << Config
->LocalOnly
<<
519 " NeedsCleanup: " << Config
->NeedsCleanup
<<
520 " Removable: " << Config
->Removable
<< endl
;
526 // Worker::MediaChange - Request a media change /*{{{*/
527 // ---------------------------------------------------------------------
529 bool pkgAcquire::Worker::MediaChange(string Message
)
531 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
534 string Media
= LookupTag(Message
,"Media");
535 string Drive
= LookupTag(Message
,"Drive");
536 ostringstream msg
,status
;
537 ioprintf(msg
,_("Please insert the disc labeled: "
539 "in the drive '%s' and press [Enter]."),
540 Media
.c_str(),Drive
.c_str());
541 status
<< "media-change: " // message
542 << Media
<< ":" // media
543 << Drive
<< ":" // drive
544 << msg
.str() // l10n message
547 std::string
const dlstatus
= status
.str();
548 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
551 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
552 LookupTag(Message
,"Drive")) == false)
555 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
557 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
564 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
566 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
572 // Worker::SendConfiguration - Send the config to the method /*{{{*/
573 // ---------------------------------------------------------------------
575 bool pkgAcquire::Worker::SendConfiguration()
577 if (Config
->SendConfig
== false)
583 /* Write out all of the configuration directives by walking the
584 configuration tree */
585 std::ostringstream Message
;
586 Message
<< "601 Configuration\n";
587 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
591 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
592 OutQueue
+= Message
.str();
598 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
599 // ---------------------------------------------------------------------
600 /* Send a URI Acquire message to the method */
601 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
606 string Message
= "600 URI Acquire\n";
607 Message
.reserve(300);
608 Message
+= "URI: " + Item
->URI
;
609 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
611 HashStringList
const hsl
= Item
->GetExpectedHashes();
612 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
613 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
615 if (hsl
.FileSize() == 0)
617 unsigned long long FileSize
= Item
->GetMaximumSize();
621 strprintf(MaximumSize
, "%llu", FileSize
);
622 Message
+= "\nMaximum-Size: " + MaximumSize
;
626 Item
->SyncDestinationFiles();
627 Message
+= Item
->Custom600Headers();
630 if (RealFileExists(Item
->Owner
->DestFile
))
632 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
633 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
634 SandboxUser
.c_str(), "root", 0600);
638 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
645 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
646 // ---------------------------------------------------------------------
648 bool pkgAcquire::Worker::OutFdReady()
653 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
655 while (Res
< 0 && errno
== EINTR
);
658 return MethodFailure();
660 OutQueue
.erase(0,Res
);
661 if (OutQueue
.empty() == true)
667 // Worker::InFdRead - In bound FD is ready /*{{{*/
668 // ---------------------------------------------------------------------
670 bool pkgAcquire::Worker::InFdReady()
672 if (ReadMessages() == false)
678 // Worker::MethodFailure - Called when the method fails /*{{{*/
679 // ---------------------------------------------------------------------
680 /* This is called when the method is believed to have failed, probably because
682 bool pkgAcquire::Worker::MethodFailure()
684 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
686 // do not reap the child here to show meaningfull error to the user
687 ExecWait(Process
,Access
.c_str(),false);
696 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
701 // Worker::Pulse - Called periodically /*{{{*/
702 // ---------------------------------------------------------------------
704 void pkgAcquire::Worker::Pulse()
706 if (CurrentItem
== 0)
710 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
712 CurrentSize
= Buf
.st_size
;
715 // Worker::ItemDone - Called when the current item is finished /*{{{*/
716 // ---------------------------------------------------------------------
718 void pkgAcquire::Worker::ItemDone()
726 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
728 if (RealFileExists(Itm
->Owner
->DestFile
))
730 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
731 std::string
const filename
= Itm
->Owner
->DestFile
;
732 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
734 pkgAcquire::Item
const * const Owner
= *O
;
735 if (Owner
->DestFile
== filename
)
737 unlink(Owner
->DestFile
.c_str());
738 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
740 // different mounts can't happen for us as we download to lists/ by default,
741 // but if the system is reused by others the locations can potentially be on
742 // different disks, so use symlink as poor-men replacement.
743 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
744 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
745 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
751 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
752 unlink((*O
)->DestFile
.c_str());