]>
git.saurik.com Git - apt.git/blob - apt-pkg/acquire-worker.cc
   1 // -*- mode: cpp; mode: fold -*- 
   3 // $Id: acquire-worker.cc,v 1.22 1999/05/23 06:47:43 jgg Exp $ 
   4 /* ###################################################################### 
   8    The worker process can startup either as a Configuration prober 
   9    or as a queue runner. As a configuration prober it only reads the 
  10    configuration message and  
  12    ##################################################################### */ 
  14 // Include Files                                                        /*{{{*/ 
  16 #pragma implementation "apt-pkg/acquire-worker.h" 
  18 #include <apt-pkg/acquire-worker.h> 
  19 #include <apt-pkg/acquire-item.h> 
  20 #include <apt-pkg/configuration.h> 
  21 #include <apt-pkg/error.h> 
  22 #include <apt-pkg/fileutl.h> 
  23 #include <apt-pkg/strutl.h> 
  34 // Worker::Worker - Constructor for Queue startup                       /*{{{*/ 
  35 // --------------------------------------------------------------------- 
  37 pkgAcquire::Worker::Worker(Queue 
*Q
,MethodConfig 
*Cnf
, 
  38                            pkgAcquireStatus 
*Log
) : Log(Log
) 
  48 // Worker::Worker - Constructor for method config startup               /*{{{*/ 
  49 // --------------------------------------------------------------------- 
  51 pkgAcquire::Worker::Worker(MethodConfig 
*Cnf
) 
  61 // Worker::Construct - Constructor helper                               /*{{{*/ 
  62 // --------------------------------------------------------------------- 
  64 void pkgAcquire::Worker::Construct() 
  73    Debug 
= _config
->FindB("Debug::pkgAcquire::Worker",false); 
  76 // Worker::~Worker - Destructor                                         /*{{{*/ 
  77 // --------------------------------------------------------------------- 
  79 pkgAcquire::Worker::~Worker() 
  87       if (waitpid(Process
,0,0) != Process
) 
  88          _error
->Warning("I waited but nothing was there!"); 
  92 // Worker::Start - Start the worker process                             /*{{{*/ 
  93 // --------------------------------------------------------------------- 
  94 /* This forks the method and inits the communication channel */ 
  95 bool pkgAcquire::Worker::Start() 
  97    // Get the method path 
  98    string Method 
= _config
->FindDir("Dir::Bin::Methods") + Access
; 
  99    if (FileExists(Method
) == false) 
 100       return _error
->Error("The method driver %s could not be found.",Method
.c_str()); 
 103       clog 
<< "Starting method '" << Method 
<< '\'' << endl
; 
 106    int Pipes
[4] = {-1,-1,-1,-1}; 
 107    if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0) 
 109       _error
->Errno("pipe","Failed to create IPC pipe to subprocess"); 
 110       for (int I 
= 0; I 
!= 4; I
++) 
 114    for (int I 
= 0; I 
!= 4; I
++) 
 115       SetCloseExec(Pipes
[0],true); 
 117    // Fork off the process 
 118    Process 
= ExecFork(); 
 120    // Spawn the subprocess 
 124       dup2(Pipes
[1],STDOUT_FILENO
); 
 125       dup2(Pipes
[2],STDIN_FILENO
); 
 126       dup2(((filebuf 
*)clog
.rdbuf())->fd(),STDERR_FILENO
); 
 127       SetCloseExec(STDOUT_FILENO
,false); 
 128       SetCloseExec(STDIN_FILENO
,false);       
 129       SetCloseExec(STDERR_FILENO
,false); 
 132       Args
[0] = Method
.c_str(); 
 134       execv(Args
[0],(char **)Args
); 
 135       cerr 
<< "Failed to exec method " << Args
[0] << endl
; 
 142    SetNonBlock(Pipes
[0],true); 
 143    SetNonBlock(Pipes
[3],true); 
 149    // Read the configuration data 
 150    if (WaitFd(InFd
) == false || 
 151        ReadMessages() == false) 
 152       return _error
