]>
git.saurik.com Git - apt.git/blob - apt-pkg/acquire-worker.cc
   1 // -*- mode: cpp; mode: fold -*- 
   3 // $Id: acquire-worker.cc,v 1.32 2001/02/20 07:03:17 jgg Exp $ 
   4 /* ###################################################################### 
   8    The worker process can startup either as a Configuration prober 
   9    or as a queue runner. As a configuration prober it only reads the 
  10    configuration message and  
  12    ##################################################################### */ 
  14 // Include Files                                                        /*{{{*/ 
  16 #pragma implementation "apt-pkg/acquire-worker.h" 
  18 #include <apt-pkg/acquire-worker.h> 
  19 #include <apt-pkg/acquire-item.h> 
  20 #include <apt-pkg/configuration.h> 
  21 #include <apt-pkg/error.h> 
  22 #include <apt-pkg/fileutl.h> 
  23 #include <apt-pkg/strutl.h> 
  35 // Worker::Worker - Constructor for Queue startup                       /*{{{*/ 
  36 // --------------------------------------------------------------------- 
  38 pkgAcquire::Worker::Worker(Queue 
*Q
,MethodConfig 
*Cnf
, 
  39                            pkgAcquireStatus 
*Log
) : Log(Log
) 
  51 // Worker::Worker - Constructor for method config startup               /*{{{*/ 
  52 // --------------------------------------------------------------------- 
  54 pkgAcquire::Worker::Worker(MethodConfig 
*Cnf
) 
  66 // Worker::Construct - Constructor helper                               /*{{{*/ 
  67 // --------------------------------------------------------------------- 
  69 void pkgAcquire::Worker::Construct() 
  78    Debug 
= _config
->FindB("Debug::pkgAcquire::Worker",false); 
  81 // Worker::~Worker - Destructor                                         /*{{{*/ 
  82 // --------------------------------------------------------------------- 
  84 pkgAcquire::Worker::~Worker() 
  91       /* Closing of stdin is the signal to exit and die when the process 
  92          indicates it needs cleanup */ 
  93       if (Config
->NeedsCleanup 
== false) 
  95       ExecWait(Process
,Access
.c_str(),true); 
  99 // Worker::Start - Start the worker process                             /*{{{*/ 
 100 // --------------------------------------------------------------------- 
 101 /* This forks the method and inits the communication channel */ 
 102 bool pkgAcquire::Worker::Start() 
 104    // Get the method path 
 105    string Method 
= _config
->FindDir("Dir::Bin::Methods") + Access
; 
 106    if (FileExists(Method
) == false) 
 107       return _error
->Error(_("The method driver %s could not be found."),Method
.c_str()); 
 110       clog 
<< "Starting method '" << Method 
<< '\'' << endl
; 
 113    int Pipes
[4] = {-1,-1,-1,-1}; 
 114    if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0) 
 116       _error
->Errno("pipe","Failed to create IPC pipe to subprocess"); 
 117       for (int I 
= 0; I 
!= 4; I
++) 
 121    for (int I 
= 0; I 
!= 4; I
++) 
 122       SetCloseExec(Pipes
[I
],true); 
 124    // Fork off the process 
 125    Process 
= ExecFork(); 
 127    // Spawn the subprocess 
 131       dup2(Pipes
[1],STDOUT_FILENO
); 
 132       dup2(Pipes
[2],STDIN_FILENO
); 
 133       dup2(((filebuf 
*)clog
.rdbuf())->fd(),STDERR_FILENO
); 
 134       SetCloseExec(STDOUT_FILENO
,false); 
 135       SetCloseExec(STDIN_FILENO
,false);       
 136       SetCloseExec(STDERR_FILENO
,false); 
 139       Args
[0] = Method
.c_str(); 
 141       execv(Args
[0],(char **)Args
); 
 142       cerr 
<< "Failed to exec method " << Args
[0] << endl
; 
 149    SetNonBlock(Pipes
[0],true); 
 150    SetNonBlock(Pipes
[3],true); 
 156    // Read the configuration data 
 157    if (WaitFd(InFd
) == false || 
 158        ReadMessages() == false) 
 159       return _error
->Error(_("Method %s did not start correctly"),Method
.c_str()); 
 168 // Worker::ReadMessages - Read all pending messages into the list       /*{{{*/ 
 169 // --------------------------------------------------------------------- 
 171 bool pkgAcquire::Worker::ReadMessages() 
 173    if (::ReadMessages(InFd
,MessageQueue
) == false) 
 174       return MethodFailure(); 
 178 // Worker::RunMessage - Empty the message queue                         /*{{{*/ 
 179 // --------------------------------------------------------------------- 
 180 /* This takes the messages from the message queue and runs them through 
 181    the parsers in order. */ 
 182 bool pkgAcquire::Worker::RunMessages() 
 184    while (MessageQueue
.empty() == false) 
 186       string Message 
