]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Rework pkgAcqMeta{Index,Sig,ClearSig}::Done() for readability
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of parallelization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
526334a0 30#include <stdio.h>
453b82a3
DK
31#include <stdlib.h>
32#include <string.h>
33#include <unistd.h>
96c6cab1 34#include <iomanip>
526334a0 35
7a7fa5f0 36#include <dirent.h>
8267fe24 37#include <sys/time.h>
453b82a3 38#include <sys/select.h>
524f8105 39#include <errno.h>
56472095 40#include <sys/stat.h>
ea542140
DK
41
42#include <apti18n.h>
0118833a
AL
43 /*}}}*/
44
b4fc9b6f
AL
45using namespace std;
46
0118833a
AL
47// Acquire::pkgAcquire - Constructor /*{{{*/
48// ---------------------------------------------------------------------
93bf083d 49/* We grab some runtime state from the configuration space */
5efbd596 50pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 51 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 52 Running(false)
0118833a 53{
1cd1c398 54 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
55 if (strcasecmp(Mode.c_str(),"host") == 0)
56 QueueMode = QueueHost;
57 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
58 QueueMode = QueueAccess;
59}
5efbd596 60pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
61 Configs(0), Log(Progress), ToFetch(0),
62 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 63 Running(false)
1cd1c398
DK
64{
65 string const Mode = _config->Find("Acquire::Queue-Mode","host");
66 if (strcasecmp(Mode.c_str(),"host") == 0)
67 QueueMode = QueueHost;
68 if (strcasecmp(Mode.c_str(),"access") == 0)
69 QueueMode = QueueAccess;
70 Setup(Progress, "");
71}
72 /*}}}*/
73// Acquire::Setup - Delayed Constructor /*{{{*/
74// ---------------------------------------------------------------------
75/* Do everything needed to be a complete Acquire object and report the
76 success (or failure) back so the user knows that something is wrong… */
43acd019
DK
77bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock,
78 bool const createDirectories)
1cd1c398
DK
79{
80 Log = Progress;
0a8a80e5 81
1cd1c398 82 // check for existence and possibly create auxiliary directories
43acd019
DK
83 if (createDirectories == true)
84 {
85 string const listDir = _config->FindDir("Dir::State::lists");
86 string const partialListDir = listDir + "partial/";
87 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
88 string const partialArchivesDir = archivesDir + "partial/";
9c2c9c24 89
43acd019
DK
90 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
91 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
92 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
9c2c9c24 93
43acd019
DK
94 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
95 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
96 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
97 }
1cd1c398
DK
98
99 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
100 return true;
101
102 // Lock the directory this acquire object will work in
103 LockFD = GetLock(flCombine(Lock, "lock"));
104 if (LockFD == -1)
105 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
106
107 return true;
108}
109 /*}}}*/
0118833a
AL
110// Acquire::~pkgAcquire - Destructor /*{{{*/
111// ---------------------------------------------------------------------
93bf083d 112/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
113pkgAcquire::~pkgAcquire()
114{
459681d3 115 Shutdown();
1cd1c398
DK
116
117 if (LockFD != -1)
118 close(LockFD);
119
3b5421b4
AL
120 while (Configs != 0)
121 {
122 MethodConfig *Jnk = Configs;
123 Configs = Configs->Next;
124 delete Jnk;
125 }
281daf46
AL
126}
127 /*}}}*/
8e5fc8f5 128// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
129// ---------------------------------------------------------------------
130/* */
131void pkgAcquire::Shutdown()
132{
f7f0d6c7 133 while (Items.empty() == false)
1b480911
AL
134 {
135 if (Items[0]->Status == Item::StatFetching)
136 Items[0]->Status = Item::StatError;
281daf46 137 delete Items[0];
1b480911 138 }
0a8a80e5
AL
139
140 while (Queues != 0)
141 {
142 Queue *Jnk = Queues;
143 Queues = Queues->Next;
144 delete Jnk;
145 }
0118833a
AL
146}
147 /*}}}*/
148// Acquire::Add - Add a new item /*{{{*/
149// ---------------------------------------------------------------------
93bf083d
AL
150/* This puts an item on the acquire list. This list is mainly for tracking
151 item status */
0118833a
AL
152void pkgAcquire::Add(Item *Itm)
153{
154 Items.push_back(Itm);
155}
156 /*}}}*/
157// Acquire::Remove - Remove a item /*{{{*/
158// ---------------------------------------------------------------------
93bf083d 159/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
160void pkgAcquire::Remove(Item *Itm)
161{
a3eaf954
AL
162 Dequeue(Itm);
163
753b3525 164 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
165 {
166 if (*I == Itm)
b4fc9b6f 167 {
0118833a 168 Items.erase(I);
b4fc9b6f
AL
169 I = Items.begin();
170 }
753b3525 171 else
f7f0d6c7 172 ++I;
8267fe24 173 }
0118833a
AL
174}
175 /*}}}*/
0a8a80e5
AL
176// Acquire::Add - Add a worker /*{{{*/
177// ---------------------------------------------------------------------
93bf083d
AL
178/* A list of workers is kept so that the select loop can direct their FD
179 usage. */
0a8a80e5
AL
180void pkgAcquire::Add(Worker *Work)
181{
182 Work->NextAcquire = Workers;
183 Workers = Work;
184}
185 /*}}}*/
186// Acquire::Remove - Remove a worker /*{{{*/
187// ---------------------------------------------------------------------
93bf083d
AL
188/* A worker has died. This can not be done while the select loop is running
189 as it would require that RunFds could handle a changing list state and
1e3f4083 190 it can't.. */
0a8a80e5
AL
191void pkgAcquire::Remove(Worker *Work)
192{
93bf083d
AL
193 if (Running == true)
194 abort();
195
0a8a80e5
AL
196 Worker **I = &Workers;
197 for (; *I != 0;)
198 {
199 if (*I == Work)
200 *I = (*I)->NextAcquire;
201 else
202 I = &(*I)->NextAcquire;
203 }
204}
205 /*}}}*/
0118833a
AL
206// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
207// ---------------------------------------------------------------------
93bf083d 208/* This is the entry point for an item. An item calls this function when
281daf46 209 it is constructed which creates a queue (based on the current queue
93bf083d
AL
210 mode) and puts the item in that queue. If the system is running then
211 the queue might be started. */
8267fe24 212void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 213{
0a8a80e5 214 // Determine which queue to put the item in
e331f6ed
AL
215 const MethodConfig *Config;
216 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
217 if (Name.empty() == true)
218 return;
219
220 // Find the queue structure
221 Queue *I = Queues;
222 for (; I != 0 && I->Name != Name; I = I->Next);
223 if (I == 0)
224 {
225 I = new Queue(Name,this);
226 I->Next = Queues;
227 Queues = I;
93bf083d
AL
228
229 if (Running == true)
230 I->Startup();
0a8a80e5 231 }
bfd22fc0 232
e331f6ed
AL
233 // See if this is a local only URI
234 if (Config->LocalOnly == true && Item.Owner->Complete == false)
235 Item.Owner->Local = true;
8267fe24 236 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
237
238 // Queue it into the named queue
c03462c6
MV
239 if(I->Enqueue(Item))
240 ToFetch++;
241
0a8a80e5
AL
242 // Some trace stuff
243 if (Debug == true)
244 {
8267fe24
AL
245 clog << "Fetching " << Item.URI << endl;
246 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 247 clog << " Queue is: " << Name << endl;
0a8a80e5 248 }
3b5421b4
AL
249}
250 /*}}}*/
0a8a80e5 251// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 252// ---------------------------------------------------------------------
93bf083d
AL
253/* This is called when an item is finished being fetched. It removes it
254 from all the queues */
0a8a80e5
AL
255void pkgAcquire::Dequeue(Item *Itm)
256{
257 Queue *I = Queues;
bfd22fc0 258 bool Res = false;
93bf083d
AL
259 if (Debug == true)
260 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
261
262 for (; I != 0; I = I->Next)
263 {
264 if (I->Dequeue(Itm))
265 {
266 Res = true;
267 if (Debug == true)
268 clog << "Dequeued from " << I->Name << endl;
269 }
270 }
271
bfd22fc0
AL
272 if (Res == true)
273 ToFetch--;
0a8a80e5
AL
274}
275 /*}}}*/
276// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
277// ---------------------------------------------------------------------
278/* The string returned depends on the configuration settings and the
279 method parameters. Given something like http://foo.org/bar it can
280 return http://foo.org or http */
e331f6ed 281string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 282{
93bf083d
AL
283 URI U(Uri);
284
e331f6ed 285 Config = GetConfig(U.Access);
0a8a80e5
AL
286 if (Config == 0)
287 return string();
288
289 /* Single-Instance methods get exactly one queue per URI. This is
290 also used for the Access queue method */
291 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
292 return U.Access;
293
294 string AccessSchema = U.Access + ':',
295 FullQueueName = AccessSchema + U.Host;
296 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
297
298 Queue *I = Queues;
299 for (; I != 0; I = I->Next) {
300 // if the queue already exists, re-use it
301 if (I->Name == FullQueueName)
302 return FullQueueName;
303
304 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
305 Instances++;
306 }
307
308 if (Debug) {
309 clog << "Found " << Instances << " instances of " << U.Access << endl;
310 }
311
312 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
313 return U.Access;
93bf083d 314
5674f6b3 315 return FullQueueName;
0118833a
AL
316}
317 /*}}}*/
3b5421b4
AL
318// Acquire::GetConfig - Fetch the configuration information /*{{{*/
319// ---------------------------------------------------------------------
320/* This locates the configuration structure for an access method. If
321 a config structure cannot be found a Worker will be created to
322 retrieve it */
0a8a80e5 323pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
324{
325 // Search for an existing config
326 MethodConfig *Conf;
327 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
328 if (Conf->Access == Access)
329 return Conf;
330
331 // Create the new config class
332 Conf = new MethodConfig;
333 Conf->Access = Access;
334 Conf->Next = Configs;
335 Configs = Conf;
0118833a 336
3b5421b4
AL
337 // Create the worker to fetch the configuration
338 Worker Work(Conf);
339 if (Work.Start() == false)
340 return 0;
7c6e2dc7
MV
341
342 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 343 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
344 Conf->SingleInstance = true;
345
3b5421b4
AL
346 return Conf;
347}
348 /*}}}*/
0a8a80e5
AL
349// Acquire::SetFds - Deal with readable FDs /*{{{*/
350// ---------------------------------------------------------------------
351/* Collect FDs that have activity monitors into the fd sets */
352void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
353{
354 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
355 {
356 if (I->InReady == true && I->InFd >= 0)
357 {
358 if (Fd < I->InFd)
359 Fd = I->InFd;
360 FD_SET(I->InFd,RSet);
361 }
362 if (I->OutReady == true && I->OutFd >= 0)
363 {
364 if (Fd < I->OutFd)
365 Fd = I->OutFd;
366 FD_SET(I->OutFd,WSet);
367 }
368 }
369}
370 /*}}}*/
371// Acquire::RunFds - Deal with active FDs /*{{{*/
372// ---------------------------------------------------------------------
93bf083d
AL
373/* Dispatch active FDs over to the proper workers. It is very important
374 that a worker never be erased while this is running! The queue class
375 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
376void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
377{
378 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
379 {
380 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
381 I->InFdReady();
382 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
383 I->OutFdReady();
384 }
385}
386 /*}}}*/
387// Acquire::Run - Run the fetch sequence /*{{{*/
388// ---------------------------------------------------------------------
389/* This runs the queues. It manages a select loop for all of the
390 Worker tasks. The workers interact with the queues and items to
391 manage the actual fetch. */
1c5f7e5f 392pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 393{
8b89e57f
AL
394 Running = true;
395
0a8a80e5
AL
396 for (Queue *I = Queues; I != 0; I = I->Next)
397 I->Startup();
398
b98f2859
AL
399 if (Log != 0)
400 Log->Start();
401
024d1123
AL
402 bool WasCancelled = false;
403
0a8a80e5 404 // Run till all things have been acquired
8267fe24
AL
405 struct timeval tv;
406 tv.tv_sec = 0;
1c5f7e5f 407 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
408 while (ToFetch > 0)
409 {
410 fd_set RFds;
411 fd_set WFds;
412 int Highest = 0;
413 FD_ZERO(&RFds);
414 FD_ZERO(&WFds);
415 SetFds(Highest,&RFds,&WFds);
416
b0db36b1
AL
417 int Res;
418 do
419 {
420 Res = select(Highest+1,&RFds,&WFds,0,&tv);
421 }
422 while (Res < 0 && errno == EINTR);
423
8267fe24 424 if (Res < 0)
8b89e57f 425 {
8267fe24
AL
426 _error->Errno("select","Select has failed");
427 break;
8b89e57f 428 }
93bf083d 429
0a8a80e5 430 RunFds(&RFds,&WFds);
93bf083d
AL
431 if (_error->PendingError() == true)
432 break;
8267fe24
AL
433
434 // Timeout, notify the log class
435 if (Res == 0 || (Log != 0 && Log->Update == true))
436 {
1c5f7e5f 437 tv.tv_usec = PulseIntervall;
8267fe24
AL
438 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
439 I->Pulse();
024d1123
AL
440 if (Log != 0 && Log->Pulse(this) == false)
441 {
442 WasCancelled = true;
443 break;
444 }
8267fe24 445 }
0a8a80e5 446 }
be4401bf 447
b98f2859
AL
448 if (Log != 0)
449 Log->Stop();
450
be4401bf
AL
451 // Shut down the acquire bits
452 Running = false;
0a8a80e5 453 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 454 I->Shutdown(false);
0a8a80e5 455
ab559b35 456 // Shut down the items
f7f0d6c7 457 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 458 (*I)->Finished();
ab559b35 459
024d1123
AL
460 if (_error->PendingError())
461 return Failed;
462 if (WasCancelled)
463 return Cancelled;
464 return Continue;
93bf083d
AL
465}
466 /*}}}*/
be4401bf 467// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
468// ---------------------------------------------------------------------
469/* This routine bumps idle queues in hopes that they will be able to fetch
470 the dequeued item */
471void pkgAcquire::Bump()
472{
be4401bf
AL
473 for (Queue *I = Queues; I != 0; I = I->Next)
474 I->Bump();
0a8a80e5
AL
475}
476 /*}}}*/
8267fe24
AL
477// Acquire::WorkerStep - Step to the next worker /*{{{*/
478// ---------------------------------------------------------------------
479/* Not inlined to avoid including acquire-worker.h */
480pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
481{
482 return I->NextAcquire;
d3e8fbb3 483}
8267fe24 484 /*}}}*/
a6568219 485// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
486// ---------------------------------------------------------------------
487/* This is a bit simplistic, it looks at every file in the dir and sees
488 if it is part of the download set. */
489bool pkgAcquire::Clean(string Dir)
490{
95b5f6c1
DK
491 // non-existing directories are by definition clean…
492 if (DirectoryExists(Dir) == false)
493 return true;
494
10ecfe4f
MV
495 if(Dir == "/")
496 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
497
7a7fa5f0
AL
498 DIR *D = opendir(Dir.c_str());
499 if (D == 0)
b2e465d6 500 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
501
502 string StartDir = SafeGetCWD();
503 if (chdir(Dir.c_str()) != 0)
504 {
505 closedir(D);
b2e465d6 506 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
507 }
508
509 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
510 {
511 // Skip some files..
512 if (strcmp(Dir->d_name,"lock") == 0 ||
513 strcmp(Dir->d_name,"partial") == 0 ||
514 strcmp(Dir->d_name,".") == 0 ||
515 strcmp(Dir->d_name,"..") == 0)
516 continue;
517
518 // Look in the get list
b4fc9b6f 519 ItemCIterator I = Items.begin();
f7f0d6c7 520 for (; I != Items.end(); ++I)
7a7fa5f0
AL
521 if (flNotDir((*I)->DestFile) == Dir->d_name)
522 break;
523
524 // Nothing found, nuke it
525 if (I == Items.end())
526 unlink(Dir->d_name);
527 };
528
7a7fa5f0 529 closedir(D);
3c8cda8b
MV
530 if (chdir(StartDir.c_str()) != 0)
531 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
532 return true;
533}
534 /*}}}*/
a6568219
AL
535// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
536// ---------------------------------------------------------------------
537/* This is the total number of bytes needed */
a02db58f 538APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 539{
a3c4c81a 540 unsigned long long Total = 0;
f7f0d6c7 541 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
542 Total += (*I)->FileSize;
543 return Total;
544}
545 /*}}}*/
546// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
547// ---------------------------------------------------------------------
548/* This is the number of bytes that is not local */
a02db58f 549APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 550{
a3c4c81a 551 unsigned long long Total = 0;
f7f0d6c7 552 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
553 if ((*I)->Local == false)
554 Total += (*I)->FileSize;
555 return Total;
556}
557 /*}}}*/
6b1ff003
AL
558// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
559// ---------------------------------------------------------------------
560/* This is the number of bytes already present in partial files of non-local items */
a02db58f 561APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 562{
a3c4c81a 563 unsigned long long Total = 0;
f7f0d6c7 564 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
565 if ((*I)->Local == false)
566 Total += (*I)->PartialSize;
567 return Total;
568}
92fcbfc1 569 /*}}}*/
8e5fc8f5 570// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
571// ---------------------------------------------------------------------
572/* */
573pkgAcquire::UriIterator pkgAcquire::UriBegin()
574{
575 return UriIterator(Queues);
576}
577 /*}}}*/
8e5fc8f5 578// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
579// ---------------------------------------------------------------------
580/* */
581pkgAcquire::UriIterator pkgAcquire::UriEnd()
582{
583 return UriIterator(0);
584}
585 /*}}}*/
e331f6ed
AL
586// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
587// ---------------------------------------------------------------------
588/* */
25613a61
DK
589pkgAcquire::MethodConfig::MethodConfig() : d(NULL), Next(0), SingleInstance(false),
590 Pipeline(false), SendConfig(false), LocalOnly(false), NeedsCleanup(false),
591 Removable(false)
e331f6ed 592{
e331f6ed
AL
593}
594 /*}}}*/
0a8a80e5
AL
595// Queue::Queue - Constructor /*{{{*/
596// ---------------------------------------------------------------------
597/* */
25613a61
DK
598pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : d(NULL), Next(0),
599 Name(Name), Items(0), Workers(0), Owner(Owner), PipeDepth(0), MaxPipeDepth(1)
0a8a80e5 600{
0a8a80e5
AL
601}
602 /*}}}*/
603// Queue::~Queue - Destructor /*{{{*/
604// ---------------------------------------------------------------------
605/* */
606pkgAcquire::Queue::~Queue()
607{
8e5fc8f5 608 Shutdown(true);
0a8a80e5
AL
609
610 while (Items != 0)
611 {
612 QItem *Jnk = Items;
613 Items = Items->Next;
614 delete Jnk;
615 }
616}
617 /*}}}*/
618// Queue::Enqueue - Queue an item to the queue /*{{{*/
619// ---------------------------------------------------------------------
620/* */
c03462c6 621bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 622{
7a1b1f8b 623 QItem **I = &Items;
c03462c6
MV
624 // move to the end of the queue and check for duplicates here
625 for (; *I != 0; I = &(*I)->Next)
626 if (Item.URI == (*I)->URI)
627 {
628 Item.Owner->Status = Item::StatDone;
629 return false;
630 }
631
0a8a80e5 632 // Create a new item
7a1b1f8b
AL
633 QItem *Itm = new QItem;
634 *Itm = Item;
635 Itm->Next = 0;
636 *I = Itm;
0a8a80e5 637
8267fe24 638 Item.Owner->QueueCounter++;
93bf083d
AL
639 if (Items->Next == 0)
640 Cycle();
c03462c6 641 return true;
0a8a80e5
AL
642}
643 /*}}}*/
c88edf1d 644// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 645// ---------------------------------------------------------------------
b185acc2 646/* We return true if we hit something */
bfd22fc0 647bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 648{
b185acc2
AL
649 if (Owner->Status == pkgAcquire::Item::StatFetching)
650 return _error->Error("Tried to dequeue a fetching object");
651
bfd22fc0
AL
652 bool Res = false;
653
0a8a80e5
AL
654 QItem **I = &Items;
655 for (; *I != 0;)
656 {
657 if ((*I)->Owner == Owner)
658 {
659 QItem *Jnk= *I;
660 *I = (*I)->Next;
661 Owner->QueueCounter--;
662 delete Jnk;
bfd22fc0 663 Res = true;
0a8a80e5
AL
664 }
665 else
666 I = &(*I)->Next;
667 }
bfd22fc0
AL
668
669 return Res;
0a8a80e5
AL
670}
671 /*}}}*/
672// Queue::Startup - Start the worker processes /*{{{*/
673// ---------------------------------------------------------------------
8e5fc8f5
AL
674/* It is possible for this to be called with a pre-existing set of
675 workers. */
0a8a80e5
AL
676bool pkgAcquire::Queue::Startup()
677{
8e5fc8f5
AL
678 if (Workers == 0)
679 {
680 URI U(Name);
681 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
682 if (Cnf == 0)
683 return false;
684
685 Workers = new Worker(this,Cnf,Owner->Log);
686 Owner->Add(Workers);
687 if (Workers->Start() == false)
688 return false;
689
690 /* When pipelining we commit 10 items. This needs to change when we
691 added other source retry to have cycle maintain a pipeline depth
692 on its own. */
693 if (Cnf->Pipeline == true)
6ce72612 694 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
695 else
696 MaxPipeDepth = 1;
697 }
5cb5d8dc 698
93bf083d 699 return Cycle();
0a8a80e5
AL
700}
701 /*}}}*/
702// Queue::Shutdown - Shutdown the worker processes /*{{{*/
703// ---------------------------------------------------------------------
8e5fc8f5
AL
704/* If final is true then all workers are eliminated, otherwise only workers
705 that do not need cleanup are removed */
706bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
707{
708 // Delete all of the workers
8e5fc8f5
AL
709 pkgAcquire::Worker **Cur = &Workers;
710 while (*Cur != 0)
0a8a80e5 711 {
8e5fc8f5
AL
712 pkgAcquire::Worker *Jnk = *Cur;
713 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
714 {
715 *Cur = Jnk->NextQueue;
716 Owner->Remove(Jnk);
717 delete Jnk;
718 }
719 else
720 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
721 }
722
723 return true;
3b5421b4
AL
724}
725 /*}}}*/
7d8afa39 726// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
727// ---------------------------------------------------------------------
728/* */
729pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
730{
731 for (QItem *I = Items; I != 0; I = I->Next)
732 if (I->URI == URI && I->Worker == Owner)
733 return I;
734 return 0;
735}
736 /*}}}*/
737// Queue::ItemDone - Item has been completed /*{{{*/
738// ---------------------------------------------------------------------
739/* The worker signals this which causes the item to be removed from the
93bf083d
AL
740 queue. If this is the last queue instance then it is removed from the
741 main queue too.*/
c88edf1d
AL
742bool pkgAcquire::Queue::ItemDone(QItem *Itm)
743{
b185acc2 744 PipeDepth--;
db890fdb
AL
745 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
746 Itm->Owner->Status = pkgAcquire::Item::StatDone;
747
93bf083d
AL
748 if (Itm->Owner->QueueCounter <= 1)
749 Owner->Dequeue(Itm->Owner);
750 else
751 {
752 Dequeue(Itm->Owner);
753 Owner->Bump();
754 }
c88edf1d 755
93bf083d
AL
756 return Cycle();
757}
758 /*}}}*/
759// Queue::Cycle - Queue new items into the method /*{{{*/
760// ---------------------------------------------------------------------
b185acc2
AL
761/* This locates a new idle item and sends it to the worker. If pipelining
762 is enabled then it keeps the pipe full. */
93bf083d
AL
763bool pkgAcquire::Queue::Cycle()
764{
765 if (Items == 0 || Workers == 0)
c88edf1d
AL
766 return true;
767
e7432370
AL
768 if (PipeDepth < 0)
769 return _error->Error("Pipedepth failure");
770
93bf083d
AL
771 // Look for a queable item
772 QItem *I = Items;
e7432370 773 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
774 {
775 for (; I != 0; I = I->Next)
776 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
777 break;
778
779 // Nothing to do, queue is idle.
780 if (I == 0)
781 return true;
782
783 I->Worker = Workers;
784 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 785 PipeDepth++;
b185acc2
AL
786 if (Workers->QueueItem(I) == false)
787 return false;
788 }
93bf083d 789
b185acc2 790 return true;
c88edf1d
AL
791}
792 /*}}}*/
be4401bf
AL
793// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
794// ---------------------------------------------------------------------
b185acc2 795/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
796void pkgAcquire::Queue::Bump()
797{
b185acc2 798 Cycle();
be4401bf
AL
799}
800 /*}}}*/
b98f2859
AL
801// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
802// ---------------------------------------------------------------------
803/* */
25613a61 804pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Percent(0), Update(true), MorePulses(false)
b98f2859
AL
805{
806 Start();
807}
808 /*}}}*/
809// AcquireStatus::Pulse - Called periodically /*{{{*/
810// ---------------------------------------------------------------------
811/* This computes some internal state variables for the derived classes to
812 use. It generates the current downloaded bytes and total bytes to download
813 as well as the current CPS estimate. */
024d1123 814bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
815{
816 TotalBytes = 0;
817 CurrentBytes = 0;
d568ed2d
AL
818 TotalItems = 0;
819 CurrentItems = 0;
b98f2859
AL
820
821 // Compute the total number of bytes to fetch
822 unsigned int Unknown = 0;
823 unsigned int Count = 0;
c6e9cc58
MV
824 bool UnfetchedReleaseFiles = false;
825 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
826 I != Owner->ItemsEnd();
f7f0d6c7 827 ++I, ++Count)
b98f2859 828 {
d568ed2d
AL
829 TotalItems++;
830 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 831 ++CurrentItems;
d568ed2d 832
a6568219
AL
833 // Totally ignore local items
834 if ((*I)->Local == true)
835 continue;
b2e465d6 836
d0cfa8ad
MV
837 // see if the method tells us to expect more
838 TotalItems += (*I)->ExpectedAdditionalItems;
839
c6e9cc58
MV
840 // check if there are unfetched Release files
841 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
842 UnfetchedReleaseFiles = true;
843
b98f2859
AL
844 TotalBytes += (*I)->FileSize;
845 if ((*I)->Complete == true)
846 CurrentBytes += (*I)->FileSize;
847 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 848 ++Unknown;
b98f2859
AL
849 }
850
851 // Compute the current completion
dbbc5494 852 unsigned long long ResumeSize = 0;
b98f2859
AL
853 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
854 I = Owner->WorkerStep(I))
c62f7898 855 {
b98f2859 856 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
857 {
858 CurrentBytes += I->CurrentSize;
859 ResumeSize += I->ResumePoint;
860
861 // Files with unknown size always have 100% completion
862 if (I->CurrentItem->Owner->FileSize == 0 &&
863 I->CurrentItem->Owner->Complete == false)
864 TotalBytes += I->CurrentSize;
865 }
c62f7898 866 }
aa0e1101 867
b98f2859
AL
868 // Normalize the figures and account for unknown size downloads
869 if (TotalBytes <= 0)
870 TotalBytes = 1;
871 if (Unknown == Count)
872 TotalBytes = Unknown;
18ef0a78
AL
873
874 // Wha?! Is not supposed to happen.
875 if (CurrentBytes > TotalBytes)
876 CurrentBytes = TotalBytes;
96c6cab1
MV
877
878 // debug
879 if (_config->FindB("Debug::acquire::progress", false) == true)
880 std::clog << " Bytes: "
881 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
882 << std::endl;
b98f2859
AL
883
884 // Compute the CPS
885 struct timeval NewTime;
886 gettimeofday(&NewTime,0);
2ec1674d 887 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
888 NewTime.tv_sec - Time.tv_sec > 6)
889 {
f17ac097
AL
890 double Delta = NewTime.tv_sec - Time.tv_sec +
891 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 892
b98f2859 893 // Compute the CPS value
f17ac097 894 if (Delta < 0.01)
e331f6ed
AL
895 CurrentCPS = 0;
896 else
aa0e1101
AL
897 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
898 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 899 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
900 Time = NewTime;
901 }
024d1123 902
c6e9cc58
MV
903 // calculate the percentage, if we have too little data assume 1%
904 if (TotalBytes > 0 && UnfetchedReleaseFiles)
96c6cab1
MV
905 Percent = 0;
906 else
907 // use both files and bytes because bytes can be unreliable
908 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
909 0.2 * (CurrentItems/float(TotalItems)*100.0));
910
75ef8f14
MV
911 int fd = _config->FindI("APT::Status-Fd",-1);
912 if(fd > 0)
913 {
914 ostringstream status;
915
916 char msg[200];
917 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
918 unsigned long long ETA = 0;
919 if(CurrentCPS > 0)
920 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 921
1e8b4c0f
MV
922 // only show the ETA if it makes sense
923 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 924 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 925 else
0c508b03 926 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f 927
75ef8f14
MV
928 // build the status str
929 status << "dlstatus:" << i
96c6cab1 930 << ":" << std::setprecision(3) << Percent
d0cfa8ad
MV
931 << ":" << msg
932 << endl;
31bda500
DK
933
934 std::string const dlstatus = status.str();
d68d65ad 935 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
936 }
937
024d1123 938 return true;
b98f2859
AL
939}
940 /*}}}*/
941// AcquireStatus::Start - Called when the download is started /*{{{*/
942// ---------------------------------------------------------------------
943/* We just reset the counters */
944void pkgAcquireStatus::Start()
945{
946 gettimeofday(&Time,0);
947 gettimeofday(&StartTime,0);
948 LastBytes = 0;
949 CurrentCPS = 0;
950 CurrentBytes = 0;
951 TotalBytes = 0;
952 FetchedBytes = 0;
953 ElapsedTime = 0;
d568ed2d
AL
954 TotalItems = 0;
955 CurrentItems = 0;
b98f2859
AL
956}
957 /*}}}*/
a6568219 958// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
959// ---------------------------------------------------------------------
960/* This accurately computes the elapsed time and the total overall CPS. */
961void pkgAcquireStatus::Stop()
962{
963 // Compute the CPS and elapsed time
964 struct timeval NewTime;
965 gettimeofday(&NewTime,0);
966
31a0531d
AL
967 double Delta = NewTime.tv_sec - StartTime.tv_sec +
968 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 969
b98f2859 970 // Compute the CPS value
31a0531d 971 if (Delta < 0.01)
e331f6ed
AL
972 CurrentCPS = 0;
973 else
31a0531d 974 CurrentCPS = FetchedBytes/Delta;
b98f2859 975 LastBytes = CurrentBytes;
dbbc5494 976 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
977}
978 /*}}}*/
979// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
980// ---------------------------------------------------------------------
981/* This is used to get accurate final transfer rate reporting. */
73da43e9 982void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 983{
b98f2859
AL
984 FetchedBytes += Size - Resume;
985}
986 /*}}}*/