]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
Merge remote-tracking branch 'mvo/feature/acq-trans' into debian/experimental
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <iomanip>
31
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <pwd.h>
37 #include <grp.h>
38 #include <dirent.h>
39 #include <sys/time.h>
40 #include <sys/select.h>
41 #include <errno.h>
42 #include <sys/stat.h>
43 #include <sys/types.h>
44
45 #include <apti18n.h>
46 /*}}}*/
47
48 using namespace std;
49
50 // Acquire::pkgAcquire - Constructor /*{{{*/
51 // ---------------------------------------------------------------------
52 /* We grab some runtime state from the configuration space */
53 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
54 Debug(_config->FindB("Debug::pkgAcquire",false)),
55 Running(false)
56 {
57 string const Mode = _config->Find("Acquire::Queue-Mode","host");
58 if (strcasecmp(Mode.c_str(),"host") == 0)
59 QueueMode = QueueHost;
60 if (strcasecmp(Mode.c_str(),"access") == 0)
61 QueueMode = QueueAccess;
62 }
63 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
64 Configs(0), Log(NULL), ToFetch(0),
65 Debug(_config->FindB("Debug::pkgAcquire",false)),
66 Running(false)
67 {
68 string const Mode = _config->Find("Acquire::Queue-Mode","host");
69 if (strcasecmp(Mode.c_str(),"host") == 0)
70 QueueMode = QueueHost;
71 if (strcasecmp(Mode.c_str(),"access") == 0)
72 QueueMode = QueueAccess;
73 SetLog(Progress);
74 }
75 /*}}}*/
76 // Acquire::GetLock - lock directory and prepare for action /*{{{*/
77 static bool SetupAPTPartialDirectory(std::string const &grand, std::string const &parent)
78 {
79 std::string const partial = parent + "partial";
80 if (CreateAPTDirectoryIfNeeded(grand, partial) == false &&
81 CreateAPTDirectoryIfNeeded(parent, partial) == false)
82 return false;
83
84 if (getuid() == 0) // if we aren't root, we can't chown, so don't try it
85 {
86 struct passwd *pw = getpwnam("_apt");
87 struct group *gr = getgrnam("root");
88 if (pw != NULL && gr != NULL && chown(partial.c_str(), pw->pw_uid, gr->gr_gid) != 0)
89 _error->WarningE("SetupAPTPartialDirectory", "chown to _apt:root of directory %s failed", partial.c_str());
90 }
91 if (chmod(partial.c_str(), 0700) != 0)
92 _error->WarningE("SetupAPTPartialDirectory", "chmod 0700 of directory %s failed", partial.c_str());
93
94 return true;
95 }
96 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
97 {
98 Log = Progress;
99 if (Lock.empty())
100 {
101 string const listDir = _config->FindDir("Dir::State::lists");
102 if (SetupAPTPartialDirectory(_config->FindDir("Dir::State"), listDir) == false)
103 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
104 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
105 if (SetupAPTPartialDirectory(_config->FindDir("Dir::Cache"), archivesDir) == false)
106 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
107 return true;
108 }
109 return GetLock(Lock);
110 }
111 bool pkgAcquire::GetLock(std::string const &Lock)
112 {
113 if (Lock.empty() == true)
114 return false;
115
116 // check for existence and possibly create auxiliary directories
117 string const listDir = _config->FindDir("Dir::State::lists");
118 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
119
120 if (Lock == listDir)
121 {
122 if (SetupAPTPartialDirectory(_config->FindDir("Dir::State"), listDir) == false)
123 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
124 }
125 if (Lock == archivesDir)
126 {
127 if (SetupAPTPartialDirectory(_config->FindDir("Dir::Cache"), archivesDir) == false)
128 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
129 }
130
131 if (_config->FindB("Debug::NoLocking", false) == true)
132 return true;
133
134 // Lock the directory this acquire object will work in
135 LockFD = ::GetLock(flCombine(Lock, "lock"));
136 if (LockFD == -1)
137 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
138
139 return true;
140 }
141 /*}}}*/
142 // Acquire::~pkgAcquire - Destructor /*{{{*/
143 // ---------------------------------------------------------------------
144 /* Free our memory, clean up the queues (destroy the workers) */
145 pkgAcquire::~pkgAcquire()
146 {
147 Shutdown();
148
149 if (LockFD != -1)
150 close(LockFD);
151
152 while (Configs != 0)
153 {
154 MethodConfig *Jnk = Configs;
155 Configs = Configs->Next;
156 delete Jnk;
157 }
158 }
159 /*}}}*/
160 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
161 // ---------------------------------------------------------------------
162 /* */
163 void pkgAcquire::Shutdown()
164 {
165 while (Items.empty() == false)
166 {
167 if (Items[0]->Status == Item::StatFetching)
168 Items[0]->Status = Item::StatError;
169 delete Items[0];
170 }
171
172 while (Queues != 0)
173 {
174 Queue *Jnk = Queues;
175 Queues = Queues->Next;
176 delete Jnk;
177 }
178 }
179 /*}}}*/
180 // Acquire::Add - Add a new item /*{{{*/
181 // ---------------------------------------------------------------------
182 /* This puts an item on the acquire list. This list is mainly for tracking
183 item status */
184 void pkgAcquire::Add(Item *Itm)
185 {
186 Items.push_back(Itm);
187 }
188 /*}}}*/
189 // Acquire::Remove - Remove a item /*{{{*/
190 // ---------------------------------------------------------------------
191 /* Remove an item from the acquire list. This is usually not used.. */
192 void pkgAcquire::Remove(Item *Itm)
193 {
194 Dequeue(Itm);
195
196 for (ItemIterator I = Items.begin(); I != Items.end();)
197 {
198 if (*I == Itm)
199 {
200 Items.erase(I);
201 I = Items.begin();
202 }
203 else
204 ++I;
205 }
206 }
207 /*}}}*/
208 // Acquire::Add - Add a worker /*{{{*/
209 // ---------------------------------------------------------------------
210 /* A list of workers is kept so that the select loop can direct their FD
211 usage. */
212 void pkgAcquire::Add(Worker *Work)
213 {
214 Work->NextAcquire = Workers;
215 Workers = Work;
216 }
217 /*}}}*/
218 // Acquire::Remove - Remove a worker /*{{{*/
219 // ---------------------------------------------------------------------
220 /* A worker has died. This can not be done while the select loop is running
221 as it would require that RunFds could handling a changing list state and
222 it can't.. */
223 void pkgAcquire::Remove(Worker *Work)
224 {
225 if (Running == true)
226 abort();
227
228 Worker **I = &Workers;
229 for (; *I != 0;)
230 {
231 if (*I == Work)
232 *I = (*I)->NextAcquire;
233 else
234 I = &(*I)->NextAcquire;
235 }
236 }
237 /*}}}*/
238 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
239 // ---------------------------------------------------------------------
240 /* This is the entry point for an item. An item calls this function when
241 it is constructed which creates a queue (based on the current queue
242 mode) and puts the item in that queue. If the system is running then
243 the queue might be started. */
244 void pkgAcquire::Enqueue(ItemDesc &Item)
245 {
246 // Determine which queue to put the item in
247 const MethodConfig *Config;
248 string Name = QueueName(Item.URI,Config);
249 if (Name.empty() == true)
250 return;
251
252 // Find the queue structure
253 Queue *I = Queues;
254 for (; I != 0 && I->Name != Name; I = I->Next);
255 if (I == 0)
256 {
257 I = new Queue(Name,this);
258 I->Next = Queues;
259 Queues = I;
260
261 if (Running == true)
262 I->Startup();
263 }
264
265 // See if this is a local only URI
266 if (Config->LocalOnly == true && Item.Owner->Complete == false)
267 Item.Owner->Local = true;
268 Item.Owner->Status = Item::StatIdle;
269
270 // Queue it into the named queue
271 if(I->Enqueue(Item))
272 ToFetch++;
273
274 // Some trace stuff
275 if (Debug == true)
276 {
277 clog << "Fetching " << Item.URI << endl;
278 clog << " to " << Item.Owner->DestFile << endl;
279 clog << " Queue is: " << Name << endl;
280 }
281 }
282 /*}}}*/
283 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
284 // ---------------------------------------------------------------------
285 /* This is called when an item is finished being fetched. It removes it
286 from all the queues */
287 void pkgAcquire::Dequeue(Item *Itm)
288 {
289 Queue *I = Queues;
290 bool Res = false;
291 if (Debug == true)
292 clog << "Dequeuing " << Itm->DestFile << endl;
293
294 for (; I != 0; I = I->Next)
295 {
296 if (I->Dequeue(Itm))
297 {
298 Res = true;
299 if (Debug == true)
300 clog << "Dequeued from " << I->Name << endl;
301 }
302 }
303
304 if (Res == true)
305 ToFetch--;
306 }
307 /*}}}*/
308 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
309 // ---------------------------------------------------------------------
310 /* The string returned depends on the configuration settings and the
311 method parameters. Given something like http://foo.org/bar it can
312 return http://foo.org or http */
313 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
314 {
315 URI U(Uri);
316
317 Config = GetConfig(U.Access);
318 if (Config == 0)
319 return string();
320
321 /* Single-Instance methods get exactly one queue per URI. This is
322 also used for the Access queue method */
323 if (Config->SingleInstance == true || QueueMode == QueueAccess)
324 return U.Access;
325
326 string AccessSchema = U.Access + ':',
327 FullQueueName = AccessSchema + U.Host;
328 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
329
330 Queue *I = Queues;
331 for (; I != 0; I = I->Next) {
332 // if the queue already exists, re-use it
333 if (I->Name == FullQueueName)
334 return FullQueueName;
335
336 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
337 Instances++;
338 }
339
340 if (Debug) {
341 clog << "Found " << Instances << " instances of " << U.Access << endl;
342 }
343
344 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
345 return U.Access;
346
347 return FullQueueName;
348 }
349 /*}}}*/
350 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
351 // ---------------------------------------------------------------------
352 /* This locates the configuration structure for an access method. If
353 a config structure cannot be found a Worker will be created to
354 retrieve it */
355 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
356 {
357 // Search for an existing config
358 MethodConfig *Conf;
359 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
360 if (Conf->Access == Access)
361 return Conf;
362
363 // Create the new config class
364 Conf = new MethodConfig;
365 Conf->Access = Access;
366 Conf->Next = Configs;
367 Configs = Conf;
368
369 // Create the worker to fetch the configuration
370 Worker Work(Conf);
371 if (Work.Start() == false)
372 return 0;
373
374 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
375 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
376 Conf->SingleInstance = true;
377
378 return Conf;
379 }
380 /*}}}*/
381 // Acquire::SetFds - Deal with readable FDs /*{{{*/
382 // ---------------------------------------------------------------------
383 /* Collect FDs that have activity monitors into the fd sets */
384 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
385 {
386 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
387 {
388 if (I->InReady == true && I->InFd >= 0)
389 {
390 if (Fd < I->InFd)
391 Fd = I->InFd;
392 FD_SET(I->InFd,RSet);
393 }
394 if (I->OutReady == true && I->OutFd >= 0)
395 {
396 if (Fd < I->OutFd)
397 Fd = I->OutFd;
398 FD_SET(I->OutFd,WSet);
399 }
400 }
401 }
402 /*}}}*/
403 // Acquire::RunFds - Deal with active FDs /*{{{*/
404 // ---------------------------------------------------------------------
405 /* Dispatch active FDs over to the proper workers. It is very important
406 that a worker never be erased while this is running! The queue class
407 should never erase a worker except during shutdown processing. */
408 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
409 {
410 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
411 {
412 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
413 I->InFdReady();
414 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
415 I->OutFdReady();
416 }
417 }
418 /*}}}*/
419 // Acquire::Run - Run the fetch sequence /*{{{*/
420 // ---------------------------------------------------------------------
421 /* This runs the queues. It manages a select loop for all of the
422 Worker tasks. The workers interact with the queues and items to
423 manage the actual fetch. */
424 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
425 {
426 Running = true;
427
428 for (Queue *I = Queues; I != 0; I = I->Next)
429 I->Startup();
430
431 if (Log != 0)
432 Log->Start();
433
434 bool WasCancelled = false;
435
436 // Run till all things have been acquired
437 struct timeval tv;
438 tv.tv_sec = 0;
439 tv.tv_usec = PulseIntervall;
440 while (ToFetch > 0)
441 {
442 fd_set RFds;
443 fd_set WFds;
444 int Highest = 0;
445 FD_ZERO(&RFds);
446 FD_ZERO(&WFds);
447 SetFds(Highest,&RFds,&WFds);
448
449 int Res;
450 do
451 {
452 Res = select(Highest+1,&RFds,&WFds,0,&tv);
453 }
454 while (Res < 0 && errno == EINTR);
455
456 if (Res < 0)
457 {
458 _error->Errno("select","Select has failed");
459 break;
460 }
461
462 RunFds(&RFds,&WFds);
463 if (_error->PendingError() == true)
464 break;
465
466 // Timeout, notify the log class
467 if (Res == 0 || (Log != 0 && Log->Update == true))
468 {
469 tv.tv_usec = PulseIntervall;
470 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
471 I->Pulse();
472 if (Log != 0 && Log->Pulse(this) == false)
473 {
474 WasCancelled = true;
475 break;
476 }
477 }
478 }
479
480 if (Log != 0)
481 Log->Stop();
482
483 // Shut down the acquire bits
484 Running = false;
485 for (Queue *I = Queues; I != 0; I = I->Next)
486 I->Shutdown(false);
487
488 // Shut down the items
489 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
490 (*I)->Finished();
491
492 if (_error->PendingError())
493 return Failed;
494 if (WasCancelled)
495 return Cancelled;
496 return Continue;
497 }
498 /*}}}*/
499 // Acquire::Bump - Called when an item is dequeued /*{{{*/
500 // ---------------------------------------------------------------------
501 /* This routine bumps idle queues in hopes that they will be able to fetch
502 the dequeued item */
503 void pkgAcquire::Bump()
504 {
505 for (Queue *I = Queues; I != 0; I = I->Next)
506 I->Bump();
507 }
508 /*}}}*/
509 // Acquire::WorkerStep - Step to the next worker /*{{{*/
510 // ---------------------------------------------------------------------
511 /* Not inlined to advoid including acquire-worker.h */
512 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
513 {
514 return I->NextAcquire;
515 }
516 /*}}}*/
517 // Acquire::Clean - Cleans a directory /*{{{*/
518 // ---------------------------------------------------------------------
519 /* This is a bit simplistic, it looks at every file in the dir and sees
520 if it is part of the download set. */
521 bool pkgAcquire::Clean(string Dir)
522 {
523 // non-existing directories are by definition clean…
524 if (DirectoryExists(Dir) == false)
525 return true;
526
527 if(Dir == "/")
528 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
529
530 DIR *D = opendir(Dir.c_str());
531 if (D == 0)
532 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
533
534 string StartDir = SafeGetCWD();
535 if (chdir(Dir.c_str()) != 0)
536 {
537 closedir(D);
538 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
539 }
540
541 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
542 {
543 // Skip some files..
544 if (strcmp(Dir->d_name,"lock") == 0 ||
545 strcmp(Dir->d_name,"partial") == 0 ||
546 strcmp(Dir->d_name,".") == 0 ||
547 strcmp(Dir->d_name,"..") == 0)
548 continue;
549
550 // Look in the get list
551 ItemCIterator I = Items.begin();
552 for (; I != Items.end(); ++I)
553 if (flNotDir((*I)->DestFile) == Dir->d_name)
554 break;
555
556 // Nothing found, nuke it
557 if (I == Items.end())
558 unlink(Dir->d_name);
559 };
560
561 closedir(D);
562 if (chdir(StartDir.c_str()) != 0)
563 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
564 return true;
565 }
566 /*}}}*/
567 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
568 // ---------------------------------------------------------------------
569 /* This is the total number of bytes needed */
570 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
571 {
572 unsigned long long Total = 0;
573 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
574 Total += (*I)->FileSize;
575 return Total;
576 }
577 /*}}}*/
578 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
579 // ---------------------------------------------------------------------
580 /* This is the number of bytes that is not local */
581 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
582 {
583 unsigned long long Total = 0;
584 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
585 if ((*I)->Local == false)
586 Total += (*I)->FileSize;
587 return Total;
588 }
589 /*}}}*/
590 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
591 // ---------------------------------------------------------------------
592 /* This is the number of bytes that is not local */
593 APT_PURE unsigned long long pkgAcquire::PartialPresent()
594 {
595 unsigned long long Total = 0;
596 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
597 if ((*I)->Local == false)
598 Total += (*I)->PartialSize;
599 return Total;
600 }
601 /*}}}*/
602 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
603 // ---------------------------------------------------------------------
604 /* */
605 pkgAcquire::UriIterator pkgAcquire::UriBegin()
606 {
607 return UriIterator(Queues);
608 }
609 /*}}}*/
610 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
611 // ---------------------------------------------------------------------
612 /* */
613 pkgAcquire::UriIterator pkgAcquire::UriEnd()
614 {
615 return UriIterator(0);
616 }
617 /*}}}*/
618 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
619 // ---------------------------------------------------------------------
620 /* */
621 pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
622 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
623 Removable(false)
624 {
625 }
626 /*}}}*/
627 // Queue::Queue - Constructor /*{{{*/
628 // ---------------------------------------------------------------------
629 /* */
630 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
631 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
632 {
633 }
634 /*}}}*/
635 // Queue::~Queue - Destructor /*{{{*/
636 // ---------------------------------------------------------------------
637 /* */
638 pkgAcquire::Queue::~Queue()
639 {
640 Shutdown(true);
641
642 while (Items != 0)
643 {
644 QItem *Jnk = Items;
645 Items = Items->Next;
646 delete Jnk;
647 }
648 }
649 /*}}}*/
650 // Queue::Enqueue - Queue an item to the queue /*{{{*/
651 // ---------------------------------------------------------------------
652 /* */
653 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
654 {
655 QItem **I = &Items;
656 // move to the end of the queue and check for duplicates here
657 for (; *I != 0; I = &(*I)->Next)
658 if (Item.URI == (*I)->URI)
659 {
660 Item.Owner->Status = Item::StatDone;
661 return false;
662 }
663
664 // Create a new item
665 QItem *Itm = new QItem;
666 *Itm = Item;
667 Itm->Next = 0;
668 *I = Itm;
669
670 Item.Owner->QueueCounter++;
671 if (Items->Next == 0)
672 Cycle();
673 return true;
674 }
675 /*}}}*/
676 // Queue::Dequeue - Remove an item from the queue /*{{{*/
677 // ---------------------------------------------------------------------
678 /* We return true if we hit something */
679 bool pkgAcquire::Queue::Dequeue(Item *Owner)
680 {
681 if (Owner->Status == pkgAcquire::Item::StatFetching)
682 return _error->Error("Tried to dequeue a fetching object");
683
684 bool Res = false;
685
686 QItem **I = &Items;
687 for (; *I != 0;)
688 {
689 if ((*I)->Owner == Owner)
690 {
691 QItem *Jnk= *I;
692 *I = (*I)->Next;
693 Owner->QueueCounter--;
694 delete Jnk;
695 Res = true;
696 }
697 else
698 I = &(*I)->Next;
699 }
700
701 return Res;
702 }
703 /*}}}*/
704 // Queue::Startup - Start the worker processes /*{{{*/
705 // ---------------------------------------------------------------------
706 /* It is possible for this to be called with a pre-existing set of
707 workers. */
708 bool pkgAcquire::Queue::Startup()
709 {
710 if (Workers == 0)
711 {
712 URI U(Name);
713 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
714 if (Cnf == 0)
715 return false;
716
717 Workers = new Worker(this,Cnf,Owner->Log);
718 Owner->Add(Workers);
719 if (Workers->Start() == false)
720 return false;
721
722 /* When pipelining we commit 10 items. This needs to change when we
723 added other source retry to have cycle maintain a pipeline depth
724 on its own. */
725 if (Cnf->Pipeline == true)
726 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
727 else
728 MaxPipeDepth = 1;
729 }
730
731 return Cycle();
732 }
733 /*}}}*/
734 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
735 // ---------------------------------------------------------------------
736 /* If final is true then all workers are eliminated, otherwise only workers
737 that do not need cleanup are removed */
738 bool pkgAcquire::Queue::Shutdown(bool Final)
739 {
740 // Delete all of the workers
741 pkgAcquire::Worker **Cur = &Workers;
742 while (*Cur != 0)
743 {
744 pkgAcquire::Worker *Jnk = *Cur;
745 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
746 {
747 *Cur = Jnk->NextQueue;
748 Owner->Remove(Jnk);
749 delete Jnk;
750 }
751 else
752 Cur = &(*Cur)->NextQueue;
753 }
754
755 return true;
756 }
757 /*}}}*/
758 // Queue::FindItem - Find a URI in the item list /*{{{*/
759 // ---------------------------------------------------------------------
760 /* */
761 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
762 {
763 for (QItem *I = Items; I != 0; I = I->Next)
764 if (I->URI == URI && I->Worker == Owner)
765 return I;
766 return 0;
767 }
768 /*}}}*/
769 // Queue::ItemDone - Item has been completed /*{{{*/
770 // ---------------------------------------------------------------------
771 /* The worker signals this which causes the item to be removed from the
772 queue. If this is the last queue instance then it is removed from the
773 main queue too.*/
774 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
775 {
776 PipeDepth--;
777 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
778 Itm->Owner->Status = pkgAcquire::Item::StatDone;
779
780 if (Itm->Owner->QueueCounter <= 1)
781 Owner->Dequeue(Itm->Owner);
782 else
783 {
784 Dequeue(Itm->Owner);
785 Owner->Bump();
786 }
787
788 return Cycle();
789 }
790 /*}}}*/
791 // Queue::Cycle - Queue new items into the method /*{{{*/
792 // ---------------------------------------------------------------------
793 /* This locates a new idle item and sends it to the worker. If pipelining
794 is enabled then it keeps the pipe full. */
795 bool pkgAcquire::Queue::Cycle()
796 {
797 if (Items == 0 || Workers == 0)
798 return true;
799
800 if (PipeDepth < 0)
801 return _error->Error("Pipedepth failure");
802
803 // Look for a queable item
804 QItem *I = Items;
805 while (PipeDepth < (signed)MaxPipeDepth)
806 {
807 for (; I != 0; I = I->Next)
808 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
809 break;
810
811 // Nothing to do, queue is idle.
812 if (I == 0)
813 return true;
814
815 I->Worker = Workers;
816 I->Owner->Status = pkgAcquire::Item::StatFetching;
817 PipeDepth++;
818 if (Workers->QueueItem(I) == false)
819 return false;
820 }
821
822 return true;
823 }
824 /*}}}*/
825 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
826 // ---------------------------------------------------------------------
827 /* This is called when an item in multiple queues is dequeued */
828 void pkgAcquire::Queue::Bump()
829 {
830 Cycle();
831 }
832 /*}}}*/
833 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
834 // ---------------------------------------------------------------------
835 /* */
836 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
837 {
838 Start();
839 }
840 /*}}}*/
841 // AcquireStatus::Pulse - Called periodically /*{{{*/
842 // ---------------------------------------------------------------------
843 /* This computes some internal state variables for the derived classes to
844 use. It generates the current downloaded bytes and total bytes to download
845 as well as the current CPS estimate. */
846 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
847 {
848 TotalBytes = 0;
849 CurrentBytes = 0;
850 TotalItems = 0;
851 CurrentItems = 0;
852
853 // Compute the total number of bytes to fetch
854 unsigned int Unknown = 0;
855 unsigned int Count = 0;
856 bool UnfetchedReleaseFiles = false;
857 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
858 I != Owner->ItemsEnd();
859 ++I, ++Count)
860 {
861 TotalItems++;
862 if ((*I)->Status == pkgAcquire::Item::StatDone)
863 ++CurrentItems;
864
865 // Totally ignore local items
866 if ((*I)->Local == true)
867 continue;
868
869 // see if the method tells us to expect more
870 TotalItems += (*I)->ExpectedAdditionalItems;
871
872 // check if there are unfetched Release files
873 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
874 UnfetchedReleaseFiles = true;
875
876 TotalBytes += (*I)->FileSize;
877 if ((*I)->Complete == true)
878 CurrentBytes += (*I)->FileSize;
879 if ((*I)->FileSize == 0 && (*I)->Complete == false)
880 ++Unknown;
881 }
882
883 // Compute the current completion
884 unsigned long long ResumeSize = 0;
885 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
886 I = Owner->WorkerStep(I))
887 {
888 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
889 {
890 CurrentBytes += I->CurrentSize;
891 ResumeSize += I->ResumePoint;
892
893 // Files with unknown size always have 100% completion
894 if (I->CurrentItem->Owner->FileSize == 0 &&
895 I->CurrentItem->Owner->Complete == false)
896 TotalBytes += I->CurrentSize;
897 }
898 }
899
900 // Normalize the figures and account for unknown size downloads
901 if (TotalBytes <= 0)
902 TotalBytes = 1;
903 if (Unknown == Count)
904 TotalBytes = Unknown;
905
906 // Wha?! Is not supposed to happen.
907 if (CurrentBytes > TotalBytes)
908 CurrentBytes = TotalBytes;
909
910 // debug
911 if (_config->FindB("Debug::acquire::progress", false) == true)
912 std::clog << " Bytes: "
913 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
914 << std::endl;
915
916 // Compute the CPS
917 struct timeval NewTime;
918 gettimeofday(&NewTime,0);
919 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
920 NewTime.tv_sec - Time.tv_sec > 6)
921 {
922 double Delta = NewTime.tv_sec - Time.tv_sec +
923 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
924
925 // Compute the CPS value
926 if (Delta < 0.01)
927 CurrentCPS = 0;
928 else
929 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
930 LastBytes = CurrentBytes - ResumeSize;
931 ElapsedTime = (unsigned long long)Delta;
932 Time = NewTime;
933 }
934
935 // calculate the percentage, if we have too little data assume 1%
936 if (TotalBytes > 0 && UnfetchedReleaseFiles)
937 Percent = 0;
938 else
939 // use both files and bytes because bytes can be unreliable
940 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
941 0.2 * (CurrentItems/float(TotalItems)*100.0));
942
943 int fd = _config->FindI("APT::Status-Fd",-1);
944 if(fd > 0)
945 {
946 ostringstream status;
947
948 char msg[200];
949 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
950 unsigned long long ETA = 0;
951 if(CurrentCPS > 0)
952 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
953
954 // only show the ETA if it makes sense
955 if (ETA > 0 && ETA < 172800 /* two days */ )
956 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
957 else
958 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
959
960 // build the status str
961 status << "dlstatus:" << i
962 << ":" << std::setprecision(3) << Percent
963 << ":" << msg
964 << endl;
965
966 std::string const dlstatus = status.str();
967 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
968 }
969
970 return true;
971 }
972 /*}}}*/
973 // AcquireStatus::Start - Called when the download is started /*{{{*/
974 // ---------------------------------------------------------------------
975 /* We just reset the counters */
976 void pkgAcquireStatus::Start()
977 {
978 gettimeofday(&Time,0);
979 gettimeofday(&StartTime,0);
980 LastBytes = 0;
981 CurrentCPS = 0;
982 CurrentBytes = 0;
983 TotalBytes = 0;
984 FetchedBytes = 0;
985 ElapsedTime = 0;
986 TotalItems = 0;
987 CurrentItems = 0;
988 }
989 /*}}}*/
990 // AcquireStatus::Stop - Finished downloading /*{{{*/
991 // ---------------------------------------------------------------------
992 /* This accurately computes the elapsed time and the total overall CPS. */
993 void pkgAcquireStatus::Stop()
994 {
995 // Compute the CPS and elapsed time
996 struct timeval NewTime;
997 gettimeofday(&NewTime,0);
998
999 double Delta = NewTime.tv_sec - StartTime.tv_sec +
1000 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
1001
1002 // Compute the CPS value
1003 if (Delta < 0.01)
1004 CurrentCPS = 0;
1005 else
1006 CurrentCPS = FetchedBytes/Delta;
1007 LastBytes = CurrentBytes;
1008 ElapsedTime = (unsigned long long)Delta;
1009 }
1010 /*}}}*/
1011 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
1012 // ---------------------------------------------------------------------
1013 /* This is used to get accurate final transfer rate reporting. */
1014 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
1015 {
1016 FetchedBytes += Size - Resume;
1017 }
1018 /*}}}*/