]> git.saurik.com Git - apt.git/blame - apt-pkg/acquire.cc
Allow override of Proxy-Auto-Detect by the users configuration
[apt.git] / apt-pkg / acquire.cc
CommitLineData
0118833a
AL
1// -*- mode: cpp; mode: fold -*-
2// Description /*{{{*/
1b480911 3// $Id: acquire.cc,v 1.50 2004/03/17 05:17:11 mdz Exp $
0118833a
AL
4/* ######################################################################
5
6 Acquire - File Acquisition
7
1e3f4083 8 The core element for the schedule system is the concept of a named
0a8a80e5 9 queue. Each queue is unique and each queue has a name derived from the
1e3f4083 10 URI. The degree of parallelization can be controlled by how the queue
0a8a80e5
AL
11 name is derived from the URI.
12
0118833a
AL
13 ##################################################################### */
14 /*}}}*/
15// Include Files /*{{{*/
ea542140
DK
16#include <config.h>
17
0118833a
AL
18#include <apt-pkg/acquire.h>
19#include <apt-pkg/acquire-item.h>
20#include <apt-pkg/acquire-worker.h>
0a8a80e5
AL
21#include <apt-pkg/configuration.h>
22#include <apt-pkg/error.h>
cdcc6d34 23#include <apt-pkg/strutl.h>
1cd1c398 24#include <apt-pkg/fileutl.h>
8267fe24 25
453b82a3
DK
26#include <string>
27#include <vector>
b4fc9b6f 28#include <iostream>
75ef8f14 29#include <sstream>
526334a0 30#include <stdio.h>
453b82a3
DK
31#include <stdlib.h>
32#include <string.h>
33#include <unistd.h>
526334a0 34
7a7fa5f0 35#include <dirent.h>
8267fe24 36#include <sys/time.h>
453b82a3 37#include <sys/select.h>
524f8105 38#include <errno.h>
ea542140
DK
39
40#include <apti18n.h>
0118833a
AL
41 /*}}}*/
42
b4fc9b6f
AL
43using namespace std;
44
0118833a
AL
45// Acquire::pkgAcquire - Constructor /*{{{*/
46// ---------------------------------------------------------------------
93bf083d 47/* We grab some runtime state from the configuration space */
5efbd596 48pkgAcquire::pkgAcquire() : LockFD(-1), Queues(0), Workers(0), Configs(0), Log(NULL), ToFetch(0),
1cd1c398 49 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 50 Running(false)
0118833a 51{
1cd1c398 52 string const Mode = _config->Find("Acquire::Queue-Mode","host");
0a8a80e5
AL
53 if (strcasecmp(Mode.c_str(),"host") == 0)
54 QueueMode = QueueHost;
55 if (strcasecmp(Mode.c_str(),"access") == 0)
1cd1c398
DK
56 QueueMode = QueueAccess;
57}
5efbd596 58pkgAcquire::pkgAcquire(pkgAcquireStatus *Progress) : LockFD(-1), Queues(0), Workers(0),
1cd1c398
DK
59 Configs(0), Log(Progress), ToFetch(0),
60 Debug(_config->FindB("Debug::pkgAcquire",false)),
5efbd596 61 Running(false)
1cd1c398
DK
62{
63 string const Mode = _config->Find("Acquire::Queue-Mode","host");
64 if (strcasecmp(Mode.c_str(),"host") == 0)
65 QueueMode = QueueHost;
66 if (strcasecmp(Mode.c_str(),"access") == 0)
67 QueueMode = QueueAccess;
68 Setup(Progress, "");
69}
70 /*}}}*/
71// Acquire::Setup - Delayed Constructor /*{{{*/
72// ---------------------------------------------------------------------
73/* Do everything needed to be a complete Acquire object and report the
74 success (or failure) back so the user knows that something is wrong… */
75bool pkgAcquire::Setup(pkgAcquireStatus *Progress, string const &Lock)
76{
77 Log = Progress;
0a8a80e5 78
1cd1c398 79 // check for existence and possibly create auxiliary directories
9c2c9c24
DK
80 string const listDir = _config->FindDir("Dir::State::lists");
81 string const partialListDir = listDir + "partial/";
82 string const archivesDir = _config->FindDir("Dir::Cache::Archives");
83 string const partialArchivesDir = archivesDir + "partial/";
84
7753e468
DK
85 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::State"), partialListDir) == false &&
86 CreateAPTDirectoryIfNeeded(listDir, partialListDir) == false)
9c2c9c24
DK
87 return _error->Errno("Acquire", _("List directory %spartial is missing."), listDir.c_str());
88
7753e468
DK
89 if (CreateAPTDirectoryIfNeeded(_config->FindDir("Dir::Cache"), partialArchivesDir) == false &&
90 CreateAPTDirectoryIfNeeded(archivesDir, partialArchivesDir) == false)
9c2c9c24 91 return _error->Errno("Acquire", _("Archives directory %spartial is missing."), archivesDir.c_str());
1cd1c398
DK
92
93 if (Lock.empty() == true || _config->FindB("Debug::NoLocking", false) == true)
94 return true;
95
96 // Lock the directory this acquire object will work in
97 LockFD = GetLock(flCombine(Lock, "lock"));
98 if (LockFD == -1)
99 return _error->Error(_("Unable to lock directory %s"), Lock.c_str());
100
101 return true;
102}
103 /*}}}*/
0118833a
AL
104// Acquire::~pkgAcquire - Destructor /*{{{*/
105// ---------------------------------------------------------------------
93bf083d 106/* Free our memory, clean up the queues (destroy the workers) */
0118833a
AL
107pkgAcquire::~pkgAcquire()
108{
459681d3 109 Shutdown();
1cd1c398
DK
110
111 if (LockFD != -1)
112 close(LockFD);
113
3b5421b4
AL
114 while (Configs != 0)
115 {
116 MethodConfig *Jnk = Configs;
117 Configs = Configs->Next;
118 delete Jnk;
119 }
281daf46
AL
120}
121 /*}}}*/
8e5fc8f5 122// Acquire::Shutdown - Clean out the acquire object /*{{{*/
281daf46
AL
123// ---------------------------------------------------------------------
124/* */
125void pkgAcquire::Shutdown()
126{
f7f0d6c7 127 while (Items.empty() == false)
1b480911
AL
128 {
129 if (Items[0]->Status == Item::StatFetching)
130 Items[0]->Status = Item::StatError;
281daf46 131 delete Items[0];
1b480911 132 }
0a8a80e5
AL
133
134 while (Queues != 0)
135 {
136 Queue *Jnk = Queues;
137 Queues = Queues->Next;
138 delete Jnk;
139 }
0118833a
AL
140}
141 /*}}}*/
142// Acquire::Add - Add a new item /*{{{*/
143// ---------------------------------------------------------------------
93bf083d
AL
144/* This puts an item on the acquire list. This list is mainly for tracking
145 item status */
0118833a
AL
146void pkgAcquire::Add(Item *Itm)
147{
148 Items.push_back(Itm);
149}
150 /*}}}*/
151// Acquire::Remove - Remove a item /*{{{*/
152// ---------------------------------------------------------------------
93bf083d 153/* Remove an item from the acquire list. This is usually not used.. */
0118833a
AL
154void pkgAcquire::Remove(Item *Itm)
155{
a3eaf954
AL
156 Dequeue(Itm);
157
753b3525 158 for (ItemIterator I = Items.begin(); I != Items.end();)
0118833a
AL
159 {
160 if (*I == Itm)
b4fc9b6f 161 {
0118833a 162 Items.erase(I);
b4fc9b6f
AL
163 I = Items.begin();
164 }
753b3525 165 else
f7f0d6c7 166 ++I;
8267fe24 167 }
0118833a
AL
168}
169 /*}}}*/
0a8a80e5
AL
170// Acquire::Add - Add a worker /*{{{*/
171// ---------------------------------------------------------------------
93bf083d
AL
172/* A list of workers is kept so that the select loop can direct their FD
173 usage. */
0a8a80e5
AL
174void pkgAcquire::Add(Worker *Work)
175{
176 Work->NextAcquire = Workers;
177 Workers = Work;
178}
179 /*}}}*/
180// Acquire::Remove - Remove a worker /*{{{*/
181// ---------------------------------------------------------------------
93bf083d
AL
182/* A worker has died. This can not be done while the select loop is running
183 as it would require that RunFds could handling a changing list state and
1e3f4083 184 it can't.. */
0a8a80e5
AL
185void pkgAcquire::Remove(Worker *Work)
186{
93bf083d
AL
187 if (Running == true)
188 abort();
189
0a8a80e5
AL
190 Worker **I = &Workers;
191 for (; *I != 0;)
192 {
193 if (*I == Work)
194 *I = (*I)->NextAcquire;
195 else
196 I = &(*I)->NextAcquire;
197 }
198}
199 /*}}}*/
0118833a
AL
200// Acquire::Enqueue - Queue an URI for fetching /*{{{*/
201// ---------------------------------------------------------------------
93bf083d 202/* This is the entry point for an item. An item calls this function when
281daf46 203 it is constructed which creates a queue (based on the current queue
93bf083d
AL
204 mode) and puts the item in that queue. If the system is running then
205 the queue might be started. */
8267fe24 206void pkgAcquire::Enqueue(ItemDesc &Item)
0118833a 207{
0a8a80e5 208 // Determine which queue to put the item in
e331f6ed
AL
209 const MethodConfig *Config;
210 string Name = QueueName(Item.URI,Config);
0a8a80e5
AL
211 if (Name.empty() == true)
212 return;
213
214 // Find the queue structure
215 Queue *I = Queues;
216 for (; I != 0 && I->Name != Name; I = I->Next);
217 if (I == 0)
218 {
219 I = new Queue(Name,this);
220 I->Next = Queues;
221 Queues = I;
93bf083d
AL
222
223 if (Running == true)
224 I->Startup();
0a8a80e5 225 }
bfd22fc0 226
e331f6ed
AL
227 // See if this is a local only URI
228 if (Config->LocalOnly == true && Item.Owner->Complete == false)
229 Item.Owner->Local = true;
8267fe24 230 Item.Owner->Status = Item::StatIdle;
0a8a80e5
AL
231
232 // Queue it into the named queue
c03462c6
MV
233 if(I->Enqueue(Item))
234 ToFetch++;
235
0a8a80e5
AL
236 // Some trace stuff
237 if (Debug == true)
238 {
8267fe24
AL
239 clog << "Fetching " << Item.URI << endl;
240 clog << " to " << Item.Owner->DestFile << endl;
e331f6ed 241 clog << " Queue is: " << Name << endl;
0a8a80e5 242 }
3b5421b4
AL
243}
244 /*}}}*/
0a8a80e5 245// Acquire::Dequeue - Remove an item from all queues /*{{{*/
3b5421b4 246// ---------------------------------------------------------------------
93bf083d
AL
247/* This is called when an item is finished being fetched. It removes it
248 from all the queues */
0a8a80e5
AL
249void pkgAcquire::Dequeue(Item *Itm)
250{
251 Queue *I = Queues;
bfd22fc0 252 bool Res = false;
93bf083d
AL
253 if (Debug == true)
254 clog << "Dequeuing " << Itm->DestFile << endl;
5674f6b3
RG
255
256 for (; I != 0; I = I->Next)
257 {
258 if (I->Dequeue(Itm))
259 {
260 Res = true;
261 if (Debug == true)
262 clog << "Dequeued from " << I->Name << endl;
263 }
264 }
265
bfd22fc0
AL
266 if (Res == true)
267 ToFetch--;
0a8a80e5
AL
268}
269 /*}}}*/
270// Acquire::QueueName - Return the name of the queue for this URI /*{{{*/
271// ---------------------------------------------------------------------
272/* The string returned depends on the configuration settings and the
273 method parameters. Given something like http://foo.org/bar it can
274 return http://foo.org or http */
e331f6ed 275string pkgAcquire::QueueName(string Uri,MethodConfig const *&Config)
3b5421b4 276{
93bf083d
AL
277 URI U(Uri);
278
e331f6ed 279 Config = GetConfig(U.Access);
0a8a80e5
AL
280 if (Config == 0)
281 return string();
282
283 /* Single-Instance methods get exactly one queue per URI. This is
284 also used for the Access queue method */
285 if (Config->SingleInstance == true || QueueMode == QueueAccess)
5674f6b3
RG
286 return U.Access;
287
288 string AccessSchema = U.Access + ':',
289 FullQueueName = AccessSchema + U.Host;
290 unsigned int Instances = 0, SchemaLength = AccessSchema.length();
291
292 Queue *I = Queues;
293 for (; I != 0; I = I->Next) {
294 // if the queue already exists, re-use it
295 if (I->Name == FullQueueName)
296 return FullQueueName;
297
298 if (I->Name.compare(0, SchemaLength, AccessSchema) == 0)
299 Instances++;
300 }
301
302 if (Debug) {
303 clog << "Found " << Instances << " instances of " << U.Access << endl;
304 }
305
306 if (Instances >= (unsigned int)_config->FindI("Acquire::QueueHost::Limit",10))
307 return U.Access;
93bf083d 308
5674f6b3 309 return FullQueueName;
0118833a
AL
310}
311 /*}}}*/
3b5421b4
AL
312// Acquire::GetConfig - Fetch the configuration information /*{{{*/
313// ---------------------------------------------------------------------
314/* This locates the configuration structure for an access method. If
315 a config structure cannot be found a Worker will be created to
316 retrieve it */
0a8a80e5 317pkgAcquire::MethodConfig *pkgAcquire::GetConfig(string Access)
3b5421b4
AL
318{
319 // Search for an existing config
320 MethodConfig *Conf;
321 for (Conf = Configs; Conf != 0; Conf = Conf->Next)
322 if (Conf->Access == Access)
323 return Conf;
324
325 // Create the new config class
326 Conf = new MethodConfig;
327 Conf->Access = Access;
328 Conf->Next = Configs;
329 Configs = Conf;
0118833a 330
3b5421b4
AL
331 // Create the worker to fetch the configuration
332 Worker Work(Conf);
333 if (Work.Start() == false)
334 return 0;
7c6e2dc7
MV
335
336 /* if a method uses DownloadLimit, we switch to SingleInstance mode */
4b65cc13 337 if(_config->FindI("Acquire::"+Access+"::Dl-Limit",0) > 0)
7c6e2dc7
MV
338 Conf->SingleInstance = true;
339
3b5421b4
AL
340 return Conf;
341}
342 /*}}}*/
0a8a80e5
AL
343// Acquire::SetFds - Deal with readable FDs /*{{{*/
344// ---------------------------------------------------------------------
345/* Collect FDs that have activity monitors into the fd sets */
346void pkgAcquire::SetFds(int &Fd,fd_set *RSet,fd_set *WSet)
347{
348 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
349 {
350 if (I->InReady == true && I->InFd >= 0)
351 {
352 if (Fd < I->InFd)
353 Fd = I->InFd;
354 FD_SET(I->InFd,RSet);
355 }
356 if (I->OutReady == true && I->OutFd >= 0)
357 {
358 if (Fd < I->OutFd)
359 Fd = I->OutFd;
360 FD_SET(I->OutFd,WSet);
361 }
362 }
363}
364 /*}}}*/
365// Acquire::RunFds - Deal with active FDs /*{{{*/
366// ---------------------------------------------------------------------
93bf083d
AL
367/* Dispatch active FDs over to the proper workers. It is very important
368 that a worker never be erased while this is running! The queue class
369 should never erase a worker except during shutdown processing. */
0a8a80e5
AL
370void pkgAcquire::RunFds(fd_set *RSet,fd_set *WSet)
371{
372 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
373 {
374 if (I->InFd >= 0 && FD_ISSET(I->InFd,RSet) != 0)
375 I->InFdReady();
376 if (I->OutFd >= 0 && FD_ISSET(I->OutFd,WSet) != 0)
377 I->OutFdReady();
378 }
379}
380 /*}}}*/
381// Acquire::Run - Run the fetch sequence /*{{{*/
382// ---------------------------------------------------------------------
383/* This runs the queues. It manages a select loop for all of the
384 Worker tasks. The workers interact with the queues and items to
385 manage the actual fetch. */
1c5f7e5f 386pkgAcquire::RunResult pkgAcquire::Run(int PulseIntervall)
0a8a80e5 387{
8b89e57f
AL
388 Running = true;
389
0a8a80e5
AL
390 for (Queue *I = Queues; I != 0; I = I->Next)
391 I->Startup();
392
b98f2859
AL
393 if (Log != 0)
394 Log->Start();
395
024d1123
AL
396 bool WasCancelled = false;
397
0a8a80e5 398 // Run till all things have been acquired
8267fe24
AL
399 struct timeval tv;
400 tv.tv_sec = 0;
1c5f7e5f 401 tv.tv_usec = PulseIntervall;
0a8a80e5
AL
402 while (ToFetch > 0)
403 {
404 fd_set RFds;
405 fd_set WFds;
406 int Highest = 0;
407 FD_ZERO(&RFds);
408 FD_ZERO(&WFds);
409 SetFds(Highest,&RFds,&WFds);
410
b0db36b1
AL
411 int Res;
412 do
413 {
414 Res = select(Highest+1,&RFds,&WFds,0,&tv);
415 }
416 while (Res < 0 && errno == EINTR);
417
8267fe24 418 if (Res < 0)
8b89e57f 419 {
8267fe24
AL
420 _error->Errno("select","Select has failed");
421 break;
8b89e57f 422 }
93bf083d 423
0a8a80e5 424 RunFds(&RFds,&WFds);
93bf083d
AL
425 if (_error->PendingError() == true)
426 break;
8267fe24
AL
427
428 // Timeout, notify the log class
429 if (Res == 0 || (Log != 0 && Log->Update == true))
430 {
1c5f7e5f 431 tv.tv_usec = PulseIntervall;
8267fe24
AL
432 for (Worker *I = Workers; I != 0; I = I->NextAcquire)
433 I->Pulse();
024d1123
AL
434 if (Log != 0 && Log->Pulse(this) == false)
435 {
436 WasCancelled = true;
437 break;
438 }
8267fe24 439 }
0a8a80e5 440 }
be4401bf 441
b98f2859
AL
442 if (Log != 0)
443 Log->Stop();
444
be4401bf
AL
445 // Shut down the acquire bits
446 Running = false;
0a8a80e5 447 for (Queue *I = Queues; I != 0; I = I->Next)
8e5fc8f5 448 I->Shutdown(false);
0a8a80e5 449
ab559b35 450 // Shut down the items
f7f0d6c7 451 for (ItemIterator I = Items.begin(); I != Items.end(); ++I)
8e5fc8f5 452 (*I)->Finished();
ab559b35 453
024d1123
AL
454 if (_error->PendingError())
455 return Failed;
456 if (WasCancelled)
457 return Cancelled;
458 return Continue;
93bf083d
AL
459}
460 /*}}}*/
be4401bf 461// Acquire::Bump - Called when an item is dequeued /*{{{*/
93bf083d
AL
462// ---------------------------------------------------------------------
463/* This routine bumps idle queues in hopes that they will be able to fetch
464 the dequeued item */
465void pkgAcquire::Bump()
466{
be4401bf
AL
467 for (Queue *I = Queues; I != 0; I = I->Next)
468 I->Bump();
0a8a80e5
AL
469}
470 /*}}}*/
8267fe24
AL
471// Acquire::WorkerStep - Step to the next worker /*{{{*/
472// ---------------------------------------------------------------------
473/* Not inlined to advoid including acquire-worker.h */
474pkgAcquire::Worker *pkgAcquire::WorkerStep(Worker *I)
475{
476 return I->NextAcquire;
d3e8fbb3 477}
8267fe24 478 /*}}}*/
a6568219 479// Acquire::Clean - Cleans a directory /*{{{*/
7a7fa5f0
AL
480// ---------------------------------------------------------------------
481/* This is a bit simplistic, it looks at every file in the dir and sees
482 if it is part of the download set. */
483bool pkgAcquire::Clean(string Dir)
484{
95b5f6c1
DK
485 // non-existing directories are by definition clean…
486 if (DirectoryExists(Dir) == false)
487 return true;
488
10ecfe4f
MV
489 if(Dir == "/")
490 return _error->Error(_("Clean of %s is not supported"), Dir.c_str());
491
7a7fa5f0
AL
492 DIR *D = opendir(Dir.c_str());
493 if (D == 0)
b2e465d6 494 return _error->Errno("opendir",_("Unable to read %s"),Dir.c_str());
7a7fa5f0
AL
495
496 string StartDir = SafeGetCWD();
497 if (chdir(Dir.c_str()) != 0)
498 {
499 closedir(D);
b2e465d6 500 return _error->Errno("chdir",_("Unable to change to %s"),Dir.c_str());
7a7fa5f0
AL
501 }
502
503 for (struct dirent *Dir = readdir(D); Dir != 0; Dir = readdir(D))
504 {
505 // Skip some files..
506 if (strcmp(Dir->d_name,"lock") == 0 ||
507 strcmp(Dir->d_name,"partial") == 0 ||
508 strcmp(Dir->d_name,".") == 0 ||
509 strcmp(Dir->d_name,"..") == 0)
510 continue;
511
512 // Look in the get list
b4fc9b6f 513 ItemCIterator I = Items.begin();
f7f0d6c7 514 for (; I != Items.end(); ++I)
7a7fa5f0
AL
515 if (flNotDir((*I)->DestFile) == Dir->d_name)
516 break;
517
518 // Nothing found, nuke it
519 if (I == Items.end())
520 unlink(Dir->d_name);
521 };
522
7a7fa5f0 523 closedir(D);
3c8cda8b
MV
524 if (chdir(StartDir.c_str()) != 0)
525 return _error->Errno("chdir",_("Unable to change to %s"),StartDir.c_str());
7a7fa5f0
AL
526 return true;
527}
528 /*}}}*/
a6568219
AL
529// Acquire::TotalNeeded - Number of bytes to fetch /*{{{*/
530// ---------------------------------------------------------------------
531/* This is the total number of bytes needed */
a02db58f 532APT_PURE unsigned long long pkgAcquire::TotalNeeded()
a6568219 533{
a3c4c81a 534 unsigned long long Total = 0;
f7f0d6c7 535 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
536 Total += (*I)->FileSize;
537 return Total;
538}
539 /*}}}*/
540// Acquire::FetchNeeded - Number of bytes needed to get /*{{{*/
541// ---------------------------------------------------------------------
542/* This is the number of bytes that is not local */
a02db58f 543APT_PURE unsigned long long pkgAcquire::FetchNeeded()
a6568219 544{
a3c4c81a 545 unsigned long long Total = 0;
f7f0d6c7 546 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
a6568219
AL
547 if ((*I)->Local == false)
548 Total += (*I)->FileSize;
549 return Total;
550}
551 /*}}}*/
6b1ff003
AL
552// Acquire::PartialPresent - Number of partial bytes we already have /*{{{*/
553// ---------------------------------------------------------------------
554/* This is the number of bytes that is not local */
a02db58f 555APT_PURE unsigned long long pkgAcquire::PartialPresent()
6b1ff003 556{
a3c4c81a 557 unsigned long long Total = 0;
f7f0d6c7 558 for (ItemCIterator I = ItemsBegin(); I != ItemsEnd(); ++I)
6b1ff003
AL
559 if ((*I)->Local == false)
560 Total += (*I)->PartialSize;
561 return Total;
562}
92fcbfc1 563 /*}}}*/
8e5fc8f5 564// Acquire::UriBegin - Start iterator for the uri list /*{{{*/
f7a08e33
AL
565// ---------------------------------------------------------------------
566/* */
567pkgAcquire::UriIterator pkgAcquire::UriBegin()
568{
569 return UriIterator(Queues);
570}
571 /*}}}*/
8e5fc8f5 572// Acquire::UriEnd - End iterator for the uri list /*{{{*/
f7a08e33
AL
573// ---------------------------------------------------------------------
574/* */
575pkgAcquire::UriIterator pkgAcquire::UriEnd()
576{
577 return UriIterator(0);
578}
579 /*}}}*/
e331f6ed
AL
580// Acquire::MethodConfig::MethodConfig - Constructor /*{{{*/
581// ---------------------------------------------------------------------
582/* */
583pkgAcquire::MethodConfig::MethodConfig()
584{
585 SingleInstance = false;
e331f6ed
AL
586 Pipeline = false;
587 SendConfig = false;
588 LocalOnly = false;
459681d3 589 Removable = false;
e331f6ed
AL
590 Next = 0;
591}
592 /*}}}*/
0a8a80e5
AL
593// Queue::Queue - Constructor /*{{{*/
594// ---------------------------------------------------------------------
595/* */
596pkgAcquire::Queue::Queue(string Name,pkgAcquire *Owner) : Name(Name),
597 Owner(Owner)
598{
599 Items = 0;
600 Next = 0;
601 Workers = 0;
b185acc2
AL
602 MaxPipeDepth = 1;
603 PipeDepth = 0;
0a8a80e5
AL
604}
605 /*}}}*/
606// Queue::~Queue - Destructor /*{{{*/
607// ---------------------------------------------------------------------
608/* */
609pkgAcquire::Queue::~Queue()
610{
8e5fc8f5 611 Shutdown(true);
0a8a80e5
AL
612
613 while (Items != 0)
614 {
615 QItem *Jnk = Items;
616 Items = Items->Next;
617 delete Jnk;
618 }
619}
620 /*}}}*/
621// Queue::Enqueue - Queue an item to the queue /*{{{*/
622// ---------------------------------------------------------------------
623/* */
c03462c6 624bool pkgAcquire::Queue::Enqueue(ItemDesc &Item)
0a8a80e5 625{
7a1b1f8b 626 QItem **I = &Items;
c03462c6
MV
627 // move to the end of the queue and check for duplicates here
628 for (; *I != 0; I = &(*I)->Next)
629 if (Item.URI == (*I)->URI)
630 {
631 Item.Owner->Status = Item::StatDone;
632 return false;
633 }
634
0a8a80e5 635 // Create a new item
7a1b1f8b
AL
636 QItem *Itm = new QItem;
637 *Itm = Item;
638 Itm->Next = 0;
639 *I = Itm;
0a8a80e5 640
8267fe24 641 Item.Owner->QueueCounter++;
93bf083d
AL
642 if (Items->Next == 0)
643 Cycle();
c03462c6 644 return true;
0a8a80e5
AL
645}
646 /*}}}*/
c88edf1d 647// Queue::Dequeue - Remove an item from the queue /*{{{*/
0a8a80e5 648// ---------------------------------------------------------------------
b185acc2 649/* We return true if we hit something */
bfd22fc0 650bool pkgAcquire::Queue::Dequeue(Item *Owner)
0a8a80e5 651{
b185acc2
AL
652 if (Owner->Status == pkgAcquire::Item::StatFetching)
653 return _error->Error("Tried to dequeue a fetching object");
654
bfd22fc0
AL
655 bool Res = false;
656
0a8a80e5
AL
657 QItem **I = &Items;
658 for (; *I != 0;)
659 {
660 if ((*I)->Owner == Owner)
661 {
662 QItem *Jnk= *I;
663 *I = (*I)->Next;
664 Owner->QueueCounter--;
665 delete Jnk;
bfd22fc0 666 Res = true;
0a8a80e5
AL
667 }
668 else
669 I = &(*I)->Next;
670 }
bfd22fc0
AL
671
672 return Res;
0a8a80e5
AL
673}
674 /*}}}*/
675// Queue::Startup - Start the worker processes /*{{{*/
676// ---------------------------------------------------------------------
8e5fc8f5
AL
677/* It is possible for this to be called with a pre-existing set of
678 workers. */
0a8a80e5
AL
679bool pkgAcquire::Queue::Startup()
680{
8e5fc8f5
AL
681 if (Workers == 0)
682 {
683 URI U(Name);
684 pkgAcquire::MethodConfig *Cnf = Owner->GetConfig(U.Access);
685 if (Cnf == 0)
686 return false;
687
688 Workers = new Worker(this,Cnf,Owner->Log);
689 Owner->Add(Workers);
690 if (Workers->Start() == false)
691 return false;
692
693 /* When pipelining we commit 10 items. This needs to change when we
694 added other source retry to have cycle maintain a pipeline depth
695 on its own. */
696 if (Cnf->Pipeline == true)
6ce72612 697 MaxPipeDepth = _config->FindI("Acquire::Max-Pipeline-Depth",10);
8e5fc8f5
AL
698 else
699 MaxPipeDepth = 1;
700 }
5cb5d8dc 701
93bf083d 702 return Cycle();
0a8a80e5
AL
703}
704 /*}}}*/
705// Queue::Shutdown - Shutdown the worker processes /*{{{*/
706// ---------------------------------------------------------------------
8e5fc8f5
AL
707/* If final is true then all workers are eliminated, otherwise only workers
708 that do not need cleanup are removed */
709bool pkgAcquire::Queue::Shutdown(bool Final)
0a8a80e5
AL
710{
711 // Delete all of the workers
8e5fc8f5
AL
712 pkgAcquire::Worker **Cur = &Workers;
713 while (*Cur != 0)
0a8a80e5 714 {
8e5fc8f5
AL
715 pkgAcquire::Worker *Jnk = *Cur;
716 if (Final == true || Jnk->GetConf()->NeedsCleanup == false)
717 {
718 *Cur = Jnk->NextQueue;
719 Owner->Remove(Jnk);
720 delete Jnk;
721 }
722 else
723 Cur = &(*Cur)->NextQueue;
0a8a80e5
AL
724 }
725
726 return true;
3b5421b4
AL
727}
728 /*}}}*/
7d8afa39 729// Queue::FindItem - Find a URI in the item list /*{{{*/
c88edf1d
AL
730// ---------------------------------------------------------------------
731/* */
732pkgAcquire::Queue::QItem *pkgAcquire::Queue::FindItem(string URI,pkgAcquire::Worker *Owner)
733{
734 for (QItem *I = Items; I != 0; I = I->Next)
735 if (I->URI == URI && I->Worker == Owner)
736 return I;
737 return 0;
738}
739 /*}}}*/
740// Queue::ItemDone - Item has been completed /*{{{*/
741// ---------------------------------------------------------------------
742/* The worker signals this which causes the item to be removed from the
93bf083d
AL
743 queue. If this is the last queue instance then it is removed from the
744 main queue too.*/
c88edf1d
AL
745bool pkgAcquire::Queue::ItemDone(QItem *Itm)
746{
b185acc2 747 PipeDepth--;
db890fdb
AL
748 if (Itm->Owner->Status == pkgAcquire::Item::StatFetching)
749 Itm->Owner->Status = pkgAcquire::Item::StatDone;
750
93bf083d
AL
751 if (Itm->Owner->QueueCounter <= 1)
752 Owner->Dequeue(Itm->Owner);
753 else
754 {
755 Dequeue(Itm->Owner);
756 Owner->Bump();
757 }
c88edf1d 758
93bf083d
AL
759 return Cycle();
760}
761 /*}}}*/
762// Queue::Cycle - Queue new items into the method /*{{{*/
763// ---------------------------------------------------------------------
b185acc2
AL
764/* This locates a new idle item and sends it to the worker. If pipelining
765 is enabled then it keeps the pipe full. */
93bf083d
AL
766bool pkgAcquire::Queue::Cycle()
767{
768 if (Items == 0 || Workers == 0)
c88edf1d
AL
769 return true;
770
e7432370
AL
771 if (PipeDepth < 0)
772 return _error->Error("Pipedepth failure");
773
93bf083d
AL
774 // Look for a queable item
775 QItem *I = Items;
e7432370 776 while (PipeDepth < (signed)MaxPipeDepth)
b185acc2
AL
777 {
778 for (; I != 0; I = I->Next)
779 if (I->Owner->Status == pkgAcquire::Item::StatIdle)
780 break;
781
782 // Nothing to do, queue is idle.
783 if (I == 0)
784 return true;
785
786 I->Worker = Workers;
787 I->Owner->Status = pkgAcquire::Item::StatFetching;
e7432370 788 PipeDepth++;
b185acc2
AL
789 if (Workers->QueueItem(I) == false)
790 return false;
791 }
93bf083d 792
b185acc2 793 return true;
c88edf1d
AL
794}
795 /*}}}*/
be4401bf
AL
796// Queue::Bump - Fetch any pending objects if we are idle /*{{{*/
797// ---------------------------------------------------------------------
b185acc2 798/* This is called when an item in multiple queues is dequeued */
be4401bf
AL
799void pkgAcquire::Queue::Bump()
800{
b185acc2 801 Cycle();
be4401bf
AL
802}
803 /*}}}*/
b98f2859
AL
804// AcquireStatus::pkgAcquireStatus - Constructor /*{{{*/
805// ---------------------------------------------------------------------
806/* */
dcaa1185 807pkgAcquireStatus::pkgAcquireStatus() : d(NULL), Update(true), MorePulses(false)
b98f2859
AL
808{
809 Start();
810}
811 /*}}}*/
812// AcquireStatus::Pulse - Called periodically /*{{{*/
813// ---------------------------------------------------------------------
814/* This computes some internal state variables for the derived classes to
815 use. It generates the current downloaded bytes and total bytes to download
816 as well as the current CPS estimate. */
024d1123 817bool pkgAcquireStatus::Pulse(pkgAcquire *Owner)
b98f2859
AL
818{
819 TotalBytes = 0;
820 CurrentBytes = 0;
d568ed2d
AL
821 TotalItems = 0;
822 CurrentItems = 0;
b98f2859
AL
823
824 // Compute the total number of bytes to fetch
825 unsigned int Unknown = 0;
826 unsigned int Count = 0;
b4fc9b6f 827 for (pkgAcquire::ItemCIterator I = Owner->ItemsBegin(); I != Owner->ItemsEnd();
f7f0d6c7 828 ++I, ++Count)
b98f2859 829 {
d568ed2d
AL
830 TotalItems++;
831 if ((*I)->Status == pkgAcquire::Item::StatDone)
f7f0d6c7 832 ++CurrentItems;
d568ed2d 833
a6568219
AL
834 // Totally ignore local items
835 if ((*I)->Local == true)
836 continue;
b2e465d6 837
b98f2859
AL
838 TotalBytes += (*I)->FileSize;
839 if ((*I)->Complete == true)
840 CurrentBytes += (*I)->FileSize;
841 if ((*I)->FileSize == 0 && (*I)->Complete == false)
f7f0d6c7 842 ++Unknown;
b98f2859
AL
843 }
844
845 // Compute the current completion
dbbc5494 846 unsigned long long ResumeSize = 0;
b98f2859
AL
847 for (pkgAcquire::Worker *I = Owner->WorkersBegin(); I != 0;
848 I = Owner->WorkerStep(I))
849 if (I->CurrentItem != 0 && I->CurrentItem->Owner->Complete == false)
aa0e1101
AL
850 {
851 CurrentBytes += I->CurrentSize;
852 ResumeSize += I->ResumePoint;
853
854 // Files with unknown size always have 100% completion
855 if (I->CurrentItem->Owner->FileSize == 0 &&
856 I->CurrentItem->Owner->Complete == false)
857 TotalBytes += I->CurrentSize;
858 }
859
b98f2859
AL
860 // Normalize the figures and account for unknown size downloads
861 if (TotalBytes <= 0)
862 TotalBytes = 1;
863 if (Unknown == Count)
864 TotalBytes = Unknown;
18ef0a78
AL
865
866 // Wha?! Is not supposed to happen.
867 if (CurrentBytes > TotalBytes)
868 CurrentBytes = TotalBytes;
b98f2859
AL
869
870 // Compute the CPS
871 struct timeval NewTime;
872 gettimeofday(&NewTime,0);
2ec1674d 873 if ((NewTime.tv_sec - Time.tv_sec == 6 && NewTime.tv_usec > Time.tv_usec) ||
b98f2859
AL
874 NewTime.tv_sec - Time.tv_sec > 6)
875 {
f17ac097
AL
876 double Delta = NewTime.tv_sec - Time.tv_sec +
877 (NewTime.tv_usec - Time.tv_usec)/1000000.0;
b98f2859 878
b98f2859 879 // Compute the CPS value
f17ac097 880 if (Delta < 0.01)
e331f6ed
AL
881 CurrentCPS = 0;
882 else
aa0e1101
AL
883 CurrentCPS = ((CurrentBytes - ResumeSize) - LastBytes)/Delta;
884 LastBytes = CurrentBytes - ResumeSize;
dbbc5494 885 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
886 Time = NewTime;
887 }
024d1123 888
75ef8f14
MV
889 int fd = _config->FindI("APT::Status-Fd",-1);
890 if(fd > 0)
891 {
892 ostringstream status;
893
894 char msg[200];
895 long i = CurrentItems < TotalItems ? CurrentItems + 1 : CurrentItems;
c033d415
MV
896 unsigned long long ETA = 0;
897 if(CurrentCPS > 0)
898 ETA = (TotalBytes - CurrentBytes) / CurrentCPS;
75ef8f14 899
1e8b4c0f
MV
900 // only show the ETA if it makes sense
901 if (ETA > 0 && ETA < 172800 /* two days */ )
0c508b03 902 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li (%s remaining)"), i, TotalItems, TimeToStr(ETA).c_str());
1e8b4c0f 903 else
0c508b03 904 snprintf(msg,sizeof(msg), _("Retrieving file %li of %li"), i, TotalItems);
1e8b4c0f
MV
905
906
75ef8f14
MV
907
908 // build the status str
909 status << "dlstatus:" << i
910 << ":" << (CurrentBytes/float(TotalBytes)*100.0)
911 << ":" << msg
912 << endl;
31bda500
DK
913
914 std::string const dlstatus = status.str();
d68d65ad 915 FileFd::Write(fd, dlstatus.c_str(), dlstatus.size());
75ef8f14
MV
916 }
917
024d1123 918 return true;
b98f2859
AL
919}
920 /*}}}*/
921// AcquireStatus::Start - Called when the download is started /*{{{*/
922// ---------------------------------------------------------------------
923/* We just reset the counters */
924void pkgAcquireStatus::Start()
925{
926 gettimeofday(&Time,0);
927 gettimeofday(&StartTime,0);
928 LastBytes = 0;
929 CurrentCPS = 0;
930 CurrentBytes = 0;
931 TotalBytes = 0;
932 FetchedBytes = 0;
933 ElapsedTime = 0;
d568ed2d
AL
934 TotalItems = 0;
935 CurrentItems = 0;
b98f2859
AL
936}
937 /*}}}*/
a6568219 938// AcquireStatus::Stop - Finished downloading /*{{{*/
b98f2859
AL
939// ---------------------------------------------------------------------
940/* This accurately computes the elapsed time and the total overall CPS. */
941void pkgAcquireStatus::Stop()
942{
943 // Compute the CPS and elapsed time
944 struct timeval NewTime;
945 gettimeofday(&NewTime,0);
946
31a0531d
AL
947 double Delta = NewTime.tv_sec - StartTime.tv_sec +
948 (NewTime.tv_usec - StartTime.tv_usec)/1000000.0;
b98f2859 949
b98f2859 950 // Compute the CPS value
31a0531d 951 if (Delta < 0.01)
e331f6ed
AL
952 CurrentCPS = 0;
953 else
31a0531d 954 CurrentCPS = FetchedBytes/Delta;
b98f2859 955 LastBytes = CurrentBytes;
dbbc5494 956 ElapsedTime = (unsigned long long)Delta;
b98f2859
AL
957}
958 /*}}}*/
959// AcquireStatus::Fetched - Called when a byte set has been fetched /*{{{*/
960// ---------------------------------------------------------------------
961/* This is used to get accurate final transfer rate reporting. */
73da43e9 962void pkgAcquireStatus::Fetched(unsigned long long Size,unsigned long long Resume)
93274b8d 963{
b98f2859
AL
964 FetchedBytes += Size - Resume;
965}
966 /*}}}*/