]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Purge support
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
281daf46 3// $Id: acquire.cc,v 1.37 1999/07/03 03:10:35 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
7a7fa5f0 26#include <dirent.h>
8267fe24 27#include <sys/time.h>
524f8105 28#include <errno.h>
0118833a
AL
29 /*}}}*/
30
31// Acquire::pkgAcquire - Constructor /*{{{*/
32// ---------------------------------------------------------------------
93bf083d 33/* We grab some runtime state from the configuration space */
8267fe24 34pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
35{
36 Queues = 0;
37 Configs = 0;
0a8a80e5
AL
38 Workers = 0;
39 ToFetch = 0;
8b89e57f 40 Running = false;
0a8a80e5
AL
41
42 string Mode = _config->Find("Acquire::Queue-Mode","host");
43 if (strcasecmp(Mode.c_str(),"host") == 0)
44 QueueMode = QueueHost;
45 if (strcasecmp(Mode.c_str(),"access") == 0)
46 QueueMode = QueueAccess;
47
48 Debug = _config->FindB("Debug::pkgAcquire",false);
0118833a
AL
49}
50 /*}}}*/
51// Acquire::~pkgAcquire - Destructor /*{{{*/
52// ---------------------------------------------------------------------
93bf083d 53/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
54pkgAcquire::~pkgAcquire()
55{
3b5421b4
AL
56 while (Configs != 0)
57 {
58 MethodConfig *Jnk = Configs;
59 Configs = Configs->Next;
60 delete Jnk;
61 }
281daf46
AL
62
63 Shutdown();
64}
65 /*}}}*/
66// pkgAcquire::Shutdown - Clean out the acquire object /*{{{*/
67// ---------------------------------------------------------------------
68/* */
69void pkgAcquire::Shutdown()
70{
71 while (Items.size() != 0)
72 delete Items[0];
0a8a80e5
AL
73
74 while (Queues != 0)
75 {
76 Queue *Jnk = Queues;
77 Queues = Queues->Next;
78 delete Jnk;
79 }
0118833a
AL
80}
81 /*}}}*/
82// Acquire::Add - Add a new item /*{{{*/
83// ---------------------------------------------------------------------
93bf083d
AL
84/* This puts an item on the acquire list. This list is mainly for tracking
85 item status */
0118833a
AL
86void pkgAcquire::Add(Item *Itm)
87{
88 Items.push_back(Itm);
89}
90 /*}}}*/
91// Acquire::Remove - Remove a item /*{{{*/
92// ---------------------------------------------------------------------
93bf083d 93/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
94void pkgAcquire::Remove(Item *Itm)
95{
96 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
97 {
98 if (*I == Itm)
99 Items.erase(I);
8267fe24 100 }
0118833a
AL
101}
102 /*}}}*/
0a8a80e5
AL
103// Acquire::Add - Add a worker /*{{{*/
104// ---------------------------------------------------------------------
93bf083d
AL
105/* A list of workers is kept so that the select loop can direct their FD
106 usage. */
0a8a80e5
AL
107void pkgAcquire::Add(Worker *Work)
108{
109 Work->NextAcquire = Workers;
110 Workers = Work;
111}
112 /*}}}*/
113// Acquire::Remove - Remove a worker /*{{{*/
114// ---------------------------------------------------------------------
93bf083d
AL
115/* A worker has died. This can not be done while the select loop is running
116 as it would require that RunFds could handling a changing list state and
117 it cant.. */
0a8a80e5
AL
118void pkgAcquire::Remove(Worker *Work)
119{
93bf083d
AL
120 if (Running == true)
121 abort();
122
0a8a80e5
AL
123 Worker **I = &Workers;
124 for (; *I != 0;)
125 {
126 if (*I == Work)
127 *I = (*I)->NextAcquire;
128 else
129 I = &(*I)->NextAcquire;
130 }
131}
132 /*}}}*/
0118833a
AL
133// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
134// ---------------------------------------------------------------------
93bf083d 135/* This is the entry point for an item. An item calls this function when
281daf46 136 it is constructed which creates a queue (based on the current queue
93bf083d
AL
137 mode) and puts the item in that queue. If the system is running then
138 the queue might be started. */
8267fe24 139void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 140{
0a8a80e5 141 // Determine which queue to put the item in
e331f6ed
AL
142 const MethodConfig *Config;
143 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
144 if (Name.empty() == true)
145 return;
146
147 // Find the queue structure
148 Queue *I = Queues;
149 for (; I != 0 && I->Name != Name; I = I->Next);
150 if (I == 0)
151 {
152 I = new Queue(Name,this);
153 I->Next = Queues;
154 Queues = I;
93bf083d
AL
155
156 if (Running == true)
157 I->Startup();
0a8a80e5 158 }
bfd22fc0 159
e331f6ed
AL
160 // See if this is a local only URI
161 if (Config->LocalOnly == true && Item.Owner->Complete == false)
162 Item.Owner->Local = true;
8267fe24 163 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
164
165 // Queue it into the named queue
8267fe24 166 I->Enqueue(Item);
0a8a80e5 167 ToFetch++;
93bf083d 168
0a8a80e5
AL
169 // Some trace stuff
170 if (Debug == true)
171 {
8267fe24
AL
172 clog << "Fetching " << Item.URI << endl;
173 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 174 clog << " Queue is: " << Name << endl;
0a8a80e5 175 }
3b5421b4
AL
176}
177 /*}}}*/
0a8a80e5 178// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 179// ---------------------------------------------------------------------
93bf083d
AL
180/* This is called when an item is finished being fetched. It removes it
181 from all the queues */
0a8a80e5
AL
182void pkgAcquire::Dequeue(Item *Itm)
183{
184 Queue *I = Queues;
bfd22fc0 185 bool Res = false;
0a8a80e5 186 for (; I != 0; I = I->Next)
bfd22fc0 187 Res |= I->Dequeue(Itm);
93bf083d
AL
188
189 if (Debug == true)
190 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
191 if (Res == true)
192 ToFetch--;
0a8a80e5
AL
193}
194 /*}}}*/
195// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
196// ---------------------------------------------------------------------
197/* The string returned depends on the configuration settings and the
198 method parameters. Given something like http://foo.org/bar it can
199 return http://foo.org or http */
e331f6ed 200string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 201{
93bf083d
AL
202 URI U(Uri);
203
e331f6ed 204 Config = GetConfig(U.Access);
0a8a80e5
AL
205 if (Config == 0)
206 return string();
207
208 /* Single-Instance methods get exactly one queue per URI. This is
209 also used for the Access queue method */
210 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 211 return U.Access;
93bf083d
AL
212
213 return U.Access + ':' + U.Host;
0118833a
AL
214}
215 /*}}}*/
3b5421b4
AL
216// Acquire::GetConfig - Fetch the configuration information /*{{{*/
217// ---------------------------------------------------------------------
218/* This locates the configuration structure for an access method. If
219 a config structure cannot be found a Worker will be created to
220 retrieve it */
0a8a80e5 221pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
222{
223 // Search for an existing config
224 MethodConfig *Conf;
225 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
226 if (Conf->Access == Access)
227 return Conf;
228
229 // Create the new config class
230 Conf = new MethodConfig;
231 Conf->Access = Access;
232 Conf->Next = Configs;
233 Configs = Conf;
0118833a 234
3b5421b4
AL
235 // Create the worker to fetch the configuration
236 Worker Work(Conf);
237 if (Work.Start() == false)
238 return 0;
239
240 return Conf;
241}
242 /*}}}*/
0a8a80e5
AL
243// Acquire::SetFds - Deal with readable FDs /*{{{*/
244// ---------------------------------------------------------------------
245/* Collect FDs that have activity monitors into the fd sets */
246void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
247{
248 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
249 {
250 if (I->InReady == true && I->InFd >= 0)
251 {
252 if (Fd < I->InFd)
253 Fd = I->InFd;
254 FD_SET(I->InFd,RSet);
255 }
256 if (I->OutReady == true && I->OutFd >= 0)
257 {
258 if (Fd < I->OutFd)
259 Fd = I->OutFd;
260 FD_SET(I->OutFd,WSet);
261 }
262 }
263}
264 /*}}}*/
265// Acquire::RunFds - Deal with active FDs /*{{{*/
266// ---------------------------------------------------------------------
93bf083d
AL
267/* Dispatch active FDs over to the proper workers. It is very important
268 that a worker never be erased while this is running! The queue class
269 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
270void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
271{
272 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
273 {
274 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
275 I->InFdReady();
276 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
277 I->OutFdReady();
278 }
279}
280 /*}}}*/
281// Acquire::Run - Run the fetch sequence /*{{{*/
282// ---------------------------------------------------------------------
283/* This runs the queues. It manages a select loop for all of the
284 Worker tasks. The workers interact with the queues and items to
285 manage the actual fetch. */
024d1123 286pkgAcquire::RunResult pkgAcquire::Run()
0a8a80e5 287{
8b89e57f
AL
288 Running = true;
289
0a8a80e5
AL
290 for (Queue *I = Queues; I != 0; I = I->Next)
291 I->Startup();
292
b98f2859
AL
293 if (Log != 0)
294 Log->Start();
295
024d1123
AL
296 bool WasCancelled = false;
297
0a8a80e5 298 // Run till all things have been acquired
8267fe24
AL
299 struct timeval tv;
300 tv.tv_sec = 0;
301 tv.tv_usec = 500000;
0a8a80e5
AL
302 while (ToFetch > 0)
303 {
304 fd_set RFds;
305 fd_set WFds;
306 int Highest = 0;
307 FD_ZERO(&RFds);
308 FD_ZERO(&WFds);
309 SetFds(Highest,&RFds,&WFds);
310
b0db36b1
AL
311 int Res;
312 do
313 {
314 Res = select(Highest+1,&RFds,&WFds,0,&tv);
315 }
316 while (Res < 0 && errno == EINTR);
317
8267fe24 318 if (Res < 0)
8b89e57f 319 {
8267fe24
AL
320 _error->Errno("select","Select has failed");
321 break;
8b89e57f 322 }
93bf083d 323
0a8a80e5 324 RunFds(&RFds,&WFds);
93bf083d
AL
325 if (_error->PendingError() == true)
326 break;
8267fe24
AL
327
328 // Timeout, notify the log class
329 if (Res == 0 || (Log != 0 && Log->Update == true))
330 {
331 tv.tv_usec = 500000;
332 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
333 I->Pulse();
024d1123
AL
334 if (Log != 0 && Log->Pulse(this) == false)
335 {
336 WasCancelled = true;
337 break;
338 }
8267fe24 339 }
0a8a80e5 340 }
be4401bf 341
b98f2859
AL
342 if (Log != 0)
343 Log->Stop();
344
be4401bf
AL
345 // Shut down the acquire bits
346 Running = false;
0a8a80e5
AL
347 for (Queue *I = Queues; I != 0; I = I->Next)
348 I->Shutdown();
349
ab559b35
AL
350 // Shut down the items
351 for (Item **I = Items.begin(); I != Items.end(); I++)
352 (*I)->Finished();
353
024d1123
AL
354 if (_error->PendingError())
355 return Failed;
356 if (WasCancelled)
357 return Cancelled;
358 return Continue;
93bf083d
AL
359}
360 /*}}}*/
be4401bf 361// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
362// ---------------------------------------------------------------------
363/* This routine bumps idle queues in hopes that they will be able to fetch
364 the dequeued item */
365void pkgAcquire::Bump()
366{
be4401bf
AL
367 for (Queue *I = Queues; I != 0; I = I->Next)
368 I->Bump();
0a8a80e5
AL
369}
370 /*}}}*/
8267fe24
AL
371// Acquire::WorkerStep - Step to the next worker /*{{{*/
372// ---------------------------------------------------------------------
373/* Not inlined to advoid including acquire-worker.h */
374pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
375{
376 return I->NextAcquire;
377};
378 /*}}}*/
a6568219 379// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
380// ---------------------------------------------------------------------
381/* This is a bit simplistic, it looks at every file in the dir and sees
382 if it is part of the download set. */
383bool pkgAcquire::Clean(string Dir)
384{
385 DIR *D = opendir(Dir.c_str());
386 if (D == 0)
387 return _error->Errno("opendir","Unable to read %s",Dir.c_str());
388
389 string StartDir = SafeGetCWD();
390 if (chdir(Dir.c_str()) != 0)
391 {
392 closedir(D);
393 return _error->Errno("chdir","Unable to change to ",Dir.c_str());
394 }
395
396 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
397 {
398 // Skip some files..
399 if (strcmp(Dir->d_name,"lock") == 0 ||
400 strcmp(Dir->d_name,"partial") == 0 ||
401 strcmp(Dir->d_name,".") == 0 ||
402 strcmp(Dir->d_name,"..") == 0)
403 continue;
404
405 // Look in the get list
406 vector<Item *>::iterator I = Items.begin();
407 for (; I != Items.end(); I++)
408 if (flNotDir((*I)->DestFile) == Dir->d_name)
409 break;
410
411 // Nothing found, nuke it
412 if (I == Items.end())
413 unlink(Dir->d_name);
414 };
415
416 chdir(StartDir.c_str());
417 closedir(D);
418 return true;
419}
420 /*}}}*/
a6568219
AL
421// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
422// ---------------------------------------------------------------------
423/* This is the total number of bytes needed */
424unsigned long pkgAcquire::TotalNeeded()
425{
426 unsigned long Total = 0;
427 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
428 Total += (*I)->FileSize;
429 return Total;
430}
431 /*}}}*/
432// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
433// ---------------------------------------------------------------------
434/* This is the number of bytes that is not local */
435unsigned long pkgAcquire::FetchNeeded()
436{
437 unsigned long Total = 0;
438 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
439 if ((*I)->Local == false)
440 Total += (*I)->FileSize;
441 return Total;
442}
443 /*}}}*/
6b1ff003
AL
444// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
445// ---------------------------------------------------------------------
446/* This is the number of bytes that is not local */
447unsigned long pkgAcquire::PartialPresent()
448{
449 unsigned long Total = 0;
450 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
451 if ((*I)->Local == false)
452 Total += (*I)->PartialSize;
453 return Total;
454}
455 /*}}}*/
f7a08e33
AL
456// pkgAcquire::UriBegin - Start iterator for the uri list /*{{{*/
457// ---------------------------------------------------------------------
458/* */
459pkgAcquire::UriIterator pkgAcquire::UriBegin()
460{
461 return UriIterator(Queues);
462}
463 /*}}}*/
464// pkgAcquire::UriEnd - End iterator for the uri list /*{{{*/
465// ---------------------------------------------------------------------
466/* */
467pkgAcquire::UriIterator pkgAcquire::UriEnd()
468{
469 return UriIterator(0);
470}
471 /*}}}*/
0a8a80e5 472
e331f6ed
AL
473// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
474// ---------------------------------------------------------------------
475/* */
476pkgAcquire::MethodConfig::MethodConfig()
477{
478 SingleInstance = false;
e331f6ed
AL
479 Pipeline = false;
480 SendConfig = false;
481 LocalOnly = false;
482 Next = 0;
483}
484 /*}}}*/
485
0a8a80e5
AL
486// Queue::Queue - Constructor /*{{{*/
487// ---------------------------------------------------------------------
488/* */
489pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
490 Owner(Owner)
491{
492 Items = 0;
493 Next = 0;
494 Workers = 0;
b185acc2
AL
495 MaxPipeDepth = 1;
496 PipeDepth = 0;
0a8a80e5
AL
497}
498 /*}}}*/
499// Queue::~Queue - Destructor /*{{{*/
500// ---------------------------------------------------------------------
501/* */
502pkgAcquire::Queue::~Queue()
503{
504 Shutdown();
505
506 while (Items != 0)
507 {
508 QItem *Jnk = Items;
509 Items = Items->Next;
510 delete Jnk;
511 }
512}
513 /*}}}*/
514// Queue::Enqueue - Queue an item to the queue /*{{{*/
515// ---------------------------------------------------------------------
516/* */
8267fe24 517void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 518{
7a1b1f8b
AL
519 QItem **I = &Items;
520 for (; *I != 0; I = &(*I)->Next);
521
0a8a80e5 522 // Create a new item
7a1b1f8b
AL
523 QItem *Itm = new QItem;
524 *Itm = Item;
525 Itm->Next = 0;
526 *I = Itm;
0a8a80e5 527
8267fe24 528 Item.Owner->QueueCounter++;
93bf083d
AL
529 if (Items->Next == 0)
530 Cycle();
0a8a80e5
AL
531}
532 /*}}}*/
c88edf1d 533// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 534// ---------------------------------------------------------------------
b185acc2 535/* We return true if we hit something */
bfd22fc0 536bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 537{
b185acc2
AL
538 if (Owner->Status == pkgAcquire::Item::StatFetching)
539 return _error->Error("Tried to dequeue a fetching object");
540
bfd22fc0
AL
541 bool Res = false;
542
0a8a80e5
AL
543 QItem **I = &Items;
544 for (; *I != 0;)
545 {
546 if ((*I)->Owner == Owner)
547 {
548 QItem *Jnk= *I;
549 *I = (*I)->Next;
550 Owner->QueueCounter--;
551 delete Jnk;
bfd22fc0 552 Res = true;
0a8a80e5
AL
553 }
554 else
555 I = &(*I)->Next;
556 }
bfd22fc0
AL
557
558 return Res;
0a8a80e5
AL
559}
560 /*}}}*/
561// Queue::Startup - Start the worker processes /*{{{*/
562// ---------------------------------------------------------------------
563/* */
564bool pkgAcquire::Queue::Startup()
565{
566 Shutdown();
567
93bf083d
AL
568 URI U(Name);
569 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
0a8a80e5
AL
570 if (Cnf == 0)
571 return false;
572
8267fe24 573 Workers = new Worker(this,Cnf,Owner->Log);
0a8a80e5
AL
574 Owner->Add(Workers);
575 if (Workers->Start() == false)
576 return false;
0a8a80e5 577
5cb5d8dc
AL
578 /* When pipelining we commit 10 items. This needs to change when we
579 added other source retry to have cycle maintain a pipeline depth
580 on its own. */
581 if (Cnf->Pipeline == true)
b185acc2
AL
582 MaxPipeDepth = 10;
583 else
584 MaxPipeDepth = 1;
5cb5d8dc 585
93bf083d 586 return Cycle();
0a8a80e5
AL
587}
588 /*}}}*/
589// Queue::Shutdown - Shutdown the worker processes /*{{{*/
590// ---------------------------------------------------------------------
591/* */
592bool pkgAcquire::Queue::Shutdown()
593{
594 // Delete all of the workers
595 while (Workers != 0)
596 {
597 pkgAcquire::Worker *Jnk = Workers;
598 Workers = Workers->NextQueue;
599 Owner->Remove(Jnk);
600 delete Jnk;
601 }
602
603 return true;
3b5421b4
AL
604}
605 /*}}}*/
7d8afa39 606// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
607// ---------------------------------------------------------------------
608/* */
609pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
610{
611 for (QItem *I = Items; I != 0; I = I->Next)
612 if (I->URI == URI && I->Worker == Owner)
613 return I;
614 return 0;
615}
616 /*}}}*/
617// Queue::ItemDone - Item has been completed /*{{{*/
618// ---------------------------------------------------------------------
619/* The worker signals this which causes the item to be removed from the
93bf083d
AL
620 queue. If this is the last queue instance then it is removed from the
621 main queue too.*/
c88edf1d
AL
622bool pkgAcquire::Queue::ItemDone(QItem *Itm)
623{
b185acc2 624 PipeDepth--;
db890fdb
AL
625 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
626 Itm->Owner->Status = pkgAcquire::Item::StatDone;
627
93bf083d
AL
628 if (Itm->Owner->QueueCounter <= 1)
629 Owner->Dequeue(Itm->Owner);
630 else
631 {
632 Dequeue(Itm->Owner);
633 Owner->Bump();
634 }
c88edf1d 635
93bf083d
AL
636 return Cycle();
637}
638 /*}}}*/
639// Queue::Cycle - Queue new items into the method /*{{{*/
640// ---------------------------------------------------------------------
b185acc2
AL
641/* This locates a new idle item and sends it to the worker. If pipelining
642 is enabled then it keeps the pipe full. */
93bf083d
AL
643bool pkgAcquire::Queue::Cycle()
644{
645 if (Items == 0 || Workers == 0)
c88edf1d
AL
646 return true;
647
e7432370
AL
648 if (PipeDepth < 0)
649 return _error->Error("Pipedepth failure");
650
93bf083d
AL
651 // Look for a queable item
652 QItem *I = Items;
e7432370 653 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
654 {
655 for (; I != 0; I = I->Next)
656 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
657 break;
658
659 // Nothing to do, queue is idle.
660 if (I == 0)
661 return true;
662
663 I->Worker = Workers;
664 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 665 PipeDepth++;
b185acc2
AL
666 if (Workers->QueueItem(I) == false)
667 return false;
668 }
93bf083d 669
b185acc2 670 return true;
c88edf1d
AL
671}
672 /*}}}*/
be4401bf
AL
673// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
674// ---------------------------------------------------------------------
b185acc2 675/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
676void pkgAcquire::Queue::Bump()
677{
b185acc2 678 Cycle();
be4401bf
AL
679}
680 /*}}}*/
b98f2859
AL
681
682// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
683// ---------------------------------------------------------------------
684/* */
685pkgAcquireStatus::pkgAcquireStatus()
686{
687 Start();
688}
689 /*}}}*/
690// AcquireStatus::Pulse - Called periodically /*{{{*/
691// ---------------------------------------------------------------------
692/* This computes some internal state variables for the derived classes to
693 use. It generates the current downloaded bytes and total bytes to download
694 as well as the current CPS estimate. */
024d1123 695bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
696{
697 TotalBytes = 0;
698 CurrentBytes = 0;
d568ed2d
AL
699 TotalItems = 0;
700 CurrentItems = 0;
b98f2859
AL
701
702 // Compute the total number of bytes to fetch
703 unsigned int Unknown = 0;
704 unsigned int Count = 0;
705 for (pkgAcquire::Item **I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
706 I++, Count++)
707 {
d568ed2d
AL
708 TotalItems++;
709 if ((*I)->Status == pkgAcquire::Item::StatDone)
710 CurrentItems++;
711
a6568219
AL
712 // Totally ignore local items
713 if ((*I)->Local == true)
714 continue;
715
b98f2859
AL
716 TotalBytes += (*I)->FileSize;
717 if ((*I)->Complete == true)
718 CurrentBytes += (*I)->FileSize;
719 if ((*I)->FileSize == 0 && (*I)->Complete == false)
720 Unknown++;
721 }
722
723 // Compute the current completion
aa0e1101 724 unsigned long ResumeSize = 0;
b98f2859
AL
725 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
726 I = Owner->WorkerStep(I))
727 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
728 {
729 CurrentBytes += I->CurrentSize;
730 ResumeSize += I->ResumePoint;
731
732 // Files with unknown size always have 100% completion
733 if (I->CurrentItem->Owner->FileSize == 0 &&
734 I->CurrentItem->Owner->Complete == false)
735 TotalBytes += I->CurrentSize;
736 }
737
b98f2859
AL
738 // Normalize the figures and account for unknown size downloads
739 if (TotalBytes <= 0)
740 TotalBytes = 1;
741 if (Unknown == Count)
742 TotalBytes = Unknown;
b98f2859
AL
743
744 // Compute the CPS
745 struct timeval NewTime;
746 gettimeofday(&NewTime,0);
747 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
748 NewTime.tv_sec - Time.tv_sec > 6)
749 {
f17ac097
AL
750 double Delta = NewTime.tv_sec - Time.tv_sec +
751 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 752
b98f2859 753 // Compute the CPS value
f17ac097 754 if (Delta < 0.01)
e331f6ed
AL
755 CurrentCPS = 0;
756 else
aa0e1101
AL
757 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
758 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 759 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
760 Time = NewTime;
761 }
024d1123
AL
762
763 return true;
b98f2859
AL
764}
765 /*}}}*/
766// AcquireStatus::Start - Called when the download is started /*{{{*/
767// ---------------------------------------------------------------------
768/* We just reset the counters */
769void pkgAcquireStatus::Start()
770{
771 gettimeofday(&Time,0);
772 gettimeofday(&StartTime,0);
773 LastBytes = 0;
774 CurrentCPS = 0;
775 CurrentBytes = 0;
776 TotalBytes = 0;
777 FetchedBytes = 0;
778 ElapsedTime = 0;
d568ed2d
AL
779 TotalItems = 0;
780 CurrentItems = 0;
b98f2859
AL
781}
782 /*}}}*/
a6568219 783// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
784// ---------------------------------------------------------------------
785/* This accurately computes the elapsed time and the total overall CPS. */
786void pkgAcquireStatus::Stop()
787{
788 // Compute the CPS and elapsed time
789 struct timeval NewTime;
790 gettimeofday(&NewTime,0);
791
792 // Compute the delta time with full accuracy
793 long usdiff = NewTime.tv_usec - StartTime.tv_usec;
794 long sdiff = NewTime.tv_sec - StartTime.tv_sec;
795
796 // Borrow
797 if (usdiff < 0)
798 {
799 usdiff += 1000000;
800 sdiff--;
801 }
e331f6ed 802
b98f2859 803 // Compute the CPS value
e331f6ed
AL
804 if (sdiff == 0 && usdiff == 0)
805 CurrentCPS = 0;
806 else
807 CurrentCPS = FetchedBytes/(sdiff + usdiff/1000000.0);
b98f2859
AL
808 LastBytes = CurrentBytes;
809 ElapsedTime = sdiff;
810}
811 /*}}}*/
812// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
813// ---------------------------------------------------------------------
814/* This is used to get accurate final transfer rate reporting. */
815void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 816{
b98f2859
AL
817 FetchedBytes += Size - Resume;
818}
819 /*}}}*/