]> git.saurik.com Git - apt.git/blame_incremental - apt-pkg/acquire-worker.cc
Doc fixes and copy method patch
[apt.git] / apt-pkg / acquire-worker.cc
... / ...
CommitLineData
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
3// $Id: acquire-worker.cc,v 1.16 1998/12/14 06:54:41 jgg Exp $
4/* ######################################################################
5
6 Acquire Worker
7
8 The worker process can startup either as a Configuration prober
9 or as a queue runner. As a configuration prober it only reads the
   configuration message.
11
12 ##################################################################### */
13 /*}}}*/
14// Include Files /*{{{*/
15#ifdef __GNUG__
16#pragma implementation "apt-pkg/acquire-worker.h"
17#endif
#include <apt-pkg/acquire-worker.h>
#include <apt-pkg/acquire-item.h>
#include <apt-pkg/configuration.h>
#include <apt-pkg/error.h>
#include <apt-pkg/fileutl.h>
#include <strutl.h>

#include <sys/stat.h>
#include <unistd.h>
#include <signal.h>
#include <wait.h>
#include <stdio.h>
#include <errno.h>
30 /*}}}*/
31
32// Worker::Worker - Constructor for Queue startup /*{{{*/
33// ---------------------------------------------------------------------
34/* */
35pkgAcquire::Worker::Worker(Queue *Q,MethodConfig *Cnf,
36 pkgAcquireStatus *Log) : Log(Log)
37{
38 OwnerQ = Q;
39 Config = Cnf;
40 Access = Cnf->Access;
41 CurrentItem = 0;
42
43 Construct();
44}
45 /*}}}*/
46// Worker::Worker - Constructor for method config startup /*{{{*/
47// ---------------------------------------------------------------------
48/* */
49pkgAcquire::Worker::Worker(MethodConfig *Cnf)
50{
51 OwnerQ = 0;
52 Config = Cnf;
53 Access = Cnf->Access;
54 CurrentItem = 0;
55
56 Construct();
57}
58 /*}}}*/
59// Worker::Construct - Constructor helper /*{{{*/
60// ---------------------------------------------------------------------
61/* */
62void pkgAcquire::Worker::Construct()
63{
64 NextQueue = 0;
65 NextAcquire = 0;
66 Process = -1;
67 InFd = -1;
68 OutFd = -1;
69 OutReady = false;
70 InReady = false;
71 Debug = _config->FindB("Debug::pkgAcquire::Worker",false);
72}
73 /*}}}*/
74// Worker::~Worker - Destructor /*{{{*/
75// ---------------------------------------------------------------------
76/* */
77pkgAcquire::Worker::~Worker()
78{
79 close(InFd);
80 close(OutFd);
81
82 if (Process > 0)
83 {
84 kill(Process,SIGINT);
85 if (waitpid(Process,0,0) != Process)
86 _error->Warning("I waited but nothing was there!");
87 }
88}
89 /*}}}*/
90// Worker::Start - Start the worker process /*{{{*/
91// ---------------------------------------------------------------------
92/* This forks the method and inits the communication channel */
93bool pkgAcquire::Worker::Start()
94{
95 // Get the method path
96 string Method = _config->FindDir("Dir::Bin::Methods") + Access;
97 if (FileExists(Method) == false)
98 return _error->Error("The method driver %s could not be found.",Method.c_str());
99
100 if (Debug == true)
101 clog << "Starting method '" << Method << '\'' << endl;
102
103 // Create the pipes
104 int Pipes[4] = {-1,-1,-1,-1};
105 if (pipe(Pipes) != 0 || pipe(Pipes+2) != 0)
106 {
107 _error->Errno("pipe","Failed to create IPC pipe to subprocess");
108 for (int I = 0; I != 4; I++)
109 close(Pipes[I]);
110 return false;
111 }
112 for (int I = 0; I != 4; I++)
113 SetCloseExec(Pipes[0],true);
114
115 // Fork off the process
116 Process = fork();
117 if (Process < 0)
118 {
119 cerr << "FATAL -> Failed to fork." << endl;
120 exit(100);
121 }
122
123 // Spawn the subprocess
124 if (Process == 0)
125 {
126 // Setup the FDs
127 dup2(Pipes[1],STDOUT_FILENO);
128 dup2(Pipes[2],STDIN_FILENO);
129 dup2(((filebuf *)clog.rdbuf())->fd(),STDERR_FILENO);
130 SetCloseExec(STDOUT_FILENO,false);
131 SetCloseExec(STDIN_FILENO,false);
132 SetCloseExec(STDERR_FILENO,false);
133
134 const char *Args[2];
135 Args[0] = Method.c_str();
136 Args[1] = 0;
137 execv(Args[0],(char **)Args);
138 cerr << "Failed to exec method " << Args[0] << endl;
139 _exit(100);
140 }
141
142 // Fix up our FDs
143 InFd = Pipes[0];
144 OutFd = Pipes[3];
145 SetNonBlock(Pipes[0],true);
146 SetNonBlock(Pipes[3],true);
147 close(Pipes[1]);
148 close(Pipes[2]);
149 OutReady = false;
150 InReady = true;
151
152 // Read the configuration data
153 if (WaitFd(InFd) == false ||
154 ReadMessages() == false)
155 return _error->Error("Method %s did not start correctly",Method.c_str());
156
157 RunMessages();
158 if (OwnerQ != 0)
159 SendConfiguration();
160
161 return true;
162}
163 /*}}}*/
164// Worker::ReadMessages - Read all pending messages into the list /*{{{*/
165// ---------------------------------------------------------------------
166/* */
167bool pkgAcquire::Worker::ReadMessages()
168{
169 if (::ReadMessages(InFd,MessageQueue) == false)
170 return MethodFailure();
171 return true;
172}
173 /*}}}*/
// Worker::RunMessages - Empty the message queue			/*{{{*/
175// ---------------------------------------------------------------------
176/* This takes the messages from the message queue and runs them through
177 the parsers in order. */
178bool pkgAcquire::Worker::RunMessages()
179{
180 while (MessageQueue.empty() == false)
181 {
182 string Message = MessageQueue.front();
183 MessageQueue.erase(MessageQueue.begin());
184
185 if (Debug == true)
186 clog << " <- " << Access << ':' << QuoteString(Message,"\n") << endl;
187
188 // Fetch the message number
189 char *End;
190 int Number = strtol(Message.c_str(),&End,10);
191 if (End == Message.c_str())
192 return _error->Error("Invalid message from method %s: %s",Access.c_str(),Message.c_str());
193
194 string URI = LookupTag(Message,"URI");
195 pkgAcquire::Queue::QItem *Itm = 0;
196 if (URI.empty() == false)
197 Itm = OwnerQ->FindItem(URI,this);
198
199 // Determine the message number and dispatch
200 switch (Number)
201 {
202 // 100 Capabilities
203 case 100:
204 if (Capabilities(Message) == false)
205 return _error->Error("Unable to process Capabilities message from %s",Access.c_str());
206 break;
207
208 // 101 Log
209 case 101:
210 if (Debug == true)
211 clog << " <- (log) " << LookupTag(Message,"Message") << endl;
212 break;
213
214 // 102 Status
215 case 102:
216 Status = LookupTag(Message,"Message");
217 break;
218
219 // 200 URI Start
220 case 200:
221 {
222 if (Itm == 0)
223 {
224 _error->Error("Method gave invalid 200 URI Start message");
225 break;
226 }
227
228 CurrentItem = Itm;
229 CurrentSize = 0;
230 TotalSize = atoi(LookupTag(Message,"Size","0").c_str());
231 Itm->Owner->Start(Message,atoi(LookupTag(Message,"Size","0").c_str()));
232
233 if (Log != 0)
234 Log->Fetch(*Itm);
235
236 break;
237 }
238
239 // 201 URI Done
240 case 201:
241 {
242 if (Itm == 0)
243 {
244 _error->Error("Method gave invalid 201 URI Done message");
245 break;
246 }
247
248 pkgAcquire::Item *Owner = Itm->Owner;
249 pkgAcquire::ItemDesc Desc = *Itm;
250 OwnerQ->ItemDone(Itm);
251 Owner->Done(Message,atoi(LookupTag(Message,"Size","0").c_str()),
252 LookupTag(Message,"MD5-Hash"));
253 ItemDone();
254
255 // Log that we are done
256 if (Log != 0)
257 {
258 if (StringToBool(LookupTag(Message,"IMS-Hit"),false) == true ||
259 StringToBool(LookupTag(Message,"Alt-IMS-Hit"),false) == true)
260 Log->IMSHit(Desc);
261 else
262 Log->Done(Desc);
263 }
264 break;
265 }
266
267 // 400 URI Failure
268 case 400:
269 {
270 if (Itm == 0)
271 {
272 _error->Error("Method gave invalid 400 URI Failure message");
273 break;
274 }
275
276 pkgAcquire::Item *Owner = Itm->Owner;
277 pkgAcquire::ItemDesc Desc = *Itm;
278 OwnerQ->ItemDone(Itm);
279 Owner->Failed(Message);
280 ItemDone();
281
282 if (Log != 0)
283 Log->Fail(Desc);
284
285 break;
286 }
287
288 // 401 General Failure
289 case 401:
290 _error->Error("Method %s General failure: %s",LookupTag(Message,"Message").c_str());
291 break;
292
293 // 403 Media Change
294 case 403:
295 MediaChange(Message);
296 break;
297 }
298 }
299 return true;
300}
301 /*}}}*/
302// Worker::Capabilities - 100 Capabilities handler /*{{{*/
303// ---------------------------------------------------------------------
304/* This parses the capabilities message and dumps it into the configuration
305 structure. */
306bool pkgAcquire::Worker::Capabilities(string Message)
307{
308 if (Config == 0)
309 return true;
310
311 Config->Version = LookupTag(Message,"Version");
312 Config->SingleInstance = StringToBool(LookupTag(Message,"Single-Instance"),false);
313 Config->Pipeline = StringToBool(LookupTag(Message,"Pipeline"),false);
314 Config->SendConfig = StringToBool(LookupTag(Message,"Send-Config"),false);
315 Config->LocalOnly = StringToBool(LookupTag(Message,"Local-Only"),false);
316
317 // Some debug text
318 if (Debug == true)
319 {
320 clog << "Configured access method " << Config->Access << endl;
321 clog << "Version:" << Config->Version << " SingleInstance:" <<
322 Config->SingleInstance <<
323 " Pipeline:" << Config->Pipeline << " SendConfig:" <<
324 Config->SendConfig << endl;
325 }
326
327 return true;
328}
329 /*}}}*/
330// Worker::MediaChange - Request a media change /*{{{*/
331// ---------------------------------------------------------------------
332/* */
333bool pkgAcquire::Worker::MediaChange(string Message)
334{
335 if (Log == 0 || Log->MediaChange(LookupTag(Message,"Media"),
336 LookupTag(Message,"Drive")) == false)
337 {
338 char S[300];
339 sprintf(S,"603 Media Changed\nFailed: true\n\n");
340 if (Debug == true)
341 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
342 OutQueue += S;
343 OutReady = true;
344 return true;
345 }
346
347 char S[300];
348 sprintf(S,"603 Media Changed\n\n");
349 if (Debug == true)
350 clog << " -> " << Access << ':' << QuoteString(S,"\n") << endl;
351 OutQueue += S;
352 OutReady = true;
353 return true;
354}
355 /*}}}*/
356// Worker::SendConfiguration - Send the config to the method /*{{{*/
357// ---------------------------------------------------------------------
358/* */
359bool pkgAcquire::Worker::SendConfiguration()
360{
361 if (Config->SendConfig == false)
362 return true;
363
364 if (OutFd == -1)
365 return false;
366
367 string Message = "601 Configuration\n";
368 Message.reserve(2000);
369
370 /* Write out all of the configuration directives by walking the
371 configuration tree */
372 const Configuration::Item *Top = _config->Tree(0);
373 for (; Top != 0;)
374 {
375 if (Top->Value.empty() == false)
376 {
377 string Line = "Config-Item: " + Top->FullTag() + "=";
378 Line += QuoteString(Top->Value,"\n") + '\n';
379 Message += Line;
380 }
381
382 if (Top->Child != 0)
383 {
384 Top = Top->Child;
385 continue;
386 }
387
388 while (Top != 0 && Top->Next == 0)
389 Top = Top->Parent;
390 if (Top != 0)
391 Top = Top->Next;
392 }
393 Message += '\n';
394
395 if (Debug == true)
396 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
397 OutQueue += Message;
398 OutReady = true;
399
400 return true;
401}
402 /*}}}*/
403// Worker::QueueItem - Add an item to the outbound queue /*{{{*/
404// ---------------------------------------------------------------------
405/* Send a URI Acquire message to the method */
406bool pkgAcquire::Worker::QueueItem(pkgAcquire::Queue::QItem *Item)
407{
408 if (OutFd == -1)
409 return false;
410
411 string Message = "600 URI Acquire\n";
412 Message.reserve(300);
413 Message += "URI: " + Item->URI;
414 Message += "\nFilename: " + Item->Owner->DestFile;
415 Message += Item->Owner->Custom600Headers();
416 Message += "\n\n";
417
418 if (Debug == true)
419 clog << " -> " << Access << ':' << QuoteString(Message,"\n") << endl;
420 OutQueue += Message;
421 OutReady = true;
422
423 return true;
424}
425 /*}}}*/
// Worker::OutFdReady - Out bound FD is ready				/*{{{*/
427// ---------------------------------------------------------------------
428/* */
429bool pkgAcquire::Worker::OutFdReady()
430{
431 int Res = write(OutFd,OutQueue.begin(),OutQueue.length());
432 if (Res <= 0)
433 return MethodFailure();
434
435 // Hmm.. this should never happen.
436 if (Res < 0)
437 return true;
438
439 OutQueue.erase(0,Res);
440 if (OutQueue.empty() == true)
441 OutReady = false;
442
443 return true;
444}
445 /*}}}*/
// Worker::InFdReady - In bound FD is ready				/*{{{*/
447// ---------------------------------------------------------------------
448/* */
449bool pkgAcquire::Worker::InFdReady()
450{
451 if (ReadMessages() == false)
452 return false;
453 RunMessages();
454 return true;
455}
456 /*}}}*/
457// Worker::MethodFailure - Called when the method fails /*{{{*/
458// ---------------------------------------------------------------------
/* This is called when the method is believed to have failed, probably because
   read returned -1. */
