1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
15 #include <apt-pkg/acquire-worker.h>
16 #include <apt-pkg/acquire-item.h>
17 #include <apt-pkg/configuration.h>
18 #include <apt-pkg/error.h>
19 #include <apt-pkg/fileutl.h>
20 #include <apt-pkg/strutl.h>
38 // Worker::Worker - Constructor for Queue startup /*{{{*/
39 // ---------------------------------------------------------------------
41 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
42 pkgAcquireStatus
*Log
) : Log(Log
)
54 // Worker::Worker - Constructor for method config startup /*{{{*/
55 // ---------------------------------------------------------------------
57 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
)
69 // Worker::Construct - Constructor helper /*{{{*/
70 // ---------------------------------------------------------------------
72 void pkgAcquire::Worker::Construct()
81 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
84 // Worker::~Worker - Destructor /*{{{*/
85 // ---------------------------------------------------------------------
87 pkgAcquire::Worker::~Worker()
94 /* Closing of stdin is the signal to exit and die when the process
95 indicates it needs cleanup */
96 if (Config
->NeedsCleanup
== false)
98 ExecWait(Process
,Access
.c_str(),true);
102 // Worker::Start - Start the worker process /*{{{*/
103 // ---------------------------------------------------------------------
104 /* This forks the method and inits the communication channel */
105 bool pkgAcquire::Worker::Start()
107 // Get the method path
108 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
109 if (FileExists(Method
) == false)
110 return _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
113 clog
<< "Starting method '" << Method
<< '\'' << endl
;
116 int Pipes
[4] = {-1,-1,-1,-1};
117 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
119 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
120 for (int I
= 0; I
!= 4; I
++)
124 for (int I
= 0; I
!= 4; I
++)
125 SetCloseExec(Pipes
[I
],true);
127 // Fork off the process
128 Process
= ExecFork();
132 dup2(Pipes
[1],STDOUT_FILENO
);
133 dup2(Pipes
[2],STDIN_FILENO
);
134 SetCloseExec(STDOUT_FILENO
,false);
135 SetCloseExec(STDIN_FILENO
,false);
136 SetCloseExec(STDERR_FILENO
,false);
139 Args
[0] = Method
.c_str();
141 execv(Args
[0],(char **)Args
);
142 cerr
<< "Failed to exec method " << Args
[0] << endl
;
149 SetNonBlock(Pipes
[0],true);
150 SetNonBlock(Pipes
[3],true);
156 // Read the configuration data
157 if (WaitFd(InFd
) == false ||
158 ReadMessages() == false)
159 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
168 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
169 // ---------------------------------------------------------------------
171 bool pkgAcquire::Worker::ReadMessages()
173 if (::ReadMessages(InFd
,MessageQueue
) == false)
174 return MethodFailure();
178 // Worker::RunMessage - Empty the message queue /*{{{*/
179 // ---------------------------------------------------------------------
180 /* This takes the messages from the message queue and runs them through
181 the parsers in order. */
182 bool pkgAcquire::Worker::RunMessages()
184 while (MessageQueue
.empty() == false)
186 string Message
= MessageQueue
.front();
187 MessageQueue
.erase(MessageQueue
.begin());
190 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
192 // Fetch the message number
194 int Number
= strtol(Message
.c_str(),&End
,10);
195 if (End
== Message
.c_str())
196 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
198 string URI
= LookupTag(Message
,"URI");
199 pkgAcquire::Queue::QItem
*Itm
= 0;
200 if (URI
.empty() == false)
201 Itm
= OwnerQ
->FindItem(URI
,this);
203 // update used mirror
204 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
205 if (!UsedMirror
.empty() &&
207 Itm
->Description
.find(" ") != string::npos
)
209 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
210 // FIXME: will we need this as well?
211 //Itm->ShortDesc = UsedMirror;
214 // Determine the message number and dispatch
219 if (Capabilities(Message
) == false)
220 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
226 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
231 Status
= LookupTag(Message
,"Message");
239 _error
->Error("Method gave invalid 103 Redirect message");
243 string NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
253 _error
->Error("Method gave invalid 200 URI Start message");
259 TotalSize
= atoi(LookupTag(Message
,"Size","0").c_str());
260 ResumePoint
= atoi(LookupTag(Message
,"Resume-Point","0").c_str());
261 Itm
->Owner
->Start(Message
,atoi(LookupTag(Message
,"Size","0").c_str()));
263 // Display update before completion
264 if (Log
!= 0 && Log
->MorePulses
== true)
265 Log
->Pulse(Itm
->Owner
->GetOwner());
278 _error
->Error("Method gave invalid 201 URI Done message");
282 pkgAcquire::Item
*Owner
= Itm
->Owner
;
283 pkgAcquire::ItemDesc Desc
= *Itm
;
285 // Display update before completion
286 if (Log
!= 0 && Log
->MorePulses
== true)
287 Log
->Pulse(Owner
->GetOwner());
289 OwnerQ
->ItemDone(Itm
);
290 unsigned long long const ServerSize
= atoll(LookupTag(Message
,"Size","0").c_str());
291 if (TotalSize
!= 0 && ServerSize
!= TotalSize
)
292 _error
->Warning("Size of file %s is not what the server reported %s %llu",
293 Owner
->DestFile
.c_str(), LookupTag(Message
,"Size","0").c_str(),TotalSize
);
295 // see if there is a hash to verify
297 HashString
expectedHash(Owner
->HashSum());
298 if(!expectedHash
.empty())
300 string hashTag
= expectedHash
.HashType()+"-Hash";
301 string hashSum
= LookupTag(Message
, hashTag
.c_str());
303 RecivedHash
= expectedHash
.HashType() + ":" + hashSum
;
304 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
306 clog
<< "201 URI Done: " << Owner
->DescURI() << endl
307 << "RecivedHash: " << RecivedHash
<< endl
308 << "ExpectedHash: " << expectedHash
.toStr()
312 Owner
->Done(Message
, ServerSize
, RecivedHash
.c_str(), Config
);
315 // Log that we are done
318 if (StringToBool(LookupTag(Message
,"IMS-Hit"),false) == true ||
319 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false) == true)
321 /* Hide 'hits' for local only sources - we also manage to
323 if (Config
->LocalOnly
== false)
337 _error
->Error("Method gave invalid 400 URI Failure message");
341 // Display update before completion
342 if (Log
!= 0 && Log
->MorePulses
== true)
343 Log
->Pulse(Itm
->Owner
->GetOwner());
345 pkgAcquire::Item
*Owner
= Itm
->Owner
;
346 pkgAcquire::ItemDesc Desc
= *Itm
;
347 OwnerQ
->ItemDone(Itm
);
350 if(LookupTag(Message
,"FailReason") == "Timeout" ||
351 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
352 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
353 LookupTag(Message
,"FailReason") == "ConnectionRefused")
354 Owner
->Status
= pkgAcquire::Item::StatTransientNetworkError
;
356 Owner
->Failed(Message
,Config
);
365 // 401 General Failure
367 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
372 MediaChange(Message
);
379 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
380 // ---------------------------------------------------------------------
381 /* This parses the capabilities message and dumps it into the configuration
383 bool pkgAcquire::Worker::Capabilities(string Message
)
388 Config
->Version
= LookupTag(Message
,"Version");
389 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
390 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
391 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
392 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
393 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
394 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
399 clog
<< "Configured access method " << Config
->Access
<< endl
;
400 clog
<< "Version:" << Config
->Version
<<
401 " SingleInstance:" << Config
->SingleInstance
<<
402 " Pipeline:" << Config
->Pipeline
<<
403 " SendConfig:" << Config
->SendConfig
<<
404 " LocalOnly: " << Config
->LocalOnly
<<
405 " NeedsCleanup: " << Config
->NeedsCleanup
<<
406 " Removable: " << Config
->Removable
<< endl
;
412 // Worker::MediaChange - Request a media change /*{{{*/
413 // ---------------------------------------------------------------------
415 bool pkgAcquire::Worker::MediaChange(string Message
)
417 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
420 string Media
= LookupTag(Message
,"Media");
421 string Drive
= LookupTag(Message
,"Drive");
422 ostringstream msg
,status
;
423 ioprintf(msg
,_("Please insert the disc labeled: "
425 "in the drive '%s' and press enter."),
426 Media
.c_str(),Drive
.c_str());
427 status
<< "media-change: " // message
428 << Media
<< ":" // media
429 << Drive
<< ":" // drive
430 << msg
.str() // l10n message
432 write(status_fd
, status
.str().c_str(), status
.str().size());
435 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
436 LookupTag(Message
,"Drive")) == false)
439 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
441 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
448 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
450 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
456 // Worker::SendConfiguration - Send the config to the method /*{{{*/
457 // ---------------------------------------------------------------------
459 bool pkgAcquire::Worker::SendConfiguration()
461 if (Config
->SendConfig
== false)
467 string Message
= "601 Configuration\n";
468 Message
.reserve(2000);
470 /* Write out all of the configuration directives by walking the
471 configuration tree */
472 const Configuration::Item
*Top
= _config
->Tree(0);
475 if (Top
->Value
.empty() == false)
477 string Line
= "Config-Item: " + QuoteString(Top
->FullTag(),"=\"\n") + "=";
478 Line
+= QuoteString(Top
->Value
,"\n") + '\n';
488 while (Top
!= 0 && Top
->Next
== 0)
496 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
503 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
504 // ---------------------------------------------------------------------
505 /* Send a URI Acquire message to the method */
506 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
511 string Message
= "600 URI Acquire\n";
512 Message
.reserve(300);
513 Message
+= "URI: " + Item
->URI
;
514 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
515 Message
+= Item
->Owner
->Custom600Headers();
519 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
526 // Worker::OutFdRead - Out bound FD is ready /*{{{*/
527 // ---------------------------------------------------------------------
529 bool pkgAcquire::Worker::OutFdReady()
534 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
536 while (Res
< 0 && errno
== EINTR
);
539 return MethodFailure();
541 OutQueue
.erase(0,Res
);
542 if (OutQueue
.empty() == true)
548 // Worker::InFdRead - In bound FD is ready /*{{{*/
549 // ---------------------------------------------------------------------
551 bool pkgAcquire::Worker::InFdReady()
553 if (ReadMessages() == false)
559 // Worker::MethodFailure - Called when the method fails /*{{{*/
560 // ---------------------------------------------------------------------
561 /* This is called when the method is belived to have failed, probably because
563 bool pkgAcquire::Worker::MethodFailure()
565 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
567 // do not reap the child here to show meaningfull error to the user
568 ExecWait(Process
,Access
.c_str(),false);
577 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
582 // Worker::Pulse - Called periodically /*{{{*/
583 // ---------------------------------------------------------------------
585 void pkgAcquire::Worker::Pulse()
587 if (CurrentItem
== 0)
591 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
593 CurrentSize
= Buf
.st_size
;
595 // Hmm? Should not happen...
596 if (CurrentSize
> TotalSize
&& TotalSize
!= 0)
597 TotalSize
= CurrentSize
;
600 // Worker::ItemDone - Called when the current item is finished /*{{{*/
601 // ---------------------------------------------------------------------
603 void pkgAcquire::Worker::ItemDone()