]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
* cmdline/apt-get.cc:
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
   Acquire - File Acquisition

   The core element of the scheduling system is the concept of a named
   queue. Each queue is unique and each queue has a name derived from the
   URI. The degree of parallelization can be controlled by how the queue
   name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #ifdef __GNUG__
17 #pragma implementation "apt-pkg/acquire.h"
18 #endif
19 #include <apt-pkg/acquire.h>
20 #include <apt-pkg/acquire-item.h>
21 #include <apt-pkg/acquire-worker.h>
22 #include <apt-pkg/configuration.h>
23 #include <apt-pkg/error.h>
24 #include <apt-pkg/strutl.h>
25
26 #include <apti18n.h>
27
28 #include <iostream>
29 #include <sstream>
30
31 #include <dirent.h>
32 #include <sys/time.h>
33 #include <errno.h>
34 #include <sys/stat.h>
35 /*}}}*/
36
37 using namespace std;
38
39 // Acquire::pkgAcquire - Constructor /*{{{*/
40 // ---------------------------------------------------------------------
41 /* We grab some runtime state from the configuration space */
42 pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
43 {
44 Queues = 0;
45 Configs = 0;
46 Workers = 0;
47 ToFetch = 0;
48 Running = false;
49
50 string Mode = _config->Find("Acquire::Queue-Mode","host");
51 if (strcasecmp(Mode.c_str(),"host") == 0)
52 QueueMode = QueueHost;
53 if (strcasecmp(Mode.c_str(),"access") == 0)
54 QueueMode = QueueAccess;
55
56 Debug = _config->FindB("Debug::pkgAcquire",false);
57
58 // This is really a stupid place for this
59 struct stat St;
60 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
61 S_ISDIR(St.st_mode) == 0)
62 _error->Error(_("Lists directory %spartial is missing."),
63 _config->FindDir("Dir::State::lists").c_str());
64 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
65 S_ISDIR(St.st_mode) == 0)
66 _error->Error(_("Archive directory %spartial is missing."),
67 _config->FindDir("Dir::Cache::Archives").c_str());
68 }
69 /*}}}*/
70 // Acquire::~pkgAcquire - Destructor /*{{{*/
71 // ---------------------------------------------------------------------
72 /* Free our memory, clean up the queues (destroy the workers) */
73 pkgAcquire::~pkgAcquire()
74 {
75 Shutdown();
76
77 while (Configs != 0)
78 {
79 MethodConfig *Jnk = Configs;
80 Configs = Configs->Next;
81 delete Jnk;
82 }
83 }
84 /*}}}*/
85 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
86 // ---------------------------------------------------------------------
87 /* */
88 void pkgAcquire::Shutdown()
89 {
90 while (Items.size() != 0)
91 {
92 if (Items[0]->Status == Item::StatFetching)
93 Items[0]->Status = Item::StatError;
94 delete Items[0];
95 }
96
97 while (Queues != 0)
98 {
99 Queue *Jnk = Queues;
100 Queues = Queues->Next;
101 delete Jnk;
102 }
103 }
104 /*}}}*/
105 // Acquire::Add - Add a new item /*{{{*/
106 // ---------------------------------------------------------------------
107 /* This puts an item on the acquire list. This list is mainly for tracking
108 item status */
109 void pkgAcquire::Add(Item *Itm)
110 {
111 Items.push_back(Itm);
112 }
113 /*}}}*/
114 // Acquire::Remove - Remove a item /*{{{*/
115 // ---------------------------------------------------------------------
116 /* Remove an item from the acquire list. This is usually not used.. */
117 void pkgAcquire::Remove(Item *Itm)
118 {
119 Dequeue(Itm);
120
121 for (ItemIterator I = Items.begin(); I != Items.end();)
122 {
123 if (*I == Itm)
124 {
125 Items.erase(I);
126 I = Items.begin();
127 }
128 else
129 I++;
130 }
131 }
132 /*}}}*/
133 // Acquire::Add - Add a worker /*{{{*/
134 // ---------------------------------------------------------------------
135 /* A list of workers is kept so that the select loop can direct their FD
136 usage. */
137 void pkgAcquire::Add(Worker *Work)
138 {
139 Work->NextAcquire = Workers;
140 Workers = Work;
141 }
142 /*}}}*/
143 // Acquire::Remove - Remove a worker /*{{{*/
144 // ---------------------------------------------------------------------
145 /* A worker has died. This can not be done while the select loop is running
146 as it would require that RunFds could handling a changing list state and
147 it cant.. */
148 void pkgAcquire::Remove(Worker *Work)
149 {
150 if (Running == true)
151 abort();
152
153 Worker **I = &Workers;
154 for (; *I != 0;)
155 {
156 if (*I == Work)
157 *I = (*I)->NextAcquire;
158 else
159 I = &(*I)->NextAcquire;
160 }
161 }
162 /*}}}*/
163 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
164 // ---------------------------------------------------------------------
165 /* This is the entry point for an item. An item calls this function when
166 it is constructed which creates a queue (based on the current queue
167 mode) and puts the item in that queue. If the system is running then
168 the queue might be started. */
169 void pkgAcquire::Enqueue(ItemDesc &Item)
170 {
171 // Determine which queue to put the item in
172 const MethodConfig *Config;
173 string Name = QueueName(Item.URI,Config);
174 if (Name.empty() == true)
175 return;
176
177 // Find the queue structure
178 Queue *I = Queues;
179 for (; I != 0 && I->Name != Name; I = I->Next);
180 if (I == 0)
181 {
182 I = new Queue(Name,this);
183 I->Next = Queues;
184 Queues = I;
185
186 if (Running == true)
187 I->Startup();
188 }
189
190 // See if this is a local only URI
191 if (Config->LocalOnly == true && Item.Owner->Complete == false)
192 Item.Owner->Local = true;
193 Item.Owner->Status = Item::StatIdle;
194
195 // Queue it into the named queue
196 if(I->Enqueue(Item))
197 ToFetch++;
198
199 // Some trace stuff
200 if (Debug == true)
201 {
202 clog << "Fetching " << Item.URI << endl;
203 clog << " to " << Item.Owner->DestFile << endl;
204 clog << " Queue is: " << Name << endl;
205 }
206 }
207 /*}}}*/
208 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
209 // ---------------------------------------------------------------------
210 /* This is called when an item is finished being fetched. It removes it
211 from all the queues */
212 void pkgAcquire::Dequeue(Item *Itm)
213 {
214 Queue *I = Queues;
215 bool Res = false;
216 for (; I != 0; I = I->Next)
217 Res |= I->Dequeue(Itm);
218
219 if (Debug == true)
220 clog << "Dequeuing " << Itm->DestFile << endl;
221 if (Res == true)
222 ToFetch--;
223 }
224 /*}}}*/
225 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
226 // ---------------------------------------------------------------------
227 /* The string returned depends on the configuration settings and the
228 method parameters. Given something like http://foo.org/bar it can
229 return http://foo.org or http */
230 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
231 {
232 URI U(Uri);
233
234 Config = GetConfig(U.Access);
235 if (Config == 0)
236 return string();
237
238 /* Single-Instance methods get exactly one queue per URI. This is
239 also used for the Access queue method */
240 if (Config->SingleInstance == true || QueueMode == QueueAccess)
241 return U.Access;
242
243 return U.Access + ':' + U.Host;
244 }
245 /*}}}*/
246 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
247 // ---------------------------------------------------------------------
248 /* This locates the configuration structure for an access method. If
249 a config structure cannot be found a Worker will be created to
250 retrieve it */
251 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
252 {
253 // Search for an existing config
254 MethodConfig *Conf;
255 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
256 if (Conf->Access == Access)
257 return Conf;
258
259 // Create the new config class
260 Conf = new MethodConfig;
261 Conf->Access = Access;
262 Conf->Next = Configs;
263 Configs = Conf;
264
265 // Create the worker to fetch the configuration
266 Worker Work(Conf);
267 if (Work.Start() == false)
268 return 0;
269
270 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
271 if(_config->FindI("Acquire::"+Access+"::DlLimit",0) > 0)
272 Conf->SingleInstance = true;
273
274 return Conf;
275 }
276 /*}}}*/
277 // Acquire::SetFds - Deal with readable FDs /*{{{*/
278 // ---------------------------------------------------------------------
279 /* Collect FDs that have activity monitors into the fd sets */
280 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
281 {
282 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
283 {
284 if (I->InReady == true && I->InFd >= 0)
285 {
286 if (Fd < I->InFd)
287 Fd = I->InFd;
288 FD_SET(I->InFd,RSet);
289 }
290 if (I->OutReady == true && I->OutFd >= 0)
291 {
292 if (Fd < I->OutFd)
293 Fd = I->OutFd;
294 FD_SET(I->OutFd,WSet);
295 }
296 }
297 }
298 /*}}}*/
299 // Acquire::RunFds - Deal with active FDs /*{{{*/
300 // ---------------------------------------------------------------------
301 /* Dispatch active FDs over to the proper workers. It is very important
302 that a worker never be erased while this is running! The queue class
303 should never erase a worker except during shutdown processing. */
304 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
305 {
306 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
307 {
308 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
309 I->InFdReady();
310 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
311 I->OutFdReady();
312 }
313 }
314 /*}}}*/
315 // Acquire::Run - Run the fetch sequence /*{{{*/
316 // ---------------------------------------------------------------------
317 /* This runs the queues. It manages a select loop for all of the
318 Worker tasks. The workers interact with the queues and items to
319 manage the actual fetch. */
320 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
321 {
322 Running = true;
323
324 for (Queue *I = Queues; I != 0; I = I->Next)
325 I->Startup();
326
327 if (Log != 0)
328 Log->Start();
329
330 bool WasCancelled = false;
331
332 // Run till all things have been acquired
333 struct timeval tv;
334 tv.tv_sec = 0;
335 tv.tv_usec = PulseIntervall;
336 while (ToFetch > 0)
337 {
338 fd_set RFds;
339 fd_set WFds;
340 int Highest = 0;
341 FD_ZERO(&RFds);
342 FD_ZERO(&WFds);
343 SetFds(Highest,&RFds,&WFds);
344
345 int Res;
346 do
347 {
348 Res = select(Highest+1,&RFds,&WFds,0,&tv);
349 }
350 while (Res < 0 && errno == EINTR);
351
352 if (Res < 0)
353 {
354 _error->Errno("select","Select has failed");
355 break;
356 }
357
358 RunFds(&RFds,&WFds);
359 if (_error->PendingError() == true)
360 break;
361
362 // Timeout, notify the log class
363 if (Res == 0 || (Log != 0 && Log->Update == true))
364 {
365 tv.tv_usec = PulseIntervall;
366 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
367 I->Pulse();
368 if (Log != 0 && Log->Pulse(this) == false)
369 {
370 WasCancelled = true;
371 break;
372 }
373 }
374 }
375
376 if (Log != 0)
377 Log->Stop();
378
379 // Shut down the acquire bits
380 Running = false;
381 for (Queue *I = Queues; I != 0; I = I->Next)
382 I->Shutdown(false);
383
384 // Shut down the items
385 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
386 (*I)->Finished();
387
388 if (_error->PendingError())
389 return Failed;
390 if (WasCancelled)
391 return Cancelled;
392 return Continue;
393 }
394 /*}}}*/
395 // Acquire::Bump - Called when an item is dequeued /*{{{*/
396 // ---------------------------------------------------------------------
397 /* This routine bumps idle queues in hopes that they will be able to fetch
398 the dequeued item */
399 void pkgAcquire::Bump()
400 {
401 for (Queue *I = Queues; I != 0; I = I->Next)
402 I->Bump();
403 }
404 /*}}}*/
405 // Acquire::WorkerStep - Step to the next worker /*{{{*/
406 // ---------------------------------------------------------------------
407 /* Not inlined to advoid including acquire-worker.h */
408 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
409 {
410 return I->NextAcquire;
411 };
412 /*}}}*/
413 // Acquire::Clean - Cleans a directory /*{{{*/
414 // ---------------------------------------------------------------------
415 /* This is a bit simplistic, it looks at every file in the dir and sees
416 if it is part of the download set. */
417 bool pkgAcquire::Clean(string Dir)
418 {
419 DIR *D = opendir(Dir.c_str());
420 if (D == 0)
421 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
422
423 string StartDir = SafeGetCWD();
424 if (chdir(Dir.c_str()) != 0)
425 {
426 closedir(D);
427 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
428 }
429
430 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
431 {
432 // Skip some files..
433 if (strcmp(Dir->d_name,"lock") == 0 ||
434 strcmp(Dir->d_name,"partial") == 0 ||
435 strcmp(Dir->d_name,".") == 0 ||
436 strcmp(Dir->d_name,"..") == 0)
437 continue;
438
439 // Look in the get list
440 ItemCIterator I = Items.begin();
441 for (; I != Items.end(); I++)
442 if (flNotDir((*I)->DestFile) == Dir->d_name)
443 break;
444
445 // Nothing found, nuke it
446 if (I == Items.end())
447 unlink(Dir->d_name);
448 };
449
450 chdir(StartDir.c_str());
451 closedir(D);
452 return true;
453 }
454 /*}}}*/
455 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
456 // ---------------------------------------------------------------------
457 /* This is the total number of bytes needed */
458 double pkgAcquire::TotalNeeded()
459 {
460 double Total = 0;
461 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
462 Total += (*I)->FileSize;
463 return Total;
464 }
465 /*}}}*/
466 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
467 // ---------------------------------------------------------------------
468 /* This is the number of bytes that is not local */
469 double pkgAcquire::FetchNeeded()
470 {
471 double Total = 0;
472 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
473 if ((*I)->Local == false)
474 Total += (*I)->FileSize;
475 return Total;
476 }
477 /*}}}*/
478 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
479 // ---------------------------------------------------------------------
480 /* This is the number of bytes that is not local */
481 double pkgAcquire::PartialPresent()
482 {
483 double Total = 0;
484 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
485 if ((*I)->Local == false)
486 Total += (*I)->PartialSize;
487 return Total;
488 }
489
490 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
491 // ---------------------------------------------------------------------
492 /* */
493 pkgAcquire::UriIterator pkgAcquire::UriBegin()
494 {
495 return UriIterator(Queues);
496 }
497 /*}}}*/
498 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
499 // ---------------------------------------------------------------------
500 /* */
501 pkgAcquire::UriIterator pkgAcquire::UriEnd()
502 {
503 return UriIterator(0);
504 }
505 /*}}}*/
506
507 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
508 // ---------------------------------------------------------------------
509 /* */
510 pkgAcquire::MethodConfig::MethodConfig()
511 {
512 SingleInstance = false;
513 Pipeline = false;
514 SendConfig = false;
515 LocalOnly = false;
516 Removable = false;
517 Next = 0;
518 }
519 /*}}}*/
520
521 // Queue::Queue - Constructor /*{{{*/
522 // ---------------------------------------------------------------------
523 /* */
524 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
525 Owner(Owner)
526 {
527 Items = 0;
528 Next = 0;
529 Workers = 0;
530 MaxPipeDepth = 1;
531 PipeDepth = 0;
532 }
533 /*}}}*/
534 // Queue::~Queue - Destructor /*{{{*/
535 // ---------------------------------------------------------------------
536 /* */
537 pkgAcquire::Queue::~Queue()
538 {
539 Shutdown(true);
540
541 while (Items != 0)
542 {
543 QItem *Jnk = Items;
544 Items = Items->Next;
545 delete Jnk;
546 }
547 }
548 /*}}}*/
549 // Queue::Enqueue - Queue an item to the queue /*{{{*/
550 // ---------------------------------------------------------------------
551 /* */
552 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
553 {
554 QItem **I = &Items;
555 // move to the end of the queue and check for duplicates here
556 for (; *I != 0; I = &(*I)->Next)
557 if (Item.URI == (*I)->URI)
558 {
559 Item.Owner->Status = Item::StatDone;
560 return false;
561 }
562
563 // Create a new item
564 QItem *Itm = new QItem;
565 *Itm = Item;
566 Itm->Next = 0;
567 *I = Itm;
568
569 Item.Owner->QueueCounter++;
570 if (Items->Next == 0)
571 Cycle();
572 return true;
573 }
574 /*}}}*/
575 // Queue::Dequeue - Remove an item from the queue /*{{{*/
576 // ---------------------------------------------------------------------
577 /* We return true if we hit something */
578 bool pkgAcquire::Queue::Dequeue(Item *Owner)
579 {
580 if (Owner->Status == pkgAcquire::Item::StatFetching)
581 return _error->Error("Tried to dequeue a fetching object");
582
583 bool Res = false;
584
585 QItem **I = &Items;
586 for (; *I != 0;)
587 {
588 if ((*I)->Owner == Owner)
589 {
590 QItem *Jnk= *I;
591 *I = (*I)->Next;
592 Owner->QueueCounter--;
593 delete Jnk;
594 Res = true;
595 }
596 else
597 I = &(*I)->Next;
598 }
599
600 return Res;
601 }
602 /*}}}*/
603 // Queue::Startup - Start the worker processes /*{{{*/
604 // ---------------------------------------------------------------------
605 /* It is possible for this to be called with a pre-existing set of
606 workers. */
607 bool pkgAcquire::Queue::Startup()
608 {
609 if (Workers == 0)
610 {
611 URI U(Name);
612 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
613 if (Cnf == 0)
614 return false;
615
616 Workers = new Worker(this,Cnf,Owner->Log);
617 Owner->Add(Workers);
618 if (Workers->Start() == false)
619 return false;
620
621 /* When pipelining we commit 10 items. This needs to change when we
622 added other source retry to have cycle maintain a pipeline depth
623 on its own. */
624 if (Cnf->Pipeline == true)
625 MaxPipeDepth = 10;
626 else
627 MaxPipeDepth = 1;
628 }
629
630 return Cycle();
631 }
632 /*}}}*/
633 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
634 // ---------------------------------------------------------------------
635 /* If final is true then all workers are eliminated, otherwise only workers
636 that do not need cleanup are removed */
637 bool pkgAcquire::Queue::Shutdown(bool Final)
638 {
639 // Delete all of the workers
640 pkgAcquire::Worker **Cur = &Workers;
641 while (*Cur != 0)
642 {
643 pkgAcquire::Worker *Jnk = *Cur;
644 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
645 {
646 *Cur = Jnk->NextQueue;
647 Owner->Remove(Jnk);
648 delete Jnk;
649 }
650 else
651 Cur = &(*Cur)->NextQueue;
652 }
653
654 return true;
655 }
656 /*}}}*/
657 // Queue::FindItem - Find a URI in the item list /*{{{*/
658 // ---------------------------------------------------------------------
659 /* */
660 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
661 {
662 for (QItem *I = Items; I != 0; I = I->Next)
663 if (I->URI == URI && I->Worker == Owner)
664 return I;
665 return 0;
666 }
667 /*}}}*/
668 // Queue::ItemDone - Item has been completed /*{{{*/
669 // ---------------------------------------------------------------------
670 /* The worker signals this which causes the item to be removed from the
671 queue. If this is the last queue instance then it is removed from the
672 main queue too.*/
673 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
674 {
675 PipeDepth--;
676 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
677 Itm->Owner->Status = pkgAcquire::Item::StatDone;
678
679 if (Itm->Owner->QueueCounter <= 1)
680 Owner->Dequeue(Itm->Owner);
681 else
682 {
683 Dequeue(Itm->Owner);
684 Owner->Bump();
685 }
686
687 return Cycle();
688 }
689 /*}}}*/
690 // Queue::Cycle - Queue new items into the method /*{{{*/
691 // ---------------------------------------------------------------------
692 /* This locates a new idle item and sends it to the worker. If pipelining
693 is enabled then it keeps the pipe full. */
694 bool pkgAcquire::Queue::Cycle()
695 {
696 if (Items == 0 || Workers == 0)
697 return true;
698
699 if (PipeDepth < 0)
700 return _error->Error("Pipedepth failure");
701
702 // Look for a queable item
703 QItem *I = Items;
704 while (PipeDepth < (signed)MaxPipeDepth)
705 {
706 for (; I != 0; I = I->Next)
707 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
708 break;
709
710 // Nothing to do, queue is idle.
711 if (I == 0)
712 return true;
713
714 I->Worker = Workers;
715 I->Owner->Status = pkgAcquire::Item::StatFetching;
716 PipeDepth++;
717 if (Workers->QueueItem(I) == false)
718 return false;
719 }
720
721 return true;
722 }
723 /*}}}*/
724 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
725 // ---------------------------------------------------------------------
726 /* This is called when an item in multiple queues is dequeued */
727 void pkgAcquire::Queue::Bump()
728 {
729 Cycle();
730 }
731 /*}}}*/
732
733 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
734 // ---------------------------------------------------------------------
735 /* */
736 pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
737 {
738 Start();
739 }
740 /*}}}*/
741 // AcquireStatus::Pulse - Called periodically /*{{{*/
742 // ---------------------------------------------------------------------
743 /* This computes some internal state variables for the derived classes to
744 use. It generates the current downloaded bytes and total bytes to download
745 as well as the current CPS estimate. */
746 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
747 {
748 TotalBytes = 0;
749 CurrentBytes = 0;
750 TotalItems = 0;
751 CurrentItems = 0;
752
753 // Compute the total number of bytes to fetch
754 unsigned int Unknown = 0;
755 unsigned int Count = 0;
756 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
757 I++, Count++)
758 {
759 TotalItems++;
760 if ((*I)->Status == pkgAcquire::Item::StatDone)
761 CurrentItems++;
762
763 // Totally ignore local items
764 if ((*I)->Local == true)
765 continue;
766
767 TotalBytes += (*I)->FileSize;
768 if ((*I)->Complete == true)
769 CurrentBytes += (*I)->FileSize;
770 if ((*I)->FileSize == 0 && (*I)->Complete == false)
771 Unknown++;
772 }
773
774 // Compute the current completion
775 unsigned long ResumeSize = 0;
776 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
777 I = Owner->WorkerStep(I))
778 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
779 {
780 CurrentBytes += I->CurrentSize;
781 ResumeSize += I->ResumePoint;
782
783 // Files with unknown size always have 100% completion
784 if (I->CurrentItem->Owner->FileSize == 0 &&
785 I->CurrentItem->Owner->Complete == false)
786 TotalBytes += I->CurrentSize;
787 }
788
789 // Normalize the figures and account for unknown size downloads
790 if (TotalBytes <= 0)
791 TotalBytes = 1;
792 if (Unknown == Count)
793 TotalBytes = Unknown;
794
795 // Wha?! Is not supposed to happen.
796 if (CurrentBytes > TotalBytes)
797 CurrentBytes = TotalBytes;
798
799 // Compute the CPS
800 struct timeval NewTime;
801 gettimeofday(&NewTime,0);
802 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
803 NewTime.tv_sec - Time.tv_sec > 6)
804 {
805 double Delta = NewTime.tv_sec - Time.tv_sec +
806 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
807
808 // Compute the CPS value
809 if (Delta < 0.01)
810 CurrentCPS = 0;
811 else
812 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
813 LastBytes = CurrentBytes - ResumeSize;
814 ElapsedTime = (unsigned long)Delta;
815 Time = NewTime;
816 }
817
818 int fd = _config->FindI("APT::Status-Fd",-1);
819 if(fd > 0)
820 {
821 ostringstream status;
822
823 char msg[200];
824 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
825 unsigned long ETA =
826 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
827
828 // only show the ETA if it makes sense
829 if (ETA > 0 && ETA < 172800 /* two days */ )
830 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
831 else
832 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
833
834
835
836 // build the status str
837 status << "dlstatus:" << i
838 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
839 << ":" << msg
840 << endl;
841 write(fd, status.str().c_str(), status.str().size());
842 }
843
844 return true;
845 }
846 /*}}}*/
847 // AcquireStatus::Start - Called when the download is started /*{{{*/
848 // ---------------------------------------------------------------------
849 /* We just reset the counters */
850 void pkgAcquireStatus::Start()
851 {
852 gettimeofday(&Time,0);
853 gettimeofday(&StartTime,0);
854 LastBytes = 0;
855 CurrentCPS = 0;
856 CurrentBytes = 0;
857 TotalBytes = 0;
858 FetchedBytes = 0;
859 ElapsedTime = 0;
860 TotalItems = 0;
861 CurrentItems = 0;
862 }
863 /*}}}*/
864 // AcquireStatus::Stop - Finished downloading /*{{{*/
865 // ---------------------------------------------------------------------
866 /* This accurately computes the elapsed time and the total overall CPS. */
867 void pkgAcquireStatus::Stop()
868 {
869 // Compute the CPS and elapsed time
870 struct timeval NewTime;
871 gettimeofday(&NewTime,0);
872
873 double Delta = NewTime.tv_sec - StartTime.tv_sec +
874 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
875
876 // Compute the CPS value
877 if (Delta < 0.01)
878 CurrentCPS = 0;
879 else
880 CurrentCPS = FetchedBytes/Delta;
881 LastBytes = CurrentBytes;
882 ElapsedTime = (unsigned int)Delta;
883 }
884 /*}}}*/
885 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
886 // ---------------------------------------------------------------------
887 /* This is used to get accurate final transfer rate reporting. */
888 void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
889 {
890 FetchedBytes += Size - Resume;
891 }
892 /*}}}*/