]>
git.saurik.com Git - apt.git/blob - apt-pkg/acquire-worker.cc
   1 // -*- mode: cpp; mode: fold -*- 
   3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $ 
   4 /* ###################################################################### 
   8    The worker process can startup either as a Configuration prober 
   9    or as a queue runner. As a configuration prober it only reads the 
  10    configuration message and  
  12    ##################################################################### */ 
  14 // Include Files                                                        /*{{{*/ 
  16 #pragma implementation "apt-pkg/acquire-worker.h" 
  18 #include <apt-pkg/acquire-worker.h> 
  19 #include <apt-pkg/acquire-item.h> 
  20 #include <apt-pkg/configuration.h> 
  21 #include <apt-pkg/error.h> 
  22 #include <apt-pkg/fileutl.h> 
  23 #include <apt-pkg/strutl.h> 
  40 // Worker::Worker - Constructor for Queue startup                       /*{{{*/ 
  41 // --------------------------------------------------------------------- 
  43 pkgAcquire::Worker::Worker(Queue 
*Q
,MethodConfig 
*Cnf
, 
  44                            pkgAcquireStatus 
*Log
) : Log(Log
) 
  56 // Worker::Worker - Constructor for method config startup               /*{{{*/ 
  57 // --------------------------------------------------------------------- 
  59 pkgAcquire::Worker::Worker(MethodConfig 
*Cnf
) 
  71 // Worker::Construct - Constructor helper                               /*{{{*/ 
  72 // --------------------------------------------------------------------- 
  74 void pkgAcquire::Worker::Construct() 
  83    Debug 
= _config
->FindB("Debug::pkgAcquire::Worker",false); 
  86 // Worker::~Worker - Destructor                                         /*{{{*/ 
  87 // --------------------------------------------------------------------- 
  89 pkgAcquire::Worker::~Worker() 
  96       /* Closing of stdin is the signal to exit and die when the process 
  97          indicates it needs cleanup */ 
  98       if (Config
->NeedsCleanup 
== false) 
 100       ExecWait(Process
,Access
.c_str(),true); 
 104 // Worker::Start - Start the worker process                             /*{{{*/ 
 105 // --------------------------------------------------------------------- 
 106 /* This forks the method and inits the communication channel */ 
 107 bool pkgAcquire::Worker::Start() 
 109    // Get the method path 
 110    string Method 
= _config
->FindDir("Dir::Bin::Methods") + Access
; 
 111    if (FileExists(Method
) == false) 
 112       return _error
->Error(_("The method driver %s could not be found."),Method
.c_str()); 
 115       clog 
<< "Starting method '" << Method 
<< '\'' << endl
; 
 118    int Pipes
[4] = {-1,-1,-1,-1}; 
 119    if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0) 
 121       _error
->Errno("pipe","Failed to create IPC pipe to subprocess"); 
 122       for (int I 
= 0; I 
!= 4; I
++) 
 126    for (int I 
= 0; I 
!= 4; I
++) 
 127       SetCloseExec(Pipes
[I
],true); 
 129    // Fork off the process 
 130    Process 
= ExecFork(); 
 134       dup2(Pipes
[1],STDOUT_FILENO
); 
 135       dup2(Pipes
[2],STDIN_FILENO
); 
 136       SetCloseExec(STDOUT_FILENO
,false); 
 137       SetCloseExec(STDIN_FILENO
,false);       
 138       SetCloseExec(STDERR_FILENO
,false); 
 141       Args
[0] = Method
.c_str(); 
 143       execv(Args
[0],(char **)Args
); 
 144       cerr 
<< "Failed to exec method " << Args
[0] << endl
; 
 151    SetNonBlock(Pipes
[0],true); 
 152    SetNonBlock(Pipes
[3],true); 
 158    // Read the configuration data 
 159    if (WaitFd(InFd
) == false || 
 160        ReadMessages() == false) 
 161       return _error
->Error(_("Method %s did not start correctly"),Method
.c_str()); 
 170 // Worker::ReadMessages - Read all pending messages into the list       /*{{{*/ 
 171 // --------------------------------------------------------------------- 
 173 bool pkgAcquire::Worker::ReadMessages() 
 175    if (::ReadMessages(InFd
,MessageQueue
) == false) 
 176       return MethodFailure(); 
 180 // Worker::RunMessage - Empty the message queue                         /*{{{*/ 
 181 // --------------------------------------------------------------------- 
 182 /* This takes the messages from the message queue and runs them through 
 183    the parsers in order. */ 
 184 bool pkgAcquire::Worker::RunMessages() 
 186    while (MessageQueue
.empty() == false) 
 188       string Message 
