]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
add the various foldmarkers in apt-pkg & cmdline (no code change)
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
0a8a80e5
AL
8 The core element for the schedual system is the concept of a named
9 queue. Each queue is unique and each queue has a name derived from the
10 URI. The degree of paralization can be controled by how the queue
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
0118833a
AL
16#include <apt-pkg/acquire.h>
17#include <apt-pkg/acquire-item.h>
18#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
19#include <apt-pkg/configuration.h>
20#include <apt-pkg/error.h>
cdcc6d34 21#include <apt-pkg/strutl.h>
8267fe24 22
b2e465d6 23#include <apti18n.h>
b4fc9b6f
AL
24
25#include <iostream>
75ef8f14 26#include <sstream>
b2e465d6 27
7a7fa5f0 28#include <dirent.h>
8267fe24 29#include <sys/time.h>
524f8105 30#include <errno.h>
5f3edc0f 31#include <sys/stat.h>
0118833a
AL
32 /*}}}*/
33
b4fc9b6f
AL
34using namespace std;
35
0118833a
AL
36// Acquire::pkgAcquire - Constructor /*{{{*/
37// ---------------------------------------------------------------------
93bf083d 38/* We grab some runtime state from the configuration space */
8267fe24 39pkgAcquire::pkgAcquire(pkgAcquireStatus *Log) : Log(Log)
0118833a
AL
40{
41 Queues = 0;
42 Configs = 0;
0a8a80e5
AL
43 Workers = 0;
44 ToFetch = 0;
8b89e57f 45 Running = false;
0a8a80e5
AL
46
47 string Mode = _config->Find("Acquire::Queue-Mode","host");
48 if (strcasecmp(Mode.c_str(),"host") == 0)
49 QueueMode = QueueHost;
50 if (strcasecmp(Mode.c_str(),"access") == 0)
51 QueueMode = QueueAccess;
52
53 Debug = _config->FindB("Debug::pkgAcquire",false);
5f3edc0f 54
0c55d1d2 55 // This is really a stupid place for this
5f3edc0f
AL
56 struct stat St;
57 if (stat((_config->FindDir("Dir::State::lists") + "partial/").c_str(),&St) != 0 ||
58 S_ISDIR(St.st_mode) == 0)
b2e465d6 59 _error->Error(_("Lists directory %spartial is missing."),
5f3edc0f
AL
60 _config->FindDir("Dir::State::lists").c_str());
61 if (stat((_config->FindDir("Dir::Cache::Archives") + "partial/").c_str(),&St) != 0 ||
62 S_ISDIR(St.st_mode) == 0)
b2e465d6 63 _error->Error(_("Archive directory %spartial is missing."),
5f3edc0f 64 _config->FindDir("Dir::Cache::Archives").c_str());
0118833a
AL
65}
66 /*}}}*/
67// Acquire::~pkgAcquire - Destructor /*{{{*/
68// ---------------------------------------------------------------------
93bf083d 69/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
70pkgAcquire::~pkgAcquire()
71{
459681d3
AL
72 Shutdown();
73
3b5421b4
AL
74 while (Configs != 0)
75 {
76 MethodConfig *Jnk = Configs;
77 Configs = Configs->Next;
78 delete Jnk;
79 }
281daf46
AL
80}
81 /*}}}*/
8e5fc8f5 82// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
83// ---------------------------------------------------------------------
84/* */
85void pkgAcquire::Shutdown()
86{
87 while (Items.size() != 0)
1b480911
AL
88 {
89 if (Items[0]->Status == Item::StatFetching)
90 Items[0]->Status = Item::StatError;
281daf46 91 delete Items[0];
1b480911 92 }
0a8a80e5
AL
93
94 while (Queues != 0)
95 {
96 Queue *Jnk = Queues;
97 Queues = Queues->Next;
98 delete Jnk;
99 }
0118833a
AL
100}
101 /*}}}*/
102// Acquire::Add - Add a new item /*{{{*/
103// ---------------------------------------------------------------------
93bf083d
AL
104/* This puts an item on the acquire list. This list is mainly for tracking
105 item status */
0118833a
AL
106void pkgAcquire::Add(Item *Itm)
107{
108 Items.push_back(Itm);
109}
110 /*}}}*/
111// Acquire::Remove - Remove a item /*{{{*/
112// ---------------------------------------------------------------------
93bf083d 113/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
114void pkgAcquire::Remove(Item *Itm)
115{
a3eaf954
AL
116 Dequeue(Itm);
117
753b3525 118 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
119 {
120 if (*I == Itm)
b4fc9b6f 121 {
0118833a 122 Items.erase(I);
b4fc9b6f
AL
123 I = Items.begin();
124 }
753b3525
AL
125 else
126 I++;
8267fe24 127 }
0118833a
AL
128}
129 /*}}}*/
0a8a80e5
AL
130// Acquire::Add - Add a worker /*{{{*/
131// ---------------------------------------------------------------------
93bf083d
AL
132/* A list of workers is kept so that the select loop can direct their FD
133 usage. */
0a8a80e5
AL
134void pkgAcquire::Add(Worker *Work)
135{
136 Work->NextAcquire = Workers;
137 Workers = Work;
138}
139 /*}}}*/
140// Acquire::Remove - Remove a worker /*{{{*/
141// ---------------------------------------------------------------------
93bf083d
AL
142/* A worker has died. This can not be done while the select loop is running
143 as it would require that RunFds could handling a changing list state and
144 it cant.. */
0a8a80e5
AL
145void pkgAcquire::Remove(Worker *Work)
146{
93bf083d
AL
147 if (Running == true)
148 abort();
149
0a8a80e5
AL
150 Worker **I = &Workers;
151 for (; *I != 0;)
152 {
153 if (*I == Work)
154 *I = (*I)->NextAcquire;
155 else
156 I = &(*I)->NextAcquire;
157 }
158}
159 /*}}}*/
0118833a
AL
160// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
161// ---------------------------------------------------------------------
93bf083d 162/* This is the entry point for an item. An item calls this function when
281daf46 163 it is constructed which creates a queue (based on the current queue
93bf083d
AL
164 mode) and puts the item in that queue. If the system is running then
165 the queue might be started. */
8267fe24 166void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 167{
0a8a80e5 168 // Determine which queue to put the item in
e331f6ed
AL
169 const MethodConfig *Config;
170 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
171 if (Name.empty() == true)
172 return;
173
174 // Find the queue structure
175 Queue *I = Queues;
176 for (; I != 0 && I->Name != Name; I = I->Next);
177 if (I == 0)
178 {
179 I = new Queue(Name,this);
180 I->Next = Queues;
181 Queues = I;
93bf083d
AL
182
183 if (Running == true)
184 I->Startup();
0a8a80e5 185 }
bfd22fc0 186
e331f6ed
AL
187 // See if this is a local only URI
188 if (Config->LocalOnly == true && Item.Owner->Complete == false)
189 Item.Owner->Local = true;
8267fe24 190 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
191
192 // Queue it into the named queue
c03462c6
MV
193 if(I->Enqueue(Item))
194 ToFetch++;
195
0a8a80e5
AL
196 // Some trace stuff
197 if (Debug == true)
198 {
8267fe24
AL
199 clog << "Fetching " << Item.URI << endl;
200 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 201 clog << " Queue is: " << Name << endl;
0a8a80e5 202 }
3b5421b4
AL
203}
204 /*}}}*/
0a8a80e5 205// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 206// ---------------------------------------------------------------------
93bf083d
AL
207/* This is called when an item is finished being fetched. It removes it
208 from all the queues */
0a8a80e5
AL
209void pkgAcquire::Dequeue(Item *Itm)
210{
211 Queue *I = Queues;
bfd22fc0 212 bool Res = false;
0a8a80e5 213 for (; I != 0; I = I->Next)
bfd22fc0 214 Res |= I->Dequeue(Itm);
93bf083d
AL
215
216 if (Debug == true)
217 clog << "Dequeuing " << Itm->DestFile << endl;
bfd22fc0
AL
218 if (Res == true)
219 ToFetch--;
0a8a80e5
AL
220}
221 /*}}}*/
222// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
223// ---------------------------------------------------------------------
224/* The string returned depends on the configuration settings and the
225 method parameters. Given something like http://foo.org/bar it can
226 return http://foo.org or http */
e331f6ed 227string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 228{
93bf083d
AL
229 URI U(Uri);
230
e331f6ed 231 Config = GetConfig(U.Access);
0a8a80e5
AL
232 if (Config == 0)
233 return string();
234
235 /* Single-Instance methods get exactly one queue per URI. This is
236 also used for the Access queue method */
237 if (Config->SingleInstance == true || QueueMode == QueueAccess)
b98f2859 238 return U.Access;
93bf083d
AL
239
240 return U.Access + ':' + U.Host;
0118833a
AL
241}
242 /*}}}*/
3b5421b4
AL
243// Acquire::GetConfig - Fetch the configuration information /*{{{*/
244// ---------------------------------------------------------------------
245/* This locates the configuration structure for an access method. If
246 a config structure cannot be found a Worker will be created to
247 retrieve it */
0a8a80e5 248pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
249{
250 // Search for an existing config
251 MethodConfig *Conf;
252 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
253 if (Conf->Access == Access)
254 return Conf;
255
256 // Create the new config class
257 Conf = new MethodConfig;
258 Conf->Access = Access;
259 Conf->Next = Configs;
260 Configs = Conf;
0118833a 261
3b5421b4
AL
262 // Create the worker to fetch the configuration
263 Worker Work(Conf);
264 if (Work.Start() == false)
265 return 0;
7c6e2dc7
MV
266
267 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 268 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
269 Conf->SingleInstance = true;
270
3b5421b4
AL
271 return Conf;
272}
273 /*}}}*/
0a8a80e5
AL
274// Acquire::SetFds - Deal with readable FDs /*{{{*/
275// ---------------------------------------------------------------------
276/* Collect FDs that have activity monitors into the fd sets */
277void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
278{
279 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
280 {
281 if (I->InReady == true && I->InFd >= 0)
282 {
283 if (Fd < I->InFd)
284 Fd = I->InFd;
285 FD_SET(I->InFd,RSet);
286 }
287 if (I->OutReady == true && I->OutFd >= 0)
288 {
289 if (Fd < I->OutFd)
290 Fd = I->OutFd;
291 FD_SET(I->OutFd,WSet);
292 }
293 }
294}
295 /*}}}*/
296// Acquire::RunFds - Deal with active FDs /*{{{*/
297// ---------------------------------------------------------------------
93bf083d
AL
298/* Dispatch active FDs over to the proper workers. It is very important
299 that a worker never be erased while this is running! The queue class
300 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
301void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
302{
303 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
304 {
305 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
306 I->InFdReady();
307 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
308 I->OutFdReady();
309 }
310}
311 /*}}}*/
312// Acquire::Run - Run the fetch sequence /*{{{*/
313// ---------------------------------------------------------------------
314/* This runs the queues. It manages a select loop for all of the
315 Worker tasks. The workers interact with the queues and items to
316 manage the actual fetch. */
1c5f7e5f 317pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 318{
8b89e57f
AL
319 Running = true;
320
0a8a80e5
AL
321 for (Queue *I = Queues; I != 0; I = I->Next)
322 I->Startup();
323
b98f2859
AL
324 if (Log != 0)
325 Log->Start();
326
024d1123
AL
327 bool WasCancelled = false;
328
0a8a80e5 329 // Run till all things have been acquired
8267fe24
AL
330 struct timeval tv;
331 tv.tv_sec = 0;
1c5f7e5f 332 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
333 while (ToFetch > 0)
334 {
335 fd_set RFds;
336 fd_set WFds;
337 int Highest = 0;
338 FD_ZERO(&RFds);
339 FD_ZERO(&WFds);
340 SetFds(Highest,&RFds,&WFds);
341
b0db36b1
AL
342 int Res;
343 do
344 {
345 Res = select(Highest+1,&RFds,&WFds,0,&tv);
346 }
347 while (Res < 0 && errno == EINTR);
348
8267fe24 349 if (Res < 0)
8b89e57f 350 {
8267fe24
AL
351 _error->Errno("select","Select has failed");
352 break;
8b89e57f 353 }
93bf083d 354
0a8a80e5 355 RunFds(&RFds,&WFds);
93bf083d
AL
356 if (_error->PendingError() == true)
357 break;
8267fe24
AL
358
359 // Timeout, notify the log class
360 if (Res == 0 || (Log != 0 && Log->Update == true))
361 {
1c5f7e5f 362 tv.tv_usec = PulseIntervall;
8267fe24
AL
363 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
364 I->Pulse();
024d1123
AL
365 if (Log != 0 && Log->Pulse(this) == false)
366 {
367 WasCancelled = true;
368 break;
369 }
8267fe24 370 }
0a8a80e5 371 }
be4401bf 372
b98f2859
AL
373 if (Log != 0)
374 Log->Stop();
375
be4401bf
AL
376 // Shut down the acquire bits
377 Running = false;
0a8a80e5 378 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 379 I->Shutdown(false);
0a8a80e5 380
ab559b35 381 // Shut down the items
b4fc9b6f 382 for (ItemIterator I = Items.begin(); I != Items.end(); I++)
8e5fc8f5 383 (*I)->Finished();
ab559b35 384
024d1123
AL
385 if (_error->PendingError())
386 return Failed;
387 if (WasCancelled)
388 return Cancelled;
389 return Continue;
93bf083d
AL
390}
391 /*}}}*/
be4401bf 392// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
393// ---------------------------------------------------------------------
394/* This routine bumps idle queues in hopes that they will be able to fetch
395 the dequeued item */
396void pkgAcquire::Bump()
397{
be4401bf
AL
398 for (Queue *I = Queues; I != 0; I = I->Next)
399 I->Bump();
0a8a80e5
AL
400}
401 /*}}}*/
8267fe24
AL
402// Acquire::WorkerStep - Step to the next worker /*{{{*/
403// ---------------------------------------------------------------------
404/* Not inlined to advoid including acquire-worker.h */
405pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
406{
407 return I->NextAcquire;
408};
409 /*}}}*/
a6568219 410// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
411// ---------------------------------------------------------------------
412/* This is a bit simplistic, it looks at every file in the dir and sees
413 if it is part of the download set. */
414bool pkgAcquire::Clean(string Dir)
415{
416 DIR *D = opendir(Dir.c_str());
417 if (D == 0)
b2e465d6 418 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
419
420 string StartDir = SafeGetCWD();
421 if (chdir(Dir.c_str()) != 0)
422 {
423 closedir(D);
b2e465d6 424 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
425 }
426
427 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
428 {
429 // Skip some files..
430 if (strcmp(Dir->d_name,"lock") == 0 ||
431 strcmp(Dir->d_name,"partial") == 0 ||
432 strcmp(Dir->d_name,".") == 0 ||
433 strcmp(Dir->d_name,"..") == 0)
434 continue;
435
436 // Look in the get list
b4fc9b6f 437 ItemCIterator I = Items.begin();
7a7fa5f0
AL
438 for (; I != Items.end(); I++)
439 if (flNotDir((*I)->DestFile) == Dir->d_name)
440 break;
441
442 // Nothing found, nuke it
443 if (I == Items.end())
444 unlink(Dir->d_name);
445 };
446
7a7fa5f0 447 closedir(D);
3c8cda8b
MV
448 if (chdir(StartDir.c_str()) != 0)
449 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
450 return true;
451}
452 /*}}}*/
a6568219
AL
453// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
454// ---------------------------------------------------------------------
455/* This is the total number of bytes needed */
b2e465d6 456double pkgAcquire::TotalNeeded()
a6568219 457{
b2e465d6 458 double Total = 0;
b4fc9b6f 459 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
460 Total += (*I)->FileSize;
461 return Total;
462}
463 /*}}}*/
464// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
465// ---------------------------------------------------------------------
466/* This is the number of bytes that is not local */
b2e465d6 467double pkgAcquire::FetchNeeded()
a6568219 468{
b2e465d6 469 double Total = 0;
b4fc9b6f 470 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
a6568219
AL
471 if ((*I)->Local == false)
472 Total += (*I)->FileSize;
473 return Total;
474}
475 /*}}}*/
6b1ff003
AL
476// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
477// ---------------------------------------------------------------------
478/* This is the number of bytes that is not local */
b2e465d6 479double pkgAcquire::PartialPresent()
6b1ff003 480{
b2e465d6 481 double Total = 0;
b4fc9b6f 482 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); I++)
6b1ff003
AL
483 if ((*I)->Local == false)
484 Total += (*I)->PartialSize;
485 return Total;
486}
92fcbfc1 487 /*}}}*/
8e5fc8f5 488// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
489// ---------------------------------------------------------------------
490/* */
491pkgAcquire::UriIterator pkgAcquire::UriBegin()
492{
493 return UriIterator(Queues);
494}
495 /*}}}*/
8e5fc8f5 496// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
497// ---------------------------------------------------------------------
498/* */
499pkgAcquire::UriIterator pkgAcquire::UriEnd()
500{
501 return UriIterator(0);
502}
503 /*}}}*/
e331f6ed
AL
504// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
505// ---------------------------------------------------------------------
506/* */
507pkgAcquire::MethodConfig::MethodConfig()
508{
509 SingleInstance = false;
e331f6ed
AL
510 Pipeline = false;
511 SendConfig = false;
512 LocalOnly = false;
459681d3 513 Removable = false;
e331f6ed
AL
514 Next = 0;
515}
516 /*}}}*/
0a8a80e5
AL
517// Queue::Queue - Constructor /*{{{*/
518// ---------------------------------------------------------------------
519/* */
520pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
521 Owner(Owner)
522{
523 Items = 0;
524 Next = 0;
525 Workers = 0;
b185acc2
AL
526 MaxPipeDepth = 1;
527 PipeDepth = 0;
0a8a80e5
AL
528}
529 /*}}}*/
530// Queue::~Queue - Destructor /*{{{*/
531// ---------------------------------------------------------------------
532/* */
533pkgAcquire::Queue::~Queue()
534{
8e5fc8f5 535 Shutdown(true);
0a8a80e5
AL
536
537 while (Items != 0)
538 {
539 QItem *Jnk = Items;
540 Items = Items->Next;
541 delete Jnk;
542 }
543}
544 /*}}}*/
545// Queue::Enqueue - Queue an item to the queue /*{{{*/
546// ---------------------------------------------------------------------
547/* */
c03462c6 548bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 549{
7a1b1f8b 550 QItem **I = &Items;
c03462c6
MV
551 // move to the end of the queue and check for duplicates here
552 for (; *I != 0; I = &(*I)->Next)
553 if (Item.URI == (*I)->URI)
554 {
555 Item.Owner->Status = Item::StatDone;
556 return false;
557 }
558
0a8a80e5 559 // Create a new item
7a1b1f8b
AL
560 QItem *Itm = new QItem;
561 *Itm = Item;
562 Itm->Next = 0;
563 *I = Itm;
0a8a80e5 564
8267fe24 565 Item.Owner->QueueCounter++;
93bf083d
AL
566 if (Items->Next == 0)
567 Cycle();
c03462c6 568 return true;
0a8a80e5
AL
569}
570 /*}}}*/
c88edf1d 571// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 572// ---------------------------------------------------------------------
b185acc2 573/* We return true if we hit something */
bfd22fc0 574bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 575{
b185acc2
AL
576 if (Owner->Status == pkgAcquire::Item::StatFetching)
577 return _error->Error("Tried to dequeue a fetching object");
578
bfd22fc0
AL
579 bool Res = false;
580
0a8a80e5
AL
581 QItem **I = &Items;
582 for (; *I != 0;)
583 {
584 if ((*I)->Owner == Owner)
585 {
586 QItem *Jnk= *I;
587 *I = (*I)->Next;
588 Owner->QueueCounter--;
589 delete Jnk;
bfd22fc0 590 Res = true;
0a8a80e5
AL
591 }
592 else
593 I = &(*I)->Next;
594 }
bfd22fc0
AL
595
596 return Res;
0a8a80e5
AL
597}
598 /*}}}*/
599// Queue::Startup - Start the worker processes /*{{{*/
600// ---------------------------------------------------------------------
8e5fc8f5
AL
601/* It is possible for this to be called with a pre-existing set of
602 workers. */
0a8a80e5
AL
603bool pkgAcquire::Queue::Startup()
604{
8e5fc8f5
AL
605 if (Workers == 0)
606 {
607 URI U(Name);
608 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
609 if (Cnf == 0)
610 return false;
611
612 Workers = new Worker(this,Cnf,Owner->Log);
613 Owner->Add(Workers);
614 if (Workers->Start() == false)
615 return false;
616
617 /* When pipelining we commit 10 items. This needs to change when we
618 added other source retry to have cycle maintain a pipeline depth
619 on its own. */
620 if (Cnf->Pipeline == true)
7a59dff6 621 MaxPipeDepth = 1000;
8e5fc8f5
AL
622 else
623 MaxPipeDepth = 1;
624 }
5cb5d8dc 625
93bf083d 626 return Cycle();
0a8a80e5
AL
627}
628 /*}}}*/
629// Queue::Shutdown - Shutdown the worker processes /*{{{*/
630// ---------------------------------------------------------------------
8e5fc8f5
AL
631/* If final is true then all workers are eliminated, otherwise only workers
632 that do not need cleanup are removed */
633bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
634{
635 // Delete all of the workers
8e5fc8f5
AL
636 pkgAcquire::Worker **Cur = &Workers;
637 while (*Cur != 0)
0a8a80e5 638 {
8e5fc8f5
AL
639 pkgAcquire::Worker *Jnk = *Cur;
640 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
641 {
642 *Cur = Jnk->NextQueue;
643 Owner->Remove(Jnk);
644 delete Jnk;
645 }
646 else
647 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
648 }
649
650 return true;
3b5421b4
AL
651}
652 /*}}}*/
7d8afa39 653// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
654// ---------------------------------------------------------------------
655/* */
656pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
657{
658 for (QItem *I = Items; I != 0; I = I->Next)
659 if (I->URI == URI && I->Worker == Owner)
660 return I;
661 return 0;
662}
663 /*}}}*/
664// Queue::ItemDone - Item has been completed /*{{{*/
665// ---------------------------------------------------------------------
666/* The worker signals this which causes the item to be removed from the
93bf083d
AL
667 queue. If this is the last queue instance then it is removed from the
668 main queue too.*/
c88edf1d
AL
669bool pkgAcquire::Queue::ItemDone(QItem *Itm)
670{
b185acc2 671 PipeDepth--;
db890fdb
AL
672 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
673 Itm->Owner->Status = pkgAcquire::Item::StatDone;
674
93bf083d
AL
675 if (Itm->Owner->QueueCounter <= 1)
676 Owner->Dequeue(Itm->Owner);
677 else
678 {
679 Dequeue(Itm->Owner);
680 Owner->Bump();
681 }
c88edf1d 682
93bf083d
AL
683 return Cycle();
684}
685 /*}}}*/
686// Queue::Cycle - Queue new items into the method /*{{{*/
687// ---------------------------------------------------------------------
b185acc2
AL
688/* This locates a new idle item and sends it to the worker. If pipelining
689 is enabled then it keeps the pipe full. */
93bf083d
AL
690bool pkgAcquire::Queue::Cycle()
691{
692 if (Items == 0 || Workers == 0)
c88edf1d
AL
693 return true;
694
e7432370
AL
695 if (PipeDepth < 0)
696 return _error->Error("Pipedepth failure");
697
93bf083d
AL
698 // Look for a queable item
699 QItem *I = Items;
e7432370 700 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
701 {
702 for (; I != 0; I = I->Next)
703 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
704 break;
705
706 // Nothing to do, queue is idle.
707 if (I == 0)
708 return true;
709
710 I->Worker = Workers;
711 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 712 PipeDepth++;
b185acc2
AL
713 if (Workers->QueueItem(I) == false)
714 return false;
715 }
93bf083d 716
b185acc2 717 return true;
c88edf1d
AL
718}
719 /*}}}*/
be4401bf
AL
720// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
721// ---------------------------------------------------------------------
b185acc2 722/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
723void pkgAcquire::Queue::Bump()
724{
b185acc2 725 Cycle();
be4401bf
AL
726}
727 /*}}}*/
b98f2859
AL
728// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
729// ---------------------------------------------------------------------
730/* */
c5ccf175 731pkgAcquireStatus::pkgAcquireStatus() : Update(true), MorePulses(false)
b98f2859
AL
732{
733 Start();
734}
735 /*}}}*/
736// AcquireStatus::Pulse - Called periodically /*{{{*/
737// ---------------------------------------------------------------------
738/* This computes some internal state variables for the derived classes to
739 use. It generates the current downloaded bytes and total bytes to download
740 as well as the current CPS estimate. */
024d1123 741bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
742{
743 TotalBytes = 0;
744 CurrentBytes = 0;
d568ed2d
AL
745 TotalItems = 0;
746 CurrentItems = 0;
b98f2859
AL
747
748 // Compute the total number of bytes to fetch
749 unsigned int Unknown = 0;
750 unsigned int Count = 0;
b4fc9b6f 751 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
b98f2859
AL
752 I++, Count++)
753 {
d568ed2d
AL
754 TotalItems++;
755 if ((*I)->Status == pkgAcquire::Item::StatDone)
756 CurrentItems++;
757
a6568219
AL
758 // Totally ignore local items
759 if ((*I)->Local == true)
760 continue;
b2e465d6 761
b98f2859
AL
762 TotalBytes += (*I)->FileSize;
763 if ((*I)->Complete == true)
764 CurrentBytes += (*I)->FileSize;
765 if ((*I)->FileSize == 0 && (*I)->Complete == false)
766 Unknown++;
767 }
768
769 // Compute the current completion
aa0e1101 770 unsigned long ResumeSize = 0;
b98f2859
AL
771 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
772 I = Owner->WorkerStep(I))
773 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
774 {
775 CurrentBytes += I->CurrentSize;
776 ResumeSize += I->ResumePoint;
777
778 // Files with unknown size always have 100% completion
779 if (I->CurrentItem->Owner->FileSize == 0 &&
780 I->CurrentItem->Owner->Complete == false)
781 TotalBytes += I->CurrentSize;
782 }
783
b98f2859
AL
784 // Normalize the figures and account for unknown size downloads
785 if (TotalBytes <= 0)
786 TotalBytes = 1;
787 if (Unknown == Count)
788 TotalBytes = Unknown;
18ef0a78
AL
789
790 // Wha?! Is not supposed to happen.
791 if (CurrentBytes > TotalBytes)
792 CurrentBytes = TotalBytes;
b98f2859
AL
793
794 // Compute the CPS
795 struct timeval NewTime;
796 gettimeofday(&NewTime,0);
2ec1674d 797 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
798 NewTime.tv_sec - Time.tv_sec > 6)
799 {
f17ac097
AL
800 double Delta = NewTime.tv_sec - Time.tv_sec +
801 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 802
b98f2859 803 // Compute the CPS value
f17ac097 804 if (Delta < 0.01)
e331f6ed
AL
805 CurrentCPS = 0;
806 else
aa0e1101
AL
807 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
808 LastBytes = CurrentBytes - ResumeSize;
6d5dd02a 809 ElapsedTime = (unsigned long)Delta;
b98f2859
AL
810 Time = NewTime;
811 }
024d1123 812
75ef8f14
MV
813 int fd = _config->FindI("APT::Status-Fd",-1);
814 if(fd > 0)
815 {
816 ostringstream status;
817
818 char msg[200];
819 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
820 unsigned long ETA =
821 (unsigned long)((TotalBytes - CurrentBytes) / CurrentCPS);
822
1e8b4c0f
MV
823 // only show the ETA if it makes sense
824 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 825 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 826 else
0c508b03 827 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
828
829
75ef8f14
MV
830
831 // build the status str
832 status << "dlstatus:" << i
833 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
834 << ":" << msg
835 << endl;
836 write(fd, status.str().c_str(), status.str().size());
837 }
838
024d1123 839 return true;
b98f2859
AL
840}
841 /*}}}*/
842// AcquireStatus::Start - Called when the download is started /*{{{*/
843// ---------------------------------------------------------------------
844/* We just reset the counters */
845void pkgAcquireStatus::Start()
846{
847 gettimeofday(&Time,0);
848 gettimeofday(&StartTime,0);
849 LastBytes = 0;
850 CurrentCPS = 0;
851 CurrentBytes = 0;
852 TotalBytes = 0;
853 FetchedBytes = 0;
854 ElapsedTime = 0;
d568ed2d
AL
855 TotalItems = 0;
856 CurrentItems = 0;
b98f2859
AL
857}
858 /*}}}*/
a6568219 859// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
860// ---------------------------------------------------------------------
861/* This accurately computes the elapsed time and the total overall CPS. */
862void pkgAcquireStatus::Stop()
863{
864 // Compute the CPS and elapsed time
865 struct timeval NewTime;
866 gettimeofday(&NewTime,0);
867
31a0531d
AL
868 double Delta = NewTime.tv_sec - StartTime.tv_sec +
869 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 870
b98f2859 871 // Compute the CPS value
31a0531d 872 if (Delta < 0.01)
e331f6ed
AL
873 CurrentCPS = 0;
874 else
31a0531d 875 CurrentCPS = FetchedBytes/Delta;
b98f2859 876 LastBytes = CurrentBytes;
31a0531d 877 ElapsedTime = (unsigned int)Delta;
b98f2859
AL
878}
879 /*}}}*/
880// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
881// ---------------------------------------------------------------------
882/* This is used to get accurate final transfer rate reporting. */
883void pkgAcquireStatus::Fetched(unsigned long Size,unsigned long Resume)
93274b8d 884{
b98f2859
AL
885 FetchedBytes += Size - Resume;
886}
887 /*}}}*/