1 // -*- mode: cpp; mode: fold -*- 
   3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $ 
   4 /* ###################################################################### 
   8    The worker process can startup either as a Configuration prober 
   9    or as a queue runner. As a configuration prober it only reads the 
  10    configuration message and  
  12    ##################################################################### */ 
  14 // Include Files                                                        /*{{{*/ 
  17 #include <apt-pkg/acquire.h> 
  18 #include <apt-pkg/acquire-worker.h> 
  19 #include <apt-pkg/acquire-item.h> 
  20 #include <apt-pkg/configuration.h> 
  21 #include <apt-pkg/error.h> 
  22 #include <apt-pkg/fileutl.h> 
  23 #include <apt-pkg/strutl.h> 
  24 #include <apt-pkg/hashes.h> 
  37 #include <sys/types.h> 
  46 // Worker::Worker - Constructor for Queue startup                       /*{{{*/ 
  47 // --------------------------------------------------------------------- 
  49 pkgAcquire::Worker::Worker(Queue 
*Q
,MethodConfig 
*Cnf
, 
  50                            pkgAcquireStatus 
*log
) : d(NULL
), Log(log
) 
  62 // Worker::Worker - Constructor for method config startup               /*{{{*/ 
  63 // --------------------------------------------------------------------- 
  65 pkgAcquire::Worker::Worker(MethodConfig 
*Cnf
) : d(NULL
), OwnerQ(NULL
), Config(Cnf
), 
  66                                                 Access(Cnf
->Access
), CurrentItem(NULL
), 
  67                                                 CurrentSize(0), TotalSize(0) 
  72 // Worker::Construct - Constructor helper                               /*{{{*/ 
  73 // --------------------------------------------------------------------- 
  75 void pkgAcquire::Worker::Construct() 
  84    Debug 
= _config
->FindB("Debug::pkgAcquire::Worker",false); 
  87 // Worker::~Worker - Destructor                                         /*{{{*/ 
  88 // --------------------------------------------------------------------- 
  90 pkgAcquire::Worker::~Worker() 
  97       /* Closing of stdin is the signal to exit and die when the process 
  98          indicates it needs cleanup */ 
  99       if (Config
->NeedsCleanup 
== false) 
 100          kill(Process
,SIGINT
); 
 101       ExecWait(Process
,Access
.c_str(),true); 
 105 // Worker::Start - Start the worker process                             /*{{{*/ 
 106 // --------------------------------------------------------------------- 
 107 /* This forks the method and inits the communication channel */ 
 108 bool pkgAcquire::Worker::Start() 
 110    // Get the method path 
 111    string Method 
= _config
->FindDir("Dir::Bin::Methods") + Access
; 
 112    if (FileExists(Method
) == false) 
 114       _error
->Error(_("The method driver %s could not be found."),Method
.c_str()); 
 115       if (Access 
== "https") 
 116          _error
->Notice(_("Is the package %s installed?"), "apt-transport-https"); 
 121       clog 
<< "Starting method '" << Method 
<< '\'' << endl
; 
 124    int Pipes
[4] = {-1,-1,-1,-1}; 
 125    if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0) 
 127       _error
->Errno("pipe","Failed to create IPC pipe to subprocess"); 
 128       for (int I 
= 0; I 
!= 4; I
++) 
 132    for (int I 
= 0; I 
!= 4; I
++) 
 133       SetCloseExec(Pipes
[I
],true); 
 135    // Fork off the process 
 136    Process 
= ExecFork(); 
 140       dup2(Pipes
[1],STDOUT_FILENO
); 
 141       dup2(Pipes
[2],STDIN_FILENO
); 
 142       SetCloseExec(STDOUT_FILENO
,false); 
 143       SetCloseExec(STDIN_FILENO
,false); 
 144       SetCloseExec(STDERR_FILENO
,false); 
 147       Args
[0] = Method
.c_str(); 
 149       execv(Args
[0],(char **)Args
); 
 150       cerr 
<< "Failed to exec method " << Args
[0] << endl
; 
 157    SetNonBlock(Pipes
[0],true); 
 158    SetNonBlock(Pipes
[3],true); 
 164    // Read the configuration data 
 165    if (WaitFd(InFd
) == false || 
 166        ReadMessages() == false) 
 167       return _error
