]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
merge from mvo
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
0a8a80e5
AL
8 The core element of the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
0118833a
AL
16#include <apt-pkg/acquire.h>
17#include <apt-pkg/acquire-item.h>
18#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
19#include <apt-pkg/configuration.h>
20#include <apt-pkg/error.h>
cdcc6d34 21#include <apt-pkg/strutl.h>
8267fe24 22
b2e465d6 23#include <apti18n.h>
b4fc9b6f
AL
24
25#include <iostream>
75ef8f14 26#include <sstream>
526334a0
MV
27#include <stdio.h>
28
7a7fa5f0 29#include <dirent.h>
8267fe24 30#include <sys/time.h>
524f8105 31#include <errno.h>
5f3edc0f 32#include <sys/stat.h>
0118833a
AL
33 /*}}}*/
34
b4fc9b6f
AL
35using namespace std;
36
0118833a
AL
37// Acquire::pkgAcquire - Constructor /*{{{*/
38// ---------------------------------------------------------------------
93bf083d 39/* We grab some runtime state from the configuration space */
8267fe24 40pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
41{
42 Queues = 0;
43 Configs = 0;
0a8a80e5
AL
44 Workers = 0;
45 ToFetch = 0;
8b89e57f 46 Running = false;
0a8a80e5
AL
47
48 string Mode = _config->Find("Acquire::Queue-Mode","host");
49 if (strcasecmp(Mode.c_str(),"host") == 0)
50 QueueMode = QueueHost;
51 if (strcasecmp(Mode.c_str(),"access") == 0)
52 QueueMode = QueueAccess;
53
54 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 55
0c55d1d2 56 // This is really a stupid place for this
5f3edc0f
AL
57 struct stat St;
58 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
59 S_ISDIR(St.st_mode) == 0)
b2e465d6 60 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
61 _config->FindDir("Dir::State::lists").c_str());
62 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
63 S_ISDIR(St.st_mode) == 0)
b2e465d6 64 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 65 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
66}
67 /*}}}*/
68// Acquire::~pkgAcquire - Destructor /*{{{*/
69// ---------------------------------------------------------------------
93bf083d 70/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
71pkgAcquire::~pkgAcquire()
72{
459681d3
AL
73 Shutdown();
74
3b5421b4
AL
75 while (Configs != 0)
76 {
77 MethodConfig *Jnk = Configs;
78 Configs = Configs->Next;
79 delete Jnk;
80 }
281daf46
AL
81}
82 /*}}}*/
8e5fc8f5 83// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
84// ---------------------------------------------------------------------
85/* */
86void pkgAcquire::Shutdown()
87{
88 while (Items.size() != 0)
1b480911
AL
89 {
90 if (Items[0]->Status == Item::StatFetching)
91 Items[0]->Status = Item::StatError;
281daf46 92 delete Items[0];
1b480911 93 }
0a8a80e5
AL
94
95 while (Queues != 0)
96 {
97 Queue *Jnk = Queues;
98 Queues = Queues->Next;
99 delete Jnk;
100 }
0118833a
AL
101}
102 /*}}}*/
103// Acquire::Add - Add a new item /*{{{*/
104// ---------------------------------------------------------------------
93bf083d
AL
105/* This puts an item on the acquire list. This list is mainly for tracking
106 item status */
0118833a
AL
107void pkgAcquire::Add(Item *Itm)
108{
109 Items.push_back(Itm);
110}
111 /*}}}*/
112// Acquire::Remove - Remove a item /*{{{*/
113// ---------------------------------------------------------------------
93bf083d 114/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
115void pkgAcquire::Remove(Item *Itm)
116{
a3eaf954
AL
117 Dequeue(Itm);
118
753b3525 119 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
120 {
121 if (*I == Itm)
b4fc9b6f 122 {
0118833a 123 Items.erase(I);
b4fc9b6f
AL
124 I = Items.begin();
125 }
753b3525
AL
126 else
127 I++;
8267fe24 128 }
0118833a
AL
129}
130 /*}}}*/
0a8a80e5
AL
131// Acquire::Add - Add a worker /*{{{*/
132// ---------------------------------------------------------------------
93bf083d
AL
133/* A list of workers is kept so that the select loop can direct their FD
134 usage. */
0a8a80e5
AL
135void pkgAcquire::Add(Worker *Work)
136{
137 Work->NextAcquire = Workers;
138 Workers = Work;
139}
140 /*}}}*/
141// Acquire::Remove - Remove a worker /*{{{*/
142// ---------------------------------------------------------------------
93bf083d
AL
143/* A worker has died. This can not be done while the select loop is running
144 as it would require that RunFds could handling a changing list state and
145 it cant.. */
0a8a80e5
AL
146void pkgAcquire::Remove(Worker *Work)
147{
93bf083d
AL
148 if (Running == true)
149 abort();
150
0a8a80e5
AL
151 Worker **I = &Workers;
152 for (; *I != 0;)
153 {
154 if (*I == Work)
155 *I = (*I)->NextAcquire;
156 else
157 I = &(*I)->NextAcquire;
158 }
159}
160 /*}}}*/
0118833a
AL
161// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
162// ---------------------------------------------------------------------
93bf083d 163/* This is the entry point for an item. An item calls this function when
281daf46 164 it is constructed which creates a queue (based on the current queue
93bf083d
AL
165 mode) and puts the item in that queue. If the system is running then
166 the queue might be started. */
8267fe24 167void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 168{
0a8a80e5 169 // Determine which queue to put the item in
e331f6ed
AL
170 const MethodConfig *Config;
171 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
172 if (Name.empty() == true)
173 return;
174
175 // Find the queue structure
176 Queue *I = Queues;
177 for (; I != 0 && I->Name != Name; I = I->Next);
178 if (I == 0)
179 {
180 I = new Queue(Name,this);
181 I->Next = Queues;
182 Queues = I;
93bf083d
AL
183
184 if (Running == true)
185 I->Startup();
0a8a80e5 186 }
bfd22fc0 187
e331f6ed
AL
188 // See if this is a local only URI
189 if (Config->LocalOnly == true && Item.Owner->Complete == false)
190 Item.Owner->Local = true;
8267fe24 191 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
192
193 // Queue it into the named queue
c03462c6
MV
194 if(I->Enqueue(Item))
195 ToFetch++;
196
0a8a80e5
AL
197 // Some trace stuff
198 if (Debug == true)
199 {
8267fe24
AL
200 clog << "Fetching " << Item.URI << endl;
201 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 202 clog << " Queue is: " << Name << endl;
0a8a80e5 203 }
3b5421b4
AL
204}
205 /*}}}*/
0a8a80e5 206// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 207// ---------------------------------------------------------------------
93bf083d
AL
208/* This is called when an item is finished being fetched. It removes it
209 from all the queues */
0a8a80e5
AL
210void pkgAcquire::Dequeue(Item *Itm)
211{
212 Queue *I = Queues;
bfd22fc0 213 bool Res = false;
0a8a80e5 214 for (; I != 0; I = I->Next)
bfd22fc0 215 Res |= I->Dequeue(Itm);
93bf083d
AL
216
217 if (Debug == true)
218 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
219 if (Res == true)
220 ToFetch--;
0a8a80e5
AL
221}
222 /*}}}*/
223// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
224// ---------------------------------------------------------------------
225/* The string returned depends on the configuration settings and the
226 method parameters. Given something like http://foo.org/bar it can
227 return http://foo.org or http */
e331f6ed 228string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 229{
93bf083d
AL
230 URI U(Uri);
231
e331f6ed 232 Config = GetConfig(U.Access);
0a8a80e5
AL
233 if (Config == 0)
234 return string();
235
236 /* Single-Instance methods get exactly one queue per URI. This is
237 also used for the Access queue method */
238 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 239 return U.Access;
93bf083d
AL
240
241 return U.Access + ':' + U.Host;
0118833a
AL
242}
243 /*}}}*/
3b5421b4
AL
244// Acquire::GetConfig - Fetch the configuration information /*{{{*/
245// ---------------------------------------------------------------------
246/* This locates the configuration structure for an access method. If
247 a config structure cannot be found a Worker will be created to
248 retrieve it */
0a8a80e5 249pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
250{
251 // Search for an existing config
252 MethodConfig *Conf;
253 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
254 if (Conf->Access == Access)
255 return Conf;
256
257 // Create the new config class
258 Conf = new MethodConfig;
259 Conf->Access = Access;
260 Conf->Next = Configs;
261 Configs = Conf;
0118833a 262
3b5421b4
AL
263 // Create the worker to fetch the configuration
264 Worker Work(Conf);
265 if (Work.Start() == false)
266 return 0;
7c6e2dc7
MV
267
268 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 269 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
270 Conf->SingleInstance = true;
271
3b5421b4
AL
272 return Conf;
273}
274 /*}}}*/
0a8a80e5
AL
275// Acquire::SetFds - Deal with readable FDs /*{{{*/
276// ---------------------------------------------------------------------
277/* Collect FDs that have activity monitors into the fd sets */
278void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
279{
280 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
281 {
282 if (I->InReady == true && I->InFd >= 0)
283 {
284 if (Fd < I->InFd)
285 Fd = I->InFd;
286 FD_SET(I->InFd,RSet);
287 }
288 if (I->OutReady == true && I->OutFd >= 0)
289 {
290 if (Fd < I->OutFd)
291 Fd = I->OutFd;
292 FD_SET(I->OutFd,WSet);
293 }
294 }
295}
296 /*}}}*/
297// Acquire::RunFds - Deal with active FDs /*{{{*/
298// ---------------------------------------------------------------------
93bf083d
AL
299/* Dispatch active FDs over to the proper workers. It is very important
300 that a worker never be erased while this is running! The queue class
301 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
302void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
303{
304 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
305 {
306 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
307 I->InFdReady();
308 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
309 I->OutFdReady();
310 }
311}
312 /*}}}*/
313// Acquire::Run - Run the fetch sequence /*{{{*/
314// ---------------------------------------------------------------------
315/* This runs the queues. It manages a select loop for all of the
316 Worker tasks. The workers interact with the queues and items to
317 manage the actual fetch. */
1c5f7e5f 318pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 319{
8b89e57f
AL
320 Running = true;
321
0a8a80e5
AL
322 for (Queue *I = Queues; I != 0; I = I->Next)
323 I->Startup();
324
b98f2859
AL
325 if (Log != 0)
326 Log->Start();
327
024d1123
AL
328 bool WasCancelled = false;
329
0a8a80e5 330 // Run till all things have been acquired
8267fe24
AL
331 struct timeval tv;
332 tv.tv_sec = 0;
1c5f7e5f 333 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
334 while (ToFetch > 0)
335 {
336 fd_set RFds;
337 fd_set WFds;
338 int Highest = 0;
339 FD_ZERO(&RFds);
340 FD_ZERO(&WFds);
341 SetFds(Highest,&RFds,&WFds);
342
b0db36b1
AL
343 int Res;
344 do
345 {
346 Res = select(Highest+1,&RFds,&WFds,0,&tv);
347 }
348 while (Res < 0 && errno == EINTR);
349
8267fe24 350 if (Res < 0)
8b89e57f 351 {
8267fe24
AL
352 _error->Errno("select","Select has failed");
353 break;
8b89e57f 354 }
93bf083d 355
0a8a80e5 356 RunFds(&RFds,&WFds);
93bf083d
AL
357 if (_error->PendingError() == true)
358 break;
8267fe24
AL
359
360 // Timeout, notify the log class
361 if (Res == 0 || (Log != 0 && Log->Update == true))
362 {
1c5f7e5f 363 tv.tv_usec = PulseIntervall;
8267fe24
AL
364 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
365 I->Pulse();
024d1123
AL
366 if (Log != 0 && Log->Pulse(this) == false)
367 {
368 WasCancelled = true;
369 break;
370 }
8267fe24 371 }
0a8a80e5 372 }
be4401bf 373
b98f2859
AL
374 if (Log != 0)
375 Log->Stop();
376
be4401bf
AL
377 // Shut down the acquire bits
378 Running = false;
0a8a80e5 379 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 380 I->Shutdown(false);
0a8a80e5 381
ab559b35 382 // Shut down the items
b4fc9b6f 383 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 384 (*I)->Finished();
ab559b35 385
024d1123
AL
386 if (_error->PendingError())
387 return Failed;
388 if (WasCancelled)
389 return Cancelled;
390 return Continue;
93bf083d
AL
391}
392 /*}}}*/
be4401bf 393// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
394// ---------------------------------------------------------------------
395/* This routine bumps idle queues in hopes that they will be able to fetch
396 the dequeued item */
397void pkgAcquire::Bump()
398{
be4401bf
AL
399 for (Queue *I = Queues; I != 0; I = I->Next)
400 I->Bump();
0a8a80e5
AL
401}
402 /*}}}*/
8267fe24
AL
403// Acquire::WorkerStep - Step to the next worker /*{{{*/
404// ---------------------------------------------------------------------
405/* Not inlined to advoid including acquire-worker.h */
406pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
407{
408 return I->NextAcquire;
409};
410 /*}}}*/
a6568219 411// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
412// ---------------------------------------------------------------------
413/* This is a bit simplistic, it looks at every file in the dir and sees
414 if it is part of the download set. */
415bool pkgAcquire::Clean(string Dir)
416{
417 DIR *D = opendir(Dir.c_str());
418 if (D == 0)
b2e465d6 419 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
420
421 string StartDir = SafeGetCWD();
422 if (chdir(Dir.c_str()) != 0)
423 {
424 closedir(D);
b2e465d6 425 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
426 }
427
428 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
429 {
430 // Skip some files..
431 if (strcmp(Dir->d_name,"lock") == 0 ||
432 strcmp(Dir->d_name,"partial") == 0 ||
433 strcmp(Dir->d_name,".") == 0 ||
434 strcmp(Dir->d_name,"..") == 0)
435 continue;
436
437 // Look in the get list
b4fc9b6f 438 ItemCIterator I = Items.begin();
7a7fa5f0
AL
439 for (; I != Items.end(); I++)
440 if (flNotDir((*I)->DestFile) == Dir->d_name)
441 break;
442
443 // Nothing found, nuke it
444 if (I == Items.end())
445 unlink(Dir->d_name);
446 };
447
7a7fa5f0 448 closedir(D);
3c8cda8b
MV
449 if (chdir(StartDir.c_str()) != 0)
450 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
451 return true;
452}
453 /*}}}*/
a6568219
AL
454// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
455// ---------------------------------------------------------------------
456/* This is the total number of bytes needed */
b2e465d6 457double pkgAcquire::TotalNeeded()
a6568219 458{
b2e465d6 459 double Total = 0;
b4fc9b6f 460 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
461 Total += (*I)->FileSize;
462 return Total;
463}
464 /*}}}*/
465// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
466// ---------------------------------------------------------------------
467/* This is the number of bytes that is not local */
b2e465d6 468double pkgAcquire::FetchNeeded()
a6568219 469{
b2e465d6 470 double Total = 0;
b4fc9b6f 471 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
472 if ((*I)->Local == false)
473 Total += (*I)->FileSize;
474 return Total;
475}
476 /*}}}*/
6b1ff003
AL
477// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
478// ---------------------------------------------------------------------
479/* This is the number of bytes that is not local */
b2e465d6 480double pkgAcquire::PartialPresent()
6b1ff003 481{
b2e465d6 482 double Total = 0;
b4fc9b6f 483 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
484 if ((*I)->Local == false)
485 Total += (*I)->PartialSize;
486 return Total;
487}
b3d44315 488
8e5fc8f5 489// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
490// ---------------------------------------------------------------------
491/* */
492pkgAcquire::UriIterator pkgAcquire::UriBegin()
493{
494 return UriIterator(Queues);
495}
496 /*}}}*/
8e5fc8f5 497// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
498// ---------------------------------------------------------------------
499/* */
500pkgAcquire::UriIterator pkgAcquire::UriEnd()
501{
502 return UriIterator(0);
503}
504 /*}}}*/
0a8a80e5 505
e331f6ed
AL
506// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
507// ---------------------------------------------------------------------
508/* */
509pkgAcquire::MethodConfig::MethodConfig()
510{
511 SingleInstance = false;
e331f6ed
AL
512 Pipeline = false;
513 SendConfig = false;
514 LocalOnly = false;
459681d3 515 Removable = false;
e331f6ed
AL
516 Next = 0;
517}
518 /*}}}*/
519
0a8a80e5
AL
520// Queue::Queue - Constructor /*{{{*/
521// ---------------------------------------------------------------------
522/* */
523pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
524 Owner(Owner)
525{
526 Items = 0;
527 Next = 0;
528 Workers = 0;
b185acc2
AL
529 MaxPipeDepth = 1;
530 PipeDepth = 0;
0a8a80e5
AL
531}
532 /*}}}*/
533// Queue::~Queue - Destructor /*{{{*/
534// ---------------------------------------------------------------------
535/* */
536pkgAcquire::Queue::~Queue()
537{
8e5fc8f5 538 Shutdown(true);
0a8a80e5
AL
539
540 while (Items != 0)
541 {
542 QItem *Jnk = Items;
543 Items = Items->Next;
544 delete Jnk;
545 }
546}
547 /*}}}*/
548// Queue::Enqueue - Queue an item to the queue /*{{{*/
549// ---------------------------------------------------------------------
550/* */
c03462c6 551bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 552{
7a1b1f8b 553 QItem **I = &Items;
c03462c6
MV
554 // move to the end of the queue and check for duplicates here
555 for (; *I != 0; I = &(*I)->Next)
556 if (Item.URI == (*I)->URI)
557 {
558 Item.Owner->Status = Item::StatDone;
559 return false;
560 }
561
0a8a80e5 562 // Create a new item
7a1b1f8b
AL
563 QItem *Itm = new QItem;
564 *Itm = Item;
565 Itm->Next = 0;
566 *I = Itm;
0a8a80e5 567
8267fe24 568 Item.Owner->QueueCounter++;
93bf083d
AL
569 if (Items->Next == 0)
570 Cycle();
c03462c6 571 return true;
0a8a80e5
AL
572}
573 /*}}}*/
c88edf1d 574// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 575// ---------------------------------------------------------------------
b185acc2 576/* We return true if we hit something */
bfd22fc0 577bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 578{
b185acc2
AL
579 if (Owner->Status == pkgAcquire::Item::StatFetching)
580 return _error->Error("Tried to dequeue a fetching object");
581
bfd22fc0
AL
582 bool Res = false;
583
0a8a80e5
AL
584 QItem **I = &Items;
585 for (; *I != 0;)
586 {
587 if ((*I)->Owner == Owner)
588 {
589 QItem *Jnk= *I;
590 *I = (*I)->Next;
591 Owner->QueueCounter--;
592 delete Jnk;
bfd22fc0 593 Res = true;
0a8a80e5
AL
594 }
595 else
596 I = &(*I)->Next;
597 }
bfd22fc0
AL
598
599 return Res;
0a8a80e5
AL
600}
601 /*}}}*/
602// Queue::Startup - Start the worker processes /*{{{*/
603// ---------------------------------------------------------------------
8e5fc8f5
AL
604/* It is possible for this to be called with a pre-existing set of
605 workers. */
0a8a80e5
AL
606bool pkgAcquire::Queue::Startup()
607{
8e5fc8f5
AL
608 if (Workers == 0)
609 {
610 URI U(Name);
611 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
612 if (Cnf == 0)
613 return false;
614
615 Workers = new Worker(this,Cnf,Owner->Log);
616 Owner->Add(Workers);
617 if (Workers->Start() == false)
618 return false;
619
620 /* When pipelining we commit 10 items. This needs to change when we
621 added other source retry to have cycle maintain a pipeline depth
622 on its own. */
623 if (Cnf->Pipeline == true)
6ce72612 624 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
625 else
626 MaxPipeDepth = 1;
627 }
5cb5d8dc 628
93bf083d 629 return Cycle();
0a8a80e5
AL
630}
631 /*}}}*/
632// Queue::Shutdown - Shutdown the worker processes /*{{{*/
633// ---------------------------------------------------------------------
8e5fc8f5
AL
634/* If final is true then all workers are eliminated, otherwise only workers
635 that do not need cleanup are removed */
636bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
637{
638 // Delete all of the workers
8e5fc8f5
AL
639 pkgAcquire::Worker **Cur = &Workers;
640 while (*Cur != 0)
0a8a80e5 641 {
8e5fc8f5
AL
642 pkgAcquire::Worker *Jnk = *Cur;
643 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
644 {
645 *Cur = Jnk->NextQueue;
646 Owner->Remove(Jnk);
647 delete Jnk;
648 }
649 else
650 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
651 }
652
653 return true;
3b5421b4
AL
654}
655 /*}}}*/
7d8afa39 656// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
657// ---------------------------------------------------------------------
658/* */
659pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
660{
661 for (QItem *I = Items; I != 0; I = I->Next)
662 if (I->URI == URI && I->Worker == Owner)
663 return I;
664 return 0;
665}
666 /*}}}*/
667// Queue::ItemDone - Item has been completed /*{{{*/
668// ---------------------------------------------------------------------
669/* The worker signals this which causes the item to be removed from the
93bf083d
AL
670 queue. If this is the last queue instance then it is removed from the
671 main queue too.*/
c88edf1d
AL
672bool pkgAcquire::Queue::ItemDone(QItem *Itm)
673{
b185acc2 674 PipeDepth--;
db890fdb
AL
675 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
676 Itm->Owner->Status = pkgAcquire::Item::StatDone;
677
93bf083d
AL
678 if (Itm->Owner->QueueCounter <= 1)
679 Owner->Dequeue(Itm->Owner);
680 else
681 {
682 Dequeue(Itm->Owner);
683 Owner->Bump();
684 }
c88edf1d 685
93bf083d
AL
686 return Cycle();
687}
688 /*}}}*/
689// Queue::Cycle - Queue new items into the method /*{{{*/
690// ---------------------------------------------------------------------
b185acc2
AL
691/* This locates a new idle item and sends it to the worker. If pipelining
692 is enabled then it keeps the pipe full. */
93bf083d
AL
693bool pkgAcquire::Queue::Cycle()
694{
695 if (Items == 0 || Workers == 0)
c88edf1d
AL
696 return true;
697
e7432370
AL
698 if (PipeDepth < 0)
699 return _error->Error("Pipedepth failure");
700
93bf083d
AL
701 // Look for a queable item
702 QItem *I = Items;
e7432370 703 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
704 {
705 for (; I != 0; I = I->Next)
706 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
707 break;
708
709 // Nothing to do, queue is idle.
710 if (I == 0)
711 return true;
712
713 I->Worker = Workers;
714 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 715 PipeDepth++;
b185acc2
AL
716 if (Workers->QueueItem(I) == false)
717 return false;
718 }
93bf083d 719
b185acc2 720 return true;
c88edf1d
AL
721}
722 /*}}}*/
be4401bf
AL
723// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
724// ---------------------------------------------------------------------
b185acc2 725/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
726void pkgAcquire::Queue::Bump()
727{
b185acc2 728 Cycle();
be4401bf
AL
729}
730 /*}}}*/
b98f2859
AL
731
732// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
733// ---------------------------------------------------------------------
734/* */
c5ccf175 735pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
736{
737 Start();
738}
739 /*}}}*/
740// AcquireStatus::Pulse - Called periodically /*{{{*/
741// ---------------------------------------------------------------------
742/* This computes some internal state variables for the derived classes to
743 use. It generates the current downloaded bytes and total bytes to download
744 as well as the current CPS estimate. */
024d1123 745bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
746{
747 TotalBytes = 0;
748 CurrentBytes = 0;
d568ed2d
AL
749 TotalItems = 0;
750 CurrentItems = 0;
b98f2859
AL
751
752 // Compute the total number of bytes to fetch
753 unsigned int Unknown = 0;
754 unsigned int Count = 0;
b4fc9b6f 755 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
756 I++, Count++)
757 {
d568ed2d
AL
758 TotalItems++;
759 if ((*I)->Status == pkgAcquire::Item::StatDone)
760 CurrentItems++;
761
a6568219
AL
762 // Totally ignore local items
763 if ((*I)->Local == true)
764 continue;
b2e465d6 765
b98f2859
AL
766 TotalBytes += (*I)->FileSize;
767 if ((*I)->Complete == true)
768 CurrentBytes += (*I)->FileSize;
769 if ((*I)->FileSize == 0 && (*I)->Complete == false)
770 Unknown++;
771 }
772
773 // Compute the current completion
aa0e1101 774 unsigned long ResumeSize = 0;
b98f2859
AL
775 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
776 I = Owner->WorkerStep(I))
777 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
778 {
779 CurrentBytes += I->CurrentSize;
780 ResumeSize += I->ResumePoint;
781
782 // Files with unknown size always have 100% completion
783 if (I->CurrentItem->Owner->FileSize == 0 &&
784 I->CurrentItem->Owner->Complete == false)
785 TotalBytes += I->CurrentSize;
786 }
787
b98f2859
AL
788 // Normalize the figures and account for unknown size downloads
789 if (TotalBytes <= 0)
790 TotalBytes = 1;
791 if (Unknown == Count)
792 TotalBytes = Unknown;
18ef0a78
AL
793
794 // Wha?! Is not supposed to happen.
795 if (CurrentBytes > TotalBytes)
796 CurrentBytes = TotalBytes;
b98f2859
AL
797
798 // Compute the CPS
799 struct timeval NewTime;
800 gettimeofday(&NewTime,0);
2ec1674d 801 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
802 NewTime.tv_sec - Time.tv_sec > 6)
803 {
f17ac097
AL
804 double Delta = NewTime.tv_sec - Time.tv_sec +
805 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 806
b98f2859 807 // Compute the CPS value
f17ac097 808 if (Delta < 0.01)
e331f6ed
AL
809 CurrentCPS = 0;
810 else
aa0e1101
AL
811 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
812 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 813 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
814 Time = NewTime;
815 }
024d1123 816
75ef8f14
MV
817 int fd = _config->FindI("APT::Status-Fd",-1);
818 if(fd > 0)
819 {
820 ostringstream status;
821
822 char msg[200];
823 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
824 unsigned long ETA =
825 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
826
1e8b4c0f
MV
827 // only show the ETA if it makes sense
828 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 829 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 830 else
0c508b03 831 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
832
833
75ef8f14
MV
834
835 // build the status str
836 status << "dlstatus:" << i
837 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
838 << ":" << msg
839 << endl;
840 write(fd, status.str().c_str(), status.str().size());
841 }
842
024d1123 843 return true;
b98f2859
AL
844}
845 /*}}}*/
846// AcquireStatus::Start - Called when the download is started /*{{{*/
847// ---------------------------------------------------------------------
848/* We just reset the counters */
849void pkgAcquireStatus::Start()
850{
851 gettimeofday(&Time,0);
852 gettimeofday(&StartTime,0);
853 LastBytes = 0;
854 CurrentCPS = 0;
855 CurrentBytes = 0;
856 TotalBytes = 0;
857 FetchedBytes = 0;
858 ElapsedTime = 0;
d568ed2d
AL
859 TotalItems = 0;
860 CurrentItems = 0;
b98f2859
AL
861}
862 /*}}}*/
a6568219 863// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
864// ---------------------------------------------------------------------
865/* This accurately computes the elapsed time and the total overall CPS. */
866void pkgAcquireStatus::Stop()
867{
868 // Compute the CPS and elapsed time
869 struct timeval NewTime;
870 gettimeofday(&NewTime,0);
871
31a0531d
AL
872 double Delta = NewTime.tv_sec - StartTime.tv_sec +
873 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 874
b98f2859 875 // Compute the CPS value
31a0531d 876 if (Delta < 0.01)
e331f6ed
AL
877 CurrentCPS = 0;
878 else
31a0531d 879 CurrentCPS = FetchedBytes/Delta;
b98f2859 880 LastBytes = CurrentBytes;
31a0531d 881 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
882}
883 /*}}}*/
884// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
885// ---------------------------------------------------------------------
886/* This is used to get accurate final transfer rate reporting. */
887void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 888{
b98f2859
AL
889 FetchedBytes += Size - Resume;
890}
891 /*}}}*/