1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
37 #include <sys/types.h>
46 // Worker::Worker - Constructor for Queue startup /*{{{*/
47 // ---------------------------------------------------------------------
49 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
50 pkgAcquireStatus
*log
) : d(NULL
), Log(log
)
62 // Worker::Worker - Constructor for method config startup /*{{{*/
63 // ---------------------------------------------------------------------
65 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
),
66 Access(Cnf
->Access
), CurrentItem(NULL
),
67 CurrentSize(0), TotalSize(0)
72 // Worker::Construct - Constructor helper /*{{{*/
73 // ---------------------------------------------------------------------
75 void pkgAcquire::Worker::Construct()
84 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
87 // Worker::~Worker - Destructor /*{{{*/
88 // ---------------------------------------------------------------------
90 pkgAcquire::Worker::~Worker()
97 /* Closing of stdin is the signal to exit and die when the process
98 indicates it needs cleanup */
99 if (Config
->NeedsCleanup
== false)
100 kill(Process
,SIGINT
);
101 ExecWait(Process
,Access
.c_str(),true);
105 // Worker::Start - Start the worker process /*{{{*/
106 // ---------------------------------------------------------------------
107 /* This forks the method and inits the communication channel */
108 bool pkgAcquire::Worker::Start()
110 // Get the method path
111 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
112 if (FileExists(Method
) == false)
114 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
115 if (Access
== "https")
116 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
121 clog
<< "Starting method '" << Method
<< '\'' << endl
;
124 int Pipes
[4] = {-1,-1,-1,-1};
125 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
127 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
128 for (int I
= 0; I
!= 4; I
++)
132 for (int I
= 0; I
!= 4; I
++)
133 SetCloseExec(Pipes
[I
],true);
135 // Fork off the process
136 Process
= ExecFork();
140 dup2(Pipes
[1],STDOUT_FILENO
);
141 dup2(Pipes
[2],STDIN_FILENO
);
142 SetCloseExec(STDOUT_FILENO
,false);
143 SetCloseExec(STDIN_FILENO
,false);
144 SetCloseExec(STDERR_FILENO
,false);
147 Args
[0] = Method
.c_str();
149 execv(Args
[0],(char **)Args
);
150 cerr
<< "Failed to exec method " << Args
[0] << endl
;
157 SetNonBlock(Pipes
[0],true);
158 SetNonBlock(Pipes
[3],true);
164 // Read the configuration data
165 if (WaitFd(InFd
) == false ||
166 ReadMessages() == false)
167 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
176 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
177 // ---------------------------------------------------------------------
179 bool pkgAcquire::Worker::ReadMessages()
181 if (::ReadMessages(InFd
,MessageQueue
) == false)
182 return MethodFailure();
186 // Worker::RunMessage - Empty the message queue /*{{{*/
187 // ---------------------------------------------------------------------
188 /* This takes the messages from the message queue and runs them through
189 the parsers in order. */
190 bool pkgAcquire::Worker::RunMessages()
192 while (MessageQueue
.empty() == false)
194 string Message
= MessageQueue
.front();
195 MessageQueue
.erase(MessageQueue
.begin());
198 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
200 // Fetch the message number
202 int Number
= strtol(Message
.c_str(),&End
,10);
203 if (End
== Message
.c_str())
204 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
206 string URI
= LookupTag(Message
,"URI");
207 pkgAcquire::Queue::QItem
*Itm
= 0;
208 if (URI
.empty() == false)
209 Itm
= OwnerQ
->FindItem(URI
,this);
211 // update used mirror
212 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
213 if (!UsedMirror
.empty() &&
215 Itm
->Description
.find(" ") != string::npos
)
217 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
218 // FIXME: will we need this as well?
219 //Itm->ShortDesc = UsedMirror;
222 // Determine the message number and dispatch
227 if (Capabilities(Message
) == false)
228 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
234 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
239 Status
= LookupTag(Message
,"Message");
247 _error
->Error("Method gave invalid 103 Redirect message");
251 string NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
256 // Change the status so that it can be dequeued
257 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
259 pkgAcquire::Item
*Owner
= *O
;
260 Owner
->Status
= pkgAcquire::Item::StatIdle
;
262 // Mark the item as done (taking care of all queues)
263 // and then put it in the main queue again
264 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
265 OwnerQ
->ItemDone(Itm
);
267 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
269 pkgAcquire::Item
*Owner
= *O
;
270 pkgAcquire::ItemDesc desc
= Owner
->GetItemDesc();
272 OwnerQ
->Owner
->Enqueue(desc
);
275 Log
->Done(Owner
->GetItemDesc());
285 _error
->Error("Method gave invalid 200 URI Start message");
291 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
292 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
293 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
295 (*O
)->Start(Message
, TotalSize
);
297 // Display update before completion
300 if (Log
->MorePulses
== true)
301 Log
->Pulse((*O
)->GetOwner());
302 Log
->Fetch((*O
)->GetItemDesc());
314 _error
->Error("Method gave invalid 201 URI Done message");
318 PrepareFiles("201::URIDone", Itm
);
320 // Display update before completion
321 if (Log
!= 0 && Log
->MorePulses
== true)
322 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
323 Log
->Pulse((*O
)->GetOwner());
325 std::string
const filename
= LookupTag(Message
, "Filename", Itm
->Owner
->DestFile
.c_str());
326 HashStringList ReceivedHashes
;
328 // see if we got hashes to verify
329 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
331 std::string
const tagname
= std::string(*type
) + "-Hash";
332 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
333 if (hashsum
.empty() == false)
334 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
336 // not all methods always sent Hashes our way
337 if (ReceivedHashes
.usable() == false)
339 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
340 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
342 Hashes
calc(ExpectedHashes
);
343 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
345 ReceivedHashes
= calc
.GetHashStringList();
350 // only local files can refer other filenames and counting them as fetched would be unfair
351 if (Log
!= NULL
&& filename
!= Itm
->Owner
->DestFile
)
352 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
354 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
355 OwnerQ
->ItemDone(Itm
);
358 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
359 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
360 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
362 pkgAcquire::Item
* const Owner
= *O
;
363 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
364 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
366 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
367 << "ReceivedHash:" << endl
;
368 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
369 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
370 std::clog
<< "ExpectedHash:" << endl
;
371 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
372 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
376 // decide if what we got is what we expected
377 bool consideredOkay
= false;
378 if (ExpectedHashes
.usable())
380 if (ReceivedHashes
.usable() == false)
382 /* IMS-Hits can't be checked here as we will have uncompressed file,
383 but the hashes for the compressed file. What we have was good through
384 so all we have to ensure later is that we are not stalled. */
385 consideredOkay
= isIMSHit
;
387 else if (ReceivedHashes
== ExpectedHashes
)
388 consideredOkay
= true;
390 consideredOkay
= false;
393 else if (Owner
->HashesRequired() == true)
394 consideredOkay
= false;
396 consideredOkay
= true;
398 if (consideredOkay
== true)
400 Owner
->Done(Message
, ReceivedHashes
, Config
);
402 // Log that we are done
406 Log
->IMSHit(Owner
->GetItemDesc());
408 Log
->Done(Owner
->GetItemDesc());
413 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
414 Owner
->Failed(Message
,Config
);
417 Log
->Fail(Owner
->GetItemDesc());
429 std::string
const msg
= LookupTag(Message
,"Message");
430 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
434 PrepareFiles("400::URIFailure", Itm
);
436 // Display update before completion
437 if (Log
!= 0 && Log
->MorePulses
== true)
438 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
439 Log
->Pulse((*O
)->GetOwner());
441 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
442 OwnerQ
->ItemDone(Itm
);
445 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
448 if(LookupTag(Message
,"FailReason") == "Timeout" ||
449 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
450 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
451 LookupTag(Message
,"FailReason") == "ConnectionRefused")
452 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
454 (*O
)->Failed(Message
,Config
);
457 Log
->Fail((*O
)->GetItemDesc());
464 // 401 General Failure
466 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
471 MediaChange(Message
);
478 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
479 // ---------------------------------------------------------------------
480 /* This parses the capabilities message and dumps it into the configuration
482 bool pkgAcquire::Worker::Capabilities(string Message
)
487 Config
->Version
= LookupTag(Message
,"Version");
488 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
489 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
490 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
491 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
492 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
493 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
498 clog
<< "Configured access method " << Config
->Access
<< endl
;
499 clog
<< "Version:" << Config
->Version
<<
500 " SingleInstance:" << Config
->SingleInstance
<<
501 " Pipeline:" << Config
->Pipeline
<<
502 " SendConfig:" << Config
->SendConfig
<<
503 " LocalOnly: " << Config
->LocalOnly
<<
504 " NeedsCleanup: " << Config
->NeedsCleanup
<<
505 " Removable: " << Config
->Removable
<< endl
;
511 // Worker::MediaChange - Request a media change /*{{{*/
512 // ---------------------------------------------------------------------
514 bool pkgAcquire::Worker::MediaChange(string Message
)
516 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
519 string Media
= LookupTag(Message
,"Media");
520 string Drive
= LookupTag(Message
,"Drive");
521 ostringstream msg
,status
;
522 ioprintf(msg
,_("Please insert the disc labeled: "
524 "in the drive '%s' and press enter."),
525 Media
.c_str(),Drive
.c_str());
526 status
<< "media-change: " // message
527 << Media
<< ":" // media
528 << Drive
<< ":" // drive
529 << msg
.str() // l10n message
532 std::string
const dlstatus
= status
.str();
533 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
536 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
537 LookupTag(Message
,"Drive")) == false)
540 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
542 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
549 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
551 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
557 // Worker::SendConfiguration - Send the config to the method /*{{{*/
558 // ---------------------------------------------------------------------
560 bool pkgAcquire::Worker::SendConfiguration()
562 if (Config
->SendConfig
== false)
568 /* Write out all of the configuration directives by walking the
569 configuration tree */
570 std::ostringstream Message
;
571 Message
<< "601 Configuration\n";
572 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
576 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
577 OutQueue
+= Message
.str();
583 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
584 // ---------------------------------------------------------------------
585 /* Send a URI Acquire message to the method */
586 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
591 string Message
= "600 URI Acquire\n";
592 Message
.reserve(300);
593 Message
+= "URI: " + Item
->URI
;
594 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
596 HashStringList
const hsl
= Item
->GetExpectedHashes();
597 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
598 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
600 if (hsl
.FileSize() == 0)
602 unsigned long long FileSize
= Item
->GetMaximumSize();
606 strprintf(MaximumSize
, "%llu", FileSize
);
607 Message
+= "\nMaximum-Size: " + MaximumSize
;
611 Item
->SyncDestinationFiles();
612 Message
+= Item
->Custom600Headers();
615 if (RealFileExists(Item
->Owner
->DestFile
))
617 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
618 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
619 SandboxUser
.c_str(), "root", 0600);
623 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
630 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
631 // ---------------------------------------------------------------------
633 bool pkgAcquire::Worker::OutFdReady()
638 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
640 while (Res
< 0 && errno
== EINTR
);
643 return MethodFailure();
645 OutQueue
.erase(0,Res
);
646 if (OutQueue
.empty() == true)
652 // Worker::InFdRead - In bound FD is ready /*{{{*/
653 // ---------------------------------------------------------------------
655 bool pkgAcquire::Worker::InFdReady()
657 if (ReadMessages() == false)
663 // Worker::MethodFailure - Called when the method fails /*{{{*/
664 // ---------------------------------------------------------------------
665 /* This is called when the method is believed to have failed, probably because
667 bool pkgAcquire::Worker::MethodFailure()
669 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
671 // do not reap the child here to show meaningfull error to the user
672 ExecWait(Process
,Access
.c_str(),false);
681 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
686 // Worker::Pulse - Called periodically /*{{{*/
687 // ---------------------------------------------------------------------
689 void pkgAcquire::Worker::Pulse()
691 if (CurrentItem
== 0)
695 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
697 CurrentSize
= Buf
.st_size
;
700 // Worker::ItemDone - Called when the current item is finished /*{{{*/
701 // ---------------------------------------------------------------------
703 void pkgAcquire::Worker::ItemDone()
711 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
713 if (RealFileExists(Itm
->Owner
->DestFile
))
715 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
716 std::string
const filename
= Itm
->Owner
->DestFile
;
717 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
719 pkgAcquire::Item
const * const Owner
= *O
;
720 if (Owner
->DestFile
== filename
)
722 unlink(Owner
->DestFile
.c_str());
723 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
725 // different mounts can't happen for us as we download to lists/ by default,
726 // but if the system is reused by others the locations can potentially be on
727 // different disks, so use symlink as poor-men replacement.
728 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
729 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
730 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
736 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
737 unlink((*O
)->DestFile
.c_str());