->Error("Method %s did not start correctly",Method
.c_str()); 
 161 // Worker::ReadMessages - Read all pending messages into the list       /*{{{*/ 
 162 // --------------------------------------------------------------------- 
 164 bool pkgAcquire::Worker::ReadMessages() 
 166    if (::ReadMessages(InFd
,MessageQueue
) == false) 
 167       return MethodFailure(); 
 171 // Worker::RunMessages - Empty the message queue                        /*{{{*/ 
 172 // --------------------------------------------------------------------- 
 173 /* This takes the messages from the message queue and runs them through 
 174    the parsers in order. */ 
 175 bool pkgAcquire::Worker::RunMessages() 
 177    while (MessageQueue
.empty() == false) 
 179       string Message 
= MessageQueue
.front(); 
 180       MessageQueue
.erase(MessageQueue
.begin()); 
 183          clog 
<< " <- " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 185       // Fetch the message number 
 187       int Number 
= strtol(Message
.c_str(),&End
,10); 
 188       if (End 
== Message
.c_str()) 
 189          return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str()); 
 191       string URI 
= LookupTag(Message
,"URI"); 
 192       pkgAcquire::Queue::QItem 
*Itm 
= 0; 
 193       if (URI
.empty() == false) 
 194          Itm 
= OwnerQ
->FindItem(URI
,this); 
 196       // Determine the message number and dispatch 
 201          if (Capabilities(Message
) == false) 
 202             return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str()); 
 208             clog 
<< " <- (log) " << LookupTag(Message
,"Message") << endl
; 
 213          Status 
= LookupTag(Message
,"Message"); 
 221                _error
->Error("Method gave invalid 200 URI Start message"); 
 227             TotalSize 
= atoi(LookupTag(Message
,"Size","0").c_str()); 
 228             ResumePoint 
= atoi(LookupTag(Message
,"Resume-Point","0").c_str()); 
 229             Itm
->Owner
->Start(Message
,atoi(LookupTag(Message
,"Size","0").c_str())); 
 242                _error
->Error("Method gave invalid 201 URI Done message"); 
 246             pkgAcquire::Item 
*Owner 
= Itm
->Owner
; 
 247             pkgAcquire::ItemDesc Desc 
= *Itm
; 
 248             OwnerQ
->ItemDone(Itm
); 
 249             Owner
->Done(Message
,atoi(LookupTag(Message
,"Size","0").c_str()), 
 250                                           LookupTag(Message
,"MD5-Hash")); 
 253             // Log that we are done 
 256                if (StringToBool(LookupTag(Message
,"IMS-Hit"),false) == true || 
 257                    StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false) == true) 
 259                   /* Hide 'hits' for local only sources - we also manage to 
 261                   if (Config
->LocalOnly 
== false) 
 275                _error
->Error("Method gave invalid 400 URI Failure message"); 
 279             pkgAcquire::Item 
*Owner 
= Itm
->Owner
; 
 280             pkgAcquire::ItemDesc Desc 
= *Itm
; 
 281             OwnerQ
->ItemDone(Itm
); 
 282             Owner
->Failed(Message
,Config
); 
 291          // 401 General Failure 
 293          _error
->Error("Method %s General failure: %s",LookupTag(Message
,"Message").c_str()); 
 298          MediaChange(Message
);  
 305 // Worker::Capabilities - 100 Capabilities handler                      /*{{{*/ 
 306 // --------------------------------------------------------------------- 
 307 /* This parses the capabilities message and dumps it into the configuration 
 309 bool pkgAcquire::Worker::Capabilities(string Message
) 
 314    Config
->Version 
= LookupTag(Message
,"Version"); 
 315    Config
->SingleInstance 
= StringToBool(LookupTag(Message
,"Single-Instance"),false); 
 316    Config
->Pipeline 
= StringToBool(LookupTag(Message
,"Pipeline"),false); 
 317    Config
->SendConfig 
= StringToBool(LookupTag(Message
,"Send-Config"),false); 
 318    Config
->LocalOnly 
= StringToBool(LookupTag(Message
,"Local-Only"),false); 
 323       clog 
<< "Configured access method " << Config
->Access 
<< endl
; 
 324       clog 
<< "Version:" << Config
->Version 
<< " SingleInstance:" << 
 325          Config
->SingleInstance 
<<  
 326          " Pipeline:" << Config
->Pipeline 
<< " SendConfig:" <<  
 327          Config
->SendConfig 
<< endl
; 
 333 // Worker::MediaChange - Request a media change                         /*{{{*/ 
 334 // --------------------------------------------------------------------- 
 336 bool pkgAcquire::Worker::MediaChange(string Message
) 
 338    if (Log 
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"), 
 339                                     LookupTag(Message
,"Drive")) == false) 
 342       sprintf(S
,"603 Media Changed\nFailed: true\n\n"); 
 344          clog 
