]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
* some more debug output
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
16#ifdef __GNUG__
17#pragma implementation "apt-pkg/acquire.h"
18#endif
19#include <apt-pkg/acquire.h>
20#include <apt-pkg/acquire-item.h>
21#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
22#include <apt-pkg/configuration.h>
23#include <apt-pkg/error.h>
cdcc6d34 24#include <apt-pkg/strutl.h>
8267fe24 25
b2e465d6 26#include <apti18n.h>
b4fc9b6f
AL
27
28#include <iostream>
75ef8f14 29#include <sstream>
b2e465d6 30
7a7fa5f0 31#include <dirent.h>
8267fe24 32#include <sys/time.h>
524f8105 33#include <errno.h>
5f3edc0f 34#include <sys/stat.h>
0118833a
AL
35 /*}}}*/
36
b4fc9b6f
AL
37using namespace std;
38
0118833a
AL
39// Acquire::pkgAcquire - Constructor /*{{{*/
40// ---------------------------------------------------------------------
93bf083d 41/* We grab some runtime state from the configuration space */
8267fe24 42pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
43{
44 Queues = 0;
45 Configs = 0;
0a8a80e5
AL
46 Workers = 0;
47 ToFetch = 0;
8b89e57f 48 Running = false;
0a8a80e5
AL
49
50 string Mode = _config->Find("Acquire::Queue-Mode","host");
51 if (strcasecmp(Mode.c_str(),"host") == 0)
52 QueueMode = QueueHost;
53 if (strcasecmp(Mode.c_str(),"access") == 0)
54 QueueMode = QueueAccess;
55
56 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 57
0c55d1d2 58 // This is really a stupid place for this
5f3edc0f
AL
59 struct stat St;
60 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
61 S_ISDIR(St.st_mode) == 0)
b2e465d6 62 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
63 _config->FindDir("Dir::State::lists").c_str());
64 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
65 S_ISDIR(St.st_mode) == 0)
b2e465d6 66 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 67 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
68}
69 /*}}}*/
70// Acquire::~pkgAcquire - Destructor /*{{{*/
71// ---------------------------------------------------------------------
93bf083d 72/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
73pkgAcquire::~pkgAcquire()
74{
459681d3
AL
75 Shutdown();
76
3b5421b4
AL
77 while (Configs != 0)
78 {
79 MethodConfig *Jnk = Configs;
80 Configs = Configs->Next;
81 delete Jnk;
82 }
281daf46
AL
83}
84 /*}}}*/
8e5fc8f5 85// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
86// ---------------------------------------------------------------------
87/* */
88void pkgAcquire::Shutdown()
89{
90 while (Items.size() != 0)
1b480911
AL
91 {
92 if (Items[0]->Status == Item::StatFetching)
93 Items[0]->Status = Item::StatError;
281daf46 94 delete Items[0];
1b480911 95 }
0a8a80e5
AL
96
97 while (Queues != 0)
98 {
99 Queue *Jnk = Queues;
100 Queues = Queues->Next;
101 delete Jnk;
102 }
0118833a
AL
103}
104 /*}}}*/
105// Acquire::Add - Add a new item /*{{{*/
106// ---------------------------------------------------------------------
93bf083d
AL
107/* This puts an item on the acquire list. This list is mainly for tracking
108 item status */
0118833a
AL
109void pkgAcquire::Add(Item *Itm)
110{
111 Items.push_back(Itm);
112}
113 /*}}}*/
114// Acquire::Remove - Remove a item /*{{{*/
115// ---------------------------------------------------------------------
93bf083d 116/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
117void pkgAcquire::Remove(Item *Itm)
118{
a3eaf954
AL
119 Dequeue(Itm);
120
753b3525 121 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
122 {
123 if (*I == Itm)
b4fc9b6f 124 {
0118833a 125 Items.erase(I);
b4fc9b6f
AL
126 I = Items.begin();
127 }
753b3525
AL
128 else
129 I++;
8267fe24 130 }
0118833a
AL
131}
132 /*}}}*/
0a8a80e5
AL
133// Acquire::Add - Add a worker /*{{{*/
134// ---------------------------------------------------------------------
93bf083d
AL
135/* A list of workers is kept so that the select loop can direct their FD
136 usage. */
0a8a80e5
AL
137void pkgAcquire::Add(Worker *Work)
138{
139 Work->NextAcquire = Workers;
140 Workers = Work;
141}
142 /*}}}*/
143// Acquire::Remove - Remove a worker /*{{{*/
144// ---------------------------------------------------------------------
93bf083d
AL
145/* A worker has died. This can not be done while the select loop is running
146 as it would require that RunFds could handling a changing list state and
147 it cant.. */
0a8a80e5
AL
148void pkgAcquire::Remove(Worker *Work)
149{
93bf083d
AL
150 if (Running == true)
151 abort();
152
0a8a80e5
AL
153 Worker **I = &Workers;
154 for (; *I != 0;)
155 {
156 if (*I == Work)
157 *I = (*I)->NextAcquire;
158 else
159 I = &(*I)->NextAcquire;
160 }
161}
162 /*}}}*/
0118833a
AL
163// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
164// ---------------------------------------------------------------------
93bf083d 165/* This is the entry point for an item. An item calls this function when
281daf46 166 it is constructed which creates a queue (based on the current queue
93bf083d
AL
167 mode) and puts the item in that queue. If the system is running then
168 the queue might be started. */
8267fe24 169void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 170{
0a8a80e5 171 // Determine which queue to put the item in
e331f6ed
AL
172 const MethodConfig *Config;
173 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
174 if (Name.empty() == true)
175 return;
176
177 // Find the queue structure
178 Queue *I = Queues;
179 for (; I != 0 && I->Name != Name; I = I->Next);
180 if (I == 0)
181 {
182 I = new Queue(Name,this);
183 I->Next = Queues;
184 Queues = I;
93bf083d
AL
185
186 if (Running == true)
187 I->Startup();
0a8a80e5 188 }
bfd22fc0 189
e331f6ed
AL
190 // See if this is a local only URI
191 if (Config->LocalOnly == true && Item.Owner->Complete == false)
192 Item.Owner->Local = true;
8267fe24 193 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
194
195 // Queue it into the named queue
8267fe24 196 I->Enqueue(Item);
0a8a80e5 197 ToFetch++;
93bf083d 198
0a8a80e5
AL
199 // Some trace stuff
200 if (Debug == true)
201 {
8267fe24
AL
202 clog << "Fetching " << Item.URI << endl;
203 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 204 clog << " Queue is: " << Name << endl;
0a8a80e5 205 }
3b5421b4
AL
206}
207 /*}}}*/
0a8a80e5 208// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 209// ---------------------------------------------------------------------
93bf083d
AL
210/* This is called when an item is finished being fetched. It removes it
211 from all the queues */
0a8a80e5
AL
212void pkgAcquire::Dequeue(Item *Itm)
213{
214 Queue *I = Queues;
bfd22fc0 215 bool Res = false;
0a8a80e5 216 for (; I != 0; I = I->Next)
bfd22fc0 217 Res |= I->Dequeue(Itm);
93bf083d
AL
218
219 if (Debug == true)
220 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
221 if (Res == true)
222 ToFetch--;
0a8a80e5
AL
223}
224 /*}}}*/
225// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
226// ---------------------------------------------------------------------
227/* The string returned depends on the configuration settings and the
228 method parameters. Given something like http://foo.org/bar it can
229 return http://foo.org or http */
e331f6ed 230string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 231{
93bf083d
AL
232 URI U(Uri);
233
e331f6ed 234 Config = GetConfig(U.Access);
0a8a80e5
AL
235 if (Config == 0)
236 return string();
237
238 /* Single-Instance methods get exactly one queue per URI. This is
239 also used for the Access queue method */
240 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 241 return U.Access;
93bf083d
AL
242
243 return U.Access + ':' + U.Host;
0118833a
AL
244}
245 /*}}}*/
3b5421b4
AL
246// Acquire::GetConfig - Fetch the configuration information /*{{{*/
247// ---------------------------------------------------------------------
248/* This locates the configuration structure for an access method. If
249 a config structure cannot be found a Worker will be created to
250 retrieve it */
0a8a80e5 251pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
252{
253 // Search for an existing config
254 MethodConfig *Conf;
255 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
256 if (Conf->Access == Access)
257 return Conf;
258
259 // Create the new config class
260 Conf = new MethodConfig;
261 Conf->Access = Access;
262 Conf->Next = Configs;
263 Configs = Conf;
0118833a 264
3b5421b4
AL
265 // Create the worker to fetch the configuration
266 Worker Work(Conf);
267 if (Work.Start() == false)
268 return 0;
269
270 return Conf;
271}
272 /*}}}*/
0a8a80e5
AL
273// Acquire::SetFds - Deal with readable FDs /*{{{*/
274// ---------------------------------------------------------------------
275/* Collect FDs that have activity monitors into the fd sets */
276void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
277{
278 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
279 {
280 if (I->InReady == true && I->InFd >= 0)
281 {
282 if (Fd < I->InFd)
283 Fd = I->InFd;
284 FD_SET(I->InFd,RSet);
285 }
286 if (I->OutReady == true && I->OutFd >= 0)
287 {
288 if (Fd < I->OutFd)
289 Fd = I->OutFd;
290 FD_SET(I->OutFd,WSet);
291 }
292 }
293}
294 /*}}}*/
295// Acquire::RunFds - Deal with active FDs /*{{{*/
296// ---------------------------------------------------------------------
93bf083d
AL
297/* Dispatch active FDs over to the proper workers. It is very important
298 that a worker never be erased while this is running! The queue class
299 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
300void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
301{
302 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
303 {
304 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
305 I->InFdReady();
306 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
307 I->OutFdReady();
308 }
309}
310 /*}}}*/
311// Acquire::Run - Run the fetch sequence /*{{{*/
312// ---------------------------------------------------------------------
313/* This runs the queues. It manages a select loop for all of the
314 Worker tasks. The workers interact with the queues and items to
315 manage the actual fetch. */
1c5f7e5f 316pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 317{
8b89e57f
AL
318 Running = true;
319
0a8a80e5
AL
320 for (Queue *I = Queues; I != 0; I = I->Next)
321 I->Startup();
322
b98f2859
AL
323 if (Log != 0)
324 Log->Start();
325
024d1123
AL
326 bool WasCancelled = false;
327
0a8a80e5 328 // Run till all things have been acquired
8267fe24
AL
329 struct timeval tv;
330 tv.tv_sec = 0;
1c5f7e5f 331 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
332 while (ToFetch > 0)
333 {
334 fd_set RFds;
335 fd_set WFds;
336 int Highest = 0;
337 FD_ZERO(&RFds);
338 FD_ZERO(&WFds);
339 SetFds(Highest,&RFds,&WFds);
340
b0db36b1
AL
341 int Res;
342 do
343 {
344 Res = select(Highest+1,&RFds,&WFds,0,&tv);
345 }
346 while (Res < 0 && errno == EINTR);
347
8267fe24 348 if (Res < 0)
8b89e57f 349 {
8267fe24
AL
350 _error->Errno("select","Select has failed");
351 break;
8b89e57f 352 }
93bf083d 353
0a8a80e5 354 RunFds(&RFds,&WFds);
93bf083d
AL
355 if (_error->PendingError() == true)
356 break;
8267fe24
AL
357
358 // Timeout, notify the log class
359 if (Res == 0 || (Log != 0 && Log->Update == true))
360 {
1c5f7e5f 361 tv.tv_usec = PulseIntervall;
8267fe24
AL
362 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
363 I->Pulse();
024d1123
AL
364 if (Log != 0 && Log->Pulse(this) == false)
365 {
366 WasCancelled = true;
367 break;
368 }
8267fe24 369 }
0a8a80e5 370 }
be4401bf 371
b98f2859
AL
372 if (Log != 0)
373 Log->Stop();
374
be4401bf
AL
375 // Shut down the acquire bits
376 Running = false;
0a8a80e5 377 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 378 I->Shutdown(false);
0a8a80e5 379
ab559b35 380 // Shut down the items
b4fc9b6f 381 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 382 (*I)->Finished();
ab559b35 383
024d1123
AL
384 if (_error->PendingError())
385 return Failed;
386 if (WasCancelled)
387 return Cancelled;
388 return Continue;
93bf083d
AL
389}
390 /*}}}*/
be4401bf 391// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
392// ---------------------------------------------------------------------
393/* This routine bumps idle queues in hopes that they will be able to fetch
394 the dequeued item */
395void pkgAcquire::Bump()
396{
be4401bf
AL
397 for (Queue *I = Queues; I != 0; I = I->Next)
398 I->Bump();
0a8a80e5
AL
399}
400 /*}}}*/
8267fe24
AL
401// Acquire::WorkerStep - Step to the next worker /*{{{*/
402// ---------------------------------------------------------------------
403/* Not inlined to advoid including acquire-worker.h */
404pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
405{
406 return I->NextAcquire;
407};
408 /*}}}*/
a6568219 409// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
410// ---------------------------------------------------------------------
411/* This is a bit simplistic, it looks at every file in the dir and sees
412 if it is part of the download set. */
413bool pkgAcquire::Clean(string Dir)
414{
415 DIR *D = opendir(Dir.c_str());
416 if (D == 0)
b2e465d6 417 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
418
419 string StartDir = SafeGetCWD();
420 if (chdir(Dir.c_str()) != 0)
421 {
422 closedir(D);
b2e465d6 423 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
424 }
425
426 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
427 {
428 // Skip some files..
429 if (strcmp(Dir->d_name,"lock") == 0 ||
430 strcmp(Dir->d_name,"partial") == 0 ||
431 strcmp(Dir->d_name,".") == 0 ||
432 strcmp(Dir->d_name,"..") == 0)
433 continue;
434
435 // Look in the get list
b4fc9b6f 436 ItemCIterator I = Items.begin();
7a7fa5f0
AL
437 for (; I != Items.end(); I++)
438 if (flNotDir((*I)->DestFile) == Dir->d_name)
439 break;
440
441 // Nothing found, nuke it
442 if (I == Items.end())
443 unlink(Dir->d_name);
444 };
445
446 chdir(StartDir.c_str());
447 closedir(D);
448 return true;
449}
450 /*}}}*/
a6568219
AL
451// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
452// ---------------------------------------------------------------------
453/* This is the total number of bytes needed */
b2e465d6 454double pkgAcquire::TotalNeeded()
a6568219 455{
b2e465d6 456 double Total = 0;
b4fc9b6f 457 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
458 Total += (*I)->FileSize;
459 return Total;
460}
461 /*}}}*/
462// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
463// ---------------------------------------------------------------------
464/* This is the number of bytes that is not local */
b2e465d6 465double pkgAcquire::FetchNeeded()
a6568219 466{
b2e465d6 467 double Total = 0;
b4fc9b6f 468 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
469 if ((*I)->Local == false)
470 Total += (*I)->FileSize;
471 return Total;
472}
473 /*}}}*/
6b1ff003
AL
474// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
475// ---------------------------------------------------------------------
476/* This is the number of bytes that is not local */
b2e465d6 477double pkgAcquire::PartialPresent()
6b1ff003 478{
b2e465d6 479 double Total = 0;
b4fc9b6f 480 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
481 if ((*I)->Local == false)
482 Total += (*I)->PartialSize;
483 return Total;
484}
b3d44315 485
8e5fc8f5 486// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
487// ---------------------------------------------------------------------
488/* */
489pkgAcquire::UriIterator pkgAcquire::UriBegin()
490{
491 return UriIterator(Queues);
492}
493 /*}}}*/
8e5fc8f5 494// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
495// ---------------------------------------------------------------------
496/* */
497pkgAcquire::UriIterator pkgAcquire::UriEnd()
498{
499 return UriIterator(0);
500}
501 /*}}}*/
0a8a80e5 502
e331f6ed
AL
503// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
504// ---------------------------------------------------------------------
505/* */
506pkgAcquire::MethodConfig::MethodConfig()
507{
508 SingleInstance = false;
e331f6ed
AL
509 Pipeline = false;
510 SendConfig = false;
511 LocalOnly = false;
459681d3 512 Removable = false;
e331f6ed
AL
513 Next = 0;
514}
515 /*}}}*/
516
0a8a80e5
AL
517// Queue::Queue - Constructor /*{{{*/
518// ---------------------------------------------------------------------
519/* */
520pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
521 Owner(Owner)
522{
523 Items = 0;
524 Next = 0;
525 Workers = 0;
b185acc2
AL
526 MaxPipeDepth = 1;
527 PipeDepth = 0;
0a8a80e5
AL
528}
529 /*}}}*/
530// Queue::~Queue - Destructor /*{{{*/
531// ---------------------------------------------------------------------
532/* */
533pkgAcquire::Queue::~Queue()
534{
8e5fc8f5 535 Shutdown(true);
0a8a80e5
AL
536
537 while (Items != 0)
538 {
539 QItem *Jnk = Items;
540 Items = Items->Next;
541 delete Jnk;
542 }
543}
544 /*}}}*/
545// Queue::Enqueue - Queue an item to the queue /*{{{*/
546// ---------------------------------------------------------------------
547/* */
8267fe24 548void pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 549{
7a1b1f8b
AL
550 QItem **I = &Items;
551 for (; *I != 0; I = &(*I)->Next);
552
0a8a80e5 553 // Create a new item
7a1b1f8b
AL
554 QItem *Itm = new QItem;
555 *Itm = Item;
556 Itm->Next = 0;
557 *I = Itm;
0a8a80e5 558
8267fe24 559 Item.Owner->QueueCounter++;
93bf083d
AL
560 if (Items->Next == 0)
561 Cycle();
0a8a80e5
AL
562}
563 /*}}}*/
c88edf1d 564// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 565// ---------------------------------------------------------------------
b185acc2 566/* We return true if we hit something */
bfd22fc0 567bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 568{
b185acc2
AL
569 if (Owner->Status == pkgAcquire::Item::StatFetching)
570 return _error->Error("Tried to dequeue a fetching object");
571
bfd22fc0
AL
572 bool Res = false;
573
0a8a80e5
AL
574 QItem **I = &Items;
575 for (; *I != 0;)
576 {
577 if ((*I)->Owner == Owner)
578 {
579 QItem *Jnk= *I;
580 *I = (*I)->Next;
581 Owner->QueueCounter--;
582 delete Jnk;
bfd22fc0 583 Res = true;
0a8a80e5
AL
584 }
585 else
586 I = &(*I)->Next;
587 }
bfd22fc0
AL
588
589 return Res;
0a8a80e5
AL
590}
591 /*}}}*/
592// Queue::Startup - Start the worker processes /*{{{*/
593// ---------------------------------------------------------------------
8e5fc8f5
AL
594/* It is possible for this to be called with a pre-existing set of
595 workers. */
0a8a80e5
AL
596bool pkgAcquire::Queue::Startup()
597{
8e5fc8f5
AL
598 if (Workers == 0)
599 {
600 URI U(Name);
601 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
602 if (Cnf == 0)
603 return false;
604
605 Workers = new Worker(this,Cnf,Owner->Log);
606 Owner->Add(Workers);
607 if (Workers->Start() == false)
608 return false;
609
610 /* When pipelining we commit 10 items. This needs to change when we
611 added other source retry to have cycle maintain a pipeline depth
612 on its own. */
613 if (Cnf->Pipeline == true)
614 MaxPipeDepth = 10;
615 else
616 MaxPipeDepth = 1;
617 }
5cb5d8dc 618
93bf083d 619 return Cycle();
0a8a80e5
AL
620}
621 /*}}}*/
622// Queue::Shutdown - Shutdown the worker processes /*{{{*/
623// ---------------------------------------------------------------------
8e5fc8f5
AL
624/* If final is true then all workers are eliminated, otherwise only workers
625 that do not need cleanup are removed */
626bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
627{
628 // Delete all of the workers
8e5fc8f5
AL
629 pkgAcquire::Worker **Cur = &Workers;
630 while (*Cur != 0)
0a8a80e5 631 {
8e5fc8f5
AL
632 pkgAcquire::Worker *Jnk = *Cur;
633 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
634 {
635 *Cur = Jnk->NextQueue;
636 Owner->Remove(Jnk);
637 delete Jnk;
638 }
639 else
640 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
641 }
642
643 return true;
3b5421b4
AL
644}
645 /*}}}*/
7d8afa39 646// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
647// ---------------------------------------------------------------------
648/* */
649pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
650{
651 for (QItem *I = Items; I != 0; I = I->Next)
652 if (I->URI == URI && I->Worker == Owner)
653 return I;
654 return 0;
655}
656 /*}}}*/
657// Queue::ItemDone - Item has been completed /*{{{*/
658// ---------------------------------------------------------------------
659/* The worker signals this which causes the item to be removed from the
93bf083d
AL
660 queue. If this is the last queue instance then it is removed from the
661 main queue too.*/
c88edf1d
AL
662bool pkgAcquire::Queue::ItemDone(QItem *Itm)
663{
b185acc2 664 PipeDepth--;
db890fdb
AL
665 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
666 Itm->Owner->Status = pkgAcquire::Item::StatDone;
667
93bf083d
AL
668 if (Itm->Owner->QueueCounter <= 1)
669 Owner->Dequeue(Itm->Owner);
670 else
671 {
672 Dequeue(Itm->Owner);
673 Owner->Bump();
674 }
c88edf1d 675
93bf083d
AL
676 return Cycle();
677}
678 /*}}}*/
679// Queue::Cycle - Queue new items into the method /*{{{*/
680// ---------------------------------------------------------------------
b185acc2
AL
681/* This locates a new idle item and sends it to the worker. If pipelining
682 is enabled then it keeps the pipe full. */
93bf083d
AL
683bool pkgAcquire::Queue::Cycle()
684{
685 if (Items == 0 || Workers == 0)
c88edf1d
AL
686 return true;
687
e7432370
AL
688 if (PipeDepth < 0)
689 return _error->Error("Pipedepth failure");
690
93bf083d
AL
691 // Look for a queable item
692 QItem *I = Items;
e7432370 693 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
694 {
695 for (; I != 0; I = I->Next)
696 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
697 break;
698
699 // Nothing to do, queue is idle.
700 if (I == 0)
701 return true;
702
703 I->Worker = Workers;
704 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 705 PipeDepth++;
b185acc2
AL
706 if (Workers->QueueItem(I) == false)
707 return false;
708 }
93bf083d 709
b185acc2 710 return true;
c88edf1d
AL
711}
712 /*}}}*/
be4401bf
AL
713// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
714// ---------------------------------------------------------------------
b185acc2 715/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
716void pkgAcquire::Queue::Bump()
717{
b185acc2 718 Cycle();
be4401bf
AL
719}
720 /*}}}*/
b98f2859
AL
721
722// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
723// ---------------------------------------------------------------------
724/* */
c5ccf175 725pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
726{
727 Start();
728}
729 /*}}}*/
730// AcquireStatus::Pulse - Called periodically /*{{{*/
731// ---------------------------------------------------------------------
732/* This computes some internal state variables for the derived classes to
733 use. It generates the current downloaded bytes and total bytes to download
734 as well as the current CPS estimate. */
024d1123 735bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
736{
737 TotalBytes = 0;
738 CurrentBytes = 0;
d568ed2d
AL
739 TotalItems = 0;
740 CurrentItems = 0;
b98f2859
AL
741
742 // Compute the total number of bytes to fetch
743 unsigned int Unknown = 0;
744 unsigned int Count = 0;
b4fc9b6f 745 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
746 I++, Count++)
747 {
d568ed2d
AL
748 TotalItems++;
749 if ((*I)->Status == pkgAcquire::Item::StatDone)
750 CurrentItems++;
751
a6568219
AL
752 // Totally ignore local items
753 if ((*I)->Local == true)
754 continue;
b2e465d6 755
b98f2859
AL
756 TotalBytes += (*I)->FileSize;
757 if ((*I)->Complete == true)
758 CurrentBytes += (*I)->FileSize;
759 if ((*I)->FileSize == 0 && (*I)->Complete == false)
760 Unknown++;
761 }
762
763 // Compute the current completion
aa0e1101 764 unsigned long ResumeSize = 0;
b98f2859
AL
765 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
766 I = Owner->WorkerStep(I))
767 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
768 {
769 CurrentBytes += I->CurrentSize;
770 ResumeSize += I->ResumePoint;
771
772 // Files with unknown size always have 100% completion
773 if (I->CurrentItem->Owner->FileSize == 0 &&
774 I->CurrentItem->Owner->Complete == false)
775 TotalBytes += I->CurrentSize;
776 }
777
b98f2859
AL
778 // Normalize the figures and account for unknown size downloads
779 if (TotalBytes <= 0)
780 TotalBytes = 1;
781 if (Unknown == Count)
782 TotalBytes = Unknown;
18ef0a78
AL
783
784 // Wha?! Is not supposed to happen.
785 if (CurrentBytes > TotalBytes)
786 CurrentBytes = TotalBytes;
b98f2859
AL
787
788 // Compute the CPS
789 struct timeval NewTime;
790 gettimeofday(&NewTime,0);
791 if (NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec ||
792 NewTime.tv_sec - Time.tv_sec > 6)
793 {
f17ac097
AL
794 double Delta = NewTime.tv_sec - Time.tv_sec +
795 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 796
b98f2859 797 // Compute the CPS value
f17ac097 798 if (Delta < 0.01)
e331f6ed
AL
799 CurrentCPS = 0;
800 else
aa0e1101
AL
801 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
802 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 803 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
804 Time = NewTime;
805 }
024d1123 806
75ef8f14
MV
807 int fd = _config->FindI("APT::Status-Fd",-1);
808 if(fd > 0)
809 {
810 ostringstream status;
811
812 char msg[200];
813 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
814 unsigned long ETA =
815 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
816
817 snprintf(msg,sizeof(msg), _("Downloading file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
818
819 // build the status str
820 status << "dlstatus:" << i
821 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
822 << ":" << msg
823 << endl;
824 write(fd, status.str().c_str(), status.str().size());
825 }
826
024d1123 827 return true;
b98f2859
AL
828}
829 /*}}}*/
830// AcquireStatus::Start - Called when the download is started /*{{{*/
831// ---------------------------------------------------------------------
832/* We just reset the counters */
833void pkgAcquireStatus::Start()
834{
835 gettimeofday(&Time,0);
836 gettimeofday(&StartTime,0);
837 LastBytes = 0;
838 CurrentCPS = 0;
839 CurrentBytes = 0;
840 TotalBytes = 0;
841 FetchedBytes = 0;
842 ElapsedTime = 0;
d568ed2d
AL
843 TotalItems = 0;
844 CurrentItems = 0;
b98f2859
AL
845}
846 /*}}}*/
a6568219 847// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
848// ---------------------------------------------------------------------
849/* This accurately computes the elapsed time and the total overall CPS. */
850void pkgAcquireStatus::Stop()
851{
852 // Compute the CPS and elapsed time
853 struct timeval NewTime;
854 gettimeofday(&NewTime,0);
855
31a0531d
AL
856 double Delta = NewTime.tv_sec - StartTime.tv_sec +
857 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 858
b98f2859 859 // Compute the CPS value
31a0531d 860 if (Delta < 0.01)
e331f6ed
AL
861 CurrentCPS = 0;
862 else
31a0531d 863 CurrentCPS = FetchedBytes/Delta;
b98f2859 864 LastBytes = CurrentBytes;
31a0531d 865 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
866}
867 /*}}}*/
868// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
869// ---------------------------------------------------------------------
870/* This is used to get accurate final transfer rate reporting. */
871void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 872{
b98f2859
AL
873 FetchedBytes += Size - Resume;
874}
875 /*}}}*/