]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Merge remote-tracking branch 'upstream/debian/experimental' into feature/acq-trans
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquiration
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of paralization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
526334a0 30#include <stdio.h>
453b82a3
DK
31#include <stdlib.h>
32#include <string.h>
33#include <unistd.h>
96c6cab1 34#include <iomanip>
526334a0 35
7a7fa5f0 36#include <dirent.h>
8267fe24 37#include <sys/time.h>
453b82a3 38#include <sys/select.h>
524f8105 39#include <errno.h>
56472095 40#include <sys/stat.h>
ea542140
DK
41
42#include <apti18n.h>
0118833a
AL
43 /*}}}*/
44
b4fc9b6f
AL
45using namespace std;
46
0118833a
AL
47// Acquire::pkgAcquire - Constructor /*{{{*/
48// ---------------------------------------------------------------------
93bf083d 49/* We grab some runtime state from the configuration space */
5efbd596 50pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 51 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 52 Running(false)
0118833a 53{
1cd1c398 54 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
55 if (strcasecmp(Mode.c_str(),"host") == 0)
56 QueueMode = QueueHost;
57 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
58 QueueMode = QueueAccess;
59}
5efbd596 60pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
61 Configs(0), Log(Progress), ToFetch(0),
62 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 63 Running(false)
1cd1c398
DK
64{
65 string const Mode = _config->Find("Acquire::Queue-Mode","host");
66 if (strcasecmp(Mode.c_str(),"host") == 0)
67 QueueMode = QueueHost;
68 if (strcasecmp(Mode.c_str(),"access") == 0)
69 QueueMode = QueueAccess;
70 Setup(Progress, "");
71}
72 /*}}}*/
73// Acquire::Setup - Delayed Constructor /*{{{*/
74// ---------------------------------------------------------------------
75/* Do everything needed to be a complete Acquire object and report the
76 success (or failure) back so the user knows that something is wrong… */
77bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
78{
79 Log = Progress;
0a8a80e5 80
1cd1c398 81 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
82 string const listDir = _config->FindDir("Dir::State::lists");
83 string const partialListDir = listDir + "partial/";
84 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
85 string const partialArchivesDir = archivesDir + "partial/";
86
7753e468
DK
87 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
88 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
89 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
90
7753e468
DK
91 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
92 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 93 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
94
95 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
96 return true;
97
98 // Lock the directory this acquire object will work in
99 LockFD = GetLock(flCombine(Lock, "lock"));
100 if (LockFD == -1)
101 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
102
103 return true;
104}
105 /*}}}*/
0118833a
AL
106// Acquire::~pkgAcquire - Destructor /*{{{*/
107// ---------------------------------------------------------------------
93bf083d 108/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
109pkgAcquire::~pkgAcquire()
110{
459681d3 111 Shutdown();
1cd1c398
DK
112
113 if (LockFD != -1)
114 close(LockFD);
115
3b5421b4
AL
116 while (Configs != 0)
117 {
118 MethodConfig *Jnk = Configs;
119 Configs = Configs->Next;
120 delete Jnk;
121 }
281daf46
AL
122}
123 /*}}}*/
8e5fc8f5 124// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
125// ---------------------------------------------------------------------
126/* */
127void pkgAcquire::Shutdown()
128{
f7f0d6c7 129 while (Items.empty() == false)
1b480911
AL
130 {
131 if (Items[0]->Status == Item::StatFetching)
132 Items[0]->Status = Item::StatError;
281daf46 133 delete Items[0];
1b480911 134 }
0a8a80e5
AL
135
136 while (Queues != 0)
137 {
138 Queue *Jnk = Queues;
139 Queues = Queues->Next;
140 delete Jnk;
141 }
0118833a
AL
142}
143 /*}}}*/
144// Acquire::Add - Add a new item /*{{{*/
145// ---------------------------------------------------------------------
93bf083d
AL
146/* This puts an item on the acquire list. This list is mainly for tracking
147 item status */
0118833a
AL
148void pkgAcquire::Add(Item *Itm)
149{
150 Items.push_back(Itm);
151}
152 /*}}}*/
153// Acquire::Remove - Remove a item /*{{{*/
154// ---------------------------------------------------------------------
93bf083d 155/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
156void pkgAcquire::Remove(Item *Itm)
157{
a3eaf954
AL
158 Dequeue(Itm);
159
753b3525 160 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
161 {
162 if (*I == Itm)
b4fc9b6f 163 {
0118833a 164 Items.erase(I);
b4fc9b6f
AL
165 I = Items.begin();
166 }
753b3525 167 else
f7f0d6c7 168 ++I;
8267fe24 169 }
0118833a
AL
170}
171 /*}}}*/
0a8a80e5
AL
172// Acquire::Add - Add a worker /*{{{*/
173// ---------------------------------------------------------------------
93bf083d
AL
174/* A list of workers is kept so that the select loop can direct their FD
175 usage. */
0a8a80e5
AL
176void pkgAcquire::Add(Worker *Work)
177{
178 Work->NextAcquire = Workers;
179 Workers = Work;
180}
181 /*}}}*/
182// Acquire::Remove - Remove a worker /*{{{*/
183// ---------------------------------------------------------------------
93bf083d
AL
184/* A worker has died. This can not be done while the select loop is running
185 as it would require that RunFds could handling a changing list state and
1e3f4083 186 it can't.. */
0a8a80e5
AL
187void pkgAcquire::Remove(Worker *Work)
188{
93bf083d
AL
189 if (Running == true)
190 abort();
191
0a8a80e5
AL
192 Worker **I = &Workers;
193 for (; *I != 0;)
194 {
195 if (*I == Work)
196 *I = (*I)->NextAcquire;
197 else
198 I = &(*I)->NextAcquire;
199 }
200}
201 /*}}}*/
0118833a
AL
202// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
203// ---------------------------------------------------------------------
93bf083d 204/* This is the entry point for an item. An item calls this function when
281daf46 205 it is constructed which creates a queue (based on the current queue
93bf083d
AL
206 mode) and puts the item in that queue. If the system is running then
207 the queue might be started. */
8267fe24 208void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 209{
0a8a80e5 210 // Determine which queue to put the item in
e331f6ed
AL
211 const MethodConfig *Config;
212 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
213 if (Name.empty() == true)
214 return;
215
216 // Find the queue structure
217 Queue *I = Queues;
218 for (; I != 0 && I->Name != Name; I = I->Next);
219 if (I == 0)
220 {
221 I = new Queue(Name,this);
222 I->Next = Queues;
223 Queues = I;
93bf083d
AL
224
225 if (Running == true)
226 I->Startup();
0a8a80e5 227 }
bfd22fc0 228
e331f6ed
AL
229 // See if this is a local only URI
230 if (Config->LocalOnly == true && Item.Owner->Complete == false)
231 Item.Owner->Local = true;
8267fe24 232 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
233
234 // Queue it into the named queue
c03462c6
MV
235 if(I->Enqueue(Item))
236 ToFetch++;
237
0a8a80e5
AL
238 // Some trace stuff
239 if (Debug == true)
240 {
8267fe24
AL
241 clog << "Fetching " << Item.URI << endl;
242 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 243 clog << " Queue is: " << Name << endl;
0a8a80e5 244 }
3b5421b4
AL
245}
246 /*}}}*/
0a8a80e5 247// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 248// ---------------------------------------------------------------------
93bf083d
AL
249/* This is called when an item is finished being fetched. It removes it
250 from all the queues */
0a8a80e5
AL
251void pkgAcquire::Dequeue(Item *Itm)
252{
253 Queue *I = Queues;
bfd22fc0 254 bool Res = false;
93bf083d
AL
255 if (Debug == true)
256 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
257
258 for (; I != 0; I = I->Next)
259 {
260 if (I->Dequeue(Itm))
261 {
262 Res = true;
263 if (Debug == true)
264 clog << "Dequeued from " << I->Name << endl;
265 }
266 }
267
bfd22fc0
AL
268 if (Res == true)
269 ToFetch--;
0a8a80e5
AL
270}
271 /*}}}*/
272// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
273// ---------------------------------------------------------------------
274/* The string returned depends on the configuration settings and the
275 method parameters. Given something like http://foo.org/bar it can
276 return http://foo.org or http */
e331f6ed 277string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 278{
93bf083d
AL
279 URI U(Uri);
280
e331f6ed 281 Config = GetConfig(U.Access);
0a8a80e5
AL
282 if (Config == 0)
283 return string();
284
285 /* Single-Instance methods get exactly one queue per URI. This is
286 also used for the Access queue method */
287 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
288 return U.Access;
289
290 string AccessSchema = U.Access + ':',
291 FullQueueName = AccessSchema + U.Host;
292 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
293
294 Queue *I = Queues;
295 for (; I != 0; I = I->Next) {
296 // if the queue already exists, re-use it
297 if (I->Name == FullQueueName)
298 return FullQueueName;
299
300 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
301 Instances++;
302 }
303
304 if (Debug) {
305 clog << "Found " << Instances << " instances of " << U.Access << endl;
306 }
307
308 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
309 return U.Access;
93bf083d 310
5674f6b3 311 return FullQueueName;
0118833a
AL
312}
313 /*}}}*/
3b5421b4
AL
314// Acquire::GetConfig - Fetch the configuration information /*{{{*/
315// ---------------------------------------------------------------------
316/* This locates the configuration structure for an access method. If
317 a config structure cannot be found a Worker will be created to
318 retrieve it */
0a8a80e5 319pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
320{
321 // Search for an existing config
322 MethodConfig *Conf;
323 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
324 if (Conf->Access == Access)
325 return Conf;
326
327 // Create the new config class
328 Conf = new MethodConfig;
329 Conf->Access = Access;
330 Conf->Next = Configs;
331 Configs = Conf;
0118833a 332
3b5421b4
AL
333 // Create the worker to fetch the configuration
334 Worker Work(Conf);
335 if (Work.Start() == false)
336 return 0;
7c6e2dc7
MV
337
338 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 339 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
340 Conf->SingleInstance = true;
341
3b5421b4
AL
342 return Conf;
343}
344 /*}}}*/
0a8a80e5
AL
345// Acquire::SetFds - Deal with readable FDs /*{{{*/
346// ---------------------------------------------------------------------
347/* Collect FDs that have activity monitors into the fd sets */
348void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
349{
350 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
351 {
352 if (I->InReady == true && I->InFd >= 0)
353 {
354 if (Fd < I->InFd)
355 Fd = I->InFd;
356 FD_SET(I->InFd,RSet);
357 }
358 if (I->OutReady == true && I->OutFd >= 0)
359 {
360 if (Fd < I->OutFd)
361 Fd = I->OutFd;
362 FD_SET(I->OutFd,WSet);
363 }
364 }
365}
366 /*}}}*/
367// Acquire::RunFds - Deal with active FDs /*{{{*/
368// ---------------------------------------------------------------------
93bf083d
AL
369/* Dispatch active FDs over to the proper workers. It is very important
370 that a worker never be erased while this is running! The queue class
371 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
372void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
373{
374 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
375 {
376 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
377 I->InFdReady();
378 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
379 I->OutFdReady();
380 }
381}
382 /*}}}*/
383// Acquire::Run - Run the fetch sequence /*{{{*/
384// ---------------------------------------------------------------------
385/* This runs the queues. It manages a select loop for all of the
386 Worker tasks. The workers interact with the queues and items to
387 manage the actual fetch. */
1c5f7e5f 388pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 389{
8b89e57f
AL
390 Running = true;
391
0a8a80e5
AL
392 for (Queue *I = Queues; I != 0; I = I->Next)
393 I->Startup();
394
b98f2859
AL
395 if (Log != 0)
396 Log->Start();
397
024d1123
AL
398 bool WasCancelled = false;
399
0a8a80e5 400 // Run till all things have been acquired
8267fe24
AL
401 struct timeval tv;
402 tv.tv_sec = 0;
1c5f7e5f 403 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
404 while (ToFetch > 0)
405 {
406 fd_set RFds;
407 fd_set WFds;
408 int Highest = 0;
409 FD_ZERO(&RFds);
410 FD_ZERO(&WFds);
411 SetFds(Highest,&RFds,&WFds);
412
b0db36b1
AL
413 int Res;
414 do
415 {
416 Res = select(Highest+1,&RFds,&WFds,0,&tv);
417 }
418 while (Res < 0 && errno == EINTR);
419
8267fe24 420 if (Res < 0)
8b89e57f 421 {
8267fe24
AL
422 _error->Errno("select","Select has failed");
423 break;
8b89e57f 424 }
93bf083d 425
0a8a80e5 426 RunFds(&RFds,&WFds);
93bf083d
AL
427 if (_error->PendingError() == true)
428 break;
8267fe24
AL
429
430 // Timeout, notify the log class
431 if (Res == 0 || (Log != 0 && Log->Update == true))
432 {
1c5f7e5f 433 tv.tv_usec = PulseIntervall;
8267fe24
AL
434 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
435 I->Pulse();
024d1123
AL
436 if (Log != 0 && Log->Pulse(this) == false)
437 {
438 WasCancelled = true;
439 break;
440 }
8267fe24 441 }
0a8a80e5 442 }
be4401bf 443
b98f2859
AL
444 if (Log != 0)
445 Log->Stop();
446
be4401bf
AL
447 // Shut down the acquire bits
448 Running = false;
0a8a80e5 449 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 450 I->Shutdown(false);
0a8a80e5 451
ab559b35 452 // Shut down the items
f7f0d6c7 453 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 454 (*I)->Finished();
ab559b35 455
024d1123
AL
456 if (_error->PendingError())
457 return Failed;
458 if (WasCancelled)
459 return Cancelled;
460 return Continue;
93bf083d
AL
461}
462 /*}}}*/
be4401bf 463// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
464// ---------------------------------------------------------------------
465/* This routine bumps idle queues in hopes that they will be able to fetch
466 the dequeued item */
467void pkgAcquire::Bump()
468{
be4401bf
AL
469 for (Queue *I = Queues; I != 0; I = I->Next)
470 I->Bump();
0a8a80e5
AL
471}
472 /*}}}*/
8267fe24
AL
473// Acquire::WorkerStep - Step to the next worker /*{{{*/
474// ---------------------------------------------------------------------
475/* Not inlined to advoid including acquire-worker.h */
476pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
477{
478 return I->NextAcquire;
d3e8fbb3 479}
8267fe24 480 /*}}}*/
a6568219 481// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
482// ---------------------------------------------------------------------
483/* This is a bit simplistic, it looks at every file in the dir and sees
484 if it is part of the download set. */
485bool pkgAcquire::Clean(string Dir)
486{
95b5f6c1
DK
487 // non-existing directories are by definition clean…
488 if (DirectoryExists(Dir) == false)
489 return true;
490
10ecfe4f
MV
491 if(Dir == "/")
492 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
493
7a7fa5f0
AL
494 DIR *D = opendir(Dir.c_str());
495 if (D == 0)
b2e465d6 496 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
497
498 string StartDir = SafeGetCWD();
499 if (chdir(Dir.c_str()) != 0)
500 {
501 closedir(D);
b2e465d6 502 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
503 }
504
505 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
506 {
507 // Skip some files..
508 if (strcmp(Dir->d_name,"lock") == 0 ||
509 strcmp(Dir->d_name,"partial") == 0 ||
510 strcmp(Dir->d_name,".") == 0 ||
511 strcmp(Dir->d_name,"..") == 0)
512 continue;
513
514 // Look in the get list
b4fc9b6f 515 ItemCIterator I = Items.begin();
f7f0d6c7 516 for (; I != Items.end(); ++I)
7a7fa5f0
AL
517 if (flNotDir((*I)->DestFile) == Dir->d_name)
518 break;
519
520 // Nothing found, nuke it
521 if (I == Items.end())
522 unlink(Dir->d_name);
523 };
524
7a7fa5f0 525 closedir(D);
3c8cda8b
MV
526 if (chdir(StartDir.c_str()) != 0)
527 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
528 return true;
529}
530 /*}}}*/
a6568219
AL
531// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
532// ---------------------------------------------------------------------
533/* This is the total number of bytes needed */
a02db58f 534APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 535{
a3c4c81a 536 unsigned long long Total = 0;
f7f0d6c7 537 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
538 Total += (*I)->FileSize;
539 return Total;
540}
541 /*}}}*/
542// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
543// ---------------------------------------------------------------------
544/* This is the number of bytes that is not local */
a02db58f 545APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 546{
a3c4c81a 547 unsigned long long Total = 0;
f7f0d6c7 548 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
549 if ((*I)->Local == false)
550 Total += (*I)->FileSize;
551 return Total;
552}
553 /*}}}*/
6b1ff003
AL
554// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
555// ---------------------------------------------------------------------
556/* This is the number of bytes that is not local */
a02db58f 557APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 558{
a3c4c81a 559 unsigned long long Total = 0;
f7f0d6c7 560 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
561 if ((*I)->Local == false)
562 Total += (*I)->PartialSize;
563 return Total;
564}
92fcbfc1 565 /*}}}*/
8e5fc8f5 566// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
567// ---------------------------------------------------------------------
568/* */
569pkgAcquire::UriIterator pkgAcquire::UriBegin()
570{
571 return UriIterator(Queues);
572}
573 /*}}}*/
8e5fc8f5 574// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
575// ---------------------------------------------------------------------
576/* */
577pkgAcquire::UriIterator pkgAcquire::UriEnd()
578{
579 return UriIterator(0);
580}
581 /*}}}*/
e331f6ed
AL
582// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
583// ---------------------------------------------------------------------
584/* */
585pkgAcquire::MethodConfig::MethodConfig()
586{
587 SingleInstance = false;
e331f6ed
AL
588 Pipeline = false;
589 SendConfig = false;
590 LocalOnly = false;
459681d3 591 Removable = false;
e331f6ed
AL
592 Next = 0;
593}
594 /*}}}*/
0a8a80e5
AL
595// Queue::Queue - Constructor /*{{{*/
596// ---------------------------------------------------------------------
597/* */
598pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
599 Owner(Owner)
600{
601 Items = 0;
602 Next = 0;
603 Workers = 0;
b185acc2
AL
604 MaxPipeDepth = 1;
605 PipeDepth = 0;
0a8a80e5
AL
606}
607 /*}}}*/
608// Queue::~Queue - Destructor /*{{{*/
609// ---------------------------------------------------------------------
610/* */
611pkgAcquire::Queue::~Queue()
612{
8e5fc8f5 613 Shutdown(true);
0a8a80e5
AL
614
615 while (Items != 0)
616 {
617 QItem *Jnk = Items;
618 Items = Items->Next;
619 delete Jnk;
620 }
621}
622 /*}}}*/
623// Queue::Enqueue - Queue an item to the queue /*{{{*/
624// ---------------------------------------------------------------------
625/* */
c03462c6 626bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 627{
7a1b1f8b 628 QItem **I = &Items;
c03462c6
MV
629 // move to the end of the queue and check for duplicates here
630 for (; *I != 0; I = &(*I)->Next)
631 if (Item.URI == (*I)->URI)
632 {
633 Item.Owner->Status = Item::StatDone;
634 return false;
635 }
636
0a8a80e5 637 // Create a new item
7a1b1f8b
AL
638 QItem *Itm = new QItem;
639 *Itm = Item;
640 Itm->Next = 0;
641 *I = Itm;
0a8a80e5 642
8267fe24 643 Item.Owner->QueueCounter++;
93bf083d
AL
644 if (Items->Next == 0)
645 Cycle();
c03462c6 646 return true;
0a8a80e5
AL
647}
648 /*}}}*/
c88edf1d 649// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 650// ---------------------------------------------------------------------
b185acc2 651/* We return true if we hit something */
bfd22fc0 652bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 653{
b185acc2
AL
654 if (Owner->Status == pkgAcquire::Item::StatFetching)
655 return _error->Error("Tried to dequeue a fetching object");
656
bfd22fc0
AL
657 bool Res = false;
658
0a8a80e5
AL
659 QItem **I = &Items;
660 for (; *I != 0;)
661 {
662 if ((*I)->Owner == Owner)
663 {
664 QItem *Jnk= *I;
665 *I = (*I)->Next;
666 Owner->QueueCounter--;
667 delete Jnk;
bfd22fc0 668 Res = true;
0a8a80e5
AL
669 }
670 else
671 I = &(*I)->Next;
672 }
bfd22fc0
AL
673
674 return Res;
0a8a80e5
AL
675}
676 /*}}}*/
677// Queue::Startup - Start the worker processes /*{{{*/
678// ---------------------------------------------------------------------
8e5fc8f5
AL
679/* It is possible for this to be called with a pre-existing set of
680 workers. */
0a8a80e5
AL
681bool pkgAcquire::Queue::Startup()
682{
8e5fc8f5
AL
683 if (Workers == 0)
684 {
685 URI U(Name);
686 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
687 if (Cnf == 0)
688 return false;
689
690 Workers = new Worker(this,Cnf,Owner->Log);
691 Owner->Add(Workers);
692 if (Workers->Start() == false)
693 return false;
694
695 /* When pipelining we commit 10 items. This needs to change when we
696 added other source retry to have cycle maintain a pipeline depth
697 on its own. */
698 if (Cnf->Pipeline == true)
6ce72612 699 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
700 else
701 MaxPipeDepth = 1;
702 }
5cb5d8dc 703
93bf083d 704 return Cycle();
0a8a80e5
AL
705}
706 /*}}}*/
707// Queue::Shutdown - Shutdown the worker processes /*{{{*/
708// ---------------------------------------------------------------------
8e5fc8f5
AL
709/* If final is true then all workers are eliminated, otherwise only workers
710 that do not need cleanup are removed */
711bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
712{
713 // Delete all of the workers
8e5fc8f5
AL
714 pkgAcquire::Worker **Cur = &Workers;
715 while (*Cur != 0)
0a8a80e5 716 {
8e5fc8f5
AL
717 pkgAcquire::Worker *Jnk = *Cur;
718 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
719 {
720 *Cur = Jnk->NextQueue;
721 Owner->Remove(Jnk);
722 delete Jnk;
723 }
724 else
725 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
726 }
727
728 return true;
3b5421b4
AL
729}
730 /*}}}*/
7d8afa39 731// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
732// ---------------------------------------------------------------------
733/* */
734pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
735{
736 for (QItem *I = Items; I != 0; I = I->Next)
737 if (I->URI == URI && I->Worker == Owner)
738 return I;
739 return 0;
740}
741 /*}}}*/
742// Queue::ItemDone - Item has been completed /*{{{*/
743// ---------------------------------------------------------------------
744/* The worker signals this which causes the item to be removed from the
93bf083d
AL
745 queue. If this is the last queue instance then it is removed from the
746 main queue too.*/
c88edf1d
AL
747bool pkgAcquire::Queue::ItemDone(QItem *Itm)
748{
b185acc2 749 PipeDepth--;
db890fdb
AL
750 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
751 Itm->Owner->Status = pkgAcquire::Item::StatDone;
752
93bf083d
AL
753 if (Itm->Owner->QueueCounter <= 1)
754 Owner->Dequeue(Itm->Owner);
755 else
756 {
757 Dequeue(Itm->Owner);
758 Owner->Bump();
759 }
c88edf1d 760
93bf083d
AL
761 return Cycle();
762}
763 /*}}}*/
764// Queue::Cycle - Queue new items into the method /*{{{*/
765// ---------------------------------------------------------------------
b185acc2
AL
766/* This locates a new idle item and sends it to the worker. If pipelining
767 is enabled then it keeps the pipe full. */
93bf083d
AL
768bool pkgAcquire::Queue::Cycle()
769{
770 if (Items == 0 || Workers == 0)
c88edf1d
AL
771 return true;
772
e7432370
AL
773 if (PipeDepth < 0)
774 return _error->Error("Pipedepth failure");
775
93bf083d
AL
776 // Look for a queable item
777 QItem *I = Items;
e7432370 778 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
779 {
780 for (; I != 0; I = I->Next)
781 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
782 break;
783
784 // Nothing to do, queue is idle.
785 if (I == 0)
786 return true;
787
788 I->Worker = Workers;
789 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 790 PipeDepth++;
b185acc2
AL
791 if (Workers->QueueItem(I) == false)
792 return false;
793 }
93bf083d 794
b185acc2 795 return true;
c88edf1d
AL
796}
797 /*}}}*/
be4401bf
AL
798// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
799// ---------------------------------------------------------------------
b185acc2 800/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
801void pkgAcquire::Queue::Bump()
802{
b185acc2 803 Cycle();
be4401bf
AL
804}
805 /*}}}*/
b98f2859
AL
806// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
807// ---------------------------------------------------------------------
808/* */
dcaa1185 809pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
b98f2859
AL
810{
811 Start();
812}
813 /*}}}*/
814// AcquireStatus::Pulse - Called periodically /*{{{*/
815// ---------------------------------------------------------------------
816/* This computes some internal state variables for the derived classes to
817 use. It generates the current downloaded bytes and total bytes to download
818 as well as the current CPS estimate. */
024d1123 819bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
820{
821 TotalBytes = 0;
822 CurrentBytes = 0;
d568ed2d
AL
823 TotalItems = 0;
824 CurrentItems = 0;
b98f2859
AL
825
826 // Compute the total number of bytes to fetch
827 unsigned int Unknown = 0;
828 unsigned int Count = 0;
c6e9cc58
MV
829 bool UnfetchedReleaseFiles = false;
830 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin();
831 I != Owner->ItemsEnd();
f7f0d6c7 832 ++I, ++Count)
b98f2859 833 {
d568ed2d
AL
834 TotalItems++;
835 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 836 ++CurrentItems;
d568ed2d 837
a6568219
AL
838 // Totally ignore local items
839 if ((*I)->Local == true)
840 continue;
b2e465d6 841
d0cfa8ad
MV
842 // see if the method tells us to expect more
843 TotalItems += (*I)->ExpectedAdditionalItems;
844
c6e9cc58
MV
845 // check if there are unfetched Release files
846 if ((*I)->Complete == false && (*I)->ExpectedAdditionalItems > 0)
847 UnfetchedReleaseFiles = true;
848
b98f2859
AL
849 TotalBytes += (*I)->FileSize;
850 if ((*I)->Complete == true)
851 CurrentBytes += (*I)->FileSize;
852 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 853 ++Unknown;
b98f2859
AL
854 }
855
856 // Compute the current completion
dbbc5494 857 unsigned long long ResumeSize = 0;
b98f2859
AL
858 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
859 I = Owner->WorkerStep(I))
c62f7898 860 {
b98f2859 861 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
862 {
863 CurrentBytes += I->CurrentSize;
864 ResumeSize += I->ResumePoint;
865
866 // Files with unknown size always have 100% completion
867 if (I->CurrentItem->Owner->FileSize == 0 &&
868 I->CurrentItem->Owner->Complete == false)
869 TotalBytes += I->CurrentSize;
870 }
c62f7898 871 }
aa0e1101 872
b98f2859
AL
873 // Normalize the figures and account for unknown size downloads
874 if (TotalBytes <= 0)
875 TotalBytes = 1;
876 if (Unknown == Count)
877 TotalBytes = Unknown;
18ef0a78
AL
878
879 // Wha?! Is not supposed to happen.
880 if (CurrentBytes > TotalBytes)
881 CurrentBytes = TotalBytes;
96c6cab1
MV
882
883 // debug
884 if (_config->FindB("Debug::acquire::progress", false) == true)
885 std::clog << " Bytes: "
886 << SizeToStr(CurrentBytes) << " / " << SizeToStr(TotalBytes)
887 << std::endl;
b98f2859
AL
888
889 // Compute the CPS
890 struct timeval NewTime;
891 gettimeofday(&NewTime,0);
2ec1674d 892 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
893 NewTime.tv_sec - Time.tv_sec > 6)
894 {
f17ac097
AL
895 double Delta = NewTime.tv_sec - Time.tv_sec +
896 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 897
b98f2859 898 // Compute the CPS value
f17ac097 899 if (Delta < 0.01)
e331f6ed
AL
900 CurrentCPS = 0;
901 else
aa0e1101
AL
902 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
903 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 904 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
905 Time = NewTime;
906 }
024d1123 907
c6e9cc58
MV
908 // calculate the percentage, if we have too little data assume 1%
909 if (TotalBytes > 0 && UnfetchedReleaseFiles)
96c6cab1
MV
910 Percent = 0;
911 else
912 // use both files and bytes because bytes can be unreliable
913 Percent = (0.8 * (CurrentBytes/float(TotalBytes)*100.0) +
914 0.2 * (CurrentItems/float(TotalItems)*100.0));
915
75ef8f14
MV
916 int fd = _config->FindI("APT::Status-Fd",-1);
917 if(fd > 0)
918 {
919 ostringstream status;
920
921 char msg[200];
922 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
923 unsigned long long ETA = 0;
924 if(CurrentCPS > 0)
925 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 926
1e8b4c0f
MV
927 // only show the ETA if it makes sense
928 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 929 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 930 else
0c508b03 931 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f 932
75ef8f14
MV
933 // build the status str
934 status << "dlstatus:" << i
96c6cab1 935 << ":" << std::setprecision(3) << Percent
d0cfa8ad
MV
936 << ":" << msg
937 << endl;
31bda500
DK
938
939 std::string const dlstatus = status.str();
d68d65ad 940 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
941 }
942
024d1123 943 return true;
b98f2859
AL
944}
945 /*}}}*/
946// AcquireStatus::Start - Called when the download is started /*{{{*/
947// ---------------------------------------------------------------------
948/* We just reset the counters */
949void pkgAcquireStatus::Start()
950{
951 gettimeofday(&Time,0);
952 gettimeofday(&StartTime,0);
953 LastBytes = 0;
954 CurrentCPS = 0;
955 CurrentBytes = 0;
956 TotalBytes = 0;
957 FetchedBytes = 0;
958 ElapsedTime = 0;
d568ed2d
AL
959 TotalItems = 0;
960 CurrentItems = 0;
b98f2859
AL
961}
962 /*}}}*/
a6568219 963// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
964// ---------------------------------------------------------------------
965/* This accurately computes the elapsed time and the total overall CPS. */
966void pkgAcquireStatus::Stop()
967{
968 // Compute the CPS and elapsed time
969 struct timeval NewTime;
970 gettimeofday(&NewTime,0);
971
31a0531d
AL
972 double Delta = NewTime.tv_sec - StartTime.tv_sec +
973 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 974
b98f2859 975 // Compute the CPS value
31a0531d 976 if (Delta < 0.01)
e331f6ed
AL
977 CurrentCPS = 0;
978 else
31a0531d 979 CurrentCPS = FetchedBytes/Delta;
b98f2859 980 LastBytes = CurrentBytes;
dbbc5494 981 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
982}
983 /*}}}*/
984// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
985// ---------------------------------------------------------------------
986/* This is used to get accurate final transfer rate reporting. */
73da43e9 987void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 988{
b98f2859
AL
989 FetchedBytes += Size - Resume;
990}
991 /*}}}*/