= MessageQueue
.front(); 
 189       MessageQueue
.erase(MessageQueue
.begin()); 
 192          clog 
<< " <- " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 194       // Fetch the message number 
 196       int Number 
= strtol(Message
.c_str(),&End
,10); 
 197       if (End 
== Message
.c_str()) 
 198          return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str()); 
 200       string URI 
= LookupTag(Message
,"URI"); 
 201       pkgAcquire::Queue::QItem 
*Itm 
= 0; 
 202       if (URI
.empty() == false) 
 203          Itm 
= OwnerQ
->FindItem(URI
,this); 
 205       // Determine the message number and dispatch 
 210          if (Capabilities(Message
) == false) 
 211             return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str()); 
 217             clog 
<< " <- (log) " << LookupTag(Message
,"Message") << endl
; 
 222          Status 
= LookupTag(Message
,"Message"); 
 230                _error
->Error("Method gave invalid 200 URI Start message"); 
 236             TotalSize 
= atoi(LookupTag(Message
,"Size","0").c_str()); 
 237             ResumePoint 
= atoi(LookupTag(Message
,"Resume-Point","0").c_str()); 
 238             Itm
->Owner
->Start(Message
,atoi(LookupTag(Message
,"Size","0").c_str())); 
 240             // Display update before completion 
 241             if (Log 
!= 0 && Log
->MorePulses 
== true) 
 242                Log
->Pulse(Itm
->Owner
->GetOwner()); 
 255                _error
->Error("Method gave invalid 201 URI Done message"); 
 259             pkgAcquire::Item 
*Owner 
= Itm
->Owner
; 
 260             pkgAcquire::ItemDesc Desc 
= *Itm
; 
 262             // Display update before completion 
 263             if (Log 
!= 0 && Log
->MorePulses 
== true) 
 264                Log
->Pulse(Owner
->GetOwner()); 
 266             OwnerQ
->ItemDone(Itm
); 
 267             if (TotalSize 
!= 0 && 
 268                 (unsigned)atoi(LookupTag(Message
,"Size","0").c_str()) != TotalSize
) 
 269                _error
->Warning("Bizarre Error - File size is not what the server reported %s %lu", 
 270                                LookupTag(Message
,"Size","0").c_str(),TotalSize
); 
 272             Owner
->Done(Message
,atoi(LookupTag(Message
,"Size","0").c_str()), 
 273                         LookupTag(Message
,"MD5-Hash"),Config
); 
 276             // Log that we are done 
 279                if (StringToBool(LookupTag(Message
,"IMS-Hit"),false) == true || 
 280                    StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false) == true) 
 282                   /* Hide 'hits' for local only sources - we also manage to 
 284                   if (Config
->LocalOnly 
== false) 
 298                _error
->Error("Method gave invalid 400 URI Failure message"); 
 302             // Display update before completion 
 303             if (Log 
!= 0 && Log
->MorePulses 
== true) 
 304                Log
->Pulse(Itm
->Owner
->GetOwner()); 
 306             pkgAcquire::Item 
*Owner 
= Itm
->Owner
; 
 307             pkgAcquire::ItemDesc Desc 
= *Itm
; 
 308             OwnerQ
->ItemDone(Itm
); 
 309             Owner
->Failed(Message
,Config
); 
 318          // 401 General Failure 
 320          _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str()); 
 325          MediaChange(Message
);  
 332 // Worker::Capabilities - 100 Capabilities handler                      /*{{{*/ 
 333 // --------------------------------------------------------------------- 
 334 /* This parses the capabilities message and dumps it into the configuration 
 336 bool pkgAcquire::Worker::Capabilities(string Message
) 
 341    Config
->Version 
= LookupTag(Message
,"Version"); 
 342    Config
->SingleInstance 
= StringToBool(LookupTag(Message
,"Single-Instance"),false); 
 343    Config
->Pipeline 
= StringToBool(LookupTag(Message
,"Pipeline"),false); 
 344    Config
->SendConfig 
= StringToBool(LookupTag(Message
,"Send-Config"),false); 
 345    Config
->LocalOnly 
= StringToBool(LookupTag(Message
,"Local-Only"),false); 
 346    Config
->NeedsCleanup 
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false); 
 347    Config
->Removable 
= StringToBool(LookupTag(Message
,"Removable"),false); 
 352       clog 
<< "Configured access method " << Config
->Access 
<< endl
; 
 353       clog 
<< "Version:" << Config
->Version 
<< 
 354               " SingleInstance:" << Config
->SingleInstance 
<< 
 355               " Pipeline:" << Config
->Pipeline 
<<  
 356               " SendConfig:" << Config
->SendConfig 
<<  
 357               " LocalOnly: " << Config
->LocalOnly 
<<  
 358               " NeedsCleanup: " << Config
->NeedsCleanup 
<<  
 359               " Removable: " << Config
->Removable 
<< endl
; 
 365 // Worker::MediaChange - Request a media change                         /*{{{*/ 
 366 // --------------------------------------------------------------------- 
 368 bool pkgAcquire::Worker::MediaChange(string Message
) 
 370    if (Log 
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"), 
 371                                     LookupTag(Message
,"Drive")) == false) 
 374       snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n"); 
 376          clog 