->Error(_("Method %s did not start correctly"),Method
.c_str()); 
 176 // Worker::ReadMessages - Read all pending messages into the list       /*{{{*/ 
 177 // --------------------------------------------------------------------- 
 179 bool pkgAcquire::Worker::ReadMessages() 
 181    if (::ReadMessages(InFd
,MessageQueue
) == false) 
 182       return MethodFailure(); 
 186 // Worker::RunMessage - Empty the message queue                         /*{{{*/ 
 187 // --------------------------------------------------------------------- 
 188 /* This takes the messages from the message queue and runs them through 
 189    the parsers in order. */ 
 190 bool pkgAcquire::Worker::RunMessages() 
 192    while (MessageQueue
.empty() == false) 
 194       string Message 
= MessageQueue
.front(); 
 195       MessageQueue
.erase(MessageQueue
.begin()); 
 198          clog 
<< " <- " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 200       // Fetch the message number 
 202       int Number 
= strtol(Message
.c_str(),&End
,10); 
 203       if (End 
== Message
.c_str()) 
 204          return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str()); 
 206       string URI 
= LookupTag(Message
,"URI"); 
 207       pkgAcquire::Queue::QItem 
*Itm 
= 0; 
 208       if (URI
.empty() == false) 
 209          Itm 
= OwnerQ
->FindItem(URI
,this); 
 211       // update used mirror 
 212       string UsedMirror 
= LookupTag(Message
,"UsedMirror", ""); 
 213       if (!UsedMirror
.empty() && 
 215           Itm
->Description
.find(" ") != string::npos
) 
 217          Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
); 
 218          // FIXME: will we need this as well? 
 219          //Itm->ShortDesc = UsedMirror; 
 222       // Determine the message number and dispatch 
 227          if (Capabilities(Message
) == false) 
 228             return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str()); 
 234             clog 
<< " <- (log) " << LookupTag(Message
,"Message") << endl
; 
 239          Status 
= LookupTag(Message
,"Message"); 
 247                _error
->Error("Method gave invalid 103 Redirect message"); 
 251             string NewURI 
= LookupTag(Message
,"New-URI",URI
.c_str()); 
 256             // Change the status so that it can be dequeued 
 257             for (pkgAcquire::Queue::QItem::owner_iterator O 
= Itm
->Owners
.begin(); O 
!= Itm
->Owners
.end(); ++O
) 
 259                pkgAcquire::Item 
*Owner 
= *O
; 
 260                Owner
->Status 
= pkgAcquire::Item::StatIdle
; 
 262             // Mark the item as done (taking care of all queues) 
 263             // and then put it in the main queue again 
 264             std::vector
<Item
*> const ItmOwners 
= Itm
->Owners
; 
 265             OwnerQ
->ItemDone(Itm
); 
 267             for (pkgAcquire::Queue::QItem::owner_iterator O 
= ItmOwners
.begin(); O 
!= ItmOwners
.end(); ++O
) 
 269                pkgAcquire::Item 
*Owner 
= *O
; 
 270                pkgAcquire::ItemDesc desc 
= Owner
->GetItemDesc(); 
 272                OwnerQ
->Owner
->Enqueue(desc
); 
 275                   Log
->Done(Owner
->GetItemDesc()); 
 285                _error
->Error("Method gave invalid 200 URI Start message"); 
 291             TotalSize 
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10); 
 292             ResumePoint 
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10); 
 293             for (pkgAcquire::Queue::QItem::owner_iterator O 
= Itm
->Owners
.begin(); O 
!= Itm
->Owners
.end(); ++O
) 
 295                (*O
)->Start(Message
, TotalSize
); 
 297                // Display update before completion 
 300                   if (Log
->MorePulses 
== true) 
 301                      Log
->Pulse((*O
)->GetOwner()); 
 302                   Log
->Fetch((*O
)->GetItemDesc()); 
 314                _error
->Error("Method gave invalid 201 URI Done message"); 
 318             PrepareFiles("201::URIDone", Itm
); 
 320             // Display update before completion 
 321             if (Log 
!= 0 && Log
->MorePulses 
== true) 
 322                for (pkgAcquire::Queue::QItem::owner_iterator O 
= Itm
->Owners
.begin(); O 
!= Itm
->Owners
.end(); ++O
) 
 323                   Log
->Pulse((*O
)->GetOwner()); 
 325             std::string 
const filename 
= LookupTag(Message
, "Filename", Itm
->Owner
->DestFile
.c_str()); 
 326             HashStringList ReceivedHashes
; 
 328                // see if we got hashes to verify 
 329                for (char const * const * type 
= HashString::SupportedHashes(); *type 
!= NULL
; ++type
) 
 331                   std::string 
const tagname 
= std::string(*type
) + "-Hash"; 
 332                   std::string 
