]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire-worker.cc
67bb61207c951f7ea2766b4bc706982dd3b54ead
[apt.git] / apt-pkg / acquire-worker.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire-worker.cc,v 1.14 1998/12/04 21:16:47 jgg Exp $
4 /* ######################################################################
5
6 Acquire Worker
7
8 The worker process can start up either as a configuration prober
9 or as a queue runner. As a configuration prober it has no owning queue and
10 only reads the method's configuration (100 Capabilities) message.
11
12 ##################################################################### */
13 /*}}}*/
14 // Include Files /*{{{*/
15 #ifdef __GNUG__
16 #pragma implementation "apt-pkg/acquire-worker.h"
17 #endif
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/configuration.h>
21 #include <apt-pkg/error.h>
22 #include <apt-pkg/fileutl.h>
23 #include <strutl.h>
24
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <signal.h>
28 #include <wait.h>
29 #include <stdio.h>
30 /*}}}*/
31
32 // Worker::Worker - Constructor for Queue startup /*{{{*/
33 // ---------------------------------------------------------------------
34 /* */
35 pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
36 pkgAcquireStatus *Log) : Log(Log)
37 {
38 OwnerQ = Q;
39 Config = Cnf;
40 Access = Cnf->Access;
41 CurrentItem = 0;
42
43 Construct();
44 }
45 /*}}}*/
46 // Worker::Worker - Constructor for method config startup /*{{{*/
47 // ---------------------------------------------------------------------
48 /* */
49 pkgAcquire::Worker::Worker(MethodConfig *Cnf)
50 {
51 OwnerQ = 0;
52 Config = Cnf;
53 Access = Cnf->Access;
54 CurrentItem = 0;
55
56 Construct();
57 }
58 /*}}}*/
59 // Worker::Construct - Constructor helper /*{{{*/
60 // ---------------------------------------------------------------------
61 /* */
62 void pkgAcquire::Worker::Construct()
63 {
64 NextQueue = 0;
65 NextAcquire = 0;
66 Process = -1;
67 InFd = -1;
68 OutFd = -1;
69 OutReady = false;
70 InReady = false;
71 Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
72 }
73 /*}}}*/
74 // Worker::~Worker - Destructor /*{{{*/
75 // ---------------------------------------------------------------------
76 /* */
77 pkgAcquire::Worker::~Worker()
78 {
79 close(InFd);
80 close(OutFd);
81
82 if (Process > 0)
83 {
84 kill(Process,SIGINT);
85 if (waitpid(Process,0,0) != Process)
86 _error->Warning("I waited but nothing was there!");
87 }
88 }
89 /*}}}*/
90 // Worker::Start - Start the worker process /*{{{*/
91 // ---------------------------------------------------------------------
92 /* This forks the method and inits the communication channel */
93 bool pkgAcquire::Worker::Start()
94 {
95 // Get the method path
96 string Method = _config->FindDir("Dir::Bin::Methods") + Access;
97 if (FileExists(Method) == false)
98 return _error->Error("The method driver %s could not be found.",Method.c_str());
99
100 if (Debug == true)
101 clog << "Starting method '" << Method << '\'' << endl;
102
103 // Create the pipes
104 int Pipes[4] = {-1,-1,-1,-1};
105 if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
106 {
107 _error->Errno("pipe","Failed to create IPC pipe to subprocess");
108 for (int I = 0; I != 4; I++)
109 close(Pipes[I]);
110 return false;
111 }
112 for (int I = 0; I != 4; I++)
113 SetCloseExec(Pipes[0],true);
114
115 // Fork off the process
116 Process = fork();
117 if (Process < 0)
118 {
119 cerr << "FATAL -> Failed to fork." << endl;
120 exit(100);
121 }
122
123 // Spawn the subprocess
124 if (Process == 0)
125 {
126 // Setup the FDs
127 dup2(Pipes[1],STDOUT_FILENO);
128 dup2(Pipes[2],STDIN_FILENO);
129 dup2(((filebuf *)clog.rdbuf())->fd(),STDERR_FILENO);
130 SetCloseExec(STDOUT_FILENO,false);
131 SetCloseExec(STDIN_FILENO,false);
132 SetCloseExec(STDERR_FILENO,false);
133
134 const char *Args[2];
135 Args[0] = Method.c_str();
136 Args[1] = 0;
137 execv(Args[0],(char **)Args);
138 cerr << "Failed to exec method " << Args[0] << endl;
139 exit(100);
140 }
141
142 // Fix up our FDs
143 InFd = Pipes[0];
144 OutFd = Pipes[3];
145 SetNonBlock(Pipes[0],true);
146 SetNonBlock(Pipes[3],true);
147 close(Pipes[1]);
148 close(Pipes[2]);
149 OutReady = false;
150 InReady = true;
151
152 // Read the configuration data
153 if (WaitFd(InFd) == false ||
154 ReadMessages() == false)
155 return _error->Error("Method %s did not start correctly",Method.c_str());
156
157 RunMessages();
158 if (OwnerQ != 0)
159 SendConfiguration();
160
161 return true;
162 }
163 /*}}}*/
164 // Worker::ReadMessages - Read all pending messages into the list /*{{{*/
165 // ---------------------------------------------------------------------
166 /* */
167 bool pkgAcquire::Worker::ReadMessages()
168 {
169 if (::ReadMessages(InFd,MessageQueue) == false)
170 return MethodFailure();
171 return true;
172 }
173 /*}}}*/
174 // Worker::RunMessages - Empty the message queue /*{{{*/
175 // ---------------------------------------------------------------------
176 /* This takes the messages from the message queue and runs them through
177 the parsers in order. */
178 bool pkgAcquire::Worker::RunMessages()
179 {
180 while (MessageQueue.empty() == false)
181 {
182 string Message = MessageQueue.front();
183 MessageQueue.erase(MessageQueue.begin());
184
185 if (Debug == true)
186 clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
187
188 // Fetch the message number
189 char *End;
190 int Number = strtol(Message.c_str(),&End,10);
191 if (End == Message.c_str())
192 return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
193
194 string URI = LookupTag(Message,"URI");
195 pkgAcquire::Queue::QItem *Itm = 0;
196 if (URI.empty() == false)
197 Itm = OwnerQ->FindItem(URI,this);
198
199 // Determine the message number and dispatch
200 switch (Number)
201 {
202 // 100 Capabilities
203 case 100:
204 if (Capabilities(Message) == false)
205 return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
206 break;
207
208 // 101 Log
209 case 101:
210 if (Debug == true)
211 clog << " <- (log) " << LookupTag(Message,"Message") << endl;
212 break;
213
214 // 102 Status
215 case 102:
216 Status = LookupTag(Message,"Message");
217 break;
218
219 // 200 URI Start
220 case 200:
221 {
222 if (Itm == 0)
223 {
224 _error->Error("Method gave invalid 200 URI Start message");
225 break;
226 }
227
228 CurrentItem = Itm;
229 CurrentSize = 0;
230 TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
231 Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
232
233 if (Log != 0)
234 Log->Fetch(*Itm);
235
236 break;
237 }
238
239 // 201 URI Done
240 case 201:
241 {
242 if (Itm == 0)
243 {
244 _error->Error("Method gave invalid 201 URI Done message");
245 break;
246 }
247
248 pkgAcquire::Item *Owner = Itm->Owner;
249 pkgAcquire::ItemDesc Desc = *Itm;
250 OwnerQ->ItemDone(Itm);
251 Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
252 LookupTag(Message,"MD5-Hash"));
253 ItemDone();
254
255 // Log that we are done
256 if (Log != 0)
257 {
258 if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
259 StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
260 Log->IMSHit(Desc);
261 else
262 Log->Done(Desc);
263 }
264 break;
265 }
266
267 // 400 URI Failure
268 case 400:
269 {
270 if (Itm == 0)
271 {
272 _error->Error("Method gave invalid 400 URI Failure message");
273 break;
274 }
275
276 pkgAcquire::Item *Owner = Itm->Owner;
277 pkgAcquire::ItemDesc Desc = *Itm;
278 OwnerQ->ItemDone(Itm);
279 Owner->Failed(Message);
280 ItemDone();
281
282 if (Log != 0)
283 Log->Fail(Desc);
284
285 break;
286 }
287
288 // 401 General Failure
289 case 401:
290 _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str());
291 break;
292
293 // 403 Media Change
294 case 403:
295 MediaChange(Message);
296 break;
297 }
298 }
299 return true;
300 }
301 /*}}}*/
302 // Worker::Capabilities - 100 Capabilities handler /*{{{*/
303 // ---------------------------------------------------------------------
304 /* This parses the capabilities message and dumps it into the configuration
305 structure. */
306 bool pkgAcquire::Worker::Capabilities(string Message)
307 {
308 if (Config == 0)
309 return true;
310
311 Config->Version = LookupTag(Message,"Version");
312 Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
313 Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
314 Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
315 Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
316
317 // Some debug text
318 if (Debug == true)
319 {
320 clog << "Configured access method " << Config->Access << endl;
321 clog << "Version:" << Config->Version << " SingleInstance:" <<
322 Config->SingleInstance <<
323 " Pipeline:" << Config->Pipeline << " SendConfig:" <<
324 Config->SendConfig << endl;
325 }
326
327 return true;
328 }
329 /*}}}*/
330 // Worker::MediaChange - Request a media change /*{{{*/
331 // ---------------------------------------------------------------------
332 /* */
333 bool pkgAcquire::Worker::MediaChange(string Message)
334 {
335 if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
336 LookupTag(Message,"Drive")) == false)
337 {
338 char S[300];
339 sprintf(S,"603 Media Changed\nFailed: true\n\n");
340 if (Debug == true)
341 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
342 OutQueue += S;
343 OutReady = true;
344 return true;
345 }
346
347 char S[300];
348 sprintf(S,"603 Media Changed\n\n");
349 if (Debug == true)
350 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
351 OutQueue += S;
352 OutReady = true;
353 return true;
354 }
355 /*}}}*/
356 // Worker::SendConfiguration - Send the config to the method /*{{{*/
357 // ---------------------------------------------------------------------
358 /* */
359 bool pkgAcquire::Worker::SendConfiguration()
360 {
361 if (Config->SendConfig == false)
362 return true;
363
364 if (OutFd == -1)
365 return false;
366
367 string Message = "601 Configuration\n";
368 Message.reserve(2000);
369
370 /* Write out all of the configuration directives by walking the
371 configuration tree */
372 const Configuration::Item *Top = _config->Tree(0);
373 for (; Top != 0;)
374 {
375 if (Top->Value.empty() == false)
376 {
377 string Line = "Config-Item: " + Top->FullTag() + "=";
378 Line += QuoteString(Top->Value,"\n") + '\n';
379 Message += Line;
380 }
381
382 if (Top->Child != 0)
383 {
384 Top = Top->Child;
385 continue;
386 }
387
388 while (Top != 0 && Top->Next == 0)
389 Top = Top->Parent;
390 if (Top != 0)
391 Top = Top->Next;
392 }
393 Message += '\n';
394
395 if (Debug == true)
396 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
397 OutQueue += Message;
398 OutReady = true;
399
400 return true;
401 }
402 /*}}}*/
403 // Worker::QueueItem - Add an item to the outbound queue /*{{{*/
404 // ---------------------------------------------------------------------
405 /* Send a URI Acquire message to the method */
406 bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
407 {
408 if (OutFd == -1)
409 return false;
410
411 string Message = "600 URI Acquire\n";
412 Message.reserve(300);
413 Message += "URI: " + Item->URI;
414 Message += "\nFilename: " + Item->Owner->DestFile;
415 Message += Item->Owner->Custom600Headers();
416 Message += "\n\n";
417
418 if (Debug == true)
419 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
420 OutQueue += Message;
421 OutReady = true;
422
423 return true;
424 }
425 /*}}}*/
426 // Worker::OutFdReady - Outbound FD is ready /*{{{*/
427 // ---------------------------------------------------------------------
428 /* */
429 bool pkgAcquire::Worker::OutFdReady()
430 {
431 int Res = write(OutFd,OutQueue.begin(),OutQueue.length());
432 if (Res <= 0)
433 return MethodFailure();
434
435 // Hmm.. this should never happen.
436 if (Res < 0)
437 return true;
438
439 OutQueue.erase(0,Res);
440 if (OutQueue.empty() == true)
441 OutReady = false;
442
443 return true;
444 }
445 /*}}}*/
446 // Worker::InFdReady - Inbound FD is ready /*{{{*/
447 // ---------------------------------------------------------------------
448 /* */
449 bool pkgAcquire::Worker::InFdReady()
450 {
451 if (ReadMessages() == false)
452 return false;
453 RunMessages();
454 return true;
455 }
456 /*}}}*/
457 // Worker::MethodFailure - Called when the method fails /*{{{*/
458 // ---------------------------------------------------------------------
459 /* This is called when the method is believed to have failed, probably because
460 read returned -1. */
461 bool pkgAcquire::Worker::MethodFailure()
462 {
463 cerr << "Method " << Access << " has died unexpectedly!" << endl;
464 if (waitpid(Process,0,0) != Process)
465 _error->Warning("I waited but nothing was there!");
466 Process = -1;
467 close(InFd);
468 close(OutFd);
469 InFd = -1;
470 OutFd = -1;
471 OutReady = false;
472 InReady = false;
473 OutQueue = string();
474 MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
475
476 return false;
477 }
478 /*}}}*/
479 // Worker::Pulse - Called periodically /*{{{*/
480 // ---------------------------------------------------------------------
481 /* */
482 void pkgAcquire::Worker::Pulse()
483 {
484 if (CurrentItem == 0)
485 return;
486
487 struct stat Buf;
488 if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
489 return;
490 CurrentSize = Buf.st_size;
491 }
492 /*}}}*/
493 // Worker::ItemDone - Called when the current item is finished /*{{{*/
494 // ---------------------------------------------------------------------
495 /* */
496 void pkgAcquire::Worker::ItemDone()
497 {
498 CurrentItem = 0;
499 CurrentSize = 0;
500 TotalSize = 0;
501 Status = string();
502 }
503 /*}}}*/