]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
replace c-string Mode with c++-string ActiveSubprocess
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <iomanip>
35
36 #include <dirent.h>
37 #include <sys/time.h>
38 #include <sys/select.h>
39 #include <errno.h>
40
41 #include <apti18n.h>
42 /*}}}*/
43
44 using namespace std;
45
46 // Acquire::pkgAcquire - Constructor /*{{{*/
47 // ---------------------------------------------------------------------
48 /* We grab some runtime state from the configuration space */
49 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
50 Debug(_config->FindB("Debug::pkgAcquire",false)),
51 Running(false)
52 {
53 string const Mode = _config->Find("Acquire::Queue-Mode","host");
54 if (strcasecmp(Mode.c_str(),"host") == 0)
55 QueueMode = QueueHost;
56 if (strcasecmp(Mode.c_str(),"access") == 0)
57 QueueMode = QueueAccess;
58 }
59 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
60 Configs(0), Log(Progress), ToFetch(0),
61 Debug(_config->FindB("Debug::pkgAcquire",false)),
62 Running(false)
63 {
64 string const Mode = _config->Find("Acquire::Queue-Mode","host");
65 if (strcasecmp(Mode.c_str(),"host") == 0)
66 QueueMode = QueueHost;
67 if (strcasecmp(Mode.c_str(),"access") == 0)
68 QueueMode = QueueAccess;
69 Setup(Progress, "");
70 }
71 /*}}}*/
72 // Acquire::Setup - Delayed Constructor /*{{{*/
73 // ---------------------------------------------------------------------
74 /* Do everything needed to be a complete Acquire object and report the
75 success (or failure) back so the user knows that something is wrong… */
76 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock,
77 bool const createDirectories)
78 {
79 Log = Progress;
80
81 // check for existence and possibly create auxiliary directories
82 if (createDirectories == true)
83 {
84 string const listDir = _config->FindDir("Dir::State::lists");
85 string const partialListDir = listDir + "partial/";
86 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
87 string const partialArchivesDir = archivesDir + "partial/";
88
89 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
90 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
91 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
92
93 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
94 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
95 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
96 }
97
98 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
99 return true;
100
101 // Lock the directory this acquire object will work in
102 LockFD = GetLock(flCombine(Lock, "lock"));
103 if (LockFD == -1)
104 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
105
106 return true;
107 }
108 /*}}}*/
109 // Acquire::~pkgAcquire - Destructor /*{{{*/
110 // ---------------------------------------------------------------------
111 /* Free our memory, clean up the queues (destroy the workers) */
112 pkgAcquire::~pkgAcquire()
113 {
114 Shutdown();
115
116 if (LockFD != -1)
117 close(LockFD);
118
119 while (Configs != 0)
120 {
121 MethodConfig *Jnk = Configs;
122 Configs = Configs->Next;
123 delete Jnk;
124 }
125 }
126 /*}}}*/
127 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
128 // ---------------------------------------------------------------------
129 /* */
130 void pkgAcquire::Shutdown()
131 {
132 while (Items.empty() == false)
133 {
134 if (Items[0]->Status == Item::StatFetching)
135 Items[0]->Status = Item::StatError;
136 delete Items[0];
137 }
138
139 while (Queues != 0)
140 {
141 Queue *Jnk = Queues;
142 Queues = Queues->Next;
143 delete Jnk;
144 }
145 }
146 /*}}}*/
147 // Acquire::Add - Add a new item /*{{{*/
148 // ---------------------------------------------------------------------
149 /* This puts an item on the acquire list. This list is mainly for tracking
150 item status */
151 void pkgAcquire::Add(Item *Itm)
152 {
153 Items.push_back(Itm);
154 }
155 /*}}}*/
156 // Acquire::Remove - Remove a item /*{{{*/
157 // ---------------------------------------------------------------------
158 /* Remove an item from the acquire list. This is usually not used.. */
159 void pkgAcquire::Remove(Item *Itm)
160 {
161 Dequeue(Itm);
162
163 for (ItemIterator I = Items.begin(); I != Items.end();)
164 {
165 if (*I == Itm)
166 {
167 Items.erase(I);
168 I = Items.begin();
169 }
170 else
171 ++I;
172 }
173 }
174 /*}}}*/
175 // Acquire::Add - Add a worker /*{{{*/
176 // ---------------------------------------------------------------------
177 /* A list of workers is kept so that the select loop can direct their FD
178 usage. */
179 void pkgAcquire::Add(Worker *Work)
180 {
181 Work->NextAcquire = Workers;
182 Workers = Work;
183 }
184 /*}}}*/
185 // Acquire::Remove - Remove a worker /*{{{*/
186 // ---------------------------------------------------------------------
187 /* A worker has died. This cannot be done while the select loop is running,
188 as it would require RunFds to handle a changing list state, which it
189 cannot do. */
190 void pkgAcquire::Remove(Worker *Work)
191 {
192 if (Running == true)
193 abort();
194
195 Worker **I = &Workers;
196 for (; *I != 0;)
197 {
198 if (*I == Work)
199 *I = (*I)->NextAcquire;
200 else
201 I = &(*I)->NextAcquire;
202 }
203 }
204 /*}}}*/
205 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
206 // ---------------------------------------------------------------------
207 /* This is the entry point for an item. An item calls this function when
208 it is constructed which creates a queue (based on the current queue
209 mode) and puts the item in that queue. If the system is running then
210 the queue might be started. */
211 void pkgAcquire::Enqueue(ItemDesc &Item)
212 {
213 // Determine which queue to put the item in
214 const MethodConfig *Config;
215 string Name = QueueName(Item.URI,Config);
216 if (Name.empty() == true)
217 return;
218
219 // Find the queue structure
220 Queue *I = Queues;
221 for (; I != 0 && I->Name != Name; I = I->Next);
222 if (I == 0)
223 {
224 I = new Queue(Name,this);
225 I->Next = Queues;
226 Queues = I;
227
228 if (Running == true)
229 I->Startup();
230 }
231
232 // See if this is a local only URI
233 if (Config->LocalOnly == true && Item.Owner->Complete == false)
234 Item.Owner->Local = true;
235 Item.Owner->Status = Item::StatIdle;
236
237 // Queue it into the named queue
238 if(I->Enqueue(Item))
239 ToFetch++;
240
241 // Some trace stuff
242 if (Debug == true)
243 {
244 clog << "Fetching " << Item.URI << endl;
245 clog << " to " << Item.Owner->DestFile << endl;
246 clog << " Queue is: " << Name << endl;
247 }
248 }
249 /*}}}*/
250 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
251 // ---------------------------------------------------------------------
252 /* This is called when an item is finished being fetched. It removes it
253 from all the queues */
254 void pkgAcquire::Dequeue(Item *Itm)
255 {
256 Queue *I = Queues;
257 bool Res = false;
258 if (Debug == true)
259 clog << "Dequeuing " << Itm->DestFile << endl;
260
261 for (; I != 0; I = I->Next)
262 {
263 if (I->Dequeue(Itm))
264 {
265 Res = true;
266 if (Debug == true)
267 clog << "Dequeued from " << I->Name << endl;
268 }
269 }
270
271 if (Res == true)
272 ToFetch--;
273 }
274 /*}}}*/
275 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
276 // ---------------------------------------------------------------------
277 /* The string returned depends on the configuration settings and the
278 method parameters. Given something like http://foo.org/bar it can
279 return http://foo.org or http */
280 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
281 {
282 URI U(Uri);
283
284 Config = GetConfig(U.Access);
285 if (Config == 0)
286 return string();
287
288 /* Single-Instance methods get exactly one queue per URI. This is
289 also used for the Access queue method */
290 if (Config->SingleInstance == true || QueueMode == QueueAccess)
291 return U.Access;
292
293 string AccessSchema = U.Access + ':',
294 FullQueueName = AccessSchema + U.Host;
295 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
296
297 Queue *I = Queues;
298 for (; I != 0; I = I->Next) {
299 // if the queue already exists, re-use it
300 if (I->Name == FullQueueName)
301 return FullQueueName;
302
303 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
304 Instances++;
305 }
306
307 if (Debug) {
308 clog << "Found " << Instances << " instances of " << U.Access << endl;
309 }
310
311 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
312 return U.Access;
313
314 return FullQueueName;
315 }
316 /*}}}*/
317 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
318 // ---------------------------------------------------------------------
319 /* This locates the configuration structure for an access method. If
320 a config structure cannot be found a Worker will be created to
321 retrieve it */
322 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
323 {
324 // Search for an existing config
325 MethodConfig *Conf;
326 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
327 if (Conf->Access == Access)
328 return Conf;
329
330 // Create the new config class
331 Conf = new MethodConfig;
332 Conf->Access = Access;
333 Conf->Next = Configs;
334 Configs = Conf;
335
336 // Create the worker to fetch the configuration
337 Worker Work(Conf);
338 if (Work.Start() == false)
339 return 0;
340
341 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
342 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
343 Conf->SingleInstance = true;
344
345 return Conf;
346 }
347 /*}}}*/
348 // Acquire::SetFds - Deal with readable FDs /*{{{*/
349 // ---------------------------------------------------------------------
350 /* Collect FDs that have activity monitors into the fd sets */
351 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
352 {
353 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
354 {
355 if (I->InReady == true && I->InFd >= 0)
356 {
357 if (Fd < I->InFd)
358 Fd = I->InFd;
359 FD_SET(I->InFd,RSet);
360 }
361 if (I->OutReady == true && I->OutFd >= 0)
362 {
363 if (Fd < I->OutFd)
364 Fd = I->OutFd;
365 FD_SET(I->OutFd,WSet);
366 }
367 }
368 }
369 /*}}}*/
370 // Acquire::RunFds - Deal with active FDs /*{{{*/
371 // ---------------------------------------------------------------------
372 /* Dispatch active FDs over to the proper workers. It is very important
373 that a worker never be erased while this is running! The queue class
374 should never erase a worker except during shutdown processing. */
375 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
376 {
377 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
378 {
379 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
380 I->InFdReady();
381 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
382 I->OutFdReady();
383 }
384 }
385 /*}}}*/
386 // Acquire::Run - Run the fetch sequence /*{{{*/
387 // ---------------------------------------------------------------------
388 /* This runs the queues. It manages a select loop for all of the
389 Worker tasks. The workers interact with the queues and items to
390 manage the actual fetch. */
391 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
392 {
393 Running = true;
394
395 for (Queue *I = Queues; I != 0; I = I->Next)
396 I->Startup();
397
398 if (Log != 0)
399 Log->Start();
400
401 bool WasCancelled = false;
402
403 // Run till all things have been acquired
404 struct timeval tv;
405 tv.tv_sec = 0;
406 tv.tv_usec = PulseIntervall;
407 while (ToFetch > 0)
408 {
409 fd_set RFds;
410 fd_set WFds;
411 int Highest = 0;
412 FD_ZERO(&RFds);
413 FD_ZERO(&WFds);
414 SetFds(Highest,&RFds,&WFds);
415
416 int Res;
417 do
418 {
419 Res = select(Highest+1,&RFds,&WFds,0,&tv);
420 }
421 while (Res < 0 && errno == EINTR);
422
423 if (Res < 0)
424 {
425 _error->Errno("select","Select has failed");
426 break;
427 }
428
429 RunFds(&RFds,&WFds);
430 if (_error->PendingError() == true)
431 break;
432
433 // Timeout, notify the log class
434 if (Res == 0 || (Log != 0 && Log->Update == true))
435 {
436 tv.tv_usec = PulseIntervall;
437 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
438 I->Pulse();
439 if (Log != 0 && Log->Pulse(this) == false)
440 {
441 WasCancelled = true;
442 break;
443 }
444 }
445 }
446
447 if (Log != 0)
448 Log->Stop();
449
450 // Shut down the acquire bits
451 Running = false;
452 for (Queue *I = Queues; I != 0; I = I->Next)
453 I->Shutdown(false);
454
455 // Shut down the items
456 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
457 (*I)->Finished();
458
459 if (_error->PendingError())
460 return Failed;
461 if (WasCancelled)
462 return Cancelled;
463 return Continue;
464 }
465 /*}}}*/
466 // Acquire::Bump - Called when an item is dequeued /*{{{*/
467 // ---------------------------------------------------------------------
468 /* This routine bumps idle queues in hopes that they will be able to fetch
469 the dequeued item */
470 void pkgAcquire::Bump()
471 {
472 for (Queue *I = Queues; I != 0; I = I->Next)
473 I->Bump();
474 }
475 /*}}}*/
476 // Acquire::WorkerStep - Step to the next worker /*{{{*/
477 // ---------------------------------------------------------------------
478 /* Not inlined to avoid including acquire-worker.h */
479 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
480 {
481 return I->NextAcquire;
482 }
483 /*}}}*/
484 // Acquire::Clean - Cleans a directory /*{{{*/
485 // ---------------------------------------------------------------------
486 /* This is a bit simplistic, it looks at every file in the dir and sees
487 if it is part of the download set. */
488 bool pkgAcquire::Clean(string Dir)
489 {
490 // non-existing directories are by definition clean…
491 if (DirectoryExists(Dir) == false)
492 return true;
493
494 if(Dir == "/")
495 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
496
497 DIR *D = opendir(Dir.c_str());
498 if (D == 0)
499 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
500
501 string StartDir = SafeGetCWD();
502 if (chdir(Dir.c_str()) != 0)
503 {
504 closedir(D);
505 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
506 }
507
508 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
509 {
510 // Skip some files..
511 if (strcmp(Dir->d_name,"lock") == 0 ||
512 strcmp(Dir->d_name,"partial") == 0 ||
513 strcmp(Dir->d_name,".") == 0 ||
514 strcmp(Dir->d_name,"..") == 0)
515 continue;
516
517 // Look in the get list
518 ItemCIterator I = Items.begin();
519 for (; I != Items.end(); ++I)
520 if (flNotDir((*I)->DestFile) == Dir->d_name)
521 break;
522
523 // Nothing found, nuke it
524 if (I == Items.end())
525 unlink(Dir->d_name);
526 };
527
528 closedir(D);
529 if (chdir(StartDir.c_str()) != 0)
530 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
531 return true;
532 }
533 /*}}}*/
534 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
535 // ---------------------------------------------------------------------
536 /* This is the total number of bytes needed */
537 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
538 {
539 unsigned long long Total = 0;
540 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
541 Total += (*I)->FileSize;
542 return Total;
543 }
544 /*}}}*/
545 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
546 // ---------------------------------------------------------------------
547 /* This is the number of bytes that is not local */
548 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
549 {
550 unsigned long long Total = 0;
551 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
552 if ((*I)->Local == false)
553 Total += (*I)->FileSize;
554 return Total;
555 }
556 /*}}}*/
557 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
558 // ---------------------------------------------------------------------
559 /* This is the sum of PartialSize over all items that are not local */
560 APT_PURE unsigned long long pkgAcquire::PartialPresent()
561 {
562 unsigned long long Total = 0;
563 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
564 if ((*I)->Local == false)
565 Total += (*I)->PartialSize;
566 return Total;
567 }
568 /*}}}*/
569 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
570 // ---------------------------------------------------------------------
571 /* */
572 pkgAcquire::UriIterator pkgAcquire::UriBegin()
573 {
574 return UriIterator(Queues);
575 }
576 /*}}}*/
577 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
578 // ---------------------------------------------------------------------
579 /* */
580 pkgAcquire::UriIterator pkgAcquire::UriEnd()
581 {
582 return UriIterator(0);
583 }
584 /*}}}*/
585 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
586 // ---------------------------------------------------------------------
587 /* */
588 pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
589 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
590 Removable(false)
591 {
592 }
593 /*}}}*/
594 // Queue::Queue - Constructor /*{{{*/
595 // ---------------------------------------------------------------------
596 /* */
597 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
598 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
599 {
600 }
601 /*}}}*/
602 // Queue::~Queue - Destructor /*{{{*/
603 // ---------------------------------------------------------------------
604 /* */
605 pkgAcquire::Queue::~Queue()
606 {
607 Shutdown(true);
608
609 while (Items != 0)
610 {
611 QItem *Jnk = Items;
612 Items = Items->Next;
613 delete Jnk;
614 }
615 }
616 /*}}}*/
617 // Queue::Enqueue - Queue an item to the queue /*{{{*/
618 // ---------------------------------------------------------------------
619 /* */
620 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
621 {
622 QItem **I = &Items;
623 // move to the end of the queue and check for duplicates here
624 for (; *I != 0; I = &(*I)->Next)
625 if (Item.URI == (*I)->URI)
626 {
627 Item.Owner->Status = Item::StatDone;
628 return false;
629 }
630
631 // Create a new item
632 QItem *Itm = new QItem;
633 *Itm = Item;
634 Itm->Next = 0;
635 *I = Itm;
636
637 Item.Owner->QueueCounter++;
638 if (Items->Next == 0)
639 Cycle();
640 return true;
641 }
642 /*}}}*/
643 // Queue::Dequeue - Remove an item from the queue /*{{{*/
644 // ---------------------------------------------------------------------
645 /* We return true if we hit something */
646 bool pkgAcquire::Queue::Dequeue(Item *Owner)
647 {
648 if (Owner->Status == pkgAcquire::Item::StatFetching)
649 return _error->Error("Tried to dequeue a fetching object");
650
651 bool Res = false;
652
653 QItem **I = &Items;
654 for (; *I != 0;)
655 {
656 if ((*I)->Owner == Owner)
657 {
658 QItem *Jnk= *I;
659 *I = (*I)->Next;
660 Owner->QueueCounter--;
661 delete Jnk;
662 Res = true;
663 }
664 else
665 I = &(*I)->Next;
666 }
667
668 return Res;
669 }
670 /*}}}*/
671 // Queue::Startup - Start the worker processes /*{{{*/
672 // ---------------------------------------------------------------------
673 /* It is possible for this to be called with a pre-existing set of
674 workers. */
675 bool pkgAcquire::Queue::Startup()
676 {
677 if (Workers == 0)
678 {
679 URI U(Name);
680 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
681 if (Cnf == 0)
682 return false;
683
684 Workers = new Worker(this,Cnf,Owner->Log);
685 Owner->Add(Workers);
686 if (Workers->Start() == false)
687 return false;
688
689 /* When pipelining we commit 10 items. This needs to change when we
690 added other source retry to have cycle maintain a pipeline depth
691 on its own. */
692 if (Cnf->Pipeline == true)
693 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
694 else
695 MaxPipeDepth = 1;
696 }
697
698 return Cycle();
699 }
700 /*}}}*/
701 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
702 // ---------------------------------------------------------------------
703 /* If final is true then all workers are eliminated, otherwise only workers
704 that do not need cleanup are removed */
705 bool pkgAcquire::Queue::Shutdown(bool Final)
706 {
707 // Delete all of the workers
708 pkgAcquire::Worker **Cur = &Workers;
709 while (*Cur != 0)
710 {
711 pkgAcquire::Worker *Jnk = *Cur;
712 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
713 {
714 *Cur = Jnk->NextQueue;
715 Owner->Remove(Jnk);
716 delete Jnk;
717 }
718 else
719 Cur = &(*Cur)->NextQueue;
720 }
721
722 return true;
723 }
724 /*}}}*/
725 // Queue::FindItem - Find a URI in the item list /*{{{*/
726 // ---------------------------------------------------------------------
727 /* */
728 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
729 {
730 for (QItem *I = Items; I != 0; I = I->Next)
731 if (I->URI == URI && I->Worker == Owner)
732 return I;
733 return 0;
734 }
735 /*}}}*/
736 // Queue::ItemDone - Item has been completed /*{{{*/
737 // ---------------------------------------------------------------------
738 /* The worker signals this which causes the item to be removed from the
739 queue. If this is the last queue instance then it is removed from the
740 main queue too.*/
741 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
742 {
743 PipeDepth--;
744 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
745 Itm->Owner->Status = pkgAcquire::Item::StatDone;
746
747 if (Itm->Owner->QueueCounter <= 1)
748 Owner->Dequeue(Itm->Owner);
749 else
750 {
751 Dequeue(Itm->Owner);
752 Owner->Bump();
753 }
754
755 return Cycle();
756 }
757 /*}}}*/
758 // Queue::Cycle - Queue new items into the method /*{{{*/
759 // ---------------------------------------------------------------------
760 /* This locates a new idle item and sends it to the worker. If pipelining
761 is enabled then it keeps the pipe full. */
762 bool pkgAcquire::Queue::Cycle()
763 {
764 if (Items == 0 || Workers == 0)
765 return true;
766
767 if (PipeDepth < 0)
768 return _error->Error("Pipedepth failure");
769
770 // Look for a queable item
771 QItem *I = Items;
772 while (PipeDepth < (signed)MaxPipeDepth)
773 {
774 for (; I != 0; I = I->Next)
775 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
776 break;
777
778 // Nothing to do, queue is idle.
779 if (I == 0)
780 return true;
781
782 I->Worker = Workers;
783 I->Owner->Status = pkgAcquire::Item::StatFetching;
784 PipeDepth++;
785 if (Workers->QueueItem(I) == false)
786 return false;
787 }
788
789 return true;
790 }
791 /*}}}*/
792 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
793 // ---------------------------------------------------------------------
794 /* This is called when an item in multiple queues is dequeued */
795 void pkgAcquire::Queue::Bump()
796 {
797 Cycle();
798 }
799 /*}}}*/
800 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
801 // ---------------------------------------------------------------------
802 /* */
803 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
804 {
805 Start();
806 }
807 /*}}}*/
808 // AcquireStatus::Pulse - Called periodically /*{{{*/
809 // ---------------------------------------------------------------------
810 /* This computes some internal state variables for the derived classes to
811 use. It generates the current downloaded bytes and total bytes to download
812 as well as the current CPS estimate. */
813 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
814 {
815 TotalBytes = 0;
816 CurrentBytes = 0;
817 TotalItems = 0;
818 CurrentItems = 0;
819
820 // Compute the total number of bytes to fetch
821 unsigned int Unknown = 0;
822 unsigned int Count = 0;
823 bool UnfetchedReleaseFiles = false;
824 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
825 I != Owner->ItemsEnd();
826 ++I, ++Count)
827 {
828 TotalItems++;
829 if ((*I)->Status == pkgAcquire::Item::StatDone)
830 ++CurrentItems;
831
832 // Totally ignore local items
833 if ((*I)->Local == true)
834 continue;
835
836 // see if the method tells us to expect more
837 TotalItems += (*I)->ExpectedAdditionalItems;
838
839 // check if there are unfetched Release files
840 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
841 UnfetchedReleaseFiles = true;
842
843 TotalBytes += (*I)->FileSize;
844 if ((*I)->Complete == true)
845 CurrentBytes += (*I)->FileSize;
846 if ((*I)->FileSize == 0 && (*I)->Complete == false)
847 ++Unknown;
848 }
849
850 // Compute the current completion
851 unsigned long long ResumeSize = 0;
852 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
853 I = Owner->WorkerStep(I))
854 {
855 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
856 {
857 CurrentBytes += I->CurrentSize;
858 ResumeSize += I->ResumePoint;
859
860 // Files with unknown size always have 100% completion
861 if (I->CurrentItem->Owner->FileSize == 0 &&
862 I->CurrentItem->Owner->Complete == false)
863 TotalBytes += I->CurrentSize;
864 }
865 }
866
867 // Normalize the figures and account for unknown size downloads
868 if (TotalBytes <= 0)
869 TotalBytes = 1;
870 if (Unknown == Count)
871 TotalBytes = Unknown;
872
873 // Wha?! Is not supposed to happen.
874 if (CurrentBytes > TotalBytes)
875 CurrentBytes = TotalBytes;
876
877 // debug
878 if (_config->FindB("Debug::acquire::progress", false) == true)
879 std::clog << " Bytes: "
880 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
881 << std::endl;
882
883 // Compute the CPS
884 struct timeval NewTime;
885 gettimeofday(&NewTime,0);
886 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
887 NewTime.tv_sec - Time.tv_sec > 6)
888 {
889 double Delta = NewTime.tv_sec - Time.tv_sec +
890 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
891
892 // Compute the CPS value
893 if (Delta < 0.01)
894 CurrentCPS = 0;
895 else
896 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
897 LastBytes = CurrentBytes - ResumeSize;
898 ElapsedTime = (unsigned long long)Delta;
899 Time = NewTime;
900 }
901
902 // calculate the percentage, if we have too little data assume 1%
903 if (TotalBytes > 0 && UnfetchedReleaseFiles)
904 Percent = 0;
905 else
906 // use both files and bytes because bytes can be unreliable
907 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
908 0.2 * (CurrentItems/float(TotalItems)*100.0));
909
910 int fd = _config->FindI("APT::Status-Fd",-1);
911 if(fd > 0)
912 {
913 ostringstream status;
914
915 char msg[200];
916 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
917 unsigned long long ETA = 0;
918 if(CurrentCPS > 0)
919 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
920
921 // only show the ETA if it makes sense
922 if (ETA > 0 && ETA < 172800 /* two days */ )
923 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
924 else
925 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
926
927 // build the status str
928 status << "dlstatus:" << i
929 << ":" << std::setprecision(3) << Percent
930 << ":" << msg
931 << endl;
932
933 std::string const dlstatus = status.str();
934 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
935 }
936
937 return true;
938 }
939 /*}}}*/
940 // AcquireStatus::Start - Called when the download is started /*{{{*/
941 // ---------------------------------------------------------------------
942 /* We just reset the counters */
943 void pkgAcquireStatus::Start()
944 {
945 gettimeofday(&Time,0);
946 gettimeofday(&StartTime,0);
947 LastBytes = 0;
948 CurrentCPS = 0;
949 CurrentBytes = 0;
950 TotalBytes = 0;
951 FetchedBytes = 0;
952 ElapsedTime = 0;
953 TotalItems = 0;
954 CurrentItems = 0;
955 }
956 /*}}}*/
957 // AcquireStatus::Stop - Finished downloading /*{{{*/
958 // ---------------------------------------------------------------------
959 /* This accurately computes the elapsed time and the total overall CPS. */
960 void pkgAcquireStatus::Stop()
961 {
962 // Compute the CPS and elapsed time
963 struct timeval NewTime;
964 gettimeofday(&NewTime,0);
965
966 double Delta = NewTime.tv_sec - StartTime.tv_sec +
967 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
968
969 // Compute the CPS value
970 if (Delta < 0.01)
971 CurrentCPS = 0;
972 else
973 CurrentCPS = FetchedBytes/Delta;
974 LastBytes = CurrentBytes;
975 ElapsedTime = (unsigned long long)Delta;
976 }
977 /*}}}*/
978 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
979 // ---------------------------------------------------------------------
980 /* This is used to get accurate final transfer rate reporting. */
981 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
982 {
983 FetchedBytes += Size - Resume;
984 }
985 /*}}}*/