]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
Merge branch 'debian/sid' into debian/experimental
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <iomanip>
35
36 #include <dirent.h>
37 #include <sys/time.h>
38 #include <sys/select.h>
39 #include <errno.h>
40
41 #include <apti18n.h>
42 /*}}}*/
43
44 using namespace std;
45
46 // Acquire::pkgAcquire - Constructor /*{{{*/
47 // ---------------------------------------------------------------------
48 /* We grab some runtime state from the configuration space */
49 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
50 Debug(_config->FindB("Debug::pkgAcquire",false)),
51 Running(false)
52 {
53 string const Mode = _config->Find("Acquire::Queue-Mode","host");
54 if (strcasecmp(Mode.c_str(),"host") == 0)
55 QueueMode = QueueHost;
56 if (strcasecmp(Mode.c_str(),"access") == 0)
57 QueueMode = QueueAccess;
58 }
59 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
60 Configs(0), Log(Progress), ToFetch(0),
61 Debug(_config->FindB("Debug::pkgAcquire",false)),
62 Running(false)
63 {
64 string const Mode = _config->Find("Acquire::Queue-Mode","host");
65 if (strcasecmp(Mode.c_str(),"host") == 0)
66 QueueMode = QueueHost;
67 if (strcasecmp(Mode.c_str(),"access") == 0)
68 QueueMode = QueueAccess;
69 Setup(Progress, "");
70 }
71 /*}}}*/
72 // Acquire::Setup - Delayed Constructor /*{{{*/
73 // ---------------------------------------------------------------------
74 /* Do everything needed to be a complete Acquire object and report the
75 success (or failure) back so the user knows that something is wrong… */
76 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
77 {
78 Log = Progress;
79
80 // check for existence and possibly create auxiliary directories
81 string const listDir = _config->FindDir("Dir::State::lists");
82 string const partialListDir = listDir + "partial/";
83 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
84 string const partialArchivesDir = archivesDir + "partial/";
85
86 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
87 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
88 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
89
90 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
91 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
92 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
93
94 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
95 return true;
96
97 // Lock the directory this acquire object will work in
98 LockFD = GetLock(flCombine(Lock, "lock"));
99 if (LockFD == -1)
100 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
101
102 return true;
103 }
104 /*}}}*/
105 // Acquire::~pkgAcquire - Destructor /*{{{*/
106 // ---------------------------------------------------------------------
107 /* Free our memory, clean up the queues (destroy the workers) */
108 pkgAcquire::~pkgAcquire()
109 {
110 Shutdown();
111
112 if (LockFD != -1)
113 close(LockFD);
114
115 while (Configs != 0)
116 {
117 MethodConfig *Jnk = Configs;
118 Configs = Configs->Next;
119 delete Jnk;
120 }
121 }
122 /*}}}*/
123 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
124 // ---------------------------------------------------------------------
125 /* */
126 void pkgAcquire::Shutdown()
127 {
128 while (Items.empty() == false)
129 {
130 if (Items[0]->Status == Item::StatFetching)
131 Items[0]->Status = Item::StatError;
132 delete Items[0];
133 }
134
135 while (Queues != 0)
136 {
137 Queue *Jnk = Queues;
138 Queues = Queues->Next;
139 delete Jnk;
140 }
141 }
142 /*}}}*/
143 // Acquire::Add - Add a new item /*{{{*/
144 // ---------------------------------------------------------------------
145 /* This puts an item on the acquire list. This list is mainly for tracking
146 item status */
147 void pkgAcquire::Add(Item *Itm)
148 {
149 Items.push_back(Itm);
150 }
151 /*}}}*/
152 // Acquire::Remove - Remove a item /*{{{*/
153 // ---------------------------------------------------------------------
154 /* Remove an item from the acquire list. This is usually not used.. */
155 void pkgAcquire::Remove(Item *Itm)
156 {
157 Dequeue(Itm);
158
159 for (ItemIterator I = Items.begin(); I != Items.end();)
160 {
161 if (*I == Itm)
162 {
163 Items.erase(I);
164 I = Items.begin();
165 }
166 else
167 ++I;
168 }
169 }
170 /*}}}*/
171 // Acquire::Add - Add a worker /*{{{*/
172 // ---------------------------------------------------------------------
173 /* A list of workers is kept so that the select loop can direct their FD
174 usage. */
175 void pkgAcquire::Add(Worker *Work)
176 {
177 Work->NextAcquire = Workers;
178 Workers = Work;
179 }
180 /*}}}*/
181 // Acquire::Remove - Remove a worker /*{{{*/
182 // ---------------------------------------------------------------------
183 /* A worker has died. This can not be done while the select loop is running
184 as it would require that RunFds could handling a changing list state and
185 it can't.. */
186 void pkgAcquire::Remove(Worker *Work)
187 {
188 if (Running == true)
189 abort();
190
191 Worker **I = &Workers;
192 for (; *I != 0;)
193 {
194 if (*I == Work)
195 *I = (*I)->NextAcquire;
196 else
197 I = &(*I)->NextAcquire;
198 }
199 }
200 /*}}}*/
201 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
202 // ---------------------------------------------------------------------
203 /* This is the entry point for an item. An item calls this function when
204 it is constructed which creates a queue (based on the current queue
205 mode) and puts the item in that queue. If the system is running then
206 the queue might be started. */
207 void pkgAcquire::Enqueue(ItemDesc &Item)
208 {
209 // Determine which queue to put the item in
210 const MethodConfig *Config;
211 string Name = QueueName(Item.URI,Config);
212 if (Name.empty() == true)
213 return;
214
215 // Find the queue structure
216 Queue *I = Queues;
217 for (; I != 0 && I->Name != Name; I = I->Next);
218 if (I == 0)
219 {
220 I = new Queue(Name,this);
221 I->Next = Queues;
222 Queues = I;
223
224 if (Running == true)
225 I->Startup();
226 }
227
228 // See if this is a local only URI
229 if (Config->LocalOnly == true && Item.Owner->Complete == false)
230 Item.Owner->Local = true;
231 Item.Owner->Status = Item::StatIdle;
232
233 // Queue it into the named queue
234 if(I->Enqueue(Item))
235 ToFetch++;
236
237 // Some trace stuff
238 if (Debug == true)
239 {
240 clog << "Fetching " << Item.URI << endl;
241 clog << " to " << Item.Owner->DestFile << endl;
242 clog << " Queue is: " << Name << endl;
243 }
244 }
245 /*}}}*/
246 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
247 // ---------------------------------------------------------------------
248 /* This is called when an item is finished being fetched. It removes it
249 from all the queues */
250 void pkgAcquire::Dequeue(Item *Itm)
251 {
252 Queue *I = Queues;
253 bool Res = false;
254 if (Debug == true)
255 clog << "Dequeuing " << Itm->DestFile << endl;
256
257 for (; I != 0; I = I->Next)
258 {
259 if (I->Dequeue(Itm))
260 {
261 Res = true;
262 if (Debug == true)
263 clog << "Dequeued from " << I->Name << endl;
264 }
265 }
266
267 if (Res == true)
268 ToFetch--;
269 }
270 /*}}}*/
271 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
272 // ---------------------------------------------------------------------
273 /* The string returned depends on the configuration settings and the
274 method parameters. Given something like http://foo.org/bar it can
275 return http://foo.org or http */
276 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
277 {
278 URI U(Uri);
279
280 Config = GetConfig(U.Access);
281 if (Config == 0)
282 return string();
283
284 /* Single-Instance methods get exactly one queue per URI. This is
285 also used for the Access queue method */
286 if (Config->SingleInstance == true || QueueMode == QueueAccess)
287 return U.Access;
288
289 string AccessSchema = U.Access + ':',
290 FullQueueName = AccessSchema + U.Host;
291 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
292
293 Queue *I = Queues;
294 for (; I != 0; I = I->Next) {
295 // if the queue already exists, re-use it
296 if (I->Name == FullQueueName)
297 return FullQueueName;
298
299 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
300 Instances++;
301 }
302
303 if (Debug) {
304 clog << "Found " << Instances << " instances of " << U.Access << endl;
305 }
306
307 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
308 return U.Access;
309
310 return FullQueueName;
311 }
312 /*}}}*/
313 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
314 // ---------------------------------------------------------------------
315 /* This locates the configuration structure for an access method. If
316 a config structure cannot be found a Worker will be created to
317 retrieve it */
318 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
319 {
320 // Search for an existing config
321 MethodConfig *Conf;
322 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
323 if (Conf->Access == Access)
324 return Conf;
325
326 // Create the new config class
327 Conf = new MethodConfig;
328 Conf->Access = Access;
329 Conf->Next = Configs;
330 Configs = Conf;
331
332 // Create the worker to fetch the configuration
333 Worker Work(Conf);
334 if (Work.Start() == false)
335 return 0;
336
337 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
338 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
339 Conf->SingleInstance = true;
340
341 return Conf;
342 }
343 /*}}}*/
344 // Acquire::SetFds - Deal with readable FDs /*{{{*/
345 // ---------------------------------------------------------------------
346 /* Collect FDs that have activity monitors into the fd sets */
347 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
348 {
349 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
350 {
351 if (I->InReady == true && I->InFd >= 0)
352 {
353 if (Fd < I->InFd)
354 Fd = I->InFd;
355 FD_SET(I->InFd,RSet);
356 }
357 if (I->OutReady == true && I->OutFd >= 0)
358 {
359 if (Fd < I->OutFd)
360 Fd = I->OutFd;
361 FD_SET(I->OutFd,WSet);
362 }
363 }
364 }
365 /*}}}*/
366 // Acquire::RunFds - Deal with active FDs /*{{{*/
367 // ---------------------------------------------------------------------
368 /* Dispatch active FDs over to the proper workers. It is very important
369 that a worker never be erased while this is running! The queue class
370 should never erase a worker except during shutdown processing. */
371 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
372 {
373 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
374 {
375 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
376 I->InFdReady();
377 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
378 I->OutFdReady();
379 }
380 }
381 /*}}}*/
382 // Acquire::Run - Run the fetch sequence /*{{{*/
383 // ---------------------------------------------------------------------
384 /* This runs the queues. It manages a select loop for all of the
385 Worker tasks. The workers interact with the queues and items to
386 manage the actual fetch. */
387 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
388 {
389 Running = true;
390
391 for (Queue *I = Queues; I != 0; I = I->Next)
392 I->Startup();
393
394 if (Log != 0)
395 Log->Start();
396
397 bool WasCancelled = false;
398
399 // Run till all things have been acquired
400 struct timeval tv;
401 tv.tv_sec = 0;
402 tv.tv_usec = PulseIntervall;
403 while (ToFetch > 0)
404 {
405 fd_set RFds;
406 fd_set WFds;
407 int Highest = 0;
408 FD_ZERO(&RFds);
409 FD_ZERO(&WFds);
410 SetFds(Highest,&RFds,&WFds);
411
412 int Res;
413 do
414 {
415 Res = select(Highest+1,&RFds,&WFds,0,&tv);
416 }
417 while (Res < 0 && errno == EINTR);
418
419 if (Res < 0)
420 {
421 _error->Errno("select","Select has failed");
422 break;
423 }
424
425 RunFds(&RFds,&WFds);
426 if (_error->PendingError() == true)
427 break;
428
429 // Timeout, notify the log class
430 if (Res == 0 || (Log != 0 && Log->Update == true))
431 {
432 tv.tv_usec = PulseIntervall;
433 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
434 I->Pulse();
435 if (Log != 0 && Log->Pulse(this) == false)
436 {
437 WasCancelled = true;
438 break;
439 }
440 }
441 }
442
443 if (Log != 0)
444 Log->Stop();
445
446 // Shut down the acquire bits
447 Running = false;
448 for (Queue *I = Queues; I != 0; I = I->Next)
449 I->Shutdown(false);
450
451 // Shut down the items
452 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
453 (*I)->Finished();
454
455 if (_error->PendingError())
456 return Failed;
457 if (WasCancelled)
458 return Cancelled;
459 return Continue;
460 }
461 /*}}}*/
462 // Acquire::Bump - Called when an item is dequeued /*{{{*/
463 // ---------------------------------------------------------------------
464 /* This routine bumps idle queues in hopes that they will be able to fetch
465 the dequeued item */
466 void pkgAcquire::Bump()
467 {
468 for (Queue *I = Queues; I != 0; I = I->Next)
469 I->Bump();
470 }
471 /*}}}*/
472 // Acquire::WorkerStep - Step to the next worker /*{{{*/
473 // ---------------------------------------------------------------------
474 /* Not inlined to advoid including acquire-worker.h */
475 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
476 {
477 return I->NextAcquire;
478 }
479 /*}}}*/
480 // Acquire::Clean - Cleans a directory /*{{{*/
481 // ---------------------------------------------------------------------
482 /* This is a bit simplistic, it looks at every file in the dir and sees
483 if it is part of the download set. */
484 bool pkgAcquire::Clean(string Dir)
485 {
486 // non-existing directories are by definition clean…
487 if (DirectoryExists(Dir) == false)
488 return true;
489
490 DIR *D = opendir(Dir.c_str());
491 if (D == 0)
492 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
493
494 string StartDir = SafeGetCWD();
495 if (chdir(Dir.c_str()) != 0)
496 {
497 closedir(D);
498 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
499 }
500
501 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
502 {
503 // Skip some files..
504 if (strcmp(Dir->d_name,"lock") == 0 ||
505 strcmp(Dir->d_name,"partial") == 0 ||
506 strcmp(Dir->d_name,".") == 0 ||
507 strcmp(Dir->d_name,"..") == 0)
508 continue;
509
510 // Look in the get list
511 ItemCIterator I = Items.begin();
512 for (; I != Items.end(); ++I)
513 if (flNotDir((*I)->DestFile) == Dir->d_name)
514 break;
515
516 // Nothing found, nuke it
517 if (I == Items.end())
518 unlink(Dir->d_name);
519 };
520
521 closedir(D);
522 if (chdir(StartDir.c_str()) != 0)
523 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
524 return true;
525 }
526 /*}}}*/
527 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
528 // ---------------------------------------------------------------------
529 /* This is the total number of bytes needed */
530 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
531 {
532 unsigned long long Total = 0;
533 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
534 Total += (*I)->FileSize;
535 return Total;
536 }
537 /*}}}*/
538 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
539 // ---------------------------------------------------------------------
540 /* This is the number of bytes that is not local */
541 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
542 {
543 unsigned long long Total = 0;
544 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
545 if ((*I)->Local == false)
546 Total += (*I)->FileSize;
547 return Total;
548 }
549 /*}}}*/
550 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
551 // ---------------------------------------------------------------------
552 /* This is the number of bytes that is not local */
553 APT_PURE unsigned long long pkgAcquire::PartialPresent()
554 {
555 unsigned long long Total = 0;
556 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
557 if ((*I)->Local == false)
558 Total += (*I)->PartialSize;
559 return Total;
560 }
561 /*}}}*/
562 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
563 // ---------------------------------------------------------------------
564 /* */
565 pkgAcquire::UriIterator pkgAcquire::UriBegin()
566 {
567 return UriIterator(Queues);
568 }
569 /*}}}*/
570 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
571 // ---------------------------------------------------------------------
572 /* */
573 pkgAcquire::UriIterator pkgAcquire::UriEnd()
574 {
575 return UriIterator(0);
576 }
577 /*}}}*/
578 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
579 // ---------------------------------------------------------------------
580 /* */
581 pkgAcquire::MethodConfig::MethodConfig()
582 {
583 SingleInstance = false;
584 Pipeline = false;
585 SendConfig = false;
586 LocalOnly = false;
587 Removable = false;
588 Next = 0;
589 }
590 /*}}}*/
591 // Queue::Queue - Constructor /*{{{*/
592 // ---------------------------------------------------------------------
593 /* */
594 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
595 Owner(Owner)
596 {
597 Items = 0;
598 Next = 0;
599 Workers = 0;
600 MaxPipeDepth = 1;
601 PipeDepth = 0;
602 }
603 /*}}}*/
604 // Queue::~Queue - Destructor /*{{{*/
605 // ---------------------------------------------------------------------
606 /* */
607 pkgAcquire::Queue::~Queue()
608 {
609 Shutdown(true);
610
611 while (Items != 0)
612 {
613 QItem *Jnk = Items;
614 Items = Items->Next;
615 delete Jnk;
616 }
617 }
618 /*}}}*/
619 // Queue::Enqueue - Queue an item to the queue /*{{{*/
620 // ---------------------------------------------------------------------
621 /* */
622 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
623 {
624 QItem **I = &Items;
625 // move to the end of the queue and check for duplicates here
626 for (; *I != 0; I = &(*I)->Next)
627 if (Item.URI == (*I)->URI)
628 {
629 Item.Owner->Status = Item::StatDone;
630 return false;
631 }
632
633 // Create a new item
634 QItem *Itm = new QItem;
635 *Itm = Item;
636 Itm->Next = 0;
637 *I = Itm;
638
639 Item.Owner->QueueCounter++;
640 if (Items->Next == 0)
641 Cycle();
642 return true;
643 }
644 /*}}}*/
645 // Queue::Dequeue - Remove an item from the queue /*{{{*/
646 // ---------------------------------------------------------------------
647 /* We return true if we hit something */
648 bool pkgAcquire::Queue::Dequeue(Item *Owner)
649 {
650 if (Owner->Status == pkgAcquire::Item::StatFetching)
651 return _error->Error("Tried to dequeue a fetching object");
652
653 bool Res = false;
654
655 QItem **I = &Items;
656 for (; *I != 0;)
657 {
658 if ((*I)->Owner == Owner)
659 {
660 QItem *Jnk= *I;
661 *I = (*I)->Next;
662 Owner->QueueCounter--;
663 delete Jnk;
664 Res = true;
665 }
666 else
667 I = &(*I)->Next;
668 }
669
670 return Res;
671 }
672 /*}}}*/
673 // Queue::Startup - Start the worker processes /*{{{*/
674 // ---------------------------------------------------------------------
675 /* It is possible for this to be called with a pre-existing set of
676 workers. */
677 bool pkgAcquire::Queue::Startup()
678 {
679 if (Workers == 0)
680 {
681 URI U(Name);
682 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
683 if (Cnf == 0)
684 return false;
685
686 Workers = new Worker(this,Cnf,Owner->Log);
687 Owner->Add(Workers);
688 if (Workers->Start() == false)
689 return false;
690
691 /* When pipelining we commit 10 items. This needs to change when we
692 added other source retry to have cycle maintain a pipeline depth
693 on its own. */
694 if (Cnf->Pipeline == true)
695 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
696 else
697 MaxPipeDepth = 1;
698 }
699
700 return Cycle();
701 }
702 /*}}}*/
703 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
704 // ---------------------------------------------------------------------
705 /* If final is true then all workers are eliminated, otherwise only workers
706 that do not need cleanup are removed */
707 bool pkgAcquire::Queue::Shutdown(bool Final)
708 {
709 // Delete all of the workers
710 pkgAcquire::Worker **Cur = &Workers;
711 while (*Cur != 0)
712 {
713 pkgAcquire::Worker *Jnk = *Cur;
714 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
715 {
716 *Cur = Jnk->NextQueue;
717 Owner->Remove(Jnk);
718 delete Jnk;
719 }
720 else
721 Cur = &(*Cur)->NextQueue;
722 }
723
724 return true;
725 }
726 /*}}}*/
727 // Queue::FindItem - Find a URI in the item list /*{{{*/
728 // ---------------------------------------------------------------------
729 /* */
730 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
731 {
732 for (QItem *I = Items; I != 0; I = I->Next)
733 if (I->URI == URI && I->Worker == Owner)
734 return I;
735 return 0;
736 }
737 /*}}}*/
738 // Queue::ItemDone - Item has been completed /*{{{*/
739 // ---------------------------------------------------------------------
740 /* The worker signals this which causes the item to be removed from the
741 queue. If this is the last queue instance then it is removed from the
742 main queue too.*/
743 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
744 {
745 PipeDepth--;
746 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
747 Itm->Owner->Status = pkgAcquire::Item::StatDone;
748
749 if (Itm->Owner->QueueCounter <= 1)
750 Owner->Dequeue(Itm->Owner);
751 else
752 {
753 Dequeue(Itm->Owner);
754 Owner->Bump();
755 }
756
757 return Cycle();
758 }
759 /*}}}*/
760 // Queue::Cycle - Queue new items into the method /*{{{*/
761 // ---------------------------------------------------------------------
762 /* This locates a new idle item and sends it to the worker. If pipelining
763 is enabled then it keeps the pipe full. */
764 bool pkgAcquire::Queue::Cycle()
765 {
766 if (Items == 0 || Workers == 0)
767 return true;
768
769 if (PipeDepth < 0)
770 return _error->Error("Pipedepth failure");
771
772 // Look for a queable item
773 QItem *I = Items;
774 while (PipeDepth < (signed)MaxPipeDepth)
775 {
776 for (; I != 0; I = I->Next)
777 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
778 break;
779
780 // Nothing to do, queue is idle.
781 if (I == 0)
782 return true;
783
784 I->Worker = Workers;
785 I->Owner->Status = pkgAcquire::Item::StatFetching;
786 PipeDepth++;
787 if (Workers->QueueItem(I) == false)
788 return false;
789 }
790
791 return true;
792 }
793 /*}}}*/
794 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
795 // ---------------------------------------------------------------------
796 /* This is called when an item in multiple queues is dequeued */
797 void pkgAcquire::Queue::Bump()
798 {
799 Cycle();
800 }
801 /*}}}*/
802 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
803 // ---------------------------------------------------------------------
804 /* */
805 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
806 {
807 Start();
808 }
809 /*}}}*/
810 // AcquireStatus::Pulse - Called periodically /*{{{*/
811 // ---------------------------------------------------------------------
812 /* This computes some internal state variables for the derived classes to
813 use. It generates the current downloaded bytes and total bytes to download
814 as well as the current CPS estimate. */
815 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
816 {
817 TotalBytes = 0;
818 CurrentBytes = 0;
819 TotalItems = 0;
820 CurrentItems = 0;
821
822 // Compute the total number of bytes to fetch
823 unsigned int Unknown = 0;
824 unsigned int Count = 0;
825 bool UnfetchedReleaseFiles = false;
826 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
827 I != Owner->ItemsEnd();
828 ++I, ++Count)
829 {
830 TotalItems++;
831 if ((*I)->Status == pkgAcquire::Item::StatDone)
832 ++CurrentItems;
833
834 // Totally ignore local items
835 if ((*I)->Local == true)
836 continue;
837
838 // see if the method tells us to expect more
839 TotalItems += (*I)->ExpectedAdditionalItems;
840
841 // check if there are unfetched Release files
842 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
843 UnfetchedReleaseFiles = true;
844
845 TotalBytes += (*I)->FileSize;
846 if ((*I)->Complete == true)
847 CurrentBytes += (*I)->FileSize;
848 if ((*I)->FileSize == 0 && (*I)->Complete == false)
849 ++Unknown;
850 }
851
852 // Compute the current completion
853 unsigned long long ResumeSize = 0;
854 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
855 I = Owner->WorkerStep(I))
856 {
857 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
858 {
859 CurrentBytes += I->CurrentSize;
860 ResumeSize += I->ResumePoint;
861
862 // Files with unknown size always have 100% completion
863 if (I->CurrentItem->Owner->FileSize == 0 &&
864 I->CurrentItem->Owner->Complete == false)
865 TotalBytes += I->CurrentSize;
866 }
867 }
868
869 // Normalize the figures and account for unknown size downloads
870 if (TotalBytes <= 0)
871 TotalBytes = 1;
872 if (Unknown == Count)
873 TotalBytes = Unknown;
874
875 // Wha?! Is not supposed to happen.
876 if (CurrentBytes > TotalBytes)
877 CurrentBytes = TotalBytes;
878
879 // debug
880 if (_config->FindB("Debug::acquire::progress", false) == true)
881 std::clog << " Bytes: "
882 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
883 << std::endl;
884
885 // Compute the CPS
886 struct timeval NewTime;
887 gettimeofday(&NewTime,0);
888 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
889 NewTime.tv_sec - Time.tv_sec > 6)
890 {
891 double Delta = NewTime.tv_sec - Time.tv_sec +
892 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
893
894 // Compute the CPS value
895 if (Delta < 0.01)
896 CurrentCPS = 0;
897 else
898 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
899 LastBytes = CurrentBytes - ResumeSize;
900 ElapsedTime = (unsigned long long)Delta;
901 Time = NewTime;
902 }
903
904 // calculate the percentage, if we have too little data assume 1%
905 if (TotalBytes > 0 && UnfetchedReleaseFiles)
906 Percent = 0;
907 else
908 // use both files and bytes because bytes can be unreliable
909 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
910 0.2 * (CurrentItems/float(TotalItems)*100.0));
911
912 int fd = _config->FindI("APT::Status-Fd",-1);
913 if(fd > 0)
914 {
915 ostringstream status;
916
917 char msg[200];
918 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
919 unsigned long long ETA = 0;
920 if(CurrentCPS > 0)
921 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
922
923 // only show the ETA if it makes sense
924 if (ETA > 0 && ETA < 172800 /* two days */ )
925 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
926 else
927 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
928
929 // build the status str
930 status << "dlstatus:" << i
931 << ":" << std::setprecision(3) << Percent
932 << ":" << msg
933 << endl;
934
935 std::string const dlstatus = status.str();
936 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
937 }
938
939 return true;
940 }
941 /*}}}*/
942 // AcquireStatus::Start - Called when the download is started /*{{{*/
943 // ---------------------------------------------------------------------
944 /* We just reset the counters */
945 void pkgAcquireStatus::Start()
946 {
947 gettimeofday(&Time,0);
948 gettimeofday(&StartTime,0);
949 LastBytes = 0;
950 CurrentCPS = 0;
951 CurrentBytes = 0;
952 TotalBytes = 0;
953 FetchedBytes = 0;
954 ElapsedTime = 0;
955 TotalItems = 0;
956 CurrentItems = 0;
957 }
958 /*}}}*/
959 // AcquireStatus::Stop - Finished downloading /*{{{*/
960 // ---------------------------------------------------------------------
961 /* This accurately computes the elapsed time and the total overall CPS. */
962 void pkgAcquireStatus::Stop()
963 {
964 // Compute the CPS and elapsed time
965 struct timeval NewTime;
966 gettimeofday(&NewTime,0);
967
968 double Delta = NewTime.tv_sec - StartTime.tv_sec +
969 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
970
971 // Compute the CPS value
972 if (Delta < 0.01)
973 CurrentCPS = 0;
974 else
975 CurrentCPS = FetchedBytes/Delta;
976 LastBytes = CurrentBytes;
977 ElapsedTime = (unsigned long long)Delta;
978 }
979 /*}}}*/
980 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
981 // ---------------------------------------------------------------------
982 /* This is used to get accurate final transfer rate reporting. */
983 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
984 {
985 FetchedBytes += Size - Resume;
986 }
987 /*}}}*/