<< " -> " << Access 
<< ':' << QuoteString(S
,"\n") << endl
; 
 351    sprintf(S
,"603 Media Changed\n\n"); 
 353       clog 
<< " -> " << Access 
<< ':' << QuoteString(S
,"\n") << endl
; 
 359 // Worker::SendConfiguration - Send the config to the method            /*{{{*/ 
 360 // --------------------------------------------------------------------- 
 362 bool pkgAcquire::Worker::SendConfiguration() 
 364    if (Config
->SendConfig 
== false) 
 370    string Message 
= "601 Configuration\n"; 
 371    Message
.reserve(2000); 
 373    /* Write out all of the configuration directives by walking the  
 374       configuration tree */ 
 375    const Configuration::Item 
*Top 
= _config
->Tree(0); 
 378       if (Top
->Value
.empty() == false) 
 380          string Line 
= "Config-Item: " + Top
->FullTag() + "="; 
 381          Line 
+= QuoteString(Top
->Value
,"\n") + '\n'; 
 391       while (Top 
!= 0 && Top
->Next 
== 0) 
 399       clog 
<< " -> " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 406 // Worker::QueueItem - Add an item to the outbound queue                /*{{{*/ 
 407 // --------------------------------------------------------------------- 
 408 /* Send a URI Acquire message to the method */ 
 409 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem 
*Item
) 
 414    string Message 
= "600 URI Acquire\n"; 
 415    Message
.reserve(300); 
 416    Message 
+= "URI: " + Item
->URI
; 
 417    Message 
+= "\nFilename: " + Item
->Owner
->DestFile
; 
 418    Message 
+= Item
->Owner
->Custom600Headers(); 
 422       clog 
<< " -> " << Access 
<< ':' << QuoteString(Message
,"\n") << endl
; 
 429 // Worker::OutFdReady - Outbound FD is ready                            /*{{{*/ 
 430 // --------------------------------------------------------------------- 
 432 bool pkgAcquire::Worker::OutFdReady() 
 437       Res 
= write(OutFd
,OutQueue
.begin(),OutQueue
.length()); 
 439    while (Res 
< 0 && errno 
== EINTR
); 
 442       return MethodFailure(); 
 444    // Hmm.. this should never happen. 
 448    OutQueue
.erase(0,Res
); 
 449    if (OutQueue
.empty() == true) 
 455 // Worker::InFdReady - Inbound FD is ready                              /*{{{*/ 
 456 // --------------------------------------------------------------------- 
 458 bool pkgAcquire::Worker::InFdReady() 
 460    if (ReadMessages() == false) 
 466 // Worker::MethodFailure - Called when the method fails                 /*{{{*/ 
 467 // --------------------------------------------------------------------- 
 468 /* This is called when the method is believed to have failed, probably because 
 470 bool pkgAcquire::Worker::MethodFailure() 
 472    _error
->Error("Method %s has died unexpectedly!",Access
.c_str()); 
 474    if (waitpid(Process
,0,0) != Process
) 
 475       _error
->Warning("I waited but nothing was there!"); 
 484    MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end()); 
 489 // Worker::Pulse - Called periodically                                  /*{{{*/ 
 490 // --------------------------------------------------------------------- 
 492 void pkgAcquire::Worker::Pulse() 
 494    if (CurrentItem 
== 0) 
 498    if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0) 
 500    CurrentSize 
= Buf
.st_size
; 
 503 // Worker::ItemDone - Called when the current item is finished          /*{{{*/ 
 504 // --------------------------------------------------------------------- 
 506 void pkgAcquire::Worker::ItemDone()