<< " -> " << Access 
<< ':' << QuoteString(S
,"\n") << endl
; 
 383    snprintf(S
,sizeof(S
),"603 Media Changed\n\n"); 
 385       clog 
<< " -> " << Access 
<< ':' << QuoteString(S
,"\n") << endl
; 
 391 // Worker::SendConfiguration - Send the config to the method            /*{{{*/ 
 392 // --------------------------------------------------------------------- 
 394 bool pkgAcquire::Worker::SendConfiguration() 
 396    if (Config
->SendConfig 
== false) 
 402    string Message 
= "601 Configuration\n"; 
 403    Message
.reserve(2000); 
 405    /* Write out all of the configuration directives by walking the  
 406       configuration tree */ 
 407    const Configuration::Item 
*Top 
= _config
->Tree(0); 
 410       if (Top
->Value
.empty() == false) 
 412          string Line 
= "Config-Item: " + QuoteString(Top
->FullTag(),"=\"\n") + "="; 
 413          Line 
+= QuoteString(Top
->Value
,"\n") + '\n'; 
 423       while (Top 
!= 0 && Top
->Next 
== 0) 
 431       clog 
<< " -> " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 438 // Worker::QueueItem - Add an item to the outbound queue                /*{{{*/ 
 439 // --------------------------------------------------------------------- 
 440 /* Send a URI Acquire message to the method */ 
 441 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem 
*Item
) 
 446    string Message 
= "600 URI Acquire\n"; 
 447    Message
.reserve(300); 
 448    Message 
+= "URI: " + Item
->URI
; 
 449    Message 
+= "\nFilename: " + Item
->Owner
->DestFile
; 
 450    Message 
+= Item
->Owner
->Custom600Headers(); 
 454       clog 
<< " -> " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 461 // Worker::OutFdRead - Out bound FD is ready                            /*{{{*/ 
 462 // --------------------------------------------------------------------- 
 464 bool pkgAcquire::Worker::OutFdReady() 
 469       Res 
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length()); 
 471    while (Res 
< 0 && errno 
== EINTR
); 
 474       return MethodFailure(); 
 476    // Hmm.. this should never happen. 
 480    OutQueue
.erase(0,Res
); 
 481    if (OutQueue
.empty() == true) 
 487 // Worker::InFdRead - In bound FD is ready                              /*{{{*/ 
 488 // --------------------------------------------------------------------- 
 490 bool pkgAcquire::Worker::InFdReady() 
 492    if (ReadMessages() == false) 
 498 // Worker::MethodFailure - Called when the method fails                 /*{{{*/ 
 499 // --------------------------------------------------------------------- 
 500 /* This is called when the method is belived to have failed, probably because 
 502 bool pkgAcquire::Worker::MethodFailure() 
 504    _error
->Error("Method %s has died unexpectedly!",Access
.c_str()); 
 506    ExecWait(Process
,Access
.c_str(),true); 
 515    MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end()); 
 520 // Worker::Pulse - Called periodically                                  /*{{{*/ 
 521 // --------------------------------------------------------------------- 
 523 void pkgAcquire::Worker::Pulse() 
 525    if (CurrentItem 
== 0) 
 529    if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0) 
 531    CurrentSize 
= Buf
.st_size
; 
 533    // Hmm? Should not happen... 
 534    if (CurrentSize 
> TotalSize 
&& TotalSize 
!= 0) 
 535       TotalSize 
= CurrentSize
; 
 538 // Worker::ItemDone - Called when the current item is finished          /*{{{*/ 
 539 // --------------------------------------------------------------------- 
 541 void pkgAcquire::Worker::ItemDone()