= MessageQueue
.front(); 
 187       MessageQueue
.erase(MessageQueue
.begin()); 
 190          clog 
<< " <- " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 192       // Fetch the message number 
 194       int Number 
= strtol(Message
.c_str(),&End
,10); 
 195       if (End 
== Message
.c_str()) 
 196          return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str()); 
 198       string URI 
= LookupTag(Message
,"URI"); 
 199       pkgAcquire::Queue::QItem 
*Itm 
= 0; 
 200       if (URI
.empty() == false) 
 201          Itm 
= OwnerQ
->FindItem(URI
,this); 
 203       // Determine the message number and dispatch 
 208          if (Capabilities(Message
) == false) 
 209             return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str()); 
 215             clog 
<< " <- (log) " << LookupTag(Message
,"Message") << endl
; 
 220          Status 
= LookupTag(Message
,"Message"); 
 228                _error
->Error("Method gave invalid 200 URI Start message"); 
 234             TotalSize 
= atoi(LookupTag(Message
,"Size","0").c_str()); 
 235             ResumePoint 
= atoi(LookupTag(Message
,"Resume-Point","0").c_str()); 
 236             Itm
->Owner
->Start(Message
,atoi(LookupTag(Message
,"Size","0").c_str())); 
 238             // Display update before completion 
 239             if (Log 
!= 0 && Log
->MorePulses 
== true) 
 240                Log
->Pulse(Itm
->Owner
->GetOwner()); 
 253                _error
->Error("Method gave invalid 201 URI Done message"); 
 257             pkgAcquire::Item 
*Owner 
= Itm
->Owner
; 
 258             pkgAcquire::ItemDesc Desc 
= *Itm
; 
 260             // Display update before completion 
 261             if (Log 
!= 0 && Log
->MorePulses 
== true) 
 262                Log
->Pulse(Owner
->GetOwner()); 
 264             OwnerQ
->ItemDone(Itm
); 
 265             if (TotalSize 
!= 0 && 
 266                 (unsigned)atoi(LookupTag(Message
,"Size","0").c_str()) != TotalSize
) 
 267                _error
->Warning("Bizarre Error - File size is not what the server reported %s %lu", 
 268                                LookupTag(Message
,"Size","0").c_str(),TotalSize
); 
 270             Owner
->Done(Message
,atoi(LookupTag(Message
,"Size","0").c_str()), 
 271                         LookupTag(Message
,"MD5-Hash"),Config
); 
 274             // Log that we are done 
 277                if (StringToBool(LookupTag(Message
,"IMS-Hit"),false) == true || 
 278                    StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false) == true) 
 280                   /* Hide 'hits' for local only sources - we also manage to 
 282                   if (Config
->LocalOnly 
== false) 
 296                _error
->Error("Method gave invalid 400 URI Failure message"); 
 300             // Display update before completion 
 301             if (Log 
!= 0 && Log
->MorePulses 
== true) 
 302                Log
->Pulse(Itm
->Owner
->GetOwner()); 
 304             pkgAcquire::Item 
*Owner 
= Itm
->Owner
; 
 305             pkgAcquire::ItemDesc Desc 
= *Itm
; 
 306             OwnerQ
->ItemDone(Itm
); 
 307             Owner
->Failed(Message
,Config
); 
 316          // 401 General Failure 
 318          _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str()); 
 323          MediaChange(Message
);  
 330 // Worker::Capabilities - 100 Capabilities handler                      /*{{{*/ 
 331 // --------------------------------------------------------------------- 
 332 /* This parses the capabilities message and dumps it into the configuration 
 334 bool pkgAcquire::Worker::Capabilities(string Message
) 
 339    Config
->Version 
= LookupTag(Message
,"Version"); 
 340    Config
->SingleInstance 
= StringToBool(LookupTag(Message
,"Single-Instance"),false); 
 341    Config
->Pipeline 
= StringToBool(LookupTag(Message
,"Pipeline"),false); 
 342    Config
->SendConfig 
= StringToBool(LookupTag(Message
,"Send-Config"),false); 
 343    Config
->LocalOnly 
= StringToBool(LookupTag(Message
,"Local-Only"),false); 
 344    Config
->NeedsCleanup 
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false); 
 345    Config
->Removable 
= StringToBool(LookupTag(Message
,"Removable"),false); 
 350       clog 
<< "Configured access method " << Config
->Access 
<< endl
; 
 351       clog 
<< "Version:" << Config
->Version 
<< 
 352               " SingleInstance:" << Config
->SingleInstance 
<< 
 353               " Pipeline:" << Config
->Pipeline 
<<  
 354               " SendConfig:" << Config
->SendConfig 
<<  
 355               " LocalOnly: " << Config
->LocalOnly 
<<  
 356               " NeedsCleanup: " << Config
->NeedsCleanup 
<<  
 357               " Removable: " << Config
->Removable 
<< endl
; 
 363 // Worker::MediaChange - Request a media change                         /*{{{*/ 
 364 // --------------------------------------------------------------------- 
 366 bool pkgAcquire::Worker::MediaChange(string Message
) 
 368    if (Log 
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"), 
 369                                     LookupTag(Message
,"Drive")) == false) 
 372       sprintf(S
,"603 Media Changed\nFailed: true\n\n"); 
 374          clog 
