]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
Merge remote-tracking branch 'upstream/debian/experimental' into feature/acq-trans
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <iomanip>
35
36 #include <dirent.h>
37 #include <sys/time.h>
38 #include <sys/select.h>
39 #include <errno.h>
40 #include <sys/stat.h>
41
42 #include <apti18n.h>
43 /*}}}*/
44
45 using namespace std;
46
47 // Acquire::pkgAcquire - Constructor /*{{{*/
48 // ---------------------------------------------------------------------
49 /* We grab some runtime state from the configuration space */
50 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
51 Debug(_config->FindB("Debug::pkgAcquire",false)),
52 Running(false)
53 {
54 string const Mode = _config->Find("Acquire::Queue-Mode","host");
55 if (strcasecmp(Mode.c_str(),"host") == 0)
56 QueueMode = QueueHost;
57 if (strcasecmp(Mode.c_str(),"access") == 0)
58 QueueMode = QueueAccess;
59 }
60 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
61 Configs(0), Log(Progress), ToFetch(0),
62 Debug(_config->FindB("Debug::pkgAcquire",false)),
63 Running(false)
64 {
65 string const Mode = _config->Find("Acquire::Queue-Mode","host");
66 if (strcasecmp(Mode.c_str(),"host") == 0)
67 QueueMode = QueueHost;
68 if (strcasecmp(Mode.c_str(),"access") == 0)
69 QueueMode = QueueAccess;
70 Setup(Progress, "");
71 }
72 /*}}}*/
73 // Acquire::Setup - Delayed Constructor /*{{{*/
74 // ---------------------------------------------------------------------
75 /* Do everything needed to be a complete Acquire object and report the
76 success (or failure) back so the user knows that something is wrong… */
77 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
78 {
79 Log = Progress;
80
81 // check for existence and possibly create auxiliary directories
82 string const listDir = _config->FindDir("Dir::State::lists");
83 string const partialListDir = listDir + "partial/";
84 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
85 string const partialArchivesDir = archivesDir + "partial/";
86
87 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
88 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
89 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
90
91 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
92 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
93 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
94
95 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
96 return true;
97
98 // Lock the directory this acquire object will work in
99 LockFD = GetLock(flCombine(Lock, "lock"));
100 if (LockFD == -1)
101 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
102
103 return true;
104 }
105 /*}}}*/
106 // Acquire::~pkgAcquire - Destructor /*{{{*/
107 // ---------------------------------------------------------------------
108 /* Free our memory, clean up the queues (destroy the workers) */
109 pkgAcquire::~pkgAcquire()
110 {
111 Shutdown();
112
113 if (LockFD != -1)
114 close(LockFD);
115
116 while (Configs != 0)
117 {
118 MethodConfig *Jnk = Configs;
119 Configs = Configs->Next;
120 delete Jnk;
121 }
122 }
123 /*}}}*/
124 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
125 // ---------------------------------------------------------------------
126 /* */
127 void pkgAcquire::Shutdown()
128 {
129 while (Items.empty() == false)
130 {
131 if (Items[0]->Status == Item::StatFetching)
132 Items[0]->Status = Item::StatError;
133 delete Items[0];
134 }
135
136 while (Queues != 0)
137 {
138 Queue *Jnk = Queues;
139 Queues = Queues->Next;
140 delete Jnk;
141 }
142 }
143 /*}}}*/
144 // Acquire::Add - Add a new item /*{{{*/
145 // ---------------------------------------------------------------------
146 /* This puts an item on the acquire list. This list is mainly for tracking
147 item status */
148 void pkgAcquire::Add(Item *Itm)
149 {
150 Items.push_back(Itm);
151 }
152 /*}}}*/
153 // Acquire::Remove - Remove a item /*{{{*/
154 // ---------------------------------------------------------------------
155 /* Remove an item from the acquire list. This is usually not used.. */
156 void pkgAcquire::Remove(Item *Itm)
157 {
158 Dequeue(Itm);
159
160 for (ItemIterator I = Items.begin(); I != Items.end();)
161 {
162 if (*I == Itm)
163 {
164 Items.erase(I);
165 I = Items.begin();
166 }
167 else
168 ++I;
169 }
170 }
171 /*}}}*/
172 // Acquire::AbortTransaction - Remove a transaction /*{{{*/
173 void pkgAcquire::AbortTransaction(unsigned long TransactionID)
174 {
175 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
176 std::clog << "AbortTransaction: " << TransactionID << std::endl;
177
178 std::vector<Item*> Transaction;
179 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
180 if((*I)->TransactionID == TransactionID)
181 Transaction.push_back(*I);
182
183 for (std::vector<Item*>::iterator I = Transaction.begin();
184 I != Transaction.end(); ++I)
185 {
186 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
187 std::clog << " Cancel: " << (*I)->DestFile << std::endl;
188 // the transaction will abort, so stop anything that is idle
189 if ((*I)->Status == pkgAcquire::Item::StatIdle)
190 (*I)->Status = pkgAcquire::Item::StatDone;
191 }
192 }
193 /*}}}*/
194 bool pkgAcquire::TransactionHasError(unsigned long TransactionID)
195 {
196 std::vector<Item*> Transaction;
197 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
198 if((*I)->TransactionID == TransactionID)
199 if((*I)->Status != pkgAcquire::Item::StatDone &&
200 (*I)->Status != pkgAcquire::Item::StatIdle)
201 return true;
202
203 return false;
204 }
205 // Acquire::CommitTransaction - Commit a transaction /*{{{*/
206 void pkgAcquire::CommitTransaction(unsigned long TransactionID)
207 {
208 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
209 std::clog << "CommitTransaction: " << TransactionID << std::endl;
210
211 std::vector<Item*> Transaction;
212 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
213 if((*I)->TransactionID == TransactionID)
214 Transaction.push_back(*I);
215
216 // move new files into place *and* remove files that are not
217 // part of the transaction but are still on disk
218 for (std::vector<Item*>::iterator I = Transaction.begin();
219 I != Transaction.end(); ++I)
220 {
221 if((*I)->PartialFile != "")
222 {
223 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
224 std::clog << "mv "
225 << (*I)->PartialFile << " -> "
226 << (*I)->DestFile << std::endl;
227 Rename((*I)->PartialFile, (*I)->DestFile);
228 chmod((*I)->DestFile.c_str(),0644);
229 } else {
230 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
231 std::clog << "rm "
232 << (*I)->DestFile << std::endl;
233 unlink((*I)->DestFile.c_str());
234 }
235 // mark that this transaction is finished
236 (*I)->TransactionID = 0;
237 }
238 }
239 /*}}}*/
240
241 // Acquire::Add - Add a worker /*{{{*/
242 // ---------------------------------------------------------------------
243 /* A list of workers is kept so that the select loop can direct their FD
244 usage. */
245 void pkgAcquire::Add(Worker *Work)
246 {
247 Work->NextAcquire = Workers;
248 Workers = Work;
249 }
250 /*}}}*/
251 // Acquire::Remove - Remove a worker /*{{{*/
252 // ---------------------------------------------------------------------
253 /* A worker has died. This can not be done while the select loop is running
254 as it would require that RunFds could handling a changing list state and
255 it can't.. */
256 void pkgAcquire::Remove(Worker *Work)
257 {
258 if (Running == true)
259 abort();
260
261 Worker **I = &Workers;
262 for (; *I != 0;)
263 {
264 if (*I == Work)
265 *I = (*I)->NextAcquire;
266 else
267 I = &(*I)->NextAcquire;
268 }
269 }
270 /*}}}*/
271 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
272 // ---------------------------------------------------------------------
273 /* This is the entry point for an item. An item calls this function when
274 it is constructed which creates a queue (based on the current queue
275 mode) and puts the item in that queue. If the system is running then
276 the queue might be started. */
277 void pkgAcquire::Enqueue(ItemDesc &Item)
278 {
279 // Determine which queue to put the item in
280 const MethodConfig *Config;
281 string Name = QueueName(Item.URI,Config);
282 if (Name.empty() == true)
283 return;
284
285 // Find the queue structure
286 Queue *I = Queues;
287 for (; I != 0 && I->Name != Name; I = I->Next);
288 if (I == 0)
289 {
290 I = new Queue(Name,this);
291 I->Next = Queues;
292 Queues = I;
293
294 if (Running == true)
295 I->Startup();
296 }
297
298 // See if this is a local only URI
299 if (Config->LocalOnly == true && Item.Owner->Complete == false)
300 Item.Owner->Local = true;
301 Item.Owner->Status = Item::StatIdle;
302
303 // Queue it into the named queue
304 if(I->Enqueue(Item))
305 ToFetch++;
306
307 // Some trace stuff
308 if (Debug == true)
309 {
310 clog << "Fetching " << Item.URI << endl;
311 clog << " to " << Item.Owner->DestFile << endl;
312 clog << " Queue is: " << Name << endl;
313 }
314 }
315 /*}}}*/
316 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
317 // ---------------------------------------------------------------------
318 /* This is called when an item is finished being fetched. It removes it
319 from all the queues */
320 void pkgAcquire::Dequeue(Item *Itm)
321 {
322 Queue *I = Queues;
323 bool Res = false;
324 if (Debug == true)
325 clog << "Dequeuing " << Itm->DestFile << endl;
326
327 for (; I != 0; I = I->Next)
328 {
329 if (I->Dequeue(Itm))
330 {
331 Res = true;
332 if (Debug == true)
333 clog << "Dequeued from " << I->Name << endl;
334 }
335 }
336
337 if (Res == true)
338 ToFetch--;
339 }
340 /*}}}*/
341 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
342 // ---------------------------------------------------------------------
343 /* The string returned depends on the configuration settings and the
344 method parameters. Given something like http://foo.org/bar it can
345 return http://foo.org or http */
346 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
347 {
348 URI U(Uri);
349
350 Config = GetConfig(U.Access);
351 if (Config == 0)
352 return string();
353
354 /* Single-Instance methods get exactly one queue per URI. This is
355 also used for the Access queue method */
356 if (Config->SingleInstance == true || QueueMode == QueueAccess)
357 return U.Access;
358
359 string AccessSchema = U.Access + ':',
360 FullQueueName = AccessSchema + U.Host;
361 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
362
363 Queue *I = Queues;
364 for (; I != 0; I = I->Next) {
365 // if the queue already exists, re-use it
366 if (I->Name == FullQueueName)
367 return FullQueueName;
368
369 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
370 Instances++;
371 }
372
373 if (Debug) {
374 clog << "Found " << Instances << " instances of " << U.Access << endl;
375 }
376
377 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
378 return U.Access;
379
380 return FullQueueName;
381 }
382 /*}}}*/
383 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
384 // ---------------------------------------------------------------------
385 /* This locates the configuration structure for an access method. If
386 a config structure cannot be found a Worker will be created to
387 retrieve it */
388 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
389 {
390 // Search for an existing config
391 MethodConfig *Conf;
392 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
393 if (Conf->Access == Access)
394 return Conf;
395
396 // Create the new config class
397 Conf = new MethodConfig;
398 Conf->Access = Access;
399 Conf->Next = Configs;
400 Configs = Conf;
401
402 // Create the worker to fetch the configuration
403 Worker Work(Conf);
404 if (Work.Start() == false)
405 return 0;
406
407 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
408 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
409 Conf->SingleInstance = true;
410
411 return Conf;
412 }
413 /*}}}*/
414 // Acquire::SetFds - Deal with readable FDs /*{{{*/
415 // ---------------------------------------------------------------------
416 /* Collect FDs that have activity monitors into the fd sets */
417 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
418 {
419 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
420 {
421 if (I->InReady == true && I->InFd >= 0)
422 {
423 if (Fd < I->InFd)
424 Fd = I->InFd;
425 FD_SET(I->InFd,RSet);
426 }
427 if (I->OutReady == true && I->OutFd >= 0)
428 {
429 if (Fd < I->OutFd)
430 Fd = I->OutFd;
431 FD_SET(I->OutFd,WSet);
432 }
433 }
434 }
435 /*}}}*/
436 // Acquire::RunFds - Deal with active FDs /*{{{*/
437 // ---------------------------------------------------------------------
438 /* Dispatch active FDs over to the proper workers. It is very important
439 that a worker never be erased while this is running! The queue class
440 should never erase a worker except during shutdown processing. */
441 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
442 {
443 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
444 {
445 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
446 I->InFdReady();
447 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
448 I->OutFdReady();
449 }
450 }
451 /*}}}*/
452 // Acquire::Run - Run the fetch sequence /*{{{*/
453 // ---------------------------------------------------------------------
454 /* This runs the queues. It manages a select loop for all of the
455 Worker tasks. The workers interact with the queues and items to
456 manage the actual fetch. */
457 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
458 {
459 Running = true;
460
461 for (Queue *I = Queues; I != 0; I = I->Next)
462 I->Startup();
463
464 if (Log != 0)
465 Log->Start();
466
467 bool WasCancelled = false;
468
469 // Run till all things have been acquired
470 struct timeval tv;
471 tv.tv_sec = 0;
472 tv.tv_usec = PulseIntervall;
473 while (ToFetch > 0)
474 {
475 fd_set RFds;
476 fd_set WFds;
477 int Highest = 0;
478 FD_ZERO(&RFds);
479 FD_ZERO(&WFds);
480 SetFds(Highest,&RFds,&WFds);
481
482 int Res;
483 do
484 {
485 Res = select(Highest+1,&RFds,&WFds,0,&tv);
486 }
487 while (Res < 0 && errno == EINTR);
488
489 if (Res < 0)
490 {
491 _error->Errno("select","Select has failed");
492 break;
493 }
494
495 RunFds(&RFds,&WFds);
496 if (_error->PendingError() == true)
497 break;
498
499 // Timeout, notify the log class
500 if (Res == 0 || (Log != 0 && Log->Update == true))
501 {
502 tv.tv_usec = PulseIntervall;
503 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
504 I->Pulse();
505 if (Log != 0 && Log->Pulse(this) == false)
506 {
507 WasCancelled = true;
508 break;
509 }
510 }
511 }
512
513 if (Log != 0)
514 Log->Stop();
515
516 // Shut down the acquire bits
517 Running = false;
518 for (Queue *I = Queues; I != 0; I = I->Next)
519 I->Shutdown(false);
520
521 // Shut down the items
522 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
523 (*I)->Finished();
524
525 if (_error->PendingError())
526 return Failed;
527 if (WasCancelled)
528 return Cancelled;
529 return Continue;
530 }
531 /*}}}*/
532 // Acquire::Bump - Called when an item is dequeued /*{{{*/
533 // ---------------------------------------------------------------------
534 /* This routine bumps idle queues in hopes that they will be able to fetch
535 the dequeued item */
536 void pkgAcquire::Bump()
537 {
538 for (Queue *I = Queues; I != 0; I = I->Next)
539 I->Bump();
540 }
541 /*}}}*/
542 // Acquire::WorkerStep - Step to the next worker /*{{{*/
543 // ---------------------------------------------------------------------
544 /* Not inlined to advoid including acquire-worker.h */
545 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
546 {
547 return I->NextAcquire;
548 }
549 /*}}}*/
550 // Acquire::Clean - Cleans a directory /*{{{*/
551 // ---------------------------------------------------------------------
552 /* This is a bit simplistic, it looks at every file in the dir and sees
553 if it is part of the download set. */
554 bool pkgAcquire::Clean(string Dir)
555 {
556 // non-existing directories are by definition clean…
557 if (DirectoryExists(Dir) == false)
558 return true;
559
560 if(Dir == "/")
561 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
562
563 DIR *D = opendir(Dir.c_str());
564 if (D == 0)
565 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
566
567 string StartDir = SafeGetCWD();
568 if (chdir(Dir.c_str()) != 0)
569 {
570 closedir(D);
571 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
572 }
573
574 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
575 {
576 // Skip some files..
577 if (strcmp(Dir->d_name,"lock") == 0 ||
578 strcmp(Dir->d_name,"partial") == 0 ||
579 strcmp(Dir->d_name,".") == 0 ||
580 strcmp(Dir->d_name,"..") == 0)
581 continue;
582
583 // Look in the get list
584 ItemCIterator I = Items.begin();
585 for (; I != Items.end(); ++I)
586 if (flNotDir((*I)->DestFile) == Dir->d_name)
587 break;
588
589 // Nothing found, nuke it
590 if (I == Items.end())
591 unlink(Dir->d_name);
592 };
593
594 closedir(D);
595 if (chdir(StartDir.c_str()) != 0)
596 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
597 return true;
598 }
599 /*}}}*/
600 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
601 // ---------------------------------------------------------------------
602 /* This is the total number of bytes needed */
603 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
604 {
605 unsigned long long Total = 0;
606 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
607 Total += (*I)->FileSize;
608 return Total;
609 }
610 /*}}}*/
611 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
612 // ---------------------------------------------------------------------
613 /* This is the number of bytes that is not local */
614 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
615 {
616 unsigned long long Total = 0;
617 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
618 if ((*I)->Local == false)
619 Total += (*I)->FileSize;
620 return Total;
621 }
622 /*}}}*/
623 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
624 // ---------------------------------------------------------------------
625 /* This is the number of bytes that is not local */
626 APT_PURE unsigned long long pkgAcquire::PartialPresent()
627 {
628 unsigned long long Total = 0;
629 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
630 if ((*I)->Local == false)
631 Total += (*I)->PartialSize;
632 return Total;
633 }
634 /*}}}*/
635 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
636 // ---------------------------------------------------------------------
637 /* */
638 pkgAcquire::UriIterator pkgAcquire::UriBegin()
639 {
640 return UriIterator(Queues);
641 }
642 /*}}}*/
643 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
644 // ---------------------------------------------------------------------
645 /* */
646 pkgAcquire::UriIterator pkgAcquire::UriEnd()
647 {
648 return UriIterator(0);
649 }
650 /*}}}*/
651 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
652 // ---------------------------------------------------------------------
653 /* */
654 pkgAcquire::MethodConfig::MethodConfig()
655 {
656 SingleInstance = false;
657 Pipeline = false;
658 SendConfig = false;
659 LocalOnly = false;
660 Removable = false;
661 Next = 0;
662 }
663 /*}}}*/
664 // Queue::Queue - Constructor /*{{{*/
665 // ---------------------------------------------------------------------
666 /* */
667 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
668 Owner(Owner)
669 {
670 Items = 0;
671 Next = 0;
672 Workers = 0;
673 MaxPipeDepth = 1;
674 PipeDepth = 0;
675 }
676 /*}}}*/
677 // Queue::~Queue - Destructor /*{{{*/
678 // ---------------------------------------------------------------------
679 /* */
680 pkgAcquire::Queue::~Queue()
681 {
682 Shutdown(true);
683
684 while (Items != 0)
685 {
686 QItem *Jnk = Items;
687 Items = Items->Next;
688 delete Jnk;
689 }
690 }
691 /*}}}*/
692 // Queue::Enqueue - Queue an item to the queue /*{{{*/
693 // ---------------------------------------------------------------------
694 /* */
695 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
696 {
697 QItem **I = &Items;
698 // move to the end of the queue and check for duplicates here
699 for (; *I != 0; I = &(*I)->Next)
700 if (Item.URI == (*I)->URI)
701 {
702 Item.Owner->Status = Item::StatDone;
703 return false;
704 }
705
706 // Create a new item
707 QItem *Itm = new QItem;
708 *Itm = Item;
709 Itm->Next = 0;
710 *I = Itm;
711
712 Item.Owner->QueueCounter++;
713 if (Items->Next == 0)
714 Cycle();
715 return true;
716 }
717 /*}}}*/
718 // Queue::Dequeue - Remove an item from the queue /*{{{*/
719 // ---------------------------------------------------------------------
720 /* We return true if we hit something */
721 bool pkgAcquire::Queue::Dequeue(Item *Owner)
722 {
723 if (Owner->Status == pkgAcquire::Item::StatFetching)
724 return _error->Error("Tried to dequeue a fetching object");
725
726 bool Res = false;
727
728 QItem **I = &Items;
729 for (; *I != 0;)
730 {
731 if ((*I)->Owner == Owner)
732 {
733 QItem *Jnk= *I;
734 *I = (*I)->Next;
735 Owner->QueueCounter--;
736 delete Jnk;
737 Res = true;
738 }
739 else
740 I = &(*I)->Next;
741 }
742
743 return Res;
744 }
745 /*}}}*/
746 // Queue::Startup - Start the worker processes /*{{{*/
747 // ---------------------------------------------------------------------
748 /* It is possible for this to be called with a pre-existing set of
749 workers. */
750 bool pkgAcquire::Queue::Startup()
751 {
752 if (Workers == 0)
753 {
754 URI U(Name);
755 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
756 if (Cnf == 0)
757 return false;
758
759 Workers = new Worker(this,Cnf,Owner->Log);
760 Owner->Add(Workers);
761 if (Workers->Start() == false)
762 return false;
763
764 /* When pipelining we commit 10 items. This needs to change when we
765 added other source retry to have cycle maintain a pipeline depth
766 on its own. */
767 if (Cnf->Pipeline == true)
768 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
769 else
770 MaxPipeDepth = 1;
771 }
772
773 return Cycle();
774 }
775 /*}}}*/
776 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
777 // ---------------------------------------------------------------------
778 /* If final is true then all workers are eliminated, otherwise only workers
779 that do not need cleanup are removed */
780 bool pkgAcquire::Queue::Shutdown(bool Final)
781 {
782 // Delete all of the workers
783 pkgAcquire::Worker **Cur = &Workers;
784 while (*Cur != 0)
785 {
786 pkgAcquire::Worker *Jnk = *Cur;
787 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
788 {
789 *Cur = Jnk->NextQueue;
790 Owner->Remove(Jnk);
791 delete Jnk;
792 }
793 else
794 Cur = &(*Cur)->NextQueue;
795 }
796
797 return true;
798 }
799 /*}}}*/
800 // Queue::FindItem - Find a URI in the item list /*{{{*/
801 // ---------------------------------------------------------------------
802 /* */
803 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
804 {
805 for (QItem *I = Items; I != 0; I = I->Next)
806 if (I->URI == URI && I->Worker == Owner)
807 return I;
808 return 0;
809 }
810 /*}}}*/
811 // Queue::ItemDone - Item has been completed /*{{{*/
812 // ---------------------------------------------------------------------
813 /* The worker signals this which causes the item to be removed from the
814 queue. If this is the last queue instance then it is removed from the
815 main queue too.*/
816 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
817 {
818 PipeDepth--;
819 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
820 Itm->Owner->Status = pkgAcquire::Item::StatDone;
821
822 if (Itm->Owner->QueueCounter <= 1)
823 Owner->Dequeue(Itm->Owner);
824 else
825 {
826 Dequeue(Itm->Owner);
827 Owner->Bump();
828 }
829
830 return Cycle();
831 }
832 /*}}}*/
833 // Queue::Cycle - Queue new items into the method /*{{{*/
834 // ---------------------------------------------------------------------
835 /* This locates a new idle item and sends it to the worker. If pipelining
836 is enabled then it keeps the pipe full. */
837 bool pkgAcquire::Queue::Cycle()
838 {
839 if (Items == 0 || Workers == 0)
840 return true;
841
842 if (PipeDepth < 0)
843 return _error->Error("Pipedepth failure");
844
845 // Look for a queable item
846 QItem *I = Items;
847 while (PipeDepth < (signed)MaxPipeDepth)
848 {
849 for (; I != 0; I = I->Next)
850 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
851 break;
852
853 // Nothing to do, queue is idle.
854 if (I == 0)
855 return true;
856
857 I->Worker = Workers;
858 I->Owner->Status = pkgAcquire::Item::StatFetching;
859 PipeDepth++;
860 if (Workers->QueueItem(I) == false)
861 return false;
862 }
863
864 return true;
865 }
866 /*}}}*/
867 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
868 // ---------------------------------------------------------------------
869 /* This is called when an item in multiple queues is dequeued */
870 void pkgAcquire::Queue::Bump()
871 {
872 Cycle();
873 }
874 /*}}}*/
875 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
876 // ---------------------------------------------------------------------
877 /* */
878 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
879 {
880 Start();
881 }
882 /*}}}*/
883 // AcquireStatus::Pulse - Called periodically /*{{{*/
884 // ---------------------------------------------------------------------
885 /* This computes some internal state variables for the derived classes to
886 use. It generates the current downloaded bytes and total bytes to download
887 as well as the current CPS estimate. */
888 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
889 {
890 TotalBytes = 0;
891 CurrentBytes = 0;
892 TotalItems = 0;
893 CurrentItems = 0;
894
895 // Compute the total number of bytes to fetch
896 unsigned int Unknown = 0;
897 unsigned int Count = 0;
898 bool UnfetchedReleaseFiles = false;
899 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
900 I != Owner->ItemsEnd();
901 ++I, ++Count)
902 {
903 TotalItems++;
904 if ((*I)->Status == pkgAcquire::Item::StatDone)
905 ++CurrentItems;
906
907 // Totally ignore local items
908 if ((*I)->Local == true)
909 continue;
910
911 // see if the method tells us to expect more
912 TotalItems += (*I)->ExpectedAdditionalItems;
913
914 // check if there are unfetched Release files
915 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
916 UnfetchedReleaseFiles = true;
917
918 TotalBytes += (*I)->FileSize;
919 if ((*I)->Complete == true)
920 CurrentBytes += (*I)->FileSize;
921 if ((*I)->FileSize == 0 && (*I)->Complete == false)
922 ++Unknown;
923 }
924
925 // Compute the current completion
926 unsigned long long ResumeSize = 0;
927 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
928 I = Owner->WorkerStep(I))
929 {
930 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
931 {
932 CurrentBytes += I->CurrentSize;
933 ResumeSize += I->ResumePoint;
934
935 // Files with unknown size always have 100% completion
936 if (I->CurrentItem->Owner->FileSize == 0 &&
937 I->CurrentItem->Owner->Complete == false)
938 TotalBytes += I->CurrentSize;
939 }
940 }
941
942 // Normalize the figures and account for unknown size downloads
943 if (TotalBytes <= 0)
944 TotalBytes = 1;
945 if (Unknown == Count)
946 TotalBytes = Unknown;
947
948 // Wha?! Is not supposed to happen.
949 if (CurrentBytes > TotalBytes)
950 CurrentBytes = TotalBytes;
951
952 // debug
953 if (_config->FindB("Debug::acquire::progress", false) == true)
954 std::clog << " Bytes: "
955 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
956 << std::endl;
957
958 // Compute the CPS
959 struct timeval NewTime;
960 gettimeofday(&NewTime,0);
961 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
962 NewTime.tv_sec - Time.tv_sec > 6)
963 {
964 double Delta = NewTime.tv_sec - Time.tv_sec +
965 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
966
967 // Compute the CPS value
968 if (Delta < 0.01)
969 CurrentCPS = 0;
970 else
971 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
972 LastBytes = CurrentBytes - ResumeSize;
973 ElapsedTime = (unsigned long long)Delta;
974 Time = NewTime;
975 }
976
977 // calculate the percentage, if we have too little data assume 1%
978 if (TotalBytes > 0 && UnfetchedReleaseFiles)
979 Percent = 0;
980 else
981 // use both files and bytes because bytes can be unreliable
982 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
983 0.2 * (CurrentItems/float(TotalItems)*100.0));
984
985 int fd = _config->FindI("APT::Status-Fd",-1);
986 if(fd > 0)
987 {
988 ostringstream status;
989
990 char msg[200];
991 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
992 unsigned long long ETA = 0;
993 if(CurrentCPS > 0)
994 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
995
996 // only show the ETA if it makes sense
997 if (ETA > 0 && ETA < 172800 /* two days */ )
998 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
999 else
1000 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1001
1002 // build the status str
1003 status << "dlstatus:" << i
1004 << ":" << std::setprecision(3) << Percent
1005 << ":" << msg
1006 << endl;
1007
1008 std::string const dlstatus = status.str();
1009 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
1010 }
1011
1012 return true;
1013 }
1014 /*}}}*/
1015 // AcquireStatus::Start - Called when the download is started /*{{{*/
1016 // ---------------------------------------------------------------------
1017 /* We just reset the counters */
1018 void pkgAcquireStatus::Start()
1019 {
1020 gettimeofday(&Time,0);
1021 gettimeofday(&StartTime,0);
1022 LastBytes = 0;
1023 CurrentCPS = 0;
1024 CurrentBytes = 0;
1025 TotalBytes = 0;
1026 FetchedBytes = 0;
1027 ElapsedTime = 0;
1028 TotalItems = 0;
1029 CurrentItems = 0;
1030 }
1031 /*}}}*/
1032 // AcquireStatus::Stop - Finished downloading /*{{{*/
1033 // ---------------------------------------------------------------------
1034 /* This accurately computes the elapsed time and the total overall CPS. */
1035 void pkgAcquireStatus::Stop()
1036 {
1037 // Compute the CPS and elapsed time
1038 struct timeval NewTime;
1039 gettimeofday(&NewTime,0);
1040
1041 double Delta = NewTime.tv_sec - StartTime.tv_sec +
1042 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
1043
1044 // Compute the CPS value
1045 if (Delta < 0.01)
1046 CurrentCPS = 0;
1047 else
1048 CurrentCPS = FetchedBytes/Delta;
1049 LastBytes = CurrentBytes;
1050 ElapsedTime = (unsigned long long)Delta;
1051 }
1052 /*}}}*/
1053 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
1054 // ---------------------------------------------------------------------
1055 /* This is used to get accurate final transfer rate reporting. */
1056 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
1057 {
1058 FetchedBytes += Size - Resume;
1059 }
1060 /*}}}*/