]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
Merge remote-tracking branch 'debian/debian/experimental' into feature/acq-trans
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <iomanip>
35
36 #include <dirent.h>
37 #include <sys/time.h>
38 #include <sys/select.h>
39 #include <errno.h>
40 #include <sys/stat.h>
41
42 #include <apti18n.h>
43 /*}}}*/
44
45 using namespace std;
46
47 // Acquire::pkgAcquire - Constructor /*{{{*/
48 // ---------------------------------------------------------------------
49 /* We grab some runtime state from the configuration space */
50 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
51 Debug(_config->FindB("Debug::pkgAcquire",false)),
52 Running(false)
53 {
54 string const Mode = _config->Find("Acquire::Queue-Mode","host");
55 if (strcasecmp(Mode.c_str(),"host") == 0)
56 QueueMode = QueueHost;
57 if (strcasecmp(Mode.c_str(),"access") == 0)
58 QueueMode = QueueAccess;
59 }
60 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
61 Configs(0), Log(Progress), ToFetch(0),
62 Debug(_config->FindB("Debug::pkgAcquire",false)),
63 Running(false)
64 {
65 string const Mode = _config->Find("Acquire::Queue-Mode","host");
66 if (strcasecmp(Mode.c_str(),"host") == 0)
67 QueueMode = QueueHost;
68 if (strcasecmp(Mode.c_str(),"access") == 0)
69 QueueMode = QueueAccess;
70 Setup(Progress, "");
71 }
72 /*}}}*/
73 // Acquire::Setup - Delayed Constructor /*{{{*/
74 // ---------------------------------------------------------------------
75 /* Do everything needed to be a complete Acquire object and report the
76 success (or failure) back so the user knows that something is wrong… */
77 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
78 {
79 Log = Progress;
80
81 // check for existence and possibly create auxiliary directories
82 string const listDir = _config->FindDir("Dir::State::lists");
83 string const partialListDir = listDir + "partial/";
84 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
85 string const partialArchivesDir = archivesDir + "partial/";
86
87 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
88 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
89 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
90
91 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
92 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
93 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
94
95 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
96 return true;
97
98 // Lock the directory this acquire object will work in
99 LockFD = GetLock(flCombine(Lock, "lock"));
100 if (LockFD == -1)
101 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
102
103 return true;
104 }
105 /*}}}*/
106 // Acquire::~pkgAcquire - Destructor /*{{{*/
107 // ---------------------------------------------------------------------
108 /* Free our memory, clean up the queues (destroy the workers) */
109 pkgAcquire::~pkgAcquire()
110 {
111 Shutdown();
112
113 if (LockFD != -1)
114 close(LockFD);
115
116 while (Configs != 0)
117 {
118 MethodConfig *Jnk = Configs;
119 Configs = Configs->Next;
120 delete Jnk;
121 }
122 }
123 /*}}}*/
124 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
125 // ---------------------------------------------------------------------
126 /* */
127 void pkgAcquire::Shutdown()
128 {
129 while (Items.empty() == false)
130 {
131 if (Items[0]->Status == Item::StatFetching)
132 Items[0]->Status = Item::StatError;
133 delete Items[0];
134 }
135
136 while (Queues != 0)
137 {
138 Queue *Jnk = Queues;
139 Queues = Queues->Next;
140 delete Jnk;
141 }
142 }
143 /*}}}*/
144 // Acquire::Add - Add a new item /*{{{*/
145 // ---------------------------------------------------------------------
146 /* This puts an item on the acquire list. This list is mainly for tracking
147 item status */
148 void pkgAcquire::Add(Item *Itm)
149 {
150 Items.push_back(Itm);
151 }
152 /*}}}*/
153 // Acquire::Remove - Remove a item /*{{{*/
154 // ---------------------------------------------------------------------
155 /* Remove an item from the acquire list. This is usually not used.. */
156 void pkgAcquire::Remove(Item *Itm)
157 {
158 Dequeue(Itm);
159
160 for (ItemIterator I = Items.begin(); I != Items.end();)
161 {
162 if (*I == Itm)
163 {
164 Items.erase(I);
165 I = Items.begin();
166 }
167 else
168 ++I;
169 }
170 }
171 /*}}}*/
172 // Acquire::Add - Add a worker /*{{{*/
173 // ---------------------------------------------------------------------
174 /* A list of workers is kept so that the select loop can direct their FD
175 usage. */
176 void pkgAcquire::Add(Worker *Work)
177 {
178 Work->NextAcquire = Workers;
179 Workers = Work;
180 }
181 /*}}}*/
182 // Acquire::Remove - Remove a worker /*{{{*/
183 // ---------------------------------------------------------------------
184 /* A worker has died. This can not be done while the select loop is running
185 as it would require that RunFds could handle a changing list state and
186 it can't.. */
187 void pkgAcquire::Remove(Worker *Work)
188 {
189 if (Running == true)
190 abort();
191
192 Worker **I = &Workers;
193 for (; *I != 0;)
194 {
195 if (*I == Work)
196 *I = (*I)->NextAcquire;
197 else
198 I = &(*I)->NextAcquire;
199 }
200 }
201 /*}}}*/
202 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
203 // ---------------------------------------------------------------------
204 /* This is the entry point for an item. An item calls this function when
205 it is constructed which creates a queue (based on the current queue
206 mode) and puts the item in that queue. If the system is running then
207 the queue might be started. */
208 void pkgAcquire::Enqueue(ItemDesc &Item)
209 {
210 // Determine which queue to put the item in
211 const MethodConfig *Config;
212 string Name = QueueName(Item.URI,Config);
213 if (Name.empty() == true)
214 return;
215
216 // Find the queue structure
217 Queue *I = Queues;
218 for (; I != 0 && I->Name != Name; I = I->Next);
219 if (I == 0)
220 {
221 I = new Queue(Name,this);
222 I->Next = Queues;
223 Queues = I;
224
225 if (Running == true)
226 I->Startup();
227 }
228
229 // See if this is a local only URI
230 if (Config->LocalOnly == true && Item.Owner->Complete == false)
231 Item.Owner->Local = true;
232 Item.Owner->Status = Item::StatIdle;
233
234 // Queue it into the named queue
235 if(I->Enqueue(Item))
236 ToFetch++;
237
238 // Some trace stuff
239 if (Debug == true)
240 {
241 clog << "Fetching " << Item.URI << endl;
242 clog << " to " << Item.Owner->DestFile << endl;
243 clog << " Queue is: " << Name << endl;
244 }
245 }
246 /*}}}*/
247 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
248 // ---------------------------------------------------------------------
249 /* This is called when an item is finished being fetched. It removes it
250 from all the queues */
251 void pkgAcquire::Dequeue(Item *Itm)
252 {
253 Queue *I = Queues;
254 bool Res = false;
255 if (Debug == true)
256 clog << "Dequeuing " << Itm->DestFile << endl;
257
258 for (; I != 0; I = I->Next)
259 {
260 if (I->Dequeue(Itm))
261 {
262 Res = true;
263 if (Debug == true)
264 clog << "Dequeued from " << I->Name << endl;
265 }
266 }
267
268 if (Res == true)
269 ToFetch--;
270 }
271 /*}}}*/
272 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
273 // ---------------------------------------------------------------------
274 /* The string returned depends on the configuration settings and the
275 method parameters. Given something like http://foo.org/bar it can
276 return http://foo.org or http */
277 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
278 {
279 URI U(Uri);
280
281 Config = GetConfig(U.Access);
282 if (Config == 0)
283 return string();
284
285 /* Single-Instance methods get exactly one queue per URI. This is
286 also used for the Access queue method */
287 if (Config->SingleInstance == true || QueueMode == QueueAccess)
288 return U.Access;
289
290 string AccessSchema = U.Access + ':',
291 FullQueueName = AccessSchema + U.Host;
292 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
293
294 Queue *I = Queues;
295 for (; I != 0; I = I->Next) {
296 // if the queue already exists, re-use it
297 if (I->Name == FullQueueName)
298 return FullQueueName;
299
300 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
301 Instances++;
302 }
303
304 if (Debug) {
305 clog << "Found " << Instances << " instances of " << U.Access << endl;
306 }
307
308 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
309 return U.Access;
310
311 return FullQueueName;
312 }
313 /*}}}*/
314 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
315 // ---------------------------------------------------------------------
316 /* This locates the configuration structure for an access method. If
317 a config structure cannot be found a Worker will be created to
318 retrieve it */
319 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
320 {
321 // Search for an existing config
322 MethodConfig *Conf;
323 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
324 if (Conf->Access == Access)
325 return Conf;
326
327 // Create the new config class
328 Conf = new MethodConfig;
329 Conf->Access = Access;
330 Conf->Next = Configs;
331 Configs = Conf;
332
333 // Create the worker to fetch the configuration
334 Worker Work(Conf);
335 if (Work.Start() == false)
336 return 0;
337
338 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
339 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
340 Conf->SingleInstance = true;
341
342 return Conf;
343 }
344 /*}}}*/
345 // Acquire::SetFds - Deal with readable FDs /*{{{*/
346 // ---------------------------------------------------------------------
347 /* Collect FDs that have activity monitors into the fd sets */
348 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
349 {
350 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
351 {
352 if (I->InReady == true && I->InFd >= 0)
353 {
354 if (Fd < I->InFd)
355 Fd = I->InFd;
356 FD_SET(I->InFd,RSet);
357 }
358 if (I->OutReady == true && I->OutFd >= 0)
359 {
360 if (Fd < I->OutFd)
361 Fd = I->OutFd;
362 FD_SET(I->OutFd,WSet);
363 }
364 }
365 }
366 /*}}}*/
367 // Acquire::RunFds - Deal with active FDs /*{{{*/
368 // ---------------------------------------------------------------------
369 /* Dispatch active FDs over to the proper workers. It is very important
370 that a worker never be erased while this is running! The queue class
371 should never erase a worker except during shutdown processing. */
372 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
373 {
374 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
375 {
376 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
377 I->InFdReady();
378 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
379 I->OutFdReady();
380 }
381 }
382 /*}}}*/
383 // Acquire::Run - Run the fetch sequence /*{{{*/
384 // ---------------------------------------------------------------------
385 /* This runs the queues. It manages a select loop for all of the
386 Worker tasks. The workers interact with the queues and items to
387 manage the actual fetch. */
388 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
389 {
390 Running = true;
391
392 for (Queue *I = Queues; I != 0; I = I->Next)
393 I->Startup();
394
395 if (Log != 0)
396 Log->Start();
397
398 bool WasCancelled = false;
399
400 // Run till all things have been acquired
401 struct timeval tv;
402 tv.tv_sec = 0;
403 tv.tv_usec = PulseIntervall;
404 while (ToFetch > 0)
405 {
406 fd_set RFds;
407 fd_set WFds;
408 int Highest = 0;
409 FD_ZERO(&RFds);
410 FD_ZERO(&WFds);
411 SetFds(Highest,&RFds,&WFds);
412
413 int Res;
414 do
415 {
416 Res = select(Highest+1,&RFds,&WFds,0,&tv);
417 }
418 while (Res < 0 && errno == EINTR);
419
420 if (Res < 0)
421 {
422 _error->Errno("select","Select has failed");
423 break;
424 }
425
426 RunFds(&RFds,&WFds);
427 if (_error->PendingError() == true)
428 break;
429
430 // Timeout, notify the log class
431 if (Res == 0 || (Log != 0 && Log->Update == true))
432 {
433 tv.tv_usec = PulseIntervall;
434 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
435 I->Pulse();
436 if (Log != 0 && Log->Pulse(this) == false)
437 {
438 WasCancelled = true;
439 break;
440 }
441 }
442 }
443
444 if (Log != 0)
445 Log->Stop();
446
447 // Shut down the acquire bits
448 Running = false;
449 for (Queue *I = Queues; I != 0; I = I->Next)
450 I->Shutdown(false);
451
452 // Shut down the items
453 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
454 (*I)->Finished();
455
456 if (_error->PendingError())
457 return Failed;
458 if (WasCancelled)
459 return Cancelled;
460 return Continue;
461 }
462 /*}}}*/
463 // Acquire::Bump - Called when an item is dequeued /*{{{*/
464 // ---------------------------------------------------------------------
465 /* This routine bumps idle queues in hopes that they will be able to fetch
466 the dequeued item */
467 void pkgAcquire::Bump()
468 {
469 for (Queue *I = Queues; I != 0; I = I->Next)
470 I->Bump();
471 }
472 /*}}}*/
473 // Acquire::WorkerStep - Step to the next worker /*{{{*/
474 // ---------------------------------------------------------------------
475 /* Not inlined to avoid including acquire-worker.h */
476 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
477 {
478 return I->NextAcquire;
479 }
480 /*}}}*/
481 // Acquire::Clean - Cleans a directory /*{{{*/
482 // ---------------------------------------------------------------------
483 /* This is a bit simplistic, it looks at every file in the dir and sees
484 if it is part of the download set. */
485 bool pkgAcquire::Clean(string Dir)
486 {
487 // non-existing directories are by definition clean…
488 if (DirectoryExists(Dir) == false)
489 return true;
490
491 if(Dir == "/")
492 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
493
494 DIR *D = opendir(Dir.c_str());
495 if (D == 0)
496 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
497
498 string StartDir = SafeGetCWD();
499 if (chdir(Dir.c_str()) != 0)
500 {
501 closedir(D);
502 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
503 }
504
505 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
506 {
507 // Skip some files..
508 if (strcmp(Dir->d_name,"lock") == 0 ||
509 strcmp(Dir->d_name,"partial") == 0 ||
510 strcmp(Dir->d_name,".") == 0 ||
511 strcmp(Dir->d_name,"..") == 0)
512 continue;
513
514 // Look in the get list
515 ItemCIterator I = Items.begin();
516 for (; I != Items.end(); ++I)
517 if (flNotDir((*I)->DestFile) == Dir->d_name)
518 break;
519
520 // Nothing found, nuke it
521 if (I == Items.end())
522 unlink(Dir->d_name);
523 };
524
525 closedir(D);
526 if (chdir(StartDir.c_str()) != 0)
527 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
528 return true;
529 }
530 /*}}}*/
531 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
532 // ---------------------------------------------------------------------
533 /* This is the total number of bytes needed */
534 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
535 {
536 unsigned long long Total = 0;
537 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
538 Total += (*I)->FileSize;
539 return Total;
540 }
541 /*}}}*/
542 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
543 // ---------------------------------------------------------------------
544 /* This is the number of bytes that is not local */
545 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
546 {
547 unsigned long long Total = 0;
548 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
549 if ((*I)->Local == false)
550 Total += (*I)->FileSize;
551 return Total;
552 }
553 /*}}}*/
554 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
555 // ---------------------------------------------------------------------
556 /* This is the number of bytes already present as partial downloads for items that are not local */
557 APT_PURE unsigned long long pkgAcquire::PartialPresent()
558 {
559 unsigned long long Total = 0;
560 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
561 if ((*I)->Local == false)
562 Total += (*I)->PartialSize;
563 return Total;
564 }
565 /*}}}*/
566 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
567 // ---------------------------------------------------------------------
568 /* */
569 pkgAcquire::UriIterator pkgAcquire::UriBegin()
570 {
571 return UriIterator(Queues);
572 }
573 /*}}}*/
574 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
575 // ---------------------------------------------------------------------
576 /* */
577 pkgAcquire::UriIterator pkgAcquire::UriEnd()
578 {
579 return UriIterator(0);
580 }
581 /*}}}*/
582 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
583 // ---------------------------------------------------------------------
584 /* */
585 pkgAcquire::MethodConfig::MethodConfig()
586 {
587 SingleInstance = false;
588 Pipeline = false;
589 SendConfig = false;
590 LocalOnly = false;
591 Removable = false;
592 Next = 0;
593 }
594 /*}}}*/
595 // Queue::Queue - Constructor /*{{{*/
596 // ---------------------------------------------------------------------
597 /* */
598 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
599 Owner(Owner)
600 {
601 Items = 0;
602 Next = 0;
603 Workers = 0;
604 MaxPipeDepth = 1;
605 PipeDepth = 0;
606 }
607 /*}}}*/
608 // Queue::~Queue - Destructor /*{{{*/
609 // ---------------------------------------------------------------------
610 /* */
611 pkgAcquire::Queue::~Queue()
612 {
613 Shutdown(true);
614
615 while (Items != 0)
616 {
617 QItem *Jnk = Items;
618 Items = Items->Next;
619 delete Jnk;
620 }
621 }
622 /*}}}*/
623 // Queue::Enqueue - Queue an item to the queue /*{{{*/
624 // ---------------------------------------------------------------------
625 /* */
626 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
627 {
628 QItem **I = &Items;
629 // move to the end of the queue and check for duplicates here
630 for (; *I != 0; I = &(*I)->Next)
631 if (Item.URI == (*I)->URI)
632 {
633 Item.Owner->Status = Item::StatDone;
634 return false;
635 }
636
637 // Create a new item
638 QItem *Itm = new QItem;
639 *Itm = Item;
640 Itm->Next = 0;
641 *I = Itm;
642
643 Item.Owner->QueueCounter++;
644 if (Items->Next == 0)
645 Cycle();
646 return true;
647 }
648 /*}}}*/
649 // Queue::Dequeue - Remove an item from the queue /*{{{*/
650 // ---------------------------------------------------------------------
651 /* We return true if we hit something */
652 bool pkgAcquire::Queue::Dequeue(Item *Owner)
653 {
654 if (Owner->Status == pkgAcquire::Item::StatFetching)
655 return _error->Error("Tried to dequeue a fetching object");
656
657 bool Res = false;
658
659 QItem **I = &Items;
660 for (; *I != 0;)
661 {
662 if ((*I)->Owner == Owner)
663 {
664 QItem *Jnk= *I;
665 *I = (*I)->Next;
666 Owner->QueueCounter--;
667 delete Jnk;
668 Res = true;
669 }
670 else
671 I = &(*I)->Next;
672 }
673
674 return Res;
675 }
676 /*}}}*/
677 // Queue::Startup - Start the worker processes /*{{{*/
678 // ---------------------------------------------------------------------
679 /* It is possible for this to be called with a pre-existing set of
680 workers. */
681 bool pkgAcquire::Queue::Startup()
682 {
683 if (Workers == 0)
684 {
685 URI U(Name);
686 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
687 if (Cnf == 0)
688 return false;
689
690 Workers = new Worker(this,Cnf,Owner->Log);
691 Owner->Add(Workers);
692 if (Workers->Start() == false)
693 return false;
694
695 /* When pipelining we commit 10 items. This needs to change when we
696 added other source retry to have cycle maintain a pipeline depth
697 on its own. */
698 if (Cnf->Pipeline == true)
699 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
700 else
701 MaxPipeDepth = 1;
702 }
703
704 return Cycle();
705 }
706 /*}}}*/
707 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
708 // ---------------------------------------------------------------------
709 /* If final is true then all workers are eliminated, otherwise only workers
710 that do not need cleanup are removed */
711 bool pkgAcquire::Queue::Shutdown(bool Final)
712 {
713 // Delete all of the workers
714 pkgAcquire::Worker **Cur = &Workers;
715 while (*Cur != 0)
716 {
717 pkgAcquire::Worker *Jnk = *Cur;
718 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
719 {
720 *Cur = Jnk->NextQueue;
721 Owner->Remove(Jnk);
722 delete Jnk;
723 }
724 else
725 Cur = &(*Cur)->NextQueue;
726 }
727
728 return true;
729 }
730 /*}}}*/
731 // Queue::FindItem - Find a URI in the item list /*{{{*/
732 // ---------------------------------------------------------------------
733 /* */
734 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
735 {
736 for (QItem *I = Items; I != 0; I = I->Next)
737 if (I->URI == URI && I->Worker == Owner)
738 return I;
739 return 0;
740 }
741 /*}}}*/
742 // Queue::ItemDone - Item has been completed /*{{{*/
743 // ---------------------------------------------------------------------
744 /* The worker signals this which causes the item to be removed from the
745 queue. If this is the last queue instance then it is removed from the
746 main queue too.*/
747 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
748 {
749 PipeDepth--;
750 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
751 Itm->Owner->Status = pkgAcquire::Item::StatDone;
752
753 if (Itm->Owner->QueueCounter <= 1)
754 Owner->Dequeue(Itm->Owner);
755 else
756 {
757 Dequeue(Itm->Owner);
758 Owner->Bump();
759 }
760
761 return Cycle();
762 }
763 /*}}}*/
764 // Queue::Cycle - Queue new items into the method /*{{{*/
765 // ---------------------------------------------------------------------
766 /* This locates a new idle item and sends it to the worker. If pipelining
767 is enabled then it keeps the pipe full. */
768 bool pkgAcquire::Queue::Cycle()
769 {
770 if (Items == 0 || Workers == 0)
771 return true;
772
773 if (PipeDepth < 0)
774 return _error->Error("Pipedepth failure");
775
776 // Look for a queable item
777 QItem *I = Items;
778 while (PipeDepth < (signed)MaxPipeDepth)
779 {
780 for (; I != 0; I = I->Next)
781 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
782 break;
783
784 // Nothing to do, queue is idle.
785 if (I == 0)
786 return true;
787
788 I->Worker = Workers;
789 I->Owner->Status = pkgAcquire::Item::StatFetching;
790 PipeDepth++;
791 if (Workers->QueueItem(I) == false)
792 return false;
793 }
794
795 return true;
796 }
797 /*}}}*/
798 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
799 // ---------------------------------------------------------------------
800 /* This is called when an item in multiple queues is dequeued */
801 void pkgAcquire::Queue::Bump()
802 {
803 Cycle();
804 }
805 /*}}}*/
806 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
807 // ---------------------------------------------------------------------
808 /* */
809 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
810 {
811 Start();
812 }
813 /*}}}*/
814 // AcquireStatus::Pulse - Called periodically /*{{{*/
815 // ---------------------------------------------------------------------
816 /* This computes some internal state variables for the derived classes to
817 use. It generates the current downloaded bytes and total bytes to download
818 as well as the current CPS estimate. */
819 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
820 {
821 TotalBytes = 0;
822 CurrentBytes = 0;
823 TotalItems = 0;
824 CurrentItems = 0;
825
826 // Compute the total number of bytes to fetch
827 unsigned int Unknown = 0;
828 unsigned int Count = 0;
829 bool UnfetchedReleaseFiles = false;
830 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
831 I != Owner->ItemsEnd();
832 ++I, ++Count)
833 {
834 TotalItems++;
835 if ((*I)->Status == pkgAcquire::Item::StatDone)
836 ++CurrentItems;
837
838 // Totally ignore local items
839 if ((*I)->Local == true)
840 continue;
841
842 // see if the method tells us to expect more
843 TotalItems += (*I)->ExpectedAdditionalItems;
844
845 // check if there are unfetched Release files
846 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
847 UnfetchedReleaseFiles = true;
848
849 TotalBytes += (*I)->FileSize;
850 if ((*I)->Complete == true)
851 CurrentBytes += (*I)->FileSize;
852 if ((*I)->FileSize == 0 && (*I)->Complete == false)
853 ++Unknown;
854 }
855
856 // Compute the current completion
857 unsigned long long ResumeSize = 0;
858 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
859 I = Owner->WorkerStep(I))
860 {
861 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
862 {
863 CurrentBytes += I->CurrentSize;
864 ResumeSize += I->ResumePoint;
865
866 // Files with unknown size always have 100% completion
867 if (I->CurrentItem->Owner->FileSize == 0 &&
868 I->CurrentItem->Owner->Complete == false)
869 TotalBytes += I->CurrentSize;
870 }
871 }
872
873 // Normalize the figures and account for unknown size downloads
874 if (TotalBytes <= 0)
875 TotalBytes = 1;
876 if (Unknown == Count)
877 TotalBytes = Unknown;
878
879 // Wha?! Is not supposed to happen.
880 if (CurrentBytes > TotalBytes)
881 CurrentBytes = TotalBytes;
882
883 // debug
884 if (_config->FindB("Debug::acquire::progress", false) == true)
885 std::clog << " Bytes: "
886 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
887 << std::endl;
888
889 // Compute the CPS
890 struct timeval NewTime;
891 gettimeofday(&NewTime,0);
892 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
893 NewTime.tv_sec - Time.tv_sec > 6)
894 {
895 double Delta = NewTime.tv_sec - Time.tv_sec +
896 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
897
898 // Compute the CPS value
899 if (Delta < 0.01)
900 CurrentCPS = 0;
901 else
902 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
903 LastBytes = CurrentBytes - ResumeSize;
904 ElapsedTime = (unsigned long long)Delta;
905 Time = NewTime;
906 }
907
908 // calculate the percentage, if we have too little data assume 1%
909 if (TotalBytes > 0 && UnfetchedReleaseFiles)
910 Percent = 0;
911 else
912 // use both files and bytes because bytes can be unreliable
913 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
914 0.2 * (CurrentItems/float(TotalItems)*100.0));
915
916 int fd = _config->FindI("APT::Status-Fd",-1);
917 if(fd > 0)
918 {
919 ostringstream status;
920
921 char msg[200];
922 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
923 unsigned long long ETA = 0;
924 if(CurrentCPS > 0)
925 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
926
927 // only show the ETA if it makes sense
928 if (ETA > 0 && ETA < 172800 /* two days */ )
929 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
930 else
931 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
932
933 // build the status str
934 status << "dlstatus:" << i
935 << ":" << std::setprecision(3) << Percent
936 << ":" << msg
937 << endl;
938
939 std::string const dlstatus = status.str();
940 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
941 }
942
943 return true;
944 }
945 /*}}}*/
946 // AcquireStatus::Start - Called when the download is started /*{{{*/
947 // ---------------------------------------------------------------------
948 /* We just reset the counters */
949 void pkgAcquireStatus::Start()
950 {
951 gettimeofday(&Time,0);
952 gettimeofday(&StartTime,0);
953 LastBytes = 0;
954 CurrentCPS = 0;
955 CurrentBytes = 0;
956 TotalBytes = 0;
957 FetchedBytes = 0;
958 ElapsedTime = 0;
959 TotalItems = 0;
960 CurrentItems = 0;
961 }
962 /*}}}*/
963 // AcquireStatus::Stop - Finished downloading /*{{{*/
964 // ---------------------------------------------------------------------
965 /* This accurately computes the elapsed time and the total overall CPS. */
966 void pkgAcquireStatus::Stop()
967 {
968 // Compute the CPS and elapsed time
969 struct timeval NewTime;
970 gettimeofday(&NewTime,0);
971
972 double Delta = NewTime.tv_sec - StartTime.tv_sec +
973 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
974
975 // Compute the CPS value
976 if (Delta < 0.01)
977 CurrentCPS = 0;
978 else
979 CurrentCPS = FetchedBytes/Delta;
980 LastBytes = CurrentBytes;
981 ElapsedTime = (unsigned long long)Delta;
982 }
983 /*}}}*/
984 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
985 // ---------------------------------------------------------------------
986 /* This is used to get accurate final transfer rate reporting. */
987 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
988 {
989 FetchedBytes += Size - Resume;
990 }
991 /*}}}*/