<< " -> " << Access 
<< ':' << QuoteString(S
,"\n") << endl
; 
 381    sprintf(S
,"603 Media Changed\n\n"); 
 383       clog 
<< " -> " << Access 
<< ':' << QuoteString(S
,"\n") << endl
; 
 389 // Worker::SendConfiguration - Send the config to the method            /*{{{*/ 
 390 // --------------------------------------------------------------------- 
 392 bool pkgAcquire::Worker::SendConfiguration() 
 394    if (Config
->SendConfig 
== false) 
 400    string Message 
= "601 Configuration\n"; 
 401    Message
.reserve(2000); 
 403    /* Write out all of the configuration directives by walking the  
 404       configuration tree */ 
 405    const Configuration::Item 
*Top 
= _config
->Tree(0); 
 408       if (Top
->Value
.empty() == false) 
 410          string Line 
= "Config-Item: " + QuoteString(Top
->FullTag(),"=\"\n") + "="; 
 411          Line 
+= QuoteString(Top
->Value
,"\n") + '\n'; 
 421       while (Top 
!= 0 && Top
->Next 
== 0) 
 429       clog 
<< " -> " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 436 // Worker::QueueItem - Add an item to the outbound queue                /*{{{*/ 
 437 // --------------------------------------------------------------------- 
 438 /* Send a URI Acquire message to the method */ 
 439 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem 
*Item
) 
 444    string Message 
= "600 URI Acquire\n"; 
 445    Message
.reserve(300); 
 446    Message 
+= "URI: " + Item
->URI
; 
 447    Message 
+= "\nFilename: " + Item
->Owner
->DestFile
; 
 448    Message 
+= Item
->Owner
->Custom600Headers(); 
 452       clog 
<< " -> " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 459 // Worker::OutFdRead - Out bound FD is ready                            /*{{{*/ 
 460 // --------------------------------------------------------------------- 
 462 bool pkgAcquire::Worker::OutFdReady() 
 467       Res 
= write(OutFd
,OutQueue
.begin(),OutQueue
.length()); 
 469    while (Res 
< 0 && errno 
== EINTR
); 
 472       return MethodFailure(); 
 474    // Hmm.. this should never happen. 
 478    OutQueue
.erase(0,Res
); 
 479    if (OutQueue
.empty() == true) 
 485 // Worker::InFdRead - In bound FD is ready                              /*{{{*/ 
 486 // --------------------------------------------------------------------- 
 488 bool pkgAcquire::Worker::InFdReady() 
 490    if (ReadMessages() == false) 
 496 // Worker::MethodFailure - Called when the method fails                 /*{{{*/ 
 497 // --------------------------------------------------------------------- 
 498 /* This is called when the method is belived to have failed, probably because 
 500 bool pkgAcquire::Worker::MethodFailure() 
 502    _error
->Error("Method %s has died unexpectedly!",Access
.c_str()); 
 504    ExecWait(Process
,Access
.c_str(),true); 
 513    MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end()); 
 518 // Worker::Pulse - Called periodically                                  /*{{{*/ 
 519 // --------------------------------------------------------------------- 
 521 void pkgAcquire::Worker::Pulse() 
 523    if (CurrentItem 
== 0) 
 527    if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0) 
 529    CurrentSize 
= Buf
.st_size
; 
 531    // Hmm? Should not happen... 
 532    if (CurrentSize 
> TotalSize 
&& TotalSize 
!= 0) 
 533       TotalSize 
= CurrentSize
; 
 536 // Worker::ItemDone - Called when the current item is finished          /*{{{*/ 
 537 // --------------------------------------------------------------------- 
 539 void pkgAcquire::Worker::ItemDone()