]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
* moved most of the real work into depcache::writeStateFile
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
b2e465d6 26#include <apti18n.h>
b4fc9b6f
AL
27
28#include <iostream>
b2e465d6 29
7a7fa5f0 30#include <dirent.h>
8267fe24 31#include <sys/time.h>
524f8105 32#include <errno.h>
5f3edc0f 33#include <sys/stat.h>
0118833a
AL
34 /*}}}*/
35
b4fc9b6f
AL
36using namespace std;
37
0118833a
AL
38// Acquire::pkgAcquire - Constructor /*{{{*/
39// ---------------------------------------------------------------------
93bf083d 40/* We grab some runtime state from the configuration space */
8267fe24 41pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
42{
43 Queues = 0;
44 Configs = 0;
0a8a80e5
AL
45 Workers = 0;
46 ToFetch = 0;
8b89e57f 47 Running = false;
0a8a80e5
AL
48
49 string Mode = _config->Find("Acquire::Queue-Mode","host");
50 if (strcasecmp(Mode.c_str(),"host") == 0)
51 QueueMode = QueueHost;
52 if (strcasecmp(Mode.c_str(),"access") == 0)
53 QueueMode = QueueAccess;
54
55 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 56
0c55d1d2 57 // This is really a stupid place for this
5f3edc0f
AL
58 struct stat St;
59 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
60 S_ISDIR(St.st_mode) == 0)
b2e465d6 61 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
62 _config->FindDir("Dir::State::lists").c_str());
63 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
64 S_ISDIR(St.st_mode) == 0)
b2e465d6 65 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 66 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
67}
68 /*}}}*/
69// Acquire::~pkgAcquire - Destructor /*{{{*/
70// ---------------------------------------------------------------------
93bf083d 71/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
72pkgAcquire::~pkgAcquire()
73{
459681d3
AL
74 Shutdown();
75
3b5421b4
AL
76 while (Configs != 0)
77 {
78 MethodConfig *Jnk = Configs;
79 Configs = Configs->Next;
80 delete Jnk;
81 }
281daf46
AL
82}
83 /*}}}*/
8e5fc8f5 84// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
85// ---------------------------------------------------------------------
86/* */
87void pkgAcquire::Shutdown()
88{
89 while (Items.size() != 0)
1b480911
AL
90 {
91 if (Items[0]->Status == Item::StatFetching)
92 Items[0]->Status = Item::StatError;
281daf46 93 delete Items[0];
1b480911 94 }
0a8a80e5
AL
95
96 while (Queues != 0)
97 {
98 Queue *Jnk = Queues;
99 Queues = Queues->Next;
100 delete Jnk;
101 }
0118833a
AL
102}
103 /*}}}*/
104// Acquire::Add - Add a new item /*{{{*/
105// ---------------------------------------------------------------------
93bf083d
AL
106/* This puts an item on the acquire list. This list is mainly for tracking
107 item status */
0118833a
AL
108void pkgAcquire::Add(Item *Itm)
109{
110 Items.push_back(Itm);
111}
112 /*}}}*/
113// Acquire::Remove - Remove a item /*{{{*/
114// ---------------------------------------------------------------------
93bf083d 115/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
116void pkgAcquire::Remove(Item *Itm)
117{
a3eaf954
AL
118 Dequeue(Itm);
119
753b3525 120 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
121 {
122 if (*I == Itm)
b4fc9b6f 123 {
0118833a 124 Items.erase(I);
b4fc9b6f
AL
125 I = Items.begin();
126 }
753b3525
AL
127 else
128 I++;
8267fe24 129 }
0118833a
AL
130}
131 /*}}}*/
0a8a80e5
AL
132// Acquire::Add - Add a worker /*{{{*/
133// ---------------------------------------------------------------------
93bf083d
AL
134/* A list of workers is kept so that the select loop can direct their FD
135 usage. */
0a8a80e5
AL
136void pkgAcquire::Add(Worker *Work)
137{
138 Work->NextAcquire = Workers;
139 Workers = Work;
140}
141 /*}}}*/
142// Acquire::Remove - Remove a worker /*{{{*/
143// ---------------------------------------------------------------------
93bf083d
AL
144/* A worker has died. This can not be done while the select loop is running
145 as it would require that RunFds could handling a changing list state and
146 it cant.. */
0a8a80e5
AL
147void pkgAcquire::Remove(Worker *Work)
148{
93bf083d
AL
149 if (Running == true)
150 abort();
151
0a8a80e5
AL
152 Worker **I = &Workers;
153 for (; *I != 0;)
154 {
155 if (*I == Work)
156 *I = (*I)->NextAcquire;
157 else
158 I = &(*I)->NextAcquire;
159 }
160}
161 /*}}}*/
0118833a
AL
162// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
163// ---------------------------------------------------------------------
93bf083d 164/* This is the entry point for an item. An item calls this function when
281daf46 165 it is constructed which creates a queue (based on the current queue
93bf083d
AL
166 mode) and puts the item in that queue. If the system is running then
167 the queue might be started. */
8267fe24 168void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 169{
0a8a80e5 170 // Determine which queue to put the item in
e331f6ed
AL
171 const MethodConfig *Config;
172 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
173 if (Name.empty() == true)
174 return;
175
176 // Find the queue structure
177 Queue *I = Queues;
178 for (; I != 0 && I->Name != Name; I = I->Next);
179 if (I == 0)
180 {
181 I = new Queue(Name,this);
182 I->Next = Queues;
183 Queues = I;
93bf083d
AL
184
185 if (Running == true)
186 I->Startup();
0a8a80e5 187 }
bfd22fc0 188
e331f6ed
AL
189 // See if this is a local only URI
190 if (Config->LocalOnly == true && Item.Owner->Complete == false)
191 Item.Owner->Local = true;
8267fe24 192 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
193
194 // Queue it into the named queue
8267fe24 195 I->Enqueue(Item);
0a8a80e5 196 ToFetch++;
93bf083d 197
0a8a80e5
AL
198 // Some trace stuff
199 if (Debug == true)
200 {
8267fe24
AL
201 clog << "Fetching " << Item.URI << endl;
202 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 203 clog << " Queue is: " << Name << endl;
0a8a80e5 204 }
3b5421b4
AL
205}
206 /*}}}*/
0a8a80e5 207// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 208// ---------------------------------------------------------------------
93bf083d
AL
209/* This is called when an item is finished being fetched. It removes it
210 from all the queues */
0a8a80e5
AL
211void pkgAcquire::Dequeue(Item *Itm)
212{
213 Queue *I = Queues;
bfd22fc0 214 bool Res = false;
0a8a80e5 215 for (; I != 0; I = I->Next)
bfd22fc0 216 Res |= I->Dequeue(Itm);
93bf083d
AL
217
218 if (Debug == true)
219 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
220 if (Res == true)
221 ToFetch--;
0a8a80e5
AL
222}
223 /*}}}*/
224// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
225// ---------------------------------------------------------------------
226/* The string returned depends on the configuration settings and the
227 method parameters. Given something like http://foo.org/bar it can
228 return http://foo.org or http */
e331f6ed 229string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 230{
93bf083d
AL
231 URI U(Uri);
232
e331f6ed 233 Config = GetConfig(U.Access);
0a8a80e5
AL
234 if (Config == 0)
235 return string();
236
237 /* Single-Instance methods get exactly one queue per URI. This is
238 also used for the Access queue method */
239 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 240 return U.Access;
93bf083d
AL
241
242 return U.Access + ':' + U.Host;
0118833a
AL
243}
244 /*}}}*/
3b5421b4
AL
245// Acquire::GetConfig - Fetch the configuration information /*{{{*/
246// ---------------------------------------------------------------------
247/* This locates the configuration structure for an access method. If
248 a config structure cannot be found a Worker will be created to
249 retrieve it */
0a8a80e5 250pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
251{
252 // Search for an existing config
253 MethodConfig *Conf;
254 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
255 if (Conf->Access == Access)
256 return Conf;
257
258 // Create the new config class
259 Conf = new MethodConfig;
260 Conf->Access = Access;
261 Conf->Next = Configs;
262 Configs = Conf;
0118833a 263
3b5421b4
AL
264 // Create the worker to fetch the configuration
265 Worker Work(Conf);
266 if (Work.Start() == false)
267 return 0;
268
269 return Conf;
270}
271 /*}}}*/
0a8a80e5
AL
272// Acquire::SetFds - Deal with readable FDs /*{{{*/
273// ---------------------------------------------------------------------
274/* Collect FDs that have activity monitors into the fd sets */
275void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
276{
277 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
278 {
279 if (I->InReady == true && I->InFd >= 0)
280 {
281 if (Fd < I->InFd)
282 Fd = I->InFd;
283 FD_SET(I->InFd,RSet);
284 }
285 if (I->OutReady == true && I->OutFd >= 0)
286 {
287 if (Fd < I->OutFd)
288 Fd = I->OutFd;
289 FD_SET(I->OutFd,WSet);
290 }
291 }
292}
293 /*}}}*/
294// Acquire::RunFds - Deal with active FDs /*{{{*/
295// ---------------------------------------------------------------------
93bf083d
AL
296/* Dispatch active FDs over to the proper workers. It is very important
297 that a worker never be erased while this is running! The queue class
298 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
299void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
300{
301 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
302 {
303 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
304 I->InFdReady();
305 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
306 I->OutFdReady();
307 }
308}
309 /*}}}*/
310// Acquire::Run - Run the fetch sequence /*{{{*/
311// ---------------------------------------------------------------------
312/* This runs the queues. It manages a select loop for all of the
313 Worker tasks. The workers interact with the queues and items to
314 manage the actual fetch. */
1c5f7e5f 315pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 316{
8b89e57f
AL
317 Running = true;
318
0a8a80e5
AL
319 for (Queue *I = Queues; I != 0; I = I->Next)
320 I->Startup();
321
b98f2859
AL
322 if (Log != 0)
323 Log->Start();
324
024d1123
AL
325 bool WasCancelled = false;
326
0a8a80e5 327 // Run till all things have been acquired
8267fe24
AL
328 struct timeval tv;
329 tv.tv_sec = 0;
1c5f7e5f 330 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
331 while (ToFetch > 0)
332 {
333 fd_set RFds;
334 fd_set WFds;
335 int Highest = 0;
336 FD_ZERO(&RFds);
337 FD_ZERO(&WFds);
338 SetFds(Highest,&RFds,&WFds);
339
b0db36b1
AL
340 int Res;
341 do
342 {
343 Res = select(Highest+1,&RFds,&WFds,0,&tv);
344 }
345 while (Res < 0 && errno == EINTR);
346
8267fe24 347 if (Res < 0)
8b89e57f 348 {
8267fe24
AL
349 _error->Errno("select","Select has failed");
350 break;
8b89e57f 351 }
93bf083d 352
0a8a80e5 353 RunFds(&RFds,&WFds);
93bf083d
AL
354 if (_error->PendingError() == true)
355 break;
8267fe24
AL
356
357 // Timeout, notify the log class
358 if (Res == 0 || (Log != 0 && Log->Update == true))
359 {
1c5f7e5f 360 tv.tv_usec = PulseIntervall;
8267fe24
AL
361 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
362 I->Pulse();
024d1123
AL
363 if (Log != 0 && Log->Pulse(this) == false)
364 {
365 WasCancelled = true;
366 break;
367 }
8267fe24 368 }
0a8a80e5 369 }
be4401bf 370
b98f2859
AL
371 if (Log != 0)
372 Log->Stop();
373
be4401bf
AL
374 // Shut down the acquire bits
375 Running = false;
0a8a80e5 376 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 377 I->Shutdown(false);
0a8a80e5 378
ab559b35 379 // Shut down the items
b4fc9b6f 380 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 381 (*I)->Finished();
ab559b35 382
024d1123
AL
383 if (_error->PendingError())
384 return Failed;
385 if (WasCancelled)
386 return Cancelled;
387 return Continue;
93bf083d
AL
388}
389 /*}}}*/
be4401bf 390// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
391// ---------------------------------------------------------------------
392/* This routine bumps idle queues in hopes that they will be able to fetch
393 the dequeued item */
394void pkgAcquire::Bump()
395{
be4401bf
AL
396 for (Queue *I = Queues; I != 0; I = I->Next)
397 I->Bump();
0a8a80e5
AL
398}
399 /*}}}*/
8267fe24
AL
400// Acquire::WorkerStep - Step to the next worker /*{{{*/
401// ---------------------------------------------------------------------
402/* Not inlined to advoid including acquire-worker.h */
403pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
404{
405 return I->NextAcquire;
406};
407 /*}}}*/
a6568219 408// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
409// ---------------------------------------------------------------------
410/* This is a bit simplistic, it looks at every file in the dir and sees
411 if it is part of the download set. */
412bool pkgAcquire::Clean(string Dir)
413{
414 DIR *D = opendir(Dir.c_str());
415 if (D == 0)
b2e465d6 416 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
417
418 string StartDir = SafeGetCWD();
419 if (chdir(Dir.c_str()) != 0)
420 {
421 closedir(D);
b2e465d6 422 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
423 }
424
425 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
426 {
427 // Skip some files..
428 if (strcmp(Dir->d_name,"lock") == 0 ||
429 strcmp(Dir->d_name,"partial") == 0 ||
430 strcmp(Dir->d_name,".") == 0 ||
431 strcmp(Dir->d_name,"..") == 0)
432 continue;
433
434 // Look in the get list
b4fc9b6f 435 ItemCIterator I = Items.begin();
7a7fa5f0
AL
436 for (; I != Items.end(); I++)
437 if (flNotDir((*I)->DestFile) == Dir->d_name)
438 break;
439
440 // Nothing found, nuke it
441 if (I == Items.end())
442 unlink(Dir->d_name);
443 };
444
445 chdir(StartDir.c_str());
446 closedir(D);
447 return true;
448}
449 /*}}}*/
a6568219
AL
450// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
451// ---------------------------------------------------------------------
452/* This is the total number of bytes needed */
b2e465d6 453double pkgAcquire::TotalNeeded()
a6568219 454{
b2e465d6 455 double Total = 0;
b4fc9b6f 456 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
457 Total += (*I)->FileSize;
458 return Total;
459}
460 /*}}}*/
461// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
462// ---------------------------------------------------------------------
463/* This is the number of bytes that is not local */
b2e465d6 464double pkgAcquire::FetchNeeded()
a6568219 465{
b2e465d6 466 double Total = 0;
b4fc9b6f 467 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
468 if ((*I)->Local == false)
469 Total += (*I)->FileSize;
470 return Total;
471}
472 /*}}}*/
6b1ff003
AL
473// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
474// ---------------------------------------------------------------------
475/* This is the number of bytes that is not local */
b2e465d6 476double pkgAcquire::PartialPresent()
6b1ff003 477{
b2e465d6 478 double Total = 0;
b4fc9b6f 479 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
480 if ((*I)->Local == false)
481 Total += (*I)->PartialSize;
482 return Total;
483}
b3d44315 484
8e5fc8f5 485// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
486// ---------------------------------------------------------------------
487/* */
488pkgAcquire::UriIterator pkgAcquire::UriBegin()
489{
490 return UriIterator(Queues);
491}
492 /*}}}*/
8e5fc8f5 493// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
494// ---------------------------------------------------------------------
495/* */
496pkgAcquire::UriIterator pkgAcquire::UriEnd()
497{
498 return UriIterator(0);
499}
500 /*}}}*/
0a8a80e5 501
e331f6ed
AL
502// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
503// ---------------------------------------------------------------------
504/* */
505pkgAcquire::MethodConfig::MethodConfig()
506{
507 SingleInstance = false;
e331f6ed
AL
508 Pipeline = false;
509 SendConfig = false;
510 LocalOnly = false;
459681d3 511 Removable = false;
e331f6ed
AL
512 Next = 0;
513}
514 /*}}}*/
515
0a8a80e5
AL
516// Queue::Queue - Constructor /*{{{*/
517// ---------------------------------------------------------------------
518/* */
519pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
520 Owner(Owner)
521{
522 Items = 0;
523 Next = 0;
524 Workers = 0;
b185acc2
AL
525 MaxPipeDepth = 1;
526 PipeDepth = 0;
0a8a80e5
AL
527}
528 /*}}}*/
529// Queue::~Queue - Destructor /*{{{*/
530// ---------------------------------------------------------------------
531/* */
532pkgAcquire::Queue::~Queue()
533{
8e5fc8f5 534 Shutdown(true);
0a8a80e5
AL
535
536 while (Items != 0)
537 {
538 QItem *Jnk = Items;
539 Items = Items->Next;
540 delete Jnk;
541 }
542}
543 /*}}}*/
544// Queue::Enqueue - Queue an item to the queue /*{{{*/
545// ---------------------------------------------------------------------
546/* */
8267fe24 547void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 548{
7a1b1f8b
AL
549 QItem **I = &Items;
550 for (; *I != 0; I = &(*I)->Next);
551
0a8a80e5 552 // Create a new item
7a1b1f8b
AL
553 QItem *Itm = new QItem;
554 *Itm = Item;
555 Itm->Next = 0;
556 *I = Itm;
0a8a80e5 557
8267fe24 558 Item.Owner->QueueCounter++;
93bf083d
AL
559 if (Items->Next == 0)
560 Cycle();
0a8a80e5
AL
561}
562 /*}}}*/
c88edf1d 563// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 564// ---------------------------------------------------------------------
b185acc2 565/* We return true if we hit something */
bfd22fc0 566bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 567{
b185acc2
AL
568 if (Owner->Status == pkgAcquire::Item::StatFetching)
569 return _error->Error("Tried to dequeue a fetching object");
570
bfd22fc0
AL
571 bool Res = false;
572
0a8a80e5
AL
573 QItem **I = &Items;
574 for (; *I != 0;)
575 {
576 if ((*I)->Owner == Owner)
577 {
578 QItem *Jnk= *I;
579 *I = (*I)->Next;
580 Owner->QueueCounter--;
581 delete Jnk;
bfd22fc0 582 Res = true;
0a8a80e5
AL
583 }
584 else
585 I = &(*I)->Next;
586 }
bfd22fc0
AL
587
588 return Res;
0a8a80e5
AL
589}
590 /*}}}*/
591// Queue::Startup - Start the worker processes /*{{{*/
592// ---------------------------------------------------------------------
8e5fc8f5
AL
593/* It is possible for this to be called with a pre-existing set of
594 workers. */
0a8a80e5
AL
595bool pkgAcquire::Queue::Startup()
596{
8e5fc8f5
AL
597 if (Workers == 0)
598 {
599 URI U(Name);
600 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
601 if (Cnf == 0)
602 return false;
603
604 Workers = new Worker(this,Cnf,Owner->Log);
605 Owner->Add(Workers);
606 if (Workers->Start() == false)
607 return false;
608
609 /* When pipelining we commit 10 items. This needs to change when we
610 added other source retry to have cycle maintain a pipeline depth
611 on its own. */
612 if (Cnf->Pipeline == true)
613 MaxPipeDepth = 10;
614 else
615 MaxPipeDepth = 1;
616 }
5cb5d8dc 617
93bf083d 618 return Cycle();
0a8a80e5
AL
619}
620 /*}}}*/
621// Queue::Shutdown - Shutdown the worker processes /*{{{*/
622// ---------------------------------------------------------------------
8e5fc8f5
AL
623/* If final is true then all workers are eliminated, otherwise only workers
624 that do not need cleanup are removed */
625bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
626{
627 // Delete all of the workers
8e5fc8f5
AL
628 pkgAcquire::Worker **Cur = &Workers;
629 while (*Cur != 0)
0a8a80e5 630 {
8e5fc8f5
AL
631 pkgAcquire::Worker *Jnk = *Cur;
632 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
633 {
634 *Cur = Jnk->NextQueue;
635 Owner->Remove(Jnk);
636 delete Jnk;
637 }
638 else
639 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
640 }
641
642 return true;
3b5421b4
AL
643}
644 /*}}}*/
7d8afa39 645// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
646// ---------------------------------------------------------------------
647/* */
648pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
649{
650 for (QItem *I = Items; I != 0; I = I->Next)
651 if (I->URI == URI && I->Worker == Owner)
652 return I;
653 return 0;
654}
655 /*}}}*/
656// Queue::ItemDone - Item has been completed /*{{{*/
657// ---------------------------------------------------------------------
658/* The worker signals this which causes the item to be removed from the
93bf083d
AL
659 queue. If this is the last queue instance then it is removed from the
660 main queue too.*/
c88edf1d
AL
661bool pkgAcquire::Queue::ItemDone(QItem *Itm)
662{
b185acc2 663 PipeDepth--;
db890fdb
AL
664 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
665 Itm->Owner->Status = pkgAcquire::Item::StatDone;
666
93bf083d
AL
667 if (Itm->Owner->QueueCounter <= 1)
668 Owner->Dequeue(Itm->Owner);
669 else
670 {
671 Dequeue(Itm->Owner);
672 Owner->Bump();
673 }
c88edf1d 674
93bf083d
AL
675 return Cycle();
676}
677 /*}}}*/
678// Queue::Cycle - Queue new items into the method /*{{{*/
679// ---------------------------------------------------------------------
b185acc2
AL
680/* This locates a new idle item and sends it to the worker. If pipelining
681 is enabled then it keeps the pipe full. */
93bf083d
AL
682bool pkgAcquire::Queue::Cycle()
683{
684 if (Items == 0 || Workers == 0)
c88edf1d
AL
685 return true;
686
e7432370
AL
687 if (PipeDepth < 0)
688 return _error->Error("Pipedepth failure");
689
93bf083d
AL
690 // Look for a queable item
691 QItem *I = Items;
e7432370 692 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
693 {
694 for (; I != 0; I = I->Next)
695 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
696 break;
697
698 // Nothing to do, queue is idle.
699 if (I == 0)
700 return true;
701
702 I->Worker = Workers;
703 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 704 PipeDepth++;
b185acc2
AL
705 if (Workers->QueueItem(I) == false)
706 return false;
707 }
93bf083d 708
b185acc2 709 return true;
c88edf1d
AL
710}
711 /*}}}*/
be4401bf
AL
712// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
713// ---------------------------------------------------------------------
b185acc2 714/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
715void pkgAcquire::Queue::Bump()
716{
b185acc2 717 Cycle();
be4401bf
AL
718}
719 /*}}}*/
b98f2859
AL
720
721// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
722// ---------------------------------------------------------------------
723/* */
c5ccf175 724pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
725{
726 Start();
727}
728 /*}}}*/
729// AcquireStatus::Pulse - Called periodically /*{{{*/
730// ---------------------------------------------------------------------
731/* This computes some internal state variables for the derived classes to
732 use. It generates the current downloaded bytes and total bytes to download
733 as well as the current CPS estimate. */
024d1123 734bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
735{
736 TotalBytes = 0;
737 CurrentBytes = 0;
d568ed2d
AL
738 TotalItems = 0;
739 CurrentItems = 0;
b98f2859
AL
740
741 // Compute the total number of bytes to fetch
742 unsigned int Unknown = 0;
743 unsigned int Count = 0;
b4fc9b6f 744 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
745 I++, Count++)
746 {
d568ed2d
AL
747 TotalItems++;
748 if ((*I)->Status == pkgAcquire::Item::StatDone)
749 CurrentItems++;
750
a6568219
AL
751 // Totally ignore local items
752 if ((*I)->Local == true)
753 continue;
b2e465d6 754
b98f2859
AL
755 TotalBytes += (*I)->FileSize;
756 if ((*I)->Complete == true)
757 CurrentBytes += (*I)->FileSize;
758 if ((*I)->FileSize == 0 && (*I)->Complete == false)
759 Unknown++;
760 }
761
762 // Compute the current completion
aa0e1101 763 unsigned long ResumeSize = 0;
b98f2859
AL
764 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
765 I = Owner->WorkerStep(I))
766 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
767 {
768 CurrentBytes += I->CurrentSize;
769 ResumeSize += I->ResumePoint;
770
771 // Files with unknown size always have 100% completion
772 if (I->CurrentItem->Owner->FileSize == 0 &&
773 I->CurrentItem->Owner->Complete == false)
774 TotalBytes += I->CurrentSize;
775 }
776
b98f2859
AL
777 // Normalize the figures and account for unknown size downloads
778 if (TotalBytes <= 0)
779 TotalBytes = 1;
780 if (Unknown == Count)
781 TotalBytes = Unknown;
18ef0a78
AL
782
783 // Wha?! Is not supposed to happen.
784 if (CurrentBytes > TotalBytes)
785 CurrentBytes = TotalBytes;
b98f2859
AL
786
787 // Compute the CPS
788 struct timeval NewTime;
789 gettimeofday(&NewTime,0);
790 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
791 NewTime.tv_sec - Time.tv_sec > 6)
792 {
f17ac097
AL
793 double Delta = NewTime.tv_sec - Time.tv_sec +
794 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 795
b98f2859 796 // Compute the CPS value
f17ac097 797 if (Delta < 0.01)
e331f6ed
AL
798 CurrentCPS = 0;
799 else
aa0e1101
AL
800 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
801 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 802 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
803 Time = NewTime;
804 }
024d1123
AL
805
806 return true;
b98f2859
AL
807}
808 /*}}}*/
809// AcquireStatus::Start - Called when the download is started /*{{{*/
810// ---------------------------------------------------------------------
811/* We just reset the counters */
812void pkgAcquireStatus::Start()
813{
814 gettimeofday(&Time,0);
815 gettimeofday(&StartTime,0);
816 LastBytes = 0;
817 CurrentCPS = 0;
818 CurrentBytes = 0;
819 TotalBytes = 0;
820 FetchedBytes = 0;
821 ElapsedTime = 0;
d568ed2d
AL
822 TotalItems = 0;
823 CurrentItems = 0;
b98f2859
AL
824}
825 /*}}}*/
a6568219 826// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
827// ---------------------------------------------------------------------
828/* This accurately computes the elapsed time and the total overall CPS. */
829void pkgAcquireStatus::Stop()
830{
831 // Compute the CPS and elapsed time
832 struct timeval NewTime;
833 gettimeofday(&NewTime,0);
834
31a0531d
AL
835 double Delta = NewTime.tv_sec - StartTime.tv_sec +
836 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 837
b98f2859 838 // Compute the CPS value
31a0531d 839 if (Delta < 0.01)
e331f6ed
AL
840 CurrentCPS = 0;
841 else
31a0531d 842 CurrentCPS = FetchedBytes/Delta;
b98f2859 843 LastBytes = CurrentBytes;
31a0531d 844 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
845}
846 /*}}}*/
847// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
848// ---------------------------------------------------------------------
849/* This is used to get accurate final transfer rate reporting. */
850void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 851{
b98f2859
AL
852 FetchedBytes += Size - Resume;
853}
854 /*}}}*/