]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
use free() instead of delete() when realloc is used
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of parallelization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
526334a0 30#include <stdio.h>
453b82a3
DK
31#include <stdlib.h>
32#include <string.h>
33#include <unistd.h>
526334a0 34
7a7fa5f0 35#include <dirent.h>
8267fe24 36#include <sys/time.h>
453b82a3 37#include <sys/select.h>
524f8105 38#include <errno.h>
ea542140
DK
39
40#include <apti18n.h>
0118833a
AL
41 /*}}}*/
42
b4fc9b6f
AL
43using namespace std;
44
0118833a
AL
45// Acquire::pkgAcquire - Constructor /*{{{*/
46// ---------------------------------------------------------------------
93bf083d 47/* We grab some runtime state from the configuration space */
5efbd596 48pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 49 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 50 Running(false)
0118833a 51{
1cd1c398 52 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
53 if (strcasecmp(Mode.c_str(),"host") == 0)
54 QueueMode = QueueHost;
55 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
56 QueueMode = QueueAccess;
57}
5efbd596 58pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
59 Configs(0), Log(Progress), ToFetch(0),
60 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 61 Running(false)
1cd1c398
DK
62{
63 string const Mode = _config->Find("Acquire::Queue-Mode","host");
64 if (strcasecmp(Mode.c_str(),"host") == 0)
65 QueueMode = QueueHost;
66 if (strcasecmp(Mode.c_str(),"access") == 0)
67 QueueMode = QueueAccess;
68 Setup(Progress, "");
69}
70 /*}}}*/
71// Acquire::Setup - Delayed Constructor /*{{{*/
72// ---------------------------------------------------------------------
73/* Do everything needed to be a complete Acquire object and report the
74 success (or failure) back so the user knows that something is wrong… */
75bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
76{
77 Log = Progress;
0a8a80e5 78
1cd1c398 79 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
80 string const listDir = _config->FindDir("Dir::State::lists");
81 string const partialListDir = listDir + "partial/";
82 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
83 string const partialArchivesDir = archivesDir + "partial/";
84
7753e468
DK
85 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
86 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
87 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
88
7753e468
DK
89 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
90 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 91 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
92
93 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
94 return true;
95
96 // Lock the directory this acquire object will work in
97 LockFD = GetLock(flCombine(Lock, "lock"));
98 if (LockFD == -1)
99 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
100
101 return true;
102}
103 /*}}}*/
0118833a
AL
104// Acquire::~pkgAcquire - Destructor /*{{{*/
105// ---------------------------------------------------------------------
93bf083d 106/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
107pkgAcquire::~pkgAcquire()
108{
459681d3 109 Shutdown();
1cd1c398
DK
110
111 if (LockFD != -1)
112 close(LockFD);
113
3b5421b4
AL
114 while (Configs != 0)
115 {
116 MethodConfig *Jnk = Configs;
117 Configs = Configs->Next;
118 delete Jnk;
119 }
281daf46
AL
120}
121 /*}}}*/
8e5fc8f5 122// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
123// ---------------------------------------------------------------------
124/* */
125void pkgAcquire::Shutdown()
126{
f7f0d6c7 127 while (Items.empty() == false)
1b480911
AL
128 {
129 if (Items[0]->Status == Item::StatFetching)
130 Items[0]->Status = Item::StatError;
281daf46 131 delete Items[0];
1b480911 132 }
0a8a80e5
AL
133
134 while (Queues != 0)
135 {
136 Queue *Jnk = Queues;
137 Queues = Queues->Next;
138 delete Jnk;
139 }
0118833a
AL
140}
141 /*}}}*/
142// Acquire::Add - Add a new item /*{{{*/
143// ---------------------------------------------------------------------
93bf083d
AL
144/* This puts an item on the acquire list. This list is mainly for tracking
145 item status */
0118833a
AL
146void pkgAcquire::Add(Item *Itm)
147{
148 Items.push_back(Itm);
149}
150 /*}}}*/
151// Acquire::Remove - Remove a item /*{{{*/
152// ---------------------------------------------------------------------
93bf083d 153/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
154void pkgAcquire::Remove(Item *Itm)
155{
a3eaf954
AL
156 Dequeue(Itm);
157
753b3525 158 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
159 {
160 if (*I == Itm)
b4fc9b6f 161 {
0118833a 162 Items.erase(I);
b4fc9b6f
AL
163 I = Items.begin();
164 }
753b3525 165 else
f7f0d6c7 166 ++I;
8267fe24 167 }
0118833a
AL
168}
169 /*}}}*/
0a8a80e5
AL
170// Acquire::Add - Add a worker /*{{{*/
171// ---------------------------------------------------------------------
93bf083d
AL
172/* A list of workers is kept so that the select loop can direct their FD
173 usage. */
0a8a80e5
AL
174void pkgAcquire::Add(Worker *Work)
175{
176 Work->NextAcquire = Workers;
177 Workers = Work;
178}
179 /*}}}*/
180// Acquire::Remove - Remove a worker /*{{{*/
181// ---------------------------------------------------------------------
93bf083d
AL
182/* A worker has died. This can not be done while the select loop is running
183 as it would require that RunFds could handle a changing list state and
1e3f4083 184 it can't.. */
0a8a80e5
AL
185void pkgAcquire::Remove(Worker *Work)
186{
93bf083d
AL
187 if (Running == true)
188 abort();
189
0a8a80e5
AL
190 Worker **I = &Workers;
191 for (; *I != 0;)
192 {
193 if (*I == Work)
194 *I = (*I)->NextAcquire;
195 else
196 I = &(*I)->NextAcquire;
197 }
198}
199 /*}}}*/
0118833a
AL
200// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
201// ---------------------------------------------------------------------
93bf083d 202/* This is the entry point for an item. An item calls this function when
281daf46 203 it is constructed which creates a queue (based on the current queue
93bf083d
AL
204 mode) and puts the item in that queue. If the system is running then
205 the queue might be started. */
8267fe24 206void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 207{
0a8a80e5 208 // Determine which queue to put the item in
e331f6ed
AL
209 const MethodConfig *Config;
210 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
211 if (Name.empty() == true)
212 return;
213
214 // Find the queue structure
215 Queue *I = Queues;
216 for (; I != 0 && I->Name != Name; I = I->Next);
217 if (I == 0)
218 {
219 I = new Queue(Name,this);
220 I->Next = Queues;
221 Queues = I;
93bf083d
AL
222
223 if (Running == true)
224 I->Startup();
0a8a80e5 225 }
bfd22fc0 226
e331f6ed
AL
227 // See if this is a local only URI
228 if (Config->LocalOnly == true && Item.Owner->Complete == false)
229 Item.Owner->Local = true;
8267fe24 230 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
231
232 // Queue it into the named queue
c03462c6
MV
233 if(I->Enqueue(Item))
234 ToFetch++;
235
0a8a80e5
AL
236 // Some trace stuff
237 if (Debug == true)
238 {
8267fe24
AL
239 clog << "Fetching " << Item.URI << endl;
240 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 241 clog << " Queue is: " << Name << endl;
0a8a80e5 242 }
3b5421b4
AL
243}
244 /*}}}*/
0a8a80e5 245// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 246// ---------------------------------------------------------------------
93bf083d
AL
247/* This is called when an item is finished being fetched. It removes it
248 from all the queues */
0a8a80e5
AL
249void pkgAcquire::Dequeue(Item *Itm)
250{
251 Queue *I = Queues;
bfd22fc0 252 bool Res = false;
93bf083d
AL
253 if (Debug == true)
254 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
255
256 for (; I != 0; I = I->Next)
257 {
258 if (I->Dequeue(Itm))
259 {
260 Res = true;
261 if (Debug == true)
262 clog << "Dequeued from " << I->Name << endl;
263 }
264 }
265
bfd22fc0
AL
266 if (Res == true)
267 ToFetch--;
0a8a80e5
AL
268}
269 /*}}}*/
270// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
271// ---------------------------------------------------------------------
272/* The string returned depends on the configuration settings and the
273 method parameters. Given something like http://foo.org/bar it can
274 return http://foo.org or http */
e331f6ed 275string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 276{
93bf083d
AL
277 URI U(Uri);
278
e331f6ed 279 Config = GetConfig(U.Access);
0a8a80e5
AL
280 if (Config == 0)
281 return string();
282
283 /* Single-Instance methods get exactly one queue per URI. This is
284 also used for the Access queue method */
285 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
286 return U.Access;
287
288 string AccessSchema = U.Access + ':',
289 FullQueueName = AccessSchema + U.Host;
290 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
291
292 Queue *I = Queues;
293 for (; I != 0; I = I->Next) {
294 // if the queue already exists, re-use it
295 if (I->Name == FullQueueName)
296 return FullQueueName;
297
298 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
299 Instances++;
300 }
301
302 if (Debug) {
303 clog << "Found " << Instances << " instances of " << U.Access << endl;
304 }
305
306 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
307 return U.Access;
93bf083d 308
5674f6b3 309 return FullQueueName;
0118833a
AL
310}
311 /*}}}*/
3b5421b4
AL
312// Acquire::GetConfig - Fetch the configuration information /*{{{*/
313// ---------------------------------------------------------------------
314/* This locates the configuration structure for an access method. If
315 a config structure cannot be found a Worker will be created to
316 retrieve it */
0a8a80e5 317pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
318{
319 // Search for an existing config
320 MethodConfig *Conf;
321 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
322 if (Conf->Access == Access)
323 return Conf;
324
325 // Create the new config class
326 Conf = new MethodConfig;
327 Conf->Access = Access;
328 Conf->Next = Configs;
329 Configs = Conf;
0118833a 330
3b5421b4
AL
331 // Create the worker to fetch the configuration
332 Worker Work(Conf);
333 if (Work.Start() == false)
334 return 0;
7c6e2dc7
MV
335
336 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 337 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
338 Conf->SingleInstance = true;
339
3b5421b4
AL
340 return Conf;
341}
342 /*}}}*/
0a8a80e5
AL
343// Acquire::SetFds - Deal with readable FDs /*{{{*/
344// ---------------------------------------------------------------------
345/* Collect FDs that have activity monitors into the fd sets */
346void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
347{
348 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
349 {
350 if (I->InReady == true && I->InFd >= 0)
351 {
352 if (Fd < I->InFd)
353 Fd = I->InFd;
354 FD_SET(I->InFd,RSet);
355 }
356 if (I->OutReady == true && I->OutFd >= 0)
357 {
358 if (Fd < I->OutFd)
359 Fd = I->OutFd;
360 FD_SET(I->OutFd,WSet);
361 }
362 }
363}
364 /*}}}*/
365// Acquire::RunFds - Deal with active FDs /*{{{*/
366// ---------------------------------------------------------------------
93bf083d
AL
367/* Dispatch active FDs over to the proper workers. It is very important
368 that a worker never be erased while this is running! The queue class
369 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
370void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
371{
372 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
373 {
374 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
375 I->InFdReady();
376 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
377 I->OutFdReady();
378 }
379}
380 /*}}}*/
381// Acquire::Run - Run the fetch sequence /*{{{*/
382// ---------------------------------------------------------------------
383/* This runs the queues. It manages a select loop for all of the
384 Worker tasks. The workers interact with the queues and items to
385 manage the actual fetch. */
1c5f7e5f 386pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 387{
8b89e57f
AL
388 Running = true;
389
0a8a80e5
AL
390 for (Queue *I = Queues; I != 0; I = I->Next)
391 I->Startup();
392
b98f2859
AL
393 if (Log != 0)
394 Log->Start();
395
024d1123
AL
396 bool WasCancelled = false;
397
0a8a80e5 398 // Run till all things have been acquired
8267fe24
AL
399 struct timeval tv;
400 tv.tv_sec = 0;
1c5f7e5f 401 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
402 while (ToFetch > 0)
403 {
404 fd_set RFds;
405 fd_set WFds;
406 int Highest = 0;
407 FD_ZERO(&RFds);
408 FD_ZERO(&WFds);
409 SetFds(Highest,&RFds,&WFds);
410
b0db36b1
AL
411 int Res;
412 do
413 {
414 Res = select(Highest+1,&RFds,&WFds,0,&tv);
415 }
416 while (Res < 0 && errno == EINTR);
417
8267fe24 418 if (Res < 0)
8b89e57f 419 {
8267fe24
AL
420 _error->Errno("select","Select has failed");
421 break;
8b89e57f 422 }
93bf083d 423
0a8a80e5 424 RunFds(&RFds,&WFds);
93bf083d
AL
425 if (_error->PendingError() == true)
426 break;
8267fe24
AL
427
428 // Timeout, notify the log class
429 if (Res == 0 || (Log != 0 && Log->Update == true))
430 {
1c5f7e5f 431 tv.tv_usec = PulseIntervall;
8267fe24
AL
432 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
433 I->Pulse();
024d1123
AL
434 if (Log != 0 && Log->Pulse(this) == false)
435 {
436 WasCancelled = true;
437 break;
438 }
8267fe24 439 }
0a8a80e5 440 }
be4401bf 441
b98f2859
AL
442 if (Log != 0)
443 Log->Stop();
444
be4401bf
AL
445 // Shut down the acquire bits
446 Running = false;
0a8a80e5 447 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 448 I->Shutdown(false);
0a8a80e5 449
ab559b35 450 // Shut down the items
f7f0d6c7 451 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 452 (*I)->Finished();
ab559b35 453
024d1123
AL
454 if (_error->PendingError())
455 return Failed;
456 if (WasCancelled)
457 return Cancelled;
458 return Continue;
93bf083d
AL
459}
460 /*}}}*/
be4401bf 461// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
462// ---------------------------------------------------------------------
463/* This routine bumps idle queues in hopes that they will be able to fetch
464 the dequeued item */
465void pkgAcquire::Bump()
466{
be4401bf
AL
467 for (Queue *I = Queues; I != 0; I = I->Next)
468 I->Bump();
0a8a80e5
AL
469}
470 /*}}}*/
8267fe24
AL
471// Acquire::WorkerStep - Step to the next worker /*{{{*/
472// ---------------------------------------------------------------------
473/* Not inlined to avoid including acquire-worker.h */
474pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
475{
476 return I->NextAcquire;
d3e8fbb3 477}
8267fe24 478 /*}}}*/
a6568219 479// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
480// ---------------------------------------------------------------------
481/* This is a bit simplistic, it looks at every file in the dir and sees
482 if it is part of the download set. */
483bool pkgAcquire::Clean(string Dir)
484{
95b5f6c1
DK
485 // non-existing directories are by definition clean…
486 if (DirectoryExists(Dir) == false)
487 return true;
488
7a7fa5f0
AL
489 DIR *D = opendir(Dir.c_str());
490 if (D == 0)
b2e465d6 491 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
492
493 string StartDir = SafeGetCWD();
494 if (chdir(Dir.c_str()) != 0)
495 {
496 closedir(D);
b2e465d6 497 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
498 }
499
500 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
501 {
502 // Skip some files..
503 if (strcmp(Dir->d_name,"lock") == 0 ||
504 strcmp(Dir->d_name,"partial") == 0 ||
505 strcmp(Dir->d_name,".") == 0 ||
506 strcmp(Dir->d_name,"..") == 0)
507 continue;
508
509 // Look in the get list
b4fc9b6f 510 ItemCIterator I = Items.begin();
f7f0d6c7 511 for (; I != Items.end(); ++I)
7a7fa5f0
AL
512 if (flNotDir((*I)->DestFile) == Dir->d_name)
513 break;
514
515 // Nothing found, nuke it
516 if (I == Items.end())
517 unlink(Dir->d_name);
518 };
519
7a7fa5f0 520 closedir(D);
3c8cda8b
MV
521 if (chdir(StartDir.c_str()) != 0)
522 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
523 return true;
524}
525 /*}}}*/
a6568219
AL
526// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
527// ---------------------------------------------------------------------
528/* This is the total number of bytes needed */
a02db58f 529APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 530{
a3c4c81a 531 unsigned long long Total = 0;
f7f0d6c7 532 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
533 Total += (*I)->FileSize;
534 return Total;
535}
536 /*}}}*/
537// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
538// ---------------------------------------------------------------------
539/* This is the number of bytes that is not local */
a02db58f 540APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 541{
a3c4c81a 542 unsigned long long Total = 0;
f7f0d6c7 543 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
544 if ((*I)->Local == false)
545 Total += (*I)->FileSize;
546 return Total;
547}
548 /*}}}*/
6b1ff003
AL
549// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
550// ---------------------------------------------------------------------
551/* This is the number of partially downloaded bytes of the non-local items */
a02db58f 552APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 553{
a3c4c81a 554 unsigned long long Total = 0;
f7f0d6c7 555 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
556 if ((*I)->Local == false)
557 Total += (*I)->PartialSize;
558 return Total;
559}
92fcbfc1 560 /*}}}*/
8e5fc8f5 561// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
562// ---------------------------------------------------------------------
563/* */
564pkgAcquire::UriIterator pkgAcquire::UriBegin()
565{
566 return UriIterator(Queues);
567}
568 /*}}}*/
8e5fc8f5 569// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
570// ---------------------------------------------------------------------
571/* */
572pkgAcquire::UriIterator pkgAcquire::UriEnd()
573{
574 return UriIterator(0);
575}
576 /*}}}*/
e331f6ed
AL
577// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
578// ---------------------------------------------------------------------
579/* */
580pkgAcquire::MethodConfig::MethodConfig()
581{
582 SingleInstance = false;
e331f6ed
AL
583 Pipeline = false;
584 SendConfig = false;
585 LocalOnly = false;
459681d3 586 Removable = false;
e331f6ed
AL
587 Next = 0;
588}
589 /*}}}*/
0a8a80e5
AL
590// Queue::Queue - Constructor /*{{{*/
591// ---------------------------------------------------------------------
592/* */
593pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
594 Owner(Owner)
595{
596 Items = 0;
597 Next = 0;
598 Workers = 0;
b185acc2
AL
599 MaxPipeDepth = 1;
600 PipeDepth = 0;
0a8a80e5
AL
601}
602 /*}}}*/
603// Queue::~Queue - Destructor /*{{{*/
604// ---------------------------------------------------------------------
605/* */
606pkgAcquire::Queue::~Queue()
607{
8e5fc8f5 608 Shutdown(true);
0a8a80e5
AL
609
610 while (Items != 0)
611 {
612 QItem *Jnk = Items;
613 Items = Items->Next;
614 delete Jnk;
615 }
616}
617 /*}}}*/
618// Queue::Enqueue - Queue an item to the queue /*{{{*/
619// ---------------------------------------------------------------------
620/* */
c03462c6 621bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 622{
7a1b1f8b 623 QItem **I = &Items;
c03462c6
MV
624 // move to the end of the queue and check for duplicates here
625 for (; *I != 0; I = &(*I)->Next)
626 if (Item.URI == (*I)->URI)
627 {
628 Item.Owner->Status = Item::StatDone;
629 return false;
630 }
631
0a8a80e5 632 // Create a new item
7a1b1f8b
AL
633 QItem *Itm = new QItem;
634 *Itm = Item;
635 Itm->Next = 0;
636 *I = Itm;
0a8a80e5 637
8267fe24 638 Item.Owner->QueueCounter++;
93bf083d
AL
639 if (Items->Next == 0)
640 Cycle();
c03462c6 641 return true;
0a8a80e5
AL
642}
643 /*}}}*/
c88edf1d 644// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 645// ---------------------------------------------------------------------
b185acc2 646/* We return true if we hit something */
bfd22fc0 647bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 648{
b185acc2
AL
649 if (Owner->Status == pkgAcquire::Item::StatFetching)
650 return _error->Error("Tried to dequeue a fetching object");
651
bfd22fc0
AL
652 bool Res = false;
653
0a8a80e5
AL
654 QItem **I = &Items;
655 for (; *I != 0;)
656 {
657 if ((*I)->Owner == Owner)
658 {
659 QItem *Jnk= *I;
660 *I = (*I)->Next;
661 Owner->QueueCounter--;
662 delete Jnk;
bfd22fc0 663 Res = true;
0a8a80e5
AL
664 }
665 else
666 I = &(*I)->Next;
667 }
bfd22fc0
AL
668
669 return Res;
0a8a80e5
AL
670}
671 /*}}}*/
672// Queue::Startup - Start the worker processes /*{{{*/
673// ---------------------------------------------------------------------
8e5fc8f5
AL
674/* It is possible for this to be called with a pre-existing set of
675 workers. */
0a8a80e5
AL
676bool pkgAcquire::Queue::Startup()
677{
8e5fc8f5
AL
678 if (Workers == 0)
679 {
680 URI U(Name);
681 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
682 if (Cnf == 0)
683 return false;
684
685 Workers = new Worker(this,Cnf,Owner->Log);
686 Owner->Add(Workers);
687 if (Workers->Start() == false)
688 return false;
689
690 /* When pipelining we commit 10 items. This needs to change when we
691 added other source retry to have cycle maintain a pipeline depth
692 on its own. */
693 if (Cnf->Pipeline == true)
6ce72612 694 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
695 else
696 MaxPipeDepth = 1;
697 }
5cb5d8dc 698
93bf083d 699 return Cycle();
0a8a80e5
AL
700}
701 /*}}}*/
702// Queue::Shutdown - Shutdown the worker processes /*{{{*/
703// ---------------------------------------------------------------------
8e5fc8f5
AL
704/* If final is true then all workers are eliminated, otherwise only workers
705 that do not need cleanup are removed */
706bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
707{
708 // Delete all of the workers
8e5fc8f5
AL
709 pkgAcquire::Worker **Cur = &Workers;
710 while (*Cur != 0)
0a8a80e5 711 {
8e5fc8f5
AL
712 pkgAcquire::Worker *Jnk = *Cur;
713 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
714 {
715 *Cur = Jnk->NextQueue;
716 Owner->Remove(Jnk);
717 delete Jnk;
718 }
719 else
720 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
721 }
722
723 return true;
3b5421b4
AL
724}
725 /*}}}*/
7d8afa39 726// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
727// ---------------------------------------------------------------------
728/* */
729pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
730{
731 for (QItem *I = Items; I != 0; I = I->Next)
732 if (I->URI == URI && I->Worker == Owner)
733 return I;
734 return 0;
735}
736 /*}}}*/
737// Queue::ItemDone - Item has been completed /*{{{*/
738// ---------------------------------------------------------------------
739/* The worker signals this which causes the item to be removed from the
93bf083d
AL
740 queue. If this is the last queue instance then it is removed from the
741 main queue too.*/
c88edf1d
AL
742bool pkgAcquire::Queue::ItemDone(QItem *Itm)
743{
b185acc2 744 PipeDepth--;
db890fdb
AL
745 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
746 Itm->Owner->Status = pkgAcquire::Item::StatDone;
747
93bf083d
AL
748 if (Itm->Owner->QueueCounter <= 1)
749 Owner->Dequeue(Itm->Owner);
750 else
751 {
752 Dequeue(Itm->Owner);
753 Owner->Bump();
754 }
c88edf1d 755
93bf083d
AL
756 return Cycle();
757}
758 /*}}}*/
759// Queue::Cycle - Queue new items into the method /*{{{*/
760// ---------------------------------------------------------------------
b185acc2
AL
761/* This locates a new idle item and sends it to the worker. If pipelining
762 is enabled then it keeps the pipe full. */
93bf083d
AL
763bool pkgAcquire::Queue::Cycle()
764{
765 if (Items == 0 || Workers == 0)
c88edf1d
AL
766 return true;
767
e7432370
AL
768 if (PipeDepth < 0)
769 return _error->Error("Pipedepth failure");
770
93bf083d
AL
771 // Look for a queable item
772 QItem *I = Items;
e7432370 773 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
774 {
775 for (; I != 0; I = I->Next)
776 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
777 break;
778
779 // Nothing to do, queue is idle.
780 if (I == 0)
781 return true;
782
783 I->Worker = Workers;
784 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 785 PipeDepth++;
b185acc2
AL
786 if (Workers->QueueItem(I) == false)
787 return false;
788 }
93bf083d 789
b185acc2 790 return true;
c88edf1d
AL
791}
792 /*}}}*/
be4401bf
AL
793// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
794// ---------------------------------------------------------------------
b185acc2 795/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
796void pkgAcquire::Queue::Bump()
797{
b185acc2 798 Cycle();
be4401bf
AL
799}
800 /*}}}*/
b98f2859
AL
801// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
802// ---------------------------------------------------------------------
803/* */
dcaa1185 804pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
b98f2859
AL
805{
806 Start();
807}
808 /*}}}*/
809// AcquireStatus::Pulse - Called periodically /*{{{*/
810// ---------------------------------------------------------------------
811/* This computes some internal state variables for the derived classes to
812 use. It generates the current downloaded bytes and total bytes to download
813 as well as the current CPS estimate. */
024d1123 814bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
815{
816 TotalBytes = 0;
817 CurrentBytes = 0;
d568ed2d
AL
818 TotalItems = 0;
819 CurrentItems = 0;
b98f2859
AL
820
821 // Compute the total number of bytes to fetch
822 unsigned int Unknown = 0;
823 unsigned int Count = 0;
b4fc9b6f 824 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
f7f0d6c7 825 ++I, ++Count)
b98f2859 826 {
d568ed2d
AL
827 TotalItems++;
828 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 829 ++CurrentItems;
d568ed2d 830
a6568219
AL
831 // Totally ignore local items
832 if ((*I)->Local == true)
833 continue;
b2e465d6 834
b98f2859
AL
835 TotalBytes += (*I)->FileSize;
836 if ((*I)->Complete == true)
837 CurrentBytes += (*I)->FileSize;
838 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 839 ++Unknown;
b98f2859
AL
840 }
841
842 // Compute the current completion
dbbc5494 843 unsigned long long ResumeSize = 0;
b98f2859
AL
844 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
845 I = Owner->WorkerStep(I))
846 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
847 {
848 CurrentBytes += I->CurrentSize;
849 ResumeSize += I->ResumePoint;
850
851 // Files with unknown size always have 100% completion
852 if (I->CurrentItem->Owner->FileSize == 0 &&
853 I->CurrentItem->Owner->Complete == false)
854 TotalBytes += I->CurrentSize;
855 }
856
b98f2859
AL
857 // Normalize the figures and account for unknown size downloads
858 if (TotalBytes <= 0)
859 TotalBytes = 1;
860 if (Unknown == Count)
861 TotalBytes = Unknown;
18ef0a78
AL
862
863 // Wha?! Is not supposed to happen.
864 if (CurrentBytes > TotalBytes)
865 CurrentBytes = TotalBytes;
b98f2859
AL
866
867 // Compute the CPS
868 struct timeval NewTime;
869 gettimeofday(&NewTime,0);
2ec1674d 870 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
871 NewTime.tv_sec - Time.tv_sec > 6)
872 {
f17ac097
AL
873 double Delta = NewTime.tv_sec - Time.tv_sec +
874 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 875
b98f2859 876 // Compute the CPS value
f17ac097 877 if (Delta < 0.01)
e331f6ed
AL
878 CurrentCPS = 0;
879 else
aa0e1101
AL
880 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
881 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 882 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
883 Time = NewTime;
884 }
024d1123 885
75ef8f14
MV
886 int fd = _config->FindI("APT::Status-Fd",-1);
887 if(fd > 0)
888 {
889 ostringstream status;
890
891 char msg[200];
892 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
893 unsigned long long ETA = 0;
894 if(CurrentCPS > 0)
895 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 896
1e8b4c0f
MV
897 // only show the ETA if it makes sense
898 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 899 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 900 else
0c508b03 901 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
902
903
75ef8f14
MV
904
905 // build the status str
906 status << "dlstatus:" << i
907 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
908 << ":" << msg
909 << endl;
31bda500
DK
910
911 std::string const dlstatus = status.str();
d68d65ad 912 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
913 }
914
024d1123 915 return true;
b98f2859
AL
916}
917 /*}}}*/
918// AcquireStatus::Start - Called when the download is started /*{{{*/
919// ---------------------------------------------------------------------
920/* We just reset the counters */
921void pkgAcquireStatus::Start()
922{
923 gettimeofday(&Time,0);
924 gettimeofday(&StartTime,0);
925 LastBytes = 0;
926 CurrentCPS = 0;
927 CurrentBytes = 0;
928 TotalBytes = 0;
929 FetchedBytes = 0;
930 ElapsedTime = 0;
d568ed2d
AL
931 TotalItems = 0;
932 CurrentItems = 0;
b98f2859
AL
933}
934 /*}}}*/
a6568219 935// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
936// ---------------------------------------------------------------------
937/* This accurately computes the elapsed time and the total overall CPS. */
938void pkgAcquireStatus::Stop()
939{
940 // Compute the CPS and elapsed time
941 struct timeval NewTime;
942 gettimeofday(&NewTime,0);
943
31a0531d
AL
944 double Delta = NewTime.tv_sec - StartTime.tv_sec +
945 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 946
b98f2859 947 // Compute the CPS value
31a0531d 948 if (Delta < 0.01)
e331f6ed
AL
949 CurrentCPS = 0;
950 else
31a0531d 951 CurrentCPS = FetchedBytes/Delta;
b98f2859 952 LastBytes = CurrentBytes;
dbbc5494 953 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
954}
955 /*}}}*/
956// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
957// ---------------------------------------------------------------------
958/* This is used to get accurate final transfer rate reporting. */
73da43e9 959void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 960{
b98f2859
AL
961 FetchedBytes += Size - Resume;
962}
963 /*}}}*/