]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
Rework TransactionID stuff
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <iomanip>
35
36 #include <dirent.h>
37 #include <sys/time.h>
38 #include <sys/select.h>
39 #include <errno.h>
40 #include <sys/stat.h>
41
42 #include <apti18n.h>
43 /*}}}*/
44
45 using namespace std;
46
47 // Acquire::pkgAcquire - Constructor /*{{{*/
48 // ---------------------------------------------------------------------
49 /* We grab some runtime state from the configuration space */
50 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
51 Debug(_config->FindB("Debug::pkgAcquire",false)),
52 Running(false)
53 {
54 string const Mode = _config->Find("Acquire::Queue-Mode","host");
55 if (strcasecmp(Mode.c_str(),"host") == 0)
56 QueueMode = QueueHost;
57 if (strcasecmp(Mode.c_str(),"access") == 0)
58 QueueMode = QueueAccess;
59 }
60 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
61 Configs(0), Log(Progress), ToFetch(0),
62 Debug(_config->FindB("Debug::pkgAcquire",false)),
63 Running(false)
64 {
65 string const Mode = _config->Find("Acquire::Queue-Mode","host");
66 if (strcasecmp(Mode.c_str(),"host") == 0)
67 QueueMode = QueueHost;
68 if (strcasecmp(Mode.c_str(),"access") == 0)
69 QueueMode = QueueAccess;
70 Setup(Progress, "");
71 }
72 /*}}}*/
73 // Acquire::Setup - Delayed Constructor /*{{{*/
74 // ---------------------------------------------------------------------
75 /* Do everything needed to be a complete Acquire object and report the
76 success (or failure) back so the user knows that something is wrong… */
77 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
78 {
79 Log = Progress;
80
81 // check for existence and possibly create auxiliary directories
82 string const listDir = _config->FindDir("Dir::State::lists");
83 string const partialListDir = listDir + "partial/";
84 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
85 string const partialArchivesDir = archivesDir + "partial/";
86
87 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
88 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
89 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
90
91 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
92 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
93 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
94
95 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
96 return true;
97
98 // Lock the directory this acquire object will work in
99 LockFD = GetLock(flCombine(Lock, "lock"));
100 if (LockFD == -1)
101 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
102
103 return true;
104 }
105 /*}}}*/
106 // Acquire::~pkgAcquire - Destructor /*{{{*/
107 // ---------------------------------------------------------------------
108 /* Free our memory, clean up the queues (destroy the workers) */
109 pkgAcquire::~pkgAcquire()
110 {
111 Shutdown();
112
113 if (LockFD != -1)
114 close(LockFD);
115
116 while (Configs != 0)
117 {
118 MethodConfig *Jnk = Configs;
119 Configs = Configs->Next;
120 delete Jnk;
121 }
122 }
123 /*}}}*/
124 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
125 // ---------------------------------------------------------------------
126 /* */
127 void pkgAcquire::Shutdown()
128 {
129 while (Items.empty() == false)
130 {
131 if (Items[0]->Status == Item::StatFetching)
132 Items[0]->Status = Item::StatError;
133 delete Items[0];
134 }
135
136 while (Queues != 0)
137 {
138 Queue *Jnk = Queues;
139 Queues = Queues->Next;
140 delete Jnk;
141 }
142 }
143 /*}}}*/
144 // Acquire::Add - Add a new item /*{{{*/
145 // ---------------------------------------------------------------------
146 /* This puts an item on the acquire list. This list is mainly for tracking
147 item status */
148 void pkgAcquire::Add(Item *Itm)
149 {
150 Items.push_back(Itm);
151 }
152 /*}}}*/
153 // Acquire::Remove - Remove a item /*{{{*/
154 // ---------------------------------------------------------------------
155 /* Remove an item from the acquire list. This is usually not used.. */
156 void pkgAcquire::Remove(Item *Itm)
157 {
158 Dequeue(Itm);
159
160 for (ItemIterator I = Items.begin(); I != Items.end();)
161 {
162 if (*I == Itm)
163 {
164 Items.erase(I);
165 I = Items.begin();
166 }
167 else
168 ++I;
169 }
170 }
171 /*}}}*/
172 // Acquire::AbortTransaction - Remove a transaction /*{{{*/
173 void pkgAcquire::AbortTransaction(unsigned long TransactionID)
174 {
175 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
176 std::clog << "AbortTransaction: " << TransactionID << std::endl;
177
178 std::vector<Item*> Transaction;
179 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
180 if((*I)->TransactionID == TransactionID)
181 Transaction.push_back(*I);
182
183 for (std::vector<Item*>::iterator I = Transaction.begin();
184 I != Transaction.end(); ++I)
185 {
186 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
187 std::clog << " Cancel: " << (*I)->DestFile << std::endl;
188 //Dequeue(*I);
189 (*I)->Status = pkgAcquire::Item::StatError;
190 }
191 }
192 /*}}}*/
193 bool pkgAcquire::TransactionHasError(unsigned long TransactionID)
194 {
195 std::vector<Item*> Transaction;
196 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
197 if((*I)->TransactionID == TransactionID)
198 if((*I)->Status == pkgAcquire::Item::StatError ||
199 (*I)->Status == pkgAcquire::Item::StatAuthError)
200 return true;
201
202 return false;
203 }
204 // Acquire::CommitTransaction - Commit a transaction /*{{{*/
205 void pkgAcquire::CommitTransaction(unsigned long TransactionID)
206 {
207 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
208 std::clog << "CommitTransaction: " << TransactionID << std::endl;
209
210 std::vector<Item*> Transaction;
211 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
212 if((*I)->TransactionID == TransactionID)
213 Transaction.push_back(*I);
214
215 // move new files into place *and* remove files that are not
216 // part of the transaction but are still on disk
217 for (std::vector<Item*>::iterator I = Transaction.begin();
218 I != Transaction.end(); ++I)
219 {
220 if((*I)->PartialFile != "")
221 {
222 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
223 std::clog << "mv "
224 << (*I)->PartialFile << " -> "
225 << (*I)->DestFile << std::endl;
226 Rename((*I)->PartialFile, (*I)->DestFile);
227 chmod((*I)->DestFile.c_str(),0644);
228 } else {
229 if(_config->FindB("Debug::Acquire::Transaction", false) == true)
230 std::clog << "rm "
231 << (*I)->DestFile << std::endl;
232 unlink((*I)->DestFile.c_str());
233 }
234 // mark that this transaction is finished
235 (*I)->TransactionID = 0;
236 }
237 }
238 /*}}}*/
239
240 // Acquire::Add - Add a worker /*{{{*/
241 // ---------------------------------------------------------------------
242 /* A list of workers is kept so that the select loop can direct their FD
243 usage. */
244 void pkgAcquire::Add(Worker *Work)
245 {
246 Work->NextAcquire = Workers;
247 Workers = Work;
248 }
249 /*}}}*/
250 // Acquire::Remove - Remove a worker /*{{{*/
251 // ---------------------------------------------------------------------
252 /* A worker has died. This can not be done while the select loop is running
253 as it would require that RunFds could handling a changing list state and
254 it can't.. */
255 void pkgAcquire::Remove(Worker *Work)
256 {
257 if (Running == true)
258 abort();
259
260 Worker **I = &Workers;
261 for (; *I != 0;)
262 {
263 if (*I == Work)
264 *I = (*I)->NextAcquire;
265 else
266 I = &(*I)->NextAcquire;
267 }
268 }
269 /*}}}*/
270 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
271 // ---------------------------------------------------------------------
272 /* This is the entry point for an item. An item calls this function when
273 it is constructed which creates a queue (based on the current queue
274 mode) and puts the item in that queue. If the system is running then
275 the queue might be started. */
276 void pkgAcquire::Enqueue(ItemDesc &Item)
277 {
278 // Determine which queue to put the item in
279 const MethodConfig *Config;
280 string Name = QueueName(Item.URI,Config);
281 if (Name.empty() == true)
282 return;
283
284 // Find the queue structure
285 Queue *I = Queues;
286 for (; I != 0 && I->Name != Name; I = I->Next);
287 if (I == 0)
288 {
289 I = new Queue(Name,this);
290 I->Next = Queues;
291 Queues = I;
292
293 if (Running == true)
294 I->Startup();
295 }
296
297 // See if this is a local only URI
298 if (Config->LocalOnly == true && Item.Owner->Complete == false)
299 Item.Owner->Local = true;
300 Item.Owner->Status = Item::StatIdle;
301
302 // Queue it into the named queue
303 if(I->Enqueue(Item))
304 ToFetch++;
305
306 // Some trace stuff
307 if (Debug == true)
308 {
309 clog << "Fetching " << Item.URI << endl;
310 clog << " to " << Item.Owner->DestFile << endl;
311 clog << " Queue is: " << Name << endl;
312 }
313 }
314 /*}}}*/
315 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
316 // ---------------------------------------------------------------------
317 /* This is called when an item is finished being fetched. It removes it
318 from all the queues */
319 void pkgAcquire::Dequeue(Item *Itm)
320 {
321 Queue *I = Queues;
322 bool Res = false;
323 if (Debug == true)
324 clog << "Dequeuing " << Itm->DestFile << endl;
325
326 for (; I != 0; I = I->Next)
327 {
328 if (I->Dequeue(Itm))
329 {
330 Res = true;
331 if (Debug == true)
332 clog << "Dequeued from " << I->Name << endl;
333 }
334 }
335
336 if (Res == true)
337 ToFetch--;
338 }
339 /*}}}*/
340 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
341 // ---------------------------------------------------------------------
342 /* The string returned depends on the configuration settings and the
343 method parameters. Given something like http://foo.org/bar it can
344 return http://foo.org or http */
345 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
346 {
347 URI U(Uri);
348
349 Config = GetConfig(U.Access);
350 if (Config == 0)
351 return string();
352
353 /* Single-Instance methods get exactly one queue per URI. This is
354 also used for the Access queue method */
355 if (Config->SingleInstance == true || QueueMode == QueueAccess)
356 return U.Access;
357
358 string AccessSchema = U.Access + ':',
359 FullQueueName = AccessSchema + U.Host;
360 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
361
362 Queue *I = Queues;
363 for (; I != 0; I = I->Next) {
364 // if the queue already exists, re-use it
365 if (I->Name == FullQueueName)
366 return FullQueueName;
367
368 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
369 Instances++;
370 }
371
372 if (Debug) {
373 clog << "Found " << Instances << " instances of " << U.Access << endl;
374 }
375
376 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
377 return U.Access;
378
379 return FullQueueName;
380 }
381 /*}}}*/
382 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
383 // ---------------------------------------------------------------------
384 /* This locates the configuration structure for an access method. If
385 a config structure cannot be found a Worker will be created to
386 retrieve it */
387 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
388 {
389 // Search for an existing config
390 MethodConfig *Conf;
391 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
392 if (Conf->Access == Access)
393 return Conf;
394
395 // Create the new config class
396 Conf = new MethodConfig;
397 Conf->Access = Access;
398 Conf->Next = Configs;
399 Configs = Conf;
400
401 // Create the worker to fetch the configuration
402 Worker Work(Conf);
403 if (Work.Start() == false)
404 return 0;
405
406 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
407 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
408 Conf->SingleInstance = true;
409
410 return Conf;
411 }
412 /*}}}*/
413 // Acquire::SetFds - Deal with readable FDs /*{{{*/
414 // ---------------------------------------------------------------------
415 /* Collect FDs that have activity monitors into the fd sets */
416 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
417 {
418 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
419 {
420 if (I->InReady == true && I->InFd >= 0)
421 {
422 if (Fd < I->InFd)
423 Fd = I->InFd;
424 FD_SET(I->InFd,RSet);
425 }
426 if (I->OutReady == true && I->OutFd >= 0)
427 {
428 if (Fd < I->OutFd)
429 Fd = I->OutFd;
430 FD_SET(I->OutFd,WSet);
431 }
432 }
433 }
434 /*}}}*/
435 // Acquire::RunFds - Deal with active FDs /*{{{*/
436 // ---------------------------------------------------------------------
437 /* Dispatch active FDs over to the proper workers. It is very important
438 that a worker never be erased while this is running! The queue class
439 should never erase a worker except during shutdown processing. */
440 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
441 {
442 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
443 {
444 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
445 I->InFdReady();
446 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
447 I->OutFdReady();
448 }
449 }
450 /*}}}*/
451 // Acquire::Run - Run the fetch sequence /*{{{*/
452 // ---------------------------------------------------------------------
453 /* This runs the queues. It manages a select loop for all of the
454 Worker tasks. The workers interact with the queues and items to
455 manage the actual fetch. */
456 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
457 {
458 Running = true;
459
460 for (Queue *I = Queues; I != 0; I = I->Next)
461 I->Startup();
462
463 if (Log != 0)
464 Log->Start();
465
466 bool WasCancelled = false;
467
468 // Run till all things have been acquired
469 struct timeval tv;
470 tv.tv_sec = 0;
471 tv.tv_usec = PulseIntervall;
472 while (ToFetch > 0)
473 {
474 fd_set RFds;
475 fd_set WFds;
476 int Highest = 0;
477 FD_ZERO(&RFds);
478 FD_ZERO(&WFds);
479 SetFds(Highest,&RFds,&WFds);
480
481 int Res;
482 do
483 {
484 Res = select(Highest+1,&RFds,&WFds,0,&tv);
485 }
486 while (Res < 0 && errno == EINTR);
487
488 if (Res < 0)
489 {
490 _error->Errno("select","Select has failed");
491 break;
492 }
493
494 RunFds(&RFds,&WFds);
495 if (_error->PendingError() == true)
496 break;
497
498 // Timeout, notify the log class
499 if (Res == 0 || (Log != 0 && Log->Update == true))
500 {
501 tv.tv_usec = PulseIntervall;
502 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
503 I->Pulse();
504 if (Log != 0 && Log->Pulse(this) == false)
505 {
506 WasCancelled = true;
507 break;
508 }
509 }
510 }
511
512 if (Log != 0)
513 Log->Stop();
514
515 // Shut down the acquire bits
516 Running = false;
517 for (Queue *I = Queues; I != 0; I = I->Next)
518 I->Shutdown(false);
519
520 // Shut down the items
521 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
522 (*I)->Finished();
523
524 if (_error->PendingError())
525 return Failed;
526 if (WasCancelled)
527 return Cancelled;
528 return Continue;
529 }
530 /*}}}*/
531 // Acquire::Bump - Called when an item is dequeued /*{{{*/
532 // ---------------------------------------------------------------------
533 /* This routine bumps idle queues in hopes that they will be able to fetch
534 the dequeued item */
535 void pkgAcquire::Bump()
536 {
537 for (Queue *I = Queues; I != 0; I = I->Next)
538 I->Bump();
539 }
540 /*}}}*/
541 // Acquire::WorkerStep - Step to the next worker /*{{{*/
542 // ---------------------------------------------------------------------
543 /* Not inlined to advoid including acquire-worker.h */
544 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
545 {
546 return I->NextAcquire;
547 }
548 /*}}}*/
549 // Acquire::Clean - Cleans a directory /*{{{*/
550 // ---------------------------------------------------------------------
551 /* This is a bit simplistic, it looks at every file in the dir and sees
552 if it is part of the download set. */
553 bool pkgAcquire::Clean(string Dir)
554 {
555 // non-existing directories are by definition clean…
556 if (DirectoryExists(Dir) == false)
557 return true;
558
559 if(Dir == "/")
560 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
561
562 DIR *D = opendir(Dir.c_str());
563 if (D == 0)
564 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
565
566 string StartDir = SafeGetCWD();
567 if (chdir(Dir.c_str()) != 0)
568 {
569 closedir(D);
570 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
571 }
572
573 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
574 {
575 // Skip some files..
576 if (strcmp(Dir->d_name,"lock") == 0 ||
577 strcmp(Dir->d_name,"partial") == 0 ||
578 strcmp(Dir->d_name,".") == 0 ||
579 strcmp(Dir->d_name,"..") == 0)
580 continue;
581
582 // Look in the get list
583 ItemCIterator I = Items.begin();
584 for (; I != Items.end(); ++I)
585 if (flNotDir((*I)->DestFile) == Dir->d_name)
586 break;
587
588 // Nothing found, nuke it
589 if (I == Items.end())
590 unlink(Dir->d_name);
591 };
592
593 closedir(D);
594 if (chdir(StartDir.c_str()) != 0)
595 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
596 return true;
597 }
598 /*}}}*/
599 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
600 // ---------------------------------------------------------------------
601 /* This is the total number of bytes needed */
602 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
603 {
604 unsigned long long Total = 0;
605 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
606 Total += (*I)->FileSize;
607 return Total;
608 }
609 /*}}}*/
610 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
611 // ---------------------------------------------------------------------
612 /* This is the number of bytes that is not local */
613 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
614 {
615 unsigned long long Total = 0;
616 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
617 if ((*I)->Local == false)
618 Total += (*I)->FileSize;
619 return Total;
620 }
621 /*}}}*/
622 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
623 // ---------------------------------------------------------------------
624 /* This is the number of bytes that is not local */
625 APT_PURE unsigned long long pkgAcquire::PartialPresent()
626 {
627 unsigned long long Total = 0;
628 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
629 if ((*I)->Local == false)
630 Total += (*I)->PartialSize;
631 return Total;
632 }
633 /*}}}*/
634 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
635 // ---------------------------------------------------------------------
636 /* */
637 pkgAcquire::UriIterator pkgAcquire::UriBegin()
638 {
639 return UriIterator(Queues);
640 }
641 /*}}}*/
642 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
643 // ---------------------------------------------------------------------
644 /* */
645 pkgAcquire::UriIterator pkgAcquire::UriEnd()
646 {
647 return UriIterator(0);
648 }
649 /*}}}*/
650 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
651 // ---------------------------------------------------------------------
652 /* */
653 pkgAcquire::MethodConfig::MethodConfig()
654 {
655 SingleInstance = false;
656 Pipeline = false;
657 SendConfig = false;
658 LocalOnly = false;
659 Removable = false;
660 Next = 0;
661 }
662 /*}}}*/
663 // Queue::Queue - Constructor /*{{{*/
664 // ---------------------------------------------------------------------
665 /* */
666 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
667 Owner(Owner)
668 {
669 Items = 0;
670 Next = 0;
671 Workers = 0;
672 MaxPipeDepth = 1;
673 PipeDepth = 0;
674 }
675 /*}}}*/
676 // Queue::~Queue - Destructor /*{{{*/
677 // ---------------------------------------------------------------------
678 /* */
679 pkgAcquire::Queue::~Queue()
680 {
681 Shutdown(true);
682
683 while (Items != 0)
684 {
685 QItem *Jnk = Items;
686 Items = Items->Next;
687 delete Jnk;
688 }
689 }
690 /*}}}*/
691 // Queue::Enqueue - Queue an item to the queue /*{{{*/
692 // ---------------------------------------------------------------------
693 /* */
694 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
695 {
696 QItem **I = &Items;
697 // move to the end of the queue and check for duplicates here
698 for (; *I != 0; I = &(*I)->Next)
699 if (Item.URI == (*I)->URI)
700 {
701 Item.Owner->Status = Item::StatDone;
702 return false;
703 }
704
705 // Create a new item
706 QItem *Itm = new QItem;
707 *Itm = Item;
708 Itm->Next = 0;
709 *I = Itm;
710
711 Item.Owner->QueueCounter++;
712 if (Items->Next == 0)
713 Cycle();
714 return true;
715 }
716 /*}}}*/
717 // Queue::Dequeue - Remove an item from the queue /*{{{*/
718 // ---------------------------------------------------------------------
719 /* We return true if we hit something */
720 bool pkgAcquire::Queue::Dequeue(Item *Owner)
721 {
722 if (Owner->Status == pkgAcquire::Item::StatFetching)
723 return _error->Error("Tried to dequeue a fetching object");
724
725 bool Res = false;
726
727 QItem **I = &Items;
728 for (; *I != 0;)
729 {
730 if ((*I)->Owner == Owner)
731 {
732 QItem *Jnk= *I;
733 *I = (*I)->Next;
734 Owner->QueueCounter--;
735 delete Jnk;
736 Res = true;
737 }
738 else
739 I = &(*I)->Next;
740 }
741
742 return Res;
743 }
744 /*}}}*/
745 // Queue::Startup - Start the worker processes /*{{{*/
746 // ---------------------------------------------------------------------
747 /* It is possible for this to be called with a pre-existing set of
748 workers. */
749 bool pkgAcquire::Queue::Startup()
750 {
751 if (Workers == 0)
752 {
753 URI U(Name);
754 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
755 if (Cnf == 0)
756 return false;
757
758 Workers = new Worker(this,Cnf,Owner->Log);
759 Owner->Add(Workers);
760 if (Workers->Start() == false)
761 return false;
762
763 /* When pipelining we commit 10 items. This needs to change when we
764 added other source retry to have cycle maintain a pipeline depth
765 on its own. */
766 if (Cnf->Pipeline == true)
767 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
768 else
769 MaxPipeDepth = 1;
770 }
771
772 return Cycle();
773 }
774 /*}}}*/
775 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
776 // ---------------------------------------------------------------------
777 /* If final is true then all workers are eliminated, otherwise only workers
778 that do not need cleanup are removed */
779 bool pkgAcquire::Queue::Shutdown(bool Final)
780 {
781 // Delete all of the workers
782 pkgAcquire::Worker **Cur = &Workers;
783 while (*Cur != 0)
784 {
785 pkgAcquire::Worker *Jnk = *Cur;
786 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
787 {
788 *Cur = Jnk->NextQueue;
789 Owner->Remove(Jnk);
790 delete Jnk;
791 }
792 else
793 Cur = &(*Cur)->NextQueue;
794 }
795
796 return true;
797 }
798 /*}}}*/
799 // Queue::FindItem - Find a URI in the item list /*{{{*/
800 // ---------------------------------------------------------------------
801 /* */
802 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
803 {
804 for (QItem *I = Items; I != 0; I = I->Next)
805 if (I->URI == URI && I->Worker == Owner)
806 return I;
807 return 0;
808 }
809 /*}}}*/
810 // Queue::ItemDone - Item has been completed /*{{{*/
811 // ---------------------------------------------------------------------
812 /* The worker signals this which causes the item to be removed from the
813 queue. If this is the last queue instance then it is removed from the
814 main queue too.*/
815 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
816 {
817 PipeDepth--;
818 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
819 Itm->Owner->Status = pkgAcquire::Item::StatDone;
820
821 if (Itm->Owner->QueueCounter <= 1)
822 Owner->Dequeue(Itm->Owner);
823 else
824 {
825 Dequeue(Itm->Owner);
826 Owner->Bump();
827 }
828
829 return Cycle();
830 }
831 /*}}}*/
832 // Queue::Cycle - Queue new items into the method /*{{{*/
833 // ---------------------------------------------------------------------
834 /* This locates a new idle item and sends it to the worker. If pipelining
835 is enabled then it keeps the pipe full. */
836 bool pkgAcquire::Queue::Cycle()
837 {
838 if (Items == 0 || Workers == 0)
839 return true;
840
841 if (PipeDepth < 0)
842 return _error->Error("Pipedepth failure");
843
844 // Look for a queable item
845 QItem *I = Items;
846 while (PipeDepth < (signed)MaxPipeDepth)
847 {
848 for (; I != 0; I = I->Next)
849 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
850 break;
851
852 // Nothing to do, queue is idle.
853 if (I == 0)
854 return true;
855
856 I->Worker = Workers;
857 I->Owner->Status = pkgAcquire::Item::StatFetching;
858 PipeDepth++;
859 if (Workers->QueueItem(I) == false)
860 return false;
861 }
862
863 return true;
864 }
865 /*}}}*/
866 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
867 // ---------------------------------------------------------------------
868 /* This is called when an item in multiple queues is dequeued */
869 void pkgAcquire::Queue::Bump()
870 {
871 Cycle();
872 }
873 /*}}}*/
874 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
875 // ---------------------------------------------------------------------
876 /* */
877 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
878 {
879 Start();
880 }
881 /*}}}*/
882 // AcquireStatus::Pulse - Called periodically /*{{{*/
883 // ---------------------------------------------------------------------
884 /* This computes some internal state variables for the derived classes to
885 use. It generates the current downloaded bytes and total bytes to download
886 as well as the current CPS estimate. */
887 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
888 {
889 TotalBytes = 0;
890 CurrentBytes = 0;
891 TotalItems = 0;
892 CurrentItems = 0;
893
894 // Compute the total number of bytes to fetch
895 unsigned int Unknown = 0;
896 unsigned int Count = 0;
897 bool UnfetchedReleaseFiles = false;
898 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
899 I != Owner->ItemsEnd();
900 ++I, ++Count)
901 {
902 TotalItems++;
903 if ((*I)->Status == pkgAcquire::Item::StatDone)
904 ++CurrentItems;
905
906 // Totally ignore local items
907 if ((*I)->Local == true)
908 continue;
909
910 // see if the method tells us to expect more
911 TotalItems += (*I)->ExpectedAdditionalItems;
912
913 // check if there are unfetched Release files
914 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
915 UnfetchedReleaseFiles = true;
916
917 TotalBytes += (*I)->FileSize;
918 if ((*I)->Complete == true)
919 CurrentBytes += (*I)->FileSize;
920 if ((*I)->FileSize == 0 && (*I)->Complete == false)
921 ++Unknown;
922 }
923
924 // Compute the current completion
925 unsigned long long ResumeSize = 0;
926 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
927 I = Owner->WorkerStep(I))
928 {
929 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
930 {
931 CurrentBytes += I->CurrentSize;
932 ResumeSize += I->ResumePoint;
933
934 // Files with unknown size always have 100% completion
935 if (I->CurrentItem->Owner->FileSize == 0 &&
936 I->CurrentItem->Owner->Complete == false)
937 TotalBytes += I->CurrentSize;
938 }
939 }
940
941 // Normalize the figures and account for unknown size downloads
942 if (TotalBytes <= 0)
943 TotalBytes = 1;
944 if (Unknown == Count)
945 TotalBytes = Unknown;
946
947 // Wha?! Is not supposed to happen.
948 if (CurrentBytes > TotalBytes)
949 CurrentBytes = TotalBytes;
950
951 // debug
952 if (_config->FindB("Debug::acquire::progress", false) == true)
953 std::clog << " Bytes: "
954 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
955 << std::endl;
956
957 // Compute the CPS
958 struct timeval NewTime;
959 gettimeofday(&NewTime,0);
960 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
961 NewTime.tv_sec - Time.tv_sec > 6)
962 {
963 double Delta = NewTime.tv_sec - Time.tv_sec +
964 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
965
966 // Compute the CPS value
967 if (Delta < 0.01)
968 CurrentCPS = 0;
969 else
970 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
971 LastBytes = CurrentBytes - ResumeSize;
972 ElapsedTime = (unsigned long long)Delta;
973 Time = NewTime;
974 }
975
976 // calculate the percentage, if we have too little data assume 1%
977 if (TotalBytes > 0 && UnfetchedReleaseFiles)
978 Percent = 0;
979 else
980 // use both files and bytes because bytes can be unreliable
981 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
982 0.2 * (CurrentItems/float(TotalItems)*100.0));
983
984 int fd = _config->FindI("APT::Status-Fd",-1);
985 if(fd > 0)
986 {
987 ostringstream status;
988
989 char msg[200];
990 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
991 unsigned long long ETA = 0;
992 if(CurrentCPS > 0)
993 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
994
995 // only show the ETA if it makes sense
996 if (ETA > 0 && ETA < 172800 /* two days */ )
997 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
998 else
999 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1000
1001 // build the status str
1002 status << "dlstatus:" << i
1003 << ":" << std::setprecision(3) << Percent
1004 << ":" << msg
1005 << endl;
1006
1007 std::string const dlstatus = status.str();
1008 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
1009 }
1010
1011 return true;
1012 }
1013 /*}}}*/
1014 // AcquireStatus::Start - Called when the download is started /*{{{*/
1015 // ---------------------------------------------------------------------
1016 /* We just reset the counters */
1017 void pkgAcquireStatus::Start()
1018 {
1019 gettimeofday(&Time,0);
1020 gettimeofday(&StartTime,0);
1021 LastBytes = 0;
1022 CurrentCPS = 0;
1023 CurrentBytes = 0;
1024 TotalBytes = 0;
1025 FetchedBytes = 0;
1026 ElapsedTime = 0;
1027 TotalItems = 0;
1028 CurrentItems = 0;
1029 }
1030 /*}}}*/
1031 // AcquireStatus::Stop - Finished downloading /*{{{*/
1032 // ---------------------------------------------------------------------
1033 /* This accurately computes the elapsed time and the total overall CPS. */
1034 void pkgAcquireStatus::Stop()
1035 {
1036 // Compute the CPS and elapsed time
1037 struct timeval NewTime;
1038 gettimeofday(&NewTime,0);
1039
1040 double Delta = NewTime.tv_sec - StartTime.tv_sec +
1041 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
1042
1043 // Compute the CPS value
1044 if (Delta < 0.01)
1045 CurrentCPS = 0;
1046 else
1047 CurrentCPS = FetchedBytes/Delta;
1048 LastBytes = CurrentBytes;
1049 ElapsedTime = (unsigned long long)Delta;
1050 }
1051 /*}}}*/
1052 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
1053 // ---------------------------------------------------------------------
1054 /* This is used to get accurate final transfer rate reporting. */
1055 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
1056 {
1057 FetchedBytes += Size - Resume;
1058 }
1059 /*}}}*/