]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
fix progress reporting while reading extended_states file
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
0a8a80e5
AL
8 The core element of the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
0118833a
AL
16#include <apt-pkg/acquire.h>
17#include <apt-pkg/acquire-item.h>
18#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
19#include <apt-pkg/configuration.h>
20#include <apt-pkg/error.h>
cdcc6d34 21#include <apt-pkg/strutl.h>
8267fe24 22
b2e465d6 23#include <apti18n.h>
b4fc9b6f
AL
24
25#include <iostream>
75ef8f14 26#include <sstream>
526334a0
MV
27#include <stdio.h>
28
7a7fa5f0 29#include <dirent.h>
8267fe24 30#include <sys/time.h>
524f8105 31#include <errno.h>
5f3edc0f 32#include <sys/stat.h>
0118833a
AL
33 /*}}}*/
34
b4fc9b6f
AL
35using namespace std;
36
0118833a
AL
37// Acquire::pkgAcquire - Constructor /*{{{*/
38// ---------------------------------------------------------------------
93bf083d 39/* We grab some runtime state from the configuration space */
8267fe24 40pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
41{
42 Queues = 0;
43 Configs = 0;
0a8a80e5
AL
44 Workers = 0;
45 ToFetch = 0;
8b89e57f 46 Running = false;
0a8a80e5
AL
47
48 string Mode = _config->Find("Acquire::Queue-Mode","host");
49 if (strcasecmp(Mode.c_str(),"host") == 0)
50 QueueMode = QueueHost;
51 if (strcasecmp(Mode.c_str(),"access") == 0)
52 QueueMode = QueueAccess;
53
54 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 55
0c55d1d2 56 // This is really a stupid place for this
5f3edc0f
AL
57 struct stat St;
58 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
59 S_ISDIR(St.st_mode) == 0)
b2e465d6 60 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
61 _config->FindDir("Dir::State::lists").c_str());
62 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
63 S_ISDIR(St.st_mode) == 0)
b2e465d6 64 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 65 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
66}
67 /*}}}*/
68// Acquire::~pkgAcquire - Destructor /*{{{*/
69// ---------------------------------------------------------------------
93bf083d 70/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
71pkgAcquire::~pkgAcquire()
72{
459681d3
AL
73 Shutdown();
74
3b5421b4
AL
75 while (Configs != 0)
76 {
77 MethodConfig *Jnk = Configs;
78 Configs = Configs->Next;
79 delete Jnk;
80 }
281daf46
AL
81}
82 /*}}}*/
8e5fc8f5 83// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
84// ---------------------------------------------------------------------
85/* */
86void pkgAcquire::Shutdown()
87{
88 while (Items.size() != 0)
1b480911
AL
89 {
90 if (Items[0]->Status == Item::StatFetching)
91 Items[0]->Status = Item::StatError;
281daf46 92 delete Items[0];
1b480911 93 }
0a8a80e5
AL
94
95 while (Queues != 0)
96 {
97 Queue *Jnk = Queues;
98 Queues = Queues->Next;
99 delete Jnk;
100 }
0118833a
AL
101}
102 /*}}}*/
103// Acquire::Add - Add a new item /*{{{*/
104// ---------------------------------------------------------------------
93bf083d
AL
105/* This puts an item on the acquire list. This list is mainly for tracking
106 item status */
0118833a
AL
107void pkgAcquire::Add(Item *Itm)
108{
109 Items.push_back(Itm);
110}
111 /*}}}*/
112// Acquire::Remove - Remove a item /*{{{*/
113// ---------------------------------------------------------------------
93bf083d 114/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
115void pkgAcquire::Remove(Item *Itm)
116{
a3eaf954
AL
117 Dequeue(Itm);
118
753b3525 119 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
120 {
121 if (*I == Itm)
b4fc9b6f 122 {
0118833a 123 Items.erase(I);
b4fc9b6f
AL
124 I = Items.begin();
125 }
753b3525
AL
126 else
127 I++;
8267fe24 128 }
0118833a
AL
129}
130 /*}}}*/
0a8a80e5
AL
131// Acquire::Add - Add a worker /*{{{*/
132// ---------------------------------------------------------------------
93bf083d
AL
133/* A list of workers is kept so that the select loop can direct their FD
134 usage. */
0a8a80e5
AL
135void pkgAcquire::Add(Worker *Work)
136{
137 Work->NextAcquire = Workers;
138 Workers = Work;
139}
140 /*}}}*/
141// Acquire::Remove - Remove a worker /*{{{*/
142// ---------------------------------------------------------------------
93bf083d
AL
143/* A worker has died. This can not be done while the select loop is running
144 as it would require that RunFds could handling a changing list state and
145 it cant.. */
0a8a80e5
AL
146void pkgAcquire::Remove(Worker *Work)
147{
93bf083d
AL
148 if (Running == true)
149 abort();
150
0a8a80e5
AL
151 Worker **I = &Workers;
152 for (; *I != 0;)
153 {
154 if (*I == Work)
155 *I = (*I)->NextAcquire;
156 else
157 I = &(*I)->NextAcquire;
158 }
159}
160 /*}}}*/
0118833a
AL
161// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
162// ---------------------------------------------------------------------
93bf083d 163/* This is the entry point for an item. An item calls this function when
281daf46 164 it is constructed which creates a queue (based on the current queue
93bf083d
AL
165 mode) and puts the item in that queue. If the system is running then
166 the queue might be started. */
8267fe24 167void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 168{
0a8a80e5 169 // Determine which queue to put the item in
e331f6ed
AL
170 const MethodConfig *Config;
171 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
172 if (Name.empty() == true)
173 return;
174
175 // Find the queue structure
176 Queue *I = Queues;
177 for (; I != 0 && I->Name != Name; I = I->Next);
178 if (I == 0)
179 {
180 I = new Queue(Name,this);
181 I->Next = Queues;
182 Queues = I;
93bf083d
AL
183
184 if (Running == true)
185 I->Startup();
0a8a80e5 186 }
bfd22fc0 187
e331f6ed
AL
188 // See if this is a local only URI
189 if (Config->LocalOnly == true && Item.Owner->Complete == false)
190 Item.Owner->Local = true;
8267fe24 191 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
192
193 // Queue it into the named queue
c03462c6
MV
194 if(I->Enqueue(Item))
195 ToFetch++;
196
0a8a80e5
AL
197 // Some trace stuff
198 if (Debug == true)
199 {
8267fe24
AL
200 clog << "Fetching " << Item.URI << endl;
201 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 202 clog << " Queue is: " << Name << endl;
0a8a80e5 203 }
3b5421b4
AL
204}
205 /*}}}*/
0a8a80e5 206// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 207// ---------------------------------------------------------------------
93bf083d
AL
208/* This is called when an item is finished being fetched. It removes it
209 from all the queues */
0a8a80e5
AL
210void pkgAcquire::Dequeue(Item *Itm)
211{
212 Queue *I = Queues;
bfd22fc0 213 bool Res = false;
0a8a80e5 214 for (; I != 0; I = I->Next)
bfd22fc0 215 Res |= I->Dequeue(Itm);
93bf083d
AL
216
217 if (Debug == true)
218 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
219 if (Res == true)
220 ToFetch--;
0a8a80e5
AL
221}
222 /*}}}*/
223// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
224// ---------------------------------------------------------------------
225/* The string returned depends on the configuration settings and the
226 method parameters. Given something like http://foo.org/bar it can
227 return http://foo.org or http */
e331f6ed 228string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 229{
93bf083d
AL
230 URI U(Uri);
231
e331f6ed 232 Config = GetConfig(U.Access);
0a8a80e5
AL
233 if (Config == 0)
234 return string();
235
236 /* Single-Instance methods get exactly one queue per URI. This is
237 also used for the Access queue method */
238 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 239 return U.Access;
93bf083d
AL
240
241 return U.Access + ':' + U.Host;
0118833a
AL
242}
243 /*}}}*/
3b5421b4
AL
244// Acquire::GetConfig - Fetch the configuration information /*{{{*/
245// ---------------------------------------------------------------------
246/* This locates the configuration structure for an access method. If
247 a config structure cannot be found a Worker will be created to
248 retrieve it */
0a8a80e5 249pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
250{
251 // Search for an existing config
252 MethodConfig *Conf;
253 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
254 if (Conf->Access == Access)
255 return Conf;
256
257 // Create the new config class
258 Conf = new MethodConfig;
259 Conf->Access = Access;
260 Conf->Next = Configs;
261 Configs = Conf;
0118833a 262
3b5421b4
AL
263 // Create the worker to fetch the configuration
264 Worker Work(Conf);
265 if (Work.Start() == false)
266 return 0;
7c6e2dc7
MV
267
268 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 269 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
270 Conf->SingleInstance = true;
271
3b5421b4
AL
272 return Conf;
273}
274 /*}}}*/
0a8a80e5
AL
275// Acquire::SetFds - Deal with readable FDs /*{{{*/
276// ---------------------------------------------------------------------
277/* Collect FDs that have activity monitors into the fd sets */
278void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
279{
280 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
281 {
282 if (I->InReady == true && I->InFd >= 0)
283 {
284 if (Fd < I->InFd)
285 Fd = I->InFd;
286 FD_SET(I->InFd,RSet);
287 }
288 if (I->OutReady == true && I->OutFd >= 0)
289 {
290 if (Fd < I->OutFd)
291 Fd = I->OutFd;
292 FD_SET(I->OutFd,WSet);
293 }
294 }
295}
296 /*}}}*/
297// Acquire::RunFds - Deal with active FDs /*{{{*/
298// ---------------------------------------------------------------------
93bf083d
AL
299/* Dispatch active FDs over to the proper workers. It is very important
300 that a worker never be erased while this is running! The queue class
301 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
302void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
303{
304 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
305 {
306 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
307 I->InFdReady();
308 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
309 I->OutFdReady();
310 }
311}
312 /*}}}*/
313// Acquire::Run - Run the fetch sequence /*{{{*/
314// ---------------------------------------------------------------------
315/* This runs the queues. It manages a select loop for all of the
316 Worker tasks. The workers interact with the queues and items to
317 manage the actual fetch. */
1c5f7e5f 318pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 319{
8b89e57f
AL
320 Running = true;
321
0a8a80e5
AL
322 for (Queue *I = Queues; I != 0; I = I->Next)
323 I->Startup();
324
b98f2859
AL
325 if (Log != 0)
326 Log->Start();
327
024d1123
AL
328 bool WasCancelled = false;
329
0a8a80e5 330 // Run till all things have been acquired
8267fe24
AL
331 struct timeval tv;
332 tv.tv_sec = 0;
1c5f7e5f 333 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
334 while (ToFetch > 0)
335 {
336 fd_set RFds;
337 fd_set WFds;
338 int Highest = 0;
339 FD_ZERO(&RFds);
340 FD_ZERO(&WFds);
341 SetFds(Highest,&RFds,&WFds);
342
b0db36b1
AL
343 int Res;
344 do
345 {
346 Res = select(Highest+1,&RFds,&WFds,0,&tv);
347 }
348 while (Res < 0 && errno == EINTR);
349
8267fe24 350 if (Res < 0)
8b89e57f 351 {
8267fe24
AL
352 _error->Errno("select","Select has failed");
353 break;
8b89e57f 354 }
93bf083d 355
0a8a80e5 356 RunFds(&RFds,&WFds);
93bf083d
AL
357 if (_error->PendingError() == true)
358 break;
8267fe24
AL
359
360 // Timeout, notify the log class
361 if (Res == 0 || (Log != 0 && Log->Update == true))
362 {
1c5f7e5f 363 tv.tv_usec = PulseIntervall;
8267fe24
AL
364 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
365 I->Pulse();
024d1123
AL
366 if (Log != 0 && Log->Pulse(this) == false)
367 {
368 WasCancelled = true;
369 break;
370 }
8267fe24 371 }
0a8a80e5 372 }
be4401bf 373
b98f2859
AL
374 if (Log != 0)
375 Log->Stop();
376
be4401bf
AL
377 // Shut down the acquire bits
378 Running = false;
0a8a80e5 379 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 380 I->Shutdown(false);
0a8a80e5 381
ab559b35 382 // Shut down the items
b4fc9b6f 383 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 384 (*I)->Finished();
ab559b35 385
024d1123
AL
386 if (_error->PendingError())
387 return Failed;
388 if (WasCancelled)
389 return Cancelled;
390 return Continue;
93bf083d
AL
391}
392 /*}}}*/
be4401bf 393// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
394// ---------------------------------------------------------------------
395/* This routine bumps idle queues in hopes that they will be able to fetch
396 the dequeued item */
397void pkgAcquire::Bump()
398{
be4401bf
AL
399 for (Queue *I = Queues; I != 0; I = I->Next)
400 I->Bump();
0a8a80e5
AL
401}
402 /*}}}*/
8267fe24
AL
403// Acquire::WorkerStep - Step to the next worker /*{{{*/
404// ---------------------------------------------------------------------
405/* Not inlined to advoid including acquire-worker.h */
406pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
407{
408 return I->NextAcquire;
409};
410 /*}}}*/
a6568219 411// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
412// ---------------------------------------------------------------------
413/* This is a bit simplistic, it looks at every file in the dir and sees
414 if it is part of the download set. */
415bool pkgAcquire::Clean(string Dir)
416{
417 DIR *D = opendir(Dir.c_str());
418 if (D == 0)
b2e465d6 419 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
420
421 string StartDir = SafeGetCWD();
422 if (chdir(Dir.c_str()) != 0)
423 {
424 closedir(D);
b2e465d6 425 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
426 }
427
428 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
429 {
430 // Skip some files..
431 if (strcmp(Dir->d_name,"lock") == 0 ||
432 strcmp(Dir->d_name,"partial") == 0 ||
433 strcmp(Dir->d_name,".") == 0 ||
434 strcmp(Dir->d_name,"..") == 0)
435 continue;
436
437 // Look in the get list
b4fc9b6f 438 ItemCIterator I = Items.begin();
7a7fa5f0
AL
439 for (; I != Items.end(); I++)
440 if (flNotDir((*I)->DestFile) == Dir->d_name)
441 break;
442
443 // Nothing found, nuke it
444 if (I == Items.end())
445 unlink(Dir->d_name);
446 };
447
7a7fa5f0 448 closedir(D);
3c8cda8b
MV
449 if (chdir(StartDir.c_str()) != 0)
450 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
451 return true;
452}
453 /*}}}*/
a6568219
AL
454// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
455// ---------------------------------------------------------------------
456/* This is the total number of bytes needed */
b2e465d6 457double pkgAcquire::TotalNeeded()
a6568219 458{
b2e465d6 459 double Total = 0;
b4fc9b6f 460 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
461 Total += (*I)->FileSize;
462 return Total;
463}
464 /*}}}*/
465// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
466// ---------------------------------------------------------------------
467/* This is the number of bytes that is not local */
b2e465d6 468double pkgAcquire::FetchNeeded()
a6568219 469{
b2e465d6 470 double Total = 0;
b4fc9b6f 471 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
472 if ((*I)->Local == false)
473 Total += (*I)->FileSize;
474 return Total;
475}
476 /*}}}*/
6b1ff003
AL
477// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
478// ---------------------------------------------------------------------
479/* This is the number of bytes that is not local */
b2e465d6 480double pkgAcquire::PartialPresent()
6b1ff003 481{
b2e465d6 482 double Total = 0;
b4fc9b6f 483 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
484 if ((*I)->Local == false)
485 Total += (*I)->PartialSize;
486 return Total;
487}
92fcbfc1 488 /*}}}*/
8e5fc8f5 489// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
490// ---------------------------------------------------------------------
491/* */
492pkgAcquire::UriIterator pkgAcquire::UriBegin()
493{
494 return UriIterator(Queues);
495}
496 /*}}}*/
8e5fc8f5 497// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
498// ---------------------------------------------------------------------
499/* */
500pkgAcquire::UriIterator pkgAcquire::UriEnd()
501{
502 return UriIterator(0);
503}
504 /*}}}*/
e331f6ed
AL
505// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
506// ---------------------------------------------------------------------
507/* */
508pkgAcquire::MethodConfig::MethodConfig()
509{
510 SingleInstance = false;
e331f6ed
AL
511 Pipeline = false;
512 SendConfig = false;
513 LocalOnly = false;
459681d3 514 Removable = false;
e331f6ed
AL
515 Next = 0;
516}
517 /*}}}*/
0a8a80e5
AL
518// Queue::Queue - Constructor /*{{{*/
519// ---------------------------------------------------------------------
520/* */
521pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
522 Owner(Owner)
523{
524 Items = 0;
525 Next = 0;
526 Workers = 0;
b185acc2
AL
527 MaxPipeDepth = 1;
528 PipeDepth = 0;
0a8a80e5
AL
529}
530 /*}}}*/
531// Queue::~Queue - Destructor /*{{{*/
532// ---------------------------------------------------------------------
533/* */
534pkgAcquire::Queue::~Queue()
535{
8e5fc8f5 536 Shutdown(true);
0a8a80e5
AL
537
538 while (Items != 0)
539 {
540 QItem *Jnk = Items;
541 Items = Items->Next;
542 delete Jnk;
543 }
544}
545 /*}}}*/
546// Queue::Enqueue - Queue an item to the queue /*{{{*/
547// ---------------------------------------------------------------------
548/* */
c03462c6 549bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 550{
7a1b1f8b 551 QItem **I = &Items;
c03462c6
MV
552 // move to the end of the queue and check for duplicates here
553 for (; *I != 0; I = &(*I)->Next)
554 if (Item.URI == (*I)->URI)
555 {
556 Item.Owner->Status = Item::StatDone;
557 return false;
558 }
559
0a8a80e5 560 // Create a new item
7a1b1f8b
AL
561 QItem *Itm = new QItem;
562 *Itm = Item;
563 Itm->Next = 0;
564 *I = Itm;
0a8a80e5 565
8267fe24 566 Item.Owner->QueueCounter++;
93bf083d
AL
567 if (Items->Next == 0)
568 Cycle();
c03462c6 569 return true;
0a8a80e5
AL
570}
571 /*}}}*/
c88edf1d 572// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 573// ---------------------------------------------------------------------
b185acc2 574/* We return true if we hit something */
bfd22fc0 575bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 576{
b185acc2
AL
577 if (Owner->Status == pkgAcquire::Item::StatFetching)
578 return _error->Error("Tried to dequeue a fetching object");
579
bfd22fc0
AL
580 bool Res = false;
581
0a8a80e5
AL
582 QItem **I = &Items;
583 for (; *I != 0;)
584 {
585 if ((*I)->Owner == Owner)
586 {
587 QItem *Jnk= *I;
588 *I = (*I)->Next;
589 Owner->QueueCounter--;
590 delete Jnk;
bfd22fc0 591 Res = true;
0a8a80e5
AL
592 }
593 else
594 I = &(*I)->Next;
595 }
bfd22fc0
AL
596
597 return Res;
0a8a80e5
AL
598}
599 /*}}}*/
600// Queue::Startup - Start the worker processes /*{{{*/
601// ---------------------------------------------------------------------
8e5fc8f5
AL
602/* It is possible for this to be called with a pre-existing set of
603 workers. */
0a8a80e5
AL
604bool pkgAcquire::Queue::Startup()
605{
8e5fc8f5
AL
606 if (Workers == 0)
607 {
608 URI U(Name);
609 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
610 if (Cnf == 0)
611 return false;
612
613 Workers = new Worker(this,Cnf,Owner->Log);
614 Owner->Add(Workers);
615 if (Workers->Start() == false)
616 return false;
617
618 /* When pipelining we commit 10 items. This needs to change when we
619 added other source retry to have cycle maintain a pipeline depth
620 on its own. */
621 if (Cnf->Pipeline == true)
6ce72612 622 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
623 else
624 MaxPipeDepth = 1;
625 }
5cb5d8dc 626
93bf083d 627 return Cycle();
0a8a80e5
AL
628}
629 /*}}}*/
630// Queue::Shutdown - Shutdown the worker processes /*{{{*/
631// ---------------------------------------------------------------------
8e5fc8f5
AL
632/* If final is true then all workers are eliminated, otherwise only workers
633 that do not need cleanup are removed */
634bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
635{
636 // Delete all of the workers
8e5fc8f5
AL
637 pkgAcquire::Worker **Cur = &Workers;
638 while (*Cur != 0)
0a8a80e5 639 {
8e5fc8f5
AL
640 pkgAcquire::Worker *Jnk = *Cur;
641 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
642 {
643 *Cur = Jnk->NextQueue;
644 Owner->Remove(Jnk);
645 delete Jnk;
646 }
647 else
648 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
649 }
650
651 return true;
3b5421b4
AL
652}
653 /*}}}*/
7d8afa39 654// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
655// ---------------------------------------------------------------------
656/* */
657pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
658{
659 for (QItem *I = Items; I != 0; I = I->Next)
660 if (I->URI == URI && I->Worker == Owner)
661 return I;
662 return 0;
663}
664 /*}}}*/
665// Queue::ItemDone - Item has been completed /*{{{*/
666// ---------------------------------------------------------------------
667/* The worker signals this which causes the item to be removed from the
93bf083d
AL
668 queue. If this is the last queue instance then it is removed from the
669 main queue too.*/
c88edf1d
AL
670bool pkgAcquire::Queue::ItemDone(QItem *Itm)
671{
b185acc2 672 PipeDepth--;
db890fdb
AL
673 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
674 Itm->Owner->Status = pkgAcquire::Item::StatDone;
675
93bf083d
AL
676 if (Itm->Owner->QueueCounter <= 1)
677 Owner->Dequeue(Itm->Owner);
678 else
679 {
680 Dequeue(Itm->Owner);
681 Owner->Bump();
682 }
c88edf1d 683
93bf083d
AL
684 return Cycle();
685}
686 /*}}}*/
687// Queue::Cycle - Queue new items into the method /*{{{*/
688// ---------------------------------------------------------------------
b185acc2
AL
689/* This locates a new idle item and sends it to the worker. If pipelining
690 is enabled then it keeps the pipe full. */
93bf083d
AL
691bool pkgAcquire::Queue::Cycle()
692{
693 if (Items == 0 || Workers == 0)
c88edf1d
AL
694 return true;
695
e7432370
AL
696 if (PipeDepth < 0)
697 return _error->Error("Pipedepth failure");
698
93bf083d
AL
699 // Look for a queable item
700 QItem *I = Items;
e7432370 701 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
702 {
703 for (; I != 0; I = I->Next)
704 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
705 break;
706
707 // Nothing to do, queue is idle.
708 if (I == 0)
709 return true;
710
711 I->Worker = Workers;
712 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 713 PipeDepth++;
b185acc2
AL
714 if (Workers->QueueItem(I) == false)
715 return false;
716 }
93bf083d 717
b185acc2 718 return true;
c88edf1d
AL
719}
720 /*}}}*/
be4401bf
AL
721// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
722// ---------------------------------------------------------------------
b185acc2 723/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
724void pkgAcquire::Queue::Bump()
725{
b185acc2 726 Cycle();
be4401bf
AL
727}
728 /*}}}*/
b98f2859
AL
729// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
730// ---------------------------------------------------------------------
731/* */
c5ccf175 732pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
733{
734 Start();
735}
736 /*}}}*/
737// AcquireStatus::Pulse - Called periodically /*{{{*/
738// ---------------------------------------------------------------------
739/* This computes some internal state variables for the derived classes to
740 use. It generates the current downloaded bytes and total bytes to download
741 as well as the current CPS estimate. */
024d1123 742bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
743{
744 TotalBytes = 0;
745 CurrentBytes = 0;
d568ed2d
AL
746 TotalItems = 0;
747 CurrentItems = 0;
b98f2859
AL
748
749 // Compute the total number of bytes to fetch
750 unsigned int Unknown = 0;
751 unsigned int Count = 0;
b4fc9b6f 752 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
753 I++, Count++)
754 {
d568ed2d
AL
755 TotalItems++;
756 if ((*I)->Status == pkgAcquire::Item::StatDone)
757 CurrentItems++;
758
a6568219
AL
759 // Totally ignore local items
760 if ((*I)->Local == true)
761 continue;
b2e465d6 762
b98f2859
AL
763 TotalBytes += (*I)->FileSize;
764 if ((*I)->Complete == true)
765 CurrentBytes += (*I)->FileSize;
766 if ((*I)->FileSize == 0 && (*I)->Complete == false)
767 Unknown++;
768 }
769
770 // Compute the current completion
aa0e1101 771 unsigned long ResumeSize = 0;
b98f2859
AL
772 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
773 I = Owner->WorkerStep(I))
774 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
775 {
776 CurrentBytes += I->CurrentSize;
777 ResumeSize += I->ResumePoint;
778
779 // Files with unknown size always have 100% completion
780 if (I->CurrentItem->Owner->FileSize == 0 &&
781 I->CurrentItem->Owner->Complete == false)
782 TotalBytes += I->CurrentSize;
783 }
784
b98f2859
AL
785 // Normalize the figures and account for unknown size downloads
786 if (TotalBytes <= 0)
787 TotalBytes = 1;
788 if (Unknown == Count)
789 TotalBytes = Unknown;
18ef0a78
AL
790
791 // Wha?! Is not supposed to happen.
792 if (CurrentBytes > TotalBytes)
793 CurrentBytes = TotalBytes;
b98f2859
AL
794
795 // Compute the CPS
796 struct timeval NewTime;
797 gettimeofday(&NewTime,0);
2ec1674d 798 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
799 NewTime.tv_sec - Time.tv_sec > 6)
800 {
f17ac097
AL
801 double Delta = NewTime.tv_sec - Time.tv_sec +
802 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 803
b98f2859 804 // Compute the CPS value
f17ac097 805 if (Delta < 0.01)
e331f6ed
AL
806 CurrentCPS = 0;
807 else
aa0e1101
AL
808 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
809 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 810 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
811 Time = NewTime;
812 }
024d1123 813
75ef8f14
MV
814 int fd = _config->FindI("APT::Status-Fd",-1);
815 if(fd > 0)
816 {
817 ostringstream status;
818
819 char msg[200];
820 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
821 unsigned long ETA =
822 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
823
1e8b4c0f
MV
824 // only show the ETA if it makes sense
825 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 826 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 827 else
0c508b03 828 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
829
830
75ef8f14
MV
831
832 // build the status str
833 status << "dlstatus:" << i
834 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
835 << ":" << msg
836 << endl;
837 write(fd, status.str().c_str(), status.str().size());
838 }
839
024d1123 840 return true;
b98f2859
AL
841}
842 /*}}}*/
843// AcquireStatus::Start - Called when the download is started /*{{{*/
844// ---------------------------------------------------------------------
845/* We just reset the counters */
846void pkgAcquireStatus::Start()
847{
848 gettimeofday(&Time,0);
849 gettimeofday(&StartTime,0);
850 LastBytes = 0;
851 CurrentCPS = 0;
852 CurrentBytes = 0;
853 TotalBytes = 0;
854 FetchedBytes = 0;
855 ElapsedTime = 0;
d568ed2d
AL
856 TotalItems = 0;
857 CurrentItems = 0;
b98f2859
AL
858}
859 /*}}}*/
a6568219 860// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
861// ---------------------------------------------------------------------
862/* This accurately computes the elapsed time and the total overall CPS. */
863void pkgAcquireStatus::Stop()
864{
865 // Compute the CPS and elapsed time
866 struct timeval NewTime;
867 gettimeofday(&NewTime,0);
868
31a0531d
AL
869 double Delta = NewTime.tv_sec - StartTime.tv_sec +
870 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 871
b98f2859 872 // Compute the CPS value
31a0531d 873 if (Delta < 0.01)
e331f6ed
AL
874 CurrentCPS = 0;
875 else
31a0531d 876 CurrentCPS = FetchedBytes/Delta;
b98f2859 877 LastBytes = CurrentBytes;
31a0531d 878 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
879}
880 /*}}}*/
881// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
882// ---------------------------------------------------------------------
883/* This is used to get accurate final transfer rate reporting. */
884void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 885{
b98f2859
AL
886 FetchedBytes += Size - Resume;
887}
888 /*}}}*/