]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Use the 'binary' target rather than 'all' so that the s...
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
753b3525 3// $Id: acquire.cc,v 1.49 2001/05/27 04:28:37 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
b2e465d6 26#include <apti18n.h>
b4fc9b6f
AL
27
28#include <iostream>
b2e465d6 29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
5f3edc0f 33#include <sys/stat.h>
0118833a
AL
34 /*}}}*/
35
b4fc9b6f
AL
36using namespace std;
37
0118833a
AL
38// Acquire::pkgAcquire - Constructor /*{{{*/
39// ---------------------------------------------------------------------
93bf083d 40/* We grab some runtime state from the configuration space */
8267fe24 41pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
42{
43 Queues = 0;
44 Configs = 0;
0a8a80e5
AL
45 Workers = 0;
46 ToFetch = 0;
8b89e57f 47 Running = false;
0a8a80e5
AL
48
49 string Mode = _config->Find("Acquire::Queue-Mode","host");
50 if (strcasecmp(Mode.c_str(),"host") == 0)
51 QueueMode = QueueHost;
52 if (strcasecmp(Mode.c_str(),"access") == 0)
53 QueueMode = QueueAccess;
54
55 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 56
0c55d1d2 57 // This is really a stupid place for this
5f3edc0f
AL
58 struct stat St;
59 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
60 S_ISDIR(St.st_mode) == 0)
b2e465d6 61 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
62 _config->FindDir("Dir::State::lists").c_str());
63 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
64 S_ISDIR(St.st_mode) == 0)
b2e465d6 65 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 66 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
67}
68 /*}}}*/
69// Acquire::~pkgAcquire - Destructor /*{{{*/
70// ---------------------------------------------------------------------
93bf083d 71/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
72pkgAcquire::~pkgAcquire()
73{
459681d3
AL
74 Shutdown();
75
3b5421b4
AL
76 while (Configs != 0)
77 {
78 MethodConfig *Jnk = Configs;
79 Configs = Configs->Next;
80 delete Jnk;
81 }
281daf46
AL
82}
83 /*}}}*/
8e5fc8f5 84// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
85// ---------------------------------------------------------------------
86/* */
87void pkgAcquire::Shutdown()
88{
89 while (Items.size() != 0)
90 delete Items[0];
0a8a80e5
AL
91
92 while (Queues != 0)
93 {
94 Queue *Jnk = Queues;
95 Queues = Queues->Next;
96 delete Jnk;
97 }
0118833a
AL
98}
99 /*}}}*/
100// Acquire::Add - Add a new item /*{{{*/
101// ---------------------------------------------------------------------
93bf083d
AL
102/* This puts an item on the acquire list. This list is mainly for tracking
103 item status */
0118833a
AL
104void pkgAcquire::Add(Item *Itm)
105{
106 Items.push_back(Itm);
107}
108 /*}}}*/
109// Acquire::Remove - Remove a item /*{{{*/
110// ---------------------------------------------------------------------
93bf083d 111/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
112void pkgAcquire::Remove(Item *Itm)
113{
a3eaf954
AL
114 Dequeue(Itm);
115
753b3525 116 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
117 {
118 if (*I == Itm)
b4fc9b6f 119 {
0118833a 120 Items.erase(I);
b4fc9b6f
AL
121 I = Items.begin();
122 }
753b3525
AL
123 else
124 I++;
8267fe24 125 }
0118833a
AL
126}
127 /*}}}*/
0a8a80e5
AL
128// Acquire::Add - Add a worker /*{{{*/
129// ---------------------------------------------------------------------
93bf083d
AL
130/* A list of workers is kept so that the select loop can direct their FD
131 usage. */
0a8a80e5
AL
132void pkgAcquire::Add(Worker *Work)
133{
134 Work->NextAcquire = Workers;
135 Workers = Work;
136}
137 /*}}}*/
138// Acquire::Remove - Remove a worker /*{{{*/
139// ---------------------------------------------------------------------
93bf083d
AL
140/* A worker has died. This can not be done while the select loop is running
141 as it would require that RunFds could handling a changing list state and
142 it cant.. */
0a8a80e5
AL
143void pkgAcquire::Remove(Worker *Work)
144{
93bf083d
AL
145 if (Running == true)
146 abort();
147
0a8a80e5
AL
148 Worker **I = &Workers;
149 for (; *I != 0;)
150 {
151 if (*I == Work)
152 *I = (*I)->NextAcquire;
153 else
154 I = &(*I)->NextAcquire;
155 }
156}
157 /*}}}*/
0118833a
AL
158// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
159// ---------------------------------------------------------------------
93bf083d 160/* This is the entry point for an item. An item calls this function when
281daf46 161 it is constructed which creates a queue (based on the current queue
93bf083d
AL
162 mode) and puts the item in that queue. If the system is running then
163 the queue might be started. */
8267fe24 164void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 165{
0a8a80e5 166 // Determine which queue to put the item in
e331f6ed
AL
167 const MethodConfig *Config;
168 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
169 if (Name.empty() == true)
170 return;
171
172 // Find the queue structure
173 Queue *I = Queues;
174 for (; I != 0 && I->Name != Name; I = I->Next);
175 if (I == 0)
176 {
177 I = new Queue(Name,this);
178 I->Next = Queues;
179 Queues = I;
93bf083d
AL
180
181 if (Running == true)
182 I->Startup();
0a8a80e5 183 }
bfd22fc0 184
e331f6ed
AL
185 // See if this is a local only URI
186 if (Config->LocalOnly == true && Item.Owner->Complete == false)
187 Item.Owner->Local = true;
8267fe24 188 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
189
190 // Queue it into the named queue
8267fe24 191 I->Enqueue(Item);
0a8a80e5 192 ToFetch++;
93bf083d 193
0a8a80e5
AL
194 // Some trace stuff
195 if (Debug == true)
196 {
8267fe24
AL
197 clog << "Fetching " << Item.URI << endl;
198 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 199 clog << " Queue is: " << Name << endl;
0a8a80e5 200 }
3b5421b4
AL
201}
202 /*}}}*/
0a8a80e5 203// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 204// ---------------------------------------------------------------------
93bf083d
AL
205/* This is called when an item is finished being fetched. It removes it
206 from all the queues */
0a8a80e5
AL
207void pkgAcquire::Dequeue(Item *Itm)
208{
209 Queue *I = Queues;
bfd22fc0 210 bool Res = false;
0a8a80e5 211 for (; I != 0; I = I->Next)
bfd22fc0 212 Res |= I->Dequeue(Itm);
93bf083d
AL
213
214 if (Debug == true)
215 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
216 if (Res == true)
217 ToFetch--;
0a8a80e5
AL
218}
219 /*}}}*/
220// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
221// ---------------------------------------------------------------------
222/* The string returned depends on the configuration settings and the
223 method parameters. Given something like http://foo.org/bar it can
224 return http://foo.org or http */
e331f6ed 225string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 226{
93bf083d
AL
227 URI U(Uri);
228
e331f6ed 229 Config = GetConfig(U.Access);
0a8a80e5
AL
230 if (Config == 0)
231 return string();
232
233 /* Single-Instance methods get exactly one queue per URI. This is
234 also used for the Access queue method */
235 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 236 return U.Access;
93bf083d
AL
237
238 return U.Access + ':' + U.Host;
0118833a
AL
239}
240 /*}}}*/
3b5421b4
AL
241// Acquire::GetConfig - Fetch the configuration information /*{{{*/
242// ---------------------------------------------------------------------
243/* This locates the configuration structure for an access method. If
244 a config structure cannot be found a Worker will be created to
245 retrieve it */
0a8a80e5 246pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
247{
248 // Search for an existing config
249 MethodConfig *Conf;
250 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
251 if (Conf->Access == Access)
252 return Conf;
253
254 // Create the new config class
255 Conf = new MethodConfig;
256 Conf->Access = Access;
257 Conf->Next = Configs;
258 Configs = Conf;
0118833a 259
3b5421b4
AL
260 // Create the worker to fetch the configuration
261 Worker Work(Conf);
262 if (Work.Start() == false)
263 return 0;
264
265 return Conf;
266}
267 /*}}}*/
0a8a80e5
AL
268// Acquire::SetFds - Deal with readable FDs /*{{{*/
269// ---------------------------------------------------------------------
270/* Collect FDs that have activity monitors into the fd sets */
271void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
272{
273 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
274 {
275 if (I->InReady == true && I->InFd >= 0)
276 {
277 if (Fd < I->InFd)
278 Fd = I->InFd;
279 FD_SET(I->InFd,RSet);
280 }
281 if (I->OutReady == true && I->OutFd >= 0)
282 {
283 if (Fd < I->OutFd)
284 Fd = I->OutFd;
285 FD_SET(I->OutFd,WSet);
286 }
287 }
288}
289 /*}}}*/
290// Acquire::RunFds - Deal with active FDs /*{{{*/
291// ---------------------------------------------------------------------
93bf083d
AL
292/* Dispatch active FDs over to the proper workers. It is very important
293 that a worker never be erased while this is running! The queue class
294 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
295void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
296{
297 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
298 {
299 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
300 I->InFdReady();
301 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
302 I->OutFdReady();
303 }
304}
305 /*}}}*/
306// Acquire::Run - Run the fetch sequence /*{{{*/
307// ---------------------------------------------------------------------
308/* This runs the queues. It manages a select loop for all of the
309 Worker tasks. The workers interact with the queues and items to
310 manage the actual fetch. */
024d1123 311pkgAcquire::RunResult pkgAcquire::Run()
0a8a80e5 312{
8b89e57f
AL
313 Running = true;
314
0a8a80e5
AL
315 for (Queue *I = Queues; I != 0; I = I->Next)
316 I->Startup();
317
b98f2859
AL
318 if (Log != 0)
319 Log->Start();
320
024d1123
AL
321 bool WasCancelled = false;
322
0a8a80e5 323 // Run till all things have been acquired
8267fe24
AL
324 struct timeval tv;
325 tv.tv_sec = 0;
326 tv.tv_usec = 500000;
0a8a80e5
AL
327 while (ToFetch > 0)
328 {
329 fd_set RFds;
330 fd_set WFds;
331 int Highest = 0;
332 FD_ZERO(&RFds);
333 FD_ZERO(&WFds);
334 SetFds(Highest,&RFds,&WFds);
335
b0db36b1
AL
336 int Res;
337 do
338 {
339 Res = select(Highest+1,&RFds,&WFds,0,&tv);
340 }
341 while (Res < 0 && errno == EINTR);
342
8267fe24 343 if (Res < 0)
8b89e57f 344 {
8267fe24
AL
345 _error->Errno("select","Select has failed");
346 break;
8b89e57f 347 }
93bf083d 348
0a8a80e5 349 RunFds(&RFds,&WFds);
93bf083d
AL
350 if (_error->PendingError() == true)
351 break;
8267fe24
AL
352
353 // Timeout, notify the log class
354 if (Res == 0 || (Log != 0 && Log->Update == true))
355 {
356 tv.tv_usec = 500000;
357 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
358 I->Pulse();
024d1123
AL
359 if (Log != 0 && Log->Pulse(this) == false)
360 {
361 WasCancelled = true;
362 break;
363 }
8267fe24 364 }
0a8a80e5 365 }
be4401bf 366
b98f2859
AL
367 if (Log != 0)
368 Log->Stop();
369
be4401bf
AL
370 // Shut down the acquire bits
371 Running = false;
0a8a80e5 372 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 373 I->Shutdown(false);
0a8a80e5 374
ab559b35 375 // Shut down the items
b4fc9b6f 376 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 377 (*I)->Finished();
ab559b35 378
024d1123
AL
379 if (_error->PendingError())
380 return Failed;
381 if (WasCancelled)
382 return Cancelled;
383 return Continue;
93bf083d
AL
384}
385 /*}}}*/
be4401bf 386// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
387// ---------------------------------------------------------------------
388/* This routine bumps idle queues in hopes that they will be able to fetch
389 the dequeued item */
390void pkgAcquire::Bump()
391{
be4401bf
AL
392 for (Queue *I = Queues; I != 0; I = I->Next)
393 I->Bump();
0a8a80e5
AL
394}
395 /*}}}*/
8267fe24
AL
396// Acquire::WorkerStep - Step to the next worker /*{{{*/
397// ---------------------------------------------------------------------
398/* Not inlined to advoid including acquire-worker.h */
399pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
400{
401 return I->NextAcquire;
402};
403 /*}}}*/
a6568219 404// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
405// ---------------------------------------------------------------------
406/* This is a bit simplistic, it looks at every file in the dir and sees
407 if it is part of the download set. */
408bool pkgAcquire::Clean(string Dir)
409{
410 DIR *D = opendir(Dir.c_str());
411 if (D == 0)
b2e465d6 412 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
413
414 string StartDir = SafeGetCWD();
415 if (chdir(Dir.c_str()) != 0)
416 {
417 closedir(D);
b2e465d6 418 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
419 }
420
421 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
422 {
423 // Skip some files..
424 if (strcmp(Dir->d_name,"lock") == 0 ||
425 strcmp(Dir->d_name,"partial") == 0 ||
426 strcmp(Dir->d_name,".") == 0 ||
427 strcmp(Dir->d_name,"..") == 0)
428 continue;
429
430 // Look in the get list
b4fc9b6f 431 ItemCIterator I = Items.begin();
7a7fa5f0
AL
432 for (; I != Items.end(); I++)
433 if (flNotDir((*I)->DestFile) == Dir->d_name)
434 break;
435
436 // Nothing found, nuke it
437 if (I == Items.end())
438 unlink(Dir->d_name);
439 };
440
441 chdir(StartDir.c_str());
442 closedir(D);
443 return true;
444}
445 /*}}}*/
a6568219
AL
446// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
447// ---------------------------------------------------------------------
448/* This is the total number of bytes needed */
b2e465d6 449double pkgAcquire::TotalNeeded()
a6568219 450{
b2e465d6 451 double Total = 0;
b4fc9b6f 452 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
453 Total += (*I)->FileSize;
454 return Total;
455}
456 /*}}}*/
457// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
458// ---------------------------------------------------------------------
459/* This is the number of bytes that is not local */
b2e465d6 460double pkgAcquire::FetchNeeded()
a6568219 461{
b2e465d6 462 double Total = 0;
b4fc9b6f 463 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
464 if ((*I)->Local == false)
465 Total += (*I)->FileSize;
466 return Total;
467}
468 /*}}}*/
6b1ff003
AL
469// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
470// ---------------------------------------------------------------------
471/* This is the number of bytes that is not local */
b2e465d6 472double pkgAcquire::PartialPresent()
6b1ff003 473{
b2e465d6 474 double Total = 0;
b4fc9b6f 475 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
476 if ((*I)->Local == false)
477 Total += (*I)->PartialSize;
478 return Total;
479}
480 /*}}}*/
8e5fc8f5 481// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
482// ---------------------------------------------------------------------
483/* */
484pkgAcquire::UriIterator pkgAcquire::UriBegin()
485{
486 return UriIterator(Queues);
487}
488 /*}}}*/
8e5fc8f5 489// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
490// ---------------------------------------------------------------------
491/* */
492pkgAcquire::UriIterator pkgAcquire::UriEnd()
493{
494 return UriIterator(0);
495}
496 /*}}}*/
0a8a80e5 497
e331f6ed
AL
498// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
499// ---------------------------------------------------------------------
500/* */
501pkgAcquire::MethodConfig::MethodConfig()
502{
503 SingleInstance = false;
e331f6ed
AL
504 Pipeline = false;
505 SendConfig = false;
506 LocalOnly = false;
459681d3 507 Removable = false;
e331f6ed
AL
508 Next = 0;
509}
510 /*}}}*/
511
0a8a80e5
AL
512// Queue::Queue - Constructor /*{{{*/
513// ---------------------------------------------------------------------
514/* */
515pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
516 Owner(Owner)
517{
518 Items = 0;
519 Next = 0;
520 Workers = 0;
b185acc2
AL
521 MaxPipeDepth = 1;
522 PipeDepth = 0;
0a8a80e5
AL
523}
524 /*}}}*/
525// Queue::~Queue - Destructor /*{{{*/
526// ---------------------------------------------------------------------
527/* */
528pkgAcquire::Queue::~Queue()
529{
8e5fc8f5 530 Shutdown(true);
0a8a80e5
AL
531
532 while (Items != 0)
533 {
534 QItem *Jnk = Items;
535 Items = Items->Next;
536 delete Jnk;
537 }
538}
539 /*}}}*/
540// Queue::Enqueue - Queue an item to the queue /*{{{*/
541// ---------------------------------------------------------------------
542/* */
8267fe24 543void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 544{
7a1b1f8b
AL
545 QItem **I = &Items;
546 for (; *I != 0; I = &(*I)->Next);
547
0a8a80e5 548 // Create a new item
7a1b1f8b
AL
549 QItem *Itm = new QItem;
550 *Itm = Item;
551 Itm->Next = 0;
552 *I = Itm;
0a8a80e5 553
8267fe24 554 Item.Owner->QueueCounter++;
93bf083d
AL
555 if (Items->Next == 0)
556 Cycle();
0a8a80e5
AL
557}
558 /*}}}*/
c88edf1d 559// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 560// ---------------------------------------------------------------------
b185acc2 561/* We return true if we hit something */
bfd22fc0 562bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 563{
b185acc2
AL
564 if (Owner->Status == pkgAcquire::Item::StatFetching)
565 return _error->Error("Tried to dequeue a fetching object");
566
bfd22fc0
AL
567 bool Res = false;
568
0a8a80e5
AL
569 QItem **I = &Items;
570 for (; *I != 0;)
571 {
572 if ((*I)->Owner == Owner)
573 {
574 QItem *Jnk= *I;
575 *I = (*I)->Next;
576 Owner->QueueCounter--;
577 delete Jnk;
bfd22fc0 578 Res = true;
0a8a80e5
AL
579 }
580 else
581 I = &(*I)->Next;
582 }
bfd22fc0
AL
583
584 return Res;
0a8a80e5
AL
585}
586 /*}}}*/
587// Queue::Startup - Start the worker processes /*{{{*/
588// ---------------------------------------------------------------------
8e5fc8f5
AL
589/* It is possible for this to be called with a pre-existing set of
590 workers. */
0a8a80e5
AL
591bool pkgAcquire::Queue::Startup()
592{
8e5fc8f5
AL
593 if (Workers == 0)
594 {
595 URI U(Name);
596 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
597 if (Cnf == 0)
598 return false;
599
600 Workers = new Worker(this,Cnf,Owner->Log);
601 Owner->Add(Workers);
602 if (Workers->Start() == false)
603 return false;
604
605 /* When pipelining we commit 10 items. This needs to change when we
606 added other source retry to have cycle maintain a pipeline depth
607 on its own. */
608 if (Cnf->Pipeline == true)
609 MaxPipeDepth = 10;
610 else
611 MaxPipeDepth = 1;
612 }
5cb5d8dc 613
93bf083d 614 return Cycle();
0a8a80e5
AL
615}
616 /*}}}*/
617// Queue::Shutdown - Shutdown the worker processes /*{{{*/
618// ---------------------------------------------------------------------
8e5fc8f5
AL
619/* If final is true then all workers are eliminated, otherwise only workers
620 that do not need cleanup are removed */
621bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
622{
623 // Delete all of the workers
8e5fc8f5
AL
624 pkgAcquire::Worker **Cur = &Workers;
625 while (*Cur != 0)
0a8a80e5 626 {
8e5fc8f5
AL
627 pkgAcquire::Worker *Jnk = *Cur;
628 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
629 {
630 *Cur = Jnk->NextQueue;
631 Owner->Remove(Jnk);
632 delete Jnk;
633 }
634 else
635 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
636 }
637
638 return true;
3b5421b4
AL
639}
640 /*}}}*/
7d8afa39 641// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
642// ---------------------------------------------------------------------
643/* */
644pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
645{
646 for (QItem *I = Items; I != 0; I = I->Next)
647 if (I->URI == URI && I->Worker == Owner)
648 return I;
649 return 0;
650}
651 /*}}}*/
652// Queue::ItemDone - Item has been completed /*{{{*/
653// ---------------------------------------------------------------------
654/* The worker signals this which causes the item to be removed from the
93bf083d
AL
655 queue. If this is the last queue instance then it is removed from the
656 main queue too.*/
c88edf1d
AL
657bool pkgAcquire::Queue::ItemDone(QItem *Itm)
658{
b185acc2 659 PipeDepth--;
db890fdb
AL
660 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
661 Itm->Owner->Status = pkgAcquire::Item::StatDone;
662
93bf083d
AL
663 if (Itm->Owner->QueueCounter <= 1)
664 Owner->Dequeue(Itm->Owner);
665 else
666 {
667 Dequeue(Itm->Owner);
668 Owner->Bump();
669 }
c88edf1d 670
93bf083d
AL
671 return Cycle();
672}
673 /*}}}*/
674// Queue::Cycle - Queue new items into the method /*{{{*/
675// ---------------------------------------------------------------------
b185acc2
AL
676/* This locates a new idle item and sends it to the worker. If pipelining
677 is enabled then it keeps the pipe full. */
93bf083d
AL
678bool pkgAcquire::Queue::Cycle()
679{
680 if (Items == 0 || Workers == 0)
c88edf1d
AL
681 return true;
682
e7432370
AL
683 if (PipeDepth < 0)
684 return _error->Error("Pipedepth failure");
685
93bf083d
AL
686 // Look for a queable item
687 QItem *I = Items;
e7432370 688 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
689 {
690 for (; I != 0; I = I->Next)
691 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
692 break;
693
694 // Nothing to do, queue is idle.
695 if (I == 0)
696 return true;
697
698 I->Worker = Workers;
699 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 700 PipeDepth++;
b185acc2
AL
701 if (Workers->QueueItem(I) == false)
702 return false;
703 }
93bf083d 704
b185acc2 705 return true;
c88edf1d
AL
706}
707 /*}}}*/
be4401bf
AL
708// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
709// ---------------------------------------------------------------------
b185acc2 710/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
711void pkgAcquire::Queue::Bump()
712{
b185acc2 713 Cycle();
be4401bf
AL
714}
715 /*}}}*/
b98f2859
AL
716
717// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
718// ---------------------------------------------------------------------
719/* */
c5ccf175 720pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
721{
722 Start();
723}
724 /*}}}*/
725// AcquireStatus::Pulse - Called periodically /*{{{*/
726// ---------------------------------------------------------------------
727/* This computes some internal state variables for the derived classes to
728 use. It generates the current downloaded bytes and total bytes to download
729 as well as the current CPS estimate. */
024d1123 730bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
731{
732 TotalBytes = 0;
733 CurrentBytes = 0;
d568ed2d
AL
734 TotalItems = 0;
735 CurrentItems = 0;
b98f2859
AL
736
737 // Compute the total number of bytes to fetch
738 unsigned int Unknown = 0;
739 unsigned int Count = 0;
b4fc9b6f 740 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
741 I++, Count++)
742 {
d568ed2d
AL
743 TotalItems++;
744 if ((*I)->Status == pkgAcquire::Item::StatDone)
745 CurrentItems++;
746
a6568219
AL
747 // Totally ignore local items
748 if ((*I)->Local == true)
749 continue;
b2e465d6 750
b98f2859
AL
751 TotalBytes += (*I)->FileSize;
752 if ((*I)->Complete == true)
753 CurrentBytes += (*I)->FileSize;
754 if ((*I)->FileSize == 0 && (*I)->Complete == false)
755 Unknown++;
756 }
757
758 // Compute the current completion
aa0e1101 759 unsigned long ResumeSize = 0;
b98f2859
AL
760 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
761 I = Owner->WorkerStep(I))
762 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
763 {
764 CurrentBytes += I->CurrentSize;
765 ResumeSize += I->ResumePoint;
766
767 // Files with unknown size always have 100% completion
768 if (I->CurrentItem->Owner->FileSize == 0 &&
769 I->CurrentItem->Owner->Complete == false)
770 TotalBytes += I->CurrentSize;
771 }
772
b98f2859
AL
773 // Normalize the figures and account for unknown size downloads
774 if (TotalBytes <= 0)
775 TotalBytes = 1;
776 if (Unknown == Count)
777 TotalBytes = Unknown;
18ef0a78
AL
778
779 // Wha?! Is not supposed to happen.
780 if (CurrentBytes > TotalBytes)
781 CurrentBytes = TotalBytes;
b98f2859
AL
782
783 // Compute the CPS
784 struct timeval NewTime;
785 gettimeofday(&NewTime,0);
786 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
787 NewTime.tv_sec - Time.tv_sec > 6)
788 {
f17ac097
AL
789 double Delta = NewTime.tv_sec - Time.tv_sec +
790 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 791
b98f2859 792 // Compute the CPS value
f17ac097 793 if (Delta < 0.01)
e331f6ed
AL
794 CurrentCPS = 0;
795 else
aa0e1101
AL
796 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
797 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 798 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
799 Time = NewTime;
800 }
024d1123
AL
801
802 return true;
b98f2859
AL
803}
804 /*}}}*/
805// AcquireStatus::Start - Called when the download is started /*{{{*/
806// ---------------------------------------------------------------------
807/* We just reset the counters */
808void pkgAcquireStatus::Start()
809{
810 gettimeofday(&Time,0);
811 gettimeofday(&StartTime,0);
812 LastBytes = 0;
813 CurrentCPS = 0;
814 CurrentBytes = 0;
815 TotalBytes = 0;
816 FetchedBytes = 0;
817 ElapsedTime = 0;
d568ed2d
AL
818 TotalItems = 0;
819 CurrentItems = 0;
b98f2859
AL
820}
821 /*}}}*/
a6568219 822// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
823// ---------------------------------------------------------------------
824/* This accurately computes the elapsed time and the total overall CPS. */
825void pkgAcquireStatus::Stop()
826{
827 // Compute the CPS and elapsed time
828 struct timeval NewTime;
829 gettimeofday(&NewTime,0);
830
31a0531d
AL
831 double Delta = NewTime.tv_sec - StartTime.tv_sec +
832 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 833
b98f2859 834 // Compute the CPS value
31a0531d 835 if (Delta < 0.01)
e331f6ed
AL
836 CurrentCPS = 0;
837 else
31a0531d 838 CurrentCPS = FetchedBytes/Delta;
b98f2859 839 LastBytes = CurrentBytes;
31a0531d 840 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
841}
842 /*}}}*/
843// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
844// ---------------------------------------------------------------------
845/* This is used to get accurate final transfer rate reporting. */
846void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 847{
b98f2859
AL
848 FetchedBytes += Size - Resume;
849}
850 /*}}}*/