]> git.saurik.com Git - apt.git/blob - apt-pkg/acquire.cc
033fa9bd3caabed826cc86744d6e1cb8ad0a0fdc
[apt.git] / apt-pkg / acquire.cc
1 // -*- mode: cpp; mode: fold -*-
2 // Description /*{{{*/
3 // $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
4 /* ######################################################################
5
6 Acquire - File Acquiration
7
8 The core element for the schedule system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controlled by how the queue
11 name is derived from the URI.
12
13 ##################################################################### */
14 /*}}}*/
15 // Include Files /*{{{*/
16 #include <config.h>
17
18 #include <apt-pkg/acquire.h>
19 #include <apt-pkg/acquire-item.h>
20 #include <apt-pkg/acquire-worker.h>
21 #include <apt-pkg/configuration.h>
22 #include <apt-pkg/error.h>
23 #include <apt-pkg/strutl.h>
24 #include <apt-pkg/fileutl.h>
25
26 #include <string>
27 #include <vector>
28 #include <iostream>
29 #include <sstream>
30 #include <iomanip>
31
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <unistd.h>
36 #include <pwd.h>
37 #include <grp.h>
38 #include <dirent.h>
39 #include <sys/time.h>
40 #include <sys/select.h>
41 #include <errno.h>
42 #include <sys/stat.h>
43 #include <sys/types.h>
44
45 #include <apti18n.h>
46 /*}}}*/
47
48 using namespace std;
49
50 // Acquire::pkgAcquire - Constructor /*{{{*/
51 // ---------------------------------------------------------------------
52 /* We grab some runtime state from the configuration space */
53 pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
54 Debug(_config->FindB("Debug::pkgAcquire",false)),
55 Running(false)
56 {
57 string const Mode = _config->Find("Acquire::Queue-Mode","host");
58 if (strcasecmp(Mode.c_str(),"host") == 0)
59 QueueMode = QueueHost;
60 if (strcasecmp(Mode.c_str(),"access") == 0)
61 QueueMode = QueueAccess;
62 }
63 pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
64 Configs(0), Log(NULL), ToFetch(0),
65 Debug(_config->FindB("Debug::pkgAcquire",false)),
66 Running(false)
67 {
68 string const Mode = _config->Find("Acquire::Queue-Mode","host");
69 if (strcasecmp(Mode.c_str(),"host") == 0)
70 QueueMode = QueueHost;
71 if (strcasecmp(Mode.c_str(),"access") == 0)
72 QueueMode = QueueAccess;
73 SetLog(Progress);
74 }
75 /*}}}*/
76 // Acquire::GetLock - lock directory and prepare for action /*{{{*/
77 static bool SetupAPTPartialDirectory(std::string const &grand, std::string const &parent)
78 {
79 std::string const partial = parent + "partial";
80 if (CreateAPTDirectoryIfNeeded(grand, partial) == false &&
81 CreateAPTDirectoryIfNeeded(parent, partial) == false)
82 return false;
83
84 if (getuid() == 0) // if we aren't root, we can't chown, so don't try it
85 {
86 std::string SandboxUser = _config->Find("APT::Sandbox::User");
87 struct passwd *pw = getpwnam(SandboxUser.c_str());
88 struct group *gr = getgrnam("root");
89 if (pw != NULL && gr != NULL)
90 {
91 // chown the partial dir
92 if(chown(partial.c_str(), pw->pw_uid, gr->gr_gid) != 0)
93 _error->WarningE("SetupAPTPartialDirectory", "chown to %s:root of directory %s failed", SandboxUser.c_str(), partial.c_str());
94 // chown the auth.conf file
95 std::string AuthConf = _config->FindFile("Dir::Etc::netrc");
96 if(chown(AuthConf.c_str(), pw->pw_uid, gr->gr_gid) != 0)
97 _error->WarningE("SetupAPTPartialDirectory", "chown to %s:root of file %s failed", SandboxUser.c_str(), AuthConf.c_str());
98 }
99 }
100 if (chmod(partial.c_str(), 0700) != 0)
101 _error->WarningE("SetupAPTPartialDirectory", "chmod 0700 of directory %s failed", partial.c_str());
102
103 return true;
104 }
105 bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
106 {
107 Log = Progress;
108 if (Lock.empty())
109 {
110 string const listDir = _config->FindDir("Dir::State::lists");
111 if (SetupAPTPartialDirectory(_config->FindDir("Dir::State"), listDir) == false)
112 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
113 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
114 if (SetupAPTPartialDirectory(_config->FindDir("Dir::Cache"), archivesDir) == false)
115 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
116 return true;
117 }
118 return GetLock(Lock);
119 }
120 bool pkgAcquire::GetLock(std::string const &Lock)
121 {
122 if (Lock.empty() == true)
123 return false;
124
125 // check for existence and possibly create auxiliary directories
126 string const listDir = _config->FindDir("Dir::State::lists");
127 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
128
129 if (Lock == listDir)
130 {
131 if (SetupAPTPartialDirectory(_config->FindDir("Dir::State"), listDir) == false)
132 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
133 }
134 if (Lock == archivesDir)
135 {
136 if (SetupAPTPartialDirectory(_config->FindDir("Dir::Cache"), archivesDir) == false)
137 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
138 }
139
140 if (_config->FindB("Debug::NoLocking", false) == true)
141 return true;
142
143 // Lock the directory this acquire object will work in
144 LockFD = ::GetLock(flCombine(Lock, "lock"));
145 if (LockFD == -1)
146 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
147
148 return true;
149 }
150 /*}}}*/
151 // Acquire::~pkgAcquire - Destructor /*{{{*/
152 // ---------------------------------------------------------------------
153 /* Free our memory, clean up the queues (destroy the workers) */
154 pkgAcquire::~pkgAcquire()
155 {
156 Shutdown();
157
158 if (LockFD != -1)
159 close(LockFD);
160
161 while (Configs != 0)
162 {
163 MethodConfig *Jnk = Configs;
164 Configs = Configs->Next;
165 delete Jnk;
166 }
167 }
168 /*}}}*/
169 // Acquire::Shutdown - Clean out the acquire object /*{{{*/
170 // ---------------------------------------------------------------------
171 /* */
172 void pkgAcquire::Shutdown()
173 {
174 while (Items.empty() == false)
175 {
176 if (Items[0]->Status == Item::StatFetching)
177 Items[0]->Status = Item::StatError;
178 delete Items[0];
179 }
180
181 while (Queues != 0)
182 {
183 Queue *Jnk = Queues;
184 Queues = Queues->Next;
185 delete Jnk;
186 }
187 }
188 /*}}}*/
189 // Acquire::Add - Add a new item /*{{{*/
190 // ---------------------------------------------------------------------
191 /* This puts an item on the acquire list. This list is mainly for tracking
192 item status */
193 void pkgAcquire::Add(Item *Itm)
194 {
195 Items.push_back(Itm);
196 }
197 /*}}}*/
198 // Acquire::Remove - Remove a item /*{{{*/
199 // ---------------------------------------------------------------------
200 /* Remove an item from the acquire list. This is usually not used.. */
201 void pkgAcquire::Remove(Item *Itm)
202 {
203 Dequeue(Itm);
204
205 for (ItemIterator I = Items.begin(); I != Items.end();)
206 {
207 if (*I == Itm)
208 {
209 Items.erase(I);
210 I = Items.begin();
211 }
212 else
213 ++I;
214 }
215 }
216 /*}}}*/
217 // Acquire::Add - Add a worker /*{{{*/
218 // ---------------------------------------------------------------------
219 /* A list of workers is kept so that the select loop can direct their FD
220 usage. */
221 void pkgAcquire::Add(Worker *Work)
222 {
223 Work->NextAcquire = Workers;
224 Workers = Work;
225 }
226 /*}}}*/
227 // Acquire::Remove - Remove a worker /*{{{*/
228 // ---------------------------------------------------------------------
229 /* A worker has died. This can not be done while the select loop is running
230 as it would require that RunFds could handling a changing list state and
231 it can't.. */
232 void pkgAcquire::Remove(Worker *Work)
233 {
234 if (Running == true)
235 abort();
236
237 Worker **I = &Workers;
238 for (; *I != 0;)
239 {
240 if (*I == Work)
241 *I = (*I)->NextAcquire;
242 else
243 I = &(*I)->NextAcquire;
244 }
245 }
246 /*}}}*/
247 // Acquire::Enqueue - Queue an URI for fetching /*{{{*/
248 // ---------------------------------------------------------------------
249 /* This is the entry point for an item. An item calls this function when
250 it is constructed which creates a queue (based on the current queue
251 mode) and puts the item in that queue. If the system is running then
252 the queue might be started. */
253 void pkgAcquire::Enqueue(ItemDesc &Item)
254 {
255 // Determine which queue to put the item in
256 const MethodConfig *Config;
257 string Name = QueueName(Item.URI,Config);
258 if (Name.empty() == true)
259 return;
260
261 // Find the queue structure
262 Queue *I = Queues;
263 for (; I != 0 && I->Name != Name; I = I->Next);
264 if (I == 0)
265 {
266 I = new Queue(Name,this);
267 I->Next = Queues;
268 Queues = I;
269
270 if (Running == true)
271 I->Startup();
272 }
273
274 // See if this is a local only URI
275 if (Config->LocalOnly == true && Item.Owner->Complete == false)
276 Item.Owner->Local = true;
277 Item.Owner->Status = Item::StatIdle;
278
279 // Queue it into the named queue
280 if(I->Enqueue(Item))
281 ToFetch++;
282
283 // Some trace stuff
284 if (Debug == true)
285 {
286 clog << "Fetching " << Item.URI << endl;
287 clog << " to " << Item.Owner->DestFile << endl;
288 clog << " Queue is: " << Name << endl;
289 }
290 }
291 /*}}}*/
292 // Acquire::Dequeue - Remove an item from all queues /*{{{*/
293 // ---------------------------------------------------------------------
294 /* This is called when an item is finished being fetched. It removes it
295 from all the queues */
296 void pkgAcquire::Dequeue(Item *Itm)
297 {
298 Queue *I = Queues;
299 bool Res = false;
300 if (Debug == true)
301 clog << "Dequeuing " << Itm->DestFile << endl;
302
303 for (; I != 0; I = I->Next)
304 {
305 if (I->Dequeue(Itm))
306 {
307 Res = true;
308 if (Debug == true)
309 clog << "Dequeued from " << I->Name << endl;
310 }
311 }
312
313 if (Res == true)
314 ToFetch--;
315 }
316 /*}}}*/
317 // Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
318 // ---------------------------------------------------------------------
319 /* The string returned depends on the configuration settings and the
320 method parameters. Given something like http://foo.org/bar it can
321 return http://foo.org or http */
322 string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
323 {
324 URI U(Uri);
325
326 Config = GetConfig(U.Access);
327 if (Config == 0)
328 return string();
329
330 /* Single-Instance methods get exactly one queue per URI. This is
331 also used for the Access queue method */
332 if (Config->SingleInstance == true || QueueMode == QueueAccess)
333 return U.Access;
334
335 string AccessSchema = U.Access + ':',
336 FullQueueName = AccessSchema + U.Host;
337 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
338
339 Queue *I = Queues;
340 for (; I != 0; I = I->Next) {
341 // if the queue already exists, re-use it
342 if (I->Name == FullQueueName)
343 return FullQueueName;
344
345 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
346 Instances++;
347 }
348
349 if (Debug) {
350 clog << "Found " << Instances << " instances of " << U.Access << endl;
351 }
352
353 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
354 return U.Access;
355
356 return FullQueueName;
357 }
358 /*}}}*/
359 // Acquire::GetConfig - Fetch the configuration information /*{{{*/
360 // ---------------------------------------------------------------------
361 /* This locates the configuration structure for an access method. If
362 a config structure cannot be found a Worker will be created to
363 retrieve it */
364 pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
365 {
366 // Search for an existing config
367 MethodConfig *Conf;
368 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
369 if (Conf->Access == Access)
370 return Conf;
371
372 // Create the new config class
373 Conf = new MethodConfig;
374 Conf->Access = Access;
375 Conf->Next = Configs;
376 Configs = Conf;
377
378 // Create the worker to fetch the configuration
379 Worker Work(Conf);
380 if (Work.Start() == false)
381 return 0;
382
383 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
384 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
385 Conf->SingleInstance = true;
386
387 return Conf;
388 }
389 /*}}}*/
390 // Acquire::SetFds - Deal with readable FDs /*{{{*/
391 // ---------------------------------------------------------------------
392 /* Collect FDs that have activity monitors into the fd sets */
393 void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
394 {
395 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
396 {
397 if (I->InReady == true && I->InFd >= 0)
398 {
399 if (Fd < I->InFd)
400 Fd = I->InFd;
401 FD_SET(I->InFd,RSet);
402 }
403 if (I->OutReady == true && I->OutFd >= 0)
404 {
405 if (Fd < I->OutFd)
406 Fd = I->OutFd;
407 FD_SET(I->OutFd,WSet);
408 }
409 }
410 }
411 /*}}}*/
412 // Acquire::RunFds - Deal with active FDs /*{{{*/
413 // ---------------------------------------------------------------------
414 /* Dispatch active FDs over to the proper workers. It is very important
415 that a worker never be erased while this is running! The queue class
416 should never erase a worker except during shutdown processing. */
417 void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
418 {
419 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
420 {
421 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
422 I->InFdReady();
423 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
424 I->OutFdReady();
425 }
426 }
427 /*}}}*/
428 // Acquire::Run - Run the fetch sequence /*{{{*/
429 // ---------------------------------------------------------------------
430 /* This runs the queues. It manages a select loop for all of the
431 Worker tasks. The workers interact with the queues and items to
432 manage the actual fetch. */
433 pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
434 {
435 Running = true;
436
437 for (Queue *I = Queues; I != 0; I = I->Next)
438 I->Startup();
439
440 if (Log != 0)
441 Log->Start();
442
443 bool WasCancelled = false;
444
445 // Run till all things have been acquired
446 struct timeval tv;
447 tv.tv_sec = 0;
448 tv.tv_usec = PulseIntervall;
449 while (ToFetch > 0)
450 {
451 fd_set RFds;
452 fd_set WFds;
453 int Highest = 0;
454 FD_ZERO(&RFds);
455 FD_ZERO(&WFds);
456 SetFds(Highest,&RFds,&WFds);
457
458 int Res;
459 do
460 {
461 Res = select(Highest+1,&RFds,&WFds,0,&tv);
462 }
463 while (Res < 0 && errno == EINTR);
464
465 if (Res < 0)
466 {
467 _error->Errno("select","Select has failed");
468 break;
469 }
470
471 RunFds(&RFds,&WFds);
472 if (_error->PendingError() == true)
473 break;
474
475 // Timeout, notify the log class
476 if (Res == 0 || (Log != 0 && Log->Update == true))
477 {
478 tv.tv_usec = PulseIntervall;
479 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
480 I->Pulse();
481 if (Log != 0 && Log->Pulse(this) == false)
482 {
483 WasCancelled = true;
484 break;
485 }
486 }
487 }
488
489 if (Log != 0)
490 Log->Stop();
491
492 // Shut down the acquire bits
493 Running = false;
494 for (Queue *I = Queues; I != 0; I = I->Next)
495 I->Shutdown(false);
496
497 // Shut down the items
498 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
499 (*I)->Finished();
500
501 if (_error->PendingError())
502 return Failed;
503 if (WasCancelled)
504 return Cancelled;
505 return Continue;
506 }
507 /*}}}*/
508 // Acquire::Bump - Called when an item is dequeued /*{{{*/
509 // ---------------------------------------------------------------------
510 /* This routine bumps idle queues in hopes that they will be able to fetch
511 the dequeued item */
512 void pkgAcquire::Bump()
513 {
514 for (Queue *I = Queues; I != 0; I = I->Next)
515 I->Bump();
516 }
517 /*}}}*/
518 // Acquire::WorkerStep - Step to the next worker /*{{{*/
519 // ---------------------------------------------------------------------
520 /* Not inlined to advoid including acquire-worker.h */
521 pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
522 {
523 return I->NextAcquire;
524 }
525 /*}}}*/
526 // Acquire::Clean - Cleans a directory /*{{{*/
527 // ---------------------------------------------------------------------
528 /* This is a bit simplistic, it looks at every file in the dir and sees
529 if it is part of the download set. */
530 bool pkgAcquire::Clean(string Dir)
531 {
532 // non-existing directories are by definition clean…
533 if (DirectoryExists(Dir) == false)
534 return true;
535
536 if(Dir == "/")
537 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
538
539 DIR *D = opendir(Dir.c_str());
540 if (D == 0)
541 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
542
543 string StartDir = SafeGetCWD();
544 if (chdir(Dir.c_str()) != 0)
545 {
546 closedir(D);
547 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
548 }
549
550 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
551 {
552 // Skip some files..
553 if (strcmp(Dir->d_name,"lock") == 0 ||
554 strcmp(Dir->d_name,"partial") == 0 ||
555 strcmp(Dir->d_name,".") == 0 ||
556 strcmp(Dir->d_name,"..") == 0)
557 continue;
558
559 // Look in the get list
560 ItemCIterator I = Items.begin();
561 for (; I != Items.end(); ++I)
562 if (flNotDir((*I)->DestFile) == Dir->d_name)
563 break;
564
565 // Nothing found, nuke it
566 if (I == Items.end())
567 unlink(Dir->d_name);
568 };
569
570 closedir(D);
571 if (chdir(StartDir.c_str()) != 0)
572 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
573 return true;
574 }
575 /*}}}*/
576 // Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
577 // ---------------------------------------------------------------------
578 /* This is the total number of bytes needed */
579 APT_PURE unsigned long long pkgAcquire::TotalNeeded()
580 {
581 unsigned long long Total = 0;
582 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
583 Total += (*I)->FileSize;
584 return Total;
585 }
586 /*}}}*/
587 // Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
588 // ---------------------------------------------------------------------
589 /* This is the number of bytes that is not local */
590 APT_PURE unsigned long long pkgAcquire::FetchNeeded()
591 {
592 unsigned long long Total = 0;
593 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
594 if ((*I)->Local == false)
595 Total += (*I)->FileSize;
596 return Total;
597 }
598 /*}}}*/
599 // Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
600 // ---------------------------------------------------------------------
601 /* This is the number of bytes that is not local */
602 APT_PURE unsigned long long pkgAcquire::PartialPresent()
603 {
604 unsigned long long Total = 0;
605 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
606 if ((*I)->Local == false)
607 Total += (*I)->PartialSize;
608 return Total;
609 }
610 /*}}}*/
611 // Acquire::UriBegin - Start iterator for the uri list /*{{{*/
612 // ---------------------------------------------------------------------
613 /* */
614 pkgAcquire::UriIterator pkgAcquire::UriBegin()
615 {
616 return UriIterator(Queues);
617 }
618 /*}}}*/
619 // Acquire::UriEnd - End iterator for the uri list /*{{{*/
620 // ---------------------------------------------------------------------
621 /* */
622 pkgAcquire::UriIterator pkgAcquire::UriEnd()
623 {
624 return UriIterator(0);
625 }
626 /*}}}*/
627 // Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
628 // ---------------------------------------------------------------------
629 /* */
630 pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
631 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
632 Removable(false)
633 {
634 }
635 /*}}}*/
636 // Queue::Queue - Constructor /*{{{*/
637 // ---------------------------------------------------------------------
638 /* */
639 pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
640 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
641 {
642 }
643 /*}}}*/
644 // Queue::~Queue - Destructor /*{{{*/
645 // ---------------------------------------------------------------------
646 /* */
647 pkgAcquire::Queue::~Queue()
648 {
649 Shutdown(true);
650
651 while (Items != 0)
652 {
653 QItem *Jnk = Items;
654 Items = Items->Next;
655 delete Jnk;
656 }
657 }
658 /*}}}*/
659 // Queue::Enqueue - Queue an item to the queue /*{{{*/
660 // ---------------------------------------------------------------------
661 /* */
662 bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
663 {
664 QItem **I = &Items;
665 // move to the end of the queue and check for duplicates here
666 for (; *I != 0; I = &(*I)->Next)
667 if (Item.URI == (*I)->URI)
668 {
669 Item.Owner->Status = Item::StatDone;
670 return false;
671 }
672
673 // Create a new item
674 QItem *Itm = new QItem;
675 *Itm = Item;
676 Itm->Next = 0;
677 *I = Itm;
678
679 Item.Owner->QueueCounter++;
680 if (Items->Next == 0)
681 Cycle();
682 return true;
683 }
684 /*}}}*/
685 // Queue::Dequeue - Remove an item from the queue /*{{{*/
686 // ---------------------------------------------------------------------
687 /* We return true if we hit something */
688 bool pkgAcquire::Queue::Dequeue(Item *Owner)
689 {
690 if (Owner->Status == pkgAcquire::Item::StatFetching)
691 return _error->Error("Tried to dequeue a fetching object");
692
693 bool Res = false;
694
695 QItem **I = &Items;
696 for (; *I != 0;)
697 {
698 if ((*I)->Owner == Owner)
699 {
700 QItem *Jnk= *I;
701 *I = (*I)->Next;
702 Owner->QueueCounter--;
703 delete Jnk;
704 Res = true;
705 }
706 else
707 I = &(*I)->Next;
708 }
709
710 return Res;
711 }
712 /*}}}*/
713 // Queue::Startup - Start the worker processes /*{{{*/
714 // ---------------------------------------------------------------------
715 /* It is possible for this to be called with a pre-existing set of
716 workers. */
717 bool pkgAcquire::Queue::Startup()
718 {
719 if (Workers == 0)
720 {
721 URI U(Name);
722 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
723 if (Cnf == 0)
724 return false;
725
726 Workers = new Worker(this,Cnf,Owner->Log);
727 Owner->Add(Workers);
728 if (Workers->Start() == false)
729 return false;
730
731 /* When pipelining we commit 10 items. This needs to change when we
732 added other source retry to have cycle maintain a pipeline depth
733 on its own. */
734 if (Cnf->Pipeline == true)
735 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
736 else
737 MaxPipeDepth = 1;
738 }
739
740 return Cycle();
741 }
742 /*}}}*/
743 // Queue::Shutdown - Shutdown the worker processes /*{{{*/
744 // ---------------------------------------------------------------------
745 /* If final is true then all workers are eliminated, otherwise only workers
746 that do not need cleanup are removed */
747 bool pkgAcquire::Queue::Shutdown(bool Final)
748 {
749 // Delete all of the workers
750 pkgAcquire::Worker **Cur = &Workers;
751 while (*Cur != 0)
752 {
753 pkgAcquire::Worker *Jnk = *Cur;
754 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
755 {
756 *Cur = Jnk->NextQueue;
757 Owner->Remove(Jnk);
758 delete Jnk;
759 }
760 else
761 Cur = &(*Cur)->NextQueue;
762 }
763
764 return true;
765 }
766 /*}}}*/
767 // Queue::FindItem - Find a URI in the item list /*{{{*/
768 // ---------------------------------------------------------------------
769 /* */
770 pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
771 {
772 for (QItem *I = Items; I != 0; I = I->Next)
773 if (I->URI == URI && I->Worker == Owner)
774 return I;
775 return 0;
776 }
777 /*}}}*/
778 // Queue::ItemDone - Item has been completed /*{{{*/
779 // ---------------------------------------------------------------------
780 /* The worker signals this which causes the item to be removed from the
781 queue. If this is the last queue instance then it is removed from the
782 main queue too.*/
783 bool pkgAcquire::Queue::ItemDone(QItem *Itm)
784 {
785 PipeDepth--;
786 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
787 Itm->Owner->Status = pkgAcquire::Item::StatDone;
788
789 if (Itm->Owner->QueueCounter <= 1)
790 Owner->Dequeue(Itm->Owner);
791 else
792 {
793 Dequeue(Itm->Owner);
794 Owner->Bump();
795 }
796
797 return Cycle();
798 }
799 /*}}}*/
800 // Queue::Cycle - Queue new items into the method /*{{{*/
801 // ---------------------------------------------------------------------
802 /* This locates a new idle item and sends it to the worker. If pipelining
803 is enabled then it keeps the pipe full. */
804 bool pkgAcquire::Queue::Cycle()
805 {
806 if (Items == 0 || Workers == 0)
807 return true;
808
809 if (PipeDepth < 0)
810 return _error->Error("Pipedepth failure");
811
812 // Look for a queable item
813 QItem *I = Items;
814 while (PipeDepth < (signed)MaxPipeDepth)
815 {
816 for (; I != 0; I = I->Next)
817 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
818 break;
819
820 // Nothing to do, queue is idle.
821 if (I == 0)
822 return true;
823
824 I->Worker = Workers;
825 I->Owner->Status = pkgAcquire::Item::StatFetching;
826 PipeDepth++;
827 if (Workers->QueueItem(I) == false)
828 return false;
829 }
830
831 return true;
832 }
833 /*}}}*/
834 // Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
835 // ---------------------------------------------------------------------
836 /* This is called when an item in multiple queues is dequeued */
837 void pkgAcquire::Queue::Bump()
838 {
839 Cycle();
840 }
841 /*}}}*/
842 // AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
843 // ---------------------------------------------------------------------
844 /* */
845 pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
846 {
847 Start();
848 }
849 /*}}}*/
850 // AcquireStatus::Pulse - Called periodically /*{{{*/
851 // ---------------------------------------------------------------------
852 /* This computes some internal state variables for the derived classes to
853 use. It generates the current downloaded bytes and total bytes to download
854 as well as the current CPS estimate. */
855 bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
856 {
857 TotalBytes = 0;
858 CurrentBytes = 0;
859 TotalItems = 0;
860 CurrentItems = 0;
861
862 // Compute the total number of bytes to fetch
863 unsigned int Unknown = 0;
864 unsigned int Count = 0;
865 bool UnfetchedReleaseFiles = false;
866 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
867 I != Owner->ItemsEnd();
868 ++I, ++Count)
869 {
870 TotalItems++;
871 if ((*I)->Status == pkgAcquire::Item::StatDone)
872 ++CurrentItems;
873
874 // Totally ignore local items
875 if ((*I)->Local == true)
876 continue;
877
878 // see if the method tells us to expect more
879 TotalItems += (*I)->ExpectedAdditionalItems;
880
881 // check if there are unfetched Release files
882 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
883 UnfetchedReleaseFiles = true;
884
885 TotalBytes += (*I)->FileSize;
886 if ((*I)->Complete == true)
887 CurrentBytes += (*I)->FileSize;
888 if ((*I)->FileSize == 0 && (*I)->Complete == false)
889 ++Unknown;
890 }
891
892 // Compute the current completion
893 unsigned long long ResumeSize = 0;
894 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
895 I = Owner->WorkerStep(I))
896 {
897 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
898 {
899 CurrentBytes += I->CurrentSize;
900 ResumeSize += I->ResumePoint;
901
902 // Files with unknown size always have 100% completion
903 if (I->CurrentItem->Owner->FileSize == 0 &&
904 I->CurrentItem->Owner->Complete == false)
905 TotalBytes += I->CurrentSize;
906 }
907 }
908
909 // Normalize the figures and account for unknown size downloads
910 if (TotalBytes <= 0)
911 TotalBytes = 1;
912 if (Unknown == Count)
913 TotalBytes = Unknown;
914
915 // Wha?! Is not supposed to happen.
916 if (CurrentBytes > TotalBytes)
917 CurrentBytes = TotalBytes;
918
919 // debug
920 if (_config->FindB("Debug::acquire::progress", false) == true)
921 std::clog << " Bytes: "
922 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
923 << std::endl;
924
925 // Compute the CPS
926 struct timeval NewTime;
927 gettimeofday(&NewTime,0);
928 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
929 NewTime.tv_sec - Time.tv_sec > 6)
930 {
931 double Delta = NewTime.tv_sec - Time.tv_sec +
932 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
933
934 // Compute the CPS value
935 if (Delta < 0.01)
936 CurrentCPS = 0;
937 else
938 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
939 LastBytes = CurrentBytes - ResumeSize;
940 ElapsedTime = (unsigned long long)Delta;
941 Time = NewTime;
942 }
943
944 // calculate the percentage, if we have too little data assume 1%
945 if (TotalBytes > 0 && UnfetchedReleaseFiles)
946 Percent = 0;
947 else
948 // use both files and bytes because bytes can be unreliable
949 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
950 0.2 * (CurrentItems/float(TotalItems)*100.0));
951
952 int fd = _config->FindI("APT::Status-Fd",-1);
953 if(fd > 0)
954 {
955 ostringstream status;
956
957 char msg[200];
958 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
959 unsigned long long ETA = 0;
960 if(CurrentCPS > 0)
961 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
962
963 // only show the ETA if it makes sense
964 if (ETA > 0 && ETA < 172800 /* two days */ )
965 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
966 else
967 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
968
969 // build the status str
970 status << "dlstatus:" << i
971 << ":" << std::setprecision(3) << Percent
972 << ":" << msg
973 << endl;
974
975 std::string const dlstatus = status.str();
976 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
977 }
978
979 return true;
980 }
981 /*}}}*/
982 // AcquireStatus::Start - Called when the download is started /*{{{*/
983 // ---------------------------------------------------------------------
984 /* We just reset the counters */
985 void pkgAcquireStatus::Start()
986 {
987 gettimeofday(&Time,0);
988 gettimeofday(&StartTime,0);
989 LastBytes = 0;
990 CurrentCPS = 0;
991 CurrentBytes = 0;
992 TotalBytes = 0;
993 FetchedBytes = 0;
994 ElapsedTime = 0;
995 TotalItems = 0;
996 CurrentItems = 0;
997 }
998 /*}}}*/
999 // AcquireStatus::Stop - Finished downloading /*{{{*/
1000 // ---------------------------------------------------------------------
1001 /* This accurately computes the elapsed time and the total overall CPS. */
1002 void pkgAcquireStatus::Stop()
1003 {
1004 // Compute the CPS and elapsed time
1005 struct timeval NewTime;
1006 gettimeofday(&NewTime,0);
1007
1008 double Delta = NewTime.tv_sec - StartTime.tv_sec +
1009 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
1010
1011 // Compute the CPS value
1012 if (Delta < 0.01)
1013 CurrentCPS = 0;
1014 else
1015 CurrentCPS = FetchedBytes/Delta;
1016 LastBytes = CurrentBytes;
1017 ElapsedTime = (unsigned long long)Delta;
1018 }
1019 /*}}}*/
1020 // AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
1021 // ---------------------------------------------------------------------
1022 /* This is used to get accurate final transfer rate reporting. */
1023 void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
1024 {
1025 FetchedBytes += Size - Resume;
1026 }
1027 /*}}}*/
1028
1029 APT_CONST pkgAcquire::UriIterator::~UriIterator() {}
1030 APT_CONST pkgAcquire::MethodConfig::~MethodConfig() {}
1031 APT_CONST pkgAcquireStatus::~pkgAcquireStatus() {}