]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
merged r1970 lp:~vorlon/apt/lp.968828
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <iostream>
27 #include <sstream>
28 #include <stdio.h>
29
30 #include <dirent.h>
31 #include <sys/time.h>
32 #include <errno.h>
33
34 #include <apti18n.h>
35 /*}}}*/
36
37 using namespace std;
38
39 // Acquire::pkgAcquire - Constructor /*{{{*/
40 // ---------------------------------------------------------------------
41 /* We grab some runtime state from the configuration space */
42 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
43 Debug(_config->FindB("Debug::pkgAcquire",false)),
44 Running(false)
45 {
46 string const Mode = _config->Find("Acquire::Queue-Mode","host");
47 if (strcasecmp(Mode.c_str(),"host") == 0)
48 QueueMode = QueueHost;
49 if (strcasecmp(Mode.c_str(),"access") == 0)
50 QueueMode = QueueAccess;
51 }
52 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
53 Configs(0), Log(Progress), ToFetch(0),
54 Debug(_config->FindB("Debug::pkgAcquire",false)),
55 Running(false)
56 {
57 string const Mode = _config->Find("Acquire::Queue-Mode","host");
58 if (strcasecmp(Mode.c_str(),"host") == 0)
59 QueueMode = QueueHost;
60 if (strcasecmp(Mode.c_str(),"access") == 0)
61 QueueMode = QueueAccess;
62 Setup(Progress, "");
63 }
64 /*}}}*/
65 // Acquire::Setup - Delayed Constructor /*{{{*/
66 // ---------------------------------------------------------------------
67 /* Do everything needed to be a complete Acquire object and report the
68 success (or failure) back so the user knows that something is wrong… */
69 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
70 {
71 Log = Progress;
72
73 // check for existence and possibly create auxiliary directories
74 string const listDir = _config->FindDir("Dir::State::lists");
75 string const partialListDir = listDir + "partial/";
76 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
77 string const partialArchivesDir = archivesDir + "partial/";
78
79 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
80 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
81 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
82
83 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
84 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
85 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
86
87 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
88 return true;
89
90 // Lock the directory this acquire object will work in
91 LockFD = GetLock(flCombine(Lock, "lock"));
92 if (LockFD == -1)
93 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
94
95 return true;
96 }
97 /*}}}*/
98 // Acquire::~pkgAcquire - Destructor /*{{{*/
99 // ---------------------------------------------------------------------
100 /* Free our memory, clean up the queues (destroy the workers) */
101 pkgAcquire::~pkgAcquire()
102 {
103 Shutdown();
104
105 if (LockFD != -1)
106 close(LockFD);
107
108 while (Configs != 0)
109 {
110 MethodConfig *Jnk = Configs;
111 Configs = Configs->Next;
112 delete Jnk;
113 }
114 }
115 /*}}}*/
116 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
117 // ---------------------------------------------------------------------
118 /* */
119 void pkgAcquire::Shutdown()
120 {
121 while (Items.empty() == false)
122 {
123 if (Items[0]->Status == Item::StatFetching)
124 Items[0]->Status = Item::StatError;
125 delete Items[0];
126 }
127
128 while (Queues != 0)
129 {
130 Queue *Jnk = Queues;
131 Queues = Queues->Next;
132 delete Jnk;
133 }
134 }
135 /*}}}*/
136 // Acquire::Add - Add a new item /*{{{*/
137 // ---------------------------------------------------------------------
138 /* This puts an item on the acquire list. This list is mainly for tracking
139 item status */
140 void pkgAcquire::Add(Item *Itm)
141 {
142 Items.push_back(Itm);
143 }
144 /*}}}*/
145 // Acquire::Remove - Remove a item /*{{{*/
146 // ---------------------------------------------------------------------
147 /* Remove an item from the acquire list. This is usually not used.. */
148 void pkgAcquire::Remove(Item *Itm)
149 {
150 Dequeue(Itm);
151
152 for (ItemIterator I = Items.begin(); I != Items.end();)
153 {
154 if (*I == Itm)
155 {
156 Items.erase(I);
157 I = Items.begin();
158 }
159 else
160 ++I;
161 }
162 }
163 /*}}}*/
164 // Acquire::Add - Add a worker /*{{{*/
165 // ---------------------------------------------------------------------
166 /* A list of workers is kept so that the select loop can direct their FD
167 usage. */
168 void pkgAcquire::Add(Worker *Work)
169 {
170 Work->NextAcquire = Workers;
171 Workers = Work;
172 }
173 /*}}}*/
174 // Acquire::Remove - Remove a worker /*{{{*/
175 // ---------------------------------------------------------------------
176 /* A worker has died. This can not be done while the select loop is running
177 as it would require that RunFds could handling a changing list state and
178 it cant.. */
179 void pkgAcquire::Remove(Worker *Work)
180 {
181 if (Running == true)
182 abort();
183
184 Worker **I = &Workers;
185 for (; *I != 0;)
186 {
187 if (*I == Work)
188 *I = (*I)->NextAcquire;
189 else
190 I = &(*I)->NextAcquire;
191 }
192 }
193 /*}}}*/
194 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
195 // ---------------------------------------------------------------------
196 /* This is the entry point for an item. An item calls this function when
197 it is constructed which creates a queue (based on the current queue
198 mode) and puts the item in that queue. If the system is running then
199 the queue might be started. */
200 void pkgAcquire::Enqueue(ItemDesc &Item)
201 {
202 // Determine which queue to put the item in
203 const MethodConfig *Config;
204 string Name = QueueName(Item.URI,Config);
205 if (Name.empty() == true)
206 return;
207
208 // Find the queue structure
209 Queue *I = Queues;
210 for (; I != 0 && I->Name != Name; I = I->Next);
211 if (I == 0)
212 {
213 I = new Queue(Name,this);
214 I->Next = Queues;
215 Queues = I;
216
217 if (Running == true)
218 I->Startup();
219 }
220
221 // See if this is a local only URI
222 if (Config->LocalOnly == true && Item.Owner->Complete == false)
223 Item.Owner->Local = true;
224 Item.Owner->Status = Item::StatIdle;
225
226 // Queue it into the named queue
227 if(I->Enqueue(Item))
228 ToFetch++;
229
230 // Some trace stuff
231 if (Debug == true)
232 {
233 clog << "Fetching " << Item.URI << endl;
234 clog << " to " << Item.Owner->DestFile << endl;
235 clog << " Queue is: " << Name << endl;
236 }
237 }
238 /*}}}*/
239 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
240 // ---------------------------------------------------------------------
241 /* This is called when an item is finished being fetched. It removes it
242 from all the queues */
243 void pkgAcquire::Dequeue(Item *Itm)
244 {
245 Queue *I = Queues;
246 bool Res = false;
247 for (; I != 0; I = I->Next)
248 Res |= I->Dequeue(Itm);
249
250 if (Debug == true)
251 clog << "Dequeuing " << Itm->DestFile << endl;
252 if (Res == true)
253 ToFetch--;
254 }
255 /*}}}*/
256 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
257 // ---------------------------------------------------------------------
258 /* The string returned depends on the configuration settings and the
259 method parameters. Given something like http://foo.org/bar it can
260 return http://foo.org or http */
261 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
262 {
263 URI U(Uri);
264
265 Config = GetConfig(U.Access);
266 if (Config == 0)
267 return string();
268
269 /* Single-Instance methods get exactly one queue per URI. This is
270 also used for the Access queue method */
271 if (Config->SingleInstance == true || QueueMode == QueueAccess)
272 return U.Access;
273
274 return U.Access + ':' + U.Host;
275 }
276 /*}}}*/
277 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
278 // ---------------------------------------------------------------------
279 /* This locates the configuration structure for an access method. If
280 a config structure cannot be found a Worker will be created to
281 retrieve it */
282 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
283 {
284 // Search for an existing config
285 MethodConfig *Conf;
286 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
287 if (Conf->Access == Access)
288 return Conf;
289
290 // Create the new config class
291 Conf = new MethodConfig;
292 Conf->Access = Access;
293 Conf->Next = Configs;
294 Configs = Conf;
295
296 // Create the worker to fetch the configuration
297 Worker Work(Conf);
298 if (Work.Start() == false)
299 return 0;
300
301 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
302 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
303 Conf->SingleInstance = true;
304
305 return Conf;
306 }
307 /*}}}*/
308 // Acquire::SetFds - Deal with readable FDs /*{{{*/
309 // ---------------------------------------------------------------------
310 /* Collect FDs that have activity monitors into the fd sets */
311 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
312 {
313 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
314 {
315 if (I->InReady == true && I->InFd >= 0)
316 {
317 if (Fd < I->InFd)
318 Fd = I->InFd;
319 FD_SET(I->InFd,RSet);
320 }
321 if (I->OutReady == true && I->OutFd >= 0)
322 {
323 if (Fd < I->OutFd)
324 Fd = I->OutFd;
325 FD_SET(I->OutFd,WSet);
326 }
327 }
328 }
329 /*}}}*/
330 // Acquire::RunFds - Deal with active FDs /*{{{*/
331 // ---------------------------------------------------------------------
332 /* Dispatch active FDs over to the proper workers. It is very important
333 that a worker never be erased while this is running! The queue class
334 should never erase a worker except during shutdown processing. */
335 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
336 {
337 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
338 {
339 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
340 I->InFdReady();
341 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
342 I->OutFdReady();
343 }
344 }
345 /*}}}*/
346 // Acquire::Run - Run the fetch sequence /*{{{*/
347 // ---------------------------------------------------------------------
348 /* This runs the queues. It manages a select loop for all of the
349 Worker tasks. The workers interact with the queues and items to
350 manage the actual fetch. */
351 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
352 {
353 Running = true;
354
355 for (Queue *I = Queues; I != 0; I = I->Next)
356 I->Startup();
357
358 if (Log != 0)
359 Log->Start();
360
361 bool WasCancelled = false;
362
363 // Run till all things have been acquired
364 struct timeval tv;
365 tv.tv_sec = 0;
366 tv.tv_usec = PulseIntervall;
367 while (ToFetch > 0)
368 {
369 fd_set RFds;
370 fd_set WFds;
371 int Highest = 0;
372 FD_ZERO(&RFds);
373 FD_ZERO(&WFds);
374 SetFds(Highest,&RFds,&WFds);
375
376 int Res;
377 do
378 {
379 Res = select(Highest+1,&RFds,&WFds,0,&tv);
380 }
381 while (Res < 0 && errno == EINTR);
382
383 if (Res < 0)
384 {
385 _error->Errno("select","Select has failed");
386 break;
387 }
388
389 RunFds(&RFds,&WFds);
390 if (_error->PendingError() == true)
391 break;
392
393 // Timeout, notify the log class
394 if (Res == 0 || (Log != 0 && Log->Update == true))
395 {
396 tv.tv_usec = PulseIntervall;
397 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
398 I->Pulse();
399 if (Log != 0 && Log->Pulse(this) == false)
400 {
401 WasCancelled = true;
402 break;
403 }
404 }
405 }
406
407 if (Log != 0)
408 Log->Stop();
409
410 // Shut down the acquire bits
411 Running = false;
412 for (Queue *I = Queues; I != 0; I = I->Next)
413 I->Shutdown(false);
414
415 // Shut down the items
416 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
417 (*I)->Finished();
418
419 if (_error->PendingError())
420 return Failed;
421 if (WasCancelled)
422 return Cancelled;
423 return Continue;
424 }
425 /*}}}*/
426 // Acquire::Bump - Called when an item is dequeued /*{{{*/
427 // ---------------------------------------------------------------------
428 /* This routine bumps idle queues in hopes that they will be able to fetch
429 the dequeued item */
430 void pkgAcquire::Bump()
431 {
432 for (Queue *I = Queues; I != 0; I = I->Next)
433 I->Bump();
434 }
435 /*}}}*/
436 // Acquire::WorkerStep - Step to the next worker /*{{{*/
437 // ---------------------------------------------------------------------
438 /* Not inlined to advoid including acquire-worker.h */
439 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
440 {
441 return I->NextAcquire;
442 };
443 /*}}}*/
444 // Acquire::Clean - Cleans a directory /*{{{*/
445 // ---------------------------------------------------------------------
446 /* This is a bit simplistic, it looks at every file in the dir and sees
447 if it is part of the download set. */
448 bool pkgAcquire::Clean(string Dir)
449 {
450 // non-existing directories are by definition clean…
451 if (DirectoryExists(Dir) == false)
452 return true;
453
454 DIR *D = opendir(Dir.c_str());
455 if (D == 0)
456 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
457
458 string StartDir = SafeGetCWD();
459 if (chdir(Dir.c_str()) != 0)
460 {
461 closedir(D);
462 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
463 }
464
465 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
466 {
467 // Skip some files..
468 if (strcmp(Dir->d_name,"lock") == 0 ||
469 strcmp(Dir->d_name,"partial") == 0 ||
470 strcmp(Dir->d_name,".") == 0 ||
471 strcmp(Dir->d_name,"..") == 0)
472 continue;
473
474 // Look in the get list
475 ItemCIterator I = Items.begin();
476 for (; I != Items.end(); ++I)
477 if (flNotDir((*I)->DestFile) == Dir->d_name)
478 break;
479
480 // Nothing found, nuke it
481 if (I == Items.end())
482 unlink(Dir->d_name);
483 };
484
485 closedir(D);
486 if (chdir(StartDir.c_str()) != 0)
487 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
488 return true;
489 }
490 /*}}}*/
491 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
492 // ---------------------------------------------------------------------
493 /* This is the total number of bytes needed */
494 unsigned long long pkgAcquire::TotalNeeded()
495 {
496 unsigned long long Total = 0;
497 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
498 Total += (*I)->FileSize;
499 return Total;
500 }
501 /*}}}*/
502 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
503 // ---------------------------------------------------------------------
504 /* This is the number of bytes that is not local */
505 unsigned long long pkgAcquire::FetchNeeded()
506 {
507 unsigned long long Total = 0;
508 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
509 if ((*I)->Local == false)
510 Total += (*I)->FileSize;
511 return Total;
512 }
513 /*}}}*/
514 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
515 // ---------------------------------------------------------------------
516 /* This is the number of bytes that is not local */
517 unsigned long long pkgAcquire::PartialPresent()
518 {
519 unsigned long long Total = 0;
520 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
521 if ((*I)->Local == false)
522 Total += (*I)->PartialSize;
523 return Total;
524 }
525 /*}}}*/
526 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
527 // ---------------------------------------------------------------------
528 /* */
529 pkgAcquire::UriIterator pkgAcquire::UriBegin()
530 {
531 return UriIterator(Queues);
532 }
533 /*}}}*/
534 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
535 // ---------------------------------------------------------------------
536 /* */
537 pkgAcquire::UriIterator pkgAcquire::UriEnd()
538 {
539 return UriIterator(0);
540 }
541 /*}}}*/
542 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
543 // ---------------------------------------------------------------------
544 /* */
545 pkgAcquire::MethodConfig::MethodConfig()
546 {
547 SingleInstance = false;
548 Pipeline = false;
549 SendConfig = false;
550 LocalOnly = false;
551 Removable = false;
552 Next = 0;
553 }
554 /*}}}*/
555 // Queue::Queue - Constructor /*{{{*/
556 // ---------------------------------------------------------------------
557 /* */
558 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
559 Owner(Owner)
560 {
561 Items = 0;
562 Next = 0;
563 Workers = 0;
564 MaxPipeDepth = 1;
565 PipeDepth = 0;
566 }
567 /*}}}*/
568 // Queue::~Queue - Destructor /*{{{*/
569 // ---------------------------------------------------------------------
570 /* */
571 pkgAcquire::Queue::~Queue()
572 {
573 Shutdown(true);
574
575 while (Items != 0)
576 {
577 QItem *Jnk = Items;
578 Items = Items->Next;
579 delete Jnk;
580 }
581 }
582 /*}}}*/
583 // Queue::Enqueue - Queue an item to the queue /*{{{*/
584 // ---------------------------------------------------------------------
585 /* */
586 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
587 {
588 QItem **I = &Items;
589 // move to the end of the queue and check for duplicates here
590 for (; *I != 0; I = &(*I)->Next)
591 if (Item.URI == (*I)->URI)
592 {
593 Item.Owner->Status = Item::StatDone;
594 return false;
595 }
596
597 // Create a new item
598 QItem *Itm = new QItem;
599 *Itm = Item;
600 Itm->Next = 0;
601 *I = Itm;
602
603 Item.Owner->QueueCounter++;
604 if (Items->Next == 0)
605 Cycle();
606 return true;
607 }
608 /*}}}*/
609 // Queue::Dequeue - Remove an item from the queue /*{{{*/
610 // ---------------------------------------------------------------------
611 /* We return true if we hit something */
612 bool pkgAcquire::Queue::Dequeue(Item *Owner)
613 {
614 if (Owner->Status == pkgAcquire::Item::StatFetching)
615 return _error->Error("Tried to dequeue a fetching object");
616
617 bool Res = false;
618
619 QItem **I = &Items;
620 for (; *I != 0;)
621 {
622 if ((*I)->Owner == Owner)
623 {
624 QItem *Jnk= *I;
625 *I = (*I)->Next;
626 Owner->QueueCounter--;
627 delete Jnk;
628 Res = true;
629 }
630 else
631 I = &(*I)->Next;
632 }
633
634 return Res;
635 }
636 /*}}}*/
637 // Queue::Startup - Start the worker processes /*{{{*/
638 // ---------------------------------------------------------------------
639 /* It is possible for this to be called with a pre-existing set of
640 workers. */
641 bool pkgAcquire::Queue::Startup()
642 {
643 if (Workers == 0)
644 {
645 URI U(Name);
646 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
647 if (Cnf == 0)
648 return false;
649
650 Workers = new Worker(this,Cnf,Owner->Log);
651 Owner->Add(Workers);
652 if (Workers->Start() == false)
653 return false;
654
655 /* When pipelining we commit 10 items. This needs to change when we
656 added other source retry to have cycle maintain a pipeline depth
657 on its own. */
658 if (Cnf->Pipeline == true)
659 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
660 else
661 MaxPipeDepth = 1;
662 }
663
664 return Cycle();
665 }
666 /*}}}*/
667 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
668 // ---------------------------------------------------------------------
669 /* If final is true then all workers are eliminated, otherwise only workers
670 that do not need cleanup are removed */
671 bool pkgAcquire::Queue::Shutdown(bool Final)
672 {
673 // Delete all of the workers
674 pkgAcquire::Worker **Cur = &Workers;
675 while (*Cur != 0)
676 {
677 pkgAcquire::Worker *Jnk = *Cur;
678 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
679 {
680 *Cur = Jnk->NextQueue;
681 Owner->Remove(Jnk);
682 delete Jnk;
683 }
684 else
685 Cur = &(*Cur)->NextQueue;
686 }
687
688 return true;
689 }
690 /*}}}*/
691 // Queue::FindItem - Find a URI in the item list /*{{{*/
692 // ---------------------------------------------------------------------
693 /* */
694 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
695 {
696 for (QItem *I = Items; I != 0; I = I->Next)
697 if (I->URI == URI && I->Worker == Owner)
698 return I;
699 return 0;
700 }
701 /*}}}*/
702 // Queue::ItemDone - Item has been completed /*{{{*/
703 // ---------------------------------------------------------------------
704 /* The worker signals this which causes the item to be removed from the
705 queue. If this is the last queue instance then it is removed from the
706 main queue too.*/
707 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
708 {
709 PipeDepth--;
710 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
711 Itm->Owner->Status = pkgAcquire::Item::StatDone;
712
713 if (Itm->Owner->QueueCounter <= 1)
714 Owner->Dequeue(Itm->Owner);
715 else
716 {
717 Dequeue(Itm->Owner);
718 Owner->Bump();
719 }
720
721 return Cycle();
722 }
723 /*}}}*/
724 // Queue::Cycle - Queue new items into the method /*{{{*/
725 // ---------------------------------------------------------------------
726 /* This locates a new idle item and sends it to the worker. If pipelining
727 is enabled then it keeps the pipe full. */
728 bool pkgAcquire::Queue::Cycle()
729 {
730 if (Items == 0 || Workers == 0)
731 return true;
732
733 if (PipeDepth < 0)
734 return _error->Error("Pipedepth failure");
735
736 // Look for a queable item
737 QItem *I = Items;
738 while (PipeDepth < (signed)MaxPipeDepth)
739 {
740 for (; I != 0; I = I->Next)
741 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
742 break;
743
744 // Nothing to do, queue is idle.
745 if (I == 0)
746 return true;
747
748 I->Worker = Workers;
749 I->Owner->Status = pkgAcquire::Item::StatFetching;
750 PipeDepth++;
751 if (Workers->QueueItem(I) == false)
752 return false;
753 }
754
755 return true;
756 }
757 /*}}}*/
758 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
759 // ---------------------------------------------------------------------
760 /* This is called when an item in multiple queues is dequeued */
761 void pkgAcquire::Queue::Bump()
762 {
763 Cycle();
764 }
765 /*}}}*/
766 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
767 // ---------------------------------------------------------------------
768 /* */
769 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
770 {
771 Start();
772 }
773 /*}}}*/
774 // AcquireStatus::Pulse - Called periodically /*{{{*/
775 // ---------------------------------------------------------------------
776 /* This computes some internal state variables for the derived classes to
777 use. It generates the current downloaded bytes and total bytes to download
778 as well as the current CPS estimate. */
779 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
780 {
781 TotalBytes = 0;
782 CurrentBytes = 0;
783 TotalItems = 0;
784 CurrentItems = 0;
785
786 // Compute the total number of bytes to fetch
787 unsigned int Unknown = 0;
788 unsigned int Count = 0;
789 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
790 ++I, ++Count)
791 {
792 TotalItems++;
793 if ((*I)->Status == pkgAcquire::Item::StatDone)
794 ++CurrentItems;
795
796 // Totally ignore local items
797 if ((*I)->Local == true)
798 continue;
799
800 TotalBytes += (*I)->FileSize;
801 if ((*I)->Complete == true)
802 CurrentBytes += (*I)->FileSize;
803 if ((*I)->FileSize == 0 && (*I)->Complete == false)
804 ++Unknown;
805 }
806
807 // Compute the current completion
808 unsigned long long ResumeSize = 0;
809 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
810 I = Owner->WorkerStep(I))
811 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
812 {
813 CurrentBytes += I->CurrentSize;
814 ResumeSize += I->ResumePoint;
815
816 // Files with unknown size always have 100% completion
817 if (I->CurrentItem->Owner->FileSize == 0 &&
818 I->CurrentItem->Owner->Complete == false)
819 TotalBytes += I->CurrentSize;
820 }
821
822 // Normalize the figures and account for unknown size downloads
823 if (TotalBytes <= 0)
824 TotalBytes = 1;
825 if (Unknown == Count)
826 TotalBytes = Unknown;
827
828 // Wha?! Is not supposed to happen.
829 if (CurrentBytes > TotalBytes)
830 CurrentBytes = TotalBytes;
831
832 // Compute the CPS
833 struct timeval NewTime;
834 gettimeofday(&NewTime,0);
835 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
836 NewTime.tv_sec - Time.tv_sec > 6)
837 {
838 double Delta = NewTime.tv_sec - Time.tv_sec +
839 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
840
841 // Compute the CPS value
842 if (Delta < 0.01)
843 CurrentCPS = 0;
844 else
845 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
846 LastBytes = CurrentBytes - ResumeSize;
847 ElapsedTime = (unsigned long long)Delta;
848 Time = NewTime;
849 }
850
851 int fd = _config->FindI("APT::Status-Fd",-1);
852 if(fd > 0)
853 {
854 ostringstream status;
855
856 char msg[200];
857 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
858 unsigned long long ETA = 0;
859 if(CurrentCPS > 0)
860 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
861
862 // only show the ETA if it makes sense
863 if (ETA > 0 && ETA < 172800 /* two days */ )
864 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
865 else
866 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
867
868
869
870 // build the status str
871 status << "dlstatus:" << i
872 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
873 << ":" << msg
874 << endl;
875
876 std::string const dlstatus = status.str();
877 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
878 }
879
880 return true;
881 }
882 /*}}}*/
883 // AcquireStatus::Start - Called when the download is started /*{{{*/
884 // ---------------------------------------------------------------------
885 /* We just reset the counters */
886 void pkgAcquireStatus::Start()
887 {
888 gettimeofday(&Time,0);
889 gettimeofday(&StartTime,0);
890 LastBytes = 0;
891 CurrentCPS = 0;
892 CurrentBytes = 0;
893 TotalBytes = 0;
894 FetchedBytes = 0;
895 ElapsedTime = 0;
896 TotalItems = 0;
897 CurrentItems = 0;
898 }
899 /*}}}*/
900 // AcquireStatus::Stop - Finished downloading /*{{{*/
901 // ---------------------------------------------------------------------
902 /* This accurately computes the elapsed time and the total overall CPS. */
903 void pkgAcquireStatus::Stop()
904 {
905 // Compute the CPS and elapsed time
906 struct timeval NewTime;
907 gettimeofday(&NewTime,0);
908
909 double Delta = NewTime.tv_sec - StartTime.tv_sec +
910 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
911
912 // Compute the CPS value
913 if (Delta < 0.01)
914 CurrentCPS = 0;
915 else
916 CurrentCPS = FetchedBytes/Delta;
917 LastBytes = CurrentBytes;
918 ElapsedTime = (unsigned long long)Delta;
919 }
920 /*}}}*/
921 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
922 // ---------------------------------------------------------------------
923 /* This is used to get accurate final transfer rate reporting. */
924 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
925 {
926 FetchedBytes += Size - Resume;
927 }
928 /*}}}*/