1 // -*- mode: cpp; mode: fold -*-
3 // $Id: acquire-worker.cc,v 1.34 2001/05/22 04:42:54 jgg Exp $
4 /* ######################################################################
8 The worker process can start up either as a configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message.
12 ##################################################################### */
14 // Include Files /*{{{*/
17 #include <apt-pkg/acquire.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/hashes.h>
37 #include <sys/types.h>
46 // Worker::Worker - Constructor for Queue startup /*{{{*/
47 // ---------------------------------------------------------------------
// Constructor used for queue startup. It receives the queue (Q), the
// method configuration (Cnf) and the status object (Log). Only the Log
// member initialiser is visible in this excerpt; the remaining
// initialisers and the body are not shown.
49 pkgAcquire::Worker::Worker(Queue
*Q
,MethodConfig
*Cnf
,
50 pkgAcquireStatus
*Log
) : Log(Log
)
62 // Worker::Worker - Constructor for method config startup /*{{{*/
63 // ---------------------------------------------------------------------
// Constructor used for method-configuration startup. It receives only the
// method configuration (Cnf); the initialiser list and body are not
// visible in this excerpt.
65 pkgAcquire::Worker::Worker(MethodConfig
*Cnf
)
77 // Worker::Construct - Constructor helper /*{{{*/
78 // ---------------------------------------------------------------------
// Constructor helper. The visible statement loads the boolean option
// Debug::pkgAcquire::Worker (default false) from the global configuration
// into the Debug member. Other initialisation is not visible here.
80 void pkgAcquire::Worker::Construct()
89 Debug
= _config
->FindB("Debug::pkgAcquire::Worker",false);
92 // Worker::~Worker - Destructor /*{{{*/
93 // ---------------------------------------------------------------------
// Destructor. When the method did not set NeedsCleanup the child process
// is sent SIGINT; afterwards ExecWait is called on the process with the
// access name and a third argument of true.
// NOTE(review): any guard on Process being valid is on lines not shown.
95 pkgAcquire::Worker::~Worker()
102 /* Closing of stdin is the signal to exit and die when the process
103 indicates it needs cleanup */
// Methods without NeedsCleanup are interrupted explicitly.
104 if (Config
->NeedsCleanup
== false)
105 kill(Process
,SIGINT
);
106 ExecWait(Process
,Access
.c_str(),true);
110 // Worker::Start - Start the worker process /*{{{*/
111 // ---------------------------------------------------------------------
112 /* This forks the method and inits the communication channel */
// Starts the method process for this worker.
// Visible steps: build the method path from Dir::Bin::Methods plus Access,
// report an error if that file does not exist, create two pipes, fork via
// ExecFork, attach pipe ends to stdin and stdout and exec the method, then
// wait on InFd and read the first messages.
// Returns: the value of the _error->Error call when waiting or reading
// fails; the other return statements are on lines not shown here.
113 bool pkgAcquire::Worker::Start()
115 // Get the method path
116 string Method
= _config
->FindDir("Dir::Bin::Methods") + Access
;
// A missing method binary is reported; for the https access name an extra
// notice points at the apt-transport-https package.
117 if (FileExists(Method
) == false)
119 _error
->Error(_("The method driver %s could not be found."),Method
.c_str());
120 if (Access
== "https")
121 _error
->Notice(_("Is the package %s installed?"), "apt-transport-https");
126 clog
<< "Starting method '" << Method
<< '\'' << endl
;
// Four descriptors for two pipes, preset to -1; the second pipe occupies
// slots 2 and 3.
129 int Pipes
[4] = {-1,-1,-1,-1};
130 if (pipe(Pipes
) != 0 || pipe(Pipes
+2) != 0)
132 _error
->Errno("pipe","Failed to create IPC pipe to subprocess");
// NOTE(review): the body of this first loop is not visible in the excerpt.
133 for (int I
= 0; I
!= 4; I
++)
// SetCloseExec is applied to every pipe descriptor (presumably setting the
// close-on-exec flag - confirm against the helper).
137 for (int I
= 0; I
!= 4; I
++)
138 SetCloseExec(Pipes
[I
],true);
140 // Fork off the process
141 Process
= ExecFork();
// Pipes[1] is duplicated onto stdout and Pipes[2] onto stdin, and
// SetCloseExec is turned off for the three standard descriptors before the
// exec. NOTE(review): the check that selects the child side of the fork is
// on a line not shown.
145 dup2(Pipes
[1],STDOUT_FILENO
);
146 dup2(Pipes
[2],STDIN_FILENO
);
147 SetCloseExec(STDOUT_FILENO
,false);
148 SetCloseExec(STDIN_FILENO
,false);
149 SetCloseExec(STDERR_FILENO
,false);
152 Args
[0] = Method
.c_str();
// execv only returns on failure, hence the message that follows it.
154 execv(Args
[0],(char **)Args
);
155 cerr
<< "Failed to exec method " << Args
[0] << endl
;
// SetNonBlock is applied to Pipes[0] and Pipes[3], the ends not handed to
// the method above.
162 SetNonBlock(Pipes
[0],true);
163 SetNonBlock(Pipes
[3],true);
169 // Read the configuration data
170 if (WaitFd(InFd
) == false ||
171 ReadMessages() == false)
172 return _error
->Error(_("Method %s did not start correctly"),Method
.c_str());
181 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
182 // ---------------------------------------------------------------------
// Reads pending messages from InFd into MessageQueue by calling the
// global ::ReadMessages helper. When that helper returns false the result
// of MethodFailure() is returned. The success return is on a line not
// shown in this excerpt.
184 bool pkgAcquire::Worker::ReadMessages()
186 if (::ReadMessages(InFd
,MessageQueue
) == false)
187 return MethodFailure();
191 // Worker::RunMessages - Empty the message queue /*{{{*/
192 // ---------------------------------------------------------------------
193 /* This takes the messages from the message queue and runs them through
194 the parsers in order. */
// Drains MessageQueue. Each message is removed from the front of the
// queue, its leading number is parsed with strtol, the item belonging to
// its URI tag is looked up on OwnerQ, and the message is then handled
// according to that number.
// NOTE(review): the switch statement and its case labels are on lines not
// shown in this excerpt. The sections below are identified only by their
// error strings (103 Redirect, 200 URI Start, 201 URI Done, 400 URI
// Failure, 401 General Failure) and the final MediaChange call.
// Returns: the value of _error->Error for an unparsable message or a
// failed Capabilities call; other returns are not visible here.
195 bool pkgAcquire::Worker::RunMessages()
197 while (MessageQueue
.empty() == false)
// Take the oldest message off the queue.
199 string Message
= MessageQueue
.front();
200 MessageQueue
.erase(MessageQueue
.begin());
203 clog
<< " <- " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
205 // Fetch the message number
// strtol leaves End equal to the start of the input when no digits were
// converted, which is how a message without a number is detected.
207 int Number
= strtol(Message
.c_str(),&End
,10);
208 if (End
== Message
.c_str())
209 return _error
->Error("Invalid message from method %s: %s",Access
.c_str(),Message
.c_str());
// Itm stays null unless the message carries a URI tag.
211 string URI
= LookupTag(Message
,"URI");
212 pkgAcquire::Queue::QItem
*Itm
= 0;
213 if (URI
.empty() == false)
214 Itm
= OwnerQ
->FindItem(URI
,this);
216 // update used mirror
// The text of the description up to its first space is replaced with the
// UsedMirror value. NOTE(review): part of this condition is on a line not
// shown in the excerpt.
217 string UsedMirror
= LookupTag(Message
,"UsedMirror", "");
218 if (!UsedMirror
.empty() &&
220 Itm
->Description
.find(" ") != string::npos
)
222 Itm
->Description
.replace(0, Itm
->Description
.find(" "), UsedMirror
);
223 // FIXME: will we need this as well?
224 //Itm->ShortDesc = UsedMirror;
227 // Determine the message number and dispatch
232 if (Capabilities(Message
) == false)
233 return _error
->Error("Unable to process Capabilities message from %s",Access
.c_str());
239 clog
<< " <- (log) " << LookupTag(Message
,"Message") << endl
;
244 Status
= LookupTag(Message
,"Message");
252 _error
->Error("Method gave invalid 103 Redirect message");
// Redirect handling: the third LookupTag argument is presumably the value
// used when the New-URI tag is absent - confirm against LookupTag.
256 string NewURI
= LookupTag(Message
,"New-URI",URI
.c_str());
// Desc is copied from the queue item before ItemDone is called on it; the
// copy is what gets enqueued again.
261 pkgAcquire::Item
*Owner
= Itm
->Owner
;
262 pkgAcquire::ItemDesc Desc
= *Itm
;
264 // Change the status so that it can be dequeued
265 Owner
->Status
= pkgAcquire::Item::StatIdle
;
266 // Mark the item as done (taking care of all queues)
267 // and then put it in the main queue again
268 OwnerQ
->ItemDone(Itm
);
269 OwnerQ
->Owner
->Enqueue(Desc
);
281 _error
->Error("Method gave invalid 200 URI Start message");
// Size and Resume-Point are parsed as base 10 unsigned values, with the
// string 0 passed as the third LookupTag argument.
287 TotalSize
= strtoull(LookupTag(Message
,"Size","0").c_str(), NULL
, 10);
288 ResumePoint
= strtoull(LookupTag(Message
,"Resume-Point","0").c_str(), NULL
, 10);
289 Itm
->Owner
->Start(Message
, TotalSize
);
291 // Display update before completion
292 if (Log
!= 0 && Log
->MorePulses
== true)
293 Log
->Pulse(Itm
->Owner
->GetOwner());
306 _error
->Error("Method gave invalid 201 URI Done message");
310 pkgAcquire::Item
*Owner
= Itm
->Owner
;
311 pkgAcquire::ItemDesc Desc
= *Itm
;
// An existing destination file is passed to ChangeOwnerAndPermissionOfFile
// with owner root, group root and mode 0644.
313 if (RealFileExists(Owner
->DestFile
))
314 ChangeOwnerAndPermissionOfFile("201::URIDone", Owner
->DestFile
.c_str(), "root", "root", 0644);
316 // Display update before completion
317 if (Log
!= 0 && Log
->MorePulses
== true)
318 Log
->Pulse(Owner
->GetOwner());
320 OwnerQ
->ItemDone(Itm
);
322 HashStringList
const ExpectedHashes
= Owner
->GetExpectedHashes();
323 // see if we got hashes to verify
// One HashString is collected for every supported hash type whose
// TYPE-Hash tag is present and non-empty in the message.
324 HashStringList ReceivedHashes
;
325 for (char const * const * type
= HashString::SupportedHashes(); *type
!= NULL
; ++type
)
327 std::string
const tagname
= std::string(*type
) + "-Hash";
328 std::string
const hashsum
= LookupTag(Message
, tagname
.c_str());
329 if (hashsum
.empty() == false)
330 ReceivedHashes
.push_back(HashString(*type
, hashsum
));
332 // not all methods always sent Hashes our way
// Fallback when hashes are expected but none usable were received: the
// file named by the Filename tag (third argument: DestFile) is opened
// read-only and ReceivedHashes is taken from a Hashes object built from
// ExpectedHashes. NOTE(review): the statement feeding the file into calc is
// on a line not shown.
333 if (ExpectedHashes
.usable() == true && ReceivedHashes
.usable() == false)
335 std::string
const filename
= LookupTag(Message
, "Filename", Owner
->DestFile
.c_str());
336 if (filename
.empty() == false && RealFileExists(filename
))
338 Hashes
calc(ExpectedHashes
);
339 FileFd
file(filename
, FileFd::ReadOnly
, FileFd::None
);
341 ReceivedHashes
= calc
.GetHashStringList();
// Optional debug dump of received and expected hashes.
345 if(_config
->FindB("Debug::pkgAcquire::Auth", false) == true)
347 std::clog
<< "201 URI Done: " << Owner
->DescURI() << endl
348 << "ReceivedHash:" << endl
;
349 for (HashStringList::const_iterator hs
= ReceivedHashes
.begin(); hs
!= ReceivedHashes
.end(); ++hs
)
350 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
351 std::clog
<< "ExpectedHash:" << endl
;
352 for (HashStringList::const_iterator hs
= ExpectedHashes
.begin(); hs
!= ExpectedHashes
.end(); ++hs
)
353 std::clog
<< "\t- " << hs
->toStr() << std::endl
;
357 // decide if what we got is what we expected
// Visible assignments, in order: with usable expected hashes the result is
// isIMSHit when nothing usable was received, true when received equals
// expected, and false in the assignment that follows; without usable
// expected hashes it is false when the owner requires hashes and true in
// the assignment that follows. NOTE(review): the else keywords for the
// trailing assignments are on lines not shown.
358 bool consideredOkay
= false;
359 bool const isIMSHit
= StringToBool(LookupTag(Message
,"IMS-Hit"),false) ||
360 StringToBool(LookupTag(Message
,"Alt-IMS-Hit"),false);
361 if (ExpectedHashes
.usable())
363 if (ReceivedHashes
.usable() == false)
365 /* IMS-Hits can't be checked here as we will have uncompressed file,
366 but the hashes for the compressed file. What we have was good through
367 so all we have to ensure later is that we are not stalled. */
368 consideredOkay
= isIMSHit
;
370 else if (ReceivedHashes
== ExpectedHashes
)
371 consideredOkay
= true;
373 consideredOkay
= false;
376 else if (Owner
->HashesRequired() == true)
377 consideredOkay
= false;
379 consideredOkay
= true;
// Accepted items are handed to Owner->Done together with the received
// hashes and the method configuration.
381 if (consideredOkay
== true)
383 Owner
->Done(Message
, ReceivedHashes
, Config
);
386 // Log that we are done
391 /* Hide 'hits' for local only sources - we also manage to
393 if (Config
->LocalOnly
== false)
// The item is marked StatAuthError and Failed is called on it.
// NOTE(review): the branch keyword selecting this path is not visible.
402 Owner
->Status
= pkgAcquire::Item::StatAuthError
;
403 Owner
->Failed(Message
,Config
);
417 std::string
const msg
= LookupTag(Message
,"Message");
418 _error
->Error("Method gave invalid 400 URI Failure message: %s", msg
.c_str());
422 // Display update before completion
423 if (Log
!= 0 && Log
->MorePulses
== true)
424 Log
->Pulse(Itm
->Owner
->GetOwner());
426 pkgAcquire::Item
*Owner
= Itm
->Owner
;
427 pkgAcquire::ItemDesc Desc
= *Itm
;
429 if (RealFileExists(Owner
->DestFile
))
430 ChangeOwnerAndPermissionOfFile("400::URIFailure", Owner
->DestFile
.c_str(), "root", "root", 0644);
432 OwnerQ
->ItemDone(Itm
);
// These four FailReason values set the item status to
// StatTransientNetworkError before Failed is called.
435 if(LookupTag(Message
,"FailReason") == "Timeout" ||
436 LookupTag(Message
,"FailReason") == "TmpResolveFailure" ||
437 LookupTag(Message
,"FailReason") == "ResolveFailure" ||
438 LookupTag(Message
,"FailReason") == "ConnectionRefused")
439 Owner
->Status
= pkgAcquire::Item::StatTransientNetworkError
;
441 Owner
->Failed(Message
,Config
);
450 // 401 General Failure
452 _error
->Error("Method %s General failure: %s",Access
.c_str(),LookupTag(Message
,"Message").c_str());
457 MediaChange(Message
);
464 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
465 // ---------------------------------------------------------------------
466 /* This parses the capabilities message and stores it in the method configuration (Config). */
// Handles a capabilities message: copies the Version tag and the boolean
// tags Single-Instance, Pipeline, Send-Config, Local-Only, Needs-Cleanup
// and Removable (each parsed with StringToBool, default false) into the
// Config object, then prints the resulting values to clog.
// Parameters: Message - the raw message text received from the method.
// NOTE(review): the return statements and any condition around the clog
// output are on lines not shown in this excerpt.
468 bool pkgAcquire::Worker::Capabilities(string Message
)
473 Config
->Version
= LookupTag(Message
,"Version");
474 Config
->SingleInstance
= StringToBool(LookupTag(Message
,"Single-Instance"),false);
475 Config
->Pipeline
= StringToBool(LookupTag(Message
,"Pipeline"),false);
476 Config
->SendConfig
= StringToBool(LookupTag(Message
,"Send-Config"),false);
477 Config
->LocalOnly
= StringToBool(LookupTag(Message
,"Local-Only"),false);
478 Config
->NeedsCleanup
= StringToBool(LookupTag(Message
,"Needs-Cleanup"),false);
479 Config
->Removable
= StringToBool(LookupTag(Message
,"Removable"),false);
// Diagnostic dump of the values just stored.
484 clog
<< "Configured access method " << Config
->Access
<< endl
;
485 clog
<< "Version:" << Config
->Version
<<
486 " SingleInstance:" << Config
->SingleInstance
<<
487 " Pipeline:" << Config
->Pipeline
<<
488 " SendConfig:" << Config
->SendConfig
<<
489 " LocalOnly: " << Config
->LocalOnly
<<
490 " NeedsCleanup: " << Config
->NeedsCleanup
<<
491 " Removable: " << Config
->Removable
<< endl
;
497 // Worker::MediaChange - Request a media change /*{{{*/
498 // ---------------------------------------------------------------------
// Handles a media change request from the method.
// Visible steps: read APT::Status-Fd (default -1), build a status line of
// the form media-change: MEDIA:DRIVE:MESSAGE and write it to that
// descriptor, ask Log->MediaChange with the Media and Drive tags, and
// prepare a 603 Media Changed reply. The reply carries Failed: true when
// Log is null or Log->MediaChange returns false.
// Parameters: Message - the raw message text received from the method.
// NOTE(review): the check on status_fd, the declaration of S, the code that
// queues the reply and the return statements are on lines not shown.
500 bool pkgAcquire::Worker::MediaChange(string Message
)
502 int status_fd
= _config
->FindI("APT::Status-Fd",-1);
505 string Media
= LookupTag(Message
,"Media");
506 string Drive
= LookupTag(Message
,"Drive");
507 ostringstream msg
,status
;
// Localised prompt; it is embedded as the last field of the status line.
508 ioprintf(msg
,_("Please insert the disc labeled: "
510 "in the drive '%s' and press enter."),
511 Media
.c_str(),Drive
.c_str());
512 status
<< "media-change: " // message
513 << Media
<< ":" // media
514 << Drive
<< ":" // drive
515 << msg
.str() // l10n message
518 std::string
const dlstatus
= status
.str();
519 FileFd::Write(status_fd
, dlstatus
.c_str(), dlstatus
.size());
// No status object, or a declined media change, produces the failure reply.
522 if (Log
== 0 || Log
->MediaChange(LookupTag(Message
,"Media"),
523 LookupTag(Message
,"Drive")) == false)
526 snprintf(S
,sizeof(S
),"603 Media Changed\nFailed: true\n\n");
528 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
// Reply without the Failed field.
535 snprintf(S
,sizeof(S
),"603 Media Changed\n\n");
537 clog
<< " -> " << Access
<< ':' << QuoteString(S
,"\n") << endl
;
543 // Worker::SendConfiguration - Send the config to the method /*{{{*/
544 // ---------------------------------------------------------------------
// Queues a 601 Configuration message for the method. The message starts
// with the line 601 Configuration and is followed by the output of
// _config->Dump using the format Config-Item: %F=%V per entry; the result
// is appended to OutQueue.
// NOTE(review): what happens when Config->SendConfig is false, the message
// terminator and the return statements are on lines not shown.
546 bool pkgAcquire::Worker::SendConfiguration()
548 if (Config
->SendConfig
== false)
554 /* Write out all of the configuration directives by walking the
555 configuration tree */
556 std::ostringstream Message
;
557 Message
<< "601 Configuration\n";
558 _config
->Dump(Message
, NULL
, "Config-Item: %F=%V\n", false);
562 clog
<< " -> " << Access
<< ':' << QuoteString(Message
.str(),"\n") << endl
;
563 OutQueue
+= Message
.str();
569 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
570 // ---------------------------------------------------------------------
571 /* Send a URI Acquire message to the method */
// Builds a 600 URI Acquire message for the given queue item.
// The message contains the URI, the destination Filename, one Expected-TYPE
// line per expected hash, a Maximum-Size line when the owner's FileSize is
// greater than zero, and whatever Custom600Headers returns.
// An existing destination file is passed to ChangeOwnerAndPermissionOfFile
// with the APT::Sandbox::User value as owner, group root and mode 0600.
// Parameters: Item - queue item whose URI and owner describe the download.
// NOTE(review): the code that appends the message to the out queue and the
// return statements are on lines not shown in this excerpt.
572 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem
*Item
)
577 string Message
= "600 URI Acquire\n";
// Reserve up front to limit reallocations while fields are appended.
578 Message
.reserve(300);
579 Message
+= "URI: " + Item
->URI
;
580 Message
+= "\nFilename: " + Item
->Owner
->DestFile
;
581 HashStringList
const hsl
= Item
->Owner
->GetExpectedHashes();
582 for (HashStringList::const_iterator hs
= hsl
.begin(); hs
!= hsl
.end(); ++hs
)
583 Message
+= "\nExpected-" + hs
->HashType() + ": " + hs
->HashValue();
// The size limit is only sent when a size is known.
584 if(Item
->Owner
->FileSize
> 0)
587 strprintf(MaximumSize
, "%llu", Item
->Owner
->FileSize
);
588 Message
+= "\nMaximum-Size: " + MaximumSize
;
590 Message
+= Item
->Owner
->Custom600Headers();
593 if (RealFileExists(Item
->Owner
->DestFile
))
595 std::string SandboxUser
= _config
->Find("APT::Sandbox::User");
596 ChangeOwnerAndPermissionOfFile("Item::QueueURI", Item
->Owner
->DestFile
.c_str(),
597 SandboxUser
.c_str(), "root", 0600);
601 clog
<< " -> " << Access
<< ':' << QuoteString(Message
,"\n") << endl
;
608 // Worker::OutFdReady - Outbound FD is ready /*{{{*/
609 // ---------------------------------------------------------------------
// Writes the pending contents of OutQueue to OutFd. The write is repeated
// while it fails with errno equal to EINTR. The bytes that were written
// are then erased from the front of OutQueue.
// Returns: the result of MethodFailure() on the failure path.
// NOTE(review): the declaration of Res, the do keyword of the retry loop,
// the condition guarding the MethodFailure call, the action taken when the
// queue becomes empty and the final return are on lines not shown.
611 bool pkgAcquire::Worker::OutFdReady()
616 Res
= write(OutFd
,OutQueue
.c_str(),OutQueue
.length());
// Retry only when the write was interrupted by a signal.
618 while (Res
< 0 && errno
== EINTR
);
621 return MethodFailure();
// Drop what was actually written; a short write leaves the rest queued.
623 OutQueue
.erase(0,Res
);
624 if (OutQueue
.empty() == true)
630 // Worker::InFdReady - Inbound FD is ready /*{{{*/
631 // ---------------------------------------------------------------------
// Called when the inbound descriptor is readable; the visible part calls
// ReadMessages and tests for failure.
// NOTE(review): both outcomes of that test are on lines not shown here.
633 bool pkgAcquire::Worker::InFdReady()
635 if (ReadMessages() == false)
641 // Worker::MethodFailure - Called when the method fails /*{{{*/
642 // ---------------------------------------------------------------------
643 /* This is called when the method is believed to have failed, probably because reading from or writing to it failed. */
// Records that the method has died: pushes an error naming the access
// method, calls ExecWait on the process with a third argument of false,
// and empties MessageQueue.
// NOTE(review): descriptor cleanup and the return statement are on lines
// not shown in this excerpt.
645 bool pkgAcquire::Worker::MethodFailure()
647 _error
->Error("Method %s has died unexpectedly!",Access
.c_str());
649 // do not reap the child here to show meaningful error to the user
650 ExecWait(Process
,Access
.c_str(),false);
// Discard every message that is still queued.
659 MessageQueue
.erase(MessageQueue
.begin(),MessageQueue
.end());
664 // Worker::Pulse - Called periodically /*{{{*/
665 // ---------------------------------------------------------------------
// Periodic update: calls stat on the destination file of the current item
// and stores its st_size in CurrentSize.
// NOTE(review): the declaration of Buf and the actions taken when there is
// no current item or when stat fails are on lines not shown here.
667 void pkgAcquire::Worker::Pulse()
669 if (CurrentItem
== 0)
673 if (stat(CurrentItem
->Owner
->DestFile
.c_str(),&Buf
) != 0)
// Size on disk so far of the file being fetched.
675 CurrentSize
= Buf
.st_size
;
678 // Worker::ItemDone - Called when the current item is finished /*{{{*/
679 // ---------------------------------------------------------------------
681 void pkgAcquire::Worker::ItemDone()