]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire-worker.cc
Test acquire failover
[apt.git] / apt-pkg / acquire-worker.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
76d97c26 3// $Id: acquire-worker.cc,v 1.15 1998/12/05 01:45:20 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire Worker
7
3b5421b4
AL
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
11
0118833a
AL
12 ##################################################################### */
13 /*}}}*/
14// Include Files /*{{{*/
15#ifdef __GNUG__
16#pragma implementation "apt-pkg/acquire-worker.h"
3b5421b4 17#endif
0118833a 18#include <apt-pkg/acquire-worker.h>
0a8a80e5 19#include <apt-pkg/acquire-item.h>
3b5421b4
AL
20#include <apt-pkg/configuration.h>
21#include <apt-pkg/error.h>
22#include <apt-pkg/fileutl.h>
23#include <strutl.h>
24
8267fe24 25#include <sys/stat.h>
3b5421b4
AL
26#include <unistd.h>
27#include <signal.h>
93641593 28#include <wait.h>
542ec555 29#include <stdio.h>
3b5421b4
AL
30 /*}}}*/
31
32// Worker::Worker - Constructor for Queue startup /*{{{*/
33// ---------------------------------------------------------------------
34/* */
8267fe24
AL
35pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
36 pkgAcquireStatus *Log) : Log(Log)
3b5421b4
AL
37{
38 OwnerQ = Q;
0a8a80e5
AL
39 Config = Cnf;
40 Access = Cnf->Access;
41 CurrentItem = 0;
3b5421b4
AL
42
43 Construct();
44}
45 /*}}}*/
46// Worker::Worker - Constructor for method config startup /*{{{*/
47// ---------------------------------------------------------------------
48/* */
49pkgAcquire::Worker::Worker(MethodConfig *Cnf)
50{
51 OwnerQ = 0;
52 Config = Cnf;
53 Access = Cnf->Access;
0a8a80e5
AL
54 CurrentItem = 0;
55
3b5421b4
AL
56 Construct();
57}
58 /*}}}*/
59// Worker::Construct - Constructor helper /*{{{*/
60// ---------------------------------------------------------------------
61/* */
62void pkgAcquire::Worker::Construct()
63{
0a8a80e5
AL
64 NextQueue = 0;
65 NextAcquire = 0;
3b5421b4
AL
66 Process = -1;
67 InFd = -1;
68 OutFd = -1;
0a8a80e5
AL
69 OutReady = false;
70 InReady = false;
3b5421b4
AL
71 Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
72}
73 /*}}}*/
74// Worker::~Worker - Destructor /*{{{*/
75// ---------------------------------------------------------------------
76/* */
77pkgAcquire::Worker::~Worker()
78{
79 close(InFd);
80 close(OutFd);
81
82 if (Process > 0)
0a8a80e5 83 {
3b5421b4 84 kill(Process,SIGINT);
0a8a80e5
AL
85 if (waitpid(Process,0,0) != Process)
86 _error->Warning("I waited but nothing was there!");
87 }
3b5421b4
AL
88}
89 /*}}}*/
90// Worker::Start - Start the worker process /*{{{*/
91// ---------------------------------------------------------------------
92/* This forks the method and inits the communication channel */
93bool pkgAcquire::Worker::Start()
94{
95 // Get the method path
96 string Method = _config->FindDir("Dir::Bin::Methods") + Access;
97 if (FileExists(Method) == false)
98 return _error->Error("The method driver %s could not be found.",Method.c_str());
99
100 if (Debug == true)
101 clog << "Starting method '" << Method << '\'' << endl;
102
103 // Create the pipes
104 int Pipes[4] = {-1,-1,-1,-1};
105 if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
106 {
107 _error->Errno("pipe","Failed to create IPC pipe to subprocess");
108 for (int I = 0; I != 4; I++)
109 close(Pipes[I]);
110 return false;
111 }
8b89e57f
AL
112 for (int I = 0; I != 4; I++)
113 SetCloseExec(Pipes[0],true);
114
3b5421b4
AL
115 // Fork off the process
116 Process = fork();
117 if (Process < 0)
118 {
119 cerr << "FATAL -> Failed to fork." << endl;
120 exit(100);
121 }
122
123 // Spawn the subprocess
124 if (Process == 0)
125 {
126 // Setup the FDs
127 dup2(Pipes[1],STDOUT_FILENO);
128 dup2(Pipes[2],STDIN_FILENO);
129 dup2(((filebuf *)clog.rdbuf())->fd(),STDERR_FILENO);
3b5421b4
AL
130 SetCloseExec(STDOUT_FILENO,false);
131 SetCloseExec(STDIN_FILENO,false);
132 SetCloseExec(STDERR_FILENO,false);
133
134 const char *Args[2];
135 Args[0] = Method.c_str();
136 Args[1] = 0;
137 execv(Args[0],(char **)Args);
138 cerr << "Failed to exec method " << Args[0] << endl;
139 exit(100);
140 }
141
142 // Fix up our FDs
143 InFd = Pipes[0];
144 OutFd = Pipes[3];
145 SetNonBlock(Pipes[0],true);
146 SetNonBlock(Pipes[3],true);
147 close(Pipes[1]);
148 close(Pipes[2]);
0a8a80e5
AL
149 OutReady = false;
150 InReady = true;
3b5421b4
AL
151
152 // Read the configuration data
153 if (WaitFd(InFd) == false ||
154 ReadMessages() == false)
155 return _error->Error("Method %s did not start correctly",Method.c_str());
156
157 RunMessages();
8b89e57f
AL
158 if (OwnerQ != 0)
159 SendConfiguration();
3b5421b4
AL
160
161 return true;
162}
163 /*}}}*/
164// Worker::ReadMessages - Read all pending messages into the list /*{{{*/
165// ---------------------------------------------------------------------
0a8a80e5 166/* */
3b5421b4
AL
167bool pkgAcquire::Worker::ReadMessages()
168{
0a8a80e5
AL
169 if (::ReadMessages(InFd,MessageQueue) == false)
170 return MethodFailure();
3b5421b4
AL
171 return true;
172}
173 /*}}}*/
3b5421b4
AL
174// Worker::RunMessage - Empty the message queue /*{{{*/
175// ---------------------------------------------------------------------
176/* This takes the messages from the message queue and runs them through
177 the parsers in order. */
178bool pkgAcquire::Worker::RunMessages()
179{
180 while (MessageQueue.empty() == false)
181 {
182 string Message = MessageQueue.front();
183 MessageQueue.erase(MessageQueue.begin());
0a8a80e5
AL
184
185 if (Debug == true)
186 clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
3b5421b4
AL
187
188 // Fetch the message number
189 char *End;
190 int Number = strtol(Message.c_str(),&End,10);
191 if (End == Message.c_str())
192 return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
193
c88edf1d
AL
194 string URI = LookupTag(Message,"URI");
195 pkgAcquire::Queue::QItem *Itm = 0;
196 if (URI.empty() == false)
197 Itm = OwnerQ->FindItem(URI,this);
bfd22fc0 198
3b5421b4
AL
199 // Determine the message number and dispatch
200 switch (Number)
201 {
0a8a80e5 202 // 100 Capabilities
3b5421b4
AL
203 case 100:
204 if (Capabilities(Message) == false)
205 return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
206 break;
0a8a80e5
AL
207
208 // 101 Log
209 case 101:
210 if (Debug == true)
211 clog << " <- (log) " << LookupTag(Message,"Message") << endl;
212 break;
213
214 // 102 Status
215 case 102:
216 Status = LookupTag(Message,"Message");
217 break;
218
219 // 200 URI Start
220 case 200:
c88edf1d
AL
221 {
222 if (Itm == 0)
223 {
93bf083d 224 _error->Error("Method gave invalid 200 URI Start message");
c88edf1d
AL
225 break;
226 }
8267fe24 227
c88edf1d
AL
228 CurrentItem = Itm;
229 CurrentSize = 0;
230 TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
8267fe24 231 Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
c88edf1d 232
8267fe24
AL
233 if (Log != 0)
234 Log->Fetch(*Itm);
235
c88edf1d
AL
236 break;
237 }
0a8a80e5
AL
238
239 // 201 URI Done
240 case 201:
c88edf1d
AL
241 {
242 if (Itm == 0)
243 {
93bf083d 244 _error->Error("Method gave invalid 201 URI Done message");
c88edf1d
AL
245 break;
246 }
247
bfd22fc0 248 pkgAcquire::Item *Owner = Itm->Owner;
8267fe24 249 pkgAcquire::ItemDesc Desc = *Itm;
be4401bf 250 OwnerQ->ItemDone(Itm);
bfd22fc0 251 Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
c88edf1d 252 LookupTag(Message,"MD5-Hash"));
8267fe24
AL
253 ItemDone();
254
255 // Log that we are done
256 if (Log != 0)
257 {
258 if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
259 StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
260 Log->IMSHit(Desc);
261 else
262 Log->Done(Desc);
263 }
c88edf1d
AL
264 break;
265 }
0a8a80e5
AL
266
267 // 400 URI Failure
268 case 400:
c88edf1d
AL
269 {
270 if (Itm == 0)
271 {
93bf083d 272 _error->Error("Method gave invalid 400 URI Failure message");
c88edf1d
AL
273 break;
274 }
275
bfd22fc0 276 pkgAcquire::Item *Owner = Itm->Owner;
8267fe24 277 pkgAcquire::ItemDesc Desc = *Itm;
c88edf1d 278 OwnerQ->ItemDone(Itm);
bfd22fc0 279 Owner->Failed(Message);
8267fe24
AL
280 ItemDone();
281
282 if (Log != 0)
283 Log->Fail(Desc);
284
c88edf1d
AL
285 break;
286 }
0a8a80e5
AL
287
288 // 401 General Failure
289 case 401:
290 _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str());
291 break;
542ec555
AL
292
293 // 403 Media Change
294 case 403:
295 MediaChange(Message);
296 break;
3b5421b4
AL
297 }
298 }
299 return true;
300}
301 /*}}}*/
302// Worker::Capabilities - 100 Capabilities handler /*{{{*/
303// ---------------------------------------------------------------------
304/* This parses the capabilities message and dumps it into the configuration
305 structure. */
306bool pkgAcquire::Worker::Capabilities(string Message)
307{
308 if (Config == 0)
309 return true;
310
311 Config->Version = LookupTag(Message,"Version");
312 Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
0a8a80e5
AL
313 Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
314 Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
e331f6ed 315 Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
3b5421b4
AL
316
317 // Some debug text
318 if (Debug == true)
319 {
320 clog << "Configured access method " << Config->Access << endl;
0a8a80e5 321 clog << "Version:" << Config->Version << " SingleInstance:" <<
a72ace20 322 Config->SingleInstance <<
0a8a80e5
AL
323 " Pipeline:" << Config->Pipeline << " SendConfig:" <<
324 Config->SendConfig << endl;
3b5421b4
AL
325 }
326
542ec555
AL
327 return true;
328}
329 /*}}}*/
330// Worker::MediaChange - Request a media change /*{{{*/
331// ---------------------------------------------------------------------
332/* */
333bool pkgAcquire::Worker::MediaChange(string Message)
334{
335 if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
336 LookupTag(Message,"Drive")) == false)
337 {
338 char S[300];
339 sprintf(S,"603 Media Changed\nFailed: true\n\n");
340 if (Debug == true)
341 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
342 OutQueue += S;
343 OutReady = true;
344 return true;
345 }
346
347 char S[300];
348 sprintf(S,"603 Media Changed\n\n");
349 if (Debug == true)
350 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
351 OutQueue += S;
352 OutReady = true;
3b5421b4
AL
353 return true;
354}
0118833a 355 /*}}}*/
0a8a80e5
AL
356// Worker::SendConfiguration - Send the config to the method /*{{{*/
357// ---------------------------------------------------------------------
358/* */
359bool pkgAcquire::Worker::SendConfiguration()
360{
361 if (Config->SendConfig == false)
362 return true;
363
364 if (OutFd == -1)
365 return false;
366
367 string Message = "601 Configuration\n";
368 Message.reserve(2000);
369
370 /* Write out all of the configuration directives by walking the
371 configuration tree */
372 const Configuration::Item *Top = _config->Tree(0);
373 for (; Top != 0;)
374 {
375 if (Top->Value.empty() == false)
376 {
377 string Line = "Config-Item: " + Top->FullTag() + "=";
378 Line += QuoteString(Top->Value,"\n") + '\n';
379 Message += Line;
380 }
381
382 if (Top->Child != 0)
383 {
384 Top = Top->Child;
385 continue;
386 }
387
388 while (Top != 0 && Top->Next == 0)
389 Top = Top->Parent;
390 if (Top != 0)
391 Top = Top->Next;
392 }
393 Message += '\n';
394
395 if (Debug == true)
396 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
397 OutQueue += Message;
398 OutReady = true;
399
400 return true;
401}
402 /*}}}*/
403// Worker::QueueItem - Add an item to the outbound queue /*{{{*/
404// ---------------------------------------------------------------------
405/* Send a URI Acquire message to the method */
406bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
407{
408 if (OutFd == -1)
409 return false;
410
411 string Message = "600 URI Acquire\n";
412 Message.reserve(300);
413 Message += "URI: " + Item->URI;
414 Message += "\nFilename: " + Item->Owner->DestFile;
415 Message += Item->Owner->Custom600Headers();
416 Message += "\n\n";
417
418 if (Debug == true)
419 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
420 OutQueue += Message;
421 OutReady = true;
422
423 return true;
424}
425 /*}}}*/
426// Worker::OutFdRead - Out bound FD is ready /*{{{*/
427// ---------------------------------------------------------------------
428/* */
429bool pkgAcquire::Worker::OutFdReady()
430{
431 int Res = write(OutFd,OutQueue.begin(),OutQueue.length());
432 if (Res <= 0)
433 return MethodFailure();
434
435 // Hmm.. this should never happen.
436 if (Res < 0)
437 return true;
438
439 OutQueue.erase(0,Res);
440 if (OutQueue.empty() == true)
441 OutReady = false;
442
443 return true;
444}
445 /*}}}*/
446// Worker::InFdRead - In bound FD is ready /*{{{*/
447// ---------------------------------------------------------------------
448/* */
449bool pkgAcquire::Worker::InFdReady()
450{
451 if (ReadMessages() == false)
452 return false;
453 RunMessages();
454 return true;
455}
456 /*}}}*/
457// Worker::MethodFailure - Called when the method fails /*{{{*/
458// ---------------------------------------------------------------------
459/* This is called when the method is belived to have failed, probably because
460 read returned -1. */
461bool pkgAcquire::Worker::MethodFailure()
462{
76d97c26
AL
463 _error->Error("Method %s has died unexpectedly!",Access.c_str());
464
0a8a80e5
AL
465 if (waitpid(Process,0,0) != Process)
466 _error->Warning("I waited but nothing was there!");
467 Process = -1;
468 close(InFd);
469 close(OutFd);
470 InFd = -1;
471 OutFd = -1;
472 OutReady = false;
473 InReady = false;
474 OutQueue = string();
475 MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
476
477 return false;
478}
479 /*}}}*/
8267fe24
AL
480// Worker::Pulse - Called periodically /*{{{*/
481// ---------------------------------------------------------------------
482/* */
483void pkgAcquire::Worker::Pulse()
484{
485 if (CurrentItem == 0)
486 return;
542ec555 487
8267fe24
AL
488 struct stat Buf;
489 if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
490 return;
491 CurrentSize = Buf.st_size;
492}
493 /*}}}*/
494// Worker::ItemDone - Called when the current item is finished /*{{{*/
495// ---------------------------------------------------------------------
496/* */
497void pkgAcquire::Worker::ItemDone()
498{
499 CurrentItem = 0;
500 CurrentSize = 0;
501 TotalSize = 0;
502 Status = string();
503}
504 /*}}}*/