const hashsum 
= LookupTag(Message
, tagname
.c_str()); 
 333                   if (hashsum
.empty() == false) 
 334                      ReceivedHashes
.push_back(HashString(*type
, hashsum
)); 
 336                // not all methods always sent Hashes our way 
 337                if (ReceivedHashes
.usable() == false) 
 339                   HashStringList 
const ExpectedHashes 
= Itm
->GetExpectedHashes(); 
 340                   if (ExpectedHashes
.usable() == true && RealFileExists(filename
)) 
 342                      Hashes 
calc(ExpectedHashes
); 
 343                      FileFd 
file(filename
, FileFd::ReadOnly
, FileFd::None
); 
 345                      ReceivedHashes 
= calc
.GetHashStringList(); 
 350             // only local files can refer other filenames and counting them as fetched would be unfair 
 351             if (Log 
!=  NULL 
&& filename 
!= Itm
->Owner
->DestFile
) 
 352                Log
->Fetched(ReceivedHashes
.FileSize(),atoi(LookupTag(Message
,"Resume-Point","0").c_str())); 
 354             std::vector
<Item
*> const ItmOwners 
= Itm
->Owners
; 
 355             OwnerQ
->ItemDone(Itm
); 
 358             bool const isIMSHit 
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) || 
 359                StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false); 
 360             for (pkgAcquire::Queue::QItem::owner_iterator O 
= ItmOwners
.begin(); O 
!= ItmOwners
.end(); ++O
) 
 362                pkgAcquire::Item 
* const Owner 
= *O
; 
 363                HashStringList 
const ExpectedHashes 
= Owner
->GetExpectedHashes(); 
 364                if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true) 
 366                   std::clog 
<< "201 URI Done: " << Owner
->DescURI() << endl
 
 367                      << "ReceivedHash:" << endl
; 
 368                   for (HashStringList::const_iterator hs 
= ReceivedHashes
.begin(); hs 
!= ReceivedHashes
.end(); ++hs
) 
 369                      std::clog 
<<  "\t- " << hs
->toStr() << std::endl
; 
 370                   std::clog 
<< "ExpectedHash:" << endl
; 
 371                   for (HashStringList::const_iterator hs 
= ExpectedHashes
.begin(); hs 
!= ExpectedHashes
.end(); ++hs
) 
 372                      std::clog 
<<  "\t- " << hs
->toStr() << std::endl
; 
 376                // decide if what we got is what we expected 
 377                bool consideredOkay 
= false; 
 378                if (ExpectedHashes
.usable()) 
 380                   if (ReceivedHashes
.usable() == false) 
 382                      /* IMS-Hits can't be checked here as we will have uncompressed file, 
 383                         but the hashes for the compressed file. What we have was good through 
 384                         so all we have to ensure later is that we are not stalled. */ 
 385                      consideredOkay 
= isIMSHit
; 
 387                   else if (ReceivedHashes 
== ExpectedHashes
) 
 388                      consideredOkay 
= true; 
 390                      consideredOkay 
= false; 
 393                else if (Owner
->HashesRequired() == true) 
 394                   consideredOkay 
= false; 
 396                   consideredOkay 
= true; 
 398                if (consideredOkay 
== true) 
 400                   Owner
->Done(Message
, ReceivedHashes
, Config
); 
 402                   // Log that we are done 
 406                         Log
->IMSHit(Owner
->GetItemDesc()); 
 408                         Log
->Done(Owner
->GetItemDesc()); 
 413                   Owner
->Status 
= pkgAcquire::Item::StatAuthError
; 
 414                   Owner
->Failed(Message
,Config
); 
 417                      Log
->Fail(Owner
->GetItemDesc()); 
 429                std::string 
const msg 
= LookupTag(Message
,"Message"); 
 430                _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str()); 
 434             PrepareFiles("400::URIFailure", Itm
); 
 436             // Display update before completion 
 437             if (Log 
!= 0 && Log
->MorePulses 
== true) 
 438                for (pkgAcquire::Queue::QItem::owner_iterator O 
= Itm
->Owners
.begin(); O 
!= Itm
->Owners
.end(); ++O
) 
 439                   Log
->Pulse((*O
)->GetOwner()); 
 441             std::vector
<Item
*> const ItmOwners 
= Itm
->Owners
; 
 442             OwnerQ
