1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message.
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
43 // Worker::Worker - Constructor for Queue startup /*{{{*/
44 // ---------------------------------------------------------------------
// Takes a queue, a method configuration and a status object. The status
// pointer is stored in the Log member through the member initializer.
// NOTE(review): how Q and Cnf are consumed is decided in the constructor
// body - confirm there.
46 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
47 pkgAcquireStatus
*Log
) : Log(Log
)
59 // Worker::Worker - Constructor for method config startup /*{{{*/
60 // ---------------------------------------------------------------------
// Overload that receives only the method configuration; neither a queue
// nor a status object is passed to it.
62 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
)
74 // Worker::Construct - Constructor helper /*{{{*/
75 // ---------------------------------------------------------------------
// Setup step named as the constructor helper in the header above.
77 void pkgAcquire::Worker::Construct()
// The Debug member mirrors the boolean configuration option
// Debug::pkgAcquire::Worker and is false when the option is unset.
86 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
89 // Worker::~Worker - Destructor /*{{{*/
90 // ---------------------------------------------------------------------
// Shuts the method process down: a method that did not ask for cleanup is
// interrupted with SIGINT, and ExecWait is then called on the process
// with Access as its name.
92 pkgAcquire::Worker::~Worker()
99 /* Closing of stdin is the signal to exit and die when the process
100 indicates it needs cleanup */
101 if (Config
->NeedsCleanup
== false)
102 kill(Process
,SIGINT
);
103 ExecWait(Process
,Access
.c_str(),true);
107 // Worker::Start - Start the worker process /*{{{*/
108 // ---------------------------------------------------------------------
109 /* This forks the method and inits the communication channel */
// Returns a bool. The failure return visible below reports that the method
// did not start correctly when the first wait or message read fails.
110 bool pkgAcquire::Worker::Start()
112 // Get the method path
// The path is the Dir::Bin::Methods directory setting with Access appended.
113 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
114 if (FileExists(Method
) == false)
116 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
// A missing https driver gets an extra hint naming the package to install.
117 if (Access
== "https")
118 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
// Trace output naming the method binary about to be started.
123 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Two pipes are created: Pipes[0] and Pipes[1] are the read and write end
// of the first, Pipes[2] and Pipes[3] the read and write end of the second.
// All four slots start at -1.
126 int Pipes
[4] = {-1,-1,-1,-1};
127 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
129 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
// NOTE(review): the body of this loop over all four descriptors is
// presumably the cleanup of already opened pipe ends - confirm.
130 for (int I
= 0; I
!= 4; I
++)
// Every pipe descriptor is marked close-on-exec.
134 for (int I
= 0; I
!= 4; I
++)
135 SetCloseExec(Pipes
[I
],true);
137 // Fork off the process
138 Process
= ExecFork();
// The write end of the first pipe becomes stdout and the read end of the
// second pipe becomes stdin; close-on-exec is then cleared on the three
// standard descriptors so they stay open across execv.
// NOTE(review): presumably this section runs in the forked child only -
// confirm the guard around it.
142 dup2(Pipes
[1],STDOUT_FILENO
);
143 dup2(Pipes
[2],STDIN_FILENO
);
144 SetCloseExec(STDOUT_FILENO
,false);
145 SetCloseExec(STDIN_FILENO
,false);
146 SetCloseExec(STDERR_FILENO
,false);
// The method binary path is used as argv[0] and as the program to execute.
149 Args
[0] = Method
.c_str();
151 execv(Args
[0],(char **)Args
);
// execv only returns on failure, which is reported on cerr.
152 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// The two pipe ends that were not duplicated onto the standard descriptors
// are switched to non-blocking mode.
159 SetNonBlock(Pipes
[0],true);
160 SetNonBlock(Pipes
[3],true);
166 // Read the configuration data
// Both the wait on InFd and the following message read must succeed,
// otherwise the start is reported as failed.
167 if (WaitFd(InFd
) == false ||
168 ReadMessages() == false)
169 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
178 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
179 // ---------------------------------------------------------------------
// Delegates to the global ::ReadMessages helper, which fills MessageQueue
// from InFd. A false result from the helper is turned into the result of
// MethodFailure().
181 bool pkgAcquire::Worker::ReadMessages()
183 if (::ReadMessages(InFd
,MessageQueue
) == false)
184 return MethodFailure();
188 // Worker::RunMessages - Empty the message queue /*{{{*/
189 // ---------------------------------------------------------------------
190 /* This takes the messages from the message queue and runs them through
191 the parsers in order. */
// Returns a bool. The error returns visible below cover a message without
// a leading number and a Capabilities message that cannot be processed.
192 bool pkgAcquire::Worker::RunMessages()
// Messages are consumed front to back until the queue is empty.
194 while (MessageQueue
.empty() == false)
// The front message is copied before it is removed from the queue.
196 string Message
= MessageQueue
.front();
197 MessageQueue
.erase(MessageQueue
.begin());
// Trace output of the received message with newlines quoted.
200 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
202 // Fetch the message number
// strtol leaves End at the start of the text when no digits were parsed,
// which is treated as an invalid message.
204 int Number
= strtol(Message
.c_str(),&End
,10);
205 if (End
== Message
.c_str())
206 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Find the queue item the message refers to. Itm stays 0 when the message
// carries no URI tag.
208 string URI
= LookupTag(Message
,"URI");
209 pkgAcquire::Queue::QItem
*Itm
= 0;
210 if (URI
.empty() == false)
211 Itm
= OwnerQ
->FindItem(URI
,this);
213 // update used mirror
// When a UsedMirror tag is present and the item description contains a
// space, the text in front of the first space is replaced by the mirror.
// NOTE(review): Itm is dereferenced in this condition - confirm that a
// null check guards it.
214 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
215 if (!UsedMirror
.empty() &&
217 Itm
->Description
.find(" ") != string::npos
)
219 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
220 // FIXME: will we need this as well?
221 //Itm->ShortDesc = UsedMirror;
224 // Determine the message number and dispatch
// Capabilities message: the text is handed to Capabilities() and a failure
// there aborts message processing with an error.
229 if (Capabilities(Message
) == false)
230 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
// Log message: the Message tag is echoed to clog.
236 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
// Status message: the Message tag becomes the current Status text.
241 Status
= LookupTag(Message
,"Message");
// 103 Redirect: error path for a redirect message considered invalid
// (guard condition to be confirmed).
249 _error
->Error("Method gave invalid 103 Redirect message");
// The target is taken from the New-URI tag and falls back to the current
// URI when that tag is absent.
253 string NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
// Owner and a copy of the item description are taken before the item is
// handed to ItemDone, since both are still needed afterwards.
258 pkgAcquire::Item
*Owner
= Itm
->Owner
;
259 pkgAcquire::ItemDesc Desc
= *Itm
;
261 // Change the status so that it can be dequeued
262 Owner
->Status
= pkgAcquire::Item::StatIdle
;
263 // Mark the item as done (taking care of all queues)
264 // and then put it in the main queue again
265 OwnerQ
->ItemDone(Itm
);
266 OwnerQ
->Owner
->Enqueue(Desc
);
// 200 URI Start: error path for a start message considered invalid
// (guard condition to be confirmed).
278 _error
->Error("Method gave invalid 200 URI Start message");
// Size and Resume-Point are parsed as base 10 numbers and default to 0
// when the tag is missing. The owner is told about the start with the same
// Size value.
284 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
285 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
286 Itm
->Owner
->Start(Message
,strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10));
288 // Display update before completion
// Only done when a status object exists and it asks for more pulses.
289 if (Log
!= 0 && Log
->MorePulses
== true)
290 Log
->Pulse(Itm
->Owner
->GetOwner());
// 201 URI Done: error path for a done message considered invalid
// (guard condition to be confirmed).
303 _error
->Error("Method gave invalid 201 URI Done message");
// Owner and a copy of the item description are taken before the item is
// handed to ItemDone below.
307 pkgAcquire::Item
*Owner
= Itm
->Owner
;
308 pkgAcquire::ItemDesc Desc
= *Itm
;
310 // Display update before completion
311 if (Log
!= 0 && Log
->MorePulses
== true)
312 Log
->Pulse(Owner
->GetOwner());
314 OwnerQ
->ItemDone(Itm
);
// ServerSize is the Size tag of the done message (0 when missing). isHit
// is true when either the IMS-Hit or the Alt-IMS-Hit tag is set.
315 unsigned long long const ServerSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
316 bool isHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
317 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
318 // Using the https method the server might return 200, but the
319 // If-Modified-Since condition is not satisfied, libcurl will
320 // discard the download. In this case, however, TotalSize will be
321 // set to the actual size of the file, while ServerSize will be set
322 // to 0. Therefore, if the item is marked as a hit and the
323 // downloaded size (ServerSize) is 0, we ignore TotalSize.
// A size mismatch only produces a warning; processing continues.
324 if (TotalSize
!= 0 && (!isHit
|| ServerSize
!= 0) && ServerSize
!= TotalSize
)
325 _error
->Warning("Size of file %s is not what the server reported %s %llu",
326 Owner
->DestFile
.c_str(), LookupTag(Message
,"Size","0").c_str(),TotalSize
);
328 // see if there is a hash to verify
// The tag looked up in the message is the expected hash type followed by
// -Hash, and the received hash is stored in the form type:value.
330 HashString
expectedHash(Owner
->HashSum());
331 if(!expectedHash
.empty())
333 string hashTag
= expectedHash
.HashType()+"-Hash";
334 string hashSum
= LookupTag(Message
, hashTag
.c_str());
336 RecivedHash
= expectedHash
.HashType() + ":" + hashSum
;
// With Debug::pkgAcquire::Auth enabled both hashes are printed to clog.
337 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
339 clog
<< "201 URI Done: " << Owner
->DescURI() << endl
340 << "RecivedHash: " << RecivedHash
<< endl
341 << "ExpectedHash: " << expectedHash
.toStr()
// The owner is notified with the message, the server size, the received
// hash string and the method configuration.
345 Owner
->Done(Message
, ServerSize
, RecivedHash
.c_str(), Config
);
348 // Log that we are done
353 /* Hide 'hits' for local only sources - we also manage to
355 if (Config
->LocalOnly
== false)
// 400 URI Failure: error path for a failure message considered invalid
// (guard condition to be confirmed).
369 _error
->Error("Method gave invalid 400 URI Failure message");
373 // Display update before completion
374 if (Log
!= 0 && Log
->MorePulses
== true)
375 Log
->Pulse(Itm
->Owner
->GetOwner());
// Owner and a copy of the item description are taken before the item is
// handed to ItemDone.
377 pkgAcquire::Item
*Owner
= Itm
->Owner
;
378 pkgAcquire::ItemDesc Desc
= *Itm
;
379 OwnerQ
->ItemDone(Itm
);
// The fail reasons Timeout, TmpResolveFailure, ResolveFailure and
// ConnectionRefused mark the owner with the transient network error
// status.
382 if(LookupTag(Message
,"FailReason") == "Timeout" ||
383 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
384 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
385 LookupTag(Message
,"FailReason") == "ConnectionRefused")
386 Owner
->Status
= pkgAcquire::Item::StatTransientNetworkError
;
// The owner is informed of the failure with the message and the method
// configuration.
388 Owner
->Failed(Message
,Config
);
397 // 401 General Failure
// The Message tag of the method is reported as an error.
399 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
// Media change request: handled by MediaChange().
404 MediaChange(Message
);
411 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
412 // ---------------------------------------------------------------------
// Copies the capability tags of the message into the matching fields of
// Config. Version is stored as text; every other tag is converted with
// StringToBool and counts as false when it is missing.
413 /* This parses the capabilities message and dumps it into the configuration
415 bool pkgAcquire::Worker::Capabilities(string Message
)
420 Config
->Version
= LookupTag(Message
,"Version");
421 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
422 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
423 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
424 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
425 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
426 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
// Trace output listing the access method and every field set above.
431 clog
<< "Configured access method " << Config
->Access
<< endl
;
432 clog
<< "Version:" << Config
->Version
<<
433 " SingleInstance:" << Config
->SingleInstance
<<
434 " Pipeline:" << Config
->Pipeline
<<
435 " SendConfig:" << Config
->SendConfig
<<
436 " LocalOnly: " << Config
->LocalOnly
<<
437 " NeedsCleanup: " << Config
->NeedsCleanup
<<
438 " Removable: " << Config
->Removable
<< endl
;
444 // Worker::MediaChange - Request a media change /*{{{*/
445 // ---------------------------------------------------------------------
// Reports the requested medium on the status descriptor, asks the status
// object to perform the change and formats a 603 Media Changed reply that
// carries Failed: true when the change did not happen.
447 bool pkgAcquire::Worker::MediaChange(string Message
)
// APT::Status-Fd names the descriptor for status lines; -1 is the default
// when the option is unset.
449 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
452 string Media
= LookupTag(Message
,"Media");
453 string Drive
= LookupTag(Message
,"Drive");
454 ostringstream msg
,status
;
// msg receives the translated prompt for the user.
455 ioprintf(msg
,_("Please insert the disc labeled: "
457 "in the drive '%s' and press enter."),
458 Media
.c_str(),Drive
.c_str());
// The status line consists of the media-change prefix followed by medium,
// drive and the translated prompt, separated by colons.
459 status
<< "media-change: " // message
460 << Media
<< ":" // media
461 << Drive
<< ":" // drive
462 << msg
.str() // l10n message
465 std::string
const dlstatus
= status
.str();
466 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// Without a status object, or when its MediaChange() returns false, the
// reply is marked as failed.
469 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
470 LookupTag(Message
,"Drive")) == false)
473 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
475 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
// Reply for the remaining case, without the Failed field.
482 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
484 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
490 // Worker::SendConfiguration - Send the config to the method /*{{{*/
491 // ---------------------------------------------------------------------
// Builds a 601 Configuration message from the configuration tree and
// appends it to OutQueue.
493 bool pkgAcquire::Worker::SendConfiguration()
// Special case for methods whose SendConfig capability is false.
495 if (Config
->SendConfig
== false)
501 /* Write out all of the configuration directives by walking the
502 configuration tree */
// Each directive is dumped with the format Config-Item: %F=%V followed by
// a newline, after the 601 Configuration header line.
503 std::ostringstream Message
;
504 Message
<< "601 Configuration\n";
505 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
// Trace output of the outgoing message, then the message is queued.
509 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
510 OutQueue
+= Message
.str();
516 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
517 // ---------------------------------------------------------------------
518 /* Send a URI Acquire message to the method */
// Item supplies the URI and, through its Owner, the destination file and
// any extra header lines.
519 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
// The message starts with the 600 URI Acquire line. Space for 300
// characters is reserved up front before the fields are appended.
524 string Message
= "600 URI Acquire\n";
525 Message
.reserve(300);
526 Message
+= "URI: " + Item
->URI
;
527 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
// Whatever the owner returns from Custom600Headers() is appended verbatim.
528 Message
+= Item
->Owner
->Custom600Headers();
// Trace output of the outgoing message with newlines quoted.
532 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
539 // Worker::OutFdReady - Out bound FD is ready /*{{{*/
540 // ---------------------------------------------------------------------
// Writes as much of OutQueue as the descriptor accepts and drops the
// written part from the queue.
542 bool pkgAcquire::Worker::OutFdReady()
// The whole pending queue is offered to write(); Res holds its result.
547 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
// The write is repeated for as long as it fails with EINTR.
549 while (Res
< 0 && errno
== EINTR
);
// NOTE(review): failure path; the condition guarding it is to be
// confirmed.
552 return MethodFailure();
// Only the first Res characters were written, so only those are removed.
554 OutQueue
.erase(0,Res
);
555 if (OutQueue
.empty() == true)
561 // Worker::InFdReady - In bound FD is ready /*{{{*/
562 // ---------------------------------------------------------------------
// Starts by pulling the pending messages in through ReadMessages() and
// checks for its failure.
564 bool pkgAcquire::Worker::InFdReady()
566 if (ReadMessages() == false)
572 // Worker::MethodFailure - Called when the method fails /*{{{*/
573 // ---------------------------------------------------------------------
// Records an error naming the method, calls ExecWait on its process and
// discards every message still waiting in MessageQueue.
574 /* This is called when the method is believed to have failed, probably because
576 bool pkgAcquire::Worker::MethodFailure()
578 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
580 // do not reap the child here to show meaningful error to the user
581 ExecWait(Process
,Access
.c_str(),false);
// All queued messages are dropped.
590 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
595 // Worker::Pulse - Called periodically /*{{{*/
596 // ---------------------------------------------------------------------
// Refreshes CurrentSize from the size on disk of the destination file of
// the current item.
598 void pkgAcquire::Worker::Pulse()
// Special case when there is no current item.
600 if (CurrentItem
== 0)
// Special case when the destination file cannot be examined with stat.
604 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
606 CurrentSize
= Buf
.st_size
;
608 // Hmm? Should not happen...
// A known total (non zero) that is smaller than the size on disk is raised
// to the size on disk.
609 if (CurrentSize
> TotalSize
&& TotalSize
!= 0)
610 TotalSize
= CurrentSize
;
613 // Worker::ItemDone - Called when the current item is finished /*{{{*/
614 // ---------------------------------------------------------------------
616 void pkgAcquire::Worker::ItemDone()