]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
s/make/$(MAKE)/
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
c5ccf175 3// $Id: acquire.cc,v 1.46 2000/01/27 04:15:09 jgg Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
7a7fa5f0 26#include <dirent.h>
8267fe24 27#include <sys/time.h>
524f8105 28#include <errno.h>
5f3edc0f 29#include <sys/stat.h>
0118833a
AL
30 /*}}}*/
31
32// Acquire::pkgAcquire - Constructor /*{{{*/
33// ---------------------------------------------------------------------
93bf083d 34/* We grab some runtime state from the configuration space */
8267fe24 35pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
36{
37 Queues = 0;
38 Configs = 0;
0a8a80e5
AL
39 Workers = 0;
40 ToFetch = 0;
8b89e57f 41 Running = false;
0a8a80e5
AL
42
43 string Mode = _config->Find("Acquire::Queue-Mode","host");
44 if (strcasecmp(Mode.c_str(),"host") == 0)
45 QueueMode = QueueHost;
46 if (strcasecmp(Mode.c_str(),"access") == 0)
47 QueueMode = QueueAccess;
48
49 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 50
0c55d1d2 51 // This is really a stupid place for this
5f3edc0f
AL
52 struct stat St;
53 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
54 S_ISDIR(St.st_mode) == 0)
0c55d1d2 55 _error->Error("Lists directory %spartial is missing.",
5f3edc0f
AL
56 _config->FindDir("Dir::State::lists").c_str());
57 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
58 S_ISDIR(St.st_mode) == 0)
0c55d1d2 59 _error->Error("Archive directory %spartial is missing.",
5f3edc0f 60 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
61}
62 /*}}}*/
63// Acquire::~pkgAcquire - Destructor /*{{{*/
64// ---------------------------------------------------------------------
93bf083d 65/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
66pkgAcquire::~pkgAcquire()
67{
459681d3
AL
68 Shutdown();
69
3b5421b4
AL
70 while (Configs != 0)
71 {
72 MethodConfig *Jnk = Configs;
73 Configs = Configs->Next;
74 delete Jnk;
75 }
281daf46
AL
76}
77 /*}}}*/
8e5fc8f5 78// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
79// ---------------------------------------------------------------------
80/* */
81void pkgAcquire::Shutdown()
82{
83 while (Items.size() != 0)
84 delete Items[0];
0a8a80e5
AL
85
86 while (Queues != 0)
87 {
88 Queue *Jnk = Queues;
89 Queues = Queues->Next;
90 delete Jnk;
91 }
0118833a
AL
92}
93 /*}}}*/
94// Acquire::Add - Add a new item /*{{{*/
95// ---------------------------------------------------------------------
93bf083d
AL
96/* This puts an item on the acquire list. This list is mainly for tracking
97 item status */
0118833a
AL
98void pkgAcquire::Add(Item *Itm)
99{
100 Items.push_back(Itm);
101}
102 /*}}}*/
103// Acquire::Remove - Remove a item /*{{{*/
104// ---------------------------------------------------------------------
93bf083d 105/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
106void pkgAcquire::Remove(Item *Itm)
107{
a3eaf954
AL
108 Dequeue(Itm);
109
0118833a
AL
110 for (vector<Item *>::iterator I = Items.begin(); I < Items.end(); I++)
111 {
112 if (*I == Itm)
113 Items.erase(I);
8267fe24 114 }
0118833a
AL
115}
116 /*}}}*/
0a8a80e5
AL
117// Acquire::Add - Add a worker /*{{{*/
118// ---------------------------------------------------------------------
93bf083d
AL
119/* A list of workers is kept so that the select loop can direct their FD
120 usage. */
0a8a80e5
AL
121void pkgAcquire::Add(Worker *Work)
122{
123 Work->NextAcquire = Workers;
124 Workers = Work;
125}
126 /*}}}*/
127// Acquire::Remove - Remove a worker /*{{{*/
128// ---------------------------------------------------------------------
93bf083d
AL
129/* A worker has died. This can not be done while the select loop is running
130 as it would require that RunFds could handling a changing list state and
131 it cant.. */
0a8a80e5
AL
132void pkgAcquire::Remove(Worker *Work)
133{
93bf083d
AL
134 if (Running == true)
135 abort();
136
0a8a80e5
AL
137 Worker **I = &Workers;
138 for (; *I != 0;)
139 {
140 if (*I == Work)
141 *I = (*I)->NextAcquire;
142 else
143 I = &(*I)->NextAcquire;
144 }
145}
146 /*}}}*/
0118833a
AL
147// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
148// ---------------------------------------------------------------------
93bf083d 149/* This is the entry point for an item. An item calls this function when
281daf46 150 it is constructed which creates a queue (based on the current queue
93bf083d
AL
151 mode) and puts the item in that queue. If the system is running then
152 the queue might be started. */
8267fe24 153void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 154{
0a8a80e5 155 // Determine which queue to put the item in
e331f6ed
AL
156 const MethodConfig *Config;
157 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
158 if (Name.empty() == true)
159 return;
160
161 // Find the queue structure
162 Queue *I = Queues;
163 for (; I != 0 && I->Name != Name; I = I->Next);
164 if (I == 0)
165 {
166 I = new Queue(Name,this);
167 I->Next = Queues;
168 Queues = I;
93bf083d
AL
169
170 if (Running == true)
171 I->Startup();
0a8a80e5 172 }
bfd22fc0 173
e331f6ed
AL
174 // See if this is a local only URI
175 if (Config->LocalOnly == true && Item.Owner->Complete == false)
176 Item.Owner->Local = true;
8267fe24 177 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
178
179 // Queue it into the named queue
8267fe24 180 I->Enqueue(Item);
0a8a80e5 181 ToFetch++;
93bf083d 182
0a8a80e5
AL
183 // Some trace stuff
184 if (Debug == true)
185 {
8267fe24
AL
186 clog << "Fetching " << Item.URI << endl;
187 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 188 clog << " Queue is: " << Name << endl;
0a8a80e5 189 }
3b5421b4
AL
190}
191 /*}}}*/
0a8a80e5 192// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 193// ---------------------------------------------------------------------
93bf083d
AL
194/* This is called when an item is finished being fetched. It removes it
195 from all the queues */
0a8a80e5
AL
196void pkgAcquire::Dequeue(Item *Itm)
197{
198 Queue *I = Queues;
bfd22fc0 199 bool Res = false;
0a8a80e5 200 for (; I != 0; I = I->Next)
bfd22fc0 201 Res |= I->Dequeue(Itm);
93bf083d
AL
202
203 if (Debug == true)
204 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
205 if (Res == true)
206 ToFetch--;
0a8a80e5
AL
207}
208 /*}}}*/
209// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
210// ---------------------------------------------------------------------
211/* The string returned depends on the configuration settings and the
212 method parameters. Given something like http://foo.org/bar it can
213 return http://foo.org or http */
e331f6ed 214string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 215{
93bf083d
AL
216 URI U(Uri);
217
e331f6ed 218 Config = GetConfig(U.Access);
0a8a80e5
AL
219 if (Config == 0)
220 return string();
221
222 /* Single-Instance methods get exactly one queue per URI. This is
223 also used for the Access queue method */
224 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 225 return U.Access;
93bf083d
AL
226
227 return U.Access + ':' + U.Host;
0118833a
AL
228}
229 /*}}}*/
3b5421b4
AL
230// Acquire::GetConfig - Fetch the configuration information /*{{{*/
231// ---------------------------------------------------------------------
232/* This locates the configuration structure for an access method. If
233 a config structure cannot be found a Worker will be created to
234 retrieve it */
0a8a80e5 235pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
236{
237 // Search for an existing config
238 MethodConfig *Conf;
239 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
240 if (Conf->Access == Access)
241 return Conf;
242
243 // Create the new config class
244 Conf = new MethodConfig;
245 Conf->Access = Access;
246 Conf->Next = Configs;
247 Configs = Conf;
0118833a 248
3b5421b4
AL
249 // Create the worker to fetch the configuration
250 Worker Work(Conf);
251 if (Work.Start() == false)
252 return 0;
253
254 return Conf;
255}
256 /*}}}*/
0a8a80e5
AL
257// Acquire::SetFds - Deal with readable FDs /*{{{*/
258// ---------------------------------------------------------------------
259/* Collect FDs that have activity monitors into the fd sets */
260void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
261{
262 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
263 {
264 if (I->InReady == true && I->InFd >= 0)
265 {
266 if (Fd < I->InFd)
267 Fd = I->InFd;
268 FD_SET(I->InFd,RSet);
269 }
270 if (I->OutReady == true && I->OutFd >= 0)
271 {
272 if (Fd < I->OutFd)
273 Fd = I->OutFd;
274 FD_SET(I->OutFd,WSet);
275 }
276 }
277}
278 /*}}}*/
279// Acquire::RunFds - Deal with active FDs /*{{{*/
280// ---------------------------------------------------------------------
93bf083d
AL
281/* Dispatch active FDs over to the proper workers. It is very important
282 that a worker never be erased while this is running! The queue class
283 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
284void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
285{
286 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
287 {
288 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
289 I->InFdReady();
290 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
291 I->OutFdReady();
292 }
293}
294 /*}}}*/
295// Acquire::Run - Run the fetch sequence /*{{{*/
296// ---------------------------------------------------------------------
297/* This runs the queues. It manages a select loop for all of the
298 Worker tasks. The workers interact with the queues and items to
299 manage the actual fetch. */
024d1123 300pkgAcquire::RunResult pkgAcquire::Run()
0a8a80e5 301{
8b89e57f
AL
302 Running = true;
303
0a8a80e5
AL
304 for (Queue *I = Queues; I != 0; I = I->Next)
305 I->Startup();
306
b98f2859
AL
307 if (Log != 0)
308 Log->Start();
309
024d1123
AL
310 bool WasCancelled = false;
311
0a8a80e5 312 // Run till all things have been acquired
8267fe24
AL
313 struct timeval tv;
314 tv.tv_sec = 0;
315 tv.tv_usec = 500000;
0a8a80e5
AL
316 while (ToFetch > 0)
317 {
318 fd_set RFds;
319 fd_set WFds;
320 int Highest = 0;
321 FD_ZERO(&RFds);
322 FD_ZERO(&WFds);
323 SetFds(Highest,&RFds,&WFds);
324
b0db36b1
AL
325 int Res;
326 do
327 {
328 Res = select(Highest+1,&RFds,&WFds,0,&tv);
329 }
330 while (Res < 0 && errno == EINTR);
331
8267fe24 332 if (Res < 0)
8b89e57f 333 {
8267fe24
AL
334 _error->Errno("select","Select has failed");
335 break;
8b89e57f 336 }
93bf083d 337
0a8a80e5 338 RunFds(&RFds,&WFds);
93bf083d
AL
339 if (_error->PendingError() == true)
340 break;
8267fe24
AL
341
342 // Timeout, notify the log class
343 if (Res == 0 || (Log != 0 && Log->Update == true))
344 {
345 tv.tv_usec = 500000;
346 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
347 I->Pulse();
024d1123
AL
348 if (Log != 0 && Log->Pulse(this) == false)
349 {
350 WasCancelled = true;
351 break;
352 }
8267fe24 353 }
0a8a80e5 354 }
be4401bf 355
b98f2859
AL
356 if (Log != 0)
357 Log->Stop();
358
be4401bf
AL
359 // Shut down the acquire bits
360 Running = false;
0a8a80e5 361 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 362 I->Shutdown(false);
0a8a80e5 363
ab559b35
AL
364 // Shut down the items
365 for (Item **I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 366 (*I)->Finished();
ab559b35 367
024d1123
AL
368 if (_error->PendingError())
369 return Failed;
370 if (WasCancelled)
371 return Cancelled;
372 return Continue;
93bf083d
AL
373}
374 /*}}}*/
be4401bf 375// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
376// ---------------------------------------------------------------------
377/* This routine bumps idle queues in hopes that they will be able to fetch
378 the dequeued item */
379void pkgAcquire::Bump()
380{
be4401bf
AL
381 for (Queue *I = Queues; I != 0; I = I->Next)
382 I->Bump();
0a8a80e5
AL
383}
384 /*}}}*/
8267fe24
AL
385// Acquire::WorkerStep - Step to the next worker /*{{{*/
386// ---------------------------------------------------------------------
387/* Not inlined to advoid including acquire-worker.h */
388pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
389{
390 return I->NextAcquire;
391};
392 /*}}}*/
a6568219 393// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
394// ---------------------------------------------------------------------
395/* This is a bit simplistic, it looks at every file in the dir and sees
396 if it is part of the download set. */
397bool pkgAcquire::Clean(string Dir)
398{
399 DIR *D = opendir(Dir.c_str());
400 if (D == 0)
401 return _error->Errno("opendir","Unable to read %s",Dir.c_str());
402
403 string StartDir = SafeGetCWD();
404 if (chdir(Dir.c_str()) != 0)
405 {
406 closedir(D);
407 return _error->Errno("chdir","Unable to change to ",Dir.c_str());
408 }
409
410 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
411 {
412 // Skip some files..
413 if (strcmp(Dir->d_name,"lock") == 0 ||
414 strcmp(Dir->d_name,"partial") == 0 ||
415 strcmp(Dir->d_name,".") == 0 ||
416 strcmp(Dir->d_name,"..") == 0)
417 continue;
418
419 // Look in the get list
420 vector<Item *>::iterator I = Items.begin();
421 for (; I != Items.end(); I++)
422 if (flNotDir((*I)->DestFile) == Dir->d_name)
423 break;
424
425 // Nothing found, nuke it
426 if (I == Items.end())
427 unlink(Dir->d_name);
428 };
429
430 chdir(StartDir.c_str());
431 closedir(D);
432 return true;
433}
434 /*}}}*/
a6568219
AL
435// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
436// ---------------------------------------------------------------------
437/* This is the total number of bytes needed */
438unsigned long pkgAcquire::TotalNeeded()
439{
440 unsigned long Total = 0;
441 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
442 Total += (*I)->FileSize;
443 return Total;
444}
445 /*}}}*/
446// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
447// ---------------------------------------------------------------------
448/* This is the number of bytes that is not local */
449unsigned long pkgAcquire::FetchNeeded()
450{
451 unsigned long Total = 0;
452 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
453 if ((*I)->Local == false)
454 Total += (*I)->FileSize;
455 return Total;
456}
457 /*}}}*/
6b1ff003
AL
458// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
459// ---------------------------------------------------------------------
460/* This is the number of bytes that is not local */
461unsigned long pkgAcquire::PartialPresent()
462{
463 unsigned long Total = 0;
464 for (pkgAcquire::Item **I = ItemsBegin(); I != ItemsEnd(); I++)
465 if ((*I)->Local == false)
466 Total += (*I)->PartialSize;
467 return Total;
468}
469 /*}}}*/
8e5fc8f5 470// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
471// ---------------------------------------------------------------------
472/* */
473pkgAcquire::UriIterator pkgAcquire::UriBegin()
474{
475 return UriIterator(Queues);
476}
477 /*}}}*/
8e5fc8f5 478// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
479// ---------------------------------------------------------------------
480/* */
481pkgAcquire::UriIterator pkgAcquire::UriEnd()
482{
483 return UriIterator(0);
484}
485 /*}}}*/
0a8a80e5 486
e331f6ed
AL
487// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
488// ---------------------------------------------------------------------
489/* */
490pkgAcquire::MethodConfig::MethodConfig()
491{
492 SingleInstance = false;
e331f6ed
AL
493 Pipeline = false;
494 SendConfig = false;
495 LocalOnly = false;
459681d3 496 Removable = false;
e331f6ed
AL
497 Next = 0;
498}
499 /*}}}*/
500
0a8a80e5
AL
501// Queue::Queue - Constructor /*{{{*/
502// ---------------------------------------------------------------------
503/* */
504pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
505 Owner(Owner)
506{
507 Items = 0;
508 Next = 0;
509 Workers = 0;
b185acc2
AL
510 MaxPipeDepth = 1;
511 PipeDepth = 0;
0a8a80e5
AL
512}
513 /*}}}*/
514// Queue::~Queue - Destructor /*{{{*/
515// ---------------------------------------------------------------------
516/* */
517pkgAcquire::Queue::~Queue()
518{
8e5fc8f5 519 Shutdown(true);
0a8a80e5
AL
520
521 while (Items != 0)
522 {
523 QItem *Jnk = Items;
524 Items = Items->Next;
525 delete Jnk;
526 }
527}
528 /*}}}*/
529// Queue::Enqueue - Queue an item to the queue /*{{{*/
530// ---------------------------------------------------------------------
531/* */
8267fe24 532void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 533{
7a1b1f8b
AL
534 QItem **I = &Items;
535 for (; *I != 0; I = &(*I)->Next);
536
0a8a80e5 537 // Create a new item
7a1b1f8b
AL
538 QItem *Itm = new QItem;
539 *Itm = Item;
540 Itm->Next = 0;
541 *I = Itm;
0a8a80e5 542
8267fe24 543 Item.Owner->QueueCounter++;
93bf083d
AL
544 if (Items->Next == 0)
545 Cycle();
0a8a80e5
AL
546}
547 /*}}}*/
c88edf1d 548// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 549// ---------------------------------------------------------------------
b185acc2 550/* We return true if we hit something */
bfd22fc0 551bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 552{
b185acc2
AL
553 if (Owner->Status == pkgAcquire::Item::StatFetching)
554 return _error->Error("Tried to dequeue a fetching object");
555
bfd22fc0
AL
556 bool Res = false;
557
0a8a80e5
AL
558 QItem **I = &Items;
559 for (; *I != 0;)
560 {
561 if ((*I)->Owner == Owner)
562 {
563 QItem *Jnk= *I;
564 *I = (*I)->Next;
565 Owner->QueueCounter--;
566 delete Jnk;
bfd22fc0 567 Res = true;
0a8a80e5
AL
568 }
569 else
570 I = &(*I)->Next;
571 }
bfd22fc0
AL
572
573 return Res;
0a8a80e5
AL
574}
575 /*}}}*/
576// Queue::Startup - Start the worker processes /*{{{*/
577// ---------------------------------------------------------------------
8e5fc8f5
AL
578/* It is possible for this to be called with a pre-existing set of
579 workers. */
0a8a80e5
AL
580bool pkgAcquire::Queue::Startup()
581{
8e5fc8f5
AL
582 if (Workers == 0)
583 {
584 URI U(Name);
585 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
586 if (Cnf == 0)
587 return false;
588
589 Workers = new Worker(this,Cnf,Owner->Log);
590 Owner->Add(Workers);
591 if (Workers->Start() == false)
592 return false;
593
594 /* When pipelining we commit 10 items. This needs to change when we
595 added other source retry to have cycle maintain a pipeline depth
596 on its own. */
597 if (Cnf->Pipeline == true)
598 MaxPipeDepth = 10;
599 else
600 MaxPipeDepth = 1;
601 }
5cb5d8dc 602
93bf083d 603 return Cycle();
0a8a80e5
AL
604}
605 /*}}}*/
606// Queue::Shutdown - Shutdown the worker processes /*{{{*/
607// ---------------------------------------------------------------------
8e5fc8f5
AL
608/* If final is true then all workers are eliminated, otherwise only workers
609 that do not need cleanup are removed */
610bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
611{
612 // Delete all of the workers
8e5fc8f5
AL
613 pkgAcquire::Worker **Cur = &Workers;
614 while (*Cur != 0)
0a8a80e5 615 {
8e5fc8f5
AL
616 pkgAcquire::Worker *Jnk = *Cur;
617 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
618 {
619 *Cur = Jnk->NextQueue;
620 Owner->Remove(Jnk);
621 delete Jnk;
622 }
623 else
624 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
625 }
626
627 return true;
3b5421b4
AL
628}
629 /*}}}*/
7d8afa39 630// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
631// ---------------------------------------------------------------------
632/* */
633pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
634{
635 for (QItem *I = Items; I != 0; I = I->Next)
636 if (I->URI == URI && I->Worker == Owner)
637 return I;
638 return 0;
639}
640 /*}}}*/
641// Queue::ItemDone - Item has been completed /*{{{*/
642// ---------------------------------------------------------------------
643/* The worker signals this which causes the item to be removed from the
93bf083d
AL
644 queue. If this is the last queue instance then it is removed from the
645 main queue too.*/
c88edf1d
AL
646bool pkgAcquire::Queue::ItemDone(QItem *Itm)
647{
b185acc2 648 PipeDepth--;
db890fdb
AL
649 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
650 Itm->Owner->Status = pkgAcquire::Item::StatDone;
651
93bf083d
AL
652 if (Itm->Owner->QueueCounter <= 1)
653 Owner->Dequeue(Itm->Owner);
654 else
655 {
656 Dequeue(Itm->Owner);
657 Owner->Bump();
658 }
c88edf1d 659
93bf083d
AL
660 return Cycle();
661}
662 /*}}}*/
663// Queue::Cycle - Queue new items into the method /*{{{*/
664// ---------------------------------------------------------------------
b185acc2
AL
665/* This locates a new idle item and sends it to the worker. If pipelining
666 is enabled then it keeps the pipe full. */
93bf083d
AL
667bool pkgAcquire::Queue::Cycle()
668{
669 if (Items == 0 || Workers == 0)
c88edf1d
AL
670 return true;
671
e7432370
AL
672 if (PipeDepth < 0)
673 return _error->Error("Pipedepth failure");
674
93bf083d
AL
675 // Look for a queable item
676 QItem *I = Items;
e7432370 677 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
678 {
679 for (; I != 0; I = I->Next)
680 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
681 break;
682
683 // Nothing to do, queue is idle.
684 if (I == 0)
685 return true;
686
687 I->Worker = Workers;
688 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 689 PipeDepth++;
b185acc2
AL
690 if (Workers->QueueItem(I) == false)
691 return false;
692 }
93bf083d 693
b185acc2 694 return true;
c88edf1d
AL
695}
696 /*}}}*/
be4401bf
AL
697// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
698// ---------------------------------------------------------------------
b185acc2 699/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
700void pkgAcquire::Queue::Bump()
701{
b185acc2 702 Cycle();
be4401bf
AL
703}
704 /*}}}*/
b98f2859
AL
705
706// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
707// ---------------------------------------------------------------------
708/* */
c5ccf175 709pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
710{
711 Start();
712}
713 /*}}}*/
714// AcquireStatus::Pulse - Called periodically /*{{{*/
715// ---------------------------------------------------------------------
716/* This computes some internal state variables for the derived classes to
717 use. It generates the current downloaded bytes and total bytes to download
718 as well as the current CPS estimate. */
024d1123 719bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
720{
721 TotalBytes = 0;
722 CurrentBytes = 0;
d568ed2d
AL
723 TotalItems = 0;
724 CurrentItems = 0;
b98f2859
AL
725
726 // Compute the total number of bytes to fetch
727 unsigned int Unknown = 0;
728 unsigned int Count = 0;
729 for (pkgAcquire::Item **I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
730 I++, Count++)
731 {
d568ed2d
AL
732 TotalItems++;
733 if ((*I)->Status == pkgAcquire::Item::StatDone)
734 CurrentItems++;
735
a6568219
AL
736 // Totally ignore local items
737 if ((*I)->Local == true)
738 continue;
739
b98f2859
AL
740 TotalBytes += (*I)->FileSize;
741 if ((*I)->Complete == true)
742 CurrentBytes += (*I)->FileSize;
743 if ((*I)->FileSize == 0 && (*I)->Complete == false)
744 Unknown++;
745 }
746
747 // Compute the current completion
aa0e1101 748 unsigned long ResumeSize = 0;
b98f2859
AL
749 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
750 I = Owner->WorkerStep(I))
751 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
752 {
753 CurrentBytes += I->CurrentSize;
754 ResumeSize += I->ResumePoint;
755
756 // Files with unknown size always have 100% completion
757 if (I->CurrentItem->Owner->FileSize == 0 &&
758 I->CurrentItem->Owner->Complete == false)
759 TotalBytes += I->CurrentSize;
760 }
761
b98f2859
AL
762 // Normalize the figures and account for unknown size downloads
763 if (TotalBytes <= 0)
764 TotalBytes = 1;
765 if (Unknown == Count)
766 TotalBytes = Unknown;
18ef0a78
AL
767
768 // Wha?! Is not supposed to happen.
769 if (CurrentBytes > TotalBytes)
770 CurrentBytes = TotalBytes;
b98f2859
AL
771
772 // Compute the CPS
773 struct timeval NewTime;
774 gettimeofday(&NewTime,0);
775 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
776 NewTime.tv_sec - Time.tv_sec > 6)
777 {
f17ac097
AL
778 double Delta = NewTime.tv_sec - Time.tv_sec +
779 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 780
b98f2859 781 // Compute the CPS value
f17ac097 782 if (Delta < 0.01)
e331f6ed
AL
783 CurrentCPS = 0;
784 else
aa0e1101
AL
785 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
786 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 787 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
788 Time = NewTime;
789 }
024d1123
AL
790
791 return true;
b98f2859
AL
792}
793 /*}}}*/
794// AcquireStatus::Start - Called when the download is started /*{{{*/
795// ---------------------------------------------------------------------
796/* We just reset the counters */
797void pkgAcquireStatus::Start()
798{
799 gettimeofday(&Time,0);
800 gettimeofday(&StartTime,0);
801 LastBytes = 0;
802 CurrentCPS = 0;
803 CurrentBytes = 0;
804 TotalBytes = 0;
805 FetchedBytes = 0;
806 ElapsedTime = 0;
d568ed2d
AL
807 TotalItems = 0;
808 CurrentItems = 0;
b98f2859
AL
809}
810 /*}}}*/
a6568219 811// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
812// ---------------------------------------------------------------------
813/* This accurately computes the elapsed time and the total overall CPS. */
814void pkgAcquireStatus::Stop()
815{
816 // Compute the CPS and elapsed time
817 struct timeval NewTime;
818 gettimeofday(&NewTime,0);
819
31a0531d
AL
820 double Delta = NewTime.tv_sec - StartTime.tv_sec +
821 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 822
b98f2859 823 // Compute the CPS value
31a0531d 824 if (Delta < 0.01)
e331f6ed
AL
825 CurrentCPS = 0;
826 else
31a0531d 827 CurrentCPS = FetchedBytes/Delta;
b98f2859 828 LastBytes = CurrentBytes;
31a0531d 829 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
830}
831 /*}}}*/
832// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
833// ---------------------------------------------------------------------
834/* This is used to get accurate final transfer rate reporting. */
835void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 836{
b98f2859
AL
837 FetchedBytes += Size - Resume;
838}
839 /*}}}*/