461bool pkgAcquire::Worker::MethodFailure()
462{
463 _error->Error("Method %s has died unexpectedly!",Access.c_str());
464
465 if (waitpid(Process,0,0) != Process)
466 _error->Warning("I waited but nothing was there!");
467 Process = -1;
468 close(InFd);
469 close(OutFd);
470 InFd = -1;
471 OutFd = -1;
472 OutReady = false;
473 InReady = false;
474 OutQueue = string();
475 MessageQueue.erase(MessageQueue.begin(),MessageQueue.end());
476
477 return false;
478}
479 /*}}}*/
480// Worker::Pulse - Called periodically /*{{{*/
481// ---------------------------------------------------------------------
482/* */
483void pkgAcquire::Worker::Pulse()
484{
485 if (CurrentItem == 0)
486 return;
487
488 struct stat Buf;
489 if (stat(CurrentItem->Owner->DestFile.c_str(),&Buf) != 0)
490 return;
491 CurrentSize = Buf.st_size;
492}
493 /*}}}*/
494// Worker::ItemDone - Called when the current item is finished /*{{{*/
495// ---------------------------------------------------------------------
496/* */
497void pkgAcquire::Worker::ItemDone()
498{
499 CurrentItem = 0;
500 CurrentSize = 0;
501 TotalSize = 0;
502 Status = string();
503}
504 /*}}}*/