]>
git.saurik.com Git - apt.git/blob - apt-pkg/acquire-worker.cc
1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
12 ##################################################################### */
14 // Include Files /*{{{*/
16 #pragma implementation "apt-pkg/acquire-worker.h"
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
41 // Worker::Worker - Constructor for Queue startup /*{{{*/
42 // ---------------------------------------------------------------------
44 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
45 pkgAcquireStatus
*Log
) : Log(Log
)
57 // Worker::Worker - Constructor for method config startup /*{{{*/
58 // ---------------------------------------------------------------------
60 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
)
72 // Worker::Construct - Constructor helper /*{{{*/
73 // ---------------------------------------------------------------------
75 void pkgAcquire::Worker::Construct()
84 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
87 // Worker::~Worker - Destructor /*{{{*/
88 // ---------------------------------------------------------------------
90 pkgAcquire::Worker::~Worker()
97 /* Closing of stdin is the signal to exit and die when the process
98 indicates it needs cleanup */
99 if (Config
->NeedsCleanup
== false)
100 kill(Process
,SIGINT
);
101 ExecWait(Process
,Access
.c_str(),true);
105 // Worker::Start - Start the worker process /*{{{*/
106 // ---------------------------------------------------------------------
107 /* This forks the method and inits the communication channel */
108 bool pkgAcquire::Worker::Start()
110 // Get the method path
111 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
112 if (FileExists(Method
) == false)
113 return _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
116 clog
<< "Starting method '" << Method
<< '\'' << endl
;
119 int Pipes
[4] = {-1,-1,-1,-1};
120 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
122 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
123 for (int I
= 0; I
!= 4; I
++)
127 for (int I
= 0; I
!= 4; I
++)
128 SetCloseExec(Pipes
[I
],true);
130 // Fork off the process
131 Process
= ExecFork();
135 dup2(Pipes
[1],STDOUT_FILENO
);
136 dup2(Pipes
[2],STDIN_FILENO
);
137 SetCloseExec(STDOUT_FILENO
,false);
138 SetCloseExec(STDIN_FILENO
,false);
139 SetCloseExec(STDERR_FILENO
,false);
142 Args
[0] = Method
.c_str();
144 execv(Args
[0],(char **)Args
);
145 cerr
<< "Failed to exec method " << Args
[0] << endl
;
152 SetNonBlock(Pipes
[0],true);
153 SetNonBlock(Pipes
[3],true);
159 // Read the configuration data
160 if (WaitFd(InFd
) == false ||
161 ReadMessages() == false)
162 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
171 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
172 // ---------------------------------------------------------------------
174 bool pkgAcquire::Worker::ReadMessages()
176 if (::ReadMessages(InFd
,MessageQueue
) == false)
177 return MethodFailure();
181 // Worker::RunMessages - Empty the message queue /*{{{*/
182 // ---------------------------------------------------------------------
183 /* This takes the messages from the message queue and runs them through
184 the parsers in order. */
185 bool pkgAcquire::Worker::RunMessages()
187 while (MessageQueue
.empty() == false)
189 string Message
= MessageQueue
.front();
190 MessageQueue
.erase(MessageQueue
.begin());
193 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
195 // Fetch the message number
197 int Number
= strtol(Message
.c_str(),&End
,10);
198 if (End
== Message
.c_str())
199 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
201 string URI
= LookupTag(Message
,"URI");
202 pkgAcquire::Queue::QItem
*Itm
= 0;
203 if (URI
.empty() == false)
204 Itm
= OwnerQ
->FindItem(URI
,this);
206 // Determine the message number and dispatch
211 if (Capabilities(Message
) == false)
212 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
218 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
223 Status
= LookupTag(Message
,"Message");
231 _error
->Error("Method gave invalid 200 URI Start message");
237 TotalSize
= atoi(LookupTag(Message
,"Size","0").c_str());
238 ResumePoint
= atoi(LookupTag(Message
,"Resume-Point","0").c_str());
239 Itm
->Owner
->Start(Message
,atoi(LookupTag(Message
,"Size","0").c_str()));
241 // Display update before completion
242 if (Log
!= 0 && Log
->MorePulses
== true)
243 Log
->Pulse(Itm
->Owner
->GetOwner());
256 _error
->Error("Method gave invalid 201 URI Done message");
260 pkgAcquire::Item
*Owner
= Itm
->Owner
;
261 pkgAcquire::ItemDesc Desc
= *Itm
;
263 // Display update before completion
264 if (Log
!= 0 && Log
->MorePulses
== true)
265 Log
->Pulse(Owner
->GetOwner());
267 OwnerQ
->ItemDone(Itm
);
268 if (TotalSize
!= 0 &&
269 (unsigned)atoi(LookupTag(Message
,"Size","0").c_str()) != TotalSize
)
270 _error
->Warning("Bizarre Error - File size is not what the server reported %s %lu",
271 LookupTag(Message
,"Size","0").c_str(),TotalSize
);
273 Owner
->Done(Message
,atoi(LookupTag(Message
,"Size","0").c_str()),
274 LookupTag(Message
,"MD5-Hash"),Config
);
277 // Log that we are done
280 if (StringToBool(LookupTag(Message
,"IMS-Hit"),false) == true ||
281 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false) == true)
283 /* Hide 'hits' for local only sources - we also manage to
285 if (Config
->LocalOnly
== false)
299 _error
->Error("Method gave invalid 400 URI Failure message");
303 // Display update before completion
304 if (Log
!= 0 && Log
->MorePulses
== true)
305 Log
->Pulse(Itm
->Owner
->GetOwner());
307 pkgAcquire::Item
*Owner
= Itm
->Owner
;
308 pkgAcquire::ItemDesc Desc
= *Itm
;
309 OwnerQ
->ItemDone(Itm
);
310 Owner
->Failed(Message
,Config
);
319 // 401 General Failure
321 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
326 MediaChange(Message
);
333 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
334 // ---------------------------------------------------------------------
335 /* This parses the capabilities message and dumps it into the configuration
337 bool pkgAcquire::Worker::Capabilities(string Message
)
342 Config
->Version
= LookupTag(Message
,"Version");
343 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
344 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
345 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
346 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
347 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
348 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
353 clog
<< "Configured access method " << Config
->Access
<< endl
;
354 clog
<< "Version:" << Config
->Version
<<
355 " SingleInstance:" << Config
->SingleInstance
<<
356 " Pipeline:" << Config
->Pipeline
<<
357 " SendConfig:" << Config
->SendConfig
<<
358 " LocalOnly: " << Config
->LocalOnly
<<
359 " NeedsCleanup: " << Config
->NeedsCleanup
<<
360 " Removable: " << Config
->Removable
<< endl
;
366 // Worker::MediaChange - Request a media change /*{{{*/
367 // ---------------------------------------------------------------------
369 bool pkgAcquire::Worker::MediaChange(string Message
)
371 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
374 string Media
= LookupTag(Message
,"Media");
375 string Drive
= LookupTag(Message
,"Drive");
376 ostringstream msg
,status
;
377 ioprintf(msg
,_("Please insert the disc labeled: "
379 "in the drive '%s' and press enter."),
380 Media
.c_str(),Drive
.c_str());
381 status
<< "media-change: " // message
382 << Media
<< ":" // media
383 << Drive
<< ":" // drive
384 << msg
.str() // l10n message
386 write(status_fd
, status
.str().c_str(), status
.str().size());
389 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
390 LookupTag(Message
,"Drive")) == false)
393 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
395 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
402 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
404 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
410 // Worker::SendConfiguration - Send the config to the method /*{{{*/
411 // ---------------------------------------------------------------------
413 bool pkgAcquire::Worker::SendConfiguration()
415 if (Config
->SendConfig
== false)
421 string Message
= "601 Configuration\n";
422 Message
.reserve(2000);
424 /* Write out all of the configuration directives by walking the
425 configuration tree */
426 const Configuration::Item
*Top
= _config
->Tree(0);
429 if (Top
->Value
.empty() == false)
431 string Line
= "Config-Item: " + QuoteString(Top
->FullTag(),"=\"\n") + "=";
432 Line
+= QuoteString(Top
->Value
,"\n") + '\n';
442 while (Top
!= 0 && Top
->Next
== 0)
450 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
457 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
458 // ---------------------------------------------------------------------
459 /* Send a URI Acquire message to the method */
460 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
465 string Message
= "600 URI Acquire\n";
466 Message
.reserve(300);
467 Message
+= "URI: " + Item
->URI
;
468 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
469 Message
+= Item
->Owner
->Custom600Headers();
473 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
480 // Worker::OutFdReady - Out bound FD is ready /*{{{*/
481 // ---------------------------------------------------------------------
483 bool pkgAcquire::Worker::OutFdReady()
488 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
490 while (Res
< 0 && errno
== EINTR
);
493 return MethodFailure();
495 // Hmm.. this should never happen.
499 OutQueue
.erase(0,Res
);
500 if (OutQueue
.empty() == true)
506 // Worker::InFdReady - In bound FD is ready /*{{{*/
507 // ---------------------------------------------------------------------
509 bool pkgAcquire::Worker::InFdReady()
511 if (ReadMessages() == false)
517 // Worker::MethodFailure - Called when the method fails /*{{{*/
518 // ---------------------------------------------------------------------
519 /* This is called when the method is believed to have failed, probably because
521 bool pkgAcquire::Worker::MethodFailure()
523 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
525 ExecWait(Process
,Access
.c_str(),true);
534 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
539 // Worker::Pulse - Called periodically /*{{{*/
540 // ---------------------------------------------------------------------
542 void pkgAcquire::Worker::Pulse()
544 if (CurrentItem
== 0)
548 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
550 CurrentSize
= Buf
.st_size
;
552 // Hmm? Should not happen...
553 if (CurrentSize
> TotalSize
&& TotalSize
!= 0)
554 TotalSize
= CurrentSize
;
557 // Worker::ItemDone - Called when the current item is finished /*{{{*/
558 // ---------------------------------------------------------------------
560 void pkgAcquire::Worker::ItemDone()