]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
Set STRIP_FROM_PATH for doxygen
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34
35 #include <dirent.h>
36 #include <sys/time.h>
37 #include <sys/select.h>
38 #include <errno.h>
39
40 #include <apti18n.h>
41 /*}}}*/
42
43 using namespace std;
44
45 // Acquire::pkgAcquire - Constructor /*{{{*/
46 // ---------------------------------------------------------------------
47 /* We grab some runtime state from the configuration space */
48 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
49 Debug(_config->FindB("Debug::pkgAcquire",false)),
50 Running(false)
51 {
52 string const Mode = _config->Find("Acquire::Queue-Mode","host");
53 if (strcasecmp(Mode.c_str(),"host") == 0)
54 QueueMode = QueueHost;
55 if (strcasecmp(Mode.c_str(),"access") == 0)
56 QueueMode = QueueAccess;
57 }
58 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
59 Configs(0), Log(Progress), ToFetch(0),
60 Debug(_config->FindB("Debug::pkgAcquire",false)),
61 Running(false)
62 {
63 string const Mode = _config->Find("Acquire::Queue-Mode","host");
64 if (strcasecmp(Mode.c_str(),"host") == 0)
65 QueueMode = QueueHost;
66 if (strcasecmp(Mode.c_str(),"access") == 0)
67 QueueMode = QueueAccess;
68 Setup(Progress, "");
69 }
70 /*}}}*/
71 // Acquire::Setup - Delayed Constructor /*{{{*/
72 // ---------------------------------------------------------------------
73 /* Do everything needed to be a complete Acquire object and report the
74 success (or failure) back so the user knows that something is wrong… */
75 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
76 {
77 Log = Progress;
78
79 // check for existence and possibly create auxiliary directories
80 string const listDir = _config->FindDir("Dir::State::lists");
81 string const partialListDir = listDir + "partial/";
82 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
83 string const partialArchivesDir = archivesDir + "partial/";
84
85 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
86 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
87 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
88
89 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
90 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
91 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
92
93 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
94 return true;
95
96 // Lock the directory this acquire object will work in
97 LockFD = GetLock(flCombine(Lock, "lock"));
98 if (LockFD == -1)
99 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
100
101 return true;
102 }
103 /*}}}*/
104 // Acquire::~pkgAcquire - Destructor /*{{{*/
105 // ---------------------------------------------------------------------
106 /* Free our memory, clean up the queues (destroy the workers) */
107 pkgAcquire::~pkgAcquire()
108 {
109 Shutdown();
110
111 if (LockFD != -1)
112 close(LockFD);
113
114 while (Configs != 0)
115 {
116 MethodConfig *Jnk = Configs;
117 Configs = Configs->Next;
118 delete Jnk;
119 }
120 }
121 /*}}}*/
122 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
123 // ---------------------------------------------------------------------
124 /* */
125 void pkgAcquire::Shutdown()
126 {
127 while (Items.empty() == false)
128 {
129 if (Items[0]->Status == Item::StatFetching)
130 Items[0]->Status = Item::StatError;
131 delete Items[0];
132 }
133
134 while (Queues != 0)
135 {
136 Queue *Jnk = Queues;
137 Queues = Queues->Next;
138 delete Jnk;
139 }
140 }
141 /*}}}*/
142 // Acquire::Add - Add a new item /*{{{*/
143 // ---------------------------------------------------------------------
144 /* This puts an item on the acquire list. This list is mainly for tracking
145 item status */
146 void pkgAcquire::Add(Item *Itm)
147 {
148 Items.push_back(Itm);
149 }
150 /*}}}*/
151 // Acquire::Remove - Remove a item /*{{{*/
152 // ---------------------------------------------------------------------
153 /* Remove an item from the acquire list. This is usually not used.. */
154 void pkgAcquire::Remove(Item *Itm)
155 {
156 Dequeue(Itm);
157
158 for (ItemIterator I = Items.begin(); I != Items.end();)
159 {
160 if (*I == Itm)
161 {
162 Items.erase(I);
163 I = Items.begin();
164 }
165 else
166 ++I;
167 }
168 }
169 /*}}}*/
170 // Acquire::Add - Add a worker /*{{{*/
171 // ---------------------------------------------------------------------
172 /* A list of workers is kept so that the select loop can direct their FD
173 usage. */
174 void pkgAcquire::Add(Worker *Work)
175 {
176 Work->NextAcquire = Workers;
177 Workers = Work;
178 }
179 /*}}}*/
180 // Acquire::Remove - Remove a worker /*{{{*/
181 // ---------------------------------------------------------------------
182 /* A worker has died. This can not be done while the select loop is running
183 as it would require that RunFds could handling a changing list state and
184 it can't.. */
185 void pkgAcquire::Remove(Worker *Work)
186 {
187 if (Running == true)
188 abort();
189
190 Worker **I = &Workers;
191 for (; *I != 0;)
192 {
193 if (*I == Work)
194 *I = (*I)->NextAcquire;
195 else
196 I = &(*I)->NextAcquire;
197 }
198 }
199 /*}}}*/
200 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
201 // ---------------------------------------------------------------------
202 /* This is the entry point for an item. An item calls this function when
203 it is constructed which creates a queue (based on the current queue
204 mode) and puts the item in that queue. If the system is running then
205 the queue might be started. */
206 void pkgAcquire::Enqueue(ItemDesc &Item)
207 {
208 // Determine which queue to put the item in
209 const MethodConfig *Config;
210 string Name = QueueName(Item.URI,Config);
211 if (Name.empty() == true)
212 return;
213
214 // Find the queue structure
215 Queue *I = Queues;
216 for (; I != 0 && I->Name != Name; I = I->Next);
217 if (I == 0)
218 {
219 I = new Queue(Name,this);
220 I->Next = Queues;
221 Queues = I;
222
223 if (Running == true)
224 I->Startup();
225 }
226
227 // See if this is a local only URI
228 if (Config->LocalOnly == true && Item.Owner->Complete == false)
229 Item.Owner->Local = true;
230 Item.Owner->Status = Item::StatIdle;
231
232 // Queue it into the named queue
233 if(I->Enqueue(Item))
234 ToFetch++;
235
236 // Some trace stuff
237 if (Debug == true)
238 {
239 clog << "Fetching " << Item.URI << endl;
240 clog << " to " << Item.Owner->DestFile << endl;
241 clog << " Queue is: " << Name << endl;
242 }
243 }
244 /*}}}*/
245 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
246 // ---------------------------------------------------------------------
247 /* This is called when an item is finished being fetched. It removes it
248 from all the queues */
249 void pkgAcquire::Dequeue(Item *Itm)
250 {
251 Queue *I = Queues;
252 bool Res = false;
253 if (Debug == true)
254 clog << "Dequeuing " << Itm->DestFile << endl;
255
256 for (; I != 0; I = I->Next)
257 {
258 if (I->Dequeue(Itm))
259 {
260 Res = true;
261 if (Debug == true)
262 clog << "Dequeued from " << I->Name << endl;
263 }
264 }
265
266 if (Res == true)
267 ToFetch--;
268 }
269 /*}}}*/
270 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
271 // ---------------------------------------------------------------------
272 /* The string returned depends on the configuration settings and the
273 method parameters. Given something like http://foo.org/bar it can
274 return http://foo.org or http */
275 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
276 {
277 URI U(Uri);
278
279 Config = GetConfig(U.Access);
280 if (Config == 0)
281 return string();
282
283 /* Single-Instance methods get exactly one queue per URI. This is
284 also used for the Access queue method */
285 if (Config->SingleInstance == true || QueueMode == QueueAccess)
286 return U.Access;
287
288 string AccessSchema = U.Access + ':',
289 FullQueueName = AccessSchema + U.Host;
290 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
291
292 Queue *I = Queues;
293 for (; I != 0; I = I->Next) {
294 // if the queue already exists, re-use it
295 if (I->Name == FullQueueName)
296 return FullQueueName;
297
298 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
299 Instances++;
300 }
301
302 if (Debug) {
303 clog << "Found " << Instances << " instances of " << U.Access << endl;
304 }
305
306 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
307 return U.Access;
308
309 return FullQueueName;
310 }
311 /*}}}*/
312 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
313 // ---------------------------------------------------------------------
314 /* This locates the configuration structure for an access method. If
315 a config structure cannot be found a Worker will be created to
316 retrieve it */
317 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
318 {
319 // Search for an existing config
320 MethodConfig *Conf;
321 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
322 if (Conf->Access == Access)
323 return Conf;
324
325 // Create the new config class
326 Conf = new MethodConfig;
327 Conf->Access = Access;
328 Conf->Next = Configs;
329 Configs = Conf;
330
331 // Create the worker to fetch the configuration
332 Worker Work(Conf);
333 if (Work.Start() == false)
334 return 0;
335
336 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
337 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
338 Conf->SingleInstance = true;
339
340 return Conf;
341 }
342 /*}}}*/
343 // Acquire::SetFds - Deal with readable FDs /*{{{*/
344 // ---------------------------------------------------------------------
345 /* Collect FDs that have activity monitors into the fd sets */
346 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
347 {
348 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
349 {
350 if (I->InReady == true && I->InFd >= 0)
351 {
352 if (Fd < I->InFd)
353 Fd = I->InFd;
354 FD_SET(I->InFd,RSet);
355 }
356 if (I->OutReady == true && I->OutFd >= 0)
357 {
358 if (Fd < I->OutFd)
359 Fd = I->OutFd;
360 FD_SET(I->OutFd,WSet);
361 }
362 }
363 }
364 /*}}}*/
365 // Acquire::RunFds - Deal with active FDs /*{{{*/
366 // ---------------------------------------------------------------------
367 /* Dispatch active FDs over to the proper workers. It is very important
368 that a worker never be erased while this is running! The queue class
369 should never erase a worker except during shutdown processing. */
370 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
371 {
372 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
373 {
374 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
375 I->InFdReady();
376 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
377 I->OutFdReady();
378 }
379 }
380 /*}}}*/
381 // Acquire::Run - Run the fetch sequence /*{{{*/
382 // ---------------------------------------------------------------------
383 /* This runs the queues. It manages a select loop for all of the
384 Worker tasks. The workers interact with the queues and items to
385 manage the actual fetch. */
386 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
387 {
388 Running = true;
389
390 for (Queue *I = Queues; I != 0; I = I->Next)
391 I->Startup();
392
393 if (Log != 0)
394 Log->Start();
395
396 bool WasCancelled = false;
397
398 // Run till all things have been acquired
399 struct timeval tv;
400 tv.tv_sec = 0;
401 tv.tv_usec = PulseIntervall;
402 while (ToFetch > 0)
403 {
404 fd_set RFds;
405 fd_set WFds;
406 int Highest = 0;
407 FD_ZERO(&RFds);
408 FD_ZERO(&WFds);
409 SetFds(Highest,&RFds,&WFds);
410
411 int Res;
412 do
413 {
414 Res = select(Highest+1,&RFds,&WFds,0,&tv);
415 }
416 while (Res < 0 && errno == EINTR);
417
418 if (Res < 0)
419 {
420 _error->Errno("select","Select has failed");
421 break;
422 }
423
424 RunFds(&RFds,&WFds);
425 if (_error->PendingError() == true)
426 break;
427
428 // Timeout, notify the log class
429 if (Res == 0 || (Log != 0 && Log->Update == true))
430 {
431 tv.tv_usec = PulseIntervall;
432 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
433 I->Pulse();
434 if (Log != 0 && Log->Pulse(this) == false)
435 {
436 WasCancelled = true;
437 break;
438 }
439 }
440 }
441
442 if (Log != 0)
443 Log->Stop();
444
445 // Shut down the acquire bits
446 Running = false;
447 for (Queue *I = Queues; I != 0; I = I->Next)
448 I->Shutdown(false);
449
450 // Shut down the items
451 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
452 (*I)->Finished();
453
454 if (_error->PendingError())
455 return Failed;
456 if (WasCancelled)
457 return Cancelled;
458 return Continue;
459 }
460 /*}}}*/
461 // Acquire::Bump - Called when an item is dequeued /*{{{*/
462 // ---------------------------------------------------------------------
463 /* This routine bumps idle queues in hopes that they will be able to fetch
464 the dequeued item */
465 void pkgAcquire::Bump()
466 {
467 for (Queue *I = Queues; I != 0; I = I->Next)
468 I->Bump();
469 }
470 /*}}}*/
471 // Acquire::WorkerStep - Step to the next worker /*{{{*/
472 // ---------------------------------------------------------------------
473 /* Not inlined to advoid including acquire-worker.h */
474 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
475 {
476 return I->NextAcquire;
477 }
478 /*}}}*/
479 // Acquire::Clean - Cleans a directory /*{{{*/
480 // ---------------------------------------------------------------------
481 /* This is a bit simplistic, it looks at every file in the dir and sees
482 if it is part of the download set. */
483 bool pkgAcquire::Clean(string Dir)
484 {
485 // non-existing directories are by definition clean…
486 if (DirectoryExists(Dir) == false)
487 return true;
488
489 if(Dir == "/")
490 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
491
492 DIR *D = opendir(Dir.c_str());
493 if (D == 0)
494 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
495
496 string StartDir = SafeGetCWD();
497 if (chdir(Dir.c_str()) != 0)
498 {
499 closedir(D);
500 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
501 }
502
503 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
504 {
505 // Skip some files..
506 if (strcmp(Dir->d_name,"lock") == 0 ||
507 strcmp(Dir->d_name,"partial") == 0 ||
508 strcmp(Dir->d_name,".") == 0 ||
509 strcmp(Dir->d_name,"..") == 0)
510 continue;
511
512 // Look in the get list
513 ItemCIterator I = Items.begin();
514 for (; I != Items.end(); ++I)
515 if (flNotDir((*I)->DestFile) == Dir->d_name)
516 break;
517
518 // Nothing found, nuke it
519 if (I == Items.end())
520 unlink(Dir->d_name);
521 };
522
523 closedir(D);
524 if (chdir(StartDir.c_str()) != 0)
525 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
526 return true;
527 }
528 /*}}}*/
529 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
530 // ---------------------------------------------------------------------
531 /* This is the total number of bytes needed */
532 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
533 {
534 unsigned long long Total = 0;
535 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
536 Total += (*I)->FileSize;
537 return Total;
538 }
539 /*}}}*/
540 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
541 // ---------------------------------------------------------------------
542 /* This is the number of bytes that is not local */
543 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
544 {
545 unsigned long long Total = 0;
546 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
547 if ((*I)->Local == false)
548 Total += (*I)->FileSize;
549 return Total;
550 }
551 /*}}}*/
552 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
553 // ---------------------------------------------------------------------
554 /* This is the number of bytes that is not local */
555 APT_PURE unsigned long long pkgAcquire::PartialPresent()
556 {
557 unsigned long long Total = 0;
558 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
559 if ((*I)->Local == false)
560 Total += (*I)->PartialSize;
561 return Total;
562 }
563 /*}}}*/
564 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
565 // ---------------------------------------------------------------------
566 /* */
567 pkgAcquire::UriIterator pkgAcquire::UriBegin()
568 {
569 return UriIterator(Queues);
570 }
571 /*}}}*/
572 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
573 // ---------------------------------------------------------------------
574 /* */
575 pkgAcquire::UriIterator pkgAcquire::UriEnd()
576 {
577 return UriIterator(0);
578 }
579 /*}}}*/
580 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
581 // ---------------------------------------------------------------------
582 /* */
583 pkgAcquire::MethodConfig::MethodConfig()
584 {
585 SingleInstance = false;
586 Pipeline = false;
587 SendConfig = false;
588 LocalOnly = false;
589 Removable = false;
590 Next = 0;
591 }
592 /*}}}*/
593 // Queue::Queue - Constructor /*{{{*/
594 // ---------------------------------------------------------------------
595 /* */
596 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
597 Owner(Owner)
598 {
599 Items = 0;
600 Next = 0;
601 Workers = 0;
602 MaxPipeDepth = 1;
603 PipeDepth = 0;
604 }
605 /*}}}*/
606 // Queue::~Queue - Destructor /*{{{*/
607 // ---------------------------------------------------------------------
608 /* */
609 pkgAcquire::Queue::~Queue()
610 {
611 Shutdown(true);
612
613 while (Items != 0)
614 {
615 QItem *Jnk = Items;
616 Items = Items->Next;
617 delete Jnk;
618 }
619 }
620 /*}}}*/
621 // Queue::Enqueue - Queue an item to the queue /*{{{*/
622 // ---------------------------------------------------------------------
623 /* */
624 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
625 {
626 QItem **I = &Items;
627 // move to the end of the queue and check for duplicates here
628 for (; *I != 0; I = &(*I)->Next)
629 if (Item.URI == (*I)->URI)
630 {
631 Item.Owner->Status = Item::StatDone;
632 return false;
633 }
634
635 // Create a new item
636 QItem *Itm = new QItem;
637 *Itm = Item;
638 Itm->Next = 0;
639 *I = Itm;
640
641 Item.Owner->QueueCounter++;
642 if (Items->Next == 0)
643 Cycle();
644 return true;
645 }
646 /*}}}*/
647 // Queue::Dequeue - Remove an item from the queue /*{{{*/
648 // ---------------------------------------------------------------------
649 /* We return true if we hit something */
650 bool pkgAcquire::Queue::Dequeue(Item *Owner)
651 {
652 if (Owner->Status == pkgAcquire::Item::StatFetching)
653 return _error->Error("Tried to dequeue a fetching object");
654
655 bool Res = false;
656
657 QItem **I = &Items;
658 for (; *I != 0;)
659 {
660 if ((*I)->Owner == Owner)
661 {
662 QItem *Jnk= *I;
663 *I = (*I)->Next;
664 Owner->QueueCounter--;
665 delete Jnk;
666 Res = true;
667 }
668 else
669 I = &(*I)->Next;
670 }
671
672 return Res;
673 }
674 /*}}}*/
675 // Queue::Startup - Start the worker processes /*{{{*/
676 // ---------------------------------------------------------------------
677 /* It is possible for this to be called with a pre-existing set of
678 workers. */
679 bool pkgAcquire::Queue::Startup()
680 {
681 if (Workers == 0)
682 {
683 URI U(Name);
684 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
685 if (Cnf == 0)
686 return false;
687
688 Workers = new Worker(this,Cnf,Owner->Log);
689 Owner->Add(Workers);
690 if (Workers->Start() == false)
691 return false;
692
693 /* When pipelining we commit 10 items. This needs to change when we
694 added other source retry to have cycle maintain a pipeline depth
695 on its own. */
696 if (Cnf->Pipeline == true)
697 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
698 else
699 MaxPipeDepth = 1;
700 }
701
702 return Cycle();
703 }
704 /*}}}*/
705 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
706 // ---------------------------------------------------------------------
707 /* If final is true then all workers are eliminated, otherwise only workers
708 that do not need cleanup are removed */
709 bool pkgAcquire::Queue::Shutdown(bool Final)
710 {
711 // Delete all of the workers
712 pkgAcquire::Worker **Cur = &Workers;
713 while (*Cur != 0)
714 {
715 pkgAcquire::Worker *Jnk = *Cur;
716 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
717 {
718 *Cur = Jnk->NextQueue;
719 Owner->Remove(Jnk);
720 delete Jnk;
721 }
722 else
723 Cur = &(*Cur)->NextQueue;
724 }
725
726 return true;
727 }
728 /*}}}*/
729 // Queue::FindItem - Find a URI in the item list /*{{{*/
730 // ---------------------------------------------------------------------
731 /* */
732 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
733 {
734 for (QItem *I = Items; I != 0; I = I->Next)
735 if (I->URI == URI && I->Worker == Owner)
736 return I;
737 return 0;
738 }
739 /*}}}*/
740 // Queue::ItemDone - Item has been completed /*{{{*/
741 // ---------------------------------------------------------------------
742 /* The worker signals this which causes the item to be removed from the
743 queue. If this is the last queue instance then it is removed from the
744 main queue too.*/
745 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
746 {
747 PipeDepth--;
748 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
749 Itm->Owner->Status = pkgAcquire::Item::StatDone;
750
751 if (Itm->Owner->QueueCounter <= 1)
752 Owner->Dequeue(Itm->Owner);
753 else
754 {
755 Dequeue(Itm->Owner);
756 Owner->Bump();
757 }
758
759 return Cycle();
760 }
761 /*}}}*/
762 // Queue::Cycle - Queue new items into the method /*{{{*/
763 // ---------------------------------------------------------------------
764 /* This locates a new idle item and sends it to the worker. If pipelining
765 is enabled then it keeps the pipe full. */
766 bool pkgAcquire::Queue::Cycle()
767 {
768 if (Items == 0 || Workers == 0)
769 return true;
770
771 if (PipeDepth < 0)
772 return _error->Error("Pipedepth failure");
773
774 // Look for a queable item
775 QItem *I = Items;
776 while (PipeDepth < (signed)MaxPipeDepth)
777 {
778 for (; I != 0; I = I->Next)
779 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
780 break;
781
782 // Nothing to do, queue is idle.
783 if (I == 0)
784 return true;
785
786 I->Worker = Workers;
787 I->Owner->Status = pkgAcquire::Item::StatFetching;
788 PipeDepth++;
789 if (Workers->QueueItem(I) == false)
790 return false;
791 }
792
793 return true;
794 }
795 /*}}}*/
796 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
797 // ---------------------------------------------------------------------
798 /* This is called when an item in multiple queues is dequeued */
799 void pkgAcquire::Queue::Bump()
800 {
801 Cycle();
802 }
803 /*}}}*/
804 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
805 // ---------------------------------------------------------------------
806 /* */
807 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
808 {
809 Start();
810 }
811 /*}}}*/
812 // AcquireStatus::Pulse - Called periodically /*{{{*/
813 // ---------------------------------------------------------------------
814 /* This computes some internal state variables for the derived classes to
815 use. It generates the current downloaded bytes and total bytes to download
816 as well as the current CPS estimate. */
817 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
818 {
819 TotalBytes = 0;
820 CurrentBytes = 0;
821 TotalItems = 0;
822 CurrentItems = 0;
823
824 // Compute the total number of bytes to fetch
825 unsigned int Unknown = 0;
826 unsigned int Count = 0;
827 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
828 ++I, ++Count)
829 {
830 TotalItems++;
831 if ((*I)->Status == pkgAcquire::Item::StatDone)
832 ++CurrentItems;
833
834 // Totally ignore local items
835 if ((*I)->Local == true)
836 continue;
837
838 TotalBytes += (*I)->FileSize;
839 if ((*I)->Complete == true)
840 CurrentBytes += (*I)->FileSize;
841 if ((*I)->FileSize == 0 && (*I)->Complete == false)
842 ++Unknown;
843 }
844
845 // Compute the current completion
846 unsigned long long ResumeSize = 0;
847 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
848 I = Owner->WorkerStep(I))
849 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
850 {
851 CurrentBytes += I->CurrentSize;
852 ResumeSize += I->ResumePoint;
853
854 // Files with unknown size always have 100% completion
855 if (I->CurrentItem->Owner->FileSize == 0 &&
856 I->CurrentItem->Owner->Complete == false)
857 TotalBytes += I->CurrentSize;
858 }
859
860 // Normalize the figures and account for unknown size downloads
861 if (TotalBytes <= 0)
862 TotalBytes = 1;
863 if (Unknown == Count)
864 TotalBytes = Unknown;
865
866 // Wha?! Is not supposed to happen.
867 if (CurrentBytes > TotalBytes)
868 CurrentBytes = TotalBytes;
869
870 // Compute the CPS
871 struct timeval NewTime;
872 gettimeofday(&NewTime,0);
873 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
874 NewTime.tv_sec - Time.tv_sec > 6)
875 {
876 double Delta = NewTime.tv_sec - Time.tv_sec +
877 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
878
879 // Compute the CPS value
880 if (Delta < 0.01)
881 CurrentCPS = 0;
882 else
883 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
884 LastBytes = CurrentBytes - ResumeSize;
885 ElapsedTime = (unsigned long long)Delta;
886 Time = NewTime;
887 }
888
889 int fd = _config->FindI("APT::Status-Fd",-1);
890 if(fd > 0)
891 {
892 ostringstream status;
893
894 char msg[200];
895 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
896 unsigned long long ETA = 0;
897 if(CurrentCPS > 0)
898 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
899
900 // only show the ETA if it makes sense
901 if (ETA > 0 && ETA < 172800 /* two days */ )
902 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
903 else
904 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
905
906
907
908 // build the status str
909 status << "dlstatus:" << i
910 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
911 << ":" << msg
912 << endl;
913
914 std::string const dlstatus = status.str();
915 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
916 }
917
918 return true;
919 }
920 /*}}}*/
921 // AcquireStatus::Start - Called when the download is started /*{{{*/
922 // ---------------------------------------------------------------------
923 /* We just reset the counters */
924 void pkgAcquireStatus::Start()
925 {
926 gettimeofday(&Time,0);
927 gettimeofday(&StartTime,0);
928 LastBytes = 0;
929 CurrentCPS = 0;
930 CurrentBytes = 0;
931 TotalBytes = 0;
932 FetchedBytes = 0;
933 ElapsedTime = 0;
934 TotalItems = 0;
935 CurrentItems = 0;
936 }
937 /*}}}*/
938 // AcquireStatus::Stop - Finished downloading /*{{{*/
939 // ---------------------------------------------------------------------
940 /* This accurately computes the elapsed time and the total overall CPS. */
941 void pkgAcquireStatus::Stop()
942 {
943 // Compute the CPS and elapsed time
944 struct timeval NewTime;
945 gettimeofday(&NewTime,0);
946
947 double Delta = NewTime.tv_sec - StartTime.tv_sec +
948 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
949
950 // Compute the CPS value
951 if (Delta < 0.01)
952 CurrentCPS = 0;
953 else
954 CurrentCPS = FetchedBytes/Delta;
955 LastBytes = CurrentBytes;
956 ElapsedTime = (unsigned long long)Delta;
957 }
958 /*}}}*/
959 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
960 // ---------------------------------------------------------------------
961 /* This is used to get accurate final transfer rate reporting. */
962 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
963 {
964 FetchedBytes += Size - Resume;
965 }
966 /*}}}*/