]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Improved message
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
dca1e241 3// $Id: acquire.cc,v 1.42 1999/10/27 05:00:25 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
7a7fa5f0 26#include <dirent.h>
8267fe24 27#include <sys/time.h>
524f8105 28#include <errno.h>
5f3edc0f 29#include <sys/stat.h>
0118833a
AL
30 /*}}}*/
31
32// Acquire::pkgAcquire - Constructor /*{{{*/
33// ---------------------------------------------------------------------
93bf083d 34/* We grab some runtime state from the configuration space */
8267fe24 35pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
36{
37 Queues = 0;
38 Configs = 0;
0a8a80e5
AL
39 Workers = 0;
40 ToFetch = 0;
8b89e57f 41 Running = false;
0a8a80e5
AL
42
43 string Mode = _config->Find("Acquire::Queue-Mode","host");
44 if (strcasecmp(Mode.c_str(),"host") == 0)
45 QueueMode = QueueHost;
46 if (strcasecmp(Mode.c_str(),"access") == 0)
47 QueueMode = QueueAccess;
48
49 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f
AL
50
51 // This is really a stupid place for this, but people whine so much..
52 struct stat St;
53 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
54 S_ISDIR(St.st_mode) == 0)
dca1e241 55 _error->Error("Lists directory %s/partial is missing.",
5f3edc0f
AL
56 _config->FindDir("Dir::State::lists").c_str());
57 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
58 S_ISDIR(St.st_mode) == 0)
dca1e241 59 _error->Error("Archive directory %s/partial is missing.",
5f3edc0f 60 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
61}
62 /*}}}*/
63// Acquire::~pkgAcquire - Destructor /*{{{*/
64// ---------------------------------------------------------------------
93bf083d 65/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
66pkgAcquire::~pkgAcquire()
67{
3b5421b4
AL
68 while (Configs != 0)
69 {
70 MethodConfig *Jnk = Configs;
71 Configs = Configs->Next;
72 delete Jnk;
73 }
281daf46
AL
74
75 Shutdown();
76}
77 /*}}}*/
8e5fc8f5 78// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
79// ---------------------------------------------------------------------
80/* */
81void pkgAcquire::Shutdown()
82{
83 while (Items.size() != 0)
84 delete Items[0];
0a8a80e5
AL
85
86 while (Queues != 0)
87 {
88 Queue *Jnk = Queues;
89 Queues = Queues->Next;
90 delete Jnk;
91 }
0118833a
AL
92}
93 /*}}}*/
94// Acquire::Add - Add a new item /*{{{*/
95// ---------------------------------------------------------------------
93bf083d
AL
96/* This puts an item on the acquire list. This list is mainly for tracking
97 item status */
0118833a
AL
98void pkgAcquire::Add(Item *Itm)
99{
100 Items.push_back(Itm);
101}
102 /*}}}*/
103// Acquire::Remove - Remove a item /*{{{*/
104// ---------------------------------------------------------------------
93bf083d 105/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
106void pkgAcquire::Remove(Item *Itm)
107{
108 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
109 {
110 if (*I == Itm)
111 Items.erase(I);
8267fe24 112 }
0118833a
AL
113}
114 /*}}}*/
0a8a80e5
AL
115// Acquire::Add - Add a worker /*{{{*/
116// ---------------------------------------------------------------------
93bf083d
AL
117/* A list of workers is kept so that the select loop can direct their FD
118 usage. */
0a8a80e5
AL
119void pkgAcquire::Add(Worker *Work)
120{
121 Work->NextAcquire = Workers;
122 Workers = Work;
123}
124 /*}}}*/
125// Acquire::Remove - Remove a worker /*{{{*/
126// ---------------------------------------------------------------------
93bf083d
AL
127/* A worker has died. This can not be done while the select loop is running
128 as it would require that RunFds could handling a changing list state and
129 it cant.. */
0a8a80e5
AL
130void pkgAcquire::Remove(Worker *Work)
131{
93bf083d
AL
132 if (Running == true)
133 abort();
134
0a8a80e5
AL
135 Worker **I = &Workers;
136 for (; *I != 0;)
137 {
138 if (*I == Work)
139 *I = (*I)->NextAcquire;
140 else
141 I = &(*I)->NextAcquire;
142 }
143}
144 /*}}}*/
0118833a
AL
145// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
146// ---------------------------------------------------------------------
93bf083d 147/* This is the entry point for an item. An item calls this function when
281daf46 148 it is constructed which creates a queue (based on the current queue
93bf083d
AL
149 mode) and puts the item in that queue. If the system is running then
150 the queue might be started. */
8267fe24 151void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 152{
0a8a80e5 153 // Determine which queue to put the item in
e331f6ed
AL
154 const MethodConfig *Config;
155 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
156 if (Name.empty() == true)
157 return;
158
159 // Find the queue structure
160 Queue *I = Queues;
161 for (; I != 0 && I->Name != Name; I = I->Next);
162 if (I == 0)
163 {
164 I = new Queue(Name,this);
165 I->Next = Queues;
166 Queues = I;
93bf083d
AL
167
168 if (Running == true)
169 I->Startup();
0a8a80e5 170 }
bfd22fc0 171
e331f6ed
AL
172 // See if this is a local only URI
173 if (Config->LocalOnly == true && Item.Owner->Complete == false)
174 Item.Owner->Local = true;
8267fe24 175 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
176
177 // Queue it into the named queue
8267fe24 178 I->Enqueue(Item);
0a8a80e5 179 ToFetch++;
93bf083d 180
0a8a80e5
AL
181 // Some trace stuff
182 if (Debug == true)
183 {
8267fe24
AL
184 clog << "Fetching " << Item.URI << endl;
185 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 186 clog << " Queue is: " << Name << endl;
0a8a80e5 187 }
3b5421b4
AL
188}
189 /*}}}*/
0a8a80e5 190// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 191// ---------------------------------------------------------------------
93bf083d
AL
192/* This is called when an item is finished being fetched. It removes it
193 from all the queues */
0a8a80e5
AL
194void pkgAcquire::Dequeue(Item *Itm)
195{
196 Queue *I = Queues;
bfd22fc0 197 bool Res = false;
0a8a80e5 198 for (; I != 0; I = I->Next)
bfd22fc0 199 Res |= I->Dequeue(Itm);
93bf083d
AL
200
201 if (Debug == true)
202 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
203 if (Res == true)
204 ToFetch--;
0a8a80e5
AL
205}
206 /*}}}*/
207// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
208// ---------------------------------------------------------------------
209/* The string returned depends on the configuration settings and the
210 method parameters. Given something like http://foo.org/bar it can
211 return http://foo.org or http */
e331f6ed 212string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 213{
93bf083d
AL
214 URI U(Uri);
215
e331f6ed 216 Config = GetConfig(U.Access);
0a8a80e5
AL
217 if (Config == 0)
218 return string();
219
220 /* Single-Instance methods get exactly one queue per URI. This is
221 also used for the Access queue method */
222 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 223 return U.Access;
93bf083d
AL
224
225 return U.Access + ':' + U.Host;
0118833a
AL
226}
227 /*}}}*/
3b5421b4
AL
228// Acquire::GetConfig - Fetch the configuration information /*{{{*/
229// ---------------------------------------------------------------------
230/* This locates the configuration structure for an access method. If
231 a config structure cannot be found a Worker will be created to
232 retrieve it */
0a8a80e5 233pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
234{
235 // Search for an existing config
236 MethodConfig *Conf;
237 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
238 if (Conf->Access == Access)
239 return Conf;
240
241 // Create the new config class
242 Conf = new MethodConfig;
243 Conf->Access = Access;
244 Conf->Next = Configs;
245 Configs = Conf;
0118833a 246
3b5421b4
AL
247 // Create the worker to fetch the configuration
248 Worker Work(Conf);
249 if (Work.Start() == false)
250 return 0;
251
252 return Conf;
253}
254 /*}}}*/
0a8a80e5
AL
255// Acquire::SetFds - Deal with readable FDs /*{{{*/
256// ---------------------------------------------------------------------
257/* Collect FDs that have activity monitors into the fd sets */
258void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
259{
260 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
261 {
262 if (I->InReady == true && I->InFd >= 0)
263 {
264 if (Fd < I->InFd)
265 Fd = I->InFd;
266 FD_SET(I->InFd,RSet);
267 }
268 if (I->OutReady == true && I->OutFd >= 0)
269 {
270 if (Fd < I->OutFd)
271 Fd = I->OutFd;
272 FD_SET(I->OutFd,WSet);
273 }
274 }
275}
276 /*}}}*/
277// Acquire::RunFds - Deal with active FDs /*{{{*/
278// ---------------------------------------------------------------------
93bf083d
AL
279/* Dispatch active FDs over to the proper workers. It is very important
280 that a worker never be erased while this is running! The queue class
281 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
282void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
283{
284 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
285 {
286 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
287 I->InFdReady();
288 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
289 I->OutFdReady();
290 }
291}
292 /*}}}*/
293// Acquire::Run - Run the fetch sequence /*{{{*/
294// ---------------------------------------------------------------------
295/* This runs the queues. It manages a select loop for all of the
296 Worker tasks. The workers interact with the queues and items to
297 manage the actual fetch. */
024d1123 298pkgAcquire::RunResult pkgAcquire::Run()
0a8a80e5 299{
8b89e57f
AL
300 Running = true;
301
0a8a80e5
AL
302 for (Queue *I = Queues; I != 0; I = I->Next)
303 I->Startup();
304
b98f2859
AL
305 if (Log != 0)
306 Log->Start();
307
024d1123
AL
308 bool WasCancelled = false;
309
0a8a80e5 310 // Run till all things have been acquired
8267fe24
AL
311 struct timeval tv;
312 tv.tv_sec = 0;
313 tv.tv_usec = 500000;
0a8a80e5
AL
314 while (ToFetch > 0)
315 {
316 fd_set RFds;
317 fd_set WFds;
318 int Highest = 0;
319 FD_ZERO(&RFds);
320 FD_ZERO(&WFds);
321 SetFds(Highest,&RFds,&WFds);
322
b0db36b1
AL
323 int Res;
324 do
325 {
326 Res = select(Highest+1,&RFds,&WFds,0,&tv);
327 }
328 while (Res < 0 && errno == EINTR);
329
8267fe24 330 if (Res < 0)
8b89e57f 331 {
8267fe24
AL
332 _error->Errno("select","Select has failed");
333 break;
8b89e57f 334 }
93bf083d 335
0a8a80e5 336 RunFds(&RFds,&WFds);
93bf083d
AL
337 if (_error->PendingError() == true)
338 break;
8267fe24
AL
339
340 // Timeout, notify the log class
341 if (Res == 0 || (Log != 0 && Log->Update == true))
342 {
343 tv.tv_usec = 500000;
344 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
345 I->Pulse();
024d1123
AL
346 if (Log != 0 && Log->Pulse(this) == false)
347 {
348 WasCancelled = true;
349 break;
350 }
8267fe24 351 }
0a8a80e5 352 }
be4401bf 353
b98f2859
AL
354 if (Log != 0)
355 Log->Stop();
356
be4401bf
AL
357 // Shut down the acquire bits
358 Running = false;
0a8a80e5 359 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 360 I->Shutdown(false);
0a8a80e5 361
ab559b35
AL
362 // Shut down the items
363 for (Item **I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 364 (*I)->Finished();
ab559b35 365
024d1123
AL
366 if (_error->PendingError())
367 return Failed;
368 if (WasCancelled)
369 return Cancelled;
370 return Continue;
93bf083d
AL
371}
372 /*}}}*/
be4401bf 373// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
374// ---------------------------------------------------------------------
375/* This routine bumps idle queues in hopes that they will be able to fetch
376 the dequeued item */
377void pkgAcquire::Bump()
378{
be4401bf
AL
379 for (Queue *I = Queues; I != 0; I = I->Next)
380 I->Bump();
0a8a80e5
AL
381}
382 /*}}}*/
8267fe24
AL
383// Acquire::WorkerStep - Step to the next worker /*{{{*/
384// ---------------------------------------------------------------------
385/* Not inlined to advoid including acquire-worker.h */
386pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
387{
388 return I->NextAcquire;
389};
390 /*}}}*/
a6568219 391// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
392// ---------------------------------------------------------------------
393/* This is a bit simplistic, it looks at every file in the dir and sees
394 if it is part of the download set. */
395bool pkgAcquire::Clean(string Dir)
396{
397 DIR *D = opendir(Dir.c_str());
398 if (D == 0)
399 return _error->Errno("opendir","Unable to read %s",Dir.c_str());
400
401 string StartDir = SafeGetCWD();
402 if (chdir(Dir.c_str()) != 0)
403 {
404 closedir(D);
405 return _error->Errno("chdir","Unable to change to ",Dir.c_str());
406 }
407
408 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
409 {
410 // Skip some files..
411 if (strcmp(Dir->d_name,"lock") == 0 ||
412 strcmp(Dir->d_name,"partial") == 0 ||
413 strcmp(Dir->d_name,".") == 0 ||
414 strcmp(Dir->d_name,"..") == 0)
415 continue;
416
417 // Look in the get list
418 vector<Item *>::iterator I = Items.begin();
419 for (; I != Items.end(); I++)
420 if (flNotDir((*I)->DestFile) == Dir->d_name)
421 break;
422
423 // Nothing found, nuke it
424 if (I == Items.end())
425 unlink(Dir->d_name);
426 };
427
428 chdir(StartDir.c_str());
429 closedir(D);
430 return true;
431}
432 /*}}}*/
a6568219
AL
433// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
434// ---------------------------------------------------------------------
435/* This is the total number of bytes needed */
436unsigned long pkgAcquire::TotalNeeded()
437{
438 unsigned long Total = 0;
439 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
440 Total += (*I)->FileSize;
441 return Total;
442}
443 /*}}}*/
444// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
445// ---------------------------------------------------------------------
446/* This is the number of bytes that is not local */
447unsigned long pkgAcquire::FetchNeeded()
448{
449 unsigned long Total = 0;
450 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
451 if ((*I)->Local == false)
452 Total += (*I)->FileSize;
453 return Total;
454}
455 /*}}}*/
6b1ff003
AL
456// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
457// ---------------------------------------------------------------------
458/* This is the number of bytes that is not local */
459unsigned long pkgAcquire::PartialPresent()
460{
461 unsigned long Total = 0;
462 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
463 if ((*I)->Local == false)
464 Total += (*I)->PartialSize;
465 return Total;
466}
467 /*}}}*/
8e5fc8f5 468// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
469// ---------------------------------------------------------------------
470/* */
471pkgAcquire::UriIterator pkgAcquire::UriBegin()
472{
473 return UriIterator(Queues);
474}
475 /*}}}*/
8e5fc8f5 476// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
477// ---------------------------------------------------------------------
478/* */
479pkgAcquire::UriIterator pkgAcquire::UriEnd()
480{
481 return UriIterator(0);
482}
483 /*}}}*/
0a8a80e5 484
e331f6ed
AL
485// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
486// ---------------------------------------------------------------------
487/* */
488pkgAcquire::MethodConfig::MethodConfig()
489{
490 SingleInstance = false;
e331f6ed
AL
491 Pipeline = false;
492 SendConfig = false;
493 LocalOnly = false;
494 Next = 0;
495}
496 /*}}}*/
497
0a8a80e5
AL
498// Queue::Queue - Constructor /*{{{*/
499// ---------------------------------------------------------------------
500/* */
501pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
502 Owner(Owner)
503{
504 Items = 0;
505 Next = 0;
506 Workers = 0;
b185acc2
AL
507 MaxPipeDepth = 1;
508 PipeDepth = 0;
0a8a80e5
AL
509}
510 /*}}}*/
511// Queue::~Queue - Destructor /*{{{*/
512// ---------------------------------------------------------------------
513/* */
514pkgAcquire::Queue::~Queue()
515{
8e5fc8f5 516 Shutdown(true);
0a8a80e5
AL
517
518 while (Items != 0)
519 {
520 QItem *Jnk = Items;
521 Items = Items->Next;
522 delete Jnk;
523 }
524}
525 /*}}}*/
526// Queue::Enqueue - Queue an item to the queue /*{{{*/
527// ---------------------------------------------------------------------
528/* */
8267fe24 529void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 530{
7a1b1f8b
AL
531 QItem **I = &Items;
532 for (; *I != 0; I = &(*I)->Next);
533
0a8a80e5 534 // Create a new item
7a1b1f8b
AL
535 QItem *Itm = new QItem;
536 *Itm = Item;
537 Itm->Next = 0;
538 *I = Itm;
0a8a80e5 539
8267fe24 540 Item.Owner->QueueCounter++;
93bf083d
AL
541 if (Items->Next == 0)
542 Cycle();
0a8a80e5
AL
543}
544 /*}}}*/
c88edf1d 545// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 546// ---------------------------------------------------------------------
b185acc2 547/* We return true if we hit something */
bfd22fc0 548bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 549{
b185acc2
AL
550 if (Owner->Status == pkgAcquire::Item::StatFetching)
551 return _error->Error("Tried to dequeue a fetching object");
552
bfd22fc0
AL
553 bool Res = false;
554
0a8a80e5
AL
555 QItem **I = &Items;
556 for (; *I != 0;)
557 {
558 if ((*I)->Owner == Owner)
559 {
560 QItem *Jnk= *I;
561 *I = (*I)->Next;
562 Owner->QueueCounter--;
563 delete Jnk;
bfd22fc0 564 Res = true;
0a8a80e5
AL
565 }
566 else
567 I = &(*I)->Next;
568 }
bfd22fc0
AL
569
570 return Res;
0a8a80e5
AL
571}
572 /*}}}*/
573// Queue::Startup - Start the worker processes /*{{{*/
574// ---------------------------------------------------------------------
8e5fc8f5
AL
575/* It is possible for this to be called with a pre-existing set of
576 workers. */
0a8a80e5
AL
577bool pkgAcquire::Queue::Startup()
578{
8e5fc8f5
AL
579 if (Workers == 0)
580 {
581 URI U(Name);
582 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
583 if (Cnf == 0)
584 return false;
585
586 Workers = new Worker(this,Cnf,Owner->Log);
587 Owner->Add(Workers);
588 if (Workers->Start() == false)
589 return false;
590
591 /* When pipelining we commit 10 items. This needs to change when we
592 added other source retry to have cycle maintain a pipeline depth
593 on its own. */
594 if (Cnf->Pipeline == true)
595 MaxPipeDepth = 10;
596 else
597 MaxPipeDepth = 1;
598 }
5cb5d8dc 599
93bf083d 600 return Cycle();
0a8a80e5
AL
601}
602 /*}}}*/
603// Queue::Shutdown - Shutdown the worker processes /*{{{*/
604// ---------------------------------------------------------------------
8e5fc8f5
AL
605/* If final is true then all workers are eliminated, otherwise only workers
606 that do not need cleanup are removed */
607bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
608{
609 // Delete all of the workers
8e5fc8f5
AL
610 pkgAcquire::Worker **Cur = &Workers;
611 while (*Cur != 0)
0a8a80e5 612 {
8e5fc8f5
AL
613 pkgAcquire::Worker *Jnk = *Cur;
614 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
615 {
616 *Cur = Jnk->NextQueue;
617 Owner->Remove(Jnk);
618 delete Jnk;
619 }
620 else
621 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
622 }
623
624 return true;
3b5421b4
AL
625}
626 /*}}}*/
7d8afa39 627// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
628// ---------------------------------------------------------------------
629/* */
630pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
631{
632 for (QItem *I = Items; I != 0; I = I->Next)
633 if (I->URI == URI && I->Worker == Owner)
634 return I;
635 return 0;
636}
637 /*}}}*/
638// Queue::ItemDone - Item has been completed /*{{{*/
639// ---------------------------------------------------------------------
640/* The worker signals this which causes the item to be removed from the
93bf083d
AL
641 queue. If this is the last queue instance then it is removed from the
642 main queue too.*/
c88edf1d
AL
643bool pkgAcquire::Queue::ItemDone(QItem *Itm)
644{
b185acc2 645 PipeDepth--;
db890fdb
AL
646 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
647 Itm->Owner->Status = pkgAcquire::Item::StatDone;
648
93bf083d
AL
649 if (Itm->Owner->QueueCounter <= 1)
650 Owner->Dequeue(Itm->Owner);
651 else
652 {
653 Dequeue(Itm->Owner);
654 Owner->Bump();
655 }
c88edf1d 656
93bf083d
AL
657 return Cycle();
658}
659 /*}}}*/
660// Queue::Cycle - Queue new items into the method /*{{{*/
661// ---------------------------------------------------------------------
b185acc2
AL
662/* This locates a new idle item and sends it to the worker. If pipelining
663 is enabled then it keeps the pipe full. */
93bf083d
AL
664bool pkgAcquire::Queue::Cycle()
665{
666 if (Items == 0 || Workers == 0)
c88edf1d
AL
667 return true;
668
e7432370
AL
669 if (PipeDepth < 0)
670 return _error->Error("Pipedepth failure");
671
93bf083d
AL
672 // Look for a queable item
673 QItem *I = Items;
e7432370 674 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
675 {
676 for (; I != 0; I = I->Next)
677 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
678 break;
679
680 // Nothing to do, queue is idle.
681 if (I == 0)
682 return true;
683
684 I->Worker = Workers;
685 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 686 PipeDepth++;
b185acc2
AL
687 if (Workers->QueueItem(I) == false)
688 return false;
689 }
93bf083d 690
b185acc2 691 return true;
c88edf1d
AL
692}
693 /*}}}*/
be4401bf
AL
694// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
695// ---------------------------------------------------------------------
b185acc2 696/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
697void pkgAcquire::Queue::Bump()
698{
b185acc2 699 Cycle();
be4401bf
AL
700}
701 /*}}}*/
b98f2859
AL
702
703// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
704// ---------------------------------------------------------------------
705/* */
706pkgAcquireStatus::pkgAcquireStatus()
707{
708 Start();
709}
710 /*}}}*/
711// AcquireStatus::Pulse - Called periodically /*{{{*/
712// ---------------------------------------------------------------------
713/* This computes some internal state variables for the derived classes to
714 use. It generates the current downloaded bytes and total bytes to download
715 as well as the current CPS estimate. */
024d1123 716bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
717{
718 TotalBytes = 0;
719 CurrentBytes = 0;
d568ed2d
AL
720 TotalItems = 0;
721 CurrentItems = 0;
b98f2859
AL
722
723 // Compute the total number of bytes to fetch
724 unsigned int Unknown = 0;
725 unsigned int Count = 0;
726 for (pkgAcquire::Item **I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
727 I++, Count++)
728 {
d568ed2d
AL
729 TotalItems++;
730 if ((*I)->Status == pkgAcquire::Item::StatDone)
731 CurrentItems++;
732
a6568219
AL
733 // Totally ignore local items
734 if ((*I)->Local == true)
735 continue;
736
b98f2859
AL
737 TotalBytes += (*I)->FileSize;
738 if ((*I)->Complete == true)
739 CurrentBytes += (*I)->FileSize;
740 if ((*I)->FileSize == 0 && (*I)->Complete == false)
741 Unknown++;
742 }
743
744 // Compute the current completion
aa0e1101 745 unsigned long ResumeSize = 0;
b98f2859
AL
746 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
747 I = Owner->WorkerStep(I))
748 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
749 {
750 CurrentBytes += I->CurrentSize;
751 ResumeSize += I->ResumePoint;
752
753 // Files with unknown size always have 100% completion
754 if (I->CurrentItem->Owner->FileSize == 0 &&
755 I->CurrentItem->Owner->Complete == false)
756 TotalBytes += I->CurrentSize;
757 }
758
b98f2859
AL
759 // Normalize the figures and account for unknown size downloads
760 if (TotalBytes <= 0)
761 TotalBytes = 1;
762 if (Unknown == Count)
763 TotalBytes = Unknown;
18ef0a78
AL
764
765 // Wha?! Is not supposed to happen.
766 if (CurrentBytes > TotalBytes)
767 CurrentBytes = TotalBytes;
b98f2859
AL
768
769 // Compute the CPS
770 struct timeval NewTime;
771 gettimeofday(&NewTime,0);
772 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
773 NewTime.tv_sec - Time.tv_sec > 6)
774 {
f17ac097
AL
775 double Delta = NewTime.tv_sec - Time.tv_sec +
776 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 777
b98f2859 778 // Compute the CPS value
f17ac097 779 if (Delta < 0.01)
e331f6ed
AL
780 CurrentCPS = 0;
781 else
aa0e1101
AL
782 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
783 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 784 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
785 Time = NewTime;
786 }
024d1123
AL
787
788 return true;
b98f2859
AL
789}
790 /*}}}*/
791// AcquireStatus::Start - Called when the download is started /*{{{*/
792// ---------------------------------------------------------------------
793/* We just reset the counters */
794void pkgAcquireStatus::Start()
795{
796 gettimeofday(&Time,0);
797 gettimeofday(&StartTime,0);
798 LastBytes = 0;
799 CurrentCPS = 0;
800 CurrentBytes = 0;
801 TotalBytes = 0;
802 FetchedBytes = 0;
803 ElapsedTime = 0;
d568ed2d
AL
804 TotalItems = 0;
805 CurrentItems = 0;
b98f2859
AL
806}
807 /*}}}*/
a6568219 808// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
809// ---------------------------------------------------------------------
810/* This accurately computes the elapsed time and the total overall CPS. */
811void pkgAcquireStatus::Stop()
812{
813 // Compute the CPS and elapsed time
814 struct timeval NewTime;
815 gettimeofday(&NewTime,0);
816
31a0531d
AL
817 double Delta = NewTime.tv_sec - StartTime.tv_sec +
818 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 819
b98f2859 820 // Compute the CPS value
31a0531d 821 if (Delta < 0.01)
e331f6ed
AL
822 CurrentCPS = 0;
823 else
31a0531d 824 CurrentCPS = FetchedBytes/Delta;
b98f2859 825 LastBytes = CurrentBytes;
31a0531d 826 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
827}
828 /*}}}*/
829// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
830// ---------------------------------------------------------------------
831/* This is used to get accurate final transfer rate reporting. */
832void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 833{
b98f2859
AL
834 FetchedBytes += Size - Resume;
835}
836 /*}}}*/