]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
* merged ddtp support
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #ifdef __GNUG__
17 #pragma implementation "apt-pkg/acquire.h"
18 #endif
19 #include <apt-pkg/acquire.h>
20 #include <apt-pkg/acquire-item.h>
21 #include <apt-pkg/acquire-worker.h>
22 #include <apt-pkg/configuration.h>
23 #include <apt-pkg/error.h>
24 #include <apt-pkg/strutl.h>
25
26 #include <apti18n.h>
27
28 #include <iostream>
29 #include <sstream>
30
31 #include <dirent.h>
32 #include <sys/time.h>
33 #include <errno.h>
34 #include <sys/stat.h>
35 /*}}}*/
36
37 using namespace std;
38
39 // Acquire::pkgAcquire - Constructor /*{{{*/
40 // ---------------------------------------------------------------------
41 /* We grab some runtime state from the configuration space */
42 pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
43 {
44 Queues = 0;
45 Configs = 0;
46 Workers = 0;
47 ToFetch = 0;
48 Running = false;
49
50 string Mode = _config->Find("Acquire::Queue-Mode","host");
51 if (strcasecmp(Mode.c_str(),"host") == 0)
52 QueueMode = QueueHost;
53 if (strcasecmp(Mode.c_str(),"access") == 0)
54 QueueMode = QueueAccess;
55
56 Debug = _config->FindB("Debug::pkgAcquire",false);
57
58 // This is really a stupid place for this
59 struct stat St;
60 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
61 S_ISDIR(St.st_mode) == 0)
62 _error->Error(_("Lists directory %spartial is missing."),
63 _config->FindDir("Dir::State::lists").c_str());
64 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
65 S_ISDIR(St.st_mode) == 0)
66 _error->Error(_("Archive directory %spartial is missing."),
67 _config->FindDir("Dir::Cache::Archives").c_str());
68 }
69 /*}}}*/
70 // Acquire::~pkgAcquire - Destructor /*{{{*/
71 // ---------------------------------------------------------------------
72 /* Free our memory, clean up the queues (destroy the workers) */
73 pkgAcquire::~pkgAcquire()
74 {
75 Shutdown();
76
77 while (Configs != 0)
78 {
79 MethodConfig *Jnk = Configs;
80 Configs = Configs->Next;
81 delete Jnk;
82 }
83 }
84 /*}}}*/
85 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
86 // ---------------------------------------------------------------------
87 /* */
88 void pkgAcquire::Shutdown()
89 {
90 while (Items.size() != 0)
91 {
92 if (Items[0]->Status == Item::StatFetching)
93 Items[0]->Status = Item::StatError;
94 delete Items[0];
95 }
96
97 while (Queues != 0)
98 {
99 Queue *Jnk = Queues;
100 Queues = Queues->Next;
101 delete Jnk;
102 }
103 }
104 /*}}}*/
105 // Acquire::Add - Add a new item /*{{{*/
106 // ---------------------------------------------------------------------
107 /* This puts an item on the acquire list. This list is mainly for tracking
108 item status */
109 void pkgAcquire::Add(Item *Itm)
110 {
111 Items.push_back(Itm);
112 }
113 /*}}}*/
114 // Acquire::Remove - Remove a item /*{{{*/
115 // ---------------------------------------------------------------------
116 /* Remove an item from the acquire list. This is usually not used.. */
117 void pkgAcquire::Remove(Item *Itm)
118 {
119 Dequeue(Itm);
120
121 for (ItemIterator I = Items.begin(); I != Items.end();)
122 {
123 if (*I == Itm)
124 {
125 Items.erase(I);
126 I = Items.begin();
127 }
128 else
129 I++;
130 }
131 }
132 /*}}}*/
133 // Acquire::Add - Add a worker /*{{{*/
134 // ---------------------------------------------------------------------
135 /* A list of workers is kept so that the select loop can direct their FD
136 usage. */
137 void pkgAcquire::Add(Worker *Work)
138 {
139 Work->NextAcquire = Workers;
140 Workers = Work;
141 }
142 /*}}}*/
143 // Acquire::Remove - Remove a worker /*{{{*/
144 // ---------------------------------------------------------------------
145 /* A worker has died. This can not be done while the select loop is running
146 as it would require that RunFds could handling a changing list state and
147 it cant.. */
148 void pkgAcquire::Remove(Worker *Work)
149 {
150 if (Running == true)
151 abort();
152
153 Worker **I = &Workers;
154 for (; *I != 0;)
155 {
156 if (*I == Work)
157 *I = (*I)->NextAcquire;
158 else
159 I = &(*I)->NextAcquire;
160 }
161 }
162 /*}}}*/
163 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
164 // ---------------------------------------------------------------------
165 /* This is the entry point for an item. An item calls this function when
166 it is constructed which creates a queue (based on the current queue
167 mode) and puts the item in that queue. If the system is running then
168 the queue might be started. */
169 void pkgAcquire::Enqueue(ItemDesc &Item)
170 {
171 // Determine which queue to put the item in
172 const MethodConfig *Config;
173 string Name = QueueName(Item.URI,Config);
174 if (Name.empty() == true)
175 return;
176
177 // Find the queue structure
178 Queue *I = Queues;
179 for (; I != 0 && I->Name != Name; I = I->Next);
180 if (I == 0)
181 {
182 I = new Queue(Name,this);
183 I->Next = Queues;
184 Queues = I;
185
186 if (Running == true)
187 I->Startup();
188 }
189
190 // See if this is a local only URI
191 if (Config->LocalOnly == true && Item.Owner->Complete == false)
192 Item.Owner->Local = true;
193 Item.Owner->Status = Item::StatIdle;
194
195 // Queue it into the named queue
196 I->Enqueue(Item);
197 ToFetch++;
198
199 // Some trace stuff
200 if (Debug == true)
201 {
202 clog << "Fetching " << Item.URI << endl;
203 clog << " to " << Item.Owner->DestFile << endl;
204 clog << " Queue is: " << Name << endl;
205 }
206 }
207 /*}}}*/
208 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
209 // ---------------------------------------------------------------------
210 /* This is called when an item is finished being fetched. It removes it
211 from all the queues */
212 void pkgAcquire::Dequeue(Item *Itm)
213 {
214 Queue *I = Queues;
215 bool Res = false;
216 for (; I != 0; I = I->Next)
217 Res |= I->Dequeue(Itm);
218
219 if (Debug == true)
220 clog << "Dequeuing " << Itm->DestFile << endl;
221 if (Res == true)
222 ToFetch--;
223 }
224 /*}}}*/
225 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
226 // ---------------------------------------------------------------------
227 /* The string returned depends on the configuration settings and the
228 method parameters. Given something like http://foo.org/bar it can
229 return http://foo.org or http */
230 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
231 {
232 URI U(Uri);
233
234 Config = GetConfig(U.Access);
235 if (Config == 0)
236 return string();
237
238 /* Single-Instance methods get exactly one queue per URI. This is
239 also used for the Access queue method */
240 if (Config->SingleInstance == true || QueueMode == QueueAccess)
241 return U.Access;
242
243 return U.Access + ':' + U.Host;
244 }
245 /*}}}*/
246 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
247 // ---------------------------------------------------------------------
248 /* This locates the configuration structure for an access method. If
249 a config structure cannot be found a Worker will be created to
250 retrieve it */
251 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
252 {
253 // Search for an existing config
254 MethodConfig *Conf;
255 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
256 if (Conf->Access == Access)
257 return Conf;
258
259 // Create the new config class
260 Conf = new MethodConfig;
261 Conf->Access = Access;
262 Conf->Next = Configs;
263 Configs = Conf;
264
265 // Create the worker to fetch the configuration
266 Worker Work(Conf);
267 if (Work.Start() == false)
268 return 0;
269
270 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
271 if(_config->FindI("Acquire::"+Access+"::DlLimit",0) > 0)
272 Conf->SingleInstance = true;
273
274 return Conf;
275 }
276 /*}}}*/
277 // Acquire::SetFds - Deal with readable FDs /*{{{*/
278 // ---------------------------------------------------------------------
279 /* Collect FDs that have activity monitors into the fd sets */
280 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
281 {
282 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
283 {
284 if (I->InReady == true && I->InFd >= 0)
285 {
286 if (Fd < I->InFd)
287 Fd = I->InFd;
288 FD_SET(I->InFd,RSet);
289 }
290 if (I->OutReady == true && I->OutFd >= 0)
291 {
292 if (Fd < I->OutFd)
293 Fd = I->OutFd;
294 FD_SET(I->OutFd,WSet);
295 }
296 }
297 }
298 /*}}}*/
299 // Acquire::RunFds - Deal with active FDs /*{{{*/
300 // ---------------------------------------------------------------------
301 /* Dispatch active FDs over to the proper workers. It is very important
302 that a worker never be erased while this is running! The queue class
303 should never erase a worker except during shutdown processing. */
304 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
305 {
306 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
307 {
308 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
309 I->InFdReady();
310 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
311 I->OutFdReady();
312 }
313 }
314 /*}}}*/
315 // Acquire::Run - Run the fetch sequence /*{{{*/
316 // ---------------------------------------------------------------------
317 /* This runs the queues. It manages a select loop for all of the
318 Worker tasks. The workers interact with the queues and items to
319 manage the actual fetch. */
320 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
321 {
322 Running = true;
323
324 for (Queue *I = Queues; I != 0; I = I->Next)
325 I->Startup();
326
327 if (Log != 0)
328 Log->Start();
329
330 bool WasCancelled = false;
331
332 // Run till all things have been acquired
333 struct timeval tv;
334 tv.tv_sec = 0;
335 tv.tv_usec = PulseIntervall;
336 while (ToFetch > 0)
337 {
338 fd_set RFds;
339 fd_set WFds;
340 int Highest = 0;
341 FD_ZERO(&RFds);
342 FD_ZERO(&WFds);
343 SetFds(Highest,&RFds,&WFds);
344
345 int Res;
346 do
347 {
348 Res = select(Highest+1,&RFds,&WFds,0,&tv);
349 }
350 while (Res < 0 && errno == EINTR);
351
352 if (Res < 0)
353 {
354 _error->Errno("select","Select has failed");
355 break;
356 }
357
358 RunFds(&RFds,&WFds);
359 if (_error->PendingError() == true)
360 break;
361
362 // Timeout, notify the log class
363 if (Res == 0 || (Log != 0 && Log->Update == true))
364 {
365 tv.tv_usec = PulseIntervall;
366 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
367 I->Pulse();
368 if (Log != 0 && Log->Pulse(this) == false)
369 {
370 WasCancelled = true;
371 break;
372 }
373 }
374 }
375
376 if (Log != 0)
377 Log->Stop();
378
379 // Shut down the acquire bits
380 Running = false;
381 for (Queue *I = Queues; I != 0; I = I->Next)
382 I->Shutdown(false);
383
384 // Shut down the items
385 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
386 (*I)->Finished();
387
388 if (_error->PendingError())
389 return Failed;
390 if (WasCancelled)
391 return Cancelled;
392 return Continue;
393 }
394 /*}}}*/
395 // Acquire::Bump - Called when an item is dequeued /*{{{*/
396 // ---------------------------------------------------------------------
397 /* This routine bumps idle queues in hopes that they will be able to fetch
398 the dequeued item */
399 void pkgAcquire::Bump()
400 {
401 for (Queue *I = Queues; I != 0; I = I->Next)
402 I->Bump();
403 }
404 /*}}}*/
405 // Acquire::WorkerStep - Step to the next worker /*{{{*/
406 // ---------------------------------------------------------------------
407 /* Not inlined to advoid including acquire-worker.h */
408 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
409 {
410 return I->NextAcquire;
411 };
412 /*}}}*/
413 // Acquire::Clean - Cleans a directory /*{{{*/
414 // ---------------------------------------------------------------------
415 /* This is a bit simplistic, it looks at every file in the dir and sees
416 if it is part of the download set. */
417 bool pkgAcquire::Clean(string Dir)
418 {
419 DIR *D = opendir(Dir.c_str());
420 if (D == 0)
421 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
422
423 string StartDir = SafeGetCWD();
424 if (chdir(Dir.c_str()) != 0)
425 {
426 closedir(D);
427 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
428 }
429
430 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
431 {
432 // Skip some files..
433 if (strcmp(Dir->d_name,"lock") == 0 ||
434 strcmp(Dir->d_name,"partial") == 0 ||
435 strcmp(Dir->d_name,".") == 0 ||
436 strcmp(Dir->d_name,"..") == 0)
437 continue;
438
439 // Look in the get list
440 ItemCIterator I = Items.begin();
441 for (; I != Items.end(); I++)
442 if (flNotDir((*I)->DestFile) == Dir->d_name)
443 break;
444
445 // Nothing found, nuke it
446 if (I == Items.end())
447 unlink(Dir->d_name);
448 };
449
450 chdir(StartDir.c_str());
451 closedir(D);
452 return true;
453 }
454 /*}}}*/
455 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
456 // ---------------------------------------------------------------------
457 /* This is the total number of bytes needed */
458 double pkgAcquire::TotalNeeded()
459 {
460 double Total = 0;
461 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
462 Total += (*I)->FileSize;
463 return Total;
464 }
465 /*}}}*/
466 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
467 // ---------------------------------------------------------------------
468 /* This is the number of bytes that is not local */
469 double pkgAcquire::FetchNeeded()
470 {
471 double Total = 0;
472 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
473 if ((*I)->Local == false)
474 Total += (*I)->FileSize;
475 return Total;
476 }
477 /*}}}*/
478 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
479 // ---------------------------------------------------------------------
480 /* This is the number of bytes that is not local */
481 double pkgAcquire::PartialPresent()
482 {
483 double Total = 0;
484 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
485 if ((*I)->Local == false)
486 Total += (*I)->PartialSize;
487 return Total;
488 }
489
490 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
491 // ---------------------------------------------------------------------
492 /* */
493 pkgAcquire::UriIterator pkgAcquire::UriBegin()
494 {
495 return UriIterator(Queues);
496 }
497 /*}}}*/
498 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
499 // ---------------------------------------------------------------------
500 /* */
501 pkgAcquire::UriIterator pkgAcquire::UriEnd()
502 {
503 return UriIterator(0);
504 }
505 /*}}}*/
506
507 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
508 // ---------------------------------------------------------------------
509 /* */
510 pkgAcquire::MethodConfig::MethodConfig()
511 {
512 SingleInstance = false;
513 Pipeline = false;
514 SendConfig = false;
515 LocalOnly = false;
516 Removable = false;
517 Next = 0;
518 }
519 /*}}}*/
520
521 // Queue::Queue - Constructor /*{{{*/
522 // ---------------------------------------------------------------------
523 /* */
524 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
525 Owner(Owner)
526 {
527 Items = 0;
528 Next = 0;
529 Workers = 0;
530 MaxPipeDepth = 1;
531 PipeDepth = 0;
532 }
533 /*}}}*/
534 // Queue::~Queue - Destructor /*{{{*/
535 // ---------------------------------------------------------------------
536 /* */
537 pkgAcquire::Queue::~Queue()
538 {
539 Shutdown(true);
540
541 while (Items != 0)
542 {
543 QItem *Jnk = Items;
544 Items = Items->Next;
545 delete Jnk;
546 }
547 }
548 /*}}}*/
549 // Queue::Enqueue - Queue an item to the queue /*{{{*/
550 // ---------------------------------------------------------------------
551 /* */
552 void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
553 {
554 QItem **I = &Items;
555 for (; *I != 0; I = &(*I)->Next);
556
557 // Create a new item
558 QItem *Itm = new QItem;
559 *Itm = Item;
560 Itm->Next = 0;
561 *I = Itm;
562
563 Item.Owner->QueueCounter++;
564 if (Items->Next == 0)
565 Cycle();
566 }
567 /*}}}*/
568 // Queue::Dequeue - Remove an item from the queue /*{{{*/
569 // ---------------------------------------------------------------------
570 /* We return true if we hit something */
571 bool pkgAcquire::Queue::Dequeue(Item *Owner)
572 {
573 if (Owner->Status == pkgAcquire::Item::StatFetching)
574 return _error->Error("Tried to dequeue a fetching object");
575
576 bool Res = false;
577
578 QItem **I = &Items;
579 for (; *I != 0;)
580 {
581 if ((*I)->Owner == Owner)
582 {
583 QItem *Jnk= *I;
584 *I = (*I)->Next;
585 Owner->QueueCounter--;
586 delete Jnk;
587 Res = true;
588 }
589 else
590 I = &(*I)->Next;
591 }
592
593 return Res;
594 }
595 /*}}}*/
596 // Queue::Startup - Start the worker processes /*{{{*/
597 // ---------------------------------------------------------------------
598 /* It is possible for this to be called with a pre-existing set of
599 workers. */
600 bool pkgAcquire::Queue::Startup()
601 {
602 if (Workers == 0)
603 {
604 URI U(Name);
605 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
606 if (Cnf == 0)
607 return false;
608
609 Workers = new Worker(this,Cnf,Owner->Log);
610 Owner->Add(Workers);
611 if (Workers->Start() == false)
612 return false;
613
614 /* When pipelining we commit 10 items. This needs to change when we
615 added other source retry to have cycle maintain a pipeline depth
616 on its own. */
617 if (Cnf->Pipeline == true)
618 MaxPipeDepth = 10;
619 else
620 MaxPipeDepth = 1;
621 }
622
623 return Cycle();
624 }
625 /*}}}*/
626 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
627 // ---------------------------------------------------------------------
628 /* If final is true then all workers are eliminated, otherwise only workers
629 that do not need cleanup are removed */
630 bool pkgAcquire::Queue::Shutdown(bool Final)
631 {
632 // Delete all of the workers
633 pkgAcquire::Worker **Cur = &Workers;
634 while (*Cur != 0)
635 {
636 pkgAcquire::Worker *Jnk = *Cur;
637 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
638 {
639 *Cur = Jnk->NextQueue;
640 Owner->Remove(Jnk);
641 delete Jnk;
642 }
643 else
644 Cur = &(*Cur)->NextQueue;
645 }
646
647 return true;
648 }
649 /*}}}*/
650 // Queue::FindItem - Find a URI in the item list /*{{{*/
651 // ---------------------------------------------------------------------
652 /* */
653 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
654 {
655 for (QItem *I = Items; I != 0; I = I->Next)
656 if (I->URI == URI && I->Worker == Owner)
657 return I;
658 return 0;
659 }
660 /*}}}*/
661 // Queue::ItemDone - Item has been completed /*{{{*/
662 // ---------------------------------------------------------------------
663 /* The worker signals this which causes the item to be removed from the
664 queue. If this is the last queue instance then it is removed from the
665 main queue too.*/
666 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
667 {
668 PipeDepth--;
669 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
670 Itm->Owner->Status = pkgAcquire::Item::StatDone;
671
672 if (Itm->Owner->QueueCounter <= 1)
673 Owner->Dequeue(Itm->Owner);
674 else
675 {
676 Dequeue(Itm->Owner);
677 Owner->Bump();
678 }
679
680 return Cycle();
681 }
682 /*}}}*/
683 // Queue::Cycle - Queue new items into the method /*{{{*/
684 // ---------------------------------------------------------------------
685 /* This locates a new idle item and sends it to the worker. If pipelining
686 is enabled then it keeps the pipe full. */
687 bool pkgAcquire::Queue::Cycle()
688 {
689 if (Items == 0 || Workers == 0)
690 return true;
691
692 if (PipeDepth < 0)
693 return _error->Error("Pipedepth failure");
694
695 // Look for a queable item
696 QItem *I = Items;
697 while (PipeDepth < (signed)MaxPipeDepth)
698 {
699 for (; I != 0; I = I->Next)
700 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
701 break;
702
703 // Nothing to do, queue is idle.
704 if (I == 0)
705 return true;
706
707 I->Worker = Workers;
708 I->Owner->Status = pkgAcquire::Item::StatFetching;
709 PipeDepth++;
710 if (Workers->QueueItem(I) == false)
711 return false;
712 }
713
714 return true;
715 }
716 /*}}}*/
717 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
718 // ---------------------------------------------------------------------
719 /* This is called when an item in multiple queues is dequeued */
720 void pkgAcquire::Queue::Bump()
721 {
722 Cycle();
723 }
724 /*}}}*/
725
726 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
727 // ---------------------------------------------------------------------
728 /* */
729 pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
730 {
731 Start();
732 }
733 /*}}}*/
734 // AcquireStatus::Pulse - Called periodically /*{{{*/
735 // ---------------------------------------------------------------------
736 /* This computes some internal state variables for the derived classes to
737 use. It generates the current downloaded bytes and total bytes to download
738 as well as the current CPS estimate. */
739 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
740 {
741 TotalBytes = 0;
742 CurrentBytes = 0;
743 TotalItems = 0;
744 CurrentItems = 0;
745
746 // Compute the total number of bytes to fetch
747 unsigned int Unknown = 0;
748 unsigned int Count = 0;
749 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
750 I++, Count++)
751 {
752 TotalItems++;
753 if ((*I)->Status == pkgAcquire::Item::StatDone)
754 CurrentItems++;
755
756 // Totally ignore local items
757 if ((*I)->Local == true)
758 continue;
759
760 TotalBytes += (*I)->FileSize;
761 if ((*I)->Complete == true)
762 CurrentBytes += (*I)->FileSize;
763 if ((*I)->FileSize == 0 && (*I)->Complete == false)
764 Unknown++;
765 }
766
767 // Compute the current completion
768 unsigned long ResumeSize = 0;
769 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
770 I = Owner->WorkerStep(I))
771 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
772 {
773 CurrentBytes += I->CurrentSize;
774 ResumeSize += I->ResumePoint;
775
776 // Files with unknown size always have 100% completion
777 if (I->CurrentItem->Owner->FileSize == 0 &&
778 I->CurrentItem->Owner->Complete == false)
779 TotalBytes += I->CurrentSize;
780 }
781
782 // Normalize the figures and account for unknown size downloads
783 if (TotalBytes <= 0)
784 TotalBytes = 1;
785 if (Unknown == Count)
786 TotalBytes = Unknown;
787
788 // Wha?! Is not supposed to happen.
789 if (CurrentBytes > TotalBytes)
790 CurrentBytes = TotalBytes;
791
792 // Compute the CPS
793 struct timeval NewTime;
794 gettimeofday(&NewTime,0);
795 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
796 NewTime.tv_sec - Time.tv_sec > 6)
797 {
798 double Delta = NewTime.tv_sec - Time.tv_sec +
799 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
800
801 // Compute the CPS value
802 if (Delta < 0.01)
803 CurrentCPS = 0;
804 else
805 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
806 LastBytes = CurrentBytes - ResumeSize;
807 ElapsedTime = (unsigned long)Delta;
808 Time = NewTime;
809 }
810
811 int fd = _config->FindI("APT::Status-Fd",-1);
812 if(fd > 0)
813 {
814 ostringstream status;
815
816 char msg[200];
817 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
818 unsigned long ETA =
819 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
820
821 // only show the ETA if it makes sense
822 if (ETA > 0 && ETA < 172800 /* two days */ )
823 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
824 else
825 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
826
827
828
829 // build the status str
830 status << "dlstatus:" << i
831 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
832 << ":" << msg
833 << endl;
834 write(fd, status.str().c_str(), status.str().size());
835 }
836
837 return true;
838 }
839 /*}}}*/
840 // AcquireStatus::Start - Called when the download is started /*{{{*/
841 // ---------------------------------------------------------------------
842 /* We just reset the counters */
843 void pkgAcquireStatus::Start()
844 {
845 gettimeofday(&Time,0);
846 gettimeofday(&StartTime,0);
847 LastBytes = 0;
848 CurrentCPS = 0;
849 CurrentBytes = 0;
850 TotalBytes = 0;
851 FetchedBytes = 0;
852 ElapsedTime = 0;
853 TotalItems = 0;
854 CurrentItems = 0;
855 }
856 /*}}}*/
857 // AcquireStatus::Stop - Finished downloading /*{{{*/
858 // ---------------------------------------------------------------------
859 /* This accurately computes the elapsed time and the total overall CPS. */
860 void pkgAcquireStatus::Stop()
861 {
862 // Compute the CPS and elapsed time
863 struct timeval NewTime;
864 gettimeofday(&NewTime,0);
865
866 double Delta = NewTime.tv_sec - StartTime.tv_sec +
867 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
868
869 // Compute the CPS value
870 if (Delta < 0.01)
871 CurrentCPS = 0;
872 else
873 CurrentCPS = FetchedBytes/Delta;
874 LastBytes = CurrentBytes;
875 ElapsedTime = (unsigned int)Delta;
876 }
877 /*}}}*/
878 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
879 // ---------------------------------------------------------------------
880 /* This is used to get accurate final transfer rate reporting. */
881 void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
882 {
883 FetchedBytes += Size - Resume;
884 }
885 /*}}}*/