]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
* apt-pkg/depcache.cc:
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
0a8a80e5
AL
8 The core element for the scheduling system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of parallelization can be controlled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
b2e465d6 26#include <apti18n.h>
b4fc9b6f
AL
27
28#include <iostream>
75ef8f14 29#include <sstream>
b2e465d6 30
7a7fa5f0 31#include <dirent.h>
8267fe24 32#include <sys/time.h>
524f8105 33#include <errno.h>
5f3edc0f 34#include <sys/stat.h>
0118833a
AL
35 /*}}}*/
36
b4fc9b6f
AL
37using namespace std;
38
0118833a
AL
39// Acquire::pkgAcquire - Constructor /*{{{*/
40// ---------------------------------------------------------------------
93bf083d 41/* We grab some runtime state from the configuration space */
8267fe24 42pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
43{
44 Queues = 0;
45 Configs = 0;
0a8a80e5
AL
46 Workers = 0;
47 ToFetch = 0;
8b89e57f 48 Running = false;
0a8a80e5
AL
49
50 string Mode = _config->Find("Acquire::Queue-Mode","host");
51 if (strcasecmp(Mode.c_str(),"host") == 0)
52 QueueMode = QueueHost;
53 if (strcasecmp(Mode.c_str(),"access") == 0)
54 QueueMode = QueueAccess;
55
56 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 57
0c55d1d2 58 // This is really a stupid place for this
5f3edc0f
AL
59 struct stat St;
60 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
61 S_ISDIR(St.st_mode) == 0)
b2e465d6 62 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
63 _config->FindDir("Dir::State::lists").c_str());
64 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
65 S_ISDIR(St.st_mode) == 0)
b2e465d6 66 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 67 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
68}
69 /*}}}*/
70// Acquire::~pkgAcquire - Destructor /*{{{*/
71// ---------------------------------------------------------------------
93bf083d 72/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
73pkgAcquire::~pkgAcquire()
74{
459681d3
AL
75 Shutdown();
76
3b5421b4
AL
77 while (Configs != 0)
78 {
79 MethodConfig *Jnk = Configs;
80 Configs = Configs->Next;
81 delete Jnk;
82 }
281daf46
AL
83}
84 /*}}}*/
8e5fc8f5 85// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
86// ---------------------------------------------------------------------
87/* */
88void pkgAcquire::Shutdown()
89{
90 while (Items.size() != 0)
1b480911
AL
91 {
92 if (Items[0]->Status == Item::StatFetching)
93 Items[0]->Status = Item::StatError;
281daf46 94 delete Items[0];
1b480911 95 }
0a8a80e5
AL
96
97 while (Queues != 0)
98 {
99 Queue *Jnk = Queues;
100 Queues = Queues->Next;
101 delete Jnk;
102 }
0118833a
AL
103}
104 /*}}}*/
105// Acquire::Add - Add a new item /*{{{*/
106// ---------------------------------------------------------------------
93bf083d
AL
107/* This puts an item on the acquire list. This list is mainly for tracking
108 item status */
0118833a
AL
109void pkgAcquire::Add(Item *Itm)
110{
111 Items.push_back(Itm);
112}
113 /*}}}*/
114// Acquire::Remove - Remove a item /*{{{*/
115// ---------------------------------------------------------------------
93bf083d 116/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
117void pkgAcquire::Remove(Item *Itm)
118{
a3eaf954
AL
119 Dequeue(Itm);
120
753b3525 121 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
122 {
123 if (*I == Itm)
b4fc9b6f 124 {
0118833a 125 Items.erase(I);
b4fc9b6f
AL
126 I = Items.begin();
127 }
753b3525
AL
128 else
129 I++;
8267fe24 130 }
0118833a
AL
131}
132 /*}}}*/
0a8a80e5
AL
133// Acquire::Add - Add a worker /*{{{*/
134// ---------------------------------------------------------------------
93bf083d
AL
135/* A list of workers is kept so that the select loop can direct their FD
136 usage. */
0a8a80e5
AL
137void pkgAcquire::Add(Worker *Work)
138{
139 Work->NextAcquire = Workers;
140 Workers = Work;
141}
142 /*}}}*/
143// Acquire::Remove - Remove a worker /*{{{*/
144// ---------------------------------------------------------------------
93bf083d
AL
145/* A worker has died. This can not be done while the select loop is running
146 as it would require that RunFds could handling a changing list state and
147 it cant.. */
0a8a80e5
AL
148void pkgAcquire::Remove(Worker *Work)
149{
93bf083d
AL
150 if (Running == true)
151 abort();
152
0a8a80e5
AL
153 Worker **I = &Workers;
154 for (; *I != 0;)
155 {
156 if (*I == Work)
157 *I = (*I)->NextAcquire;
158 else
159 I = &(*I)->NextAcquire;
160 }
161}
162 /*}}}*/
0118833a
AL
163// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
164// ---------------------------------------------------------------------
93bf083d 165/* This is the entry point for an item. An item calls this function when
281daf46 166 it is constructed which creates a queue (based on the current queue
93bf083d
AL
167 mode) and puts the item in that queue. If the system is running then
168 the queue might be started. */
8267fe24 169void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 170{
0a8a80e5 171 // Determine which queue to put the item in
e331f6ed
AL
172 const MethodConfig *Config;
173 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
174 if (Name.empty() == true)
175 return;
176
177 // Find the queue structure
178 Queue *I = Queues;
179 for (; I != 0 && I->Name != Name; I = I->Next);
180 if (I == 0)
181 {
182 I = new Queue(Name,this);
183 I->Next = Queues;
184 Queues = I;
93bf083d
AL
185
186 if (Running == true)
187 I->Startup();
0a8a80e5 188 }
bfd22fc0 189
e331f6ed
AL
190 // See if this is a local only URI
191 if (Config->LocalOnly == true && Item.Owner->Complete == false)
192 Item.Owner->Local = true;
8267fe24 193 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
194
195 // Queue it into the named queue
8267fe24 196 I->Enqueue(Item);
0a8a80e5 197 ToFetch++;
93bf083d 198
0a8a80e5
AL
199 // Some trace stuff
200 if (Debug == true)
201 {
8267fe24
AL
202 clog << "Fetching " << Item.URI << endl;
203 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 204 clog << " Queue is: " << Name << endl;
0a8a80e5 205 }
3b5421b4
AL
206}
207 /*}}}*/
0a8a80e5 208// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 209// ---------------------------------------------------------------------
93bf083d
AL
210/* This is called when an item is finished being fetched. It removes it
211 from all the queues */
0a8a80e5
AL
212void pkgAcquire::Dequeue(Item *Itm)
213{
214 Queue *I = Queues;
bfd22fc0 215 bool Res = false;
0a8a80e5 216 for (; I != 0; I = I->Next)
bfd22fc0 217 Res |= I->Dequeue(Itm);
93bf083d
AL
218
219 if (Debug == true)
220 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
221 if (Res == true)
222 ToFetch--;
0a8a80e5
AL
223}
224 /*}}}*/
225// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
226// ---------------------------------------------------------------------
227/* The string returned depends on the configuration settings and the
228 method parameters. Given something like http://foo.org/bar it can
229 return http://foo.org or http */
e331f6ed 230string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 231{
93bf083d
AL
232 URI U(Uri);
233
e331f6ed 234 Config = GetConfig(U.Access);
0a8a80e5
AL
235 if (Config == 0)
236 return string();
237
238 /* Single-Instance methods get exactly one queue per URI. This is
239 also used for the Access queue method */
240 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 241 return U.Access;
93bf083d
AL
242
243 return U.Access + ':' + U.Host;
0118833a
AL
244}
245 /*}}}*/
3b5421b4
AL
246// Acquire::GetConfig - Fetch the configuration information /*{{{*/
247// ---------------------------------------------------------------------
248/* This locates the configuration structure for an access method. If
249 a config structure cannot be found a Worker will be created to
250 retrieve it */
0a8a80e5 251pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
252{
253 // Search for an existing config
254 MethodConfig *Conf;
255 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
256 if (Conf->Access == Access)
257 return Conf;
258
259 // Create the new config class
260 Conf = new MethodConfig;
261 Conf->Access = Access;
262 Conf->Next = Configs;
263 Configs = Conf;
0118833a 264
3b5421b4
AL
265 // Create the worker to fetch the configuration
266 Worker Work(Conf);
267 if (Work.Start() == false)
268 return 0;
7c6e2dc7
MV
269
270 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
271 if(_config->FindI("Acquire::"+Access+"::DlLimit",0) > 0)
272 Conf->SingleInstance = true;
273
3b5421b4
AL
274 return Conf;
275}
276 /*}}}*/
0a8a80e5
AL
277// Acquire::SetFds - Deal with readable FDs /*{{{*/
278// ---------------------------------------------------------------------
279/* Collect FDs that have activity monitors into the fd sets */
280void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
281{
282 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
283 {
284 if (I->InReady == true && I->InFd >= 0)
285 {
286 if (Fd < I->InFd)
287 Fd = I->InFd;
288 FD_SET(I->InFd,RSet);
289 }
290 if (I->OutReady == true && I->OutFd >= 0)
291 {
292 if (Fd < I->OutFd)
293 Fd = I->OutFd;
294 FD_SET(I->OutFd,WSet);
295 }
296 }
297}
298 /*}}}*/
299// Acquire::RunFds - Deal with active FDs /*{{{*/
300// ---------------------------------------------------------------------
93bf083d
AL
301/* Dispatch active FDs over to the proper workers. It is very important
302 that a worker never be erased while this is running! The queue class
303 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
304void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
305{
306 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
307 {
308 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
309 I->InFdReady();
310 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
311 I->OutFdReady();
312 }
313}
314 /*}}}*/
315// Acquire::Run - Run the fetch sequence /*{{{*/
316// ---------------------------------------------------------------------
317/* This runs the queues. It manages a select loop for all of the
318 Worker tasks. The workers interact with the queues and items to
319 manage the actual fetch. */
1c5f7e5f 320pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 321{
8b89e57f
AL
322 Running = true;
323
0a8a80e5
AL
324 for (Queue *I = Queues; I != 0; I = I->Next)
325 I->Startup();
326
b98f2859
AL
327 if (Log != 0)
328 Log->Start();
329
024d1123
AL
330 bool WasCancelled = false;
331
0a8a80e5 332 // Run till all things have been acquired
8267fe24
AL
333 struct timeval tv;
334 tv.tv_sec = 0;
1c5f7e5f 335 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
336 while (ToFetch > 0)
337 {
338 fd_set RFds;
339 fd_set WFds;
340 int Highest = 0;
341 FD_ZERO(&RFds);
342 FD_ZERO(&WFds);
343 SetFds(Highest,&RFds,&WFds);
344
b0db36b1
AL
345 int Res;
346 do
347 {
348 Res = select(Highest+1,&RFds,&WFds,0,&tv);
349 }
350 while (Res < 0 && errno == EINTR);
351
8267fe24 352 if (Res < 0)
8b89e57f 353 {
8267fe24
AL
354 _error->Errno("select","Select has failed");
355 break;
8b89e57f 356 }
93bf083d 357
0a8a80e5 358 RunFds(&RFds,&WFds);
93bf083d
AL
359 if (_error->PendingError() == true)
360 break;
8267fe24
AL
361
362 // Timeout, notify the log class
363 if (Res == 0 || (Log != 0 && Log->Update == true))
364 {
1c5f7e5f 365 tv.tv_usec = PulseIntervall;
8267fe24
AL
366 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
367 I->Pulse();
024d1123
AL
368 if (Log != 0 && Log->Pulse(this) == false)
369 {
370 WasCancelled = true;
371 break;
372 }
8267fe24 373 }
0a8a80e5 374 }
be4401bf 375
b98f2859
AL
376 if (Log != 0)
377 Log->Stop();
378
be4401bf
AL
379 // Shut down the acquire bits
380 Running = false;
0a8a80e5 381 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 382 I->Shutdown(false);
0a8a80e5 383
ab559b35 384 // Shut down the items
b4fc9b6f 385 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 386 (*I)->Finished();
ab559b35 387
024d1123
AL
388 if (_error->PendingError())
389 return Failed;
390 if (WasCancelled)
391 return Cancelled;
392 return Continue;
93bf083d
AL
393}
394 /*}}}*/
be4401bf 395// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
396// ---------------------------------------------------------------------
397/* This routine bumps idle queues in hopes that they will be able to fetch
398 the dequeued item */
399void pkgAcquire::Bump()
400{
be4401bf
AL
401 for (Queue *I = Queues; I != 0; I = I->Next)
402 I->Bump();
0a8a80e5
AL
403}
404 /*}}}*/
8267fe24
AL
405// Acquire::WorkerStep - Step to the next worker /*{{{*/
406// ---------------------------------------------------------------------
407/* Not inlined to advoid including acquire-worker.h */
408pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
409{
410 return I->NextAcquire;
411};
412 /*}}}*/
a6568219 413// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
414// ---------------------------------------------------------------------
415/* This is a bit simplistic, it looks at every file in the dir and sees
416 if it is part of the download set. */
417bool pkgAcquire::Clean(string Dir)
418{
419 DIR *D = opendir(Dir.c_str());
420 if (D == 0)
b2e465d6 421 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
422
423 string StartDir = SafeGetCWD();
424 if (chdir(Dir.c_str()) != 0)
425 {
426 closedir(D);
b2e465d6 427 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
428 }
429
430 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
431 {
432 // Skip some files..
433 if (strcmp(Dir->d_name,"lock") == 0 ||
434 strcmp(Dir->d_name,"partial") == 0 ||
435 strcmp(Dir->d_name,".") == 0 ||
436 strcmp(Dir->d_name,"..") == 0)
437 continue;
438
439 // Look in the get list
b4fc9b6f 440 ItemCIterator I = Items.begin();
7a7fa5f0
AL
441 for (; I != Items.end(); I++)
442 if (flNotDir((*I)->DestFile) == Dir->d_name)
443 break;
444
445 // Nothing found, nuke it
446 if (I == Items.end())
447 unlink(Dir->d_name);
448 };
449
450 chdir(StartDir.c_str());
451 closedir(D);
452 return true;
453}
454 /*}}}*/
a6568219
AL
455// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
456// ---------------------------------------------------------------------
457/* This is the total number of bytes needed */
b2e465d6 458double pkgAcquire::TotalNeeded()
a6568219 459{
b2e465d6 460 double Total = 0;
b4fc9b6f 461 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
462 Total += (*I)->FileSize;
463 return Total;
464}
465 /*}}}*/
466// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
467// ---------------------------------------------------------------------
468/* This is the number of bytes that is not local */
b2e465d6 469double pkgAcquire::FetchNeeded()
a6568219 470{
b2e465d6 471 double Total = 0;
b4fc9b6f 472 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
473 if ((*I)->Local == false)
474 Total += (*I)->FileSize;
475 return Total;
476}
477 /*}}}*/
6b1ff003
AL
478// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
479// ---------------------------------------------------------------------
480/* This is the number of bytes that is not local */
b2e465d6 481double pkgAcquire::PartialPresent()
6b1ff003 482{
b2e465d6 483 double Total = 0;
b4fc9b6f 484 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
485 if ((*I)->Local == false)
486 Total += (*I)->PartialSize;
487 return Total;
488}
b3d44315 489
8e5fc8f5 490// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
491// ---------------------------------------------------------------------
492/* */
493pkgAcquire::UriIterator pkgAcquire::UriBegin()
494{
495 return UriIterator(Queues);
496}
497 /*}}}*/
8e5fc8f5 498// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
499// ---------------------------------------------------------------------
500/* */
501pkgAcquire::UriIterator pkgAcquire::UriEnd()
502{
503 return UriIterator(0);
504}
505 /*}}}*/
0a8a80e5 506
e331f6ed
AL
507// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
508// ---------------------------------------------------------------------
509/* */
510pkgAcquire::MethodConfig::MethodConfig()
511{
512 SingleInstance = false;
e331f6ed
AL
513 Pipeline = false;
514 SendConfig = false;
515 LocalOnly = false;
459681d3 516 Removable = false;
e331f6ed
AL
517 Next = 0;
518}
519 /*}}}*/
520
0a8a80e5
AL
521// Queue::Queue - Constructor /*{{{*/
522// ---------------------------------------------------------------------
523/* */
524pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
525 Owner(Owner)
526{
527 Items = 0;
528 Next = 0;
529 Workers = 0;
b185acc2
AL
530 MaxPipeDepth = 1;
531 PipeDepth = 0;
0a8a80e5
AL
532}
533 /*}}}*/
534// Queue::~Queue - Destructor /*{{{*/
535// ---------------------------------------------------------------------
536/* */
537pkgAcquire::Queue::~Queue()
538{
8e5fc8f5 539 Shutdown(true);
0a8a80e5
AL
540
541 while (Items != 0)
542 {
543 QItem *Jnk = Items;
544 Items = Items->Next;
545 delete Jnk;
546 }
547}
548 /*}}}*/
549// Queue::Enqueue - Queue an item to the queue /*{{{*/
550// ---------------------------------------------------------------------
551/* */
8267fe24 552void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 553{
7a1b1f8b
AL
554 QItem **I = &Items;
555 for (; *I != 0; I = &(*I)->Next);
556
0a8a80e5 557 // Create a new item
7a1b1f8b
AL
558 QItem *Itm = new QItem;
559 *Itm = Item;
560 Itm->Next = 0;
561 *I = Itm;
0a8a80e5 562
8267fe24 563 Item.Owner->QueueCounter++;
93bf083d
AL
564 if (Items->Next == 0)
565 Cycle();
0a8a80e5
AL
566}
567 /*}}}*/
c88edf1d 568// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 569// ---------------------------------------------------------------------
b185acc2 570/* We return true if we hit something */
bfd22fc0 571bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 572{
b185acc2
AL
573 if (Owner->Status == pkgAcquire::Item::StatFetching)
574 return _error->Error("Tried to dequeue a fetching object");
575
bfd22fc0
AL
576 bool Res = false;
577
0a8a80e5
AL
578 QItem **I = &Items;
579 for (; *I != 0;)
580 {
581 if ((*I)->Owner == Owner)
582 {
583 QItem *Jnk= *I;
584 *I = (*I)->Next;
585 Owner->QueueCounter--;
586 delete Jnk;
bfd22fc0 587 Res = true;
0a8a80e5
AL
588 }
589 else
590 I = &(*I)->Next;
591 }
bfd22fc0
AL
592
593 return Res;
0a8a80e5
AL
594}
595 /*}}}*/
596// Queue::Startup - Start the worker processes /*{{{*/
597// ---------------------------------------------------------------------
8e5fc8f5
AL
598/* It is possible for this to be called with a pre-existing set of
599 workers. */
0a8a80e5
AL
600bool pkgAcquire::Queue::Startup()
601{
8e5fc8f5
AL
602 if (Workers == 0)
603 {
604 URI U(Name);
605 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
606 if (Cnf == 0)
607 return false;
608
609 Workers = new Worker(this,Cnf,Owner->Log);
610 Owner->Add(Workers);
611 if (Workers->Start() == false)
612 return false;
613
614 /* When pipelining we commit 10 items. This needs to change when we
615 added other source retry to have cycle maintain a pipeline depth
616 on its own. */
617 if (Cnf->Pipeline == true)
618 MaxPipeDepth = 10;
619 else
620 MaxPipeDepth = 1;
621 }
5cb5d8dc 622
93bf083d 623 return Cycle();
0a8a80e5
AL
624}
625 /*}}}*/
626// Queue::Shutdown - Shutdown the worker processes /*{{{*/
627// ---------------------------------------------------------------------
8e5fc8f5
AL
628/* If final is true then all workers are eliminated, otherwise only workers
629 that do not need cleanup are removed */
630bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
631{
632 // Delete all of the workers
8e5fc8f5
AL
633 pkgAcquire::Worker **Cur = &Workers;
634 while (*Cur != 0)
0a8a80e5 635 {
8e5fc8f5
AL
636 pkgAcquire::Worker *Jnk = *Cur;
637 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
638 {
639 *Cur = Jnk->NextQueue;
640 Owner->Remove(Jnk);
641 delete Jnk;
642 }
643 else
644 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
645 }
646
647 return true;
3b5421b4
AL
648}
649 /*}}}*/
7d8afa39 650// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
651// ---------------------------------------------------------------------
652/* */
653pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
654{
655 for (QItem *I = Items; I != 0; I = I->Next)
656 if (I->URI == URI && I->Worker == Owner)
657 return I;
658 return 0;
659}
660 /*}}}*/
661// Queue::ItemDone - Item has been completed /*{{{*/
662// ---------------------------------------------------------------------
663/* The worker signals this which causes the item to be removed from the
93bf083d
AL
664 queue. If this is the last queue instance then it is removed from the
665 main queue too.*/
c88edf1d
AL
666bool pkgAcquire::Queue::ItemDone(QItem *Itm)
667{
b185acc2 668 PipeDepth--;
db890fdb
AL
669 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
670 Itm->Owner->Status = pkgAcquire::Item::StatDone;
671
93bf083d
AL
672 if (Itm->Owner->QueueCounter <= 1)
673 Owner->Dequeue(Itm->Owner);
674 else
675 {
676 Dequeue(Itm->Owner);
677 Owner->Bump();
678 }
c88edf1d 679
93bf083d
AL
680 return Cycle();
681}
682 /*}}}*/
683// Queue::Cycle - Queue new items into the method /*{{{*/
684// ---------------------------------------------------------------------
b185acc2
AL
685/* This locates a new idle item and sends it to the worker. If pipelining
686 is enabled then it keeps the pipe full. */
93bf083d
AL
687bool pkgAcquire::Queue::Cycle()
688{
689 if (Items == 0 || Workers == 0)
c88edf1d
AL
690 return true;
691
e7432370
AL
692 if (PipeDepth < 0)
693 return _error->Error("Pipedepth failure");
694
93bf083d
AL
695 // Look for a queable item
696 QItem *I = Items;
e7432370 697 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
698 {
699 for (; I != 0; I = I->Next)
700 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
701 break;
702
703 // Nothing to do, queue is idle.
704 if (I == 0)
705 return true;
706
707 I->Worker = Workers;
708 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 709 PipeDepth++;
b185acc2
AL
710 if (Workers->QueueItem(I) == false)
711 return false;
712 }
93bf083d 713
b185acc2 714 return true;
c88edf1d
AL
715}
716 /*}}}*/
be4401bf
AL
717// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
718// ---------------------------------------------------------------------
b185acc2 719/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
720void pkgAcquire::Queue::Bump()
721{
b185acc2 722 Cycle();
be4401bf
AL
723}
724 /*}}}*/
b98f2859
AL
725
726// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
727// ---------------------------------------------------------------------
728/* */
c5ccf175 729pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
730{
731 Start();
732}
733 /*}}}*/
734// AcquireStatus::Pulse - Called periodically /*{{{*/
735// ---------------------------------------------------------------------
736/* This computes some internal state variables for the derived classes to
737 use. It generates the current downloaded bytes and total bytes to download
738 as well as the current CPS estimate. */
024d1123 739bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
740{
741 TotalBytes = 0;
742 CurrentBytes = 0;
d568ed2d
AL
743 TotalItems = 0;
744 CurrentItems = 0;
b98f2859
AL
745
746 // Compute the total number of bytes to fetch
747 unsigned int Unknown = 0;
748 unsigned int Count = 0;
b4fc9b6f 749 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
750 I++, Count++)
751 {
d568ed2d
AL
752 TotalItems++;
753 if ((*I)->Status == pkgAcquire::Item::StatDone)
754 CurrentItems++;
755
a6568219
AL
756 // Totally ignore local items
757 if ((*I)->Local == true)
758 continue;
b2e465d6 759
b98f2859
AL
760 TotalBytes += (*I)->FileSize;
761 if ((*I)->Complete == true)
762 CurrentBytes += (*I)->FileSize;
763 if ((*I)->FileSize == 0 && (*I)->Complete == false)
764 Unknown++;
765 }
766
767 // Compute the current completion
aa0e1101 768 unsigned long ResumeSize = 0;
b98f2859
AL
769 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
770 I = Owner->WorkerStep(I))
771 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
772 {
773 CurrentBytes += I->CurrentSize;
774 ResumeSize += I->ResumePoint;
775
776 // Files with unknown size always have 100% completion
777 if (I->CurrentItem->Owner->FileSize == 0 &&
778 I->CurrentItem->Owner->Complete == false)
779 TotalBytes += I->CurrentSize;
780 }
781
b98f2859
AL
782 // Normalize the figures and account for unknown size downloads
783 if (TotalBytes <= 0)
784 TotalBytes = 1;
785 if (Unknown == Count)
786 TotalBytes = Unknown;
18ef0a78
AL
787
788 // Wha?! Is not supposed to happen.
789 if (CurrentBytes > TotalBytes)
790 CurrentBytes = TotalBytes;
b98f2859
AL
791
792 // Compute the CPS
793 struct timeval NewTime;
794 gettimeofday(&NewTime,0);
795 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
796 NewTime.tv_sec - Time.tv_sec > 6)
797 {
f17ac097
AL
798 double Delta = NewTime.tv_sec - Time.tv_sec +
799 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 800
b98f2859 801 // Compute the CPS value
f17ac097 802 if (Delta < 0.01)
e331f6ed
AL
803 CurrentCPS = 0;
804 else
aa0e1101
AL
805 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
806 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 807 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
808 Time = NewTime;
809 }
024d1123 810
75ef8f14
MV
811 int fd = _config->FindI("APT::Status-Fd",-1);
812 if(fd > 0)
813 {
814 ostringstream status;
815
816 char msg[200];
817 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
818 unsigned long ETA =
819 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
820
1e8b4c0f
MV
821 // only show the ETA if it makes sense
822 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 823 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 824 else
0c508b03 825 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
826
827
75ef8f14
MV
828
829 // build the status str
830 status << "dlstatus:" << i
831 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
832 << ":" << msg
833 << endl;
834 write(fd, status.str().c_str(), status.str().size());
835 }
836
024d1123 837 return true;
b98f2859
AL
838}
839 /*}}}*/
840// AcquireStatus::Start - Called when the download is started /*{{{*/
841// ---------------------------------------------------------------------
842/* We just reset the counters */
843void pkgAcquireStatus::Start()
844{
845 gettimeofday(&Time,0);
846 gettimeofday(&StartTime,0);
847 LastBytes = 0;
848 CurrentCPS = 0;
849 CurrentBytes = 0;
850 TotalBytes = 0;
851 FetchedBytes = 0;
852 ElapsedTime = 0;
d568ed2d
AL
853 TotalItems = 0;
854 CurrentItems = 0;
b98f2859
AL
855}
856 /*}}}*/
a6568219 857// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
858// ---------------------------------------------------------------------
859/* This accurately computes the elapsed time and the total overall CPS. */
860void pkgAcquireStatus::Stop()
861{
862 // Compute the CPS and elapsed time
863 struct timeval NewTime;
864 gettimeofday(&NewTime,0);
865
31a0531d
AL
866 double Delta = NewTime.tv_sec - StartTime.tv_sec +
867 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 868
b98f2859 869 // Compute the CPS value
31a0531d 870 if (Delta < 0.01)
e331f6ed
AL
871 CurrentCPS = 0;
872 else
31a0531d 873 CurrentCPS = FetchedBytes/Delta;
b98f2859 874 LastBytes = CurrentBytes;
31a0531d 875 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
876}
877 /*}}}*/
878// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
879// ---------------------------------------------------------------------
880/* This is used to get accurate final transfer rate reporting. */
881void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 882{
b98f2859
AL
883 FetchedBytes += Size - Resume;
884}
885 /*}}}*/