]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
Update symbols file
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquisition
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <iomanip>
31
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <pwd.h>
37 #include <grp.h>
38 #include <dirent.h>
39 #include <sys/time.h>
40 #include <sys/select.h>
41 #include <errno.h>
42 #include <sys/stat.h>
43 #include <sys/types.h>
44
45 #include <apti18n.h>
46 /*}}}*/
47
48 using namespace std;
49
50 // Acquire::pkgAcquire - Constructor /*{{{*/
51 // ---------------------------------------------------------------------
52 /* We grab some runtime state from the configuration space */
53 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
54 Debug(_config->FindB("Debug::pkgAcquire",false)),
55 Running(false)
56 {
57 Initialize();
58 }
59 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
60 Configs(0), Log(NULL), ToFetch(0),
61 Debug(_config->FindB("Debug::pkgAcquire",false)),
62 Running(false)
63 {
64 Initialize();
65 SetLog(Progress);
66 }
67 void pkgAcquire::Initialize()
68 {
69 string const Mode = _config->Find("Acquire::Queue-Mode","host");
70 if (strcasecmp(Mode.c_str(),"host") == 0)
71 QueueMode = QueueHost;
72 if (strcasecmp(Mode.c_str(),"access") == 0)
73 QueueMode = QueueAccess;
74
75 // chown the auth.conf file as it will be accessed by our methods
76 std::string const SandboxUser = _config->Find("APT::Sandbox::User");
77 if (getuid() == 0 && SandboxUser.empty() == false) // if we aren't root, we can't chown, so don't try it
78 {
79 struct passwd const * const pw = getpwnam(SandboxUser.c_str());
80 struct group const * const gr = getgrnam("root");
81 if (pw != NULL && gr != NULL)
82 {
83 std::string const AuthConf = _config->FindFile("Dir::Etc::netrc");
84 if(AuthConf.empty() == false && RealFileExists(AuthConf) &&
85 chown(AuthConf.c_str(), pw->pw_uid, gr->gr_gid) != 0)
86 _error->WarningE("SetupAPTPartialDirectory", "chown to %s:root of file %s failed", SandboxUser.c_str(), AuthConf.c_str());
87 }
88 }
89 }
90 /*}}}*/
91 // Acquire::GetLock - lock directory and prepare for action /*{{{*/
92 static bool SetupAPTPartialDirectory(std::string const &grand, std::string const &parent)
93 {
94 std::string const partial = parent + "partial";
95 if (CreateAPTDirectoryIfNeeded(grand, partial) == false &&
96 CreateAPTDirectoryIfNeeded(parent, partial) == false)
97 return false;
98
99 std::string const SandboxUser = _config->Find("APT::Sandbox::User");
100 if (getuid() == 0 && SandboxUser.empty() == false) // if we aren't root, we can't chown, so don't try it
101 {
102 struct passwd const * const pw = getpwnam(SandboxUser.c_str());
103 struct group const * const gr = getgrnam("root");
104 if (pw != NULL && gr != NULL)
105 {
106 // chown the partial dir
107 if(chown(partial.c_str(), pw->pw_uid, gr->gr_gid) != 0)
108 _error->WarningE("SetupAPTPartialDirectory", "chown to %s:root of directory %s failed", SandboxUser.c_str(), partial.c_str());
109 }
110 }
111 if (chmod(partial.c_str(), 0700) != 0)
112 _error->WarningE("SetupAPTPartialDirectory", "chmod 0700 of directory %s failed", partial.c_str());
113
114 return true;
115 }
116 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
117 {
118 Log = Progress;
119 if (Lock.empty())
120 {
121 string const listDir = _config->FindDir("Dir::State::lists");
122 if (SetupAPTPartialDirectory(_config->FindDir("Dir::State"), listDir) == false)
123 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
124 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
125 if (SetupAPTPartialDirectory(_config->FindDir("Dir::Cache"), archivesDir) == false)
126 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
127 return true;
128 }
129 return GetLock(Lock);
130 }
131 bool pkgAcquire::GetLock(std::string const &Lock)
132 {
133 if (Lock.empty() == true)
134 return false;
135
136 // check for existence and possibly create auxiliary directories
137 string const listDir = _config->FindDir("Dir::State::lists");
138 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
139
140 if (Lock == listDir)
141 {
142 if (SetupAPTPartialDirectory(_config->FindDir("Dir::State"), listDir) == false)
143 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
144 }
145 if (Lock == archivesDir)
146 {
147 if (SetupAPTPartialDirectory(_config->FindDir("Dir::Cache"), archivesDir) == false)
148 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
149 }
150
151 if (_config->FindB("Debug::NoLocking", false) == true)
152 return true;
153
154 // Lock the directory this acquire object will work in
155 LockFD = ::GetLock(flCombine(Lock, "lock"));
156 if (LockFD == -1)
157 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
158
159 return true;
160 }
161 /*}}}*/
162 // Acquire::~pkgAcquire - Destructor /*{{{*/
163 // ---------------------------------------------------------------------
164 /* Free our memory, clean up the queues (destroy the workers) */
165 pkgAcquire::~pkgAcquire()
166 {
167 Shutdown();
168
169 if (LockFD != -1)
170 close(LockFD);
171
172 while (Configs != 0)
173 {
174 MethodConfig *Jnk = Configs;
175 Configs = Configs->Next;
176 delete Jnk;
177 }
178 }
179 /*}}}*/
180 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
181 // ---------------------------------------------------------------------
182 /* */
183 void pkgAcquire::Shutdown()
184 {
185 while (Items.empty() == false)
186 {
187 if (Items[0]->Status == Item::StatFetching)
188 Items[0]->Status = Item::StatError;
189 delete Items[0];
190 }
191
192 while (Queues != 0)
193 {
194 Queue *Jnk = Queues;
195 Queues = Queues->Next;
196 delete Jnk;
197 }
198 }
199 /*}}}*/
200 // Acquire::Add - Add a new item /*{{{*/
201 // ---------------------------------------------------------------------
202 /* This puts an item on the acquire list. This list is mainly for tracking
203 item status */
204 void pkgAcquire::Add(Item *Itm)
205 {
206 Items.push_back(Itm);
207 }
208 /*}}}*/
209 // Acquire::Remove - Remove an item /*{{{*/
210 // ---------------------------------------------------------------------
211 /* Remove an item from the acquire list. This is usually not used.. */
212 void pkgAcquire::Remove(Item *Itm)
213 {
214 Dequeue(Itm);
215
216 for (ItemIterator I = Items.begin(); I != Items.end();)
217 {
218 if (*I == Itm)
219 {
220 Items.erase(I);
221 I = Items.begin();
222 }
223 else
224 ++I;
225 }
226 }
227 /*}}}*/
228 // Acquire::Add - Add a worker /*{{{*/
229 // ---------------------------------------------------------------------
230 /* A list of workers is kept so that the select loop can direct their FD
231 usage. */
232 void pkgAcquire::Add(Worker *Work)
233 {
234 Work->NextAcquire = Workers;
235 Workers = Work;
236 }
237 /*}}}*/
238 // Acquire::Remove - Remove a worker /*{{{*/
239 // ---------------------------------------------------------------------
240 /* A worker has died. This can not be done while the select loop is running
241 as it would require that RunFds could handle a changing list state and
242 it can't.. */
243 void pkgAcquire::Remove(Worker *Work)
244 {
245 if (Running == true)
246 abort();
247
248 Worker **I = &Workers;
249 for (; *I != 0;)
250 {
251 if (*I == Work)
252 *I = (*I)->NextAcquire;
253 else
254 I = &(*I)->NextAcquire;
255 }
256 }
257 /*}}}*/
258 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
259 // ---------------------------------------------------------------------
260 /* This is the entry point for an item. An item calls this function when
261 it is constructed which creates a queue (based on the current queue
262 mode) and puts the item in that queue. If the system is running then
263 the queue might be started. */
264 void pkgAcquire::Enqueue(ItemDesc &Item)
265 {
266 // Determine which queue to put the item in
267 const MethodConfig *Config;
268 string Name = QueueName(Item.URI,Config);
269 if (Name.empty() == true)
270 return;
271
272 // Find the queue structure
273 Queue *I = Queues;
274 for (; I != 0 && I->Name != Name; I = I->Next);
275 if (I == 0)
276 {
277 I = new Queue(Name,this);
278 I->Next = Queues;
279 Queues = I;
280
281 if (Running == true)
282 I->Startup();
283 }
284
285 // See if this is a local only URI
286 if (Config->LocalOnly == true && Item.Owner->Complete == false)
287 Item.Owner->Local = true;
288 Item.Owner->Status = Item::StatIdle;
289
290 // Queue it into the named queue
291 if(I->Enqueue(Item))
292 ToFetch++;
293
294 // Some trace stuff
295 if (Debug == true)
296 {
297 clog << "Fetching " << Item.URI << endl;
298 clog << " to " << Item.Owner->DestFile << endl;
299 clog << " Queue is: " << Name << endl;
300 }
301 }
302 /*}}}*/
303 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
304 // ---------------------------------------------------------------------
305 /* This is called when an item is finished being fetched. It removes it
306 from all the queues */
307 void pkgAcquire::Dequeue(Item *Itm)
308 {
309 Queue *I = Queues;
310 bool Res = false;
311 if (Debug == true)
312 clog << "Dequeuing " << Itm->DestFile << endl;
313
314 for (; I != 0; I = I->Next)
315 {
316 if (I->Dequeue(Itm))
317 {
318 Res = true;
319 if (Debug == true)
320 clog << "Dequeued from " << I->Name << endl;
321 }
322 }
323
324 if (Res == true)
325 ToFetch--;
326 }
327 /*}}}*/
328 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
329 // ---------------------------------------------------------------------
330 /* The string returned depends on the configuration settings and the
331 method parameters. Given something like http://foo.org/bar it can
332 return http://foo.org or http */
333 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
334 {
335 URI U(Uri);
336
337 Config = GetConfig(U.Access);
338 if (Config == 0)
339 return string();
340
341 /* Single-Instance methods get exactly one queue per URI. This is
342 also used for the Access queue method */
343 if (Config->SingleInstance == true || QueueMode == QueueAccess)
344 return U.Access;
345
346 string AccessSchema = U.Access + ':',
347 FullQueueName = AccessSchema + U.Host;
348 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
349
350 Queue *I = Queues;
351 for (; I != 0; I = I->Next) {
352 // if the queue already exists, re-use it
353 if (I->Name == FullQueueName)
354 return FullQueueName;
355
356 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
357 Instances++;
358 }
359
360 if (Debug) {
361 clog << "Found " << Instances << " instances of " << U.Access << endl;
362 }
363
364 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
365 return U.Access;
366
367 return FullQueueName;
368 }
369 /*}}}*/
370 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
371 // ---------------------------------------------------------------------
372 /* This locates the configuration structure for an access method. If
373 a config structure cannot be found a Worker will be created to
374 retrieve it */
375 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
376 {
377 // Search for an existing config
378 MethodConfig *Conf;
379 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
380 if (Conf->Access == Access)
381 return Conf;
382
383 // Create the new config class
384 Conf = new MethodConfig;
385 Conf->Access = Access;
386 Conf->Next = Configs;
387 Configs = Conf;
388
389 // Create the worker to fetch the configuration
390 Worker Work(Conf);
391 if (Work.Start() == false)
392 return 0;
393
394 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
395 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
396 Conf->SingleInstance = true;
397
398 return Conf;
399 }
400 /*}}}*/
401 // Acquire::SetFds - Deal with readable FDs /*{{{*/
402 // ---------------------------------------------------------------------
403 /* Collect FDs that have activity monitors into the fd sets */
404 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
405 {
406 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
407 {
408 if (I->InReady == true && I->InFd >= 0)
409 {
410 if (Fd < I->InFd)
411 Fd = I->InFd;
412 FD_SET(I->InFd,RSet);
413 }
414 if (I->OutReady == true && I->OutFd >= 0)
415 {
416 if (Fd < I->OutFd)
417 Fd = I->OutFd;
418 FD_SET(I->OutFd,WSet);
419 }
420 }
421 }
422 /*}}}*/
423 // Acquire::RunFds - Deal with active FDs /*{{{*/
424 // ---------------------------------------------------------------------
425 /* Dispatch active FDs over to the proper workers. It is very important
426 that a worker never be erased while this is running! The queue class
427 should never erase a worker except during shutdown processing. */
428 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
429 {
430 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
431 {
432 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
433 I->InFdReady();
434 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
435 I->OutFdReady();
436 }
437 }
438 /*}}}*/
439 // Acquire::Run - Run the fetch sequence /*{{{*/
440 // ---------------------------------------------------------------------
441 /* This runs the queues. It manages a select loop for all of the
442 Worker tasks. The workers interact with the queues and items to
443 manage the actual fetch. */
444 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
445 {
446 Running = true;
447
448 for (Queue *I = Queues; I != 0; I = I->Next)
449 I->Startup();
450
451 if (Log != 0)
452 Log->Start();
453
454 bool WasCancelled = false;
455
456 // Run till all things have been acquired
457 struct timeval tv;
458 tv.tv_sec = 0;
459 tv.tv_usec = PulseIntervall;
460 while (ToFetch > 0)
461 {
462 fd_set RFds;
463 fd_set WFds;
464 int Highest = 0;
465 FD_ZERO(&RFds);
466 FD_ZERO(&WFds);
467 SetFds(Highest,&RFds,&WFds);
468
469 int Res;
470 do
471 {
472 Res = select(Highest+1,&RFds,&WFds,0,&tv);
473 }
474 while (Res < 0 && errno == EINTR);
475
476 if (Res < 0)
477 {
478 _error->Errno("select","Select has failed");
479 break;
480 }
481
482 RunFds(&RFds,&WFds);
483 if (_error->PendingError() == true)
484 break;
485
486 // Timeout, notify the log class
487 if (Res == 0 || (Log != 0 && Log->Update == true))
488 {
489 tv.tv_usec = PulseIntervall;
490 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
491 I->Pulse();
492 if (Log != 0 && Log->Pulse(this) == false)
493 {
494 WasCancelled = true;
495 break;
496 }
497 }
498 }
499
500 if (Log != 0)
501 Log->Stop();
502
503 // Shut down the acquire bits
504 Running = false;
505 for (Queue *I = Queues; I != 0; I = I->Next)
506 I->Shutdown(false);
507
508 // Shut down the items
509 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
510 (*I)->Finished();
511
512 if (_error->PendingError())
513 return Failed;
514 if (WasCancelled)
515 return Cancelled;
516 return Continue;
517 }
518 /*}}}*/
519 // Acquire::Bump - Called when an item is dequeued /*{{{*/
520 // ---------------------------------------------------------------------
521 /* This routine bumps idle queues in hopes that they will be able to fetch
522 the dequeued item */
523 void pkgAcquire::Bump()
524 {
525 for (Queue *I = Queues; I != 0; I = I->Next)
526 I->Bump();
527 }
528 /*}}}*/
529 // Acquire::WorkerStep - Step to the next worker /*{{{*/
530 // ---------------------------------------------------------------------
531 /* Not inlined to avoid including acquire-worker.h */
532 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
533 {
534 return I->NextAcquire;
535 }
536 /*}}}*/
537 // Acquire::Clean - Cleans a directory /*{{{*/
538 // ---------------------------------------------------------------------
539 /* This is a bit simplistic, it looks at every file in the dir and sees
540 if it is part of the download set. */
541 bool pkgAcquire::Clean(string Dir)
542 {
543 // non-existing directories are by definition clean…
544 if (DirectoryExists(Dir) == false)
545 return true;
546
547 if(Dir == "/")
548 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
549
550 DIR *D = opendir(Dir.c_str());
551 if (D == 0)
552 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
553
554 string StartDir = SafeGetCWD();
555 if (chdir(Dir.c_str()) != 0)
556 {
557 closedir(D);
558 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
559 }
560
561 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
562 {
563 // Skip some files..
564 if (strcmp(Dir->d_name,"lock") == 0 ||
565 strcmp(Dir->d_name,"partial") == 0 ||
566 strcmp(Dir->d_name,".") == 0 ||
567 strcmp(Dir->d_name,"..") == 0)
568 continue;
569
570 // Look in the get list
571 ItemCIterator I = Items.begin();
572 for (; I != Items.end(); ++I)
573 if (flNotDir((*I)->DestFile) == Dir->d_name)
574 break;
575
576 // Nothing found, nuke it
577 if (I == Items.end())
578 unlink(Dir->d_name);
579 };
580
581 closedir(D);
582 if (chdir(StartDir.c_str()) != 0)
583 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
584 return true;
585 }
586 /*}}}*/
587 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
588 // ---------------------------------------------------------------------
589 /* This is the total number of bytes needed */
590 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
591 {
592 unsigned long long Total = 0;
593 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
594 Total += (*I)->FileSize;
595 return Total;
596 }
597 /*}}}*/
598 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
599 // ---------------------------------------------------------------------
600 /* This is the number of bytes that is not local */
601 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
602 {
603 unsigned long long Total = 0;
604 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
605 if ((*I)->Local == false)
606 Total += (*I)->FileSize;
607 return Total;
608 }
609 /*}}}*/
610 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
611 // ---------------------------------------------------------------------
612 /* This is the sum of the partial sizes of all items that are not local */
613 APT_PURE unsigned long long pkgAcquire::PartialPresent()
614 {
615 unsigned long long Total = 0;
616 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
617 if ((*I)->Local == false)
618 Total += (*I)->PartialSize;
619 return Total;
620 }
621 /*}}}*/
622 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
623 // ---------------------------------------------------------------------
624 /* */
625 pkgAcquire::UriIterator pkgAcquire::UriBegin()
626 {
627 return UriIterator(Queues);
628 }
629 /*}}}*/
630 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
631 // ---------------------------------------------------------------------
632 /* */
633 pkgAcquire::UriIterator pkgAcquire::UriEnd()
634 {
635 return UriIterator(0);
636 }
637 /*}}}*/
638 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
639 // ---------------------------------------------------------------------
640 /* */
641 pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
642 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
643 Removable(false)
644 {
645 }
646 /*}}}*/
647 // Queue::Queue - Constructor /*{{{*/
648 // ---------------------------------------------------------------------
649 /* */
650 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
651 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
652 {
653 }
654 /*}}}*/
655 // Queue::~Queue - Destructor /*{{{*/
656 // ---------------------------------------------------------------------
657 /* */
658 pkgAcquire::Queue::~Queue()
659 {
660 Shutdown(true);
661
662 while (Items != 0)
663 {
664 QItem *Jnk = Items;
665 Items = Items->Next;
666 delete Jnk;
667 }
668 }
669 /*}}}*/
670 // Queue::Enqueue - Queue an item to the queue /*{{{*/
671 // ---------------------------------------------------------------------
672 /* */
673 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
674 {
675 QItem **I = &Items;
676 // move to the end of the queue and check for duplicates here
677 for (; *I != 0; I = &(*I)->Next)
678 if (Item.URI == (*I)->URI)
679 {
680 Item.Owner->Status = Item::StatDone;
681 return false;
682 }
683
684 // Create a new item
685 QItem *Itm = new QItem;
686 *Itm = Item;
687 Itm->Next = 0;
688 *I = Itm;
689
690 Item.Owner->QueueCounter++;
691 if (Items->Next == 0)
692 Cycle();
693 return true;
694 }
695 /*}}}*/
696 // Queue::Dequeue - Remove an item from the queue /*{{{*/
697 // ---------------------------------------------------------------------
698 /* We return true if we hit something */
699 bool pkgAcquire::Queue::Dequeue(Item *Owner)
700 {
701 if (Owner->Status == pkgAcquire::Item::StatFetching)
702 return _error->Error("Tried to dequeue a fetching object");
703
704 bool Res = false;
705
706 QItem **I = &Items;
707 for (; *I != 0;)
708 {
709 if ((*I)->Owner == Owner)
710 {
711 QItem *Jnk= *I;
712 *I = (*I)->Next;
713 Owner->QueueCounter--;
714 delete Jnk;
715 Res = true;
716 }
717 else
718 I = &(*I)->Next;
719 }
720
721 return Res;
722 }
723 /*}}}*/
724 // Queue::Startup - Start the worker processes /*{{{*/
725 // ---------------------------------------------------------------------
726 /* It is possible for this to be called with a pre-existing set of
727 workers. */
728 bool pkgAcquire::Queue::Startup()
729 {
730 if (Workers == 0)
731 {
732 URI U(Name);
733 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
734 if (Cnf == 0)
735 return false;
736
737 Workers = new Worker(this,Cnf,Owner->Log);
738 Owner->Add(Workers);
739 if (Workers->Start() == false)
740 return false;
741
742 /* When pipelining we commit 10 items. This needs to change when we
743 added other source retry to have cycle maintain a pipeline depth
744 on its own. */
745 if (Cnf->Pipeline == true)
746 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
747 else
748 MaxPipeDepth = 1;
749 }
750
751 return Cycle();
752 }
753 /*}}}*/
754 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
755 // ---------------------------------------------------------------------
756 /* If final is true then all workers are eliminated, otherwise only workers
757 that do not need cleanup are removed */
758 bool pkgAcquire::Queue::Shutdown(bool Final)
759 {
760 // Delete all of the workers
761 pkgAcquire::Worker **Cur = &Workers;
762 while (*Cur != 0)
763 {
764 pkgAcquire::Worker *Jnk = *Cur;
765 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
766 {
767 *Cur = Jnk->NextQueue;
768 Owner->Remove(Jnk);
769 delete Jnk;
770 }
771 else
772 Cur = &(*Cur)->NextQueue;
773 }
774
775 return true;
776 }
777 /*}}}*/
778 // Queue::FindItem - Find a URI in the item list /*{{{*/
779 // ---------------------------------------------------------------------
780 /* */
781 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
782 {
783 for (QItem *I = Items; I != 0; I = I->Next)
784 if (I->URI == URI && I->Worker == Owner)
785 return I;
786 return 0;
787 }
788 /*}}}*/
789 // Queue::ItemDone - Item has been completed /*{{{*/
790 // ---------------------------------------------------------------------
791 /* The worker signals this which causes the item to be removed from the
792 queue. If this is the last queue instance then it is removed from the
793 main queue too.*/
794 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
795 {
796 PipeDepth--;
797 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
798 Itm->Owner->Status = pkgAcquire::Item::StatDone;
799
800 if (Itm->Owner->QueueCounter <= 1)
801 Owner->Dequeue(Itm->Owner);
802 else
803 {
804 Dequeue(Itm->Owner);
805 Owner->Bump();
806 }
807
808 return Cycle();
809 }
810 /*}}}*/
811 // Queue::Cycle - Queue new items into the method /*{{{*/
812 // ---------------------------------------------------------------------
813 /* This locates a new idle item and sends it to the worker. If pipelining
814 is enabled then it keeps the pipe full. */
815 bool pkgAcquire::Queue::Cycle()
816 {
817 if (Items == 0 || Workers == 0)
818 return true;
819
820 if (PipeDepth < 0)
821 return _error->Error("Pipedepth failure");
822
823 // Look for a queable item
824 QItem *I = Items;
825 while (PipeDepth < (signed)MaxPipeDepth)
826 {
827 for (; I != 0; I = I->Next)
828 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
829 break;
830
831 // Nothing to do, queue is idle.
832 if (I == 0)
833 return true;
834
835 I->Worker = Workers;
836 I->Owner->Status = pkgAcquire::Item::StatFetching;
837 PipeDepth++;
838 if (Workers->QueueItem(I) == false)
839 return false;
840 }
841
842 return true;
843 }
844 /*}}}*/
845 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
846 // ---------------------------------------------------------------------
847 /* This is called when an item in multiple queues is dequeued */
848 void pkgAcquire::Queue::Bump()
849 {
850 Cycle();
851 }
852 /*}}}*/
853 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
854 // ---------------------------------------------------------------------
855 /* */
856 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
857 {
858 Start();
859 }
860 /*}}}*/
861 // AcquireStatus::Pulse - Called periodically /*{{{*/
862 // ---------------------------------------------------------------------
863 /* This computes some internal state variables for the derived classes to
864 use. It generates the current downloaded bytes and total bytes to download
865 as well as the current CPS estimate. */
866 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
867 {
868 TotalBytes = 0;
869 CurrentBytes = 0;
870 TotalItems = 0;
871 CurrentItems = 0;
872
873 // Compute the total number of bytes to fetch
874 unsigned int Unknown = 0;
875 unsigned int Count = 0;
876 bool UnfetchedReleaseFiles = false;
877 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
878 I != Owner->ItemsEnd();
879 ++I, ++Count)
880 {
881 TotalItems++;
882 if ((*I)->Status == pkgAcquire::Item::StatDone)
883 ++CurrentItems;
884
885 // Totally ignore local items
886 if ((*I)->Local == true)
887 continue;
888
889 // see if the method tells us to expect more
890 TotalItems += (*I)->ExpectedAdditionalItems;
891
892 // check if there are unfetched Release files
893 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
894 UnfetchedReleaseFiles = true;
895
896 TotalBytes += (*I)->FileSize;
897 if ((*I)->Complete == true)
898 CurrentBytes += (*I)->FileSize;
899 if ((*I)->FileSize == 0 && (*I)->Complete == false)
900 ++Unknown;
901 }
902
903 // Compute the current completion
904 unsigned long long ResumeSize = 0;
905 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
906 I = Owner->WorkerStep(I))
907 {
908 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
909 {
910 CurrentBytes += I->CurrentSize;
911 ResumeSize += I->ResumePoint;
912
913 // Files with unknown size always have 100% completion
914 if (I->CurrentItem->Owner->FileSize == 0 &&
915 I->CurrentItem->Owner->Complete == false)
916 TotalBytes += I->CurrentSize;
917 }
918 }
919
920 // Normalize the figures and account for unknown size downloads
921 if (TotalBytes <= 0)
922 TotalBytes = 1;
923 if (Unknown == Count)
924 TotalBytes = Unknown;
925
926 // Wha?! Is not supposed to happen.
927 if (CurrentBytes > TotalBytes)
928 CurrentBytes = TotalBytes;
929
930 // debug
931 if (_config->FindB("Debug::acquire::progress", false) == true)
932 std::clog << " Bytes: "
933 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
934 << std::endl;
935
936 // Compute the CPS
937 struct timeval NewTime;
938 gettimeofday(&NewTime,0);
939 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
940 NewTime.tv_sec - Time.tv_sec > 6)
941 {
942 double Delta = NewTime.tv_sec - Time.tv_sec +
943 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
944
945 // Compute the CPS value
946 if (Delta < 0.01)
947 CurrentCPS = 0;
948 else
949 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
950 LastBytes = CurrentBytes - ResumeSize;
951 ElapsedTime = (unsigned long long)Delta;
952 Time = NewTime;
953 }
954
955 // calculate the percentage, if we have too little data assume 1%
956 if (TotalBytes > 0 && UnfetchedReleaseFiles)
957 Percent = 0;
958 else
959 // use both files and bytes because bytes can be unreliable
960 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
961 0.2 * (CurrentItems/float(TotalItems)*100.0));
962
963 int fd = _config->FindI("APT::Status-Fd",-1);
964 if(fd > 0)
965 {
966 ostringstream status;
967
968 char msg[200];
969 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
970 unsigned long long ETA = 0;
971 if(CurrentCPS > 0)
972 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
973
974 // only show the ETA if it makes sense
975 if (ETA > 0 && ETA < 172800 /* two days */ )
976 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
977 else
978 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
979
980 // build the status str
981 status << "dlstatus:" << i
982 << ":" << std::setprecision(3) << Percent
983 << ":" << msg
984 << endl;
985
986 std::string const dlstatus = status.str();
987 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
988 }
989
990 return true;
991 }
992 /*}}}*/
993 // AcquireStatus::Start - Called when the download is started /*{{{*/
994 // ---------------------------------------------------------------------
995 /* We just reset the counters */
996 void pkgAcquireStatus::Start()
997 {
998 gettimeofday(&Time,0);
999 gettimeofday(&StartTime,0);
1000 LastBytes = 0;
1001 CurrentCPS = 0;
1002 CurrentBytes = 0;
1003 TotalBytes = 0;
1004 FetchedBytes = 0;
1005 ElapsedTime = 0;
1006 TotalItems = 0;
1007 CurrentItems = 0;
1008 }
1009 /*}}}*/
1010 // AcquireStatus::Stop - Finished downloading /*{{{*/
1011 // ---------------------------------------------------------------------
1012 /* This accurately computes the elapsed time and the total overall CPS. */
1013 void pkgAcquireStatus::Stop()
1014 {
1015 // Compute the CPS and elapsed time
1016 struct timeval NewTime;
1017 gettimeofday(&NewTime,0);
1018
1019 double Delta = NewTime.tv_sec - StartTime.tv_sec +
1020 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
1021
1022 // Compute the CPS value
1023 if (Delta < 0.01)
1024 CurrentCPS = 0;
1025 else
1026 CurrentCPS = FetchedBytes/Delta;
1027 LastBytes = CurrentBytes;
1028 ElapsedTime = (unsigned long long)Delta;
1029 }
1030 /*}}}*/
1031 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
1032 // ---------------------------------------------------------------------
1033 /* This is used to get accurate final transfer rate reporting. */
1034 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
1035 {
1036 FetchedBytes += Size - Resume;
1037 }
1038 /*}}}*/
1039
1040 APT_CONST pkgAcquire::UriIterator::~UriIterator() {}
1041 APT_CONST pkgAcquire::MethodConfig::~MethodConfig() {}
1042 APT_CONST pkgAcquireStatus::~pkgAcquireStatus() {}