]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
* revert bits of the acquire-item merge that broke the
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
0a8a80e5
AL
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
b2e465d6 26#include <apti18n.h>
b4fc9b6f
AL
27
28#include <iostream>
75ef8f14 29#include <sstream>
b2e465d6 30
7a7fa5f0 31#include <dirent.h>
8267fe24 32#include <sys/time.h>
524f8105 33#include <errno.h>
5f3edc0f 34#include <sys/stat.h>
0118833a
AL
35 /*}}}*/
36
b4fc9b6f
AL
37using namespace std;
38
0118833a
AL
39// Acquire::pkgAcquire - Constructor /*{{{*/
40// ---------------------------------------------------------------------
93bf083d 41/* We grab some runtime state from the configuration space */
8267fe24 42pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
43{
44 Queues = 0;
45 Configs = 0;
0a8a80e5
AL
46 Workers = 0;
47 ToFetch = 0;
8b89e57f 48 Running = false;
0a8a80e5
AL
49
50 string Mode = _config->Find("Acquire::Queue-Mode","host");
51 if (strcasecmp(Mode.c_str(),"host") == 0)
52 QueueMode = QueueHost;
53 if (strcasecmp(Mode.c_str(),"access") == 0)
54 QueueMode = QueueAccess;
55
56 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 57
0c55d1d2 58 // This is really a stupid place for this
5f3edc0f
AL
59 struct stat St;
60 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
61 S_ISDIR(St.st_mode) == 0)
b2e465d6 62 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
63 _config->FindDir("Dir::State::lists").c_str());
64 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
65 S_ISDIR(St.st_mode) == 0)
b2e465d6 66 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 67 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
68}
69 /*}}}*/
70// Acquire::~pkgAcquire - Destructor /*{{{*/
71// ---------------------------------------------------------------------
93bf083d 72/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
73pkgAcquire::~pkgAcquire()
74{
459681d3
AL
75 Shutdown();
76
3b5421b4
AL
77 while (Configs != 0)
78 {
79 MethodConfig *Jnk = Configs;
80 Configs = Configs->Next;
81 delete Jnk;
82 }
281daf46
AL
83}
84 /*}}}*/
8e5fc8f5 85// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
86// ---------------------------------------------------------------------
87/* */
88void pkgAcquire::Shutdown()
89{
90 while (Items.size() != 0)
1b480911
AL
91 {
92 if (Items[0]->Status == Item::StatFetching)
93 Items[0]->Status = Item::StatError;
281daf46 94 delete Items[0];
1b480911 95 }
0a8a80e5
AL
96
97 while (Queues != 0)
98 {
99 Queue *Jnk = Queues;
100 Queues = Queues->Next;
101 delete Jnk;
102 }
0118833a
AL
103}
104 /*}}}*/
105// Acquire::Add - Add a new item /*{{{*/
106// ---------------------------------------------------------------------
93bf083d
AL
107/* This puts an item on the acquire list. This list is mainly for tracking
108 item status */
0118833a
AL
109void pkgAcquire::Add(Item *Itm)
110{
111 Items.push_back(Itm);
112}
113 /*}}}*/
114// Acquire::Remove - Remove a item /*{{{*/
115// ---------------------------------------------------------------------
93bf083d 116/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
117void pkgAcquire::Remove(Item *Itm)
118{
a3eaf954
AL
119 Dequeue(Itm);
120
753b3525 121 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
122 {
123 if (*I == Itm)
b4fc9b6f 124 {
0118833a 125 Items.erase(I);
b4fc9b6f
AL
126 I = Items.begin();
127 }
753b3525
AL
128 else
129 I++;
8267fe24 130 }
0118833a
AL
131}
132 /*}}}*/
0a8a80e5
AL
133// Acquire::Add - Add a worker /*{{{*/
134// ---------------------------------------------------------------------
93bf083d
AL
135/* A list of workers is kept so that the select loop can direct their FD
136 usage. */
0a8a80e5
AL
137void pkgAcquire::Add(Worker *Work)
138{
139 Work->NextAcquire = Workers;
140 Workers = Work;
141}
142 /*}}}*/
143// Acquire::Remove - Remove a worker /*{{{*/
144// ---------------------------------------------------------------------
93bf083d
AL
145/* A worker has died. This can not be done while the select loop is running
146 as it would require that RunFds could handle a changing list state and
147 it can't.. */
0a8a80e5
AL
148void pkgAcquire::Remove(Worker *Work)
149{
93bf083d
AL
150 if (Running == true)
151 abort();
152
0a8a80e5
AL
153 Worker **I = &Workers;
154 for (; *I != 0;)
155 {
156 if (*I == Work)
157 *I = (*I)->NextAcquire;
158 else
159 I = &(*I)->NextAcquire;
160 }
161}
162 /*}}}*/
0118833a
AL
163// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
164// ---------------------------------------------------------------------
93bf083d 165/* This is the entry point for an item. An item calls this function when
281daf46 166 it is constructed which creates a queue (based on the current queue
93bf083d
AL
167 mode) and puts the item in that queue. If the system is running then
168 the queue might be started. */
8267fe24 169void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 170{
0a8a80e5 171 // Determine which queue to put the item in
e331f6ed
AL
172 const MethodConfig *Config;
173 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
174 if (Name.empty() == true)
175 return;
176
177 // Find the queue structure
178 Queue *I = Queues;
179 for (; I != 0 && I->Name != Name; I = I->Next);
180 if (I == 0)
181 {
182 I = new Queue(Name,this);
183 I->Next = Queues;
184 Queues = I;
93bf083d
AL
185
186 if (Running == true)
187 I->Startup();
0a8a80e5 188 }
bfd22fc0 189
e331f6ed
AL
190 // See if this is a local only URI
191 if (Config->LocalOnly == true && Item.Owner->Complete == false)
192 Item.Owner->Local = true;
8267fe24 193 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
194
195 // Queue it into the named queue
c03462c6
MV
196 if(I->Enqueue(Item))
197 ToFetch++;
198
0a8a80e5
AL
199 // Some trace stuff
200 if (Debug == true)
201 {
8267fe24
AL
202 clog << "Fetching " << Item.URI << endl;
203 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 204 clog << " Queue is: " << Name << endl;
0a8a80e5 205 }
3b5421b4
AL
206}
207 /*}}}*/
0a8a80e5 208// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 209// ---------------------------------------------------------------------
93bf083d
AL
210/* This is called when an item is finished being fetched. It removes it
211 from all the queues */
0a8a80e5
AL
212void pkgAcquire::Dequeue(Item *Itm)
213{
214 Queue *I = Queues;
bfd22fc0 215 bool Res = false;
0a8a80e5 216 for (; I != 0; I = I->Next)
bfd22fc0 217 Res |= I->Dequeue(Itm);
93bf083d
AL
218
219 if (Debug == true)
220 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
221 if (Res == true)
222 ToFetch--;
0a8a80e5
AL
223}
224 /*}}}*/
225// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
226// ---------------------------------------------------------------------
227/* The string returned depends on the configuration settings and the
228 method parameters. Given something like http://foo.org/bar it can
229 return http://foo.org or http */
e331f6ed 230string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 231{
93bf083d
AL
232 URI U(Uri);
233
e331f6ed 234 Config = GetConfig(U.Access);
0a8a80e5
AL
235 if (Config == 0)
236 return string();
237
238 /* Single-Instance methods get exactly one queue per URI. This is
239 also used for the Access queue method */
240 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 241 return U.Access;
93bf083d
AL
242
243 return U.Access + ':' + U.Host;
0118833a
AL
244}
245 /*}}}*/
3b5421b4
AL
246// Acquire::GetConfig - Fetch the configuration information /*{{{*/
247// ---------------------------------------------------------------------
248/* This locates the configuration structure for an access method. If
249 a config structure cannot be found a Worker will be created to
250 retrieve it */
0a8a80e5 251pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
252{
253 // Search for an existing config
254 MethodConfig *Conf;
255 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
256 if (Conf->Access == Access)
257 return Conf;
258
259 // Create the new config class
260 Conf = new MethodConfig;
261 Conf->Access = Access;
262 Conf->Next = Configs;
263 Configs = Conf;
0118833a 264
3b5421b4
AL
265 // Create the worker to fetch the configuration
266 Worker Work(Conf);
267 if (Work.Start() == false)
268 return 0;
7c6e2dc7
MV
269
270 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
271 if(_config->FindI("Acquire::"+Access+"::DlLimit",0) > 0)
272 Conf->SingleInstance = true;
273
3b5421b4
AL
274 return Conf;
275}
276 /*}}}*/
0a8a80e5
AL
277// Acquire::SetFds - Deal with readable FDs /*{{{*/
278// ---------------------------------------------------------------------
279/* Collect FDs that have activity monitors into the fd sets */
280void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
281{
282 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
283 {
284 if (I->InReady == true && I->InFd >= 0)
285 {
286 if (Fd < I->InFd)
287 Fd = I->InFd;
288 FD_SET(I->InFd,RSet);
289 }
290 if (I->OutReady == true && I->OutFd >= 0)
291 {
292 if (Fd < I->OutFd)
293 Fd = I->OutFd;
294 FD_SET(I->OutFd,WSet);
295 }
296 }
297}
298 /*}}}*/
299// Acquire::RunFds - Deal with active FDs /*{{{*/
300// ---------------------------------------------------------------------
93bf083d
AL
301/* Dispatch active FDs over to the proper workers. It is very important
302 that a worker never be erased while this is running! The queue class
303 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
304void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
305{
306 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
307 {
308 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
309 I->InFdReady();
310 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
311 I->OutFdReady();
312 }
313}
314 /*}}}*/
315// Acquire::Run - Run the fetch sequence /*{{{*/
316// ---------------------------------------------------------------------
317/* This runs the queues. It manages a select loop for all of the
318 Worker tasks. The workers interact with the queues and items to
319 manage the actual fetch. */
1c5f7e5f 320pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 321{
8b89e57f
AL
322 Running = true;
323
0a8a80e5
AL
324 for (Queue *I = Queues; I != 0; I = I->Next)
325 I->Startup();
326
b98f2859
AL
327 if (Log != 0)
328 Log->Start();
329
024d1123
AL
330 bool WasCancelled = false;
331
0a8a80e5 332 // Run till all things have been acquired
8267fe24
AL
333 struct timeval tv;
334 tv.tv_sec = 0;
1c5f7e5f 335 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
336 while (ToFetch > 0)
337 {
338 fd_set RFds;
339 fd_set WFds;
340 int Highest = 0;
341 FD_ZERO(&RFds);
342 FD_ZERO(&WFds);
343 SetFds(Highest,&RFds,&WFds);
344
b0db36b1
AL
345 int Res;
346 do
347 {
348 Res = select(Highest+1,&RFds,&WFds,0,&tv);
349 }
350 while (Res < 0 && errno == EINTR);
351
8267fe24 352 if (Res < 0)
8b89e57f 353 {
8267fe24
AL
354 _error->Errno("select","Select has failed");
355 break;
8b89e57f 356 }
93bf083d 357
0a8a80e5 358 RunFds(&RFds,&WFds);
93bf083d
AL
359 if (_error->PendingError() == true)
360 break;
8267fe24
AL
361
362 // Timeout, notify the log class
363 if (Res == 0 || (Log != 0 && Log->Update == true))
364 {
1c5f7e5f 365 tv.tv_usec = PulseIntervall;
8267fe24
AL
366 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
367 I->Pulse();
024d1123
AL
368 if (Log != 0 && Log->Pulse(this) == false)
369 {
370 WasCancelled = true;
371 break;
372 }
8267fe24 373 }
0a8a80e5 374 }
be4401bf 375
b98f2859
AL
376 if (Log != 0)
377 Log->Stop();
378
be4401bf
AL
379 // Shut down the acquire bits
380 Running = false;
0a8a80e5 381 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 382 I->Shutdown(false);
0a8a80e5 383
ab559b35 384 // Shut down the items
b4fc9b6f 385 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 386 (*I)->Finished();
ab559b35 387
024d1123
AL
388 if (_error->PendingError())
389 return Failed;
390 if (WasCancelled)
391 return Cancelled;
392 return Continue;
93bf083d
AL
393}
394 /*}}}*/
be4401bf 395// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
396// ---------------------------------------------------------------------
397/* This routine bumps idle queues in hopes that they will be able to fetch
398 the dequeued item */
399void pkgAcquire::Bump()
400{
be4401bf
AL
401 for (Queue *I = Queues; I != 0; I = I->Next)
402 I->Bump();
0a8a80e5
AL
403}
404 /*}}}*/
8267fe24
AL
405// Acquire::WorkerStep - Step to the next worker /*{{{*/
406// ---------------------------------------------------------------------
407/* Not inlined to avoid including acquire-worker.h */
408pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
409{
410 return I->NextAcquire;
411};
412 /*}}}*/
a6568219 413// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
414// ---------------------------------------------------------------------
415/* This is a bit simplistic, it looks at every file in the dir and sees
416 if it is part of the download set. */
417bool pkgAcquire::Clean(string Dir)
418{
419 DIR *D = opendir(Dir.c_str());
420 if (D == 0)
b2e465d6 421 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
422
423 string StartDir = SafeGetCWD();
424 if (chdir(Dir.c_str()) != 0)
425 {
426 closedir(D);
b2e465d6 427 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
428 }
429
430 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
431 {
432 // Skip some files..
433 if (strcmp(Dir->d_name,"lock") == 0 ||
434 strcmp(Dir->d_name,"partial") == 0 ||
435 strcmp(Dir->d_name,".") == 0 ||
436 strcmp(Dir->d_name,"..") == 0)
437 continue;
438
439 // Look in the get list
b4fc9b6f 440 ItemCIterator I = Items.begin();
7a7fa5f0
AL
441 for (; I != Items.end(); I++)
442 if (flNotDir((*I)->DestFile) == Dir->d_name)
443 break;
444
445 // Nothing found, nuke it
446 if (I == Items.end())
447 unlink(Dir->d_name);
448 };
449
450 chdir(StartDir.c_str());
451 closedir(D);
452 return true;
453}
454 /*}}}*/
a6568219
AL
455// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
456// ---------------------------------------------------------------------
457/* This is the total number of bytes needed */
b2e465d6 458double pkgAcquire::TotalNeeded()
a6568219 459{
b2e465d6 460 double Total = 0;
b4fc9b6f 461 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
462 Total += (*I)->FileSize;
463 return Total;
464}
465 /*}}}*/
466// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
467// ---------------------------------------------------------------------
468/* This is the number of bytes that is not local */
b2e465d6 469double pkgAcquire::FetchNeeded()
a6568219 470{
b2e465d6 471 double Total = 0;
b4fc9b6f 472 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
473 if ((*I)->Local == false)
474 Total += (*I)->FileSize;
475 return Total;
476}
477 /*}}}*/
6b1ff003
AL
478// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
479// ---------------------------------------------------------------------
480/* This is the number of partially fetched bytes of the non-local items */
b2e465d6 481double pkgAcquire::PartialPresent()
6b1ff003 482{
b2e465d6 483 double Total = 0;
b4fc9b6f 484 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
485 if ((*I)->Local == false)
486 Total += (*I)->PartialSize;
487 return Total;
488}
b3d44315 489
8e5fc8f5 490// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
491// ---------------------------------------------------------------------
492/* */
493pkgAcquire::UriIterator pkgAcquire::UriBegin()
494{
495 return UriIterator(Queues);
496}
497 /*}}}*/
8e5fc8f5 498// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
499// ---------------------------------------------------------------------
500/* */
501pkgAcquire::UriIterator pkgAcquire::UriEnd()
502{
503 return UriIterator(0);
504}
505 /*}}}*/
0a8a80e5 506
e331f6ed
AL
507// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
508// ---------------------------------------------------------------------
509/* */
510pkgAcquire::MethodConfig::MethodConfig()
511{
512 SingleInstance = false;
e331f6ed
AL
513 Pipeline = false;
514 SendConfig = false;
515 LocalOnly = false;
459681d3 516 Removable = false;
e331f6ed
AL
517 Next = 0;
518}
519 /*}}}*/
520
0a8a80e5
AL
521// Queue::Queue - Constructor /*{{{*/
522// ---------------------------------------------------------------------
523/* */
524pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
525 Owner(Owner)
526{
527 Items = 0;
528 Next = 0;
529 Workers = 0;
b185acc2
AL
530 MaxPipeDepth = 1;
531 PipeDepth = 0;
0a8a80e5
AL
532}
533 /*}}}*/
534// Queue::~Queue - Destructor /*{{{*/
535// ---------------------------------------------------------------------
536/* */
537pkgAcquire::Queue::~Queue()
538{
8e5fc8f5 539 Shutdown(true);
0a8a80e5
AL
540
541 while (Items != 0)
542 {
543 QItem *Jnk = Items;
544 Items = Items->Next;
545 delete Jnk;
546 }
547}
548 /*}}}*/
549// Queue::Enqueue - Queue an item to the queue /*{{{*/
550// ---------------------------------------------------------------------
551/* */
c03462c6 552bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 553{
7a1b1f8b 554 QItem **I = &Items;
c03462c6
MV
555 // move to the end of the queue and check for duplicates here
556 for (; *I != 0; I = &(*I)->Next)
557 if (Item.URI == (*I)->URI)
558 {
559 Item.Owner->Status = Item::StatDone;
560 return false;
561 }
562
0a8a80e5 563 // Create a new item
7a1b1f8b
AL
564 QItem *Itm = new QItem;
565 *Itm = Item;
566 Itm->Next = 0;
567 *I = Itm;
0a8a80e5 568
8267fe24 569 Item.Owner->QueueCounter++;
93bf083d
AL
570 if (Items->Next == 0)
571 Cycle();
c03462c6 572 return true;
0a8a80e5
AL
573}
574 /*}}}*/
c88edf1d 575// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 576// ---------------------------------------------------------------------
b185acc2 577/* We return true if we hit something */
bfd22fc0 578bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 579{
b185acc2
AL
580 if (Owner->Status == pkgAcquire::Item::StatFetching)
581 return _error->Error("Tried to dequeue a fetching object");
582
bfd22fc0
AL
583 bool Res = false;
584
0a8a80e5
AL
585 QItem **I = &Items;
586 for (; *I != 0;)
587 {
588 if ((*I)->Owner == Owner)
589 {
590 QItem *Jnk= *I;
591 *I = (*I)->Next;
592 Owner->QueueCounter--;
593 delete Jnk;
bfd22fc0 594 Res = true;
0a8a80e5
AL
595 }
596 else
597 I = &(*I)->Next;
598 }
bfd22fc0
AL
599
600 return Res;
0a8a80e5
AL
601}
602 /*}}}*/
603// Queue::Startup - Start the worker processes /*{{{*/
604// ---------------------------------------------------------------------
8e5fc8f5
AL
605/* It is possible for this to be called with a pre-existing set of
606 workers. */
0a8a80e5
AL
607bool pkgAcquire::Queue::Startup()
608{
8e5fc8f5
AL
609 if (Workers == 0)
610 {
611 URI U(Name);
612 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
613 if (Cnf == 0)
614 return false;
615
616 Workers = new Worker(this,Cnf,Owner->Log);
617 Owner->Add(Workers);
618 if (Workers->Start() == false)
619 return false;
620
621 /* When pipelining we commit 10 items. This needs to change when we
622 added other source retry to have cycle maintain a pipeline depth
623 on its own. */
624 if (Cnf->Pipeline == true)
625 MaxPipeDepth = 10;
626 else
627 MaxPipeDepth = 1;
628 }
5cb5d8dc 629
93bf083d 630 return Cycle();
0a8a80e5
AL
631}
632 /*}}}*/
633// Queue::Shutdown - Shutdown the worker processes /*{{{*/
634// ---------------------------------------------------------------------
8e5fc8f5
AL
635/* If final is true then all workers are eliminated, otherwise only workers
636 that do not need cleanup are removed */
637bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
638{
639 // Delete all of the workers
8e5fc8f5
AL
640 pkgAcquire::Worker **Cur = &Workers;
641 while (*Cur != 0)
0a8a80e5 642 {
8e5fc8f5
AL
643 pkgAcquire::Worker *Jnk = *Cur;
644 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
645 {
646 *Cur = Jnk->NextQueue;
647 Owner->Remove(Jnk);
648 delete Jnk;
649 }
650 else
651 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
652 }
653
654 return true;
3b5421b4
AL
655}
656 /*}}}*/
7d8afa39 657// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
658// ---------------------------------------------------------------------
659/* */
660pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
661{
662 for (QItem *I = Items; I != 0; I = I->Next)
663 if (I->URI == URI && I->Worker == Owner)
664 return I;
665 return 0;
666}
667 /*}}}*/
668// Queue::ItemDone - Item has been completed /*{{{*/
669// ---------------------------------------------------------------------
670/* The worker signals this which causes the item to be removed from the
93bf083d
AL
671 queue. If this is the last queue instance then it is removed from the
672 main queue too.*/
c88edf1d
AL
673bool pkgAcquire::Queue::ItemDone(QItem *Itm)
674{
b185acc2 675 PipeDepth--;
db890fdb
AL
676 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
677 Itm->Owner->Status = pkgAcquire::Item::StatDone;
678
93bf083d
AL
679 if (Itm->Owner->QueueCounter <= 1)
680 Owner->Dequeue(Itm->Owner);
681 else
682 {
683 Dequeue(Itm->Owner);
684 Owner->Bump();
685 }
c88edf1d 686
93bf083d
AL
687 return Cycle();
688}
689 /*}}}*/
690// Queue::Cycle - Queue new items into the method /*{{{*/
691// ---------------------------------------------------------------------
b185acc2
AL
692/* This locates a new idle item and sends it to the worker. If pipelining
693 is enabled then it keeps the pipe full. */
93bf083d
AL
694bool pkgAcquire::Queue::Cycle()
695{
696 if (Items == 0 || Workers == 0)
c88edf1d
AL
697 return true;
698
e7432370
AL
699 if (PipeDepth < 0)
700 return _error->Error("Pipedepth failure");
701
93bf083d
AL
702 // Look for a queable item
703 QItem *I = Items;
e7432370 704 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
705 {
706 for (; I != 0; I = I->Next)
707 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
708 break;
709
710 // Nothing to do, queue is idle.
711 if (I == 0)
712 return true;
713
714 I->Worker = Workers;
715 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 716 PipeDepth++;
b185acc2
AL
717 if (Workers->QueueItem(I) == false)
718 return false;
719 }
93bf083d 720
b185acc2 721 return true;
c88edf1d
AL
722}
723 /*}}}*/
be4401bf
AL
724// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
725// ---------------------------------------------------------------------
b185acc2 726/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
727void pkgAcquire::Queue::Bump()
728{
b185acc2 729 Cycle();
be4401bf
AL
730}
731 /*}}}*/
b98f2859
AL
732
733// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
734// ---------------------------------------------------------------------
735/* */
c5ccf175 736pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
737{
738 Start();
739}
740 /*}}}*/
741// AcquireStatus::Pulse - Called periodically /*{{{*/
742// ---------------------------------------------------------------------
743/* This computes some internal state variables for the derived classes to
744 use. It generates the current downloaded bytes and total bytes to download
745 as well as the current CPS estimate. */
024d1123 746bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
747{
748 TotalBytes = 0;
749 CurrentBytes = 0;
d568ed2d
AL
750 TotalItems = 0;
751 CurrentItems = 0;
b98f2859
AL
752
753 // Compute the total number of bytes to fetch
754 unsigned int Unknown = 0;
755 unsigned int Count = 0;
b4fc9b6f 756 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
757 I++, Count++)
758 {
d568ed2d
AL
759 TotalItems++;
760 if ((*I)->Status == pkgAcquire::Item::StatDone)
761 CurrentItems++;
762
a6568219
AL
763 // Totally ignore local items
764 if ((*I)->Local == true)
765 continue;
b2e465d6 766
b98f2859
AL
767 TotalBytes += (*I)->FileSize;
768 if ((*I)->Complete == true)
769 CurrentBytes += (*I)->FileSize;
770 if ((*I)->FileSize == 0 && (*I)->Complete == false)
771 Unknown++;
772 }
773
774 // Compute the current completion
aa0e1101 775 unsigned long ResumeSize = 0;
b98f2859
AL
776 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
777 I = Owner->WorkerStep(I))
778 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
779 {
780 CurrentBytes += I->CurrentSize;
781 ResumeSize += I->ResumePoint;
782
783 // Files with unknown size always have 100% completion
784 if (I->CurrentItem->Owner->FileSize == 0 &&
785 I->CurrentItem->Owner->Complete == false)
786 TotalBytes += I->CurrentSize;
787 }
788
b98f2859
AL
789 // Normalize the figures and account for unknown size downloads
790 if (TotalBytes <= 0)
791 TotalBytes = 1;
792 if (Unknown == Count)
793 TotalBytes = Unknown;
18ef0a78
AL
794
795 // Wha?! Is not supposed to happen.
796 if (CurrentBytes > TotalBytes)
797 CurrentBytes = TotalBytes;
b98f2859
AL
798
799 // Compute the CPS
800 struct timeval NewTime;
801 gettimeofday(&NewTime,0);
802 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
803 NewTime.tv_sec - Time.tv_sec > 6)
804 {
f17ac097
AL
805 double Delta = NewTime.tv_sec - Time.tv_sec +
806 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 807
b98f2859 808 // Compute the CPS value
f17ac097 809 if (Delta < 0.01)
e331f6ed
AL
810 CurrentCPS = 0;
811 else
aa0e1101
AL
812 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
813 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 814 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
815 Time = NewTime;
816 }
024d1123 817
75ef8f14
MV
818 int fd = _config->FindI("APT::Status-Fd",-1);
819 if(fd > 0)
820 {
821 ostringstream status;
822
823 char msg[200];
824 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
825 unsigned long ETA =
826 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
827
1e8b4c0f
MV
828 // only show the ETA if it makes sense
829 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 830 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 831 else
0c508b03 832 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
833
834
75ef8f14
MV
835
836 // build the status str
837 status << "dlstatus:" << i
838 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
839 << ":" << msg
840 << endl;
841 write(fd, status.str().c_str(), status.str().size());
842 }
843
024d1123 844 return true;
b98f2859
AL
845}
846 /*}}}*/
847// AcquireStatus::Start - Called when the download is started /*{{{*/
848// ---------------------------------------------------------------------
849/* We just reset the counters */
850void pkgAcquireStatus::Start()
851{
852 gettimeofday(&Time,0);
853 gettimeofday(&StartTime,0);
854 LastBytes = 0;
855 CurrentCPS = 0;
856 CurrentBytes = 0;
857 TotalBytes = 0;
858 FetchedBytes = 0;
859 ElapsedTime = 0;
d568ed2d
AL
860 TotalItems = 0;
861 CurrentItems = 0;
b98f2859
AL
862}
863 /*}}}*/
a6568219 864// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
865// ---------------------------------------------------------------------
866/* This accurately computes the elapsed time and the total overall CPS. */
867void pkgAcquireStatus::Stop()
868{
869 // Compute the CPS and elapsed time
870 struct timeval NewTime;
871 gettimeofday(&NewTime,0);
872
31a0531d
AL
873 double Delta = NewTime.tv_sec - StartTime.tv_sec +
874 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 875
b98f2859 876 // Compute the CPS value
31a0531d 877 if (Delta < 0.01)
e331f6ed
AL
878 CurrentCPS = 0;
879 else
31a0531d 880 CurrentCPS = FetchedBytes/Delta;
b98f2859 881 LastBytes = CurrentBytes;
31a0531d 882 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
883}
884 /*}}}*/
885// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
886// ---------------------------------------------------------------------
887/* This is used to get accurate final transfer rate reporting. */
888void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 889{
b98f2859
AL
890 FetchedBytes += Size - Resume;
891}
892 /*}}}*/