]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
Merge remote-tracking branch 'upstream/debian/experimental' into feature/acq-trans
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <iomanip>
35
36 #include <dirent.h>
37 #include <sys/time.h>
38 #include <sys/select.h>
39 #include <errno.h>
40 #include <sys/stat.h>
41
42 #include <apti18n.h>
43 /*}}}*/
44
45 using namespace std;
46
47 // Acquire::pkgAcquire - Constructor /*{{{*/
48 // ---------------------------------------------------------------------
49 /* We grab some runtime state from the configuration space */
50 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
51 Debug(_config->FindB("Debug::pkgAcquire",false)),
52 Running(false)
53 {
54 string const Mode = _config->Find("Acquire::Queue-Mode","host");
55 if (strcasecmp(Mode.c_str(),"host") == 0)
56 QueueMode = QueueHost;
57 if (strcasecmp(Mode.c_str(),"access") == 0)
58 QueueMode = QueueAccess;
59 }
60 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
61 Configs(0), Log(Progress), ToFetch(0),
62 Debug(_config->FindB("Debug::pkgAcquire",false)),
63 Running(false)
64 {
65 string const Mode = _config->Find("Acquire::Queue-Mode","host");
66 if (strcasecmp(Mode.c_str(),"host") == 0)
67 QueueMode = QueueHost;
68 if (strcasecmp(Mode.c_str(),"access") == 0)
69 QueueMode = QueueAccess;
70 Setup(Progress, "");
71 }
72 /*}}}*/
73 // Acquire::Setup - Delayed Constructor /*{{{*/
74 // ---------------------------------------------------------------------
75 /* Do everything needed to be a complete Acquire object and report the
76 success (or failure) back so the user knows that something is wrong… */
77 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock,
78 bool const createDirectories)
79 {
80 Log = Progress;
81
82 // check for existence and possibly create auxiliary directories
83 if (createDirectories == true)
84 {
85 string const listDir = _config->FindDir("Dir::State::lists");
86 string const partialListDir = listDir + "partial/";
87 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
88 string const partialArchivesDir = archivesDir + "partial/";
89
90 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
91 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
92 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
93
94 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
95 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
96 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
97 }
98
99 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
100 return true;
101
102 // Lock the directory this acquire object will work in
103 LockFD = GetLock(flCombine(Lock, "lock"));
104 if (LockFD == -1)
105 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
106
107 return true;
108 }
109 /*}}}*/
110 // Acquire::~pkgAcquire - Destructor /*{{{*/
111 // ---------------------------------------------------------------------
112 /* Free our memory, clean up the queues (destroy the workers) */
113 pkgAcquire::~pkgAcquire()
114 {
115 Shutdown();
116
117 if (LockFD != -1)
118 close(LockFD);
119
120 while (Configs != 0)
121 {
122 MethodConfig *Jnk = Configs;
123 Configs = Configs->Next;
124 delete Jnk;
125 }
126 }
127 /*}}}*/
128 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
129 // ---------------------------------------------------------------------
130 /* */
131 void pkgAcquire::Shutdown()
132 {
133 while (Items.empty() == false)
134 {
135 if (Items[0]->Status == Item::StatFetching)
136 Items[0]->Status = Item::StatError;
137 delete Items[0];
138 }
139
140 while (Queues != 0)
141 {
142 Queue *Jnk = Queues;
143 Queues = Queues->Next;
144 delete Jnk;
145 }
146 }
147 /*}}}*/
148 // Acquire::Add - Add a new item /*{{{*/
149 // ---------------------------------------------------------------------
150 /* This puts an item on the acquire list. This list is mainly for tracking
151 item status */
152 void pkgAcquire::Add(Item *Itm)
153 {
154 Items.push_back(Itm);
155 }
156 /*}}}*/
157 // Acquire::Remove - Remove a item /*{{{*/
158 // ---------------------------------------------------------------------
159 /* Remove an item from the acquire list. This is usually not used.. */
160 void pkgAcquire::Remove(Item *Itm)
161 {
162 Dequeue(Itm);
163
164 for (ItemIterator I = Items.begin(); I != Items.end();)
165 {
166 if (*I == Itm)
167 {
168 Items.erase(I);
169 I = Items.begin();
170 }
171 else
172 ++I;
173 }
174 }
175 /*}}}*/
176 // Acquire::Add - Add a worker /*{{{*/
177 // ---------------------------------------------------------------------
178 /* A list of workers is kept so that the select loop can direct their FD
179 usage. */
180 void pkgAcquire::Add(Worker *Work)
181 {
182 Work->NextAcquire = Workers;
183 Workers = Work;
184 }
185 /*}}}*/
186 // Acquire::Remove - Remove a worker /*{{{*/
187 // ---------------------------------------------------------------------
188 /* A worker has died. This can not be done while the select loop is running
189 as it would require that RunFds could handling a changing list state and
190 it can't.. */
191 void pkgAcquire::Remove(Worker *Work)
192 {
193 if (Running == true)
194 abort();
195
196 Worker **I = &Workers;
197 for (; *I != 0;)
198 {
199 if (*I == Work)
200 *I = (*I)->NextAcquire;
201 else
202 I = &(*I)->NextAcquire;
203 }
204 }
205 /*}}}*/
206 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
207 // ---------------------------------------------------------------------
208 /* This is the entry point for an item. An item calls this function when
209 it is constructed which creates a queue (based on the current queue
210 mode) and puts the item in that queue. If the system is running then
211 the queue might be started. */
212 void pkgAcquire::Enqueue(ItemDesc &Item)
213 {
214 // Determine which queue to put the item in
215 const MethodConfig *Config;
216 string Name = QueueName(Item.URI,Config);
217 if (Name.empty() == true)
218 return;
219
220 // Find the queue structure
221 Queue *I = Queues;
222 for (; I != 0 && I->Name != Name; I = I->Next);
223 if (I == 0)
224 {
225 I = new Queue(Name,this);
226 I->Next = Queues;
227 Queues = I;
228
229 if (Running == true)
230 I->Startup();
231 }
232
233 // See if this is a local only URI
234 if (Config->LocalOnly == true && Item.Owner->Complete == false)
235 Item.Owner->Local = true;
236 Item.Owner->Status = Item::StatIdle;
237
238 // Queue it into the named queue
239 if(I->Enqueue(Item))
240 ToFetch++;
241
242 // Some trace stuff
243 if (Debug == true)
244 {
245 clog << "Fetching " << Item.URI << endl;
246 clog << " to " << Item.Owner->DestFile << endl;
247 clog << " Queue is: " << Name << endl;
248 }
249 }
250 /*}}}*/
251 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
252 // ---------------------------------------------------------------------
253 /* This is called when an item is finished being fetched. It removes it
254 from all the queues */
255 void pkgAcquire::Dequeue(Item *Itm)
256 {
257 Queue *I = Queues;
258 bool Res = false;
259 if (Debug == true)
260 clog << "Dequeuing " << Itm->DestFile << endl;
261
262 for (; I != 0; I = I->Next)
263 {
264 if (I->Dequeue(Itm))
265 {
266 Res = true;
267 if (Debug == true)
268 clog << "Dequeued from " << I->Name << endl;
269 }
270 }
271
272 if (Res == true)
273 ToFetch--;
274 }
275 /*}}}*/
276 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
277 // ---------------------------------------------------------------------
278 /* The string returned depends on the configuration settings and the
279 method parameters. Given something like http://foo.org/bar it can
280 return http://foo.org or http */
281 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
282 {
283 URI U(Uri);
284
285 Config = GetConfig(U.Access);
286 if (Config == 0)
287 return string();
288
289 /* Single-Instance methods get exactly one queue per URI. This is
290 also used for the Access queue method */
291 if (Config->SingleInstance == true || QueueMode == QueueAccess)
292 return U.Access;
293
294 string AccessSchema = U.Access + ':',
295 FullQueueName = AccessSchema + U.Host;
296 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
297
298 Queue *I = Queues;
299 for (; I != 0; I = I->Next) {
300 // if the queue already exists, re-use it
301 if (I->Name == FullQueueName)
302 return FullQueueName;
303
304 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
305 Instances++;
306 }
307
308 if (Debug) {
309 clog << "Found " << Instances << " instances of " << U.Access << endl;
310 }
311
312 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
313 return U.Access;
314
315 return FullQueueName;
316 }
317 /*}}}*/
318 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
319 // ---------------------------------------------------------------------
320 /* This locates the configuration structure for an access method. If
321 a config structure cannot be found a Worker will be created to
322 retrieve it */
323 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
324 {
325 // Search for an existing config
326 MethodConfig *Conf;
327 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
328 if (Conf->Access == Access)
329 return Conf;
330
331 // Create the new config class
332 Conf = new MethodConfig;
333 Conf->Access = Access;
334 Conf->Next = Configs;
335 Configs = Conf;
336
337 // Create the worker to fetch the configuration
338 Worker Work(Conf);
339 if (Work.Start() == false)
340 return 0;
341
342 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
343 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
344 Conf->SingleInstance = true;
345
346 return Conf;
347 }
348 /*}}}*/
349 // Acquire::SetFds - Deal with readable FDs /*{{{*/
350 // ---------------------------------------------------------------------
351 /* Collect FDs that have activity monitors into the fd sets */
352 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
353 {
354 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
355 {
356 if (I->InReady == true && I->InFd >= 0)
357 {
358 if (Fd < I->InFd)
359 Fd = I->InFd;
360 FD_SET(I->InFd,RSet);
361 }
362 if (I->OutReady == true && I->OutFd >= 0)
363 {
364 if (Fd < I->OutFd)
365 Fd = I->OutFd;
366 FD_SET(I->OutFd,WSet);
367 }
368 }
369 }
370 /*}}}*/
371 // Acquire::RunFds - Deal with active FDs /*{{{*/
372 // ---------------------------------------------------------------------
373 /* Dispatch active FDs over to the proper workers. It is very important
374 that a worker never be erased while this is running! The queue class
375 should never erase a worker except during shutdown processing. */
376 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
377 {
378 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
379 {
380 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
381 I->InFdReady();
382 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
383 I->OutFdReady();
384 }
385 }
386 /*}}}*/
387 // Acquire::Run - Run the fetch sequence /*{{{*/
388 // ---------------------------------------------------------------------
389 /* This runs the queues. It manages a select loop for all of the
390 Worker tasks. The workers interact with the queues and items to
391 manage the actual fetch. */
392 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
393 {
394 Running = true;
395
396 for (Queue *I = Queues; I != 0; I = I->Next)
397 I->Startup();
398
399 if (Log != 0)
400 Log->Start();
401
402 bool WasCancelled = false;
403
404 // Run till all things have been acquired
405 struct timeval tv;
406 tv.tv_sec = 0;
407 tv.tv_usec = PulseIntervall;
408 while (ToFetch > 0)
409 {
410 fd_set RFds;
411 fd_set WFds;
412 int Highest = 0;
413 FD_ZERO(&RFds);
414 FD_ZERO(&WFds);
415 SetFds(Highest,&RFds,&WFds);
416
417 int Res;
418 do
419 {
420 Res = select(Highest+1,&RFds,&WFds,0,&tv);
421 }
422 while (Res < 0 && errno == EINTR);
423
424 if (Res < 0)
425 {
426 _error->Errno("select","Select has failed");
427 break;
428 }
429
430 RunFds(&RFds,&WFds);
431 if (_error->PendingError() == true)
432 break;
433
434 // Timeout, notify the log class
435 if (Res == 0 || (Log != 0 && Log->Update == true))
436 {
437 tv.tv_usec = PulseIntervall;
438 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
439 I->Pulse();
440 if (Log != 0 && Log->Pulse(this) == false)
441 {
442 WasCancelled = true;
443 break;
444 }
445 }
446 }
447
448 if (Log != 0)
449 Log->Stop();
450
451 // Shut down the acquire bits
452 Running = false;
453 for (Queue *I = Queues; I != 0; I = I->Next)
454 I->Shutdown(false);
455
456 // Shut down the items
457 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
458 (*I)->Finished();
459
460 if (_error->PendingError())
461 return Failed;
462 if (WasCancelled)
463 return Cancelled;
464 return Continue;
465 }
466 /*}}}*/
467 // Acquire::Bump - Called when an item is dequeued /*{{{*/
468 // ---------------------------------------------------------------------
469 /* This routine bumps idle queues in hopes that they will be able to fetch
470 the dequeued item */
471 void pkgAcquire::Bump()
472 {
473 for (Queue *I = Queues; I != 0; I = I->Next)
474 I->Bump();
475 }
476 /*}}}*/
477 // Acquire::WorkerStep - Step to the next worker /*{{{*/
478 // ---------------------------------------------------------------------
479 /* Not inlined to advoid including acquire-worker.h */
480 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
481 {
482 return I->NextAcquire;
483 }
484 /*}}}*/
485 // Acquire::Clean - Cleans a directory /*{{{*/
486 // ---------------------------------------------------------------------
487 /* This is a bit simplistic, it looks at every file in the dir and sees
488 if it is part of the download set. */
489 bool pkgAcquire::Clean(string Dir)
490 {
491 // non-existing directories are by definition clean…
492 if (DirectoryExists(Dir) == false)
493 return true;
494
495 if(Dir == "/")
496 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
497
498 DIR *D = opendir(Dir.c_str());
499 if (D == 0)
500 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
501
502 string StartDir = SafeGetCWD();
503 if (chdir(Dir.c_str()) != 0)
504 {
505 closedir(D);
506 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
507 }
508
509 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
510 {
511 // Skip some files..
512 if (strcmp(Dir->d_name,"lock") == 0 ||
513 strcmp(Dir->d_name,"partial") == 0 ||
514 strcmp(Dir->d_name,".") == 0 ||
515 strcmp(Dir->d_name,"..") == 0)
516 continue;
517
518 // Look in the get list
519 ItemCIterator I = Items.begin();
520 for (; I != Items.end(); ++I)
521 if (flNotDir((*I)->DestFile) == Dir->d_name)
522 break;
523
524 // Nothing found, nuke it
525 if (I == Items.end())
526 unlink(Dir->d_name);
527 };
528
529 closedir(D);
530 if (chdir(StartDir.c_str()) != 0)
531 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
532 return true;
533 }
534 /*}}}*/
535 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
536 // ---------------------------------------------------------------------
537 /* This is the total number of bytes needed */
538 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
539 {
540 unsigned long long Total = 0;
541 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
542 Total += (*I)->FileSize;
543 return Total;
544 }
545 /*}}}*/
546 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
547 // ---------------------------------------------------------------------
548 /* This is the number of bytes that is not local */
549 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
550 {
551 unsigned long long Total = 0;
552 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
553 if ((*I)->Local == false)
554 Total += (*I)->FileSize;
555 return Total;
556 }
557 /*}}}*/
558 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
559 // ---------------------------------------------------------------------
560 /* This is the number of bytes that is not local */
561 APT_PURE unsigned long long pkgAcquire::PartialPresent()
562 {
563 unsigned long long Total = 0;
564 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
565 if ((*I)->Local == false)
566 Total += (*I)->PartialSize;
567 return Total;
568 }
569 /*}}}*/
570 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
571 // ---------------------------------------------------------------------
572 /* */
573 pkgAcquire::UriIterator pkgAcquire::UriBegin()
574 {
575 return UriIterator(Queues);
576 }
577 /*}}}*/
578 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
579 // ---------------------------------------------------------------------
580 /* */
581 pkgAcquire::UriIterator pkgAcquire::UriEnd()
582 {
583 return UriIterator(0);
584 }
585 /*}}}*/
586 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
587 // ---------------------------------------------------------------------
588 /* */
589 pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
590 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
591 Removable(false)
592 {
593 }
594 /*}}}*/
595 // Queue::Queue - Constructor /*{{{*/
596 // ---------------------------------------------------------------------
597 /* */
598 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
599 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
600 {
601 }
602 /*}}}*/
603 // Queue::~Queue - Destructor /*{{{*/
604 // ---------------------------------------------------------------------
605 /* */
606 pkgAcquire::Queue::~Queue()
607 {
608 Shutdown(true);
609
610 while (Items != 0)
611 {
612 QItem *Jnk = Items;
613 Items = Items->Next;
614 delete Jnk;
615 }
616 }
617 /*}}}*/
618 // Queue::Enqueue - Queue an item to the queue /*{{{*/
619 // ---------------------------------------------------------------------
620 /* */
621 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
622 {
623 QItem **I = &Items;
624 // move to the end of the queue and check for duplicates here
625 for (; *I != 0; I = &(*I)->Next)
626 if (Item.URI == (*I)->URI)
627 {
628 Item.Owner->Status = Item::StatDone;
629 return false;
630 }
631
632 // Create a new item
633 QItem *Itm = new QItem;
634 *Itm = Item;
635 Itm->Next = 0;
636 *I = Itm;
637
638 Item.Owner->QueueCounter++;
639 if (Items->Next == 0)
640 Cycle();
641 return true;
642 }
643 /*}}}*/
644 // Queue::Dequeue - Remove an item from the queue /*{{{*/
645 // ---------------------------------------------------------------------
646 /* We return true if we hit something */
647 bool pkgAcquire::Queue::Dequeue(Item *Owner)
648 {
649 if (Owner->Status == pkgAcquire::Item::StatFetching)
650 return _error->Error("Tried to dequeue a fetching object");
651
652 bool Res = false;
653
654 QItem **I = &Items;
655 for (; *I != 0;)
656 {
657 if ((*I)->Owner == Owner)
658 {
659 QItem *Jnk= *I;
660 *I = (*I)->Next;
661 Owner->QueueCounter--;
662 delete Jnk;
663 Res = true;
664 }
665 else
666 I = &(*I)->Next;
667 }
668
669 return Res;
670 }
671 /*}}}*/
672 // Queue::Startup - Start the worker processes /*{{{*/
673 // ---------------------------------------------------------------------
674 /* It is possible for this to be called with a pre-existing set of
675 workers. */
676 bool pkgAcquire::Queue::Startup()
677 {
678 if (Workers == 0)
679 {
680 URI U(Name);
681 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
682 if (Cnf == 0)
683 return false;
684
685 Workers = new Worker(this,Cnf,Owner->Log);
686 Owner->Add(Workers);
687 if (Workers->Start() == false)
688 return false;
689
690 /* When pipelining we commit 10 items. This needs to change when we
691 added other source retry to have cycle maintain a pipeline depth
692 on its own. */
693 if (Cnf->Pipeline == true)
694 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
695 else
696 MaxPipeDepth = 1;
697 }
698
699 return Cycle();
700 }
701 /*}}}*/
702 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
703 // ---------------------------------------------------------------------
704 /* If final is true then all workers are eliminated, otherwise only workers
705 that do not need cleanup are removed */
706 bool pkgAcquire::Queue::Shutdown(bool Final)
707 {
708 // Delete all of the workers
709 pkgAcquire::Worker **Cur = &Workers;
710 while (*Cur != 0)
711 {
712 pkgAcquire::Worker *Jnk = *Cur;
713 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
714 {
715 *Cur = Jnk->NextQueue;
716 Owner->Remove(Jnk);
717 delete Jnk;
718 }
719 else
720 Cur = &(*Cur)->NextQueue;
721 }
722
723 return true;
724 }
725 /*}}}*/
726 // Queue::FindItem - Find a URI in the item list /*{{{*/
727 // ---------------------------------------------------------------------
728 /* */
729 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
730 {
731 for (QItem *I = Items; I != 0; I = I->Next)
732 if (I->URI == URI && I->Worker == Owner)
733 return I;
734 return 0;
735 }
736 /*}}}*/
737 // Queue::ItemDone - Item has been completed /*{{{*/
738 // ---------------------------------------------------------------------
739 /* The worker signals this which causes the item to be removed from the
740 queue. If this is the last queue instance then it is removed from the
741 main queue too.*/
742 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
743 {
744 PipeDepth--;
745 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
746 Itm->Owner->Status = pkgAcquire::Item::StatDone;
747
748 if (Itm->Owner->QueueCounter <= 1)
749 Owner->Dequeue(Itm->Owner);
750 else
751 {
752 Dequeue(Itm->Owner);
753 Owner->Bump();
754 }
755
756 return Cycle();
757 }
758 /*}}}*/
759 // Queue::Cycle - Queue new items into the method /*{{{*/
760 // ---------------------------------------------------------------------
761 /* This locates a new idle item and sends it to the worker. If pipelining
762 is enabled then it keeps the pipe full. */
763 bool pkgAcquire::Queue::Cycle()
764 {
765 if (Items == 0 || Workers == 0)
766 return true;
767
768 if (PipeDepth < 0)
769 return _error->Error("Pipedepth failure");
770
771 // Look for a queable item
772 QItem *I = Items;
773 while (PipeDepth < (signed)MaxPipeDepth)
774 {
775 for (; I != 0; I = I->Next)
776 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
777 break;
778
779 // Nothing to do, queue is idle.
780 if (I == 0)
781 return true;
782
783 I->Worker = Workers;
784 I->Owner->Status = pkgAcquire::Item::StatFetching;
785 PipeDepth++;
786 if (Workers->QueueItem(I) == false)
787 return false;
788 }
789
790 return true;
791 }
792 /*}}}*/
793 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
794 // ---------------------------------------------------------------------
795 /* This is called when an item in multiple queues is dequeued */
796 void pkgAcquire::Queue::Bump()
797 {
798 Cycle();
799 }
800 /*}}}*/
801 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
802 // ---------------------------------------------------------------------
803 /* */
804 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
805 {
806 Start();
807 }
808 /*}}}*/
809 // AcquireStatus::Pulse - Called periodically /*{{{*/
810 // ---------------------------------------------------------------------
811 /* This computes some internal state variables for the derived classes to
812 use. It generates the current downloaded bytes and total bytes to download
813 as well as the current CPS estimate. */
814 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
815 {
816 TotalBytes = 0;
817 CurrentBytes = 0;
818 TotalItems = 0;
819 CurrentItems = 0;
820
821 // Compute the total number of bytes to fetch
822 unsigned int Unknown = 0;
823 unsigned int Count = 0;
824 bool UnfetchedReleaseFiles = false;
825 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
826 I != Owner->ItemsEnd();
827 ++I, ++Count)
828 {
829 TotalItems++;
830 if ((*I)->Status == pkgAcquire::Item::StatDone)
831 ++CurrentItems;
832
833 // Totally ignore local items
834 if ((*I)->Local == true)
835 continue;
836
837 // see if the method tells us to expect more
838 TotalItems += (*I)->ExpectedAdditionalItems;
839
840 // check if there are unfetched Release files
841 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
842 UnfetchedReleaseFiles = true;
843
844 TotalBytes += (*I)->FileSize;
845 if ((*I)->Complete == true)
846 CurrentBytes += (*I)->FileSize;
847 if ((*I)->FileSize == 0 && (*I)->Complete == false)
848 ++Unknown;
849 }
850
851 // Compute the current completion
852 unsigned long long ResumeSize = 0;
853 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
854 I = Owner->WorkerStep(I))
855 {
856 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
857 {
858 CurrentBytes += I->CurrentSize;
859 ResumeSize += I->ResumePoint;
860
861 // Files with unknown size always have 100% completion
862 if (I->CurrentItem->Owner->FileSize == 0 &&
863 I->CurrentItem->Owner->Complete == false)
864 TotalBytes += I->CurrentSize;
865 }
866 }
867
868 // Normalize the figures and account for unknown size downloads
869 if (TotalBytes <= 0)
870 TotalBytes = 1;
871 if (Unknown == Count)
872 TotalBytes = Unknown;
873
874 // Wha?! Is not supposed to happen.
875 if (CurrentBytes > TotalBytes)
876 CurrentBytes = TotalBytes;
877
878 // debug
879 if (_config->FindB("Debug::acquire::progress", false) == true)
880 std::clog << " Bytes: "
881 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
882 << std::endl;
883
884 // Compute the CPS
885 struct timeval NewTime;
886 gettimeofday(&NewTime,0);
887 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
888 NewTime.tv_sec - Time.tv_sec > 6)
889 {
890 double Delta = NewTime.tv_sec - Time.tv_sec +
891 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
892
893 // Compute the CPS value
894 if (Delta < 0.01)
895 CurrentCPS = 0;
896 else
897 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
898 LastBytes = CurrentBytes - ResumeSize;
899 ElapsedTime = (unsigned long long)Delta;
900 Time = NewTime;
901 }
902
903 // calculate the percentage, if we have too little data assume 1%
904 if (TotalBytes > 0 && UnfetchedReleaseFiles)
905 Percent = 0;
906 else
907 // use both files and bytes because bytes can be unreliable
908 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
909 0.2 * (CurrentItems/float(TotalItems)*100.0));
910
911 int fd = _config->FindI("APT::Status-Fd",-1);
912 if(fd > 0)
913 {
914 ostringstream status;
915
916 char msg[200];
917 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
918 unsigned long long ETA = 0;
919 if(CurrentCPS > 0)
920 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
921
922 // only show the ETA if it makes sense
923 if (ETA > 0 && ETA < 172800 /* two days */ )
924 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
925 else
926 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
927
928 // build the status str
929 status << "dlstatus:" << i
930 << ":" << std::setprecision(3) << Percent
931 << ":" << msg
932 << endl;
933
934 std::string const dlstatus = status.str();
935 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
936 }
937
938 return true;
939 }
940 /*}}}*/
941 // AcquireStatus::Start - Called when the download is started /*{{{*/
942 // ---------------------------------------------------------------------
943 /* We just reset the counters */
944 void pkgAcquireStatus::Start()
945 {
946 gettimeofday(&Time,0);
947 gettimeofday(&StartTime,0);
948 LastBytes = 0;
949 CurrentCPS = 0;
950 CurrentBytes = 0;
951 TotalBytes = 0;
952 FetchedBytes = 0;
953 ElapsedTime = 0;
954 TotalItems = 0;
955 CurrentItems = 0;
956 }
957 /*}}}*/
958 // AcquireStatus::Stop - Finished downloading /*{{{*/
959 // ---------------------------------------------------------------------
960 /* This accurately computes the elapsed time and the total overall CPS. */
961 void pkgAcquireStatus::Stop()
962 {
963 // Compute the CPS and elapsed time
964 struct timeval NewTime;
965 gettimeofday(&NewTime,0);
966
967 double Delta = NewTime.tv_sec - StartTime.tv_sec +
968 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
969
970 // Compute the CPS value
971 if (Delta < 0.01)
972 CurrentCPS = 0;
973 else
974 CurrentCPS = FetchedBytes/Delta;
975 LastBytes = CurrentBytes;
976 ElapsedTime = (unsigned long long)Delta;
977 }
978 /*}}}*/
979 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
980 // ---------------------------------------------------------------------
981 /* This is used to get accurate final transfer rate reporting. */
982 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
983 {
984 FetchedBytes += Size - Resume;
985 }
986 /*}}}*/