]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire-worker.cc
Simplified time calculations
[apt.git] / apt-pkg / acquire-worker.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire-worker.cc,v 1.19 1999/01/30 08:08:54 jgg Exp $
4 /* ######################################################################
5
6 Acquire Worker
7
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
10 configuration message and records the method's capabilities.
11
12 ##################################################################### */
13 /*}}}*/
14 // Include Files /*{{{*/
15 #ifdef __GNUG__
16 #pragma implementation "apt-pkg/acquire-worker.h"
17 #endif
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <apt-pkg/strutl.h>
24
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <signal.h>
28 #include <wait.h>
29 #include <stdio.h>
30 /*}}}*/
31
32 // Worker::Worker - Constructor for Queue startup /*{{{*/
33 // ---------------------------------------------------------------------
34 /* */
35 pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
36 pkgAcquireStatus *Log) : Log(Log)
37 {
38 OwnerQ = Q;
39 Config = Cnf;
40 Access = Cnf->Access;
41 CurrentItem = 0;
42
43 Construct();
44 }
45 /*}}}*/
46 // Worker::Worker - Constructor for method config startup /*{{{*/
47 // ---------------------------------------------------------------------
48 /* */
49 pkgAcquire::Worker::Worker(MethodConfig *Cnf)
50 {
51 OwnerQ = 0;
52 Config = Cnf;
53 Access = Cnf->Access;
54 CurrentItem = 0;
55
56 Construct();
57 }
58 /*}}}*/
59 // Worker::Construct - Constructor helper /*{{{*/
60 // ---------------------------------------------------------------------
61 /* */
62 void pkgAcquire::Worker::Construct()
63 {
64 NextQueue = 0;
65 NextAcquire = 0;
66 Process = -1;
67 InFd = -1;
68 OutFd = -1;
69 OutReady = false;
70 InReady = false;
71 Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
72 }
73 /*}}}*/
74 // Worker::~Worker - Destructor /*{{{*/
75 // ---------------------------------------------------------------------
76 /* */
77 pkgAcquire::Worker::~Worker()
78 {
79 close(InFd);
80 close(OutFd);
81
82 if (Process > 0)
83 {
84 kill(Process,SIGINT);
85 if (waitpid(Process,0,0) != Process)
86 _error->Warning("I waited but nothing was there!");
87 }
88 }
89 /*}}}*/
90 // Worker::Start - Start the worker process /*{{{*/
91 // ---------------------------------------------------------------------
92 /* This forks the method and inits the communication channel */
93 bool pkgAcquire::Worker::Start()
94 {
95 // Get the method path
96 string Method = _config->FindDir("Dir::Bin::Methods") + Access;
97 if (FileExists(Method) == false)
98 return _error->Error("The method driver %s could not be found.",Method.c_str());
99
100 if (Debug == true)
101 clog << "Starting method '" << Method << '\'' << endl;
102
103 // Create the pipes
104 int Pipes[4] = {-1,-1,-1,-1};
105 if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
106 {
107 _error->Errno("pipe","Failed to create IPC pipe to subprocess");
108 for (int I = 0; I != 4; I++)
109 close(Pipes[I]);
110 return false;
111 }
112 for (int I = 0; I != 4; I++)
113 SetCloseExec(Pipes[0],true);
114
115 // Fork off the process
116 Process = fork();
117 if (Process < 0)
118 {
119 cerr << "FATAL -> Failed to fork." << endl;
120 exit(100);
121 }
122
123 // Spawn the subprocess
124 if (Process == 0)
125 {
126 // Setup the FDs
127 dup2(Pipes[1],STDOUT_FILENO);
128 dup2(Pipes[2],STDIN_FILENO);
129 dup2(((filebuf *)clog.rdbuf())->fd(),STDERR_FILENO);
130 SetCloseExec(STDOUT_FILENO,false);
131 SetCloseExec(STDIN_FILENO,false);
132 SetCloseExec(STDERR_FILENO,false);
133
134 const char *Args[2];
135 Args[0] = Method.c_str();
136 Args[1] = 0;
137 execv(Args[0],(char **)Args);
138 cerr << "Failed to exec method " << Args[0] << endl;
139 _exit(100);
140 }
141
142 // Fix up our FDs
143 InFd = Pipes[0];
144 OutFd = Pipes[3];
145 SetNonBlock(Pipes[0],true);
146 SetNonBlock(Pipes[3],true);
147 close(Pipes[1]);
148 close(Pipes[2]);
149 OutReady = false;
150 InReady = true;
151
152 // Read the configuration data
153 if (WaitFd(InFd) == false ||
154 ReadMessages() == false)
155 return _error->Error("Method %s did not start correctly",Method.c_str());
156
157 RunMessages();
158 if (OwnerQ != 0)
159 SendConfiguration();
160
161 return true;
162 }
163 /*}}}*/
164 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
165 // ---------------------------------------------------------------------
166 /* */
167 bool pkgAcquire::Worker::ReadMessages()
168 {
169 if (::ReadMessages(InFd,MessageQueue) == false)
170 return MethodFailure();
171 return true;
172 }
173 /*}}}*/
174 // Worker::RunMessages - Empty the message queue /*{{{*/
175 // ---------------------------------------------------------------------
176 /* This takes the messages from the message queue and runs them through
177 the parsers in order. */
178 bool pkgAcquire::Worker::RunMessages()
179 {
180 while (MessageQueue.empty() == false)
181 {
182 string Message = MessageQueue.front();
183 MessageQueue.erase(MessageQueue.begin());
184
185 if (Debug == true)
186 clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
187
188 // Fetch the message number
189 char *End;
190 int Number = strtol(Message.c_str(),&End,10);
191 if (End == Message.c_str())
192 return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
193
194 string URI = LookupTag(Message,"URI");
195 pkgAcquire::Queue::QItem *Itm = 0;
196 if (URI.empty() == false)
197 Itm = OwnerQ->FindItem(URI,this);
198
199 // Determine the message number and dispatch
200 switch (Number)
201 {
202 // 100 Capabilities
203 case 100:
204 if (Capabilities(Message) == false)
205 return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
206 break;
207
208 // 101 Log
209 case 101:
210 if (Debug == true)
211 clog << " <- (log) " << LookupTag(Message,"Message") << endl;
212 break;
213
214 // 102 Status
215 case 102:
216 Status = LookupTag(Message,"Message");
217 break;
218
219 // 200 URI Start
220 case 200:
221 {
222 if (Itm == 0)
223 {
224 _error->Error("Method gave invalid 200 URI Start message");
225 break;
226 }
227
228 CurrentItem = Itm;
229 CurrentSize = 0;
230 TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
231 Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
232
233 if (Log != 0)
234 Log->Fetch(*Itm);
235
236 break;
237 }
238
239 // 201 URI Done
240 case 201:
241 {
242 if (Itm == 0)
243 {
244 _error->Error("Method gave invalid 201 URI Done message");
245 break;
246 }
247
248 pkgAcquire::Item *Owner = Itm->Owner;
249 pkgAcquire::ItemDesc Desc = *Itm;
250 OwnerQ->ItemDone(Itm);
251 Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
252 LookupTag(Message,"MD5-Hash"));
253 ItemDone();
254
255 // Log that we are done
256 if (Log != 0)
257 {
258 if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
259 StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
260 {
261 /* Hide 'hits' for local only sources - we also manage to
262 hide gets */
263 if (Config->LocalOnly == false)
264 Log->IMSHit(Desc);
265 }
266 else
267 Log->Done(Desc);
268 }
269 break;
270 }
271
272 // 400 URI Failure
273 case 400:
274 {
275 if (Itm == 0)
276 {
277 _error->Error("Method gave invalid 400 URI Failure message");
278 break;
279 }
280
281 pkgAcquire::Item *Owner = Itm->Owner;
282 pkgAcquire::ItemDesc Desc = *Itm;
283 OwnerQ->ItemDone(Itm);
284 Owner->Failed(Message,Config);
285 ItemDone();
286
287 if (Log != 0)
288 Log->Fail(Desc);
289
290 break;
291 }
292
293 // 401 General Failure
294 case 401:
295 _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str());
296 break;
297
298 // 403 Media Change
299 case 403:
300 MediaChange(Message);
301 break;
302 }
303 }
304 return true;
305 }
306 /*}}}*/
307 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
308 // ---------------------------------------------------------------------
309 /* This parses the capabilities message and dumps it into the configuration
310 structure. */
311 bool pkgAcquire::Worker::Capabilities(string Message)
312 {
313 if (Config == 0)
314 return true;
315
316 Config->Version = LookupTag(Message,"Version");
317 Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
318 Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
319 Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
320 Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
321
322 // Some debug text
323 if (Debug == true)
324 {
325 clog << "Configured access method " << Config->Access << endl;
326 clog << "Version:" << Config->Version << " SingleInstance:" <<
327 Config->SingleInstance <<
328 " Pipeline:" << Config->Pipeline << " SendConfig:" <<
329 Config->SendConfig << endl;
330 }
331
332 return true;
333 }
334 /*}}}*/
335 // Worker::MediaChange - Request a media change /*{{{*/
336 // ---------------------------------------------------------------------
337 /* */
338 bool pkgAcquire::Worker::MediaChange(string Message)
339 {
340 if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
341 LookupTag(Message,"Drive")) == false)
342 {
343 char S[300];
344 sprintf(S,"603 Media Changed\nFailed: true\n\n");
345 if (Debug == true)
346 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
347 OutQueue += S;
348 OutReady = true;
349 return true;
350 }
351
352 char S[300];
353 sprintf(S,"603 Media Changed\n\n");
354 if (Debug == true)
355 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
356 OutQueue += S;
357 OutReady = true;
358 return true;
359 }
360 /*}}}*/
361 // Worker::SendConfiguration - Send the config to the method /*{{{*/
362 // ---------------------------------------------------------------------
363 /* */
364 bool pkgAcquire::Worker::SendConfiguration()
365 {
366 if (Config->SendConfig == false)
367 return true;
368
369 if (OutFd == -1)
370 return false;
371
372 string Message = "601 Configuration\n";
373 Message.reserve(2000);
374
375 /* Write out all of the configuration directives by walking the
376 configuration tree */
377 const Configuration::Item *Top = _config->Tree(0);
378 for (; Top != 0;)
379 {
380 if (Top->Value.empty() == false)
381 {
382 string Line = "Config-Item: " + Top->FullTag() + "=";
383 Line += QuoteString(Top->Value,"\n") + '\n';
384 Message += Line;
385 }
386
387 if (Top->Child != 0)
388 {
389 Top = Top->Child;
390 continue;
391 }
392
393 while (Top != 0 && Top->Next == 0)
394 Top = Top->Parent;
395 if (Top != 0)
396 Top = Top->Next;
397 }
398 Message += '\n';
399
400 if (Debug == true)
401 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
402 OutQueue += Message;
403 OutReady = true;
404
405 return true;
406 }
407 /*}}}*/
408 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
409 // ---------------------------------------------------------------------
410 /* Send a URI Acquire message to the method */
411 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
412 {
413 if (OutFd == -1)
414 return false;
415
416 string Message = "600 URI Acquire\n";
417 Message.reserve(300);
418 Message += "URI: " + Item->URI;
419 Message += "\nFilename: " + Item->Owner->DestFile;
420 Message += Item->Owner->Custom600Headers();
421 Message += "\n\n";
422
423 if (Debug == true)
424 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
425 OutQueue += Message;
426 OutReady = true;
427
428 return true;
429 }
430 /*}}}*/
431 // Worker::OutFdReady - Out bound FD is ready /*{{{*/
432 // ---------------------------------------------------------------------
433 /* */
434 bool pkgAcquire::Worker::OutFdReady()
435 {
436 int Res = write(OutFd,OutQueue.begin(),OutQueue.length());
437 if (Res <= 0)
438 return MethodFailure();
439
440 // Hmm.. this should never happen.
441 if (Res < 0)
442 return true;
443
444 OutQueue.erase(0,Res);
445 if (OutQueue.empty() == true)
446 OutReady = false;
447
448 return true;
449 }
450 /*}}}*/
451 // Worker::InFdReady - In bound FD is ready /*{{{*/
452 // ---------------------------------------------------------------------
453 /* */
454 bool pkgAcquire::Worker::InFdReady()
455 {
456 if (ReadMessages() == false)
457 return false;
458 RunMessages();
459 return true;
460 }
461 /*}}}*/
462 // Worker::MethodFailure - Called when the method fails /*{{{*/
463 // ---------------------------------------------------------------------
464 /* This is called when the method is believed to have failed, probably because
465 read returned -1. */
466 bool pkgAcquire::Worker::MethodFailure()
467 {
468 _error->Error("Method %s has died unexpectedly!",Access.c_str());
469
470 if (waitpid(Process,0,0) != Process)
471 _error->Warning("I waited but nothing was there!");
472 Process = -1;
473 close(InFd);
474 close(OutFd);
475 InFd = -1;
476 OutFd = -1;
477 OutReady = false;
478 InReady = false;
479 OutQueue = string();
480 MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
481
482 return false;
483 }
484 /*}}}*/
485 // Worker::Pulse - Called periodically /*{{{*/
486 // ---------------------------------------------------------------------
487 /* */
488 void pkgAcquire::Worker::Pulse()
489 {
490 if (CurrentItem == 0)
491 return;
492
493 struct stat Buf;
494 if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
495 return;
496 CurrentSize = Buf.st_size;
497 }
498 /*}}}*/
499 // Worker::ItemDone - Called when the current item is finished /*{{{*/
500 // ---------------------------------------------------------------------
501 /* */
502 void pkgAcquire::Worker::ItemDone()
503 {
504 CurrentItem = 0;
505 CurrentSize = 0;
506 TotalSize = 0;
507 Status = string();
508 }
509 /*}}}*/