]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire-worker.cc
Retry support
[apt.git] / apt-pkg / acquire-worker.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
7d8afa39 3// $Id: acquire-worker.cc,v 1.19 1999/01/30 08:08:54 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire Worker
7
3b5421b4
AL
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and
11
0118833a
AL
12 ##################################################################### */
13 /*}}}*/
14// Include Files /*{{{*/
15#ifdef __GNUG__
16#pragma implementation "apt-pkg/acquire-worker.h"
3b5421b4 17#endif
0118833a 18#include <apt-pkg/acquire-worker.h>
0a8a80e5 19#include <apt-pkg/acquire-item.h>
3b5421b4
AL
20#include <apt-pkg/configuration.h>
21#include <apt-pkg/error.h>
22#include <apt-pkg/fileutl.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
3b5421b4 24
8267fe24 25#include <sys/stat.h>
3b5421b4
AL
26#include <unistd.h>
27#include <signal.h>
93641593 28#include <wait.h>
542ec555 29#include <stdio.h>
3b5421b4
AL
30 /*}}}*/
31
32// Worker::Worker - Constructor for Queue startup /*{{{*/
33// ---------------------------------------------------------------------
34/* */
8267fe24
AL
35pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
36 pkgAcquireStatus *Log) : Log(Log)
3b5421b4
AL
37{
38 OwnerQ = Q;
0a8a80e5
AL
39 Config = Cnf;
40 Access = Cnf->Access;
41 CurrentItem = 0;
3b5421b4
AL
42
43 Construct();
44}
45 /*}}}*/
46// Worker::Worker - Constructor for method config startup /*{{{*/
47// ---------------------------------------------------------------------
48/* */
49pkgAcquire::Worker::Worker(MethodConfig *Cnf)
50{
51 OwnerQ = 0;
52 Config = Cnf;
53 Access = Cnf->Access;
0a8a80e5
AL
54 CurrentItem = 0;
55
3b5421b4
AL
56 Construct();
57}
58 /*}}}*/
59// Worker::Construct - Constructor helper /*{{{*/
60// ---------------------------------------------------------------------
61/* */
62void pkgAcquire::Worker::Construct()
63{
0a8a80e5
AL
64 NextQueue = 0;
65 NextAcquire = 0;
3b5421b4
AL
66 Process = -1;
67 InFd = -1;
68 OutFd = -1;
0a8a80e5
AL
69 OutReady = false;
70 InReady = false;
3b5421b4
AL
71 Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
72}
73 /*}}}*/
74// Worker::~Worker - Destructor /*{{{*/
75// ---------------------------------------------------------------------
76/* */
77pkgAcquire::Worker::~Worker()
78{
79 close(InFd);
80 close(OutFd);
81
82 if (Process > 0)
0a8a80e5 83 {
3b5421b4 84 kill(Process,SIGINT);
0a8a80e5
AL
85 if (waitpid(Process,0,0) != Process)
86 _error->Warning("I waited but nothing was there!");
87 }
3b5421b4
AL
88}
89 /*}}}*/
90// Worker::Start - Start the worker process /*{{{*/
91// ---------------------------------------------------------------------
92/* This forks the method and inits the communication channel */
93bool pkgAcquire::Worker::Start()
94{
95 // Get the method path
96 string Method = _config->FindDir("Dir::Bin::Methods") + Access;
97 if (FileExists(Method) == false)
98 return _error->Error("The method driver %s could not be found.",Method.c_str());
99
100 if (Debug == true)
101 clog << "Starting method '" << Method << '\'' << endl;
102
103 // Create the pipes
104 int Pipes[4] = {-1,-1,-1,-1};
105 if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
106 {
107 _error->Errno("pipe","Failed to create IPC pipe to subprocess");
108 for (int I = 0; I != 4; I++)
109 close(Pipes[I]);
110 return false;
111 }
8b89e57f
AL
112 for (int I = 0; I != 4; I++)
113 SetCloseExec(Pipes[0],true);
114
3b5421b4
AL
115 // Fork off the process
116 Process = fork();
117 if (Process < 0)
118 {
119 cerr << "FATAL -> Failed to fork." << endl;
120 exit(100);
121 }
122
123 // Spawn the subprocess
124 if (Process == 0)
125 {
126 // Setup the FDs
127 dup2(Pipes[1],STDOUT_FILENO);
128 dup2(Pipes[2],STDIN_FILENO);
129 dup2(((filebuf *)clog.rdbuf())->fd(),STDERR_FILENO);
3b5421b4
AL
130 SetCloseExec(STDOUT_FILENO,false);
131 SetCloseExec(STDIN_FILENO,false);
132 SetCloseExec(STDERR_FILENO,false);
133
134 const char *Args[2];
135 Args[0] = Method.c_str();
136 Args[1] = 0;
137 execv(Args[0],(char **)Args);
138 cerr << "Failed to exec method " << Args[0] << endl;
0dbb95d8 139 _exit(100);
3b5421b4
AL
140 }
141
142 // Fix up our FDs
143 InFd = Pipes[0];
144 OutFd = Pipes[3];
145 SetNonBlock(Pipes[0],true);
146 SetNonBlock(Pipes[3],true);
147 close(Pipes[1]);
148 close(Pipes[2]);
0a8a80e5
AL
149 OutReady = false;
150 InReady = true;
3b5421b4
AL
151
152 // Read the configuration data
153 if (WaitFd(InFd) == false ||
154 ReadMessages() == false)
155 return _error->Error("Method %s did not start correctly",Method.c_str());
156
157 RunMessages();
8b89e57f
AL
158 if (OwnerQ != 0)
159 SendConfiguration();
3b5421b4
AL
160
161 return true;
162}
163 /*}}}*/
164// Worker::ReadMessages - Read all pending messages into the list /*{{{*/
165// ---------------------------------------------------------------------
0a8a80e5 166/* */
3b5421b4
AL
167bool pkgAcquire::Worker::ReadMessages()
168{
0a8a80e5
AL
169 if (::ReadMessages(InFd,MessageQueue) == false)
170 return MethodFailure();
3b5421b4
AL
171 return true;
172}
173 /*}}}*/
3b5421b4
AL
174// Worker::RunMessage - Empty the message queue /*{{{*/
175// ---------------------------------------------------------------------
176/* This takes the messages from the message queue and runs them through
177 the parsers in order. */
178bool pkgAcquire::Worker::RunMessages()
179{
180 while (MessageQueue.empty() == false)
181 {
182 string Message = MessageQueue.front();
183 MessageQueue.erase(MessageQueue.begin());
0a8a80e5
AL
184
185 if (Debug == true)
186 clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
3b5421b4
AL
187
188 // Fetch the message number
189 char *End;
190 int Number = strtol(Message.c_str(),&End,10);
191 if (End == Message.c_str())
192 return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
193
c88edf1d
AL
194 string URI = LookupTag(Message,"URI");
195 pkgAcquire::Queue::QItem *Itm = 0;
196 if (URI.empty() == false)
197 Itm = OwnerQ->FindItem(URI,this);
bfd22fc0 198
3b5421b4
AL
199 // Determine the message number and dispatch
200 switch (Number)
201 {
0a8a80e5 202 // 100 Capabilities
3b5421b4
AL
203 case 100:
204 if (Capabilities(Message) == false)
205 return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
206 break;
0a8a80e5
AL
207
208 // 101 Log
209 case 101:
210 if (Debug == true)
211 clog << " <- (log) " << LookupTag(Message,"Message") << endl;
212 break;
213
214 // 102 Status
215 case 102:
216 Status = LookupTag(Message,"Message");
217 break;
218
219 // 200 URI Start
220 case 200:
c88edf1d
AL
221 {
222 if (Itm == 0)
223 {
93bf083d 224 _error->Error("Method gave invalid 200 URI Start message");
c88edf1d
AL
225 break;
226 }
8267fe24 227
c88edf1d
AL
228 CurrentItem = Itm;
229 CurrentSize = 0;
230 TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
8267fe24 231 Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
c88edf1d 232
8267fe24
AL
233 if (Log != 0)
234 Log->Fetch(*Itm);
235
c88edf1d
AL
236 break;
237 }
0a8a80e5
AL
238
239 // 201 URI Done
240 case 201:
c88edf1d
AL
241 {
242 if (Itm == 0)
243 {
93bf083d 244 _error->Error("Method gave invalid 201 URI Done message");
c88edf1d
AL
245 break;
246 }
247
bfd22fc0 248 pkgAcquire::Item *Owner = Itm->Owner;
8267fe24 249 pkgAcquire::ItemDesc Desc = *Itm;
be4401bf 250 OwnerQ->ItemDone(Itm);
bfd22fc0 251 Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
c88edf1d 252 LookupTag(Message,"MD5-Hash"));
8267fe24
AL
253 ItemDone();
254
255 // Log that we are done
256 if (Log != 0)
257 {
258 if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
259 StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
c46824ce
AL
260 {
261 /* Hide 'hits' for local only sources - we also manage to
262 hide gets */
263 if (Config->LocalOnly == false)
264 Log->IMSHit(Desc);
265 }
8267fe24
AL
266 else
267 Log->Done(Desc);
268 }
c88edf1d
AL
269 break;
270 }
0a8a80e5
AL
271
272 // 400 URI Failure
273 case 400:
c88edf1d
AL
274 {
275 if (Itm == 0)
276 {
93bf083d 277 _error->Error("Method gave invalid 400 URI Failure message");
c88edf1d
AL
278 break;
279 }
280
bfd22fc0 281 pkgAcquire::Item *Owner = Itm->Owner;
8267fe24 282 pkgAcquire::ItemDesc Desc = *Itm;
c88edf1d 283 OwnerQ->ItemDone(Itm);
7d8afa39 284 Owner->Failed(Message,Config);
8267fe24 285 ItemDone();
7d8afa39 286
8267fe24
AL
287 if (Log != 0)
288 Log->Fail(Desc);
7d8afa39 289
c88edf1d
AL
290 break;
291 }
0a8a80e5
AL
292
293 // 401 General Failure
294 case 401:
295 _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str());
296 break;
542ec555
AL
297
298 // 403 Media Change
299 case 403:
300 MediaChange(Message);
301 break;
3b5421b4
AL
302 }
303 }
304 return true;
305}
306 /*}}}*/
307// Worker::Capabilities - 100 Capabilities handler /*{{{*/
308// ---------------------------------------------------------------------
309/* This parses the capabilities message and dumps it into the configuration
310 structure. */
311bool pkgAcquire::Worker::Capabilities(string Message)
312{
313 if (Config == 0)
314 return true;
315
316 Config->Version = LookupTag(Message,"Version");
317 Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
0a8a80e5
AL
318 Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
319 Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
e331f6ed 320 Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
3b5421b4
AL
321
322 // Some debug text
323 if (Debug == true)
324 {
325 clog << "Configured access method " << Config->Access << endl;
0a8a80e5 326 clog << "Version:" << Config->Version << " SingleInstance:" <<
a72ace20 327 Config->SingleInstance <<
0a8a80e5
AL
328 " Pipeline:" << Config->Pipeline << " SendConfig:" <<
329 Config->SendConfig << endl;
3b5421b4
AL
330 }
331
542ec555
AL
332 return true;
333}
334 /*}}}*/
335// Worker::MediaChange - Request a media change /*{{{*/
336// ---------------------------------------------------------------------
337/* */
338bool pkgAcquire::Worker::MediaChange(string Message)
339{
340 if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
341 LookupTag(Message,"Drive")) == false)
342 {
343 char S[300];
344 sprintf(S,"603 Media Changed\nFailed: true\n\n");
345 if (Debug == true)
346 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
347 OutQueue += S;
348 OutReady = true;
349 return true;
350 }
351
352 char S[300];
353 sprintf(S,"603 Media Changed\n\n");
354 if (Debug == true)
355 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
356 OutQueue += S;
357 OutReady = true;
3b5421b4
AL
358 return true;
359}
0118833a 360 /*}}}*/
0a8a80e5
AL
361// Worker::SendConfiguration - Send the config to the method /*{{{*/
362// ---------------------------------------------------------------------
363/* */
364bool pkgAcquire::Worker::SendConfiguration()
365{
366 if (Config->SendConfig == false)
367 return true;
368
369 if (OutFd == -1)
370 return false;
371
372 string Message = "601 Configuration\n";
373 Message.reserve(2000);
374
375 /* Write out all of the configuration directives by walking the
376 configuration tree */
377 const Configuration::Item *Top = _config->Tree(0);
378 for (; Top != 0;)
379 {
380 if (Top->Value.empty() == false)
381 {
382 string Line = "Config-Item: " + Top->FullTag() + "=";
383 Line += QuoteString(Top->Value,"\n") + '\n';
384 Message += Line;
385 }
386
387 if (Top->Child != 0)
388 {
389 Top = Top->Child;
390 continue;
391 }
392
393 while (Top != 0 && Top->Next == 0)
394 Top = Top->Parent;
395 if (Top != 0)
396 Top = Top->Next;
397 }
398 Message += '\n';
399
400 if (Debug == true)
401 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
402 OutQueue += Message;
403 OutReady = true;
404
405 return true;
406}
407 /*}}}*/
408// Worker::QueueItem - Add an item to the outbound queue /*{{{*/
409// ---------------------------------------------------------------------
410/* Send a URI Acquire message to the method */
411bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
412{
413 if (OutFd == -1)
414 return false;
415
416 string Message = "600 URI Acquire\n";
417 Message.reserve(300);
418 Message += "URI: " + Item->URI;
419 Message += "\nFilename: " + Item->Owner->DestFile;
420 Message += Item->Owner->Custom600Headers();
421 Message += "\n\n";
422
423 if (Debug == true)
424 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
425 OutQueue += Message;
426 OutReady = true;
427
428 return true;
429}
430 /*}}}*/
431// Worker::OutFdRead - Out bound FD is ready /*{{{*/
432// ---------------------------------------------------------------------
433/* */
434bool pkgAcquire::Worker::OutFdReady()
435{
436 int Res = write(OutFd,OutQueue.begin(),OutQueue.length());
437 if (Res <= 0)
438 return MethodFailure();
439
440 // Hmm.. this should never happen.
441 if (Res < 0)
442 return true;
443
444 OutQueue.erase(0,Res);
445 if (OutQueue.empty() == true)
446 OutReady = false;
447
448 return true;
449}
450 /*}}}*/
451// Worker::InFdRead - In bound FD is ready /*{{{*/
452// ---------------------------------------------------------------------
453/* */
454bool pkgAcquire::Worker::InFdReady()
455{
456 if (ReadMessages() == false)
457 return false;
458 RunMessages();
459 return true;
460}
461 /*}}}*/
462// Worker::MethodFailure - Called when the method fails /*{{{*/
463// ---------------------------------------------------------------------
464/* This is called when the method is belived to have failed, probably because
465 read returned -1. */
466bool pkgAcquire::Worker::MethodFailure()
467{
76d97c26
AL
468 _error->Error("Method %s has died unexpectedly!",Access.c_str());
469
0a8a80e5
AL
470 if (waitpid(Process,0,0) != Process)
471 _error->Warning("I waited but nothing was there!");
472 Process = -1;
473 close(InFd);
474 close(OutFd);
475 InFd = -1;
476 OutFd = -1;
477 OutReady = false;
478 InReady = false;
479 OutQueue = string();
480 MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
481
482 return false;
483}
484 /*}}}*/
8267fe24
AL
485// Worker::Pulse - Called periodically /*{{{*/
486// ---------------------------------------------------------------------
487/* */
488void pkgAcquire::Worker::Pulse()
489{
490 if (CurrentItem == 0)
491 return;
542ec555 492
8267fe24
AL
493 struct stat Buf;
494 if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
495 return;
496 CurrentSize = Buf.st_size;
497}
498 /*}}}*/
499// Worker::ItemDone - Called when the current item is finished /*{{{*/
500// ---------------------------------------------------------------------
501/* */
502void pkgAcquire::Worker::ItemDone()
503{
504 CurrentItem = 0;
505 CurrentSize = 0;
506 TotalSize = 0;
507 Status = string();
508}
509 /*}}}*/