]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
EDSP: bump protocol version to 0.5
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34
35 #include <dirent.h>
36 #include <sys/time.h>
37 #include <sys/select.h>
38 #include <errno.h>
39
40 #include <apti18n.h>
41 /*}}}*/
42
43 using namespace std;
44
45 // Acquire::pkgAcquire - Constructor /*{{{*/
46 // ---------------------------------------------------------------------
47 /* We grab some runtime state from the configuration space */
48 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
49 Debug(_config->FindB("Debug::pkgAcquire",false)),
50 Running(false)
51 {
52 string const Mode = _config->Find("Acquire::Queue-Mode","host");
53 if (strcasecmp(Mode.c_str(),"host") == 0)
54 QueueMode = QueueHost;
55 if (strcasecmp(Mode.c_str(),"access") == 0)
56 QueueMode = QueueAccess;
57 }
58 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
59 Configs(0), Log(Progress), ToFetch(0),
60 Debug(_config->FindB("Debug::pkgAcquire",false)),
61 Running(false)
62 {
63 string const Mode = _config->Find("Acquire::Queue-Mode","host");
64 if (strcasecmp(Mode.c_str(),"host") == 0)
65 QueueMode = QueueHost;
66 if (strcasecmp(Mode.c_str(),"access") == 0)
67 QueueMode = QueueAccess;
68 Setup(Progress, "");
69 }
70 /*}}}*/
71 // Acquire::Setup - Delayed Constructor /*{{{*/
72 // ---------------------------------------------------------------------
73 /* Do everything needed to be a complete Acquire object and report the
74 success (or failure) back so the user knows that something is wrong… */
75 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
76 {
77 Log = Progress;
78
79 // check for existence and possibly create auxiliary directories
80 string const listDir = _config->FindDir("Dir::State::lists");
81 string const partialListDir = listDir + "partial/";
82 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
83 string const partialArchivesDir = archivesDir + "partial/";
84
85 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
86 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
87 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
88
89 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
90 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
91 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
92
93 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
94 return true;
95
96 // Lock the directory this acquire object will work in
97 LockFD = GetLock(flCombine(Lock, "lock"));
98 if (LockFD == -1)
99 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
100
101 return true;
102 }
103 /*}}}*/
104 // Acquire::~pkgAcquire - Destructor /*{{{*/
105 // ---------------------------------------------------------------------
106 /* Free our memory, clean up the queues (destroy the workers) */
107 pkgAcquire::~pkgAcquire()
108 {
109 Shutdown();
110
111 if (LockFD != -1)
112 close(LockFD);
113
114 while (Configs != 0)
115 {
116 MethodConfig *Jnk = Configs;
117 Configs = Configs->Next;
118 delete Jnk;
119 }
120 }
121 /*}}}*/
122 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
123 // ---------------------------------------------------------------------
124 /* */
125 void pkgAcquire::Shutdown()
126 {
127 while (Items.empty() == false)
128 {
129 if (Items[0]->Status == Item::StatFetching)
130 Items[0]->Status = Item::StatError;
131 delete Items[0];
132 }
133
134 while (Queues != 0)
135 {
136 Queue *Jnk = Queues;
137 Queues = Queues->Next;
138 delete Jnk;
139 }
140 }
141 /*}}}*/
142 // Acquire::Add - Add a new item /*{{{*/
143 // ---------------------------------------------------------------------
144 /* This puts an item on the acquire list. This list is mainly for tracking
145 item status */
146 void pkgAcquire::Add(Item *Itm)
147 {
148 Items.push_back(Itm);
149 }
150 /*}}}*/
151 // Acquire::Remove - Remove a item /*{{{*/
152 // ---------------------------------------------------------------------
153 /* Remove an item from the acquire list. This is usually not used.. */
154 void pkgAcquire::Remove(Item *Itm)
155 {
156 Dequeue(Itm);
157
158 for (ItemIterator I = Items.begin(); I != Items.end();)
159 {
160 if (*I == Itm)
161 {
162 Items.erase(I);
163 I = Items.begin();
164 }
165 else
166 ++I;
167 }
168 }
169 /*}}}*/
170 // Acquire::Add - Add a worker /*{{{*/
171 // ---------------------------------------------------------------------
172 /* A list of workers is kept so that the select loop can direct their FD
173 usage. */
174 void pkgAcquire::Add(Worker *Work)
175 {
176 Work->NextAcquire = Workers;
177 Workers = Work;
178 }
179 /*}}}*/
180 // Acquire::Remove - Remove a worker /*{{{*/
181 // ---------------------------------------------------------------------
182 /* A worker has died. This cannot be done while the select loop is running
183 as it would require RunFds to handle a changing list state, which
184 it can't. */
185 void pkgAcquire::Remove(Worker *Work)
186 {
187 if (Running == true)
188 abort();
189
190 Worker **I = &Workers;
191 for (; *I != 0;)
192 {
193 if (*I == Work)
194 *I = (*I)->NextAcquire;
195 else
196 I = &(*I)->NextAcquire;
197 }
198 }
199 /*}}}*/
200 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
201 // ---------------------------------------------------------------------
202 /* This is the entry point for an item. An item calls this function when
203 it is constructed which creates a queue (based on the current queue
204 mode) and puts the item in that queue. If the system is running then
205 the queue might be started. */
206 void pkgAcquire::Enqueue(ItemDesc &Item)
207 {
208 // Determine which queue to put the item in
209 const MethodConfig *Config;
210 string Name = QueueName(Item.URI,Config);
211 if (Name.empty() == true)
212 return;
213
214 // Find the queue structure
215 Queue *I = Queues;
216 for (; I != 0 && I->Name != Name; I = I->Next);
217 if (I == 0)
218 {
219 I = new Queue(Name,this);
220 I->Next = Queues;
221 Queues = I;
222
223 if (Running == true)
224 I->Startup();
225 }
226
227 // See if this is a local only URI
228 if (Config->LocalOnly == true && Item.Owner->Complete == false)
229 Item.Owner->Local = true;
230 Item.Owner->Status = Item::StatIdle;
231
232 // Queue it into the named queue
233 if(I->Enqueue(Item))
234 ToFetch++;
235
236 // Some trace stuff
237 if (Debug == true)
238 {
239 clog << "Fetching " << Item.URI << endl;
240 clog << " to " << Item.Owner->DestFile << endl;
241 clog << " Queue is: " << Name << endl;
242 }
243 }
244 /*}}}*/
245 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
246 // ---------------------------------------------------------------------
247 /* This is called when an item is finished being fetched. It removes it
248 from all the queues */
249 void pkgAcquire::Dequeue(Item *Itm)
250 {
251 Queue *I = Queues;
252 bool Res = false;
253 if (Debug == true)
254 clog << "Dequeuing " << Itm->DestFile << endl;
255
256 for (; I != 0; I = I->Next)
257 {
258 if (I->Dequeue(Itm))
259 {
260 Res = true;
261 if (Debug == true)
262 clog << "Dequeued from " << I->Name << endl;
263 }
264 }
265
266 if (Res == true)
267 ToFetch--;
268 }
269 /*}}}*/
270 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
271 // ---------------------------------------------------------------------
272 /* The string returned depends on the configuration settings and the
273 method parameters. Given something like http://foo.org/bar it can
274 return http://foo.org or http */
275 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
276 {
277 URI U(Uri);
278
279 Config = GetConfig(U.Access);
280 if (Config == 0)
281 return string();
282
283 /* Single-Instance methods get exactly one queue per URI. This is
284 also used for the Access queue method */
285 if (Config->SingleInstance == true || QueueMode == QueueAccess)
286 return U.Access;
287
288 string AccessSchema = U.Access + ':',
289 FullQueueName = AccessSchema + U.Host;
290 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
291
292 Queue *I = Queues;
293 for (; I != 0; I = I->Next) {
294 // if the queue already exists, re-use it
295 if (I->Name == FullQueueName)
296 return FullQueueName;
297
298 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
299 Instances++;
300 }
301
302 if (Debug) {
303 clog << "Found " << Instances << " instances of " << U.Access << endl;
304 }
305
306 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
307 return U.Access;
308
309 return FullQueueName;
310 }
311 /*}}}*/
312 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
313 // ---------------------------------------------------------------------
314 /* This locates the configuration structure for an access method. If
315 a config structure cannot be found a Worker will be created to
316 retrieve it */
317 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
318 {
319 // Search for an existing config
320 MethodConfig *Conf;
321 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
322 if (Conf->Access == Access)
323 return Conf;
324
325 // Create the new config class
326 Conf = new MethodConfig;
327 Conf->Access = Access;
328 Conf->Next = Configs;
329 Configs = Conf;
330
331 // Create the worker to fetch the configuration
332 Worker Work(Conf);
333 if (Work.Start() == false)
334 return 0;
335
336 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
337 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
338 Conf->SingleInstance = true;
339
340 return Conf;
341 }
342 /*}}}*/
343 // Acquire::SetFds - Deal with readable FDs /*{{{*/
344 // ---------------------------------------------------------------------
345 /* Collect FDs that have activity monitors into the fd sets */
346 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
347 {
348 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
349 {
350 if (I->InReady == true && I->InFd >= 0)
351 {
352 if (Fd < I->InFd)
353 Fd = I->InFd;
354 FD_SET(I->InFd,RSet);
355 }
356 if (I->OutReady == true && I->OutFd >= 0)
357 {
358 if (Fd < I->OutFd)
359 Fd = I->OutFd;
360 FD_SET(I->OutFd,WSet);
361 }
362 }
363 }
364 /*}}}*/
365 // Acquire::RunFds - Deal with active FDs /*{{{*/
366 // ---------------------------------------------------------------------
367 /* Dispatch active FDs over to the proper workers. It is very important
368 that a worker never be erased while this is running! The queue class
369 should never erase a worker except during shutdown processing. */
370 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
371 {
372 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
373 {
374 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
375 I->InFdReady();
376 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
377 I->OutFdReady();
378 }
379 }
380 /*}}}*/
381 // Acquire::Run - Run the fetch sequence /*{{{*/
382 // ---------------------------------------------------------------------
383 /* This runs the queues. It manages a select loop for all of the
384 Worker tasks. The workers interact with the queues and items to
385 manage the actual fetch. */
386 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
387 {
388 Running = true;
389
390 for (Queue *I = Queues; I != 0; I = I->Next)
391 I->Startup();
392
393 if (Log != 0)
394 Log->Start();
395
396 bool WasCancelled = false;
397
398 // Run till all things have been acquired
399 struct timeval tv;
400 tv.tv_sec = 0;
401 tv.tv_usec = PulseIntervall;
402 while (ToFetch > 0)
403 {
404 fd_set RFds;
405 fd_set WFds;
406 int Highest = 0;
407 FD_ZERO(&RFds);
408 FD_ZERO(&WFds);
409 SetFds(Highest,&RFds,&WFds);
410
411 int Res;
412 do
413 {
414 Res = select(Highest+1,&RFds,&WFds,0,&tv);
415 }
416 while (Res < 0 && errno == EINTR);
417
418 if (Res < 0)
419 {
420 _error->Errno("select","Select has failed");
421 break;
422 }
423
424 RunFds(&RFds,&WFds);
425 if (_error->PendingError() == true)
426 break;
427
428 // Timeout, notify the log class
429 if (Res == 0 || (Log != 0 && Log->Update == true))
430 {
431 tv.tv_usec = PulseIntervall;
432 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
433 I->Pulse();
434 if (Log != 0 && Log->Pulse(this) == false)
435 {
436 WasCancelled = true;
437 break;
438 }
439 }
440 }
441
442 if (Log != 0)
443 Log->Stop();
444
445 // Shut down the acquire bits
446 Running = false;
447 for (Queue *I = Queues; I != 0; I = I->Next)
448 I->Shutdown(false);
449
450 // Shut down the items
451 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
452 (*I)->Finished();
453
454 if (_error->PendingError())
455 return Failed;
456 if (WasCancelled)
457 return Cancelled;
458 return Continue;
459 }
460 /*}}}*/
461 // Acquire::Bump - Called when an item is dequeued /*{{{*/
462 // ---------------------------------------------------------------------
463 /* This routine bumps idle queues in hopes that they will be able to fetch
464 the dequeued item */
465 void pkgAcquire::Bump()
466 {
467 for (Queue *I = Queues; I != 0; I = I->Next)
468 I->Bump();
469 }
470 /*}}}*/
471 // Acquire::WorkerStep - Step to the next worker /*{{{*/
472 // ---------------------------------------------------------------------
473 /* Not inlined to avoid including acquire-worker.h */
474 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
475 {
476 return I->NextAcquire;
477 }
478 /*}}}*/
479 // Acquire::Clean - Cleans a directory /*{{{*/
480 // ---------------------------------------------------------------------
481 /* This is a bit simplistic, it looks at every file in the dir and sees
482 if it is part of the download set. */
483 bool pkgAcquire::Clean(string Dir)
484 {
485 // non-existing directories are by definition clean…
486 if (DirectoryExists(Dir) == false)
487 return true;
488
489 DIR *D = opendir(Dir.c_str());
490 if (D == 0)
491 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
492
493 string StartDir = SafeGetCWD();
494 if (chdir(Dir.c_str()) != 0)
495 {
496 closedir(D);
497 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
498 }
499
500 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
501 {
502 // Skip some files..
503 if (strcmp(Dir->d_name,"lock") == 0 ||
504 strcmp(Dir->d_name,"partial") == 0 ||
505 strcmp(Dir->d_name,".") == 0 ||
506 strcmp(Dir->d_name,"..") == 0)
507 continue;
508
509 // Look in the get list
510 ItemCIterator I = Items.begin();
511 for (; I != Items.end(); ++I)
512 if (flNotDir((*I)->DestFile) == Dir->d_name)
513 break;
514
515 // Nothing found, nuke it
516 if (I == Items.end())
517 unlink(Dir->d_name);
518 };
519
520 closedir(D);
521 if (chdir(StartDir.c_str()) != 0)
522 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
523 return true;
524 }
525 /*}}}*/
526 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
527 // ---------------------------------------------------------------------
528 /* This is the total number of bytes needed */
529 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
530 {
531 unsigned long long Total = 0;
532 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
533 Total += (*I)->FileSize;
534 return Total;
535 }
536 /*}}}*/
537 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
538 // ---------------------------------------------------------------------
539 /* This is the number of bytes that is not local */
540 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
541 {
542 unsigned long long Total = 0;
543 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
544 if ((*I)->Local == false)
545 Total += (*I)->FileSize;
546 return Total;
547 }
548 /*}}}*/
549 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
550 // ---------------------------------------------------------------------
551 /* This is the number of bytes of non-local items that are already present as partial downloads */
552 APT_PURE unsigned long long pkgAcquire::PartialPresent()
553 {
554 unsigned long long Total = 0;
555 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
556 if ((*I)->Local == false)
557 Total += (*I)->PartialSize;
558 return Total;
559 }
560 /*}}}*/
561 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
562 // ---------------------------------------------------------------------
563 /* */
564 pkgAcquire::UriIterator pkgAcquire::UriBegin()
565 {
566 return UriIterator(Queues);
567 }
568 /*}}}*/
569 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
570 // ---------------------------------------------------------------------
571 /* */
572 pkgAcquire::UriIterator pkgAcquire::UriEnd()
573 {
574 return UriIterator(0);
575 }
576 /*}}}*/
577 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
578 // ---------------------------------------------------------------------
579 /* */
580 pkgAcquire::MethodConfig::MethodConfig()
581 {
582 SingleInstance = false;
583 Pipeline = false;
584 SendConfig = false;
585 LocalOnly = false;
586 Removable = false;
587 Next = 0;
588 }
589 /*}}}*/
590 // Queue::Queue - Constructor /*{{{*/
591 // ---------------------------------------------------------------------
592 /* */
593 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
594 Owner(Owner)
595 {
596 Items = 0;
597 Next = 0;
598 Workers = 0;
599 MaxPipeDepth = 1;
600 PipeDepth = 0;
601 }
602 /*}}}*/
603 // Queue::~Queue - Destructor /*{{{*/
604 // ---------------------------------------------------------------------
605 /* */
606 pkgAcquire::Queue::~Queue()
607 {
608 Shutdown(true);
609
610 while (Items != 0)
611 {
612 QItem *Jnk = Items;
613 Items = Items->Next;
614 delete Jnk;
615 }
616 }
617 /*}}}*/
618 // Queue::Enqueue - Queue an item to the queue /*{{{*/
619 // ---------------------------------------------------------------------
620 /* */
621 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
622 {
623 QItem **I = &Items;
624 // move to the end of the queue and check for duplicates here
625 for (; *I != 0; I = &(*I)->Next)
626 if (Item.URI == (*I)->URI)
627 {
628 Item.Owner->Status = Item::StatDone;
629 return false;
630 }
631
632 // Create a new item
633 QItem *Itm = new QItem;
634 *Itm = Item;
635 Itm->Next = 0;
636 *I = Itm;
637
638 Item.Owner->QueueCounter++;
639 if (Items->Next == 0)
640 Cycle();
641 return true;
642 }
643 /*}}}*/
644 // Queue::Dequeue - Remove an item from the queue /*{{{*/
645 // ---------------------------------------------------------------------
646 /* We return true if we hit something */
647 bool pkgAcquire::Queue::Dequeue(Item *Owner)
648 {
649 if (Owner->Status == pkgAcquire::Item::StatFetching)
650 return _error->Error("Tried to dequeue a fetching object");
651
652 bool Res = false;
653
654 QItem **I = &Items;
655 for (; *I != 0;)
656 {
657 if ((*I)->Owner == Owner)
658 {
659 QItem *Jnk= *I;
660 *I = (*I)->Next;
661 Owner->QueueCounter--;
662 delete Jnk;
663 Res = true;
664 }
665 else
666 I = &(*I)->Next;
667 }
668
669 return Res;
670 }
671 /*}}}*/
672 // Queue::Startup - Start the worker processes /*{{{*/
673 // ---------------------------------------------------------------------
674 /* It is possible for this to be called with a pre-existing set of
675 workers. */
676 bool pkgAcquire::Queue::Startup()
677 {
678 if (Workers == 0)
679 {
680 URI U(Name);
681 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
682 if (Cnf == 0)
683 return false;
684
685 Workers = new Worker(this,Cnf,Owner->Log);
686 Owner->Add(Workers);
687 if (Workers->Start() == false)
688 return false;
689
690 /* When pipelining we commit 10 items. This needs to change when we
691 added other source retry to have cycle maintain a pipeline depth
692 on its own. */
693 if (Cnf->Pipeline == true)
694 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
695 else
696 MaxPipeDepth = 1;
697 }
698
699 return Cycle();
700 }
701 /*}}}*/
702 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
703 // ---------------------------------------------------------------------
704 /* If final is true then all workers are eliminated, otherwise only workers
705 that do not need cleanup are removed */
706 bool pkgAcquire::Queue::Shutdown(bool Final)
707 {
708 // Delete all of the workers
709 pkgAcquire::Worker **Cur = &Workers;
710 while (*Cur != 0)
711 {
712 pkgAcquire::Worker *Jnk = *Cur;
713 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
714 {
715 *Cur = Jnk->NextQueue;
716 Owner->Remove(Jnk);
717 delete Jnk;
718 }
719 else
720 Cur = &(*Cur)->NextQueue;
721 }
722
723 return true;
724 }
725 /*}}}*/
726 // Queue::FindItem - Find a URI in the item list /*{{{*/
727 // ---------------------------------------------------------------------
728 /* */
729 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
730 {
731 for (QItem *I = Items; I != 0; I = I->Next)
732 if (I->URI == URI && I->Worker == Owner)
733 return I;
734 return 0;
735 }
736 /*}}}*/
737 // Queue::ItemDone - Item has been completed /*{{{*/
738 // ---------------------------------------------------------------------
739 /* The worker signals this which causes the item to be removed from the
740 queue. If this is the last queue instance then it is removed from the
741 main queue too.*/
742 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
743 {
744 PipeDepth--;
745 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
746 Itm->Owner->Status = pkgAcquire::Item::StatDone;
747
748 if (Itm->Owner->QueueCounter <= 1)
749 Owner->Dequeue(Itm->Owner);
750 else
751 {
752 Dequeue(Itm->Owner);
753 Owner->Bump();
754 }
755
756 return Cycle();
757 }
758 /*}}}*/
759 // Queue::Cycle - Queue new items into the method /*{{{*/
760 // ---------------------------------------------------------------------
761 /* This locates a new idle item and sends it to the worker. If pipelining
762 is enabled then it keeps the pipe full. */
763 bool pkgAcquire::Queue::Cycle()
764 {
765 if (Items == 0 || Workers == 0)
766 return true;
767
768 if (PipeDepth < 0)
769 return _error->Error("Pipedepth failure");
770
771 // Look for a queable item
772 QItem *I = Items;
773 while (PipeDepth < (signed)MaxPipeDepth)
774 {
775 for (; I != 0; I = I->Next)
776 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
777 break;
778
779 // Nothing to do, queue is idle.
780 if (I == 0)
781 return true;
782
783 I->Worker = Workers;
784 I->Owner->Status = pkgAcquire::Item::StatFetching;
785 PipeDepth++;
786 if (Workers->QueueItem(I) == false)
787 return false;
788 }
789
790 return true;
791 }
792 /*}}}*/
793 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
794 // ---------------------------------------------------------------------
795 /* This is called when an item in multiple queues is dequeued */
796 void pkgAcquire::Queue::Bump()
797 {
798 Cycle();
799 }
800 /*}}}*/
801 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
802 // ---------------------------------------------------------------------
803 /* */
804 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
805 {
806 Start();
807 }
808 /*}}}*/
809 // AcquireStatus::Pulse - Called periodically /*{{{*/
810 // ---------------------------------------------------------------------
811 /* This computes some internal state variables for the derived classes to
812 use. It generates the current downloaded bytes and total bytes to download
813 as well as the current CPS estimate. */
814 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
815 {
816 TotalBytes = 0;
817 CurrentBytes = 0;
818 TotalItems = 0;
819 CurrentItems = 0;
820
821 // Compute the total number of bytes to fetch
822 unsigned int Unknown = 0;
823 unsigned int Count = 0;
824 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
825 ++I, ++Count)
826 {
827 TotalItems++;
828 if ((*I)->Status == pkgAcquire::Item::StatDone)
829 ++CurrentItems;
830
831 // Totally ignore local items
832 if ((*I)->Local == true)
833 continue;
834
835 TotalBytes += (*I)->FileSize;
836 if ((*I)->Complete == true)
837 CurrentBytes += (*I)->FileSize;
838 if ((*I)->FileSize == 0 && (*I)->Complete == false)
839 ++Unknown;
840 }
841
842 // Compute the current completion
843 unsigned long long ResumeSize = 0;
844 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
845 I = Owner->WorkerStep(I))
846 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
847 {
848 CurrentBytes += I->CurrentSize;
849 ResumeSize += I->ResumePoint;
850
851 // Files with unknown size always have 100% completion
852 if (I->CurrentItem->Owner->FileSize == 0 &&
853 I->CurrentItem->Owner->Complete == false)
854 TotalBytes += I->CurrentSize;
855 }
856
857 // Normalize the figures and account for unknown size downloads
858 if (TotalBytes <= 0)
859 TotalBytes = 1;
860 if (Unknown == Count)
861 TotalBytes = Unknown;
862
863 // Wha?! Is not supposed to happen.
864 if (CurrentBytes > TotalBytes)
865 CurrentBytes = TotalBytes;
866
867 // Compute the CPS
868 struct timeval NewTime;
869 gettimeofday(&NewTime,0);
870 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
871 NewTime.tv_sec - Time.tv_sec > 6)
872 {
873 double Delta = NewTime.tv_sec - Time.tv_sec +
874 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
875
876 // Compute the CPS value
877 if (Delta < 0.01)
878 CurrentCPS = 0;
879 else
880 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
881 LastBytes = CurrentBytes - ResumeSize;
882 ElapsedTime = (unsigned long long)Delta;
883 Time = NewTime;
884 }
885
886 int fd = _config->FindI("APT::Status-Fd",-1);
887 if(fd > 0)
888 {
889 ostringstream status;
890
891 char msg[200];
892 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
893 unsigned long long ETA = 0;
894 if(CurrentCPS > 0)
895 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
896
897 // only show the ETA if it makes sense
898 if (ETA > 0 && ETA < 172800 /* two days */ )
899 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
900 else
901 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
902
903
904
905 // build the status str
906 status << "dlstatus:" << i
907 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
908 << ":" << msg
909 << endl;
910
911 std::string const dlstatus = status.str();
912 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
913 }
914
915 return true;
916 }
917 /*}}}*/
918 // AcquireStatus::Start - Called when the download is started /*{{{*/
919 // ---------------------------------------------------------------------
920 /* We just reset the counters */
921 void pkgAcquireStatus::Start()
922 {
923 gettimeofday(&Time,0);
924 gettimeofday(&StartTime,0);
925 LastBytes = 0;
926 CurrentCPS = 0;
927 CurrentBytes = 0;
928 TotalBytes = 0;
929 FetchedBytes = 0;
930 ElapsedTime = 0;
931 TotalItems = 0;
932 CurrentItems = 0;
933 }
934 /*}}}*/
935 // AcquireStatus::Stop - Finished downloading /*{{{*/
936 // ---------------------------------------------------------------------
937 /* This accurately computes the elapsed time and the total overall CPS. */
938 void pkgAcquireStatus::Stop()
939 {
940 // Compute the CPS and elapsed time
941 struct timeval NewTime;
942 gettimeofday(&NewTime,0);
943
944 double Delta = NewTime.tv_sec - StartTime.tv_sec +
945 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
946
947 // Compute the CPS value
948 if (Delta < 0.01)
949 CurrentCPS = 0;
950 else
951 CurrentCPS = FetchedBytes/Delta;
952 LastBytes = CurrentBytes;
953 ElapsedTime = (unsigned long long)Delta;
954 }
955 /*}}}*/
956 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
957 // ---------------------------------------------------------------------
958 /* This is used to get accurate final transfer rate reporting. */
959 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
960 {
961 FetchedBytes += Size - Resume;
962 }
963 /*}}}*/