->ItemDone(Itm
); 
 445             for (pkgAcquire::Queue::QItem::owner_iterator O 
= ItmOwners
.begin(); O 
!= ItmOwners
.end(); ++O
) 
 448                if(LookupTag(Message
,"FailReason") == "Timeout" || 
 449                      LookupTag(Message
,"FailReason") == "TmpResolveFailure" || 
 450                      LookupTag(Message
,"FailReason") == "ResolveFailure" || 
 451                      LookupTag(Message
,"FailReason") == "ConnectionRefused") 
 452                   (*O
)->Status 
= pkgAcquire::Item::StatTransientNetworkError
; 
 454                (*O
)->Failed(Message
,Config
); 
 457                   Log
->Fail((*O
)->GetItemDesc()); 
 464          // 401 General Failure 
 466          _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str()); 
 471          MediaChange(Message
); 
 478 // Worker::Capabilities - 100 Capabilities handler                      /*{{{*/ 
 479 // --------------------------------------------------------------------- 
 480 /* This parses the capabilities message and dumps it into the configuration 
 482 bool pkgAcquire::Worker::Capabilities(string Message
) 
 487    Config
->Version 
= LookupTag(Message
,"Version"); 
 488    Config
->SingleInstance 
= StringToBool(LookupTag(Message
,"Single-Instance"),false); 
 489    Config
->Pipeline 
= StringToBool(LookupTag(Message
,"Pipeline"),false); 
 490    Config
->SendConfig 
= StringToBool(LookupTag(Message
,"Send-Config"),false); 
 491    Config
->LocalOnly 
= StringToBool(LookupTag(Message
,"Local-Only"),false); 
 492    Config
->NeedsCleanup 
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false); 
 493    Config
->Removable 
= StringToBool(LookupTag(Message
,"Removable"),false); 
 498       clog 
<< "Configured access method " << Config
->Access 
<< endl
; 
 499       clog 
<< "Version:" << Config
->Version 
<< 
 500               " SingleInstance:" << Config
->SingleInstance 
<< 
 501               " Pipeline:" << Config
->Pipeline 
<< 
 502               " SendConfig:" << Config
->SendConfig 
<< 
 503               " LocalOnly: " << Config
->LocalOnly 
<< 
 504               " NeedsCleanup: " << Config
->NeedsCleanup 
<< 
 505               " Removable: " << Config
->Removable 
<< endl
; 
 511 // Worker::MediaChange - Request a media change                         /*{{{*/ 
 512 // --------------------------------------------------------------------- 
 514 bool pkgAcquire::Worker::MediaChange(string Message
) 
 516    int status_fd 
= _config
->FindI("APT::Status-Fd",-1); 
 519       string Media 
= LookupTag(Message
,"Media"); 
 520       string Drive 
= LookupTag(Message
,"Drive"); 
 521       ostringstream msg
,status
; 
 522       ioprintf(msg
,_("Please insert the disc labeled: " 
 524                      "in the drive '%s' and press enter."), 
 525                Media
.c_str(),Drive
.c_str()); 
 526       status 
<< "media-change: "  // message 
 527              << Media  
<< ":"     // media 
 528              << Drive  
<< ":"     // drive 
 529              << msg
.str()         // l10n message 
 532       std::string 
const dlstatus 
= status
.str(); 
 533       FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size()); 
 536    if (Log 
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"), 
 537                                     LookupTag(Message
,"Drive")) == false) 
 540       snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n"); 
 542          clog 
<< " -> " << Access 
<< ':' << QuoteString(S
,"\n") << endl
; 
 549    snprintf(S
,sizeof(S
),"603 Media Changed\n\n"); 
 551       clog 
<< " -> " << Access 
<< ':' << QuoteString(S
,"\n") << endl
; 
 557 // Worker::SendConfiguration - Send the config to the method            /*{{{*/ 
 558 // --------------------------------------------------------------------- 
 560 bool pkgAcquire::Worker::SendConfiguration() 
 562    if (Config
->SendConfig 
== false) 
 568    /* Write out all of the configuration directives by walking the 
 569       configuration tree */ 
 570    std::ostringstream Message
; 
 571    Message 
<< "601 Configuration\n"; 
 572    _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false); 
 576       clog 
<< " -> " << Access 
<< ':' << QuoteString(Message
.str(),"\n") << endl
; 
 577    OutQueue 
+= Message
.str(); 
 583 // Worker::QueueItem - Add an item to the outbound queue                /*{{{*/ 
 584 // --------------------------------------------------------------------- 
 585 /* Send a URI Acquire message to the method */ 
 586 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem 
*Item
) 
 591    string Message 
= "600 URI Acquire\n"; 
 592    Message
