1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message.
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
37 #include <sys/types.h>
46 // Worker::Worker - Constructor for Queue startup /*{{{*/
47 // ---------------------------------------------------------------------
// Constructor used when a worker is started for a queue. Q is the queue,
// Cnf the method configuration and log the status reporter stored in Log.
// Other methods in this file compare Log against 0 before using it, so a
// null log is tolerated there.
// The visible initializer list only sets d to NULL and Log to log.
// NOTE(review): the constructor body is missing from this listing;
// presumably it records Q and Cnf and calls Construct() -- confirm.
49 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
50 pkgAcquireStatus
*log
) : d(NULL
), Log(log
)
62 // Worker::Worker - Constructor for method config startup /*{{{*/
63 // ---------------------------------------------------------------------
// Constructor used when only the method configuration is wanted. There is
// no owning queue (OwnerQ is NULL) and no current item; Config points at
// Cnf, Access is copied from Cnf->Access and both size counters start at 0.
// NOTE(review): Log does not appear in the visible initializer list and the
// constructor body is missing from this listing -- confirm how Log is set.
65 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
),
66 Access(Cnf
->Access
), CurrentItem(NULL
),
67 CurrentSize(0), TotalSize(0)
72 // Worker::Construct - Constructor helper /*{{{*/
73 // ---------------------------------------------------------------------
// Shared constructor helper. The only statement visible in this listing
// reads the boolean option Debug::pkgAcquire::Worker (default false) into
// Debug.
// NOTE(review): the other statements of this function are missing from the
// listing; do not assume this is all it initialises.
75 void pkgAcquire::Worker::Construct()
84 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
87 // Worker::~Worker - Destructor /*{{{*/
88 // ---------------------------------------------------------------------
// Shuts the method process down. A method whose configuration does not
// have NeedsCleanup set is interrupted with SIGINT; in both cases ExecWait
// is then called on Process with Access as its name.
// NOTE(review): the statements between the signature and the comment below
// are missing from this listing (presumably the descriptors are closed
// there, which is the "closing of stdin" the comment refers to) -- confirm.
90 pkgAcquire::Worker::~Worker()
97 /* Closing of stdin is the signal to exit and die when the process
98 indicates it needs cleanup */
// Methods that did not ask for cleanup are simply interrupted.
99 if (Config
->NeedsCleanup
== false)
100 kill(Process
,SIGINT
);
// Wait on the method process; the last argument is true here, whereas
// MethodFailure() passes false.
101 ExecWait(Process
,Access
.c_str(),true);
105 // Worker::Start - Start the worker process /*{{{*/
106 // ---------------------------------------------------------------------
107 /* This forks the method and inits the communication channel */
// Overview of the visible steps:
//  1. build the method path from Dir::Bin::Methods plus Access and check
//     that the file exists,
//  2. create two pipes and mark all four descriptors close-on-exec,
//  3. fork through ExecFork(), wire two pipe ends to stdout/stdin and exec
//     the method binary,
//  4. switch Pipes[0] and Pipes[3] to non-blocking mode,
//  5. wait for the first data on InFd and read it with ReadMessages().
// A method that does not answer is reported with "Method %s did not start
// correctly".
// NOTE(review): braces, several returns, the child/parent test after the
// fork and the InFd/OutFd assignments are missing from this listing.
108 bool pkgAcquire::Worker::Start()
110 // Get the method path
111 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
// A missing driver is an error; for "https" an extra notice names the
// package apt-transport-https.
112 if (FileExists(Method
) == false)
114 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
115 if (Access
== "https")
116 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
// Trace output (the condition guarding it is not visible in this listing).
121 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Two pipes, four descriptors, all preset to -1 before pipe() fills them.
124 int Pipes
[4] = {-1,-1,-1,-1};
125 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
127 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
// NOTE(review): the body of this error-path loop is missing from the
// listing; presumably it closes the descriptors -- confirm.
128 for (int I
= 0; I
!= 4; I
++)
// Mark every pipe end close-on-exec.
132 for (int I
= 0; I
!= 4; I
++)
133 SetCloseExec(Pipes
[I
],true);
135 // Fork off the process
136 Process
= ExecFork();
// The statements up to the execv presumably run in the child only (the
// test on Process is not visible): Pipes[1] becomes stdout and Pipes[2]
// becomes stdin, and close-on-exec is cleared on the three standard
// descriptors so they survive the exec.
140 dup2(Pipes
[1],STDOUT_FILENO
);
141 dup2(Pipes
[2],STDIN_FILENO
);
142 SetCloseExec(STDOUT_FILENO
,false);
143 SetCloseExec(STDIN_FILENO
,false);
144 SetCloseExec(STDERR_FILENO
,false);
// Args[0] is the method path (the declaration of Args is not visible).
147 Args
[0] = Method
.c_str();
149 execv(Args
[0],(char **)Args
);
// Only reached when execv returns, i.e. the exec itself failed.
150 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// Pipes[0] and Pipes[3] are the two ends not handed to the method above;
// both are switched to non-blocking mode.
157 SetNonBlock(Pipes
[0],true);
158 SetNonBlock(Pipes
[3],true);
164 // Read the configuration data
165 if (WaitFd(InFd
) == false ||
166 ReadMessages() == false)
167 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
176 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
177 // ---------------------------------------------------------------------
// Calls the global ::ReadMessages helper to move whatever is pending on
// InFd into MessageQueue. If that helper reports failure, the result of
// MethodFailure() is returned.
// NOTE(review): the success-path return is missing from this listing.
179 bool pkgAcquire::Worker::ReadMessages()
181 if (::ReadMessages(InFd
,MessageQueue
) == false)
182 return MethodFailure();
186 // Worker::RunMessage - Empty the message queue /*{{{*/
187 // ---------------------------------------------------------------------
188 /* This takes the messages from the message queue and runs them through
189 the parsers in order. */
// Each message is removed from the front of MessageQueue, its leading
// number is parsed and the matching handler runs. The handlers visible
// below deal with: the capabilities message, log and status text,
// 103 Redirect, 200 URI Start, 201 URI Done, 400 URI Failure,
// 401 General Failure and the media change request.
// A message without a leading number, or a capabilities message that
// cannot be processed, ends the function through return _error->Error(...).
// NOTE(review): this listing dropped the braces, the switch/case labels,
// several conditions and the final return, so the exact nesting of the
// statements below has to be checked against the real file.
190 bool pkgAcquire::Worker::RunMessages()
192 while (MessageQueue
.empty() == false)
// Take a copy of the oldest message and drop it from the queue.
194 string Message
= MessageQueue
.front();
195 MessageQueue
.erase(MessageQueue
.begin());
// Trace output (the condition guarding it is not visible in this listing).
198 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
200 // Fetch the message number
// strtol leaves End at the start of the string when no digits were read;
// that case is rejected as an invalid message.
202 int Number
= strtol(Message
.c_str(),&End
,10);
203 if (End
== Message
.c_str())
204 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Messages that carry a URI tag are matched to their queue item.
206 string URI
= LookupTag(Message
,"URI");
207 pkgAcquire::Queue::QItem
*Itm
= NULL
;
208 if (URI
.empty() == false)
209 Itm
= OwnerQ
->FindItem(URI
,this);
// NOTE(review): Itm is dereferenced below; a null check is not visible in
// this listing -- confirm one guards this section in the real file.
213 // update used mirror
214 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
215 if (UsedMirror
.empty() == false)
// Store the mirror on every owner of the item.
217 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
218 (*O
)->UsedMirror
= UsedMirror
;
// Replace everything before the first space of the description with the
// mirror name.
220 if (Itm
->Description
.find(" ") != string::npos
)
221 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
225 // Determine the message number and dispatch
// Capabilities message: parsed by Capabilities(); failure is fatal here.
230 if (Capabilities(Message
) == false)
231 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
// Log text sent by the method is echoed to clog.
237 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
// Status text sent by the method is kept in Status.
242 Status
= LookupTag(Message
,"Message");
// 103 Redirect handling starts here; the error below is presumably raised
// when no queue item matched the URI (the condition is not visible).
250 _error
->Error("Method gave invalid 103 Redirect message");
// The third LookupTag argument is presumably the fallback value, so NewURI
// equals URI when no New-URI tag was sent -- confirm.
254 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
259 // Change the status so that it can be dequeued
260 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
261 (*O
)->Status
= pkgAcquire::Item::StatIdle
;
262 // Mark the item as done (taking care of all queues)
263 // and then put it in the main queue again
// The owner list is copied first and the loop below walks the copy, not
// Itm->Owners, after ItemDone(Itm) has run.
264 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
265 OwnerQ
->ItemDone(Itm
);
267 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
269 pkgAcquire::Item
*Owner
= *O
;
270 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
271 // if we change site, treat it as a mirror change
272 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
// OldSite is the first word of the description. When the old URI starts
// with it, OldExtra is the remainder of the old URI after OldSite and one
// more character. When the new URI ends with that same remainder, the part
// in front of it is taken as the new site.
274 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
275 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
277 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
278 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
280 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
// The owner records the new site as its mirror and the first word of the
// description is swapped for it.
281 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
282 if (desc
.Description
.find(" ") != string::npos
)
283 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
// Hand the description back to the pkgAcquire object that owns the queue.
// NOTE(review): the assignment of NewURI to desc.URI is not visible in this
// listing -- confirm it happens before the Enqueue.
288 OwnerQ
->Owner
->Enqueue(desc
);
// 200 URI Start handling; the error below is presumably raised when no
// queue item matched the URI (the condition is not visible).
301 _error
->Error("Method gave invalid 200 URI Start message");
// Size and Resume-Point are parsed as base-10 unsigned values; the fallback
// string is "0".
307 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
308 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
// Tell every owner that the transfer started.
309 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
311 (*O
)->Start(Message
, TotalSize
);
313 // Display update before completion
// NOTE(review): Log is dereferenced here; the null check seen in the other
// handlers is not visible in this listing -- confirm it exists.
316 if (Log
->MorePulses
== true)
317 Log
->Pulse((*O
)->GetOwner());
318 Log
->Fetch((*O
)->GetItemDesc());
// 201 URI Done handling; the error below is presumably raised when no
// queue item matched the URI (the condition is not visible).
330 _error
->Error("Method gave invalid 201 URI Done message");
// Fix up ownership/permissions and per-owner links, see PrepareFiles().
334 PrepareFiles("201::URIDone", Itm
);
336 // Display update before completion
337 if (Log
!= 0 && Log
->MorePulses
== true)
338 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
339 Log
->Pulse((*O
)->GetOwner());
// The method may name the file it delivered; otherwise the destination
// file of the item owner is used.
341 std::string
const filename
= LookupTag(Message
, "Filename", Itm
->Owner
->DestFile
.c_str());
342 HashStringList ReceivedHashes
;
344 // see if we got hashes to verify
// For every supported hash type the tag "<type>-Hash" is looked up.
345 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
347 std::string
const tagname
= std::string(*type
) + "-Hash";
348 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
349 if (hashsum
.empty() == false)
350 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
352 // not all methods always sent Hashes our way
// Fallback: when hashes are expected and the file exists, they are
// calculated locally from the file.
// NOTE(review): the statement feeding the file into calc is missing from
// this listing.
353 if (ReceivedHashes
.usable() == false)
355 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
356 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
358 Hashes
calc(ExpectedHashes
);
359 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
361 ReceivedHashes
= calc
.GetHashStringList();
366 // only local files can refer other filenames and counting them as fetched would be unfair
// NOTE(review): the comment above says files that refer to another
// filename should not be counted, yet the visible condition calls Fetched
// exactly when filename differs from DestFile -- confirm whether "==" was
// intended.
// NOTE(review): Resume-Point is parsed with atoi here but with strtoull in
// the 200 handler; values beyond the int range would not survive atoi.
367 if (Log
!= NULL
&& filename
!= Itm
->Owner
->DestFile
)
368 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
// The owner list is copied first and the loop below walks the copy, not
// Itm->Owners, after ItemDone(Itm) has run.
370 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
371 OwnerQ
->ItemDone(Itm
);
// Either the IMS-Hit or the Alt-IMS-Hit tag marks the message as a hit;
// both default to false.
374 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
375 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
376 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
378 pkgAcquire::Item
* const Owner
= *O
;
379 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
// With Debug::pkgAcquire::Auth set, received and expected hashes are
// printed to clog.
380 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
382 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
383 << "ReceivedHash:" << endl
;
384 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
385 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
386 std::clog
<< "ExpectedHash:" << endl
;
387 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
388 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
392 // decide if what we got is what we expected
// Visible outcomes: expected hashes present but none received -> accepted
// only for an IMS hit; received equal to expected -> accepted; no usable
// expected hashes but the owner requires them -> rejected.
// NOTE(review): the "else" lines between the assignments are missing from
// this listing, so the exact branch each assignment belongs to must be
// checked against the real file.
393 bool consideredOkay
= false;
394 if (ExpectedHashes
.usable())
396 if (ReceivedHashes
.usable() == false)
398 /* IMS-Hits can't be checked here as we will have uncompressed file,
399 but the hashes for the compressed file. What we have was good through
400 so all we have to ensure later is that we are not stalled. */
401 consideredOkay
= isIMSHit
;
403 else if (ReceivedHashes
== ExpectedHashes
)
404 consideredOkay
= true;
406 consideredOkay
= false;
409 else if (Owner
->HashesRequired() == true)
410 consideredOkay
= false;
412 consideredOkay
= true;
// An accepted file still has to pass the owner's own VerifyDone check; a
// rejected one marks the owner with StatAuthError.
414 if (consideredOkay
== true)
415 consideredOkay
= Owner
->VerifyDone(Message
, Config
);
416 else // hashsum mismatch
417 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
// Success path: notify the owner, then report to Log as IMS hit or as done
// (the conditions choosing between the two Log calls are not visible).
419 if (consideredOkay
== true)
421 Owner
->Done(Message
, ReceivedHashes
, Config
);
425 Log
->IMSHit(Owner
->GetItemDesc());
427 Log
->Done(Owner
->GetItemDesc());
// Failure path: notify the owner and report the failure to Log.
432 Owner
->Failed(Message
,Config
);
434 Log
->Fail(Owner
->GetItemDesc());
// 400 URI Failure handling; the error below is presumably raised when no
// queue item matched the URI (the condition is not visible).
446 std::string
const msg
= LookupTag(Message
,"Message");
447 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
// Fix up ownership/permissions and per-owner links, see PrepareFiles().
451 PrepareFiles("400::URIFailure", Itm
);
453 // Display update before completion
454 if (Log
!= 0 && Log
->MorePulses
== true)
455 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
456 Log
->Pulse((*O
)->GetOwner());
// The owner list is copied first and the loop below walks the copy, not
// Itm->Owners, after ItemDone(Itm) has run.
458 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
459 OwnerQ
->ItemDone(Itm
);
462 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
// These four FailReason values are classified as transient network errors.
465 if(LookupTag(Message
,"FailReason") == "Timeout" ||
466 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
467 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
468 LookupTag(Message
,"FailReason") == "ConnectionRefused")
469 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
// Notify the owner and report the failure to Log (the null check on Log is
// not visible in this listing).
471 (*O
)->Failed(Message
,Config
);
474 Log
->Fail((*O
)->GetItemDesc());
481 // 401 General Failure
483 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
// Media change request from the method, handled by MediaChange().
488 MediaChange(Message
);
495 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
496 // ---------------------------------------------------------------------
497 /* This parses the capabilities message and dumps it into the configuration structure. */
// Copies the fields of a capabilities message into *Config. Version is
// stored as text; Single-Instance, Pipeline, Send-Config, Local-Only,
// Needs-Cleanup and Removable are converted with StringToBool and fall back
// to false. Afterwards the resulting configuration is printed to clog.
// NOTE(review): the statements before the first assignment, the condition
// guarding the trace output and the return value are missing from this
// listing.
499 bool pkgAcquire::Worker::Capabilities(string Message
)
504 Config
->Version
= LookupTag(Message
,"Version");
505 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
506 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
507 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
508 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
509 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
510 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
// Trace output listing every field that was just set.
515 clog
<< "Configured access method " << Config
->Access
<< endl
;
516 clog
<< "Version:" << Config
->Version
<<
517 " SingleInstance:" << Config
->SingleInstance
<<
518 " Pipeline:" << Config
->Pipeline
<<
519 " SendConfig:" << Config
->SendConfig
<<
520 " LocalOnly: " << Config
->LocalOnly
<<
521 " NeedsCleanup: " << Config
->NeedsCleanup
<<
522 " Removable: " << Config
->Removable
<< endl
;
528 // Worker::MediaChange - Request a media change /*{{{*/
529 // ---------------------------------------------------------------------
// Handles a media change request from the method. The Media and Drive tags
// are read from Message, a "media-change:" status line is written to the
// descriptor configured as APT::Status-Fd, and Log->MediaChange() is asked
// to perform the change. The method then receives "603 Media Changed"; the
// variant with "Failed: true" is sent when there is no Log or
// Log->MediaChange() returned false.
// NOTE(review): the guard around the status-fd write, the declaration of
// the buffer S, the statements that queue S for sending and the return
// values are missing from this listing.
531 bool pkgAcquire::Worker::MediaChange(string Message
)
// -1 is the fallback when APT::Status-Fd is not configured.
533 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
536 string Media
= LookupTag(Message
,"Media");
537 string Drive
= LookupTag(Message
,"Drive");
// msg holds the translated prompt, status the machine-readable line.
538 ostringstream msg
,status
;
539 ioprintf(msg
,_("Please insert the disc labeled: "
541 "in the drive '%s' and press [Enter]."),
542 Media
.c_str(),Drive
.c_str());
543 status
<< "media-change: " // message
544 << Media
<< ":" // media
545 << Drive
<< ":" // drive
546 << msg
.str() // l10n message
// Write the assembled status line to the status descriptor.
549 std::string
const dlstatus
= status
.str();
550 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// No logger, or the logger could not change the media: report failure.
553 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
554 LookupTag(Message
,"Drive")) == false)
557 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
559 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
// Otherwise the plain confirmation is prepared.
566 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
568 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
574 // Worker::SendConfiguration - Send the config to the method /*{{{*/
575 // ---------------------------------------------------------------------
// Builds a "601 Configuration" message containing one "Config-Item:" line
// per configuration entry (format "%F=%V") and appends it to OutQueue.
// Config->SendConfig is tested first.
// NOTE(review): the statement executed when SendConfig is false (presumably
// an early return), the guard of the trace output and the final statements
// are missing from this listing.
577 bool pkgAcquire::Worker::SendConfiguration()
579 if (Config
->SendConfig
== false)
585 /* Write out all of the configuration directives by walking the
586 configuration tree */
587 std::ostringstream Message
;
588 Message
<< "601 Configuration\n";
589 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
// Trace output of the outgoing message, then queue it for writing.
593 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
594 OutQueue
+= Message
.str();
600 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
601 // ---------------------------------------------------------------------
602 /* Send a URI Acquire message to the method */
// The message starts with "600 URI Acquire" and carries these headers:
//   URI:             Item->URI
//   Filename:        destination file of the item owner
//   Expected-<type>: one line per expected hash
//   Maximum-Size:    only in the branch taken when the expected hashes
//                    carry no file size
// followed by whatever Item->Custom600Headers() returns.
// NOTE(review): the initial checks, the declaration of MaximumSize, the
// statements that append Message to OutQueue and the return value are
// missing from this listing.
603 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
608 string Message
= "600 URI Acquire\n";
609 Message
.reserve(300);
610 Message
+= "URI: " + Item
->URI
;
611 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
// One Expected-<type> header per hash the item expects.
613 HashStringList
const hsl
= Item
->GetExpectedHashes();
614 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
615 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
// Without a known file size the item is asked for a maximum size instead
// (the check on FileSize between these lines is not visible).
617 if (hsl
.FileSize() == 0)
619 unsigned long long FileSize
= Item
->GetMaximumSize();
623 strprintf(MaximumSize
, "%llu", FileSize
);
624 Message
+= "\nMaximum-Size: " + MaximumSize
;
// SyncDestinationFiles() runs before the custom headers are appended.
628 Item
->SyncDestinationFiles();
629 Message
+= Item
->Custom600Headers();
// An already existing destination file is handed to the user configured as
// APT::Sandbox::User, group "root", mode 0600.
632 if (RealFileExists(Item
->Owner
->DestFile
))
634 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
635 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
636 SandboxUser
.c_str(), "root", 0600);
// Trace output of the outgoing message.
640 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
647 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
648 // ---------------------------------------------------------------------
// Writes the pending contents of OutQueue to OutFd. The write is repeated
// while it fails with EINTR. The number of bytes reported by write is then
// removed from the front of OutQueue.
// NOTE(review): the declaration of Res, the condition in front of
// "return MethodFailure()" and the action taken once OutQueue is empty are
// missing from this listing.
650 bool pkgAcquire::Worker::OutFdReady()
655 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
657 while (Res
< 0 && errno
== EINTR
);
660 return MethodFailure();
// Drop what has been written; the rest stays queued for the next call.
662 OutQueue
.erase(0,Res
);
663 if (OutQueue
.empty() == true)
669 // Worker::InFdRead - In bound FD is ready /*{{{*/
670 // ---------------------------------------------------------------------
// Reads the pending messages through ReadMessages().
// NOTE(review): what happens after a failed or successful read is missing
// from this listing (presumably failure is returned and otherwise
// RunMessages() is invoked) -- confirm.
672 bool pkgAcquire::Worker::InFdReady()
674 if (ReadMessages() == false)
680 // Worker::MethodFailure - Called when the method fails /*{{{*/
681 // ---------------------------------------------------------------------
682 /* This is called when the method is believed to have failed, probably because reading its messages or writing to it failed. */
// Reports "Method %s has died unexpectedly!", calls ExecWait on the method
// process with its last argument false and empties MessageQueue. In the
// visible code it is reached from ReadMessages() and OutFdReady().
// NOTE(review): the statements between the ExecWait call and the queue
// cleanup, and the return value, are missing from this listing.
684 bool pkgAcquire::Worker::MethodFailure()
686 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
688 // do not reap the child here to show meaningfull error to the user
689 ExecWait(Process
,Access
.c_str(),false);
// Pending messages of the failed method are discarded.
698 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
703 // Worker::Pulse - Called periodically /*{{{*/
704 // ---------------------------------------------------------------------
// Refreshes CurrentSize from the size on disk of the current item's
// destination file, obtained with stat().
// NOTE(review): the statements executed when there is no current item or
// when stat() fails (presumably early returns) and the declaration of Buf
// are missing from this listing.
706 void pkgAcquire::Worker::Pulse()
708 if (CurrentItem
== 0)
712 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
714 CurrentSize
= Buf
.st_size
;
717 // Worker::ItemDone - Called when the current item is finished /*{{{*/
718 // ---------------------------------------------------------------------
// NOTE(review): the body of this function is missing from this listing;
// presumably it resets the per-item state (CurrentItem, CurrentSize,
// TotalSize, Status) -- confirm against the real file.
720 void pkgAcquire::Worker::ItemDone()
// Called by the 201 and 400 handlers before the owners are notified.
// caller is passed on to ChangeOwnerAndPermissionOfFile; the handlers pass
// "201::URIDone" and "400::URIFailure". Itm is the queue item whose files
// are prepared.
// If the destination file of Itm->Owner exists it is given to root:root
// with mode 0644, and for the owners looped over below the destination is
// removed and recreated as a hard link to that file, falling back to a
// symlink. The second loop removes the destination file of every owner.
// NOTE(review): braces, the "else", the statement after the DestFile
// comparison (presumably "continue") and the end of the function are
// missing from this listing.
728 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
730 if (RealFileExists(Itm
->Owner
->DestFile
))
732 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
733 std::string
const filename
= Itm
->Owner
->DestFile
;
734 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
736 pkgAcquire::Item
const * const Owner
= *O
;
// Owners that already use the downloaded file as destination are compared
// here; the statement taken for them is not visible.
737 if (Owner
->DestFile
== filename
)
// Replace whatever is at the owner's destination with a hard link.
739 unlink(Owner
->DestFile
.c_str());
740 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
742 // different mounts can't happen for us as we download to lists/ by default,
743 // but if the system is reused by others the locations can potentially be on
744 // different disks, so use symlink as poor-men replacement.
745 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
746 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
747 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
// Presumably the branch for a missing download: every owner's destination
// file is removed.
753 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
754 unlink((*O
)->DestFile
.c_str());