1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and has no owning queue.
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
37 #include <sys/types.h>
46 // Worker::Worker - Constructor for Queue startup /*{{{*/
47 // ---------------------------------------------------------------------
// Queue-startup constructor: receives the owning queue (Q), the method
// configuration (Cnf) and a status logger (log).
// The initializer list below only clears the private pointer d and stores
// the logger in Log.
// NOTE(review): where Q and Cnf are stored is not shown by the initializer
// list - confirm against the constructor body.
49 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
50 pkgAcquireStatus
*log
) : d(NULL
), Log(log
)
62 // Worker::Worker - Constructor for method config startup /*{{{*/
63 // ---------------------------------------------------------------------
// Method-config-startup constructor: builds a worker that has no owning
// queue (OwnerQ is NULL) and no current item.
// Config is taken from Cnf, the access scheme string is copied from
// Cnf->Access, and the size counters start at zero.
65 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
),
66 Access(Cnf
->Access
), CurrentItem(NULL
),
67 CurrentSize(0), TotalSize(0)
72 // Worker::Construct - Constructor helper /*{{{*/
73 // ---------------------------------------------------------------------
// Shared constructor helper.
// The visible statement caches the "Debug::pkgAcquire::Worker" configuration
// boolean (default false) in Debug.
75 void pkgAcquire::Worker::Construct()
84 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
87 // Worker::~Worker - Destructor /*{{{*/
88 // ---------------------------------------------------------------------
// Destructor: shuts the method subprocess down.
// Methods that did not announce Needs-Cleanup are sent SIGINT; afterwards
// ExecWait is called on the process for every method.
90 pkgAcquire::Worker::~Worker()
97 /* Closing of stdin is the signal to exit and die when the process
98 indicates it needs cleanup */
// Only methods that do NOT need cleanup are interrupted with a signal.
99 if (Config
->NeedsCleanup
== false)
100 kill(Process
,SIGINT
);
// NOTE(review): the meaning of the final 'true' argument is defined by
// ExecWait, not here - confirm against its declaration.
101 ExecWait(Process
,Access
.c_str(),true);
105 // Worker::Start - Start the worker process /*{{{*/
106 // ---------------------------------------------------------------------
107 /* This forks the method and inits the communication channel */
// Starts the method binary for this worker's Access scheme and reads its
// initial messages.
// The binary path is "Dir::Bin::Methods" + Access. Two pipes are created,
// the process is forked via ExecFork(), and the child's stdin/stdout are
// wired to the pipes before execv.
// On a start-up failure the function returns the result of _error->Error.
// NOTE(review): several control-flow statements (returns, the child/parent
// split after ExecFork, the assignments of InFd/OutFd) are not visible in
// this function - confirm before relying on the comments below.
108 bool pkgAcquire::Worker::Start()
110 // Get the method path
111 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
// A missing method binary is reported as an error.
112 if (FileExists(Method
) == false)
114 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
// For https an extra notice names the package that may be missing.
115 if (Access
== "https")
116 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
121 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Pipes[0..1] and Pipes[2..3] are two independent pipe pairs; all four
// descriptors start as -1.
124 int Pipes
[4] = {-1,-1,-1,-1};
125 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
127 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
// NOTE(review): the body of this first loop over the four descriptors is
// not visible (presumably it closes them on failure) - confirm.
128 for (int I
= 0; I
!= 4; I
++)
// Mark every pipe end close-on-exec.
132 for (int I
= 0; I
!= 4; I
++)
133 SetCloseExec(Pipes
[I
],true);
135 // Fork off the process
136 Process
= ExecFork();
// The write end of the first pipe becomes stdout and the read end of the
// second pipe becomes stdin; close-on-exec is cleared on the three standard
// descriptors so they survive the execv below.
140 dup2(Pipes
[1],STDOUT_FILENO
);
141 dup2(Pipes
[2],STDIN_FILENO
);
142 SetCloseExec(STDOUT_FILENO
,false);
143 SetCloseExec(STDIN_FILENO
,false);
144 SetCloseExec(STDERR_FILENO
,false);
147 Args
[0] = Method
.c_str();
149 execv(Args
[0],(char **)Args
);
// Reached only if execv returned, i.e. the exec failed.
150 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// The remaining pipe ends (read end of the first pipe, write end of the
// second) are switched to non-blocking mode.
157 SetNonBlock(Pipes
[0],true);
158 SetNonBlock(Pipes
[3],true);
164 // Read the configuration data
165 if (WaitFd(InFd
) == false ||
166 ReadMessages() == false)
167 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
176 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
177 // ---------------------------------------------------------------------
// Reads messages from InFd into MessageQueue using the global
// ::ReadMessages helper.
// If that helper reports failure, the result of MethodFailure() is returned.
// NOTE(review): the success-path return is not visible - confirm.
179 bool pkgAcquire::Worker::ReadMessages()
181 if (::ReadMessages(InFd
,MessageQueue
) == false)
182 return MethodFailure();
186 // Worker::RunMessages - Empty the message queue /*{{{*/
187 // ---------------------------------------------------------------------
188 /* This takes the messages from the message queue and runs them through
189 the parsers in order. */
// Drains MessageQueue: each message is removed from the front, its leading
// numeric code is parsed, the queue item for its "URI" tag is looked up and
// the message is handled according to that code.
// A message that does not start with a number returns the result of
// _error->Error immediately.
// NOTE(review): the dispatch construct, its labels, several guards and all
// break/return statements are not visible in this function. The section
// comments below are derived from the message texts and existing comments
// only - confirm against the full source.
190 bool pkgAcquire::Worker::RunMessages()
192 while (MessageQueue
.empty() == false)
// Pop the oldest message.
194 string Message
= MessageQueue
.front();
195 MessageQueue
.erase(MessageQueue
.begin());
198 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
200 // Fetch the message number
202 int Number
= strtol(Message
.c_str(),&End
,10);
// End == start of string means strtol consumed no digits.
203 if (End
== Message
.c_str())
204 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Itm stays NULL when the message carries no URI tag.
206 string URI
= LookupTag(Message
,"URI");
207 pkgAcquire::Queue::QItem
*Itm
= NULL
;
208 if (URI
.empty() == false)
209 Itm
= OwnerQ
->FindItem(URI
,this);
213 // update used mirror
// NOTE(review): Itm is dereferenced below; a non-NULL guard is not visible
// here - confirm one exists.
214 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
215 if (UsedMirror
.empty() == false)
217 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
218 (*O
)->UsedMirror
= UsedMirror
;
// Replace the first word of the description (everything before the first
// space) with the mirror name.
220 if (Itm
->Description
.find(" ") != string::npos
)
221 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
225 // Determine the message number and dispatch
// Capabilities message: parsed by Capabilities(); failure aborts.
230 if (Capabilities(Message
) == false)
231 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
// Log message: echoed to clog.
237 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
// Status message: stored in Status.
242 Status
= LookupTag(Message
,"Message");
// Redirect handling (per the error text below).
250 _error
->Error("Method gave invalid 103 Redirect message");
// New-URI falls back to the current URI when the tag is absent.
254 std::string
const NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
259 // Change the status so that it can be dequeued
260 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
261 (*O
)->Status
= pkgAcquire::Item::StatIdle
;
262 // Mark the item as done (taking care of all queues)
263 // and then put it in the main queue again
// The owner list is copied before ItemDone is called and the copy is what
// the following loop iterates.
264 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
265 OwnerQ
->ItemDone(Itm
);
267 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
269 pkgAcquire::Item
*Owner
= *O
;
270 pkgAcquire::ItemDesc
&desc
= Owner
->GetItemDesc();
271 // if we change site, treat it as a mirror change
272 if (URI::SiteOnly(NewURI
) != URI::SiteOnly(desc
.URI
))
// OldSite is the first word of the description; OldExtra is the part of the
// old URI after that prefix plus one separator character.
274 std::string
const OldSite
= desc
.Description
.substr(0, desc
.Description
.find(" "));
275 if (likely(APT::String::Startswith(desc
.URI
, OldSite
)))
277 std::string
const OldExtra
= desc
.URI
.substr(OldSite
.length() + 1);
// If the new URI ends with the same path, the leading remainder is the new
// site and becomes the owner's UsedMirror.
278 if (likely(APT::String::Endswith(NewURI
, OldExtra
)))
280 std::string
const NewSite
= NewURI
.substr(0, NewURI
.length() - OldExtra
.length());
281 Owner
->UsedMirror
= URI::ArchiveOnly(NewSite
);
282 if (desc
.Description
.find(" ") != string::npos
)
283 desc
.Description
.replace(0, desc
.Description
.find(" "), Owner
->UsedMirror
);
// Re-enqueue the item description with the acquire object owning the queue.
288 OwnerQ
->Owner
->Enqueue(desc
);
// URI Start handling (per the error text below).
301 _error
->Error("Method gave invalid 200 URI Start message");
// Both tags are parsed as unsigned 64-bit values with default "0".
307 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
308 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
309 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
311 (*O
)->Start(Message
, TotalSize
);
313 // Display update before completion
// NOTE(review): Log is dereferenced here without a visible NULL check,
// unlike the later "Log != 0 &&" tests - confirm a guard exists.
316 if (Log
->MorePulses
== true)
317 Log
->Pulse((*O
)->GetOwner());
318 Log
->Fetch((*O
)->GetItemDesc());
// URI Done handling (per the error text below).
330 _error
->Error("Method gave invalid 201 URI Done message");
334 PrepareFiles("201::URIDone", Itm
);
336 // Display update before completion
337 if (Log
!= 0 && Log
->MorePulses
== true)
338 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
339 Log
->Pulse((*O
)->GetOwner());
// The method may name a different file; the default is the item's DestFile.
341 std::string
const filename
= LookupTag(Message
, "Filename", Itm
->Owner
->DestFile
.c_str());
342 HashStringList ReceivedHashes
;
344 // see if we got hashes to verify
// For every supported hash type, look for a "<type>-Hash" tag in the message.
345 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
347 std::string
const tagname
= std::string(*type
) + "-Hash";
348 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
349 if (hashsum
.empty() == false)
350 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
352 // not all methods always sent Hashes our way
// Fallback: when the method sent no usable hashes but hashes are expected
// and the file exists, the hash list is taken from a Hashes calculator.
// NOTE(review): the statement feeding 'file' into 'calc' is not visible -
// confirm.
353 if (ReceivedHashes
.usable() == false)
355 HashStringList
const ExpectedHashes
= Itm
->GetExpectedHashes();
356 if (ExpectedHashes
.usable() == true && RealFileExists(filename
))
358 Hashes
calc(ExpectedHashes
);
359 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
361 ReceivedHashes
= calc
.GetHashStringList();
366 // only local files can refer other filenames and counting them as fetched would be unfair
// NOTE(review): the condition fires when filename DIFFERS from DestFile,
// which reads as the opposite of the comment above - confirm intent.
// NOTE(review): Resume-Point is parsed with atoi here but with strtoull in
// the URI Start handling; values beyond int range would not survive atoi.
367 if (Log
!= NULL
&& filename
!= Itm
->Owner
->DestFile
)
368 Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str()));
// Copy the owners before ItemDone is called; the copy is iterated below.
370 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
371 OwnerQ
->ItemDone(Itm
);
// Either tag marks the response as an IMS hit.
374 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
375 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
376 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
378 pkgAcquire::Item
* const Owner
= *O
;
379 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
// Optional debug dump of received versus expected hashes.
380 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
382 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
383 << "ReceivedHash:" << endl
;
384 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
385 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
386 std::clog
<< "ExpectedHash:" << endl
;
387 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
388 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
392 // decide if what we got is what we expected
// With usable expected hashes: no usable received hashes -> accepted only
// for IMS hits; otherwise accepted when both lists compare equal.
// Without usable expected hashes: the outcome depends on HashesRequired().
// NOTE(review): the 'else' keywords pairing the assignments below are not
// all visible - confirm the exact pairing.
393 bool consideredOkay
= false;
394 if (ExpectedHashes
.usable())
396 if (ReceivedHashes
.usable() == false)
398 /* IMS-Hits can't be checked here as we will have uncompressed file,
399 but the hashes for the compressed file. What we have was good through
400 so all we have to ensure later is that we are not stalled. */
401 consideredOkay
= isIMSHit
;
403 else if (ReceivedHashes
== ExpectedHashes
)
404 consideredOkay
= true;
406 consideredOkay
= false;
409 else if (Owner
->HashesRequired() == true)
410 consideredOkay
= false;
412 consideredOkay
= true;
// Accepted: notify the owner and the log.
414 if (consideredOkay
== true)
416 Owner
->Done(Message
, ReceivedHashes
, Config
);
418 // Log that we are done
422 Log
->IMSHit(Owner
->GetItemDesc());
424 Log
->Done(Owner
->GetItemDesc());
// Rejected: the owner is flagged with an authentication error and failed.
429 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
430 Owner
->Failed(Message
,Config
);
433 Log
->Fail(Owner
->GetItemDesc());
// URI Failure handling (per the error text below).
445 std::string
const msg
= LookupTag(Message
,"Message");
446 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
450 PrepareFiles("400::URIFailure", Itm
);
452 // Display update before completion
453 if (Log
!= 0 && Log
->MorePulses
== true)
454 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
455 Log
->Pulse((*O
)->GetOwner());
// Copy the owners before ItemDone is called; the copy is iterated below.
457 std::vector
<Item
*> const ItmOwners
= Itm
->Owners
;
458 OwnerQ
->ItemDone(Itm
);
461 for (pkgAcquire::Queue::QItem::owner_iterator O
= ItmOwners
.begin(); O
!= ItmOwners
.end(); ++O
)
// These four FailReason values are classified as transient network errors.
464 if(LookupTag(Message
,"FailReason") == "Timeout" ||
465 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
466 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
467 LookupTag(Message
,"FailReason") == "ConnectionRefused")
468 (*O
)->Status
= pkgAcquire::Item::StatTransientNetworkError
;
470 (*O
)->Failed(Message
,Config
);
473 Log
->Fail((*O
)->GetItemDesc());
480 // 401 General Failure
482 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
// Media change request: delegated to MediaChange().
487 MediaChange(Message
);
494 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
495 // ---------------------------------------------------------------------
496 /* This parses the capabilities message and dumps it into the configuration structure. */
// Copies the fields of a capabilities message into Config.
// "Version" is stored as a string; the remaining tags are converted with
// StringToBool and default to false when absent or unparsable.
// The resulting settings are then printed to clog.
// NOTE(review): the guards around the assignments and the debug output, and
// the return statement, are not visible - confirm.
498 bool pkgAcquire::Worker::Capabilities(string Message
)
503 Config
->Version
= LookupTag(Message
,"Version");
504 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
505 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
506 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
507 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
508 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
509 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
// Diagnostic dump of the parsed capabilities.
514 clog
<< "Configured access method " << Config
->Access
<< endl
;
515 clog
<< "Version:" << Config
->Version
<<
516 " SingleInstance:" << Config
->SingleInstance
<<
517 " Pipeline:" << Config
->Pipeline
<<
518 " SendConfig:" << Config
->SendConfig
<<
519 " LocalOnly: " << Config
->LocalOnly
<<
520 " NeedsCleanup: " << Config
->NeedsCleanup
<<
521 " Removable: " << Config
->Removable
<< endl
;
527 // Worker::MediaChange - Request a media change /*{{{*/
528 // ---------------------------------------------------------------------
// Handles a media change request from the method.
// A "media-change: <media>:<drive>:<localized prompt>" line is written to
// the descriptor configured as "APT::Status-Fd", then the logger is asked
// to perform the change.
// The reply is "603 Media Changed" with "Failed: true" when there is no
// logger or the logger returns false, and a plain "603 Media Changed"
// otherwise.
// NOTE(review): the guard on status_fd, the declaration of S and the
// statements that queue the reply are not visible - confirm.
530 bool pkgAcquire::Worker::MediaChange(string Message
)
// -1 is the default when no status descriptor is configured.
532 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
535 string Media
= LookupTag(Message
,"Media");
536 string Drive
= LookupTag(Message
,"Drive");
537 ostringstream msg
,status
;
// msg receives the translated prompt; status receives the machine-readable
// line built from it.
538 ioprintf(msg
,_("Please insert the disc labeled: "
540 "in the drive '%s' and press enter."),
541 Media
.c_str(),Drive
.c_str());
542 status
<< "media-change: " // message
543 << Media
<< ":" // media
544 << Drive
<< ":" // drive
545 << msg
.str() // l10n message
548 std::string
const dlstatus
= status
.str();
549 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// No logger, or a logger that refuses the change, produces the failure reply.
552 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
553 LookupTag(Message
,"Drive")) == false)
556 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
558 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
// Success reply.
565 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
567 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
573 // Worker::SendConfiguration - Send the config to the method /*{{{*/
574 // ---------------------------------------------------------------------
// Builds a "601 Configuration" message containing one
// "Config-Item: <name>=<value>" line per configuration entry and appends it
// to OutQueue.
// NOTE(review): what happens when Config->SendConfig is false is not
// visible (presumably an early return) - confirm. The message terminator
// and the return statement are not visible either.
576 bool pkgAcquire::Worker::SendConfiguration()
578 if (Config
->SendConfig
== false)
584 /* Write out all of the configuration directives by walking the
585 configuration tree */
586 std::ostringstream Message
;
587 Message
<< "601 Configuration\n";
588 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
592 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
593 OutQueue
+= Message
.str();
599 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
600 // ---------------------------------------------------------------------
601 /* Send a URI Acquire message to the method */
// Composes the "600 URI Acquire" request for Item.
// The request carries the URI, the destination filename, one
// "Expected-<type>" line per expected hash, optionally a "Maximum-Size"
// line, and the item's custom headers.
// If the destination file already exists, its ownership and mode are
// changed to the sandbox user, group "root", mode 0600.
// NOTE(review): the statements that terminate the message and append it to
// the outgoing queue, and the return, are not visible - confirm.
602 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
607 string Message
= "600 URI Acquire\n";
// Pre-size the buffer to avoid repeated growth while appending headers.
608 Message
.reserve(300);
609 Message
+= "URI: " + Item
->URI
;
610 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
612 HashStringList
const hsl
= Item
->GetExpectedHashes();
613 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
614 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
// Only when the expected hashes carry no file size is the item asked for a
// maximum size.
// NOTE(review): any check on FileSize before it is sent is not visible -
// confirm.
616 if (hsl
.FileSize() == 0)
618 unsigned long long FileSize
= Item
->GetMaximumSize();
622 strprintf(MaximumSize
, "%llu", FileSize
);
623 Message
+= "\nMaximum-Size: " + MaximumSize
;
627 Item
->SyncDestinationFiles();
628 Message
+= Item
->Custom600Headers();
// Hand an existing partial/target file over to the sandbox user.
631 if (RealFileExists(Item
->Owner
->DestFile
))
633 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
634 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
635 SandboxUser
.c_str(), "root", 0600);
639 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
646 // Worker::OutFdReady - Out bound FD is ready /*{{{*/
647 // ---------------------------------------------------------------------
// Writes as much of OutQueue as possible to OutFd.
// The write is retried while it fails with EINTR. The bytes actually
// written are removed from the front of OutQueue. The failure path returns
// the result of MethodFailure().
// NOTE(review): the declaration of Res, the 'do' keyword, the condition
// selecting the failure path and the action taken once the queue is empty
// are not visible - confirm.
649 bool pkgAcquire::Worker::OutFdReady()
654 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
656 while (Res
< 0 && errno
== EINTR
);
659 return MethodFailure();
// Drop what was sent; the rest stays queued for the next call.
661 OutQueue
.erase(0,Res
);
662 if (OutQueue
.empty() == true)
668 // Worker::InFdReady - In bound FD is ready /*{{{*/
669 // ---------------------------------------------------------------------
// Called when the inbound descriptor has data: pulls pending messages in
// via ReadMessages().
// NOTE(review): the statements following the failure test (and any call
// that processes the messages) are not visible - confirm.
671 bool pkgAcquire::Worker::InFdReady()
673 if (ReadMessages() == false)
679 // Worker::MethodFailure - Called when the method fails /*{{{*/
680 // ---------------------------------------------------------------------
681 /* This is called when the method is believed to have failed, probably because a read from or write to its pipe failed. */
// Records that the method process died: an error is pushed onto the error
// stack, ExecWait is called on the process and all pending messages are
// discarded.
// NOTE(review): descriptor clean-up and the return value are not visible -
// confirm.
683 bool pkgAcquire::Worker::MethodFailure()
685 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
687 // do not reap the child here to show meaningful error to the user
688 ExecWait(Process
,Access
.c_str(),false);
// Drop every message that was still queued for processing.
697 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
702 // Worker::Pulse - Called periodically /*{{{*/
703 // ---------------------------------------------------------------------
// Periodic progress refresh: CurrentSize is set to the on-disk size
// (st_size) of the current item's destination file.
// NOTE(review): the statements taken when there is no current item or when
// stat() fails are not visible (presumably early returns) - confirm.
705 void pkgAcquire::Worker::Pulse()
707 if (CurrentItem
== 0)
711 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
713 CurrentSize
= Buf
.st_size
;
716 // Worker::ItemDone - Called when the current item is finished /*{{{*/
717 // ---------------------------------------------------------------------
// Hook called when the current item is finished.
// NOTE(review): which members it resets is not visible here - confirm.
719 void pkgAcquire::Worker::ItemDone()
// Post-processes the downloaded file of a queue item on behalf of 'caller'
// (a tag passed through to ChangeOwnerAndPermissionOfFile).
// If the primary owner's DestFile exists it is re-owned to root:root with
// mode 0644, and for the item's owners a hard link (falling back to a
// symlink) from that file to the owner's DestFile is created.
// A further loop unlinks every owner's DestFile.
// NOTE(review): the 'else' that presumably separates the two loops and the
// statement guarded by the DestFile == filename comparison are not visible
// - confirm.
727 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem
const * const Itm
)/*{{{*/
729 if (RealFileExists(Itm
->Owner
->DestFile
))
731 ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644);
732 std::string
const filename
= Itm
->Owner
->DestFile
;
733 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
735 pkgAcquire::Item
const * const Owner
= *O
;
// NOTE(review): the statement this comparison guards is not visible; if the
// unlink below were its body, the primary file itself would be removed.
// Presumably owners sharing the primary filename are skipped - confirm.
736 if (Owner
->DestFile
== filename
)
// Remove any stale target first so link() can create it.
738 unlink(Owner
->DestFile
.c_str());
739 if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
741 // different mounts can't happen for us as we download to lists/ by default,
742 // but if the system is reused by others the locations can potentially be on
743 // different disks, so use symlink as poor man's replacement.
744 // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable
745 if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0)
746 _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str());
// Remove every owner's destination file.
752 for (pkgAcquire::Queue::QItem::owner_iterator O
= Itm
->Owners
.begin(); O
!= Itm
->Owners
.end(); ++O
)
753 unlink((*O
)->DestFile
.c_str());