]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
add pkgAcquire::TransactionHasError()
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <iomanip>
35
36 #include <dirent.h>
37 #include <sys/time.h>
38 #include <sys/select.h>
39 #include <errno.h>
40 #include <sys/stat.h>
41
42 #include <apti18n.h>
43 /*}}}*/
44
45 using namespace std;
46
47 // Acquire::pkgAcquire - Constructor /*{{{*/
48 // ---------------------------------------------------------------------
49 /* We grab some runtime state from the configuration space */
50 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
51 Debug(_config->FindB("Debug::pkgAcquire",false)),
52 Running(false)
53 {
54 string const Mode = _config->Find("Acquire::Queue-Mode","host");
55 if (strcasecmp(Mode.c_str(),"host") == 0)
56 QueueMode = QueueHost;
57 if (strcasecmp(Mode.c_str(),"access") == 0)
58 QueueMode = QueueAccess;
59 }
60 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
61 Configs(0), Log(Progress), ToFetch(0),
62 Debug(_config->FindB("Debug::pkgAcquire",false)),
63 Running(false)
64 {
65 string const Mode = _config->Find("Acquire::Queue-Mode","host");
66 if (strcasecmp(Mode.c_str(),"host") == 0)
67 QueueMode = QueueHost;
68 if (strcasecmp(Mode.c_str(),"access") == 0)
69 QueueMode = QueueAccess;
70 Setup(Progress, "");
71 }
72 /*}}}*/
73 // Acquire::Setup - Delayed Constructor /*{{{*/
74 // ---------------------------------------------------------------------
75 /* Do everything needed to be a complete Acquire object and report the
76 success (or failure) back so the user knows that something is wrong… */
77 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
78 {
79 Log = Progress;
80
81 // check for existence and possibly create auxiliary directories
82 string const listDir = _config->FindDir("Dir::State::lists");
83 string const partialListDir = listDir + "partial/";
84 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
85 string const partialArchivesDir = archivesDir + "partial/";
86
87 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
88 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
89 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
90
91 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
92 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
93 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
94
95 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
96 return true;
97
98 // Lock the directory this acquire object will work in
99 LockFD = GetLock(flCombine(Lock, "lock"));
100 if (LockFD == -1)
101 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
102
103 return true;
104 }
105 /*}}}*/
106 // Acquire::~pkgAcquire - Destructor /*{{{*/
107 // ---------------------------------------------------------------------
108 /* Free our memory, clean up the queues (destroy the workers) */
109 pkgAcquire::~pkgAcquire()
110 {
111 Shutdown();
112
113 if (LockFD != -1)
114 close(LockFD);
115
116 while (Configs != 0)
117 {
118 MethodConfig *Jnk = Configs;
119 Configs = Configs->Next;
120 delete Jnk;
121 }
122 }
123 /*}}}*/
124 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
125 // ---------------------------------------------------------------------
126 /* */
127 void pkgAcquire::Shutdown()
128 {
129 while (Items.empty() == false)
130 {
131 if (Items[0]->Status == Item::StatFetching)
132 Items[0]->Status = Item::StatError;
133 delete Items[0];
134 }
135
136 while (Queues != 0)
137 {
138 Queue *Jnk = Queues;
139 Queues = Queues->Next;
140 delete Jnk;
141 }
142 }
143 /*}}}*/
144 // Acquire::Add - Add a new item /*{{{*/
145 // ---------------------------------------------------------------------
146 /* This puts an item on the acquire list. This list is mainly for tracking
147 item status */
148 void pkgAcquire::Add(Item *Itm)
149 {
150 Items.push_back(Itm);
151 }
152 /*}}}*/
153 // Acquire::Remove - Remove a item /*{{{*/
154 // ---------------------------------------------------------------------
155 /* Remove an item from the acquire list. This is usually not used.. */
156 void pkgAcquire::Remove(Item *Itm)
157 {
158 Dequeue(Itm);
159
160 for (ItemIterator I = Items.begin(); I != Items.end();)
161 {
162 if (*I == Itm)
163 {
164 Items.erase(I);
165 I = Items.begin();
166 }
167 else
168 ++I;
169 }
170 }
171 /*}}}*/
172 // Acquire::AbortTransaction - Remove a transaction /*{{{*/
173 void pkgAcquire::AbortTransaction(unsigned long TransactionID)
174 {
175 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
176 std::clog << "AbortTransaction: " << TransactionID << std::endl;
177
178 std::vector<Item*> Transaction;
179 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
180 if((*I)->TransactionID == TransactionID)
181 Transaction.push_back(*I);
182
183 for (std::vector<Item*>::iterator I = Transaction.begin();
184 I != Transaction.end(); ++I)
185 {
186 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
187 std::clog << " Cancel: " << (*I)->DestFile << std::endl;
188 //Dequeue(*I);
189 (*I)->Status = pkgAcquire::Item::StatError;
190 }
191 }
192 /*}}}*/
193 bool pkgAcquire::TransactionHasError(unsigned long TransactionID)
194 {
195 std::vector<Item*> Transaction;
196 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
197 if((*I)->TransactionID == TransactionID)
198 if((*I)->Status == pkgAcquire::Item::StatError ||
199 (*I)->Status == pkgAcquire::Item::StatAuthError)
200 return true;
201 return false;
202 }
203 // Acquire::CommitTransaction - Commit a transaction /*{{{*/
204 void pkgAcquire::CommitTransaction(unsigned long TransactionID)
205 {
206 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
207 std::clog << "CommitTransaction: " << TransactionID << std::endl;
208
209 std::vector<Item*> Transaction;
210 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
211 if((*I)->TransactionID == TransactionID)
212 Transaction.push_back(*I);
213
214 // move new files into place *and* remove files that are not
215 // part of the transaction but are still on disk
216 for (std::vector<Item*>::iterator I = Transaction.begin();
217 I != Transaction.end(); ++I)
218 {
219 if((*I)->PartialFile != "")
220 {
221 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
222 std::clog << "mv "
223 << (*I)->PartialFile << " -> "
224 << (*I)->DestFile << std::endl;
225
226 Rename((*I)->PartialFile, (*I)->DestFile);
227 chmod((*I)->DestFile.c_str(),0644);
228 } else {
229 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
230 std::clog << "rm "
231 << (*I)->DestFile << std::endl;
232 unlink((*I)->DestFile.c_str());
233 }
234 }
235 }
236 /*}}}*/
237
238 // Acquire::Add - Add a worker /*{{{*/
239 // ---------------------------------------------------------------------
240 /* A list of workers is kept so that the select loop can direct their FD
241 usage. */
242 void pkgAcquire::Add(Worker *Work)
243 {
244 Work->NextAcquire = Workers;
245 Workers = Work;
246 }
247 /*}}}*/
248 // Acquire::Remove - Remove a worker /*{{{*/
249 // ---------------------------------------------------------------------
250 /* A worker has died. This cannot be done while the select loop is running
251 as it would require that RunFds could handle a changing list state and
252 it can't.. */
253 void pkgAcquire::Remove(Worker *Work)
254 {
255 if (Running == true)
256 abort();
257
258 Worker **I = &Workers;
259 for (; *I != 0;)
260 {
261 if (*I == Work)
262 *I = (*I)->NextAcquire;
263 else
264 I = &(*I)->NextAcquire;
265 }
266 }
267 /*}}}*/
268 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
269 // ---------------------------------------------------------------------
270 /* This is the entry point for an item. An item calls this function when
271 it is constructed which creates a queue (based on the current queue
272 mode) and puts the item in that queue. If the system is running then
273 the queue might be started. */
274 void pkgAcquire::Enqueue(ItemDesc &Item)
275 {
276 // Determine which queue to put the item in
277 const MethodConfig *Config;
278 string Name = QueueName(Item.URI,Config);
279 if (Name.empty() == true)
280 return;
281
282 // Find the queue structure
283 Queue *I = Queues;
284 for (; I != 0 && I->Name != Name; I = I->Next);
285 if (I == 0)
286 {
287 I = new Queue(Name,this);
288 I->Next = Queues;
289 Queues = I;
290
291 if (Running == true)
292 I->Startup();
293 }
294
295 // See if this is a local only URI
296 if (Config->LocalOnly == true && Item.Owner->Complete == false)
297 Item.Owner->Local = true;
298 Item.Owner->Status = Item::StatIdle;
299
300 // Queue it into the named queue
301 if(I->Enqueue(Item))
302 ToFetch++;
303
304 // Some trace stuff
305 if (Debug == true)
306 {
307 clog << "Fetching " << Item.URI << endl;
308 clog << " to " << Item.Owner->DestFile << endl;
309 clog << " Queue is: " << Name << endl;
310 }
311 }
312 /*}}}*/
313 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
314 // ---------------------------------------------------------------------
315 /* This is called when an item is finished being fetched. It removes it
316 from all the queues */
317 void pkgAcquire::Dequeue(Item *Itm)
318 {
319 Queue *I = Queues;
320 bool Res = false;
321 if (Debug == true)
322 clog << "Dequeuing " << Itm->DestFile << endl;
323
324 for (; I != 0; I = I->Next)
325 {
326 if (I->Dequeue(Itm))
327 {
328 Res = true;
329 if (Debug == true)
330 clog << "Dequeued from " << I->Name << endl;
331 }
332 }
333
334 if (Res == true)
335 ToFetch--;
336 }
337 /*}}}*/
338 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
339 // ---------------------------------------------------------------------
340 /* The string returned depends on the configuration settings and the
341 method parameters. Given something like http://foo.org/bar it can
342 return http://foo.org or http */
343 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
344 {
345 URI U(Uri);
346
347 Config = GetConfig(U.Access);
348 if (Config == 0)
349 return string();
350
351 /* Single-Instance methods get exactly one queue per URI. This is
352 also used for the Access queue method */
353 if (Config->SingleInstance == true || QueueMode == QueueAccess)
354 return U.Access;
355
356 string AccessSchema = U.Access + ':',
357 FullQueueName = AccessSchema + U.Host;
358 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
359
360 Queue *I = Queues;
361 for (; I != 0; I = I->Next) {
362 // if the queue already exists, re-use it
363 if (I->Name == FullQueueName)
364 return FullQueueName;
365
366 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
367 Instances++;
368 }
369
370 if (Debug) {
371 clog << "Found " << Instances << " instances of " << U.Access << endl;
372 }
373
374 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
375 return U.Access;
376
377 return FullQueueName;
378 }
379 /*}}}*/
380 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
381 // ---------------------------------------------------------------------
382 /* This locates the configuration structure for an access method. If
383 a config structure cannot be found a Worker will be created to
384 retrieve it */
385 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
386 {
387 // Search for an existing config
388 MethodConfig *Conf;
389 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
390 if (Conf->Access == Access)
391 return Conf;
392
393 // Create the new config class
394 Conf = new MethodConfig;
395 Conf->Access = Access;
396 Conf->Next = Configs;
397 Configs = Conf;
398
399 // Create the worker to fetch the configuration
400 Worker Work(Conf);
401 if (Work.Start() == false)
402 return 0;
403
404 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
405 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
406 Conf->SingleInstance = true;
407
408 return Conf;
409 }
410 /*}}}*/
411 // Acquire::SetFds - Deal with readable FDs /*{{{*/
412 // ---------------------------------------------------------------------
413 /* Collect FDs that have activity monitors into the fd sets */
414 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
415 {
416 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
417 {
418 if (I->InReady == true && I->InFd >= 0)
419 {
420 if (Fd < I->InFd)
421 Fd = I->InFd;
422 FD_SET(I->InFd,RSet);
423 }
424 if (I->OutReady == true && I->OutFd >= 0)
425 {
426 if (Fd < I->OutFd)
427 Fd = I->OutFd;
428 FD_SET(I->OutFd,WSet);
429 }
430 }
431 }
432 /*}}}*/
433 // Acquire::RunFds - Deal with active FDs /*{{{*/
434 // ---------------------------------------------------------------------
435 /* Dispatch active FDs over to the proper workers. It is very important
436 that a worker never be erased while this is running! The queue class
437 should never erase a worker except during shutdown processing. */
438 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
439 {
440 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
441 {
442 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
443 I->InFdReady();
444 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
445 I->OutFdReady();
446 }
447 }
448 /*}}}*/
449 // Acquire::Run - Run the fetch sequence /*{{{*/
450 // ---------------------------------------------------------------------
451 /* This runs the queues. It manages a select loop for all of the
452 Worker tasks. The workers interact with the queues and items to
453 manage the actual fetch. */
454 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
455 {
456 Running = true;
457
458 for (Queue *I = Queues; I != 0; I = I->Next)
459 I->Startup();
460
461 if (Log != 0)
462 Log->Start();
463
464 bool WasCancelled = false;
465
466 // Run till all things have been acquired
467 struct timeval tv;
468 tv.tv_sec = 0;
469 tv.tv_usec = PulseIntervall;
470 while (ToFetch > 0)
471 {
472 fd_set RFds;
473 fd_set WFds;
474 int Highest = 0;
475 FD_ZERO(&RFds);
476 FD_ZERO(&WFds);
477 SetFds(Highest,&RFds,&WFds);
478
479 int Res;
480 do
481 {
482 Res = select(Highest+1,&RFds,&WFds,0,&tv);
483 }
484 while (Res < 0 && errno == EINTR);
485
486 if (Res < 0)
487 {
488 _error->Errno("select","Select has failed");
489 break;
490 }
491
492 RunFds(&RFds,&WFds);
493 if (_error->PendingError() == true)
494 break;
495
496 // Timeout, notify the log class
497 if (Res == 0 || (Log != 0 && Log->Update == true))
498 {
499 tv.tv_usec = PulseIntervall;
500 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
501 I->Pulse();
502 if (Log != 0 && Log->Pulse(this) == false)
503 {
504 WasCancelled = true;
505 break;
506 }
507 }
508 }
509
510 if (Log != 0)
511 Log->Stop();
512
513 // Shut down the acquire bits
514 Running = false;
515 for (Queue *I = Queues; I != 0; I = I->Next)
516 I->Shutdown(false);
517
518 // Shut down the items
519 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
520 (*I)->Finished();
521
522 if (_error->PendingError())
523 return Failed;
524 if (WasCancelled)
525 return Cancelled;
526 return Continue;
527 }
528 /*}}}*/
529 // Acquire::Bump - Called when an item is dequeued /*{{{*/
530 // ---------------------------------------------------------------------
531 /* This routine bumps idle queues in hopes that they will be able to fetch
532 the dequeued item */
533 void pkgAcquire::Bump()
534 {
535 for (Queue *I = Queues; I != 0; I = I->Next)
536 I->Bump();
537 }
538 /*}}}*/
539 // Acquire::WorkerStep - Step to the next worker /*{{{*/
540 // ---------------------------------------------------------------------
541 /* Not inlined to avoid including acquire-worker.h */
542 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
543 {
544 return I->NextAcquire;
545 }
546 /*}}}*/
547 // Acquire::Clean - Cleans a directory /*{{{*/
548 // ---------------------------------------------------------------------
549 /* This is a bit simplistic, it looks at every file in the dir and sees
550 if it is part of the download set. */
551 bool pkgAcquire::Clean(string Dir)
552 {
553 // non-existing directories are by definition clean…
554 if (DirectoryExists(Dir) == false)
555 return true;
556
557 if(Dir == "/")
558 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
559
560 DIR *D = opendir(Dir.c_str());
561 if (D == 0)
562 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
563
564 string StartDir = SafeGetCWD();
565 if (chdir(Dir.c_str()) != 0)
566 {
567 closedir(D);
568 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
569 }
570
571 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
572 {
573 // Skip some files..
574 if (strcmp(Dir->d_name,"lock") == 0 ||
575 strcmp(Dir->d_name,"partial") == 0 ||
576 strcmp(Dir->d_name,".") == 0 ||
577 strcmp(Dir->d_name,"..") == 0)
578 continue;
579
580 // Look in the get list
581 ItemCIterator I = Items.begin();
582 for (; I != Items.end(); ++I)
583 if (flNotDir((*I)->DestFile) == Dir->d_name)
584 break;
585
586 // Nothing found, nuke it
587 if (I == Items.end())
588 unlink(Dir->d_name);
589 };
590
591 closedir(D);
592 if (chdir(StartDir.c_str()) != 0)
593 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
594 return true;
595 }
596 /*}}}*/
597 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
598 // ---------------------------------------------------------------------
599 /* This is the total number of bytes needed */
600 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
601 {
602 unsigned long long Total = 0;
603 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
604 Total += (*I)->FileSize;
605 return Total;
606 }
607 /*}}}*/
608 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
609 // ---------------------------------------------------------------------
610 /* This is the number of bytes that is not local */
611 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
612 {
613 unsigned long long Total = 0;
614 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
615 if ((*I)->Local == false)
616 Total += (*I)->FileSize;
617 return Total;
618 }
619 /*}}}*/
620 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
621 // ---------------------------------------------------------------------
622 /* This is the sum of the partial sizes of all items that are not local */
623 APT_PURE unsigned long long pkgAcquire::PartialPresent()
624 {
625 unsigned long long Total = 0;
626 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
627 if ((*I)->Local == false)
628 Total += (*I)->PartialSize;
629 return Total;
630 }
631 /*}}}*/
632 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
633 // ---------------------------------------------------------------------
634 /* */
635 pkgAcquire::UriIterator pkgAcquire::UriBegin()
636 {
637 return UriIterator(Queues);
638 }
639 /*}}}*/
640 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
641 // ---------------------------------------------------------------------
642 /* */
643 pkgAcquire::UriIterator pkgAcquire::UriEnd()
644 {
645 return UriIterator(0);
646 }
647 /*}}}*/
648 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
649 // ---------------------------------------------------------------------
650 /* */
651 pkgAcquire::MethodConfig::MethodConfig()
652 {
653 SingleInstance = false;
654 Pipeline = false;
655 SendConfig = false;
656 LocalOnly = false;
657 Removable = false;
658 Next = 0;
659 }
660 /*}}}*/
661 // Queue::Queue - Constructor /*{{{*/
662 // ---------------------------------------------------------------------
663 /* */
664 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
665 Owner(Owner)
666 {
667 Items = 0;
668 Next = 0;
669 Workers = 0;
670 MaxPipeDepth = 1;
671 PipeDepth = 0;
672 }
673 /*}}}*/
674 // Queue::~Queue - Destructor /*{{{*/
675 // ---------------------------------------------------------------------
676 /* */
677 pkgAcquire::Queue::~Queue()
678 {
679 Shutdown(true);
680
681 while (Items != 0)
682 {
683 QItem *Jnk = Items;
684 Items = Items->Next;
685 delete Jnk;
686 }
687 }
688 /*}}}*/
689 // Queue::Enqueue - Queue an item to the queue /*{{{*/
690 // ---------------------------------------------------------------------
691 /* */
692 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
693 {
694 QItem **I = &Items;
695 // move to the end of the queue and check for duplicates here
696 for (; *I != 0; I = &(*I)->Next)
697 if (Item.URI == (*I)->URI)
698 {
699 Item.Owner->Status = Item::StatDone;
700 return false;
701 }
702
703 // Create a new item
704 QItem *Itm = new QItem;
705 *Itm = Item;
706 Itm->Next = 0;
707 *I = Itm;
708
709 Item.Owner->QueueCounter++;
710 if (Items->Next == 0)
711 Cycle();
712 return true;
713 }
714 /*}}}*/
715 // Queue::Dequeue - Remove an item from the queue /*{{{*/
716 // ---------------------------------------------------------------------
717 /* We return true if we hit something */
718 bool pkgAcquire::Queue::Dequeue(Item *Owner)
719 {
720 if (Owner->Status == pkgAcquire::Item::StatFetching)
721 return _error->Error("Tried to dequeue a fetching object");
722
723 bool Res = false;
724
725 QItem **I = &Items;
726 for (; *I != 0;)
727 {
728 if ((*I)->Owner == Owner)
729 {
730 QItem *Jnk= *I;
731 *I = (*I)->Next;
732 Owner->QueueCounter--;
733 delete Jnk;
734 Res = true;
735 }
736 else
737 I = &(*I)->Next;
738 }
739
740 return Res;
741 }
742 /*}}}*/
743 // Queue::Startup - Start the worker processes /*{{{*/
744 // ---------------------------------------------------------------------
745 /* It is possible for this to be called with a pre-existing set of
746 workers. */
747 bool pkgAcquire::Queue::Startup()
748 {
749 if (Workers == 0)
750 {
751 URI U(Name);
752 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
753 if (Cnf == 0)
754 return false;
755
756 Workers = new Worker(this,Cnf,Owner->Log);
757 Owner->Add(Workers);
758 if (Workers->Start() == false)
759 return false;
760
761 /* When pipelining we commit 10 items. This needs to change when we
762 added other source retry to have cycle maintain a pipeline depth
763 on its own. */
764 if (Cnf->Pipeline == true)
765 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
766 else
767 MaxPipeDepth = 1;
768 }
769
770 return Cycle();
771 }
772 /*}}}*/
773 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
774 // ---------------------------------------------------------------------
775 /* If final is true then all workers are eliminated, otherwise only workers
776 that do not need cleanup are removed */
777 bool pkgAcquire::Queue::Shutdown(bool Final)
778 {
779 // Delete all of the workers
780 pkgAcquire::Worker **Cur = &Workers;
781 while (*Cur != 0)
782 {
783 pkgAcquire::Worker *Jnk = *Cur;
784 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
785 {
786 *Cur = Jnk->NextQueue;
787 Owner->Remove(Jnk);
788 delete Jnk;
789 }
790 else
791 Cur = &(*Cur)->NextQueue;
792 }
793
794 return true;
795 }
796 /*}}}*/
797 // Queue::FindItem - Find a URI in the item list /*{{{*/
798 // ---------------------------------------------------------------------
799 /* */
800 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
801 {
802 for (QItem *I = Items; I != 0; I = I->Next)
803 if (I->URI == URI && I->Worker == Owner)
804 return I;
805 return 0;
806 }
807 /*}}}*/
808 // Queue::ItemDone - Item has been completed /*{{{*/
809 // ---------------------------------------------------------------------
810 /* The worker signals this which causes the item to be removed from the
811 queue. If this is the last queue instance then it is removed from the
812 main queue too.*/
813 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
814 {
815 PipeDepth--;
816 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
817 Itm->Owner->Status = pkgAcquire::Item::StatDone;
818
819 if (Itm->Owner->QueueCounter <= 1)
820 Owner->Dequeue(Itm->Owner);
821 else
822 {
823 Dequeue(Itm->Owner);
824 Owner->Bump();
825 }
826
827 return Cycle();
828 }
829 /*}}}*/
830 // Queue::Cycle - Queue new items into the method /*{{{*/
831 // ---------------------------------------------------------------------
832 /* This locates a new idle item and sends it to the worker. If pipelining
833 is enabled then it keeps the pipe full. */
834 bool pkgAcquire::Queue::Cycle()
835 {
836 if (Items == 0 || Workers == 0)
837 return true;
838
839 if (PipeDepth < 0)
840 return _error->Error("Pipedepth failure");
841
842 // Look for a queable item
843 QItem *I = Items;
844 while (PipeDepth < (signed)MaxPipeDepth)
845 {
846 for (; I != 0; I = I->Next)
847 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
848 break;
849
850 // Nothing to do, queue is idle.
851 if (I == 0)
852 return true;
853
854 I->Worker = Workers;
855 I->Owner->Status = pkgAcquire::Item::StatFetching;
856 PipeDepth++;
857 if (Workers->QueueItem(I) == false)
858 return false;
859 }
860
861 return true;
862 }
863 /*}}}*/
864 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
865 // ---------------------------------------------------------------------
866 /* This is called when an item in multiple queues is dequeued */
867 void pkgAcquire::Queue::Bump()
868 {
869 Cycle();
870 }
871 /*}}}*/
872 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
873 // ---------------------------------------------------------------------
874 /* */
875 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
876 {
877 Start();
878 }
879 /*}}}*/
880 // AcquireStatus::Pulse - Called periodically /*{{{*/
881 // ---------------------------------------------------------------------
882 /* This computes some internal state variables for the derived classes to
883 use. It generates the current downloaded bytes and total bytes to download
884 as well as the current CPS estimate. */
885 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
886 {
887 TotalBytes = 0;
888 CurrentBytes = 0;
889 TotalItems = 0;
890 CurrentItems = 0;
891
892 // Compute the total number of bytes to fetch
893 unsigned int Unknown = 0;
894 unsigned int Count = 0;
895 bool UnfetchedReleaseFiles = false;
896 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
897 I != Owner->ItemsEnd();
898 ++I, ++Count)
899 {
900 TotalItems++;
901 if ((*I)->Status == pkgAcquire::Item::StatDone)
902 ++CurrentItems;
903
904 // Totally ignore local items
905 if ((*I)->Local == true)
906 continue;
907
908 // see if the method tells us to expect more
909 TotalItems += (*I)->ExpectedAdditionalItems;
910
911 // check if there are unfetched Release files
912 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
913 UnfetchedReleaseFiles = true;
914
915 TotalBytes += (*I)->FileSize;
916 if ((*I)->Complete == true)
917 CurrentBytes += (*I)->FileSize;
918 if ((*I)->FileSize == 0 && (*I)->Complete == false)
919 ++Unknown;
920 }
921
922 // Compute the current completion
923 unsigned long long ResumeSize = 0;
924 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
925 I = Owner->WorkerStep(I))
926 {
927 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
928 {
929 CurrentBytes += I->CurrentSize;
930 ResumeSize += I->ResumePoint;
931
932 // Files with unknown size always have 100% completion
933 if (I->CurrentItem->Owner->FileSize == 0 &&
934 I->CurrentItem->Owner->Complete == false)
935 TotalBytes += I->CurrentSize;
936 }
937 }
938
939 // Normalize the figures and account for unknown size downloads
940 if (TotalBytes <= 0)
941 TotalBytes = 1;
942 if (Unknown == Count)
943 TotalBytes = Unknown;
944
945 // Wha?! Is not supposed to happen.
946 if (CurrentBytes > TotalBytes)
947 CurrentBytes = TotalBytes;
948
949 // debug
950 if (_config->FindB("Debug::acquire::progress", false) == true)
951 std::clog << " Bytes: "
952 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
953 << std::endl;
954
955 // Compute the CPS
956 struct timeval NewTime;
957 gettimeofday(&NewTime,0);
958 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
959 NewTime.tv_sec - Time.tv_sec > 6)
960 {
961 double Delta = NewTime.tv_sec - Time.tv_sec +
962 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
963
964 // Compute the CPS value
965 if (Delta < 0.01)
966 CurrentCPS = 0;
967 else
968 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
969 LastBytes = CurrentBytes - ResumeSize;
970 ElapsedTime = (unsigned long long)Delta;
971 Time = NewTime;
972 }
973
974 // calculate the percentage, if we have too little data assume 1%
975 if (TotalBytes > 0 && UnfetchedReleaseFiles)
976 Percent = 0;
977 else
978 // use both files and bytes because bytes can be unreliable
979 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
980 0.2 * (CurrentItems/float(TotalItems)*100.0));
981
982 int fd = _config->FindI("APT::Status-Fd",-1);
983 if(fd > 0)
984 {
985 ostringstream status;
986
987 char msg[200];
988 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
989 unsigned long long ETA = 0;
990 if(CurrentCPS > 0)
991 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
992
993 // only show the ETA if it makes sense
994 if (ETA > 0 && ETA < 172800 /* two days */ )
995 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
996 else
997 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
998
999 // build the status str
1000 status << "dlstatus:" << i
1001 << ":" << std::setprecision(3) << Percent
1002 << ":" << msg
1003 << endl;
1004
1005 std::string const dlstatus = status.str();
1006 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
1007 }
1008
1009 return true;
1010 }
1011 /*}}}*/
1012 // AcquireStatus::Start - Called when the download is started /*{{{*/
1013 // ---------------------------------------------------------------------
1014 /* We just reset the counters */
1015 void pkgAcquireStatus::Start()
1016 {
1017 gettimeofday(&Time,0);
1018 gettimeofday(&StartTime,0);
1019 LastBytes = 0;
1020 CurrentCPS = 0;
1021 CurrentBytes = 0;
1022 TotalBytes = 0;
1023 FetchedBytes = 0;
1024 ElapsedTime = 0;
1025 TotalItems = 0;
1026 CurrentItems = 0;
1027 }
1028 /*}}}*/
1029 // AcquireStatus::Stop - Finished downloading /*{{{*/
1030 // ---------------------------------------------------------------------
1031 /* This accurately computes the elapsed time and the total overall CPS. */
1032 void pkgAcquireStatus::Stop()
1033 {
1034 // Compute the CPS and elapsed time
1035 struct timeval NewTime;
1036 gettimeofday(&NewTime,0);
1037
1038 double Delta = NewTime.tv_sec - StartTime.tv_sec +
1039 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
1040
1041 // Compute the CPS value
1042 if (Delta < 0.01)
1043 CurrentCPS = 0;
1044 else
1045 CurrentCPS = FetchedBytes/Delta;
1046 LastBytes = CurrentBytes;
1047 ElapsedTime = (unsigned long long)Delta;
1048 }
1049 /*}}}*/
1050 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
1051 // ---------------------------------------------------------------------
1052 /* This is used to get accurate final transfer rate reporting. */
1053 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
1054 {
1055 FetchedBytes += Size - Resume;
1056 }
1057 /*}}}*/