]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
d83d80fac155d14b68e25e19ecb246f844731abf
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <apt-pkg/acquire.h>
17 #include <apt-pkg/acquire-item.h>
18 #include <apt-pkg/acquire-worker.h>
19 #include <apt-pkg/configuration.h>
20 #include <apt-pkg/error.h>
21 #include <apt-pkg/strutl.h>
22 #include <apt-pkg/fileutl.h>
23
24 #include <apti18n.h>
25
26 #include <iostream>
27 #include <sstream>
28 #include <stdio.h>
29
30 #include <dirent.h>
31 #include <sys/time.h>
32 #include <errno.h>
33 /*}}}*/
34
35 using namespace std;
36
37 // Acquire::pkgAcquire - Constructor /*{{{*/
38 // ---------------------------------------------------------------------
39 /* We grab some runtime state from the configuration space */
40 pkgAcquire::pkgAcquire() : Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
41 Debug(_config->FindB("Debug::pkgAcquire",false)),
42 Running(false), LockFD(-1)
43 {
44 string const Mode = _config->Find("Acquire::Queue-Mode","host");
45 if (strcasecmp(Mode.c_str(),"host") == 0)
46 QueueMode = QueueHost;
47 if (strcasecmp(Mode.c_str(),"access") == 0)
48 QueueMode = QueueAccess;
49 }
50 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : Queues(0), Workers(0),
51 Configs(0), Log(Progress), ToFetch(0),
52 Debug(_config->FindB("Debug::pkgAcquire",false)),
53 Running(false), LockFD(-1)
54 {
55 string const Mode = _config->Find("Acquire::Queue-Mode","host");
56 if (strcasecmp(Mode.c_str(),"host") == 0)
57 QueueMode = QueueHost;
58 if (strcasecmp(Mode.c_str(),"access") == 0)
59 QueueMode = QueueAccess;
60 Setup(Progress, "");
61 }
62 /*}}}*/
63 // Acquire::Setup - Delayed Constructor /*{{{*/
64 // ---------------------------------------------------------------------
65 /* Do everything needed to be a complete Acquire object and report the
66 success (or failure) back so the user knows that something is wrong… */
67 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
68 {
69 Log = Progress;
70
71 // check for existence and possibly create auxiliary directories
72 if (CheckDirectory(_config->FindDir("Dir::State"), _config->FindDir("Dir::State::lists") + "partial/") == false ||
73 CheckDirectory(_config->FindDir("Dir::Cache"), _config->FindDir("Dir::Cache::Archives") + "partial/") == false)
74 return false;
75
76 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
77 return true;
78
79 // Lock the directory this acquire object will work in
80 LockFD = GetLock(flCombine(Lock, "lock"));
81 if (LockFD == -1)
82 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
83
84 return true;
85 }
86 /*}}}*/
87 // Acquire::CheckDirectory - ensure that the given directory exists /*{{{*/
88 // ---------------------------------------------------------------------
89 /* a small wrapper around CreateDirectory to check if it exists and to
90 remove the trailing "/apt/" from the parent directory if needed */
91 bool pkgAcquire::CheckDirectory(string const &Parent, string const &Path) const
92 {
93 if (DirectoryExists(Path) == true)
94 return true;
95
96 size_t const len = Parent.size();
97 if (len > 5 && Parent.find("/apt/", len - 6, 5) != len - 5)
98 {
99 if (CreateDirectory(Parent.substr(0,len-5), Path) == true)
100 return true;
101 }
102 else if (CreateDirectory(Parent, Path) == true)
103 return true;
104
105 return _error->Errno("Acquire", _("Directory %s can't be created."), Path.c_str());
106 }
107 /*}}}*/
108 // Acquire::~pkgAcquire - Destructor /*{{{*/
109 // ---------------------------------------------------------------------
110 /* Free our memory, clean up the queues (destroy the workers) */
111 pkgAcquire::~pkgAcquire()
112 {
113 Shutdown();
114
115 if (LockFD != -1)
116 close(LockFD);
117
118 while (Configs != 0)
119 {
120 MethodConfig *Jnk = Configs;
121 Configs = Configs->Next;
122 delete Jnk;
123 }
124 }
125 /*}}}*/
126 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
127 // ---------------------------------------------------------------------
128 /* */
129 void pkgAcquire::Shutdown()
130 {
131 while (Items.size() != 0)
132 {
133 if (Items[0]->Status == Item::StatFetching)
134 Items[0]->Status = Item::StatError;
135 delete Items[0];
136 }
137
138 while (Queues != 0)
139 {
140 Queue *Jnk = Queues;
141 Queues = Queues->Next;
142 delete Jnk;
143 }
144 }
145 /*}}}*/
146 // Acquire::Add - Add a new item /*{{{*/
147 // ---------------------------------------------------------------------
148 /* This puts an item on the acquire list. This list is mainly for tracking
149 item status */
150 void pkgAcquire::Add(Item *Itm)
151 {
152 Items.push_back(Itm);
153 }
154 /*}}}*/
155 // Acquire::Remove - Remove a item /*{{{*/
156 // ---------------------------------------------------------------------
157 /* Remove an item from the acquire list. This is usually not used.. */
158 void pkgAcquire::Remove(Item *Itm)
159 {
160 Dequeue(Itm);
161
162 for (ItemIterator I = Items.begin(); I != Items.end();)
163 {
164 if (*I == Itm)
165 {
166 Items.erase(I);
167 I = Items.begin();
168 }
169 else
170 I++;
171 }
172 }
173 /*}}}*/
174 // Acquire::Add - Add a worker /*{{{*/
175 // ---------------------------------------------------------------------
176 /* A list of workers is kept so that the select loop can direct their FD
177 usage. */
178 void pkgAcquire::Add(Worker *Work)
179 {
180 Work->NextAcquire = Workers;
181 Workers = Work;
182 }
183 /*}}}*/
184 // Acquire::Remove - Remove a worker /*{{{*/
185 // ---------------------------------------------------------------------
186 /* A worker has died. This can not be done while the select loop is running
187 as it would require that RunFds could handle a changing list state and
188 it can't.. */
189 void pkgAcquire::Remove(Worker *Work)
190 {
191 if (Running == true)
192 abort();
193
194 Worker **I = &Workers;
195 for (; *I != 0;)
196 {
197 if (*I == Work)
198 *I = (*I)->NextAcquire;
199 else
200 I = &(*I)->NextAcquire;
201 }
202 }
203 /*}}}*/
204 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
205 // ---------------------------------------------------------------------
206 /* This is the entry point for an item. An item calls this function when
207 it is constructed which creates a queue (based on the current queue
208 mode) and puts the item in that queue. If the system is running then
209 the queue might be started. */
210 void pkgAcquire::Enqueue(ItemDesc &Item)
211 {
212 // Determine which queue to put the item in
213 const MethodConfig *Config;
214 string Name = QueueName(Item.URI,Config);
215 if (Name.empty() == true)
216 return;
217
218 // Find the queue structure
219 Queue *I = Queues;
220 for (; I != 0 && I->Name != Name; I = I->Next);
221 if (I == 0)
222 {
223 I = new Queue(Name,this);
224 I->Next = Queues;
225 Queues = I;
226
227 if (Running == true)
228 I->Startup();
229 }
230
231 // See if this is a local only URI
232 if (Config->LocalOnly == true && Item.Owner->Complete == false)
233 Item.Owner->Local = true;
234 Item.Owner->Status = Item::StatIdle;
235
236 // Queue it into the named queue
237 if(I->Enqueue(Item))
238 ToFetch++;
239
240 // Some trace stuff
241 if (Debug == true)
242 {
243 clog << "Fetching " << Item.URI << endl;
244 clog << " to " << Item.Owner->DestFile << endl;
245 clog << " Queue is: " << Name << endl;
246 }
247 }
248 /*}}}*/
249 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
250 // ---------------------------------------------------------------------
251 /* This is called when an item is finished being fetched. It removes it
252 from all the queues */
253 void pkgAcquire::Dequeue(Item *Itm)
254 {
255 Queue *I = Queues;
256 bool Res = false;
257 for (; I != 0; I = I->Next)
258 Res |= I->Dequeue(Itm);
259
260 if (Debug == true)
261 clog << "Dequeuing " << Itm->DestFile << endl;
262 if (Res == true)
263 ToFetch--;
264 }
265 /*}}}*/
266 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
267 // ---------------------------------------------------------------------
268 /* The string returned depends on the configuration settings and the
269 method parameters. Given something like http://foo.org/bar it can
270 return http://foo.org or http */
271 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
272 {
273 URI U(Uri);
274
275 Config = GetConfig(U.Access);
276 if (Config == 0)
277 return string();
278
279 /* Single-Instance methods get exactly one queue per URI. This is
280 also used for the Access queue method */
281 if (Config->SingleInstance == true || QueueMode == QueueAccess)
282 return U.Access;
283
284 return U.Access + ':' + U.Host;
285 }
286 /*}}}*/
287 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
288 // ---------------------------------------------------------------------
289 /* This locates the configuration structure for an access method. If
290 a config structure cannot be found a Worker will be created to
291 retrieve it */
292 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
293 {
294 // Search for an existing config
295 MethodConfig *Conf;
296 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
297 if (Conf->Access == Access)
298 return Conf;
299
300 // Create the new config class
301 Conf = new MethodConfig;
302 Conf->Access = Access;
303 Conf->Next = Configs;
304 Configs = Conf;
305
306 // Create the worker to fetch the configuration
307 Worker Work(Conf);
308 if (Work.Start() == false)
309 return 0;
310
311 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
312 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
313 Conf->SingleInstance = true;
314
315 return Conf;
316 }
317 /*}}}*/
318 // Acquire::SetFds - Deal with readable FDs /*{{{*/
319 // ---------------------------------------------------------------------
320 /* Collect FDs that have activity monitors into the fd sets */
321 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
322 {
323 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
324 {
325 if (I->InReady == true && I->InFd >= 0)
326 {
327 if (Fd < I->InFd)
328 Fd = I->InFd;
329 FD_SET(I->InFd,RSet);
330 }
331 if (I->OutReady == true && I->OutFd >= 0)
332 {
333 if (Fd < I->OutFd)
334 Fd = I->OutFd;
335 FD_SET(I->OutFd,WSet);
336 }
337 }
338 }
339 /*}}}*/
340 // Acquire::RunFds - Deal with active FDs /*{{{*/
341 // ---------------------------------------------------------------------
342 /* Dispatch active FDs over to the proper workers. It is very important
343 that a worker never be erased while this is running! The queue class
344 should never erase a worker except during shutdown processing. */
345 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
346 {
347 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
348 {
349 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
350 I->InFdReady();
351 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
352 I->OutFdReady();
353 }
354 }
355 /*}}}*/
356 // Acquire::Run - Run the fetch sequence /*{{{*/
357 // ---------------------------------------------------------------------
358 /* This runs the queues. It manages a select loop for all of the
359 Worker tasks. The workers interact with the queues and items to
360 manage the actual fetch. */
361 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
362 {
363 Running = true;
364
365 for (Queue *I = Queues; I != 0; I = I->Next)
366 I->Startup();
367
368 if (Log != 0)
369 Log->Start();
370
371 bool WasCancelled = false;
372
373 // Run till all things have been acquired
374 struct timeval tv;
375 tv.tv_sec = 0;
376 tv.tv_usec = PulseIntervall;
377 while (ToFetch > 0)
378 {
379 fd_set RFds;
380 fd_set WFds;
381 int Highest = 0;
382 FD_ZERO(&RFds);
383 FD_ZERO(&WFds);
384 SetFds(Highest,&RFds,&WFds);
385
386 int Res;
387 do
388 {
389 Res = select(Highest+1,&RFds,&WFds,0,&tv);
390 }
391 while (Res < 0 && errno == EINTR);
392
393 if (Res < 0)
394 {
395 _error->Errno("select","Select has failed");
396 break;
397 }
398
399 RunFds(&RFds,&WFds);
400 if (_error->PendingError() == true)
401 break;
402
403 // Timeout, notify the log class
404 if (Res == 0 || (Log != 0 && Log->Update == true))
405 {
406 tv.tv_usec = PulseIntervall;
407 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
408 I->Pulse();
409 if (Log != 0 && Log->Pulse(this) == false)
410 {
411 WasCancelled = true;
412 break;
413 }
414 }
415 }
416
417 if (Log != 0)
418 Log->Stop();
419
420 // Shut down the acquire bits
421 Running = false;
422 for (Queue *I = Queues; I != 0; I = I->Next)
423 I->Shutdown(false);
424
425 // Shut down the items
426 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
427 (*I)->Finished();
428
429 if (_error->PendingError())
430 return Failed;
431 if (WasCancelled)
432 return Cancelled;
433 return Continue;
434 }
435 /*}}}*/
436 // Acquire::Bump - Called when an item is dequeued /*{{{*/
437 // ---------------------------------------------------------------------
438 /* This routine bumps idle queues in hopes that they will be able to fetch
439 the dequeued item */
440 void pkgAcquire::Bump()
441 {
442 for (Queue *I = Queues; I != 0; I = I->Next)
443 I->Bump();
444 }
445 /*}}}*/
446 // Acquire::WorkerStep - Step to the next worker /*{{{*/
447 // ---------------------------------------------------------------------
448 /* Not inlined to avoid including acquire-worker.h */
449 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
450 {
451 return I->NextAcquire;
452 };
453 /*}}}*/
454 // Acquire::Clean - Cleans a directory /*{{{*/
455 // ---------------------------------------------------------------------
456 /* This is a bit simplistic, it looks at every file in the dir and sees
457 if it is part of the download set. */
458 bool pkgAcquire::Clean(string Dir)
459 {
460 DIR *D = opendir(Dir.c_str());
461 if (D == 0)
462 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
463
464 string StartDir = SafeGetCWD();
465 if (chdir(Dir.c_str()) != 0)
466 {
467 closedir(D);
468 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
469 }
470
471 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
472 {
473 // Skip some files..
474 if (strcmp(Dir->d_name,"lock") == 0 ||
475 strcmp(Dir->d_name,"partial") == 0 ||
476 strcmp(Dir->d_name,".") == 0 ||
477 strcmp(Dir->d_name,"..") == 0)
478 continue;
479
480 // Look in the get list
481 ItemCIterator I = Items.begin();
482 for (; I != Items.end(); I++)
483 if (flNotDir((*I)->DestFile) == Dir->d_name)
484 break;
485
486 // Nothing found, nuke it
487 if (I == Items.end())
488 unlink(Dir->d_name);
489 };
490
491 closedir(D);
492 if (chdir(StartDir.c_str()) != 0)
493 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
494 return true;
495 }
496 /*}}}*/
497 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
498 // ---------------------------------------------------------------------
499 /* This is the total number of bytes needed */
500 double pkgAcquire::TotalNeeded()
501 {
502 double Total = 0;
503 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
504 Total += (*I)->FileSize;
505 return Total;
506 }
507 /*}}}*/
508 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
509 // ---------------------------------------------------------------------
510 /* This is the number of bytes that is not local */
511 double pkgAcquire::FetchNeeded()
512 {
513 double Total = 0;
514 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
515 if ((*I)->Local == false)
516 Total += (*I)->FileSize;
517 return Total;
518 }
519 /*}}}*/
520 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
521 // ---------------------------------------------------------------------
522 /* This is the sum of the partial sizes of all items that are not local */
523 double pkgAcquire::PartialPresent()
524 {
525 double Total = 0;
526 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
527 if ((*I)->Local == false)
528 Total += (*I)->PartialSize;
529 return Total;
530 }
531 /*}}}*/
532 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
533 // ---------------------------------------------------------------------
534 /* */
535 pkgAcquire::UriIterator pkgAcquire::UriBegin()
536 {
537 return UriIterator(Queues);
538 }
539 /*}}}*/
540 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
541 // ---------------------------------------------------------------------
542 /* */
543 pkgAcquire::UriIterator pkgAcquire::UriEnd()
544 {
545 return UriIterator(0);
546 }
547 /*}}}*/
548 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
549 // ---------------------------------------------------------------------
550 /* */
551 pkgAcquire::MethodConfig::MethodConfig()
552 {
553 SingleInstance = false;
554 Pipeline = false;
555 SendConfig = false;
556 LocalOnly = false;
557 Removable = false;
558 Next = 0;
559 }
560 /*}}}*/
561 // Queue::Queue - Constructor /*{{{*/
562 // ---------------------------------------------------------------------
563 /* */
564 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
565 Owner(Owner)
566 {
567 Items = 0;
568 Next = 0;
569 Workers = 0;
570 MaxPipeDepth = 1;
571 PipeDepth = 0;
572 }
573 /*}}}*/
574 // Queue::~Queue - Destructor /*{{{*/
575 // ---------------------------------------------------------------------
576 /* */
577 pkgAcquire::Queue::~Queue()
578 {
579 Shutdown(true);
580
581 while (Items != 0)
582 {
583 QItem *Jnk = Items;
584 Items = Items->Next;
585 delete Jnk;
586 }
587 }
588 /*}}}*/
589 // Queue::Enqueue - Queue an item to the queue /*{{{*/
590 // ---------------------------------------------------------------------
591 /* */
592 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
593 {
594 QItem **I = &Items;
595 // move to the end of the queue and check for duplicates here
596 for (; *I != 0; I = &(*I)->Next)
597 if (Item.URI == (*I)->URI)
598 {
599 Item.Owner->Status = Item::StatDone;
600 return false;
601 }
602
603 // Create a new item
604 QItem *Itm = new QItem;
605 *Itm = Item;
606 Itm->Next = 0;
607 *I = Itm;
608
609 Item.Owner->QueueCounter++;
610 if (Items->Next == 0)
611 Cycle();
612 return true;
613 }
614 /*}}}*/
615 // Queue::Dequeue - Remove an item from the queue /*{{{*/
616 // ---------------------------------------------------------------------
617 /* We return true if we hit something */
618 bool pkgAcquire::Queue::Dequeue(Item *Owner)
619 {
620 if (Owner->Status == pkgAcquire::Item::StatFetching)
621 return _error->Error("Tried to dequeue a fetching object");
622
623 bool Res = false;
624
625 QItem **I = &Items;
626 for (; *I != 0;)
627 {
628 if ((*I)->Owner == Owner)
629 {
630 QItem *Jnk= *I;
631 *I = (*I)->Next;
632 Owner->QueueCounter--;
633 delete Jnk;
634 Res = true;
635 }
636 else
637 I = &(*I)->Next;
638 }
639
640 return Res;
641 }
642 /*}}}*/
643 // Queue::Startup - Start the worker processes /*{{{*/
644 // ---------------------------------------------------------------------
645 /* It is possible for this to be called with a pre-existing set of
646 workers. */
647 bool pkgAcquire::Queue::Startup()
648 {
649 if (Workers == 0)
650 {
651 URI U(Name);
652 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
653 if (Cnf == 0)
654 return false;
655
656 Workers = new Worker(this,Cnf,Owner->Log);
657 Owner->Add(Workers);
658 if (Workers->Start() == false)
659 return false;
660
661 /* When pipelining we commit 10 items. This needs to change when we
662 added other source retry to have cycle maintain a pipeline depth
663 on its own. */
664 if (Cnf->Pipeline == true)
665 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
666 else
667 MaxPipeDepth = 1;
668 }
669
670 return Cycle();
671 }
672 /*}}}*/
673 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
674 // ---------------------------------------------------------------------
675 /* If final is true then all workers are eliminated, otherwise only workers
676 that do not need cleanup are removed */
677 bool pkgAcquire::Queue::Shutdown(bool Final)
678 {
679 // Delete all of the workers
680 pkgAcquire::Worker **Cur = &Workers;
681 while (*Cur != 0)
682 {
683 pkgAcquire::Worker *Jnk = *Cur;
684 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
685 {
686 *Cur = Jnk->NextQueue;
687 Owner->Remove(Jnk);
688 delete Jnk;
689 }
690 else
691 Cur = &(*Cur)->NextQueue;
692 }
693
694 return true;
695 }
696 /*}}}*/
697 // Queue::FindItem - Find a URI in the item list /*{{{*/
698 // ---------------------------------------------------------------------
699 /* */
700 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
701 {
702 for (QItem *I = Items; I != 0; I = I->Next)
703 if (I->URI == URI && I->Worker == Owner)
704 return I;
705 return 0;
706 }
707 /*}}}*/
708 // Queue::ItemDone - Item has been completed /*{{{*/
709 // ---------------------------------------------------------------------
710 /* The worker signals this which causes the item to be removed from the
711 queue. If this is the last queue instance then it is removed from the
712 main queue too.*/
713 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
714 {
715 PipeDepth--;
716 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
717 Itm->Owner->Status = pkgAcquire::Item::StatDone;
718
719 if (Itm->Owner->QueueCounter <= 1)
720 Owner->Dequeue(Itm->Owner);
721 else
722 {
723 Dequeue(Itm->Owner);
724 Owner->Bump();
725 }
726
727 return Cycle();
728 }
729 /*}}}*/
730 // Queue::Cycle - Queue new items into the method /*{{{*/
731 // ---------------------------------------------------------------------
732 /* This locates a new idle item and sends it to the worker. If pipelining
733 is enabled then it keeps the pipe full. */
734 bool pkgAcquire::Queue::Cycle()
735 {
736 if (Items == 0 || Workers == 0)
737 return true;
738
739 if (PipeDepth < 0)
740 return _error->Error("Pipedepth failure");
741
742 // Look for a queable item
743 QItem *I = Items;
744 while (PipeDepth < (signed)MaxPipeDepth)
745 {
746 for (; I != 0; I = I->Next)
747 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
748 break;
749
750 // Nothing to do, queue is idle.
751 if (I == 0)
752 return true;
753
754 I->Worker = Workers;
755 I->Owner->Status = pkgAcquire::Item::StatFetching;
756 PipeDepth++;
757 if (Workers->QueueItem(I) == false)
758 return false;
759 }
760
761 return true;
762 }
763 /*}}}*/
764 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
765 // ---------------------------------------------------------------------
766 /* This is called when an item in multiple queues is dequeued */
767 void pkgAcquire::Queue::Bump()
768 {
769 Cycle();
770 }
771 /*}}}*/
772 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
773 // ---------------------------------------------------------------------
774 /* */
775 pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
776 {
777 Start();
778 }
779 /*}}}*/
780 // AcquireStatus::Pulse - Called periodically /*{{{*/
781 // ---------------------------------------------------------------------
782 /* This computes some internal state variables for the derived classes to
783 use. It generates the current downloaded bytes and total bytes to download
784 as well as the current CPS estimate. */
785 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
786 {
787 TotalBytes = 0;
788 CurrentBytes = 0;
789 TotalItems = 0;
790 CurrentItems = 0;
791
792 // Compute the total number of bytes to fetch
793 unsigned int Unknown = 0;
794 unsigned int Count = 0;
795 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
796 I++, Count++)
797 {
798 TotalItems++;
799 if ((*I)->Status == pkgAcquire::Item::StatDone)
800 CurrentItems++;
801
802 // Totally ignore local items
803 if ((*I)->Local == true)
804 continue;
805
806 TotalBytes += (*I)->FileSize;
807 if ((*I)->Complete == true)
808 CurrentBytes += (*I)->FileSize;
809 if ((*I)->FileSize == 0 && (*I)->Complete == false)
810 Unknown++;
811 }
812
813 // Compute the current completion
814 unsigned long ResumeSize = 0;
815 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
816 I = Owner->WorkerStep(I))
817 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
818 {
819 CurrentBytes += I->CurrentSize;
820 ResumeSize += I->ResumePoint;
821
822 // Files with unknown size always have 100% completion
823 if (I->CurrentItem->Owner->FileSize == 0 &&
824 I->CurrentItem->Owner->Complete == false)
825 TotalBytes += I->CurrentSize;
826 }
827
828 // Normalize the figures and account for unknown size downloads
829 if (TotalBytes <= 0)
830 TotalBytes = 1;
831 if (Unknown == Count)
832 TotalBytes = Unknown;
833
834 // Wha?! Is not supposed to happen.
835 if (CurrentBytes > TotalBytes)
836 CurrentBytes = TotalBytes;
837
838 // Compute the CPS
839 struct timeval NewTime;
840 gettimeofday(&NewTime,0);
841 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
842 NewTime.tv_sec - Time.tv_sec > 6)
843 {
844 double Delta = NewTime.tv_sec - Time.tv_sec +
845 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
846
847 // Compute the CPS value
848 if (Delta < 0.01)
849 CurrentCPS = 0;
850 else
851 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
852 LastBytes = CurrentBytes - ResumeSize;
853 ElapsedTime = (unsigned long)Delta;
854 Time = NewTime;
855 }
856
857 int fd = _config->FindI("APT::Status-Fd",-1);
858 if(fd > 0)
859 {
860 ostringstream status;
861
862 char msg[200];
863 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
864 unsigned long ETA =
865 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
866
867 // only show the ETA if it makes sense
868 if (ETA > 0 && ETA < 172800 /* two days */ )
869 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
870 else
871 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
872
873
874
875 // build the status str
876 status << "dlstatus:" << i
877 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
878 << ":" << msg
879 << endl;
880 write(fd, status.str().c_str(), status.str().size());
881 }
882
883 return true;
884 }
885 /*}}}*/
886 // AcquireStatus::Start - Called when the download is started /*{{{*/
887 // ---------------------------------------------------------------------
888 /* We just reset the counters */
889 void pkgAcquireStatus::Start()
890 {
891 gettimeofday(&Time,0);
892 gettimeofday(&StartTime,0);
893 LastBytes = 0;
894 CurrentCPS = 0;
895 CurrentBytes = 0;
896 TotalBytes = 0;
897 FetchedBytes = 0;
898 ElapsedTime = 0;
899 TotalItems = 0;
900 CurrentItems = 0;
901 }
902 /*}}}*/
903 // AcquireStatus::Stop - Finished downloading /*{{{*/
904 // ---------------------------------------------------------------------
905 /* This accurately computes the elapsed time and the total overall CPS. */
906 void pkgAcquireStatus::Stop()
907 {
908 // Compute the CPS and elapsed time
909 struct timeval NewTime;
910 gettimeofday(&NewTime,0);
911
912 double Delta = NewTime.tv_sec - StartTime.tv_sec +
913 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
914
915 // Compute the CPS value
916 if (Delta < 0.01)
917 CurrentCPS = 0;
918 else
919 CurrentCPS = FetchedBytes/Delta;
920 LastBytes = CurrentBytes;
921 ElapsedTime = (unsigned int)Delta;
922 }
923 /*}}}*/
924 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
925 // ---------------------------------------------------------------------
926 /* This is used to get accurate final transfer rate reporting. */
927 void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
928 {
929 FetchedBytes += Size - Resume;
930 }
931 /*}}}*/