.reserve(300); 
 593    Message 
+= "URI: " + Item
->URI
; 
 594    Message 
+= "\nFilename: " + Item
->Owner
->DestFile
; 
 596    HashStringList 
const hsl 
= Item
->GetExpectedHashes(); 
 597    for (HashStringList::const_iterator hs 
= hsl
.begin(); hs 
!= hsl
.end(); ++hs
) 
 598       Message 
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue(); 
 600    if (hsl
.FileSize() == 0) 
 602       unsigned long long FileSize 
= Item
->GetMaximumSize(); 
 606          strprintf(MaximumSize
, "%llu", FileSize
); 
 607          Message 
+= "\nMaximum-Size: " + MaximumSize
; 
 611    Item
->SyncDestinationFiles(); 
 612    Message 
+= Item
->Custom600Headers(); 
 615    if (RealFileExists(Item
->Owner
->DestFile
)) 
 617       std::string SandboxUser 
= _config
->Find("APT::Sandbox::User"); 
 618       ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(), 
 619                                      SandboxUser
.c_str(), "root", 0600); 
 623       clog 
<< " -> " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 630 // Worker::OutFdRead - Out bound FD is ready                            /*{{{*/ 
 631 // --------------------------------------------------------------------- 
 633 bool pkgAcquire::Worker::OutFdReady() 
 638       Res 
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length()); 
 640    while (Res 
< 0 && errno 
== EINTR
); 
 643       return MethodFailure(); 
 645    OutQueue
.erase(0,Res
); 
 646    if (OutQueue
.empty() == true) 
 652 // Worker::InFdRead - In bound FD is ready                              /*{{{*/ 
 653 // --------------------------------------------------------------------- 
 655 bool pkgAcquire::Worker::InFdReady() 
 657    if (ReadMessages() == false) 
 663 // Worker::MethodFailure - Called when the method fails                 /*{{{*/ 
 664 // --------------------------------------------------------------------- 
 665 /* This is called when the method is believed to have failed, probably because 
 667 bool pkgAcquire::Worker::MethodFailure() 
 669    _error
->Error("Method %s has died unexpectedly!",Access
.c_str()); 
 671    // do not reap the child here to show meaningfull error to the user 
 672    ExecWait(Process
,Access
.c_str(),false); 
 681    MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end()); 
 686 // Worker::Pulse - Called periodically                                  /*{{{*/ 
 687 // --------------------------------------------------------------------- 
 689 void pkgAcquire::Worker::Pulse() 
 691    if (CurrentItem 
== 0) 
 695    if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0) 
 697    CurrentSize 
= Buf
.st_size
; 
 700 // Worker::ItemDone - Called when the current item is finished          /*{{{*/ 
 701 // --------------------------------------------------------------------- 
 703 void pkgAcquire::Worker::ItemDone() 
 711 void pkgAcquire::Worker::PrepareFiles(char const * const caller
, pkgAcquire::Queue::QItem 
const * const Itm
)/*{{{*/ 
 713    if (RealFileExists(Itm
->Owner
->DestFile
)) 
 715       ChangeOwnerAndPermissionOfFile(caller
, Itm
->Owner
->DestFile
.c_str(), "root", "root", 0644); 
 716       std::string 
const filename 
= Itm
->Owner
->DestFile
; 
 717       for (pkgAcquire::Queue::QItem::owner_iterator O 
= Itm
->Owners
.begin(); O 
!= Itm
->Owners
.end(); ++O
) 
 719          pkgAcquire::Item 
const * const Owner 
= *O
; 
 720          if (Owner
->DestFile 
== filename
) 
 722          unlink(Owner
->DestFile
.c_str()); 
 723          if (link(filename
.c_str(), Owner
->DestFile
.c_str()) != 0) 
 725             // different mounts can't happen for us as we download to lists/ by default, 
 726             // but if the system is reused by others the locations can potentially be on 
 727             // different disks, so use symlink as poor-men replacement. 
 728             // FIXME: Real copying as last fallback, but that is costly, so offload to a method preferable 
 729             if (symlink(filename
.c_str(), Owner
->DestFile
.c_str()) != 0) 
 730                _error
->Error("Can't create (sym)link of file %s to %s", filename
.c_str(), Owner
->DestFile
.c_str()); 
 736       for (pkgAcquire::Queue::QItem::owner_iterator O 
= Itm
->Owners
.begin(); O 
!= Itm
->Owners
.end(); ++O
) 
 737          unlink((*O
)->DestFile
.c_str());